import os
import json
import unicodedata
from pathlib import Path
from typing import Dict, List, Optional

from openai import OpenAI
import httpx


class NoProxyHTTPClient(httpx.Client):
    """httpx.Client that drops the 'proxies' keyword, which newer httpx releases no longer accept."""

    def __init__(self, *args, **kwargs):
        # Some OpenAI SDK versions still pass 'proxies' to httpx.Client; strip it to avoid a TypeError.
        kwargs.pop("proxies", None)
        super().__init__(*args, **kwargs)


class DocumentProcessor:
    """Processes PDF documents using an LLM to extract clean text and generate summaries."""

    def __init__(self, api_key: Optional[str] = None, model: str = "gpt-5"):
        api_key = api_key or os.getenv("OPENAI_API_KEY")
        if not api_key:
            raise ValueError("OpenAI API key is required")
        os.environ.setdefault("OPENAI_API_KEY", api_key)
        http_client = NoProxyHTTPClient(timeout=900.0)
        self.client = OpenAI(http_client=http_client)
        self.model = model

    @staticmethod
    def _normalize_filename(filename: str) -> str:
        """
        Normalize a filename for comparison (handles Unicode encoding variations).

        Args:
            filename: Original filename

        Returns:
            Normalized filename (NFC form, lowercased, stripped)
        """
        if not filename:
            return ""
        # Normalize to NFC (composed form) to handle encoding variations
        normalized = unicodedata.normalize("NFC", filename)
        # Lowercase and strip for case-insensitive comparison
        return normalized.lower().strip()
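
    # Hypothetical example of the normalization above: a composed "Décret.PDF " and its
    # decomposed (NFD) spelling both map to the same key, e.g.
    #   _normalize_filename("Décret.PDF ") -> "décret.pdf"
    # so filename lookups survive Unicode-encoding and case differences between runs.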

    def process_pdf_with_llm(self, pdf_path: str) -> Dict[str, str]:
        """
        Process a PDF by uploading it to OpenAI and requesting cleaned text plus a summary.

        Args:
            pdf_path: Path to the PDF file

        Returns:
            {"filename": str, "text": str, "summary": str}
        """
        filename = Path(pdf_path).name
        print(f"Processing {filename} with LLM via file upload...")
        uploaded_file = None
        try:
            # Upload file
            with open(pdf_path, "rb") as pdf_file:
                uploaded_file = self.client.files.create(
                    file=pdf_file,
                    purpose="user_data"
                )

            prompt = """
You are processing a legal PDF document (in Arabic) that has been uploaded as a file.

Your task has TWO parts:
1) TEXT EXTRACTION & CLEANING
2) GLOBAL SUMMARY IN ARABIC

========================
1) TEXT EXTRACTION & CLEANING
========================
Extract ONLY the **main body text** of the entire document, in order, exactly as it appears logically in the statute, while cleaning away non-content noise.

INCLUDE:
- All legal text and provisions
- Article numbers and titles
- Section / chapter / part / الباب / الفصل headings
- Numbered clauses, subclauses, bullet points
- Any explanatory legal text that is part of the law itself

EXCLUDE (REMOVE COMPLETELY):
- Headers on each page (e.g., publication dates, التصنيف, نوع التشريع, حالة التشريع, etc.)
- Footers on each page
- Page numbers
- Any repeated boilerplate that appears identically on each page
- Scanning artifacts, junk characters, or layout noise
- Empty or whitespace-only lines that are not meaningful

IMPORTANT CLEANING RULES:
- Preserve the original language (Arabic). Do NOT translate the law.
- Preserve the logical order of the articles and sections as in the original law.
- Do NOT paraphrase, shorten, summarize, or reword the legal text. Copy the body text as-is (except for removing headers/footers/page numbers and cleaning artifacts).
- If the same header/footer text appears on many pages, remove all occurrences.
- If you are unsure whether a short line is a page number or header/footer (e.g. just a digit or date in the margin), treat it as NON-content and remove it.
- Keep reasonable line breaks and blank lines between titles, articles, and sections so the text is readable and structured, but do not insert additional commentary.
- Do NOT invent or hallucinate any missing articles or text. Only use what is actually present in the PDF content.

The final "text" field should contain the **full cleaned main body** of the law as ONE string, with newline characters where appropriate.

========================
2) GLOBAL SUMMARY (IN ARABIC)
========================
After extracting the cleaned body text, generate a **concise summary in Arabic** that:
- Covers جميع الأبواب والفصول والمواد بشكل موجز
- يوضح موضوع النظام، نطاق تطبيقه، وأهم الأحكام (مثل: الزواج، الحقوق والواجبات، النفقة، النسب، الفرقة، العدة، الحضانة، الوصاية، الولاية، الوصية، المفقود، إلخ)
- يكون بصياغة عربية فصحى واضحة ومباشرة
- يكون في بضع فقرات قصيرة أو قائمة نقاط موجزة (بدون إطالة مفرطة)
لا تُدخل في الملخص أي تحليلات فقهية أو آراء، فقط وصف منظم لأهم الأحكام.

REQUIREMENTS:
- Do NOT wrap the JSON in Markdown.
- Do NOT add any extra keys or metadata.
- Do NOT add explanations before or after the JSON.
- Ensure the JSON is valid and parseable (proper quotes, commas, and escaping).

========================
OUTPUT FORMAT (STRICT)
========================
Return ONLY a single JSON object, with EXACTLY these two fields:
{
  "text": "<the full cleaned main body text of the document as one string>",
  "summary": "<the concise Arabic summary of the entire document>"
}
"""

            # Use the SDK Responses API with the uploaded file and the prompt
            response = self.client.responses.create(
                model=self.model,
                input=[
                    {
                        "role": "user",
                        "content": [
                            {
                                "type": "input_file",
                                "file_id": uploaded_file.id,
                            },
                            {
                                "type": "input_text",
                                "text": prompt,
                            },
                        ],
                    }
                ],
            )

            # Extract output_text from the response
            response_text = response.output_text
            if not response_text:
                raise ValueError("No text returned from OpenAI response.")

            result = json.loads(response_text)
            combined_text = result.get("text", "")
            final_summary = result.get("summary", "")
        except Exception as e:
            print(f"Error processing {filename} via OpenAI: {e}")
            raise
        finally:
            if uploaded_file:
                try:
                    self.client.files.delete(uploaded_file.id)
                except Exception as cleanup_error:
                    print(f"Warning: failed to delete uploaded file for {filename}: {cleanup_error}")

        return {
            "filename": filename,
            "text": combined_text,
            "summary": final_summary
        }
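
    # Sketch of a single-document call (hypothetical path; assumes OPENAI_API_KEY is set):
    #   processor = DocumentProcessor()
    #   doc = processor.process_pdf_with_llm("documents/family_law.pdf")
    #   doc["text"]     # cleaned Arabic body text
    #   doc["summary"]  # concise Arabic summary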

    def process_all_pdfs(self, documents_folder: str, skip_existing: bool = True) -> List[Dict[str, str]]:
        """
        Process all PDF files in a folder, skipping already processed documents.

        Args:
            documents_folder: Path to folder containing PDF files
            skip_existing: If True, skip PDFs that are already in processed_documents.json

        Returns:
            List of newly processed documents
        """
        folder = Path(documents_folder)
        if not folder.exists():
            raise ValueError(f"Folder {documents_folder} does not exist")

        # Load existing processed documents
        existing_docs = []
        existing_filenames = set()  # Original filenames for reference
        existing_filenames_normalized = set()  # Normalized filenames for comparison
        if skip_existing:
            existing_docs = self.load_from_json()
            for doc in existing_docs:
                original_filename = doc.get("filename")
                if original_filename:
                    original_filename = original_filename.strip()
                    normalized = self._normalize_filename(original_filename)
                    existing_filenames.add(original_filename)
                    existing_filenames_normalized.add(normalized)
            if existing_filenames:
                print(f"Found {len(existing_filenames)} already processed documents")

        pdf_files = list(folder.glob("*.pdf"))
        new_processed_docs = []
        skipped_count = 0

        for pdf_file in pdf_files:
            filename = pdf_file.name
            filename_normalized = self._normalize_filename(filename)

            # Skip if already processed (using normalized comparison)
            if skip_existing and filename_normalized in existing_filenames_normalized:
                print(f"⊘ Skipped (already processed): {filename}")
                skipped_count += 1
                continue

            # Also check the original filename for backward compatibility
            if skip_existing and filename in existing_filenames:
                print(f"⊘ Skipped (already processed, exact match): {filename}")
                skipped_count += 1
                continue

            # Process new document
            try:
                result = self.process_pdf_with_llm(str(pdf_file))
                new_processed_docs.append(result)
                print(f"✓ Processed: {result['filename']}")
            except Exception as e:
                print(f"✗ Failed to process {pdf_file.name}: {e}")

        # Merge with existing documents and save
        if new_processed_docs:
            all_docs = existing_docs + new_processed_docs
            self.save_to_json(all_docs)
            print(f"Processed {len(new_processed_docs)} new documents, skipped {skipped_count} existing")
        elif skipped_count > 0:
            print(f"All documents already processed. Skipped {skipped_count} documents.")

        return new_processed_docs

    def save_to_json(self, processed_docs: List[Dict[str, str]], json_path: Optional[str] = None, append: bool = False):
        """
        Save processed documents to a JSON file.

        Args:
            processed_docs: List of documents to save
            json_path: Optional path to the JSON file
            append: If True, append to the existing file (avoiding duplicates). If False, overwrite.
        """
        if json_path is None:
            project_root = Path(__file__).resolve().parents[1]
            json_path = str(project_root / "processed_documents.json")
        json_path = Path(json_path)

        if append and json_path.exists():
            # Load existing documents and merge, avoiding duplicates
            existing_docs = self.load_from_json(json_path)
            existing_filenames = {doc.get("filename") for doc in existing_docs if doc.get("filename")}
            existing_filenames_normalized = {self._normalize_filename(fn) for fn in existing_filenames}

            # Add only new documents (using normalized comparison)
            for doc in processed_docs:
                doc_filename = doc.get("filename", "")
                doc_filename_normalized = self._normalize_filename(doc_filename)
                # Check both normalized and original forms for backward compatibility
                if doc_filename not in existing_filenames and doc_filename_normalized not in existing_filenames_normalized:
                    existing_docs.append(doc)
            processed_docs = existing_docs

        with open(json_path, "w", encoding="utf-8") as f:
            json.dump(processed_docs, f, ensure_ascii=False, indent=2)
        print(f"Saved {len(processed_docs)} documents to {json_path}")

    def load_from_json(self, json_path: Optional[str] = None) -> List[Dict[str, str]]:
        """Load processed documents from the JSON file."""
        if json_path is None:
            project_root = Path(__file__).resolve().parents[1]
            json_path = str(project_root / "processed_documents.json")
        json_path = Path(json_path)
        if not json_path.exists():
            return []
        with open(json_path, "r", encoding="utf-8") as f:
            return json.load(f)

    def get_text_by_filename(self, filename: str, json_path: Optional[str] = None) -> Optional[str]:
        """Get the full text for a document by filename."""
        docs = self.load_from_json(json_path)
        for doc in docs:
            if doc.get("filename") == filename:
                return doc.get("text", "")
        return None
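

# Minimal usage sketch (not part of the class). The "documents" folder name is an
# assumption for illustration; OPENAI_API_KEY must be set in the environment or
# passed explicitly to DocumentProcessor.
if __name__ == "__main__":
    processor = DocumentProcessor()
    # Process every PDF in the (hypothetical) documents/ folder, skipping files
    # already recorded in processed_documents.json, then print the new summaries.
    new_docs = processor.process_all_pdfs("documents", skip_existing=True)
    for doc in new_docs:
        print(doc["filename"])
        print(doc["summary"])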