ums.agent.main
1# Agenten Plattform 2# 3# (c) 2024 Magnus Bender 4# Institute of Humanities-Centered Artificial Intelligence (CHAI) 5# Universitaet Hamburg 6# https://www.chai.uni-hamburg.de/~bender 7# 8# source code released under the terms of GNU Public License Version 3 9# https://www.gnu.org/licenses/gpl-3.0.txt 10 11import os 12 13from fastapi import FastAPI, Request, BackgroundTasks 14from fastapi.staticfiles import StaticFiles 15from fastapi.responses import JSONResponse 16 17from ums.agent.process import MessageProcessor 18from ums.utils import AgentMessage, AgentResponse, const 19 20class WebMain(): 21 22 def __init__(self): 23 self.msg_process = MessageProcessor() 24 25 self._init_app() 26 self._add_routes() 27 28 def _init_app(self): 29 self.app = FastAPI( 30 title="Agenten Plattform", 31 description="Agenten Plattform – Agent", 32 openapi_url="/api/schema.json", 33 docs_url='/api', 34 redoc_url=None 35 ) 36 37 self.app.mount( 38 "/static", 39 StaticFiles(directory=os.path.join(const.PUBLIC_PATH, 'static')), 40 name="static" 41 ) 42 self.app.mount( 43 "/docs", 44 StaticFiles(directory=os.path.join(const.PUBLIC_PATH, 'docs'), html=True), 45 name="docs" 46 ) 47 48 def _add_routes(self): 49 50 @self.app.get("/", response_class=JSONResponse, summary="Link list") 51 def index(): 52 return { 53 "title" : "Agenten Plattform – Agent", 54 "./message" : "Messaged from the Management", 55 "./api" : "API Overview", 56 "./docs" : "Documentation" 57 } 58 59 @self.app.post("/message", summary="Send a message to this agent") 60 def message(request: Request, message:AgentMessage, background_tasks: BackgroundTasks) -> AgentResponse: 61 return self.msg_process.new_message(message, background_tasks) 62 63if __name__ == "ums.agent.main" and os.environ.get('SERVE', 'false').lower() == 'true': 64 main = WebMain() 65 app = main.app
class
WebMain:
21class WebMain(): 22 23 def __init__(self): 24 self.msg_process = MessageProcessor() 25 26 self._init_app() 27 self._add_routes() 28 29 def _init_app(self): 30 self.app = FastAPI( 31 title="Agenten Plattform", 32 description="Agenten Plattform – Agent", 33 openapi_url="/api/schema.json", 34 docs_url='/api', 35 redoc_url=None 36 ) 37 38 self.app.mount( 39 "/static", 40 StaticFiles(directory=os.path.join(const.PUBLIC_PATH, 'static')), 41 name="static" 42 ) 43 self.app.mount( 44 "/docs", 45 StaticFiles(directory=os.path.join(const.PUBLIC_PATH, 'docs'), html=True), 46 name="docs" 47 ) 48 49 def _add_routes(self): 50 51 @self.app.get("/", response_class=JSONResponse, summary="Link list") 52 def index(): 53 return { 54 "title" : "Agenten Plattform – Agent", 55 "./message" : "Messaged from the Management", 56 "./api" : "API Overview", 57 "./docs" : "Documentation" 58 } 59 60 @self.app.post("/message", summary="Send a message to this agent") 61 def message(request: Request, message:AgentMessage, background_tasks: BackgroundTasks) -> AgentResponse: 62 return self.msg_process.new_message(message, background_tasks)