import os
import json
import time
import pickle
import hashlib
import re
from pathlib import Path
from typing import List, Tuple, Optional, Dict
from langchain_community.vectorstores import FAISS
from langchain.schema import Document
from langchain.text_splitter import RecursiveCharacterTextSplitter
try:
from backend.embeddings import get_embeddings_wrapper
from backend.document_processor import DocumentProcessor
from backend.chat_history import ChatHistory
except ModuleNotFoundError:
from embeddings import get_embeddings_wrapper
from document_processor import DocumentProcessor
from chat_history import ChatHistory
from openai import OpenAI
import httpx
import tiktoken
# Try to import EnsembleRetriever and BM25Retriever for hybrid search
try:
from langchain.retrievers import EnsembleRetriever
from langchain_community.retrievers import BM25Retriever
ENSEMBLE_AVAILABLE = True
except ImportError:
ENSEMBLE_AVAILABLE = False
EnsembleRetriever = None
BM25Retriever = None
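# NOTE (assumption about dependency behavior): hybrid search also needs the `rank_bm25` package at
# runtime; depending on the installed langchain-community version, a missing dependency may only
# surface later, when BM25Retriever.from_documents() is called (see _create_ensemble_retriever).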
class NoProxyHTTPClient(httpx.Client):
def __init__(self, *args, **kwargs):
kwargs.pop("proxies", None)
super().__init__(*args, **kwargs)
class RAGSystem:
"""RAG system that indexes summaries and retrieves full text from JSON"""
def __init__(self, vectorstore_path: str = "vectorstore", json_path: Optional[str] = None, openai_api_key: Optional[str] = None):
self.vectorstore_path = vectorstore_path
# JSON path relative to project root
if json_path is None:
project_root = Path(__file__).resolve().parents[1]
self.json_path = str(project_root / "processed_documents.json")
else:
self.json_path = json_path
self.vectorstore = None
# Chunk vectorstores directory path
if json_path is None:
project_root = Path(__file__).resolve().parents[1]
self.chunk_vectorstores_path = str(project_root / "chunk_vectorstores")
else:
project_root = Path(json_path).parent
self.chunk_vectorstores_path = str(project_root / "chunk_vectorstores")
# Create directory if it doesn't exist
os.makedirs(self.chunk_vectorstores_path, exist_ok=True)
# Initialize embeddings (supports OpenAI or HuggingFace based on EMBEDDINGS_PROVIDER env var)
provider = os.getenv("EMBEDDINGS_PROVIDER", "openai").lower()
if provider in ["huggingface", "hf", "nebius"]:
# For HuggingFace, use HF_TOKEN
embeddings_api_key = os.getenv("HF_TOKEN")
if not embeddings_api_key:
raise ValueError("HF_TOKEN is required for HuggingFace embeddings. Set HF_TOKEN environment variable.")
else:
# For OpenAI, use OPENAI_API_KEY
embeddings_api_key = openai_api_key or os.getenv("OPENAI_API_KEY")
if not embeddings_api_key:
raise ValueError("OpenAI API key is required. Set OPENAI_API_KEY environment variable.")
print(f"[RAGSystem] Using embeddings provider: {provider}")
self.embeddings = get_embeddings_wrapper(api_key=embeddings_api_key)
# Initialize document processor (always uses OpenAI for LLM processing)
openai_api_key_for_processor = openai_api_key or os.getenv("OPENAI_API_KEY")
if not openai_api_key_for_processor:
raise ValueError("OpenAI API key is required for document processing. Set OPENAI_API_KEY environment variable.")
self.processor = DocumentProcessor(api_key=openai_api_key_for_processor)
# Initialize LLM client for answering questions
os.environ.setdefault("OPENAI_API_KEY", openai_api_key_for_processor)
http_client = NoProxyHTTPClient(timeout=60.0)
self.llm_client = OpenAI(http_client=http_client)
self.llm_model = os.getenv("OPENAI_LLM_MODEL", "gpt-4o-mini")
# Chat history manager
self.chat_history = ChatHistory(max_history=int(os.getenv("CHAT_HISTORY_TURNS", "10")))
# Cache for JSON file contents and document texts
self._json_cache = None
self._json_cache_path = None
self._text_cache: Dict[str, str] = {} # Cache for document texts by filename
        # Cache for per-document chunk vectorstores: {filename: {"vectorstore": FAISS, "chunks": List[Document]}}
self._chunk_cache: Dict[str, Dict[str, object]] = {}
# Hybrid search configuration (EnsembleRetriever: BM25 + Semantic)
self.use_hybrid_search = os.getenv("USE_HYBRID_SEARCH", "false").lower() == "true"
self.hybrid_weights = [
float(os.getenv("HYBRID_BM25_WEIGHT", "0.5")), # BM25 weight
float(os.getenv("HYBRID_SEMANTIC_WEIGHT", "0.5")) # Semantic (FAISS) weight
]
# Normalize weights to sum to 1.0
total_weight = sum(self.hybrid_weights)
if total_weight > 0:
self.hybrid_weights = [w / total_weight for w in self.hybrid_weights]
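        # e.g. raw weights (2, 1) normalize to roughly [0.67, 0.33]; the defaults (0.5, 0.5) are unchanged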
if self.use_hybrid_search:
if not ENSEMBLE_AVAILABLE:
print("[RAGSystem] WARNING: USE_HYBRID_SEARCH enabled but EnsembleRetriever not available. Disabling hybrid search.")
self.use_hybrid_search = False
else:
print(f"[RAGSystem] Hybrid search enabled (BM25: {self.hybrid_weights[0]:.2f}, Semantic: {self.hybrid_weights[1]:.2f})")
else:
print("[RAGSystem] Hybrid search disabled (using semantic search only)")
# Try to load existing vectorstore
self._load_vectorstore()
def _load_vectorstore(self):
"""Load existing vectorstore if it exists"""
if os.path.exists(self.vectorstore_path):
try:
self.vectorstore = FAISS.load_local(
self.vectorstore_path,
embeddings=self.embeddings,
allow_dangerous_deserialization=True
)
# Ensure embedding function is properly set.
# LangChain now expects an Embeddings *object* here, not a raw function.
if not hasattr(self.vectorstore, "embedding_function") or self.vectorstore.embedding_function is None:
self.vectorstore.embedding_function = self.embeddings
# Some versions may set a non-callable placeholder; override with our wrapper.
elif not callable(getattr(self.vectorstore.embedding_function, "embed_query", None)):
self.vectorstore.embedding_function = self.embeddings
print(f"Loaded existing vectorstore from {self.vectorstore_path}")
except Exception as e:
print(f"Could not load existing vectorstore: {e}")
self.vectorstore = None
def _count_tokens(self, messages: List[Dict[str, str]]) -> int:
"""Count tokens in messages for the current model"""
try:
encoding = tiktoken.encoding_for_model(self.llm_model)
except KeyError:
# Fallback to cl100k_base for unknown models
encoding = tiktoken.get_encoding("cl100k_base")
tokens = 0
for message in messages:
tokens += 4 # Every message follows <im_start>{role/name}\n{content}<im_end>\n
for key, value in message.items():
tokens += len(encoding.encode(str(value)))
if key == "name": # If there's a name, the role is omitted
tokens -= 1 # Role is always required and always 1 token
tokens += 2 # Every reply is primed with <im_start>assistant
return tokens
def _filter_thinking_process(self, text: str) -> str:
"""Filter out thinking process markers from Qwen model responses"""
if not text:
return text
# Quick path: remove <think>...</think> sections via regex
        filtered = re.sub(r"<think>.*?</think>", "", text, flags=re.DOTALL | re.IGNORECASE)
# Remove legacy line-based markers (think>, think:)
lines = []
for line in filtered.splitlines():
strip = line.strip()
if strip.lower().startswith("think>") or strip.lower().startswith("think:"):
continue
lines.append(line)
return "\n".join(lines).strip()
def process_and_index_documents(self, documents_folder: str) -> int:
"""
Process all PDFs, save to JSON, and index summaries.
Skips already processed documents and only indexes new ones.
Returns: Number of new documents processed
"""
# Step 1: Process all PDFs with LLM (skips existing ones)
print("Processing PDFs with LLM...")
new_processed_docs = self.processor.process_all_pdfs(documents_folder, skip_existing=True)
# Step 2: Ensure vectorstore exists - build from existing documents if needed
if self.vectorstore is None:
# Load all existing documents to build initial vectorstore
all_docs = self.processor.load_from_json()
if all_docs:
print("Building vectorstore from existing processed documents...")
all_summary_docs = []
for doc in all_docs:
all_summary_docs.append(Document(
page_content=doc["summary"],
metadata={"filename": doc["filename"]}
))
self.vectorstore = FAISS.from_documents(
all_summary_docs,
embedding=self.embeddings
)
os.makedirs(self.vectorstore_path, exist_ok=True)
self.vectorstore.save_local(self.vectorstore_path)
print(f"Built vectorstore from {len(all_docs)} existing documents")
else:
print("Warning: No processed documents found in JSON file.")
# Step 3: Index summaries for new documents only
if new_processed_docs:
print("Indexing summaries for new documents...")
summary_docs = []
for doc in new_processed_docs:
summary_docs.append(Document(
page_content=doc["summary"],
metadata={"filename": doc["filename"]}
))
# Update vectorstore with new documents
if self.vectorstore is None:
print("Creating new vectorstore from new documents...")
self.vectorstore = FAISS.from_documents(
summary_docs,
embedding=self.embeddings
)
else:
print("Updating existing vectorstore with new documents...")
self.vectorstore.add_documents(summary_docs)
# Save vectorstore
os.makedirs(self.vectorstore_path, exist_ok=True)
self.vectorstore.save_local(self.vectorstore_path)
print(f"Indexed {len(new_processed_docs)} new document summaries")
else:
if self.vectorstore is not None:
print("No new documents to process. Using existing vectorstore.")
else:
print("Warning: No documents found to process or index. Vectorstore not available.")
return len(new_processed_docs)
@staticmethod
def _parse_llm_response(raw_response: str) -> Tuple[str, bool]:
"""
Parse LLM response to extract answer and found flag.
Args:
raw_response: The raw response from LLM
Returns:
Tuple of (answer text, found flag) where found=True means answer was found in context
"""
# Try to parse as JSON first
try:
# Look for JSON in the response (might be wrapped in markdown code blocks)
response_text = raw_response.strip()
# Remove markdown code blocks if present
if response_text.startswith("```json"):
response_text = response_text[7:] # Remove ```json
elif response_text.startswith("```"):
response_text = response_text[3:] # Remove ```
if response_text.endswith("```"):
response_text = response_text[:-3] # Remove closing ```
response_text = response_text.strip()
# Try to parse as JSON
parsed = json.loads(response_text)
answer = parsed.get("answer", raw_response)
found = parsed.get("found", True) # Default to True for backward compatibility
return answer, bool(found)
except (json.JSONDecodeError, ValueError) as e:
# If JSON parsing fails, return the raw response with found=True (backward compatibility)
return raw_response, True
def _load_json_cached(self) -> List[Dict[str, str]]:
"""Load JSON file with caching to avoid repeated file I/O"""
json_path = Path(self.json_path)
# Check if cache is valid (file hasn't changed)
if self._json_cache is not None and self._json_cache_path == str(json_path):
if json_path.exists():
# Check if file modification time changed
current_mtime = json_path.stat().st_mtime
if hasattr(self, '_json_cache_mtime') and self._json_cache_mtime == current_mtime:
return self._json_cache
# Load from file
if not json_path.exists():
return []
try:
with open(json_path, "r", encoding="utf-8") as f:
docs = json.load(f)
# Cache the results
self._json_cache = docs if isinstance(docs, list) else []
self._json_cache_path = str(json_path)
self._json_cache_mtime = json_path.stat().st_mtime
return self._json_cache
except Exception as e:
return []
def _get_text_by_filename_cached(self, filename: str) -> Optional[str]:
"""Get full text for a document by filename using cache"""
# Check text cache first
if filename in self._text_cache:
return self._text_cache[filename]
# Load from JSON cache
docs = self._load_json_cached()
for doc in docs:
if doc.get("filename") == filename:
text = doc.get("text", "")
# Cache the text
self._text_cache[filename] = text
return text
return None
def _sanitize_filename(self, filename: str) -> str:
"""
Sanitize filename to create a safe directory name.
Handles Arabic filenames and special characters.
Args:
filename: Original filename
Returns:
Sanitized directory name safe for filesystem
"""
# Remove extension
name_without_ext = Path(filename).stem
# Create a hash of the original filename for uniqueness
# This ensures Arabic and special characters are handled
filename_hash = hashlib.md5(filename.encode('utf-8')).hexdigest()[:8]
# Sanitize: keep alphanumeric, Arabic characters, spaces, hyphens, underscores
# Replace other special chars with underscore
sanitized = re.sub(r'[^\w\s\u0600-\u06FF\-]', '_', name_without_ext)
# Replace multiple spaces/underscores with single underscore
sanitized = re.sub(r'[\s_]+', '_', sanitized)
# Remove leading/trailing underscores
sanitized = sanitized.strip('_')
# Combine sanitized name with hash for uniqueness
if sanitized:
return f"{sanitized}_{filename_hash}"
else:
return filename_hash
def _get_chunk_vectorstore_path(self, filename: str) -> str:
"""
Get the directory path for a document's chunk vectorstore.
Args:
filename: Document filename
Returns:
Path to the directory containing the chunk vectorstore
"""
sanitized_name = self._sanitize_filename(filename)
return str(Path(self.chunk_vectorstores_path) / sanitized_name)
def _save_chunk_vectorstore(self, filename: str, vectorstore: FAISS, chunks: List[Document]) -> None:
"""
Save chunk vectorstore and chunks metadata to disk.
Args:
filename: Document filename
vectorstore: FAISS vectorstore to save
chunks: List of Document objects (chunks metadata)
"""
chunk_vs_path = self._get_chunk_vectorstore_path(filename)
os.makedirs(chunk_vs_path, exist_ok=True)
# Save FAISS vectorstore (saves index.faiss and index.pkl)
vectorstore.save_local(chunk_vs_path)
# Save chunks metadata as pickle
chunks_path = Path(chunk_vs_path) / "chunks.pkl"
with open(chunks_path, 'wb') as f:
pickle.dump(chunks, f)
print(f"[Chunk Vectorstore] Saved chunk vectorstore ")
def _load_chunk_vectorstore(self, filename: str) -> Optional[Tuple[FAISS, List[Document]]]:
"""
Load chunk vectorstore and chunks metadata from disk if exists.
Args:
filename: Document filename
Returns:
Tuple of (FAISS vectorstore, List[Document]) if found, None otherwise
"""
chunk_vs_path = self._get_chunk_vectorstore_path(filename)
chunk_vs_path_obj = Path(chunk_vs_path)
# Check if vectorstore files exist
faiss_index = chunk_vs_path_obj / "index.faiss"
faiss_pkl = chunk_vs_path_obj / "index.pkl"
chunks_pkl = chunk_vs_path_obj / "chunks.pkl"
if not (faiss_index.exists() and faiss_pkl.exists() and chunks_pkl.exists()):
return None
try:
# Load FAISS vectorstore
vectorstore = FAISS.load_local(
chunk_vs_path,
embeddings=self.embeddings,
allow_dangerous_deserialization=True
)
# Ensure embedding function is properly set to the embeddings wrapper object.
if not hasattr(vectorstore, "embedding_function") or vectorstore.embedding_function is None:
vectorstore.embedding_function = self.embeddings
elif not callable(getattr(vectorstore.embedding_function, "embed_query", None)):
vectorstore.embedding_function = self.embeddings
# Load chunks metadata
with open(chunks_pkl, 'rb') as f:
chunks = pickle.load(f)
print(f"[Chunk Vectorstore] Loaded chunk vectorstore for '{filename}'")
return vectorstore, chunks
except Exception as e:
print(f"[Chunk Vectorstore] Error loading chunk vectorstore for '{filename}': {e}")
return None
def _get_or_build_chunk_vectorstore(
self,
filename: str,
full_text: str,
chunk_size: int = 2000,
chunk_overlap: int = 300
) -> Tuple[FAISS, List[Document]]:
"""
Build or retrieve a FAISS vectorstore of semantic chunks for a single document.
Checks disk first, then memory cache, then builds if needed.
Args:
filename: Document filename used as key in cache/metadata
full_text: Full document text to chunk
chunk_size: Approximate character length for each chunk
chunk_overlap: Overlap between consecutive chunks (characters)
Returns:
Tuple of (FAISS vectorstore over chunks, list of chunk Documents)
"""
# Step 1: Return from memory cache if available (fastest)
if filename in self._chunk_cache:
entry = self._chunk_cache[filename]
return entry["vectorstore"], entry["chunks"] # type: ignore[return-value]
# Step 2: Try to load from disk
# NOTE: Existing chunk indexes on disk may have been created with a different distance
# metric. To guarantee consistency, delete the `chunk_vectorstores/` directory so
# these indexes are rebuilt on demand with the default L2 distance metric.
loaded = self._load_chunk_vectorstore(filename)
if loaded is not None:
vectorstore, chunks = loaded
# Cache in memory for faster access
self._chunk_cache[filename] = {
"vectorstore": vectorstore,
"chunks": chunks,
}
return vectorstore, chunks
# Step 3: Build new vectorstore (not found in cache or disk)
print(f"[Chunk Vectorstore] Building new chunk vectorstore for '{filename}'")
# Create text splitter tuned for Arabic legal text
text_splitter = RecursiveCharacterTextSplitter(
chunk_size=chunk_size,
chunk_overlap=chunk_overlap,
separators=[
"\n\n",
"\n",
"المادة ",
"مادة ",
". ",
" ",
""
],
)
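        # Splitting preferentially at article markers ("المادة ", "مادة ") keeps each legal article
        # intact within a chunk before falling back to sentence and word boundaries.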
chunks = text_splitter.split_text(full_text)
chunk_docs: List[Document] = []
for idx, chunk in enumerate(chunks):
chunk_docs.append(
Document(
page_content=chunk,
metadata={
"filename": filename,
"chunk_index": idx,
},
)
)
if not chunk_docs:
# Fallback: single chunk with entire text
chunk_docs = [
Document(
page_content=full_text,
metadata={
"filename": filename,
"chunk_index": 0,
},
)
]
# Build chunk-level FAISS index (uses default L2 distance metric)
chunk_vectorstore = FAISS.from_documents(
chunk_docs,
embedding=self.embeddings
)
# Step 4: Save to disk for future use
self._save_chunk_vectorstore(filename, chunk_vectorstore, chunk_docs)
# Step 5: Cache in memory for current session
self._chunk_cache[filename] = {
"vectorstore": chunk_vectorstore,
"chunks": chunk_docs,
}
return chunk_vectorstore, chunk_docs
def _create_ensemble_retriever(
self,
chunk_vs: FAISS,
chunk_docs: List[Document],
top_k: int
) -> EnsembleRetriever:
"""
Create an ensemble retriever combining BM25 and semantic search.
Args:
chunk_vs: FAISS vectorstore for semantic search
chunk_docs: List of all chunk Documents for BM25
top_k: Number of results to return
Returns:
EnsembleRetriever combining both methods
"""
if not ENSEMBLE_AVAILABLE:
raise ImportError("EnsembleRetriever and BM25Retriever are not available. Install rank-bm25 and ensure langchain-community is up to date.")
# Create BM25 retriever from chunk documents
bm25_retriever = BM25Retriever.from_documents(chunk_docs)
bm25_retriever.k = top_k
# Create FAISS retriever
faiss_retriever = chunk_vs.as_retriever(
search_kwargs={"k": top_k}
)
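        # EnsembleRetriever merges the two ranked lists with weighted reciprocal rank fusion,
        # so self.hybrid_weights sets the relative influence of BM25 vs. FAISS results.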
# Combine them with weighted ensemble
ensemble_retriever = EnsembleRetriever(
retrievers=[bm25_retriever, faiss_retriever],
weights=self.hybrid_weights
)
return ensemble_retriever
def _build_document_candidates(
self,
question: str,
label: str,
previous_document: Optional[str],
k: int = 5
) -> List[str]:
"""
Build a list of document candidates to try based on question label.
Args:
question: The user's question
label: Question label ("law-new" or "law-followup")
previous_document: Previous document filename if available
k: Number of documents to retrieve from summary search
Returns:
List of document filenames to try in order
"""
candidate_docs: List[str] = []
if label == "law-followup" and previous_document:
# For follow-up: start with previous document, then search summary
candidate_docs.append(previous_document)
print(f"[RAG] Follow-up question: starting with previous document: {previous_document}")
# Search summary vectorstore excluding the previous document
similar_docs_with_scores = self.vectorstore.similarity_search_with_score(question, k=k+1)
# Filter out the previous document and add others
for doc, score in similar_docs_with_scores:
filename = doc.metadata.get("filename", "")
if filename != previous_document and filename not in candidate_docs:
candidate_docs.append(filename)
if len(candidate_docs) >= k + 1: # +1 because we already have previous_doc
break
if len(candidate_docs) > 1:
print(f"[RAG] Added {len(candidate_docs) - 1} additional documents from summary search")
else:
# For new questions: search summary vectorstore
similar_docs_with_scores = self.vectorstore.similarity_search_with_score(question, k=k)
if not similar_docs_with_scores:
return []
for doc, score in similar_docs_with_scores:
filename = doc.metadata.get("filename", "")
if filename:
candidate_docs.append(filename)
if candidate_docs:
print(f"[RAG] New question: found {len(candidate_docs)} candidate documents")
return candidate_docs
def _classify_question(self, question: str, use_history: bool = True, model_provider: str = "openai") -> Tuple[str, Optional[str], Optional[List[str]], Optional[List[str]]]:
"""
Classify question into one of three categories: law-new, law-followup, or general.
Args:
question: The user's question
use_history: Whether to use chat history
model_provider: Model provider to use
Returns:
Tuple of (label, answer, sources, chunks) where:
- label: "law-new", "law-followup", or "general"
- For "general": answer contains the answer string, sources=[], chunks=None
- For "law-new" or "law-followup": answer=None, sources=None, chunks=None (RAG will handle answering)
"""
# Get previous turn context for distinguishing law-new from law-followup
previous_context = ""
if use_history:
last_turn = self.chat_history.get_last_turn()
if last_turn and len(last_turn) >= 2:
prev_user = last_turn[0].get("content", "") if last_turn[0].get("role") == "user" else ""
prev_assistant = last_turn[1].get("content", "") if last_turn[1].get("role") == "assistant" else ""
if prev_user and prev_assistant:
previous_context = f"\n\nPrevious conversation:\nUser: {prev_user}\nAssistant: {prev_assistant}"
classification_prompt = f"""Classify the following question as one of: "law-new", "law-followup", or "general".
A "law-new" question is:
- A law-related question that starts a new topic/thread or asks about a different system (نظام)
- Not primarily dependent on the immediately previous answer
- About legal documents, regulations, laws, articles (المادة), legal cases, procedures, terms, definitions
- Anything related to legal matters in documents, but as a new inquiry
A "law-followup" question is:
- A law-related question that is a follow-up, inference, or clarification based on the previous assistant response
- Refers to or builds upon the previous answer (e.g., "what about...", "can you explain more about...", "based on that...", "how about...", "what if...")
- Asks for clarification, elaboration, or related information about what was just discussed
- Continues the conversation thread about the same legal topic
- Uses pronouns or references that relate to the previous response
A "general" question is:
- Greetings (السلام عليكم, مرحبا, etc.)
- Casual conversation
- Questions not related to legal documents or law
{previous_context}
Current Question: {question}
If the question is "general", provide a helpful answer in Arabic.
If the question is "law-new", respond with only "law-new".
If the question is "law-followup", respond with only "law-followup".
"""
try:
# Initialize client based on model_provider
if model_provider.lower() in ["qwen", "huggingface"]:
hf_token = os.getenv("HF_TOKEN")
if not hf_token:
# Fallback to OpenAI if HF_TOKEN not available
llm_client = self.llm_client
llm_model = self.llm_model
else:
http_client = NoProxyHTTPClient(timeout=60.0)
llm_client = OpenAI(
base_url="https://router.huggingface.co/v1",
api_key=hf_token,
http_client=http_client
)
llm_model = os.getenv("QWEN_MODEL", "Qwen/Qwen3-32B:nscale")
else:
llm_client = self.llm_client
llm_model = self.llm_model
# Build messages with chat history if enabled
history_messages = []
if use_history:
history_messages = self.chat_history.get_recent_history(n_turns=2)
system_prompt = """You are a helpful assistant devloped by "الدكتور الدوسري ". Classify questions into one of three categories and answer general questions in Arabic.
If the question is a greeting or general question, provide a friendly, helpful answer in Arabic and mention that you are developed to Answer questions relateed to the KSA personal laws and regulations.
If the question is law-related and starts a new topic, respond with only "law-new".
If the question is law-related and is a follow-up to the previous response, respond with only "law-followup".
Respond with ONLY one of: "law-new", "law-followup", or provide an answer if it's general."""
messages = [{"role": "system", "content": system_prompt}]
# Add chat history
if history_messages:
                skip_last = history_messages[-1].get("content") == question
                for msg in history_messages[:-1] if skip_last else history_messages:
                    messages.append(msg)
messages.append({"role": "user", "content": classification_prompt})
response = llm_client.chat.completions.create(
model=llm_model,
messages=messages,
temperature=0.3
)
raw_response = response.choices[0].message.content.strip()
# Filter thinking process from Qwen responses
if model_provider.lower() in ["qwen", "huggingface"]:
raw_response = self._filter_thinking_process(raw_response)
# Check classification result
response_lower = raw_response.lower().strip()
is_law_new = "law-new" in response_lower and len(response_lower) < 20
is_law_followup = "law-followup" in response_lower and len(response_lower) < 20
if is_law_new:
print(f"[Classification] Question classified as: law-new")
return ("law-new", None, None, None) # Continue with RAG flow
elif is_law_followup:
print(f"[Classification] Question classified as: law-followup")
return ("law-followup", None, None, None) # Continue with RAG flow, will reuse chunks if available
else:
# General question - use the response as answer
answer, _ = self._parse_llm_response(raw_response) # Unpack tuple, ignore found flag for general questions
# Update chat history
self.chat_history.add_message("user", question)
self.chat_history.add_message("assistant", answer)
print(f"[Classification] Question classified as: general, answered directly")
return ("general", answer, [], None) # Return answer with empty sources and no chunks
except Exception as e:
# On error, default to law-new to use RAG flow
print(f"[Classification] Error classifying question, defaulting to law-new: {e}")
return ("law-new", None, None, None)
def answer_question(
self,
question: str,
use_history: bool = True,
model_provider: str = "openai",
context_mode: str = "full",
) -> Tuple[str, List[str], Optional[List[str]]]:
"""
Answer a question using RAG with multi-turn chat history
Args:
question: The user's question
use_history: Whether to use chat history
model_provider: Model provider to use - "openai" (default) or "qwen"/"huggingface" for Qwen model
context_mode: Context construction mode - "full" (entire document) or "chunks" (top semantic chunks)
Returns:
Tuple of (answer, list of source filenames, optional list of chunk texts for testing)
"""
start_time = time.perf_counter()
# Log formatted query for testing
print("[QUERY] Formatted Query Log")
print("=" * 80)
print(f"[QUERY] Question: {question}")
print("=" * 80)
if self.vectorstore is None:
raise ValueError("No documents indexed. Please process documents first.")
# Step 0: Classify question into law-new, law-followup, or general
classification_start = time.perf_counter()
label, answer, sources, chunks = self._classify_question(question, use_history, model_provider)
classification_time = (time.perf_counter() - classification_start) * 1000
print(f"[Timing] Question classification: {classification_time:.2f}ms")
# If general question was handled, return the result immediately
if label == "general":
return answer, sources, chunks
# Step 1: Build document candidate list based on label
search_start = time.perf_counter()
previous_document = None
if use_history:
previous_document = self.chat_history.get_last_document()
# Build candidate documents list
candidate_documents = self._build_document_candidates(question, label, previous_document, k=5)
if not candidate_documents:
return "I couldn't find any relevant information to answer your question.", [], None
# Step 2: Prepare chat history and context mode
history_messages = []
if use_history and label == "law-followup":
            # Pull recent chat history (last 2 turns) so the follow-up question has context
history_messages = self.chat_history.get_recent_history(n_turns=2)
# Decide how to construct document context for the LLM
context_mode_normalized = (context_mode or "full").lower()
if context_mode_normalized not in ["full", "chunks"]:
context_mode_normalized = "full"
# Build prompts (reusable parts)
mode_note = ""
if context_mode_normalized == "chunks":
mode_note = (
"\n\nNote: The provided document text consists of selected relevant excerpts (مقاطع) "
"from the same document, not the full law. Answer strictly based on these excerpts."
)
system_prompt = f"""You are a helpful legal document assistant. Answer questions based on the provided document text. {mode_note}
MODE 1 - General Questions:
- Understand the context and provide a clear, helpful answer
- You may paraphrase or summarize information from the document
- Explain concepts in your own words while staying true to the document's meaning
MODE 2 - Legal Articles/Terms (المادة):
- When the user asks about specific legal articles (المادة), legal terms, or exact regulations, you MUST quote the EXACT text from the document (context) verbatim
- Copy the complete text word-for-word, including all numbers, punctuation, and formatting
- Do NOT paraphrase, summarize, or generate new text for legal articles (المادة)
- NEVER create or generate legal text - only use what exists in the document
IMPORTANT - Response Format:
- Do NOT include source citations in your answer (e.g., do NOT write "المصدر: نظام الاحوال الشخصية.pdf" or similar source references)
- Do NOT mention the document filename or source at the end of your answer
- Simply provide the answer directly without any source attribution
- Whenever you refer to the document (context or filename) in your response, refer to it by the filename WITHOUT the extension such as ".pdf" or ".doc"
RESPONSE FORMAT (JSON) - CRITICAL:
You MUST respond in JSON format with the following structure:
{{
"answer": "your answer in Arabic here",
"found": true or false
}}
- "answer": Your answer to the question in Arabic.
- "found": Set to TRUE only if you can answer the question based on the provided document context. Set to FALSE if the question cannot be answered from the provided document excerpts OR if your answer says you could not find it.
MUST Answer in Arabic."""
# Check if question contains legal article/term keywords
is_legal_term_question = any(keyword in question for keyword in ["المادة", "مادة", "article", "نص","النص", "نص القانون"])
legal_instruction = ""
if is_legal_term_question:
legal_instruction = "\n\nCRITICAL: The user is asking about a legal article or legal term. Carefully search the provided context to find the relevant article. Reference the article correctly as it has been stated in the context. Articles might be referenced by their content, position, or topic - for example, 'المادة الأولى' might refer to the first article in a section even if not explicitly numbered. Find and quote the relevant text accurately from the document, maintaining the exact wording as it appears. Do NOT create or generate legal text - only use what exists in the document."
else:
legal_instruction = "\n\nAnswer the question by understanding the context from the document. You may paraphrase or explain in your own words while staying true to the document's meaning."
# Step 3: Initialize LLM client
llm_start = time.perf_counter()
try:
# Initialize client based on model_provider
if model_provider.lower() in ["qwen", "huggingface"]:
# Use HuggingFace router with Qwen model
hf_token = os.getenv("HF_TOKEN")
if not hf_token:
raise ValueError("HF_TOKEN environment variable is required for Qwen model testing.")
http_client = NoProxyHTTPClient(timeout=60.0)
llm_client = OpenAI(
base_url="https://router.huggingface.co/v1",
api_key=hf_token,
http_client=http_client
)
llm_model = os.getenv("QWEN_MODEL", "Qwen/Qwen3-32B:nscale")
else:
                # Use OpenAI (default)
llm_client = self.llm_client
llm_model = self.llm_model
# Step 4: Iterate through candidate documents
answer = None
found = False
final_matched_filename = None
final_selected_chunks: Optional[List[str]] = None
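            # Try each candidate in order; the "found" flag parsed from the LLM's JSON response
            # decides whether to stop at this document or fall through to the next candidate.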
for doc_idx, current_filename in enumerate(candidate_documents):
print(f"[RAG] Trying document {doc_idx + 1}/{len(candidate_documents)}: {current_filename}")
# Get full text for this document
current_full_text = self._get_text_by_filename_cached(current_filename)
if not current_full_text:
print(f"[RAG] Could not retrieve text for {current_filename}, skipping...")
continue
# Build context for this document
prompt_start = time.perf_counter()
document_context_label = "Document Text"
selected_chunks: Optional[List[str]] = None
# Check if we can reuse previous chunks (only for follow-up on first document)
previous_chunks = None
if label == "law-followup" and doc_idx == 0 and use_history and current_filename == previous_document:
previous_chunks = self.chat_history.get_last_chunks()
if previous_chunks:
print(f"[RAG] Reusing previous chunks for law-followup question ({len(previous_chunks)} chunks)")
selected_chunks = previous_chunks
document_context_label = "Selected Document Excerpts"
chunk_texts: List[str] = []
for idx, chunk_text in enumerate(previous_chunks, start=1):
chunk_texts.append(f"[مقطع {idx}]\n{chunk_text}")
document_context = "\n\n".join(chunk_texts)[:25000]
# If not reusing chunks, build context normally
if previous_chunks is None:
if context_mode_normalized == "full":
print(f"[RAG] Using full document mode for {current_filename}")
document_context = current_full_text[:16000] # Limit to avoid token limits
else:
print(f"[RAG] Using chunk mode for {current_filename}")
# Build or load chunk vectorstore and retrieve top-k chunks
chunk_vs, chunk_docs = self._get_or_build_chunk_vectorstore(current_filename, current_full_text)
top_k = 4
try:
if self.use_hybrid_search:
# Use ensemble retriever for hybrid search (BM25 + Semantic)
ensemble_retriever = self._create_ensemble_retriever(chunk_vs, chunk_docs, top_k)
top_chunks = ensemble_retriever.get_relevant_documents(question)
print(f"[RAG] Used hybrid search (BM25 + Semantic) for chunk retrieval")
else:
# Use semantic search only
top_chunks = chunk_vs.similarity_search(question, k=top_k)
except Exception as e:
print(f"[RAG] Chunk retrieval failed for {current_filename}, falling back to full text: {e}")
document_context = current_full_text[:25000]
context_mode_normalized = "full"
else:
if not top_chunks:
print(f"[RAG] No chunks returned for {current_filename}, falling back to full text")
document_context = current_full_text[:8000]
context_mode_normalized = "full"
else:
document_context_label = "Selected Document Excerpts"
chunk_texts: List[str] = []
selected_chunks = [] # Store raw chunk texts for return
for idx, doc in enumerate(top_chunks, start=1):
chunk_text = doc.page_content
selected_chunks.append(chunk_text) # Store raw chunk text
chunk_texts.append(f"[مقطع {idx}]\n{chunk_text}")
document_context = "\n\n".join(chunk_texts)[:20000]
# Build user prompt
user_prompt = f"""{document_context_label}:
{document_context}
User Question: {question}
{legal_instruction}
Please answer the question based on the document text above.
Preserve any numbering, bullet points, and headings from the document when they are relevant, and use clear Arabic headings and numbered lists in your answer.
MUST Answer the Question in Arabic."""
# Build messages with chat history
messages = [{"role": "system", "content": system_prompt}]
# Add chat history (excluding the last user message if it's the current question)
if history_messages:
skip_last = len(history_messages) > 0 and history_messages[-1].get("content") == question
for msg in history_messages[:-1] if skip_last else history_messages:
messages.append(msg)
messages.append({"role": "user", "content": user_prompt})
prompt_time = (time.perf_counter() - prompt_start) * 1000
print(f"[Timing] Prompt construction: {prompt_time:.2f}ms")
                # Call LLM (time just the API call, excluding prompt construction and earlier attempts)
                llm_call_start = time.perf_counter()
                response = llm_client.chat.completions.create(
                    model=llm_model,
                    messages=messages,
                    temperature=0.3
                )
                raw_response = response.choices[0].message.content
                llm_time = (time.perf_counter() - llm_call_start) * 1000
                print(f"[Timing] LLM API call: {llm_time:.2f}ms")
# Filter thinking process from Qwen responses
if model_provider.lower() in ["qwen", "huggingface"]:
raw_response = self._filter_thinking_process(raw_response)
# Parse LLM response to extract answer and found flag
parse_start = time.perf_counter()
answer, found = self._parse_llm_response(raw_response)
parse_time = (time.perf_counter() - parse_start) * 1000
if found:
final_matched_filename = current_filename
final_selected_chunks = selected_chunks
print(f"[RAG] Answer found in document: {final_matched_filename}")
break # Found! Return immediately
else:
print(f"[RAG] Answer not found in document: {current_filename}")
# For law-new: continue to next document
# For law-followup: continue searching until found
if label == "law-new":
print(f"[RAG] law-new: trying next document...")
elif label == "law-followup":
print(f"[RAG] law-followup: continuing search...")
# Step 5: Handle final result
if not found:
# For law-new: return after trying all documents
# For law-followup: should have kept searching, but if still not found, return message
answer = "لم أجد الإجابة في أي من الوثائق المتاحة." if answer is None else answer
final_matched_filename = candidate_documents[0] if candidate_documents else None
print(f"[RAG] Answer not found in any of the {len(candidate_documents)} documents tried")
# Step 6: Update chat history with document source and chunks
self.chat_history.add_message("user", question)
self.chat_history.add_message("assistant", answer, source_document=final_matched_filename, chunks=final_selected_chunks)
total_time = (time.perf_counter() - start_time) * 1000
print(f"[Timing] Total inference time: {total_time:.2f}ms")
return answer, [final_matched_filename], final_selected_chunks
except Exception as e:
total_time = (time.perf_counter() - start_time) * 1000
print(f"[Timing] Total inference time (error): {total_time:.2f}ms")
error_msg = f"Error generating answer: {str(e)}"
# Get error filename safely (candidate_documents might not be defined if error occurs early)
try:
error_filename = candidate_documents[0] if candidate_documents else None
except NameError:
error_filename = None
self.chat_history.add_message("user", question)
self.chat_history.add_message("assistant", error_msg, source_document=error_filename, chunks=None)
return error_msg, [error_filename] if error_filename else [], None
def clear_chat_history(self):
"""Clear chat history"""
self.chat_history.clear()
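
# Minimal usage sketch (illustrative only, not wired into the app). Assumes OPENAI_API_KEY is set,
# that PDFs live in a hypothetical "documents" folder, and that the default vectorstore paths are
# writable; the example question is likewise hypothetical.
if __name__ == "__main__":
    rag = RAGSystem()
    # Process any new PDFs and index their summaries (documents already in the JSON store are skipped)
    num_new = rag.process_and_index_documents("documents")
    print(f"Processed {num_new} new documents")
    # Ask a question using chunk-based context; returns (answer, source filenames, chunk texts)
    answer, sources, chunks = rag.answer_question(
        "ما هي شروط عقد الزواج؟",
        context_mode="chunks",
    )
    print(answer)
    print("Sources:", sources)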