import os import time import random from typing import List, Optional from pathlib import Path from dotenv import load_dotenv import httpx from openai import OpenAI try: from huggingface_hub import InferenceClient except ImportError: InferenceClient = None def _chunk_list(items: List[str], chunk_size: int) -> List[List[str]]: return [items[i:i + chunk_size] for i in range(0, len(items), chunk_size)] class NoProxyHTTPClient(httpx.Client): def __init__(self, *args, **kwargs): # Ensure any 'proxies' key is ignored to prevent incompat issues kwargs.pop("proxies", None) super().__init__(*args, **kwargs) class OpenAIEmbeddingsWrapper: """ Minimal embeddings wrapper compatible with LangChain's embeddings interface. Uses OpenAI Embeddings API via official SDK (client.embeddings.create) with batching and retries. Forces a custom httpx.Client to avoid unexpected 'proxies' kw errors. """ def __init__(self, model: str = "text-embedding-ada-002", api_key: str | None = None, timeout: float = 30.0): # Load .env from project root (one level up from backend/) project_root = Path(__file__).resolve().parents[1] load_dotenv(project_root / ".env") self.model = model self.api_key = api_key or os.getenv("OPENAI_API_KEY") if not self.api_key: raise ValueError("OPENAI_API_KEY is required for embeddings") # Timeout/backoff config self.timeout = timeout self.batch_size = int(os.getenv("OPENAI_EMBED_BATCH_SIZE", "64")) self.max_retries = int(os.getenv("OPENAI_EMBED_MAX_RETRIES", "6")) self.initial_backoff = float(os.getenv("OPENAI_EMBED_INITIAL_BACKOFF", "1.0")) self.backoff_multiplier = float(os.getenv("OPENAI_EMBED_BACKOFF_MULTIPLIER", "2.0")) # Initialize OpenAI SDK client with custom http client; rely on env for API key/base URL os.environ.setdefault("OPENAI_API_KEY", self.api_key) http_client = NoProxyHTTPClient(timeout=self.timeout) self.client = OpenAI(http_client=http_client) def _embed_once(self, inputs: List[str]) -> List[List[float]]: resp = self.client.embeddings.create( model=self.model, input=inputs, encoding_format="float", ) return [item.embedding for item in resp.data] def _embed_with_retries(self, inputs: List[str]) -> List[List[float]]: attempt = 0 backoff = self.initial_backoff while True: try: return self._embed_once(inputs) except Exception as err: status = None try: status = getattr(getattr(err, "response", None), "status_code", None) except Exception: status = None if (status in (429, 500, 502, 503, 504) or status is None) and attempt < self.max_retries: retry_after = 0.0 try: retry_after = float(getattr(getattr(err, "response", None), "headers", {}).get("Retry-After", 0)) except Exception: retry_after = 0.0 jitter = random.uniform(0, 0.5) sleep_s = max(retry_after, backoff) + jitter time.sleep(sleep_s) attempt += 1 backoff *= self.backoff_multiplier continue raise def _embed(self, inputs: List[str]) -> List[List[float]]: all_embeddings: List[List[float]] = [] for batch in _chunk_list(inputs, self.batch_size): embeds = self._embed_with_retries(batch) all_embeddings.extend(embeds) time.sleep(float(os.getenv("OPENAI_EMBED_INTER_BATCH_DELAY", "0.2"))) return all_embeddings def embed_query(self, text: str) -> List[float]: return self._embed([text])[0] def embed_documents(self, texts: List[str]) -> List[List[float]]: return self._embed(texts) def __call__(self, text: str) -> List[float]: """ Make the embeddings wrapper callable for compatibility with FAISS. When FAISS calls the embeddings object directly, this delegates to embed_query. """ return self.embed_query(text) class HuggingFaceEmbeddingsWrapper: """ Embeddings wrapper compatible with LangChain's embeddings interface. Uses HuggingFace InferenceClient with Nebius provider for embeddings. Implements same interface as OpenAIEmbeddingsWrapper for drop-in replacement. """ def __init__(self, model: str = "Qwen/Qwen3-Embedding-8B", api_key: str | None = None, timeout: float = 60.0): if InferenceClient is None: raise ImportError("huggingface_hub is required for HuggingFace embeddings. Install it with: pip install huggingface_hub") # Load .env from project root (one level up from backend/) project_root = Path(__file__).resolve().parents[1] load_dotenv(project_root / ".env") self.model = model or os.getenv("HF_EMBEDDING_MODEL", "Qwen/Qwen3-Embedding-8B") self.api_key = api_key or os.getenv("HF_TOKEN") if not self.api_key: raise ValueError("HF_TOKEN is required for HuggingFace embeddings. Set HF_TOKEN environment variable.") # Timeout/backoff config self.timeout = timeout self.batch_size = int(os.getenv("HF_EMBED_BATCH_SIZE", "32")) # Smaller batch size for HF self.max_retries = int(os.getenv("HF_EMBED_MAX_RETRIES", "6")) self.initial_backoff = float(os.getenv("HF_EMBED_INITIAL_BACKOFF", "1.0")) self.backoff_multiplier = float(os.getenv("HF_EMBED_BACKOFF_MULTIPLIER", "2.0")) # Initialize HuggingFace InferenceClient with Nebius provider self.client = InferenceClient( provider="nebius", api_key=self.api_key ) print(f"[HF Embeddings] Initialized with model: {self.model}, provider: nebius") def _embed_once(self, inputs: List[str]) -> List[List[float]]: """Call HuggingFace feature_extraction API for a batch of texts""" import numpy as np # HuggingFace feature_extraction can handle single or batch inputs if len(inputs) == 1: # Single text result = self.client.feature_extraction(inputs[0], model=self.model) # Result is numpy.ndarray - convert to list if isinstance(result, np.ndarray): if result.ndim == 2: # 2D array - extract first row result = result[0].tolist() else: # 1D array - convert directly result = result.tolist() # Result is a list of floats (embedding vector) return [result] else: # Batch processing - HF may support batch, but we'll process one by one for reliability embeddings = [] for text in inputs: result = self.client.feature_extraction(text, model=self.model) # Convert numpy array to list if needed if isinstance(result, np.ndarray): if result.ndim == 2: result = result[0].tolist() # Extract first row if 2D else: result = result.tolist() embeddings.append(result) return embeddings def _embed_with_retries(self, inputs: List[str]) -> List[List[float]]: """Embed with retry logic similar to OpenAI wrapper""" attempt = 0 backoff = self.initial_backoff while True: try: return self._embed_once(inputs) except Exception as err: status = None try: # Try to extract status code from error if available status = getattr(getattr(err, "response", None), "status_code", None) except Exception: status = None if (status in (429, 500, 502, 503, 504) or status is None) and attempt < self.max_retries: retry_after = 0.0 try: retry_after = float(getattr(getattr(err, "response", None), "headers", {}).get("Retry-After", 0)) except Exception: retry_after = 0.0 jitter = random.uniform(0, 0.5) sleep_s = max(retry_after, backoff) + jitter time.sleep(sleep_s) attempt += 1 backoff *= self.backoff_multiplier continue raise def _embed(self, inputs: List[str]) -> List[List[float]]: """Process embeddings in batches with delays between batches""" all_embeddings: List[List[float]] = [] for batch in _chunk_list(inputs, self.batch_size): embeds = self._embed_with_retries(batch) all_embeddings.extend(embeds) # Small delay between batches to avoid rate limiting time.sleep(float(os.getenv("HF_EMBED_INTER_BATCH_DELAY", "0.2"))) return all_embeddings def embed_query(self, text: str) -> List[float]: """Embed a single query text""" return self._embed([text])[0] def embed_documents(self, texts: List[str]) -> List[List[float]]: """Embed multiple documents""" return self._embed(texts) def __call__(self, text: str) -> List[float]: """ Make the embeddings wrapper callable for compatibility with FAISS. When FAISS calls the embeddings object directly, this delegates to embed_query. """ return self.embed_query(text) def get_embeddings_wrapper( model: Optional[str] = None, api_key: Optional[str] = None, timeout: float = 30.0 ): """ Factory function to get the appropriate embeddings wrapper based on configuration. Args: model: Model name (provider-specific) api_key: API key (provider-specific) timeout: Timeout in seconds Returns: Either OpenAIEmbeddingsWrapper or HuggingFaceEmbeddingsWrapper instance Environment Variables: EMBEDDINGS_PROVIDER: "openai" (default), "huggingface", "hf", or "nebius" HF_TOKEN: Required if using HuggingFace provider HF_EMBEDDING_MODEL: Optional model override for HuggingFace (default: "Qwen/Qwen3-Embedding-8B") """ # Load .env from project root project_root = Path(__file__).resolve().parents[1] load_dotenv(project_root / ".env") provider = os.getenv("EMBEDDINGS_PROVIDER", "hf").lower() #openai if provider in ["huggingface", "hf", "nebius"]: print(f"[Embeddings Factory] Using HuggingFace/Nebius provider") hf_model = model or os.getenv("HF_EMBEDDING_MODEL", "Qwen/Qwen3-Embedding-8B") return HuggingFaceEmbeddingsWrapper(model=hf_model, api_key=api_key, timeout=timeout) else: print(f"[Embeddings Factory] Using OpenAI provider (default)") openai_model = model or os.getenv("OPENAI_EMBEDDING_MODEL", "text-embedding-ada-002") return OpenAIEmbeddingsWrapper(model=openai_model, api_key=api_key, timeout=timeout) #شرح نظام الأحوال الشخصية