import sys import unicodedata from contextlib import asynccontextmanager from fastapi import FastAPI, HTTPException, Query from fastapi.middleware.cors import CORSMiddleware from fastapi.responses import FileResponse, JSONResponse from pydantic import BaseModel from typing import List, Optional import os from pathlib import Path from urllib.parse import quote, unquote from dotenv import load_dotenv # load env vars try: from backend.rag_system import RAGSystem except ModuleNotFoundError: from rag_system import RAGSystem # Load environment variables from .env file in project root # Initialize paths at module level (will be updated in lifespan if needed) PROJECT_ROOT = Path(__file__).resolve().parents[1] DOCUMENTS_DIR = PROJECT_ROOT / "documents" PROCESSED_JSON = PROJECT_ROOT / "processed_documents.json" # env_path = PROJECT_ROOT / ".env" # print(f"[Module Init] .env file path: {env_path}") # print(f"[Module Init] .env file exists? {env_path.exists()}") # load_dotenv(env_path) # if env_path.exists(): # api_key = os.getenv("OPENAI_API_KEY") # if api_key: # print(f"[Module Init] OPENAI_API_KEY found (length: {len(api_key)} characters)") # else: # print("[Module Init] WARNING: OPENAI_API_KEY not found in .env file") # Initialize RAG system rag_system = None rag_ready = False @asynccontextmanager async def lifespan(app: FastAPI): """Lifespan context manager for FastAPI startup and shutdown""" # Startup print("[Lifespan] FastAPI lifespan startup triggered") global rag_system, rag_ready, PROJECT_ROOT, DOCUMENTS_DIR, PROCESSED_JSON # Check if this app is being mounted (initialization handled by parent app) # When mounted, the parent app (app.py) handles initialization is_mounted = os.getenv("RAG_INIT_BY_PARENT", "false").lower() == "true" if is_mounted: print("[Lifespan] Skipping initialization - handled by parent app") yield print("[Lifespan] FastAPI lifespan shutdown") return # Re-calculate paths for local execution PROJECT_ROOT = Path(__file__).resolve().parents[1] DOCUMENTS_DIR = PROJECT_ROOT / "documents" PROCESSED_JSON = PROJECT_ROOT / "processed_documents.json" # Load environment variables from .env file with debug output env_path = PROJECT_ROOT / ".env" # print(f"[Lifespan] .env file path: {env_path}") # print(f"[Lifespan] .env file exists? {env_path.exists()}") if env_path.exists(): load_dotenv(env_path, override=True) api_key = os.getenv("OPENAI_API_KEY") # if api_key: # print(f"[Lifespan] OPENAI_API_KEY found (length: {len(api_key)} characters)") # else: # print("[Lifespan] WARNING: OPENAI_API_KEY not found in .env file") else: print(f"[Lifespan] WARNING: .env file not found at {env_path}") # Try loading anyway in case it's in a different location load_dotenv(env_path, override=True) # Call initialization function (only when running standalone) print("[Lifespan] Calling initialize_rag_system()") try: initialize_rag_system() except Exception as e: print(f"[Lifespan] WARNING: Initialization failed: {e}") import traceback traceback.print_exc() yield # Shutdown (if needed in the future) print("[Lifespan] FastAPI lifespan shutdown") app = FastAPI(title="Law Document RAG API", lifespan=lifespan) # CORS middleware to allow React frontend to connect app.add_middleware( CORSMiddleware, allow_origins=["*"], # In production, specify your frontend URL allow_credentials=True, allow_methods=["*"], allow_headers=["*"], ) class QuestionRequest(BaseModel): question: str use_history: Optional[bool] = True context_mode: Optional[str] = "chunks" model_provider: Optional[str] = "qwen" #qwen openai or huggingface class QuestionResponse(BaseModel): answer: str sources: List[str] def initialize_rag_system(): """Initialize the RAG system once at import time if data is available.""" global rag_system, rag_ready if rag_ready and rag_system is not None: return try: rag_ready = False print("[RAG Init] Starting initialization (import-time)") # Ensure documents folder exists # if not DOCUMENTS_DIR.exists(): # DOCUMENTS_DIR.mkdir(parents=True, exist_ok=True) # print(f"[RAG Init] Created documents folder at {DOCUMENTS_DIR}") rag_system = RAGSystem() # print(f"[RAG Init] processed_documents.json path: {PROCESSED_JSON}") # print(f"[RAG Init] processed_documents.json exists? {PROCESSED_JSON.exists()}") # print(f"[RAG Init] documents folder path: {DOCUMENTS_DIR}") # print(f"[RAG Init] documents folder exists? {DOCUMENTS_DIR.exists()}") if DOCUMENTS_DIR.exists() and any(DOCUMENTS_DIR.glob("*.pdf")): print("[RAG Init] PDFs detected, processing...") num_docs = rag_system.process_and_index_documents(str(DOCUMENTS_DIR)) print(f"[RAG Init] ✓ Processed and indexed {num_docs} documents") rag_ready = True elif PROCESSED_JSON.exists(): print("[RAG Init] processed_documents.json found. Building vectorstore from existing summaries...") # Call process_and_index_documents even without PDFs to build vectorstore from JSON # The method will detect no new PDFs and build from existing processed_documents.json docs_path = str(DOCUMENTS_DIR) num_docs = rag_system.process_and_index_documents(docs_path) if rag_system.vectorstore is not None: print(f"[RAG Init] ✓ Built vectorstore from processed_documents.json") rag_ready = True else: print("[RAG Init] Warning: Could not build vectorstore from processed_documents.json") rag_ready = False else: print("[RAG Init] No PDFs or processed_documents.json found. RAG remains uninitialized.") rag_ready = False except Exception as exc: print(f"[RAG Init] Initialization failed: {exc}") import traceback traceback.print_exc() rag_ready = False # initialize at import time # initialize_rag_system() @app.get("/") async def root(): return {"message": "Law Document RAG API is running"} @app.get("/health") async def health(): return {"status": "healthy"} @app.post("/ask", response_model=QuestionResponse) async def ask_question(request: QuestionRequest): """Answer a question using RAG with multi-turn chat history""" import time request_start = time.perf_counter() global rag_system, rag_ready if rag_system is None or not rag_ready: raise HTTPException( status_code=503, detail="RAG system not initialized. Upload PDFs or processed_documents.json, then restart the Space." ) if not request.question.strip(): raise HTTPException(status_code=400, detail="Question cannot be empty") try: answer, sources, _chunks = rag_system.answer_question( request.question, use_history=request.use_history, model_provider=request.model_provider , context_mode=request.context_mode or "full", ) return QuestionResponse(answer=answer, sources=sources) except Exception as e: raise HTTPException( status_code=500, detail=f"Error answering question: {str(e)}. " "If you are running on Hugging Face Spaces, ensure processed_documents.json " "or your PDFs are uploaded and then restart the Space." ) @app.post("/clear-history", response_model=dict) async def clear_history(): """Clear chat history""" global rag_system if rag_system is None: raise HTTPException(status_code=503, detail="RAG system not initialized") rag_system.clear_chat_history() return {"message": "Chat history cleared"} @app.get("/documents/{filename}") async def get_document(filename: str, mode: str = Query("download", enum=["download", "preview"])): """Serve processed document files for preview or download""" documents_dir = DOCUMENTS_DIR.resolve() # Decode URL-encoded filename decoded_filename = unquote(filename) # Try direct path first file_path = (documents_dir / decoded_filename).resolve() # Prevent directory traversal if documents_dir not in file_path.parents and file_path != documents_dir: raise HTTPException(status_code=403, detail="Access denied") # If file doesn't exist, try to find it by matching actual files in directory if not file_path.exists(): # List all PDF files in documents directory actual_files = list(documents_dir.glob("*.pdf")) # Normalize the requested filename for comparison def normalize_name(name: str) -> str: """Normalize filename for comparison (handle encoding variations)""" # Remove .pdf extension for comparison base_name = name.replace(".pdf", "").lower() # Normalize unicode (NFD -> NFC to handle composed vs decomposed) normalized = unicodedata.normalize("NFC", base_name) return normalized.strip() requested_normalized = normalize_name(decoded_filename) # Try to find matching file matched_file = None for actual_file in actual_files: actual_name = actual_file.name actual_normalized = normalize_name(actual_name) if requested_normalized == actual_normalized: matched_file = actual_file break if matched_file: file_path = matched_file.resolve() else: error_detail = f"Document not found: '{decoded_filename}'. Available files: {[f.name for f in actual_files]}" raise HTTPException( status_code=404, detail=error_detail ) file_extension = file_path.suffix.lower() def build_headers(disposition_type: str) -> dict: try: ascii_name = filename.encode("ascii", "ignore").decode("ascii") except Exception: ascii_name = "" ascii_name = ascii_name.replace('"', '').strip() or ("document.pdf" if file_extension == ".pdf" else "document") encoded_name = quote(filename) return { "Content-Disposition": f"{disposition_type}; filename=\"{ascii_name}\"; filename*=UTF-8''{encoded_name}" } if mode == "preview": if file_extension != ".pdf": error_msg = f"Preview only available for PDF files. File extension: {file_extension}" return JSONResponse({"filename": filename, "error": error_msg}, status_code=400) # Verify file exists before returning if not file_path.exists(): error_msg = f"File not found for preview: {file_path}" raise HTTPException(status_code=404, detail=error_msg) # Verify file is readable and not empty try: file_size = file_path.stat().st_size if file_size == 0: error_msg = f"File is empty: {file_path}" raise HTTPException(status_code=400, detail=error_msg) except Exception as e: error_msg = f"Error accessing file: {str(e)}" raise HTTPException(status_code=500, detail=error_msg) # Build headers for preview (inline display) preview_headers = build_headers("inline") # Add CORS headers if needed preview_headers["Access-Control-Allow-Origin"] = "*" preview_headers["Access-Control-Expose-Headers"] = "Content-Disposition, Content-Type" return FileResponse( str(file_path), media_type="application/pdf", filename=filename, headers=preview_headers ) media_type = "application/pdf" if file_extension == ".pdf" else "application/octet-stream" return FileResponse( str(file_path), media_type=media_type, filename=filename, headers=build_headers("attachment") ) if __name__ == "__main__": import uvicorn uvicorn.run(app, host="0.0.0.0", port=8000)