"""Extract text and key/value fields from documents with vision chat models.

A PDF is rendered to one JPEG per page and a plain image is normalised to a
single JPEG.  Every page is then sent on its own to the configured backend
(OpenRouter, HuggingFace Inference API or OpenAI) and the per-page JSON
answers are merged into one result.
"""

import asyncio
import base64
import json
import os
import re
from io import BytesIO
from typing import Any, Dict, List, Optional

try:
    import httpx
except ImportError:
    # Only the OpenRouter/OpenAI backends need httpx; they raise a
    # RuntimeError when it is missing instead of breaking module import.
    httpx = None
    print("[WARNING] httpx not available. OpenRouter and OpenAI backends will not work.")

try:
    import fitz  # PyMuPDF
    from PIL import Image
    PDF_SUPPORT = True
except ImportError as e:
    PDF_SUPPORT = False
    print(f"[WARNING] PDF support libraries not available: {e}. PDF conversion will not work.")

# Get your OpenRouter API key from env (you'll set this in Hugging Face later)
OPENROUTER_API_KEY = os.environ.get("OPENROUTER_API_KEY")
OPENROUTER_BASE_URL = "https://openrouter.ai/api/v1/chat/completions"
MODEL_NAME = "qwen/qwen3-vl-235b-a22b-instruct"

# HuggingFace Inference API
HF_TOKEN = os.environ.get("HF_TOKEN")
HF_INFERENCE_API_URL = "https://api-inference.huggingface.co/models"
HF_MODEL_NAME = os.environ.get("HF_MODEL_NAME", "Qwen/Qwen3-VL-235B-A22B-Instruct")  # Default HF model

# OpenAI API
OPENAI_API_KEY = os.environ.get("OPENAI_API_KEY")
OPENAI_BASE_URL = "https://api.openai.com/v1/chat/completions"
OPENAI_MODEL_NAME = os.environ.get("OPENAI_MODEL_NAME", "gpt-4o")  # Default OpenAI vision model

# Backend selection: "openrouter", "huggingface", or "openai"
EXTRACTION_BACKEND = os.environ.get("EXTRACTION_BACKEND", "openrouter").lower()

# Leading byte signatures used to label raw image bytes with the right MIME type.
_IMAGE_SIGNATURES = (
    (b"\x89PNG\r\n\x1a\n", "image/png"),
    (b"\xff\xd8\xff", "image/jpeg"),
    (b"GIF87a", "image/gif"),
    (b"GIF89a", "image/gif"),
)

# Single-character JSON escapes handled by the fallback unescaper.
_SIMPLE_ESCAPES = {
    "n": "\n",
    "t": "\t",
    "r": "\r",
    "b": "\b",
    "f": "\f",
    '"': '"',
    "\\": "\\",
    "/": "/",
}

# Matches the (possibly unterminated) value of a "full_text" JSON string,
# treating backslash escapes as units so an escaped quote does not end it.
_FULL_TEXT_RE = re.compile(r'"full_text"\s*:\s*"((?:[^"\\]|\\.)*)', re.DOTALL)

_SYSTEM_PROMPT = (
    "You are a document extraction engine with vision capabilities. "
    "You read and extract text from documents in any language, preserving structure, formatting, and all content. "
    "You output structured JSON with both the full extracted text and key-value pairs."
)


def _is_pdf(content_type: Optional[str]) -> bool:
    """Return True when *content_type* names a PDF (parameters and case ignored)."""
    mime = (content_type or "").split(";", 1)[0].strip().lower()
    return mime == "application/pdf" or mime.endswith("/pdf")


def _sniff_image_mime(image_bytes: bytes) -> str:
    """Guess the MIME type from the leading bytes; default to ``image/jpeg``."""
    for signature, mime in _IMAGE_SIGNATURES:
        if image_bytes.startswith(signature):
            return mime
    if image_bytes[:4] == b"RIFF" and image_bytes[8:12] == b"WEBP":
        return "image/webp"
    return "image/jpeg"


def _image_block(data_url: str) -> Dict[str, Any]:
    """Wrap a data URL in the ``image_url`` content-block format."""
    return {"type": "image_url", "image_url": {"url": data_url}}


def _pdf_to_images(pdf_bytes: bytes) -> List[bytes]:
    """
    Convert PDF pages to JPEG images.

    Each page is rendered at 2x zoom and encoded as JPEG (quality 95).

    Returns:
        A list of JPEG image bytes, one per page.

    Raises:
        RuntimeError: If PyMuPDF/Pillow could not be imported.
    """
    if not PDF_SUPPORT:
        raise RuntimeError("PyMuPDF not installed. Cannot convert PDF to images.")

    pdf_doc = fitz.open(stream=pdf_bytes, filetype="pdf")
    try:
        images = []
        print(f"[INFO] PDF has {len(pdf_doc)} page(s)")
        zoom = fitz.Matrix(2.0, 2.0)  # 2x zoom for better quality
        for page_index, page in enumerate(pdf_doc):
            # alpha=False keeps the samples 3 bytes per pixel, as "RGB" expects.
            pix = page.get_pixmap(matrix=zoom, alpha=False)
            img = Image.frombytes("RGB", (pix.width, pix.height), pix.samples)
            buffer = BytesIO()
            img.save(buffer, format="JPEG", quality=95)
            images.append(buffer.getvalue())
            print(f"[INFO] Converted page {page_index + 1} to image ({pix.width}x{pix.height})")
        return images
    finally:
        # Close the document even when rendering a page fails.
        pdf_doc.close()


def _image_bytes_to_base64(image_bytes: bytes) -> str:
    """
    Convert image bytes to a base64 data URL.

    The MIME type is sniffed from the leading bytes (PNG, GIF, WEBP, JPEG) and
    falls back to ``image/jpeg`` when the signature is not recognised.
    """
    b64 = base64.b64encode(image_bytes).decode("utf-8")
    data_url = f"data:{_sniff_image_mime(image_bytes)};base64,{b64}"
    print(f"[DEBUG] Base64 encoded image: {len(image_bytes)} bytes -> {len(data_url)} chars")
    return data_url


def _normalize_image_to_jpeg(file_bytes: bytes, max_size: int = 1920) -> bytes:
    """
    Re-encode an image as RGB JPEG, shrinking it so no side exceeds *max_size*.

    Raises:
        RuntimeError: If the imaging libraries could not be imported.
        Exception: Whatever Pillow raises for unreadable image data.
    """
    if not PDF_SUPPORT:
        raise RuntimeError("image libraries not available")

    img = Image.open(BytesIO(file_bytes))
    if img.mode != "RGB":
        img = img.convert("RGB")

    w, h = img.size
    if w > max_size or h > max_size:
        if w > h:
            new_w = max_size
            new_h = max(1, int(h * (max_size / w)))  # never collapse to 0 px
        else:
            new_h = max_size
            new_w = max(1, int(w * (max_size / h)))
        img = img.resize((new_w, new_h), Image.LANCZOS)
        print(f"[INFO] Resized image from {w}x{h} to {new_w}x{new_h}")

    buffer = BytesIO()
    img.save(buffer, format="JPEG", quality=95)
    return buffer.getvalue()


def _file_to_image_blocks(file_bytes: bytes, content_type: str) -> List[Dict[str, Any]]:
    """
    Convert file to image blocks for the vision model.

    - For images: returns a single block (normalised to JPEG when possible,
      otherwise the original bytes labelled with *content_type*).
    - For PDFs: converts each page to an image and returns one block per page.

    Raises:
        RuntimeError: For a PDF when PDF support libraries are unavailable.
    """
    if _is_pdf(content_type):
        if not PDF_SUPPORT:
            raise RuntimeError("PDF support requires PyMuPDF. Please install it.")

        print("[INFO] Converting PDF to images...")
        # Block format: {"type": "image_url", "image_url": {"url": "data:..."}}
        image_blocks = []
        for page_number, img_bytes in enumerate(_pdf_to_images(file_bytes), start=1):
            image_blocks.append(_image_block(_image_bytes_to_base64(img_bytes)))
            print(f"[INFO] Created image block for page {page_number} ({len(img_bytes)} bytes)")
        return image_blocks

    try:
        data_url = _image_bytes_to_base64(_normalize_image_to_jpeg(file_bytes))
    except Exception as e:
        # Deliberate best effort: send the original bytes if they cannot be re-encoded.
        print(f"[WARNING] Could not process image with PIL: {e}. Using original bytes.")
        b64 = base64.b64encode(file_bytes).decode("utf-8")
        data_url = f"data:{content_type};base64,{b64}"

    print(f"[DEBUG] Encoding image file. Content type: {content_type}, Size: {len(file_bytes)} bytes")
    return [_image_block(data_url)]


async def _extract_single_page(image_bytes: bytes, page_num: int, total_pages: int,
                               backend: Optional[str] = None) -> Dict[str, Any]:
    """
    Extract text from a single page/image.

    Dispatches to the backend named by *backend* (default: EXTRACTION_BACKEND).
    Any value other than "huggingface" or "openai" selects OpenRouter.
    """
    backend = backend or EXTRACTION_BACKEND
    if backend == "huggingface":
        return await _extract_with_hf(image_bytes, page_num, total_pages)
    if backend == "openai":
        return await _extract_with_openai_single(image_bytes, page_num, total_pages)
    return await _extract_with_openrouter_single(image_bytes, page_num, total_pages)


def _coerce_confidence(value: Any) -> Any:
    """Return *value* as a number; numeric strings are parsed, anything else is 0."""
    if isinstance(value, bool):
        return 0
    if isinstance(value, (int, float)):
        return value if value == value else 0  # NaN compares unequal to itself
    if isinstance(value, str):
        try:
            return float(value.strip().rstrip("%"))
        except ValueError:
            return 0
    return 0


async def extract_fields_from_document(
    file_bytes: bytes,
    content_type: str,
    filename: str,
) -> Dict[str, Any]:
    """
    Extract fields from document, one page at a time.

    Args:
        file_bytes: Raw bytes of the uploaded PDF or image.
        content_type: MIME type of the upload; PDFs are rendered page by page.
        filename: Original file name (currently unused by the extraction).

    Returns:
        A dict with "doc_type" (first page value other than "other"),
        "confidence" (mean of the positive page confidences, 0 if none),
        "full_text" (page texts joined under "=== PAGE n ===" headers),
        "fields" (first non-empty value per key across pages) and "pages"
        (per-page results; a failed page carries an "error" string).

    Raises:
        RuntimeError: For a PDF when PDF support libraries are unavailable.
        ValueError: If no page images could be produced.
    """
    # Build the page images exactly once; these same bytes are what is uploaded.
    if _is_pdf(content_type):
        image_bytes_list = _pdf_to_images(file_bytes)
    else:
        try:
            image_bytes_list = [_normalize_image_to_jpeg(file_bytes)]
        except Exception as e:
            # Best effort: fall back to the upload itself (MIME is sniffed later).
            print(f"[WARNING] Could not process image with PIL: {e}. Using original bytes.")
            image_bytes_list = [file_bytes]

    if not image_bytes_list:
        raise ValueError("No images generated from file")

    total_pages = len(image_bytes_list)
    print(f"[INFO] Processing {total_pages} page(s) separately for better reliability...")

    page_results = []
    for page_number, img_bytes in enumerate(image_bytes_list, start=1):
        print(f"[INFO] Processing page {page_number}/{total_pages}...")
        try:
            page_result = await _extract_single_page(img_bytes, page_number, total_pages)
            fields = page_result.get("fields")
            page_results.append({
                "page_number": page_number,
                "text": page_result.get("full_text") or "",
                # Models occasionally return a list/string here; merging needs a dict.
                "fields": fields if isinstance(fields, dict) else {},
                "confidence": _coerce_confidence(page_result.get("confidence", 0)),
                "doc_type": page_result.get("doc_type", "other"),
            })
            print(f"[INFO] Page {page_number} processed successfully")
        except Exception as e:
            # One failing page must not abort the remaining pages.
            print(f"[ERROR] Failed to process page {page_number}: {e}")
            page_results.append({
                "page_number": page_number,
                "text": "",
                "fields": {},
                "confidence": 0,
                "error": str(e),
            })

    combined_full_text = "\n\n".join(
        f"=== PAGE {p['page_number']} ===\n\n{p['text']}"
        for p in page_results
        if p.get("text")
    )

    # Merge fields from all pages (the first non-empty value for a key wins).
    combined_fields: Dict[str, Any] = {}
    for page_result in page_results:
        for key, value in page_result.get("fields", {}).items():
            if value and not combined_fields.get(key):
                combined_fields[key] = value

    confidences = [p["confidence"] for p in page_results if p.get("confidence", 0) > 0]
    avg_confidence = sum(confidences) / len(confidences) if confidences else 0

    # Use the first page that reported something more specific than "other".
    doc_type = "other"
    for page_result in page_results:
        page_doc_type = page_result.get("doc_type")
        if page_doc_type and page_doc_type != "other":
            doc_type = page_doc_type
            break

    return {
        "doc_type": doc_type,
        "confidence": avg_confidence,
        "full_text": combined_full_text,
        "fields": combined_fields,
        "pages": page_results,
    }


def _build_user_prompt(page_num: int, total_pages: int) -> str:
    """Return the per-page extraction prompt shared by the HTTP backends."""
    return (
        f"Read this document page ({page_num} of {total_pages}) using your vision capability and extract ALL text content. "
        "I want the complete end-to-end text, preserving structure, headings, formatting, and content in all languages.\n\n"
        "Extract every word, number, and piece of information, including any non-English text (Punjabi, Hindi, etc.).\n\n"
        "Respond with JSON in this format:\n"
        "{\n"
        ' "doc_type": "invoice | receipt | contract | report | notice | other",\n'
        ' "confidence": number between 0 and 100,\n'
        ' "full_text": "Complete extracted text from this page, preserving structure and formatting. Include all languages.",\n'
        ' "fields": {\n'
        ' "invoice_number": "...",\n'
        ' "date": "...",\n'
        ' "company_name": "...",\n'
        ' "address": "...",\n'
        ' "other_field": "..."\n'
        " }\n"
        "}\n\n"
        "IMPORTANT:\n"
        "- Extract ALL text from this page, including non-English languages\n"
        "- Preserve structure, headings, and formatting\n"
        "- Fill in fields with relevant extracted information\n"
        "- If a field is not found, use empty string or omit it"
    )


def _message_text(content: Any) -> str:
    """Flatten a chat message ``content`` (string, list of parts or None) to text."""
    if content is None:
        return ""
    if isinstance(content, list):
        return "".join(
            part.get("text", "")
            for part in content
            if isinstance(part, dict) and part.get("type") == "text"
        )
    return content if isinstance(content, str) else str(content)


async def _post_chat_completion(provider: str, url: str, headers: Dict[str, str],
                                payload: Dict[str, Any], page_num: int) -> str:
    """
    POST a chat-completion request and return the first choice's text.

    Raises:
        RuntimeError: If httpx is missing, the request times out, or the
            request/decoding fails (the original exception is chained).
        ValueError: If the response carries no choices.
    """
    if httpx is None:
        raise RuntimeError("httpx not installed. Add it to requirements.txt")

    try:
        timeout = httpx.Timeout(180.0, connect=30.0)  # 3 min per page
        async with httpx.AsyncClient(timeout=timeout) as client:
            resp = await client.post(url, headers=headers, json=payload)
            resp.raise_for_status()
            data = resp.json()
    except httpx.TimeoutException as e:
        raise RuntimeError(f"{provider} API timed out for page {page_num}") from e
    except httpx.HTTPStatusError as e:
        # The body usually explains the rejection (bad key, quota, model name).
        error_msg = f"{e} | response body: {e.response.text[:500]}"
        print(f"[ERROR] {provider} API error details: {type(e).__name__}: {error_msg}")
        raise RuntimeError(f"{provider} API error for page {page_num}: {error_msg}") from e
    except Exception as e:
        error_msg = str(e)
        print(f"[ERROR] {provider} API error details: {type(e).__name__}: {error_msg}")
        raise RuntimeError(f"{provider} API error for page {page_num}: {error_msg}") from e

    choices = data.get("choices") if isinstance(data, dict) else None
    if not choices:
        raise ValueError(f"No choices in {provider} response for page {page_num}")
    return _message_text(choices[0]["message"]["content"])


async def _extract_with_openrouter_single(image_bytes: bytes, page_num: int, total_pages: int) -> Dict[str, Any]:
    """
    Extract from a single page using OpenRouter.

    Raises:
        RuntimeError: If OPENROUTER_API_KEY is unset or the request fails.
        ValueError: If the response has no choices or no usable text.
    """
    if not OPENROUTER_API_KEY:
        raise RuntimeError("OPENROUTER_API_KEY environment variable is not set")

    payload: Dict[str, Any] = {
        "model": MODEL_NAME,
        "messages": [
            {
                "role": "system",
                "content": [{"type": "text", "text": _SYSTEM_PROMPT}],
            },
            {
                "role": "user",
                "content": [
                    {"type": "text", "text": _build_user_prompt(page_num, total_pages)},
                    _image_block(_image_bytes_to_base64(image_bytes)),
                ],
            },
        ],
        "max_tokens": 4096,  # Smaller for single page
    }
    headers = {
        "Authorization": f"Bearer {OPENROUTER_API_KEY}",
        "Content-Type": "application/json",
        "HTTP-Referer": os.environ.get("APP_URL", "https://huggingface.co/spaces/your-space"),
        "X-Title": "Document Capture Demo",
    }

    payload_size_mb = len(json.dumps(payload).encode("utf-8")) / 1024 / 1024
    print(f"[INFO] OpenRouter: Processing page {page_num}, payload: {payload_size_mb:.2f} MB")

    text = await _post_chat_completion("OpenRouter", OPENROUTER_BASE_URL, headers, payload, page_num)
    return _parse_model_response(text, page_num)


async def _extract_with_openai_single(image_bytes: bytes, page_num: int, total_pages: int) -> Dict[str, Any]:
    """
    Extract from a single page using the OpenAI chat-completions vision API.

    Raises:
        RuntimeError: If OPENAI_API_KEY is unset or the request fails.
        ValueError: If the response has no choices or no usable text.
    """
    if not OPENAI_API_KEY:
        raise RuntimeError("OPENAI_API_KEY environment variable is not set")

    payload: Dict[str, Any] = {
        "model": OPENAI_MODEL_NAME,
        "messages": [
            {
                "role": "system",
                "content": _SYSTEM_PROMPT,
            },
            {
                "role": "user",
                "content": [
                    {"type": "text", "text": _build_user_prompt(page_num, total_pages)},
                    _image_block(_image_bytes_to_base64(image_bytes)),
                ],
            },
        ],
        "max_tokens": 4096,  # Similar to OpenRouter
        "temperature": 0.1,  # Lower temperature for more consistent extraction
    }
    headers = {
        "Authorization": f"Bearer {OPENAI_API_KEY}",
        "Content-Type": "application/json",
    }

    payload_size_mb = len(json.dumps(payload).encode("utf-8")) / 1024 / 1024
    print(f"[INFO] OpenAI: Processing page {page_num} with model {OPENAI_MODEL_NAME}, payload: {payload_size_mb:.2f} MB")

    response_text = await _post_chat_completion("OpenAI", OPENAI_BASE_URL, headers, payload, page_num)
    print(f"[DEBUG] OpenAI response preview: {response_text[:500]}")
    return _parse_model_response(response_text, page_num)


async def _extract_with_hf(image_bytes: bytes, page_num: int, total_pages: int) -> Dict[str, Any]:
    """
    Extract from a single page using the HuggingFace ``InferenceClient``.

    The client call is synchronous, so it runs in the default executor to keep
    the event loop responsive.

    Raises:
        RuntimeError: If HF_TOKEN is unset, huggingface_hub is missing, or the
            call/parsing fails (the original exception is chained).
    """
    if not HF_TOKEN:
        raise RuntimeError("HF_TOKEN environment variable is not set")

    try:
        from huggingface_hub import InferenceClient
    except ImportError as e:
        raise RuntimeError("huggingface_hub not installed. Add it to requirements.txt") from e

    client = InferenceClient(api_key=HF_TOKEN, timeout=180.0)

    prompt = (
        f"Read this document page ({page_num} of {total_pages}) and extract ALL text content. "
        "Extract every word, number, and piece of information, including any non-English text. "
        "Return JSON with 'full_text', 'doc_type', 'confidence', and 'fields'."
    )

    print(f"[INFO] HuggingFace: Processing page {page_num} with model {HF_MODEL_NAME}")

    try:
        messages = [
            {
                "role": "user",
                "content": [
                    {"type": "text", "text": prompt},
                    _image_block(_image_bytes_to_base64(image_bytes)),
                ],
            }
        ]

        def _blocking_call() -> Any:
            return client.chat.completions.create(
                model=HF_MODEL_NAME,
                messages=messages,
                max_tokens=2048,
                temperature=0.1,
            )

        # get_running_loop() is the supported way to reach the loop from a coroutine.
        loop = asyncio.get_running_loop()
        completion = await loop.run_in_executor(None, _blocking_call)

        if hasattr(completion, "choices") and len(completion.choices) > 0:
            message = completion.choices[0].message
            response_text = message.content if hasattr(message, "content") else str(message)
        else:
            response_text = str(completion)

        if not response_text:
            raise ValueError("Empty response from HuggingFace API")

        print(f"[DEBUG] HuggingFace response preview: {response_text[:500]}")
        return _parse_model_response(response_text, page_num)

    except Exception as e:
        error_msg = str(e)
        print(f"[ERROR] HuggingFace API error details: {type(e).__name__}: {error_msg}")

        # Give a targeted hint when the failure looks like a permissions problem.
        if "403" in error_msg or "permissions" in error_msg.lower() or "Forbidden" in error_msg:
            raise RuntimeError(
                f"HuggingFace API error for page {page_num}: Insufficient permissions. "
                "Your HF_TOKEN may need to be a token with 'read' access to Inference API. "
                "Check your HuggingFace account settings and token permissions."
            ) from e
        raise RuntimeError(f"HuggingFace API error for page {page_num}: {error_msg}") from e


def _try_load_object(candidate: str) -> Optional[Dict[str, Any]]:
    """Return *candidate* parsed as a JSON object, or None if it is not one."""
    try:
        parsed = json.loads(candidate)
    except ValueError:
        return None
    return parsed if isinstance(parsed, dict) else None


def _unescape_json_string(fragment: str) -> str:
    """Decode the escapes of a JSON string body (text between the quotes)."""
    try:
        # strict=False tolerates raw newlines/tabs inside the fragment.
        return json.loads(f'"{fragment}"', strict=False)
    except ValueError:
        # Single pass, so an escaped backslash is never re-read as the start
        # of another escape sequence.
        return re.sub(
            r"\\(.)",
            lambda m: _SIMPLE_ESCAPES.get(m.group(1), m.group(0)),
            fragment,
            flags=re.DOTALL,
        )


def _salvage_full_text(text: str) -> Optional[str]:
    """Pull the (possibly truncated) "full_text" value out of raw model output."""
    match = _FULL_TEXT_RE.search(text)
    if match is None:
        return None
    return _unescape_json_string(match.group(1))


def _parse_model_response(text: str, page_num: Optional[int] = None) -> Dict[str, Any]:
    """
    Parse JSON response from model, handling truncation and errors.

    Tried in order: the whole text as JSON, a fenced code block containing an
    object, the outermost ``{...}`` span (and the unterminated tail) after
    repair by ``_fix_truncated_json``, a salvaged "full_text" value, and
    finally the first 2000 characters of the raw text.

    Returns:
        Always a dict. The salvage fallbacks use doc_type "other" with
        confidence 90.0 (full_text recovered) or 50.0 (raw text).

    Raises:
        ValueError: If *text* is empty or whitespace only.
    """
    if not text or not text.strip():
        raise ValueError("Empty response from model")

    try:
        parsed = json.loads(text)
    except json.JSONDecodeError as e:
        print(f"[DEBUG] Direct JSON parse failed: {e}")
    else:
        if isinstance(parsed, dict):
            print(f"[DEBUG] Successfully parsed JSON for page {page_num or 'single'}")
            return parsed
        # Callers use .get() on the result, so a bare list/number is unusable.
        print(f"[DEBUG] Direct JSON parse did not yield an object (got {type(parsed).__name__})")

    # Try to extract JSON from markdown code blocks
    fenced = re.search(r'```(?:json)?\s*(\{.*?\})\s*```', text, re.DOTALL)
    if fenced:
        parsed = _try_load_object(fenced.group(1))
        if parsed is not None:
            return parsed

    # Try the outermost object, then the unterminated tail (truncated output
    # may have no closing brace at all).
    start = text.find("{")
    if start != -1:
        tail = re.sub(r"\s*`{3}\s*$", "", text[start:])
        candidates = []
        end = tail.rfind("}")
        if end != -1:
            candidates.append(tail[: end + 1])
        if end != len(tail) - 1:
            candidates.append(tail)
        for candidate in candidates:
            parsed = _try_load_object(_fix_truncated_json(candidate))
            if parsed is not None:
                return parsed

    # Extract full_text even from truncated JSON
    full_text = _salvage_full_text(text)
    if full_text is not None:
        return {
            "doc_type": "other",
            "confidence": 90.0,
            "full_text": full_text,
            "fields": {"full_text": full_text},
        }

    # Last resort: return raw text
    return {
        "doc_type": "other",
        "confidence": 50.0,
        "full_text": text[:2000],
        "fields": {"raw_text": text[:2000]},
    }


def _fix_truncated_json(json_str: str) -> str:
    """
    Attempt to fix truncated JSON by closing unclosed strings and containers.

    Braces and brackets inside string values are ignored, and open containers
    are closed in nesting order. A dangling backslash, trailing comma or
    trailing colon left by the truncation is repaired as well. Input that has
    nothing left open is returned unchanged.
    """
    closers: List[str] = []  # closing chars for currently open containers
    in_string = False
    escape_next = False

    for char in json_str:
        if in_string:
            if escape_next:
                escape_next = False
            elif char == "\\":
                escape_next = True
            elif char == '"':
                in_string = False
            continue
        if char == '"':
            in_string = True
        elif char == "{":
            closers.append("}")
        elif char == "[":
            closers.append("]")
        elif char in "}]" and closers and closers[-1] == char:
            closers.pop()

    if not in_string and not closers:
        return json_str

    repaired = json_str
    if in_string:
        if escape_next:
            # A lone trailing backslash would escape the quote we append.
            repaired = repaired[:-1]
        repaired = repaired.rstrip() + '"'
    else:
        repaired = repaired.rstrip()
        if repaired.endswith(","):
            repaired = repaired[:-1].rstrip()
        elif repaired.endswith(":"):
            repaired += " null"

    return repaired + "".join(reversed(closers))


def _extract_partial_json(text: str) -> Dict[str, Any]:
    """
    Extract what we can from a partial JSON response.

    Returns a dict with "doc_type" (default "other"), "confidence" (default
    0.0) and "fields"; "full_text" is added, and mirrored into "fields", only
    when a "full_text" value is found in *text*.
    """
    result: Dict[str, Any] = {
        "doc_type": "other",
        "confidence": 0.0,
        "fields": {},
    }

    doc_type_match = re.search(r'"doc_type"\s*:\s*"([^"]+)"', text)
    if doc_type_match:
        result["doc_type"] = doc_type_match.group(1)

    confidence_match = re.search(r'"confidence"\s*:\s*(\d+(?:\.\d+)?)', text)
    if confidence_match:
        result["confidence"] = float(confidence_match.group(1))

    # full_text may be cut off mid-string; take whatever is there.
    full_text = _salvage_full_text(text)
    if full_text is not None:
        result["full_text"] = full_text
        result["fields"]["full_text"] = full_text

    return result