Spaces:
Sleeping
Sleeping
| import sys | |
| import unicodedata | |
| from contextlib import asynccontextmanager | |
| from fastapi import FastAPI, HTTPException, Query | |
| from fastapi.middleware.cors import CORSMiddleware | |
| from fastapi.responses import FileResponse, JSONResponse | |
| from pydantic import BaseModel | |
| from typing import List, Optional | |
| import os | |
| from pathlib import Path | |
| from urllib.parse import quote, unquote | |
| from dotenv import load_dotenv # load env vars | |
| try: | |
| from backend.rag_system import RAGSystem | |
| except ModuleNotFoundError: | |
| from rag_system import RAGSystem | |
# Filesystem layout, resolved relative to this module: the project root is the
# directory one level above the folder that contains this file.
_THIS_FILE = Path(__file__).resolve()
PROJECT_ROOT = _THIS_FILE.parent.parent
DOCUMENTS_DIR = PROJECT_ROOT / "documents"
PROCESSED_JSON = PROJECT_ROOT / "processed_documents.json"

# NOTE: the .env file is not loaded at import time; the lifespan handler loads
# it (and recomputes the paths above) when the app starts standalone.

# Shared RAG state. `rag_system` holds the RAGSystem instance once created and
# `rag_ready` flips to True only after indexing succeeded.
rag_system = None
rag_ready = False
@asynccontextmanager
async def lifespan(app: FastAPI):
    """Run startup work before the app serves requests and log on shutdown.

    When the environment variable RAG_INIT_BY_PARENT equals "true"
    (case-insensitive) the handler does nothing but log, because the parent
    application is expected to perform initialization. Otherwise it recomputes
    the module-level paths, loads the .env file and calls
    initialize_rag_system().

    Args:
        app: The FastAPI application this lifespan is attached to (unused).

    Yields:
        None. Control returns to FastAPI while the application is running.
    """
    # Only the path globals are rebound here; the RAG globals are owned by
    # initialize_rag_system().
    global PROJECT_ROOT, DOCUMENTS_DIR, PROCESSED_JSON

    print("[Lifespan] FastAPI lifespan startup triggered")

    # When this app is mounted, the parent app handles initialization.
    is_mounted = os.getenv("RAG_INIT_BY_PARENT", "false").lower() == "true"
    if is_mounted:
        print("[Lifespan] Skipping initialization - handled by parent app")
        yield
        print("[Lifespan] FastAPI lifespan shutdown")
        return

    # Re-calculate paths for local execution.
    PROJECT_ROOT = Path(__file__).resolve().parents[1]
    DOCUMENTS_DIR = PROJECT_ROOT / "documents"
    PROCESSED_JSON = PROJECT_ROOT / "processed_documents.json"

    env_path = PROJECT_ROOT / ".env"
    if env_path.exists():
        load_dotenv(env_path, override=True)
    else:
        print(f"[Lifespan] WARNING: .env file not found at {env_path}")
        # Passing the missing path again would be a no-op; let python-dotenv
        # search for a .env file in its default locations instead.
        load_dotenv(override=True)

    # A failed initialization must not prevent the server from starting; the
    # endpoints answer 503 until the RAG system is ready.
    print("[Lifespan] Calling initialize_rag_system()")
    try:
        initialize_rag_system()
    except Exception as e:
        print(f"[Lifespan] WARNING: Initialization failed: {e}")
        import traceback
        traceback.print_exc()

    yield

    # Shutdown (nothing to release at the moment).
    print("[Lifespan] FastAPI lifespan shutdown")
app = FastAPI(title="Law Document RAG API", lifespan=lifespan)

# CORS middleware so the React frontend can call this API.
# Allowed origins come from the CORS_ALLOW_ORIGINS environment variable
# (comma-separated). It must be set in the process environment: the .env file
# is only loaded later, inside the lifespan handler. When unset, every origin
# is allowed, which matches the previous hard-coded behavior.
# NOTE(review): a wildcard origin combined with allow_credentials=True is
# discouraged by the FastAPI CORS docs - set explicit origins in production.
_cors_origins = [
    origin.strip()
    for origin in os.getenv("CORS_ALLOW_ORIGINS", "*").split(",")
    if origin.strip()
] or ["*"]

app.add_middleware(
    CORSMiddleware,
    allow_origins=_cors_origins,
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)
class QuestionRequest(BaseModel):
    """Request body accepted by the question-answering handler.

    Attributes:
        question: The user's question; the handler rejects blank text.
        use_history: Forwarded to the RAG system's answer_question call.
        context_mode: Forwarded to the RAG system; defaults to "chunks".
        model_provider: Forwarded to the RAG system; defaults to "qwen".
    """

    question: str
    use_history: Optional[bool] = True
    context_mode: Optional[str] = "chunks"
    # Expected values per the original note: "qwen", "openai" or "huggingface".
    model_provider: Optional[str] = "qwen"
class QuestionResponse(BaseModel):
    """Response body returned by the question-answering handler.

    Attributes:
        answer: The generated answer text.
        sources: The source identifiers returned alongside the answer.
    """

    answer: str
    sources: List[str]
def initialize_rag_system():
    """Create the global RAG system and index whatever data is available.

    The call is a no-op when the system is already marked ready. Otherwise a
    new RAGSystem is created and one of three paths is taken:

    * PDFs exist in DOCUMENTS_DIR: they are processed and indexed, and the
      system is marked ready.
    * Only processed_documents.json exists: the same indexing call is made and
      the system is marked ready only if a vectorstore was produced.
    * Neither exists: the system stays not ready.

    Any exception is printed with its traceback and leaves the system not
    ready; nothing is raised to the caller.
    """
    global rag_system, rag_ready

    already_initialized = rag_ready and rag_system is not None
    if already_initialized:
        return

    try:
        rag_ready = False
        print("[RAG Init] Starting initialization (import-time)")
        rag_system = RAGSystem()

        has_pdfs = DOCUMENTS_DIR.exists() and any(DOCUMENTS_DIR.glob("*.pdf"))
        if has_pdfs:
            print("[RAG Init] PDFs detected, processing...")
            indexed_count = rag_system.process_and_index_documents(str(DOCUMENTS_DIR))
            print(f"[RAG Init] ✓ Processed and indexed {indexed_count} documents")
            rag_ready = True
            return

        if not PROCESSED_JSON.exists():
            print("[RAG Init] No PDFs or processed_documents.json found. RAG remains uninitialized.")
            rag_ready = False
            return

        print("[RAG Init] processed_documents.json found. Building vectorstore from existing summaries...")
        # The indexing call is made even without PDFs so that the vectorstore
        # can be built from the existing processed_documents.json.
        rag_system.process_and_index_documents(str(DOCUMENTS_DIR))
        vectorstore_built = rag_system.vectorstore is not None
        if vectorstore_built:
            print("[RAG Init] ✓ Built vectorstore from processed_documents.json")
        else:
            print("[RAG Init] Warning: Could not build vectorstore from processed_documents.json")
        rag_ready = vectorstore_built
    except Exception as exc:
        print(f"[RAG Init] Initialization failed: {exc}")
        import traceback
        traceback.print_exc()
        rag_ready = False
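# RAG initialization is intentionally not run at import time; the lifespan
# handler calls initialize_rag_system() during application startup instead.
# initialize_rag_system()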
async def root():
    """Return a static banner confirming that the API process is running."""
    # NOTE(review): no route decorator is visible for this handler in this
    # view - confirm it is registered on `app`.
    banner = {"message": "Law Document RAG API is running"}
    return banner
async def health():
    """Return a fixed health payload; RAG readiness is not inspected here."""
    # NOTE(review): no route decorator is visible for this handler in this
    # view - confirm it is registered on `app`.
    status_payload = {"status": "healthy"}
    return status_payload
async def ask_question(request: QuestionRequest):
    """Answer a question using RAG with multi-turn chat history.

    Args:
        request: The parsed request body (question text plus answer options).

    Returns:
        A QuestionResponse carrying the answer and its sources.

    Raises:
        HTTPException: 503 when the RAG system is missing or not ready,
            400 when the question is blank, and 500 when answering fails.
    """
    # NOTE(review): no route decorator is visible for this handler in this
    # view - confirm it is registered on `app`.
    if rag_system is None or not rag_ready:
        raise HTTPException(
            status_code=503,
            detail="RAG system not initialized. Upload PDFs or processed_documents.json, then restart the Space."
        )

    if not request.question.strip():
        raise HTTPException(status_code=400, detail="Question cannot be empty")

    try:
        # NOTE(review): the request model defaults context_mode to "chunks",
        # but a null or empty value falls back to "full" here - confirm that
        # this difference is intended.
        answer, sources, _chunks = rag_system.answer_question(
            request.question,
            use_history=request.use_history,
            model_provider=request.model_provider,
            context_mode=request.context_mode or "full",
        )
        return QuestionResponse(answer=answer, sources=sources)
    except Exception as e:
        # Chain the original exception so the server log keeps the real cause.
        raise HTTPException(
            status_code=500,
            detail=f"Error answering question: {str(e)}. "
            "If you are running on Hugging Face Spaces, ensure processed_documents.json "
            "or your PDFs are uploaded and then restart the Space."
        ) from e
async def clear_history():
    """Reset the chat history held by the RAG system.

    Raises:
        HTTPException: 503 when no RAG system instance exists yet.
    """
    # NOTE(review): no route decorator is visible for this handler in this
    # view - confirm it is registered on `app`.
    active_system = rag_system
    if active_system is None:
        raise HTTPException(status_code=503, detail="RAG system not initialized")

    active_system.clear_chat_history()
    return {"message": "Chat history cleared"}
async def get_document(filename: str, mode: str = Query("download", enum=["download", "preview"])):
    """Serve a file from the documents directory for download or inline preview.

    The requested name is URL-decoded and resolved inside the documents
    directory. If no regular file exists at that exact path, the PDFs in the
    directory are searched for a name that matches after case folding, Unicode
    NFC normalization and removal of a trailing ".pdf".

    Args:
        filename: Requested file name (URL-decoded once more here).
        mode: "download" sends the file as an attachment; "preview" sends a
            PDF inline.

    Returns:
        A FileResponse for the resolved file, or a 400 JSONResponse when a
        preview is requested for a file that is not a PDF.

    Raises:
        HTTPException: 403 when the resolved path escapes the documents
            directory, 404 when no matching file exists, 400 when a previewed
            file is empty, and 500 when the file cannot be inspected.
    """
    # NOTE(review): no route decorator is visible for this handler in this
    # view - confirm it is registered on `app`.
    documents_dir = DOCUMENTS_DIR.resolve()

    # Decode URL-encoded filename.
    decoded_filename = unquote(filename)

    # Try the direct path first.
    file_path = (documents_dir / decoded_filename).resolve()

    # Prevent directory traversal: the resolved path must stay inside the
    # documents directory.
    if documents_dir not in file_path.parents and file_path != documents_dir:
        raise HTTPException(status_code=403, detail="Access denied")

    def normalize_name(name: str) -> str:
        """Normalize a file name for comparison (case, Unicode form, extension)."""
        # Lower-case first so that ".PDF" and ".pdf" are treated alike, then
        # compose characters so NFD and NFC spellings compare equal.
        normalized = unicodedata.normalize("NFC", name.lower()).strip()
        # Strip only a trailing extension, never a ".pdf" inside the name.
        if normalized.endswith(".pdf"):
            normalized = normalized[: -len(".pdf")].strip()
        return normalized

    # is_file() rather than exists(): a directory (including the documents
    # directory itself) must not be handed to FileResponse.
    if not file_path.is_file():
        actual_files = [p for p in documents_dir.glob("*.pdf") if p.is_file()]
        requested_normalized = normalize_name(decoded_filename)
        matched_file = next(
            (
                candidate
                for candidate in actual_files
                if normalize_name(candidate.name) == requested_normalized
            ),
            None,
        )
        if matched_file is None:
            error_detail = f"Document not found: '{decoded_filename}'. Available files: {[f.name for f in actual_files]}"
            raise HTTPException(status_code=404, detail=error_detail)
        file_path = matched_file.resolve()

    file_extension = file_path.suffix.lower()

    def build_headers(disposition_type: str) -> dict:
        """Build a Content-Disposition header with ASCII and UTF-8 file names."""
        ascii_name = filename.encode("ascii", "ignore").decode("ascii")
        # Drop quotes, backslashes and control characters so the quoted
        # fallback name can never break or inject into the header value.
        ascii_name = "".join(
            ch for ch in ascii_name if ch.isprintable() and ch not in '"\\'
        ).strip()
        if not ascii_name:
            ascii_name = "document.pdf" if file_extension == ".pdf" else "document"
        encoded_name = quote(filename)
        return {
            "Content-Disposition": f"{disposition_type}; filename=\"{ascii_name}\"; filename*=UTF-8''{encoded_name}"
        }

    if mode == "preview":
        if file_extension != ".pdf":
            error_msg = f"Preview only available for PDF files. File extension: {file_extension}"
            return JSONResponse({"filename": filename, "error": error_msg}, status_code=400)

        # Only filesystem errors are caught here, so the 400 raised below is
        # no longer swallowed and turned into a 500.
        try:
            file_size = file_path.stat().st_size
        except FileNotFoundError as e:
            error_msg = f"File not found for preview: {file_path}"
            raise HTTPException(status_code=404, detail=error_msg) from e
        except OSError as e:
            error_msg = f"Error accessing file: {str(e)}"
            raise HTTPException(status_code=500, detail=error_msg) from e

        if file_size == 0:
            error_msg = f"File is empty: {file_path}"
            raise HTTPException(status_code=400, detail=error_msg)

        # Build headers for preview (inline display) plus explicit CORS headers.
        preview_headers = build_headers("inline")
        preview_headers["Access-Control-Allow-Origin"] = "*"
        preview_headers["Access-Control-Expose-Headers"] = "Content-Disposition, Content-Type"
        return FileResponse(
            str(file_path),
            media_type="application/pdf",
            filename=filename,
            headers=preview_headers
        )

    media_type = "application/pdf" if file_extension == ".pdf" else "application/octet-stream"
    return FileResponse(
        str(file_path),
        media_type=media_type,
        filename=filename,
        headers=build_headers("attachment")
    )
if __name__ == "__main__":
    # Standalone entry point: serve the app on all interfaces, port 8000.
    import uvicorn

    server_options = {"host": "0.0.0.0", "port": 8000}
    uvicorn.run(app, **server_options)