Accept Messages

This commit is contained in:
2024-10-09 19:04:19 +02:00
parent 4199b1c347
commit 5e0945f9ae
8 changed files with 226 additions and 82 deletions

View File

@ -15,44 +15,9 @@ from datetime import datetime
from threading import Lock
from typing import Generator
from pydantic import validate_call, BaseModel
from pydantic import validate_call
from ums.utils import PERSIST_PATH, AgentMessage
class RowObject(BaseModel):
"""
Object representing a database row.
"""
count : int
"""
The count (primary key) of the item.
"""
sender : str
"""
The sender of the message.
"""
recipient : str
"""
The recipient of the message
"""
time : int
"""
The time (unix timestamp) the message was received/ sent.
"""
message : AgentMessage
"""
The message received/ sent.
"""
processed : bool
"""
Did the management process the message, i.e., did the tasks necessary for this message (mostly only relevant for received messages).
"""
from ums.utils import PERSIST_PATH, AgentMessage, MessageDbRow
class DB():
@ -117,7 +82,7 @@ class DB():
finally:
self.db_lock.release()
def __iter__(self) -> Generator[RowObject, None, None]:
def __iter__(self) -> Generator[MessageDbRow, None, None]:
yield from self.iterate()
@validate_call
@ -126,7 +91,7 @@ class DB():
processed:bool|None=None,
time_after:int|None=None, time_before:int|None=None,
limit:int=20, offset:int=0, _count_only:bool=False
) -> Generator[RowObject|int, None, None]:
) -> Generator[MessageDbRow|int, None, None]:
where = []
params = {
@ -177,8 +142,8 @@ class DB():
kwargs['_count_only'] = True
return next(self.iterate(**kwargs))
def _create_row_object(self, row:sqlite3.Row) -> RowObject:
return RowObject(
def _create_row_object(self, row:sqlite3.Row) -> MessageDbRow:
return MessageDbRow(
count=row['count'],
sender=row['sender'],
recipient=row['recipient'],
@ -187,7 +152,7 @@ class DB():
processed=row['processed']
)
def by_count(self, count:int) -> RowObject|None:
def by_count(self, count:int) -> MessageDbRow|None:
with self.db:
try:
return self._create_row_object(

View File

@ -12,16 +12,17 @@ import os
from datetime import datetime
from fastapi import FastAPI, Request
from fastapi import FastAPI, Request, BackgroundTasks, HTTPException
from fastapi.responses import HTMLResponse
from fastapi.templating import Jinja2Templates
from jinja2.runtime import Undefined as JinjaUndefined
from ums.management.interface import Interface
from ums.management.db import DB
from ums.management.db import DB, MessageDbRow
from ums.management.process import MessageProcessor
from ums.utils import AgentMessage, RiddleData, RiddleDataType, RiddleSolution, TEMPLATE_PATH
from ums.utils import AgentMessage, AgentResponse, TEMPLATE_PATH
class WebMain():
@ -32,6 +33,7 @@ class WebMain():
self._init_templates()
self.db = DB()
self.msg_process = MessageProcessor(self.db)
self._add_routes()
self._add_routers()
@ -75,33 +77,25 @@ class WebMain():
'index.html',
{"request" : request}
)
@self.app.post("/message", summary="Send a message to the management")
def message(request: Request, message:AgentMessage, background_tasks: BackgroundTasks) -> AgentResponse:
receiver = request.headers['host']
if ':' in receiver:
receiver = receiver[:receiver.rindex(':')]
sender = request.headers['x-forwarded-for']
return self.msg_process.new_message(sender, receiver, message, background_tasks)
@self.app.get("/test", summary="Test")
def huhu(request: Request) -> AgentMessage:
ex = AgentMessage(
id="ex1",
riddle={
"context":"Example 1",
"question":"Get the name of the person."
},
data=[
RiddleData(
type=RiddleDataType.TEXT,
file_plain="./cv.txt"
)
]
)
ex.status.extract.required = False
ex.solution = RiddleSolution(
solution="Otto",
explanation="Written in line 6 after 'Name:'"
)
ins_count = self.db.add_message('from', 'to', ex)
self.db.set_processed(ins_count)
return ex
@self.app.get("/status", summary="Get status of a message")
def status(count:int) -> MessageDbRow:
msg = self.db.by_count(count)
if msg is None:
raise HTTPException(status_code=404, detail="Message not found")
return msg
if __name__ == "ums.management.main" and os.environ.get('SERVE', 'false') == 'true':
main = WebMain()

44
ums/management/process.py Normal file
View File

@ -0,0 +1,44 @@
# Agenten Plattform
#
# (c) 2024 Magnus Bender
# Institute of Humanities-Centered Artificial Intelligence (CHAI)
# Universitaet Hamburg
# https://www.chai.uni-hamburg.de/~bender
#
# source code released under the terms of GNU Public License Version 3
# https://www.gnu.org/licenses/gpl-3.0.txt
from fastapi import BackgroundTasks
from ums.management.db import DB
from ums.utils import AgentMessage, AgentResponse
class MessageProcessor():
def __init__(self, db:DB):
self.db = db
def new_message(self,
sender:str, receiver:str, message:AgentMessage,
background_tasks: BackgroundTasks
) -> AgentResponse:
try:
db_count = self.db.add_message(sender, receiver, message)
background_tasks.add_task(self._process_message, db_count)
return AgentResponse(
count=db_count,
msg="Added message to queue"
)
except Exception as e:
return AgentResponse(
count=-1,
error=True,
error_msg=str(e)
)
def _process_message(self, count:int):
# TODO !!!
self.db.set_processed(count=count, processed=True)