Spaces:
Sleeping
Sleeping
| import streamlit as st | |
| import faiss | |
| import pickle | |
| import pandas as pd | |
| import sqlite3 | |
| from sentence_transformers import SentenceTransformer | |
| import google.generativeai as genai | |
| from langchain_core.runnables import RunnableLambda | |
| from langchain_core.prompts import PromptTemplate | |
| from langchain_core.output_parsers import StrOutputParser | |
| SAVE_DIR = "vector_store" | |
| DATASET_DIR = "datasets" | |
| GOOGLE_API_KEY = "AIzaSyD-iwKoPUSxGerqKjKhjvAJ3KRERpy0-18" | |
| st.set_page_config(page_title="π¬ Gemini Q&A App", layout="centered") | |
| def load_resources(): | |
| """Load FAISS index, metadata, and embedding model once.""" | |
| # Load FAISS index | |
| index = faiss.read_index(f"{SAVE_DIR}/metadata_index.faiss") | |
| # Load dataset metadata | |
| with open(f"{SAVE_DIR}/metadata_info.pkl", "rb") as f: | |
| data = pickle.load(f) | |
| # Load embedding model | |
| embedding_model = SentenceTransformer("./all-MiniLM-L6-v2") | |
| return index, data, embedding_model | |
| def load_gemini_model(): | |
| """Initialize Gemini model once.""" | |
| genai.configure(api_key=GOOGLE_API_KEY) | |
| return genai.GenerativeModel("gemini-2.5-flash") | |
| index, data, embedding_model = load_resources() | |
| gemini_model = load_gemini_model() | |
| conn = sqlite3.connect(":memory:") | |
| datasets_list = data["datasets_list"] | |
| metadata_texts = data["metadata_texts"] | |
| dataset_names = data["dataset_names"] | |
| dataset_links = data["source_list"] | |
| def similarity_search(query: str): | |
| """Find most relevant dataset for a given query using vector similarity.""" | |
| query_embedding = embedding_model.encode([query]).astype("float32") | |
| D, I = index.search(query_embedding, k=1) | |
| best_idx = int(I[0][0]) | |
| dataset_name = datasets_list[best_idx] | |
| meta = metadata_texts[best_idx] | |
| df = pd.read_csv(f"{DATASET_DIR}/{dataset_name}", encoding="latin1") | |
| columns = df.columns.tolist() | |
| link = meta["table_source"] | |
| columns_info = meta['canonical_schema'][1] | |
| sample_rows = {col: df[col].head(6).tolist() for col in df.columns} | |
| return dataset_name, { | |
| "columns": columns, | |
| "columns_info": columns_info, | |
| "sample_rows": sample_rows, | |
| }, link | |
| def execute_sql(dataset_name: str, command: str) -> str: | |
| """Run SQL query on selected dataset.""" | |
| try: | |
| df = pd.read_csv(f"{DATASET_DIR}/{dataset_name}", encoding="latin1") | |
| df.to_sql("selected_table", conn, index=False, if_exists="replace") | |
| result = pd.read_sql_query(command, conn) | |
| return result.to_markdown(index=False) | |
| except Exception as e: | |
| return f"SQL Execution Error: {e}" | |
| llm_model = RunnableLambda( | |
| lambda x: gemini_model.generate_content(str(x)).text | |
| ) | |
| sql_prompt = PromptTemplate( | |
| input_variables=["question", "columns", "columns_info", "sample_rows"], | |
| template=""" | |
| You are an expert SQL data analyst. | |
| Your task is to write a **valid and accurate SQL query** that answers the user's question | |
| using only the information from the given table. | |
| --- | |
| ### USER QUESTION | |
| {question} | |
| ### TABLE INFORMATION | |
| Table name: selected_table | |
| Columns: | |
| {columns} | |
| Column descriptions: | |
| {columns_info} | |
| Sample rows: | |
| {sample_rows} | |
| --- | |
| ### RULES | |
| - Use **only** the given columns and table name. Do NOT invent or assume new columns. | |
| - Be careful with **spelling and case sensitivity** β match column names exactly. | |
| - Prefer general operators like `LIKE` or `BETWEEN` instead of exact equality if unsure about values. | |
| - For text filters, wrap string literals in single quotes `'like this'`. | |
| - If an aggregation (SUM, AVG, COUNT, MAX, MIN) is clearly implied, include it. | |
| - Avoid selecting unnecessary columns β keep output relevant and concise. | |
| - Do NOT include explanations, markdown formatting, or comments. | |
| - Return **only** the SQL query as plain text (no ```sql fences, no prose). | |
| --- | |
| ### OUTPUT | |
| Return only one SQL query that directly answers the user's question. | |
| """ | |
| ) | |
| final_prompt = PromptTemplate( | |
| input_variables=["question", "answer", "link"], | |
| template=""" | |
| You are a precise and factual data assistant. | |
| You are given: | |
| 1. A user's question. | |
| 2. The SQL query result. | |
| 3. The dataset's link. | |
| Your goal is to provide a clear, natural-language answer in a few sentences. | |
| --- | |
| ### QUESTION | |
| {question} | |
| ### SQL RESULT | |
| {answer} | |
| ### DATA SOURCE | |
| {link} | |
| --- | |
| ### INSTRUCTIONS | |
| - If the SQL result is empty, respond exactly with: | |
| Sorry, the information related to your question is not available in the current dataset. | |
| (No link in this case.) | |
| - Otherwise: | |
| 1. Summarize the insight naturally. | |
| 2. Then, start a **new line** and include this line exactly: | |
| Source of information: Government of India Open Data Portal β {link} | |
| --- | |
| ### OUTPUT | |
| Return only the final answer. | |
| """ | |
| ) | |
| parser = StrOutputParser() | |
| sql_chain = sql_prompt | llm_model | parser | |
| final_chain = final_prompt | llm_model | parser | |
| def get_output(query: str) -> str: | |
| """Run complete pipeline: find dataset β generate SQL β execute β explain.""" | |
| dataset_name, llm_input, link = similarity_search(query) | |
| llm_input["question"] = query | |
| sql_query = sql_chain.invoke(llm_input) | |
| sql_result = execute_sql(dataset_name, sql_query) | |
| final_response = final_chain.invoke({"question": query, "answer": sql_result, "link": link}) | |
| return final_response | |
| # ============================== | |
| # STREAMLIT UI | |
| # ============================== | |
| st.title("π¬ Samarth Q&A App π§βπ»") | |
| st.write("Ask questions about Agriculture β Samarth gives answer from official goverment data sources.") | |
| user_query = st.text_area("Enter your question:") | |
| if st.button("Submit"): | |
| if not user_query.strip(): | |
| st.warning("Please enter a question.") | |
| else: | |
| with st.spinner("Generating response..."): | |
| try: | |
| response = get_output(user_query) | |
| st.markdown("### β Response") | |
| st.success(response) | |
| except Exception as e: | |
| st.error(f"Error: {e}") | |