ums.agent.process
1# Agenten Plattform 2# 3# (c) 2024 Magnus Bender 4# Institute of Humanities-Centered Artificial Intelligence (CHAI) 5# Universitaet Hamburg 6# https://www.chai.uni-hamburg.de/~bender 7# 8# source code released under the terms of GNU Public License Version 3 9# https://www.gnu.org/licenses/gpl-3.0.txt 10 11import os, importlib 12from typing import List 13 14import requests 15from fastapi import BackgroundTasks 16 17from ums.agent.agent import BasicAgent, AgentCapability, ExtractAgent, SolveAgent, GatekeeperAgent 18from ums.utils import AgentMessage, AgentResponse, logger 19 20class MessageProcessor(): 21 22 MANAGEMENT_URL = os.environ.get('MANAGEMENT_URL', 'http://127.0.0.1:80').strip().strip('/') 23 AGENTS_LIST = os.environ.get('AGENTS_LIST', 'ums.example.example:AGENT_CLASSES').strip() 24 25 def __init__(self): 26 self.counts = 0 27 28 module_name, var_name = self.AGENTS_LIST.split(':') 29 agents_module = importlib.import_module(module_name) 30 31 self.agent_classes:List[BasicAgent] = getattr(agents_module, var_name) 32 self.extract_agents:List[ExtractAgent] = list(filter( 33 lambda ac: ac.agent_capability() == AgentCapability.EXTRACT, 34 self.agent_classes 35 )) 36 self.solve_agents:List[SolveAgent] = list(filter( 37 lambda ac: ac.agent_capability() == AgentCapability.SOLVE, 38 self.agent_classes 39 )) 40 self.gatekeeper_agents:List[GatekeeperAgent] = list(filter( 41 lambda ac: ac.agent_capability() == AgentCapability.GATEKEEPER, 42 self.agent_classes 43 )) 44 45 def new_message(self, message:AgentMessage, background_tasks: BackgroundTasks) -> AgentResponse: 46 enqueued = False 47 48 if message.status.extract.required and not message.status.extract.finished: 49 # send to extract agents 50 if len(self.extract_agents) > 0: 51 data_types = set( d.type for d in message.data ) 52 for ac in self.extract_agents: 53 if ac.extract_type() in data_types: 54 background_tasks.add_task(ac, message, self._send_message) 55 enqueued = True 56 57 elif message.status.solve.required and not message.status.solve.finished: 58 # send to solve agents 59 if len(self.solve_agents) > 0: 60 for sa in self.solve_agents: 61 background_tasks.add_task(sa, message, self._send_message) 62 enqueued = True 63 64 elif message.status.validate.required and not message.status.validate.finished: 65 # send to solve agents 66 if len(self.gatekeeper_agents) > 0: 67 for ga in self.gatekeeper_agents: 68 background_tasks.add_task(ga, message, self._send_message) 69 enqueued = True 70 71 logger.debug( 72 ("Added to queue" if enqueued else "No agent found to queue message.") + 73 f"ID: {message.id} Count: {self.counts}" 74 ) 75 76 self.counts += 1 77 return AgentResponse( 78 count=self.counts-1, 79 msg="Added to queue" if enqueued else "", 80 error=not enqueued, 81 error_msg=None if enqueued else "No agent found to queue message." 82 ) 83 84 def _send_message(self, message:AgentMessage) -> bool: 85 r = requests.post( 86 "{}/message".format(self.MANAGEMENT_URL), 87 data=message.model_dump_json(), 88 headers={"accept" : "application/json", "content-type" : "application/json"} 89 ) 90 91 if r.status_code == 200: 92 return True 93 else: 94 logger.warning(f"Error sending message to management! {(r.text, r.headers)}") 95 return False
class
MessageProcessor:
21class MessageProcessor(): 22 23 MANAGEMENT_URL = os.environ.get('MANAGEMENT_URL', 'http://127.0.0.1:80').strip().strip('/') 24 AGENTS_LIST = os.environ.get('AGENTS_LIST', 'ums.example.example:AGENT_CLASSES').strip() 25 26 def __init__(self): 27 self.counts = 0 28 29 module_name, var_name = self.AGENTS_LIST.split(':') 30 agents_module = importlib.import_module(module_name) 31 32 self.agent_classes:List[BasicAgent] = getattr(agents_module, var_name) 33 self.extract_agents:List[ExtractAgent] = list(filter( 34 lambda ac: ac.agent_capability() == AgentCapability.EXTRACT, 35 self.agent_classes 36 )) 37 self.solve_agents:List[SolveAgent] = list(filter( 38 lambda ac: ac.agent_capability() == AgentCapability.SOLVE, 39 self.agent_classes 40 )) 41 self.gatekeeper_agents:List[GatekeeperAgent] = list(filter( 42 lambda ac: ac.agent_capability() == AgentCapability.GATEKEEPER, 43 self.agent_classes 44 )) 45 46 def new_message(self, message:AgentMessage, background_tasks: BackgroundTasks) -> AgentResponse: 47 enqueued = False 48 49 if message.status.extract.required and not message.status.extract.finished: 50 # send to extract agents 51 if len(self.extract_agents) > 0: 52 data_types = set( d.type for d in message.data ) 53 for ac in self.extract_agents: 54 if ac.extract_type() in data_types: 55 background_tasks.add_task(ac, message, self._send_message) 56 enqueued = True 57 58 elif message.status.solve.required and not message.status.solve.finished: 59 # send to solve agents 60 if len(self.solve_agents) > 0: 61 for sa in self.solve_agents: 62 background_tasks.add_task(sa, message, self._send_message) 63 enqueued = True 64 65 elif message.status.validate.required and not message.status.validate.finished: 66 # send to solve agents 67 if len(self.gatekeeper_agents) > 0: 68 for ga in self.gatekeeper_agents: 69 background_tasks.add_task(ga, message, self._send_message) 70 enqueued = True 71 72 logger.debug( 73 ("Added to queue" if enqueued else "No agent found to queue message.") + 74 f"ID: {message.id} Count: {self.counts}" 75 ) 76 77 self.counts += 1 78 return AgentResponse( 79 count=self.counts-1, 80 msg="Added to queue" if enqueued else "", 81 error=not enqueued, 82 error_msg=None if enqueued else "No agent found to queue message." 83 ) 84 85 def _send_message(self, message:AgentMessage) -> bool: 86 r = requests.post( 87 "{}/message".format(self.MANAGEMENT_URL), 88 data=message.model_dump_json(), 89 headers={"accept" : "application/json", "content-type" : "application/json"} 90 ) 91 92 if r.status_code == 200: 93 return True 94 else: 95 logger.warning(f"Error sending message to management! {(r.text, r.headers)}") 96 return False
AGENTS_LIST =
'ums.example.example:AGENT_CLASSES'
agent_classes: List[ums.agent.agent.BasicAgent]
extract_agents: List[ums.agent.agent.ExtractAgent]
solve_agents: List[ums.agent.agent.SolveAgent]
gatekeeper_agents: List[ums.agent.agent.GatekeeperAgent]
def
new_message( self, message: ums.utils.types.AgentMessage, background_tasks: fastapi.background.BackgroundTasks) -> ums.utils.types.AgentResponse:
46 def new_message(self, message:AgentMessage, background_tasks: BackgroundTasks) -> AgentResponse: 47 enqueued = False 48 49 if message.status.extract.required and not message.status.extract.finished: 50 # send to extract agents 51 if len(self.extract_agents) > 0: 52 data_types = set( d.type for d in message.data ) 53 for ac in self.extract_agents: 54 if ac.extract_type() in data_types: 55 background_tasks.add_task(ac, message, self._send_message) 56 enqueued = True 57 58 elif message.status.solve.required and not message.status.solve.finished: 59 # send to solve agents 60 if len(self.solve_agents) > 0: 61 for sa in self.solve_agents: 62 background_tasks.add_task(sa, message, self._send_message) 63 enqueued = True 64 65 elif message.status.validate.required and not message.status.validate.finished: 66 # send to solve agents 67 if len(self.gatekeeper_agents) > 0: 68 for ga in self.gatekeeper_agents: 69 background_tasks.add_task(ga, message, self._send_message) 70 enqueued = True 71 72 logger.debug( 73 ("Added to queue" if enqueued else "No agent found to queue message.") + 74 f"ID: {message.id} Count: {self.counts}" 75 ) 76 77 self.counts += 1 78 return AgentResponse( 79 count=self.counts-1, 80 msg="Added to queue" if enqueued else "", 81 error=not enqueued, 82 error_msg=None if enqueued else "No agent found to queue message." 83 )