165 lines
5.0 KiB
Python

# Agenten Plattform
#
# (c) 2024 Magnus Bender
# Institute of Humanities-Centered Artificial Intelligence (CHAI)
# Universitaet Hamburg
# https://www.chai.uni-hamburg.de/~bender
#
# source code released under the terms of GNU Public License Version 3
# https://www.gnu.org/licenses/gpl-3.0.txt
import os, re
from typing import List
import requests
from fastapi import BackgroundTasks
from ums.management.db import DB
from ums.utils import AgentMessage, AgentResponse, logger
class MessageProcessor():
SOLUTION_MAX_TRIALS = int(os.environ.get('SOLUTION_MAX_TRIALS', 5))
MANAGEMENT_URL = os.environ.get('MANAGEMENT_URL', 'http://127.0.0.1:80').strip().strip('/')
AGENTS_PROCESS = tuple(map(
lambda s:s.strip().strip('/'),
os.environ.get('AGENTS_PROCESS', '').split(',')
))
AGENTS_SOLVE = tuple(map(
lambda s:s.strip().strip('/'),
os.environ.get('AGENTS_SOLVE', '').split(',')
))
AGENTS_GATEKEEPER = tuple(map(
lambda s:s.strip().strip('/'),
os.environ.get('AGENTS_GATEKEEPER', '').split(',')
))
def __init__(self, db:DB):
self.db = db
self.management_name = self._get_name(self.MANAGEMENT_URL)
if len(self.AGENTS_PROCESS) == 0:
logger.warning(f"Not Process Agent (AGENTS_PROCESS) found, this may be a problem!")
if len(self.AGENTS_SOLVE) == 0:
logger.warning(f"Not Solve Agent (AGENTS_SOLVE) found, this may be a problem!")
if len(self.AGENTS_GATEKEEPER) == 0:
logger.warning(f"Not Gatekeeper Agent (AGENTS_GATEKEEPER) found, this may be a problem!")
def _get_name(self, url:str) -> str:
m = re.match(r'^https?://([^:]*)(?::(\d+))?$', url)
return "unknown" if m == None else m.group(1)
def new_message(self,
sender:str, receiver:str, message:AgentMessage,
background_tasks: BackgroundTasks
) -> AgentResponse:
try:
db_count = self.db.add_message(sender, receiver, message)
background_tasks.add_task(self._process_message, db_count)
return AgentResponse(
count=db_count,
msg="Added message to queue"
)
except Exception as e:
return AgentResponse(
count=-1,
error=True,
error_msg=str(e)
)
def _process_message(self, count:int, ignore_processed:bool=False):
db_message = self.db.by_count(count)
if db_message.processed and not ignore_processed:
# do not process processed messages again
return
# check which step/ state the message requires the management to do
if db_message.message.status.extract.required and not db_message.message.status.extract.finished:
# send to extract agents
self._send_messages(self.AGENTS_PROCESS, db_message.message)
elif db_message.message.status.solve.required and not db_message.message.status.solve.finished:
# send to solve agents
self._send_messages(self.AGENTS_SOLVE, db_message.message)
elif db_message.message.status.validate.required and not db_message.message.status.validate.finished:
# send to solve agents
self._send_messages(self.AGENTS_GATEKEEPER, db_message.message)
else: # all steps "done"
# validate not required? (then solved will never be set to true, thus set it here)
if not db_message.message.status.validate.required:
db_message.message.status.solved = True
if db_message.message.status.solved:
# yay, message is solved
self.db.set_solution(count=count, solution=True);
else:
# not solved, but all steps done
self.db.set_solution(count=count, solution=False);
# try again
self._do_again(db_message.message)
# now message processed!
self.db.set_processed(count=count, processed=True)
def _do_again(self, message:AgentMessage):
if message.status.trial < self.SOLUTION_MAX_TRIALS:
# try again, recycle message
# require steps again
if message.status.extract.required:
message.status.extract.finished = False
if message.status.solve.required:
message.status.solve.finished = False
if message.status.validate.required:
message.status.validate.finished = False
# increment trial
message.status.trial += 1
# append current solution als old one
if not message.solution is None:
message.riddle.solutions_before.append(
message.solution
)
# reset current solution
message.solution = None
# add the riddle as new to management
self._send_message(self.MANAGEMENT_URL, message)
def _send_messages(self, recipients:List[str], message:AgentMessage) -> bool:
ok = True
for r in recipients:
ok = ok and self._send_message(r, message)
return ok
def _send_message(self, recipient:str, message:AgentMessage) -> bool:
db_count = self.db.add_message(
sender=self.management_name,
recipient=self._get_name(recipient),
message=message,
processed=False
)
r = requests.post(
"{}/message".format(recipient),
data=message.model_dump_json(),
headers={"accept" : "application/json", "content-type" : "application/json"}
)
if r.status_code == 200:
self.db.set_processed(db_count, processed=True)
return True
else:
logger.warning(f"Error sending message to: {recipient} {(r.text, r.headers)}")
return False