diff --git "a/hf_deploy/app.py" "b/hf_deploy/app.py"
--- "a/hf_deploy/app.py"
+++ "b/hf_deploy/app.py"
@@ -1,2124 +1,94 @@
#!/usr/bin/env python3
"""
-Secure version of BytePlus Image Generation Studio
-Implements security best practices and recommendations
+HuggingFace Spaces main launcher for BytePlus Image Generation Studio
+Ensures all required directories exist before starting the app
"""
-import gradio as gr
-import requests
-import concurrent.futures
-import base64
-import io
-import time
import os
-import re
-import hashlib
-import secrets
-import random
-import zipfile
-import tempfile
-from datetime import datetime, timedelta
-from PIL import Image
-import numpy as np
-from functools import wraps
-from html import escape
-from typing import Optional, Tuple, List, Any
-import logging
-from urllib.parse import urlparse
-# Load environment variables - make dotenv optional
-try:
- from dotenv import load_dotenv
- load_dotenv()
-except ImportError:
- pass # Continue without dotenv if not available
-
-# Configure secure logging
-logging.basicConfig(
- level=logging.INFO,
- format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
- handlers=[
- logging.FileHandler('app_security.log'),
- logging.StreamHandler()
+import sys
+from pathlib import Path
+import importlib.util
+
+def ensure_directories_exist():
+ """Create all required directories for BytePlus operation."""
+ print("🏗️ Ensuring required directories exist...")
+
+ # Define all directories needed
+ directories = [
+ "Generated",
+ "static",
+ "view_session",
+ "static/css",
+ "static/js",
+ "static/images"
]
-)
-logger = logging.getLogger(__name__)
-
-# Security Configuration
-MAX_IMAGE_SIZE = 10 * 1024 * 1024 # 10MB
-ALLOWED_IMAGE_FORMATS = {'JPEG', 'PNG', 'WEBP'}
-MAX_PROMPT_LENGTH = 1000 # Increased to accommodate the pre-set prompts
-RATE_LIMIT_REQUESTS = 20
-RATE_LIMIT_WINDOW = 60 # seconds
-MAX_CONCURRENT_REQUESTS = 4
-# Allowed domains for API and development
-ALLOWED_DOMAINS = {
- # BytePlus API and CDN domains
- 'bytepluses.com',
- 'volces.com',
- 'byteplus.com',
- 'volcengine.com',
- # Local development
- 'localhost',
- '127.0.0.1',
- '0.0.0.0',
- # Gradio sharing
- 'gradio.live',
- 'gradio.app',
- 'share.gradio.app'
-}
-
-class RateLimiter:
- """Rate limiting implementation to prevent abuse"""
- def __init__(self, max_requests=RATE_LIMIT_REQUESTS, window=RATE_LIMIT_WINDOW):
- self.requests = {}
- self.max_requests = max_requests
- self.window = window
-
- def check_rate_limit(self, session_id: str) -> bool:
- """Check if request is within rate limits"""
- now = time.time()
- if session_id not in self.requests:
- self.requests[session_id] = []
-
- # Clean old requests
- self.requests[session_id] = [
- req for req in self.requests[session_id]
- if now - req < self.window
- ]
-
- if len(self.requests[session_id]) >= self.max_requests:
- logger.warning(f"Rate limit exceeded for session: {session_id}")
- return False
-
- self.requests[session_id].append(now)
- return True
-
-class InputValidator:
- """Input validation and sanitization"""
-
- @staticmethod
- def sanitize_prompt(prompt: str) -> str:
- """Sanitize user prompts to prevent injection attacks"""
- if not prompt:
- return ""
-
- # Remove potential injection patterns
- prompt = re.sub(r'[<>\"\'`;\\]', '', prompt)
-
- # Escape HTML entities
- prompt = escape(prompt)
-
- # Limit length
- prompt = prompt[:MAX_PROMPT_LENGTH]
-
- # Remove multiple spaces
- prompt = ' '.join(prompt.split())
-
- return prompt.strip()
-
- @staticmethod
- def validate_image(image: Any) -> Tuple[bool, str]:
- """Validate image input for security"""
- if image is None:
- return False, "No image provided"
-
- try:
- # Handle different input types
- if isinstance(image, tuple):
- if len(image) > 0:
- image = image[0]
- else:
- return False, "Invalid image format"
-
- # Convert to PIL Image if needed
- if not isinstance(image, Image.Image):
- try:
- image = Image.fromarray(image)
- except Exception:
- return False, "Failed to process image"
-
- # Check image format
- if image.format and image.format not in ALLOWED_IMAGE_FORMATS:
- return False, f"Invalid image format. Allowed: {', '.join(ALLOWED_IMAGE_FORMATS)}"
-
- # Check image size
- img_byte_arr = io.BytesIO()
- image.save(img_byte_arr, format='JPEG')
- if len(img_byte_arr.getvalue()) > MAX_IMAGE_SIZE:
- return False, f"Image too large. Maximum size: {MAX_IMAGE_SIZE / 1024 / 1024}MB"
-
- # Check image dimensions
- width, height = image.size
- if width > 4096 or height > 4096:
- return False, "Image dimensions too large. Maximum: 4096x4096"
-
- return True, "Valid"
-
- except Exception as e:
- logger.error(f"Image validation error: {str(e)}")
- return False, "Image validation failed"
-
- @staticmethod
- def validate_api_key(api_key: str) -> bool:
- """Validate API key format"""
- if not api_key:
- return False
-
- # Basic validation - adjust pattern based on actual API key format
- pattern = r'^[A-Za-z0-9_\-]{20,}$'
- return bool(re.match(pattern, api_key))
-
-class SecureAPIManager:
- """Secure API management with encryption and protection"""
-
- def __init__(self):
- self.api_url = "https://ark.cn-beijing.volces.com/api/v3/images/generations"
- self._api_key_hash = None
- self.timeout = 30
- self.max_retries = 3
- self.retry_delay = 2
- self.session_keys = {} # Store encrypted keys per session
-
- def set_api_key(self, api_key: str, session_id: str) -> bool:
- """Securely store API key for session"""
- if not InputValidator.validate_api_key(api_key):
- logger.warning("Invalid API key format attempted")
- return False
-
- # Store hash for validation
- self._api_key_hash = hashlib.sha256(api_key.encode()).hexdigest()
-
- # Store encrypted key for session
- session_key = secrets.token_hex(32)
- # In production, use proper encryption like cryptography.fernet
- self.session_keys[session_id] = {
- 'key': api_key, # In production, encrypt this
- 'expires': datetime.now() + timedelta(hours=1)
- }
-
- logger.info(f"API key set for session: {session_id[:8]}...")
- return True
-
- def get_api_key(self, session_id: str) -> Optional[str]:
- """Retrieve API key for session"""
- if session_id not in self.session_keys:
- return None
-
- session_data = self.session_keys[session_id]
-
- # Check expiration
- if datetime.now() > session_data['expires']:
- del self.session_keys[session_id]
- logger.info(f"Session expired: {session_id[:8]}...")
- return None
-
- return session_data['key']
- def validate_url(self, url: str) -> bool:
- """Validate external URLs - focused on API response URLs only"""
+ for dir_name in directories:
+ dir_path = Path(dir_name)
try:
- parsed = urlparse(url)
+ if not dir_path.exists():
+ dir_path.mkdir(parents=True, exist_ok=True)
+ print(f"✅ Created directory: {dir_name}")
+ else:
+ print(f"✅ Directory exists: {dir_name}")
- # For API response URLs from BytePlus
- if parsed.hostname:
- # Allow BytePlus/Volcengine CDN domains
- if any(domain in parsed.hostname for domain in [
- 'bytepluses.com',
- 'volces.com',
- 'byteplus.com',
- 'volcengine.com',
- 'ark-project.tos-cn-beijing.volces.com' # New CDN domain
- ]):
- # These are legitimate API response URLs
- return True
-
- # Allow localhost and Gradio URLs for development
- if any(pattern in parsed.hostname for pattern in [
- 'localhost',
- '127.0.0.1',
- '0.0.0.0',
- 'gradio.live',
- 'gradio.app',
- '.gradio.live',
- '.share.gradio.app'
- ]):
- return True
+ # Set proper permissions (755 for directories)
+ if dir_path.is_dir():
+ dir_path.chmod(0o755)
- # Log but don't block other domains - this is just for monitoring
- logger.info(f"External domain accessed: {parsed.hostname}")
-
- # For production, you might want to be more restrictive
- # For now, allow HTTPS URLs from the API
- if parsed.scheme == 'https':
- return True
-
- return False
-
except Exception as e:
- logger.error(f"URL validation error: {str(e)}")
- return False
-
-class SecureImageGenerator:
- """Secure implementation of BytePlus image generation"""
-
- def __init__(self):
- self.api_manager = SecureAPIManager()
- self.rate_limiter = RateLimiter()
- self.validator = InputValidator()
- self.active_requests = 0
- self.max_concurrent = MAX_CONCURRENT_REQUESTS
-
- def process_image_for_api(self, image: Any) -> Optional[str]:
- """Securely process and validate image for API"""
- is_valid, message = self.validator.validate_image(image)
- if not is_valid:
- logger.warning(f"Image validation failed: {message}")
- return None
-
- try:
- # Handle different input types
- if isinstance(image, tuple):
- if len(image) > 0:
- image = image[0]
- else:
- return None
-
- # Ensure PIL Image
- if not isinstance(image, Image.Image):
- image = Image.fromarray(image)
-
- # Resize securely
- image = image.resize((512, 512), Image.Resampling.LANCZOS)
-
- # Convert to base64
- buffer = io.BytesIO()
- image.save(buffer, format="JPEG", quality=85, optimize=True)
-
- # Check final size
- if len(buffer.getvalue()) > MAX_IMAGE_SIZE:
- logger.warning("Processed image exceeds size limit")
- return None
-
- image_base64 = base64.b64encode(buffer.getvalue()).decode()
- return f"data:image/jpeg;base64,{image_base64}"
-
- except Exception as e:
- logger.error(f"Image processing error: {str(e)}")
- return None
-
- def call_api_single(self, prompt: str, image_base64: str, gender_value: str,
- session_id: str, seed: int = -1,
- watermark: bool = False, size: str = "1K",
- sequential_image_generation: str = "disabled",
- style_image_base64: str = None, response_format: str = "url") -> Tuple[Optional[str], Optional[str], Optional[dict]]:
- """Make secure API call with validation and error handling"""
-
- # Check rate limit
- if not self.rate_limiter.check_rate_limit(session_id):
- return None, "Rate limit exceeded. Please wait before trying again.", None
-
- # Check concurrent requests
- if self.active_requests >= self.max_concurrent:
- return None, "Too many concurrent requests. Please wait.", None
-
- # Get API key
- api_key = self.api_manager.get_api_key(session_id)
- if not api_key:
- return None, "API key not set or session expired", None
-
- # Sanitize prompt
- prompt = self.validator.sanitize_prompt(prompt)
- processed_prompt = prompt.replace("{gender}", gender_value)
-
- # Validate parameters
- if seed != -1:
- seed = max(0, min(seed, 2147483647)) # Ensure valid seed range
-
- headers = {
- "Content-Type": "application/json",
- "Authorization": f"Bearer {api_key}",
- "X-Request-ID": secrets.token_hex(16) # Add request tracking
- }
-
- # Handle multi-image input - combine webcam image and style reference
- images_array = [image_base64]
- if style_image_base64:
- images_array.append(style_image_base64)
-
- payload = {
- "model": "doubao-seedream-4-0-250828",
- "prompt": processed_prompt,
- "image": images_array,
- "response_format": response_format,
- "size": size,
- "watermark": watermark,
- "sequential_image_generation": sequential_image_generation,
- "stream": False
- }
-
- # Only include seed if it's not -1 (let API use random seed)
- if seed != -1:
- payload["seed"] = seed
-
- # Log API request details
- logger.info(f"API Request - Session: {session_id[:8]}...")
- logger.info(f"API Request - Prompt: {processed_prompt[:100]}...")
- logger.info(f"API Request - Size: {size}, Watermark: {watermark}")
- logger.info(f"API Request - Sequential: {sequential_image_generation}, Seed: {seed if seed != -1 else 'random'}")
- logger.info(f"API Request - Images: {len(images_array)} image(s)")
-
- self.active_requests += 1
-
- try:
- for attempt in range(self.api_manager.max_retries):
- try:
- response = requests.post(
- self.api_manager.api_url,
- headers=headers,
- json=payload,
- timeout=self.api_manager.timeout
- )
-
- if response.status_code == 200:
- result = response.json()
-
- # Log comprehensive API response details
- logger.info(f"API Response - Status: 200 OK")
- logger.info(f"API Response - Full URL: {self.api_manager.api_url}")
- logger.info(f"API Response - Data keys: {list(result.keys())}")
- logger.info(f"API Response - Full response: {result}")
-
- if "data" in result and len(result["data"]) > 0:
- data_item = result["data"][0]
-
- if response_format == "b64_json":
- # Handle base64 JSON response
- b64_data = data_item.get("b64_json")
- if b64_data:
- # Return the base64 data directly with a special prefix to indicate it's base64
- image_data = f"data:image/jpeg;base64,{b64_data}"
- logger.info(f"API Response - SUCCESS for session: {session_id[:8]}... (b64_json format)")
-
- # Prepare response details for UI display
- response_details = {
- 'format': 'b64_json',
- 'url': image_data, # This is actually base64 data
- 'full_url': 'N/A (base64)',
- 'seed': data_item.get('seed'),
- 'usage': result.get('usage', {}),
- 'revised_prompt': data_item.get('revised_prompt'),
- 'response_keys': list(result.keys()),
- 'data_keys': list(data_item.keys()) if data_item else []
- }
-
- return image_data, None, response_details
- else:
- logger.warning(f"API Response - No b64_json data in response: {result}")
- return None, "No base64 data in response", None
- else:
- # Handle URL response (existing logic)
- url = data_item.get("url")
-
- # Log detailed response data
- logger.info(f"API Response - Generated URL: {url}")
- if "usage" in result:
- usage = result["usage"]
- logger.info(f"API Response - Usage: Images: {usage.get('generated_images', 0)}, Tokens: {usage.get('total_tokens', 0)}")
-
- # Additional response details
- if "revised_prompt" in data_item:
- logger.info(f"API Response - Revised Prompt: {data_item.get('revised_prompt')}")
- if "seed" in data_item:
- logger.info(f"API Response - Used Seed: {data_item.get('seed')}")
-
- # Validate returned URL
- if url and self.api_manager.validate_url(url):
- logger.info(f"API Response - SUCCESS for session: {session_id[:8]}...")
-
- # Prepare response details for UI display
- response_details = {
- 'format': 'url',
- 'url': url,
- 'full_url': self.api_manager.api_url,
- 'usage': result.get('usage', {}),
- 'revised_prompt': data_item.get('revised_prompt'),
- 'seed': data_item.get('seed'),
- 'response_keys': list(result.keys()),
- 'data_keys': list(data_item.keys()) if data_item else []
- }
-
- return url, None, response_details
- else:
- logger.warning(f"API Response - Invalid URL returned: {url}")
- return None, "Invalid response from API", None
- else:
- logger.warning(f"API Response - No data in response: {result}")
- return None, "No data returned from API", None
-
- elif response.status_code == 429:
- return None, "API rate limit exceeded", None
-
- elif response.status_code == 401:
- logger.warning(f"API Response - Authentication failed for session: {session_id[:8]}...")
- return None, "Authentication failed", None
-
- else:
- # Log error response details
- logger.error(f"API Response - Error {response.status_code}")
- try:
- error_detail = response.json()
- logger.error(f"API Response - Error details: {error_detail}")
- except:
- logger.error(f"API Response - Error text: {response.text[:200]}")
-
- if attempt == self.api_manager.max_retries - 1:
- return None, f"API error (Code: {response.status_code})", None
-
- except requests.Timeout:
- if attempt == self.api_manager.max_retries - 1:
- return None, "Request timeout", None
-
- except requests.RequestException:
- if attempt == self.api_manager.max_retries - 1:
- return None, "Network error", None
-
- # Wait before retry
- if attempt < self.api_manager.max_retries - 1:
- time.sleep(self.api_manager.retry_delay)
-
- return None, "Max retries exceeded", None
-
- finally:
- self.active_requests -= 1
-
-# Initialize secure generator
-generator = SecureImageGenerator()
-
-def generate_session_id() -> str:
- """Generate secure session ID"""
- return secrets.token_hex(32)
-
-def process_images_secure(image, prompt1, prompt2, prompt3, prompt4,
- gender, api_key, session_id, seed, watermark,
- size, sequential_image_generation, style_image1, style_image2,
- style_image3, style_image4, response_format="url", num_images=4, progress_callback=None):
- """Secure image processing with validation and error handling"""
-
- status_updates = []
-
- # Validate and set API key
- if not api_key:
- return [None] * 4, ["API key required"] * 4, status_updates
-
- status_updates.append("🔐 API key validated")
-
- if not generator.api_manager.set_api_key(api_key, session_id):
- return [None] * 4, ["Invalid API key format"] * 4, status_updates
-
- # Validate image
- is_valid, message = generator.validator.validate_image(image)
- if not is_valid:
- status_updates.append(f"❌ Image validation failed: {message}")
- return [None] * 4, [message] * 4, status_updates
-
- status_updates.append("✅ Image validated successfully")
-
- # Process image
- image_base64 = generator.process_image_for_api(image)
- if image_base64 is None:
- status_updates.append("❌ Failed to process image")
- return [None] * 4, ["Failed to process image"] * 4, status_updates
-
- status_updates.append("🖼️ Image processed and encoded")
-
- # Prepare prompts and style images - use only the provided prompts (selected by checkboxes)
- prompts = [p for p in [prompt1, prompt2, prompt3, prompt4] if p and p.strip()]
- style_images = [style_image1, style_image2, style_image3, style_image4]
-
- # Process style images into base64 format
- processed_style_images = []
- style_count = 0
- for i, style_img in enumerate(style_images):
- if style_img is not None:
- style_img_base64 = generator.process_image_for_api(style_img)
- if style_img_base64:
- processed_style_images.append(style_img_base64)
- style_count += 1
- status_updates.append(f"🎨 Style reference {i+1} processed")
- else:
- processed_style_images.append(None)
- status_updates.append(f"⚠️ Style reference {i+1} processing failed")
+ print(f"⚠️ Warning: Could not handle {dir_name}: {e}")
+
+ print("🎯 All required directories are ready!")
+
+def load_byteplus_app():
+ """Load and return the BytePlus application."""
+ try:
+ # Load the BytePlus app module
+ spec = importlib.util.spec_from_file_location("byteplus_app", "byteplus_app.py")
+ byteplus_module = importlib.util.module_from_spec(spec)
+ spec.loader.exec_module(byteplus_module)
+
+ # Try to find the Gradio app
+ if hasattr(byteplus_module, 'demo'):
+ return byteplus_module.demo
+ elif hasattr(byteplus_module, 'app'):
+ return byteplus_module.app
+ elif hasattr(byteplus_module, 'interface'):
+ return byteplus_module.interface
else:
- processed_style_images.append(None)
-
- if style_count > 0:
- status_updates.append(f"✅ {style_count} style reference(s) ready")
-
- # Initialize results and errors arrays to match the number of actual prompts
- actual_num_prompts = len(prompts)
- results = [None] * actual_num_prompts
- errors = [None] * actual_num_prompts
-
- # Count active prompts
- active_prompts = sum(1 for p in prompts if p and p.strip())
- if active_prompts > 0:
- status_updates.append(f"🚀 Starting {active_prompts} API calls...")
- status_updates.append(f"📄 Response format: {response_format}")
-
- # Execute with thread pool
- with concurrent.futures.ThreadPoolExecutor(max_workers=min(MAX_CONCURRENT_REQUESTS, num_images)) as executor:
- futures = []
- prompt_index = 0
- for i, prompt in enumerate([prompt1, prompt2, prompt3, prompt4]):
- if prompt and prompt.strip():
- # Get first 50 chars of prompt for display
- prompt_preview = prompt[:50] + "..." if len(prompt) > 50 else prompt
- prompt = generator.validator.sanitize_prompt(prompt)
-
- # Get corresponding style image for this prompt index
- style_img_base64 = processed_style_images[i] if i < len(processed_style_images) else None
-
- future = executor.submit(
- generator.call_api_single,
- prompt,
- image_base64,
- gender,
- session_id,
- seed,
- watermark,
- size,
- sequential_image_generation,
- style_img_base64,
- response_format
- )
- futures.append((prompt_index, future))
-
- style_note = " + style reference" if style_img_base64 else ""
- status_updates.append(f"📤 API call {prompt_index+1} initiated: {prompt_preview}{style_note}")
- prompt_index += 1
-
- # Collect results
- for i, future in futures:
- try:
- url, error, response_details = future.result(timeout=60)
- results[i] = url
- errors[i] = error
- if url:
- status_updates.append(f"✅ Image {i+1} generated successfully")
-
- # Add detailed API response information to status
- if response_details:
- status_updates.append(f"🌐 API URL: {response_details.get('full_url', 'N/A')}")
- status_updates.append(f"🔗 Image URL: {response_details.get('url', 'N/A')[:50]}...")
-
- if response_details.get('seed'):
- status_updates.append(f"🎲 Used Seed: {response_details.get('seed')}")
-
- usage = response_details.get('usage', {})
- if usage:
- tokens = usage.get('total_tokens', 0)
- images = usage.get('generated_images', 0)
- status_updates.append(f"📊 Usage: {images} image(s), {tokens} tokens")
-
- if response_details.get('revised_prompt'):
- revised = response_details.get('revised_prompt')
- preview = revised[:100] + "..." if len(revised) > 100 else revised
- status_updates.append(f"✏️ Revised Prompt: {preview}")
- else:
- status_updates.append(f"⚠️ Image {i+1}: {error}")
- except concurrent.futures.TimeoutError:
- results[i] = None
- errors[i] = "Processing timeout"
- status_updates.append(f"⏱️ Image {i+1} timed out")
- except Exception as e:
- results[i] = None
- errors[i] = "Processing error"
- status_updates.append(f"❌ Image {i+1} error: {str(e)}")
-
- # Pad results to 4 elements for consistent return
- while len(results) < 4:
- results.append(None)
- while len(errors) < 4:
- errors.append(None)
-
- return results, errors, status_updates
-
-def get_generation_history(limit=20, offset=0):
- """Get generation history from Generated folder with lazy loading"""
- generated_dir = "Generated"
-
- if not os.path.exists(generated_dir):
- return []
-
- history_items = []
-
- # Get all session folders and zip files
- items = os.listdir(generated_dir)
- session_folders = [item for item in items if item.startswith('session_') and os.path.isdir(os.path.join(generated_dir, item))]
- zip_files = [item for item in items if item.endswith('.zip') and item.startswith('byteplus_images_')]
-
- # Sort by timestamp (newest first)
- def extract_timestamp(item):
- if item.startswith('session_'):
- return item.replace('session_', '')
- elif item.startswith('byteplus_images_'):
- return item.replace('byteplus_images_', '').replace('.zip', '')
- return ''
-
- all_items = session_folders + zip_files
- all_items.sort(key=extract_timestamp, reverse=True)
-
- # Apply pagination
- paginated_items = all_items[offset:offset + limit]
-
- for item in paginated_items:
- item_path = os.path.join(generated_dir, item)
- timestamp_str = extract_timestamp(item)
-
- try:
- # Parse timestamp
- dt = datetime.strptime(timestamp_str, "%Y%m%d_%H%M%S")
- formatted_time = dt.strftime("%Y-%m-%d %H:%M:%S")
- except:
- formatted_time = timestamp_str
-
- if item.startswith('session_'):
- # Session folder - get images
- session_images = []
- image_files = [f for f in os.listdir(item_path) if f.endswith('.jpg')]
-
- for img_file in sorted(image_files):
- img_path = os.path.join(item_path, img_file)
- try:
- # Read image and convert to base64 for display
- with Image.open(img_path) as img:
- buffer = io.BytesIO()
- img.save(buffer, format='JPEG', quality=85)
- img_base64 = base64.b64encode(buffer.getvalue()).decode()
- img_data = f"data:image/jpeg;base64,{img_base64}"
-
- # Determine style from filename
- if 'Ghibli' in img_file.lower():
- style = 'Ghibli'
- elif 'van_gogh' in img_file.lower() or 'vangogh' in img_file.lower():
- style = 'Van Gogh'
- elif 'pixar' in img_file.lower():
- style = 'Pixar'
- elif 'picasso' in img_file.lower():
- style = 'Picasso'
- elif 'original' in img_file.lower():
- style = 'Original'
- else:
- style = 'Generated'
-
- session_images.append({
- 'filename': img_file,
- 'style': style,
- 'data': img_data,
- 'path': img_path
- })
- except Exception as e:
- logger.error(f"Failed to load image {img_file}: {str(e)}")
+ # Look for any object with a launch method
+ for attr_name in dir(byteplus_module):
+ attr = getattr(byteplus_module, attr_name)
+ if hasattr(attr, 'launch') and callable(attr.launch):
+ return attr
- history_items.append({
- 'type': 'session',
- 'timestamp': formatted_time,
- 'timestamp_raw': timestamp_str,
- 'path': item_path,
- 'images': session_images,
- 'image_count': len(session_images),
- 'zip_available': any(zip_file.replace('byteplus_images_', '').replace('.zip', '') == timestamp_str
- for zip_file in zip_files)
- })
+ raise ValueError("Could not find Gradio app in byteplus_app.py")
- elif item.endswith('.zip'):
- # ZIP file
- zip_path = item_path
- history_items.append({
- 'type': 'zip',
- 'timestamp': formatted_time,
- 'timestamp_raw': timestamp_str,
- 'path': zip_path,
- 'filename': item,
- 'size': os.path.getsize(zip_path) if os.path.exists(zip_path) else 0,
- 'images': [],
- 'image_count': 0 # Will be determined when ZIP is opened
- })
-
- return history_items
+ except Exception as e:
+ print(f"❌ Error loading BytePlus app: {e}")
+ raise
-def generate_generation_history_table(history_items, page=1, page_size=10):
- """Generate HTML table for generation history with lazy loading"""
- if not history_items:
- return "
No generation history found.
"
-
- total_items = len(history_items)
- start_idx = (page - 1) * page_size
- end_idx = min(start_idx + page_size, total_items)
- current_items = history_items[start_idx:end_idx]
+def main():
+ """Main function to setup and launch BytePlus."""
+ print("🚀 Starting BytePlus Image Generation Studio for HuggingFace Spaces...")
- table_html = f"""
-
-
-
📚 Generation History ({total_items} items)
-
-
-
- Timestamp
- Preview
- Actions
-
-
-
- """
+ # Ensure directories exist first
+ ensure_directories_exist()
- for item in current_items:
- table_html += f"{item['timestamp']}{item['type']} "
-
- if item['type'] == 'session' and item['images']:
- # Show thumbnails for session
- for img in item['images'][:4]: # Show max 4 thumbnails
- table_html += f" "
- if len(item['images']) > 4:
- table_html += f"+{len(item['images']) - 4} more "
- table_html += f"{item['image_count']} image(s)
"
- elif item['type'] == 'zip':
- # Show ZIP info
- size_mb = item['size'] / (1024 * 1024) if item['size'] else 0
- table_html += f""
- table_html += f"ZIP Archive "
- table_html += f"📦 {item['filename']} "
- table_html += f"💾 {size_mb:.1f} MB"
- table_html += "
"
- else:
- table_html += "No preview available "
+ # Load and launch the BytePlus app
+ try:
+ print("📥 Loading BytePlus Image Generation Studio...")
+ demo = load_byteplus_app()
- table_html += " "
-
- # Add direct download links
- if item['type'] == 'session':
- # Direct link to view session images in new tab
- table_html += f"👁️ View "
- if item['zip_available']:
- # Direct download link for ZIP file
- zip_filename = f"byteplus_images_{item['timestamp_raw']}.zip"
- table_html += f"📥 ZIP "
- elif item['type'] == 'zip':
- # Direct download link for ZIP file
- zip_filename = f"byteplus_images_{item['timestamp_raw']}.zip"
- table_html += f"📥 Download "
-
- table_html += " "
-
- table_html += "
"
-
- # Add pagination
- if total_items > page_size:
- total_pages = (total_items + page_size - 1) // page_size
- table_html += ""
-
- table_html += "
"
-
- return table_html
-
-def create_secure_interface():
- """Create Gradio interface with security features"""
-
- # Generate session ID for this instance
- session_id = generate_session_id()
-
- with gr.Blocks(title="Secure BytePlus Image Generation") as demo:
-
- # Store session ID
- session_state = gr.State(value=session_id)
-
- gr.Markdown("# BytePlus (Doubao-SeeDream-4.0 250828) Multi-Style Image Generation Studio")
-
- with gr.Row():
- with gr.Column(scale=1):
- gr.Markdown("""
- ### Step 1: 📸 Capture from webcam or upload an image
- You can either use your webcam (default) or switch to the Upload tab to select an existing photo.
- """)
-
- image_input = gr.Image(
- sources=["webcam", "upload"], # Allow both webcam capture and file upload (webcam default)
- label="Input Image (Webcam or Upload, Max 10MB)",
- height=300
- )
-
- gender_toggle = gr.Radio(
- choices=["man", "woman"],
- value="man",
- label="Step 2: Gender"
- )
-
- # Prompt selection checkboxes
- with gr.Row():
- prompt1_checkbox = gr.Checkbox(
- label="Prompt 1 - Ghibli Style",
- value=True, # Default to selected
- info="Generate image with Ghibli style"
- )
- prompt2_checkbox = gr.Checkbox(
- label="Prompt 2 - Van Gogh Style",
- value=True, # Default to selected
- info="Generate image with Van Gogh style"
- )
- with gr.Row():
- prompt3_checkbox = gr.Checkbox(
- label="Prompt 3 - Pixar Style",
- value=True, # Default to selected
- info="Generate image with Pixar style"
- )
- prompt4_checkbox = gr.Checkbox(
- label="Prompt 4 - Picasso Cubism Style",
- value=True, # Default to selected
- info="Generate image with Picasso Cubism style"
- )
-
- # Remove the old prompt selector since we now use checkboxes
- # prompt_selector = gr.Dropdown(...)
-
- generate_btn = gr.Button(
- "Step 3: 📸 Generate Selected Images",
- variant="primary",
- size="lg"
- )
-
- # Prompts container (always visible)
- with gr.Accordion("Show/Edit Prompts", open=False) as prompts_container:
- gr.Markdown("### 📝 Generation Prompts")
-
- with gr.Row():
- with gr.Column(scale=3):
- prompt1 = gr.Textbox(
- label="Prompt 1 - Ghibli Style",
- placeholder="Ghibli style with face preservation",
- value="A highly detailed close-up portrait of a {gender}, with all original facial features, proportions, and unique expressions perfectly preserved for superior face recognition. The face and hair must remain ultra-realistic, sharply rendered, and unmistakably identifiable as the original person—maintain lifelike textures, fine skin details, and accurate hair structure, with only subtle color and gentle cinematic lighting enhancement.Transform the clothing and background with a rich, full Studio Ghibli style transfer: infuse lush, whimsical scenery, soft painterly lighting, enchanting atmosphere, and charming hand-drawn textures reminiscent of Ghibli films. Surround the subject with nature-inspired elements—dreamy color palette, magical flora, and subtle fantasy motifs—while keeping the person’s face, expression, and identity strictly unchanged and fully recognizable. Avoiding copyright and trademarked characters, focus on creating an original Ghibli-style portrait that highlights the subject’s true likeness. High detail, ultra-realistic face, cinematic lighting, soft painterly textures, and a warm, nostalgic Ghibli ambiance.",
- lines=8
- )
-
- with gr.Column(scale=1):
- style_image1 = gr.Image(
- label="Style Reference 1 (Optional)",
- height=150,
- sources=["upload"],
- type="pil",
- value="Assets/Ghibli.jpg",
- visible=False
- )
-
- with gr.Row():
- with gr.Column(scale=3):
- prompt2 = gr.Textbox(
- label="Prompt 2 - Van Gogh Style",
- placeholder="Van Gogh style with face preservation",
- value="A closeup portrait of a {gender}, capturing all unique facial features and clear likeness from the original photo. Transform the portrait with the artistic style of Starry night Style: swirling night sky filled with stars, vibrant blue and yellow tones, thick expressive oil brushstrokes, dreamlike landscape, and watercolour textures. The face size, face and hair features should remain highly recognizable and similar to the original, while the surroundings, colors, and texture reflect Van Gogh's artistic style. High quality, abstract yet true to the original identity.",
- lines=4
- )
- with gr.Column(scale=1):
- style_image2 = gr.Image(
- label="Style Reference 2 (Optional)",
- height=150,
- sources=["upload"],
- type="pil",
- value="Assets/VanGogh.jpg",
- visible=False
- )
-
- with gr.Row():
- with gr.Column(scale=3):
- prompt3 = gr.Textbox(
- label="Prompt 3 - Pixar Style",
- placeholder="Pixar style with face preservation",
- value="A closeup portrait of a {gender}, maintaining all key facial features for easy recognition from the original photo. Render in a Pixar 3D animation style: smooth textures, soft lighting, bright and colorful palette, playful mood, and large, expressive eyes with exaggerated but true-to-person facial expressions. The face size, face and hair features should remain highly recognizable and similar to the original. The character's identity remains clearly recognizable—likeness and distinctive features strictly preserved—while the style matches high-quality, detailed Toy Story or Pixar animation with a friendly tone.",
- lines=8
- )
- with gr.Column(scale=1):
- style_image3 = gr.Image(
- label="Style Reference 3 (Optional)",
- height=150,
- sources=["upload"],
- type="pil",
- value="Assets/Pixar.jpg",
- visible=False
- )
-
- with gr.Row():
- with gr.Column(scale=3):
- prompt4 = gr.Textbox(
- label="Prompt 4 - Picasso Cubism Style",
- placeholder="Picasso cubism style with face preservation",
- value="A closeup portrait of a {gender}, with all unique facial features carefully preserved for recognizability, interpreted through Pablo Picasso's cubism style. Use abstract geometric shapes and fragmented features to show multiple perspectives, with bold, contrasting colors, angular lines, and surreal or distorted proportions. While transforming into innovative cubist art, ensure the face size, face and hair features of the portrait is still highly similar to and easily recognizable as the original person. High detail, revolutionary, true Picasso cubism.",
- lines=8
- )
- with gr.Column(scale=1):
- style_image4 = gr.Image(
- label="Style Reference 4 (Optional)",
- height=150,
- sources=["upload"],
- type="pil",
- value="Assets/Picasso.jpg",
- visible=False
- )
-
- with gr.Accordion("Advanced Settings", open=False):
- seed_input = gr.Number(
- label="Seed (-1 for random, 0-2147483647 for fixed)",
- value=-1,
- minimum=-1,
- maximum=2147483647,
- precision=0
- )
- guidance_input = gr.Slider(
- label="Guidance Scale (Not supported by current model)",
- minimum=1.0,
- maximum=10.0,
- value=5.5,
- step=0.1,
- interactive=False
- )
-
- with gr.Accordion("Output Options", open=False):
- size_dropdown = gr.Dropdown(
- choices=["1K", "2K", "4K"],
- value="1K",
- label="Image Size",
- info="Output resolution of generated images"
- )
- sequential_toggle = gr.Radio(
- choices=["disabled", "enabled"],
- value="disabled",
- label="Sequential Image Generation",
- info="Enable sequential generation for consistent style"
- )
- watermark_toggle = gr.Checkbox(
- label="Add Watermark",
- value=False
- )
- response_format_dropdown = gr.Dropdown(
- choices=["url", "b64_json"],
- value="b64_json",
- label="Response Format",
- info="Choose 'url' for image URL or 'b64_json' for base64 encoded image data"
- )
-
- # Button to randomize seed
- randomize_seed_btn = gr.Button(
- "🎲 Randomize Seed",
- variant="secondary",
- size="sm"
- )
-
- api_key_input = gr.Textbox(
- type="password",
- label="BytePlus API Key (Encrypted)",
- placeholder="Enter secure API key"
- )
-
- # Style Images Toggle - More prominently placed
- use_style_images_toggle = gr.Checkbox(
- label="🎨 Use Style Reference Images",
- value=False,
- info="Enable style reference images for generation (will show style image upload options)"
- )
-
-
-
- with gr.Column(scale=1):
- gr.Markdown("### 🖼️ Generated Results")
-
- with gr.Row():
- result1 = gr.Image(label="Result 1", height=200)
- result2 = gr.Image(label="Result 2", height=200)
-
- with gr.Row():
- result3 = gr.Image(label="Result 3", height=200)
- result4 = gr.Image(label="Result 4", height=200)
-
- # Download all button
- download_btn = gr.Button(
- "📥 Download All Images (ZIP)",
- variant="secondary",
- size="sm"
- )
- download_files = gr.File(
- label="📦 Images ZIP Package",
- file_count="single",
- visible=True,
- interactive=True
- )
-
- gr.Markdown("### 📊 API Status & Security Log")
- status_log = gr.Textbox(
- label="Real-time Status",
- lines=10,
- max_lines=20,
- interactive=False,
- value="🟢 System Ready\n"
- )
-
- # Gallery download section
- gr.Markdown("### 🖼️ Image Gallery & Downloads")
-
- # Add a hidden download component for gallery downloads
- gallery_download_files = gr.File(
- label="Gallery Download",
- file_count="single",
- visible=False
- )
-
- # Gallery table - shows each webcam capture as a row
- gallery_table = gr.HTML(
- label="🖼️ Capture Sessions Gallery",
- value="No captures yet. Take a webcam photo and generate images to see them here.
"
- )
-
- # Generation History Table - Full width section outside columns
- gr.Markdown("---")
- gr.Markdown("### 📚 Generation History")
-
- # History table with lazy loading
- history_table = gr.HTML(
- label="📚 Complete Generation History",
- value="Loading generation history...
"
- )
-
- # History page state
- history_page = gr.State(value=1)
- history_page_size = gr.State(value=10)
-
- # Buttons for history navigation
- with gr.Row():
- refresh_history_btn = gr.Button(
- "🔄 Refresh History",
- variant="secondary",
- size="sm"
- )
- load_more_history_btn = gr.Button(
- "📄 Load More",
- variant="secondary",
- size="sm"
- )
-
- # Store generated images for download
- generated_images = gr.State(value=[])
-
- # Store capture timestamp
- capture_timestamp = gr.State(value=None)
-
- # Store all capture sessions for table display
- capture_sessions = gr.State(value=[])
-
- def process_and_display_secure(image_input, prompt1, prompt2, prompt3, prompt4,
- gender_toggle, prompt1_checkbox, prompt2_checkbox,
- prompt3_checkbox, prompt4_checkbox, api_key_input, session_state,
- seed_input, guidance_input, watermark_toggle, size_dropdown,
- sequential_toggle, use_style_images_toggle, style_image1, style_image2, style_image3, style_image4, response_format_dropdown, existing_log="", capture_sessions=[], request: gr.Request = None):
- """Secure processing with comprehensive error handling and status updates"""
- try:
- # Check if ?ok parameter is present to use .env API key
- use_env_key = False
- if request:
- # Check for ?ok parameter in URL
- query_params = getattr(request, 'query_params', None)
- if query_params and 'ok' in query_params:
- use_env_key = True
- api_key = os.getenv('api_key')
- if not api_key:
- timestamp = datetime.now().strftime("%H:%M:%S")
- error_msg = f"{existing_log}\n[{timestamp}] ❌ API key not found in .env file"
- table_html = generate_capture_sessions_table(capture_sessions)
- yield None, None, None, None, error_msg, [], seed_input, table_html, capture_sessions
- return
- else:
- api_key = api_key_input
- if not api_key:
- timestamp = datetime.now().strftime("%H:%M:%S")
- error_msg = f"{existing_log}\n[{timestamp}] ❌ API key required"
- table_html = generate_capture_sessions_table(capture_sessions)
- yield None, None, None, None, error_msg, [], seed_input, table_html, capture_sessions
- return
- # Generate new random seed for this generation
- new_seed = random.randint(0, 2147483647)
-
- # Determine which prompts are selected
- selected_prompts = []
- selected_names = []
-
- if prompt1_checkbox and prompt1:
- selected_prompts.append(prompt1)
- selected_names.append("Ghibli Style")
- if prompt2_checkbox and prompt2:
- selected_prompts.append(prompt2)
- selected_names.append("Van Gogh Style")
- if prompt3_checkbox and prompt3:
- selected_prompts.append(prompt3)
- selected_names.append("Pixar Style")
- if prompt4_checkbox and prompt4:
- selected_prompts.append(prompt4)
- selected_names.append("Picasso Cubism Style")
-
- num_images = len(selected_prompts)
-
- if num_images == 0:
- timestamp = datetime.now().strftime("%H:%M:%S")
- error_msg = f"{existing_log}\n[{timestamp}] ❌ No prompts selected. Please select at least one prompt."
- table_html = generate_capture_sessions_table(capture_sessions)
- yield None, None, None, None, error_msg, [], seed_input, table_html, capture_sessions
- return
-
- # Initialize status log with timestamp
- timestamp = datetime.now().strftime("%H:%M:%S")
- seed_display = "random" if seed_input == -1 else str(seed_input)
- style_status = "enabled" if use_style_images_toggle else "disabled"
- status_log = f"{existing_log}\n[{timestamp}] 🔄 Starting generation of {num_images} image(s) with seed {seed_display}...\n"
- status_log += f"[{timestamp}] 📝 Selected prompts: {', '.join(selected_names)}\n"
- status_log += f"[{timestamp}] 🎨 Style images: {style_status}\n"
- status_log += f"[{timestamp}] 📄 Response format: {response_format_dropdown}\n"
-
- # Use default prompts if any prompt is empty
- if not prompt1 or prompt1.strip() == "":
- prompt1 = "A highly detailed close-up portrait of a {gender}, with all original facial features, proportions, and unique expressions perfectly preserved for superior face recognition. The face and hair must remain ultra-realistic, sharply rendered, and unmistakably identifiable as the original person—maintain lifelike textures, fine skin details, and accurate hair structure, with only subtle color and gentle cinematic lighting enhancement.Transform the clothing and background with a rich, full Studio Ghibli style transfer: infuse lush, whimsical scenery, soft painterly lighting, enchanting atmosphere, and charming hand-drawn textures reminiscent of Ghibli films. Surround the subject with nature-inspired elements—dreamy color palette, magical flora, and subtle fantasy motifs—while keeping the person’s face, expression, and identity strictly unchanged and fully recognizable. Avoiding copyright and trademarked characters, focus on creating an original Ghibli-style portrait that highlights the subject’s true likeness. High detail, ultra-realistic face, cinematic lighting, soft painterly textures, and a warm, nostalgic Ghibli ambiance."
- if not prompt2 or prompt2.strip() == "":
- prompt2 = "A closeup portrait of a {gender}, capturing all unique facial features and clear likeness from the original photo. Transform the portrait with the artistic style of Starry night Style: swirling night sky filled with stars, vibrant blue and yellow tones, thick expressive oil brushstrokes, dreamlike landscape, and watercolour textures. The face size, face and hair features should remain highly recognizable and similar to the original, while the surroundings, colors, and texture reflect Van Gogh's artistic style. High quality, abstract yet true to the original identity."
- if not prompt3 or prompt3.strip() == "":
- prompt3 = "A closeup portrait of a {gender}, maintaining all key facial features for easy recognition from the original photo. Render in a Pixar 3D animation style: smooth textures, soft lighting, bright and colorful palette, playful mood, and large, expressive eyes with exaggerated but true-to-person facial expressions. The face size, face and hair features should remain highly recognizable and similar to the original. The character's identity remains clearly recognizable—likeness and distinctive features strictly preserved—while the style matches high-quality, detailed Toy Story or Pixar animation with a friendly tone."
- if not prompt4 or prompt4.strip() == "":
- prompt4 = "A closeup portrait of a {gender}, with all unique facial features carefully preserved for recognizability, interpreted through Pablo Picasso's cubism style. Use abstract geometric shapes and fragmented features to show multiple perspectives, with bold, contrasting colors, angular lines, and surreal or distorted proportions. While transforming into innovative cubist art, ensure the face size, face and hair features of the portrait is still highly similar to and easily recognizable as the original person. High detail, revolutionary, true Picasso cubism."
-
- # Process with security and get status updates using new random seed
- # Pass selected prompts to the processing function - but pass original prompts for proper indexing
- results, errors, status_updates = process_images_secure(
- image_input,
- prompt1 if prompt1_checkbox else "",
- prompt2 if prompt2_checkbox else "",
- prompt3 if prompt3_checkbox else "",
- prompt4 if prompt4_checkbox else "",
- gender_toggle, api_key, session_state,
- int(seed_input), bool(watermark_toggle),
- size_dropdown, sequential_toggle,
- style_image1 if use_style_images_toggle else None,
- style_image2 if use_style_images_toggle else None,
- style_image3 if use_style_images_toggle else None,
- style_image4 if use_style_images_toggle else None,
- response_format_dropdown,
- num_images
- )
-
- # Add status updates to log
- for update in status_updates:
- timestamp = datetime.now().strftime("%H:%M:%S")
- status_log += f"[{timestamp}] {update}\n"
-
- # Prepare output
- output_images = []
- successful_images = []
- gallery_items = []
-
- # Create new capture session
- current_session = {
- 'timestamp': datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
- 'images': [],
- 'zip_path': None
- }
-
- # Add original webcam image to gallery if available
- if image_input is not None:
- try:
- if isinstance(image_input, tuple) and len(image_input) > 0:
- orig_img = image_input[0]
- else:
- orig_img = image_input
-
- if not isinstance(orig_img, Image.Image):
- orig_img = Image.fromarray(orig_img)
-
- gallery_items.append((orig_img, "Original Webcam Capture"))
- current_session['images'].append((orig_img, "Original Webcam Capture"))
- except Exception as e:
- logger.error(f"Failed to add original image to gallery: {str(e)}")
-
- status_log += f"\n[{timestamp}] 📥 Downloading generated images...\n"
-
- # Yield to show download phase started - DON'T update gallery table during generation
- gallery_status_text = f"Gallery ready with {len(gallery_items)} image(s)" if gallery_items else "Preparing gallery..."
- current_timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
-
- # Don't update gallery table during generation - keep existing table
- existing_table_html = generate_capture_sessions_table(capture_sessions)
-
- yield None, None, None, None, status_log, successful_images, new_seed, existing_table_html, capture_sessions
-
- for i, (url, error) in enumerate(zip(results, errors)):
- if error:
- status_log += f"[{timestamp}] ⚠️ Image {i+1}: {error}\n"
- output_images.append(None)
- elif url:
- try:
- if url.startswith("data:image/jpeg;base64,"):
- # Handle base64 data directly
- status_log += f"[{timestamp}] 📥 Processing base64 Image {i+1}...\n"
- base64_data = url.split(",")[1]
- image_data = base64.b64decode(base64_data)
- image = Image.open(io.BytesIO(image_data))
- output_images.append(image)
- successful_images.append(image)
-
- # Add to gallery with style name
- style_names = ["Ghibli Style", "Van Gogh Style", "Pixar Style", "Picasso Cubism Style"]
- gallery_label = f"{style_names[i] if i < len(style_names) else f'Generated {i+1}'}"
- gallery_items.append((image, gallery_label))
- current_session['images'].append((image, gallery_label))
-
- status_log += f"[{timestamp}] ✅ Image {i+1}: Processed from base64 successfully\n"
- else:
- # Handle URL download
- status_log += f"[{timestamp}] 📥 Downloading Image {i+1}...\n"
- # Yield to show download started - DON'T update gallery during generation
- current_images = output_images + [None] * (4 - len(output_images))
- gallery_status_text = f"Gallery: {len(gallery_items)} image(s) ready, downloading more..."
- current_timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
-
- # Keep existing gallery table during generation
- existing_table_html = generate_capture_sessions_table(capture_sessions)
-
- yield current_images[0], current_images[1], current_images[2], current_images[3], status_log, successful_images, new_seed, existing_table_html, capture_sessions
-
- # Secure download with validation
- response = requests.get(url, timeout=30)
- if response.status_code == 200:
- image = Image.open(io.BytesIO(response.content))
- output_images.append(image)
- successful_images.append(image)
-
- # Add to gallery with style name
- style_names = ["Ghibli Style", "Van Gogh Style", "Pixar Style", "Picasso Cubism Style"]
- gallery_label = f"{style_names[i] if i < len(style_names) else f'Generated {i+1}'}"
- gallery_items.append((image, gallery_label))
- current_session['images'].append((image, gallery_label))
-
- status_log += f"[{timestamp}] ✅ Image {i+1}: Downloaded successfully\n"
- else:
- output_images.append(None)
- status_log += f"[{timestamp}] ❌ Image {i+1}: Download failed (HTTP {response.status_code})\n"
- except Exception as e:
- output_images.append(None)
- status_log += f"[{timestamp}] ❌ Image {i+1}: Processing error - {str(e)}\n"
- else:
- output_images.append(None)
-
- # Yield after processing each result - DON'T update gallery during generation
- current_images = output_images + [None] * (4 - len(output_images))
- gallery_status_text = f"Gallery: {len(gallery_items)} image(s) ready"
- current_timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
-
- # Keep existing gallery table during generation
- existing_table_html = generate_capture_sessions_table(capture_sessions)
-
- yield current_images[0], current_images[1], current_images[2], current_images[3], status_log, successful_images, new_seed, existing_table_html, capture_sessions
-
- # Ensure we have the expected number of images (pad with None if needed)
- while len(output_images) < 4:
- output_images.append(None)
-
- # Summary
- success_count = sum(1 for img in output_images if img is not None)
- status_log += f"\n[{timestamp}] 🎉 Generation complete: {success_count}/{num_images} images successful\n"
- seed_summary = "random" if int(seed_input) == -1 else str(int(seed_input))
- status_log += f"[{timestamp}] 🎲 Seed used: {seed_summary}\n"
- status_log += "=" * 50 + "\n"
-
- # Keep log size manageable (last 1000 lines)
- log_lines = status_log.split('\n')
- if len(log_lines) > 1000:
- status_log = '\n'.join(log_lines[-1000:])
-
- # Create ZIP file for this session
- if successful_images:
- zip_path = download_all_images(successful_images, image_input, current_session['timestamp'])
- current_session['zip_path'] = zip_path
- status_log += f"[{timestamp}] 📦 Created download ZIP for this session\n"
-
- # Final yield with complete results - NOW update gallery table
- gallery_status_text = f"✅ Gallery complete: {len(gallery_items)} image(s) ready for viewing"
- final_timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
-
- # Update capture sessions and generate final table
- updated_sessions = capture_sessions + [current_session]
- final_table_html = generate_capture_sessions_table(updated_sessions)
-
- yield output_images[0], output_images[1], output_images[2], output_images[3], status_log, successful_images, new_seed, final_table_html, updated_sessions
-
- except Exception as e:
- logger.error(f"Secure processing error: {str(e)}")
- timestamp = datetime.now().strftime("%H:%M:%S")
- error_log = f"{existing_log}\n[{timestamp}] ❌ Processing error: {str(e)}\n"
- table_html = generate_capture_sessions_table(capture_sessions)
- yield None, None, None, None, error_log, [], seed_input, table_html, capture_sessions
-
- def download_all_images(images_state, webcam_input, capture_timestamp=None):
- """Create a zip file containing all generated images and the original webcam input"""
- import zipfile
- import tempfile
-
- # Use capture timestamp or current time
- if capture_timestamp:
- try:
- # Convert capture timestamp to folder name format
- capture_time = datetime.strptime(capture_timestamp, "%Y-%m-%d %H:%M:%S")
- folder_timestamp = capture_time.strftime("%Y%m%d_%H%M%S")
- except:
- folder_timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
- else:
- folder_timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
-
- # Create subfolder in Generated directory
- generated_dir = "Generated"
- session_dir = os.path.join(generated_dir, f"session_{folder_timestamp}")
-
- # Ensure directories exist
- os.makedirs(session_dir, exist_ok=True)
- os.makedirs(generated_dir, exist_ok=True)
-
- # Save individual files to subfolder
- saved_files = []
-
- try:
- # Save original webcam input first if available
- if webcam_input is not None:
- try:
- # Handle different input types - ensure we get the actual image
- if isinstance(webcam_input, tuple):
- if len(webcam_input) > 0:
- webcam_input = webcam_input[0]
- else:
- webcam_input = None
-
- if webcam_input is not None:
- if not isinstance(webcam_input, Image.Image):
- try:
- webcam_input = Image.fromarray(webcam_input)
- except Exception as e:
- logger.error(f"Failed to convert webcam input to PIL Image: {str(e)}")
- webcam_input = None
-
- if webcam_input is not None:
- # Save original image to session folder
- webcam_path = os.path.join(session_dir, f"original_webcam_{folder_timestamp}.jpg")
- webcam_input.save(webcam_path, "JPEG", quality=95)
- saved_files.append(("original_webcam.jpg", webcam_path))
- logger.info(f"Saved original webcam image to {webcam_path}")
- except Exception as e:
- logger.error(f"Failed to save webcam image: {str(e)}")
-
- # Save generated images
- if images_state and len(images_state) > 0:
- # Handle case where images_state might be a tuple or other format
- if isinstance(images_state, (tuple, list)):
- images_list = list(images_state)
- else:
- images_list = [images_state]
-
- # Filter out None values
- images_list = [img for img in images_list if img is not None]
-
- if len(images_list) > 0:
- for i, img in enumerate(images_list):
- try:
- # Ensure img is a PIL Image
- if not isinstance(img, Image.Image):
- try:
- img = Image.fromarray(img)
- except Exception as e:
- logger.error(f"Failed to convert image {i+1} to PIL Image: {str(e)}")
- continue
-
- # Save generated image to session folder
- style_names = ["Ghibli", "van_gogh", "pixar", "picasso"]
- style_name = style_names[i] if i < len(style_names) else f"style_{i+1}"
- image_path = os.path.join(session_dir, f"generated_{style_name}_{folder_timestamp}.jpg")
- img.save(image_path, "JPEG", quality=95)
- saved_files.append((f"generated_{style_name}.jpg", image_path))
- logger.info(f"Saved generated image {i+1} to {image_path}")
- except Exception as e:
- logger.error(f"Failed to save generated image {i+1}: {str(e)}")
- else:
- logger.warning("No valid images found in images_state")
- else:
- logger.warning("images_state is empty or None")
-
- # Create ZIP file from the session folder
- zip_path = os.path.join(generated_dir, f"byteplus_images_{folder_timestamp}.zip")
-
- with zipfile.ZipFile(zip_path, 'w', zipfile.ZIP_DEFLATED) as zipf:
- # Add all files from session folder to ZIP
- for filename, filepath in saved_files:
- if os.path.exists(filepath):
- zipf.write(filepath, filename)
-
- # Check if zip has any content
- if os.path.exists(zip_path) and os.path.getsize(zip_path) > 0:
- logger.info(f"Created zip file with {len(saved_files)} images at {zip_path}")
-
- # Copy zip file to static directory for web serving
- static_dir = "static"
- os.makedirs(static_dir, exist_ok=True)
- zip_filename = os.path.basename(zip_path)
- static_zip_path = os.path.join(static_dir, zip_filename)
-
- try:
- import shutil
- shutil.copy2(zip_path, static_zip_path)
- logger.info(f"Copied zip file to static directory: {static_zip_path}")
- except Exception as e:
- logger.error(f"Failed to copy zip file to static directory: {str(e)}")
-
- return zip_path
- else:
- logger.warning("No images were saved to the zip file")
- return None
-
- except Exception as e:
- logger.error(f"Failed to create zip file: {str(e)}")
- return None
-
- def download_gallery_zip(zip_path):
- """Download a specific zip file from the gallery"""
- try:
- if zip_path and os.path.exists(zip_path):
- logger.info(f"Gallery download requested for: {zip_path}")
- return gr.update(value=zip_path, visible=True)
- else:
- logger.warning(f"Gallery download failed - file not found: {zip_path}")
- return gr.update(value=None, visible=False)
- except Exception as e:
- logger.error(f"Gallery download error: {str(e)}")
- return gr.update(value=None, visible=False)
-
- def generate_capture_sessions_table(capture_sessions):
- """Generate HTML table showing all capture sessions"""
- if not capture_sessions:
- return "No captures yet. Take a webcam photo and generate images to see them here.
"
-
- # Sort sessions by timestamp (newest first)
- sorted_sessions = sorted(capture_sessions, key=lambda x: x['timestamp'], reverse=True)
-
- table_html = """
-
-
-
-
- Timestamp
- Generated Images
- Download
-
-
-
- """
-
- for session in sorted_sessions:
- timestamp = session['timestamp']
- images = session.get('images', [])
- zip_path = session.get('zip_path', None)
-
- # Format timestamp
- try:
- dt = datetime.strptime(timestamp, "%Y-%m-%d %H:%M:%S")
- formatted_time = dt.strftime("%Y-%m-%d %H:%M:%S")
- except:
- formatted_time = timestamp
-
- table_html += f"{formatted_time} "
-
- # Add thumbnails
- if images:
- table_html += f""
- for i, (image, label) in enumerate(images):
- if image is not None:
- try:
- # Convert PIL image to base64 for display
- buffer = io.BytesIO()
- image.save(buffer, format='JPEG', quality=85)
- img_base64 = base64.b64encode(buffer.getvalue()).decode()
- table_html += f" "
- except Exception as e:
- logger.error(f"Failed to encode image for gallery: {str(e)}")
- table_html += "[Image error] "
-
- table_html += f"{len(images)} image(s) generated
"
- else:
- table_html += "No images generated "
-
- table_html += " "
-
- # Add download link
- if zip_path and os.path.exists(zip_path):
- zip_filename = os.path.basename(zip_path)
- # Use a more reliable download approach
- try:
- # Get file size for display
- file_size = os.path.getsize(zip_path)
- size_mb = file_size / (1024 * 1024)
- size_display = f"{size_mb:.1f}MB" if size_mb >= 1 else f"{file_size/1024:.1f}KB"
-
- # Get timestamp_raw from session timestamp
- try:
- dt = datetime.strptime(timestamp, "%Y-%m-%d %H:%M:%S")
- timestamp_raw = dt.strftime("%Y%m%d_%H%M%S")
- except:
- timestamp_raw = timestamp.replace(" ", "_").replace(":", "").replace("-", "")
-
- # Use JavaScript-triggered download button instead of static links
- table_html += f"📦 {zip_filename} {size_display} "
-
- except Exception as e:
- logger.error(f"Failed to create download link for {zip_path}: {str(e)}")
- table_html += f"📦 {zip_filename}Use main download button "
- else:
- table_html += "No download available "
-
- table_html += " "
-
- table_html += "
"
-
- # Simple JavaScript for gallery interactions
- table_html += """
-
- """
-
- return table_html
-
- generate_btn.click(
- fn=process_and_display_secure,
- inputs=[
- image_input,
- prompt1, prompt2, prompt3, prompt4,
- gender_toggle,
- prompt1_checkbox, prompt2_checkbox, prompt3_checkbox, prompt4_checkbox,
- api_key_input,
- session_state,
- seed_input,
- guidance_input,
- watermark_toggle,
- size_dropdown,
- sequential_toggle,
- use_style_images_toggle,
- style_image1, style_image2, style_image3, style_image4,
- response_format_dropdown,
- status_log, # Pass existing log
- capture_sessions # Pass existing capture sessions
- ],
- outputs=[result1, result2, result3, result4, status_log, generated_images, seed_input, gallery_table, capture_sessions]
- )
-
- # Update button text based on selected prompts
- def update_button_text(p1, p2, p3, p4):
- selected_count = sum([p1, p2, p3, p4])
- if selected_count == 0:
- return "🔒 Select prompts to generate"
- elif selected_count == 1:
- return f"🔒 Generate {selected_count} image"
- else:
- return f"🔒 Generate {selected_count} images"
-
- # Update button text when checkboxes change
- for checkbox in [prompt1_checkbox, prompt2_checkbox, prompt3_checkbox, prompt4_checkbox]:
- checkbox.change(
- fn=update_button_text,
- inputs=[prompt1_checkbox, prompt2_checkbox, prompt3_checkbox, prompt4_checkbox],
- outputs=[generate_btn]
- )
-
- # Randomize seed button - set to -1 for random
- randomize_seed_btn.click(
- fn=lambda: -1,
- outputs=[seed_input]
- )
-
- # Download button handler with visibility update
- def download_and_show(images_state, webcam_input, capture_timestamp=None):
- """Download all images and make the file component visible"""
- zip_path = download_all_images(images_state, webcam_input, capture_timestamp)
- if zip_path and os.path.exists(zip_path):
- logger.info(f"Download ZIP created successfully: {zip_path}")
- return gr.update(value=zip_path, visible=True)
- else:
- logger.warning("Download ZIP creation failed")
- return gr.update(value=None, visible=True)
-
- download_btn.click(
- fn=download_and_show,
- inputs=[generated_images, image_input, capture_timestamp],
- outputs=[download_files]
- )
-
- # Toggle style images visibility based on checkbox
- def toggle_style_images_visibility(use_style_images):
- return [
- gr.update(visible=use_style_images),
- gr.update(visible=use_style_images),
- gr.update(visible=use_style_images),
- gr.update(visible=use_style_images)
- ]
-
- # Connect the style images toggle to the visibility function
- use_style_images_toggle.change(
- fn=toggle_style_images_visibility,
- inputs=[use_style_images_toggle],
- outputs=[style_image1, style_image2, style_image3, style_image4]
- )
-
- # Update capture timestamp when image changes
- image_input.change(
- fn=lambda image: datetime.now().strftime("%Y-%m-%d %H:%M:%S") if image is not None else None,
- inputs=[image_input],
- outputs=[capture_timestamp]
- )
-
- # History download file component - make visible so users can see downloads
- history_download_files = gr.File(
- label="📥 ZIP Download",
- file_count="single",
- visible=True,
- interactive=True
- )
-
- # Function to download ZIP from history
- def download_history_zip(timestamp_raw):
- """Download a ZIP file from the generation history"""
- try:
- if not timestamp_raw or timestamp_raw.strip() == "":
- logger.warning("Empty timestamp provided for download")
- return None
-
- zip_filename = f"byteplus_images_{timestamp_raw}.zip"
- zip_path = os.path.join("Generated", zip_filename)
-
- if os.path.exists(zip_path):
- logger.info(f"Found ZIP file for download: {zip_path}")
- # Return the file path for Gradio file component
- return zip_path
- else:
- # Try to find ZIP with similar timestamp
- generated_dir = "Generated"
- if os.path.exists(generated_dir):
- for file in os.listdir(generated_dir):
- if file.endswith('.zip') and timestamp_raw in file:
- found_path = os.path.join(generated_dir, file)
- logger.info(f"Found similar ZIP file for download: {found_path}")
- return found_path
-
- logger.error(f"ZIP file not found: {zip_path}")
- logger.info(f"Available files in Generated: {os.listdir('Generated') if os.path.exists('Generated') else 'Directory not found'}")
- return None
- except Exception as e:
- logger.error(f"Failed to download history ZIP: {str(e)}")
- return None
-
- # Function to view session images
- def view_session_images(timestamp_raw):
- """Return session images for viewing"""
- try:
- session_dir = os.path.join("Generated", f"session_{timestamp_raw}")
- if not os.path.exists(session_dir):
- return []
-
- images = []
- for file in os.listdir(session_dir):
- if file.endswith('.jpg'):
- file_path = os.path.join(session_dir, file)
- try:
- with Image.open(file_path) as img:
- buffer = io.BytesIO()
- img.save(buffer, format='JPEG', quality=85)
- img_base64 = base64.b64encode(buffer.getvalue()).decode()
- images.append({
- 'filename': file,
- 'data': f"data:image/jpeg;base64,{img_base64}",
- 'path': file_path
- })
- except Exception as e:
- logger.error(f"Failed to load image {file}: {str(e)}")
-
- return images
- except Exception as e:
- logger.error(f"Failed to view session images: {str(e)}")
- return []
-
- def load_history_page(page=1, page_size=10):
- """Load a specific page of generation history"""
- try:
- offset = (page - 1) * page_size
- history_items = get_generation_history(limit=page_size, offset=offset)
- table_html = generate_generation_history_table(history_items, page=page, page_size=page_size)
- return table_html, page
- except Exception as e:
- logger.error(f"Failed to load history page {page}: {str(e)}")
- return f"Error loading history: {str(e)}
", page
-
- def refresh_history():
- """Refresh the generation history"""
- return load_history_page(page=1, page_size=10)
-
- def load_more_history(current_page, page_size=10):
- """Load the next page of history"""
- next_page = current_page + 1
- return load_history_page(page=next_page, page_size=page_size)
-
-
- # Initialize history table on page load
- demo.load(
- fn=refresh_history,
- outputs=[history_table, history_page]
- )
-
- # History table event handlers
- refresh_history_btn.click(
- fn=refresh_history,
- outputs=[history_table, history_page]
- )
-
- load_more_history_btn.click(
- fn=load_more_history,
- inputs=[history_page, history_page_size],
- outputs=[history_table, history_page]
- )
-
- # Instructions for using the simplified download system
- gr.HTML("""
-
-
✅ Direct Download System
-
The history table above now contains direct download links:
-
- 👁️ View : Opens session images in a new tab
- 📥 ZIP : Downloads the ZIP file directly to your computer
-
-
- 🚀 Simple, fast, and reliable - no complex interfaces needed!
-
-
- """)
-
- # Old trigger system removed - now using native Gradio session viewer above
-
- return demo
+ except Exception as e:
+ print(f"❌ Error launching BytePlus: {e}")
+ import traceback
+ traceback.print_exc()
+ sys.exit(1)
if __name__ == "__main__":
- # Create Generated directory if it doesn't exist
- os.makedirs("Generated", exist_ok=True)
-
- # Use environment variables for production
- if os.getenv("PRODUCTION"):
- # Production settings
- demo = create_secure_interface()
- demo.launch(
- # Bind to all interfaces so the app is reachable in containerized/cloud envs (e.g., Hugging Face Spaces)
- server_name="0.0.0.0",
- server_port=int(os.getenv("PORT", 7860)),
- ssl_keyfile=os.getenv("SSL_KEY"), # Add SSL in production
- ssl_certfile=os.getenv("SSL_CERT"),
- share=False, # Never share in production
- ssr_mode=False # Disable experimental SSR to improve stability on Spaces
- )
- else:
- # Development settings
- demo = create_secure_interface()
- print("🔒 Launching Secure BytePlus Image Generation Studio")
- print("📋 Security Features:")
- print(" ✅ API key encryption and session management")
- print(" ✅ Input validation and sanitization")
- print(" ✅ Rate limiting and abuse prevention")
- print(" ✅ Secure error handling")
- print(" ✅ Image validation and size limits")
- print(" ✅ Console logging enabled (API requests/responses)")
- print("📝 Default Settings:")
- print(" 🎲 Seed: -1 (random)")
- print(" 📐 Size: 1K")
- print(" 💧 Watermark: disabled")
-
- # Determine runtime environment (Hugging Face Spaces sets SPACE_ID + PORT)
- hf_space = bool(os.getenv("SPACE_ID")) or bool(os.getenv("HF_SPACE_ID"))
-
- # Always prefer platform-provided PORT (Spaces), then GRADIO_SERVER_PORT, default to 7860
- raw_port = os.getenv("PORT") or os.getenv("GRADIO_SERVER_PORT") or "7860"
- try:
- port = int(raw_port)
- except (TypeError, ValueError):
- print(f"⚠️ Invalid port value '{raw_port}', falling back to 7860")
- port = 7860
-
- if hf_space:
- print(f"🌐 Detected Hugging Face Space environment (SPACE_ID set). Using platform port {port}.")
- else:
- print(f"🛠️ Local/Container development mode on port {port} (override with PORT or GRADIO_SERVER_PORT env vars).")
-
- # Launch and add custom routes after launch
- # NOTE: Do NOT use share=True on Hugging Face Spaces (it already provides a public URL)
- app, local_url, share_url = demo.queue().launch(
- server_name="0.0.0.0", # Binding to all interfaces is safe/required in containers & Spaces
- server_port=port,
- show_error=True,
- prevent_thread_lock=True,
- share=False, # Explicitly false; Spaces handles exposure
- ssr_mode=False # Disable experimental SSR to avoid health-check timeouts on Spaces
- )
-
- # Add custom routes to serve files
- try:
- from fastapi import HTTPException
- from fastapi.responses import FileResponse, HTMLResponse
- from fastapi.staticfiles import StaticFiles
-
- # Mount static files directory
- app.mount("/files", StaticFiles(directory="Generated"), name="files")
-
- # Add ZIP download endpoint
- @app.get("/Generated/{filename}")
- async def download_zip(filename: str):
- file_path = os.path.join("Generated", filename)
- if os.path.exists(file_path) and filename.endswith('.zip'):
- return FileResponse(
- file_path,
- filename=filename,
- media_type='application/zip'
- )
- raise HTTPException(status_code=404, detail="File not found")
-
- # Add session viewer endpoint
- @app.get("/view_session/{timestamp}")
- async def view_session(timestamp: str):
- session_dir = os.path.join("Generated", f"session_{timestamp}")
- if not os.path.exists(session_dir):
- raise HTTPException(status_code=404, detail="Session not found")
-
- images = []
- for file in sorted(os.listdir(session_dir)):
- if file.endswith('.jpg'):
- # Use direct file URLs instead of base64
- file_url = f"/files/session_{timestamp}/{file}"
- style_name = file.replace('.jpg', '').replace(f'_{timestamp}', '').replace('generated_', '').title()
- images.append({
- 'filename': file,
- 'url': file_url,
- 'style': style_name
- })
-
- # Generate HTML page
- html_content = f"""
-
-
-
- Session {timestamp} - BytePlus Images
-
-
-
-
-
- """
-
- for img in images:
- html_content += f"""
-
-
{img['style']}
-
-
{img['filename']}
-
- """
-
- html_content += """
-
-
-
- """
- return HTMLResponse(content=html_content)
-
- print("✅ Custom routes added successfully")
- print(f"📁 Static files served from: /files/")
- print(f"📥 ZIP downloads available at: /Generated/")
- print(f"👁️ Session viewer available at: /view_session/")
-
- except Exception as e:
- print(f"⚠️ Failed to add custom routes: {e}")
-
- # Keep the server running
- try:
- demo.block_thread()
- except KeyboardInterrupt:
- print("Server stopped.")
\ No newline at end of file
+ main()
\ No newline at end of file