""" uv init uv venv --python 3.12 source .venv/bin/activate uv pip install -r requirements.txt """ # Note: HuggingFace Spaces reads configuration from the README.md frontmatter, not from a separate YAML file. # The config.yaml is for your reference/organization, but the actual Space config must remain in README.md. # The Space was created with Docker SDK and README.md frontmatter specifies sdk: docker: # huggingface-cli repo create Agentic_Rag3_dep_space --type space --space_sdk docker # Without Docker, we use the Gradio SDK option in README.md frontmatter: # --- # sdk: gradio # sdk_version: "6.0.1" # python_version: "3.12" # app_file: app.py # --- # Or: # huggingface-cli repo create Agentic_Rag3_dep_space --type space --space_sdk gradio # AGENT DEPLOYMENT NOTES: # ===================== # - Local Environment: Uses Ollama (llama3.2) for development # - HF Space Environment: Uses Llama-3.2-3B-Instruct (cloud API) for production # - Environment Auto-Detection: Via SPACE_ID environment variable # - Agent Tools Available: Document listing, counting, RAG search # - Storage: Temporary (files lost on restart) or persistent (paid plans) # - UI Features: Tool-powered sample questions, environment indicators # - Security: Token stored as Space secret (HF_token), not in code # - Space URL: https://huggingface.co/spaces/irajkoohi/Agentic_Rag3_dep_space # A) If you want to run app.py locally: """ cd /Users/ik/UVcodes/Deployed_Agents_4 && clear && lsof -ti:7860 | xargs kill -9 2>/dev/null; sleep 2 && source .venv/bin/activate && python app.py """ # B) If you want to run app.py on Hugging Face Space: """ https://huggingface.co/spaces/irajkoohi/Agentic_Rag4_dep_space """ # Create and Upload RAG Agent to HF Space Agentic_Rag4_dep_space (Docker SDK) """ # huggingface-cli repo create Agentic_Rag4_dep_space --type space --space_sdk docker 2>&1 Create new token with Write role at: https://huggingface.co/settings/tokens Add token to Space secrets at: https://huggingface.co/spaces/irajkoohi/Agentic_Rag4_dep_space/settings clear rm -rf Agentic_Rag4_dep_space && git clone https://huggingface.co/spaces/irajkoohi/Agentic_Rag4_dep_space cd /Users/ik/UVcodes/Deployed_Agents_4/Agentic_Rag4_dep_space && cp ../app.py . && cp ../helpers_SHARED.py . && cp ../helpers_HF.py . && cp ../helpers_LOCAL.py . && cp ../requirements.txt . && cp ../README.md . && cp ../Dockerfile . && cp ../config.yaml . mkdir -p data/embeddings git add . && git commit -m "Deploy RAG Agent with Dockerfile to HF space" git push --force https://huggingface.co/spaces/irajkoohi/Agentic_Rag4_dep_space """ # if you want to upload all files: """ clear cd /Users/ik/UVcodes/Deployed_Agents_4/Agentic_Rag4_dep_space cp ../app.py . cp ../helpers_SHARED.py . cp ../helpers_HF.py . cp ../helpers_LOCAL.py . cp ../requirements.txt . # cp ../README.md . cp ../Dockerfile . cp ../config.yaml . git add . git commit -m "Update all files" git push --force """ # If you want to delete all files on HF space """ cd /Users/ik/UVcodes/Deployed_Agents_4 rm -rf Agentic_Rag4_dep_space && git clone https://huggingface.co/spaces/irajkoohi/Agentic_Rag4_dep_space cd Agentic_Rag4_dep_space && find . -maxdepth 1 -not -name '.git' -not -name '.' -delete rm -rf data embeddings git add -A && git commit -m "Remove all files to clean the space" git push ls -la && pwd """ # If you want to delete a space on HF website """ 1. Go to: https://huggingface.co/spaces/irajkoohi/Agentic_Rag4_dep_space/settings 2. Scroll down to "Delete this Space" 4. Type: irajkoohi/Agentic_Rag4_dep_space 4. 
Click "Delete" """ # if you want to sync changes of some files (like app.py and helpers_SHARED.py): """ cp ../app.py . && cp ../helpers_SHARED.py . git add app.py helpers_SHARED.py git commit -m "Sync app.py and helpers_SHARED.py with latest changes" && git push """ #%% import os import shutil import warnings from datetime import datetime from fastapi import FastAPI, UploadFile, File, HTTPException from pydantic import BaseModel import gradio as gr # Suppress warnings for cleaner output on HF Spaces warnings.filterwarnings("ignore", category=UserWarning) # Fix event loop issues on HF Spaces if os.getenv("SPACE_ID") is not None: try: import nest_asyncio nest_asyncio.apply() except ImportError: pass # ============================================================================ # IMPORT FROM HELPER MODULES # ============================================================================ from helpers_SHARED import ( # Configuration CONFIG, IS_HF_SPACE, DATA_DIR, EMBEDDINGS_DIR, HAS_PERSISTENT_STORAGE, STORAGE_WARNING, # Memory functions add_to_memory, get_memory_context, search_memory, clear_memory, # Utility functions get_timestamp, create_elapsed_timer, format_progress_bar, # PDF helpers get_pdf_list, get_pdf_list_ui, make_pdf_dropdown, # Vectorstore build_vectorstore, get_vectorstore, set_vectorstore, embeddings, # Agent tools AGENT_TOOLS, list_documents, count_documents, search_documents, # Sample questions SAMPLE_Q1, SAMPLE_Q2, SAMPLE_Q3, SAMPLE_Q4, SAMPLE_Q5, SAMPLE_Q6, SAMPLE_Q7, ) from helpers_SHARED import floating_progress_bar_html # Import environment-specific helpers if IS_HF_SPACE: from helpers_HF import ( init_hf_llm, hf_generate_chat_response, hf_generate_text_response, get_hf_client, get_hf_llm_name ) # Initialize HF LLM (default model from config) hf_client, LLM_NAME = init_hf_llm(CONFIG["hf_model"] if "hf_model" in CONFIG else None) ollama_llm = None agent_executor = None else: from helpers_LOCAL import ( init_ollama_llm, ollama_generate_response, run_agent, create_langchain_agent, get_ollama_llm, get_local_llm_name, get_agent_executor ) # Initialize Ollama LLM ollama_llm, LLM_NAME = init_ollama_llm() hf_client = None # Create directories os.makedirs(DATA_DIR, exist_ok=True) os.makedirs(EMBEDDINGS_DIR, exist_ok=True) # Build initial vectorstore vs = build_vectorstore() # Create agent (local only) if not IS_HF_SPACE and ollama_llm is not None: agent_executor = create_langchain_agent() else: agent_executor = None # Debug: Print initial state print(f"🐛 DEBUG: Initial vectorstore state: {vs is not None}") print(f"🐛 DEBUG: IS_HF_SPACE: {IS_HF_SPACE}") print(f"🐛 DEBUG: DATA_DIR: {DATA_DIR}") print(f"🐛 DEBUG: EMBEDDINGS_DIR: {EMBEDDINGS_DIR}") if IS_HF_SPACE: print(f"🐛 DEBUG: /data exists: {os.path.exists('/data')}") print(f"🐛 DEBUG: HF token available: {os.getenv('HF_token') is not None}") print(f"🐛 DEBUG: LLM available: {(hf_client is not None) if IS_HF_SPACE else (ollama_llm is not None)}") # ============================================================================ # FASTAPI APP (FastAPI is only used for local runs, not on HuggingFace Spaces) # ============================================================================ app = FastAPI(title="RAG Chatbot API") class Prompt(BaseModel): prompt: str @app.get("/pdfs") # get_pdf_list() returns list of pdf files def list_pdfs(): return {"pdfs": get_pdf_list()} @app.post("/upload") # The @app.post("/upload") decorator in FastAPI means that the upload_pdf function will handle HTTP POST requests sent to the /upload endpoint. 
# Read the uploaded PDF, save it to DATA_DIR, and rebuild the vectorstore (embeddings are computed here).
async def upload_pdf(file: UploadFile = File(...)):
    if not file.filename or not file.filename.endswith(".pdf"):
        raise HTTPException(status_code=400, detail="Only PDFs allowed.")
    filepath = os.path.join(DATA_DIR, file.filename)
    with open(filepath, "wb") as f:
        f.write(await file.read())
    build_vectorstore(force_rebuild=True)
    return {"message": f"Added {file.filename}. Embeddings updated."}


@app.delete("/delete/{filename}")
# Deletes the selected filename and updates the vectorstore.
def delete_pdf(filename: str):
    if filename not in get_pdf_list():
        raise HTTPException(status_code=404, detail="PDF not found.")
    filepath = os.path.join(DATA_DIR, filename)
    os.remove(filepath)
    build_vectorstore(force_rebuild=True)
    return {"message": f"Deleted {filename}. Embeddings updated."}


@app.post("/generate")
# docs holds the relevant documents retrieved from the vectorstore;
# full_prompt is the prompt augmented with that context.
def generate_response(prompt: Prompt):
    global vs
    vs = get_vectorstore()
    if vs is None:
        raise HTTPException(status_code=400, detail="No PDFs loaded.")
    # Retrieve relevant docs (limit context size)
    retriever = vs.as_retriever(search_kwargs={"k": CONFIG["search_k"]})
    docs = retriever.invoke(prompt.prompt)
    # Use all retrieved chunks
    context = "\n\n".join([doc.page_content for doc in docs])
    # Augment prompt
    full_prompt = (
        "Answer the following question based ONLY on the context provided below.\n"
        "If the answer is not present in the context, reply exactly with: 'I don't know.'\n"
        "Do NOT make up or guess any information that is not explicitly in the context.\n\n"
        "Your answer MUST be a concise summary, listing the main topics or key points found in the context.\n"
        "If the question asks for a list, provide a bulleted or numbered list.\n\n"
        f"Context:\n{context}\n\n"
        f"Question: {prompt.prompt}\n\n"
        "Answer:"
    )
    try:
        if IS_HF_SPACE and hf_client is not None:
            # temperature is set to 0 inside hf_generate_text_response()
            response = hf_generate_text_response(full_prompt, context, hf_client)
            return {"response": response}
        elif not IS_HF_SPACE and ollama_llm is not None:
            print(f"Generating response with Ollama ({LLM_NAME})...")
            try:
                response = ollama_llm.invoke(full_prompt, temperature=0)
                print("✓ Success! Response generated.")
                return {"response": response}
            except Exception as ollama_error:
                print(f"❌ Ollama error: {str(ollama_error)}")
                return {"response": f"I found relevant information in your documents:\n\n{context[:CONFIG['search_content_limit']]}..."}
        else:
            return {"response": f"I found relevant information in your documents:\n\n{context[:CONFIG['search_content_limit']]}..."}
    except Exception as e:
        print(f"LLM failed: {str(e)}")
        return {"response": f"I found relevant information in your documents:\n\n{context[:CONFIG['search_content_limit']]}..."}


@app.get("/refresh")
# force_rebuild=True rebuilds the vectorstore embeddings from scratch, even if an existing vectorstore is present.
def refresh_embeddings():
    build_vectorstore(force_rebuild=True)
    return {"message": "Embeddings refreshed."}
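
# Example calls against the FastAPI endpoints above for a local run. This is only a sketch:
# it assumes the server resolves to http://127.0.0.1:7860 (the port used elsewhere in this
# file) and uses a hypothetical mydoc.pdf; adjust host/port to your CONFIG values.
"""
curl http://127.0.0.1:7860/pdfs
curl -X POST -F "file=@mydoc.pdf" http://127.0.0.1:7860/upload
curl -X POST -H "Content-Type: application/json" -d '{"prompt": "What are the main topics?"}' http://127.0.0.1:7860/generate
curl -X DELETE http://127.0.0.1:7860/delete/mydoc.pdf
curl http://127.0.0.1:7860/refresh
"""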

# ============================================================================
# GRADIO UI FUNCTIONS
# ============================================================================
# When a user selects and uploads files in the UI, Gradio automatically provides the list of
# uploaded file objects as the "files" argument to add_pdf.
# add_pdf() is called by file_upload.upload().
def add_pdf(files):
    # files: list of file objects
    if files is None or len(files) == 0:
        # This handler is a generator, so yield (rather than return) the "no files" state to the UI
        yield (
            make_pdf_dropdown(),
            "No files selected.",
            "",
            "\n".join(get_pdf_list())
        )
        return
    start_time = datetime.now()
    get_elapsed = create_elapsed_timer(start_time)
    results = []
    total_files = len(files)
    upload_log = []
    upload_log.append(f"[{get_timestamp()}] Starting upload process for {total_files} file(s)")
    prev_embedded_files = get_pdf_list()
    for i, file_obj in enumerate(files, 1):  # i starts from 1
        filename = os.path.basename(file_obj.name)  # basename returns the final component of a path, i.e. the filename itself
        progress_percent = int((i * 2 - 1) / (total_files * 2) * 100)
        status_msg = f"📤 Uploading {i}/{total_files}: {filename}..."
        progress_display = format_progress_bar(get_elapsed(), progress_percent, status_msg)
        upload_log.append(f"[{get_timestamp()}] Uploading file {i}: {filename}")
        # Always show the pre-upload list until embedding completes.
        # A dropdown is a Gradio UI component that lets users select one (or more) options from a list;
        # here it shows the currently embedded PDF files, some of which can later be selected for deletion.
        yield (
            make_pdf_dropdown(),
            "\n".join(results) if results else "Starting upload...",
            progress_display,
            "\n".join(prev_embedded_files)
        )
        try:
            dest_path = os.path.join(DATA_DIR, filename)
            shutil.copy2(file_obj.name, dest_path)  # copy2 preserves file metadata (like modification time)
            results.append(f"✓ {filename} uploaded")
            # Yield results so the Gradio UI can show upload progress and status messages to the user.
            upload_log.append(f"[{get_timestamp()}] Uploading file {i} completed")
            progress_percent = int(((i * 2) - 1) / (total_files * 2) * 100)
            status_msg = f"🧠 Creating embeddings for {filename}..."
            progress_display = format_progress_bar(get_elapsed(), progress_percent, status_msg)
            upload_log.append(f"[{get_timestamp()}] Embedding file {i}: {filename}")
            # Do NOT update embedded files before embedding completes
            yield (
                make_pdf_dropdown(),
                "\n".join(results),
                progress_display,
                "\n".join(prev_embedded_files)
            )
            try:
                build_vectorstore(force_rebuild=True)
                results[-1] = f"✅ {filename} (uploaded & embedded)"
                upload_log.append(f"[{get_timestamp()}] Embedding file {i} completed")
                upload_log.append("")
                # Show progress bar after embedding completes
                progress_percent = int((i * 2) / (total_files * 2) * 100)
                status_msg = f"✅ Embedded {i}/{total_files}: {filename}"
                progress_display = format_progress_bar(get_elapsed(), progress_percent, status_msg)
                # Update embedded files to show the new file immediately after embedding completes
                new_embedded_files = get_pdf_list()
                yield (
                    make_pdf_dropdown(),
                    "\n".join(results),
                    progress_display,
                    "\n".join(new_embedded_files)
                )
            except Exception as embed_error:
                results[-1] = f"⚠️ {filename} (uploaded, embedding error: {str(embed_error)})"
                upload_log.append(f"[{get_timestamp()}] Embedding file {i} failed")
                upload_log.append("")
                completed_progress = int((i * 2) / (total_files * 2) * 100)
                status_msg = f"⚠️ File {i}/{total_files} completed with error: {filename}"
                progress_display = format_progress_bar(get_elapsed(), completed_progress, status_msg)
                # Do NOT update embedded files if embedding failed
                yield (
                    make_pdf_dropdown(),
                    "\n".join(results),
                    progress_display,
                    "\n".join(get_pdf_list())
                )
        except Exception as e:
            results.append(f"❌ {filename}: {str(e)}")
            upload_log.append(f"[{get_timestamp()}] Uploading file {i} failed")

    final_message = "\n".join(results)
    final_progress = format_progress_bar(get_elapsed(), 100, f"🎉 All done! Processed {len(files)} file(s) successfully")
    upload_log.append(f"[{get_timestamp()}] All {len(files)} file(s) completed")
    # Only show fully embedded files in the Available Embedded Files window.
    # Reset the progress bar to its original empty state after completion (like delete).
    yield (
        make_pdf_dropdown(),
        final_message,
        "",
        "\n".join(get_pdf_list())
    )

# yield: "Send this update to the UI, but keep the function alive for possible further steps."
# return: "Stop here; no more updates or actions are needed."
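
# A minimal sketch (not used by the app) of the yield-driven update pattern described above:
# a Gradio event handler that yields streams successive updates to its output component(s),
# while a plain return delivers only a single final update.
#
#   def demo_progress(n_steps):               # hypothetical handler, for illustration only
#       for i in range(1, n_steps + 1):
#           yield f"step {i}/{n_steps}"       # each yield refreshes the bound output component
#       yield "done"                          # the last yielded value stays on screen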
def delete_pdf_ui(selected_pdf):
    import time
    available_files = get_pdf_list()
    if not available_files:
        # Disable the delete button if no files are available
        yield make_pdf_dropdown(disabled=True), "No embedded files to delete.", ""
        return
    if not selected_pdf:
        # Hide the overlay if there is nothing to delete
        yield make_pdf_dropdown(), "\n".join(available_files), ""
        return
    # Show the progress bar immediately on click
    bar = format_progress_bar("", 0, "Preparing to delete files...")
    yield make_pdf_dropdown(), "\n".join(get_pdf_list()), bar
    # Support both single and multiple selection
    if isinstance(selected_pdf, str):
        selected_files = [selected_pdf]
    else:
        selected_files = list(selected_pdf)
    total_files = len(selected_files)
    for idx, file_name in enumerate(selected_files, 1):
        file_path = os.path.join(DATA_DIR, file_name)
        # Remove the file and all leftovers (e.g., embeddings) before advancing progress
        deleted = False
        leftovers_removed = False
        # Remove the file
        if os.path.exists(file_path):
            try:
                os.remove(file_path)
                deleted = True
            except Exception:
                deleted = False
        # Remove leftovers (add your per-file embedding removal logic here if needed).
        # Example: remove the embedding file if it exists (customize as needed).
        embedding_path = os.path.join(EMBEDDINGS_DIR, file_name + ".embedding")
        if os.path.exists(embedding_path):
            try:
                os.remove(embedding_path)
                leftovers_removed = True
            except Exception:
                leftovers_removed = False
        else:
            leftovers_removed = True  # No leftovers to remove
        # Only advance the progress bar after both the file and leftovers are deleted
        if deleted and leftovers_removed:
            build_vectorstore(force_rebuild=True)
            percent = int(idx / total_files * 100) if total_files else 100
            bar = format_progress_bar("", percent, f"Deleted {idx}/{total_files}: {file_name}")
            yield make_pdf_dropdown(), "\n".join(get_pdf_list()), bar
        else:
            bar = format_progress_bar("", int(idx / total_files * 100) if total_files else 100, f"⚠️ Error deleting {file_name}")
            yield make_pdf_dropdown(), "\n".join(get_pdf_list()), bar
        time.sleep(0.2)
    # Clear the progress bar after all deletions
    yield make_pdf_dropdown(), "\n".join(get_pdf_list()), ""


# Enables the "Delete All" button only if there are files to delete; otherwise disables it
def toggle_delete_all_btn():
    # Check if there is at least one file in Available Embedded Files
    files = get_pdf_list()
    return gr.update(interactive=bool(files))


def delete_all_files():
    import time
    all_files = get_pdf_list()
    if not all_files:
        bar = format_progress_bar("", 0, "No files to delete.")
        yield make_pdf_dropdown(), "\n".join(get_pdf_list()), bar
        return
    bar = format_progress_bar("", 0, "Preparing to delete all files...")
    yield make_pdf_dropdown(), "\n".join(get_pdf_list()), bar
    total_files = len(all_files)
    for idx, file_name in enumerate(all_files, 1):
        file_path = os.path.join(DATA_DIR, file_name)
        deleted = False
        leftovers_removed = False
        if os.path.exists(file_path):
            try:
                os.remove(file_path)
                deleted = True
            except Exception:
                deleted = False
        embedding_path = os.path.join(EMBEDDINGS_DIR, file_name + ".embedding")
        if os.path.exists(embedding_path):
            try:
                os.remove(embedding_path)
                leftovers_removed = True
            except Exception:
                leftovers_removed = False
        else:
            leftovers_removed = True
        if deleted and leftovers_removed:
            build_vectorstore(force_rebuild=True)
            percent = int(idx / total_files * 100) if total_files else 100
            bar = format_progress_bar("", percent, f"Deleted {idx}/{total_files}: {file_name}")
            yield make_pdf_dropdown(), "\n".join(get_pdf_list()), bar
        else:
            bar = format_progress_bar("", int(idx / total_files * 100) if total_files else 100, f"⚠️ Error deleting {file_name}")
ī¸ Error deleting {file_name}") yield make_pdf_dropdown(), "\n".join(get_pdf_list()), bar time.sleep(0.2) yield make_pdf_dropdown(), "\n".join(get_pdf_list()), "" def show_delete_all_warning(): return ( gr.Markdown("**âš ī¸ Are you sure you want to delete ALL files? This cannot be undone. Click 'Confirm Delete All' to proceed.**", visible=True), gr.update(interactive=True, visible=True) ) def hide_delete_all_warning(): return ( gr.Markdown(visible=False), gr.update(interactive=False, visible=False) ) def analyze_query_and_use_tools(query: str) -> str: """Analyze query and use appropriate tools to gather information.""" query_lower = query.lower() results = [] # Check for memory-related queries first memory_keywords = ["remember", "earlier", "before", "previous", "last time", "we discussed", "you said", "i asked", "conversation", "history", "recall", "what did we"] if any(word in query_lower for word in memory_keywords): print(f"🧠 Memory query detected, fetching conversation history...") memory_result = get_memory_context(last_n=10) if memory_result and "No previous conversation" not in memory_result: results.append(f"📝 **Conversation History:**\n{memory_result}") search_result = search_memory(query) if search_result and "No conversation history" not in search_result: results.append(f"🔍 **Relevant Past Discussions:**\n{search_result}") if results: return "\n\n".join(results) # Try using LangGraph agent (local only) if not IS_HF_SPACE and agent_executor is not None: agent_result = run_agent(query) if agent_result: return agent_result # Fallback: Manual tool routing try: if any(word in query_lower for word in ["what documents", "list documents", "available documents", "what files", "documents do i have"]): results.append(list_documents.invoke({})) if any(word in query_lower for word in ["how many", "count", "number of documents"]): results.append(count_documents.invoke({})) results.append(search_documents.invoke({"query": query})) return "\n\n".join(results) if results else "No relevant information found." 
def chat_response(message, history):
    """Agent-enhanced chat response function with visual progress tracking."""
    global vs
    if not message:
        # This handler is a generator, so yield (rather than return) the idle state to the UI
        yield history, "", "💬 Ready for your question"
        return
    start_time = datetime.now()
    get_elapsed = create_elapsed_timer(start_time)
    if not isinstance(history, list):
        history = []
    history.append({"role": "user", "content": str(message)})
    add_to_memory("user", message)
    try:
        yield (history, "", format_progress_bar(get_elapsed(), 33, "🔍 Analyzing your question...", bar_length=15))
        print(f"🤖 Agent analyzing query: {message}")
        try:
            pdf_files = get_pdf_list()
            print(f"🐛 DEBUG: PDF files available: {len(pdf_files)} - {pdf_files}")
            print(f"🐛 DEBUG: Global vectorstore state: {get_vectorstore() is not None}")
        except Exception as debug_error:
            print(f"🐛 DEBUG ERROR: {str(debug_error)}")
        try:
            tool_results = analyze_query_and_use_tools(message)
            print(f"🔧 Tool results: {tool_results[:100]}...")
        except Exception as tool_error:
            error_msg = f"❌ Tool execution failed: {str(tool_error)}"
            print(error_msg)
            history.append({"role": "assistant", "content": error_msg})
            yield (history, "", f"{get_elapsed()} | [100%] ❌ Error during tool execution")
            return
        yield (history, "", format_progress_bar(get_elapsed(), 66, "🧠 Generating intelligent response...", bar_length=15))
        try:
            memory_context = get_memory_context(last_n=5)
            llm_prompt = f"""
You are a helpful AI assistant with access to both the user's previous conversations and relevant document excerpts.

When answering, always:
- Use information from the provided conversation history and document excerpts.
- If the answer is not found in either, reply with: "I don't know."
- Do not make up or guess information.
- Provide concise, accurate, and context-aware answers.
- If the question asks for a list, use bullet points or numbers.

Conversation History:
{memory_context}

Document Excerpts:
{tool_results}

Question: {message}

Answer:
"""
            if IS_HF_SPACE and hf_client is not None:
                result = hf_generate_chat_response(llm_prompt, hf_client)
                if result is None:
                    result = tool_results
            elif not IS_HF_SPACE and ollama_llm is not None:
                result = ollama_generate_response(llm_prompt, ollama_llm)
                if result is None:
                    result = tool_results
            else:
                result = tool_results
                print("ℹ️ No LLM available, returning tool results")
        except Exception as llm_error:
            print(f"❌ LLM processing error: {str(llm_error)}")
            result = tool_results
        result_str = str(result.content) if hasattr(result, 'content') else str(result)
        history.append({"role": "assistant", "content": result_str})
        add_to_memory("assistant", result_str)
        yield (history, "", format_progress_bar(get_elapsed(), 100, "✅ Response generated successfully!", bar_length=15))
        # Reset AI Processing Progress to its original state
        yield (history, "", "💬 Ready for your question")
    except Exception as e:
        error_msg = f"🚫 System error: {str(e)}\n\nPlease try again or upload your documents again."
        print(f"💥 CRITICAL ERROR: {str(e)}")
        import traceback
        traceback.print_exc()
        history.append({"role": "assistant", "content": error_msg})
        yield (history, "", f"{get_elapsed()} | [100%] ❌ System error occurred")


def refresh_embeddings_ui():
    """Refresh embeddings directly."""
    try:
        build_vectorstore(force_rebuild=True)
        return make_pdf_dropdown(), "Embeddings refreshed."
    except Exception as e:
        return make_pdf_dropdown(), f"Error refreshing embeddings: {str(e)}"


def clear_chat_and_memory():
    """Clear chat history and conversation memory."""
    clear_memory()
    return [], "", "💬 Chat and memory cleared. Ready for your question"


# ============================================================================
# GRADIO UI
# ============================================================================
ENV_NAME = "🌐 HuggingFace Space" if IS_HF_SPACE else "💻 Local Environment"
ENV_COLOR = "#FF6B6B" if IS_HF_SPACE else "#4ECDC4"

with gr.Blocks(title="RAG Agent Chatbot") as demo:
    gr.Markdown("# 🤖 RAG Agent - AI Assistant with Tools\nUpload PDFs and interact with an intelligent agent that can search, analyze, and answer questions about your documents.")

    if not IS_HF_SPACE:
        from helpers_LOCAL import get_installed_llms, init_ollama_llm, create_langchain_agent
        llm_choices = get_installed_llms()
        if llm_choices:
            llm_dropdown = gr.Dropdown(
                label="Select Local LLM",
                choices=llm_choices,
                value=LLM_NAME if LLM_NAME in llm_choices else (llm_choices[0] if llm_choices else None),
                interactive=True,
                visible=True
            )
            current_llm_display = gr.Markdown(f"**Current LLM:** {LLM_NAME if LLM_NAME else ''}", elem_id="current-llm-display", visible=True)
            top_banner = gr.Markdown(
                f"Running on: {ENV_NAME} | LLM: {LLM_NAME if LLM_NAME else 'None'} | Agent: ✅ Active",
                elem_id="top-llm-banner"
            )

            def update_llm(selected_label):
                global ollama_llm, LLM_NAME, agent_executor
                if selected_label:
                    try:
                        ollama_llm, LLM_NAME = init_ollama_llm(selected_label)
                        agent_executor = create_langchain_agent()
                        banner_html = f"Running on: {ENV_NAME} | LLM: {selected_label} | Agent: ✅ Active"
                        return (
                            gr.Markdown(f"**Current LLM:** {selected_label}", elem_id="current-llm-display"),
                            banner_html
                        )
                    except Exception as e:
                        ollama_llm = None
                        LLM_NAME = None
                        agent_executor = None
                        banner_html = f"Running on: {ENV_NAME} | LLM: None | Agent: ❌ Inactive"
                        return (
                            gr.Markdown(f"**Current LLM:** (Error initializing {selected_label})", elem_id="current-llm-display"),
                            banner_html
                        )
                banner_html = f"Running on: {ENV_NAME} | LLM: None | Agent: ❌ Inactive"
                return gr.Markdown("", elem_id="current-llm-display"), banner_html

            llm_dropdown.change(
                fn=lambda label: update_llm(label),
                inputs=[llm_dropdown],
                outputs=[current_llm_display, top_banner]
            )
        else:
            gr.Markdown(
                "No local LLMs are installed. Please install an Ollama model to enable LLM selection and chat capabilities."
            )
            llm_dropdown = gr.Dropdown(
                label="Select Local LLM",
                choices=[],
                value=None,
                interactive=False,
                visible=True
            )
            current_llm_display = gr.Markdown("**Current LLM:** None", elem_id="current-llm-display", visible=True)
            gr.Markdown(f"Running on: {ENV_NAME} | LLM: None | Agent: ❌ Inactive")
    else:
        # --- Hugging Face Space: dynamic LLM selection ---
        # Static list of free, popular LLMs on the HF Inference API (can be expanded)
        hf_llm_choices = [
            "Llama-3.2-3B-Instruct",
            "mistralai/Mistral-7B-Instruct-v0.2",
            "meta-llama/Meta-Llama-3-8B-Instruct",
            "google/gemma-7b-it",
            "HuggingFaceH4/zephyr-7b-beta",
            "Qwen/Qwen1.5-7B-Chat",
            "tiiuae/falcon-7b-instruct"
        ]
        default_llm = "Llama-3.2-3B-Instruct"
        llm_dropdown = gr.Dropdown(
            label="Select HF LLM",
            choices=hf_llm_choices,
            value=LLM_NAME if LLM_NAME in hf_llm_choices else default_llm,
            interactive=True,
            visible=True
        )
        current_llm_display = gr.Markdown(f"**Current LLM:** {LLM_NAME if LLM_NAME else default_llm}", elem_id="current-llm-display", visible=True)
        top_banner = gr.Markdown(
            f"Running on: {ENV_NAME} | LLM: {LLM_NAME if LLM_NAME else default_llm} | Agent: ✅ Active",
            elem_id="top-llm-banner"
        )

        def update_hf_llm(selected_label):
            global hf_client, LLM_NAME
            from helpers_HF import init_hf_llm
            if selected_label:
                try:
                    hf_client, LLM_NAME = init_hf_llm(selected_label)
                    banner_html = f"Running on: {ENV_NAME} | LLM: {selected_label} | Agent: ✅ Active"
                    return (
                        gr.Markdown(f"**Current LLM:** {selected_label}", elem_id="current-llm-display"),
                        banner_html
                    )
                except Exception as e:
                    hf_client = None
                    LLM_NAME = None
                    banner_html = f"Running on: {ENV_NAME} | LLM: None | Agent: ❌ Inactive"
                    return (
                        gr.Markdown(f"**Current LLM:** (Error initializing {selected_label})", elem_id="current-llm-display"),
                        banner_html
                    )
            banner_html = f"Running on: {ENV_NAME} | LLM: None | Agent: ❌ Inactive"
            return gr.Markdown("", elem_id="current-llm-display"), banner_html

        llm_dropdown.change(
            fn=lambda label: update_hf_llm(label),
            inputs=[llm_dropdown],
            outputs=[current_llm_display, top_banner]
        )

    with gr.Row():
        with gr.Column(scale=1):
            gr.Markdown("### 📁 File Management")
            if IS_HF_SPACE and STORAGE_WARNING:
                gr.Markdown(f"**Storage Status:** {STORAGE_WARNING}")
            file_upload = gr.File(
                label="Upload Files (Multiple files supported)",
                file_types=[".pdf"],
                file_count="multiple"
            )
            upload_status = gr.Textbox(
                label="Upload Status",
                value="",
                interactive=False,
                lines=8,
                max_lines=8,
                autoscroll=True
            )
            with gr.Row():
                progress_bar = gr.Textbox(
                    label="Uploading Progress",
                    value="",
                    interactive=False,
                    lines=1,
                    max_lines=1,
                    autoscroll=True
                )
                delete_progress_bar = gr.Textbox(
                    label="Deleting Progress",
                    value="",
                    interactive=False,
                    lines=1,
                    max_lines=1,
                    autoscroll=True
                )
            embedded_files = gr.Textbox(
                label="Available Embedded Files",
                value="\n".join(get_pdf_list()),
                interactive=False,
                lines=8,
                max_lines=8,
                autoscroll=True
            )
            with gr.Row():
                pdf_dropdown = gr.Dropdown(
                    label="Select Files to Delete",
                    choices=get_pdf_list_ui(),
                    interactive=True,
                    allow_custom_value=False,
                    multiselect=True
                )
                delete_btn = gr.Button("🗑️ Delete Files", variant="stop", interactive=False)
                delete_all_btn = gr.Button("🗑️ Delete All", variant="stop", interactive=bool(get_pdf_list()))
            delete_all_warning = gr.Markdown(visible=False)
            confirm_delete_all_btn = gr.Button("Confirm Delete All", variant="stop", interactive=True, visible=False)
            delete_progress_overlay = gr.HTML(floating_progress_bar_html())

        with gr.Column(scale=4):
            gr.Markdown("### 🤖 AI Agent Chat")
            gr.Markdown("**Agent Capabilities:** Search documents, list files, count documents, intelligent reasoning")
            chatbot = gr.Chatbot(height=CONFIG["chatbot_height"], layout="bubble")
            if IS_HF_SPACE and not HAS_PERSISTENT_STORAGE:
                gr.Markdown("⚠️ **Storage Notice:** Files are temporary and will be lost when the Space restarts. To enable persistent storage, upgrade to a paid plan in Settings → Hardware.")
            gr.Markdown("**🛠️ Agent Commands - Try these tool-powered queries:**")
            with gr.Row():
                sample1 = gr.Button(f"📋 {SAMPLE_Q1}", size="sm")
                sample2 = gr.Button(f"🔍 {SAMPLE_Q2}", size="sm")
                sample3 = gr.Button(f"📊 {SAMPLE_Q3}", size="sm")
            with gr.Row():
                sample4 = gr.Button(f"🧠 {SAMPLE_Q4}", size="sm")
                sample5 = gr.Button(f"🍳 {SAMPLE_Q5}", size="sm")
                sample6 = gr.Button(f"🧠 {SAMPLE_Q6}", size="sm")
                sample7 = gr.Button(f"📝 {SAMPLE_Q7}", size="sm")
            msg_input = gr.Textbox(
                placeholder="Ask a question about your PDFs...",
                label="Ask about your PDFs",
                show_label=False
            )
            ai_status = gr.Textbox(
                label="📊 AI Processing Progress",
                value="💬 Ready for your question",
                interactive=False,
                placeholder="AI processing status with progress tracking..."
            )
            with gr.Row():
                submit_btn = gr.Button("Send", variant="primary", scale=1, interactive=False)
                clear_btn = gr.Button("Clear", scale=1, interactive=False)

    # Event handlers
    # outputs=[pdf_dropdown, upload_status, progress_bar, embedded_files] means that add_pdf
    # returns (or yields) a tuple of four values, and each value updates the corresponding UI
    # component; for example, "embedded_files" is the Textbox labeled "Available Embedded Files".
    file_upload.upload(
        fn=add_pdf,
        inputs=[file_upload],
        outputs=[pdf_dropdown, upload_status, progress_bar, embedded_files]
    )
    delete_btn.click(
        fn=delete_pdf_ui,
        inputs=[pdf_dropdown],
        outputs=[pdf_dropdown, embedded_files, delete_progress_bar]
    )
    delete_all_btn.click(
        fn=show_delete_all_warning,
        inputs=[],
        outputs=[delete_all_warning, confirm_delete_all_btn]
    )

    # Update the Delete All button state when files change
    demo.load(fn=toggle_delete_all_btn, outputs=[delete_all_btn])
    pdf_dropdown.change(fn=toggle_delete_all_btn, outputs=[delete_all_btn])
    embedded_files.change(fn=toggle_delete_all_btn, outputs=[delete_all_btn])

    confirm_delete_all_btn.click(
        fn=delete_all_files,
        inputs=[],
        outputs=[pdf_dropdown, embedded_files, delete_progress_bar]
    ).then(
        fn=hide_delete_all_warning,
        inputs=[],
        outputs=[delete_all_warning, confirm_delete_all_btn]
    )

    # Enable/disable the delete button based on selection
    def toggle_delete_btn(selected):
        # Enable if there are files in embedded_files, disable otherwise
        files_exist = bool(get_pdf_list())
        return gr.update(interactive=files_exist)

    # Update the delete button state on both dropdown change and embedded_files change
    pdf_dropdown.change(
        fn=toggle_delete_btn,
        inputs=[pdf_dropdown],
        outputs=[delete_btn]
    )
    embedded_files.change(
        fn=lambda _: gr.update(interactive=bool(get_pdf_list())),
        inputs=[embedded_files],
        outputs=[delete_btn]
    )
    demo.load(fn=lambda: gr.update(interactive=bool(get_pdf_list())), outputs=[delete_btn])

    # Ensure embedded_files is updated on app start
    demo.load(fn=lambda: "\n".join(get_pdf_list()), outputs=[embedded_files])

    # Sample question handlers
    sample_buttons = [sample1, sample2, sample3, sample4, sample5, sample6, sample7]
    sample_questions = [SAMPLE_Q1, SAMPLE_Q2, SAMPLE_Q3, SAMPLE_Q4, SAMPLE_Q5, SAMPLE_Q6, SAMPLE_Q7]
    for btn, question in zip(sample_buttons, sample_questions):
        btn.click(fn=lambda q=question: q, outputs=[msg_input])

    msg_input.submit(
        fn=chat_response,
        inputs=[msg_input, chatbot],
        outputs=[chatbot, msg_input, ai_status]
    )
    submit_btn.click(
        fn=chat_response,
        inputs=[msg_input, chatbot],
        outputs=[chatbot, msg_input, ai_status]
    )

    # Enable/disable the send button based on input
    def toggle_send_btn(text):
        return gr.update(interactive=bool(text and text.strip()))

    msg_input.change(
        fn=toggle_send_btn,
        inputs=[msg_input],
        outputs=[submit_btn]
    )

    clear_btn.click(
        fn=clear_chat_and_memory,
        outputs=[chatbot, msg_input, ai_status]
    )

    # Enable/disable the clear button based on input or chat
    def toggle_clear_btn(text, chat):
        return gr.update(interactive=bool((text and text.strip()) or (chat and len(chat) > 0)))

    msg_input.change(
        fn=lambda text: toggle_clear_btn(text, chatbot.value if hasattr(chatbot, 'value') else []),
        inputs=[msg_input],
        outputs=[clear_btn]
    )
    chatbot.change(
        fn=lambda chat: toggle_clear_btn(msg_input.value if hasattr(msg_input, 'value') else '', chat),
        inputs=[chatbot],
        outputs=[clear_btn]
    )

    demo.load(fn=make_pdf_dropdown, outputs=[pdf_dropdown])

# ============================================================================
# LAUNCH application
# ============================================================================
if IS_HF_SPACE:
    try:
        demo.launch(
            server_name=CONFIG["server_host"],
            server_port=CONFIG["server_port"],
            share=True,
            show_error=True,
            quiet=False
        )
    except Exception as launch_error:
        print(f"Launch error: {launch_error}")
        demo.launch(server_name=CONFIG["server_host"], server_port=CONFIG["server_port"])
else:
    app_with_gradio = gr.mount_gradio_app(app, demo, path="/")
    if __name__ == "__main__":
        import uvicorn
        import webbrowser
        from threading import Timer

        Timer(3, lambda: webbrowser.open(f"http://127.0.0.1:{CONFIG['server_port']}")).start()
        print("Starting server... Browser will open automatically in 3 seconds.")
        uvicorn.run(app_with_gradio, host=CONFIG["server_host"], port=CONFIG["server_port"])