# Agenten Plattform
#
# (c) 2024 Magnus Bender
# Institute of Humanities-Centered Artificial Intelligence (CHAI)
# Universitaet Hamburg
# https://www.chai.uni-hamburg.de/~bender
#
# source code released under the terms of GNU Public License Version 3
# https://www.gnu.org/licenses/gpl-3.0.txt

import random, os, json, time

from abc import abstractmethod, ABC
from enum import Enum
from typing import List, Callable
from pydantic import validate_call

from ums.utils import (
    RiddleInformation, AgentMessage, RiddleDataType, RiddleData, Riddle,
    RiddleStatus, RiddleSolution, ExtractedData,
    logger
)


class AgentCapability(Enum):
    """
        The three different capabilities an agent can have.
    """
    EXTRACT="extract"
    SOLVE="solve"
    GATEKEEPER="gatekeeper"


class BasicAgent(ABC):
    """
        A basic agent, each agent will be a subclass of this class.

        Creating an instance runs the whole life cycle: the constructor stores the
        incoming message, calls `_process()` (implemented per capability) and
        afterwards `_respond()`.
    """

    @staticmethod
    @abstractmethod
    def agent_capability() -> AgentCapability:
        """
            Represents the capabilities of this agent, for messages/ tasks of this capability, the `handle` method will be called.
        """
        pass

    @validate_call
    def __init__(self, message:AgentMessage, send_message:Callable[[AgentMessage], bool]):
        """
            Store `message`, process it and (possibly) send the response.

            `send_message` is the callable used to send the response and to
            send sub-riddles, its return value signals success.
        """
        self._send_message = send_message
        self._sub_cnt = 0

        self._message = message
        # all changes are done on a deep copy, the received message is kept
        self._response = message.model_copy(deep=True)
        # set to True by `_process()` of the subclasses once there is something to send
        self._do_response = False

        self._process()
        self._respond()

    @abstractmethod
    def _process(self):
        """
            Do the work of the agent on `self._response`, implemented per capability.
        """
        pass

    def _respond(self):
        """
            Send `self._response` unless `before_response()` or the processing prevented it.
        """
        # do a very short sleep
        time.sleep(random.random())

        # sending
        send_it = lambda: self._send_message(self._response)
        # `before_response` is always called, even if there is nothing to send
        if self.before_response(self._response, send_it) and self._do_response:
            send_it()
            logger.debug(f"Response sent {self._response.id}")
        else:
            logger.debug(f"Stopped response {self._response.id}")

    @validate_call
    def before_response(self, response:AgentMessage, send_it:Callable[[], None]) -> bool:
        """
            This method is called before the response is sent.
            If the method returns `False` no response will be sent.
            Thus, by overriding this method, a response can be prevented.

            The response to be sent is in `response` and `send_it` is a callable, which sends the response to the management if it gets called.
            (Hence, one may stop sending the response and later call `send_it()` to send the response.)
        """
        return True

    @validate_call
    def message(self) -> AgentMessage:
        """
            Get the message this agent object is working on.
        """
        return self._message

    @validate_call
    def sub_riddle(self,
        riddle:Riddle, data:List[RiddleData]|None=None, status:RiddleStatus|None=None
    ) -> AgentMessage|bool:
        """
            Create a new sub-riddle for solving details of the current riddle.
            For the sub-riddle, give a `riddle` and optionally, a selection of `data` items (default none) and a `status` (default `ums.utils.types.RiddleStatus()`).
            By changing a status, different steps can be (de-)selected.

            Return the message of the sub-riddle or `False` on error.
        """
        # fresh objects per call, a mutable default would be shared between all calls
        if data is None:
            data = []
        if status is None:
            status = RiddleStatus()

        # new sub riddle id
        self._sub_cnt += 1
        new_id = "{}-sub-{}.{}".format(self._message.id, self._sub_cnt, int(random.random()*100))

        # the id is registered in both messages before sending (and kept, even if sending fails)
        self._message.sub_ids.append(new_id)
        self._response.sub_ids.append(new_id)

        # create the riddle's message
        sub_msg = AgentMessage(
            id=new_id,
            riddle=riddle,
            data=data,
            status=status
        )
        logger.debug(f"Created sub-riddle {sub_msg.id}")

        # send it
        if self._send_message(sub_msg):
            return sub_msg
        else:
            return False

    @abstractmethod
    def handle(self, *args:RiddleInformation) -> RiddleInformation:
        """
            Handle a single task of the agent, the arguments and return value depends on the actual task (see subclass)!

            **This is the method to implement!**

            The full message is available via `message()`, a sub riddle can be created with `sub_riddle()`.
        """
        pass

    @validate_call
    def get_extracted(self, data:RiddleData) -> ExtractedData|None:
        """
            Loads the extracted data from the `data` item (i.e., from the file `data.file_extracted`).

            Returns None if no extracted data found.
        """
        if data.file_extracted is not None:
            # context manager, such that the file handle is closed again
            with open(data.file_extracted, 'r') as f:
                return ExtractedData.model_validate(json.load(f))

        return None


class ExtractAgent(BasicAgent):
    """
        An extraction agent.
    """

    @staticmethod
    def agent_capability() -> AgentCapability:
        return AgentCapability.EXTRACT

    @staticmethod
    @abstractmethod
    def extract_type() -> RiddleDataType:
        """
            Represents the data this agent can process.
        """
        pass

    def _process(self):
        """
            Call `handle()` for each data item of the agent's `extract_type()` and store the results in the response.
        """
        # looked up via the class, so subclasses defining `extract_type` as plain function keep working
        own_type = self.__class__.extract_type()

        for i, data in enumerate(self._response.data):
            if data.type == own_type:
                logger.debug(f"Start extraction '{data.file_plain}'")
                result = self.handle(data)
                logger.debug(f"End extraction '{data.file_plain}' ('{result.file_extracted}')")

                if result.file_extracted is None:
                    logger.info(f"Riddle {self._response.id}: 'file_extracted' for data '{data.file_plain}' still empty after handling")

                self._response.data[i] = result
                # a response is only sent, if at least one item was handled
                self._do_response = True

        self._response.status.extract.finished = True

    @abstractmethod
    @validate_call
    def handle(self, data:RiddleData) -> RiddleData:
        """
            Process the item `data`, create extraction file and return `data` with populated `data.file_extracted`.
        """

    @validate_call
    def store_extracted(self, data:RiddleData, extracted:ExtractedData, allow_overwrite:bool=True) -> str:
        """
            Stores the newly extracted data (in `extracted`) from `data` (i.e., `data.file_plain`) and returns the filename to use in `data.file_extracted`.

            The file is named like `data.file_plain` with the extension replaced by `.json`.

            If there already exists an extracted file for this `data`, the file will be overwritten if `allow_overwrite=True`.
            Otherwise, a counter is appended to the name (`-1`, `-2`, ...) until a non-existent file name is found.
            Generally the system will check, if the contents of the current file are equal to the contents to write.
            File with equal content will not be written again.
        """
        # get path and name
        #   splitext only strips a real extension of the file name, i.e., names
        #   without any '.' and dots in directory names are left untouched
        path_name = os.path.splitext(data.file_plain)[0]
        candidate = "{}.json".format(path_name)

        # data to write
        payload = extracted.model_dump_json()

        # check for file
        if os.path.isfile(candidate):

            # get current content
            with open(candidate, 'r') as f:
                content = f.read()

            # files equal -> no need to rewrite
            if content == payload:
                return candidate

            # not equal and overwrite not allowed
            elif not allow_overwrite:
                # get non-existent file name
                cnt = 0
                while os.path.isfile(candidate):
                    cnt += 1
                    candidate = "{}-{}.json".format(path_name, cnt)

        # write file
        with open(candidate, 'w+') as f:
            f.write(payload)

        return candidate


class ExtractTextAgent(ExtractAgent):
    """
        An extraction agent for text, create a subclass for your agent.
    """

    @staticmethod
    def extract_type() -> RiddleDataType:
        return RiddleDataType.TEXT


class ExtractAudioAgent(ExtractAgent):
    """
        An extraction agent for audio, create a subclass for your agent.
    """

    @staticmethod
    def extract_type() -> RiddleDataType:
        return RiddleDataType.AUDIO


class ExtractImageAgent(ExtractAgent):
    """
        An extraction agent for images, create a subclass for your agent.
    """

    @staticmethod
    def extract_type() -> RiddleDataType:
        return RiddleDataType.IMAGE


class SolveAgent(BasicAgent):
    """
        A solve agent, create a subclass for your agent.
    """

    @staticmethod
    def agent_capability() -> AgentCapability:
        return AgentCapability.SOLVE

    def _process(self):
        """
            Call `handle()` with the riddle and its data and append the returned solution to the response.
        """
        logger.debug(f"Start solve: {self._response.id}")
        solution = self.handle(self._response.riddle, self._response.data)
        logger.debug(f"End solve: {self._response.id} ({solution.solution}, {solution.explanation})")

        if len(solution.solution) == 0 or len(solution.explanation) == 0:
            logger.info(f"Riddle {self._response.id}: Empty solution/ explanation after handling")

        self._response.solution.append(solution)
        self._response.status.solve.finished = True

        self._do_response = True

    @abstractmethod
    @validate_call
    def handle(self, riddle: Riddle, data: List[RiddleData]) -> RiddleSolution:
        """
            Solve the `riddle` using `data` and return a single solution.
        """


class GatekeeperAgent(BasicAgent):
    """
        A gatekeeper agent, create a subclass for your agent.
    """

    @staticmethod
    def agent_capability() -> AgentCapability:
        return AgentCapability.GATEKEEPER

    def _process(self):
        """
            Call `handle()` with all solutions of the response and store the reviewed solutions.

            The riddle is marked as solved, if at least one of the returned solutions is accepted.
        """
        # make sure `handle()` gets at least one (empty) solution to review
        if len(self._response.solution) == 0:
            self._response.solution.append(RiddleSolution(solution="", explanation=""))

        logger.debug(f"Start validate: {self._response.id}")
        solution = self.handle(self._response.solution, self._response.riddle)

        for single_solution in solution:
            logger.debug(f"End validate: {self._response.id} ({single_solution.review}, {single_solution.accepted})")

            if single_solution.review is None or len(single_solution.review) == 0:
                logger.info(f"Riddle {self._response.id}: Empty review after handling")

        self._response.solution = solution
        self._response.status.validate.finished = True
        self._response.status.solved = any(single_solution.accepted for single_solution in solution)

        self._do_response = True

    @abstractmethod
    @validate_call
    def handle(self, solution:List[RiddleSolution], riddle:Riddle) -> List[RiddleSolution]:
        """
            Check the `solution` (multiple if multiple solver involved) of `riddle` and return solutions with populated `solution[i].accepted` and `solution[i].review`.
        """