ums.management.process

  1# Agenten Plattform
  2#
  3# (c) 2024 Magnus Bender
  4# 	Institute of Humanities-Centered Artificial Intelligence (CHAI)
  5# 	Universitaet Hamburg
  6# 	https://www.chai.uni-hamburg.de/~bender
  7#  
  8# source code released under the terms of GNU Public License Version 3
  9# https://www.gnu.org/licenses/gpl-3.0.txt
 10
 11import os, re
 12from typing import List
 13
 14import requests
 15from fastapi import BackgroundTasks
 16
 17from ums.management.db import DB
 18from ums.utils import AgentMessage, AgentResponse, logger
 19
 20class MessageProcessor():
 21
 22	SOLUTION_MAX_TRIALS = int(os.environ.get('SOLUTION_MAX_TRIALS', 5))
 23	MESSAGE_MAX_CONTACTS = int(os.environ.get('MESSAGE_MAX_CONTACTS', 100))
 24
 25	MANAGEMENT_URL = os.environ.get('MANAGEMENT_URL', 'http://127.0.0.1:80').strip().strip('/')
 26	
 27	AGENTS_PROCESS = tuple(map(
 28		lambda s:s.strip().strip('/'),
 29		os.environ.get('AGENTS_PROCESS', '').split(',')
 30	))
 31	AGENTS_SOLVE = tuple(map(
 32		lambda s:s.strip().strip('/'),
 33		os.environ.get('AGENTS_SOLVE', '').split(',')
 34	))
 35	AGENTS_GATEKEEPER = tuple(map(
 36		lambda s:s.strip().strip('/'),
 37		os.environ.get('AGENTS_GATEKEEPER', '').split(',')
 38	))
 39
 40	def __init__(self, db:DB):
 41		self.db = db
 42		self.management_name = self._get_name(self.MANAGEMENT_URL)
 43
 44		if len(self.AGENTS_PROCESS) == 0:
 45			logger.warning(f"Not Process Agent (AGENTS_PROCESS) found, this may be a problem!")
 46		if len(self.AGENTS_SOLVE) == 0:
 47			logger.warning(f"Not Solve Agent (AGENTS_SOLVE) found, this may be a problem!")
 48		if len(self.AGENTS_GATEKEEPER) == 0:
 49			logger.warning(f"Not Gatekeeper Agent (AGENTS_GATEKEEPER) found, this may be a problem!")
 50
 51	def _get_name(self, url:str) -> str:
 52		m = re.match(r'^https?://([^:]*)(?::(\d+))?$', url)
 53		return "unknown" if m == None else m.group(1)
 54
 55	def new_message(self,
 56		sender:str, receiver:str, message:AgentMessage,
 57		background_tasks: BackgroundTasks
 58	) -> AgentResponse:
 59
 60		try:
 61			db_count = self.db.add_message(sender, receiver, message)
 62			background_tasks.add_task(self._process_message, db_count)
 63
 64			return AgentResponse(
 65				count=db_count,
 66				msg="Added message to queue"
 67			)
 68		except Exception as e:
 69			return AgentResponse(
 70				count=-1,
 71				error=True,
 72				error_msg=str(e)
 73			)
 74
 75	def _process_message(self, count:int, ignore_processed:bool=False):
 76		db_message = self.db.by_count(count)
 77
 78		if db_message.processed and not ignore_processed:
 79			# do not process processed messages again
 80			return
 81		
 82		# increment contacts counter
 83		db_message.message.contacts += 1
 84		if db_message.message.contacts > self.MESSAGE_MAX_CONTACTS:
 85			logger.warning(f"Message reached max number of contacts! {db_message.message.id}, {count}")
 86			return
 87		
 88		# check which step/ state the message requires the management to do
 89		if db_message.message.status.extract.required and not db_message.message.status.extract.finished:
 90			# send to extract agents
 91			self._send_messages(self.AGENTS_PROCESS, db_message.message)
 92
 93		elif db_message.message.status.solve.required and not db_message.message.status.solve.finished:
 94			# send to solve agents
 95			self._send_messages(self.AGENTS_SOLVE, db_message.message)
 96			
 97		elif db_message.message.status.validate.required and not db_message.message.status.validate.finished:
 98			# send to solve agents
 99			self._send_messages(self.AGENTS_GATEKEEPER, db_message.message)
100
101		else: # all steps "done"
102
103			# validate not required? (then solved will never be set to true, thus set it here)
104			if not db_message.message.status.validate.required:
105				db_message.message.status.solved = True
106
107			if db_message.message.status.solved:
108				# yay, message is solved
109				self.db.set_solution(count=count, solution=True);
110			else:
111				# not solved, but all steps done
112				self.db.set_solution(count=count, solution=False);
113		
114				# try again
115				self._do_again(db_message.message)
116
117		# now message processed!
118		self.db.set_processed(count=count, processed=True)
119
120	def _do_again(self, message:AgentMessage):
121		if message.status.trial < self.SOLUTION_MAX_TRIALS:
122			# try again, recycle message
123
124			# require steps again
125			if message.status.extract.required:
126				message.status.extract.finished = False 
127			if message.status.solve.required:
128				message.status.solve.finished = False 
129			if message.status.validate.required:
130				message.status.validate.finished = False
131
132			# increment trial
133			message.status.trial += 1
134
135			# append current solution als old one
136			if not message.solution is None:
137				message.riddle.solutions_before.append(
138					message.solution
139				)
140				# reset current solution
141				message.solution = None
142
143			# add the riddle as new to management
144			self._send_message(self.MANAGEMENT_URL, message)
145
146		else:
147			logger.info(f"Unsolved riddle after max number of trials: {message.id}")
148
149	def _send_messages(self, recipients:List[str], message:AgentMessage) -> bool:
150		ok  = True
151		for r in recipients:
152			ok = ok and self._send_message(r, message)
153		return ok
154
155	def _send_message(self, recipient:str, message:AgentMessage) -> bool:
156		db_count = self.db.add_message(
157			sender=self.management_name,
158			recipient=self._get_name(recipient),
159			message=message,
160			processed=False
161		)
162
163		r = requests.post(
164			"{}/message".format(recipient),
165			data=message.model_dump_json(),
166			headers={"accept" : "application/json", "content-type" : "application/json"}
167		)
168
169		if r.status_code == 200:
170			self.db.set_processed(db_count, processed=True)
171			return True
172		else:
173			logger.warning(f"Error sending message to: {recipient} {(r.text, r.headers)}")
174			return False
175			
class MessageProcessor:
 21class MessageProcessor():
 22
 23	SOLUTION_MAX_TRIALS = int(os.environ.get('SOLUTION_MAX_TRIALS', 5))
 24	MESSAGE_MAX_CONTACTS = int(os.environ.get('MESSAGE_MAX_CONTACTS', 100))
 25
 26	MANAGEMENT_URL = os.environ.get('MANAGEMENT_URL', 'http://127.0.0.1:80').strip().strip('/')
 27	
 28	AGENTS_PROCESS = tuple(map(
 29		lambda s:s.strip().strip('/'),
 30		os.environ.get('AGENTS_PROCESS', '').split(',')
 31	))
 32	AGENTS_SOLVE = tuple(map(
 33		lambda s:s.strip().strip('/'),
 34		os.environ.get('AGENTS_SOLVE', '').split(',')
 35	))
 36	AGENTS_GATEKEEPER = tuple(map(
 37		lambda s:s.strip().strip('/'),
 38		os.environ.get('AGENTS_GATEKEEPER', '').split(',')
 39	))
 40
 41	def __init__(self, db:DB):
 42		self.db = db
 43		self.management_name = self._get_name(self.MANAGEMENT_URL)
 44
 45		if len(self.AGENTS_PROCESS) == 0:
 46			logger.warning(f"Not Process Agent (AGENTS_PROCESS) found, this may be a problem!")
 47		if len(self.AGENTS_SOLVE) == 0:
 48			logger.warning(f"Not Solve Agent (AGENTS_SOLVE) found, this may be a problem!")
 49		if len(self.AGENTS_GATEKEEPER) == 0:
 50			logger.warning(f"Not Gatekeeper Agent (AGENTS_GATEKEEPER) found, this may be a problem!")
 51
 52	def _get_name(self, url:str) -> str:
 53		m = re.match(r'^https?://([^:]*)(?::(\d+))?$', url)
 54		return "unknown" if m == None else m.group(1)
 55
 56	def new_message(self,
 57		sender:str, receiver:str, message:AgentMessage,
 58		background_tasks: BackgroundTasks
 59	) -> AgentResponse:
 60
 61		try:
 62			db_count = self.db.add_message(sender, receiver, message)
 63			background_tasks.add_task(self._process_message, db_count)
 64
 65			return AgentResponse(
 66				count=db_count,
 67				msg="Added message to queue"
 68			)
 69		except Exception as e:
 70			return AgentResponse(
 71				count=-1,
 72				error=True,
 73				error_msg=str(e)
 74			)
 75
 76	def _process_message(self, count:int, ignore_processed:bool=False):
 77		db_message = self.db.by_count(count)
 78
 79		if db_message.processed and not ignore_processed:
 80			# do not process processed messages again
 81			return
 82		
 83		# increment contacts counter
 84		db_message.message.contacts += 1
 85		if db_message.message.contacts > self.MESSAGE_MAX_CONTACTS:
 86			logger.warning(f"Message reached max number of contacts! {db_message.message.id}, {count}")
 87			return
 88		
 89		# check which step/ state the message requires the management to do
 90		if db_message.message.status.extract.required and not db_message.message.status.extract.finished:
 91			# send to extract agents
 92			self._send_messages(self.AGENTS_PROCESS, db_message.message)
 93
 94		elif db_message.message.status.solve.required and not db_message.message.status.solve.finished:
 95			# send to solve agents
 96			self._send_messages(self.AGENTS_SOLVE, db_message.message)
 97			
 98		elif db_message.message.status.validate.required and not db_message.message.status.validate.finished:
 99			# send to solve agents
100			self._send_messages(self.AGENTS_GATEKEEPER, db_message.message)
101
102		else: # all steps "done"
103
104			# validate not required? (then solved will never be set to true, thus set it here)
105			if not db_message.message.status.validate.required:
106				db_message.message.status.solved = True
107
108			if db_message.message.status.solved:
109				# yay, message is solved
110				self.db.set_solution(count=count, solution=True);
111			else:
112				# not solved, but all steps done
113				self.db.set_solution(count=count, solution=False);
114		
115				# try again
116				self._do_again(db_message.message)
117
118		# now message processed!
119		self.db.set_processed(count=count, processed=True)
120
121	def _do_again(self, message:AgentMessage):
122		if message.status.trial < self.SOLUTION_MAX_TRIALS:
123			# try again, recycle message
124
125			# require steps again
126			if message.status.extract.required:
127				message.status.extract.finished = False 
128			if message.status.solve.required:
129				message.status.solve.finished = False 
130			if message.status.validate.required:
131				message.status.validate.finished = False
132
133			# increment trial
134			message.status.trial += 1
135
136			# append current solution als old one
137			if not message.solution is None:
138				message.riddle.solutions_before.append(
139					message.solution
140				)
141				# reset current solution
142				message.solution = None
143
144			# add the riddle as new to management
145			self._send_message(self.MANAGEMENT_URL, message)
146
147		else:
148			logger.info(f"Unsolved riddle after max number of trials: {message.id}")
149
150	def _send_messages(self, recipients:List[str], message:AgentMessage) -> bool:
151		ok  = True
152		for r in recipients:
153			ok = ok and self._send_message(r, message)
154		return ok
155
156	def _send_message(self, recipient:str, message:AgentMessage) -> bool:
157		db_count = self.db.add_message(
158			sender=self.management_name,
159			recipient=self._get_name(recipient),
160			message=message,
161			processed=False
162		)
163
164		r = requests.post(
165			"{}/message".format(recipient),
166			data=message.model_dump_json(),
167			headers={"accept" : "application/json", "content-type" : "application/json"}
168		)
169
170		if r.status_code == 200:
171			self.db.set_processed(db_count, processed=True)
172			return True
173		else:
174			logger.warning(f"Error sending message to: {recipient} {(r.text, r.headers)}")
175			return False
MessageProcessor(db: ums.management.db.DB)
41	def __init__(self, db:DB):
42		self.db = db
43		self.management_name = self._get_name(self.MANAGEMENT_URL)
44
45		if len(self.AGENTS_PROCESS) == 0:
46			logger.warning(f"Not Process Agent (AGENTS_PROCESS) found, this may be a problem!")
47		if len(self.AGENTS_SOLVE) == 0:
48			logger.warning(f"Not Solve Agent (AGENTS_SOLVE) found, this may be a problem!")
49		if len(self.AGENTS_GATEKEEPER) == 0:
50			logger.warning(f"Not Gatekeeper Agent (AGENTS_GATEKEEPER) found, this may be a problem!")
SOLUTION_MAX_TRIALS = 5
MESSAGE_MAX_CONTACTS = 100
MANAGEMENT_URL = 'http://127.0.0.1:80'
AGENTS_PROCESS = ('',)
AGENTS_SOLVE = ('',)
AGENTS_GATEKEEPER = ('',)
db
management_name
def new_message( self, sender: str, receiver: str, message: ums.utils.types.AgentMessage, background_tasks: fastapi.background.BackgroundTasks) -> ums.utils.types.AgentResponse:
56	def new_message(self,
57		sender:str, receiver:str, message:AgentMessage,
58		background_tasks: BackgroundTasks
59	) -> AgentResponse:
60
61		try:
62			db_count = self.db.add_message(sender, receiver, message)
63			background_tasks.add_task(self._process_message, db_count)
64
65			return AgentResponse(
66				count=db_count,
67				msg="Added message to queue"
68			)
69		except Exception as e:
70			return AgentResponse(
71				count=-1,
72				error=True,
73				error_msg=str(e)
74			)