| """ | |
| uv init | |
| uv venv --python 3.12 | |
| source .venv/bin/activate | |
| uv pip install -r requirements.txt | |
| """ | |
# Note: HuggingFace Spaces reads configuration from the README.md frontmatter, not from a separate YAML file.
# The config.yaml is for your reference/organization, but the actual Space config must remain in README.md.
# The Space was created with the Docker SDK; its README.md frontmatter specifies sdk: docker:
# huggingface-cli repo create Agentic_Rag3_dep_space --type space --space_sdk docker
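# For reference, a minimal Docker-SDK frontmatter might look like this (app_port tells
# Spaces which port the container listens on; 7860 is assumed to match this app's config):
# ---
# sdk: docker
# app_port: 7860
# ---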
# Without Docker, we use the Gradio SDK option in the README.md frontmatter:
# ---
# sdk: gradio
# sdk_version: "6.0.1"
# python_version: "3.12"
# app_file: app.py
# ---
# Or:
# huggingface-cli repo create Agentic_Rag3_dep_space --type space --space_sdk gradio
# AGENT DEPLOYMENT NOTES:
# =====================
# - Local Environment: Uses Ollama (llama3.2) for development
# - HF Space Environment: Uses Llama-3.2-3B-Instruct (cloud API) for production
# - Environment Auto-Detection: Via the SPACE_ID environment variable (see the sketch below)
# - Agent Tools Available: Document listing, counting, RAG search
# - Storage: Temporary (files lost on restart) or persistent (paid plans)
# - UI Features: Tool-powered sample questions, environment indicators
# - Security: Token stored as a Space secret (HF_token), not in code
# - Space URL: https://huggingface.co/spaces/irajkoohi/Agentic_Rag3_dep_space
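# Environment auto-detection sketch (illustrative only; helpers_SHARED performs the
# real check, and SPACE_ID is set automatically inside every HF Space container):
#   import os
#   IS_HF_SPACE = os.getenv("SPACE_ID") is not None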
# A) If you want to run app.py locally:
"""
cd /Users/ik/UVcodes/Deployed_Agents_4 && clear && lsof -ti:7860 | xargs kill -9 2>/dev/null; sleep 2 && source .venv/bin/activate && python app.py
"""
# B) If you want to run app.py on Hugging Face Space:
"""
https://huggingface.co/spaces/irajkoohi/Agentic_Rag4_dep_space
"""
# Create and upload the RAG Agent to HF Space Agentic_Rag4_dep_space (Docker SDK)
"""
# huggingface-cli repo create Agentic_Rag4_dep_space --type space --space_sdk docker 2>&1
Create a new token with the Write role at: https://huggingface.co/settings/tokens
Add the token to the Space secrets at: https://huggingface.co/spaces/irajkoohi/Agentic_Rag4_dep_space/settings
clear
rm -rf Agentic_Rag4_dep_space && git clone https://huggingface.co/spaces/irajkoohi/Agentic_Rag4_dep_space
cd /Users/ik/UVcodes/Deployed_Agents_4/Agentic_Rag4_dep_space && cp ../app.py . && cp ../helpers_SHARED.py . && cp ../helpers_HF.py . && cp ../helpers_LOCAL.py . && cp ../requirements.txt . && cp ../README.md . && cp ../Dockerfile . && cp ../config.yaml .
mkdir -p data/embeddings
git add . && git commit -m "Deploy RAG Agent with Dockerfile to HF space"
git push --force
https://huggingface.co/spaces/irajkoohi/Agentic_Rag4_dep_space
"""
# If you want to upload all files:
"""
clear
cd /Users/ik/UVcodes/Deployed_Agents_4/Agentic_Rag4_dep_space
cp ../app.py .
cp ../helpers_SHARED.py .
cp ../helpers_HF.py .
cp ../helpers_LOCAL.py .
cp ../requirements.txt .
# cp ../README.md .
cp ../Dockerfile .
cp ../config.yaml .
git add .
git commit -m "Update all files"
git push --force
"""
# If you want to delete all files on the HF space:
"""
cd /Users/ik/UVcodes/Deployed_Agents_4
rm -rf Agentic_Rag4_dep_space && git clone https://huggingface.co/spaces/irajkoohi/Agentic_Rag4_dep_space
cd Agentic_Rag4_dep_space && find . -maxdepth 1 -not -name '.git' -not -name '.' -delete
rm -rf data embeddings
git add -A && git commit -m "Remove all files to clean the space"
git push
ls -la && pwd
"""
# If you want to delete a Space on the HF website:
"""
1. Go to: https://huggingface.co/spaces/irajkoohi/Agentic_Rag4_dep_space/settings
2. Scroll down to "Delete this Space"
3. Type: irajkoohi/Agentic_Rag4_dep_space
4. Click "Delete"
"""
# If you want to sync changes to some files (like app.py and helpers_SHARED.py):
"""
cp ../app.py . && cp ../helpers_SHARED.py .
git add app.py helpers_SHARED.py
git commit -m "Sync app.py and helpers_SHARED.py with latest changes" && git push
"""
| #%% | |
import os
import shutil
import time
import warnings
from datetime import datetime
| from fastapi import FastAPI, UploadFile, File, HTTPException | |
| from pydantic import BaseModel | |
| import gradio as gr | |
| # Suppress warnings for cleaner output on HF Spaces | |
| warnings.filterwarnings("ignore", category=UserWarning) | |
| # Fix event loop issues on HF Spaces | |
| if os.getenv("SPACE_ID") is not None: | |
| try: | |
| import nest_asyncio | |
| nest_asyncio.apply() | |
| except ImportError: | |
| pass | |
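# (nest_asyncio patches asyncio to tolerate re-entrant event loops; on Spaces the app
# already runs inside a running loop, where a nested asyncio.run would otherwise fail.)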
| # ============================================================================ | |
| # IMPORT FROM HELPER MODULES | |
| # ============================================================================ | |
| from helpers_SHARED import ( | |
| # Configuration | |
| CONFIG, IS_HF_SPACE, DATA_DIR, EMBEDDINGS_DIR, | |
| HAS_PERSISTENT_STORAGE, STORAGE_WARNING, | |
| # Memory functions | |
| add_to_memory, get_memory_context, search_memory, clear_memory, | |
| # Utility functions | |
| get_timestamp, create_elapsed_timer, format_progress_bar, | |
| # PDF helpers | |
| get_pdf_list, get_pdf_list_ui, make_pdf_dropdown, | |
| # Vectorstore | |
| build_vectorstore, get_vectorstore, set_vectorstore, embeddings, | |
| # Agent tools | |
| AGENT_TOOLS, list_documents, count_documents, search_documents, | |
| # Sample questions | |
| SAMPLE_Q1, SAMPLE_Q2, SAMPLE_Q3, SAMPLE_Q4, SAMPLE_Q5, SAMPLE_Q6, SAMPLE_Q7, | |
| ) | |
| from helpers_SHARED import floating_progress_bar_html | |
| # Import environment-specific helpers | |
| if IS_HF_SPACE: | |
| from helpers_HF import ( | |
| init_hf_llm, hf_generate_chat_response, hf_generate_text_response, | |
| get_hf_client, get_hf_llm_name | |
| ) | |
| # Initialize HF LLM (default model from config) | |
| hf_client, LLM_NAME = init_hf_llm(CONFIG["hf_model"] if "hf_model" in CONFIG else None) | |
| ollama_llm = None | |
| agent_executor = None | |
| else: | |
| from helpers_LOCAL import ( | |
| init_ollama_llm, ollama_generate_response, run_agent, | |
| create_langchain_agent, get_ollama_llm, get_local_llm_name, get_agent_executor | |
| ) | |
| # Initialize Ollama LLM | |
| ollama_llm, LLM_NAME = init_ollama_llm() | |
| hf_client = None | |
| # Create directories | |
| os.makedirs(DATA_DIR, exist_ok=True) | |
| os.makedirs(EMBEDDINGS_DIR, exist_ok=True) | |
| # Build initial vectorstore | |
| vs = build_vectorstore() | |
| # Create agent (local only) | |
| if not IS_HF_SPACE and ollama_llm is not None: | |
| agent_executor = create_langchain_agent() | |
| else: | |
| agent_executor = None | |
| # Debug: Print initial state | |
| print(f"🐛 DEBUG: Initial vectorstore state: {vs is not None}") | |
| print(f"🐛 DEBUG: IS_HF_SPACE: {IS_HF_SPACE}") | |
| print(f"🐛 DEBUG: DATA_DIR: {DATA_DIR}") | |
| print(f"🐛 DEBUG: EMBEDDINGS_DIR: {EMBEDDINGS_DIR}") | |
| if IS_HF_SPACE: | |
| print(f"🐛 DEBUG: /data exists: {os.path.exists('/data')}") | |
| print(f"🐛 DEBUG: HF token available: {os.getenv('HF_token') is not None}") | |
| print(f"🐛 DEBUG: LLM available: {(hf_client is not None) if IS_HF_SPACE else (ollama_llm is not None)}") | |
| # ============================================================================ | |
| # FASTAPI APP (FastAPI is only used for local runs, not on HuggingFace Spaces) | |
| # ============================================================================ | |
| app = FastAPI(title="RAG Chatbot API") | |
| class Prompt(BaseModel): | |
| prompt: str | |
# GET /pdfs (route name assumed) returns the list of uploaded PDF files
@app.get("/pdfs")
def list_pdfs():
    return {"pdfs": get_pdf_list()}
| # The @app.post("/upload") decorator in FastAPI means that the upload_pdf function will handle HTTP POST requests sent to the /upload endpoint. | |
| # read pdf files and save to DATA_DIR, and rebuild vectorstore (embeddins completed here) | |
| async def upload_pdf(file: UploadFile = File(...)): | |
| if not file.filename or not file.filename.endswith(".pdf"): | |
| raise HTTPException(status_code=400, detail="Only PDFs allowed.") | |
| filepath = os.path.join(DATA_DIR, file.filename) | |
| with open(filepath, "wb") as f: | |
| f.write(await file.read()) | |
| build_vectorstore(force_rebuild=True) | |
| return {"message": f"Added {file.filename}. Embeddings updated."} | |
# DELETE /delete/{filename} (route assumed) removes the selected file and updates the vectorstore
@app.delete("/delete/{filename}")
def delete_pdf(filename: str):
| if filename not in get_pdf_list(): | |
| raise HTTPException(status_code=404, detail="PDF not found.") | |
| filepath = os.path.join(DATA_DIR, filename) | |
| os.remove(filepath) | |
| build_vectorstore(force_rebuild=True) | |
| return {"message": f"Deleted {filename}. Embeddings updated."} | |
# POST /generate (route assumed): docs holds the relevant chunks retrieved from the vectorstore,
# and full_prompt is the user prompt augmented with that context
@app.post("/generate")
def generate_response(prompt: Prompt):
| global vs | |
| vs = get_vectorstore() | |
| if vs is None: | |
| raise HTTPException(status_code=400, detail="No PDFs loaded.") | |
| # Retrieve relevant docs (limit context size) | |
| retriever = vs.as_retriever(search_kwargs={"k": CONFIG["search_k"]}) | |
| docs = retriever.invoke(prompt.prompt) | |
| # Use all retrieved chunks | |
| context = "\n\n".join([doc.page_content for doc in docs]) | |
| # Augment prompt | |
| full_prompt = ( | |
| "Answer the following question based ONLY on the context provided below.\n" | |
| "If the answer is not present in the context, reply exactly with: 'I don't know.'\n" | |
| "Do NOT make up or guess any information that is not explicitly in the context.\n\n" | |
| "Your answer MUST be a concise summary, listing the main topics or key points found in the context.\n" | |
| "If the question asks for a list, provide a bulleted or numbered list.\n\n" | |
| f"Context:\n{context}\n\n" | |
| f"Question: {prompt.prompt}\n\n" | |
| "Answer:" | |
| ) | |
| try: | |
| if IS_HF_SPACE and hf_client is not None: | |
            response = hf_generate_text_response(full_prompt, context, hf_client)  # temperature is set to 0 inside hf_generate_text_response()
| return {"response": response} | |
| elif not IS_HF_SPACE and ollama_llm is not None: | |
| print(f"Generating response with Ollama ({LLM_NAME})...") | |
| try: | |
| response = ollama_llm.invoke(full_prompt, temperature=0) | |
| print(f"✓ Success! Response generated.") | |
| return {"response": response} | |
| except Exception as ollama_error: | |
| print(f"❌ Ollama error: {str(ollama_error)}") | |
| return {"response": f"I found relevant information in your documents:\n\n{context[:CONFIG['search_content_limit']]}..."} | |
| else: | |
| return {"response": f"I found relevant information in your documents:\n\n{context[:CONFIG['search_content_limit']]}..."} | |
| except Exception as e: | |
| print(f"LLM failed: {str(e)}") | |
| return {"response": f"I found relevant information in your documents:\n\n{context[:CONFIG['search_content_limit']]}..."} | |
| # "force_rebuild=True" rebuils vectorstore embeddings from scratch, even if an existing vectorstore is present | |
| def refresh_embeddings(): | |
| build_vectorstore(force_rebuild=True) | |
| return {"message": "Embeddings refreshed."} | |
| # ============================================================================ | |
| # GRADIO UI FUNCTIONS | |
| # ============================================================================ | |
| # When a user selects and uploads files in the UI, Gradio automatically provides the list of uploaded file objects as the "files" argument to add_pdf. | |
| # add_pdf() is called by "file_upload.upload()" | |
| def add_pdf(files): # files: list of file objects | |
| if files is None or len(files) == 0: | |
| return ( | |
| make_pdf_dropdown(), | |
| "No files selected.", | |
| "", | |
| "\n".join(get_pdf_list()) | |
| ) | |
| start_time = datetime.now() | |
| get_elapsed = create_elapsed_timer(start_time) | |
| results = [] | |
| total_files = len(files) | |
| upload_log = [] | |
| upload_log.append(f"[{get_timestamp()}] Starting upload process for {total_files} file(s)") | |
| prev_embedded_files = get_pdf_list() | |
| for i, file_obj in enumerate(files, 1): # i starts from 1 | |
| filename = os.path.basename(file_obj.name) # basename returns the final component of a file path, which is typically the filename itself | |
| progress_percent = int((i * 2 - 1) / (total_files * 2) * 100) | |
| status_msg = f"📤 Uploading {i}/{total_files}: {filename}..." | |
| progress_display = format_progress_bar(get_elapsed(), progress_percent, status_msg) | |
| upload_log.append(f"[{get_timestamp()}] Uploading file {i}: {filename}") | |
| # Always use the list before upload until embedding completes | |
| # A dropdown is a Gradio UI component that lets users select one (or more) options from a list. | |
| # here, we use it to show the list of currently embedded PDF files. | |
| # we can select some of those files for deletion later. | |
| yield ( | |
| make_pdf_dropdown(), | |
| "\n".join(results) if results else "Starting upload...", | |
| progress_display, | |
| "\n".join(prev_embedded_files) | |
| ) | |
| try: | |
| dest_path = os.path.join(DATA_DIR, filename) | |
| shutil.copy2(file_obj.name, dest_path) # copy2 preserves file metadata (like modification time). | |
| results.append(f"✓ {filename} uploaded") # We need to return (or yield) results, so the Gradio UI can show upload progress and status messages to the user in make_pdf_dropdown() | |
| upload_log.append(f"[{get_timestamp()}] Uploading file {i} completed") | |
| progress_percent = int(((i * 2) - 1) / (total_files * 2) * 100) | |
| status_msg = f"🧠 Creating embeddings for {filename}..." | |
| progress_display = format_progress_bar(get_elapsed(), progress_percent, status_msg) | |
| upload_log.append(f"[{get_timestamp()}] Embedding file {i}: {filename}") | |
| # Do NOT update embedded files before embedding completes | |
| yield ( | |
| make_pdf_dropdown(), | |
| "\n".join(results), | |
| progress_display, | |
| "\n".join(prev_embedded_files) | |
| ) | |
| try: | |
| build_vectorstore(force_rebuild=True) | |
| results[-1] = f"✅ {filename} (uploaded & embedded)" | |
| upload_log.append(f"[{get_timestamp()}] Embedding file {i} completed") | |
| upload_log.append("") | |
| # Show progress bar after embedding completes | |
| progress_percent = int((i * 2) / (total_files * 2) * 100) | |
| status_msg = f"✅ Embedded {i}/{total_files}: {filename}" | |
| progress_display = format_progress_bar(get_elapsed(), progress_percent, status_msg) | |
| # Update embedded files to show the new file immediately after embedding completes | |
| new_embedded_files = get_pdf_list() | |
| yield ( | |
| make_pdf_dropdown(), | |
| "\n".join(results), | |
| progress_display, | |
| "\n".join(new_embedded_files) | |
| ) | |
| except Exception as embed_error: | |
| results[-1] = f"⚠️ {filename} (uploaded, embedding error: {str(embed_error)})" | |
| upload_log.append(f"[{get_timestamp()}] Embedding file {i} failed") | |
| upload_log.append("") | |
| completed_progress = int((i * 2) / (total_files * 2) * 100) | |
| status_msg = f"⚠️ File {i}/{total_files} completed with error: {filename}" | |
| progress_display = format_progress_bar(get_elapsed(), completed_progress, status_msg) | |
| # Do NOT update embedded files if embedding failed | |
| yield ( | |
| make_pdf_dropdown(), | |
| "\n".join(results), | |
| progress_display, | |
| "\n".join(get_pdf_list()) | |
| ) | |
| except Exception as e: | |
| results.append(f"❌ {filename}: {str(e)}") | |
| upload_log.append(f"[{get_timestamp()}] Uploading file {i} failed") | |
| final_message = "\n".join(results) | |
| final_progress = format_progress_bar(get_elapsed(), 100, f"🎉 All done! Processed {len(files)} file(s) successfully") | |
| upload_log.append(f"[{get_timestamp()}] All {len(files)} file(s) completed") | |
| # Only show fully embedded files in the Available Embedded Files window | |
| # Reset the progress bar to its original empty state after completion (like delete) | |
| yield ( | |
| make_pdf_dropdown(), | |
| final_message, | |
| "", | |
| "\n".join(get_pdf_list()) | |
| ) | |
| # yield: "Send this update to the UI, but keep the function alive for possible further steps." | |
| # return: "Stop here; no more updates or actions are needed." | |
def delete_pdf_ui(selected_pdf):
| available_files = get_pdf_list() | |
| if not available_files: | |
| # Disable delete button if no files are available | |
| yield make_pdf_dropdown(disabled=True), "No embedded files to delete.", "" | |
| return | |
| if not selected_pdf: | |
| # Hide overlay if nothing to delete | |
| yield make_pdf_dropdown(), "\n".join(available_files), "" | |
| return | |
| # Show progress bar immediately on click | |
| bar = format_progress_bar("", 0, "Preparing to delete files...") | |
| yield make_pdf_dropdown(), "\n".join(get_pdf_list()), bar | |
| # Support both single and multiple selection | |
| if isinstance(selected_pdf, str): | |
| selected_files = [selected_pdf] | |
| else: | |
| selected_files = list(selected_pdf) | |
| total_files = len(selected_files) | |
| for idx, file_name in enumerate(selected_files, 1): | |
| file_path = os.path.join(DATA_DIR, file_name) | |
| # Remove file and all leftovers (e.g., embeddings) before advancing progress | |
| deleted = False | |
| leftovers_removed = False | |
| # Remove file | |
| if os.path.exists(file_path): | |
| try: | |
| os.remove(file_path) | |
| deleted = True | |
| except Exception: | |
| deleted = False | |
| # Remove leftovers (add your per-file embedding removal logic here if needed) | |
| # Example: remove embedding file if it exists (customize as needed) | |
| embedding_path = os.path.join(EMBEDDINGS_DIR, file_name + ".embedding") | |
| if os.path.exists(embedding_path): | |
| try: | |
| os.remove(embedding_path) | |
| leftovers_removed = True | |
| except Exception: | |
| leftovers_removed = False | |
| else: | |
| leftovers_removed = True # No leftovers to remove | |
| # Only advance progress bar after both file and leftovers are deleted | |
| if deleted and leftovers_removed: | |
| build_vectorstore(force_rebuild=True) | |
| percent = int(idx / total_files * 100) if total_files else 100 | |
| bar = format_progress_bar("", percent, f"Deleted {idx}/{total_files}: {file_name}") | |
| yield make_pdf_dropdown(), "\n".join(get_pdf_list()), bar | |
| else: | |
| bar = format_progress_bar("", int(idx / total_files * 100) if total_files else 100, f"⚠️ Error deleting {file_name}") | |
| yield make_pdf_dropdown(), "\n".join(get_pdf_list()), bar | |
| time.sleep(0.2) | |
| # Clear progress bar after all deletions | |
| yield make_pdf_dropdown(), "\n".join(get_pdf_list()), "" | |
| # Enables "Delete All" button only if there are files to delet, otherwise disables it | |
| def toggle_delete_all_btn(): | |
| # Check if there is at least one file in Available Embedded Files | |
| files = get_pdf_list() | |
| return gr.update(interactive=bool(files)) | |
def delete_all_files():
| all_files = get_pdf_list() | |
| if not all_files: | |
| bar = format_progress_bar("", 0, "No files to delete.") | |
| yield make_pdf_dropdown(), "\n".join(get_pdf_list()), bar | |
| return | |
| bar = format_progress_bar("", 0, "Preparing to delete all files...") | |
| yield make_pdf_dropdown(), "\n".join(get_pdf_list()), bar | |
| total_files = len(all_files) | |
| for idx, file_name in enumerate(all_files, 1): | |
| file_path = os.path.join(DATA_DIR, file_name) | |
| deleted = False | |
| leftovers_removed = False | |
| if os.path.exists(file_path): | |
| try: | |
| os.remove(file_path) | |
| deleted = True | |
| except Exception: | |
| deleted = False | |
| embedding_path = os.path.join(EMBEDDINGS_DIR, file_name + ".embedding") | |
| if os.path.exists(embedding_path): | |
| try: | |
| os.remove(embedding_path) | |
| leftovers_removed = True | |
| except Exception: | |
| leftovers_removed = False | |
| else: | |
| leftovers_removed = True | |
| if deleted and leftovers_removed: | |
| build_vectorstore(force_rebuild=True) | |
| percent = int(idx / total_files * 100) if total_files else 100 | |
| bar = format_progress_bar("", percent, f"Deleted {idx}/{total_files}: {file_name}") | |
| yield make_pdf_dropdown(), "\n".join(get_pdf_list()), bar | |
| else: | |
| bar = format_progress_bar("", int(idx / total_files * 100) if total_files else 100, f"⚠️ Error deleting {file_name}") | |
| yield make_pdf_dropdown(), "\n".join(get_pdf_list()), bar | |
| time.sleep(0.2) | |
| yield make_pdf_dropdown(), "\n".join(get_pdf_list()), "" | |
| def show_delete_all_warning(): | |
| return ( | |
| gr.Markdown("**⚠️ Are you sure you want to delete ALL files? This cannot be undone. Click 'Confirm Delete All' to proceed.**", visible=True), | |
| gr.update(interactive=True, visible=True) | |
| ) | |
| def hide_delete_all_warning(): | |
| return ( | |
| gr.Markdown(visible=False), | |
| gr.update(interactive=False, visible=False) | |
| ) | |
| def analyze_query_and_use_tools(query: str) -> str: | |
| """Analyze query and use appropriate tools to gather information.""" | |
| query_lower = query.lower() | |
| results = [] | |
| # Check for memory-related queries first | |
| memory_keywords = ["remember", "earlier", "before", "previous", "last time", "we discussed", | |
| "you said", "i asked", "conversation", "history", "recall", "what did we"] | |
| if any(word in query_lower for word in memory_keywords): | |
| print(f"🧠 Memory query detected, fetching conversation history...") | |
| memory_result = get_memory_context(last_n=10) | |
| if memory_result and "No previous conversation" not in memory_result: | |
| results.append(f"📝 **Conversation History:**\n{memory_result}") | |
| search_result = search_memory(query) | |
| if search_result and "No conversation history" not in search_result: | |
| results.append(f"🔍 **Relevant Past Discussions:**\n{search_result}") | |
| if results: | |
| return "\n\n".join(results) | |
| # Try using LangGraph agent (local only) | |
| if not IS_HF_SPACE and agent_executor is not None: | |
| agent_result = run_agent(query) | |
| if agent_result: | |
| return agent_result | |
| # Fallback: Manual tool routing | |
| try: | |
| if any(word in query_lower for word in ["what documents", "list documents", "available documents", "what files", "documents do i have"]): | |
| results.append(list_documents.invoke({})) | |
| if any(word in query_lower for word in ["how many", "count", "number of documents"]): | |
| results.append(count_documents.invoke({})) | |
| results.append(search_documents.invoke({"query": query})) | |
| return "\n\n".join(results) if results else "No relevant information found." | |
| except Exception as e: | |
| return f"Error analyzing query: {str(e)}" | |
| def chat_response(message, history): | |
| """Agent-enhanced chat response function with visual progress tracking.""" | |
| global vs | |
    if not message:
        # chat_response is a generator; a bare return of values would be dropped by Gradio
        yield history, "", "💬 Ready for your question"
        return
| start_time = datetime.now() | |
| get_elapsed = create_elapsed_timer(start_time) | |
| if not isinstance(history, list): | |
| history = [] | |
| history.append({"role": "user", "content": str(message)}) | |
| add_to_memory("user", message) | |
| try: | |
| yield (history, "", format_progress_bar(get_elapsed(), 33, "🔍 Analyzing your question...", bar_length=15)) | |
| print(f"🤖 Agent analyzing query: {message}") | |
| try: | |
| pdf_files = get_pdf_list() | |
| print(f"🐛 DEBUG: PDF files available: {len(pdf_files)} - {pdf_files}") | |
| print(f"🐛 DEBUG: Global vectorstore state: {get_vectorstore() is not None}") | |
| except Exception as debug_error: | |
| print(f"🐛 DEBUG ERROR: {str(debug_error)}") | |
| try: | |
| tool_results = analyze_query_and_use_tools(message) | |
| print(f"🔧 Tool results: {tool_results[:100]}...") | |
| except Exception as tool_error: | |
| error_msg = f"❌ Tool execution failed: {str(tool_error)}" | |
| print(error_msg) | |
| history.append({"role": "assistant", "content": error_msg}) | |
| yield (history, "", f"{get_elapsed()} | [100%] ❌ Error during tool execution") | |
| return | |
| yield (history, "", format_progress_bar(get_elapsed(), 66, "🧠 Generating intelligent response...", bar_length=15)) | |
| try: | |
| memory_context = get_memory_context(last_n=5) | |
| llm_prompt = f""" | |
| You are a helpful AI assistant with access to both the user's previous conversations and relevant document excerpts. | |
| When answering, always: | |
| - Use information from the provided conversation history and document excerpts. | |
| - If the answer is not found in either, reply with: "I don't know." | |
| - Do not make up or guess information. | |
| - Provide concise, accurate, and context-aware answers. | |
| - If the question asks for a list, use bullet points or numbers. | |
| Conversation History: | |
| {memory_context} | |
| Document Excerpts: | |
| {tool_results} | |
| Question: {message} | |
| Answer: | |
| """ | |
| if IS_HF_SPACE and hf_client is not None: | |
| result = hf_generate_chat_response(llm_prompt, hf_client) | |
| if result is None: | |
| result = tool_results | |
| elif not IS_HF_SPACE and ollama_llm is not None: | |
| result = ollama_generate_response(llm_prompt, ollama_llm) | |
| if result is None: | |
| result = tool_results | |
| else: | |
| result = tool_results | |
| print("ℹ️ No LLM available, returning tool results") | |
| except Exception as llm_error: | |
| print(f"❌ LLM processing error: {str(llm_error)}") | |
| result = tool_results | |
| result_str = str(result.content) if hasattr(result, 'content') else str(result) | |
| history.append({"role": "assistant", "content": result_str}) | |
| add_to_memory("assistant", result_str) | |
| yield (history, "", format_progress_bar(get_elapsed(), 100, "✅ Response generated successfully!", bar_length=15)) | |
| # Reset AI Processing Progress to original state | |
| yield (history, "", "💬 Ready for your question") | |
| except Exception as e: | |
| error_msg = f"🚫 System error: {str(e)}\n\nPlease try again or upload your documents again." | |
| print(f"💥 CRITICAL ERROR: {str(e)}") | |
| import traceback | |
| traceback.print_exc() | |
| history.append({"role": "assistant", "content": error_msg}) | |
| yield (history, "", f"{get_elapsed()} | [100%] ❌ System error occurred") | |
| def refresh_embeddings_ui(): | |
| """Refresh embeddings directly""" | |
| try: | |
| build_vectorstore(force_rebuild=True) | |
| return make_pdf_dropdown(), "Embeddings refreshed." | |
| except Exception as e: | |
| return make_pdf_dropdown(), f"Error refreshing embeddings: {str(e)}" | |
| def clear_chat_and_memory(): | |
| """Clear chat history and conversation memory.""" | |
| clear_memory() | |
| return [], "", "💬 Chat and memory cleared. Ready for your question" | |
| # ============================================================================ | |
| # GRADIO UI | |
| # ============================================================================ | |
| ENV_NAME = "🌐 HuggingFace Space" if IS_HF_SPACE else "💻 Local Environment" | |
| ENV_COLOR = "#FF6B6B" if IS_HF_SPACE else "#4ECDC4" | |
| with gr.Blocks(title="RAG Agent Chatbot") as demo: | |
| gr.Markdown(f"# 🤖 RAG Agent - AI Assistant with Tools\nUpload PDFs and interact with an intelligent agent that can search, analyze, and answer questions about your documents.") | |
| if not IS_HF_SPACE: | |
        from helpers_LOCAL import get_installed_llms  # init_ollama_llm and create_langchain_agent are already imported above
| llm_choices = get_installed_llms() | |
| if llm_choices: | |
| llm_dropdown = gr.Dropdown( | |
| label="Select Local LLM", | |
| choices=llm_choices, | |
| value=LLM_NAME if LLM_NAME in llm_choices else (llm_choices[0] if llm_choices else None), | |
| interactive=True, | |
| visible=True | |
| ) | |
| current_llm_display = gr.Markdown(f"**Current LLM:** {LLM_NAME if LLM_NAME else ''}", elem_id="current-llm-display", visible=True) | |
| top_banner = gr.Markdown( | |
| f"<div style='background-color: {ENV_COLOR}; padding: 10px; border-radius: 5px; text-align: center; color: white; font-weight: bold;'>Running on: {ENV_NAME} | LLM: <span id='llm-name'>{LLM_NAME if LLM_NAME else 'None'}</span> | Agent: ✅ Active</div>", | |
| elem_id="top-llm-banner" | |
| ) | |
| def update_llm(selected_label): | |
| global ollama_llm, LLM_NAME, agent_executor | |
| if selected_label: | |
| try: | |
| ollama_llm, LLM_NAME = init_ollama_llm(selected_label) | |
| agent_executor = create_langchain_agent() | |
| banner_html = f"<div style='background-color: {ENV_COLOR}; padding: 10px; border-radius: 5px; text-align: center; color: white; font-weight: bold;'>Running on: {ENV_NAME} | LLM: <span id='llm-name'>{selected_label}</span> | Agent: ✅ Active</div>" | |
| return ( | |
| gr.Markdown(f"**Current LLM:** {selected_label}", elem_id="current-llm-display"), | |
| banner_html | |
| ) | |
| except Exception as e: | |
| ollama_llm = None | |
| LLM_NAME = None | |
| agent_executor = None | |
| banner_html = f"<div style='background-color: {ENV_COLOR}; padding: 10px; border-radius: 5px; text-align: center; color: white; font-weight: bold;'>Running on: {ENV_NAME} | LLM: <span id='llm-name'>None</span> | Agent: ❌ Inactive</div>" | |
| return ( | |
| gr.Markdown(f"**Current LLM:** (Error initializing {selected_label})", elem_id="current-llm-display"), | |
| banner_html | |
| ) | |
| banner_html = f"<div style='background-color: {ENV_COLOR}; padding: 10px; border-radius: 5px; text-align: center; color: white; font-weight: bold;'>Running on: {ENV_NAME} | LLM: <span id='llm-name'>None</span> | Agent: ❌ Inactive</div>" | |
| return gr.Markdown("", elem_id="current-llm-display"), banner_html | |
        llm_dropdown.change(
            fn=update_llm,
            inputs=[llm_dropdown],
            outputs=[current_llm_display, top_banner]
        )
| else: | |
| gr.Markdown( | |
| "<div style='background-color: #ffcccc; padding: 10px; border-radius: 5px; text-align: center; color: #b30000; font-weight: bold;'>⚠ <b>No local LLMs are installed.</b> Please install an Ollama model to enable LLM selection and chat capabilities.</div>" | |
| ) | |
| llm_dropdown = gr.Dropdown( | |
| label="Select Local LLM", | |
| choices=[], | |
| value=None, | |
| interactive=False, | |
| visible=True | |
| ) | |
| current_llm_display = gr.Markdown(f"**Current LLM:** None", elem_id="current-llm-display", visible=True) | |
| gr.Markdown(f"<div style='background-color: {ENV_COLOR}; padding: 10px; border-radius: 5px; text-align: center; color: white; font-weight: bold;'>Running on: {ENV_NAME} | LLM: <span id='llm-name'>None</span> | Agent: ❌ Inactive</div>") | |
| else: | |
| # --- Hugging Face Space: dynamic LLM selection --- | |
| # Static list of free, popular LLMs on HF Inference API (can be expanded) | |
| hf_llm_choices = [ | |
| "Llama-3.2-3B-Instruct", | |
| "mistralai/Mistral-7B-Instruct-v0.2", | |
| "meta-llama/Meta-Llama-3-8B-Instruct", | |
| "google/gemma-7b-it", | |
| "HuggingFaceH4/zephyr-7b-beta", | |
| "Qwen/Qwen1.5-7B-Chat", | |
| "tiiuae/falcon-7b-instruct" | |
| ] | |
| default_llm = "Llama-3.2-3B-Instruct" | |
| llm_dropdown = gr.Dropdown( | |
| label="Select HF LLM", | |
| choices=hf_llm_choices, | |
| value=LLM_NAME if LLM_NAME in hf_llm_choices else default_llm, | |
| interactive=True, | |
| visible=True | |
| ) | |
| current_llm_display = gr.Markdown(f"**Current LLM:** {LLM_NAME if LLM_NAME else default_llm}", elem_id="current-llm-display", visible=True) | |
| top_banner = gr.Markdown( | |
| f"<div style='background-color: {ENV_COLOR}; padding: 10px; border-radius: 5px; text-align: center; color: white; font-weight: bold;'>Running on: {ENV_NAME} | LLM: <span id='llm-name'>{LLM_NAME if LLM_NAME else default_llm}</span> | Agent: ✅ Active</div>", | |
| elem_id="top-llm-banner" | |
| ) | |
| def update_hf_llm(selected_label): | |
| global hf_client, LLM_NAME | |
| from helpers_HF import init_hf_llm | |
| if selected_label: | |
| try: | |
| hf_client, LLM_NAME = init_hf_llm(selected_label) | |
| banner_html = f"<div style='background-color: {ENV_COLOR}; padding: 10px; border-radius: 5px; text-align: center; color: white; font-weight: bold;'>Running on: {ENV_NAME} | LLM: <span id='llm-name'>{selected_label}</span> | Agent: ✅ Active</div>" | |
| return ( | |
| gr.Markdown(f"**Current LLM:** {selected_label}", elem_id="current-llm-display"), | |
| banner_html | |
| ) | |
| except Exception as e: | |
| hf_client = None | |
| LLM_NAME = None | |
| banner_html = f"<div style='background-color: {ENV_COLOR}; padding: 10px; border-radius: 5px; text-align: center; color: white; font-weight: bold;'>Running on: {ENV_NAME} | LLM: <span id='llm-name'>None</span> | Agent: ❌ Inactive</div>" | |
| return ( | |
| gr.Markdown(f"**Current LLM:** (Error initializing {selected_label})", elem_id="current-llm-display"), | |
| banner_html | |
| ) | |
| banner_html = f"<div style='background-color: {ENV_COLOR}; padding: 10px; border-radius: 5px; text-align: center; color: white; font-weight: bold;'>Running on: {ENV_NAME} | LLM: <span id='llm-name'>None</span> | Agent: ❌ Inactive</div>" | |
| return gr.Markdown("", elem_id="current-llm-display"), banner_html | |
        llm_dropdown.change(
            fn=update_hf_llm,
            inputs=[llm_dropdown],
            outputs=[current_llm_display, top_banner]
        )
| with gr.Row(): | |
| with gr.Column(scale=1): | |
| gr.Markdown("### 📁 File Management") | |
| if IS_HF_SPACE and STORAGE_WARNING: | |
| gr.Markdown(f"**Storage Status:** {STORAGE_WARNING}") | |
| file_upload = gr.File( | |
| label="Upload Files (Multiple files supported)", | |
| file_types=[".pdf"], | |
| file_count="multiple" | |
| ) | |
| upload_status = gr.Textbox( | |
| label="Upload Status", | |
| value="", | |
| interactive=False, | |
| lines=8, | |
| max_lines=8, | |
| autoscroll=True | |
| ) | |
| with gr.Row(): | |
| progress_bar = gr.Textbox( | |
| label="Uploading Progress", | |
| value="", | |
| interactive=False, | |
| lines=1, | |
| max_lines=1, | |
| autoscroll=True | |
| ) | |
| delete_progress_bar = gr.Textbox( | |
| label="Deleting Progress", | |
| value="", | |
| interactive=False, | |
| lines=1, | |
| max_lines=1, | |
| autoscroll=True | |
| ) | |
| embedded_files = gr.Textbox( | |
| label="Available Embedded Files", | |
| value="\n".join(get_pdf_list()), | |
| interactive=False, | |
| lines=8, | |
| max_lines=8, | |
| autoscroll=True | |
| ) | |
| with gr.Row(): | |
| pdf_dropdown = gr.Dropdown( | |
| label="Select Files to Delete", | |
| choices=get_pdf_list_ui(), | |
| interactive=True, | |
| allow_custom_value=False, | |
| multiselect=True | |
| ) | |
| delete_btn = gr.Button("🗑️ Delete Files", variant="stop", interactive=False) | |
| delete_all_btn = gr.Button("🗑️ Delete All", variant="stop", interactive=bool(get_pdf_list())) | |
| delete_all_warning = gr.Markdown(visible=False) | |
| confirm_delete_all_btn = gr.Button("Confirm Delete All", variant="stop", interactive=True, visible=False) | |
| delete_progress_overlay = gr.HTML(floating_progress_bar_html()) | |
| with gr.Column(scale=4): | |
| gr.Markdown("### 🤖 AI Agent Chat") | |
| gr.Markdown("**Agent Capabilities:** Search documents, list files, count documents, intelligent reasoning") | |
            chatbot = gr.Chatbot(height=CONFIG["chatbot_height"], layout="bubble", type="messages")  # chat_response appends {"role", "content"} dicts, which requires the messages format
| if IS_HF_SPACE and not HAS_PERSISTENT_STORAGE: | |
| gr.Markdown("⚠️ **Storage Notice:** Files are temporary and will be lost when Space restarts. To enable persistent storage, upgrade to a paid plan in Settings → Hardware.") | |
| gr.Markdown("**🛠️ Agent Commands - Try these tool-powered queries:**") | |
| with gr.Row(): | |
| sample1 = gr.Button(f"📋 {SAMPLE_Q1}", size="sm") | |
| sample2 = gr.Button(f"🔍 {SAMPLE_Q2}", size="sm") | |
| sample3 = gr.Button(f"📊 {SAMPLE_Q3}", size="sm") | |
| with gr.Row(): | |
| sample4 = gr.Button(f"🧠 {SAMPLE_Q4}", size="sm") | |
| sample5 = gr.Button(f"🍳 {SAMPLE_Q5}", size="sm") | |
| sample6 = gr.Button(f"🧠 {SAMPLE_Q6}", size="sm") | |
| sample7 = gr.Button(f"📝 {SAMPLE_Q7}", size="sm") | |
| msg_input = gr.Textbox( | |
| placeholder="Ask a question about your PDFs...", | |
| label="Ask about your PDFs", | |
| show_label=False | |
| ) | |
| ai_status = gr.Textbox( | |
| label="📊 AI Processing Progress", | |
| value="💬 Ready for your question", | |
| interactive=False, | |
| placeholder="AI processing status with progress tracking..." | |
| ) | |
| with gr.Row(): | |
| submit_btn = gr.Button("Send", variant="primary", scale=1, interactive=False) | |
| clear_btn = gr.Button("Clear", scale=1, interactive=False) | |
    # Event handlers
    # outputs=[pdf_dropdown, upload_status, progress_bar, embedded_files] means that add_pdf
    # returns (or yields) a tuple of four values, each updating the corresponding UI component;
    # for example, "embedded_files" is the Textbox labeled "Available Embedded Files".
| file_upload.upload( | |
| fn=add_pdf, | |
| inputs=[file_upload], | |
| outputs=[pdf_dropdown, upload_status, progress_bar, embedded_files] | |
| ) | |
| delete_btn.click( | |
| fn=delete_pdf_ui, | |
| inputs=[pdf_dropdown], | |
| outputs=[pdf_dropdown, embedded_files, delete_progress_bar] | |
| ) | |
| delete_all_btn.click( | |
| fn=show_delete_all_warning, | |
| inputs=[], | |
| outputs=[delete_all_warning, confirm_delete_all_btn] | |
| ) | |
| # Update Delete All button state when files change | |
| demo.load(fn=toggle_delete_all_btn, outputs=[delete_all_btn]) | |
| pdf_dropdown.change(fn=toggle_delete_all_btn, outputs=[delete_all_btn]) | |
| embedded_files.change(fn=toggle_delete_all_btn, outputs=[delete_all_btn]) | |
| confirm_delete_all_btn.click( | |
| fn=delete_all_files, | |
| inputs=[], | |
| outputs=[pdf_dropdown, embedded_files, delete_progress_bar] | |
| ).then( | |
| fn=hide_delete_all_warning, | |
| inputs=[], | |
| outputs=[delete_all_warning, confirm_delete_all_btn] | |
| ) | |
    # Enable the delete button whenever embedded files exist (selection itself is not required)
    def toggle_delete_btn(selected):
        return gr.update(interactive=bool(get_pdf_list()))
| # Update delete button state on both dropdown change and embedded_files change | |
| pdf_dropdown.change( | |
| fn=toggle_delete_btn, | |
| inputs=[pdf_dropdown], | |
| outputs=[delete_btn] | |
| ) | |
| embedded_files.change( | |
| fn=lambda _: gr.update(interactive=bool(get_pdf_list())), | |
| inputs=[embedded_files], | |
| outputs=[delete_btn] | |
| ) | |
| demo.load(fn=lambda: gr.update(interactive=bool(get_pdf_list())), outputs=[delete_btn]) | |
| demo.load(fn=lambda: "\n".join(get_pdf_list()), outputs=[embedded_files]) | |
| # Ensure embedded_files is updated on app start | |
| demo.load(fn=lambda: "\n".join(get_pdf_list()), outputs=[embedded_files]) | |
| # Sample question handlers | |
| sample_buttons = [sample1, sample2, sample3, sample4, sample5, sample6, sample7] | |
| sample_questions = [SAMPLE_Q1, SAMPLE_Q2, SAMPLE_Q3, SAMPLE_Q4, SAMPLE_Q5, SAMPLE_Q6, SAMPLE_Q7] | |
| for btn, question in zip(sample_buttons, sample_questions): | |
| btn.click(fn=lambda q=question: q, outputs=[msg_input]) | |
| msg_input.submit( | |
| fn=chat_response, | |
| inputs=[msg_input, chatbot], | |
| outputs=[chatbot, msg_input, ai_status] | |
| ) | |
| submit_btn.click( | |
| fn=chat_response, | |
| inputs=[msg_input, chatbot], | |
| outputs=[chatbot, msg_input, ai_status] | |
| ) | |
| # Enable/disable send button based on input | |
| def toggle_send_btn(text): | |
| return gr.update(interactive=bool(text and text.strip())) | |
| msg_input.change( | |
| fn=toggle_send_btn, | |
| inputs=[msg_input], | |
| outputs=[submit_btn] | |
| ) | |
| clear_btn.click( | |
| fn=clear_chat_and_memory, | |
| outputs=[chatbot, msg_input, ai_status] | |
| ) | |
    # Enable/disable clear button based on input or chat history
    # (reading .value on a component only returns its initial value, so both live values
    # are passed in as inputs instead)
    def toggle_clear_btn(text, chat):
        return gr.update(interactive=bool((text and text.strip()) or (chat and len(chat) > 0)))
    msg_input.change(
        fn=toggle_clear_btn,
        inputs=[msg_input, chatbot],
        outputs=[clear_btn]
    )
    chatbot.change(
        fn=toggle_clear_btn,
        inputs=[msg_input, chatbot],
        outputs=[clear_btn]
    )
| demo.load(fn=make_pdf_dropdown, outputs=[pdf_dropdown]) | |
| # ============================================================================ | |
| # LAUNCH application | |
| # ============================================================================ | |
| if IS_HF_SPACE: | |
| try: | |
| demo.launch( | |
| server_name=CONFIG["server_host"], | |
| server_port=CONFIG["server_port"], | |
| share=True, | |
| show_error=True, | |
| quiet=False | |
| ) | |
| except Exception as launch_error: | |
| print(f"Launch error: {launch_error}") | |
| demo.launch(server_name=CONFIG["server_host"], server_port=CONFIG["server_port"]) | |
| else: | |
| app_with_gradio = gr.mount_gradio_app(app, demo, path="/") | |
| if __name__ == "__main__": | |
| import uvicorn | |
| import webbrowser | |
| from threading import Timer | |
| Timer(3, lambda: webbrowser.open(f"http://127.0.0.1:{CONFIG['server_port']}")).start() | |
| print("Starting server... Browser will open automatically in 3 seconds.") | |
| uvicorn.run(app_with_gradio, host=CONFIG["server_host"], port=CONFIG["server_port"]) | |