diff --git a/docker-compose.yml b/docker-compose.yml index 9bf3d6e..3a32173 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -21,7 +21,8 @@ services: environment: - SOLUTION_MAX_TRIALS=5 - MESSAGE_MAX_CONTACTS=100 - - REQUIRE_FULL_EXTRACT=false + - REQUIRE_FULL_EXTRACT=true + - REQUIRE_FULL_SOLVE=true - MANAGEMENT_URL=http://management - AGENTS_PROCESS=http://agent_all:8000 - AGENTS_SOLVE=http://agent_all:8000 diff --git a/ums/agent/agent.py b/ums/agent/agent.py index 2825801..d4cad66 100644 --- a/ums/agent/agent.py +++ b/ums/agent/agent.py @@ -280,7 +280,7 @@ class SolveAgent(BasicAgent): if len(solution.solution) == 0 or len(solution.explanation) == 0: logger.info(f"Riddle {self._response.id}: Empty solution/ explanation after handling") - self._response.solution = solution + self._response.solution.append(solution) self._response.status.solve.finished = True self._do_response = True @@ -289,7 +289,7 @@ class SolveAgent(BasicAgent): @validate_call def handle(self, riddle: Riddle, data: List[RiddleData]) -> RiddleSolution: """ - Solve the `riddle` using `data` and return a solution. + Solve the `riddle` using `data` and return a single solution. """ class GatekeeperAgent(BasicAgent): @@ -301,25 +301,26 @@ class GatekeeperAgent(BasicAgent): return AgentCapability.GATEKEEPER def _process(self): - if self._response.solution is None: - self._response.solution = RiddleSolution(solution="", explanation="") + if len(self._response.solution) == 0: + self._response.solution.append(RiddleSolution(solution="", explanation="")) logger.debug(f"Start validate: {self._response.id}") solution = self.handle(self._response.solution, self._response.riddle) - logger.debug(f"End validate: {self._response.id} ({solution.review}, {solution.accepted})") - if solution.review is None or len(solution.review) == 0: - logger.info(f"Riddle {self._response.id}: Empty review after handling") + for single_solution in solution: + logger.debug(f"End validate: {self._response.id} ({single_solution.review}, {single_solution.accepted})") + if single_solution.review is None or len(single_solution.review) == 0: + logger.info(f"Riddle {self._response.id}: Empty review after handling") self._response.solution = solution self._response.status.validate.finished = True - self._response.status.solved = solution.accepted + self._response.status.solved = any(single_solution.accepted for single_solution in solution) self._do_response = True @abstractmethod @validate_call - def handle(self, solution:RiddleSolution, riddle:Riddle) -> RiddleSolution: + def handle(self, solution:List[RiddleSolution], riddle:Riddle) -> List[RiddleSolution]: """ - Check the `solution` of `riddle` and return solution with populated `solution.accepted` and `solution.review`. + Check the `solution` (multiple if multiple solver involved) of `riddle` and return solutions with populated `solution[i].accepted` and `solution[i].review`. """ \ No newline at end of file diff --git a/ums/example/example.py b/ums/example/example.py index b778f1c..41f7c8f 100644 --- a/ums/example/example.py +++ b/ums/example/example.py @@ -8,6 +8,8 @@ # source code released under the terms of GNU Public License Version 3 # https://www.gnu.org/licenses/gpl-3.0.txt +import random + from typing import Callable, List from ums.agent import ExtractAudioAgent, ExtractImageAgent, ExtractTextAgent, SolveAgent, GatekeeperAgent @@ -59,14 +61,14 @@ class MySolveAgent(SolveAgent): status.extract.required = False self.sub_riddle(riddle=Riddle(context="Haha", question="Blubber"), status=status) - return RiddleSolution(solution="Huii", explanation="Blubb") + return RiddleSolution(solution="Huii", explanation=f"Blubb, {random.random()}") class MyGatekeeperAgent(GatekeeperAgent): - def handle(self, solution: RiddleSolution, riddle: Riddle) -> RiddleSolution: - solution.accepted = True - solution.review = "Ok" + def handle(self, solution: List[RiddleSolution], riddle: Riddle) -> RiddleSolution: + solution[0].accepted = True + solution[0].review = "Ok" return solution diff --git a/ums/management/process.py b/ums/management/process.py index f6dcd2c..4a077ea 100644 --- a/ums/management/process.py +++ b/ums/management/process.py @@ -15,7 +15,7 @@ import requests from fastapi import BackgroundTasks from ums.management.db import DB -from ums.utils import AgentMessage, AgentResponse, logger, RiddleData +from ums.utils import AgentMessage, AgentResponse, logger, RiddleData, RiddleSolution class MessageProcessor(): @@ -23,6 +23,7 @@ class MessageProcessor(): MESSAGE_MAX_CONTACTS = int(os.environ.get('MESSAGE_MAX_CONTACTS', 100)) REQUIRE_FULL_EXTRACT = os.environ.get('REQUIRE_FULL_EXTRACT', 'false').lower() == 'true' + REQUIRE_FULL_SOLVE = os.environ.get('REQUIRE_FULL_SOLVE', 'false').lower() == 'true' MANAGEMENT_URL = os.environ.get('MANAGEMENT_URL', 'http://127.0.0.1:80').strip().strip('/') @@ -90,53 +91,99 @@ class MessageProcessor(): logger.warning(f"Message reached max number of contacts! {db_message.message.id}, {count}") return - # combine different extractions in data items - # will update items in `db_message.message.data` - fully_extracted = self._add_extractions(db_message.message.id, db_message.message.data) - if (self.REQUIRE_FULL_EXTRACT and not fully_extracted) \ - and not (db_message.message.status.extract.required and not db_message.message.status.extract.finished): - logger.warning(f"Postpone message, wait for full extract of items! {db_message.message.id}, {count}") - return - # check which step/ state the message requires the management to do + # -> IF if db_message.message.status.extract.required and not db_message.message.status.extract.finished: # send to extract agents self._send_messages(self.AGENTS_PROCESS, db_message.message) + return + + # combine different extractions in data items + # will update items in `db_message.message.data` + fully_extracted = self._add_extractions(db_message.message.id, db_message.message.data) + if self.REQUIRE_FULL_EXTRACT and not fully_extracted: + logger.warning(f"Postpone message, wait for full extract of items! {db_message.message.id}, {count}") + return - elif db_message.message.status.solve.required and not db_message.message.status.solve.finished: + # -> EL IF + if db_message.message.status.solve.required and not db_message.message.status.solve.finished: # send to solve agents self._send_messages(self.AGENTS_SOLVE, db_message.message) - - elif db_message.message.status.validate.required and not db_message.message.status.validate.finished: + return + + # combine different solutions + # will add solutions received before to `db_message.message.solution` + fully_solved = self._add_solutions(db_message.message.id, db_message.message.solution, db_message.message.status.trial) + if self.REQUIRE_FULL_SOLVE and not fully_solved: + logger.warning(f"Postpone message, wait for all solutions of riddle! {db_message.message.id}, {count}") + return + + # -> EL IF + if db_message.message.status.validate.required and not db_message.message.status.validate.finished: # send to solve agents self._send_messages(self.AGENTS_GATEKEEPER, db_message.message) + return - else: # all steps "done" + # -> ELSE + # all steps "done" - # validate not required? (then solved will never be set to true, thus set it here) - if not db_message.message.status.validate.required: - db_message.message.status.solved = True + # validate not required? (then solved will never be set to true, thus set it here) + if not db_message.message.status.validate.required: + db_message.message.status.solved = True - if db_message.message.status.solved: - # yay, message is solved - self.db.set_solution(count=count, solution=True); - else: - # not solved, but all steps done - self.db.set_solution(count=count, solution=False); + if db_message.message.status.solved: + # yay, message is solved + self.db.set_solution(count=count, solution=True); + else: + # not solved, but all steps done + self.db.set_solution(count=count, solution=False); + + # try again + self._do_again(db_message.message) + + def _hash_solution(self, s:RiddleSolution) -> int: + return hash((s.solution, s.explanation, tuple((d.file_plain, d.type) for d in s.used_data))) + + def _add_solutions(self, riddle_id:str, solution:List[RiddleSolution], trial:int) -> bool: + # do not do anything, if all solutions available + if len(solution) >= len(self.AGENTS_SOLVE): + return True - # try again - self._do_again(db_message.message) + contained = set(self._hash_solution(s) for s in solution) + + # search db for solutions from before + for row in self.db.iterate( + id=riddle_id, + limit=min(self.db.len(id=riddle_id), 250) + ): + # make sure to only use solutions from same "trial" + if row.message.status.trial == trial: + for s in row.message.solution: + h = self._hash_solution(s) + if h not in contained: + # add the 'new' solution + solution.append(s) + contained.add(h) + # all solutions found ? + if len(solution) >= len(self.AGENTS_SOLVE): + break + + return len(solution) >= len(self.AGENTS_SOLVE) + def _hash_data(self, d:RiddleData) -> int: return hash((d.file_plain, d.type, d.prompt)) def _add_extractions(self, riddle_id:str, data:List[RiddleData]) -> bool: - # get all the data items without extraction empty_data = {} for i, d in enumerate(data): if d.file_extracted is None: empty_data[self._hash_data(d)] = i + + # do not do anything if fully extracted + if len(empty_data) == 0: + return True # search db for extractions already available for row in self.db.iterate( @@ -178,13 +225,13 @@ class MessageProcessor(): # increment trial message.status.trial += 1 - # append current solution als old one - if not message.solution is None: - message.riddle.solutions_before.append( + # append current solution(s) als old one(s) + if len(message.solution) > 0: + message.riddle.solutions_before.extend( message.solution ) # reset current solution - message.solution = None + message.solution = [] # add the riddle as new to management self._send_message(self.MANAGEMENT_URL, message) diff --git a/ums/utils/types.py b/ums/utils/types.py index 0868100..48d1ed2 100644 --- a/ums/utils/types.py +++ b/ums/utils/types.py @@ -40,7 +40,7 @@ "question": "Get the name of the person.", "solutions_before": [] }, - "solution": null, + "solution": [], "data": [ { "type": "text", @@ -77,13 +77,13 @@ ```json { ... - "solution": { + "solution": [{ "solution": "Otto", "explanation": "Written in line 6 after 'Name:'", "used_data": [], "accepted": false, "review": null - }, + }], ... } ``` @@ -98,11 +98,11 @@ from typing import List, Any from typing_extensions import Annotated from pydantic import ( - BaseModel, - ValidationError, ValidationInfo, - ValidatorFunctionWrapHandler + BaseModel, + ValidationError, ValidationInfo, + ValidatorFunctionWrapHandler, + WrapValidator, AfterValidator, BeforeValidator ) -from pydantic.functional_validators import WrapValidator, AfterValidator from ums.utils.const import SHARE_PATH from ums.utils.schema import ExtractionSchema @@ -268,12 +268,12 @@ class RiddleStatus(RiddleInformation): """ The *main* solving step. - `AgentMessage.solution` shall be an `RiddleSolution` afterwards. + `AgentMessage.solution` shall contain an `RiddleSolution` afterwards. """ validate: RiddleSubStatus = RiddleSubStatus() """ - The validation step, i.e., does the gatekeeper accept the solution in `AgentMessage.solution`. + The validation step, i.e., does the gatekeeper accept the solution(s) in `AgentMessage.solution`. """ trial: int = 0 @@ -284,9 +284,13 @@ class RiddleStatus(RiddleInformation): solved: bool = False """ - True, after the gatekeeper accepts the solution at `AgentMessage.solution` + True, after the gatekeeper accepts the solution(s) at `AgentMessage.solution` """ +def _transform_to_list(value : Any) -> List[Any]: + # type check of items is done next by pydantic + return value if isinstance(value, list) else [value] + class AgentMessage(RiddleInformation): """ The basic message, which is sent be the agent and the management. @@ -310,9 +314,10 @@ class AgentMessage(RiddleInformation): The riddle to solve. """ - solution: RiddleSolution | None = None + solution: Annotated[List[RiddleSolution], BeforeValidator(_transform_to_list)] = [] """ - The solution of the riddle (or empty if no solution available) + The solutions of the riddle (or empty list if no solutions available) + (When assigning a single object of `RiddleSolution` will be convert to list with this single object.) """ data: List[RiddleData] = []