Spaces:
Runtime error
Runtime error
| #!/usr/bin/env python3 | |
| """Enhanced web document annotation tool with modern UI.""" | |
| import hashlib | |
| import json | |
| import os | |
| import uuid | |
| from collections import defaultdict | |
| from dataclasses import dataclass, field | |
| from datetime import datetime, timezone | |
| from pathlib import Path | |
| from random import sample, shuffle | |
| import gradio as gr | |
| from datasets import Dataset, load_dataset | |
| from loguru import logger | |
def doc_hash(url: str, text: str) -> str:
    """Return the SHA-256 hex digest that identifies a document.

    The digest is taken over ``url`` immediately followed by ``text``
    (no separator), encoded with the default ``str.encode`` encoding.
    """
    digest = hashlib.sha256()
    digest.update((url + text).encode())
    return digest.hexdigest()
def filterfunc(x: dict) -> bool:
    """Decide whether a dataset row should be offered for annotation.

    A row is rejected when its ``text`` has fewer than 100
    whitespace-separated words, or when any primary/secondary label under
    ``eai_taxonomy.document_type_v1`` / ``document_type_v2`` is in the
    excluded set below.

    Args:
        x: One dataset row.

    Returns:
        ``True`` to keep the row, ``False`` to drop it.
    """
    # ``or ""`` / ``or {}``: a key that is present but holds None must be
    # treated like a missing key instead of raising AttributeError.
    text = x.get("text") or ""
    if len(text.split()) < 100:
        return False

    excluded = {"Promotional/Advertisement", "Machine-Generated", "Images/Videos/Audio",
                "Truncated", "Spam/Ads", "Product Page", "Content Listing"}

    taxonomy = x.get("eai_taxonomy") or {}
    for version in ("document_type_v1", "document_type_v2"):
        levels = taxonomy.get(version) or {}
        for level in ("primary", "secondary"):
            label = (levels.get(level) or {}).get("label")
            if label and label in excluded:
                return False
    return True
| class DocLoader: | |
| __slots__ = ("docs", "index", "processed", "_dataset") | |
| def __init__(self, processed: set[str]): | |
| self.processed = processed | |
| self.index = 0 | |
| self.docs = [] | |
| self._dataset = {} | |
| self._load() | |
| def _load(self): | |
| ds = load_dataset("sumuks/essential-web-v1.0-sample-10M", split="train") | |
| logger.info(f"Loaded {len(ds)} documents") | |
| ds = ds.filter(filterfunc) | |
| logger.info(f"Filtered to {len(ds)} documents") | |
| # Build dataset lookup and collect unprocessed docs | |
| unprocessed = [] | |
| for idx, doc in enumerate(ds): | |
| doc_key = doc.get("id", idx) | |
| doc_with_key = dict(doc) | |
| doc_with_key["_dataset_key"] = doc_key | |
| self._dataset[doc_key] = doc_with_key | |
| # Check if already processed | |
| url = doc.get("metadata", {}).get("url", doc.get("url", "")) | |
| h = doc_hash(url, doc.get("text", "")) | |
| if h not in self.processed: | |
| unprocessed.append(doc_with_key) | |
| logger.info(f"Found {len(unprocessed)} unprocessed documents") | |
| # Randomize the order for this session | |
| shuffle(unprocessed) | |
| self.docs = unprocessed | |
| logger.info(f"Loaded {len(self.docs)} documents for this session") | |
| def next(self) -> dict | None: | |
| if self.index < len(self.docs): | |
| doc = self.docs[self.index] | |
| self.index += 1 | |
| return doc | |
| return None | |
| def get_by_id(self, doc_id: str | int) -> dict | None: | |
| result = self._dataset.get(doc_id) | |
| if result is None and isinstance(doc_id, str) and doc_id.isdigit(): | |
| result = self._dataset.get(int(doc_id)) | |
| elif result is None and isinstance(doc_id, int): | |
| result = self._dataset.get(str(doc_id)) | |
| return result | |
| def remaining(self) -> int: | |
| return max(0, len(self.docs) - self.index) | |
| class AnnotationStore: | |
| path: Path | |
| session_id: str = field(default_factory=lambda: str(uuid.uuid4())) | |
| buffer: list[dict] = field(default_factory=list) | |
| threshold: int = 25 | |
| processed: set[str] = field(default_factory=set) | |
| annotations: list[dict] = field(default_factory=list) | |
| session_stats: dict = field(default_factory=lambda: { | |
| "total": 0, | |
| "selected": 0, | |
| "discarded": 0, | |
| "start_time": datetime.now(timezone.utc), | |
| "decisions": [] | |
| }) | |
| def __post_init__(self): | |
| self.path.parent.mkdir(parents=True, exist_ok=True) | |
| if self.path.exists(): | |
| for line in self.path.read_text().splitlines(): | |
| if rec := self._parse_line(line): | |
| self.processed.add(rec["hash"]) | |
| self.annotations.append(rec) | |
| logger.info(f"Loaded {len(self.processed)} existing annotations") | |
| def _parse_line(self, line: str) -> dict | None: | |
| try: | |
| return json.loads(line) | |
| except: | |
| return None | |
| def add(self, doc_hash: str, decision: str, doc_id: str | int): | |
| if doc_hash in self.processed: | |
| logger.warning(f"Attempted to add already processed document: {doc_hash}") | |
| return | |
| rec = { | |
| "hash": doc_hash, | |
| "decision": decision, | |
| "session": self.session_id, | |
| "id": doc_id, | |
| "timestamp": datetime.now(timezone.utc).isoformat(), | |
| } | |
| self.path.open("a").write(json.dumps(rec) + "\n") | |
| self.processed.add(doc_hash) | |
| self.buffer.append(rec) | |
| self.annotations.append(rec) | |
| self.session_stats["total"] += 1 | |
| if decision == "selected": | |
| self.session_stats["selected"] += 1 | |
| elif decision == "discarded": | |
| self.session_stats["discarded"] += 1 | |
| self.session_stats["decisions"].append((datetime.now(timezone.utc), decision)) | |
| if len(self.buffer) >= self.threshold: | |
| self.flush() | |
| def flush(self): | |
| if not self.buffer or not (token := os.getenv("HF_TOKEN")): | |
| self.buffer.clear() | |
| return | |
| try: | |
| Dataset.from_list(self.buffer).push_to_hub( | |
| "yourbench/essential-web-annotations", | |
| token=token | |
| ) | |
| logger.info(f"Pushed {len(self.buffer)} annotations") | |
| self.buffer.clear() | |
| except Exception as e: | |
| logger.error(f"Push failed: {e}") | |
| def get_rate(self) -> float: | |
| if not self.session_stats["decisions"]: | |
| return 0.0 | |
| elapsed = (datetime.now(timezone.utc) - self.session_stats["start_time"]).total_seconds() | |
| return (self.session_stats["total"] / elapsed * 3600) if elapsed > 0 else 0.0 | |
| def get_filtered(self, decision: str | None = None) -> list[dict]: | |
| if decision is None or decision == "all": | |
| return self.annotations | |
| return [a for a in self.annotations if a.get("decision") == decision] | |
# Maximum number of documents annotated in one session.
SESSION_LIMIT = 50

# Shared application state. The store must exist before the loader,
# because the loader receives the store's set of processed hashes.
store = AnnotationStore(Path("data/annotations.jsonl"))
loader = DocLoader(store.processed)
current = loader.next()

# Viewer state
viewer_state = dict(annotations=[], index=0, filter="all")
def format_stats() -> str:
    """Render the session statistics panel as an HTML fragment.

    Reads the module-level ``store`` (session counters and hourly rate)
    and ``loader`` (documents still queued).

    Returns:
        HTML string with one ``stat-item`` tile per statistic.
    """
    stats = store.session_stats
    rate = store.get_rate()
    # ``remaining`` is a method: it has to be called, because formatting
    # the bound method itself with ``:,`` raises TypeError.
    remaining = loader.remaining()
    return f"""
<div class="stats-container">
<div class="stat-item">
<div class="stat-value">{stats['total']}</div>
<div class="stat-label">Total Annotated</div>
</div>
<div class="stat-item">
<div class="stat-value">{stats['selected']}</div>
<div class="stat-label">Selected</div>
</div>
<div class="stat-item">
<div class="stat-value">{stats['discarded']}</div>
<div class="stat-label">Discarded</div>
</div>
<div class="stat-item">
<div class="stat-value">{rate:.0f}/hr</div>
<div class="stat-label">Annotation Rate</div>
</div>
<div class="stat-item">
<div class="stat-value">{remaining:,}</div>
<div class="stat-label">Remaining Docs</div>
</div>
</div>
"""
def format_progress() -> tuple[str, float]:
    """Render the session progress bar.

    Returns:
        ``(html, fraction)`` where ``fraction`` is the session's annotated
        count divided by ``SESSION_LIMIT`` (0 when the limit is not positive).
    """
    done = store.session_stats["total"]
    limit = SESSION_LIMIT
    if limit > 0:
        fraction = done / limit
    else:
        fraction = 0
    pct = fraction * 100
    markup = f"""
<div class="progress-container">
<div class="progress-header">
<span class="progress-title">Session Progress</span>
<span class="progress-numbers">{done:,} / {limit:,}</span>
</div>
<div class="progress-bar-bg">
<div class="progress-bar-fill" style="width: {pct:.1f}%"></div>
</div>
<div class="progress-percentage">{pct:.1f}% Complete</div>
</div>
"""
    return markup, fraction
| def format_document_info(doc: dict, annotation: dict | None = None) -> str: | |
| if not doc: | |
| return "" | |
| meta = doc.get("metadata", {}) | |
| url = meta.get("url", doc.get("url", "")) | |
| domain = url.split('/')[2] if url and '/' in url else "Unknown" | |
| cat = doc.get("eai_taxonomy", {}).get("document_type_v2", {}).get("primary", {}).get("label", "Uncategorized") | |
| word_count = len(doc.get("text", "").split()) | |
| annotation_info = "" | |
| if annotation: | |
| timestamp = datetime.fromisoformat(annotation["timestamp"].replace("Z", "+00:00")) | |
| decision_color = "#667eea" if annotation["decision"] == "selected" else "#f5576c" | |
| annotation_info = f""" | |
| <div class="annotation-info" style="border-left: 4px solid {decision_color};"> | |
| <span class="annotation-decision" style="color: {decision_color};"> | |
| {"β " if annotation["decision"] == "selected" else "β"} {annotation["decision"].title()} | |
| </span> | |
| <span class="annotation-time">π {timestamp.strftime("%Y-%m-%d %H:%M:%S")}</span> | |
| </div> | |
| """ | |
| return f""" | |
| <div class="doc-info"> | |
| {annotation_info} | |
| <div class="doc-meta"> | |
| <span class="doc-domain">π {domain}</span> | |
| <span class="doc-category">π·οΈ {cat}</span> | |
| <span class="doc-words">π {word_count:,} words</span> | |
| </div> | |
| <a href="{url}" target="_blank" class="doc-url">{url}</a> | |
| </div> | |
| """ | |
def choose(decision: str):
    """Record ``decision`` for the current document and advance to the next.

    Returns the 7-tuple of UI values (document info, text, two button
    updates, stats, progress HTML, progress fraction), or ``done_state()``
    when there is no current document, the session limit has been reached,
    or the loader has run out of documents.
    """
    global current
    if not current:
        return done_state()

    source_url = current.get("metadata", {}).get("url", current.get("url", ""))
    digest = doc_hash(source_url, current.get("text", ""))
    key = current.get("_dataset_key", current.get("id", ""))
    store.add(digest, decision, key)

    # Stop before fetching another document once the session quota is met.
    if store.session_stats["total"] >= SESSION_LIMIT:
        return done_state()

    current = loader.next()
    if not current:
        return done_state()

    progress_html, progress_num = format_progress()
    info_html = format_document_info(current)
    body_text = current.get("text", "")
    select_update = gr.update(interactive=True)
    discard_update = gr.update(interactive=True)
    return (
        info_html,
        body_text,
        select_update,
        discard_update,
        format_stats(),
        progress_html,
        progress_num,
    )
def done_state():
    """Build the UI values shown when no further document can be annotated.

    The banner text depends on whether the session limit was reached or the
    document queue ran dry. Both buttons are disabled and the progress
    fraction is reported as ``1.0``.
    """
    progress_html, _ = format_progress()
    limit_reached = store.session_stats["total"] >= SESSION_LIMIT
    if limit_reached:
        message = "π Session Complete!"
        description = f"Great job! You've completed your session of {SESSION_LIMIT} documents."
    else:
        message = "π All documents annotated!"
        description = "Great job! You've completed all available documents."
    banner = f"<div class='done-message'>{message}</div>"
    select_update = gr.update(interactive=False)
    discard_update = gr.update(interactive=False)
    return (
        banner,
        description,
        select_update,
        discard_update,
        format_stats(),
        progress_html,
        1.0,
    )
def update_viewer_filter(filter_value: str):
    """Apply a decision filter to the viewer and show its first record.

    Stores the filter, resets the viewer index to 0, reloads the matching
    annotations from the store and returns the refreshed viewer outputs.
    """
    viewer_state.update(filter=filter_value, index=0)
    viewer_state["annotations"] = store.get_filtered(filter_value)
    logger.info(f"Filter: {filter_value}, Found {len(viewer_state['annotations'])} annotations")
    return update_viewer_display()
def navigate_viewer(direction: int):
    """Move the viewer index by ``direction`` and redraw.

    The index wraps around the annotation list; with an empty list the
    index is left untouched.
    """
    entries = viewer_state["annotations"]
    if entries:
        viewer_state["index"] = (viewer_state["index"] + direction) % len(entries)
    return update_viewer_display()
def _viewer_counter_html(position: int, total: int) -> str:
    """Return the "position / total" counter in its ``viewer-counter`` wrapper."""
    return f"<div class='viewer-counter'>{position} / {total}</div>"


def update_viewer_display():
    """Build the viewer outputs for the annotation at the current index.

    Returns:
        5-tuple of (info HTML, document text, counter HTML, previous-button
        update, next-button update). With no annotations, an empty-state
        message is shown and both buttons are disabled; when the annotated
        document cannot be found, the raw annotation is shown instead.
    """
    annotations = viewer_state["annotations"]
    if not annotations:
        return (
            "<div class='viewer-empty'>No annotations to display</div>",
            "",
            _viewer_counter_html(0, 0),
            gr.update(interactive=False),
            gr.update(interactive=False)
        )

    idx = viewer_state["index"]
    total = len(annotations)
    annotation = annotations[idx]
    # Counter updates keep the same wrapper element the component starts
    # with, so the ``.viewer-counter`` CSS rule keeps applying.
    counter = _viewer_counter_html(idx + 1, total)
    # ``.get``: a stored record without an "id" is reported as not found
    # instead of raising KeyError.
    doc_id = annotation.get("id")
    doc = loader.get_by_id(doc_id)
    if not doc:
        logger.warning(f"Document not found for ID: {doc_id} (type: {type(doc_id)})")
        return (
            "<div class='viewer-error'>Document not found in dataset</div>",
            f"Annotation details: {json.dumps(annotation, indent=2)}",
            counter,
            gr.update(interactive=idx > 0),
            gr.update(interactive=idx < total - 1)
        )
    return (
        format_document_info(doc, annotation),
        doc.get("text", ""),
        counter,
        gr.update(interactive=idx > 0),
        gr.update(interactive=idx < total - 1)
    )
def build() -> gr.Blocks:
    """Assemble the Gradio UI and return it without launching.

    The app has two tabs: "Annotate" (current document, Select/Discard
    buttons, session stats and progress) and "Viewer" (browse stored
    annotations with a decision filter and previous/next navigation).
    Initial component values are read from the module-level ``current``,
    ``store`` and ``loader`` at build time.
    """
    # Stylesheet for the HTML fragments produced by the format_* helpers
    # and for the two action buttons (#select / #discard).
    css = """
.stats-container {
display: flex;
gap: 15px;
margin: 10px 0;
flex-wrap: nowrap;
justify-content: space-between;
}
.stat-item {
background: linear-gradient(135deg, #667eea 0%, #764ba2 100%);
border-radius: 12px;
padding: 15px;
flex: 1;
min-width: 100px;
text-align: center;
box-shadow: 0 4px 6px rgba(0, 0, 0, 0.1);
transition: transform 0.2s;
}
.stat-item:hover {
transform: translateY(-2px);
}
.stat-value {
font-size: 24px;
font-weight: bold;
color: white;
margin-bottom: 3px;
}
.stat-label {
font-size: 12px;
color: rgba(255, 255, 255, 0.9);
}
.progress-container {
background: #f8f9fa;
border-radius: 12px;
padding: 15px;
margin: 10px 0;
}
.progress-header {
display: flex;
justify-content: space-between;
margin-bottom: 10px;
font-weight: 600;
}
.progress-bar-bg {
background: #e9ecef;
height: 20px;
border-radius: 10px;
overflow: hidden;
margin-bottom: 10px;
}
.progress-bar-fill {
background: linear-gradient(90deg, #667eea 0%, #764ba2 100%);
height: 100%;
transition: width 0.3s ease;
}
.progress-percentage {
text-align: center;
color: #6c757d;
font-size: 14px;
}
.doc-info {
background: #f8f9fa;
border-radius: 12px;
padding: 15px;
margin-bottom: 10px;
}
.doc-meta {
display: flex;
gap: 20px;
margin-bottom: 10px;
flex-wrap: wrap;
}
.doc-meta span {
font-size: 14px;
color: #495057;
}
.doc-url {
font-size: 14px;
color: #667eea;
text-decoration: none;
word-break: break-all;
}
.doc-url:hover {
text-decoration: underline;
}
.done-message {
font-size: 32px;
text-align: center;
padding: 40px;
background: linear-gradient(135deg, #667eea 0%, #764ba2 100%);
color: white;
border-radius: 12px;
font-weight: bold;
}
.annotation-info {
display: flex;
justify-content: space-between;
margin-bottom: 10px;
padding-left: 10px;
}
.annotation-decision {
font-weight: 600;
}
.annotation-time {
color: #6c757d;
font-size: 12px;
}
.viewer-empty, .viewer-error {
text-align: center;
padding: 40px;
color: #6c757d;
font-size: 18px;
}
.viewer-nav {
display: flex;
justify-content: center;
align-items: center;
gap: 20px;
margin: 10px 0;
}
.viewer-counter {
font-weight: 600;
color: #495057;
}
#select {
background: linear-gradient(135deg, #667eea 0%, #764ba2 100%);
border: none;
font-size: 18px;
padding: 12px 24px;
}
#discard {
background: linear-gradient(135deg, #f093fb 0%, #f5576c 100%);
border: none;
font-size: 18px;
padding: 12px 24px;
}
.dark .stat-item {
background: linear-gradient(135deg, #434343 0%, #000000 100%);
}
.dark .progress-container, .dark .doc-info {
background: #1a1a1a;
}
.dark .progress-bar-bg {
background: #2a2a2a;
}
@keyframes pulse {
0% { transform: scale(1); }
50% { transform: scale(1.05); }
100% { transform: scale(1); }
}
"""
    # Page-level script passed through ``head=``: on keyup, "1" clicks
    # #select and "2" clicks #discard, unless the event target is an
    # input, textarea, select or button. The extra keydown/keyup handlers
    # only scale the matching button for visual feedback.
    shortcut_js = """
<script>
function handleKeyboardShortcuts(e) {
var target = e.target || e.srcElement;
switch (target.tagName.toLowerCase()) {
case "input":
case "textarea":
case "select":
case "button":
return;
default:
if (e.code === "Digit1" || e.key === "1") {
var selectBtn = document.getElementById("select");
if (selectBtn && !selectBtn.disabled) {
selectBtn.click();
e.preventDefault();
}
}
else if (e.code === "Digit2" || e.key === "2") {
var discardBtn = document.getElementById("discard");
if (discardBtn && !discardBtn.disabled) {
discardBtn.click();
e.preventDefault();
}
}
}
}
document.addEventListener('keyup', handleKeyboardShortcuts, false);
document.addEventListener('keydown', function(e) {
if ((e.code === "Digit1" || e.key === "1") && document.getElementById("select") && !document.getElementById("select").disabled) {
document.getElementById("select").style.transform = "scale(0.95)";
}
if ((e.code === "Digit2" || e.key === "2") && document.getElementById("discard") && !document.getElementById("discard").disabled) {
document.getElementById("discard").style.transform = "scale(0.95)";
}
});
document.addEventListener('keyup', function(e) {
if (e.code === "Digit1" || e.key === "1") {
var btn = document.getElementById("select");
if (btn) btn.style.transform = "scale(1)";
}
if (e.code === "Digit2" || e.key === "2") {
var btn = document.getElementById("discard");
if (btn) btn.style.transform = "scale(1)";
}
});
</script>
"""
    with gr.Blocks(
        title="Essential Web Annotation",
        theme=gr.themes.Default(),
        css=css,
        head=shortcut_js
    ) as demo:
        gr.Markdown("# π Essential Web Annotation Tool")
        with gr.Tabs():
            with gr.Tab("Annotate"):
                # Annotator instructions (static Markdown).
                gr.Markdown("""
## π Document Quality Assessment
Your task is to evaluate documents for **high-quality, valuable content** that provides generalizable information.
### β **Select High-Quality Documents:**
Examples include:
- **Technical blogs** with detailed explanations
- **Scientific papers** and research articles
- **Information-rich discussions** with insights
- **Educational content** with actionable knowledge
- **Professional documentation** and guides
### β **Discard Low-Quality Documents:**
- Content with minimal informational value
### π― **Quick Assessment Tips:**
- High-quality documents are usually immediately recognizable to a human.
- Use the **Viewer** tab to browse examples of selected documents
- Trust your judgment on content value and depth
### β¨οΈ **Keyboard Shortcuts:**
| Key | Action |
|-----|--------|
| **`1`** | β Select document |
| **`2`** | β Discard document |
""")
                # Initial values are computed once, when the UI is built.
                progress_html, progress_num = format_progress()
                progress_display = gr.HTML(progress_html)
                stats_display = gr.HTML(format_stats())
                if current:
                    doc_info_html = format_document_info(current)
                    text_val = current.get("text", "")
                else:
                    doc_info_html = "<div class='doc-info'>No documents loaded.</div>"
                    text_val = ""
                doc_info = gr.HTML(doc_info_html)
                with gr.Column(variant="panel"):
                    text_display = gr.Textbox(
                        text_val,
                        label="π Document Content",
                        lines=20,
                        interactive=False,
                        show_copy_button=True
                    )
                with gr.Row():
                    # elem_id values are what the CSS and the keyboard
                    # shortcut script look up; buttons start disabled when
                    # there is no current document.
                    btn_sel = gr.Button(
                        "β Select (1)",
                        elem_id="select",
                        variant="primary",
                        interactive=bool(current),
                        size="lg"
                    )
                    btn_dis = gr.Button(
                        "β Discard (2)",
                        elem_id="discard",
                        variant="stop",
                        interactive=bool(current),
                        size="lg"
                    )
                # Hidden holder for the numeric progress fraction.
                progress_bar = gr.Number(value=progress_num, visible=False)
                # Order matches the 7-tuple returned by choose()/done_state().
                outputs = [doc_info, text_display, btn_sel, btn_dis, stats_display, progress_display, progress_bar]
                btn_sel.click(lambda: choose("selected"), outputs=outputs)
                btn_dis.click(lambda: choose("discarded"), outputs=outputs)
            with gr.Tab("Viewer"):
                gr.Markdown("### π Browse Annotated Documents")
                with gr.Row():
                    filter_dropdown = gr.Radio(
                        choices=["all", "selected", "discarded"],
                        value="all",
                        label="Filter",
                        interactive=True
                    )
                viewer_info = gr.HTML()
                with gr.Column(variant="panel"):
                    viewer_text = gr.Textbox(
                        label="π Document Content",
                        lines=20,
                        interactive=False,
                        show_copy_button=True
                    )
                with gr.Row():
                    prev_btn = gr.Button("β Previous", size="lg")
                    viewer_counter = gr.HTML("<div class='viewer-counter'>0 / 0</div>")
                    next_btn = gr.Button("Next β", size="lg")
                # All viewer handlers return the 5-tuple produced by
                # update_viewer_display(), in this output order.
                filter_dropdown.change(
                    update_viewer_filter,
                    inputs=[filter_dropdown],
                    outputs=[viewer_info, viewer_text, viewer_counter, prev_btn, next_btn]
                )
                prev_btn.click(
                    lambda: navigate_viewer(-1),
                    outputs=[viewer_info, viewer_text, viewer_counter, prev_btn, next_btn]
                )
                next_btn.click(
                    lambda: navigate_viewer(1),
                    outputs=[viewer_info, viewer_text, viewer_counter, prev_btn, next_btn]
                )
                # Populate the viewer with the "all" filter on page load.
                demo.load(
                    lambda: update_viewer_filter("all"),
                    outputs=[viewer_info, viewer_text, viewer_counter, prev_btn, next_btn]
                )
        # Script that sets the "pulse" animation on every .stat-item
        # whenever the DOM under <body> changes.
        # NOTE(review): confirm that a <script> placed inside gr.HTML is
        # actually executed by the Gradio version in use.
        gr.HTML("""
<script>
const observer = new MutationObserver(() => {
document.querySelectorAll('.stat-item').forEach(item => {
item.style.animation = 'pulse 0.3s ease-out';
});
});
observer.observe(document.body, { childList: true, subtree: true });
</script>
""")
    return demo
| if __name__ == "__main__": | |
| build().launch() |