ums.agent.process
# Agenten Plattform
#
# (c) 2024 Magnus Bender
# Institute of Humanities-Centered Artificial Intelligence (CHAI)
# Universitaet Hamburg
# https://www.chai.uni-hamburg.de/~bender
#
# source code released under the terms of GNU Public License Version 3
# https://www.gnu.org/licenses/gpl-3.0.txt

import os
import importlib
from typing import List

import requests
from fastapi import BackgroundTasks

from ums.agent.agent import BasicAgent, AgentCapability, ExtractAgent, SolveAgent, GatekeeperAgent
from ums.utils import AgentMessage, AgentResponse, logger

class MessageProcessor():
    """
    Routes incoming agent messages to the configured agents.

    The agents are loaded from the module/variable named by `AGENTS_LIST`
    and grouped by their capability. `new_message` queues the matching
    agents as background tasks and hands each of them `_send_message`,
    which posts a message to the management at `MANAGEMENT_URL`.
    """

    # Base URL of the management (env `MANAGEMENT_URL`), stripped of
    # surrounding whitespace and slashes.
    MANAGEMENT_URL = os.environ.get('MANAGEMENT_URL', 'http://127.0.0.1:80').strip().strip('/')
    # Location of the agent list in the form "<module>:<variable>" (env `AGENTS_LIST`).
    AGENTS_LIST = os.environ.get('AGENTS_LIST', 'ums.example.example:AGENT_CLASSES').strip()
    # Seconds to wait for the management before a request is given up;
    # without a timeout `requests` may block forever.
    REQUEST_TIMEOUT = 30

    def __init__(self, disable_messages:bool=False):
        """
        Load the agents named by `AGENTS_LIST` and group them by capability.

        :param disable_messages: If true, `_send_message` only prints the
            message instead of posting it to the management.
        :raises ValueError: If `AGENTS_LIST` does not contain exactly one ':'.
        """
        self.counts = 0
        self.disable_messages = disable_messages

        module_name, var_name = self.AGENTS_LIST.split(':')
        agents_module = importlib.import_module(module_name)

        self.agent_classes:List[BasicAgent] = getattr(agents_module, var_name)
        self.extract_agents:List[ExtractAgent] = [
            ac for ac in self.agent_classes
            if ac.agent_capability() == AgentCapability.EXTRACT
        ]
        self.solve_agents:List[SolveAgent] = [
            ac for ac in self.agent_classes
            if ac.agent_capability() == AgentCapability.SOLVE
        ]
        self.gatekeeper_agents:List[GatekeeperAgent] = [
            ac for ac in self.agent_classes
            if ac.agent_capability() == AgentCapability.GATEKEEPER
        ]

    def new_message(self, message:AgentMessage, background_tasks: BackgroundTasks) -> AgentResponse:
        """
        Queue the agents responsible for the next open step of `message`.

        The steps are checked in the order extract, solve, validate; only
        the first step that is required and not finished is handled.

        :param message: The incoming message.
        :param background_tasks: Task collection the agents are added to.
        :return: Response carrying the running message count and whether
            at least one agent was queued.
        """
        enqueued = False

        if message.status.extract.required and not message.status.extract.finished:
            # send to extract agents handling one of the message's data types
            if self.extract_agents:
                data_types = set( d.type for d in message.data )
                for ac in self.extract_agents:
                    if ac.extract_type() in data_types:
                        background_tasks.add_task(ac, message, self._send_message)
                        enqueued = True

        elif message.status.solve.required and not message.status.solve.finished:
            # send to solve agents
            for sa in self.solve_agents:
                background_tasks.add_task(sa, message, self._send_message)
                enqueued = True

        elif message.status.validate.required and not message.status.validate.finished:
            # send to gatekeeper agents
            for ga in self.gatekeeper_agents:
                background_tasks.add_task(ga, message, self._send_message)
                enqueued = True

        # leading space keeps the status text and the ID apart in the log line
        logger.debug(
            ("Added to queue" if enqueued else "No agent found to queue message.") +
            f" ID: {message.id} Count: {self.counts}"
        )

        # the response reports the count as it was before this message
        count = self.counts
        self.counts += 1
        return AgentResponse(
            count=count,
            msg="Added to queue" if enqueued else "",
            error=not enqueued,
            error_msg=None if enqueued else "No agent found to queue message."
        )

    def _send_message(self, message:AgentMessage) -> bool:
        """
        Post `message` as JSON to the management.

        :param message: The message to send.
        :return: True if the management answered with status 200, False
            otherwise (including transport errors and disabled messages).
        """
        if self.disable_messages:
            print("\tMessages disabled: Requested to send message to management:")
            print(message.model_dump_json(indent=2))
            # NOTE(review): nothing was sent; this path used to return None
            # implicitly, so an explicit False keeps the result falsy.
            return False

        try:
            r = requests.post(
                "{}/message".format(self.MANAGEMENT_URL),
                data=message.model_dump_json(),
                headers={"accept" : "application/json", "content-type" : "application/json"},
                timeout=self.REQUEST_TIMEOUT
            )
        except requests.RequestException as e:
            # connection problems and timeouts are reported like a bad status
            logger.warning(f"Error sending message to management! {e!r}")
            return False

        if r.status_code == 200:
            return True

        logger.warning(f"Error sending message to management! {(r.text, r.headers)}")
        return False
class
MessageProcessor:
class MessageProcessor():
    """
    Routes incoming agent messages to the configured agents.

    The agents are loaded from the module/variable named by `AGENTS_LIST`
    and grouped by their capability. `new_message` queues the matching
    agents as background tasks and hands each of them `_send_message`,
    which posts a message to the management at `MANAGEMENT_URL`.
    """

    # Base URL of the management (env `MANAGEMENT_URL`), stripped of
    # surrounding whitespace and slashes.
    MANAGEMENT_URL = os.environ.get('MANAGEMENT_URL', 'http://127.0.0.1:80').strip().strip('/')
    # Location of the agent list in the form "<module>:<variable>" (env `AGENTS_LIST`).
    AGENTS_LIST = os.environ.get('AGENTS_LIST', 'ums.example.example:AGENT_CLASSES').strip()
    # Seconds to wait for the management before a request is given up;
    # without a timeout `requests` may block forever.
    REQUEST_TIMEOUT = 30

    def __init__(self, disable_messages:bool=False):
        """
        Load the agents named by `AGENTS_LIST` and group them by capability.

        :param disable_messages: If true, `_send_message` only prints the
            message instead of posting it to the management.
        :raises ValueError: If `AGENTS_LIST` does not contain exactly one ':'.
        """
        self.counts = 0
        self.disable_messages = disable_messages

        module_name, var_name = self.AGENTS_LIST.split(':')
        agents_module = importlib.import_module(module_name)

        self.agent_classes:List[BasicAgent] = getattr(agents_module, var_name)
        self.extract_agents:List[ExtractAgent] = [
            ac for ac in self.agent_classes
            if ac.agent_capability() == AgentCapability.EXTRACT
        ]
        self.solve_agents:List[SolveAgent] = [
            ac for ac in self.agent_classes
            if ac.agent_capability() == AgentCapability.SOLVE
        ]
        self.gatekeeper_agents:List[GatekeeperAgent] = [
            ac for ac in self.agent_classes
            if ac.agent_capability() == AgentCapability.GATEKEEPER
        ]

    def new_message(self, message:AgentMessage, background_tasks: BackgroundTasks) -> AgentResponse:
        """
        Queue the agents responsible for the next open step of `message`.

        The steps are checked in the order extract, solve, validate; only
        the first step that is required and not finished is handled.

        :param message: The incoming message.
        :param background_tasks: Task collection the agents are added to.
        :return: Response carrying the running message count and whether
            at least one agent was queued.
        """
        enqueued = False

        if message.status.extract.required and not message.status.extract.finished:
            # send to extract agents handling one of the message's data types
            if self.extract_agents:
                data_types = set( d.type for d in message.data )
                for ac in self.extract_agents:
                    if ac.extract_type() in data_types:
                        background_tasks.add_task(ac, message, self._send_message)
                        enqueued = True

        elif message.status.solve.required and not message.status.solve.finished:
            # send to solve agents
            for sa in self.solve_agents:
                background_tasks.add_task(sa, message, self._send_message)
                enqueued = True

        elif message.status.validate.required and not message.status.validate.finished:
            # send to gatekeeper agents
            for ga in self.gatekeeper_agents:
                background_tasks.add_task(ga, message, self._send_message)
                enqueued = True

        # leading space keeps the status text and the ID apart in the log line
        logger.debug(
            ("Added to queue" if enqueued else "No agent found to queue message.") +
            f" ID: {message.id} Count: {self.counts}"
        )

        # the response reports the count as it was before this message
        count = self.counts
        self.counts += 1
        return AgentResponse(
            count=count,
            msg="Added to queue" if enqueued else "",
            error=not enqueued,
            error_msg=None if enqueued else "No agent found to queue message."
        )

    def _send_message(self, message:AgentMessage) -> bool:
        """
        Post `message` as JSON to the management.

        :param message: The message to send.
        :return: True if the management answered with status 200, False
            otherwise (including transport errors and disabled messages).
        """
        if self.disable_messages:
            print("\tMessages disabled: Requested to send message to management:")
            print(message.model_dump_json(indent=2))
            # NOTE(review): nothing was sent; this path used to return None
            # implicitly, so an explicit False keeps the result falsy.
            return False

        try:
            r = requests.post(
                "{}/message".format(self.MANAGEMENT_URL),
                data=message.model_dump_json(),
                headers={"accept" : "application/json", "content-type" : "application/json"},
                timeout=self.REQUEST_TIMEOUT
            )
        except requests.RequestException as e:
            # connection problems and timeouts are reported like a bad status
            logger.warning(f"Error sending message to management! {e!r}")
            return False

        if r.status_code == 200:
            return True

        logger.warning(f"Error sending message to management! {(r.text, r.headers)}")
        return False
MessageProcessor(disable_messages: bool = False)
def __init__(self, disable_messages:bool=False):
    """
    Set up the processor and load the agents configured in `AGENTS_LIST`.

    `AGENTS_LIST` has the form "<module>:<variable>"; the module is
    imported and the variable's value is split into extract, solve and
    gatekeeper agents according to each entry's `agent_capability()`.

    :param disable_messages: Stored on the instance as `disable_messages`.
    """
    self.disable_messages = disable_messages
    self.counts = 0

    module_path, attribute_name = self.AGENTS_LIST.split(':')
    self.agent_classes:List[BasicAgent] = getattr(
        importlib.import_module(module_path),
        attribute_name
    )

    # one list per capability, each keeping the order of the configured list
    self.extract_agents:List[ExtractAgent] = [
        agent for agent in self.agent_classes
        if agent.agent_capability() == AgentCapability.EXTRACT
    ]
    self.solve_agents:List[SolveAgent] = [
        agent for agent in self.agent_classes
        if agent.agent_capability() == AgentCapability.SOLVE
    ]
    self.gatekeeper_agents:List[GatekeeperAgent] = [
        agent for agent in self.agent_classes
        if agent.agent_capability() == AgentCapability.GATEKEEPER
    ]
AGENTS_LIST =
'ums.example.example:AGENT_CLASSES'
agent_classes: List[ums.agent.agent.BasicAgent]
extract_agents: List[ums.agent.agent.ExtractAgent]
solve_agents: List[ums.agent.agent.SolveAgent]
gatekeeper_agents: List[ums.agent.agent.GatekeeperAgent]
def
new_message( self, message: ums.utils.types.AgentMessage, background_tasks: fastapi.background.BackgroundTasks) -> ums.utils.types.AgentResponse:
def new_message(self, message:AgentMessage, background_tasks: BackgroundTasks) -> AgentResponse:
    """
    Queue the agents responsible for the next open step of `message`.

    The steps are checked in the order extract, solve, validate; only
    the first step that is required and not finished is handled.

    :param message: The incoming message.
    :param background_tasks: Task collection the agents are added to.
    :return: Response carrying the running message count and whether
        at least one agent was queued.
    """
    enqueued = False

    if message.status.extract.required and not message.status.extract.finished:
        # send to extract agents handling one of the message's data types
        if self.extract_agents:
            data_types = set( d.type for d in message.data )
            for ac in self.extract_agents:
                if ac.extract_type() in data_types:
                    background_tasks.add_task(ac, message, self._send_message)
                    enqueued = True

    elif message.status.solve.required and not message.status.solve.finished:
        # send to solve agents
        for sa in self.solve_agents:
            background_tasks.add_task(sa, message, self._send_message)
            enqueued = True

    elif message.status.validate.required and not message.status.validate.finished:
        # send to gatekeeper agents
        for ga in self.gatekeeper_agents:
            background_tasks.add_task(ga, message, self._send_message)
            enqueued = True

    # leading space keeps the status text and the ID apart in the log line
    logger.debug(
        ("Added to queue" if enqueued else "No agent found to queue message.") +
        f" ID: {message.id} Count: {self.counts}"
    )

    # the response reports the count as it was before this message
    count = self.counts
    self.counts += 1
    return AgentResponse(
        count=count,
        msg="Added to queue" if enqueued else "",
        error=not enqueued,
        error_msg=None if enqueued else "No agent found to queue message."
    )