# Source: BioGuideMCP / test_embeddings_data.py
# Author: stefanjwojcik
# Commit: Add setup script and comprehensive tests for Congressional Bioguide MCP Server (15de73a)
#!/usr/bin/env python3
"""
Test the embeddings data to check for issues before FAISS operations.
"""
import sys
import os
import sqlite3
import numpy as np
# Banner: identify the run and the interpreter version in the output log.
print("=" * 60)
print("EMBEDDINGS DATA VALIDATION TEST")
print("=" * 60)
print(f"Python version: {sys.version}")
print()
# Load model
print("Loading sentence transformer...")
# Set BEFORE the sentence_transformers import below so the HuggingFace
# tokenizers library picks it up and does not use parallel workers
# (avoids fork-related warnings/deadlocks in downstream processing).
os.environ['TOKENIZERS_PARALLELISM'] = 'false'
from sentence_transformers import SentenceTransformer
# NOTE(review): the model is downloaded and cached by sentence-transformers
# on first use — the first run needs network access.
model = SentenceTransformer('all-MiniLM-L6-v2')
print("βœ“ Model loaded\n")
# Load ALL biographies
print("Loading ALL biographies from database...")
# Pull every member row that actually has biography text; NULL/empty
# profiles would produce meaningless embeddings.
conn = sqlite3.connect("congress.db")
try:
    cursor = conn.cursor()
    cursor.execute("""
        SELECT bio_id, profile_text
        FROM members
        WHERE profile_text IS NOT NULL AND profile_text != ''
    """)
    rows = cursor.fetchall()
finally:
    # Close the handle even if the query raises (e.g. missing table),
    # instead of leaking the connection on the error path.
    conn.close()
# Split the (bio_id, profile_text) tuples into parallel lists; the two
# lists stay index-aligned for later id <-> embedding lookups.
bio_ids = [r[0] for r in rows]
texts = [r[1] for r in rows]
print(f"βœ“ Loaded {len(texts)} biographies\n")
# Encode ALL
print("Encoding all biographies...")
print("(This will take a few minutes...)")
batch_size = 32
embeddings = []
# Encode in fixed-size batches on CPU, reporting progress every 100 batches.
for batch_no, i in enumerate(range(0, len(texts), batch_size), start=1):
    batch = texts[i:i + batch_size]
    vectors = model.encode(
        batch,
        show_progress_bar=False,
        convert_to_numpy=True,
        normalize_embeddings=False,
        device='cpu'
    )
    embeddings.extend(vectors)
    if batch_no % 100 == 0:
        print(f"  Encoded {i + len(batch)}/{len(texts)}...")
# Stack the per-text vectors into one float32 matrix (FAISS expects float32).
embeddings = np.array(embeddings, dtype=np.float32)
print(f"βœ“ Encoded all, shape: {embeddings.shape}\n")
# Validate embeddings
# Report basic statistics and reject bad data (NaN/Inf) before the FAISS
# steps below ever see the matrix.
print("Validating embeddings data...")
# Compute the expensive full-array scans ONCE; the original evaluated
# np.isnan/np.isinf twice each (once for the report, again for the checks).
has_nan = bool(np.any(np.isnan(embeddings)))
has_inf = bool(np.any(np.isinf(embeddings)))
print(f"  Shape: {embeddings.shape}")
print(f"  Dtype: {embeddings.dtype}")
print(f"  Min value: {np.min(embeddings)}")
print(f"  Max value: {np.max(embeddings)}")
print(f"  Mean: {np.mean(embeddings)}")
print(f"  Has NaN: {has_nan}")
print(f"  Has Inf: {has_inf}")
print(f"  Is C-contiguous: {embeddings.flags['C_CONTIGUOUS']}")
print(f"  Memory usage: {embeddings.nbytes / (1024**2):.2f} MB")
if has_nan:
    print("\n❌ ERROR: Embeddings contain NaN values!")
    sys.exit(1)
if has_inf:
    print("\n❌ ERROR: Embeddings contain Inf values!")
    sys.exit(1)
print("\nβœ“ Embeddings data looks good")
# Now test FAISS operations one by one
print("\n" + "=" * 60)
print("Testing FAISS operations...")
print("=" * 60)
# Imported late, after the data validation above, so that data problems are
# reported before any FAISS native code gets involved.
import faiss
dimension = embeddings.shape[1]
print(f"\n1. Creating IndexFlatIP with dimension {dimension}...")
try:
    # IndexFlatIP = exact (brute-force) inner-product index; combined with
    # the L2 normalization in step 2, inner product == cosine similarity.
    index = faiss.IndexFlatIP(dimension)
    print("   βœ“ Index created")
except Exception as e:
    print(f"   ❌ FAILED at index creation: {e}")
    import traceback
    traceback.print_exc()
    sys.exit(1)
print(f"\n2. Normalizing {len(embeddings)} embeddings...")
try:
    # Make a copy to preserve the original matrix: faiss.normalize_L2
    # scales its argument to unit L2 norm IN PLACE.
    embeddings_norm = embeddings.copy()
    print(f"   Before normalize - sample norm: {np.linalg.norm(embeddings_norm[0]):.4f}")
    faiss.normalize_L2(embeddings_norm)
    # After normalization the sample norm should print as 1.0000.
    print(f"   After normalize - sample norm: {np.linalg.norm(embeddings_norm[0]):.4f}")
    print("   βœ“ Normalized")  # was an f-string with no placeholders (F541)
except Exception as e:
    print(f"   ❌ FAILED at normalize: {e}")
    import traceback
    traceback.print_exc()
    sys.exit(1)
print(f"\n3. Adding {len(embeddings_norm)} vectors to index...")
try:
    # add() expects a float32 (n, dimension) array; dtype and C-contiguity
    # were reported by the validation step earlier in this script.
    index.add(embeddings_norm)
    # index.ntotal should now equal len(embeddings_norm).
    print(f"   βœ“ Added {index.ntotal} vectors")
except Exception as e:
    print(f"   ❌ FAILED at add: {e}")
    import traceback
    traceback.print_exc()
    sys.exit(1)
print("\n4. Writing index to disk...")  # dropped useless f-prefix (F541)
try:
    # Serialize the populated index next to the script for later reuse.
    faiss.write_index(index, "test_full.faiss")
    print("   βœ“ Index written")  # dropped useless f-prefix (F541)
except Exception as e:
    print(f"   ❌ FAILED at write: {e}")
    import traceback
    traceback.print_exc()
    sys.exit(1)
# Final summary banner — reaching this point means every step succeeded.
print("\n" + "=" * 60)
print("βœ… SUCCESS! Full pipeline works!")
print("=" * 60)
print(f"\nProcessed {len(embeddings)} embeddings successfully")
print("The index has been created: test_full.faiss")