"""
uv init
uv venv --python 3.12
source .venv/bin/activate
uv pip install -r requirements.txt
"""
# Note: HuggingFace Spaces reads configuration from the README.md frontmatter, not from a separate YAML file.
# The config.yaml is for your reference/organization, but the actual Space config must remain in README.md.
# This Space was created with the Docker SDK, so the README.md frontmatter specifies sdk: docker:
# huggingface-cli repo create Agentic_Rag3_dep_space --type space --space_sdk docker
# Without Docker, use the Gradio SDK option in the README.md frontmatter instead:
# ---
# sdk: gradio
# sdk_version: "6.0.1"
# python_version: "3.12"
# app_file: app.py
# ---
# Or:
# huggingface-cli repo create Agentic_Rag3_dep_space --type space --space_sdk gradio
# AGENT DEPLOYMENT NOTES:
# =====================
# - Local Environment: Uses Ollama (llama3.2) for development
# - HF Space Environment: Uses Llama-3.2-3B-Instruct (cloud API) for production
# - Environment Auto-Detection: Via SPACE_ID environment variable (see the sketch after these notes)
# - Agent Tools Available: Document listing, counting, RAG search
# - Storage: Temporary (files lost on restart) or persistent (paid plans)
# - UI Features: Tool-powered sample questions, environment indicators
# - Security: Token stored as Space secret (HF_token), not in code
# - Space URL: https://huggingface.co/spaces/irajkoohi/Agentic_Rag3_dep_space
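# A minimal sketch of the SPACE_ID-based auto-detection described above. The real logic lives in
# helpers_SHARED.py and may differ; the directory paths below are illustrative placeholders.
"""
import os

# HF Spaces injects SPACE_ID into the container environment; locally it is unset.
IS_HF_SPACE = os.getenv("SPACE_ID") is not None

# Persistent storage (paid plans) mounts at /data; otherwise fall back to a local folder.
DATA_DIR = "/data/pdfs" if (IS_HF_SPACE and os.path.exists("/data")) else "./data/pdfs"

# The API token is read from the Space secret named HF_token, never hard-coded.
HF_TOKEN = os.getenv("HF_token")
"""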
# A) If you want to run app.py locally:
"""
cd /Users/ik/UVcodes/Deployed_Agents_4 && clear && lsof -ti:7860 | xargs kill -9 2>/dev/null; sleep 2 && source .venv/bin/activate && python app.py
"""
# B) If you want to run app.py on Hugging Face Space:
"""
https://huggingface.co/spaces/irajkoohi/Agentic_Rag4_dep_space
"""
# Create and Upload RAG Agent to HF Space Agentic_Rag4_dep_space (Docker SDK)
"""
# huggingface-cli repo create Agentic_Rag4_dep_space --type space --space_sdk docker 2>&1
Create new token with Write role at: https://huggingface.co/settings/tokens
Add token to Space secrets at: https://huggingface.co/spaces/irajkoohi/Agentic_Rag4_dep_space/settings
clear
rm -rf Agentic_Rag4_dep_space && git clone https://huggingface.co/spaces/irajkoohi/Agentic_Rag4_dep_space
cd /Users/ik/UVcodes/Deployed_Agents_4/Agentic_Rag4_dep_space && cp ../app.py . && cp ../helpers_SHARED.py . && cp ../helpers_HF.py . && cp ../helpers_LOCAL.py . && cp ../requirements.txt . && cp ../README.md . && cp ../Dockerfile . && cp ../config.yaml .
mkdir -p data/embeddings
git add . && git commit -m "Deploy RAG Agent with Dockerfile to HF space"
git push --force
https://huggingface.co/spaces/irajkoohi/Agentic_Rag4_dep_space
"""
# If you want to upload all files:
"""
clear
cd /Users/ik/UVcodes/Deployed_Agents_4/Agentic_Rag4_dep_space
cp ../app.py .
cp ../helpers_SHARED.py .
cp ../helpers_HF.py .
cp ../helpers_LOCAL.py .
cp ../requirements.txt .
# cp ../README.md .
cp ../Dockerfile .
cp ../config.yaml .
git add .
git commit -m "Update all files"
git push --force
"""
# If you want to delete all files on the HF Space:
"""
cd /Users/ik/UVcodes/Deployed_Agents_4
rm -rf Agentic_Rag4_dep_space && git clone https://huggingface.co/spaces/irajkoohi/Agentic_Rag4_dep_space
cd Agentic_Rag4_dep_space && find . -maxdepth 1 -not -name '.git' -not -name '.' -delete
rm -rf data embeddings
git add -A && git commit -m "Remove all files to clean the space"
git push
ls -la && pwd
"""
# If you want to delete a Space on the HF website:
"""
1. Go to: https://huggingface.co/spaces/irajkoohi/Agentic_Rag4_dep_space/settings
2. Scroll down to "Delete this Space"
3. Type: irajkoohi/Agentic_Rag4_dep_space
4. Click "Delete"
"""
# If you want to sync changes to specific files (like app.py and helpers_SHARED.py):
"""
cp ../app.py . && cp ../helpers_SHARED.py .
git add app.py helpers_SHARED.py
git commit -m "Sync app.py and helpers_SHARED.py with latest changes" && git push
"""
#%%
import os
import shutil
import warnings
from datetime import datetime
from fastapi import FastAPI, UploadFile, File, HTTPException
from pydantic import BaseModel
import gradio as gr
# Suppress warnings for cleaner output on HF Spaces
warnings.filterwarnings("ignore", category=UserWarning)
# Fix event loop issues on HF Spaces
if os.getenv("SPACE_ID") is not None:
try:
import nest_asyncio
nest_asyncio.apply()
except ImportError:
pass
# ============================================================================
# IMPORT FROM HELPER MODULES
# ============================================================================
from helpers_SHARED import (
# Configuration
CONFIG, IS_HF_SPACE, DATA_DIR, EMBEDDINGS_DIR,
HAS_PERSISTENT_STORAGE, STORAGE_WARNING,
# Memory functions
add_to_memory, get_memory_context, search_memory, clear_memory,
# Utility functions
get_timestamp, create_elapsed_timer, format_progress_bar,
# PDF helpers
get_pdf_list, get_pdf_list_ui, make_pdf_dropdown,
# Vectorstore
build_vectorstore, get_vectorstore, set_vectorstore, embeddings,
# Agent tools
AGENT_TOOLS, list_documents, count_documents, search_documents,
# Sample questions
SAMPLE_Q1, SAMPLE_Q2, SAMPLE_Q3, SAMPLE_Q4, SAMPLE_Q5, SAMPLE_Q6, SAMPLE_Q7,
)
from helpers_SHARED import floating_progress_bar_html
# Import environment-specific helpers
if IS_HF_SPACE:
from helpers_HF import (
init_hf_llm, hf_generate_chat_response, hf_generate_text_response,
get_hf_client, get_hf_llm_name
)
# Initialize HF LLM (default model from config)
hf_client, LLM_NAME = init_hf_llm(CONFIG.get("hf_model"))
ollama_llm = None
agent_executor = None
else:
from helpers_LOCAL import (
init_ollama_llm, ollama_generate_response, run_agent,
create_langchain_agent, get_ollama_llm, get_local_llm_name, get_agent_executor
)
# Initialize Ollama LLM
ollama_llm, LLM_NAME = init_ollama_llm()
hf_client = None
# Create directories
os.makedirs(DATA_DIR, exist_ok=True)
os.makedirs(EMBEDDINGS_DIR, exist_ok=True)
# Build initial vectorstore
vs = build_vectorstore()
# Create agent (local only)
if not IS_HF_SPACE and ollama_llm is not None:
agent_executor = create_langchain_agent()
else:
agent_executor = None
# Debug: Print initial state
print(f"🐛 DEBUG: Initial vectorstore state: {vs is not None}")
print(f"🐛 DEBUG: IS_HF_SPACE: {IS_HF_SPACE}")
print(f"🐛 DEBUG: DATA_DIR: {DATA_DIR}")
print(f"🐛 DEBUG: EMBEDDINGS_DIR: {EMBEDDINGS_DIR}")
if IS_HF_SPACE:
print(f"🐛 DEBUG: /data exists: {os.path.exists('/data')}")
print(f"🐛 DEBUG: HF token available: {os.getenv('HF_token') is not None}")
print(f"🐛 DEBUG: LLM available: {(hf_client is not None) if IS_HF_SPACE else (ollama_llm is not None)}")
# ============================================================================
# FASTAPI APP (FastAPI is only used for local runs, not on HuggingFace Spaces)
# ============================================================================
app = FastAPI(title="RAG Chatbot API")
class Prompt(BaseModel):
prompt: str
@app.get("/pdfs")
# get_pdf_list() returns the list of available PDF files
def list_pdfs():
return {"pdfs": get_pdf_list()}
@app.post("/upload") # The @app.post("/upload") decorator in FastAPI means that the upload_pdf function will handle HTTP POST requests sent to the /upload endpoint.
# read pdf files and save to DATA_DIR, and rebuild vectorstore (embeddins completed here)
async def upload_pdf(file: UploadFile = File(...)):
if not file.filename or not file.filename.endswith(".pdf"):
raise HTTPException(status_code=400, detail="Only PDFs allowed.")
filepath = os.path.join(DATA_DIR, file.filename)
with open(filepath, "wb") as f:
f.write(await file.read())
build_vectorstore(force_rebuild=True)
return {"message": f"Added {file.filename}. Embeddings updated."}
@app.delete("/delete/{filename}")
# deletes the selected file and rebuilds the vectorstore
def delete_pdf(filename: str):
if filename not in get_pdf_list():
raise HTTPException(status_code=404, detail="PDF not found.")
filepath = os.path.join(DATA_DIR, filename)
os.remove(filepath)
build_vectorstore(force_rebuild=True)
return {"message": f"Deleted {filename}. Embeddings updated."}
@app.post("/generate")
# docs contains the relevant documents retrieved from the vectorstore
# full_prompt is the prompt augmented with that context
def generate_response(prompt: Prompt):
global vs
vs = get_vectorstore()
if vs is None:
raise HTTPException(status_code=400, detail="No PDFs loaded.")
# Retrieve relevant docs (limit context size)
retriever = vs.as_retriever(search_kwargs={"k": CONFIG["search_k"]})
docs = retriever.invoke(prompt.prompt)
# Use all retrieved chunks
context = "\n\n".join([doc.page_content for doc in docs])
# Augment prompt
full_prompt = (
"Answer the following question based ONLY on the context provided below.\n"
"If the answer is not present in the context, reply exactly with: 'I don't know.'\n"
"Do NOT make up or guess any information that is not explicitly in the context.\n\n"
"Your answer MUST be a concise summary, listing the main topics or key points found in the context.\n"
"If the question asks for a list, provide a bulleted or numbered list.\n\n"
f"Context:\n{context}\n\n"
f"Question: {prompt.prompt}\n\n"
"Answer:"
)
try:
if IS_HF_SPACE and hf_client is not None:
response = hf_generate_text_response(full_prompt, context, hf_client)  # temperature is set to 0 inside hf_generate_text_response()
return {"response": response}
elif not IS_HF_SPACE and ollama_llm is not None:
print(f"Generating response with Ollama ({LLM_NAME})...")
try:
response = ollama_llm.invoke(full_prompt, temperature=0)
print(f"✓ Success! Response generated.")
return {"response": response}
except Exception as ollama_error:
print(f"❌ Ollama error: {str(ollama_error)}")
return {"response": f"I found relevant information in your documents:\n\n{context[:CONFIG['search_content_limit']]}..."}
else:
return {"response": f"I found relevant information in your documents:\n\n{context[:CONFIG['search_content_limit']]}..."}
except Exception as e:
print(f"LLM failed: {str(e)}")
return {"response": f"I found relevant information in your documents:\n\n{context[:CONFIG['search_content_limit']]}..."}
@app.get("/refresh")
# "force_rebuild=True" rebuils vectorstore embeddings from scratch, even if an existing vectorstore is present
def refresh_embeddings():
build_vectorstore(force_rebuild=True)
return {"message": "Embeddings refreshed."}
# ============================================================================
# GRADIO UI FUNCTIONS
# ============================================================================
# When a user selects and uploads files in the UI, Gradio automatically provides the list of uploaded file objects as the "files" argument to add_pdf.
# add_pdf() is called by "file_upload.upload()"
def add_pdf(files): # files: list of file objects
if files is None or len(files) == 0:
return (
make_pdf_dropdown(),
"No files selected.",
"",
"\n".join(get_pdf_list())
)
start_time = datetime.now()
get_elapsed = create_elapsed_timer(start_time)
results = []
total_files = len(files)
upload_log = []
upload_log.append(f"[{get_timestamp()}] Starting upload process for {total_files} file(s)")
prev_embedded_files = get_pdf_list()
for i, file_obj in enumerate(files, 1): # i starts from 1
filename = os.path.basename(file_obj.name) # basename returns the final component of a file path, which is typically the filename itself
progress_percent = int((i * 2 - 1) / (total_files * 2) * 100)
status_msg = f"📤 Uploading {i}/{total_files}: {filename}..."
progress_display = format_progress_bar(get_elapsed(), progress_percent, status_msg)
upload_log.append(f"[{get_timestamp()}] Uploading file {i}: {filename}")
# Always use the list before upload until embedding completes
# A dropdown is a Gradio UI component that lets users select one (or more) options from a list.
# here, we use it to show the list of currently embedded PDF files.
# we can select some of those files for deletion later.
yield (
make_pdf_dropdown(),
"\n".join(results) if results else "Starting upload...",
progress_display,
"\n".join(prev_embedded_files)
)
try:
dest_path = os.path.join(DATA_DIR, filename)
shutil.copy2(file_obj.name, dest_path) # copy2 preserves file metadata (like modification time).
results.append(f"✓ {filename} uploaded") # We need to return (or yield) results, so the Gradio UI can show upload progress and status messages to the user in make_pdf_dropdown()
upload_log.append(f"[{get_timestamp()}] Uploading file {i} completed")
progress_percent = int(((i * 2) - 1) / (total_files * 2) * 100)
status_msg = f"🧠 Creating embeddings for {filename}..."
progress_display = format_progress_bar(get_elapsed(), progress_percent, status_msg)
upload_log.append(f"[{get_timestamp()}] Embedding file {i}: {filename}")
# Do NOT update embedded files before embedding completes
yield (
make_pdf_dropdown(),
"\n".join(results),
progress_display,
"\n".join(prev_embedded_files)
)
try:
build_vectorstore(force_rebuild=True)
results[-1] = f"✅ {filename} (uploaded & embedded)"
upload_log.append(f"[{get_timestamp()}] Embedding file {i} completed")
upload_log.append("")
# Show progress bar after embedding completes
progress_percent = int((i * 2) / (total_files * 2) * 100)
status_msg = f"✅ Embedded {i}/{total_files}: {filename}"
progress_display = format_progress_bar(get_elapsed(), progress_percent, status_msg)
# Update embedded files to show the new file immediately after embedding completes
new_embedded_files = get_pdf_list()
yield (
make_pdf_dropdown(),
"\n".join(results),
progress_display,
"\n".join(new_embedded_files)
)
except Exception as embed_error:
results[-1] = f"⚠️ {filename} (uploaded, embedding error: {str(embed_error)})"
upload_log.append(f"[{get_timestamp()}] Embedding file {i} failed")
upload_log.append("")
completed_progress = int((i * 2) / (total_files * 2) * 100)
status_msg = f"⚠️ File {i}/{total_files} completed with error: {filename}"
progress_display = format_progress_bar(get_elapsed(), completed_progress, status_msg)
# Do NOT update embedded files if embedding failed
yield (
make_pdf_dropdown(),
"\n".join(results),
progress_display,
"\n".join(get_pdf_list())
)
except Exception as e:
results.append(f"❌ {filename}: {str(e)}")
upload_log.append(f"[{get_timestamp()}] Uploading file {i} failed")
final_message = "\n".join(results)
final_progress = format_progress_bar(get_elapsed(), 100, f"🎉 All done! Processed {len(files)} file(s) successfully")
upload_log.append(f"[{get_timestamp()}] All {len(files)} file(s) completed")
# Only show fully embedded files in the Available Embedded Files window
# Reset the progress bar to its original empty state after completion (like delete)
yield (
make_pdf_dropdown(),
final_message,
"",
"\n".join(get_pdf_list())
)
# yield: "Send this update to the UI, but keep the function alive for possible further steps."
# return: "Stop here; no more updates or actions are needed."
def delete_pdf_ui(selected_pdf):
import time
available_files = get_pdf_list()
if not available_files:
# Disable delete button if no files are available
yield make_pdf_dropdown(disabled=True), "No embedded files to delete.", ""
return
if not selected_pdf:
# Hide overlay if nothing to delete
yield make_pdf_dropdown(), "\n".join(available_files), ""
return
# Show progress bar immediately on click
bar = format_progress_bar("", 0, "Preparing to delete files...")
yield make_pdf_dropdown(), "\n".join(get_pdf_list()), bar
# Support both single and multiple selection
if isinstance(selected_pdf, str):
selected_files = [selected_pdf]
else:
selected_files = list(selected_pdf)
total_files = len(selected_files)
for idx, file_name in enumerate(selected_files, 1):
file_path = os.path.join(DATA_DIR, file_name)
# Remove file and all leftovers (e.g., embeddings) before advancing progress
deleted = False
leftovers_removed = False
# Remove file
if os.path.exists(file_path):
try:
os.remove(file_path)
deleted = True
except Exception:
deleted = False
# Remove leftovers (add your per-file embedding removal logic here if needed)
# Example: remove embedding file if it exists (customize as needed)
embedding_path = os.path.join(EMBEDDINGS_DIR, file_name + ".embedding")
if os.path.exists(embedding_path):
try:
os.remove(embedding_path)
leftovers_removed = True
except Exception:
leftovers_removed = False
else:
leftovers_removed = True # No leftovers to remove
# Only advance progress bar after both file and leftovers are deleted
if deleted and leftovers_removed:
build_vectorstore(force_rebuild=True)
percent = int(idx / total_files * 100) if total_files else 100
bar = format_progress_bar("", percent, f"Deleted {idx}/{total_files}: {file_name}")
yield make_pdf_dropdown(), "\n".join(get_pdf_list()), bar
else:
bar = format_progress_bar("", int(idx / total_files * 100) if total_files else 100, f"⚠️ Error deleting {file_name}")
yield make_pdf_dropdown(), "\n".join(get_pdf_list()), bar
time.sleep(0.2)
# Clear progress bar after all deletions
yield make_pdf_dropdown(), "\n".join(get_pdf_list()), ""
# Enables "Delete All" button only if there are files to delet, otherwise disables it
def toggle_delete_all_btn():
# Check if there is at least one file in Available Embedded Files
files = get_pdf_list()
return gr.update(interactive=bool(files))
def delete_all_files():
import time
all_files = get_pdf_list()
if not all_files:
bar = format_progress_bar("", 0, "No files to delete.")
yield make_pdf_dropdown(), "\n".join(get_pdf_list()), bar
return
bar = format_progress_bar("", 0, "Preparing to delete all files...")
yield make_pdf_dropdown(), "\n".join(get_pdf_list()), bar
total_files = len(all_files)
for idx, file_name in enumerate(all_files, 1):
file_path = os.path.join(DATA_DIR, file_name)
deleted = False
leftovers_removed = False
if os.path.exists(file_path):
try:
os.remove(file_path)
deleted = True
except Exception:
deleted = False
embedding_path = os.path.join(EMBEDDINGS_DIR, file_name + ".embedding")
if os.path.exists(embedding_path):
try:
os.remove(embedding_path)
leftovers_removed = True
except Exception:
leftovers_removed = False
else:
leftovers_removed = True
if deleted and leftovers_removed:
build_vectorstore(force_rebuild=True)
percent = int(idx / total_files * 100) if total_files else 100
bar = format_progress_bar("", percent, f"Deleted {idx}/{total_files}: {file_name}")
yield make_pdf_dropdown(), "\n".join(get_pdf_list()), bar
else:
bar = format_progress_bar("", int(idx / total_files * 100) if total_files else 100, f"⚠️ Error deleting {file_name}")
yield make_pdf_dropdown(), "\n".join(get_pdf_list()), bar
time.sleep(0.2)
yield make_pdf_dropdown(), "\n".join(get_pdf_list()), ""
def show_delete_all_warning():
return (
gr.Markdown("**⚠️ Are you sure you want to delete ALL files? This cannot be undone. Click 'Confirm Delete All' to proceed.**", visible=True),
gr.update(interactive=True, visible=True)
)
def hide_delete_all_warning():
return (
gr.Markdown(visible=False),
gr.update(interactive=False, visible=False)
)
def analyze_query_and_use_tools(query: str) -> str:
"""Analyze query and use appropriate tools to gather information."""
query_lower = query.lower()
results = []
# Check for memory-related queries first
memory_keywords = ["remember", "earlier", "before", "previous", "last time", "we discussed",
"you said", "i asked", "conversation", "history", "recall", "what did we"]
if any(word in query_lower for word in memory_keywords):
print(f"🧠 Memory query detected, fetching conversation history...")
memory_result = get_memory_context(last_n=10)
if memory_result and "No previous conversation" not in memory_result:
results.append(f"📝 **Conversation History:**\n{memory_result}")
search_result = search_memory(query)
if search_result and "No conversation history" not in search_result:
results.append(f"🔍 **Relevant Past Discussions:**\n{search_result}")
if results:
return "\n\n".join(results)
# Try using LangGraph agent (local only)
if not IS_HF_SPACE and agent_executor is not None:
agent_result = run_agent(query)
if agent_result:
return agent_result
# Fallback: Manual tool routing
try:
if any(word in query_lower for word in ["what documents", "list documents", "available documents", "what files", "documents do i have"]):
results.append(list_documents.invoke({}))
if any(word in query_lower for word in ["how many", "count", "number of documents"]):
results.append(count_documents.invoke({}))
results.append(search_documents.invoke({"query": query}))
return "\n\n".join(results) if results else "No relevant information found."
except Exception as e:
return f"Error analyzing query: {str(e)}"
def chat_response(message, history):
"""Agent-enhanced chat response function with visual progress tracking."""
global vs
if not message:
return history, "", "💬 Ready for your question"
start_time = datetime.now()
get_elapsed = create_elapsed_timer(start_time)
if not isinstance(history, list):
history = []
history.append({"role": "user", "content": str(message)})
add_to_memory("user", message)
try:
yield (history, "", format_progress_bar(get_elapsed(), 33, "🔍 Analyzing your question...", bar_length=15))
print(f"🤖 Agent analyzing query: {message}")
try:
pdf_files = get_pdf_list()
print(f"🐛 DEBUG: PDF files available: {len(pdf_files)} - {pdf_files}")
print(f"🐛 DEBUG: Global vectorstore state: {get_vectorstore() is not None}")
except Exception as debug_error:
print(f"🐛 DEBUG ERROR: {str(debug_error)}")
try:
tool_results = analyze_query_and_use_tools(message)
print(f"🔧 Tool results: {tool_results[:100]}...")
except Exception as tool_error:
error_msg = f"❌ Tool execution failed: {str(tool_error)}"
print(error_msg)
history.append({"role": "assistant", "content": error_msg})
yield (history, "", f"{get_elapsed()} | [100%] ❌ Error during tool execution")
return
yield (history, "", format_progress_bar(get_elapsed(), 66, "🧠 Generating intelligent response...", bar_length=15))
try:
memory_context = get_memory_context(last_n=5)
llm_prompt = f"""
You are a helpful AI assistant with access to both the user's previous conversations and relevant document excerpts.
When answering, always:
- Use information from the provided conversation history and document excerpts.
- If the answer is not found in either, reply with: "I don't know."
- Do not make up or guess information.
- Provide concise, accurate, and context-aware answers.
- If the question asks for a list, use bullet points or numbers.
Conversation History:
{memory_context}
Document Excerpts:
{tool_results}
Question: {message}
Answer:
"""
if IS_HF_SPACE and hf_client is not None:
result = hf_generate_chat_response(llm_prompt, hf_client)
if result is None:
result = tool_results
elif not IS_HF_SPACE and ollama_llm is not None:
result = ollama_generate_response(llm_prompt, ollama_llm)
if result is None:
result = tool_results
else:
result = tool_results
print("ℹ️ No LLM available, returning tool results")
except Exception as llm_error:
print(f"❌ LLM processing error: {str(llm_error)}")
result = tool_results
result_str = str(result.content) if hasattr(result, 'content') else str(result)
history.append({"role": "assistant", "content": result_str})
add_to_memory("assistant", result_str)
yield (history, "", format_progress_bar(get_elapsed(), 100, "✅ Response generated successfully!", bar_length=15))
# Reset AI Processing Progress to original state
yield (history, "", "💬 Ready for your question")
except Exception as e:
error_msg = f"🚫 System error: {str(e)}\n\nPlease try again or upload your documents again."
print(f"💥 CRITICAL ERROR: {str(e)}")
import traceback
traceback.print_exc()
history.append({"role": "assistant", "content": error_msg})
yield (history, "", f"{get_elapsed()} | [100%] ❌ System error occurred")
def refresh_embeddings_ui():
"""Refresh embeddings directly"""
try:
build_vectorstore(force_rebuild=True)
return make_pdf_dropdown(), "Embeddings refreshed."
except Exception as e:
return make_pdf_dropdown(), f"Error refreshing embeddings: {str(e)}"
def clear_chat_and_memory():
"""Clear chat history and conversation memory."""
clear_memory()
return [], "", "💬 Chat and memory cleared. Ready for your question"
# ============================================================================
# GRADIO UI
# ============================================================================
ENV_NAME = "🌐 HuggingFace Space" if IS_HF_SPACE else "💻 Local Environment"
ENV_COLOR = "#FF6B6B" if IS_HF_SPACE else "#4ECDC4"
with gr.Blocks(title="RAG Agent Chatbot") as demo:
gr.Markdown(f"# 🤖 RAG Agent - AI Assistant with Tools\nUpload PDFs and interact with an intelligent agent that can search, analyze, and answer questions about your documents.")
if not IS_HF_SPACE:
from helpers_LOCAL import get_installed_llms, init_ollama_llm, create_langchain_agent
llm_choices = get_installed_llms()
if llm_choices:
llm_dropdown = gr.Dropdown(
label="Select Local LLM",
choices=llm_choices,
value=LLM_NAME if LLM_NAME in llm_choices else (llm_choices[0] if llm_choices else None),
interactive=True,
visible=True
)
current_llm_display = gr.Markdown(f"**Current LLM:** {LLM_NAME if LLM_NAME else ''}", elem_id="current-llm-display", visible=True)
top_banner = gr.Markdown(
f"<div style='background-color: {ENV_COLOR}; padding: 10px; border-radius: 5px; text-align: center; color: white; font-weight: bold;'>Running on: {ENV_NAME} | LLM: <span id='llm-name'>{LLM_NAME if LLM_NAME else 'None'}</span> | Agent: ✅ Active</div>",
elem_id="top-llm-banner"
)
def update_llm(selected_label):
global ollama_llm, LLM_NAME, agent_executor
if selected_label:
try:
ollama_llm, LLM_NAME = init_ollama_llm(selected_label)
agent_executor = create_langchain_agent()
banner_html = f"<div style='background-color: {ENV_COLOR}; padding: 10px; border-radius: 5px; text-align: center; color: white; font-weight: bold;'>Running on: {ENV_NAME} | LLM: <span id='llm-name'>{selected_label}</span> | Agent: ✅ Active</div>"
return (
gr.Markdown(f"**Current LLM:** {selected_label}", elem_id="current-llm-display"),
banner_html
)
except Exception as e:
ollama_llm = None
LLM_NAME = None
agent_executor = None
banner_html = f"<div style='background-color: {ENV_COLOR}; padding: 10px; border-radius: 5px; text-align: center; color: white; font-weight: bold;'>Running on: {ENV_NAME} | LLM: <span id='llm-name'>None</span> | Agent: ❌ Inactive</div>"
return (
gr.Markdown(f"**Current LLM:** (Error initializing {selected_label})", elem_id="current-llm-display"),
banner_html
)
banner_html = f"<div style='background-color: {ENV_COLOR}; padding: 10px; border-radius: 5px; text-align: center; color: white; font-weight: bold;'>Running on: {ENV_NAME} | LLM: <span id='llm-name'>None</span> | Agent: ❌ Inactive</div>"
return gr.Markdown("", elem_id="current-llm-display"), banner_html
llm_dropdown.change(
fn=lambda label: update_llm(label),
inputs=[llm_dropdown],
outputs=[current_llm_display, top_banner]
)
else:
gr.Markdown(
"<div style='background-color: #ffcccc; padding: 10px; border-radius: 5px; text-align: center; color: #b30000; font-weight: bold;'>&#9888; <b>No local LLMs are installed.</b> Please install an Ollama model to enable LLM selection and chat capabilities.</div>"
)
llm_dropdown = gr.Dropdown(
label="Select Local LLM",
choices=[],
value=None,
interactive=False,
visible=True
)
current_llm_display = gr.Markdown(f"**Current LLM:** None", elem_id="current-llm-display", visible=True)
gr.Markdown(f"<div style='background-color: {ENV_COLOR}; padding: 10px; border-radius: 5px; text-align: center; color: white; font-weight: bold;'>Running on: {ENV_NAME} | LLM: <span id='llm-name'>None</span> | Agent: ❌ Inactive</div>")
else:
# --- Hugging Face Space: dynamic LLM selection ---
# Static list of free, popular LLMs on HF Inference API (can be expanded)
hf_llm_choices = [
"Llama-3.2-3B-Instruct",
"mistralai/Mistral-7B-Instruct-v0.2",
"meta-llama/Meta-Llama-3-8B-Instruct",
"google/gemma-7b-it",
"HuggingFaceH4/zephyr-7b-beta",
"Qwen/Qwen1.5-7B-Chat",
"tiiuae/falcon-7b-instruct"
]
default_llm = "Llama-3.2-3B-Instruct"
llm_dropdown = gr.Dropdown(
label="Select HF LLM",
choices=hf_llm_choices,
value=LLM_NAME if LLM_NAME in hf_llm_choices else default_llm,
interactive=True,
visible=True
)
current_llm_display = gr.Markdown(f"**Current LLM:** {LLM_NAME if LLM_NAME else default_llm}", elem_id="current-llm-display", visible=True)
top_banner = gr.Markdown(
f"<div style='background-color: {ENV_COLOR}; padding: 10px; border-radius: 5px; text-align: center; color: white; font-weight: bold;'>Running on: {ENV_NAME} | LLM: <span id='llm-name'>{LLM_NAME if LLM_NAME else default_llm}</span> | Agent: ✅ Active</div>",
elem_id="top-llm-banner"
)
def update_hf_llm(selected_label):
global hf_client, LLM_NAME
from helpers_HF import init_hf_llm
if selected_label:
try:
hf_client, LLM_NAME = init_hf_llm(selected_label)
banner_html = f"<div style='background-color: {ENV_COLOR}; padding: 10px; border-radius: 5px; text-align: center; color: white; font-weight: bold;'>Running on: {ENV_NAME} | LLM: <span id='llm-name'>{selected_label}</span> | Agent: ✅ Active</div>"
return (
gr.Markdown(f"**Current LLM:** {selected_label}", elem_id="current-llm-display"),
banner_html
)
except Exception as e:
hf_client = None
LLM_NAME = None
banner_html = f"<div style='background-color: {ENV_COLOR}; padding: 10px; border-radius: 5px; text-align: center; color: white; font-weight: bold;'>Running on: {ENV_NAME} | LLM: <span id='llm-name'>None</span> | Agent: ❌ Inactive</div>"
return (
gr.Markdown(f"**Current LLM:** (Error initializing {selected_label})", elem_id="current-llm-display"),
banner_html
)
banner_html = f"<div style='background-color: {ENV_COLOR}; padding: 10px; border-radius: 5px; text-align: center; color: white; font-weight: bold;'>Running on: {ENV_NAME} | LLM: <span id='llm-name'>None</span> | Agent: ❌ Inactive</div>"
return gr.Markdown("", elem_id="current-llm-display"), banner_html
llm_dropdown.change(
fn=lambda label: update_hf_llm(label),
inputs=[llm_dropdown],
outputs=[current_llm_display, top_banner]
)
with gr.Row():
with gr.Column(scale=1):
gr.Markdown("### 📁 File Management")
if IS_HF_SPACE and STORAGE_WARNING:
gr.Markdown(f"**Storage Status:** {STORAGE_WARNING}")
file_upload = gr.File(
label="Upload Files (Multiple files supported)",
file_types=[".pdf"],
file_count="multiple"
)
upload_status = gr.Textbox(
label="Upload Status",
value="",
interactive=False,
lines=8,
max_lines=8,
autoscroll=True
)
with gr.Row():
progress_bar = gr.Textbox(
label="Uploading Progress",
value="",
interactive=False,
lines=1,
max_lines=1,
autoscroll=True
)
delete_progress_bar = gr.Textbox(
label="Deleting Progress",
value="",
interactive=False,
lines=1,
max_lines=1,
autoscroll=True
)
embedded_files = gr.Textbox(
label="Available Embedded Files",
value="\n".join(get_pdf_list()),
interactive=False,
lines=8,
max_lines=8,
autoscroll=True
)
with gr.Row():
pdf_dropdown = gr.Dropdown(
label="Select Files to Delete",
choices=get_pdf_list_ui(),
interactive=True,
allow_custom_value=False,
multiselect=True
)
delete_btn = gr.Button("🗑️ Delete Files", variant="stop", interactive=False)
delete_all_btn = gr.Button("🗑️ Delete All", variant="stop", interactive=bool(get_pdf_list()))
delete_all_warning = gr.Markdown(visible=False)
confirm_delete_all_btn = gr.Button("Confirm Delete All", variant="stop", interactive=True, visible=False)
delete_progress_overlay = gr.HTML(floating_progress_bar_html())
with gr.Column(scale=4):
gr.Markdown("### 🤖 AI Agent Chat")
gr.Markdown("**Agent Capabilities:** Search documents, list files, count documents, intelligent reasoning")
chatbot = gr.Chatbot(height=CONFIG["chatbot_height"], layout="bubble")
if IS_HF_SPACE and not HAS_PERSISTENT_STORAGE:
gr.Markdown("⚠️ **Storage Notice:** Files are temporary and will be lost when Space restarts. To enable persistent storage, upgrade to a paid plan in Settings → Hardware.")
gr.Markdown("**🛠️ Agent Commands - Try these tool-powered queries:**")
with gr.Row():
sample1 = gr.Button(f"📋 {SAMPLE_Q1}", size="sm")
sample2 = gr.Button(f"🔍 {SAMPLE_Q2}", size="sm")
sample3 = gr.Button(f"📊 {SAMPLE_Q3}", size="sm")
with gr.Row():
sample4 = gr.Button(f"🧠 {SAMPLE_Q4}", size="sm")
sample5 = gr.Button(f"🍳 {SAMPLE_Q5}", size="sm")
sample6 = gr.Button(f"🧠 {SAMPLE_Q6}", size="sm")
sample7 = gr.Button(f"📝 {SAMPLE_Q7}", size="sm")
msg_input = gr.Textbox(
placeholder="Ask a question about your PDFs...",
label="Ask about your PDFs",
show_label=False
)
ai_status = gr.Textbox(
label="📊 AI Processing Progress",
value="💬 Ready for your question",
interactive=False,
placeholder="AI processing status with progress tracking..."
)
with gr.Row():
submit_btn = gr.Button("Send", variant="primary", scale=1, interactive=False)
clear_btn = gr.Button("Clear", scale=1, interactive=False)
# Event handlers
# outputs=[pdf_dropdown, upload_status, progress_bar, embedded_files] means:
# that the function add_pdf will return (or yield) a tuple of four values, and each value will be used to update the corresponding UI component
# here for example "embedded_files" in a UI Textbox component eith the label "Available Embedded Files"
file_upload.upload(
fn=add_pdf,
inputs=[file_upload],
outputs=[pdf_dropdown, upload_status, progress_bar, embedded_files]
)
delete_btn.click(
fn=delete_pdf_ui,
inputs=[pdf_dropdown],
outputs=[pdf_dropdown, embedded_files, delete_progress_bar]
)
delete_all_btn.click(
fn=show_delete_all_warning,
inputs=[],
outputs=[delete_all_warning, confirm_delete_all_btn]
)
# Update Delete All button state when files change
demo.load(fn=toggle_delete_all_btn, outputs=[delete_all_btn])
pdf_dropdown.change(fn=toggle_delete_all_btn, outputs=[delete_all_btn])
embedded_files.change(fn=toggle_delete_all_btn, outputs=[delete_all_btn])
confirm_delete_all_btn.click(
fn=delete_all_files,
inputs=[],
outputs=[pdf_dropdown, embedded_files, delete_progress_bar]
).then(
fn=hide_delete_all_warning,
inputs=[],
outputs=[delete_all_warning, confirm_delete_all_btn]
)
# Enable/disable delete button based on selection
def toggle_delete_btn(selected):
# Enable if there are files in embedded_files, disable otherwise
files_exist = bool(get_pdf_list())
return gr.update(interactive=files_exist)
# Update delete button state on both dropdown change and embedded_files change
pdf_dropdown.change(
fn=toggle_delete_btn,
inputs=[pdf_dropdown],
outputs=[delete_btn]
)
embedded_files.change(
fn=lambda _: gr.update(interactive=bool(get_pdf_list())),
inputs=[embedded_files],
outputs=[delete_btn]
)
demo.load(fn=lambda: gr.update(interactive=bool(get_pdf_list())), outputs=[delete_btn])
demo.load(fn=lambda: "\n".join(get_pdf_list()), outputs=[embedded_files])
# Ensure embedded_files is updated on app start
demo.load(fn=lambda: "\n".join(get_pdf_list()), outputs=[embedded_files])
# Sample question handlers
sample_buttons = [sample1, sample2, sample3, sample4, sample5, sample6, sample7]
sample_questions = [SAMPLE_Q1, SAMPLE_Q2, SAMPLE_Q3, SAMPLE_Q4, SAMPLE_Q5, SAMPLE_Q6, SAMPLE_Q7]
for btn, question in zip(sample_buttons, sample_questions):
btn.click(fn=lambda q=question: q, outputs=[msg_input])
msg_input.submit(
fn=chat_response,
inputs=[msg_input, chatbot],
outputs=[chatbot, msg_input, ai_status]
)
submit_btn.click(
fn=chat_response,
inputs=[msg_input, chatbot],
outputs=[chatbot, msg_input, ai_status]
)
# Enable/disable send button based on input
def toggle_send_btn(text):
return gr.update(interactive=bool(text and text.strip()))
msg_input.change(
fn=toggle_send_btn,
inputs=[msg_input],
outputs=[submit_btn]
)
clear_btn.click(
fn=clear_chat_and_memory,
outputs=[chatbot, msg_input, ai_status]
)
# Enable/disable clear button based on input or chat
def toggle_clear_btn(text, chat):
return gr.update(interactive=bool((text and text.strip()) or (chat and len(chat) > 0)))
msg_input.change(
fn=lambda text: toggle_clear_btn(text, chatbot.value if hasattr(chatbot, 'value') else []),
inputs=[msg_input],
outputs=[clear_btn]
)
chatbot.change(
fn=lambda chat: toggle_clear_btn(msg_input.value if hasattr(msg_input, 'value') else '', chat),
inputs=[chatbot],
outputs=[clear_btn]
)
demo.load(fn=make_pdf_dropdown, outputs=[pdf_dropdown])
# ============================================================================
# LAUNCH application
# ============================================================================
if IS_HF_SPACE:
try:
demo.launch(
server_name=CONFIG["server_host"],
server_port=CONFIG["server_port"],
share=True,
show_error=True,
quiet=False
)
except Exception as launch_error:
print(f"Launch error: {launch_error}")
demo.launch(server_name=CONFIG["server_host"], server_port=CONFIG["server_port"])
else:
app_with_gradio = gr.mount_gradio_app(app, demo, path="/")
if __name__ == "__main__":
import uvicorn
import webbrowser
from threading import Timer
Timer(3, lambda: webbrowser.open(f"http://127.0.0.1:{CONFIG['server_port']}")).start()
print("Starting server... Browser will open automatically in 3 seconds.")
uvicorn.run(app_with_gradio, host=CONFIG["server_host"], port=CONFIG["server_port"])