AldawsariNLP's picture
pushing alst changes ...
97cdc7c
import sys
import unicodedata
from contextlib import asynccontextmanager
from fastapi import FastAPI, HTTPException, Query
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import FileResponse, JSONResponse
from pydantic import BaseModel
from typing import List, Optional
import os
from pathlib import Path
from urllib.parse import quote, unquote
from dotenv import load_dotenv # load env vars
try:
from backend.rag_system import RAGSystem
except ModuleNotFoundError:
from rag_system import RAGSystem
# Load environment variables from .env file in project root
# Initialize paths at module level (will be updated in lifespan if needed)
PROJECT_ROOT = Path(__file__).resolve().parents[1]
DOCUMENTS_DIR = PROJECT_ROOT / "documents"
PROCESSED_JSON = PROJECT_ROOT / "processed_documents.json"
# env_path = PROJECT_ROOT / ".env"
# print(f"[Module Init] .env file path: {env_path}")
# print(f"[Module Init] .env file exists? {env_path.exists()}")
# load_dotenv(env_path)
# if env_path.exists():
# api_key = os.getenv("OPENAI_API_KEY")
# if api_key:
# print(f"[Module Init] OPENAI_API_KEY found (length: {len(api_key)} characters)")
# else:
# print("[Module Init] WARNING: OPENAI_API_KEY not found in .env file")
# Initialize RAG system
rag_system = None
rag_ready = False
@asynccontextmanager
async def lifespan(app: FastAPI):
"""Lifespan context manager for FastAPI startup and shutdown"""
# Startup
print("[Lifespan] FastAPI lifespan startup triggered")
global rag_system, rag_ready, PROJECT_ROOT, DOCUMENTS_DIR, PROCESSED_JSON
# Check if this app is being mounted (initialization handled by parent app)
# When mounted, the parent app (app.py) handles initialization
is_mounted = os.getenv("RAG_INIT_BY_PARENT", "false").lower() == "true"
if is_mounted:
print("[Lifespan] Skipping initialization - handled by parent app")
yield
print("[Lifespan] FastAPI lifespan shutdown")
return
# Re-calculate paths for local execution
PROJECT_ROOT = Path(__file__).resolve().parents[1]
DOCUMENTS_DIR = PROJECT_ROOT / "documents"
PROCESSED_JSON = PROJECT_ROOT / "processed_documents.json"
# Load environment variables from .env file with debug output
env_path = PROJECT_ROOT / ".env"
# print(f"[Lifespan] .env file path: {env_path}")
# print(f"[Lifespan] .env file exists? {env_path.exists()}")
if env_path.exists():
load_dotenv(env_path, override=True)
api_key = os.getenv("OPENAI_API_KEY")
# if api_key:
# print(f"[Lifespan] OPENAI_API_KEY found (length: {len(api_key)} characters)")
# else:
# print("[Lifespan] WARNING: OPENAI_API_KEY not found in .env file")
else:
print(f"[Lifespan] WARNING: .env file not found at {env_path}")
# Try loading anyway in case it's in a different location
load_dotenv(env_path, override=True)
# Call initialization function (only when running standalone)
print("[Lifespan] Calling initialize_rag_system()")
try:
initialize_rag_system()
except Exception as e:
print(f"[Lifespan] WARNING: Initialization failed: {e}")
import traceback
traceback.print_exc()
yield
# Shutdown (if needed in the future)
print("[Lifespan] FastAPI lifespan shutdown")
app = FastAPI(title="Law Document RAG API", lifespan=lifespan)
# CORS middleware to allow React frontend to connect
app.add_middleware(
CORSMiddleware,
allow_origins=["*"], # In production, specify your frontend URL
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)
class QuestionRequest(BaseModel):
question: str
use_history: Optional[bool] = True
context_mode: Optional[str] = "chunks"
model_provider: Optional[str] = "qwen" #qwen openai or huggingface
class QuestionResponse(BaseModel):
answer: str
sources: List[str]
def initialize_rag_system():
"""Initialize the RAG system once at import time if data is available."""
global rag_system, rag_ready
if rag_ready and rag_system is not None:
return
try:
rag_ready = False
print("[RAG Init] Starting initialization (import-time)")
# Ensure documents folder exists
# if not DOCUMENTS_DIR.exists():
# DOCUMENTS_DIR.mkdir(parents=True, exist_ok=True)
# print(f"[RAG Init] Created documents folder at {DOCUMENTS_DIR}")
rag_system = RAGSystem()
# print(f"[RAG Init] processed_documents.json path: {PROCESSED_JSON}")
# print(f"[RAG Init] processed_documents.json exists? {PROCESSED_JSON.exists()}")
# print(f"[RAG Init] documents folder path: {DOCUMENTS_DIR}")
# print(f"[RAG Init] documents folder exists? {DOCUMENTS_DIR.exists()}")
if DOCUMENTS_DIR.exists() and any(DOCUMENTS_DIR.glob("*.pdf")):
print("[RAG Init] PDFs detected, processing...")
num_docs = rag_system.process_and_index_documents(str(DOCUMENTS_DIR))
print(f"[RAG Init] ✓ Processed and indexed {num_docs} documents")
rag_ready = True
elif PROCESSED_JSON.exists():
print("[RAG Init] processed_documents.json found. Building vectorstore from existing summaries...")
# Call process_and_index_documents even without PDFs to build vectorstore from JSON
# The method will detect no new PDFs and build from existing processed_documents.json
docs_path = str(DOCUMENTS_DIR)
num_docs = rag_system.process_and_index_documents(docs_path)
if rag_system.vectorstore is not None:
print(f"[RAG Init] ✓ Built vectorstore from processed_documents.json")
rag_ready = True
else:
print("[RAG Init] Warning: Could not build vectorstore from processed_documents.json")
rag_ready = False
else:
print("[RAG Init] No PDFs or processed_documents.json found. RAG remains uninitialized.")
rag_ready = False
except Exception as exc:
print(f"[RAG Init] Initialization failed: {exc}")
import traceback
traceback.print_exc()
rag_ready = False
# initialize at import time
# initialize_rag_system()
@app.get("/")
async def root():
return {"message": "Law Document RAG API is running"}
@app.get("/health")
async def health():
return {"status": "healthy"}
@app.post("/ask", response_model=QuestionResponse)
async def ask_question(request: QuestionRequest):
"""Answer a question using RAG with multi-turn chat history"""
import time
request_start = time.perf_counter()
global rag_system, rag_ready
if rag_system is None or not rag_ready:
raise HTTPException(
status_code=503,
detail="RAG system not initialized. Upload PDFs or processed_documents.json, then restart the Space."
)
if not request.question.strip():
raise HTTPException(status_code=400, detail="Question cannot be empty")
try:
answer, sources, _chunks = rag_system.answer_question(
request.question,
use_history=request.use_history,
model_provider=request.model_provider ,
context_mode=request.context_mode or "full",
)
return QuestionResponse(answer=answer, sources=sources)
except Exception as e:
raise HTTPException(
status_code=500,
detail=f"Error answering question: {str(e)}. "
"If you are running on Hugging Face Spaces, ensure processed_documents.json "
"or your PDFs are uploaded and then restart the Space."
)
@app.post("/clear-history", response_model=dict)
async def clear_history():
"""Clear chat history"""
global rag_system
if rag_system is None:
raise HTTPException(status_code=503, detail="RAG system not initialized")
rag_system.clear_chat_history()
return {"message": "Chat history cleared"}
@app.get("/documents/{filename}")
async def get_document(filename: str, mode: str = Query("download", enum=["download", "preview"])):
"""Serve processed document files for preview or download"""
documents_dir = DOCUMENTS_DIR.resolve()
# Decode URL-encoded filename
decoded_filename = unquote(filename)
# Try direct path first
file_path = (documents_dir / decoded_filename).resolve()
# Prevent directory traversal
if documents_dir not in file_path.parents and file_path != documents_dir:
raise HTTPException(status_code=403, detail="Access denied")
# If file doesn't exist, try to find it by matching actual files in directory
if not file_path.exists():
# List all PDF files in documents directory
actual_files = list(documents_dir.glob("*.pdf"))
# Normalize the requested filename for comparison
def normalize_name(name: str) -> str:
"""Normalize filename for comparison (handle encoding variations)"""
# Remove .pdf extension for comparison
base_name = name.replace(".pdf", "").lower()
# Normalize unicode (NFD -> NFC to handle composed vs decomposed)
normalized = unicodedata.normalize("NFC", base_name)
return normalized.strip()
requested_normalized = normalize_name(decoded_filename)
# Try to find matching file
matched_file = None
for actual_file in actual_files:
actual_name = actual_file.name
actual_normalized = normalize_name(actual_name)
if requested_normalized == actual_normalized:
matched_file = actual_file
break
if matched_file:
file_path = matched_file.resolve()
else:
error_detail = f"Document not found: '{decoded_filename}'. Available files: {[f.name for f in actual_files]}"
raise HTTPException(
status_code=404,
detail=error_detail
)
file_extension = file_path.suffix.lower()
def build_headers(disposition_type: str) -> dict:
try:
ascii_name = filename.encode("ascii", "ignore").decode("ascii")
except Exception:
ascii_name = ""
ascii_name = ascii_name.replace('"', '').strip() or ("document.pdf" if file_extension == ".pdf" else "document")
encoded_name = quote(filename)
return {
"Content-Disposition": f"{disposition_type}; filename=\"{ascii_name}\"; filename*=UTF-8''{encoded_name}"
}
if mode == "preview":
if file_extension != ".pdf":
error_msg = f"Preview only available for PDF files. File extension: {file_extension}"
return JSONResponse({"filename": filename, "error": error_msg}, status_code=400)
# Verify file exists before returning
if not file_path.exists():
error_msg = f"File not found for preview: {file_path}"
raise HTTPException(status_code=404, detail=error_msg)
# Verify file is readable and not empty
try:
file_size = file_path.stat().st_size
if file_size == 0:
error_msg = f"File is empty: {file_path}"
raise HTTPException(status_code=400, detail=error_msg)
except Exception as e:
error_msg = f"Error accessing file: {str(e)}"
raise HTTPException(status_code=500, detail=error_msg)
# Build headers for preview (inline display)
preview_headers = build_headers("inline")
# Add CORS headers if needed
preview_headers["Access-Control-Allow-Origin"] = "*"
preview_headers["Access-Control-Expose-Headers"] = "Content-Disposition, Content-Type"
return FileResponse(
str(file_path),
media_type="application/pdf",
filename=filename,
headers=preview_headers
)
media_type = "application/pdf" if file_extension == ".pdf" else "application/octet-stream"
return FileResponse(
str(file_path),
media_type=media_type,
filename=filename,
headers=build_headers("attachment")
)
if __name__ == "__main__":
import uvicorn
uvicorn.run(app, host="0.0.0.0", port=8000)