ums.management.process
# Agenten Plattform
#
# (c) 2024 Magnus Bender
# Institute of Humanities-Centered Artificial Intelligence (CHAI)
# Universitaet Hamburg
# https://www.chai.uni-hamburg.de/~bender
#
# source code released under the terms of GNU Public License Version 3
# https://www.gnu.org/licenses/gpl-3.0.txt

import os, re
from typing import List

import requests
from fastapi import BackgroundTasks

from ums.management.db import DB
from ums.utils import AgentMessage, AgentResponse, logger, RiddleData, RiddleSolution

class MessageProcessor():
    """
    Drives riddle messages through the management pipeline.

    Every message received via `new_message` is stored in the DB and then
    processed in a background task (`_process_message`). Depending on its
    status, the message is forwarded to the extract agents (`AGENTS_PROCESS`),
    the solve agents (`AGENTS_SOLVE`) or the gatekeeper agents
    (`AGENTS_GATEKEEPER`). Once all required steps are finished the message is
    marked as solved or unsolved; unsolved messages are re-queued for another
    trial (see `_do_again`).

    All settings are read from environment variables when the class is defined.
    """

    # max. number of trials for one riddle before giving up
    SOLUTION_MAX_TRIALS = int(os.environ.get('SOLUTION_MAX_TRIALS', 5))
    # max. number of times the management processes the same message
    MESSAGE_MAX_CONTACTS = int(os.environ.get('MESSAGE_MAX_CONTACTS', 100))

    # postpone a message until extractions for all its data items are known
    REQUIRE_FULL_EXTRACT = os.environ.get('REQUIRE_FULL_EXTRACT', 'false').lower() == 'true'
    # postpone a message until at least as many solutions as solve agents are known
    REQUIRE_FULL_SOLVE = os.environ.get('REQUIRE_FULL_SOLVE', 'false').lower() == 'true'

    # base URL of this management; unsolved messages are re-sent here
    MANAGEMENT_URL = os.environ.get('MANAGEMENT_URL', 'http://127.0.0.1:80').strip().strip('/')

    # Comma-separated base URLs of the agents. Empty entries are dropped:
    # ''.split(',') is [''], so an unset variable (or a trailing comma) would
    # otherwise yield a bogus agent "" that every message gets posted to.
    AGENTS_PROCESS = tuple(filter(None, map(
        lambda s: s.strip().strip('/'),
        os.environ.get('AGENTS_PROCESS', '').split(',')
    )))
    AGENTS_SOLVE = tuple(filter(None, map(
        lambda s: s.strip().strip('/'),
        os.environ.get('AGENTS_SOLVE', '').split(',')
    )))
    AGENTS_GATEKEEPER = tuple(filter(None, map(
        lambda s: s.strip().strip('/'),
        os.environ.get('AGENTS_GATEKEEPER', '').split(',')
    )))

    def __init__(self, db: DB):
        """
        Create the processor.

        Args:
            db: database used to store, look up and update messages.

        Logs a warning for every agent group without a configured agent.
        """
        self.db = db
        # name derived from MANAGEMENT_URL, used as sender of messages sent by the management
        self.management_name = self._get_name(self.MANAGEMENT_URL)

        if len(self.AGENTS_PROCESS) == 0:
            logger.warning("No Process Agent (AGENTS_PROCESS) found, this may be a problem!")
        if len(self.AGENTS_SOLVE) == 0:
            logger.warning("No Solve Agent (AGENTS_SOLVE) found, this may be a problem!")
        if len(self.AGENTS_GATEKEEPER) == 0:
            logger.warning("No Gatekeeper Agent (AGENTS_GATEKEEPER) found, this may be a problem!")

    def _get_name(self, url: str) -> str:
        """
        Return the part of ``url`` between ``http(s)://`` and an optional
        ``:port`` suffix, or ``"unknown"`` if ``url`` does not have that form.
        """
        m = re.match(r'^https?://([^:]*)(?::(\d+))?$', url)
        return "unknown" if m is None else m.group(1)

    def new_message(self,
            sender: str, receiver: str, message: AgentMessage,
            background_tasks: BackgroundTasks
        ) -> AgentResponse:
        """
        Store an incoming message and schedule its processing.

        The message is added to the DB and `_process_message` is queued as a
        FastAPI background task for the returned DB counter.

        Returns:
            `AgentResponse` with the DB counter of the stored message, or with
            ``count=-1``, ``error=True`` and the exception text if storing or
            scheduling failed.
        """
        try:
            stored_count = self.db.add_message(sender, receiver, message)
            background_tasks.add_task(self._process_message, stored_count)
            response = AgentResponse(
                count=stored_count,
                msg="Added message to queue"
            )
        except Exception as err:
            # API boundary: report the failure to the sender instead of raising
            response = AgentResponse(
                count=-1,
                error=True,
                error_msg=str(err)
            )
        return response

    def _process_message(self, count: int, ignore_processed: bool = False):
        """
        Advance the message with DB counter ``count`` by one pipeline step.

        The first applicable step wins:
        1. extract required and not finished -> send to `AGENTS_PROCESS`
        2. merge extractions known from other messages of the same riddle;
           postpone if `REQUIRE_FULL_EXTRACT` and some are missing
        3. solve required and not finished -> send to `AGENTS_SOLVE`
        4. merge solutions of the same trial known from other messages;
           postpone if `REQUIRE_FULL_SOLVE` and some are missing
        5. validate required and not finished -> send to `AGENTS_GATEKEEPER`
        6. otherwise store the solved state; if unsolved, start a new trial

        Args:
            count: DB counter of the message to process.
            ignore_processed: process the message even if it is already
                marked as processed.
        """
        db_message = self.db.by_count(count)

        if db_message.processed and not ignore_processed:
            # do not process processed messages again
            return

        # mark as processed before acting on it, so it is not handled twice
        self.db.set_processed(count=count, processed=True)

        message = db_message.message

        # count how often the management has seen this message; caps loops
        message.contacts += 1
        if message.contacts > self.MESSAGE_MAX_CONTACTS:
            logger.warning(f"Message reached max number of contacts! {message.id}, {count}")
            return

        # check which step/ state the message requires the management to do
        # -> IF
        if message.status.extract.required and not message.status.extract.finished:
            # send to extract agents
            self._send_messages(self.AGENTS_PROCESS, message)
            return

        # combine different extractions in data items
        # will update items in `message.data`
        fully_extracted = self._add_extractions(message.id, message.data)
        if self.REQUIRE_FULL_EXTRACT and not fully_extracted:
            logger.warning(f"Postpone message, wait for full extract of items! {message.id}, {count}")
            return

        # -> EL IF
        if message.status.solve.required and not message.status.solve.finished:
            # send to solve agents
            self._send_messages(self.AGENTS_SOLVE, message)
            return

        # combine different solutions
        # will add solutions received before to `message.solution`
        fully_solved = self._add_solutions(message.id, message.solution, message.status.trial)
        if self.REQUIRE_FULL_SOLVE and not fully_solved:
            logger.warning(f"Postpone message, wait for all solutions of riddle! {message.id}, {count}")
            return

        # -> EL IF
        if message.status.validate.required and not message.status.validate.finished:
            # send to gatekeeper agents
            self._send_messages(self.AGENTS_GATEKEEPER, message)
            return

        # -> ELSE
        # all steps "done"

        # validate not required? (then solved will never be set to true, thus set it here)
        if not message.status.validate.required:
            message.status.solved = True

        if message.status.solved:
            # yay, message is solved
            self.db.set_solution(count=count, solution=True)
        else:
            # not solved, but all steps done
            self.db.set_solution(count=count, solution=False)

            # try again
            self._do_again(message)

    def _hash_solution(self, s: RiddleSolution) -> int:
        """Hash a solution by its text, explanation and the (plain file, type) of its used data."""
        return hash((s.solution, s.explanation, tuple((d.file_plain, d.type) for d in s.used_data)))

    def _add_solutions(self, riddle_id: str, solution: List[RiddleSolution], trial: int) -> bool:
        """
        Extend ``solution`` in place with solutions of the same riddle and
        ``trial`` found in other DB messages (de-duplicated via
        `_hash_solution`; at most 250 DB rows are scanned).

        Returns:
            True if ``solution`` holds at least as many entries as there are
            solve agents (`AGENTS_SOLVE`).
        """
        required = len(self.AGENTS_SOLVE)

        # do not do anything, if all solutions available
        if len(solution) >= required:
            return True

        contained = {self._hash_solution(s) for s in solution}

        # search db for solutions from before
        for row in self.db.iterate(
                id=riddle_id,
                limit=min(self.db.len(id=riddle_id), 250)
            ):
            # make sure to only use solutions from same "trial"
            if row.message.status.trial != trial:
                continue

            for s in row.message.solution:
                h = self._hash_solution(s)
                if h not in contained:
                    # add the 'new' solution
                    solution.append(s)
                    contained.add(h)

            # all solutions found ?
            if len(solution) >= required:
                break

        return len(solution) >= required

    def _hash_data(self, d: RiddleData) -> int:
        """Hash a data item by plain file, type and prompt (the extraction is ignored)."""
        return hash((d.file_plain, d.type, d.prompt))

    def _add_extractions(self, riddle_id: str, data: List[RiddleData]) -> bool:
        """
        Fill missing ``file_extracted`` values of ``data`` in place from other
        DB messages of the same riddle.

        Items are matched via `_hash_data`; extractions starting with
        ``"missing:"`` are not copied. At most 250 DB rows are scanned.

        Returns:
            True if every item of ``data`` has an extraction afterwards.
        """
        # hash -> indices of items without extraction; a list, so identical
        # items all get filled, not only the last one
        empty_data = {}
        for i, d in enumerate(data):
            if d.file_extracted is None:
                empty_data.setdefault(self._hash_data(d), []).append(i)

        # do not do anything if fully extracted
        if not empty_data:
            return True

        # search db for extractions already available
        for row in self.db.iterate(
                id=riddle_id,
                limit=min(self.db.len(id=riddle_id), 250)
            ):
            for d in row.message.data:
                # only usable extractions: present and not marked as missing
                if d.file_extracted is None or d.file_extracted.startswith("missing:"):
                    continue

                # one of the items we still need an extraction for?
                indices = empty_data.pop(self._hash_data(d), None)
                if indices is None:
                    continue

                # copy the reference to the extracted data
                for i in indices:
                    data[i].file_extracted = d.file_extracted

            # break if all extractions found
            if not empty_data:
                break

        return not empty_data  # fully extracted

    def _do_again(self, message: AgentMessage):
        """
        Re-queue an unsolved ``message`` for another trial, unless
        `SOLUTION_MAX_TRIALS` is reached (then only log it).

        Resets ``finished`` of every required step, increments
        ``status.trial``, moves the current solutions to
        ``riddle.solutions_before`` and sends the message to `MANAGEMENT_URL`.
        """
        if message.status.trial >= self.SOLUTION_MAX_TRIALS:
            logger.info(f"Unsolved riddle after max number of trials: {message.id}")
            return

        # try again, recycle message: require steps again
        for step in (message.status.extract, message.status.solve, message.status.validate):
            if step.required:
                step.finished = False

        # increment trial
        message.status.trial += 1

        # append current solution(s) as old one(s) and reset current solution
        if len(message.solution) > 0:
            message.riddle.solutions_before.extend(message.solution)
            message.solution = []

        # add the riddle as new to management
        self._send_message(self.MANAGEMENT_URL, message)

    def _send_messages(self, recipients: List[str], message: AgentMessage) -> bool:
        """
        Send ``message`` to every recipient.

        Returns:
            True if all sends succeeded. A failed send does not stop the
            remaining recipients from being contacted.
        """
        ok = True
        for r in recipients:
            # send first: `ok and send(...)` would skip everyone after a failure
            if not self._send_message(r, message):
                ok = False
        return ok

    def _send_message(self, recipient: str, message: AgentMessage) -> bool:
        """
        POST ``message`` as JSON to ``{recipient}/message``.

        The message is stored in the DB first with ``processed=False`` and
        marked processed once the recipient answers with HTTP 200.

        Returns:
            True on HTTP 200; False on any other status or if the request
            itself failed (connection error, invalid URL, ...).
        """
        db_count = self.db.add_message(
            sender=self.management_name,
            recipient=self._get_name(recipient),
            message=message,
            processed=False
        )

        try:
            r = requests.post(
                "{}/message".format(recipient),
                data=message.model_dump_json(),
                headers={"accept" : "application/json", "content-type" : "application/json"}
            )
        except requests.RequestException as e:
            # a network error must not abort the background task; the DB row
            # stays unprocessed
            logger.warning(f"Error sending message to: {recipient} {e}")
            return False

        if r.status_code == 200:
            self.db.set_processed(db_count, processed=True)
            return True

        logger.warning(f"Error sending message to: {recipient} {(r.text, r.headers)}")
        return False
class
MessageProcessor:
class MessageProcessor():
    """
    Drives riddle messages through the management pipeline.

    Every message received via `new_message` is stored in the DB and then
    processed in a background task (`_process_message`). Depending on its
    status, the message is forwarded to the extract agents (`AGENTS_PROCESS`),
    the solve agents (`AGENTS_SOLVE`) or the gatekeeper agents
    (`AGENTS_GATEKEEPER`). Once all required steps are finished the message is
    marked as solved or unsolved; unsolved messages are re-queued for another
    trial (see `_do_again`).

    All settings are read from environment variables when the class is defined.
    """

    # max. number of trials for one riddle before giving up
    SOLUTION_MAX_TRIALS = int(os.environ.get('SOLUTION_MAX_TRIALS', 5))
    # max. number of times the management processes the same message
    MESSAGE_MAX_CONTACTS = int(os.environ.get('MESSAGE_MAX_CONTACTS', 100))

    # postpone a message until extractions for all its data items are known
    REQUIRE_FULL_EXTRACT = os.environ.get('REQUIRE_FULL_EXTRACT', 'false').lower() == 'true'
    # postpone a message until at least as many solutions as solve agents are known
    REQUIRE_FULL_SOLVE = os.environ.get('REQUIRE_FULL_SOLVE', 'false').lower() == 'true'

    # base URL of this management; unsolved messages are re-sent here
    MANAGEMENT_URL = os.environ.get('MANAGEMENT_URL', 'http://127.0.0.1:80').strip().strip('/')

    # Comma-separated base URLs of the agents. Empty entries are dropped:
    # ''.split(',') is [''], so an unset variable (or a trailing comma) would
    # otherwise yield a bogus agent "" that every message gets posted to.
    AGENTS_PROCESS = tuple(filter(None, map(
        lambda s: s.strip().strip('/'),
        os.environ.get('AGENTS_PROCESS', '').split(',')
    )))
    AGENTS_SOLVE = tuple(filter(None, map(
        lambda s: s.strip().strip('/'),
        os.environ.get('AGENTS_SOLVE', '').split(',')
    )))
    AGENTS_GATEKEEPER = tuple(filter(None, map(
        lambda s: s.strip().strip('/'),
        os.environ.get('AGENTS_GATEKEEPER', '').split(',')
    )))

    def __init__(self, db: DB):
        """
        Create the processor.

        Args:
            db: database used to store, look up and update messages.

        Logs a warning for every agent group without a configured agent.
        """
        self.db = db
        # name derived from MANAGEMENT_URL, used as sender of messages sent by the management
        self.management_name = self._get_name(self.MANAGEMENT_URL)

        if len(self.AGENTS_PROCESS) == 0:
            logger.warning("No Process Agent (AGENTS_PROCESS) found, this may be a problem!")
        if len(self.AGENTS_SOLVE) == 0:
            logger.warning("No Solve Agent (AGENTS_SOLVE) found, this may be a problem!")
        if len(self.AGENTS_GATEKEEPER) == 0:
            logger.warning("No Gatekeeper Agent (AGENTS_GATEKEEPER) found, this may be a problem!")

    def _get_name(self, url: str) -> str:
        """
        Return the part of ``url`` between ``http(s)://`` and an optional
        ``:port`` suffix, or ``"unknown"`` if ``url`` does not have that form.
        """
        m = re.match(r'^https?://([^:]*)(?::(\d+))?$', url)
        return "unknown" if m is None else m.group(1)

    def new_message(self,
            sender: str, receiver: str, message: AgentMessage,
            background_tasks: BackgroundTasks
        ) -> AgentResponse:
        """
        Store an incoming message and schedule its processing.

        The message is added to the DB and `_process_message` is queued as a
        FastAPI background task for the returned DB counter.

        Returns:
            `AgentResponse` with the DB counter of the stored message, or with
            ``count=-1``, ``error=True`` and the exception text if storing or
            scheduling failed.
        """
        try:
            stored_count = self.db.add_message(sender, receiver, message)
            background_tasks.add_task(self._process_message, stored_count)
            response = AgentResponse(
                count=stored_count,
                msg="Added message to queue"
            )
        except Exception as err:
            # API boundary: report the failure to the sender instead of raising
            response = AgentResponse(
                count=-1,
                error=True,
                error_msg=str(err)
            )
        return response

    def _process_message(self, count: int, ignore_processed: bool = False):
        """
        Advance the message with DB counter ``count`` by one pipeline step.

        The first applicable step wins:
        1. extract required and not finished -> send to `AGENTS_PROCESS`
        2. merge extractions known from other messages of the same riddle;
           postpone if `REQUIRE_FULL_EXTRACT` and some are missing
        3. solve required and not finished -> send to `AGENTS_SOLVE`
        4. merge solutions of the same trial known from other messages;
           postpone if `REQUIRE_FULL_SOLVE` and some are missing
        5. validate required and not finished -> send to `AGENTS_GATEKEEPER`
        6. otherwise store the solved state; if unsolved, start a new trial

        Args:
            count: DB counter of the message to process.
            ignore_processed: process the message even if it is already
                marked as processed.
        """
        db_message = self.db.by_count(count)

        if db_message.processed and not ignore_processed:
            # do not process processed messages again
            return

        # mark as processed before acting on it, so it is not handled twice
        self.db.set_processed(count=count, processed=True)

        message = db_message.message

        # count how often the management has seen this message; caps loops
        message.contacts += 1
        if message.contacts > self.MESSAGE_MAX_CONTACTS:
            logger.warning(f"Message reached max number of contacts! {message.id}, {count}")
            return

        # check which step/ state the message requires the management to do
        # -> IF
        if message.status.extract.required and not message.status.extract.finished:
            # send to extract agents
            self._send_messages(self.AGENTS_PROCESS, message)
            return

        # combine different extractions in data items
        # will update items in `message.data`
        fully_extracted = self._add_extractions(message.id, message.data)
        if self.REQUIRE_FULL_EXTRACT and not fully_extracted:
            logger.warning(f"Postpone message, wait for full extract of items! {message.id}, {count}")
            return

        # -> EL IF
        if message.status.solve.required and not message.status.solve.finished:
            # send to solve agents
            self._send_messages(self.AGENTS_SOLVE, message)
            return

        # combine different solutions
        # will add solutions received before to `message.solution`
        fully_solved = self._add_solutions(message.id, message.solution, message.status.trial)
        if self.REQUIRE_FULL_SOLVE and not fully_solved:
            logger.warning(f"Postpone message, wait for all solutions of riddle! {message.id}, {count}")
            return

        # -> EL IF
        if message.status.validate.required and not message.status.validate.finished:
            # send to gatekeeper agents
            self._send_messages(self.AGENTS_GATEKEEPER, message)
            return

        # -> ELSE
        # all steps "done"

        # validate not required? (then solved will never be set to true, thus set it here)
        if not message.status.validate.required:
            message.status.solved = True

        if message.status.solved:
            # yay, message is solved
            self.db.set_solution(count=count, solution=True)
        else:
            # not solved, but all steps done
            self.db.set_solution(count=count, solution=False)

            # try again
            self._do_again(message)

    def _hash_solution(self, s: RiddleSolution) -> int:
        """Hash a solution by its text, explanation and the (plain file, type) of its used data."""
        return hash((s.solution, s.explanation, tuple((d.file_plain, d.type) for d in s.used_data)))

    def _add_solutions(self, riddle_id: str, solution: List[RiddleSolution], trial: int) -> bool:
        """
        Extend ``solution`` in place with solutions of the same riddle and
        ``trial`` found in other DB messages (de-duplicated via
        `_hash_solution`; at most 250 DB rows are scanned).

        Returns:
            True if ``solution`` holds at least as many entries as there are
            solve agents (`AGENTS_SOLVE`).
        """
        required = len(self.AGENTS_SOLVE)

        # do not do anything, if all solutions available
        if len(solution) >= required:
            return True

        contained = {self._hash_solution(s) for s in solution}

        # search db for solutions from before
        for row in self.db.iterate(
                id=riddle_id,
                limit=min(self.db.len(id=riddle_id), 250)
            ):
            # make sure to only use solutions from same "trial"
            if row.message.status.trial != trial:
                continue

            for s in row.message.solution:
                h = self._hash_solution(s)
                if h not in contained:
                    # add the 'new' solution
                    solution.append(s)
                    contained.add(h)

            # all solutions found ?
            if len(solution) >= required:
                break

        return len(solution) >= required

    def _hash_data(self, d: RiddleData) -> int:
        """Hash a data item by plain file, type and prompt (the extraction is ignored)."""
        return hash((d.file_plain, d.type, d.prompt))

    def _add_extractions(self, riddle_id: str, data: List[RiddleData]) -> bool:
        """
        Fill missing ``file_extracted`` values of ``data`` in place from other
        DB messages of the same riddle.

        Items are matched via `_hash_data`; extractions starting with
        ``"missing:"`` are not copied. At most 250 DB rows are scanned.

        Returns:
            True if every item of ``data`` has an extraction afterwards.
        """
        # hash -> indices of items without extraction; a list, so identical
        # items all get filled, not only the last one
        empty_data = {}
        for i, d in enumerate(data):
            if d.file_extracted is None:
                empty_data.setdefault(self._hash_data(d), []).append(i)

        # do not do anything if fully extracted
        if not empty_data:
            return True

        # search db for extractions already available
        for row in self.db.iterate(
                id=riddle_id,
                limit=min(self.db.len(id=riddle_id), 250)
            ):
            for d in row.message.data:
                # only usable extractions: present and not marked as missing
                if d.file_extracted is None or d.file_extracted.startswith("missing:"):
                    continue

                # one of the items we still need an extraction for?
                indices = empty_data.pop(self._hash_data(d), None)
                if indices is None:
                    continue

                # copy the reference to the extracted data
                for i in indices:
                    data[i].file_extracted = d.file_extracted

            # break if all extractions found
            if not empty_data:
                break

        return not empty_data  # fully extracted

    def _do_again(self, message: AgentMessage):
        """
        Re-queue an unsolved ``message`` for another trial, unless
        `SOLUTION_MAX_TRIALS` is reached (then only log it).

        Resets ``finished`` of every required step, increments
        ``status.trial``, moves the current solutions to
        ``riddle.solutions_before`` and sends the message to `MANAGEMENT_URL`.
        """
        if message.status.trial >= self.SOLUTION_MAX_TRIALS:
            logger.info(f"Unsolved riddle after max number of trials: {message.id}")
            return

        # try again, recycle message: require steps again
        for step in (message.status.extract, message.status.solve, message.status.validate):
            if step.required:
                step.finished = False

        # increment trial
        message.status.trial += 1

        # append current solution(s) as old one(s) and reset current solution
        if len(message.solution) > 0:
            message.riddle.solutions_before.extend(message.solution)
            message.solution = []

        # add the riddle as new to management
        self._send_message(self.MANAGEMENT_URL, message)

    def _send_messages(self, recipients: List[str], message: AgentMessage) -> bool:
        """
        Send ``message`` to every recipient.

        Returns:
            True if all sends succeeded. A failed send does not stop the
            remaining recipients from being contacted.
        """
        ok = True
        for r in recipients:
            # send first: `ok and send(...)` would skip everyone after a failure
            if not self._send_message(r, message):
                ok = False
        return ok

    def _send_message(self, recipient: str, message: AgentMessage) -> bool:
        """
        POST ``message`` as JSON to ``{recipient}/message``.

        The message is stored in the DB first with ``processed=False`` and
        marked processed once the recipient answers with HTTP 200.

        Returns:
            True on HTTP 200; False on any other status or if the request
            itself failed (connection error, invalid URL, ...).
        """
        db_count = self.db.add_message(
            sender=self.management_name,
            recipient=self._get_name(recipient),
            message=message,
            processed=False
        )

        try:
            r = requests.post(
                "{}/message".format(recipient),
                data=message.model_dump_json(),
                headers={"accept" : "application/json", "content-type" : "application/json"}
            )
        except requests.RequestException as e:
            # a network error must not abort the background task; the DB row
            # stays unprocessed
            logger.warning(f"Error sending message to: {recipient} {e}")
            return False

        if r.status_code == 200:
            self.db.set_processed(db_count, processed=True)
            return True

        logger.warning(f"Error sending message to: {recipient} {(r.text, r.headers)}")
        return False
MessageProcessor(db: ums.management.db.DB)
def __init__(self, db: DB):
    """
    Create the processor.

    Args:
        db: database used to store, look up and update messages.

    Logs a warning for every agent group (AGENTS_PROCESS, AGENTS_SOLVE,
    AGENTS_GATEKEEPER) that has no configured agent.
    """
    self.db = db
    # name derived from MANAGEMENT_URL, used as sender of messages sent by the management
    self.management_name = self._get_name(self.MANAGEMENT_URL)

    if len(self.AGENTS_PROCESS) == 0:
        logger.warning("No Process Agent (AGENTS_PROCESS) found, this may be a problem!")
    if len(self.AGENTS_SOLVE) == 0:
        logger.warning("No Solve Agent (AGENTS_SOLVE) found, this may be a problem!")
    if len(self.AGENTS_GATEKEEPER) == 0:
        logger.warning("No Gatekeeper Agent (AGENTS_GATEKEEPER) found, this may be a problem!")
def
new_message( self, sender: str, receiver: str, message: ums.utils.types.AgentMessage, background_tasks: fastapi.background.BackgroundTasks) -> ums.utils.types.AgentResponse:
def new_message(self,
        sender: str, receiver: str, message: AgentMessage,
        background_tasks: BackgroundTasks
    ) -> AgentResponse:
    """
    Store an incoming message and schedule its processing.

    The message is added to the DB and `_process_message` is queued as a
    FastAPI background task for the returned DB counter.

    Returns:
        `AgentResponse` with the DB counter of the stored message, or with
        ``count=-1``, ``error=True`` and the exception text if storing or
        scheduling failed.
    """
    try:
        stored_count = self.db.add_message(sender, receiver, message)
        background_tasks.add_task(self._process_message, stored_count)
        response = AgentResponse(
            count=stored_count,
            msg="Added message to queue"
        )
    except Exception as err:
        # API boundary: report the failure to the sender instead of raising
        response = AgentResponse(
            count=-1,
            error=True,
            error_msg=str(err)
        )
    return response