| | """ |
| | Core logic for the WellBe+ multi‑context AI assistant. |
| | This module handles MCP server instantiation, model selection, and single‑shot |
| | question answering. It is intentionally agnostic of any UI layer so that it can |
| | be re‑used from a command‑line script, a notebook, or the Gradio front‑end. |
| | """ |
| |
|
| | from __future__ import annotations |
| |
|
| | import asyncio |
| | from copy import deepcopy |
| | from typing import Tuple, Union, Optional |
| | import os |
| | import openai |
| | from agents import Agent, Runner |
| | from agents.extensions.models.litellm_model import LitellmModel |
| | from agents.mcp.server import MCPServerStdio, MCPServerStreamableHttp |
| |
|
| | from config import PROMPT_TEMPLATE, MCP_CONFIGS |
| |
|
| | __all__ = [ |
| | "initialize_mcp_servers", |
| | "select_model", |
| | "answer_question", |
| | "answer_sync", |
| | ] |
| |
|
| | |
| | _healthcare_server: Optional[MCPServerStreamableHttp] = None |
| | _whoop_server: Optional[Union[MCPServerStdio, str]] = None |
| |
|
| |
|
| | def initialize_mcp_servers(whoop_email: str, whoop_password: str) -> None: |
| | """Initialize and cache the MCP servers (once only).""" |
| | global _healthcare_server, _whoop_server |
| | if _healthcare_server is None: |
| | cfg = deepcopy(MCP_CONFIGS) |
| | cfg["whoop"].update({"username": whoop_email, "password": whoop_password}) |
| |
|
| | _healthcare_server = MCPServerStreamableHttp( |
| | params=cfg["healthcare-mcp-public"], name="Healthcare MCP Server" |
| | ) |
| | |
| | _whoop_server = "Whoop Tool Not available now" |
| |
|
| |
|
| | def select_model() -> Union[str, LitellmModel]: |
| | return "o3-mini" |
| |
|
| |
|
| | async def answer_question( |
| | question: str, |
| | openai_key: str, |
| | whoop_email: str, |
| | whoop_password: str, |
| | ) -> str: |
| | """Run the WellBe+ agent on a single question and return the assistant reply.""" |
| | initialize_mcp_servers(whoop_email, whoop_password) |
| | if _healthcare_server is None: |
| | return "**Error:** MCP servers not initialized." |
| |
|
| | async with _healthcare_server as hserver: |
| | agent = Agent( |
| | name="WellBe+ Assistant", |
| | instructions=PROMPT_TEMPLATE, |
| | model=select_model(), |
| | mcp_servers=[ |
| | hserver |
| | |
| | ], |
| | ) |
| | result = await Runner.run(agent, question) |
| | return result.final_output |
| |
|
| |
|
| | def answer_sync(question: str, openai_key: str, email: str, password: str) -> str: |
| | """Blocking wrapper around :func:`answer_question`.""" |
| | api_key = openai_key or os.getenv("OPENAI_API_KEY") |
| | openai.api_key = api_key |
| | os.environ["OPENAI_API_KEY"] = api_key |
| | if not question.strip(): |
| | return "Please enter a question." |
| | try: |
| | return asyncio.run( |
| | answer_question( |
| | question, openai_key.strip(), email.strip(), password.strip() |
| | ) |
| | ) |
| | except Exception as exc: |
| | return f"**Error:** {exc}" |
| |
|