File size: 3,916 Bytes
7c02217
4f8ac5b
 
7c02217
4f8ac5b
7c02217
 
 
 
 
4f8ac5b
7c02217
4f8ac5b
 
 
 
 
 
 
 
 
 
 
 
 
 
 
7c02217
4f8ac5b
7c02217
 
 
4f8ac5b
 
7c02217
 
 
 
4f8ac5b
7c02217
4f8ac5b
 
 
 
7c02217
 
4f8ac5b
 
7c02217
4f8ac5b
 
 
 
 
7c02217
 
 
4f8ac5b
7c02217
4f8ac5b
 
 
 
7c02217
 
 
4f8ac5b
7c02217
4f8ac5b
 
 
 
7c02217
4f8ac5b
 
 
 
 
 
 
 
7c02217
 
 
4f8ac5b
 
 
 
7c02217
4f8ac5b
7c02217
4f8ac5b
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
from mcp.server.fastmcp import FastMCP
from typing import Optional
import requests
import os

from config import CLIENT_ID, CLIENT_SECRET, REFRESH_TOKEN, API_BASE

# --- Initialize the FastMCP Server ---
mcp = FastMCP("ZohoCRMAgent")

# --- Token Refresh Utility ---
def _get_valid_token_headers() -> dict:
    """Internal function to ensure a valid Zoho access token is available.
    This uses the refresh token flow to retrieve a fresh access token."""
    token_url = "https://accounts.zoho.in/oauth/v2/token"
    params = {
        "refresh_token": REFRESH_TOKEN,
        "client_id": CLIENT_ID,
        "client_secret": CLIENT_SECRET,
        "grant_type": "refresh_token"
    }
    response = requests.post(token_url, params=params)
    if response.status_code == 200:
        access_token = response.json().get("access_token")
        return {"Authorization": f"Zoho-oauthtoken {access_token}"}
    else:
        raise Exception(f"Failed to refresh token: {response.text}")

# --- MCP Tools for Zoho CRM and Zoho Books Operations ---

@mcp.tool()
def authenticate_zoho() -> str:
    """Refreshes and confirms Zoho CRM access token availability."""
    _ = _get_valid_token_headers()
    return "Zoho CRM access token successfully refreshed."

@mcp.tool()
def create_record(module_name: str, record_data: dict) -> str:
    """Creates a new record in the specified Zoho CRM module."""
    headers = _get_valid_token_headers()
    response = requests.post(f"{API_BASE}/{module_name}", headers=headers, json={"data": [record_data]})
    if response.status_code in [200, 201]:
        return f"Record created successfully in {module_name}."
    return f"Error creating record: {response.text}"

@mcp.tool()
def get_records(module_name: str, page: int = 1, per_page: int = 200) -> list:
    """Fetches records from a specified Zoho CRM module."""
    headers = _get_valid_token_headers()
    params = {"page": page, "per_page": per_page}
    response = requests.get(f"{API_BASE}/{module_name}", headers=headers, params=params)
    if response.status_code == 200:
        return response.json().get("data", [])
    return [f"Error retrieving records: {response.text}"]

@mcp.tool()
def update_record(module_name: str, record_id: str, data: dict) -> str:
    """Updates a record in a Zoho CRM module."""
    headers = _get_valid_token_headers()
    response = requests.put(f"{API_BASE}/{module_name}/{record_id}", headers=headers, json={"data": [data]})
    if response.status_code == 200:
        return f"Record {record_id} in {module_name} updated successfully."
    return f"Error updating record: {response.text}"

@mcp.tool()
def delete_record(module_name: str, record_id: str) -> str:
    """Deletes a record from the specified Zoho CRM module."""
    headers = _get_valid_token_headers()
    response = requests.delete(f"{API_BASE}/{module_name}/{record_id}", headers=headers)
    if response.status_code == 200:
        return f"Record {record_id} in {module_name} deleted."
    return f"Error deleting record: {response.text}"

@mcp.tool()
def create_invoice(data: dict) -> str:
    """Creates an invoice in Zoho Books."""
    headers = _get_valid_token_headers()
    response = requests.post(f"{API_BASE}/invoices", headers=headers, json={"data": [data]})
    if response.status_code in [200, 201]:
        return "Invoice created successfully."
    return f"Error creating invoice: {response.text}"

@mcp.tool()
def process_document(file_path: str, target_module: Optional[str] = "Contacts") -> dict:
    """Extracts data from uploaded file (PDF/image) and returns structured info."""
    # Placeholder for OCR + Gemini parsing logic
    # raw_text = perform_ocr(file_path)
    # structured_data = gemini_parse_json(raw_text)
    return {
        "status": "success",
        "file": os.path.basename(file_path),
        "extracted_data": f"Simulated structured data from {target_module} document."
    }