#!/usr/bin/env python3 """ MCP Server for Congressional Bioguide profiles. Provides SQL queries and semantic search capabilities. """ import sys import sqlite3 import json import os import warnings from typing import List, Dict, Any, Optional import numpy as np from sentence_transformers import SentenceTransformer import faiss import pickle from pathlib import Path from mcp.server import Server from mcp.types import Tool, TextContent, ImageContent, EmbeddedResource import mcp.server.stdio # Suppress all warnings to prevent JSON protocol corruption warnings.filterwarnings('ignore') os.environ['TOKENIZERS_PARALLELISM'] = 'false' # Initialize global resources - use absolute paths SCRIPT_DIR = Path(__file__).parent.absolute() DB_PATH = str(SCRIPT_DIR / "congress.db") FAISS_INDEX_PATH = str(SCRIPT_DIR / "congress_faiss.index") BIO_IDS_PATH = str(SCRIPT_DIR / "congress_bio_ids.pkl") # Load FAISS index and model model = None faiss_index = None bio_id_mapping = None def initialize_search_index(): """Initialize the semantic search components.""" global model, faiss_index, bio_id_mapping try: if Path(FAISS_INDEX_PATH).exists() and Path(BIO_IDS_PATH).exists(): print(f"Loading FAISS index from: {FAISS_INDEX_PATH}", file=sys.stderr, flush=True) model = SentenceTransformer('all-MiniLM-L6-v2') faiss_index = faiss.read_index(FAISS_INDEX_PATH) with open(BIO_IDS_PATH, "rb") as f: bio_id_mapping = pickle.load(f) print(f"✓ Loaded {faiss_index.ntotal} embeddings", file=sys.stderr, flush=True) return True else: print(f"FAISS index not found at: {FAISS_INDEX_PATH}", file=sys.stderr, flush=True) print(f"Bio IDs not found at: {BIO_IDS_PATH}", file=sys.stderr, flush=True) return False except Exception as e: print(f"Error loading search index: {e}", file=sys.stderr, flush=True) return False def get_db_connection(): """Get a database connection.""" return sqlite3.connect(DB_PATH) def execute_query(query: str, params: tuple = ()) -> List[Dict[str, Any]]: """Execute a SQL query and return results as list of dicts.""" conn = get_db_connection() conn.row_factory = sqlite3.Row cursor = conn.cursor() cursor.execute(query, params) results = [dict(row) for row in cursor.fetchall()] conn.close() return results def format_member_concise(member: Dict[str, Any]) -> Dict[str, Any]: """Format member data to concise output with only essential fields.""" return { 'bio_id': member.get('bio_id'), 'name': f"{member.get('given_name', '')} {member.get('middle_name', '') + ' ' if member.get('middle_name') else ''}{member.get('family_name', '')}".strip(), 'birth_date': member.get('birth_date'), 'death_date': member.get('death_date'), 'party': member.get('party'), 'state': member.get('region_code'), 'position': member.get('job_name'), 'congress': member.get('congress_number') } def get_member_profile(bio_id: str) -> Optional[Dict[str, Any]]: """Get complete profile for a member including all related data.""" conn = get_db_connection() conn.row_factory = sqlite3.Row cursor = conn.cursor() # Get member data cursor.execute("SELECT * FROM members WHERE bio_id = ?", (bio_id,)) member = cursor.fetchone() if not member: conn.close() return None profile = dict(member) # Get images cursor.execute("SELECT * FROM images WHERE bio_id = ?", (bio_id,)) profile['images'] = [dict(row) for row in cursor.fetchall()] # Get job positions cursor.execute("SELECT * FROM job_positions WHERE bio_id = ? ORDER BY start_date", (bio_id,)) profile['job_positions'] = [dict(row) for row in cursor.fetchall()] # Get relationships cursor.execute("SELECT * FROM relationships WHERE bio_id = ?", (bio_id,)) profile['relationships'] = [dict(row) for row in cursor.fetchall()] # Get creative works cursor.execute("SELECT * FROM creative_works WHERE bio_id = ?", (bio_id,)) profile['creative_works'] = [dict(row) for row in cursor.fetchall()] # Get assets cursor.execute("SELECT * FROM assets WHERE bio_id = ?", (bio_id,)) profile['assets'] = [dict(row) for row in cursor.fetchall()] conn.close() return profile def semantic_search(query_text: str, top_k: int = 10) -> List[str]: """Perform semantic search and return matching bio_ids.""" if not all([model, faiss_index, bio_id_mapping]): raise ValueError("Search index not initialized. Run ingest_data.py first.") # Encode query query_embedding = model.encode([query_text])[0].astype('float32') query_embedding = query_embedding.reshape(1, -1) # Normalize for cosine similarity faiss.normalize_L2(query_embedding) # Search scores, indices = faiss_index.search(query_embedding, top_k) # Map indices to bio_ids results = [] for idx, score in zip(indices[0], scores[0]): if idx < len(bio_id_mapping): results.append({ 'bio_id': bio_id_mapping[idx], 'similarity_score': float(score) }) return results # Initialize MCP server server = Server("congressional-bioguide") @server.list_tools() async def list_tools() -> List[Tool]: """List all available tools.""" return [ Tool( name="search_by_name", description="Search for Congressional members by name. Returns concise results (name, dates, party, congress) by default.", inputSchema={ "type": "object", "properties": { "family_name": { "type": "string", "description": "Family/last name to search for (partial match)" }, "given_name": { "type": "string", "description": "Given/first name to search for (partial match)" }, "full_name": { "type": "string", "description": "Full name to search for (partial match in any name field)" }, "limit": { "type": "integer", "description": "Maximum results to return (default: 50)", "default": 50 }, "return_full_profile": { "type": "boolean", "description": "Return full profile data including biography (default: false)", "default": False } } } ), Tool( name="search_by_party", description="Search for Congressional members by political party affiliation.", inputSchema={ "type": "object", "properties": { "party": { "type": "string", "description": "Party name (e.g., 'Republican', 'Democrat', 'Whig')" }, "congress_number": { "type": "integer", "description": "Optional: Filter by specific Congress number (e.g., 117)" } }, "required": ["party"] } ), Tool( name="search_by_state", description="Search for Congressional members by state or region they represented.", inputSchema={ "type": "object", "properties": { "state_code": { "type": "string", "description": "State code (e.g., 'CA', 'NY', 'TX')" }, "congress_number": { "type": "integer", "description": "Optional: Filter by specific Congress number" } }, "required": ["state_code"] } ), Tool( name="search_by_congress", description="Get all members who served in a specific Congress.", inputSchema={ "type": "object", "properties": { "congress_number": { "type": "integer", "description": "Congress number (e.g., 117 for the 117th Congress)" }, "chamber": { "type": "string", "description": "Optional: Filter by chamber ('Representative' or 'Senator')" } }, "required": ["congress_number"] } ), Tool( name="search_by_date_range", description="Search for members who served during a specific date range.", inputSchema={ "type": "object", "properties": { "start_date": { "type": "string", "description": "Start date in YYYY-MM-DD format" }, "end_date": { "type": "string", "description": "End date in YYYY-MM-DD format" } }, "required": ["start_date", "end_date"] } ), Tool( name="semantic_search_biography", description="Perform semantic search on member biographies. Use natural language to find members based on career details, accomplishments, background, etc.", inputSchema={ "type": "object", "properties": { "query": { "type": "string", "description": "Natural language query to search biographies (e.g., 'lawyers who became judges', 'Civil War veterans')" }, "top_k": { "type": "integer", "description": "Number of results to return (default: 10)", "default": 5 } }, "required": ["query"] } ), Tool( name="get_member_profile", description="Get complete profile information for a specific member by their Bioguide ID.", inputSchema={ "type": "object", "properties": { "bio_id": { "type": "string", "description": "Bioguide ID (e.g., 'W000374', 'P000144')" } }, "required": ["bio_id"] } ), Tool( name="execute_sql_query", description="Execute a custom SQL query against the Congressional database. Use for complex queries not covered by other tools. READ-ONLY access.", inputSchema={ "type": "object", "properties": { "query": { "type": "string", "description": "SQL SELECT query to execute" } }, "required": ["query"] } ), Tool( name="get_database_schema", description="Get the database schema showing all tables and columns available for querying.", inputSchema={ "type": "object", "properties": {} } ), Tool( name="search_by_relationship", description="Find members who have family relationships with other members (e.g., father, son, spouse).", inputSchema={ "type": "object", "properties": { "relationship_type": { "type": "string", "description": "Type of relationship (e.g., 'father', 'son', 'spouse', 'brother')" } } } ), Tool( name="search_biography_regex", description="Search member biographies using regex patterns. Returns concise member info (name, dates, party, state) for matches. Use filters to narrow results.", inputSchema={ "type": "object", "properties": { "pattern": { "type": "string", "description": "Regex pattern to search for in biographies (e.g., 'Harvard', 'lawyer', 'served.*army', 'born in [0-9]{4}')" }, "case_sensitive": { "type": "boolean", "description": "Whether search should be case-sensitive (default: false)", "default": False }, "limit": { "type": "integer", "description": "Maximum number of results to return (default: 5)", "default": 5 }, "filter_party": { "type": "string", "description": "Optional: Filter results by party (e.g., 'Republican', 'Democrat')" }, "filter_state": { "type": "string", "description": "Optional: Filter results by state code (e.g., 'CA', 'NY')" }, "filter_congress": { "type": "integer", "description": "Optional: Filter results by Congress number (e.g., 117)" }, "return_full_profile": { "type": "boolean", "description": "Return full profile including biography text (default: false)", "default": False } }, "required": ["pattern"] } ), Tool( name="count_members", description="Count members matching specific criteria. Returns aggregated counts by party, state, position, or custom grouping. Much more efficient than returning full member lists.", inputSchema={ "type": "object", "properties": { "group_by": { "type": "string", "description": "Field to group by: 'party', 'state', 'position', 'congress', or 'year'", "enum": ["party", "state", "position", "congress", "year"] }, "filter_party": { "type": "string", "description": "Optional: Filter by party name" }, "filter_state": { "type": "string", "description": "Optional: Filter by state code" }, "filter_congress": { "type": "integer", "description": "Optional: Filter by Congress number" }, "filter_position": { "type": "string", "description": "Optional: Filter by position (Representative, Senator)" }, "date_range_start": { "type": "string", "description": "Optional: Start date (YYYY-MM-DD)" }, "date_range_end": { "type": "string", "description": "Optional: End date (YYYY-MM-DD)" } }, "required": ["group_by"] } ), Tool( name="temporal_analysis", description="Analyze member trends over time. Shows how membership changed across years, decades, or congresses. Perfect for historical analysis.", inputSchema={ "type": "object", "properties": { "analysis_type": { "type": "string", "description": "Type of temporal analysis", "enum": ["party_over_time", "state_representation", "position_counts", "demographics"] }, "time_unit": { "type": "string", "description": "Time granularity: 'congress', 'year', 'decade'", "enum": ["congress", "year", "decade"], "default": "congress" }, "start_date": { "type": "string", "description": "Optional: Start date (YYYY-MM-DD)" }, "end_date": { "type": "string", "description": "Optional: End date (YYYY-MM-DD)" }, "filter_party": { "type": "string", "description": "Optional: Filter to specific party" }, "filter_state": { "type": "string", "description": "Optional: Filter to specific state" } }, "required": ["analysis_type"] } ), Tool( name="count_by_biography_content", description="Count members whose biographies mention specific keywords or phrases (e.g., 'Harvard', 'lawyer', 'Civil War'). Much more efficient than searching when you only need counts.", inputSchema={ "type": "object", "properties": { "keywords": { "type": "array", "items": {"type": "string"}, "description": "List of keywords or phrases to search for (case-insensitive)" }, "match_all": { "type": "boolean", "description": "If true, count members matching ALL keywords. If false, count members matching ANY keyword (default: false)", "default": False }, "breakdown_by": { "type": "string", "description": "Optional: Break down counts by party, state, position, or congress", "enum": ["party", "state", "position", "congress", "none"], "default": "none" }, "filter_party": { "type": "string", "description": "Optional: Only count members from specific party" }, "filter_state": { "type": "string", "description": "Optional: Only count members from specific state" } }, "required": ["keywords"] } ) ] @server.call_tool() async def call_tool(name: str, arguments: Any) -> List[TextContent]: """Handle tool calls.""" try: if name == "search_by_name": family_name = arguments.get("family_name") given_name = arguments.get("given_name") full_name = arguments.get("full_name") limit = arguments.get("limit", 50) return_full = arguments.get("return_full_profile", False) conditions = [] params = [] if family_name: conditions.append("LOWER(m.unaccented_family_name) LIKE LOWER(?)") params.append(f"%{family_name}%") if given_name: conditions.append("LOWER(m.unaccented_given_name) LIKE LOWER(?)") params.append(f"%{given_name}%") if full_name: conditions.append("""(LOWER(m.unaccented_family_name) LIKE LOWER(?) OR LOWER(m.unaccented_given_name) LIKE LOWER(?) OR LOWER(m.unaccented_middle_name) LIKE LOWER(?))""") params.extend([f"%{full_name}%"] * 3) if not conditions: return [TextContent(type="text", text="Please provide at least one name parameter.")] if return_full: query = f"SELECT * FROM members m WHERE {' AND '.join(conditions)} ORDER BY m.family_name, m.given_name LIMIT ?" params.append(limit) results = execute_query(query, tuple(params)) else: # Return concise results with job info query = f""" SELECT DISTINCT m.bio_id, m.given_name, m.middle_name, m.family_name, m.birth_date, m.death_date, j.party, j.region_code, j.job_name, j.congress_number FROM members m LEFT JOIN job_positions j ON m.bio_id = j.bio_id WHERE {' AND '.join(conditions)} ORDER BY m.family_name, m.given_name LIMIT ? """ params.append(limit) results = execute_query(query, tuple(params)) results = [format_member_concise(r) for r in results] response = { "count": len(results), "limit": limit, "results": results } return [TextContent(type="text", text=json.dumps(response, indent=2))] elif name == "search_by_party": party = arguments["party"] congress_number = arguments.get("congress_number") if congress_number: query = """ SELECT DISTINCT m.* FROM members m JOIN job_positions j ON m.bio_id = j.bio_id WHERE j.party = ? AND j.congress_number = ? ORDER BY m.family_name, m.given_name """ results = execute_query(query, (party, congress_number)) else: query = """ SELECT DISTINCT m.* FROM members m JOIN job_positions j ON m.bio_id = j.bio_id WHERE j.party = ? ORDER BY m.family_name, m.given_name """ results = execute_query(query, (party,)) return [TextContent(type="text", text=json.dumps(results, indent=2))] elif name == "search_by_state": state_code = arguments["state_code"].upper() congress_number = arguments.get("congress_number") if congress_number: query = """ SELECT DISTINCT m.*, j.job_name, j.party, j.congress_number FROM members m JOIN job_positions j ON m.bio_id = j.bio_id WHERE j.region_code = ? AND j.congress_number = ? ORDER BY m.family_name, m.given_name """ results = execute_query(query, (state_code, congress_number)) else: query = """ SELECT DISTINCT m.*, j.job_name, j.party, j.congress_number FROM members m JOIN job_positions j ON m.bio_id = j.bio_id WHERE j.region_code = ? ORDER BY m.family_name, m.given_name """ results = execute_query(query, (state_code,)) return [TextContent(type="text", text=json.dumps(results, indent=2))] elif name == "search_by_congress": congress_number = arguments["congress_number"] chamber = arguments.get("chamber") if chamber: query = """ SELECT DISTINCT m.*, j.job_name, j.party, j.region_code FROM members m JOIN job_positions j ON m.bio_id = j.bio_id WHERE j.congress_number = ? AND j.job_name = ? ORDER BY m.family_name, m.given_name """ results = execute_query(query, (congress_number, chamber)) else: query = """ SELECT DISTINCT m.*, j.job_name, j.party, j.region_code FROM members m JOIN job_positions j ON m.bio_id = j.bio_id WHERE j.congress_number = ? ORDER BY m.family_name, m.given_name """ results = execute_query(query, (congress_number,)) return [TextContent(type="text", text=json.dumps(results, indent=2))] elif name == "search_by_date_range": start_date = arguments["start_date"] end_date = arguments["end_date"] query = """ SELECT DISTINCT m.*, j.job_name, j.start_date, j.end_date FROM members m JOIN job_positions j ON m.bio_id = j.bio_id WHERE (j.start_date <= ? AND (j.end_date >= ? OR j.end_date IS NULL)) ORDER BY j.start_date, m.family_name, m.given_name """ results = execute_query(query, (end_date, start_date)) return [TextContent(type="text", text=json.dumps(results, indent=2))] elif name == "semantic_search_biography": query_text = arguments["query"] top_k = arguments.get("top_k", 10) # Perform semantic search search_results = semantic_search(query_text, top_k) # Get full profiles for top results profiles = [] for result in search_results: profile = get_member_profile(result['bio_id']) if profile: profile['similarity_score'] = result['similarity_score'] profiles.append(profile) return [TextContent(type="text", text=json.dumps(profiles, indent=2))] elif name == "get_member_profile": bio_id = arguments["bio_id"] profile = get_member_profile(bio_id) if profile: return [TextContent(type="text", text=json.dumps(profile, indent=2))] else: return [TextContent(type="text", text=f"No profile found for bio_id: {bio_id}")] elif name == "execute_sql_query": query = arguments["query"] # Basic security: only allow SELECT queries if not query.strip().upper().startswith("SELECT"): return [TextContent(type="text", text="Error: Only SELECT queries are allowed.")] results = execute_query(query) return [TextContent(type="text", text=json.dumps(results, indent=2))] elif name == "get_database_schema": schema_info = { "tables": { "members": { "description": "Main table with member biographical information", "columns": [ "bio_id (PRIMARY KEY) - Bioguide ID", "family_name - Last name", "given_name - First name", "middle_name - Middle name", "honorific_prefix - Title (Mr., Mrs., etc.)", "unaccented_family_name - Family name without accents", "unaccented_given_name - Given name without accents", "unaccented_middle_name - Middle name without accents", "birth_date - Birth date (YYYY-MM-DD)", "birth_circa - Whether birth date is approximate (0/1)", "death_date - Death date (YYYY-MM-DD)", "death_circa - Whether death date is approximate (0/1)", "profile_text - Full biography text", "full_name - Generated full name column" ] }, "job_positions": { "description": "Congressional positions held by members", "columns": [ "id (PRIMARY KEY)", "bio_id (FOREIGN KEY) - References members", "job_name - Position title (Representative, Senator)", "job_type - Type of position", "start_date - Start date of position", "start_circa - Whether start date is approximate (0/1)", "end_date - End date of position", "end_circa - Whether end date is approximate (0/1)", "congress_number - Congress number (e.g., 117)", "congress_name - Full Congress name", "party - Party affiliation", "caucus - Caucus affiliation", "region_type - Type of region represented", "region_code - State/region code (e.g., 'CA', 'NY')", "note - Additional notes" ] }, "images": { "description": "Profile images", "columns": ["id", "bio_id", "content_url", "caption"] }, "relationships": { "description": "Family relationships between members", "columns": ["id", "bio_id", "related_bio_id", "relationship_type"] }, "creative_works": { "description": "Publications and creative works by members", "columns": ["id", "bio_id", "citation_text"] }, "assets": { "description": "Additional assets (images, documents)", "columns": ["id", "bio_id", "name", "asset_type", "content_url", "credit_line", "accession_number", "upload_date"] } }, "indexes": [ "idx_family_name - Index on unaccented_family_name", "idx_given_name - Index on unaccented_given_name", "idx_birth_date - Index on birth_date", "idx_death_date - Index on death_date", "idx_job_congress - Index on congress_number", "idx_job_party - Index on party", "idx_job_region - Index on region_code", "idx_job_type - Index on job_name" ] } return [TextContent(type="text", text=json.dumps(schema_info, indent=2))] elif name == "search_by_relationship": relationship_type = arguments.get("relationship_type") if relationship_type: query = """ SELECT m1.bio_id, m1.family_name, m1.given_name, r.relationship_type, r.related_bio_id, m2.family_name as related_family_name, m2.given_name as related_given_name FROM members m1 JOIN relationships r ON m1.bio_id = r.bio_id JOIN members m2 ON r.related_bio_id = m2.bio_id WHERE r.relationship_type = ? ORDER BY m1.family_name, m1.given_name """ results = execute_query(query, (relationship_type,)) else: query = """ SELECT m1.bio_id, m1.family_name, m1.given_name, r.relationship_type, r.related_bio_id, m2.family_name as related_family_name, m2.given_name as related_given_name FROM members m1 JOIN relationships r ON m1.bio_id = r.bio_id JOIN members m2 ON r.related_bio_id = m2.bio_id ORDER BY m1.family_name, m1.given_name """ results = execute_query(query) return [TextContent(type="text", text=json.dumps(results, indent=2))] elif name == "search_biography_regex": import re pattern = arguments["pattern"] case_sensitive = arguments.get("case_sensitive", False) limit = arguments.get("limit", 5) filter_party = arguments.get("filter_party") filter_state = arguments.get("filter_state") filter_congress = arguments.get("filter_congress") return_full = arguments.get("return_full_profile", False) try: # Compile regex pattern flags = 0 if case_sensitive else re.IGNORECASE regex = re.compile(pattern, flags) # Build query with optional filters conn = get_db_connection() conn.row_factory = sqlite3.Row cursor = conn.cursor() # Base query - join with job_positions for filtering query = """ SELECT DISTINCT m.bio_id, m.family_name, m.given_name, m.middle_name, m.birth_date, m.death_date, m.profile_text, j.party, j.region_code, j.job_name, j.congress_number FROM members m LEFT JOIN job_positions j ON m.bio_id = j.bio_id WHERE m.profile_text IS NOT NULL """ where_conditions = [] params = [] if filter_party: where_conditions.append("j.party = ?") params.append(filter_party) if filter_state: where_conditions.append("j.region_code = ?") params.append(filter_state) if filter_congress: where_conditions.append("j.congress_number = ?") params.append(filter_congress) if where_conditions: query += " AND " + " AND ".join(where_conditions) cursor.execute(query, tuple(params)) # Filter using regex matches = [] for row in cursor: if regex.search(row['profile_text']): if return_full: # Return full profile matches.append(dict(row)) else: # Return concise info only match_result = { "bio_id": row['bio_id'], "name": f"{row['given_name']} {row['middle_name'] or ''} {row['family_name']}".strip(), "birth_date": row['birth_date'], "death_date": row['death_date'], "party": row['party'], "state": row['region_code'], "position": row['job_name'], "congress": row['congress_number'] } matches.append(match_result) if len(matches) >= limit: break conn.close() result = { "pattern": pattern, "case_sensitive": case_sensitive, "total_members_found": len(matches), "limit": limit, "filters_applied": { "party": filter_party, "state": filter_state, "congress": filter_congress }, "results": matches } return [TextContent(type="text", text=json.dumps(result, indent=2))] except re.error as e: return [TextContent(type="text", text=f"Invalid regex pattern: {str(e)}")] elif name == "count_members": group_by = arguments["group_by"] filter_party = arguments.get("filter_party") filter_state = arguments.get("filter_state") filter_congress = arguments.get("filter_congress") filter_position = arguments.get("filter_position") date_start = arguments.get("date_range_start") date_end = arguments.get("date_range_end") # Build WHERE clause where_conditions = [] params = [] if filter_party: where_conditions.append("j.party = ?") params.append(filter_party) if filter_state: where_conditions.append("j.region_code = ?") params.append(filter_state) if filter_congress: where_conditions.append("j.congress_number = ?") params.append(filter_congress) if filter_position: where_conditions.append("j.job_name = ?") params.append(filter_position) if date_start and date_end: where_conditions.append("(j.start_date <= ? AND (j.end_date >= ? OR j.end_date IS NULL))") params.extend([date_end, date_start]) where_clause = "WHERE " + " AND ".join(where_conditions) if where_conditions else "" # Build GROUP BY query if group_by == "party": query = f""" SELECT j.party as group_key, COUNT(DISTINCT m.bio_id) as count FROM members m JOIN job_positions j ON m.bio_id = j.bio_id {where_clause} GROUP BY j.party ORDER BY count DESC """ elif group_by == "state": query = f""" SELECT j.region_code as group_key, COUNT(DISTINCT m.bio_id) as count FROM members m JOIN job_positions j ON m.bio_id = j.bio_id {where_clause} GROUP BY j.region_code ORDER BY count DESC """ elif group_by == "position": query = f""" SELECT j.job_name as group_key, COUNT(DISTINCT m.bio_id) as count FROM members m JOIN job_positions j ON m.bio_id = j.bio_id {where_clause} GROUP BY j.job_name ORDER BY count DESC """ elif group_by == "congress": query = f""" SELECT j.congress_number as group_key, COUNT(DISTINCT m.bio_id) as count FROM members m JOIN job_positions j ON m.bio_id = j.bio_id {where_clause} GROUP BY j.congress_number ORDER BY j.congress_number """ elif group_by == "year": query = f""" SELECT SUBSTR(j.start_date, 1, 4) as group_key, COUNT(DISTINCT m.bio_id) as count FROM members m JOIN job_positions j ON m.bio_id = j.bio_id {where_clause} GROUP BY SUBSTR(j.start_date, 1, 4) ORDER BY group_key """ results = execute_query(query, tuple(params)) total = sum(r['count'] for r in results) response = { "group_by": group_by, "total_unique_members": total, "groups": results, "filters_applied": { "party": filter_party, "state": filter_state, "congress": filter_congress, "position": filter_position, "date_range": [date_start, date_end] if date_start and date_end else None } } return [TextContent(type="text", text=json.dumps(response, indent=2))] elif name == "temporal_analysis": analysis_type = arguments["analysis_type"] time_unit = arguments.get("time_unit", "congress") start_date = arguments.get("start_date") end_date = arguments.get("end_date") filter_party = arguments.get("filter_party") filter_state = arguments.get("filter_state") # Build WHERE clause where_conditions = [] params = [] if start_date: where_conditions.append("j.start_date >= ?") params.append(start_date) if end_date: where_conditions.append("j.start_date <= ?") params.append(end_date) if filter_party: where_conditions.append("j.party = ?") params.append(filter_party) if filter_state: where_conditions.append("j.region_code = ?") params.append(filter_state) where_clause = "WHERE " + " AND ".join(where_conditions) if where_conditions else "" if analysis_type == "party_over_time": if time_unit == "congress": query = f""" SELECT j.congress_number, j.party, COUNT(DISTINCT m.bio_id) as count FROM members m JOIN job_positions j ON m.bio_id = j.bio_id {where_clause} GROUP BY j.congress_number, j.party ORDER BY j.congress_number, j.party """ elif time_unit == "year": query = f""" SELECT SUBSTR(j.start_date, 1, 4) as year, j.party, COUNT(DISTINCT m.bio_id) as count FROM members m JOIN job_positions j ON m.bio_id = j.bio_id {where_clause} GROUP BY year, j.party ORDER BY year, j.party """ elif time_unit == "decade": query = f""" SELECT (CAST(SUBSTR(j.start_date, 1, 4) AS INTEGER) / 10) * 10 as decade, j.party, COUNT(DISTINCT m.bio_id) as count FROM members m JOIN job_positions j ON m.bio_id = j.bio_id {where_clause} GROUP BY decade, j.party ORDER BY decade, j.party """ elif analysis_type == "state_representation": if time_unit == "congress": query = f""" SELECT j.congress_number, j.region_code, COUNT(DISTINCT m.bio_id) as count FROM members m JOIN job_positions j ON m.bio_id = j.bio_id {where_clause} GROUP BY j.congress_number, j.region_code ORDER BY j.congress_number, count DESC """ else: query = f""" SELECT SUBSTR(j.start_date, 1, 4) as year, j.region_code, COUNT(DISTINCT m.bio_id) as count FROM members m JOIN job_positions j ON m.bio_id = j.bio_id {where_clause} GROUP BY year, j.region_code ORDER BY year, count DESC """ elif analysis_type == "position_counts": query = f""" SELECT j.congress_number, j.job_name, COUNT(DISTINCT m.bio_id) as count FROM members m JOIN job_positions j ON m.bio_id = j.bio_id {where_clause} GROUP BY j.congress_number, j.job_name ORDER BY j.congress_number """ elif analysis_type == "demographics": # Analyze birth year distribution over time if time_unit == "congress": query = f""" SELECT j.congress_number, AVG(CAST(SUBSTR(m.birth_date, 1, 4) AS INTEGER)) as avg_birth_year, COUNT(DISTINCT m.bio_id) as count FROM members m JOIN job_positions j ON m.bio_id = j.bio_id {where_clause} GROUP BY j.congress_number ORDER BY j.congress_number """ else: query = f""" SELECT SUBSTR(j.start_date, 1, 4) as year, AVG(CAST(SUBSTR(m.birth_date, 1, 4) AS INTEGER)) as avg_birth_year, COUNT(DISTINCT m.bio_id) as count FROM members m JOIN job_positions j ON m.bio_id = j.bio_id {where_clause} GROUP BY year ORDER BY year """ results = execute_query(query, tuple(params)) response = { "analysis_type": analysis_type, "time_unit": time_unit, "data_points": len(results), "results": results, "filters_applied": { "start_date": start_date, "end_date": end_date, "party": filter_party, "state": filter_state } } return [TextContent(type="text", text=json.dumps(response, indent=2))] elif name == "count_by_biography_content": keywords = arguments["keywords"] match_all = arguments.get("match_all", False) breakdown_by = arguments.get("breakdown_by", "none") filter_party = arguments.get("filter_party") filter_state = arguments.get("filter_state") # Build the query to find matching members conn = get_db_connection() conn.row_factory = sqlite3.Row cursor = conn.cursor() # Get all members with their job info base_query = """ SELECT DISTINCT m.bio_id, m.profile_text, j.party, j.region_code, j.job_name, j.congress_number FROM members m LEFT JOIN job_positions j ON m.bio_id = j.bio_id WHERE m.profile_text IS NOT NULL """ where_conditions = [] params = [] if filter_party: where_conditions.append("j.party = ?") params.append(filter_party) if filter_state: where_conditions.append("j.region_code = ?") params.append(filter_state) if where_conditions: base_query += " AND " + " AND ".join(where_conditions) cursor.execute(base_query, tuple(params)) all_members = cursor.fetchall() # Filter members by keywords matching_members = [] for member in all_members: profile_text_lower = member['profile_text'].lower() if member['profile_text'] else "" if match_all: # ALL keywords must be present if all(keyword.lower() in profile_text_lower for keyword in keywords): matching_members.append(dict(member)) else: # ANY keyword must be present if any(keyword.lower() in profile_text_lower for keyword in keywords): matching_members.append(dict(member)) conn.close() # Count total unique members unique_bio_ids = set(m['bio_id'] for m in matching_members) total_count = len(unique_bio_ids) # Breakdown if requested breakdown = None if breakdown_by != "none" and matching_members: breakdown_counts = {} for member in matching_members: if breakdown_by == "party": key = member.get('party', 'Unknown') elif breakdown_by == "state": key = member.get('region_code', 'Unknown') elif breakdown_by == "position": key = member.get('job_name', 'Unknown') elif breakdown_by == "congress": key = member.get('congress_number', 'Unknown') else: key = 'Unknown' if key not in breakdown_counts: breakdown_counts[key] = set() breakdown_counts[key].add(member['bio_id']) # Convert sets to counts breakdown = [ {"group": k, "count": len(v)} for k, v in sorted(breakdown_counts.items(), key=lambda x: len(x[1]), reverse=True) ] response = { "keywords": keywords, "match_all": match_all, "total_members_matching": total_count, "breakdown_by": breakdown_by, "breakdown": breakdown, "filters_applied": { "party": filter_party, "state": filter_state } } return [TextContent(type="text", text=json.dumps(response, indent=2))] else: return [TextContent(type="text", text=f"Unknown tool: {name}")] except Exception as e: return [TextContent(type="text", text=f"Error executing tool {name}: {str(e)}")] async def main(): """Main entry point for the MCP server.""" # Initialize search index (log to stderr to not interfere with stdio JSON protocol) if initialize_search_index(): print("Search index loaded successfully", file=sys.stderr, flush=True) else: print("Warning: Search index not found. Run ingest_data.py to create it.", file=sys.stderr, flush=True) # Run the server async with mcp.server.stdio.stdio_server() as (read_stream, write_stream): await server.run( read_stream, write_stream, server.create_initialization_options() ) if __name__ == "__main__": import asyncio asyncio.run(main())