from flask import Flask, request, render_template, session, url_for, redirect, jsonify
# from flask_session import Session <--- REMOVED
from langchain_core.messages import HumanMessage, AIMessage
from langchain_core.prompts import ChatPromptTemplate, MessagesPlaceholder
import os
import logging
import re
import time  # used by process_base64_file for unique temp filenames
import traceback
import base64
import shutil
import zipfile
from dotenv import load_dotenv
from huggingface_hub import hf_hub_download
from PIL import Image

# --- Core Application Imports ---
from src.medical_swarm import run_medical_swarm
from src.utils import load_rag_system, standardize_query, get_standalone_question, parse_agent_response, markdown_bold_to_html
from langchain_google_genai import ChatGoogleGenerativeAI

# Setup logging
logging.basicConfig(level=logging.DEBUG)
logger = logging.getLogger(__name__)

# Load environment variables
load_dotenv()

# These are your "customers". You give them a key.
# In a real app, this would be in a database.
VALID_API_KEYS = {
    "amelia_key_123": "Dr. Amelia (Premium Plan, Unlimited)",
    "irfan_key_456": "Irfan (Admin, Unlimited)",
    "sistem_gelap": "Demo User (Free Tier, 10 requests)"
}


# --- 1. NEW HELPER FUNCTIONS TO FIX 'TypeError' ---
def hydrate_history(raw_history_list: list) -> list:
    """Converts a list of dicts from session back into LangChain Message objects."""
    history = []
    if not raw_history_list:
        return history
    for item in raw_history_list:
        if item.get('type') == 'human':
            history.append(HumanMessage(content=item.get('content', '')))
        elif item.get('type') == 'ai':
            history.append(AIMessage(content=item.get('content', '')))
    return history


def dehydrate_history(history_messages: list) -> list:
    """Converts LangChain Message objects into a JSON-serializable list of dicts."""
    raw_list = []
    for msg in history_messages:
        if isinstance(msg, HumanMessage):
            raw_list.append({'type': 'human', 'content': msg.content})
        elif isinstance(msg, AIMessage):
            raw_list.append({'type': 'ai', 'content': msg.content})
    return raw_list
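
# Illustrative usage sketch (not called anywhere): the cookie session and the API
# "history" payload both use this plain-dict shape, e.g.
#     raw = [{'type': 'human', 'content': 'Hello'},
#            {'type': 'ai', 'content': 'Hi, how can I help?'}]
#     msgs = hydrate_history(raw)            # -> [HumanMessage(...), AIMessage(...)]
#     assert dehydrate_history(msgs) == raw  # round-trips back to JSON-safe dicts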

# --- 2. DATABASE SETUP FUNCTION (For Deployment) ---
def setup_database():
    """Downloads and unzips the ChromaDB folder from Hugging Face Datasets."""
    DATASET_REPO_ID = "WanIrfan/atlast-db"
    ZIP_FILENAME = "chroma_db.zip"
    DB_DIR = "chroma_db"

    if os.path.exists(DB_DIR) and os.listdir(DB_DIR):
        logger.info("āœ… Database directory already exists. Skipping download.")
        return

    logger.info(f"šŸ“„ Downloading database from HF Hub: {DATASET_REPO_ID}")
    try:
        zip_path = hf_hub_download(repo_id=DATASET_REPO_ID, filename=ZIP_FILENAME, repo_type="dataset")
        logger.info(f"šŸ“¦ Unzipping database from {zip_path}...")
        with zipfile.ZipFile(zip_path, 'r') as zip_ref:
            zip_ref.extractall(".")
        logger.info("āœ… Database setup complete!")
        if os.path.exists(zip_path):
            os.remove(zip_path)
    except Exception as e:
        logger.error(f"āŒ CRITICAL ERROR setting up database: {e}", exc_info=True)


# --- RUN DATABASE SETUP *BEFORE* INITIALIZING THE APP ---
setup_database()

# --- STANDARD FLASK APP INITIALIZATION ---
app = Flask(__name__)
app.secret_key = "a_really_strong_static_secret_key_12345"
# --- REMOVED flask_session CONFIG ---

google_api_key = os.getenv("GOOGLE_API_KEY")
if not google_api_key:
    logger.warning("āš ļø GOOGLE_API_KEY not found.")
else:
    logger.info("GOOGLE_API_KEY loaded successfully.")

llm = ChatGoogleGenerativeAI(model="gemini-2.5-flash", temperature=0.05, google_api_key=google_api_key)

# --- LOAD RAG SYSTEMS (AFTER DB SETUP) ---
logger.info("🌟 Starting Multi-Domain AI Assistant...")
try:
    rag_systems = {
        'medical': load_rag_system(collection_name="medical_csv_Agentic_retrieval", domain="medical"),
        'islamic': load_rag_system(collection_name="islamic_texts_Agentic_retrieval", domain="islamic"),
        'insurance': load_rag_system(collection_name="etiqa_Agentic_retrieval", domain="insurance")
    }
except Exception as e:
    logger.error(f"āŒ FAILED to load RAG systems. Error: {e}", exc_info=True)
    rag_systems = {'medical': None, 'islamic': None, 'insurance': None}

app.rag_systems = rag_systems
app.llm = llm

logger.info("\nšŸ“Š SYSTEM STATUS:")
for domain, system in rag_systems.items():
    status = "āœ… Ready" if system else "āŒ Failed (DB missing?)"
    logger.info(f"   {domain}: {status}")
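
# Illustrative sketch (an assumption, not part of src.utils): the routes below only
# rely on each loaded RAG system exposing roughly this interface, namely an `answer()`
# method that returns a dict and a `metrics_tracker` with get_stats() and
# reset_metrics(). The Protocol is documentation only and is never referenced.
from typing import Any, Protocol


class RAGAgentLike(Protocol):
    metrics_tracker: Any

    def answer(self, query: str, chat_history: list) -> dict:
        ...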

# --- FLASK WEB UI ROUTES ---
@app.route("/")
def homePage():
    session.clear()  # Clear all keys
    return render_template("homePage.html")


# --- MEDICAL PAGE ---
@app.route("/medical", methods=["GET", "POST"])
def medical_page():
    if request.method == "GET":
        latest_response = session.pop('latest_medical_response', {})
        return render_template("medical_page.html",
                               history=session.get('medical_history', []),
                               answer=latest_response.get('answer', ""),
                               thoughts=latest_response.get('thoughts', ""),
                               validation=latest_response.get('validation', ""),
                               source=latest_response.get('source', ""))

    answer, thoughts, validation, source = "", "", "", ""
    raw_history_list = session.get('medical_history', [])
    history_for_agent = hydrate_history(raw_history_list)
    current_medical_document = session.get('current_medical_document', "")
    query = ""

    try:
        query = standardize_query(request.form.get("query", ""))
        has_image = 'image' in request.files and request.files['image'].filename
        has_document = 'document' in request.files and request.files['document'].filename

        if not (query or has_image or has_document):
            raise ValueError("No query or file provided.")

        if has_document:
            logger.info("Processing Document with Medical Swarm")
            file = request.files['document']
            document_text = file.read().decode("utf-8")
            session['current_medical_document'] = document_text
            current_medical_document = document_text

            swarm_answer = run_medical_swarm(current_medical_document, query)
            answer = markdown_bold_to_html(swarm_answer)
            thoughts = "Swarm analysis complete."
            validation = (True, "Swarm output generated.")
            source = "Medical Swarm"

            history_for_agent.append(HumanMessage(content=f"[Document Uploaded] Query: '{query}'"))
            history_for_agent.append(AIMessage(content=answer))

        elif has_image:
            logger.info("Processing Multimodal RAG: Query + Image")
            file = request.files['image']
            upload_dir = "Uploads"
            os.makedirs(upload_dir, exist_ok=True)
            image_path = os.path.join(upload_dir, file.filename)
            try:
                file.save(image_path)
                file.close()
                with open(image_path, "rb") as img_file:
                    img_data = base64.b64encode(img_file.read()).decode("utf-8")

                vision_prompt = f"Analyze image. Query: '{query}'"
                message = HumanMessage(content=[
                    {"type": "text", "text": vision_prompt},
                    {"type": "image_url", "image_url": f"data:image/jpeg;base64,{img_data}"}
                ])
                visual_prediction = llm.invoke([message]).content
                enhanced_query = f'User Query: "{query}" Context from Image: "{visual_prediction}"'

                agent = rag_systems['medical']
                if not agent:
                    raise Exception("Medical RAG system not loaded.")
                response_dict = agent.answer(enhanced_query, chat_history=history_for_agent)
                answer, thoughts, validation, source = parse_agent_response(response_dict)

                history_for_agent.append(HumanMessage(content=query + " [Image Attached]"))
                history_for_agent.append(AIMessage(content=answer))
            finally:
                if os.path.exists(image_path):
                    try:
                        os.remove(image_path)
                    except Exception as e:
                        logger.warning(f"Could not remove {image_path}. Error: {e}")

        elif query:
            history_doc_context = history_for_agent
            if current_medical_document:
                history_doc_context = [HumanMessage(content=f"Document Context:\n{current_medical_document}")] + history_for_agent
            else:
                logger.info("Processing Text RAG query for Medical domain")

            standalone_query = get_standalone_question(query, history_doc_context, llm)
            logger.info(f"Standalone Query : {standalone_query}")

            agent = rag_systems['medical']
            if not agent:
                raise Exception("Medical RAG system not loaded.")
            response_dict = agent.answer(standalone_query, chat_history=history_doc_context)
            answer, thoughts, validation, source = parse_agent_response(response_dict)

            history_for_agent.append(HumanMessage(content=query))
            history_for_agent.append(AIMessage(content=answer))

    except Exception as e:
        logger.error(f"Error on /medical page: {e}", exc_info=True)
        answer = f"An error occurred: {e}"
        thoughts = traceback.format_exc()
        validation = (False, "Exception")
        source = "Application Error"
        history_for_agent.append(HumanMessage(content=query if query else "Failed request"))
        history_for_agent.append(AIMessage(content=answer))

    session['medical_history'] = dehydrate_history(history_for_agent)
    session['latest_medical_response'] = {'answer': answer, 'thoughts': thoughts, 'validation': validation, 'source': source}
    session.modified = True
    logger.info(f"DEBUG: Saving to session: ANSWER='{answer[:50]}...'")
    return redirect(url_for('medical_page'))


@app.route("/medical/clear")
def clear_medical_chat():
    session.pop('medical_history', None)
    session.pop('current_medical_document', None)
    return redirect(url_for('medical_page'))
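
# Illustrative example (assumes the dev server started by app.run() at the bottom of
# this file, i.e. http://localhost:7860, plus a cookie jar because results live in the
# Flask session and are shown after the redirect back to GET /medical). The form takes
# a text "query" field and an optional "image" or "document" file field:
#
#   curl -c cookies.txt -b cookies.txt -L \
#        -F "query=Summarise this report" \
#        -F "document=@report.txt" \
#        http://localhost:7860/medical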

# --- ISLAMIC PAGE ---
@app.route("/islamic", methods=["GET", "POST"])
def islamic_page():
    if request.method == "GET":
        latest_response = session.pop('latest_islamic_response', {})
        return render_template("islamic_page.html",
                               history=session.get('islamic_history', []),
                               answer=latest_response.get('answer', ""),
                               thoughts=latest_response.get('thoughts', ""),
                               validation=latest_response.get('validation', ""),
                               source=latest_response.get('source', ""))

    answer, thoughts, validation, source = "", "", "", ""
    raw_history_list = session.get('islamic_history', [])
    history_for_agent = hydrate_history(raw_history_list)
    query = ""
    has_image = False  # initialised up front so the except block below cannot hit a NameError

    try:
        query = standardize_query(request.form.get("query", ""))
        has_image = 'image' in request.files and request.files['image'].filename

        if not (query or has_image):
            raise ValueError("No query or file provided.")

        final_query = query

        if has_image:
            logger.info("Processing Multimodal RAG query for Islamic domain")
            file = request.files['image']
            upload_dir = "Uploads"
            os.makedirs(upload_dir, exist_ok=True)
            image_path = os.path.join(upload_dir, file.filename)
            try:
                file.save(image_path)
                file.close()
                with open(image_path, "rb") as img_file:
                    img_base64 = base64.b64encode(img_file.read()).decode("utf-8")

                vision_prompt = f"Analyze image. Query: '{query}'"
                message = HumanMessage(content=[
                    {"type": "text", "text": vision_prompt},
                    {"type": "image_url", "image_url": f"data:image/jpeg;base64,{img_base64}"}
                ])
                visual_prediction = llm.invoke([message]).content
                final_query = f'User Query: "{query}" Context from Image: "{visual_prediction}"'
            finally:
                if os.path.exists(image_path):
                    try:
                        os.remove(image_path)
                    except Exception as e:
                        logger.warning(f"Could not remove {image_path}. Error: {e}")
            history_for_agent.append(HumanMessage(content=query + " [Image Attached]"))

        elif query:
            logger.info("Processing Text RAG query for Islamic domain")
            final_query = get_standalone_question(query, history_for_agent, llm)
            history_for_agent.append(HumanMessage(content=query))

        agent = rag_systems['islamic']
        if not agent:
            raise Exception("Islamic RAG system is not loaded.")
        response_dict = agent.answer(final_query, chat_history=history_for_agent[:-1])
        answer, thoughts, validation, source = parse_agent_response(response_dict)
        history_for_agent.append(AIMessage(content=answer))

    except Exception as e:
        logger.error(f"Error on /islamic page: {e}", exc_info=True)
        answer = f"An error occurred: {e}"
        thoughts = traceback.format_exc()
        validation = (False, "Exception")
        source = "Application Error"
        if not (has_image or query):
            history_for_agent.append(HumanMessage(content="Failed request"))
        else:
            history_for_agent.append(HumanMessage(content=query))
        history_for_agent.append(AIMessage(content=answer))

    session['islamic_history'] = dehydrate_history(history_for_agent)
    session['latest_islamic_response'] = {'answer': answer, 'thoughts': thoughts, 'validation': validation, 'source': source}
    session.modified = True
    logger.info(f"DEBUG: Saving to session: ANSWER='{answer[:50]}...'")
    return redirect(url_for('islamic_page'))


@app.route("/islamic/clear")
def clear_islamic_chat():
    session.pop('islamic_history', None)
    return redirect(url_for('islamic_page'))

# --- INSURANCE PAGE ---
@app.route("/insurance", methods=["GET", "POST"])
def insurance_page():
    if request.method == "GET":
        latest_response = session.pop('latest_insurance_response', {})
        return render_template("insurance_page.html",
                               history=session.get('insurance_history', []),
                               answer=latest_response.get('answer', ""),
                               thoughts=latest_response.get('thoughts', ""),
                               validation=latest_response.get('validation', ""),
                               source=latest_response.get('source', ""))

    answer, thoughts, validation, source = "", "", "", ""
    raw_history_list = session.get('insurance_history', [])
    history_for_agent = hydrate_history(raw_history_list)
    query = ""

    try:
        query = standardize_query(request.form.get("query", ""))
        if not query:
            raise ValueError("No query provided.")

        standalone_query = get_standalone_question(query, history_for_agent, llm)

        agent = rag_systems['insurance']
        if not agent:
            raise Exception("Insurance RAG system is not loaded.")
        response_dict = agent.answer(standalone_query, chat_history=history_for_agent)
        answer, thoughts, validation, source = parse_agent_response(response_dict)

        history_for_agent.append(HumanMessage(content=query))
        history_for_agent.append(AIMessage(content=answer))

    except Exception as e:
        logger.error(f"Error on /insurance page: {e}", exc_info=True)
        answer = f"An error occurred: {e}"
        thoughts = traceback.format_exc()
        validation = (False, "Exception")
        source = "Application Error"
        history_for_agent.append(HumanMessage(content=query))
        history_for_agent.append(AIMessage(content=answer))

    session['insurance_history'] = dehydrate_history(history_for_agent)
    session['latest_insurance_response'] = {'answer': answer, 'thoughts': thoughts, 'validation': validation, 'source': source}
    session.modified = True
    logger.debug("Redirecting after saving latest response.")
    return redirect(url_for('insurance_page'))


@app.route("/insurance/clear")
def clear_insurance_chat():
    session.pop('insurance_history', None)
    return redirect(url_for('insurance_page'))


@app.route("/about", methods=["GET"])
def about():
    return render_template("about.html")


# --- (Metrics routes remain unchanged) ---
@app.route('/metrics/<domain>')
def get_metrics(domain):
    try:
        if domain == "medical" and rag_systems['medical']:
            stats = rag_systems['medical'].metrics_tracker.get_stats()
        elif domain == "islamic" and rag_systems['islamic']:
            stats = rag_systems['islamic'].metrics_tracker.get_stats()
        elif domain == "insurance" and rag_systems['insurance']:
            stats = rag_systems['insurance'].metrics_tracker.get_stats()
        elif not rag_systems.get(domain):
            return jsonify({"error": f"{domain} RAG system not loaded"}), 500
        else:
            return jsonify({"error": "Invalid domain"}), 400
        return jsonify(stats)
    except Exception as e:
        return jsonify({"error": str(e)}), 500


@app.route('/metrics/reset/<domain>', methods=['POST'])
def reset_metrics(domain):
    try:
        if domain == "medical" and rag_systems['medical']:
            rag_systems['medical'].metrics_tracker.reset_metrics()
        elif domain == "islamic" and rag_systems['islamic']:
            rag_systems['islamic'].metrics_tracker.reset_metrics()
        elif domain == "insurance" and rag_systems['insurance']:
            rag_systems['insurance'].metrics_tracker.reset_metrics()
        elif not rag_systems.get(domain):
            return jsonify({"error": f"{domain} RAG system not loaded"}), 500
        else:
            return jsonify({"error": "Invalid domain"}), 400
        return jsonify({"success": True, "message": f"Metrics reset for {domain}"})
    except Exception as e:
        return jsonify({"error": str(e)}), 500
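
# Example calls (assuming the dev server started by app.run() below, i.e. port 7860):
#   curl http://localhost:7860/metrics/medical
#   curl -X POST http://localhost:7860/metrics/reset/medical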

# Helper function to check API key
API_USAGE = {}


def check_api_key(request_data):
    api_key = request_data.get("api_key")

    # 1. Check if key exists
    if not api_key or api_key not in VALID_API_KEYS:
        return False, {"error": "Invalid API key"}, 401

    # 2. Initialize counter for this key if new
    if api_key not in API_USAGE:
        API_USAGE[api_key] = 0

    # 3. Check Quota (The "Selling" Logic)
    if api_key == "sistem_gelap" and API_USAGE[api_key] >= 10:
        logger.warning(f"Quota exceeded for user: {VALID_API_KEYS[api_key]}")
        return False, {"error": "Quota exceeded. Free tier is limited to 10 requests."}, 429

    # 4. Increment Counter
    API_USAGE[api_key] += 1
    logger.info(f"User {VALID_API_KEYS[api_key]} used {API_USAGE[api_key]} requests.")
    return True, None, None


# Helper function to save and process uploaded files (Base64)
def process_base64_file(base64_string, file_type):
    try:
        # Decode the base64 string
        file_bytes = base64.b64decode(base64_string)

        # Save to a temporary file
        upload_dir = "Uploads"
        os.makedirs(upload_dir, exist_ok=True)

        # Use a unique filename
        temp_filename = f"{file_type}_{int(time.time())}.tmp"
        temp_path = os.path.join(upload_dir, temp_filename)

        with open(temp_path, 'wb') as f:
            f.write(file_bytes)

        logger.info(f"Saved temporary {file_type} to {temp_path}")
        return temp_path
    except Exception as e:
        logger.error(f"Error decoding/saving base64 file: {e}")
        return None


# --- 3. NEW API-ONLY ROUTES ---
@app.route("/api/medical", methods=["POST"])
def medical_api():
    try:
        data = request.json
        is_valid, error_response, status_code = check_api_key(data)
        if not is_valid:
            return jsonify(error_response), status_code

        query = data.get("query")
        if not query:
            return jsonify({"error": "No query provided"}), 400

        # Hydrate history from the JSON payload
        raw_history = data.get("history", [])
        history_for_agent = hydrate_history(raw_history)

        agent = rag_systems['medical']
        if not agent:
            return jsonify({"error": "Medical RAG system not loaded"}), 500

        # --- Handle File Uploads (Base64) ---
        enhanced_query = query
        temp_file_path = None

        if data.get("document_base64"):
            logger.info("API: Processing base64 document for Swarm")
            doc_text = base64.b64decode(data.get("document_base64")).decode('utf-8')
            swarm_answer = run_medical_swarm(doc_text, query)
            response_dict = {
                "answer": markdown_bold_to_html(swarm_answer),
                "thoughts": "Swarm analysis complete.",
                "validation": (True, "Swarm output generated."),
                "source": "Medical Swarm",
                "response_time": 0  # Not tracked for swarm in this path
            }
            return jsonify(response_dict)

        elif data.get("image_base64"):
            logger.info("API: Processing base64 image")
            temp_file_path = process_base64_file(data.get("image_base64"), "image")
            if not temp_file_path:
                return jsonify({"error": "Invalid base64 image data"}), 400

            with open(temp_file_path, "rb") as img_file:
                img_data = base64.b64encode(img_file.read()).decode("utf-8")

            vision_prompt = f"Analyze image. Query: '{query}'"
            message = HumanMessage(content=[
                {"type": "text", "text": vision_prompt},
                {"type": "image_url", "image_url": f"data:image/jpeg;base64,{img_data}"}
            ])
            visual_prediction = llm.invoke([message]).content
            enhanced_query = f'User Query: "{query}" Context from Image: "{visual_prediction}"'

        # Run the agent
        response_dict = agent.answer(enhanced_query, chat_history=history_for_agent)

        # Clean up temp file
        if temp_file_path and os.path.exists(temp_file_path):
            os.remove(temp_file_path)

        # Return the full, clean JSON response
        return jsonify(response_dict)

    except Exception as e:
        logger.error(f"Error on /api/medical: {e}", exc_info=True)
        return jsonify({"error": str(e)}), 500
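
# Illustrative example request (assumes the dev server started by app.run() below and
# one of the demo keys defined in VALID_API_KEYS; "history" uses the same dict shape
# as hydrate_history/dehydrate_history, and image_base64/document_base64 are optional):
#
#   curl -X POST http://localhost:7860/api/medical \
#        -H "Content-Type: application/json" \
#        -d '{"api_key": "sistem_gelap",
#             "query": "What are common symptoms of anaemia?",
#             "history": [{"type": "human", "content": "Hi"},
#                         {"type": "ai", "content": "Hello, how can I help?"}]}'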

@app.route("/api/islamic", methods=["POST"])
def islamic_api():
    try:
        data = request.json
        is_valid, error_response, status_code = check_api_key(data)
        if not is_valid:
            return jsonify(error_response), status_code

        query = data.get("query")
        if not query:
            return jsonify({"error": "No query provided"}), 400

        raw_history = data.get("history", [])
        history_for_agent = hydrate_history(raw_history)

        agent = rag_systems['islamic']
        if not agent:
            return jsonify({"error": "Islamic RAG system not loaded"}), 500

        enhanced_query = query
        temp_file_path = None

        if data.get("image_base64"):
            logger.info("API: Processing base64 image")
            temp_file_path = process_base64_file(data.get("image_base64"), "image")
            if not temp_file_path:
                return jsonify({"error": "Invalid base64 image data"}), 400

            with open(temp_file_path, "rb") as img_file:
                img_data = base64.b64encode(img_file.read()).decode("utf-8")

            vision_prompt = f"Analyze image. Query: '{query}'"
            message = HumanMessage(content=[
                {"type": "text", "text": vision_prompt},
                {"type": "image_url", "image_url": f"data:image/jpeg;base64,{img_data}"}
            ])
            visual_prediction = llm.invoke([message]).content
            enhanced_query = f'User Query: "{query}" Context from Image: "{visual_prediction}"'

        response_dict = agent.answer(enhanced_query, chat_history=history_for_agent)

        if temp_file_path and os.path.exists(temp_file_path):
            os.remove(temp_file_path)

        return jsonify(response_dict)

    except Exception as e:
        logger.error(f"Error on /api/islamic: {e}", exc_info=True)
        return jsonify({"error": str(e)}), 500


@app.route("/api/insurance", methods=["POST"])
def insurance_api():
    try:
        data = request.json
        is_valid, error_response, status_code = check_api_key(data)
        if not is_valid:
            return jsonify(error_response), status_code

        query = data.get("query")
        if not query:
            return jsonify({"error": "No query provided"}), 400

        raw_history = data.get("history", [])
        history_for_agent = hydrate_history(raw_history)

        agent = rag_systems['insurance']
        if not agent:
            return jsonify({"error": "Insurance RAG system not loaded"}), 500

        response_dict = agent.answer(query, chat_history=history_for_agent)
        return jsonify(response_dict)

    except Exception as e:
        logger.error(f"Error on /api/insurance: {e}", exc_info=True)
        return jsonify({"error": str(e)}), 500


if __name__ == "__main__":
    logger.info("Starting Flask app for deployment testing...")
    app.run(host="0.0.0.0", port=7860, debug=False)