# Atlas / app.py
from flask import Flask, request, render_template, session, url_for, redirect, jsonify
# from flask_session import Session <--- REMOVED
from langchain_core.messages import HumanMessage, AIMessage
from langchain_core.prompts import ChatPromptTemplate, MessagesPlaceholder
import os
import time  # used for unique temp filenames in process_base64_file()
import logging
import re
import traceback
import base64
import shutil
import zipfile
from dotenv import load_dotenv
from huggingface_hub import hf_hub_download
from PIL import Image
# --- Core Application Imports ---
from src.medical_swarm import run_medical_swarm
from src.utils import load_rag_system, standardize_query, get_standalone_question, parse_agent_response, markdown_bold_to_html
from langchain_google_genai import ChatGoogleGenerativeAI
# Setup logging
logging.basicConfig(level=logging.DEBUG)
logger = logging.getLogger(__name__)
# Load environment variables
load_dotenv()
# These are your "customers". You give them a key.
# In a real app, this would be in a database.
VALID_API_KEYS = {
"anakmail_key_123": "Dr. Amelia (Premium Plan, Unlimited)",
"irfan_key_456": "Irfan (Admin, Unlimited)",
"sistem_gelap": "Demo User (Free Tier, 10 requests)"
}
# --- 1. NEW HELPER FUNCTIONS TO FIX 'TypeError' ---
def hydrate_history(raw_history_list: list) -> list:
"""Converts a list of dicts from session back into LangChain Message objects."""
history = []
if not raw_history_list:
return history
for item in raw_history_list:
if item.get('type') == 'human':
history.append(HumanMessage(content=item.get('content', '')))
elif item.get('type') == 'ai':
history.append(AIMessage(content=item.get('content', '')))
return history
def dehydrate_history(history_messages: list) -> list:
"""Converts LangChain Message objects into a JSON-serializable list of dicts."""
raw_list = []
for msg in history_messages:
if isinstance(msg, HumanMessage):
raw_list.append({'type': 'human', 'content': msg.content})
elif isinstance(msg, AIMessage):
raw_list.append({'type': 'ai', 'content': msg.content})
return raw_list
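
# Flask's default cookie session only stores JSON-serializable data, so chat history is kept
# as plain dicts and rebuilt into LangChain messages on every request. Illustrative round trip:
#   dehydrate_history([HumanMessage(content="hi")])       -> [{'type': 'human', 'content': 'hi'}]
#   hydrate_history([{'type': 'human', 'content': 'hi'}])  -> [HumanMessage(content='hi')]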
# --- 2. DATABASE SETUP FUNCTION (For Deployment) ---
def setup_database():
"""Downloads and unzips the ChromaDB folder from Hugging Face Datasets."""
DATASET_REPO_ID = "WanIrfan/atlast-db"
ZIP_FILENAME = "chroma_db.zip"
DB_DIR = "chroma_db"
if os.path.exists(DB_DIR) and os.listdir(DB_DIR):
logger.info("βœ… Database directory already exists. Skipping download.")
return
logger.info(f"πŸ“₯ Downloading database from HF Hub: {DATASET_REPO_ID}")
try:
zip_path = hf_hub_download(repo_id=DATASET_REPO_ID, filename=ZIP_FILENAME, repo_type="dataset")
logger.info(f"πŸ“¦ Unzipping database from {zip_path}...")
with zipfile.ZipFile(zip_path, 'r') as zip_ref:
zip_ref.extractall(".")
logger.info("βœ… Database setup complete!")
if os.path.exists(zip_path):
os.remove(zip_path)
except Exception as e:
logger.error(f"❌ CRITICAL ERROR setting up database: {e}", exc_info=True)
# --- RUN DATABASE SETUP *BEFORE* INITIALIZING THE APP ---
setup_database()
# --- STANDARD FLASK APP INITIALIZATION ---
app = Flask(__name__)
# Allow the secret key to come from the environment (the env var name FLASK_SECRET_KEY is a
# suggestion here); fall back to the original static key so existing deployments keep working.
app.secret_key = os.getenv("FLASK_SECRET_KEY", "a_really_strong_static_secret_key_12345")
# --- REMOVED flask_session CONFIG ---
google_api_key = os.getenv("GOOGLE_API_KEY")
if not google_api_key:
    logger.warning("⚠️ GOOGLE_API_KEY not found.")
else:
    logger.info("GOOGLE_API_KEY loaded successfully.")
llm = ChatGoogleGenerativeAI(model="gemini-2.5-flash", temperature=0.05, google_api_key=google_api_key)
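
# This single Gemini client is shared across the app: it answers the vision prompts for image
# uploads, rewrites follow-up questions via get_standalone_question(), and is also exposed as
# app.llm below for any code that reads the model off the Flask app object.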
# --- LOAD RAG SYSTEMS (AFTER DB SETUP) ---
logger.info("🌟 Starting Multi-Domain AI Assistant...")
try:
rag_systems = {
'medical': load_rag_system(collection_name="medical_csv_Agentic_retrieval", domain="medical"),
'islamic': load_rag_system(collection_name="islamic_texts_Agentic_retrieval", domain="islamic"),
'insurance': load_rag_system(collection_name="etiqa_Agentic_retrieval", domain="insurance")
}
except Exception as e:
logger.error(f"❌ FAILED to load RAG systems. Error: {e}", exc_info=True)
rag_systems = {'medical': None, 'islamic': None, 'insurance': None}
app.rag_systems = rag_systems
app.llm = llm
logger.info("\nπŸ“Š SYSTEM STATUS:")
for domain, system in rag_systems.items():
status = "βœ… Ready" if system else "❌ Failed (DB missing?)"
logger.info(f" {domain}: {status}")
# --- FLASK WEB UI ROUTES ---
@app.route("/")
def homePage():
    session.clear()  # Clear all keys
    return render_template("homePage.html")
# --- MEDICAL PAGE ---
@app.route("/medical", methods=["GET", "POST"])
def medical_page():
    if request.method == "GET":
        latest_response = session.pop('latest_medical_response', {})
        return render_template("medical_page.html",
                               history=session.get('medical_history', []),
                               answer=latest_response.get('answer', ""),
                               thoughts=latest_response.get('thoughts', ""),
                               validation=latest_response.get('validation', ""),
                               source=latest_response.get('source', ""))

    answer, thoughts, validation, source = "", "", "", ""
    raw_history_list = session.get('medical_history', [])
    history_for_agent = hydrate_history(raw_history_list)
    current_medical_document = session.get('current_medical_document', "")
    query = ""
    try:
        query = standardize_query(request.form.get("query", ""))
        has_image = 'image' in request.files and request.files['image'].filename
        has_document = 'document' in request.files and request.files['document'].filename
        if not (query or has_image or has_document):
            raise ValueError("No query or file provided.")

        if has_document:
            logger.info("Processing Document with Medical Swarm")
            file = request.files['document']
            document_text = file.read().decode("utf-8")
            session['current_medical_document'] = document_text
            current_medical_document = document_text
            swarm_answer = run_medical_swarm(current_medical_document, query)
            answer = markdown_bold_to_html(swarm_answer)
            thoughts = "Swarm analysis complete."
            validation = (True, "Swarm output generated.")
            source = "Medical Swarm"
            history_for_agent.append(HumanMessage(content=f"[Document Uploaded] Query: '{query}'"))
            history_for_agent.append(AIMessage(content=answer))
        elif has_image:
            logger.info("Processing Multimodal RAG: Query + Image")
            file = request.files['image']
            upload_dir = "Uploads"
            os.makedirs(upload_dir, exist_ok=True)
            image_path = os.path.join(upload_dir, file.filename)
            try:
                file.save(image_path)
                file.close()
                with open(image_path, "rb") as img_file:
                    img_data = base64.b64encode(img_file.read()).decode("utf-8")
                vision_prompt = f"Analyze image. Query: '{query}'"
                message = HumanMessage(content=[{"type": "text", "text": vision_prompt}, {"type": "image_url", "image_url": f"data:image/jpeg;base64,{img_data}"}])
                visual_prediction = llm.invoke([message]).content
                enhanced_query = f'User Query: "{query}" Context from Image: "{visual_prediction}"'
                agent = rag_systems['medical']
                if not agent:
                    raise Exception("Medical RAG system not loaded.")
                response_dict = agent.answer(enhanced_query, chat_history=history_for_agent)
                answer, thoughts, validation, source = parse_agent_response(response_dict)
                history_for_agent.append(HumanMessage(content=query + " [Image Attached]"))
                history_for_agent.append(AIMessage(content=answer))
            finally:
                if os.path.exists(image_path):
                    try:
                        os.remove(image_path)
                    except Exception as e:
                        logger.warning(f"Could not remove {image_path}. Error: {e}")
        elif query:
            history_doc_context = history_for_agent
            if current_medical_document:
                history_doc_context = [HumanMessage(content=f"Document Context:\n{current_medical_document}")] + history_for_agent
            else:
                logger.info("Processing Text RAG query for Medical domain")
            standalone_query = get_standalone_question(query, history_doc_context, llm)
            logger.info(f"Standalone Query : {standalone_query}")
            agent = rag_systems['medical']
            if not agent:
                raise Exception("Medical RAG system not loaded.")
            response_dict = agent.answer(standalone_query, chat_history=history_doc_context)
            answer, thoughts, validation, source = parse_agent_response(response_dict)
            history_for_agent.append(HumanMessage(content=query))
            history_for_agent.append(AIMessage(content=answer))
    except Exception as e:
        logger.error(f"Error on /medical page: {e}", exc_info=True)
        answer = f"An error occurred: {e}"
        thoughts = traceback.format_exc()
        validation = (False, "Exception")
        source = "Application Error"
        history_for_agent.append(HumanMessage(content=query if query else "Failed request"))
        history_for_agent.append(AIMessage(content=answer))

    session['medical_history'] = dehydrate_history(history_for_agent)
    session['latest_medical_response'] = {'answer': answer, 'thoughts': thoughts, 'validation': validation, 'source': source}
    session.modified = True
    logger.info(f"DEBUG: Saving to session: ANSWER='{answer[:50]}...'")
    return redirect(url_for('medical_page'))
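
# The POST branch above follows a Post/Redirect/Get pattern: the result is stashed in
# session['latest_medical_response'], the browser is redirected, and the GET branch pops it for
# rendering, so refreshing the page does not resubmit the form. The islamic and insurance routes
# below use the same pattern.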
@app.route("/medical/clear")
def clear_medical_chat():
    session.pop('medical_history', None)
    session.pop('current_medical_document', None)
    return redirect(url_for('medical_page'))
# --- ISLAMIC PAGE ---
@app.route("/islamic", methods=["GET", "POST"])
def islamic_page():
    if request.method == "GET":
        latest_response = session.pop('latest_islamic_response', {})
        return render_template("islamic_page.html",
                               history=session.get('islamic_history', []),
                               answer=latest_response.get('answer', ""),
                               thoughts=latest_response.get('thoughts', ""),
                               validation=latest_response.get('validation', ""),
                               source=latest_response.get('source', ""))

    answer, thoughts, validation, source = "", "", "", ""
    raw_history_list = session.get('islamic_history', [])
    history_for_agent = hydrate_history(raw_history_list)
query = ""
    try:
        query = standardize_query(request.form.get("query", ""))
        has_image = 'image' in request.files and request.files['image'].filename
        if not (query or has_image):
            raise ValueError("No query or file provided.")

        final_query = query
        if has_image:
            logger.info("Processing Multimodal RAG query for Islamic domain")
            file = request.files['image']
            upload_dir = "Uploads"
            os.makedirs(upload_dir, exist_ok=True)
            image_path = os.path.join(upload_dir, file.filename)
            try:
                file.save(image_path)
                file.close()
                with open(image_path, "rb") as img_file:
                    img_base64 = base64.b64encode(img_file.read()).decode("utf-8")
                vision_prompt = f"Analyze image. Query: '{query}'"
                message = HumanMessage(content=[{"type": "text", "text": vision_prompt}, {"type": "image_url", "image_url": f"data:image/jpeg;base64,{img_base64}"}])
                visual_prediction = llm.invoke([message]).content
                final_query = f'User Query: "{query}" Context from Image: "{visual_prediction}"'
            finally:
                if os.path.exists(image_path):
                    try:
                        os.remove(image_path)
                    except Exception as e:
                        logger.warning(f"Could not remove {image_path}. Error: {e}")
            history_for_agent.append(HumanMessage(content=query + " [Image Attached]"))
        elif query:
            logger.info("Processing Text RAG query for Islamic domain")
            final_query = get_standalone_question(query, history_for_agent, llm)
            history_for_agent.append(HumanMessage(content=query))

        agent = rag_systems['islamic']
        if not agent:
            raise Exception("Islamic RAG system is not loaded.")
        response_dict = agent.answer(final_query, chat_history=history_for_agent[:-1])
        answer, thoughts, validation, source = parse_agent_response(response_dict)
        history_for_agent.append(AIMessage(content=answer))
    except Exception as e:
        logger.error(f"Error on /islamic page: {e}", exc_info=True)
        answer = f"An error occurred: {e}"
        thoughts = traceback.format_exc()
        validation = (False, "Exception")
        source = "Application Error"
        if not (has_image or query):
            history_for_agent.append(HumanMessage(content="Failed request"))
        else:
            history_for_agent.append(HumanMessage(content=query))
        history_for_agent.append(AIMessage(content=answer))

    session['islamic_history'] = dehydrate_history(history_for_agent)
    session['latest_islamic_response'] = {'answer': answer, 'thoughts': thoughts, 'validation': validation, 'source': source}
    session.modified = True
    logger.info(f"DEBUG: Saving to session: ANSWER='{answer[:50]}...'")
    return redirect(url_for('islamic_page'))
@app.route("/islamic/clear")
def clear_islamic_chat():
    session.pop('islamic_history', None)
    return redirect(url_for('islamic_page'))
# --- INSURANCE PAGE ---
@app.route("/insurance", methods=["GET", "POST"])
def insurance_page():
    if request.method == "GET":
        latest_response = session.pop('latest_insurance_response', {})
        return render_template("insurance_page.html",
                               history=session.get('insurance_history', []),
                               answer=latest_response.get('answer', ""),
                               thoughts=latest_response.get('thoughts', ""),
                               validation=latest_response.get('validation', ""),
                               source=latest_response.get('source', ""))

    answer, thoughts, validation, source = "", "", "", ""
    raw_history_list = session.get('insurance_history', [])
    history_for_agent = hydrate_history(raw_history_list)
    query = ""
    try:
        query = standardize_query(request.form.get("query", ""))
        if not query:
            raise ValueError("No query provided.")
        standalone_query = get_standalone_question(query, history_for_agent, llm)
        agent = rag_systems['insurance']
        if not agent:
            raise Exception("Insurance RAG system is not loaded.")
        response_dict = agent.answer(standalone_query, chat_history=history_for_agent)
        answer, thoughts, validation, source = parse_agent_response(response_dict)
        history_for_agent.append(HumanMessage(content=query))
        history_for_agent.append(AIMessage(content=answer))
    except Exception as e:
        logger.error(f"Error on /insurance page: {e}", exc_info=True)
        answer = f"An error occurred: {e}"
        thoughts = traceback.format_exc()
        validation = (False, "Exception")
        source = "Application Error"
        history_for_agent.append(HumanMessage(content=query))
        history_for_agent.append(AIMessage(content=answer))

    session['insurance_history'] = dehydrate_history(history_for_agent)
    session['latest_insurance_response'] = {'answer': answer, 'thoughts': thoughts, 'validation': validation, 'source': source}
    session.modified = True
    logger.debug("Redirecting after saving latest response.")
    return redirect(url_for('insurance_page'))
@app.route("/insurance/clear")
def clear_insurance_chat():
    session.pop('insurance_history', None)
    return redirect(url_for('insurance_page'))
@app.route("/about", methods=["GET"])
def about():
return render_template("about.html")
# --- (Metrics routes remain unchanged) ---
@app.route('/metrics/<domain>')
def get_metrics(domain):
    try:
        if domain == "medical" and rag_systems['medical']:
            stats = rag_systems['medical'].metrics_tracker.get_stats()
        elif domain == "islamic" and rag_systems['islamic']:
            stats = rag_systems['islamic'].metrics_tracker.get_stats()
        elif domain == "insurance" and rag_systems['insurance']:
            stats = rag_systems['insurance'].metrics_tracker.get_stats()
        elif domain in rag_systems:
            # Known domain, but its RAG system failed to load
            return jsonify({"error": f"{domain} RAG system not loaded"}), 500
        else:
            return jsonify({"error": "Invalid domain"}), 400
        return jsonify(stats)
    except Exception as e:
        return jsonify({"error": str(e)}), 500
@app.route('/metrics/reset/<domain>', methods=['POST'])
def reset_metrics(domain):
    try:
        if domain == "medical" and rag_systems['medical']:
            rag_systems['medical'].metrics_tracker.reset_metrics()
        elif domain == "islamic" and rag_systems['islamic']:
            rag_systems['islamic'].metrics_tracker.reset_metrics()
        elif domain == "insurance" and rag_systems['insurance']:
            rag_systems['insurance'].metrics_tracker.reset_metrics()
        elif domain in rag_systems:
            # Known domain, but its RAG system failed to load
            return jsonify({"error": f"{domain} RAG system not loaded"}), 500
        else:
            return jsonify({"error": "Invalid domain"}), 400
        return jsonify({"success": True, "message": f"Metrics reset for {domain}"})
    except Exception as e:
        return jsonify({"error": str(e)}), 500
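
# Illustrative usage of the metrics endpoints (assuming the app is served locally on port 7860):
#   curl http://localhost:7860/metrics/medical
#   curl -X POST http://localhost:7860/metrics/reset/medical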
# Helper function to check API key
API_USAGE = {}
def check_api_key(request_data):
    api_key = request_data.get("api_key")
    # 1. Check if key exists
    if not api_key or api_key not in VALID_API_KEYS:
        return False, {"error": "Invalid API key"}, 401
    # 2. Initialize counter for this key if new
    if api_key not in API_USAGE:
        API_USAGE[api_key] = 0
    # 3. Check Quota (The "Selling" Logic)
    if api_key == "sistem_gelap" and API_USAGE[api_key] >= 10:
        logger.warning(f"Quota exceeded for user: {VALID_API_KEYS[api_key]}")
        return False, {"error": "Quota exceeded. Free tier is limited to 10 requests."}, 429
    # 4. Increment Counter
    API_USAGE[api_key] += 1
    logger.info(f"User {VALID_API_KEYS[api_key]} used {API_USAGE[api_key]} requests.")
    return True, None, None
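
# NOTE: API_USAGE is plain in-process memory, so quota counts reset on every restart and are not
# shared across workers; a persistent store (database/Redis) would be needed for real metering.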
# Helper function to save and process uploaded files (Base64)
def process_base64_file(base64_string, file_type):
    try:
        # Decode the base64 string
        file_bytes = base64.b64decode(base64_string)
        # Save to a temporary file
        upload_dir = "Uploads"
        os.makedirs(upload_dir, exist_ok=True)
        # Use a unique filename
        temp_filename = f"{file_type}_{int(time.time())}.tmp"
        temp_path = os.path.join(upload_dir, temp_filename)
        with open(temp_path, 'wb') as f:
            f.write(file_bytes)
        logger.info(f"Saved temporary {file_type} to {temp_path}")
        return temp_path
    except Exception as e:
        logger.error(f"Error decoding/saving base64 file: {e}")
        return None
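
# The decoded payload is written with a generic ".tmp" suffix; the API routes below re-read the
# file and base64-encode it again before passing it to the vision model, then delete it.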
# --- 3. NEW API-ONLY ROUTES ---
@app.route("/api/medical", methods=["POST"])
def medical_api():
    try:
        data = request.json
        is_valid, error_response, status_code = check_api_key(data)
        if not is_valid:
            return jsonify(error_response), status_code
        query = data.get("query")
        if not query:
            return jsonify({"error": "No query provided"}), 400
        # Hydrate history from the JSON payload
        raw_history = data.get("history", [])
        history_for_agent = hydrate_history(raw_history)
        agent = rag_systems['medical']
        if not agent:
            return jsonify({"error": "Medical RAG system not loaded"}), 500
        # --- Handle File Uploads (Base64) ---
        enhanced_query = query
        temp_file_path = None
        if data.get("document_base64"):
            logger.info("API: Processing base64 document for Swarm")
            doc_text = base64.b64decode(data.get("document_base64")).decode('utf-8')
            swarm_answer = run_medical_swarm(doc_text, query)
            response_dict = {
                "answer": markdown_bold_to_html(swarm_answer),
                "thoughts": "Swarm analysis complete.",
                "validation": (True, "Swarm output generated."),
                "source": "Medical Swarm",
                "response_time": 0  # Not tracked for swarm in this path
            }
            return jsonify(response_dict)
        elif data.get("image_base64"):
            logger.info("API: Processing base64 image")
            temp_file_path = process_base64_file(data.get("image_base64"), "image")
            if not temp_file_path:
                return jsonify({"error": "Invalid base64 image data"}), 400
            with open(temp_file_path, "rb") as img_file:
                img_data = base64.b64encode(img_file.read()).decode("utf-8")
            vision_prompt = f"Analyze image. Query: '{query}'"
            message = HumanMessage(content=[{"type": "text", "text": vision_prompt}, {"type": "image_url", "image_url": f"data:image/jpeg;base64,{img_data}"}])
            visual_prediction = llm.invoke([message]).content
            enhanced_query = f'User Query: "{query}" Context from Image: "{visual_prediction}"'
        # Run the agent
        response_dict = agent.answer(enhanced_query, chat_history=history_for_agent)
        # Clean up temp file
        if temp_file_path and os.path.exists(temp_file_path):
            os.remove(temp_file_path)
        # Return the full, clean JSON response
        return jsonify(response_dict)
    except Exception as e:
        logger.error(f"Error on /api/medical: {e}", exc_info=True)
        return jsonify({"error": str(e)}), 500
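
# Illustrative JSON body for POST /api/medical (field names come from the handler above; the key
# and base64 values are placeholders):
#   {
#     "api_key": "sistem_gelap",
#     "query": "Summarize the attached report",
#     "history": [{"type": "human", "content": "..."}, {"type": "ai", "content": "..."}],
#     "document_base64": "<base64-encoded UTF-8 text>",   # optional: routed to the medical swarm
#     "image_base64": "<base64-encoded image>"            # optional: routed through the vision model
#   }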
@app.route("/api/islamic", methods=["POST"])
def islamic_api():
    try:
        data = request.json
        is_valid, error_response, status_code = check_api_key(data)
        if not is_valid:
            return jsonify(error_response), status_code
        query = data.get("query")
        if not query:
            return jsonify({"error": "No query provided"}), 400
        raw_history = data.get("history", [])
        history_for_agent = hydrate_history(raw_history)
        agent = rag_systems['islamic']
        if not agent:
            return jsonify({"error": "Islamic RAG system not loaded"}), 500
        enhanced_query = query
        temp_file_path = None
        if data.get("image_base64"):
            logger.info("API: Processing base64 image")
            temp_file_path = process_base64_file(data.get("image_base64"), "image")
            if not temp_file_path:
                return jsonify({"error": "Invalid base64 image data"}), 400
            with open(temp_file_path, "rb") as img_file:
                img_data = base64.b64encode(img_file.read()).decode("utf-8")
            vision_prompt = f"Analyze image. Query: '{query}'"
            message = HumanMessage(content=[{"type": "text", "text": vision_prompt}, {"type": "image_url", "image_url": f"data:image/jpeg;base64,{img_data}"}])
            visual_prediction = llm.invoke([message]).content
            enhanced_query = f'User Query: "{query}" Context from Image: "{visual_prediction}"'
        response_dict = agent.answer(enhanced_query, chat_history=history_for_agent)
        if temp_file_path and os.path.exists(temp_file_path):
            os.remove(temp_file_path)
        return jsonify(response_dict)
    except Exception as e:
        logger.error(f"Error on /api/islamic: {e}", exc_info=True)
        return jsonify({"error": str(e)}), 500
@app.route("/api/insurance", methods=["POST"])
def insurance_api():
    try:
        data = request.json
        is_valid, error_response, status_code = check_api_key(data)
        if not is_valid:
            return jsonify(error_response), status_code
        query = data.get("query")
        if not query:
            return jsonify({"error": "No query provided"}), 400
        raw_history = data.get("history", [])
        history_for_agent = hydrate_history(raw_history)
        agent = rag_systems['insurance']
        if not agent:
            return jsonify({"error": "Insurance RAG system not loaded"}), 500
        response_dict = agent.answer(query, chat_history=history_for_agent)
        return jsonify(response_dict)
    except Exception as e:
        logger.error(f"Error on /api/insurance: {e}", exc_info=True)
        return jsonify({"error": str(e)}), 500
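
# Unlike the web route, this handler passes the raw query straight to the agent (no
# get_standalone_question rewriting), so API clients should send self-contained questions or
# include prior turns in "history".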
if __name__ == "__main__":
logger.info("Starting Flask app for deployment testing...")
app.run(host="0.0.0.0", port=7860, debug=False)