diff --git a/docker-compose.yml b/docker-compose.yml index a73e488..e4c728e 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -19,6 +19,8 @@ services: ports: - 8000:80 environment: + - SOLUTION_MAX_TRIALS=5 + - MANAGEMENT_URL=http://management:8000 - AGENTS_PROCESS=http://agent_process_1:3001,http://agent_process_2:3001 - AGENTS_SOLVE=http://agent_solve_1:3001 - AGENTS_GATEKEEPER=http://agent_gatekeeper_1:3001 diff --git a/ums/__main__.py b/ums/__main__.py index ad69ded..ec6f2c7 100644 --- a/ums/__main__.py +++ b/ums/__main__.py @@ -35,7 +35,6 @@ if __name__ == "__main__": mr = ManagementRequest("localhost") - print( mr.send_message(ex) ) - - print( mr.get_status(20)) + print(mr.send_message(ex)) + print(mr.get_status(20)) \ No newline at end of file diff --git a/ums/management/db.py b/ums/management/db.py index 51464c2..944d096 100644 --- a/ums/management/db.py +++ b/ums/management/db.py @@ -15,7 +15,7 @@ from datetime import datetime from threading import Lock from typing import Generator -from pydantic import validate_call +from pydantic import validate_call, ValidationError from ums.utils import PERSIST_PATH, AgentMessage, MessageDbRow @@ -45,19 +45,20 @@ class DB(): recipient TEXT, time TIMESTAMP DEFAULT CURRENT_TIMESTAMP, json BLOB, - processed BOOL DEFAULT FALSE + processed BOOL DEFAULT FALSE, + solution BOOL DEFAULT NULL )""") self.db_lock.release() @validate_call - def add_message(self, sender:str, recipient:str, message:AgentMessage, processed:bool=False) -> int: + def add_message(self, sender:str, recipient:str, message:AgentMessage, processed:bool=False ) -> int: self.db_lock.acquire() with self.db: self.db.execute( """INSERT INTO Messages ( - id, sender, recipient, json, processed + id, sender, recipient, json, processed ) VALUES ( - :id, :sender, :recipient, :json, :processed + :id, :sender, :recipient, :json, :processed )""", { "id" : message.id, "sender" : sender, @@ -82,13 +83,25 @@ class DB(): finally: self.db_lock.release() + @validate_call + def set_solution(self, count:int, solution:bool) -> bool: + self.db_lock.acquire() + with self.db: + try: + self.db.execute("UPDATE Messages SET solution = ? WHERE count = ?", (solution, count)) + return True + except: + return False + finally: + self.db_lock.release() + def __iter__(self) -> Generator[MessageDbRow, None, None]: yield from self.iterate() @validate_call def iterate(self, id:str|None=None, sender:str|None=None, recipient:str|None=None, - processed:bool|None=None, + processed:bool|None=None, solution:bool|None=None, time_after:int|None=None, time_before:int|None=None, limit:int=20, offset:int=0, _count_only:bool=False ) -> Generator[MessageDbRow|int, None, None]: @@ -99,7 +112,11 @@ class DB(): "off": offset } - for v,n in ((id,'id'), (sender,'sender'), (recipient,'recipient'), (processed,'processed')): + for v,n in ( + (id,'id'), + (sender,'sender'), (recipient,'recipient'), + (processed,'processed'), (solution,'solution') + ): if not v is None: where.append('{} = :{}'.format(n,n)) params[n] = v @@ -130,7 +147,7 @@ class DB(): "SELECT * FROM Messages {} ORDER BY time DESC LIMIT :lim OFFSET :off".format(where_clause), params ): - yield self._create_row_object(row) + yield self._create_row_object(row, allow_lazy=True) def __len__(self) -> int: return self.len() @@ -142,14 +159,29 @@ class DB(): kwargs['_count_only'] = True return next(self.iterate(**kwargs)) - def _create_row_object(self, row:sqlite3.Row) -> MessageDbRow: + def _create_row_object(self, row:sqlite3.Row, allow_lazy:bool=True) -> MessageDbRow: + try: + message = AgentMessage.model_validate_json( + row['json'], + context={"require_file_exists": not allow_lazy} + ) + except ValidationError as e: + if allow_lazy: + message = AgentMessage( + id="error", + riddle={"context":str(e),"question":"Failed to load from Database!"} + ) + else: + raise e + return MessageDbRow( count=row['count'], sender=row['sender'], recipient=row['recipient'], time=int(datetime.strptime(row['time'], self._DB_TIME_FORMAT).timestamp()), - message=AgentMessage.model_validate_json(row['json'], context={"require_file_exists":False}), - processed=row['processed'] + message=message, + processed=row['processed'], + solution=row['solution'] ) def by_count(self, count:int) -> MessageDbRow|None: diff --git a/ums/management/interface.py b/ums/management/interface.py index 0c915ba..87c3a43 100644 --- a/ums/management/interface.py +++ b/ums/management/interface.py @@ -44,7 +44,7 @@ class Interface(): @self.router.get("/table", response_class=HTMLResponse, summary="Table of messages") def table(request: Request, id:str|None=None, sender:str|None=None, recipient:str|None=None, - processed:bool|None=None, + processed:bool|None=None, solution:bool|None=None, time_after:int|str|None=None, time_before:int|str|None=None, limit:int=10, offset:int=0, _count_only:bool=False ): @@ -58,7 +58,7 @@ class Interface(): if not re.match(r'^\d+$', t) else int(t) for v,n,f in ( (id,'id',str), (sender,'sender',str), (recipient,'recipient',str), - (processed,'processed', bool), + (processed,'processed', bool), (solution,'solution', bool), (time_after, 'time_after', convert_time), (time_before, 'time_before', convert_time) ): if not v is None: @@ -83,7 +83,7 @@ class Interface(): @self.router.get("/table/total", summary="Total number of messages in table") def table_total(request: Request, id:str|None=None, sender:str|None=None, recipient:str|None=None, - processed:bool|None=None, + processed:bool|None=None, solution:bool|None=None, time_after:int|str|None=None, time_before:int|str|None=None, limit:int=10, offset:int=0 ) -> int: diff --git a/ums/management/process.py b/ums/management/process.py index 3cb1f29..7921869 100644 --- a/ums/management/process.py +++ b/ums/management/process.py @@ -8,16 +8,48 @@ # source code released under the terms of GNU Public License Version 3 # https://www.gnu.org/licenses/gpl-3.0.txt +import os, re +from typing import List + +import requests from fastapi import BackgroundTasks from ums.management.db import DB - from ums.utils import AgentMessage, AgentResponse class MessageProcessor(): + SOLUTION_MAX_TRIALS = int(os.environ.get('SOLUTION_MAX_TRIALS', 5)) + + MANAGEMENT_URL = os.environ.get('MANAGEMENT_URL', 'http://127.0.0.1:80').strip().strip('/') + + AGENTS_PROCESS = tuple(map( + lambda s:s.strip().strip('/'), + os.environ.get('AGENTS_PROCESS', '').split(',') + )) + AGENTS_SOLVE = tuple(map( + lambda s:s.strip().strip('/'), + os.environ.get('AGENTS_SOLVE', '').split(',') + )) + AGENTS_GATEKEEPER = tuple(map( + lambda s:s.strip().strip('/'), + os.environ.get('AGENTS_GATEKEEPER', '').split(',') + )) + def __init__(self, db:DB): self.db = db + self.management_name = self._get_name(self.MANAGEMENT_URL) + + if len(self.AGENTS_PROCESS) == 0: + print("Not Process Agent (AGENTS_PROCESS) found, this may be a problem!") + if len(self.AGENTS_SOLVE) == 0: + print("Not Solve Agent (AGENTS_SOLVE) found, this may be a problem!") + if len(self.AGENTS_GATEKEEPER) == 0: + print("Not Gatekeeper Agent (AGENTS_GATEKEEPER) found, this may be a problem!") + + def _get_name(self, url:str) -> str: + m = re.match(r'^https?://([^:]*)(?::(\d+))?$', url) + return "unknown" if m == None else m.group(1) def new_message(self, sender:str, receiver:str, message:AgentMessage, @@ -39,6 +71,96 @@ class MessageProcessor(): error_msg=str(e) ) - def _process_message(self, count:int): - # TODO !!! - self.db.set_processed(count=count, processed=True) \ No newline at end of file + def _process_message(self, count:int, ignore_processed:bool=False): + db_message = self.db.by_count(count) + + if db_message.processed and not ignore_processed: + # do not process processed messages again + return + + # check which step/ state the message requires the management to do + if db_message.message.status.extract.required and not db_message.message.status.extract.finished: + self._do_extract(db_message.message) + elif db_message.message.status.solve.required and not db_message.message.status.solve.finished: + self._do_solve(db_message.message) + elif db_message.message.status.validate.required and not db_message.message.status.validate.finished: + self._do_validate(db_message.message) + else: # all steps "done" + if db_message.message.status.solved: + # yay, message is solved + self.db.set_solution(count=count, solution=True); + else: + # not solved, but all steps done + self.db.set_solution(count=count, solution=False); + + # try again + self._do_again(db_message.message) + + # now message processed! + self.db.set_processed(count=count, processed=True) + + def _do_extract(self, message:AgentMessage): + # TODO + pass + + def _do_solve(self, message:AgentMessage): + # TODO + pass + + def _do_validate(self, message:AgentMessage): + # TODO + pass + + def _do_again(self, message:AgentMessage): + if message.status.trial < self.SOLUTION_MAX_TRIALS: + # try again, recycle message + + # require steps again + if message.status.extract.required: + message.status.extract.finished = False + if message.status.solve.required: + message.status.solve.finished = False + if message.status.validate.required: + message.status.validate.finished = False + + # increment trial + message.status.trial += 1 + + # append current solution als old one + if not message.solution is None: + message.riddle.solutions_before.append( + message.solution + ) + # reset current solution + message.solution = None + + # add the riddle as new to management + self._send_message(self.MANAGEMENT_URL, message) + + def _send_messages(self, recipients:List[str], message:AgentMessage) -> bool: + ok = True + for r in recipients: + ok = ok and self._send_message(r, message) + return ok + + def _send_message(self, recipient:str, message:AgentMessage) -> bool: + db_count = self.db.add_message( + sender=self.management_name, + recipient=self._get_name(recipient), + message=message, + processed=False + ) + + r = requests.post( + "{}/message".format(recipient), + data=message.model_dump_json(), + headers={"accept" : "application/json", "content-type" : "application/json"} + ) + + if r.status_code == 200: + self.db.set_processed(db_count, processed=True) + return True + else: + print("Error sending message to:", recipient, (r.text, r.headers)) + return False + \ No newline at end of file diff --git a/ums/utils/types.py b/ums/utils/types.py index 65a55f2..ce784fa 100644 --- a/ums/utils/types.py +++ b/ums/utils/types.py @@ -370,4 +370,10 @@ class MessageDbRow(BaseModel): processed : bool """ Did the management process the message, i.e., did the tasks necessary for this message (mostly only relevant for received messages). + """ + + solution : bool|None = None + """ + Does this message contain a valid solution? + True if contains valid solution, False if solution not valid, Null/None if not applicable """ \ No newline at end of file