This commit is contained in:
Magnus Bender 2024-10-08 12:44:35 +02:00
parent d36ebf9694
commit cff39d61de
Signed by: bender
GPG Key ID: 5149A211831F2BD7
5 changed files with 133 additions and 50 deletions

View File

@ -23,7 +23,7 @@ services:
- AGENTS_SOLVE=http://agent_solve_1:3001
- AGENTS_GATEKEEPER=http://agent_gatekeeper_1:3001
volumes:
- ./data/share/:/ums-agent/share/
- ./data/share/:/ums-agenten/share/
- ./data/persist-management/:/ums-agenten/persist/
# bind code from host to container (for development)
- ./ums/:/ums-agenten/plattform/ums/:ro

View File

@ -9,16 +9,54 @@
# https://www.gnu.org/licenses/gpl-3.0.txt
import os
import sqlite3, atexit
from datetime import datetime
import sqlite3
import atexit
from threading import Lock
from typing import Generator
from pydantic import validate_call, BaseModel
from ums.utils import PERSIST_PATH, AgentMessage
class RowObject(BaseModel):
"""
Object representing a database row.
"""
count : int
"""
The count (primary key) of the item.
"""
sender : str
"""
The sender of the message.
"""
recipient : str
"""
The recipient of the message
"""
time : int
"""
The time (unix timestamp) the message was received/ sent.
"""
message : AgentMessage
"""
The message received/ sent.
"""
processed : bool
"""
Did the management process the message, i.e., did the tasks necessary for this message (mostly only relevant for received messages).
"""
class DB():
_DB_TIME_FORMAT = "%Y-%m-%d %H:%M:%S"
def __init__(self):
self.db = sqlite3.connect(
@ -38,71 +76,105 @@ class DB():
self.db.execute("""CREATE TABLE IF NOT EXISTS Messages (
count INTEGER PRIMARY KEY AUTOINCREMENT,
id TEXT,
fr TEXT,
to TEXT,
sender TEXT,
recipient TEXT,
time TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
json BLOB,
processed BOOL DEFAULT FALSE
)""")
self.dblock.release()
def add_message(self, fr:str, to:str, message:AgentMessage, processed:bool=False) -> int:
@validate_call
def add_message(self, sender:str, recipient:str, message:AgentMessage, processed:bool=False) -> int:
self.dblock.acquire()
with self.db:
self.db.execute(
"""INSERT INTO Messages (
id, fr, to, json, processed
id, sender, recipient, json, processed
) VALUES (
:id, :fr, :to, :json, :processed
:id, :sender, :recipient, :json, :processed
)""", {
"id" : message.id,
"fr" : fr,
"to" : to,
"sender" : sender,
"recipient" : recipient,
"json" : message.model_dump_json(),
"processed" : processed
})
new_count = self.db.execute("LAST_INSERT_ROWID()").fetchone()
new_count = self.db.execute("SELECT LAST_INSERT_ROWID() as last").fetchone()
self.dblock.release()
return new_count
return new_count['last']
@validate_call
def set_processed(self, count:int, processed:bool=True) -> bool:
self.dblock.acquire()
with self.db:
self.db.execute("UPDATE Messages SET processed = ? WHERE count = ?", (processed, count))
try:
self.db.execute("UPDATE Messages SET processed = ? WHERE count = ?", (processed, count))
return True
except:
return False
self.dblock.release()
def iterate(self,
id:str|None=None, fr:str|None=None, to:str|None=None,
processed:bool|None=None,
time_after:int|None=0, time_before:int|None=None,
limit:int=20, offset:int=0
):
where = []
if time_after:
time_after = datetime.fromtimestamp(time_after).strftime("%Y-%m-%d %H:%M:%S")
if time_before:
time_before = datetime.fromtimestamp(time_before).strftime("%Y-%m-%d %H:%M:%S")
with self.db:
self.db.execute(
"SELECT * FROM Messages WHERE {} LIMIT :lim OFFSET :off".format(', '.join(where)),
{
})
def by_count(self, count:int):
with self.db:
self.db.execute("SELECT * FROM Messages WHERE count = ?").fetchone(count)
# error
# create object
def __iter__(self):
def __iter__(self) -> Generator[RowObject, None, None]:
yield from self.iterate()
@validate_call
def iterate(self,
id:str|None=None, sender:str|None=None, recipient:str|None=None,
processed:bool|None=None,
time_after:int|None=None, time_before:int|None=None,
limit:int=20, offset:int=0
) -> Generator[RowObject, None, None]:
where = []
params = {
"lim": limit,
"off": offset
}
for v,n in ((id,'id'), (sender,'sender'), (recipient,'recipient'), (processed,'processed')):
if not v is None:
where.append('{} = :{}'.format(n,n))
params[n] = v
if time_after:
where.append("time > :t_after")
params['t_after'] = datetime.fromtimestamp(time_after).strftime(self._DB_TIME_FORMAT)
if time_before:
where.append("time < :t_before")
params['t_before'] = datetime.fromtimestamp(time_before).strftime(self._DB_TIME_FORMAT)
if len(where) > 0:
where_clause = "WHERE " + (' AND '.join(where))
else:
where_clause = ""
with self.db:
for row in self.db.execute(
"SELECT * FROM Messages {} LIMIT :lim OFFSET :off".format(where_clause),
params
):
yield self._create_row_object(row)
def _create_row_object(self, row:sqlite3.Row) -> RowObject:
return RowObject(
count=row['count'],
sender=row['sender'],
recipient=row['recipient'],
time=int(datetime.strptime(row['time'], self._DB_TIME_FORMAT).timestamp()),
message=AgentMessage.model_construct(row['json']),
processed=row['processed']
)
def by_count(self, count:int) -> RowObject|None:
with self.db:
try:
return self._create_row_object(
self.db.execute("SELECT * FROM Messages WHERE count = ?", (count,)).fetchone()
)
except:
return None

View File

@ -10,12 +10,17 @@
# TEST ONLY
from ums.utils import AgentMessage, RiddleData, RiddleDataType, RiddleSolution
from typing import Union
from fastapi import FastAPI
from ums.utils import AgentMessage, RiddleData, RiddleDataType, RiddleSolution
from ums.management.db import DB
db = DB()
app = FastAPI()
@app.get("/")
@ -43,6 +48,12 @@ def huhu():
solution="Otto",
explanation="Written in line 6 after 'Name:'"
)
#ins_count = db.add_message('from', 'to', ex)
db.set_processed(34234)
print(db.by_count(3))
return ex

View File

@ -15,6 +15,6 @@
import os
BASE_PATH = '/ums-agent'
BASE_PATH = '/ums-agenten'
SHARE_PATH = os.path.join(BASE_PATH, 'share')
PERSIST_PATH = os.path.join(BASE_PATH, 'persist')

View File

@ -44,7 +44,7 @@
"data": [
{
"type": "text",
"file_plain": "/ums-agent/share/cv.txt",
"file_plain": "/ums-agenten/share/cv.txt",
"file_extracted": null
}
],