Management Message Processing

This commit is contained in:
Magnus Bender 2024-10-29 16:47:58 +01:00
parent 04ccd488f8
commit fac784e013
Signed by: bender
GPG Key ID: 5149A211831F2BD7
6 changed files with 182 additions and 21 deletions

View File

@ -19,6 +19,8 @@ services:
ports: ports:
- 8000:80 - 8000:80
environment: environment:
- SOLUTION_MAX_TRIALS=5
- MANAGEMENT_URL=http://management:8000
- AGENTS_PROCESS=http://agent_process_1:3001,http://agent_process_2:3001 - AGENTS_PROCESS=http://agent_process_1:3001,http://agent_process_2:3001
- AGENTS_SOLVE=http://agent_solve_1:3001 - AGENTS_SOLVE=http://agent_solve_1:3001
- AGENTS_GATEKEEPER=http://agent_gatekeeper_1:3001 - AGENTS_GATEKEEPER=http://agent_gatekeeper_1:3001

View File

@ -35,7 +35,6 @@ if __name__ == "__main__":
mr = ManagementRequest("localhost") mr = ManagementRequest("localhost")
print( mr.send_message(ex) ) print(mr.send_message(ex))
print(mr.get_status(20))
print( mr.get_status(20))

View File

@ -15,7 +15,7 @@ from datetime import datetime
from threading import Lock from threading import Lock
from typing import Generator from typing import Generator
from pydantic import validate_call from pydantic import validate_call, ValidationError
from ums.utils import PERSIST_PATH, AgentMessage, MessageDbRow from ums.utils import PERSIST_PATH, AgentMessage, MessageDbRow
@ -45,12 +45,13 @@ class DB():
recipient TEXT, recipient TEXT,
time TIMESTAMP DEFAULT CURRENT_TIMESTAMP, time TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
json BLOB, json BLOB,
processed BOOL DEFAULT FALSE processed BOOL DEFAULT FALSE,
solution BOOL DEFAULT NULL
)""") )""")
self.db_lock.release() self.db_lock.release()
@validate_call @validate_call
def add_message(self, sender:str, recipient:str, message:AgentMessage, processed:bool=False) -> int: def add_message(self, sender:str, recipient:str, message:AgentMessage, processed:bool=False ) -> int:
self.db_lock.acquire() self.db_lock.acquire()
with self.db: with self.db:
self.db.execute( self.db.execute(
@ -82,13 +83,25 @@ class DB():
finally: finally:
self.db_lock.release() self.db_lock.release()
@validate_call
def set_solution(self, count:int, solution:bool) -> bool:
self.db_lock.acquire()
with self.db:
try:
self.db.execute("UPDATE Messages SET solution = ? WHERE count = ?", (solution, count))
return True
except:
return False
finally:
self.db_lock.release()
def __iter__(self) -> Generator[MessageDbRow, None, None]: def __iter__(self) -> Generator[MessageDbRow, None, None]:
yield from self.iterate() yield from self.iterate()
@validate_call @validate_call
def iterate(self, def iterate(self,
id:str|None=None, sender:str|None=None, recipient:str|None=None, id:str|None=None, sender:str|None=None, recipient:str|None=None,
processed:bool|None=None, processed:bool|None=None, solution:bool|None=None,
time_after:int|None=None, time_before:int|None=None, time_after:int|None=None, time_before:int|None=None,
limit:int=20, offset:int=0, _count_only:bool=False limit:int=20, offset:int=0, _count_only:bool=False
) -> Generator[MessageDbRow|int, None, None]: ) -> Generator[MessageDbRow|int, None, None]:
@ -99,7 +112,11 @@ class DB():
"off": offset "off": offset
} }
for v,n in ((id,'id'), (sender,'sender'), (recipient,'recipient'), (processed,'processed')): for v,n in (
(id,'id'),
(sender,'sender'), (recipient,'recipient'),
(processed,'processed'), (solution,'solution')
):
if not v is None: if not v is None:
where.append('{} = :{}'.format(n,n)) where.append('{} = :{}'.format(n,n))
params[n] = v params[n] = v
@ -130,7 +147,7 @@ class DB():
"SELECT * FROM Messages {} ORDER BY time DESC LIMIT :lim OFFSET :off".format(where_clause), "SELECT * FROM Messages {} ORDER BY time DESC LIMIT :lim OFFSET :off".format(where_clause),
params params
): ):
yield self._create_row_object(row) yield self._create_row_object(row, allow_lazy=True)
def __len__(self) -> int: def __len__(self) -> int:
return self.len() return self.len()
@ -142,14 +159,29 @@ class DB():
kwargs['_count_only'] = True kwargs['_count_only'] = True
return next(self.iterate(**kwargs)) return next(self.iterate(**kwargs))
def _create_row_object(self, row:sqlite3.Row) -> MessageDbRow: def _create_row_object(self, row:sqlite3.Row, allow_lazy:bool=True) -> MessageDbRow:
try:
message = AgentMessage.model_validate_json(
row['json'],
context={"require_file_exists": not allow_lazy}
)
except ValidationError as e:
if allow_lazy:
message = AgentMessage(
id="error",
riddle={"context":str(e),"question":"Failed to load from Database!"}
)
else:
raise e
return MessageDbRow( return MessageDbRow(
count=row['count'], count=row['count'],
sender=row['sender'], sender=row['sender'],
recipient=row['recipient'], recipient=row['recipient'],
time=int(datetime.strptime(row['time'], self._DB_TIME_FORMAT).timestamp()), time=int(datetime.strptime(row['time'], self._DB_TIME_FORMAT).timestamp()),
message=AgentMessage.model_validate_json(row['json'], context={"require_file_exists":False}), message=message,
processed=row['processed'] processed=row['processed'],
solution=row['solution']
) )
def by_count(self, count:int) -> MessageDbRow|None: def by_count(self, count:int) -> MessageDbRow|None:

View File

@ -44,7 +44,7 @@ class Interface():
@self.router.get("/table", response_class=HTMLResponse, summary="Table of messages") @self.router.get("/table", response_class=HTMLResponse, summary="Table of messages")
def table(request: Request, def table(request: Request,
id:str|None=None, sender:str|None=None, recipient:str|None=None, id:str|None=None, sender:str|None=None, recipient:str|None=None,
processed:bool|None=None, processed:bool|None=None, solution:bool|None=None,
time_after:int|str|None=None, time_before:int|str|None=None, time_after:int|str|None=None, time_before:int|str|None=None,
limit:int=10, offset:int=0, _count_only:bool=False limit:int=10, offset:int=0, _count_only:bool=False
): ):
@ -58,7 +58,7 @@ class Interface():
if not re.match(r'^\d+$', t) else int(t) if not re.match(r'^\d+$', t) else int(t)
for v,n,f in ( for v,n,f in (
(id,'id',str), (sender,'sender',str), (recipient,'recipient',str), (id,'id',str), (sender,'sender',str), (recipient,'recipient',str),
(processed,'processed', bool), (processed,'processed', bool), (solution,'solution', bool),
(time_after, 'time_after', convert_time), (time_before, 'time_before', convert_time) (time_after, 'time_after', convert_time), (time_before, 'time_before', convert_time)
): ):
if not v is None: if not v is None:
@ -83,7 +83,7 @@ class Interface():
@self.router.get("/table/total", summary="Total number of messages in table") @self.router.get("/table/total", summary="Total number of messages in table")
def table_total(request: Request, def table_total(request: Request,
id:str|None=None, sender:str|None=None, recipient:str|None=None, id:str|None=None, sender:str|None=None, recipient:str|None=None,
processed:bool|None=None, processed:bool|None=None, solution:bool|None=None,
time_after:int|str|None=None, time_before:int|str|None=None, time_after:int|str|None=None, time_before:int|str|None=None,
limit:int=10, offset:int=0 limit:int=10, offset:int=0
) -> int: ) -> int:

View File

@ -8,16 +8,48 @@
# source code released under the terms of GNU Public License Version 3 # source code released under the terms of GNU Public License Version 3
# https://www.gnu.org/licenses/gpl-3.0.txt # https://www.gnu.org/licenses/gpl-3.0.txt
import os, re
from typing import List
import requests
from fastapi import BackgroundTasks from fastapi import BackgroundTasks
from ums.management.db import DB from ums.management.db import DB
from ums.utils import AgentMessage, AgentResponse from ums.utils import AgentMessage, AgentResponse
class MessageProcessor(): class MessageProcessor():
SOLUTION_MAX_TRIALS = int(os.environ.get('SOLUTION_MAX_TRIALS', 5))
MANAGEMENT_URL = os.environ.get('MANAGEMENT_URL', 'http://127.0.0.1:80').strip().strip('/')
AGENTS_PROCESS = tuple(map(
lambda s:s.strip().strip('/'),
os.environ.get('AGENTS_PROCESS', '').split(',')
))
AGENTS_SOLVE = tuple(map(
lambda s:s.strip().strip('/'),
os.environ.get('AGENTS_SOLVE', '').split(',')
))
AGENTS_GATEKEEPER = tuple(map(
lambda s:s.strip().strip('/'),
os.environ.get('AGENTS_GATEKEEPER', '').split(',')
))
def __init__(self, db:DB): def __init__(self, db:DB):
self.db = db self.db = db
self.management_name = self._get_name(self.MANAGEMENT_URL)
if len(self.AGENTS_PROCESS) == 0:
print("Not Process Agent (AGENTS_PROCESS) found, this may be a problem!")
if len(self.AGENTS_SOLVE) == 0:
print("Not Solve Agent (AGENTS_SOLVE) found, this may be a problem!")
if len(self.AGENTS_GATEKEEPER) == 0:
print("Not Gatekeeper Agent (AGENTS_GATEKEEPER) found, this may be a problem!")
def _get_name(self, url:str) -> str:
m = re.match(r'^https?://([^:]*)(?::(\d+))?$', url)
return "unknown" if m == None else m.group(1)
def new_message(self, def new_message(self,
sender:str, receiver:str, message:AgentMessage, sender:str, receiver:str, message:AgentMessage,
@ -39,6 +71,96 @@ class MessageProcessor():
error_msg=str(e) error_msg=str(e)
) )
def _process_message(self, count:int): def _process_message(self, count:int, ignore_processed:bool=False):
# TODO !!! db_message = self.db.by_count(count)
if db_message.processed and not ignore_processed:
# do not process processed messages again
return
# check which step/ state the message requires the management to do
if db_message.message.status.extract.required and not db_message.message.status.extract.finished:
self._do_extract(db_message.message)
elif db_message.message.status.solve.required and not db_message.message.status.solve.finished:
self._do_solve(db_message.message)
elif db_message.message.status.validate.required and not db_message.message.status.validate.finished:
self._do_validate(db_message.message)
else: # all steps "done"
if db_message.message.status.solved:
# yay, message is solved
self.db.set_solution(count=count, solution=True);
else:
# not solved, but all steps done
self.db.set_solution(count=count, solution=False);
# try again
self._do_again(db_message.message)
# now message processed!
self.db.set_processed(count=count, processed=True) self.db.set_processed(count=count, processed=True)
def _do_extract(self, message:AgentMessage):
# TODO
pass
def _do_solve(self, message:AgentMessage):
# TODO
pass
def _do_validate(self, message:AgentMessage):
# TODO
pass
def _do_again(self, message:AgentMessage):
if message.status.trial < self.SOLUTION_MAX_TRIALS:
# try again, recycle message
# require steps again
if message.status.extract.required:
message.status.extract.finished = False
if message.status.solve.required:
message.status.solve.finished = False
if message.status.validate.required:
message.status.validate.finished = False
# increment trial
message.status.trial += 1
# append current solution als old one
if not message.solution is None:
message.riddle.solutions_before.append(
message.solution
)
# reset current solution
message.solution = None
# add the riddle as new to management
self._send_message(self.MANAGEMENT_URL, message)
def _send_messages(self, recipients:List[str], message:AgentMessage) -> bool:
ok = True
for r in recipients:
ok = ok and self._send_message(r, message)
return ok
def _send_message(self, recipient:str, message:AgentMessage) -> bool:
db_count = self.db.add_message(
sender=self.management_name,
recipient=self._get_name(recipient),
message=message,
processed=False
)
r = requests.post(
"{}/message".format(recipient),
data=message.model_dump_json(),
headers={"accept" : "application/json", "content-type" : "application/json"}
)
if r.status_code == 200:
self.db.set_processed(db_count, processed=True)
return True
else:
print("Error sending message to:", recipient, (r.text, r.headers))
return False

View File

@ -371,3 +371,9 @@ class MessageDbRow(BaseModel):
""" """
Did the management process the message, i.e., did the tasks necessary for this message (mostly only relevant for received messages). Did the management process the message, i.e., did the tasks necessary for this message (mostly only relevant for received messages).
""" """
solution : bool|None = None
"""
Does this message contain a valid solution?
True if contains valid solution, False if solution not valid, Null/None if not applicable
"""