Spaces:
Running
Running
| #!/usr/bin/env python3 | |
| """ | |
| Test script to validate the Congressional Bioguide database and search functionality. | |
| """ | |
| import sqlite3 | |
| import json | |
| from pathlib import Path | |
def test_database():
    """Validate the SQLite database and print a summary of its contents.

    Runs seven read-only checks against ``congress.db`` (resolved relative
    to the current working directory): member count, job-position count, a
    family-name lookup for 'Lincoln', the top ten parties and the top ten
    states by distinct member count, the relationship count (with up to
    five samples), and the five longest biography texts.

    Returns:
        bool: False when ``congress.db`` does not exist, True once every
        query has run.

    Raises:
        sqlite3.Error: if a query fails (for example, a table is missing).
        The connection is closed before the error propagates.
    """
    # NOTE(review): the 'β' status markers below look like mis-decoded
    # check/cross glyphs; they are kept unchanged here -- confirm against
    # the upstream file before replacing them.
    print("Testing Database...")
    print("=" * 60)

    if not Path("congress.db").exists():
        print("β Database not found. Run ingest_data.py first.")
        return False

    conn = sqlite3.connect("congress.db")
    try:
        cursor = conn.cursor()

        # Test 1: Count members
        cursor.execute("SELECT COUNT(*) FROM members")
        member_count = cursor.fetchone()[0]
        print(f"β Members in database: {member_count}")

        # Test 2: Count job positions
        cursor.execute("SELECT COUNT(*) FROM job_positions")
        job_count = cursor.fetchone()[0]
        print(f"β Job positions recorded: {job_count}")

        # Test 3: Search by name
        cursor.execute("""
            SELECT bio_id, family_name, given_name, birth_date, death_date
            FROM members
            WHERE unaccented_family_name = 'Lincoln'
            ORDER BY birth_date
        """)
        lincolns = cursor.fetchall()
        print(f"\nβ Found {len(lincolns)} member(s) with family name 'Lincoln':")
        for bio_id, family, given, birth, death in lincolns:
            # A missing death date is shown as 'present'.
            print(f" - {given} {family} ({bio_id}): {birth} - {death or 'present'}")

        # Test 4: Party breakdown
        cursor.execute("""
            SELECT party, COUNT(DISTINCT bio_id) as count
            FROM job_positions
            WHERE party IS NOT NULL
            GROUP BY party
            ORDER BY count DESC
            LIMIT 10
        """)
        parties = cursor.fetchall()
        print("\nβ Top parties by member count:")
        for party, count in parties:
            print(f" - {party}: {count} members")

        # Test 5: State representation
        cursor.execute("""
            SELECT region_code, COUNT(DISTINCT bio_id) as count
            FROM job_positions
            WHERE region_code IS NOT NULL AND region_type = 'StateRegion'
            GROUP BY region_code
            ORDER BY count DESC
            LIMIT 10
        """)
        states = cursor.fetchall()
        print("\nβ Top states by member count:")
        for state, count in states:
            print(f" - {state}: {count} members")

        # Test 6: Relationships
        cursor.execute("SELECT COUNT(*) FROM relationships")
        rel_count = cursor.fetchone()[0]
        print(f"\nβ Family relationships recorded: {rel_count}")
        if rel_count > 0:
            # Inner joins: only relationships whose two ends both exist in
            # the members table are sampled.
            cursor.execute("""
                SELECT m1.given_name, m1.family_name, r.relationship_type,
                       m2.given_name, m2.family_name
                FROM relationships r
                JOIN members m1 ON r.bio_id = m1.bio_id
                JOIN members m2 ON r.related_bio_id = m2.bio_id
                LIMIT 5
            """)
            relationships = cursor.fetchall()
            print(" Sample relationships:")
            for given1, family1, rel_type, given2, family2 in relationships:
                print(f" - {given1} {family1} is {rel_type} of {given2} {family2}")

        # Test 7: Profile text
        cursor.execute("""
            SELECT bio_id, given_name, family_name, LENGTH(profile_text) as text_len
            FROM members
            WHERE profile_text IS NOT NULL
            ORDER BY text_len DESC
            LIMIT 5
        """)
        longest_profiles = cursor.fetchall()
        print("\nβ Longest biography profiles:")
        for bio_id, given, family, length in longest_profiles:
            print(f" - {given} {family} ({bio_id}): {length} characters")
    finally:
        # Release the database handle even when a query above raises.
        conn.close()

    return True
def test_faiss_index():
    """Load the FAISS index and run a few sample semantic searches.

    Requires ``congress_faiss.index`` and ``congress_bio_ids.pkl`` in the
    current working directory. For each sample query the text is encoded
    with the 'all-MiniLM-L6-v2' sentence-transformer model, L2-normalised,
    and searched for its top three neighbours; each hit is resolved to a
    member name through ``congress.db`` and printed with its score.

    Returns:
        bool: True when every sample query ran; False when an input file is
        missing, a dependency cannot be imported, or any other error occurs
        (the error is printed rather than raised).
    """
    print("\n\nTesting FAISS Index...")
    print("=" * 60)

    if not Path("congress_faiss.index").exists():
        print("β FAISS index not found. Run ingest_data.py first.")
        return False
    if not Path("congress_bio_ids.pkl").exists():
        print("β Bio ID mapping not found. Run ingest_data.py first.")
        return False

    try:
        # Imported lazily so a missing optional dependency is reported as a
        # failed check instead of breaking the whole script at import time.
        import faiss
        import pickle
        from sentence_transformers import SentenceTransformer

        # Load index
        index = faiss.read_index("congress_faiss.index")
        # NOTE(review): pickle.load can execute arbitrary code from the
        # file; only run this against a mapping file you generated yourself.
        with open("congress_bio_ids.pkl", "rb") as f:
            bio_ids = pickle.load(f)
        print(f"β FAISS index loaded: {index.ntotal} vectors")
        print(f"β Dimension: {index.d}")

        # Load model
        model = SentenceTransformer('all-MiniLM-L6-v2')
        print("β Sentence transformer model loaded")

        # Test search
        test_queries = [
            "lawyers who became judges",
            "Civil War veterans",
            "served in the military",
            "teachers and educators",
        ]

        # One connection serves every query; it is closed in ``finally`` so
        # an error in the middle of a search does not leak the handle.
        conn = sqlite3.connect("congress.db")
        try:
            cursor = conn.cursor()
            for query in test_queries:
                print(f"\nβ Testing query: '{query}'")
                query_embedding = model.encode([query])[0].reshape(1, -1).astype('float32')
                faiss.normalize_L2(query_embedding)
                scores, indices = index.search(query_embedding, 3)

                print(" Top 3 results:")
                for idx, score in zip(indices[0], scores[0]):
                    # FAISS pads missing neighbours with label -1; without
                    # the lower bound, bio_ids[-1] would silently report the
                    # last member as a match.
                    if not 0 <= idx < len(bio_ids):
                        continue
                    bio_id = bio_ids[idx]
                    cursor.execute(
                        "SELECT given_name, family_name FROM members WHERE bio_id = ?",
                        (bio_id,)
                    )
                    result = cursor.fetchone()
                    if result:
                        given, family = result
                        print(f" - {given} {family} ({bio_id}): score={score:.4f}")
        finally:
            conn.close()

        return True
    except ImportError as e:
        print(f"β Missing dependency: {e}")
        print(" Run: pip install -r requirements.txt")
        return False
    except Exception as e:
        # Deliberately broad: any failure in this smoke test is reported as
        # a failed check rather than aborting the remaining checks.
        print(f"β Error testing FAISS: {e}")
        return False
| def test_sample_profile(): | |
| """Display a sample profile.""" | |
| print("\n\nSample Profile...") | |
| print("=" * 60) | |
| conn = sqlite3.connect("congress.db") | |
| conn.row_factory = sqlite3.Row | |
| cursor = conn.cursor() | |
| # Get a well-known member | |
| cursor.execute(""" | |
| SELECT * FROM members | |
| WHERE unaccented_family_name = 'Lincoln' AND unaccented_given_name = 'Abraham' | |
| LIMIT 1 | |
| """) | |
| member = cursor.fetchone() | |
| if member: | |
| bio_id = member['bio_id'] | |
| print(f"Profile: {member['given_name']} {member['family_name']} ({bio_id})") | |
| print(f"Birth: {member['birth_date']}") | |
| print(f"Death: {member['death_date']}") | |
| print(f"\nBiography excerpt:") | |
| profile_text = member['profile_text'] or "" | |
| print(f" {profile_text[:300]}...") | |
| # Get positions | |
| cursor.execute(""" | |
| SELECT job_name, party, congress_number, region_code, start_date, end_date | |
| FROM job_positions | |
| WHERE bio_id = ? | |
| ORDER BY start_date | |
| """, (bio_id,)) | |
| positions = cursor.fetchall() | |
| if positions: | |
| print(f"\nPositions held ({len(positions)}):") | |
| for pos in positions: | |
| print(f" - {pos['job_name']} ({pos['party']}), {pos['region_code']}") | |
| print(f" Congress {pos['congress_number']}: {pos['start_date']} - {pos['end_date']}") | |
| conn.close() | |
def main():
    """Run all tests and print an overall verdict.

    Runs the database check and the FAISS check unconditionally, and prints
    the sample profile only when the database check passed.

    Returns:
        bool: True when both the database and FAISS checks passed, False
        otherwise.
    """
    print("Congressional Bioguide Database Test Suite")
    print("=" * 60)
    print()

    db_ok = test_database()
    faiss_ok = test_faiss_index()
    if db_ok:
        # The sample profile reads congress.db, so skip it when the
        # database check failed.
        test_sample_profile()

    print("\n" + "=" * 60)
    all_ok = db_ok and faiss_ok
    if all_ok:
        print("β All tests passed!")
        print("\nThe system is ready to use. Start the MCP server with:")
        print(" python3 server.py")
    else:
        print("β Some tests failed. Please check the errors above.")
        if not db_ok:
            print(" Run: python3 ingest_data.py")
    return all_ok


if __name__ == "__main__":
    import sys

    # Report failure through the exit status so shells and CI can detect it.
    sys.exit(0 if main() else 1)