1+ """AgentOperator.py
2+
3+ This module defines the AgentOperator class, which manages the lifecycle and execution
4+ of Agent instances, including their initialization, proxy setup, logging, and shutdown.
5+ It provides mechanisms for running agents in separate processes or threads, handling
6+ their status, and ensuring proper cleanup.
7+
8+ Classes:
9+ AgentOperator: Manages agent processes, proxies, and lifecycle events.
10+
11+ Functions:
12+ (All public and private methods of AgentOperator are documented at the class and function level.)
13+ """
14+
115import logging
216import multiprocessing
317import multiprocessing .managers
822import time
923from functools import partial
1024from concurrent .futures import ProcessPoolExecutor , Future
11- from typing import Any , Optional , Callable , Union
25+ from typing import Optional , Callable , Union
1226
1327from JXTABLES .XTablesClient import XTablesClient
1428
2741
2842# subscribes to command request with xtables and then executes when requested
2943class AgentOperator :
44+ """
45+ Manages the lifecycle, execution, and communication of Agent instances.
46+
47+ Responsibilities include:
48+ - Starting and stopping agents, both in the main thread and in subprocesses.
49+ - Handling agent proxies for inter-process communication.
50+ - Managing agent logging and status reporting.
51+ - Ensuring proper cleanup and shutdown of agents.
52+ """
53+
3054 STATUS = "status"
3155 DESCRIPTION = "description"
3256 ERRORS = "errors"
@@ -37,15 +61,22 @@ class AgentOperator:
3761 SHUTDOWNTIMER = "shutdown"
3862 CLOSETIMER = "close"
3963
40-
41-
4264 def __init__ (
4365 self ,
4466 manager : multiprocessing .managers .SyncManager ,
4567 shareOp : ShareOperator ,
4668 streamOp : StreamOperator ,
4769 logStreamOp : LogStreamOperator
4870 ) -> None :
71+ """
72+ Initializes the AgentOperator.
73+
74+ Args:
75+ manager (multiprocessing.managers.SyncManager): Manager for shared objects.
76+ shareOp (ShareOperator): Operator for shared data.
77+ streamOp (StreamOperator): Operator for stream proxies.
78+ logStreamOp (LogStreamOperator): Operator for log stream proxies.
79+ """
4980 self .__executor : ProcessPoolExecutor = ProcessPoolExecutor ()
5081 self .__stop : threading .Event = manager .Event () # flag
5182 self .futures : list [Future ] = []
@@ -57,6 +88,12 @@ def __init__(
5788 self .manager = manager
5889
5990 def __setStop (self , stop : bool ):
91+ """
92+ Sets or clears the stop flag for agent execution.
93+
94+ Args:
95+ stop (bool): If True, sets the stop flag; otherwise, clears it.
96+ """
6097 try :
6198 if stop :
6299 self .__stop .set ()
@@ -66,20 +103,34 @@ def __setStop(self, stop: bool):
66103 pass
67104
68105 def stopAndWait (self ) -> None :
69- """Stop all agents but allow them to clean up, and wait for them to finish"""
106+ """Stop all agents but allow them to clean up, and wait for them to finish. """
70107 self .__setStop (True )
71108 self .waitForAgentsToFinish ()
72109 self .__setStop (False )
73110
74111 def stopPermanent (self ) -> None :
75- """Set the stop flag permanently (will not be reset)"""
112+ """Set the stop flag permanently (will not be reset). """
76113 self .__setStop (True )
77114
78115 def __setMainAgent (self , agent : Agent ):
116+ """
117+ Sets the main agent reference.
118+
119+ Args:
120+ agent (Agent): The agent to set as the main agent.
121+ """
79122 self .mainAgent = agent
80123
81124 def getUniqueAgentName (self , agentClass : Union [partial , type [Agent ]]):
82- """Handles duplicate agent names"""
125+ """
126+ Generates a unique name for an agent, handling duplicates.
127+
128+ Args:
129+ agentClass (Union[partial, type[Agent]]): The agent class or partial.
130+
131+ Returns:
132+ str: A unique agent name.
133+ """
83134 if isinstance (agentClass , partial ):
84135 name = agentClass .func .__name__
85136 else :
@@ -104,6 +155,17 @@ def initalizeProxies(
104155 agentName : str ,
105156 proxyDict : multiprocessing .managers .DictProxy ,
106157 ) -> tuple [multiprocessing .managers .DictProxy , queue .Queue ]:
158+ """
159+ Initializes proxies for the agent, including stream and log proxies.
160+
161+ Args:
162+ agentClass (Union[partial, type[Agent]]): The agent class or partial.
163+ agentName (str): The unique agent name.
164+ proxyDict (multiprocessing.managers.DictProxy): The proxy dictionary.
165+
166+ Returns:
167+ tuple: (proxyDict, logProxy)
168+ """
107169 for requestName , proxyType in ProxyType .getProxyRequests (agentClass ).items ():
108170 # TODO add more
109171 if proxyType is ProxyType .STREAM :
@@ -117,6 +179,13 @@ def initalizeProxies(
117179 return proxyDict , logProxy
118180
119181 def wakeAgent (self , agentClass : type [Agent ], isMainThread : bool ) -> None :
182+ """
183+ Starts an agent, either in the main thread or in a subprocess.
184+
185+ Args:
186+ agentClass (type[Agent]): The agent class to instantiate and run.
187+ isMainThread (bool): Whether to run in the main thread.
188+ """
120189 Sentinel .info (f"Waking agent!" )
121190
122191 agentName = self .getUniqueAgentName (agentClass )
@@ -173,7 +242,15 @@ def _handleLog(
173242 newLog : str ,
174243 maxLogLength : int = 3 ,
175244 ) -> None :
176-
245+ """
246+ Handles log messages for an agent, maintaining a rolling window of recent logs.
247+
248+ Args:
249+ logProperty (ReadonlyProperty): The property to update with log messages.
250+ lastLogs (list): The list of recent log messages.
251+ newLog (str): The new log message to add.
252+ maxLogLength (int): The maximum number of log messages to keep.
253+ """
177254 lastLogs .append (newLog )
178255 lastLogs = lastLogs [- maxLogLength :]
179256
@@ -189,7 +266,20 @@ def _injectAgent(
189266 logProxy : queue .Queue ,
190267 isMainThread : bool ,
191268 ) -> Agent :
192-
269+ """
270+ Injects core dependencies and logging into the agent.
271+
272+ Args:
273+ agent (Agent): The agent instance.
274+ agentName (str): The agent's unique name.
275+ shareOperator (ShareOperator): The shared operator.
276+ proxies (DictProxy): The proxy dictionary.
277+ logProxy (queue.Queue): The log proxy queue.
278+ isMainThread (bool): Whether running in the main thread.
279+
280+ Returns:
281+ Agent: The injected agent.
282+ """
193283 # injecting stuff shared from core
194284 agent ._injectCore (shareOperator , isMainThread , agentName )
195285 # creating new operators just for this agent and injecting them
@@ -218,7 +308,14 @@ def _injectNewOperators(
218308 agentName : str ,
219309 logProxy : queue .Queue
220310 ) -> None :
221- """Since any agent not on main thread will be in its own process, alot of new objects will have to be created"""
311+ """
312+ Injects new operator instances and logging handlers into the agent.
313+
314+ Args:
315+ agent (Agent): The agent instance.
316+ agentName (str): The agent's unique name.
317+ logProxy (queue.Queue): The log proxy queue.
318+ """
222319 client = XTablesClient (debug_mode = True ) # one per process
223320 client .add_client_version_property (f"MATRIX-ALT-{ agentName } " )
224321
@@ -267,6 +364,20 @@ def _startAgentLoop(
267364 logProxy : queue .Queue ,
268365 runOnCreate : Optional [Callable [[Agent ], None ]] = None ,
269366 ) -> None :
367+ """
368+ Main agent loop that manages agent lifecycle, including creation, running,
369+ shutdown, and cleanup.
370+
371+ Args:
372+ agentClass (type[Agent]): The agent class to instantiate.
373+ agentName (str): The agent's unique name.
374+ shareOperator (ShareOperator): The shared operator.
375+ isMainThread (bool): Whether running in the main thread.
376+ stopflag (threading.Event): The stop flag event.
377+ proxies (DictProxy): The proxy dictionary.
378+ logProxy (queue.Queue): The log proxy queue.
379+ runOnCreate (Optional[Callable[[Agent], None]]): Optional callback to run on agent creation.
380+ """
270381 if not isMainThread :
271382 signal .signal (signal .SIGINT , signal .SIG_IGN )
272383 signal .signal (signal .SIGTERM , signal .SIG_IGN )
@@ -413,10 +524,19 @@ def __handleException(exception: Exception) -> None:
413524 agent .isCleanedUp = True
414525
415526 def allFinished (self ):
527+ """
528+ Checks if all agent futures have completed.
529+
530+ Returns:
531+ bool: True if all agents are finished, False otherwise.
532+ """
416533 return all (f .done () for f in self .futures )
417534
418535 def waitForAgentsToFinish (self ) -> None :
419- """Thread blocking method that waits for any running agents"""
536+ """
537+ Thread blocking method that waits for any running agents to finish.
538+ Also ensures the main agent is properly shut down and cleaned up.
539+ """
420540 if not self .allFinished ():
421541 Sentinel .info ("Waiting for async agent to finish..." )
422542 while True :
@@ -450,6 +570,8 @@ def waitForAgentsToFinish(self) -> None:
450570 Sentinel .info ("Main agent finished" )
451571
452572 def shutDownNow (self ) -> None :
453- """Threadblocks until executor is finished"""
573+ """
574+ Blocks the thread until the executor is finished and all agent processes are shut down.
575+ """
454576 self .__executor .shutdown (wait = True , cancel_futures = True )
455577
0 commit comments