ums.management.process
1# Agenten Plattform 2# 3# (c) 2024 Magnus Bender 4# Institute of Humanities-Centered Artificial Intelligence (CHAI) 5# Universitaet Hamburg 6# https://www.chai.uni-hamburg.de/~bender 7# 8# source code released under the terms of GNU Public License Version 3 9# https://www.gnu.org/licenses/gpl-3.0.txt 10 11import os, re 12from typing import List 13 14import requests 15from fastapi import BackgroundTasks 16 17from ums.management.db import DB 18from ums.utils import AgentMessage, AgentResponse 19 20class MessageProcessor(): 21 22 SOLUTION_MAX_TRIALS = int(os.environ.get('SOLUTION_MAX_TRIALS', 5)) 23 24 MANAGEMENT_URL = os.environ.get('MANAGEMENT_URL', 'http://127.0.0.1:80').strip().strip('/') 25 26 AGENTS_PROCESS = tuple(map( 27 lambda s:s.strip().strip('/'), 28 os.environ.get('AGENTS_PROCESS', '').split(',') 29 )) 30 AGENTS_SOLVE = tuple(map( 31 lambda s:s.strip().strip('/'), 32 os.environ.get('AGENTS_SOLVE', '').split(',') 33 )) 34 AGENTS_GATEKEEPER = tuple(map( 35 lambda s:s.strip().strip('/'), 36 os.environ.get('AGENTS_GATEKEEPER', '').split(',') 37 )) 38 39 def __init__(self, db:DB): 40 self.db = db 41 self.management_name = self._get_name(self.MANAGEMENT_URL) 42 43 if len(self.AGENTS_PROCESS) == 0: 44 print("Not Process Agent (AGENTS_PROCESS) found, this may be a problem!") 45 if len(self.AGENTS_SOLVE) == 0: 46 print("Not Solve Agent (AGENTS_SOLVE) found, this may be a problem!") 47 if len(self.AGENTS_GATEKEEPER) == 0: 48 print("Not Gatekeeper Agent (AGENTS_GATEKEEPER) found, this may be a problem!") 49 50 def _get_name(self, url:str) -> str: 51 m = re.match(r'^https?://([^:]*)(?::(\d+))?$', url) 52 return "unknown" if m == None else m.group(1) 53 54 def new_message(self, 55 sender:str, receiver:str, message:AgentMessage, 56 background_tasks: BackgroundTasks 57 ) -> AgentResponse: 58 59 try: 60 db_count = self.db.add_message(sender, receiver, message) 61 background_tasks.add_task(self._process_message, db_count) 62 63 return AgentResponse( 64 count=db_count, 65 msg="Added message to queue" 66 ) 67 except Exception as e: 68 return AgentResponse( 69 count=-1, 70 error=True, 71 error_msg=str(e) 72 ) 73 74 def _process_message(self, count:int, ignore_processed:bool=False): 75 db_message = self.db.by_count(count) 76 77 if db_message.processed and not ignore_processed: 78 # do not process processed messages again 79 return 80 81 # check which step/ state the message requires the management to do 82 if db_message.message.status.extract.required and not db_message.message.status.extract.finished: 83 self._do_extract(db_message.message) 84 elif db_message.message.status.solve.required and not db_message.message.status.solve.finished: 85 self._do_solve(db_message.message) 86 elif db_message.message.status.validate.required and not db_message.message.status.validate.finished: 87 self._do_validate(db_message.message) 88 else: # all steps "done" 89 if db_message.message.status.solved: 90 # yay, message is solved 91 self.db.set_solution(count=count, solution=True); 92 else: 93 # not solved, but all steps done 94 self.db.set_solution(count=count, solution=False); 95 96 # try again 97 self._do_again(db_message.message) 98 99 # now message processed! 100 self.db.set_processed(count=count, processed=True) 101 102 def _do_extract(self, message:AgentMessage): 103 # TODO 104 pass 105 106 def _do_solve(self, message:AgentMessage): 107 # TODO 108 pass 109 110 def _do_validate(self, message:AgentMessage): 111 # TODO 112 pass 113 114 def _do_again(self, message:AgentMessage): 115 if message.status.trial < self.SOLUTION_MAX_TRIALS: 116 # try again, recycle message 117 118 # require steps again 119 if message.status.extract.required: 120 message.status.extract.finished = False 121 if message.status.solve.required: 122 message.status.solve.finished = False 123 if message.status.validate.required: 124 message.status.validate.finished = False 125 126 # increment trial 127 message.status.trial += 1 128 129 # append current solution als old one 130 if not message.solution is None: 131 message.riddle.solutions_before.append( 132 message.solution 133 ) 134 # reset current solution 135 message.solution = None 136 137 # add the riddle as new to management 138 self._send_message(self.MANAGEMENT_URL, message) 139 140 def _send_messages(self, recipients:List[str], message:AgentMessage) -> bool: 141 ok = True 142 for r in recipients: 143 ok = ok and self._send_message(r, message) 144 return ok 145 146 def _send_message(self, recipient:str, message:AgentMessage) -> bool: 147 db_count = self.db.add_message( 148 sender=self.management_name, 149 recipient=self._get_name(recipient), 150 message=message, 151 processed=False 152 ) 153 154 r = requests.post( 155 "{}/message".format(recipient), 156 data=message.model_dump_json(), 157 headers={"accept" : "application/json", "content-type" : "application/json"} 158 ) 159 160 if r.status_code == 200: 161 self.db.set_processed(db_count, processed=True) 162 return True 163 else: 164 print("Error sending message to:", recipient, (r.text, r.headers)) 165 return False 166
class
MessageProcessor:
21class MessageProcessor(): 22 23 SOLUTION_MAX_TRIALS = int(os.environ.get('SOLUTION_MAX_TRIALS', 5)) 24 25 MANAGEMENT_URL = os.environ.get('MANAGEMENT_URL', 'http://127.0.0.1:80').strip().strip('/') 26 27 AGENTS_PROCESS = tuple(map( 28 lambda s:s.strip().strip('/'), 29 os.environ.get('AGENTS_PROCESS', '').split(',') 30 )) 31 AGENTS_SOLVE = tuple(map( 32 lambda s:s.strip().strip('/'), 33 os.environ.get('AGENTS_SOLVE', '').split(',') 34 )) 35 AGENTS_GATEKEEPER = tuple(map( 36 lambda s:s.strip().strip('/'), 37 os.environ.get('AGENTS_GATEKEEPER', '').split(',') 38 )) 39 40 def __init__(self, db:DB): 41 self.db = db 42 self.management_name = self._get_name(self.MANAGEMENT_URL) 43 44 if len(self.AGENTS_PROCESS) == 0: 45 print("Not Process Agent (AGENTS_PROCESS) found, this may be a problem!") 46 if len(self.AGENTS_SOLVE) == 0: 47 print("Not Solve Agent (AGENTS_SOLVE) found, this may be a problem!") 48 if len(self.AGENTS_GATEKEEPER) == 0: 49 print("Not Gatekeeper Agent (AGENTS_GATEKEEPER) found, this may be a problem!") 50 51 def _get_name(self, url:str) -> str: 52 m = re.match(r'^https?://([^:]*)(?::(\d+))?$', url) 53 return "unknown" if m == None else m.group(1) 54 55 def new_message(self, 56 sender:str, receiver:str, message:AgentMessage, 57 background_tasks: BackgroundTasks 58 ) -> AgentResponse: 59 60 try: 61 db_count = self.db.add_message(sender, receiver, message) 62 background_tasks.add_task(self._process_message, db_count) 63 64 return AgentResponse( 65 count=db_count, 66 msg="Added message to queue" 67 ) 68 except Exception as e: 69 return AgentResponse( 70 count=-1, 71 error=True, 72 error_msg=str(e) 73 ) 74 75 def _process_message(self, count:int, ignore_processed:bool=False): 76 db_message = self.db.by_count(count) 77 78 if db_message.processed and not ignore_processed: 79 # do not process processed messages again 80 return 81 82 # check which step/ state the message requires the management to do 83 if db_message.message.status.extract.required and not db_message.message.status.extract.finished: 84 self._do_extract(db_message.message) 85 elif db_message.message.status.solve.required and not db_message.message.status.solve.finished: 86 self._do_solve(db_message.message) 87 elif db_message.message.status.validate.required and not db_message.message.status.validate.finished: 88 self._do_validate(db_message.message) 89 else: # all steps "done" 90 if db_message.message.status.solved: 91 # yay, message is solved 92 self.db.set_solution(count=count, solution=True); 93 else: 94 # not solved, but all steps done 95 self.db.set_solution(count=count, solution=False); 96 97 # try again 98 self._do_again(db_message.message) 99 100 # now message processed! 101 self.db.set_processed(count=count, processed=True) 102 103 def _do_extract(self, message:AgentMessage): 104 # TODO 105 pass 106 107 def _do_solve(self, message:AgentMessage): 108 # TODO 109 pass 110 111 def _do_validate(self, message:AgentMessage): 112 # TODO 113 pass 114 115 def _do_again(self, message:AgentMessage): 116 if message.status.trial < self.SOLUTION_MAX_TRIALS: 117 # try again, recycle message 118 119 # require steps again 120 if message.status.extract.required: 121 message.status.extract.finished = False 122 if message.status.solve.required: 123 message.status.solve.finished = False 124 if message.status.validate.required: 125 message.status.validate.finished = False 126 127 # increment trial 128 message.status.trial += 1 129 130 # append current solution als old one 131 if not message.solution is None: 132 message.riddle.solutions_before.append( 133 message.solution 134 ) 135 # reset current solution 136 message.solution = None 137 138 # add the riddle as new to management 139 self._send_message(self.MANAGEMENT_URL, message) 140 141 def _send_messages(self, recipients:List[str], message:AgentMessage) -> bool: 142 ok = True 143 for r in recipients: 144 ok = ok and self._send_message(r, message) 145 return ok 146 147 def _send_message(self, recipient:str, message:AgentMessage) -> bool: 148 db_count = self.db.add_message( 149 sender=self.management_name, 150 recipient=self._get_name(recipient), 151 message=message, 152 processed=False 153 ) 154 155 r = requests.post( 156 "{}/message".format(recipient), 157 data=message.model_dump_json(), 158 headers={"accept" : "application/json", "content-type" : "application/json"} 159 ) 160 161 if r.status_code == 200: 162 self.db.set_processed(db_count, processed=True) 163 return True 164 else: 165 print("Error sending message to:", recipient, (r.text, r.headers)) 166 return False
MessageProcessor(db: ums.management.db.DB)
40 def __init__(self, db:DB): 41 self.db = db 42 self.management_name = self._get_name(self.MANAGEMENT_URL) 43 44 if len(self.AGENTS_PROCESS) == 0: 45 print("Not Process Agent (AGENTS_PROCESS) found, this may be a problem!") 46 if len(self.AGENTS_SOLVE) == 0: 47 print("Not Solve Agent (AGENTS_SOLVE) found, this may be a problem!") 48 if len(self.AGENTS_GATEKEEPER) == 0: 49 print("Not Gatekeeper Agent (AGENTS_GATEKEEPER) found, this may be a problem!")
def
new_message( self, sender: str, receiver: str, message: ums.utils.types.AgentMessage, background_tasks: fastapi.background.BackgroundTasks) -> ums.utils.types.AgentResponse:
55 def new_message(self, 56 sender:str, receiver:str, message:AgentMessage, 57 background_tasks: BackgroundTasks 58 ) -> AgentResponse: 59 60 try: 61 db_count = self.db.add_message(sender, receiver, message) 62 background_tasks.add_task(self._process_message, db_count) 63 64 return AgentResponse( 65 count=db_count, 66 msg="Added message to queue" 67 ) 68 except Exception as e: 69 return AgentResponse( 70 count=-1, 71 error=True, 72 error_msg=str(e) 73 )