"""Flask front end for the multi-domain (medical / islamic / insurance) RAG assistant.

On import this module downloads the vector database if needed, builds the
Flask app with server-side sessions, initialises the LLM and the per-domain
RAG systems, and registers the page, metrics and API routes.
"""
from flask import Flask, request, render_template, session, url_for, redirect, jsonify
from flask_session import Session
from langchain_core.messages import HumanMessage, AIMessage
from langchain_core.prompts import ChatPromptTemplate, MessagesPlaceholder
import os
import logging
import re
import traceback
import base64
import shutil
import zipfile
from dotenv import load_dotenv
from huggingface_hub import hf_hub_download
from PIL import Image

# --- Core Application Imports ---
# Make sure you have an empty __init__.py file in your 'src' folder
from api import api_bp
from src.medical_swarm import run_medical_swarm
from src.utils import load_rag_system, standardize_query, get_standalone_question, parse_agent_response, markdown_bold_to_html
from langchain_google_genai import ChatGoogleGenerativeAI

# Setup logging
logging.basicConfig(level=logging.DEBUG)
logger = logging.getLogger(__name__)

# Load environment variables
load_dotenv()


# --- 1. DATABASE SETUP FUNCTION (For Deployment) ---
def setup_database():
    """Download and unzip the ChromaDB folder from Hugging Face Datasets.

    Does nothing when the ``chroma_db`` directory already exists and is
    non-empty. Any failure is logged and swallowed on purpose: the RAG
    systems loaded later will then fail to initialise and the app reports
    them as unavailable instead of crashing at import time.
    """
    # --- !!! IMPORTANT !!! ---
    # YOU MUST CHANGE THIS to your Hugging Face Dataset repo ID
    # For example: "your_username/your_database_repo_name"
    DATASET_REPO_ID = "WanIrfan/atlast-db"
    # -------------------------
    ZIP_FILENAME = "chroma_db.zip"
    DB_DIR = "chroma_db"

    # isdir (not exists) so a stray *file* named chroma_db cannot make
    # os.listdir raise before the app even starts.
    if os.path.isdir(DB_DIR) and os.listdir(DB_DIR):
        logger.info("✅ Database directory already exists. Skipping download.")
        return

    logger.info(f"📥 Downloading database from HF Hub: {DATASET_REPO_ID}")
    try:
        zip_path = hf_hub_download(
            repo_id=DATASET_REPO_ID,
            filename=ZIP_FILENAME,
            repo_type="dataset",
            # You might need to add your HF token to secrets if the dataset is private
            # token=os.getenv("HF_TOKEN")
        )
        logger.info(f"📦 Unzipping database from {zip_path}...")
        with zipfile.ZipFile(zip_path, 'r') as zip_ref:
            zip_ref.extractall(".")  # Extracts to the root, creating ./chroma_db
        logger.info("✅ Database setup complete!")

        # Clean up the downloaded zip file to save space
        if os.path.exists(zip_path):
            os.remove(zip_path)
    except Exception as e:
        logger.error(f"❌ CRITICAL ERROR setting up database: {e}", exc_info=True)
        # This will likely cause the RAG system to fail loading, which is expected
        # if the database isn't available.


# --- RUN DATABASE SETUP *BEFORE* INITIALIZING THE APP ---
setup_database()

# --- STANDARD FLASK APP INITIALIZATION ---
app = Flask(__name__)
app.secret_key = os.urandom(24)  # Set a secret key for session signing

# --- CONFIGURE SERVER-SIDE SESSIONS ---
app.config["SESSION_PERMANENT"] = False
app.config["SESSION_TYPE"] = "filesystem"
Session(app)

google_api_key = os.getenv("GOOGLE_API_KEY")
if not google_api_key:
    logger.warning("⚠️ GOOGLE_API_KEY not found in environment variables. LLM calls will fail.")
else:
    logger.info("GOOGLE_API_KEY loaded successfully.")

# Initialize LLM
llm = ChatGoogleGenerativeAI(model="gemini-2.5-flash", temperature=0.05, google_api_key=google_api_key)

# --- LOAD RAG SYSTEMS (AFTER DB SETUP) ---
logger.info("🌟 Starting Multi-Domain AI Assistant...")
try:
    rag_systems = {
        'medical': load_rag_system(collection_name="medical_csv_Agentic_retrieval", domain="medical"),
        'islamic': load_rag_system(collection_name="islamic_texts_Agentic_retrieval", domain="islamic"),
        'insurance': load_rag_system(collection_name="etiqa_Agentic_retrieval", domain="insurance"),
    }
except Exception as e:
    logger.error(f"❌ FAILED to load RAG systems. Check database path and permissions. Error: {e}", exc_info=True)
    rag_systems = {'medical': None, 'islamic': None, 'insurance': None}

# Store systems and LLM on the app for blueprints
app.rag_systems = rag_systems
app.llm = llm

# Register the API blueprint
app.register_blueprint(api_bp)
logger.info("✅ API Blueprint registered. API endpoints are now available under /api")

# Check initialization status
logger.info("\n📊 SYSTEM STATUS:")
for domain, system in rag_systems.items():
    status = "✅ Ready" if system else "❌ Failed (DB missing?)"
    logger.info(f" {domain}: {status}")


# --- SHARED HELPERS FOR THE PAGE ROUTES ---
def _get_agent(domain):
    """Return the loaded RAG system for *domain* or raise if it failed to load."""
    agent = rag_systems[domain]
    if not agent:
        raise RuntimeError(f"{domain.capitalize()} RAG system is not loaded.")
    return agent


def _describe_image(file, vision_prompt):
    """Ask the LLM to describe an uploaded image and return its text answer.

    The upload is read straight from the request stream. It is deliberately
    NOT written to disk under its client-supplied filename: that name is
    untrusted, and joining it into a path allowed writes (and the subsequent
    cleanup delete) outside the upload directory.
    """
    image_bytes = file.read()
    if not image_bytes:
        raise ValueError("Uploaded image is empty.")
    img_data = base64.b64encode(image_bytes).decode("utf-8")
    message = HumanMessage(content=[
        {"type": "text", "text": vision_prompt},
        {"type": "image_url", "image_url": f"data:image/jpeg;base64,{img_data}"},
    ])
    visual_prediction = llm.invoke([message]).content
    logger.info(f"Vision Prediction: {visual_prediction}")
    return visual_prediction


def _build_enhanced_query(query, visual_prediction):
    """Combine the user's query with the LLM's image description for the RAG agent."""
    return (
        f'User Query: "{query}" '
        f'Context from an image provided by the LLM: "{visual_prediction}" '
        'Based on the user\'s query and the context from LLM, provide a comprehensive answer.'
    )


def _render_chat_page(template_name, history_key, latest_response):
    """Render a chat page from the stored history and the one-shot latest response."""
    return render_template(
        template_name,
        history=session.get(history_key, []),
        answer=latest_response.get('answer', ""),
        thoughts=latest_response.get('thoughts', ""),
        validation=latest_response.get('validation', ""),
        source=latest_response.get('source', ""),
    )


def _store_turn(domain, history, answer, thoughts, validation, source):
    """Persist the updated history and the latest response for the redirect-after-POST."""
    session[f'{domain}_history'] = history
    session[f'latest_{domain}_response'] = {
        'answer': answer,
        'thoughts': thoughts,
        'validation': validation,
        'source': source,
    }
    session.modified = True
    logger.debug("Redirecting after saving latest response.")


# --- FLASK ROUTES ---
@app.route("/")
def homePage():
    """Render the home page and clear all per-domain chat state."""
    # Clear all session history when visiting the home page
    session.pop('medical_history', None)
    session.pop('islamic_history', None)
    session.pop('insurance_history', None)
    session.pop('current_medical_document', None)
    return render_template("homePage.html")


@app.route("/medical", methods=["GET", "POST"])
def medical_page():
    """Medical chat page.

    GET renders the page, showing the latest response once.
    POST handles one of three scenarios and then redirects back to GET:
    an uploaded document (medical swarm), an uploaded image (vision + RAG),
    or a plain text query (RAG, optionally as a follow-up on the document).
    """
    # Use session for history and document context
    if request.method == "GET":
        # POP so the latest response is displayed exactly once
        latest_response = session.pop('latest_medical_response', {})

        # Clear the document only when a user first navigates (not on redirect)
        if not latest_response and 'medical_history' not in session:
            session.pop('current_medical_document', None)

        return _render_chat_page("medical_page.html", 'medical_history', latest_response)

    # POST Request Logic
    answer, thoughts, validation, source = "", "", "", ""
    history = session.get('medical_history', [])
    current_medical_document = session.get('current_medical_document', "")

    try:
        query = standardize_query(request.form.get("query", ""))
        has_image = 'image' in request.files and request.files['image'].filename
        has_document = 'document' in request.files and request.files['document'].filename
        has_query = request.form.get("query") or request.form.get("question", "")

        logger.info(f"POST request received: has_image={has_image}, has_document={has_document}, has_query={has_query}")

        if has_document:
            # Scenario 3: Query + Document
            logger.info("Processing Scenario 3: Query + Document with Medical Swarm")
            file = request.files['document']
            try:
                document_text = file.read().decode("utf-8")
            except UnicodeDecodeError:
                # Stop here: running the swarm on a stale/empty document would
                # overwrite this error with an unrelated answer.
                answer = "Error: Could not decode the uploaded document. Please upload a UTF-8 encoded text file."
                logger.error("Scenario 3: Document decode error")
                thoughts = traceback.format_exc()
            else:
                # Store the new document text in the session for follow-up questions
                session['current_medical_document'] = document_text

                swarm_answer = run_medical_swarm(document_text, query)
                answer = markdown_bold_to_html(swarm_answer)

                history.append(HumanMessage(content=f"[Document Uploaded] Query: '{query}'"))
                history.append(AIMessage(content=swarm_answer))
                thoughts = "Swarm analysis complete. The process is orchestrated and does not use the ReAct thought process. You can now ask follow-up questions."
                source = "Medical Swarm"
                validation = (True, "Swarm output generated.")  # Swarm has its own validation logic

        elif has_image:
            # Scenario 1: Query + Image
            logger.info("Processing Multimodal RAG: Query + Image")
            # Fail fast before spending an LLM call on the image.
            agent = _get_agent('medical')

            vision_prompt = f"Analyze this image and identify the main subject in a single, concise sentence. The user's query is: '{query}'"
            visual_prediction = _describe_image(request.files['image'], vision_prompt)

            enhanced_query = _build_enhanced_query(query, visual_prediction)
            logger.info(f"Enhanced query : {enhanced_query}")

            response_dict = agent.answer(enhanced_query, chat_history=history)
            answer, thoughts, validation, source = parse_agent_response(response_dict)
            history.append(HumanMessage(content=query))
            history.append(AIMessage(content=answer))

        elif query:
            # --- SCENARIO 2: TEXT-ONLY QUERY OR SWARM FOLLOW-UP ---
            history_for_agent = history
            if current_medical_document:
                logger.info("Processing Follow-up Query for Document")
                # Prepend the document so the agent can answer questions about it
                history_for_agent = [HumanMessage(content=f"We are discussing this document:\n{current_medical_document}")] + history
            else:
                logger.info("Processing Text RAG query for Medical domain")

            logger.info(f"Original Query: '{query}'")
            logger.info(f"📚 Using chat history with {len(history)} previous messages to create standalone query")
            standalone_query = get_standalone_question(query, history_for_agent, llm)
            logger.info(f"Standalone Query: '{standalone_query}'")

            agent = _get_agent('medical')
            response_dict = agent.answer(standalone_query, chat_history=history_for_agent)
            answer, thoughts, validation, source = parse_agent_response(response_dict)

            history.append(HumanMessage(content=query))
            history.append(AIMessage(content=answer))

        else:
            raise ValueError("No query or file provided.")

    except Exception as e:
        # Top-level boundary for the request: show the error on the page
        logger.error(f"Error on /medical page: {e}", exc_info=True)
        answer = f"An error occurred: {e}"
        thoughts = traceback.format_exc()

    # Save updated history and LATEST RESPONSE DATA back to the session
    _store_turn('medical', history, answer, thoughts, validation, source)
    return redirect(url_for('medical_page'))


@app.route("/medical/clear")
def clear_medical_chat():
    """Drop the medical chat history and the current document, then reload the page."""
    session.pop('medical_history', None)
    session.pop('current_medical_document', None)
    logger.info("Medical chat history cleared.")
    return redirect(url_for('medical_page'))


@app.route("/islamic", methods=["GET", "POST"])
def islamic_page():
    """Islamic chat page.

    GET renders the page, showing the latest response once.
    POST answers an image + query (vision + RAG) or a text query (RAG) and
    then redirects back to GET.
    """
    if request.method == "GET":
        # POP so the latest response is displayed exactly once
        latest_response = session.pop('latest_islamic_response', {})
        return _render_chat_page("islamic_page.html", 'islamic_history', latest_response)

    # POST Request Logic
    answer, thoughts, validation, source = "", "", "", ""
    history = session.get('islamic_history', [])

    # This try/except block wraps the ENTIRE POST logic
    try:
        query = standardize_query(request.form.get("query", ""))
        has_image = 'image' in request.files and request.files['image'].filename

        final_query = query  # Default to the original query

        if has_image:
            logger.info("Processing Multimodal RAG query for Islamic domain")
            vision_prompt = f"Analyze this image's main subject. User's query is: '{query}'"
            visual_prediction = _describe_image(request.files['image'], vision_prompt)

            enhanced_query = _build_enhanced_query(query, visual_prediction)
            logger.info(f"Create enchanced query : {enhanced_query}")
            final_query = enhanced_query

        elif query:  # Only run text logic if there's a query and no image
            logger.info("Processing Text RAG query for Islamic domain")
            standalone_query = get_standalone_question(query, history, llm)
            logger.info(f"Original Query: '{query}'")
            logger.info(f"📚 Using chat history with {len(history)} previous messages to create standalone query")
            logger.info(f"Standalone Query: '{standalone_query}'")
            final_query = standalone_query

        if not final_query:
            raise ValueError("No query or file provided.")

        agent = _get_agent('islamic')
        response_dict = agent.answer(final_query, chat_history=history)
        answer, thoughts, validation, source = parse_agent_response(response_dict)
        history.append(HumanMessage(content=query))
        history.append(AIMessage(content=answer))

    except Exception as e:
        # Top-level boundary for the request: show the error on the page
        logger.error(f"Error on /islamic page: {e}", exc_info=True)
        answer = f"An error occurred: {e}"
        thoughts = traceback.format_exc()

    # Save updated history and LATEST RESPONSE DATA back to the session
    _store_turn('islamic', history, answer, thoughts, validation, source)
    return redirect(url_for('islamic_page'))


@app.route("/islamic/clear")
def clear_islamic_chat():
    """Drop the islamic chat history, then reload the page."""
    session.pop('islamic_history', None)
    logger.info("Islamic chat history cleared.")
    return redirect(url_for('islamic_page'))


@app.route("/insurance", methods=["GET", "POST"])
def insurance_page():
    """Insurance chat page.

    GET renders the page, showing the latest response once.
    POST answers a text query through the insurance RAG system and then
    redirects back to GET.
    """
    if request.method == "GET":
        # POP so the latest response is displayed exactly once
        latest_response = session.pop('latest_insurance_response', {})
        # You will need to create the insurance_page.html template
        return _render_chat_page("insurance_page.html", 'insurance_history', latest_response)

    # POST Request Logic
    answer, thoughts, validation, source = "", "", "", ""
    history = session.get('insurance_history', [])

    try:
        query = standardize_query(request.form.get("query", ""))
        if not query:
            raise ValueError("No query provided.")

        logger.info("Processing Text RAG query for Insurance domain")
        standalone_query = get_standalone_question(query, history, llm)
        logger.info(f"Original Query: '{query}'")
        logger.info(f"Standalone Query: '{standalone_query}'")

        agent = _get_agent('insurance')
        response_dict = agent.answer(standalone_query, chat_history=history)
        answer, thoughts, validation, source = parse_agent_response(response_dict)

        history.append(HumanMessage(content=query))
        history.append(AIMessage(content=answer))

    except Exception as e:
        # Top-level boundary for the request: show the error on the page
        logger.error(f"Error on /insurance page: {e}", exc_info=True)
        answer = f"An error occurred: {e}"
        thoughts = traceback.format_exc()

    _store_turn('insurance', history, answer, thoughts, validation, source)
    return redirect(url_for('insurance_page'))


@app.route("/insurance/clear")
def clear_insurance_chat():
    """Drop the insurance chat history, then reload the page."""
    session.pop('insurance_history', None)
    logger.info("Insurance chat history cleared.")
    return redirect(url_for('insurance_page'))


@app.route("/about", methods=["GET"])
def about():
    """Render the static about page."""
    return render_template("about.html")


# The <domain> URL variable is required: the views take a `domain` argument,
# and without it every request fails with a TypeError.
@app.route('/metrics/<domain>')
def get_metrics(domain):
    """API endpoint to get metrics for a specific domain.

    Returns 400 for an unknown domain, 500 when the domain's RAG system
    failed to load or the tracker raises, otherwise the stats as JSON.
    """
    try:
        if domain not in rag_systems:
            return jsonify({"error": "Invalid domain"}), 400
        rag_system = rag_systems[domain]
        if not rag_system:
            return jsonify({"error": f"{domain} RAG system not loaded"}), 500
        return jsonify(rag_system.metrics_tracker.get_stats())
    except Exception as e:
        logger.error(f"Error reading metrics for {domain}: {e}", exc_info=True)
        return jsonify({"error": str(e)}), 500


@app.route('/metrics/reset/<domain>', methods=['POST'])
def reset_metrics(domain):
    """Reset metrics for a domain (useful for testing).

    Returns 400 for an unknown domain, 500 when the domain's RAG system
    failed to load or the tracker raises, otherwise a success message.
    """
    try:
        if domain not in rag_systems:
            return jsonify({"error": "Invalid domain"}), 400
        rag_system = rag_systems[domain]
        if not rag_system:
            return jsonify({"error": f"{domain} RAG system not loaded"}), 500
        rag_system.metrics_tracker.reset_metrics()
        return jsonify({"success": True, "message": f"Metrics reset for {domain}"})
    except Exception as e:
        logger.error(f"Error resetting metrics for {domain}: {e}", exc_info=True)
        return jsonify({"error": str(e)}), 500


if __name__ == "__main__":
    logger.info("Starting Flask app for deployment testing...")
    # This port 7860 is what Hugging Face Spaces expects by default
    app.run(host="0.0.0.0", port=7860, debug=False)