ums.management.process

  1# Agenten Plattform
  2#
  3# (c) 2024 Magnus Bender
  4# 	Institute of Humanities-Centered Artificial Intelligence (CHAI)
  5# 	Universitaet Hamburg
  6# 	https://www.chai.uni-hamburg.de/~bender
  7#  
  8# source code released under the terms of GNU Public License Version 3
  9# https://www.gnu.org/licenses/gpl-3.0.txt
 10
 11import os, re
 12from typing import List
 13
 14import requests
 15from fastapi import BackgroundTasks
 16
 17from ums.management.db import DB
 18from ums.utils import AgentMessage, AgentResponse, logger
 19
 20class MessageProcessor():
 21
 22	SOLUTION_MAX_TRIALS = int(os.environ.get('SOLUTION_MAX_TRIALS', 5))
 23
 24	MANAGEMENT_URL = os.environ.get('MANAGEMENT_URL', 'http://127.0.0.1:80').strip().strip('/')
 25	
 26	AGENTS_PROCESS = tuple(map(
 27		lambda s:s.strip().strip('/'),
 28		os.environ.get('AGENTS_PROCESS', '').split(',')
 29	))
 30	AGENTS_SOLVE = tuple(map(
 31		lambda s:s.strip().strip('/'),
 32		os.environ.get('AGENTS_SOLVE', '').split(',')
 33	))
 34	AGENTS_GATEKEEPER = tuple(map(
 35		lambda s:s.strip().strip('/'),
 36		os.environ.get('AGENTS_GATEKEEPER', '').split(',')
 37	))
 38
 39	def __init__(self, db:DB):
 40		self.db = db
 41		self.management_name = self._get_name(self.MANAGEMENT_URL)
 42
 43		if len(self.AGENTS_PROCESS) == 0:
 44			logger.warning(f"Not Process Agent (AGENTS_PROCESS) found, this may be a problem!")
 45		if len(self.AGENTS_SOLVE) == 0:
 46			logger.warning(f"Not Solve Agent (AGENTS_SOLVE) found, this may be a problem!")
 47		if len(self.AGENTS_GATEKEEPER) == 0:
 48			logger.warning(f"Not Gatekeeper Agent (AGENTS_GATEKEEPER) found, this may be a problem!")
 49
 50	def _get_name(self, url:str) -> str:
 51		m = re.match(r'^https?://([^:]*)(?::(\d+))?$', url)
 52		return "unknown" if m == None else m.group(1)
 53
 54	def new_message(self,
 55		sender:str, receiver:str, message:AgentMessage,
 56		background_tasks: BackgroundTasks
 57	) -> AgentResponse:
 58
 59		try:
 60			db_count = self.db.add_message(sender, receiver, message)
 61			background_tasks.add_task(self._process_message, db_count)
 62
 63			return AgentResponse(
 64				count=db_count,
 65				msg="Added message to queue"
 66			)
 67		except Exception as e:
 68			return AgentResponse(
 69				count=-1,
 70				error=True,
 71				error_msg=str(e)
 72			)
 73
 74	def _process_message(self, count:int, ignore_processed:bool=False):
 75		db_message = self.db.by_count(count)
 76
 77		if db_message.processed and not ignore_processed:
 78			# do not process processed messages again
 79			return
 80		
 81		# check which step/ state the message requires the management to do
 82		if db_message.message.status.extract.required and not db_message.message.status.extract.finished:
 83			# send to extract agents
 84			self._send_messages(self.AGENTS_PROCESS, db_message.message)
 85
 86		elif db_message.message.status.solve.required and not db_message.message.status.solve.finished:
 87			# send to solve agents
 88			self._send_messages(self.AGENTS_SOLVE, db_message.message)
 89			
 90		elif db_message.message.status.validate.required and not db_message.message.status.validate.finished:
 91			# send to solve agents
 92			self._send_messages(self.AGENTS_GATEKEEPER, db_message.message)
 93
 94		else: # all steps "done"
 95
 96			# validate not required? (then solved will never be set to true, thus set it here)
 97			if not db_message.message.status.validate.required:
 98				db_message.message.status.solved = True
 99
100			if db_message.message.status.solved:
101				# yay, message is solved
102				self.db.set_solution(count=count, solution=True);
103			else:
104				# not solved, but all steps done
105				self.db.set_solution(count=count, solution=False);
106		
107				# try again
108				self._do_again(db_message.message)
109
110		# now message processed!
111		self.db.set_processed(count=count, processed=True)
112
113	def _do_again(self, message:AgentMessage):
114		if message.status.trial < self.SOLUTION_MAX_TRIALS:
115			# try again, recycle message
116
117			# require steps again
118			if message.status.extract.required:
119				message.status.extract.finished = False 
120			if message.status.solve.required:
121				message.status.solve.finished = False 
122			if message.status.validate.required:
123				message.status.validate.finished = False
124
125			# increment trial
126			message.status.trial += 1
127
128			# append current solution als old one
129			if not message.solution is None:
130				message.riddle.solutions_before.append(
131					message.solution
132				)
133				# reset current solution
134				message.solution = None
135
136			# add the riddle as new to management
137			self._send_message(self.MANAGEMENT_URL, message)
138
139	def _send_messages(self, recipients:List[str], message:AgentMessage) -> bool:
140		ok  = True
141		for r in recipients:
142			ok = ok and self._send_message(r, message)
143		return ok
144
145	def _send_message(self, recipient:str, message:AgentMessage) -> bool:
146		db_count = self.db.add_message(
147			sender=self.management_name,
148			recipient=self._get_name(recipient),
149			message=message,
150			processed=False
151		)
152
153		r = requests.post(
154			"{}/message".format(recipient),
155			data=message.model_dump_json(),
156			headers={"accept" : "application/json", "content-type" : "application/json"}
157		)
158
159		if r.status_code == 200:
160			self.db.set_processed(db_count, processed=True)
161			return True
162		else:
163			logger.warning(f"Error sending message to: {recipient} {(r.text, r.headers)}")
164			return False
165			
class MessageProcessor:
 21class MessageProcessor():
 22
 23	SOLUTION_MAX_TRIALS = int(os.environ.get('SOLUTION_MAX_TRIALS', 5))
 24
 25	MANAGEMENT_URL = os.environ.get('MANAGEMENT_URL', 'http://127.0.0.1:80').strip().strip('/')
 26	
 27	AGENTS_PROCESS = tuple(map(
 28		lambda s:s.strip().strip('/'),
 29		os.environ.get('AGENTS_PROCESS', '').split(',')
 30	))
 31	AGENTS_SOLVE = tuple(map(
 32		lambda s:s.strip().strip('/'),
 33		os.environ.get('AGENTS_SOLVE', '').split(',')
 34	))
 35	AGENTS_GATEKEEPER = tuple(map(
 36		lambda s:s.strip().strip('/'),
 37		os.environ.get('AGENTS_GATEKEEPER', '').split(',')
 38	))
 39
 40	def __init__(self, db:DB):
 41		self.db = db
 42		self.management_name = self._get_name(self.MANAGEMENT_URL)
 43
 44		if len(self.AGENTS_PROCESS) == 0:
 45			logger.warning(f"Not Process Agent (AGENTS_PROCESS) found, this may be a problem!")
 46		if len(self.AGENTS_SOLVE) == 0:
 47			logger.warning(f"Not Solve Agent (AGENTS_SOLVE) found, this may be a problem!")
 48		if len(self.AGENTS_GATEKEEPER) == 0:
 49			logger.warning(f"Not Gatekeeper Agent (AGENTS_GATEKEEPER) found, this may be a problem!")
 50
 51	def _get_name(self, url:str) -> str:
 52		m = re.match(r'^https?://([^:]*)(?::(\d+))?$', url)
 53		return "unknown" if m == None else m.group(1)
 54
 55	def new_message(self,
 56		sender:str, receiver:str, message:AgentMessage,
 57		background_tasks: BackgroundTasks
 58	) -> AgentResponse:
 59
 60		try:
 61			db_count = self.db.add_message(sender, receiver, message)
 62			background_tasks.add_task(self._process_message, db_count)
 63
 64			return AgentResponse(
 65				count=db_count,
 66				msg="Added message to queue"
 67			)
 68		except Exception as e:
 69			return AgentResponse(
 70				count=-1,
 71				error=True,
 72				error_msg=str(e)
 73			)
 74
 75	def _process_message(self, count:int, ignore_processed:bool=False):
 76		db_message = self.db.by_count(count)
 77
 78		if db_message.processed and not ignore_processed:
 79			# do not process processed messages again
 80			return
 81		
 82		# check which step/ state the message requires the management to do
 83		if db_message.message.status.extract.required and not db_message.message.status.extract.finished:
 84			# send to extract agents
 85			self._send_messages(self.AGENTS_PROCESS, db_message.message)
 86
 87		elif db_message.message.status.solve.required and not db_message.message.status.solve.finished:
 88			# send to solve agents
 89			self._send_messages(self.AGENTS_SOLVE, db_message.message)
 90			
 91		elif db_message.message.status.validate.required and not db_message.message.status.validate.finished:
 92			# send to solve agents
 93			self._send_messages(self.AGENTS_GATEKEEPER, db_message.message)
 94
 95		else: # all steps "done"
 96
 97			# validate not required? (then solved will never be set to true, thus set it here)
 98			if not db_message.message.status.validate.required:
 99				db_message.message.status.solved = True
100
101			if db_message.message.status.solved:
102				# yay, message is solved
103				self.db.set_solution(count=count, solution=True);
104			else:
105				# not solved, but all steps done
106				self.db.set_solution(count=count, solution=False);
107		
108				# try again
109				self._do_again(db_message.message)
110
111		# now message processed!
112		self.db.set_processed(count=count, processed=True)
113
114	def _do_again(self, message:AgentMessage):
115		if message.status.trial < self.SOLUTION_MAX_TRIALS:
116			# try again, recycle message
117
118			# require steps again
119			if message.status.extract.required:
120				message.status.extract.finished = False 
121			if message.status.solve.required:
122				message.status.solve.finished = False 
123			if message.status.validate.required:
124				message.status.validate.finished = False
125
126			# increment trial
127			message.status.trial += 1
128
129			# append current solution als old one
130			if not message.solution is None:
131				message.riddle.solutions_before.append(
132					message.solution
133				)
134				# reset current solution
135				message.solution = None
136
137			# add the riddle as new to management
138			self._send_message(self.MANAGEMENT_URL, message)
139
140	def _send_messages(self, recipients:List[str], message:AgentMessage) -> bool:
141		ok  = True
142		for r in recipients:
143			ok = ok and self._send_message(r, message)
144		return ok
145
146	def _send_message(self, recipient:str, message:AgentMessage) -> bool:
147		db_count = self.db.add_message(
148			sender=self.management_name,
149			recipient=self._get_name(recipient),
150			message=message,
151			processed=False
152		)
153
154		r = requests.post(
155			"{}/message".format(recipient),
156			data=message.model_dump_json(),
157			headers={"accept" : "application/json", "content-type" : "application/json"}
158		)
159
160		if r.status_code == 200:
161			self.db.set_processed(db_count, processed=True)
162			return True
163		else:
164			logger.warning(f"Error sending message to: {recipient} {(r.text, r.headers)}")
165			return False
MessageProcessor(db: ums.management.db.DB)
40	def __init__(self, db:DB):
41		self.db = db
42		self.management_name = self._get_name(self.MANAGEMENT_URL)
43
44		if len(self.AGENTS_PROCESS) == 0:
45			logger.warning(f"Not Process Agent (AGENTS_PROCESS) found, this may be a problem!")
46		if len(self.AGENTS_SOLVE) == 0:
47			logger.warning(f"Not Solve Agent (AGENTS_SOLVE) found, this may be a problem!")
48		if len(self.AGENTS_GATEKEEPER) == 0:
49			logger.warning(f"Not Gatekeeper Agent (AGENTS_GATEKEEPER) found, this may be a problem!")
SOLUTION_MAX_TRIALS = 5
MANAGEMENT_URL = 'http://127.0.0.1:80'
AGENTS_PROCESS = ('',)
AGENTS_SOLVE = ('',)
AGENTS_GATEKEEPER = ('',)
db
management_name
def new_message( self, sender: str, receiver: str, message: ums.utils.types.AgentMessage, background_tasks: fastapi.background.BackgroundTasks) -> ums.utils.types.AgentResponse:
55	def new_message(self,
56		sender:str, receiver:str, message:AgentMessage,
57		background_tasks: BackgroundTasks
58	) -> AgentResponse:
59
60		try:
61			db_count = self.db.add_message(sender, receiver, message)
62			background_tasks.add_task(self._process_message, db_count)
63
64			return AgentResponse(
65				count=db_count,
66				msg="Added message to queue"
67			)
68		except Exception as e:
69			return AgentResponse(
70				count=-1,
71				error=True,
72				error_msg=str(e)
73			)