# Agenten Plattform # # (c) 2024 Magnus Bender # Institute of Humanities-Centered Artificial Intelligence (CHAI) # Universitaet Hamburg # https://www.chai.uni-hamburg.de/~bender # # source code released under the terms of GNU Public License Version 3 # https://www.gnu.org/licenses/gpl-3.0.txt import os import sqlite3, atexit from datetime import datetime from threading import Lock from typing import Generator from pydantic import validate_call, BaseModel from ums.utils import PERSIST_PATH, AgentMessage class RowObject(BaseModel): """ Object representing a database row. """ count : int """ The count (primary key) of the item. """ sender : str """ The sender of the message. """ recipient : str """ The recipient of the message """ time : int """ The time (unix timestamp) the message was received/ sent. """ message : AgentMessage """ The message received/ sent. """ processed : bool """ Did the management process the message, i.e., did the tasks necessary for this message (mostly only relevant for received messages). """ class DB(): _DB_TIME_FORMAT = "%Y-%m-%d %H:%M:%S" def __init__(self): self.db = sqlite3.connect( os.path.join(PERSIST_PATH, 'messages.db'), check_same_thread=False ) self.db.row_factory = sqlite3.Row atexit.register(lambda db : db.close(), self.db) self.db_lock = Lock() self._assure_tables() def _assure_tables(self): self.db_lock.acquire() with self.db: self.db.execute("""CREATE TABLE IF NOT EXISTS Messages ( count INTEGER PRIMARY KEY AUTOINCREMENT, id TEXT, sender TEXT, recipient TEXT, time TIMESTAMP DEFAULT CURRENT_TIMESTAMP, json BLOB, processed BOOL DEFAULT FALSE )""") self.db_lock.release() @validate_call def add_message(self, sender:str, recipient:str, message:AgentMessage, processed:bool=False) -> int: self.db_lock.acquire() with self.db: self.db.execute( """INSERT INTO Messages ( id, sender, recipient, json, processed ) VALUES ( :id, :sender, :recipient, :json, :processed )""", { "id" : message.id, "sender" : sender, "recipient" : recipient, "json" : message.model_dump_json(), "processed" : processed }) new_count = self.db.execute("SELECT LAST_INSERT_ROWID() as last").fetchone() self.db_lock.release() return new_count['last'] @validate_call def set_processed(self, count:int, processed:bool=True) -> bool: self.db_lock.acquire() with self.db: try: self.db.execute("UPDATE Messages SET processed = ? WHERE count = ?", (processed, count)) return True except: return False finally: self.db_lock.release() def __iter__(self) -> Generator[RowObject, None, None]: yield from self.iterate() @validate_call def iterate(self, id:str|None=None, sender:str|None=None, recipient:str|None=None, processed:bool|None=None, time_after:int|None=None, time_before:int|None=None, limit:int=20, offset:int=0 ) -> Generator[RowObject, None, None]: where = [] params = { "lim": limit, "off": offset } for v,n in ((id,'id'), (sender,'sender'), (recipient,'recipient'), (processed,'processed')): if not v is None: where.append('{} = :{}'.format(n,n)) params[n] = v if time_after: where.append("time > :t_after") params['t_after'] = datetime.fromtimestamp(time_after).strftime(self._DB_TIME_FORMAT) if time_before: where.append("time < :t_before") params['t_before'] = datetime.fromtimestamp(time_before).strftime(self._DB_TIME_FORMAT) if len(where) > 0: where_clause = "WHERE " + (' AND '.join(where)) else: where_clause = "" with self.db: for row in self.db.execute( "SELECT * FROM Messages {} ORDER BY time DESC LIMIT :lim OFFSET :off".format(where_clause), params ): yield self._create_row_object(row) def _create_row_object(self, row:sqlite3.Row) -> RowObject: return RowObject( count=row['count'], sender=row['sender'], recipient=row['recipient'], time=int(datetime.strptime(row['time'], self._DB_TIME_FORMAT).timestamp()), message=AgentMessage.model_validate_json(row['json']), processed=row['processed'] ) def by_count(self, count:int) -> RowObject|None: with self.db: try: return self._create_row_object( self.db.execute("SELECT * FROM Messages WHERE count = ?", (count,)).fetchone() ) except: return None