# app.py — MCP server using DeepSeek via Hugging Face transformers (or fallback)
# - Put this file next to config.py (see example below).
# - Supports LOCAL_MODEL values like:
#     "deepseek/deepseek-r1-0528" or "deepseek/deepseek-r1-0528:free"
#   If a ":revision" suffix is present, it is passed as the `revision=` argument
#   to transformers .from_pretrained() so HF repo-id validation is satisfied.
#
# - Loads the model via transformers.pipeline if available; otherwise falls back
#   to google/flan-t5-small (or distilgpt2), then to a rule-based responder.
#
# - Developer instruction: when a user pastes a local path (e.g. /mnt/data/...), the
#   chat handler passes the path unchanged to process_document(); tool-invocation
#   normalization converts file_path -> file_url (file://...) and optionally file_b64.
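#
# For reference, a minimal config.py might look like this (all values below are
# placeholders — substitute your own Zoho credentials and data-center domain):
#
#   CLIENT_ID = "1000.XXXXXXXXXXXXXXXX"
#   CLIENT_SECRET = "your-client-secret"
#   REFRESH_TOKEN = "1000.your-refresh-token"
#   API_BASE = "https://www.zohoapis.in/crm/v2"
#   LOCAL_MODEL = "deepseek/deepseek-r1-0528"  # or None to skip loading a local LLM
#   # Optional extras (default to None when omitted):
#   # LOCAL_TOKENIZER = None
#   # OPENROUTER_BASE_URL = None
#   # OPENROUTER_API_KEY = None
#   # OPENROUTER_MODEL = None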
from mcp.server.fastmcp import FastMCP
from typing import Optional, List, Tuple, Any, Dict
import requests
import os
import gradio as gr
import json
import time
import traceback
import re
import logging
import base64
# Set up simple logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("mcp_deepseek")

# Optional transformers import — attempted here; a missing install is handled gracefully
TRANSFORMERS_AVAILABLE = False
try:
    from transformers import pipeline, AutoTokenizer, AutoModelForCausalLM, AutoModelForSeq2SeqLM
    TRANSFORMERS_AVAILABLE = True
except Exception as e:
    logger.warning("transformers not available: %s", e)
    TRANSFORMERS_AVAILABLE = False
# ----------------------------
# Load config
# ----------------------------
try:
    from config import (
        CLIENT_ID,
        CLIENT_SECRET,
        REFRESH_TOKEN,
        API_BASE,
        LOCAL_MODEL,  # e.g. "deepseek/deepseek-r1-7b" or "deepseek/deepseek-r1-0528:free"
    )
except Exception as e:
    raise SystemExit(
        f"Failed to import config.py ({e}). Make sure config.py exists with CLIENT_ID, "
        "CLIENT_SECRET, REFRESH_TOKEN, API_BASE, LOCAL_MODEL (or set LOCAL_MODEL = None)."
    )
# Optional config values: read with getattr so they default to None when
# config.py does not define them (prevents NameError)
import config as _config
LOCAL_TOKENIZER = getattr(_config, "LOCAL_TOKENIZER", None)
OPENROUTER_BASE_URL = getattr(_config, "OPENROUTER_BASE_URL", None)
OPENROUTER_API_KEY = getattr(_config, "OPENROUTER_API_KEY", None)
OPENROUTER_MODEL = getattr(_config, "OPENROUTER_MODEL", None)
# ----------------------------
# FastMCP init
# ----------------------------
mcp = FastMCP("ZohoCRMAgent")
# ----------------------------
# Analytics / KPI logging (simple local JSON file)
# ----------------------------
ANALYTICS_PATH = "mcp_analytics.json"

def _init_analytics():
    if not os.path.exists(ANALYTICS_PATH):
        base = {"tool_calls": {}, "llm_calls": 0, "last_llm_confidence": None, "created_at": time.time()}
        with open(ANALYTICS_PATH, "w") as f:
            json.dump(base, f, indent=2)

def _log_tool_call(tool_name: str, success: bool = True):
    try:
        with open(ANALYTICS_PATH, "r") as f:
            data = json.load(f)
    except Exception:
        data = {"tool_calls": {}, "llm_calls": 0, "last_llm_confidence": None}
    data["tool_calls"].setdefault(tool_name, {"count": 0, "success": 0, "fail": 0})
    data["tool_calls"][tool_name]["count"] += 1
    if success:
        data["tool_calls"][tool_name]["success"] += 1
    else:
        data["tool_calls"][tool_name]["fail"] += 1
    with open(ANALYTICS_PATH, "w") as f:
        json.dump(data, f, indent=2)

def _log_llm_call(confidence: Optional[float] = None):
    try:
        with open(ANALYTICS_PATH, "r") as f:
            data = json.load(f)
    except Exception:
        data = {"tool_calls": {}, "llm_calls": 0, "last_llm_confidence": None}
    data["llm_calls"] = data.get("llm_calls", 0) + 1
    if confidence is not None:
        data["last_llm_confidence"] = confidence
    with open(ANALYTICS_PATH, "w") as f:
        json.dump(data, f, indent=2)

_init_analytics()
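
# For reference, mcp_analytics.json ends up shaped like this (illustrative values):
#   {
#     "tool_calls": {"create_record": {"count": 3, "success": 2, "fail": 1}},
#     "llm_calls": 12,
#     "last_llm_confidence": null,
#     "created_at": 1700000000.0
#   }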
# ----------------------------
# Helper: normalize local file_path args
# ----------------------------
def _normalize_local_path_args(args: Any) -> Any:
    """
    If args is a dict whose 'file_path' points to a local file (under /mnt/data/),
    add file_url: file://<path> and, optionally, file_b64: <base64> so tools can use either.
    Returns the modified args (same object or new).
    """
    if not isinstance(args, dict):
        return args
    fp = args.get("file_path")
    if isinstance(fp, str) and fp.startswith("/mnt/data/") and os.path.exists(fp):
        try:
            args["file_url"] = f"file://{fp}"
            # Include a base64 payload for tools that prefer raw bytes
            try:
                with open(fp, "rb") as f:
                    args["file_b64"] = base64.b64encode(f.read()).decode("utf-8")
            except Exception as e:
                # If reading fails, still keep file_url
                logger.warning("Could not read file for base64 embedding: %s", e)
                args.pop("file_b64", None)
        except Exception as e:
            logger.warning("Normalization error for file_path %s: %s", fp, e)
    return args
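
# Example: {"file_path": "/mnt/data/invoice.pdf"} is normalized to
#   {"file_path": "/mnt/data/invoice.pdf",
#    "file_url": "file:///mnt/data/invoice.pdf",
#    "file_b64": "<base64 of the file bytes>"}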
# ----------------------------
# DeepSeek / HF model loader (with revision parsing)
# ----------------------------
LLM_PIPELINE = None
TOKENIZER = None
LOADED_MODEL_NAME = None

def _parse_model_and_revision(model_string: str) -> Tuple[str, Optional[str]]:
    """
    Accepts model strings like:
      - 'owner/repo'
      - 'owner/repo:revision'
    Returns (repo_id, revision_or_none).
    """
    if ":" in model_string:
        repo_id, revision = model_string.split(":", 1)
        return repo_id.strip(), revision.strip()
    return model_string, None
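
# Example: _parse_model_and_revision("deepseek/deepseek-r1-0528:free")
#   -> ("deepseek/deepseek-r1-0528", "free")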
def init_deepseek_model():
    """
    Try to load LOCAL_MODEL via transformers.pipeline.
    If a ':revision' suffix is present, pass revision=... to from_pretrained to avoid
    HF repo-id validation errors.
    If loading fails, try a small fallback model (flan-t5-small, then distilgpt2).
    """
    global LLM_PIPELINE, TOKENIZER, LOADED_MODEL_NAME
    if not LOCAL_MODEL:
        logger.info("LOCAL_MODEL is None — no local LLM will be used.")
        LLM_PIPELINE = None
        return
    if not TRANSFORMERS_AVAILABLE:
        logger.warning("transformers not installed; cannot load DeepSeek. Falling back to rule-based.")
        LLM_PIPELINE = None
        return
    try:
        repo_id, revision = _parse_model_and_revision(LOCAL_MODEL)
        tokenizer_name = LOCAL_TOKENIZER or repo_id
        model_name_for_logging = repo_id + (f" (rev={revision})" if revision else "")
        LOADED_MODEL_NAME = model_name_for_logging
        # Only pass revision= to the loaders when one was actually given
        revision_kwargs = {"revision": revision} if revision else {}
        # If the model looks like seq2seq (T5/flan), use text2text; otherwise causal
        seq2seq_keywords = ["flan", "t5", "seq2seq"]
        if any(k in repo_id.lower() for k in seq2seq_keywords):
            TOKENIZER = AutoTokenizer.from_pretrained(tokenizer_name, use_fast=True, **revision_kwargs)
            model = AutoModelForSeq2SeqLM.from_pretrained(repo_id, **revision_kwargs)
            LLM_PIPELINE = pipeline("text2text-generation", model=model, tokenizer=TOKENIZER)
            logger.info("Loaded seq2seq model: %s", model_name_for_logging)
        else:
            TOKENIZER = AutoTokenizer.from_pretrained(tokenizer_name, use_fast=True, **revision_kwargs)
            model = AutoModelForCausalLM.from_pretrained(repo_id, **revision_kwargs)
            LLM_PIPELINE = pipeline("text-generation", model=model, tokenizer=TOKENIZER)
            logger.info("Loaded causal model: %s", model_name_for_logging)
    except Exception as e:
        logger.error("Failed to load requested model '%s': %s", LOCAL_MODEL, e)
        traceback.print_exc()
        # Try a small CPU-friendly fallback: flan-t5-small first, then distilgpt2
        try:
            try:
                fallback = "google/flan-t5-small"
                TOKENIZER = AutoTokenizer.from_pretrained(fallback, use_fast=True)
                model = AutoModelForSeq2SeqLM.from_pretrained(fallback)
                LLM_PIPELINE = pipeline("text2text-generation", model=model, tokenizer=TOKENIZER)
            except Exception:
                fallback = "distilgpt2"
                TOKENIZER = AutoTokenizer.from_pretrained(fallback, use_fast=True)
                model = AutoModelForCausalLM.from_pretrained(fallback)
                LLM_PIPELINE = pipeline("text-generation", model=model, tokenizer=TOKENIZER)
            LOADED_MODEL_NAME = fallback
            logger.info("Loaded fallback model: %s", fallback)
        except Exception as e2:
            logger.error("Fallback model also failed: %s", e2)
            traceback.print_exc()
            LLM_PIPELINE = None
            LOADED_MODEL_NAME = None
# Initialize the model at startup (may take a while)
init_deepseek_model()
# ----------------------------
# Rule-based fallback responder
# ----------------------------
def rule_based_response(message: str) -> str:
    msg = (message or "").strip().lower()
    if msg.startswith("create record") or msg.startswith("create contact"):
        return 'To create a record, use: create_record MODULE_NAME {"Field":"value"}'
    if msg.startswith("create_invoice"):
        return 'To create an invoice, use: create_invoice {"customer_id":"...","line_items":[...]} (JSON)'
    if msg.startswith("help") or "what can you do" in msg:
        return "I can run create_record/update_record/delete_record, or process local files if you paste their /mnt/data/ path."
    return "(fallback) No local LLM loaded. Use explicit commands like create_record, or paste a /mnt/data/ path."
# ----------------------------
# LLM wrapper that returns text + confidence (best-effort)
# ----------------------------
def deepseek_generate(prompt: str, max_tokens: int = 256) -> Dict[str, Any]:
    """
    Generate with the loaded pipeline.
    Returns {'text': str, 'confidence': Optional[float], 'raw': resp}.
    """
    if LLM_PIPELINE is None:
        return {"text": rule_based_response(prompt), "confidence": None, "raw": None}
    try:
        out = LLM_PIPELINE(prompt, max_new_tokens=max_tokens)
        text = ""
        # HF pipelines return a list of dicts, usually keyed 'generated_text' (sometimes 'text')
        if isinstance(out, list) and len(out) > 0:
            first = out[0]
            if isinstance(first, dict):
                text = first.get("generated_text") or first.get("text") or str(first)
            else:
                text = str(first)
        else:
            text = str(out)
        _log_llm_call(None)
        return {"text": text, "confidence": None, "raw": out}
    except Exception as e:
        logger.error("LLM generation error: %s", e)
        traceback.print_exc()
        return {"text": rule_based_response(prompt), "confidence": None, "raw": str(e)}
# ----------------------------
# Zoho token refresh & MCP tools
# ----------------------------
def _get_valid_token_headers() -> dict:
    token_url = "https://accounts.zoho.in/oauth/v2/token"
    params = {
        "refresh_token": REFRESH_TOKEN,
        "client_id": CLIENT_ID,
        "client_secret": CLIENT_SECRET,
        "grant_type": "refresh_token",
    }
    r = requests.post(token_url, params=params, timeout=20)
    if r.status_code == 200:
        t = r.json().get("access_token")
        return {"Authorization": f"Zoho-oauthtoken {t}"}
    raise RuntimeError(f"Failed to refresh Zoho token: {r.status_code} {r.text}")

@mcp.tool()
def authenticate_zoho() -> str:
    try:
        _ = _get_valid_token_headers()
        _log_tool_call("authenticate_zoho", True)
        return "Zoho token refreshed (ok)."
    except Exception as e:
        _log_tool_call("authenticate_zoho", False)
        return f"Failed to authenticate: {e}"
@mcp.tool()
def create_record(module_name: str, record_data: dict) -> str:
    try:
        headers = _get_valid_token_headers()
        url = f"{API_BASE}/{module_name}"
        payload = {"data": [record_data]}
        r = requests.post(url, headers=headers, json=payload, timeout=20)
        if r.status_code in (200, 201):
            _log_tool_call("create_record", True)
            return json.dumps(r.json(), ensure_ascii=False)
        _log_tool_call("create_record", False)
        return f"Error creating record: {r.status_code} {r.text}"
    except Exception as e:
        _log_tool_call("create_record", False)
        return f"Exception: {e}"
@mcp.tool()
def get_records(module_name: str, page: int = 1, per_page: int = 200) -> list:
    try:
        headers = _get_valid_token_headers()
        url = f"{API_BASE}/{module_name}"
        r = requests.get(url, headers=headers, params={"page": page, "per_page": per_page}, timeout=20)
        if r.status_code == 200:
            _log_tool_call("get_records", True)
            return r.json().get("data", [])
        _log_tool_call("get_records", False)
        return [f"Error retrieving {module_name}: {r.status_code} {r.text}"]
    except Exception as e:
        _log_tool_call("get_records", False)
        return [f"Exception: {e}"]
@mcp.tool()
def update_record(module_name: str, record_id: str, data: dict) -> str:
    try:
        headers = _get_valid_token_headers()
        url = f"{API_BASE}/{module_name}/{record_id}"
        payload = {"data": [data]}
        r = requests.put(url, headers=headers, json=payload, timeout=20)
        if r.status_code == 200:
            _log_tool_call("update_record", True)
            return json.dumps(r.json(), ensure_ascii=False)
        _log_tool_call("update_record", False)
        return f"Error updating: {r.status_code} {r.text}"
    except Exception as e:
        _log_tool_call("update_record", False)
        return f"Exception: {e}"
@mcp.tool()
def delete_record(module_name: str, record_id: str) -> str:
    try:
        headers = _get_valid_token_headers()
        url = f"{API_BASE}/{module_name}/{record_id}"
        r = requests.delete(url, headers=headers, timeout=20)
        if r.status_code == 200:
            _log_tool_call("delete_record", True)
            return json.dumps(r.json(), ensure_ascii=False)
        _log_tool_call("delete_record", False)
        return f"Error deleting: {r.status_code} {r.text}"
    except Exception as e:
        _log_tool_call("delete_record", False)
        return f"Exception: {e}"
@mcp.tool()
def create_invoice(data: dict) -> str:
    try:
        headers = _get_valid_token_headers()
        url = f"{API_BASE}/invoices"
        r = requests.post(url, headers=headers, json={"data": [data]}, timeout=20)
        if r.status_code in (200, 201):
            _log_tool_call("create_invoice", True)
            return json.dumps(r.json(), ensure_ascii=False)
        _log_tool_call("create_invoice", False)
        return f"Error creating invoice: {r.status_code} {r.text}"
    except Exception as e:
        _log_tool_call("create_invoice", False)
        return f"Exception: {e}"
@mcp.tool()
def process_document(file_path: str, target_module: Optional[str] = "Contacts") -> dict:
    """
    Accepts a local path (e.g. /mnt/data/script_zoho_mcp) or a URL.
    Per the developer instruction, the path is treated as the file URL (file://...).
    Replace the placeholder extraction logic below with your OCR pipeline.
    """
    try:
        if os.path.exists(file_path):
            file_url = f"file://{file_path}"
            # Placeholder "extraction" — simulated values standing in for real OCR output
            extracted = {
                "Name": "ACME Corp (simulated)",
                "Email": "[email protected]",
                "Phone": "+91-99999-00000",
                "Total": "1234.00",
                "Confidence": 0.88,
            }
            _log_tool_call("process_document", True)
            return {
                "status": "success",
                "file": os.path.basename(file_path),
                "file_url": file_url,
                "target_module": target_module,
                "extracted_data": extracted,
            }
        _log_tool_call("process_document", False)
        return {"status": "error", "error": "file not found", "file_path": file_path}
    except Exception as e:
        _log_tool_call("process_document", False)
        return {"status": "error", "error": str(e)}
# ----------------------------
# Simple command parser (explicit commands in chat)
# ----------------------------
def try_parse_and_invoke_command(text: str):
    text = text.strip()
    m = re.match(r"^create_record\s+(\w+)\s+(.+)$", text, re.I)
    if m:
        module, body = m.group(1), m.group(2)
        try:
            record_data = json.loads(body)
        except Exception:
            return "Invalid JSON for record_data"
        return create_record(module, record_data)
    m = re.match(r"^create_invoice\s+(.+)$", text, re.I)
    if m:
        body = m.group(1)
        try:
            invoice_data = json.loads(body)
        except Exception:
            return "Invalid JSON for invoice_data"
        return create_invoice(invoice_data)
    m = re.match(r"^(/mnt/data/\S+)$", text)
    if m:
        return process_document(m.group(1))
    return None
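
# Example chat commands the parser above accepts (field names are illustrative):
#   create_record Contacts {"Last_Name": "Doe", "Email": "[email protected]"}
#   create_invoice {"customer_id": "12345", "line_items": [{"name": "Widget", "rate": 10}]}
#   /mnt/data/invoice.pdf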
# ----------------------------
# Chat handler that uses DeepSeek generation (or fallback)
# ----------------------------
def deepseek_response(message: str, history: Optional[List[Tuple[str, str]]] = None) -> str:
    history = history or []
    system_prompt = (
        "You are Zoho Assistant. Prefer concise answers. If you want to call a tool, "
        'return a JSON object: {"tool": "create_record", "args": {...}}'
    )
    # Compact the history into text for few-shot context (optional)
    history_text = ""
    for pair in history:
        try:
            u, a = pair[0], pair[1]
            history_text += f"User: {u}\nAssistant: {a}\n"
        except Exception:
            continue
    prompt = f"{system_prompt}\n{history_text}\nUser: {message}\nAssistant:"
    gen = deepseek_generate(prompt, max_tokens=256)
    text = gen.get("text", "")
    # If the output looks like JSON with a tool action, try to invoke it (normalizing args first)
    payload = text.strip()
    if payload.startswith("{") or payload.startswith("["):
        try:
            parsed = json.loads(payload)
            if isinstance(parsed, dict) and "tool" in parsed:
                tool_name = parsed.get("tool")
                args = parsed.get("args", {})
                # Normalize local-path args if present
                args = _normalize_local_path_args(args) if isinstance(args, dict) else args
                if tool_name in globals() and callable(globals()[tool_name]):
                    try:
                        out = globals()[tool_name](**args) if isinstance(args, dict) else globals()[tool_name](args)
                        return f"Invoked tool '{tool_name}'. Result:\n{out}"
                    except Exception as e:
                        return f"Tool invocation error: {e}"
                return f"Requested tool '{tool_name}' not found locally."
        except Exception:
            pass
    return text
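
# Example model output that would trigger a tool call (argument values illustrative):
#   {"tool": "create_record", "args": {"module_name": "Contacts", "record_data": {"Last_Name": "Doe"}}}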
# ----------------------------
# Gradio chat handler
# ----------------------------
def chat_handler(message, history):
    history = history or []
    trimmed = (message or "").strip()
    # Explicit command parser first
    cmd = try_parse_and_invoke_command(trimmed)
    if cmd is not None:
        return cmd
    # Developer dev-path handling (the path is passed through unchanged)
    if trimmed.startswith("/mnt/data/"):
        try:
            doc = process_document(trimmed)
            return f"Processed file {doc.get('file')}. Extracted: {json.dumps(doc.get('extracted_data'), ensure_ascii=False)}"
        except Exception as e:
            return f"Error processing document: {e}"
    # Otherwise, call deepseek_response (LLM or fallback)
    try:
        return deepseek_response(trimmed, history)
    except Exception as e:
        logger.error("deepseek_response error: %s", e)
        traceback.print_exc()
        return rule_based_response(trimmed)
# ----------------------------
# Gradio UI
# ----------------------------
def chat_interface():
    return gr.ChatInterface(
        fn=chat_handler,
        textbox=gr.Textbox(placeholder="Ask me to create contacts, invoices, upload docs (or paste /mnt/data/... for dev)."),
    )

# ----------------------------
# Entrypoint
# ----------------------------
if __name__ == "__main__":
    logger.info("Starting MCP server (DeepSeek mode). Loaded model: %s", LOADED_MODEL_NAME)
    demo = chat_interface()
    demo.launch(server_name="0.0.0.0", server_port=7860)
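
# To run locally (assuming the mcp, gradio, requests, and transformers packages are
# installed): python app.py, then open http://localhost:7860 in a browser.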