ums.management.process
# Agenten Plattform
#
# (c) 2024 Magnus Bender
# Institute of Humanities-Centered Artificial Intelligence (CHAI)
# Universitaet Hamburg
# https://www.chai.uni-hamburg.de/~bender
#
# source code released under the terms of GNU Public License Version 3
# https://www.gnu.org/licenses/gpl-3.0.txt

import os, re
from typing import List

import requests
from fastapi import BackgroundTasks

from ums.management.db import DB
from ums.utils import AgentMessage, AgentResponse, logger


def _parse_agent_urls(raw: str) -> tuple:
    """Split a comma-separated list of agent URLs into a tuple.

    Every entry is stripped of surrounding whitespace and slashes. Empty
    entries are dropped, so an unset or empty environment variable yields an
    empty tuple instead of ``('',)`` (which is what ``''.split(',')`` alone
    would produce).
    """
    cleaned = (part.strip().strip('/') for part in raw.split(','))
    return tuple(url for url in cleaned if url)


class MessageProcessor():
    """Forward agent messages to the agents of the step they require next.

    A message is stored in the database, then (as a background task) sent to
    the extract, solve or gatekeeper agents depending on which step of its
    status is required but not yet finished. When no step is left, the
    message is marked as solved or recycled for another trial.

    All settings are read from environment variables once, when the class is
    defined.
    """

    # an unsolved message is recycled while `status.trial` is below this value
    SOLUTION_MAX_TRIALS = int(os.environ.get('SOLUTION_MAX_TRIALS', 5))
    # messages whose `contacts` counter exceeds this value are not forwarded
    MESSAGE_MAX_CONTACTS = int(os.environ.get('MESSAGE_MAX_CONTACTS', 100))

    # base URL of the management itself, without trailing slash
    MANAGEMENT_URL = os.environ.get('MANAGEMENT_URL', 'http://127.0.0.1:80').strip().strip('/')

    # base URLs of the agents per step (comma-separated in the environment)
    AGENTS_PROCESS = _parse_agent_urls(os.environ.get('AGENTS_PROCESS', ''))
    AGENTS_SOLVE = _parse_agent_urls(os.environ.get('AGENTS_SOLVE', ''))
    AGENTS_GATEKEEPER = _parse_agent_urls(os.environ.get('AGENTS_GATEKEEPER', ''))

    def __init__(self, db:DB):
        """Store the database handle and warn about steps without agents.

        :param db: database used to store and look up messages
        """
        self.db = db
        self.management_name = self._get_name(self.MANAGEMENT_URL)

        if not self.AGENTS_PROCESS:
            logger.warning("Not Process Agent (AGENTS_PROCESS) found, this may be a problem!")
        if not self.AGENTS_SOLVE:
            logger.warning("Not Solve Agent (AGENTS_SOLVE) found, this may be a problem!")
        if not self.AGENTS_GATEKEEPER:
            logger.warning("Not Gatekeeper Agent (AGENTS_GATEKEEPER) found, this may be a problem!")

    def _get_name(self, url:str) -> str:
        """Return the host part of `url`.

        Returns "unknown" if `url` does not have the form
        ``http(s)://host`` or ``http(s)://host:port``.
        """
        m = re.match(r'^https?://([^:]*)(?::(\d+))?$', url)
        return "unknown" if m is None else m.group(1)

    def new_message(self,
        sender:str, receiver:str, message:AgentMessage,
        background_tasks: BackgroundTasks
    ) -> AgentResponse:
        """Store an incoming message and schedule its processing.

        :param sender: name of the sender, stored with the message
        :param receiver: name of the receiver, stored with the message
        :param message: the message to store
        :param background_tasks: task collection the processing is added to
        :return: response carrying the count returned by the database, or
            ``count=-1`` with ``error=True`` and the exception text if
            storing or scheduling raised
        """
        try:
            db_count = self.db.add_message(sender, receiver, message)
            background_tasks.add_task(self._process_message, db_count)

            return AgentResponse(
                count=db_count,
                msg="Added message to queue"
            )
        except Exception as e:
            # boundary to the caller: report the failure in the response
            return AgentResponse(
                count=-1,
                error=True,
                error_msg=str(e)
            )

    def _process_message(self, count:int, ignore_processed:bool=False):
        """Send the stored message `count` to the agents of its next step.

        :param count: count of the message as returned by the database
        :param ignore_processed: process the message even if it is already
            flagged as processed
        """
        db_message = self.db.by_count(count)

        if db_message.processed and not ignore_processed:
            # do not process processed messages again
            return

        # increment contacts counter
        db_message.message.contacts += 1
        if db_message.message.contacts > self.MESSAGE_MAX_CONTACTS:
            # the message is dropped here and is NOT flagged as processed
            logger.warning(f"Message reached max number of contacts! {db_message.message.id}, {count}")
            return

        # check which step/ state the message requires the management to do
        if db_message.message.status.extract.required and not db_message.message.status.extract.finished:
            # send to extract agents
            self._send_messages(self.AGENTS_PROCESS, db_message.message)

        elif db_message.message.status.solve.required and not db_message.message.status.solve.finished:
            # send to solve agents
            self._send_messages(self.AGENTS_SOLVE, db_message.message)

        elif db_message.message.status.validate.required and not db_message.message.status.validate.finished:
            # send to gatekeeper agents
            self._send_messages(self.AGENTS_GATEKEEPER, db_message.message)

        else: # all steps "done"

            # validate not required? (then solved will never be set to true, thus set it here)
            if not db_message.message.status.validate.required:
                db_message.message.status.solved = True

            if db_message.message.status.solved:
                # yay, message is solved
                self.db.set_solution(count=count, solution=True)
            else:
                # not solved, but all steps done
                self.db.set_solution(count=count, solution=False)

                # try again
                self._do_again(db_message.message)

        # now message processed!
        self.db.set_processed(count=count, processed=True)

    def _do_again(self, message:AgentMessage):
        """Recycle an unsolved message for another trial.

        While ``message.status.trial`` is below `SOLUTION_MAX_TRIALS`, all
        required steps are reset to unfinished, the trial counter is
        incremented, the current solution is moved to
        ``message.riddle.solutions_before`` and the message is sent to the
        management again. Otherwise only a log entry is written.
        """
        if message.status.trial < self.SOLUTION_MAX_TRIALS:
            # try again, recycle message

            # require steps again
            if message.status.extract.required:
                message.status.extract.finished = False
            if message.status.solve.required:
                message.status.solve.finished = False
            if message.status.validate.required:
                message.status.validate.finished = False

            # increment trial
            message.status.trial += 1

            # append current solution as old one
            if message.solution is not None:
                message.riddle.solutions_before.append(
                    message.solution
                )
            # reset current solution
            message.solution = None

            # add the riddle as new to management
            self._send_message(self.MANAGEMENT_URL, message)

        else:
            logger.info(f"Unsolved riddle after max number of trials: {message.id}")

    def _send_messages(self, recipients:List[str], message:AgentMessage) -> bool:
        """Send `message` to every recipient.

        :return: True only if every single delivery succeeded (also True
            for an empty list of recipients)
        """
        ok = True
        for r in recipients:
            # call first, then combine: a failed delivery must not prevent
            # the remaining recipients from being contacted
            ok = self._send_message(r, message) and ok
        return ok

    def _send_message(self, recipient:str, message:AgentMessage) -> bool:
        """Record `message` in the database and POST it to `recipient`.

        The JSON dump of the message is posted to ``<recipient>/message``.
        On HTTP status 200 the database entry is flagged as processed.

        :param recipient: base URL of the receiving agent or management
        :return: True on HTTP status 200, False on any other status or if
            the request itself failed
        """
        db_count = self.db.add_message(
            sender=self.management_name,
            recipient=self._get_name(recipient),
            message=message,
            processed=False
        )

        try:
            r = requests.post(
                "{}/message".format(recipient),
                data=message.model_dump_json(),
                headers={"accept" : "application/json", "content-type" : "application/json"}
            )
        except requests.RequestException as e:
            # unreachable recipient or invalid URL: handle like a non-200
            # response instead of aborting the whole background task
            logger.warning(f"Error sending message to: {recipient} {e}")
            return False

        if r.status_code == 200:
            self.db.set_processed(db_count, processed=True)
            return True
        else:
            logger.warning(f"Error sending message to: {recipient} {(r.text, r.headers)}")
            return False
class
MessageProcessor:
def _parse_agent_urls(raw: str) -> tuple:
    """Split a comma-separated list of agent URLs into a tuple.

    Every entry is stripped of surrounding whitespace and slashes. Empty
    entries are dropped, so an unset or empty environment variable yields an
    empty tuple instead of ``('',)`` (which is what ``''.split(',')`` alone
    would produce).
    """
    cleaned = (part.strip().strip('/') for part in raw.split(','))
    return tuple(url for url in cleaned if url)


class MessageProcessor():
    """Forward agent messages to the agents of the step they require next.

    A message is stored in the database, then (as a background task) sent to
    the extract, solve or gatekeeper agents depending on which step of its
    status is required but not yet finished. When no step is left, the
    message is marked as solved or recycled for another trial.

    All settings are read from environment variables once, when the class is
    defined.
    """

    # an unsolved message is recycled while `status.trial` is below this value
    SOLUTION_MAX_TRIALS = int(os.environ.get('SOLUTION_MAX_TRIALS', 5))
    # messages whose `contacts` counter exceeds this value are not forwarded
    MESSAGE_MAX_CONTACTS = int(os.environ.get('MESSAGE_MAX_CONTACTS', 100))

    # base URL of the management itself, without trailing slash
    MANAGEMENT_URL = os.environ.get('MANAGEMENT_URL', 'http://127.0.0.1:80').strip().strip('/')

    # base URLs of the agents per step (comma-separated in the environment)
    AGENTS_PROCESS = _parse_agent_urls(os.environ.get('AGENTS_PROCESS', ''))
    AGENTS_SOLVE = _parse_agent_urls(os.environ.get('AGENTS_SOLVE', ''))
    AGENTS_GATEKEEPER = _parse_agent_urls(os.environ.get('AGENTS_GATEKEEPER', ''))

    def __init__(self, db:DB):
        """Store the database handle and warn about steps without agents.

        :param db: database used to store and look up messages
        """
        self.db = db
        self.management_name = self._get_name(self.MANAGEMENT_URL)

        if not self.AGENTS_PROCESS:
            logger.warning("Not Process Agent (AGENTS_PROCESS) found, this may be a problem!")
        if not self.AGENTS_SOLVE:
            logger.warning("Not Solve Agent (AGENTS_SOLVE) found, this may be a problem!")
        if not self.AGENTS_GATEKEEPER:
            logger.warning("Not Gatekeeper Agent (AGENTS_GATEKEEPER) found, this may be a problem!")

    def _get_name(self, url:str) -> str:
        """Return the host part of `url`.

        Returns "unknown" if `url` does not have the form
        ``http(s)://host`` or ``http(s)://host:port``.
        """
        m = re.match(r'^https?://([^:]*)(?::(\d+))?$', url)
        return "unknown" if m is None else m.group(1)

    def new_message(self,
        sender:str, receiver:str, message:AgentMessage,
        background_tasks: BackgroundTasks
    ) -> AgentResponse:
        """Store an incoming message and schedule its processing.

        :param sender: name of the sender, stored with the message
        :param receiver: name of the receiver, stored with the message
        :param message: the message to store
        :param background_tasks: task collection the processing is added to
        :return: response carrying the count returned by the database, or
            ``count=-1`` with ``error=True`` and the exception text if
            storing or scheduling raised
        """
        try:
            db_count = self.db.add_message(sender, receiver, message)
            background_tasks.add_task(self._process_message, db_count)

            return AgentResponse(
                count=db_count,
                msg="Added message to queue"
            )
        except Exception as e:
            # boundary to the caller: report the failure in the response
            return AgentResponse(
                count=-1,
                error=True,
                error_msg=str(e)
            )

    def _process_message(self, count:int, ignore_processed:bool=False):
        """Send the stored message `count` to the agents of its next step.

        :param count: count of the message as returned by the database
        :param ignore_processed: process the message even if it is already
            flagged as processed
        """
        db_message = self.db.by_count(count)

        if db_message.processed and not ignore_processed:
            # do not process processed messages again
            return

        # increment contacts counter
        db_message.message.contacts += 1
        if db_message.message.contacts > self.MESSAGE_MAX_CONTACTS:
            # the message is dropped here and is NOT flagged as processed
            logger.warning(f"Message reached max number of contacts! {db_message.message.id}, {count}")
            return

        # check which step/ state the message requires the management to do
        if db_message.message.status.extract.required and not db_message.message.status.extract.finished:
            # send to extract agents
            self._send_messages(self.AGENTS_PROCESS, db_message.message)

        elif db_message.message.status.solve.required and not db_message.message.status.solve.finished:
            # send to solve agents
            self._send_messages(self.AGENTS_SOLVE, db_message.message)

        elif db_message.message.status.validate.required and not db_message.message.status.validate.finished:
            # send to gatekeeper agents
            self._send_messages(self.AGENTS_GATEKEEPER, db_message.message)

        else: # all steps "done"

            # validate not required? (then solved will never be set to true, thus set it here)
            if not db_message.message.status.validate.required:
                db_message.message.status.solved = True

            if db_message.message.status.solved:
                # yay, message is solved
                self.db.set_solution(count=count, solution=True)
            else:
                # not solved, but all steps done
                self.db.set_solution(count=count, solution=False)

                # try again
                self._do_again(db_message.message)

        # now message processed!
        self.db.set_processed(count=count, processed=True)

    def _do_again(self, message:AgentMessage):
        """Recycle an unsolved message for another trial.

        While ``message.status.trial`` is below `SOLUTION_MAX_TRIALS`, all
        required steps are reset to unfinished, the trial counter is
        incremented, the current solution is moved to
        ``message.riddle.solutions_before`` and the message is sent to the
        management again. Otherwise only a log entry is written.
        """
        if message.status.trial < self.SOLUTION_MAX_TRIALS:
            # try again, recycle message

            # require steps again
            if message.status.extract.required:
                message.status.extract.finished = False
            if message.status.solve.required:
                message.status.solve.finished = False
            if message.status.validate.required:
                message.status.validate.finished = False

            # increment trial
            message.status.trial += 1

            # append current solution as old one
            if message.solution is not None:
                message.riddle.solutions_before.append(
                    message.solution
                )
            # reset current solution
            message.solution = None

            # add the riddle as new to management
            self._send_message(self.MANAGEMENT_URL, message)

        else:
            logger.info(f"Unsolved riddle after max number of trials: {message.id}")

    def _send_messages(self, recipients:List[str], message:AgentMessage) -> bool:
        """Send `message` to every recipient.

        :return: True only if every single delivery succeeded (also True
            for an empty list of recipients)
        """
        ok = True
        for r in recipients:
            # call first, then combine: a failed delivery must not prevent
            # the remaining recipients from being contacted
            ok = self._send_message(r, message) and ok
        return ok

    def _send_message(self, recipient:str, message:AgentMessage) -> bool:
        """Record `message` in the database and POST it to `recipient`.

        The JSON dump of the message is posted to ``<recipient>/message``.
        On HTTP status 200 the database entry is flagged as processed.

        :param recipient: base URL of the receiving agent or management
        :return: True on HTTP status 200, False on any other status or if
            the request itself failed
        """
        db_count = self.db.add_message(
            sender=self.management_name,
            recipient=self._get_name(recipient),
            message=message,
            processed=False
        )

        try:
            r = requests.post(
                "{}/message".format(recipient),
                data=message.model_dump_json(),
                headers={"accept" : "application/json", "content-type" : "application/json"}
            )
        except requests.RequestException as e:
            # unreachable recipient or invalid URL: handle like a non-200
            # response instead of aborting the whole background task
            logger.warning(f"Error sending message to: {recipient} {e}")
            return False

        if r.status_code == 200:
            self.db.set_processed(db_count, processed=True)
            return True
        else:
            logger.warning(f"Error sending message to: {recipient} {(r.text, r.headers)}")
            return False
MessageProcessor(db: ums.management.db.DB)
def __init__(self, db:DB):
    """Store the database handle and warn about steps without agents.

    :param db: database used to store and look up messages
    """
    self.db = db
    self.management_name = self._get_name(self.MANAGEMENT_URL)

    # The agent tuples are built with ''.split(','), which yields [''] for
    # an unset variable, so their length is never 0. Checking for any
    # non-empty entry covers both an empty tuple and ('',).
    if not any(self.AGENTS_PROCESS):
        logger.warning("Not Process Agent (AGENTS_PROCESS) found, this may be a problem!")
    if not any(self.AGENTS_SOLVE):
        logger.warning("Not Solve Agent (AGENTS_SOLVE) found, this may be a problem!")
    if not any(self.AGENTS_GATEKEEPER):
        logger.warning("Not Gatekeeper Agent (AGENTS_GATEKEEPER) found, this may be a problem!")
def
new_message( self, sender: str, receiver: str, message: ums.utils.types.AgentMessage, background_tasks: fastapi.background.BackgroundTasks) -> ums.utils.types.AgentResponse:
def new_message(self,
    sender:str, receiver:str, message:AgentMessage,
    background_tasks: BackgroundTasks
) -> AgentResponse:
    """Store an incoming message and schedule its processing.

    :param sender: name of the sender, stored with the message
    :param receiver: name of the receiver, stored with the message
    :param message: the message to store
    :param background_tasks: task collection the processing is added to
    :return: response carrying the count returned by the database, or
        ``count=-1`` with ``error=True`` and the exception text if
        anything in the attempt raised
    """
    try:
        queued_count = self.db.add_message(sender, receiver, message)
        background_tasks.add_task(self._process_message, queued_count)
        response = AgentResponse(count=queued_count, msg="Added message to queue")
    except Exception as err:
        # any failure is reported to the caller in the response
        response = AgentResponse(count=-1, error=True, error_msg=str(err))
    return response