From cff39d61decd4b7eaae8dac16a1723a9a360eee8 Mon Sep 17 00:00:00 2001 From: KIMB-technologies Date: Tue, 8 Oct 2024 12:44:35 +0200 Subject: [PATCH] DB works --- docker-compose.yml | 2 +- ums/management/db.py | 164 +++++++++++++++++++++++++++++------------ ums/management/main.py | 13 +++- ums/utils/const.py | 2 +- ums/utils/types.py | 2 +- 5 files changed, 133 insertions(+), 50 deletions(-) diff --git a/docker-compose.yml b/docker-compose.yml index ae966db..8618af7 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -23,7 +23,7 @@ services: - AGENTS_SOLVE=http://agent_solve_1:3001 - AGENTS_GATEKEEPER=http://agent_gatekeeper_1:3001 volumes: - - ./data/share/:/ums-agent/share/ + - ./data/share/:/ums-agenten/share/ - ./data/persist-management/:/ums-agenten/persist/ # bind code from host to container (for development) - ./ums/:/ums-agenten/plattform/ums/:ro diff --git a/ums/management/db.py b/ums/management/db.py index 19a7bca..12117c3 100644 --- a/ums/management/db.py +++ b/ums/management/db.py @@ -9,16 +9,54 @@ # https://www.gnu.org/licenses/gpl-3.0.txt import os +import sqlite3, atexit + from datetime import datetime - -import sqlite3 -import atexit - from threading import Lock +from typing import Generator + +from pydantic import validate_call, BaseModel from ums.utils import PERSIST_PATH, AgentMessage +class RowObject(BaseModel): + """ + Object representing a database row. + """ + + count : int + """ + The count (primary key) of the item. + """ + + sender : str + """ + The sender of the message. + """ + + recipient : str + """ + The recipient of the message + """ + + time : int + """ + The time (unix timestamp) the message was received/ sent. + """ + + message : AgentMessage + """ + The message received/ sent. + """ + + processed : bool + """ + Did the management process the message, i.e., did the tasks necessary for this message (mostly only relevant for received messages). + """ + class DB(): + + _DB_TIME_FORMAT = "%Y-%m-%d %H:%M:%S" def __init__(self): self.db = sqlite3.connect( @@ -38,71 +76,105 @@ class DB(): self.db.execute("""CREATE TABLE IF NOT EXISTS Messages ( count INTEGER PRIMARY KEY AUTOINCREMENT, id TEXT, - fr TEXT, - to TEXT, + sender TEXT, + recipient TEXT, time TIMESTAMP DEFAULT CURRENT_TIMESTAMP, json BLOB, processed BOOL DEFAULT FALSE )""") self.dblock.release() - def add_message(self, fr:str, to:str, message:AgentMessage, processed:bool=False) -> int: + @validate_call + def add_message(self, sender:str, recipient:str, message:AgentMessage, processed:bool=False) -> int: self.dblock.acquire() with self.db: self.db.execute( """INSERT INTO Messages ( - id, fr, to, json, processed + id, sender, recipient, json, processed ) VALUES ( - :id, :fr, :to, :json, :processed + :id, :sender, :recipient, :json, :processed )""", { "id" : message.id, - "fr" : fr, - "to" : to, + "sender" : sender, + "recipient" : recipient, "json" : message.model_dump_json(), "processed" : processed }) - new_count = self.db.execute("LAST_INSERT_ROWID()").fetchone() + new_count = self.db.execute("SELECT LAST_INSERT_ROWID() as last").fetchone() self.dblock.release() - return new_count + return new_count['last'] + @validate_call def set_processed(self, count:int, processed:bool=True) -> bool: self.dblock.acquire() with self.db: - self.db.execute("UPDATE Messages SET processed = ? WHERE count = ?", (processed, count)) + try: + self.db.execute("UPDATE Messages SET processed = ? WHERE count = ?", (processed, count)) + return True + except: + return False self.dblock.release() - def iterate(self, - id:str|None=None, fr:str|None=None, to:str|None=None, - processed:bool|None=None, - time_after:int|None=0, time_before:int|None=None, - limit:int=20, offset:int=0 - ): - - where = [] - - if time_after: - time_after = datetime.fromtimestamp(time_after).strftime("%Y-%m-%d %H:%M:%S") - if time_before: - time_before = datetime.fromtimestamp(time_before).strftime("%Y-%m-%d %H:%M:%S") - - with self.db: - self.db.execute( - "SELECT * FROM Messages WHERE {} LIMIT :lim OFFSET :off".format(', '.join(where)), - { - - }) - - - - - def by_count(self, count:int): - with self.db: - self.db.execute("SELECT * FROM Messages WHERE count = ?").fetchone(count) - # error - # create object - - def __iter__(self): + def __iter__(self) -> Generator[RowObject, None, None]: yield from self.iterate() - \ No newline at end of file + @validate_call + def iterate(self, + id:str|None=None, sender:str|None=None, recipient:str|None=None, + processed:bool|None=None, + time_after:int|None=None, time_before:int|None=None, + limit:int=20, offset:int=0 + ) -> Generator[RowObject, None, None]: + + where = [] + params = { + "lim": limit, + "off": offset + } + + for v,n in ((id,'id'), (sender,'sender'), (recipient,'recipient'), (processed,'processed')): + if not v is None: + where.append('{} = :{}'.format(n,n)) + params[n] = v + + if time_after: + where.append("time > :t_after") + params['t_after'] = datetime.fromtimestamp(time_after).strftime(self._DB_TIME_FORMAT) + + if time_before: + where.append("time < :t_before") + params['t_before'] = datetime.fromtimestamp(time_before).strftime(self._DB_TIME_FORMAT) + + if len(where) > 0: + where_clause = "WHERE " + (' AND '.join(where)) + else: + where_clause = "" + + with self.db: + for row in self.db.execute( + "SELECT * FROM Messages {} LIMIT :lim OFFSET :off".format(where_clause), + params + ): + yield self._create_row_object(row) + + def _create_row_object(self, row:sqlite3.Row) -> RowObject: + return RowObject( + count=row['count'], + sender=row['sender'], + recipient=row['recipient'], + time=int(datetime.strptime(row['time'], self._DB_TIME_FORMAT).timestamp()), + message=AgentMessage.model_construct(row['json']), + processed=row['processed'] + ) + + def by_count(self, count:int) -> RowObject|None: + with self.db: + try: + return self._create_row_object( + self.db.execute("SELECT * FROM Messages WHERE count = ?", (count,)).fetchone() + ) + except: + return None + + diff --git a/ums/management/main.py b/ums/management/main.py index 26d8490..df6c083 100644 --- a/ums/management/main.py +++ b/ums/management/main.py @@ -10,12 +10,17 @@ # TEST ONLY -from ums.utils import AgentMessage, RiddleData, RiddleDataType, RiddleSolution + from typing import Union from fastapi import FastAPI +from ums.utils import AgentMessage, RiddleData, RiddleDataType, RiddleSolution +from ums.management.db import DB + +db = DB() + app = FastAPI() @app.get("/") @@ -43,6 +48,12 @@ def huhu(): solution="Otto", explanation="Written in line 6 after 'Name:'" ) + + #ins_count = db.add_message('from', 'to', ex) + db.set_processed(34234) + + + print(db.by_count(3)) return ex diff --git a/ums/utils/const.py b/ums/utils/const.py index 8b5391c..fc2931f 100644 --- a/ums/utils/const.py +++ b/ums/utils/const.py @@ -15,6 +15,6 @@ import os -BASE_PATH = '/ums-agent' +BASE_PATH = '/ums-agenten' SHARE_PATH = os.path.join(BASE_PATH, 'share') PERSIST_PATH = os.path.join(BASE_PATH, 'persist') \ No newline at end of file diff --git a/ums/utils/types.py b/ums/utils/types.py index 63704f4..a414f55 100644 --- a/ums/utils/types.py +++ b/ums/utils/types.py @@ -44,7 +44,7 @@ "data": [ { "type": "text", - "file_plain": "/ums-agent/share/cv.txt", + "file_plain": "/ums-agenten/share/cv.txt", "file_extracted": null } ],