Spaces:
Runtime error
Runtime error
| # app.py — MCP POC using local Hugging Face model (flan-t5 or other) or rule-based fallback. | |
| # Place this file next to config.py. Do NOT store secrets here. | |
| from mcp.server.fastmcp import FastMCP | |
| from typing import Optional, List, Tuple, Any, Dict | |
| import requests | |
| import os | |
| import gradio as gr | |
| import json | |
| import time | |
| import traceback | |
| import inspect | |
| import re | |
| # Optional transformers imports — load only if available | |
# Probe for the optional transformers dependency; the flag records the outcome.
try:
    from transformers import pipeline, AutoTokenizer, AutoModelForSeq2SeqLM, AutoModelForCausalLM
except Exception:
    # Any failure while importing leaves the app on the rule-based fallback.
    TRANSFORMERS_AVAILABLE = False
else:
    TRANSFORMERS_AVAILABLE = True
| # ---------------------------- | |
| # Load config | |
| # ---------------------------- | |
# Pull credentials and settings from the sibling config module; abort startup
# with a readable hint when the module or any expected name cannot be loaded.
try:
    from config import CLIENT_ID, CLIENT_SECRET, REFRESH_TOKEN, API_BASE
    from config import LOCAL_MODEL  # e.g. "google/flan-t5-base" or None
except Exception:
    raise SystemExit(
        "Make sure config.py exists with CLIENT_ID, CLIENT_SECRET, REFRESH_TOKEN, "
        "API_BASE, LOCAL_MODEL (or leave LOCAL_MODEL=None)."
    )
| # ---------------------------- | |
| # Initialize FastMCP | |
| # ---------------------------- | |
| mcp = FastMCP("ZohoCRMAgent") | |
| # ---------------------------- | |
| # Analytics / KPI logging (simple local JSON file) | |
| # ---------------------------- | |
| ANALYTICS_PATH = "mcp_analytics.json" | |
def _init_analytics(path: Optional[str] = None):
    """Create the analytics JSON file with an empty baseline if it is missing.

    This runs at import time, so a failure to create the file (missing
    directory, read-only location, permissions) is reported on stdout instead
    of being raised; analytics are auxiliary and must not stop the app.

    Args:
        path: File to create. Defaults to the module-level ``ANALYTICS_PATH``.
    """
    target = ANALYTICS_PATH if path is None else path
    if os.path.exists(target):
        # Never overwrite counters collected by a previous run.
        return
    base = {
        "tool_calls": {},
        "llm_calls": 0,
        "last_llm_confidence": None,
        "created_at": time.time(),
    }
    try:
        with open(target, "w", encoding="utf-8") as f:
            json.dump(base, f, indent=2)
    except OSError as exc:
        print(f"Warning: could not create analytics file {target}: {exc}")
def _log_tool_call(tool_name: str, success: bool = True, path: Optional[str] = None):
    """Record one tool invocation in the analytics JSON file (best effort).

    The per-tool ``count`` is incremented together with either ``success`` or
    ``fail``. An unreadable, malformed or structurally unexpected file is
    replaced by a fresh baseline, and a failed write is reported on stdout
    rather than raised, so that bookkeeping problems never turn a finished
    tool call into an exception for the caller.

    Args:
        tool_name: Key under ``tool_calls`` to update.
        success: Whether the call succeeded.
        path: Analytics file to update. Defaults to ``ANALYTICS_PATH``.
    """
    target = ANALYTICS_PATH if path is None else path
    try:
        with open(target, "r", encoding="utf-8") as f:
            data = json.load(f)
    except Exception:
        data = None
    if not isinstance(data, dict):
        data = {"tool_calls": {}, "llm_calls": 0, "last_llm_confidence": None}

    # The file may be valid JSON yet lack the expected structure.
    calls = data.get("tool_calls")
    if not isinstance(calls, dict):
        calls = data["tool_calls"] = {}
    stats = calls.get(tool_name)
    if not isinstance(stats, dict):
        stats = calls[tool_name] = {}
    for key in ("count", "success", "fail"):
        stats.setdefault(key, 0)

    stats["count"] += 1
    stats["success" if success else "fail"] += 1

    try:
        with open(target, "w", encoding="utf-8") as f:
            json.dump(data, f, indent=2)
    except OSError as exc:
        print(f"Warning: could not update analytics file {target}: {exc}")
def _log_llm_call(confidence: Optional[float] = None, path: Optional[str] = None):
    """Increment the LLM call counter in the analytics JSON file (best effort).

    An unreadable or non-object file is replaced by a fresh baseline, and a
    failed write is reported on stdout rather than raised, so analytics
    problems never break a chat reply.

    Args:
        confidence: When not ``None``, stored as ``last_llm_confidence``.
        path: Analytics file to update. Defaults to ``ANALYTICS_PATH``.
    """
    target = ANALYTICS_PATH if path is None else path
    try:
        with open(target, "r", encoding="utf-8") as f:
            data = json.load(f)
    except Exception:
        data = None
    if not isinstance(data, dict):
        data = {"tool_calls": {}, "llm_calls": 0, "last_llm_confidence": None}

    data["llm_calls"] = data.get("llm_calls", 0) + 1
    if confidence is not None:
        data["last_llm_confidence"] = confidence

    try:
        with open(target, "w", encoding="utf-8") as f:
            json.dump(data, f, indent=2)
    except OSError as exc:
        print(f"Warning: could not update analytics file {target}: {exc}")
| _init_analytics() | |
| # ---------------------------- | |
| # Local LLM pipeline initialization | |
| # ---------------------------- | |
| LLM_PIPELINE = None | |
| TOKENIZER = None | |
def init_local_model():
    """
    Initialize the local HF model pipeline depending on LOCAL_MODEL.

    Model names containing "flan", "t5" or "seq2seq" are loaded as seq2seq
    models behind a "text2text-generation" pipeline; every other name is
    loaded as a causal model behind a "text-generation" pipeline.
    If transformers is unavailable, LOCAL_MODEL is falsy, or loading fails,
    LLM_PIPELINE is left as None so callers use the rule-based fallback.

    An optional LOCAL_TOKENIZER (module global or attribute of config.py)
    overrides the tokenizer name; when absent the model name is used.
    """
    global LLM_PIPELINE, TOKENIZER
    if not LOCAL_MODEL:
        print("LOCAL_MODEL is None — using rule-based fallback.")
        LLM_PIPELINE = None
        return
    if not TRANSFORMERS_AVAILABLE:
        print("transformers not installed — using rule-based fallback.")
        LLM_PIPELINE = None
        return
    try:
        # LOCAL_TOKENIZER is not imported at module level, so referencing the
        # bare name raised NameError and silently disabled every model load.
        # Look it up defensively instead.
        tokenizer_override = globals().get("LOCAL_TOKENIZER")
        if tokenizer_override is None:
            import config as _config
            tokenizer_override = getattr(_config, "LOCAL_TOKENIZER", None)
        tokenizer_name = tokenizer_override or LOCAL_MODEL

        # Detect seq2seq family (T5/Flan) from the model name.
        model_name = LOCAL_MODEL.lower()
        if any(tag in model_name for tag in ("flan", "t5", "seq2seq")):
            model_cls, task, label = AutoModelForSeq2SeqLM, "text2text-generation", "seq2seq"
        else:
            model_cls, task, label = AutoModelForCausalLM, "text-generation", "causal"

        TOKENIZER = AutoTokenizer.from_pretrained(tokenizer_name, use_fast=True)
        model = model_cls.from_pretrained(LOCAL_MODEL)
        LLM_PIPELINE = pipeline(task, model=model, tokenizer=TOKENIZER)
        print(f"Loaded {label} model pipeline for {LOCAL_MODEL}")
    except Exception as e:
        print("Failed to load local model:", e)
        traceback.print_exc()
        LLM_PIPELINE = None
| # Try to init model at startup (may be slow) | |
| init_local_model() | |
| # ---------------------------- | |
| # Rule-based fallback responder | |
| # ---------------------------- | |
def rule_based_response(message: str) -> str:
    """Return a canned reply chosen by the prefix of *message*.

    The message is stripped and lower-cased before matching (``None`` is
    treated as empty). The first rule whose prefix matches wins; when no rule
    matches, a generic fallback hint is returned.
    """
    normalized = (message or "").strip().lower()
    canned_replies = (
        (
            ("create record", "create contact"),
            'To create a record, use the command: create_record MODULE_NAME {"Field": "value"}',
        ),
        (
            ("create_invoice",),
            'To create invoice: create_invoice {"customer_id": "...", "line_items": [...]} (JSON)',
        ),
        (
            ("help", "what can you do"),
            "I can create/update/delete records in Zoho (create_record/update_record/delete_record) or process local files by pasting their path (/mnt/data/...).",
        ),
    )
    for prefixes, reply in canned_replies:
        if normalized.startswith(prefixes):
            return reply
    return "(fallback) No local LLM loaded. Use explicit commands like `create_record` or paste a /mnt/data/ path."
| # ---------------------------- | |
| # Zoho token refresh & headers helper | |
| # ---------------------------- | |
def _get_valid_token_headers() -> dict:
    """Exchange the configured refresh token for an access token.

    Returns:
        A headers dict holding the ``Authorization: Zoho-oauthtoken <token>``
        entry for subsequent Zoho API calls.

    Raises:
        RuntimeError: If the token endpoint answers with a non-200 status, or
            with a 200 whose body carries no ``access_token``.
    """
    # Note: region-specific account host may need .com or .eu — ensure API_BASE matches services used.
    token_url = "https://accounts.zoho.in/oauth/v2/token"
    params = {
        "refresh_token": REFRESH_TOKEN,
        "client_id": CLIENT_ID,
        "client_secret": CLIENT_SECRET,
        "grant_type": "refresh_token",
    }
    r = requests.post(token_url, params=params, timeout=20)
    if r.status_code != 200:
        raise RuntimeError(f"Failed to refresh Zoho token: {r.status_code} {r.text}")
    try:
        payload = r.json()
    except ValueError:
        payload = None
    token = payload.get("access_token") if isinstance(payload, dict) else None
    if not token:
        # A 200 without a token (for example an error payload) must not be
        # turned into a "Zoho-oauthtoken None" header that fails later.
        raise RuntimeError(f"Failed to refresh Zoho token: {r.status_code} {r.text}")
    return {"Authorization": f"Zoho-oauthtoken {token}"}
| # ---------------------------- | |
| # MCP tools: Zoho CRM & Books (CRUD + document processing) | |
| # ---------------------------- | |
def authenticate_zoho() -> str:
    """Refresh the Zoho token once and report the outcome as a message.

    The attempt is recorded through ``_log_tool_call`` as a success or a
    failure; errors are returned as text instead of being raised.
    """
    try:
        _get_valid_token_headers()
        _log_tool_call("authenticate_zoho", True)
    except Exception as err:
        _log_tool_call("authenticate_zoho", False)
        return f"Failed to authenticate: {err}"
    return "Zoho token refreshed (ok)."
def create_record(module_name: str, record_data: dict) -> str:
    """POST one record to ``{API_BASE}/{module_name}``.

    The record is wrapped as ``{"data": [record_data]}``. On HTTP 200/201 the
    response JSON is returned as a string; any other status or any exception
    is returned as an error message. Every outcome is recorded through
    ``_log_tool_call``.
    """
    try:
        auth_headers = _get_valid_token_headers()
        response = requests.post(
            f"{API_BASE}/{module_name}",
            headers=auth_headers,
            json={"data": [record_data]},
            timeout=20,
        )
        created = response.status_code in (200, 201)
        _log_tool_call("create_record", created)
        if created:
            return json.dumps(response.json(), ensure_ascii=False)
        return f"Error creating record: {response.status_code} {response.text}"
    except Exception as err:
        _log_tool_call("create_record", False)
        return f"Exception: {err}"
def get_records(module_name: str, page: int = 1, per_page: int = 200) -> list:
    """Fetch one page of records from ``{API_BASE}/{module_name}``.

    Args:
        module_name: Module segment appended to ``API_BASE``.
        page: Page number sent as the ``page`` query parameter.
        per_page: Page size sent as the ``per_page`` query parameter.

    Returns:
        The ``data`` list of the response on HTTP 200, an empty list on
        HTTP 204 (nothing to return), or a one-element list holding an error
        message for any other status or exception.
    """
    try:
        headers = _get_valid_token_headers()
        url = f"{API_BASE}/{module_name}"
        r = requests.get(url, headers=headers, params={"page": page, "per_page": per_page}, timeout=20)
        if r.status_code == 204:
            # No Content has no body to parse; it is a successful empty
            # result, not a retrieval error.
            _log_tool_call("get_records", True)
            return []
        if r.status_code == 200:
            _log_tool_call("get_records", True)
            return r.json().get("data", [])
        _log_tool_call("get_records", False)
        return [f"Error retrieving {module_name}: {r.status_code} {r.text}"]
    except Exception as e:
        _log_tool_call("get_records", False)
        return [f"Exception: {e}"]
def update_record(module_name: str, record_id: str, data: dict) -> str:
    """PUT new field values to ``{API_BASE}/{module_name}/{record_id}``.

    The fields are wrapped as ``{"data": [data]}``. On HTTP 200 the response
    JSON is returned as a string; any other status or any exception is
    returned as an error message. Every outcome is recorded through
    ``_log_tool_call``.
    """
    try:
        auth_headers = _get_valid_token_headers()
        endpoint = f"{API_BASE}/{module_name}/{record_id}"
        response = requests.put(endpoint, headers=auth_headers, json={"data": [data]}, timeout=20)
        if response.status_code != 200:
            _log_tool_call("update_record", False)
            return f"Error updating: {response.status_code} {response.text}"
        _log_tool_call("update_record", True)
        return json.dumps(response.json(), ensure_ascii=False)
    except Exception as err:
        _log_tool_call("update_record", False)
        return f"Exception: {err}"
def delete_record(module_name: str, record_id: str) -> str:
    """DELETE the record at ``{API_BASE}/{module_name}/{record_id}``.

    On HTTP 200 the response JSON is returned as a string; any other status
    or any exception is returned as an error message. Every outcome is
    recorded through ``_log_tool_call``.
    """
    try:
        auth_headers = _get_valid_token_headers()
        response = requests.delete(
            f"{API_BASE}/{module_name}/{record_id}",
            headers=auth_headers,
            timeout=20,
        )
        deleted = response.status_code == 200
        _log_tool_call("delete_record", deleted)
        if not deleted:
            return f"Error deleting: {response.status_code} {response.text}"
        return json.dumps(response.json(), ensure_ascii=False)
    except Exception as err:
        _log_tool_call("delete_record", False)
        return f"Exception: {err}"
def create_invoice(data: dict) -> str:
    """
    Create an invoice by POSTing to ``{API_BASE}/invoices``.

    NOTE: Ensure API_BASE points to the Books base (e.g. https://books.zoho.in/api/v3) when calling invoices.
    NOTE(review): the body uses the same {"data": [...]} envelope as the
    record helpers in this file — confirm the invoices endpoint accepts it.

    On HTTP 200/201 the response JSON is returned as a string; any other
    status or any exception is returned as an error message. Every outcome is
    recorded through ``_log_tool_call``.
    """
    try:
        auth_headers = _get_valid_token_headers()
        response = requests.post(
            f"{API_BASE}/invoices",
            headers=auth_headers,
            json={"data": [data]},
            timeout=20,
        )
        if response.status_code not in (200, 201):
            _log_tool_call("create_invoice", False)
            return f"Error creating invoice: {response.status_code} {response.text}"
        _log_tool_call("create_invoice", True)
        return json.dumps(response.json(), ensure_ascii=False)
    except Exception as err:
        _log_tool_call("create_invoice", False)
        return f"Exception: {err}"
def process_document(file_path: str, target_module: Optional[str] = "Contacts") -> dict:
    """
    Describe a local file and return simulated extraction results.

    When ``file_path`` exists, the result carries its base name, a
    ``file://`` URL built from the path as given, the target module and a
    fixed placeholder extraction (no real OCR is performed yet). When the
    path does not exist, or anything raises, an error dict is returned
    instead. Every outcome is recorded through ``_log_tool_call``.
    """
    try:
        if not os.path.exists(file_path):
            _log_tool_call("process_document", False)
            return {"status": "error", "error": "file not found", "file_path": file_path}

        # Placeholder extraction — replace with OCR + parsing logic
        simulated_fields = {
            "Name": "ACME Corp (simulated)",
            "Email": "[email protected]",
            "Phone": "+91-99999-00000",
            "Total": "1234.00",
            "Confidence": 0.88,
        }
        _log_tool_call("process_document", True)
        return {
            "status": "success",
            "file": os.path.basename(file_path),
            "file_url": f"file://{file_path}",
            "target_module": target_module,
            "extracted_data": simulated_fields,
        }
    except Exception as err:
        _log_tool_call("process_document", False)
        return {"status": "error", "error": str(err)}
| # ---------------------------- | |
| # Simple local command parser to call tools explicitly from chat (POC) | |
| # ---------------------------- | |
def try_parse_and_invoke_command(text: str):
    """
    Very small parser for explicit chat commands:
    create_record MODULE {json}
    create_invoice {json}
    /mnt/data/...            (a bare local path, handed to process_document)

    The JSON payload may span several lines. It must decode to a JSON object;
    otherwise a short error message is returned and no tool is called.

    Returns:
        The result of the invoked tool (a string for create_record and
        create_invoice, a dict for process_document), an error message
        string for an unusable payload, or None when *text* is not one of
        the recognised commands.
    """
    text = (text or "").strip()

    # re.S lets "." cross newlines so pretty-printed JSON is still captured.
    m = re.match(r"^create_record\s+(\w+)\s+(.+)$", text, re.I | re.S)
    if m:
        try:
            record_data = json.loads(m.group(2))
        except Exception:
            return "Invalid JSON for record_data"
        if not isinstance(record_data, dict):
            return "record_data must be a JSON object"
        return create_record(m.group(1), record_data)

    m = re.match(r"^create_invoice\s+(.+)$", text, re.I | re.S)
    if m:
        try:
            invoice_data = json.loads(m.group(1))
        except Exception:
            return "Invalid JSON for invoice_data"
        if not isinstance(invoice_data, dict):
            return "invoice_data must be a JSON object"
        return create_invoice(invoice_data)

    # process_document via local path (a single token without whitespace)
    m = re.match(r"^(\/mnt\/data\/\S+)$", text)
    if m:
        return process_document(m.group(1))
    return None
| # ---------------------------- | |
| # Local LLM / fallback generator | |
| # ---------------------------- | |
def local_llm_generate(prompt: str) -> str:
    """Generate a reply for *prompt* with the local pipeline, or fall back.

    When no pipeline is loaded, or the pipeline raises, the rule-based
    responder is used instead. A leading copy of the prompt is removed from
    the generated text so that pipelines which echo their input do not leak
    the system instruction and history into the reply.

    Returns:
        The generated (or fallback) text, always as a string.
    """
    if LLM_PIPELINE is None:
        return rule_based_response(prompt)
    try:
        out = LLM_PIPELINE(prompt, max_new_tokens=256)
        if isinstance(out, list) and len(out) > 0:
            first = out[0]
            if isinstance(first, dict):
                # Pipelines usually report their output under 'generated_text'.
                text = first.get("generated_text") or first.get("text") or str(first)
            else:
                text = first
        else:
            text = out
        if not isinstance(text, str):
            text = str(text)
        if prompt and text.startswith(prompt):
            # Drop the echoed prompt and keep only the continuation.
            text = text[len(prompt):].strip()
        return text
    except Exception as e:
        print("LLM pipeline error:", e)
        traceback.print_exc()
        return rule_based_response(prompt)
| # ---------------------------- | |
| # Gradio chat handler (accepts message, history) | |
| # ---------------------------- | |
def _format_document_result(doc: dict) -> str:
    """Render a process_document() result dict as a chat message."""
    if doc.get("status") != "success":
        return f"Error processing document: {doc.get('error')}"
    return f"Processed file {doc.get('file')}. Extracted: {json.dumps(doc.get('extracted_data'), ensure_ascii=False)}"


def _history_to_text(history) -> str:
    """Flatten chat history into 'User: ...' / 'Assistant: ...' lines.

    Accepts (user, assistant) pairs, dicts with "user"/"assistant" keys, and
    message dicts with "role"/"content" keys; other entries are skipped.
    """
    lines = []
    for turn in history:
        if isinstance(turn, dict):
            role, content = turn.get("role"), turn.get("content")
            if role in ("user", "assistant"):
                if content:
                    lines.append(f"{role.capitalize()}: {content}")
                continue
            user_turn, assistant_turn = turn.get("user", ""), turn.get("assistant", "")
        else:
            try:
                user_turn, assistant_turn = turn[0], turn[1]
            except Exception:
                user_turn, assistant_turn = "", ""
        if user_turn:
            lines.append(f"User: {user_turn}")
        if assistant_turn:
            lines.append(f"Assistant: {assistant_turn}")
    return "".join(f"{line}\n" for line in lines)


def chat_handler(message, history):
    """
    Gradio ChatInterface calls this with (message, history).

    Order of handling:
      1. explicit commands (create_record / create_invoice / a bare
         /mnt/data/ path) via try_parse_and_invoke_command;
      2. any other message starting with /mnt/data/ is passed unchanged to
         process_document();
      3. otherwise the local LLM answers, or the rule-based responder when no
         LLM is loaded.

    The reply is always a string: document results are formatted as text
    rather than handed to Gradio as a raw dict.
    """
    history = history or []
    trimmed = (message or "").strip()

    # 1) explicit commands
    cmd = try_parse_and_invoke_command(trimmed)
    if cmd is not None:
        return _format_document_result(cmd) if isinstance(cmd, dict) else cmd

    # 2) developer convenience: local path handling
    if trimmed.startswith("/mnt/data/"):
        try:
            return _format_document_result(process_document(trimmed))
        except Exception as e:
            return f"Error processing document: {e}"

    # 3) no model loaded: the rule-based responder matches on the start of
    # the user's own message, so it must not receive the synthesized prompt.
    if LLM_PIPELINE is None:
        return rule_based_response(trimmed)

    # build a compact prompt including a short system instruction and history
    system = "You are a Zoho assistant that can call local MCP tools when asked. Keep replies short and actionable."
    prompt = f"{system}\n{_history_to_text(history)}\nUser: {trimmed}\nAssistant:"
    try:
        resp = local_llm_generate(prompt)
        _log_llm_call(None)
        return resp
    except Exception as e:
        return f"LLM error: {e}"
| # ---------------------------- | |
| # Gradio UI | |
| # ---------------------------- | |
def chat_interface():
    """Build the Gradio ChatInterface wired to ``chat_handler``."""
    prompt_box = gr.Textbox(
        placeholder="Ask me to create contacts, invoices, or paste /mnt/data/... for dev."
    )
    return gr.ChatInterface(fn=chat_handler, textbox=prompt_box)
| # ---------------------------- | |
| # Entrypoint | |
| # ---------------------------- | |
if __name__ == "__main__":
    # NOTE(review): the banner mentions a FastMCP server, but only the Gradio
    # UI is launched in the visible code — confirm where the server starts.
    print("[startup] Launching Gradio UI + FastMCP server (local LLM mode).")
    demo = chat_interface()
    # Bind on all interfaces, port 7860.
    demo.launch(
        server_name="0.0.0.0",
        server_port=7860,
    )