Spaces:
Running
Running
| #!/usr/bin/env python3 | |
| """ | |
| Gradio MCP Server for Congressional Bioguide profiles. | |
| Provides search and analysis capabilities via Gradio interface. | |
| """ | |
| import gradio as gr | |
| import sqlite3 | |
| import json | |
| import os | |
| import warnings | |
| from typing import List, Dict, Any | |
| import numpy as np | |
| from sentence_transformers import SentenceTransformer | |
| import faiss | |
| import pickle | |
| from pathlib import Path | |
# Silence every Python warning process-wide. This is a blanket filter, so it
# also hides deprecation notices coming from the libraries imported above.
warnings.filterwarnings('ignore')
# Set the TOKENIZERS_PARALLELISM environment variable to 'false' before any
# model is loaded (the variable is read by the tokenizer library used by
# sentence-transformers).
os.environ['TOKENIZERS_PARALLELISM'] = 'false'
# Data files are resolved relative to this script's own directory, so the
# server does not depend on the current working directory.
SCRIPT_DIR = Path(__file__).parent.absolute()
DB_PATH = str(SCRIPT_DIR / "congress.db")  # SQLite database file
FAISS_INDEX_PATH = str(SCRIPT_DIR / "congress_faiss.index")  # FAISS vector index
BIO_IDS_PATH = str(SCRIPT_DIR / "congress_bio_ids.pkl")  # pickled index-position -> bio_id mapping
# Semantic-search state. These start as None and are assigned by
# initialize_search_index(); semantic_search_biography() checks them before use.
model = None
faiss_index = None
bio_id_mapping = None
def initialize_search_index():
    """Load the embedding model, FAISS index and bio-id mapping into module globals.

    The three globals (``model``, ``faiss_index``, ``bio_id_mapping``) are only
    assigned after *all* components have loaded successfully, so a failure
    half-way through never leaves a partially initialised search state behind.

    Returns:
        True when semantic search is ready, False when the index files are
        missing or anything fails while loading (the error is printed).
    """
    global model, faiss_index, bio_id_mapping
    try:
        if not (Path(FAISS_INDEX_PATH).exists() and Path(BIO_IDS_PATH).exists()):
            print("FAISS index not found. Semantic search will be unavailable.")
            return False

        print(f"Loading FAISS index from: {FAISS_INDEX_PATH}")
        loaded_model = SentenceTransformer('all-MiniLM-L6-v2')
        loaded_index = faiss.read_index(FAISS_INDEX_PATH)
        # NOTE(security): pickle.load can execute arbitrary code. This is only
        # acceptable because BIO_IDS_PATH is a file shipped next to this
        # script; never point it at untrusted data.
        with open(BIO_IDS_PATH, "rb") as f:
            loaded_mapping = pickle.load(f)
        embedding_count = loaded_index.ntotal
    except Exception as e:
        print(f"Error loading search index: {e}")
        return False

    # Publish everything at once now that every component loaded.
    model = loaded_model
    faiss_index = loaded_index
    bio_id_mapping = loaded_mapping
    print(f"✓ Loaded {embedding_count} embeddings")
    return True
def get_db_connection():
    """Open and return a new SQLite connection to the database at DB_PATH.

    Every call creates a fresh connection; the caller is responsible for
    closing it.
    """
    connection = sqlite3.connect(DB_PATH)
    return connection
def execute_query(query: str, params: tuple = ()) -> List[Dict[str, Any]]:
    """Run one SQL statement and return every row as a plain dict.

    Args:
        query: SQL text, using ``?`` placeholders for values.
        params: Values bound to the placeholders, in order.

    Returns:
        A list with one ``{column_name: value}`` dict per result row.

    Raises:
        sqlite3.Error: Propagated from SQLite when the statement fails.
    """
    conn = get_db_connection()
    try:
        # sqlite3.Row exposes column names, which lets dict(row) work below.
        conn.row_factory = sqlite3.Row
        cursor = conn.cursor()
        cursor.execute(query, params)
        return [dict(row) for row in cursor.fetchall()]
    finally:
        # Always release the connection, including when the query raises.
        conn.close()
# Load the semantic-search resources once, at import time. The boolean result
# is deliberately ignored: when loading fails the server still starts, and
# semantic_search_biography() reports that semantic search is unavailable.
print("Initializing Congressional Bioguide MCP Server...")
initialize_search_index()
# MCP tool functions. They are plain functions (no decorators); each one is
# wired into the Gradio interface defined further down and returns a JSON string.
def search_by_name(family_name: str = "", given_name: str = "", limit: int = 10) -> str:
    """
    Search for Congressional members by name.

    Args:
        family_name: Last name to search for (partial match)
        given_name: First name to search for (partial match)
        limit: Maximum number of results to return (default: 10)

    Returns:
        JSON string with search results including bio_id, name, birth/death dates, party, state
    """
    try:
        # Treat None and whitespace-only input the same as "not provided".
        family_name = (family_name or "").strip()
        given_name = (given_name or "").strip()

        conditions = []
        params = []
        if family_name:
            conditions.append("LOWER(m.unaccented_family_name) LIKE LOWER(?)")
            params.append(f"%{family_name}%")
        if given_name:
            conditions.append("LOWER(m.unaccented_given_name) LIKE LOWER(?)")
            params.append(f"%{given_name}%")
        if not conditions:
            return json.dumps({"error": "Please provide at least family_name or given_name"})

        # SQLite treats a negative LIMIT as "no limit" and rejects NULL, so
        # normalise to a positive integer (UI sliders may also deliver floats).
        limit = 10 if limit is None else max(1, int(limit))

        # Only fixed condition fragments are interpolated into the SQL text;
        # every user-supplied value is bound through a placeholder.
        # The LEFT JOIN yields one row per member/position pair, so LIMIT caps
        # rows rather than distinct members.
        query = f"""
            SELECT DISTINCT m.bio_id, m.given_name, m.middle_name, m.family_name,
                   m.birth_date, m.death_date,
                   j.party, j.region_code, j.job_name, j.congress_number
            FROM members m
            LEFT JOIN job_positions j ON m.bio_id = j.bio_id
            WHERE {' AND '.join(conditions)}
            ORDER BY m.family_name, m.given_name
            LIMIT ?
        """
        params.append(limit)
        results = execute_query(query, tuple(params))
        return json.dumps({"count": len(results), "results": results}, indent=2)
    except Exception as e:
        return json.dumps({"error": str(e)})
def search_by_party(party: str, congress_number: int = None) -> str:
    """
    Search for Congressional members by political party.

    Args:
        party: Party name (e.g., 'Republican', 'Democrat', 'Whig')
        congress_number: Optional Congress number to filter by (e.g., 117)

    Returns:
        JSON string with members from the specified party (at most 100 rows),
        or an error object when no party name is given
    """
    try:
        # Reject missing input up front, mirroring search_by_name, instead of
        # running a query that can only come back empty.
        party = (party or "").strip()
        if not party:
            return json.dumps({"error": "Please provide a party name"})

        where_clause = "j.party = ?"
        params = [party]
        if congress_number:
            where_clause += " AND j.congress_number = ?"
            params.append(int(congress_number))

        # where_clause is assembled from fixed fragments only; values are bound.
        query = f"""
            SELECT DISTINCT m.bio_id, m.given_name, m.family_name, m.birth_date, m.death_date,
                   j.party, j.region_code, j.job_name, j.congress_number
            FROM members m
            JOIN job_positions j ON m.bio_id = j.bio_id
            WHERE {where_clause}
            ORDER BY m.family_name, m.given_name
            LIMIT 100
        """
        results = execute_query(query, tuple(params))
        return json.dumps({"count": len(results), "party": party, "results": results}, indent=2)
    except Exception as e:
        return json.dumps({"error": str(e)})
def search_by_state(state_code: str, congress_number: int = None) -> str:
    """
    Search for Congressional members by state.

    Args:
        state_code: Two-letter state code (e.g., 'CA', 'NY', 'TX')
        congress_number: Optional Congress number to filter by

    Returns:
        JSON string with members from the specified state (at most 100 rows),
        or an error object when no state code is given
    """
    try:
        # Normalise the code: tolerate None, surrounding whitespace and lower case.
        state_code = (state_code or "").strip().upper()
        if not state_code:
            return json.dumps({"error": "Please provide a state_code"})

        where_clause = "j.region_code = ?"
        params = [state_code]
        if congress_number:
            where_clause += " AND j.congress_number = ?"
            params.append(int(congress_number))

        # where_clause is assembled from fixed fragments only; values are bound.
        query = f"""
            SELECT DISTINCT m.bio_id, m.given_name, m.family_name, m.birth_date, m.death_date,
                   j.party, j.region_code, j.job_name, j.congress_number
            FROM members m
            JOIN job_positions j ON m.bio_id = j.bio_id
            WHERE {where_clause}
            ORDER BY m.family_name, m.given_name
            LIMIT 100
        """
        results = execute_query(query, tuple(params))
        return json.dumps({"count": len(results), "state": state_code, "results": results}, indent=2)
    except Exception as e:
        return json.dumps({"error": str(e)})
def semantic_search_biography(query: str, top_k: int = 5) -> str:
    """
    Perform AI-powered semantic search on member biographies using natural language.

    Args:
        query: Natural language query (e.g., 'lawyers who became judges', 'Civil War veterans')
        top_k: Number of results to return (default: 5, max: 20)

    Returns:
        JSON string with matching members and their similarity scores
    """
    try:
        query = (query or "").strip()
        if not query:
            return json.dumps({"error": "Please provide a search query"})

        # Explicit None/empty checks instead of truthiness, which is not
        # well defined for every model/index/mapping object.
        if (model is None or faiss_index is None
                or bio_id_mapping is None or len(bio_id_mapping) == 0):
            return json.dumps({"error": "Semantic search is not available. FAISS index not loaded."})

        # Clamp to 1..20 and force an int: the index search needs an integer k,
        # while UI widgets may hand over a float.
        top_k = 5 if top_k is None else min(max(1, int(top_k)), 20)

        # Encode the query as a single L2-normalised float32 row vector.
        query_embedding = model.encode([query])[0].astype('float32')
        query_embedding = query_embedding.reshape(1, -1)
        faiss.normalize_L2(query_embedding)

        scores, indices = faiss_index.search(query_embedding, top_k)

        member_query = """
            SELECT m.bio_id, m.given_name, m.middle_name, m.family_name,
                   m.birth_date, m.death_date, m.profile_text,
                   j.party, j.region_code, j.job_name, j.congress_number
            FROM members m
            LEFT JOIN job_positions j ON m.bio_id = j.bio_id
            WHERE m.bio_id = ?
            LIMIT 1
        """
        results = []
        for idx, score in zip(indices[0], scores[0]):
            idx = int(idx)
            # FAISS pads missing neighbours with -1; without the lower bound a
            # -1 would silently select the *last* entry of the mapping.
            if not 0 <= idx < len(bio_id_mapping):
                continue
            member_data = execute_query(member_query, (bio_id_mapping[idx],))
            if not member_data:
                continue
            member = member_data[0]
            # Shorten long biographies; add the ellipsis only when text was cut.
            profile_text = member.get('profile_text')
            if profile_text and len(profile_text) > 500:
                member['profile_text'] = profile_text[:500] + "..."
            member['similarity_score'] = float(score)
            results.append(member)

        return json.dumps({"query": query, "count": len(results), "results": results}, indent=2)
    except Exception as e:
        return json.dumps({"error": str(e)})
def get_member_profile(bio_id: str) -> str:
    """
    Get complete profile for a specific member by their Bioguide ID.

    Args:
        bio_id: Bioguide ID (e.g., 'L000313' for John Lewis, 'W000374')

    Returns:
        JSON string with complete member profile including positions and relationships
    """
    try:
        # Accept lower-case / padded IDs and reject missing input early.
        bio_id = (bio_id or "").strip().upper()
        if not bio_id:
            return json.dumps({"error": "Please provide a bio_id"})

        conn = get_db_connection()
        try:
            conn.row_factory = sqlite3.Row
            cursor = conn.cursor()

            cursor.execute("SELECT * FROM members WHERE bio_id = ?", (bio_id,))
            member = cursor.fetchone()
            if not member:
                return json.dumps({"error": f"No member found with bio_id: {bio_id}"})
            profile = dict(member)

            # Attach the rows of each related table under its own key.
            related_queries = (
                ("job_positions", "SELECT * FROM job_positions WHERE bio_id = ? ORDER BY start_date"),
                ("relationships", "SELECT * FROM relationships WHERE bio_id = ?"),
                ("creative_works", "SELECT * FROM creative_works WHERE bio_id = ?"),
            )
            for key, sql in related_queries:
                cursor.execute(sql, (bio_id,))
                profile[key] = [dict(row) for row in cursor.fetchall()]
        finally:
            # Close on every path, including early return and query errors.
            conn.close()

        return json.dumps(profile, indent=2)
    except Exception as e:
        return json.dumps({"error": str(e)})
def count_members_by_party(filter_congress: int = None) -> str:
    """
    Count members by political party.

    Args:
        filter_congress: Optional Congress number to filter by (e.g., 117)

    Returns:
        JSON string with member counts grouped by party. "total_members" is the
        number of distinct members, so it can be smaller than the sum of the
        per-party counts when a member is listed under more than one party.
    """
    try:
        where_clause = ""
        params = ()
        if filter_congress:
            where_clause = "WHERE j.congress_number = ?"
            params = (filter_congress,)

        # where_clause is one of two fixed strings; the value itself is bound.
        by_party = execute_query(
            f"""
            SELECT j.party as party, COUNT(DISTINCT m.bio_id) as count
            FROM members m
            JOIN job_positions j ON m.bio_id = j.bio_id
            {where_clause}
            GROUP BY j.party
            ORDER BY count DESC
            """,
            params,
        )

        # Summing the per-party counts would count a member once per party
        # they appear under, so the distinct total is computed separately.
        total_rows = execute_query(
            f"""
            SELECT COUNT(DISTINCT m.bio_id) as total
            FROM members m
            JOIN job_positions j ON m.bio_id = j.bio_id
            {where_clause}
            """,
            params,
        )
        total = total_rows[0]['total'] if total_rows else 0

        return json.dumps({"total_members": total, "by_party": by_party}, indent=2)
    except Exception as e:
        return json.dumps({"error": str(e)})
def count_members_by_state(filter_congress: int = None) -> str:
    """
    Count members by state.

    Args:
        filter_congress: Optional Congress number to filter by

    Returns:
        JSON string with member counts grouped by state. "total_members" is the
        number of distinct members, so it can be smaller than the sum of the
        per-state counts when a member is listed under more than one state.
    """
    try:
        where_clause = ""
        params = ()
        if filter_congress:
            where_clause = "WHERE j.congress_number = ?"
            params = (filter_congress,)

        # where_clause is one of two fixed strings; the value itself is bound.
        by_state = execute_query(
            f"""
            SELECT j.region_code as state, COUNT(DISTINCT m.bio_id) as count
            FROM members m
            JOIN job_positions j ON m.bio_id = j.bio_id
            {where_clause}
            GROUP BY j.region_code
            ORDER BY count DESC
            """,
            params,
        )

        # Summing the per-state counts would count a member once per state
        # they appear under, so the distinct total is computed separately.
        total_rows = execute_query(
            f"""
            SELECT COUNT(DISTINCT m.bio_id) as total
            FROM members m
            JOIN job_positions j ON m.bio_id = j.bio_id
            {where_clause}
            """,
            params,
        )
        total = total_rows[0]['total'] if total_rows else 0

        return json.dumps({"total_members": total, "by_state": by_state}, indent=2)
    except Exception as e:
        return json.dumps({"error": str(e)})
def execute_sql_query(query: str) -> str:
    """
    Execute a custom SQL SELECT query against the Congressional database (READ-ONLY).

    Args:
        query: SQL SELECT query to execute (a read-only WITH ... SELECT is also accepted)

    Returns:
        JSON string with query results
    """
    try:
        statement = (query or "").strip()
        # First line of defence: only statements that start like a read query.
        if not statement.upper().startswith(("SELECT", "WITH")):
            return json.dumps({"error": "Only SELECT queries are allowed"})

        conn = get_db_connection()
        try:
            # Second line of defence: have SQLite itself refuse any write on
            # this connection, so read-only does not depend on the prefix check.
            conn.execute("PRAGMA query_only = ON")
            conn.row_factory = sqlite3.Row
            results = [dict(row) for row in conn.execute(statement).fetchall()]
        finally:
            conn.close()

        return json.dumps({"count": len(results), "results": results}, indent=2)
    except Exception as e:
        return json.dumps({"error": str(e)})
def get_database_schema() -> str:
    """
    Get the database schema showing all tables and columns available for querying.

    The description is maintained by hand (it is not read from the database),
    so it must be kept in sync with the columns the query functions use.

    Returns:
        JSON string with database schema information
    """
    schema_info = {
        "tables": {
            "members": {
                "description": "Main table with member biographical information",
                "columns": [
                    "bio_id (PRIMARY KEY) - Bioguide ID",
                    "family_name - Last name",
                    "given_name - First name",
                    "middle_name - Middle name",
                    # The two unaccented_* columns are the ones the name
                    # search filters on, so they are listed for SQL users too.
                    "unaccented_family_name - Unaccented last name (used by name search)",
                    "unaccented_given_name - Unaccented first name (used by name search)",
                    "birth_date - Birth date (YYYY-MM-DD)",
                    "death_date - Death date (YYYY-MM-DD)",
                    "profile_text - Full biography text"
                ]
            },
            "job_positions": {
                "description": "Congressional positions held by members",
                "columns": [
                    "bio_id (FOREIGN KEY) - References members",
                    "job_name - Position title (Representative, Senator)",
                    "start_date - Start date of position",
                    "end_date - End date of position",
                    "congress_number - Congress number (e.g., 117)",
                    "party - Party affiliation",
                    "region_code - State/region code (e.g., 'CA', 'NY')"
                ]
            },
            "relationships": {
                "description": "Family relationships between members",
                "columns": ["bio_id", "related_bio_id", "relationship_type"]
            },
            "creative_works": {
                "description": "Publications and creative works by members",
                "columns": ["bio_id", "citation_text"]
            }
        }
    }
    return json.dumps(schema_info, indent=2)
# Build one Gradio Interface per tool function and group them as tabs.
# The order of the interface list must match the order of tab_names below.
demo = gr.TabbedInterface(
    [
        # Search by Name
        gr.Interface(
            fn=search_by_name,
            inputs=[
                gr.Textbox(label="Family Name (Last Name)", placeholder="e.g., Lincoln"),
                gr.Textbox(label="Given Name (First Name)", placeholder="e.g., Abraham"),
                gr.Slider(minimum=1, maximum=50, value=10, step=1, label="Max Results")
            ],
            outputs=gr.JSON(label="Search Results"),
            title="Search by Name",
            description="Search for Congressional members by their first or last name."
        ),
        # Search by Party
        gr.Interface(
            fn=search_by_party,
            inputs=[
                gr.Textbox(label="Party Name", placeholder="e.g., Republican, Democrat, Whig"),
                gr.Number(label="Congress Number (optional)", value=None, precision=0)
            ],
            outputs=gr.JSON(label="Search Results"),
            title="Search by Party",
            description="Find members by political party affiliation."
        ),
        # Search by State
        gr.Interface(
            fn=search_by_state,
            inputs=[
                gr.Textbox(label="State Code", placeholder="e.g., CA, NY, TX"),
                gr.Number(label="Congress Number (optional)", value=None, precision=0)
            ],
            outputs=gr.JSON(label="Search Results"),
            title="Search by State",
            description="Find members by the state they represented."
        ),
        # Semantic Search
        gr.Interface(
            fn=semantic_search_biography,
            inputs=[
                gr.Textbox(label="Search Query", placeholder="e.g., 'lawyers who became judges' or 'Civil War veterans'", lines=3),
                gr.Slider(minimum=1, maximum=20, value=5, step=1, label="Number of Results")
            ],
            outputs=gr.JSON(label="Search Results"),
            title="AI Semantic Search",
            description="Use natural language to search biographies. Find members by career, background, or accomplishments."
        ),
        # Get Member Profile
        gr.Interface(
            fn=get_member_profile,
            inputs=gr.Textbox(label="Bioguide ID", placeholder="e.g., L000313 (John Lewis)"),
            outputs=gr.JSON(label="Member Profile"),
            title="Get Member Profile",
            description="Get complete profile for a specific member using their Bioguide ID."
        ),
        # Count by Party
        gr.Interface(
            fn=count_members_by_party,
            inputs=gr.Number(label="Filter by Congress Number (optional)", value=None, precision=0),
            outputs=gr.JSON(label="Party Counts"),
            title="Count by Party",
            description="Get member counts grouped by political party."
        ),
        # Count by State
        gr.Interface(
            fn=count_members_by_state,
            inputs=gr.Number(label="Filter by Congress Number (optional)", value=None, precision=0),
            outputs=gr.JSON(label="State Counts"),
            title="Count by State",
            description="Get member counts grouped by state."
        ),
        # SQL Query
        gr.Interface(
            fn=execute_sql_query,
            inputs=gr.Textbox(label="SQL Query", placeholder="SELECT * FROM members LIMIT 10", lines=3),
            outputs=gr.JSON(label="Query Results"),
            title="Execute SQL",
            description="Execute custom SQL SELECT queries (read-only)."
        ),
        # Database Schema
        gr.Interface(
            fn=get_database_schema,
            inputs=None,
            outputs=gr.JSON(label="Database Schema"),
            title="Database Schema",
            description="View the database structure and available tables/columns."
        ),
    ],
    tab_names=[
        "Search by Name",
        "Search by Party",
        "Search by State",
        "AI Semantic Search",
        "Member Profile",
        "Count by Party",
        "Count by State",
        "Execute SQL",
        "Database Schema"
    ],
    # The leading emoji had been corrupted by a wrong character decoding.
    title="🏛️ Congressional Bioguide MCP Server",
    theme=gr.themes.Soft()
)

if __name__ == "__main__":
    # Launch only when run as a script; mcp_server=True asks Gradio to expose
    # the interface functions over MCP in addition to the web UI.
    demo.launch(mcp_server=True)