# Agenten Plattform # # (c) 2024 Magnus Bender # Institute of Humanities-Centered Artificial Intelligence (CHAI) # Universitaet Hamburg # https://www.chai.uni-hamburg.de/~bender # # source code released under the terms of GNU Public License Version 3 # https://www.gnu.org/licenses/gpl-3.0.txt import os from datetime import datetime import sqlite3 import atexit from threading import Lock from ums.utils import PERSIST_PATH, AgentMessage class DB(): def __init__(self): self.db = sqlite3.connect( os.path.join(PERSIST_PATH, 'messages.db'), check_same_thread=False ) self.db.row_factory = sqlite3.Row self.dblock = Lock() atexit.register(lambda db : db.close(), self.db) self._assure_tables() def _assure_tables(self): self.dblock.acquire() with self.db: self.db.execute("""CREATE TABLE IF NOT EXISTS Messages ( count INTEGER PRIMARY KEY AUTOINCREMENT, id TEXT, fr TEXT, to TEXT, time TIMESTAMP DEFAULT CURRENT_TIMESTAMP, json BLOB, processed BOOL DEFAULT FALSE )""") self.dblock.release() def add_message(self, fr:str, to:str, message:AgentMessage, processed:bool=False) -> int: self.dblock.acquire() with self.db: self.db.execute( """INSERT INTO Messages ( id, fr, to, json, processed ) VALUES ( :id, :fr, :to, :json, :processed )""", { "id" : message.id, "fr" : fr, "to" : to, "json" : message.model_dump_json(), "processed" : processed }) new_count = self.db.execute("LAST_INSERT_ROWID()").fetchone() self.dblock.release() return new_count def set_processed(self, count:int, processed:bool=True) -> bool: self.dblock.acquire() with self.db: self.db.execute("UPDATE Messages SET processed = ? WHERE count = ?", (processed, count)) self.dblock.release() def iterate(self, id:str|None=None, fr:str|None=None, to:str|None=None, processed:bool|None=None, time_after:int|None=0, time_before:int|None=None, limit:int=20, offset:int=0 ): where = [] if time_after: time_after = datetime.fromtimestamp(time_after).strftime("%Y-%m-%d %H:%M:%S") if time_before: time_before = datetime.fromtimestamp(time_before).strftime("%Y-%m-%d %H:%M:%S") with self.db: self.db.execute( "SELECT * FROM Messages WHERE {} LIMIT :lim OFFSET :off".format(', '.join(where)), { }) def by_count(self, count:int): with self.db: self.db.execute("SELECT * FROM Messages WHERE count = ?").fetchone(count) # error # create object def __iter__(self): yield from self.iterate()