ums.agent.process

 1# Agenten Plattform
 2#
 3# (c) 2024 Magnus Bender
 4# 	Institute of Humanities-Centered Artificial Intelligence (CHAI)
 5# 	Universitaet Hamburg
 6# 	https://www.chai.uni-hamburg.de/~bender
 7#  
 8# source code released under the terms of GNU Public License Version 3
 9# https://www.gnu.org/licenses/gpl-3.0.txt
10
11import os, importlib
12from typing import List
13
14import requests
15from fastapi import BackgroundTasks
16
17from ums.agent.agent import BasicAgent, AgentCapability, ExtractAgent, SolveAgent, GatekeeperAgent
18from ums.utils import AgentMessage, AgentResponse, logger
19
20class MessageProcessor():
21
22	MANAGEMENT_URL = os.environ.get('MANAGEMENT_URL', 'http://127.0.0.1:80').strip().strip('/')
23	AGENTS_LIST = os.environ.get('AGENTS_LIST', 'ums.example.example:AGENT_CLASSES').strip()
24
25	def __init__(self):
26		self.counts = 0
27
28		module_name, var_name = self.AGENTS_LIST.split(':')
29		agents_module = importlib.import_module(module_name)
30
31		self.agent_classes:List[BasicAgent] = getattr(agents_module, var_name)
32		self.extract_agents:List[ExtractAgent] = list(filter(
33			lambda ac: ac.agent_capability() == AgentCapability.EXTRACT,
34			self.agent_classes
35		))
36		self.solve_agents:List[SolveAgent] = list(filter(
37			lambda ac: ac.agent_capability() == AgentCapability.SOLVE,
38			self.agent_classes
39		))
40		self.gatekeeper_agents:List[GatekeeperAgent] = list(filter(
41			lambda ac: ac.agent_capability() == AgentCapability.GATEKEEPER,
42			self.agent_classes
43		))
44
45	def new_message(self, message:AgentMessage, background_tasks: BackgroundTasks) -> AgentResponse:
46		enqueued = False
47
48		if message.status.extract.required and not message.status.extract.finished:
49			# send to extract agents
50			if len(self.extract_agents) > 0:
51				data_types = set( d.type for d in message.data )
52				for ac in self.extract_agents:
53					if ac.extract_type() in data_types: 
54						background_tasks.add_task(ac, message, self._send_message)
55						enqueued = True
56
57		elif message.status.solve.required and not message.status.solve.finished:
58			# send to solve agents
59			if len(self.solve_agents) > 0:
60				for sa in self.solve_agents:
61					background_tasks.add_task(sa, message, self._send_message)
62					enqueued = True
63			
64		elif message.status.validate.required and not message.status.validate.finished:
65			# send to solve agents
66			if len(self.gatekeeper_agents) > 0:
67				for ga in self.gatekeeper_agents:
68					background_tasks.add_task(ga, message, self._send_message)
69					enqueued = True
70
71		logger.debug(
72			("Added to queue" if enqueued else "No agent found to queue message.") +
73			f"ID: {message.id} Count: {self.counts}"
74		)
75
76		self.counts += 1
77		return AgentResponse(
78			count=self.counts-1,
79			msg="Added to queue" if enqueued else "",
80			error=not enqueued,
81			error_msg=None if enqueued else "No agent found to queue message."
82		)
83
84	def _send_message(self, message:AgentMessage) -> bool:
85		r = requests.post(
86			"{}/message".format(self.MANAGEMENT_URL),
87			data=message.model_dump_json(),
88			headers={"accept" : "application/json", "content-type" : "application/json"}
89		)
90
91		if r.status_code == 200:
92			return True
93		else:
94			logger.warning(f"Error sending message to management! {(r.text, r.headers)}")
95			return False
class MessageProcessor:
21class MessageProcessor():
22
23	MANAGEMENT_URL = os.environ.get('MANAGEMENT_URL', 'http://127.0.0.1:80').strip().strip('/')
24	AGENTS_LIST = os.environ.get('AGENTS_LIST', 'ums.example.example:AGENT_CLASSES').strip()
25
26	def __init__(self):
27		self.counts = 0
28
29		module_name, var_name = self.AGENTS_LIST.split(':')
30		agents_module = importlib.import_module(module_name)
31
32		self.agent_classes:List[BasicAgent] = getattr(agents_module, var_name)
33		self.extract_agents:List[ExtractAgent] = list(filter(
34			lambda ac: ac.agent_capability() == AgentCapability.EXTRACT,
35			self.agent_classes
36		))
37		self.solve_agents:List[SolveAgent] = list(filter(
38			lambda ac: ac.agent_capability() == AgentCapability.SOLVE,
39			self.agent_classes
40		))
41		self.gatekeeper_agents:List[GatekeeperAgent] = list(filter(
42			lambda ac: ac.agent_capability() == AgentCapability.GATEKEEPER,
43			self.agent_classes
44		))
45
46	def new_message(self, message:AgentMessage, background_tasks: BackgroundTasks) -> AgentResponse:
47		enqueued = False
48
49		if message.status.extract.required and not message.status.extract.finished:
50			# send to extract agents
51			if len(self.extract_agents) > 0:
52				data_types = set( d.type for d in message.data )
53				for ac in self.extract_agents:
54					if ac.extract_type() in data_types: 
55						background_tasks.add_task(ac, message, self._send_message)
56						enqueued = True
57
58		elif message.status.solve.required and not message.status.solve.finished:
59			# send to solve agents
60			if len(self.solve_agents) > 0:
61				for sa in self.solve_agents:
62					background_tasks.add_task(sa, message, self._send_message)
63					enqueued = True
64			
65		elif message.status.validate.required and not message.status.validate.finished:
66			# send to solve agents
67			if len(self.gatekeeper_agents) > 0:
68				for ga in self.gatekeeper_agents:
69					background_tasks.add_task(ga, message, self._send_message)
70					enqueued = True
71
72		logger.debug(
73			("Added to queue" if enqueued else "No agent found to queue message.") +
74			f"ID: {message.id} Count: {self.counts}"
75		)
76
77		self.counts += 1
78		return AgentResponse(
79			count=self.counts-1,
80			msg="Added to queue" if enqueued else "",
81			error=not enqueued,
82			error_msg=None if enqueued else "No agent found to queue message."
83		)
84
85	def _send_message(self, message:AgentMessage) -> bool:
86		r = requests.post(
87			"{}/message".format(self.MANAGEMENT_URL),
88			data=message.model_dump_json(),
89			headers={"accept" : "application/json", "content-type" : "application/json"}
90		)
91
92		if r.status_code == 200:
93			return True
94		else:
95			logger.warning(f"Error sending message to management! {(r.text, r.headers)}")
96			return False
MANAGEMENT_URL = 'http://127.0.0.1:80'
AGENTS_LIST = 'ums.example.example:AGENT_CLASSES'
counts
agent_classes: List[ums.agent.agent.BasicAgent]
extract_agents: List[ums.agent.agent.ExtractAgent]
solve_agents: List[ums.agent.agent.SolveAgent]
gatekeeper_agents: List[ums.agent.agent.GatekeeperAgent]
def new_message( self, message: ums.utils.types.AgentMessage, background_tasks: fastapi.background.BackgroundTasks) -> ums.utils.types.AgentResponse:
46	def new_message(self, message:AgentMessage, background_tasks: BackgroundTasks) -> AgentResponse:
47		enqueued = False
48
49		if message.status.extract.required and not message.status.extract.finished:
50			# send to extract agents
51			if len(self.extract_agents) > 0:
52				data_types = set( d.type for d in message.data )
53				for ac in self.extract_agents:
54					if ac.extract_type() in data_types: 
55						background_tasks.add_task(ac, message, self._send_message)
56						enqueued = True
57
58		elif message.status.solve.required and not message.status.solve.finished:
59			# send to solve agents
60			if len(self.solve_agents) > 0:
61				for sa in self.solve_agents:
62					background_tasks.add_task(sa, message, self._send_message)
63					enqueued = True
64			
65		elif message.status.validate.required and not message.status.validate.finished:
66			# send to solve agents
67			if len(self.gatekeeper_agents) > 0:
68				for ga in self.gatekeeper_agents:
69					background_tasks.add_task(ga, message, self._send_message)
70					enqueued = True
71
72		logger.debug(
73			("Added to queue" if enqueued else "No agent found to queue message.") +
74			f"ID: {message.id} Count: {self.counts}"
75		)
76
77		self.counts += 1
78		return AgentResponse(
79			count=self.counts-1,
80			msg="Added to queue" if enqueued else "",
81			error=not enqueued,
82			error_msg=None if enqueued else "No agent found to queue message."
83		)