ums.management.process

  1# Agenten Plattform
  2#
  3# (c) 2024 Magnus Bender
  4# 	Institute of Humanities-Centered Artificial Intelligence (CHAI)
  5# 	Universitaet Hamburg
  6# 	https://www.chai.uni-hamburg.de/~bender
  7#  
  8# source code released under the terms of GNU Public License Version 3
  9# https://www.gnu.org/licenses/gpl-3.0.txt
 10
 11import os, re
 12from typing import List
 13
 14import requests
 15from fastapi import BackgroundTasks
 16
 17from ums.management.db import DB
 18from ums.utils import AgentMessage, AgentResponse, logger, RiddleData
 19
 20class MessageProcessor():
 21
 22	SOLUTION_MAX_TRIALS = int(os.environ.get('SOLUTION_MAX_TRIALS', 5))
 23	MESSAGE_MAX_CONTACTS = int(os.environ.get('MESSAGE_MAX_CONTACTS', 100))
 24
 25	REQUIRE_FULL_EXTRACT = os.environ.get('REQUIRE_FULL_EXTRACT', 'false').lower() == 'true'
 26
 27	MANAGEMENT_URL = os.environ.get('MANAGEMENT_URL', 'http://127.0.0.1:80').strip().strip('/')
 28	
 29	AGENTS_PROCESS = tuple(map(
 30		lambda s:s.strip().strip('/'),
 31		os.environ.get('AGENTS_PROCESS', '').split(',')
 32	))
 33	AGENTS_SOLVE = tuple(map(
 34		lambda s:s.strip().strip('/'),
 35		os.environ.get('AGENTS_SOLVE', '').split(',')
 36	))
 37	AGENTS_GATEKEEPER = tuple(map(
 38		lambda s:s.strip().strip('/'),
 39		os.environ.get('AGENTS_GATEKEEPER', '').split(',')
 40	))
 41
 42	def __init__(self, db:DB):
 43		self.db = db
 44		self.management_name = self._get_name(self.MANAGEMENT_URL)
 45
 46		if len(self.AGENTS_PROCESS) == 0:
 47			logger.warning(f"Not Process Agent (AGENTS_PROCESS) found, this may be a problem!")
 48		if len(self.AGENTS_SOLVE) == 0:
 49			logger.warning(f"Not Solve Agent (AGENTS_SOLVE) found, this may be a problem!")
 50		if len(self.AGENTS_GATEKEEPER) == 0:
 51			logger.warning(f"Not Gatekeeper Agent (AGENTS_GATEKEEPER) found, this may be a problem!")
 52
 53	def _get_name(self, url:str) -> str:
 54		m = re.match(r'^https?://([^:]*)(?::(\d+))?$', url)
 55		return "unknown" if m == None else m.group(1)
 56
 57	def new_message(self,
 58		sender:str, receiver:str, message:AgentMessage,
 59		background_tasks: BackgroundTasks
 60	) -> AgentResponse:
 61
 62		try:
 63			db_count = self.db.add_message(sender, receiver, message)
 64			background_tasks.add_task(self._process_message, db_count)
 65
 66			return AgentResponse(
 67				count=db_count,
 68				msg="Added message to queue"
 69			)
 70		except Exception as e:
 71			return AgentResponse(
 72				count=-1,
 73				error=True,
 74				error_msg=str(e)
 75			)
 76
 77	def _process_message(self, count:int, ignore_processed:bool=False):
 78		db_message = self.db.by_count(count)
 79
 80		if db_message.processed and not ignore_processed:
 81			# do not process processed messages again
 82			return
 83		
 84		# now message processed!
 85		self.db.set_processed(count=count, processed=True)
 86		
 87		# increment contacts counter
 88		db_message.message.contacts += 1
 89		if db_message.message.contacts > self.MESSAGE_MAX_CONTACTS:
 90			logger.warning(f"Message reached max number of contacts! {db_message.message.id}, {count}")
 91			return
 92		
 93		# combine different extractions in data items
 94		#	will update items in `db_message.message.data`
 95		fully_extracted = self._add_extractions(db_message.message.id, db_message.message.data)
 96		if (self.REQUIRE_FULL_EXTRACT and not fully_extracted) \
 97			and not (db_message.message.status.extract.required and not db_message.message.status.extract.finished):
 98			logger.warning(f"Postpone message, wait for full extract of items! {db_message.message.id}, {count}")
 99			return
100
101		# check which step/ state the message requires the management to do
102		if db_message.message.status.extract.required and not db_message.message.status.extract.finished:
103			# send to extract agents
104			self._send_messages(self.AGENTS_PROCESS, db_message.message)
105
106		elif db_message.message.status.solve.required and not db_message.message.status.solve.finished:
107			# send to solve agents
108			self._send_messages(self.AGENTS_SOLVE, db_message.message)
109			
110		elif db_message.message.status.validate.required and not db_message.message.status.validate.finished:
111			# send to solve agents
112			self._send_messages(self.AGENTS_GATEKEEPER, db_message.message)
113
114		else: # all steps "done"
115
116			# validate not required? (then solved will never be set to true, thus set it here)
117			if not db_message.message.status.validate.required:
118				db_message.message.status.solved = True
119
120			if db_message.message.status.solved:
121				# yay, message is solved
122				self.db.set_solution(count=count, solution=True);
123			else:
124				# not solved, but all steps done
125				self.db.set_solution(count=count, solution=False);
126		
127				# try again
128				self._do_again(db_message.message)
129
130	def _hash_data(self, d:RiddleData) -> int:
131		return hash((d.file_plain, d.type, d.prompt))
132
133	def _add_extractions(self, riddle_id:str, data:List[RiddleData]) -> bool:
134		
135		# get all the data items without extraction
136		empty_data = {}
137		for i, d in enumerate(data):
138			if d.file_extracted is None:
139				empty_data[self._hash_data(d)] = i
140
141		# search db for extractions already available
142		for row in self.db.iterate(
143			id=riddle_id,
144			limit=min(self.db.len(id=riddle_id), 250)
145		):
146			# check for required extraction
147			for d in row.message.data:
148				# already extracted ? 
149				#	extraction file exists ?
150				#	one of the items, we do not have extractions for ?
151				#	the same data item ?
152				if not d.file_extracted is None \
153					and not d.file_extracted.startswith("missing:") \
154					and self._hash_data(d) in empty_data:
155						# copy the reference to the extracted data
156						data[empty_data[self._hash_data(d)]].file_extracted = d.file_extracted
157						# remove from items we need extracted data for
158						del empty_data[self._hash_data(d)]
159
160			# break if all extractions found
161			if len(empty_data) == 0:
162				break
163
164		return len(empty_data) == 0 # fully extracted
165
166	def _do_again(self, message:AgentMessage):
167		if message.status.trial < self.SOLUTION_MAX_TRIALS:
168			# try again, recycle message
169
170			# require steps again
171			if message.status.extract.required:
172				message.status.extract.finished = False 
173			if message.status.solve.required:
174				message.status.solve.finished = False 
175			if message.status.validate.required:
176				message.status.validate.finished = False
177
178			# increment trial
179			message.status.trial += 1
180
181			# append current solution als old one
182			if not message.solution is None:
183				message.riddle.solutions_before.append(
184					message.solution
185				)
186				# reset current solution
187				message.solution = None
188
189			# add the riddle as new to management
190			self._send_message(self.MANAGEMENT_URL, message)
191
192		else:
193			logger.info(f"Unsolved riddle after max number of trials: {message.id}")
194
195	def _send_messages(self, recipients:List[str], message:AgentMessage) -> bool:
196		ok  = True
197		for r in recipients:
198			ok = ok and self._send_message(r, message)
199		return ok
200
201	def _send_message(self, recipient:str, message:AgentMessage) -> bool:
202		db_count = self.db.add_message(
203			sender=self.management_name,
204			recipient=self._get_name(recipient),
205			message=message,
206			processed=False
207		)
208
209		r = requests.post(
210			"{}/message".format(recipient),
211			data=message.model_dump_json(),
212			headers={"accept" : "application/json", "content-type" : "application/json"}
213		)
214
215		if r.status_code == 200:
216			self.db.set_processed(db_count, processed=True)
217			return True
218		else:
219			logger.warning(f"Error sending message to: {recipient} {(r.text, r.headers)}")
220			return False
221			
class MessageProcessor:
 21class MessageProcessor():
 22
 23	SOLUTION_MAX_TRIALS = int(os.environ.get('SOLUTION_MAX_TRIALS', 5))
 24	MESSAGE_MAX_CONTACTS = int(os.environ.get('MESSAGE_MAX_CONTACTS', 100))
 25
 26	REQUIRE_FULL_EXTRACT = os.environ.get('REQUIRE_FULL_EXTRACT', 'false').lower() == 'true'
 27
 28	MANAGEMENT_URL = os.environ.get('MANAGEMENT_URL', 'http://127.0.0.1:80').strip().strip('/')
 29	
 30	AGENTS_PROCESS = tuple(map(
 31		lambda s:s.strip().strip('/'),
 32		os.environ.get('AGENTS_PROCESS', '').split(',')
 33	))
 34	AGENTS_SOLVE = tuple(map(
 35		lambda s:s.strip().strip('/'),
 36		os.environ.get('AGENTS_SOLVE', '').split(',')
 37	))
 38	AGENTS_GATEKEEPER = tuple(map(
 39		lambda s:s.strip().strip('/'),
 40		os.environ.get('AGENTS_GATEKEEPER', '').split(',')
 41	))
 42
 43	def __init__(self, db:DB):
 44		self.db = db
 45		self.management_name = self._get_name(self.MANAGEMENT_URL)
 46
 47		if len(self.AGENTS_PROCESS) == 0:
 48			logger.warning(f"Not Process Agent (AGENTS_PROCESS) found, this may be a problem!")
 49		if len(self.AGENTS_SOLVE) == 0:
 50			logger.warning(f"Not Solve Agent (AGENTS_SOLVE) found, this may be a problem!")
 51		if len(self.AGENTS_GATEKEEPER) == 0:
 52			logger.warning(f"Not Gatekeeper Agent (AGENTS_GATEKEEPER) found, this may be a problem!")
 53
 54	def _get_name(self, url:str) -> str:
 55		m = re.match(r'^https?://([^:]*)(?::(\d+))?$', url)
 56		return "unknown" if m == None else m.group(1)
 57
 58	def new_message(self,
 59		sender:str, receiver:str, message:AgentMessage,
 60		background_tasks: BackgroundTasks
 61	) -> AgentResponse:
 62
 63		try:
 64			db_count = self.db.add_message(sender, receiver, message)
 65			background_tasks.add_task(self._process_message, db_count)
 66
 67			return AgentResponse(
 68				count=db_count,
 69				msg="Added message to queue"
 70			)
 71		except Exception as e:
 72			return AgentResponse(
 73				count=-1,
 74				error=True,
 75				error_msg=str(e)
 76			)
 77
 78	def _process_message(self, count:int, ignore_processed:bool=False):
 79		db_message = self.db.by_count(count)
 80
 81		if db_message.processed and not ignore_processed:
 82			# do not process processed messages again
 83			return
 84		
 85		# now message processed!
 86		self.db.set_processed(count=count, processed=True)
 87		
 88		# increment contacts counter
 89		db_message.message.contacts += 1
 90		if db_message.message.contacts > self.MESSAGE_MAX_CONTACTS:
 91			logger.warning(f"Message reached max number of contacts! {db_message.message.id}, {count}")
 92			return
 93		
 94		# combine different extractions in data items
 95		#	will update items in `db_message.message.data`
 96		fully_extracted = self._add_extractions(db_message.message.id, db_message.message.data)
 97		if (self.REQUIRE_FULL_EXTRACT and not fully_extracted) \
 98			and not (db_message.message.status.extract.required and not db_message.message.status.extract.finished):
 99			logger.warning(f"Postpone message, wait for full extract of items! {db_message.message.id}, {count}")
100			return
101
102		# check which step/ state the message requires the management to do
103		if db_message.message.status.extract.required and not db_message.message.status.extract.finished:
104			# send to extract agents
105			self._send_messages(self.AGENTS_PROCESS, db_message.message)
106
107		elif db_message.message.status.solve.required and not db_message.message.status.solve.finished:
108			# send to solve agents
109			self._send_messages(self.AGENTS_SOLVE, db_message.message)
110			
111		elif db_message.message.status.validate.required and not db_message.message.status.validate.finished:
112			# send to solve agents
113			self._send_messages(self.AGENTS_GATEKEEPER, db_message.message)
114
115		else: # all steps "done"
116
117			# validate not required? (then solved will never be set to true, thus set it here)
118			if not db_message.message.status.validate.required:
119				db_message.message.status.solved = True
120
121			if db_message.message.status.solved:
122				# yay, message is solved
123				self.db.set_solution(count=count, solution=True);
124			else:
125				# not solved, but all steps done
126				self.db.set_solution(count=count, solution=False);
127		
128				# try again
129				self._do_again(db_message.message)
130
131	def _hash_data(self, d:RiddleData) -> int:
132		return hash((d.file_plain, d.type, d.prompt))
133
134	def _add_extractions(self, riddle_id:str, data:List[RiddleData]) -> bool:
135		
136		# get all the data items without extraction
137		empty_data = {}
138		for i, d in enumerate(data):
139			if d.file_extracted is None:
140				empty_data[self._hash_data(d)] = i
141
142		# search db for extractions already available
143		for row in self.db.iterate(
144			id=riddle_id,
145			limit=min(self.db.len(id=riddle_id), 250)
146		):
147			# check for required extraction
148			for d in row.message.data:
149				# already extracted ? 
150				#	extraction file exists ?
151				#	one of the items, we do not have extractions for ?
152				#	the same data item ?
153				if not d.file_extracted is None \
154					and not d.file_extracted.startswith("missing:") \
155					and self._hash_data(d) in empty_data:
156						# copy the reference to the extracted data
157						data[empty_data[self._hash_data(d)]].file_extracted = d.file_extracted
158						# remove from items we need extracted data for
159						del empty_data[self._hash_data(d)]
160
161			# break if all extractions found
162			if len(empty_data) == 0:
163				break
164
165		return len(empty_data) == 0 # fully extracted
166
167	def _do_again(self, message:AgentMessage):
168		if message.status.trial < self.SOLUTION_MAX_TRIALS:
169			# try again, recycle message
170
171			# require steps again
172			if message.status.extract.required:
173				message.status.extract.finished = False 
174			if message.status.solve.required:
175				message.status.solve.finished = False 
176			if message.status.validate.required:
177				message.status.validate.finished = False
178
179			# increment trial
180			message.status.trial += 1
181
182			# append current solution als old one
183			if not message.solution is None:
184				message.riddle.solutions_before.append(
185					message.solution
186				)
187				# reset current solution
188				message.solution = None
189
190			# add the riddle as new to management
191			self._send_message(self.MANAGEMENT_URL, message)
192
193		else:
194			logger.info(f"Unsolved riddle after max number of trials: {message.id}")
195
196	def _send_messages(self, recipients:List[str], message:AgentMessage) -> bool:
197		ok  = True
198		for r in recipients:
199			ok = ok and self._send_message(r, message)
200		return ok
201
202	def _send_message(self, recipient:str, message:AgentMessage) -> bool:
203		db_count = self.db.add_message(
204			sender=self.management_name,
205			recipient=self._get_name(recipient),
206			message=message,
207			processed=False
208		)
209
210		r = requests.post(
211			"{}/message".format(recipient),
212			data=message.model_dump_json(),
213			headers={"accept" : "application/json", "content-type" : "application/json"}
214		)
215
216		if r.status_code == 200:
217			self.db.set_processed(db_count, processed=True)
218			return True
219		else:
220			logger.warning(f"Error sending message to: {recipient} {(r.text, r.headers)}")
221			return False
MessageProcessor(db: ums.management.db.DB)
43	def __init__(self, db:DB):
44		self.db = db
45		self.management_name = self._get_name(self.MANAGEMENT_URL)
46
47		if len(self.AGENTS_PROCESS) == 0:
48			logger.warning(f"Not Process Agent (AGENTS_PROCESS) found, this may be a problem!")
49		if len(self.AGENTS_SOLVE) == 0:
50			logger.warning(f"Not Solve Agent (AGENTS_SOLVE) found, this may be a problem!")
51		if len(self.AGENTS_GATEKEEPER) == 0:
52			logger.warning(f"Not Gatekeeper Agent (AGENTS_GATEKEEPER) found, this may be a problem!")
SOLUTION_MAX_TRIALS = 5
MESSAGE_MAX_CONTACTS = 100
REQUIRE_FULL_EXTRACT = False
MANAGEMENT_URL = 'http://127.0.0.1:80'
AGENTS_PROCESS = ('',)
AGENTS_SOLVE = ('',)
AGENTS_GATEKEEPER = ('',)
db
management_name
def new_message( self, sender: str, receiver: str, message: ums.utils.types.AgentMessage, background_tasks: fastapi.background.BackgroundTasks) -> ums.utils.types.AgentResponse:
58	def new_message(self,
59		sender:str, receiver:str, message:AgentMessage,
60		background_tasks: BackgroundTasks
61	) -> AgentResponse:
62
63		try:
64			db_count = self.db.add_message(sender, receiver, message)
65			background_tasks.add_task(self._process_message, db_count)
66
67			return AgentResponse(
68				count=db_count,
69				msg="Added message to queue"
70			)
71		except Exception as e:
72			return AgentResponse(
73				count=-1,
74				error=True,
75				error_msg=str(e)
76			)