# arkai2025's picture
# refactor(app): extract helper functions for manual action updates
# 47d7945
"""
Fire-Rescue MCP - Gradio Application
Main entry point for the Hugging Face Space deployment.
Provides Gradio UI for simulation control and visualization.
"""
import html
import time
import uuid
import gradio as gr
from typing import Optional
from config import SCENARIO_DEFAULTS
from service import (
ADVISOR_MODEL_CHOICES,
DEFAULT_ADVISOR_MODEL_CHOICE,
SimulationService,
)
# Initial value of the auto-execute toggle; the same constant is pushed to the
# service by _reset_auto_execute_to_default().
AUTO_EXECUTE_DEFAULT = True # Keep UI + backend aligned on initial load
# Keys of ADVISOR_MODEL_CHOICES as a list (presumably the labels offered in the
# advisor model picker — TODO confirm against the UI definition).
ADVISOR_MODEL_LABELS = list(ADVISOR_MODEL_CHOICES.keys())
def get_or_create_service(service: Optional[SimulationService]) -> SimulationService:
    """Hand back the given SimulationService, building a new one when it is None."""
    return SimulationService() if service is None else service
def _create_display_cache() -> dict:
    """Build a fresh cache dict for advisor/history/status renders.

    Every call returns a new dict with new (empty) list values, so callers
    never share mutable state.
    """
    cache: dict = {}
    cache["last_status_html"] = None
    cache["last_advisor_signature"] = ()
    cache["history_messages_cache"] = []
    cache["current_cycle_messages_cache"] = []
    cache["result_freeze_state"] = ""
    return cache
def _reset_advisor_display_cache(cache: Optional[dict] = None) -> dict:
    """Blank out the cached advisor/chatbot renders so no stale history survives a reset.

    The dict passed in is modified in place and returned; when ``cache`` is
    None a new cache is created first. Keys other than the five reset here
    are left untouched.
    """
    target = _create_display_cache() if cache is None else cache
    target.update(
        {
            "last_status_html": None,
            "last_advisor_signature": (),
            "history_messages_cache": [],
            "current_cycle_messages_cache": [],
            "result_freeze_state": "",
        }
    )
    return target
def _reset_auto_execute_to_default(
    service: Optional[SimulationService] = None,
) -> tuple[bool, SimulationService]:
    """Push AUTO_EXECUTE_DEFAULT to the service and return it for the checkbox.

    Returns:
        ``(AUTO_EXECUTE_DEFAULT, service)`` where ``service`` is the instance
        that was passed in, or a newly created one when it was None.
    """
    active_service = get_or_create_service(service)
    active_service.set_auto_execute(AUTO_EXECUTE_DEFAULT)
    return (AUTO_EXECUTE_DEFAULT, active_service)
def _generate_session_token() -> str:
    """
    Build a session identifier of the form ``{timestamp_ms}_{uuid_hex}``.
    The prefix is the current wall-clock time in whole milliseconds and the
    suffix is the 32-character hex form of a random UUID4.
    """
    millis = int(time.time() * 1000)
    return "_".join((str(millis), uuid.uuid4().hex))
def _initialize_session_defaults(
    service: Optional[SimulationService] = None,
    display_cache: Optional[dict] = None,
):
    """
    Produce the initial values for a fresh Gradio session.

    Auto-execute is restored to its default, the display cache is cleared,
    the advisor model choice is reset on the service, and a new session token
    is issued.

    Returns a tuple, in order: auto-execute value, ``False``, the new session
    token, the default advisor model choice, the service, the display cache,
    the placeholder advisor messages, the placeholder event log, the idle
    status HTML, followed by 100 grid cell updates set to the forest emoji.
    """
    auto_value, service = _reset_auto_execute_to_default(service)
    # NOTE(review): the call above already guarantees a service instance.
    service = get_or_create_service(service)
    display_cache = _reset_advisor_display_cache(
        _create_display_cache() if display_cache is None else display_cache
    )
    default_model_choice = service.reset_advisor_model_choice()
    placeholder_message = {
        "role": "assistant",
        "content": "No AI analysis yet. Press **Start** to begin the advisor cycle.",
    }
    idle_status = render_status_html({"status": "idle"})
    forest_cells = [gr.update(value="🌲") for _ in range(100)]
    return (
        auto_value,
        False,
        _generate_session_token(),
        default_model_choice,
        service,
        display_cache,
        [placeholder_message],
        "No events yet...",
        idle_status,
        *forest_cells,
    )
# =============================================================================
# Grid Visualization (Clickable)
# =============================================================================
def get_cell_info(state: dict, x: int, y: int) -> tuple[str, str]:
    """Return the display content and background colour for one grid cell.

    Layers are applied in increasing priority: forest (default), building,
    fire, unit. A unit therefore hides any fire or building underneath it.

    Args:
        state: Simulation state; the optional "fires", "units" and
            "buildings" lists are read. Each entry needs "x" and "y"; fires
            also need "intensity" and units need "type".
        x: Cell column.
        y: Cell row.

    Returns:
        ``(content, bg_color)`` — an emoji label (fires carry their intensity
        as a whole-number percentage) and a hex colour string.
    """
    cell = (x, y)
    fires = {(f["x"], f["y"]): f["intensity"] for f in state.get("fires", [])}
    units = {(u["x"], u["y"]): u["type"] for u in state.get("units", [])}
    buildings = {(b["x"], b["y"]) for b in state.get("buildings", [])}

    # Default: forest
    content = "🌲"
    bg_color = "#1b4332"

    # Building (from dynamic positions)
    if cell in buildings:
        content = "🏢"
        bg_color = "#495057"

    # Fire - colour depends on the intensity tier
    if cell in fires:
        intensity = fires[cell]
        # round() rather than int(): 0.29 * 100 is 28.999999999999996 in
        # floating point, which truncation would display as 28.
        pct = round(intensity * 100)
        fire_tiers = (
            (0.9, "#7f1d1d"),  # Extreme fire - very dark red
            (0.7, "#b91c1c"),  # High fire - dark red
            (0.5, "#dc2626"),  # Medium-high fire - red
            (0.3, "#ea580c"),  # Medium fire - orange-red
            (0.1, "#f97316"),  # Low fire - orange
        )
        for threshold, color in fire_tiers:
            if intensity >= threshold:
                content = f"🔥{pct}"
                bg_color = color
                break
        else:
            # Smoldering / almost out
            content = f"💨{pct}"
            bg_color = "#fbbf24"

    # Unit (overwrites fire display)
    if cell in units:
        content = "🚒" if units[cell] == "fire_truck" else "🚁"
        bg_color = "#0077b6"

    return content, bg_color
def render_status_html(state: dict, is_thinking: bool = False, thinking_stage: int = 0) -> str:
    """Build the compact HTML status bar for a simulation state.

    An idle state yields a one-line prompt. Otherwise the bar shows the tick,
    a coloured status badge, building integrity (bar + percentage), fire
    count and unit count. When ``is_thinking`` is true an extra item labels
    the advisor stage (1-4); any other stage value falls back to "Thinking".
    """
    if state.get("status") == "idle":
        return "<div class='status-compact'>🎮 Click Start to begin</div>"

    tick = state.get("tick", 0)
    status = state.get("status", "unknown")
    building = state.get("building_integrity", 1.0)
    fires = len(state.get("fires", []))
    units = len(state.get("units", []))

    status_color = {
        "running": "#4ade80",
        "success": "#22d3ee",
        "fail": "#f87171"
    }.get(status, "#888")

    if building > 0.6:
        health_color = "#4ade80"
    elif building > 0.5:
        health_color = "#fbbf24"
    else:
        health_color = "#f87171"

    fire_color = "#f87171" if fires else "#22d3ee"

    if is_thinking:
        stage_labels = {
            1: ("📊", "Assessing"),
            2: ("🎯", "Planning"),
            3: ("⚡", "Executing"),
            4: ("🧭", "Summarizing"),
        }
        icon, label = stage_labels.get(thinking_stage, ("🤔", "Thinking"))
        thinking_html = f"""
<div class="status-item ai-thinking-inline">
<span class="ai-pulse">{icon}</span>
<span class="ai-label">AI</span>
<strong>{label}...</strong>
</div>
"""
    else:
        thinking_html = ""

    # Returned directly so no local shadows the imported ``html`` module.
    return f"""
<div class="status-compact">
<div class="status-item">
<span>⏱️</span>
<strong>{tick}</strong>
</div>
<div class="status-item">
<span class="status-badge" style="background: {status_color};">{status.upper()}</span>
</div>
<div class="status-item status-health">
<span>🏢</span>
<div class="progress-mini">
<div style="background: {health_color}; width: {building*100}%; height: 100%;"></div>
</div>
<span style="color: {health_color}; font-weight: bold;">{building:.0%}</span>
</div>
<div class="status-item">
<span>🔥</span>
<strong style="color: {fire_color};">{fires}</strong>
</div>
<div class="status-item">
<span>🚒</span>
<strong>{units}</strong>
</div>
{thinking_html}
</div>
"""
def _chat_signature(messages: list[dict]) -> tuple:
    """Summarise chatbot messages as a tuple of (role, content, metadata items).

    Missing roles default to "assistant", missing content to "", and the
    metadata dict is flattened to a key-sorted tuple of items. ``None`` or an
    empty list yields ``()``.
    """
    return tuple(
        (
            entry.get("role", "assistant"),
            entry.get("content", ""),
            tuple(sorted((entry.get("metadata") or {}).items())),
        )
        for entry in messages or []
    )
def render_compact_status(state: dict) -> str:
    """Build the horizontal ``status-bar`` HTML for a simulation state.

    An idle state yields a prompt to start the simulation; any other state
    shows tick, status badge, building integrity, fire count and unit count.
    """
    if state.get("status") == "idle":
        return """
<div class="status-bar">
<div class="status-item">
<span style="color: #888;">🎮 Click Start to begin simulation</span>
</div>
</div>
"""

    tick = state.get("tick", 0)
    status = state.get("status", "unknown")
    building = state.get("building_integrity", 1.0)
    fires = len(state.get("fires", []))
    units = len(state.get("units", []))

    # Badge colour per status; unknown statuses fall back to grey
    status_color = {
        "running": "#4ade80",
        "success": "#22d3ee",
        "fail": "#f87171"
    }.get(status, "#888")

    if building > 0.6:
        health_color = "#4ade80"
    elif building > 0.5:
        health_color = "#fbbf24"
    else:
        health_color = "#f87171"

    fire_color = "#f87171" if fires else "#22d3ee"

    # Returned directly so no local shadows the imported ``html`` module.
    return f"""
<div class="status-bar">
<div class="status-item">
<span>⏱️ Tick:</span>
<strong>{tick}</strong>
</div>
<div class="status-item">
<span class="status-badge" style="background: {status_color}; color: #000;">{status.upper()}</span>
</div>
<div class="status-item">
<span>🏢</span>
<div class="progress-mini">
<div style="background: {health_color}; width: {building*100}%; height: 100%;"></div>
</div>
<span style="color: {health_color}; font-weight: bold;">{building:.0%}</span>
</div>
<div class="status-item">
<span>🔥</span>
<span style="color: {fire_color}; font-weight: bold;">{fires}</span>
</div>
<div class="status-item">
<span>🚒 {units}</span>
</div>
</div>
"""
def render_ai_thinking_toast(is_thinking: bool, thinking_stage: int) -> str:
    """Build the toast shown while the AI advisor is working.

    Returns "" when the advisor is not thinking. Stages 1-4 have dedicated
    icon/title/subtitle text; any other stage uses a generic fallback.
    """
    if not is_thinking:
        return ""

    fallback = ("🤔", "AI Thinking", "Processing...")
    toast_copy = {
        1: ("📊", "Stage 1: Assessment", "Querying MCP tools & analyzing situation..."),
        2: ("🎯", "Stage 2: Planning", "Formulating tactical strategy..."),
        3: ("⚡", "Stage 3: Execution", "Generating MCP deployment commands..."),
        4: ("🧭", "Stage 4: Summary", "Consolidating per-cycle findings..."),
    }
    icon, title, subtitle = toast_copy.get(thinking_stage, fallback)
    return f"""
<div class="ai-toast-container">
<div class="ai-toast">
<div class="ai-toast-icon">{icon}</div>
<div class="ai-toast-content">
<div class="ai-toast-title">{title}</div>
<div class="ai-toast-subtitle">{subtitle}</div>
</div>
<div class="ai-toast-spinner"></div>
</div>
</div>
"""
def render_game_result(status: str, report_payload: Optional[dict] = None) -> str:
    """Render the end-of-game overlay together with the after-action report.

    Args:
        status: Simulation status. Only "success" and "fail" produce an
            overlay.
        report_payload: Optional dict. Its "status" selects the report
            section ("pending", "ready", "error"; anything else shows a
            waiting note), "report" carries the report content and "error"
            the failure text.

    Returns:
        The overlay HTML, or "" when ``status`` is not a terminal outcome.
        All text taken from ``report_payload`` is HTML-escaped before it is
        embedded, including chart value and axis labels.
    """
    if status not in {"success", "fail"}:
        return ""
    report_payload = report_payload or {}
    outcome_config = {
        "success": {
            "icon": "🎉",
            "title": "VICTORY!",
            "color": "#22d3ee",
            "subtitle": "All fires extinguished!",
        },
        "fail": {
            "icon": "💀",
            "title": "GAME OVER",
            "color": "#f87171",
            "subtitle": "Building integrity dropped below 50%.",
        },
    }[status]

    def _render_report_cards(title: str, items, css_class: str) -> str:
        """Render one bullet-list card; returns "" when no non-blank item remains."""
        normalized = []
        for item in items or []:
            text = str(item).strip()
            if text:
                normalized.append(html.escape(text))
        if not normalized:
            return ""
        bullets = "".join(f"<li>{value}</li>" for value in normalized)
        icon = ""
        label = html.escape(title)
        # A leading non-alphanumeric word (the emoji) is split off as the icon
        parts = title.split(" ", 1)
        if len(parts) == 2 and not parts[0].isalnum():
            icon = html.escape(parts[0])
            label = html.escape(parts[1])
        icon_html = f"<span class='report-card-icon'>{icon}</span>" if icon else ""
        return f"""
<div class="report-card {css_class}">
<div class="report-card-header">
{icon_html}
<div class="report-card-title">{label}</div>
</div>
<div class="report-card-body">
<ul>{bullets}</ul>
</div>
</div>
"""

    def _render_chart_card(
        points: list[dict],
        value_key: str,
        title: str,
        color: str,
        value_formatter=None,
        axis_formatter=None,
    ) -> str:
        """Render one sparkline card for ``value_key`` over the given points.

        Points missing a tick or a numeric value are skipped; returns ""
        when nothing plottable remains.
        """
        if not points:
            return ""
        series = []
        for pt in points:
            tick = pt.get("tick")
            value = pt.get(value_key)
            if value is None or tick is None:
                continue
            try:
                value = float(value)
            except (TypeError, ValueError):
                continue
            series.append((tick, value, pt))
        if not series:
            return ""
        values = [item[1] for item in series]
        min_v = min(values)
        max_v = max(values)
        span = max(max_v - min_v, 1e-6)
        coords = []
        for idx, (_, value, _) in enumerate(series):
            # x spreads the samples over the 100-unit wide viewBox
            x = 0 if len(series) == 1 else (idx / (len(series) - 1)) * 100
            if max_v == min_v:
                # Flat series: draw through the vertical middle of the chart
                y = 30
            else:
                # SVG y grows downwards, so the maximum maps to y = 0
                y = 60 - ((value - min_v) / span) * 60
            coords.append(f"{x:.2f},{y:.2f}")
        last_tick, last_value, last_point = series[-1]
        raw_display = (
            value_formatter(last_value, last_point)
            if value_formatter
            else f"{last_value:.0f}"
        )
        axis_format = axis_formatter or (lambda v: f"{v:.0f}")
        # Formatter output can echo report text (e.g. threat labels), so it
        # is escaped like every other report string in this overlay.
        display_value = html.escape(str(raw_display))
        axis_top_label = html.escape(str(axis_format(max_v)))
        axis_bottom_label = html.escape(str(axis_format(min_v)))
        safe_title = html.escape(title)
        first_tick = series[0][0]
        last_coord = coords[-1].split(",")
        return f"""
<div class="report-chart-card">
<div class="chart-title">{safe_title}</div>
<div class="mini-chart">
<svg viewBox="0 0 100 60" preserveAspectRatio="none">
<polyline points="{' '.join(coords)}" fill="none" stroke="{color}" stroke-width="2" stroke-linecap="round" stroke-linejoin="round" />
<circle cx="{last_coord[0]}" cy="{last_coord[1]}" r="2.5" fill="{color}" />
</svg>
</div>
<div class="chart-axis-values">
<span>{axis_top_label}</span>
<span>{axis_bottom_label}</span>
</div>
<div class="chart-x-axis">
<span>Tick {int(first_tick)}</span>
<span>Tick {int(last_tick)}</span>
</div>
<div class="chart-meta">
<span>Tick {int(last_tick)}</span>
<span class="chart-value" style="color: {color};">{display_value}</span>
</div>
</div>
"""

    def _render_chart_section(charts_payload) -> str:
        """Render the chart grid; accepts a bare metrics list or a dict payload."""
        if not charts_payload:
            return ""
        if isinstance(charts_payload, list):
            metric_points = charts_payload
            threat_points = []
        else:
            metric_points = charts_payload.get("metrics") or []
            threat_points = charts_payload.get("threat_levels") or []
        fire_chart = _render_chart_card(
            metric_points,
            "fires",
            "🔥 Fire Load",
            "#f97316",
            lambda v, _: f"{int(round(v))} fires",
            lambda v: f"{int(round(v))} fires",
        )
        unit_chart = _render_chart_card(
            metric_points,
            "units",
            "🚒 Units Deployed",
            "#38bdf8",
            lambda v, pt: f"{int(round(v))}/{int(pt.get('max_units', 0) or 0)} units",
            lambda v: f"{int(round(v))} units",
        )
        integrity_chart = _render_chart_card(
            metric_points,
            "building_integrity",
            "🏢 Building Integrity",
            "#4ade80",
            lambda v, _: f"{max(0, min(1, v)) * 100:.0f}%",
            lambda v: f"{max(0, min(1, v)) * 100:.0f}%",
        )
        threat_chart = _render_threat_chart(threat_points)
        charts = "".join(filter(None, [fire_chart, unit_chart, integrity_chart, threat_chart]))
        if not charts:
            return ""
        return f"""
<div class="report-chart-grid">
{charts}
</div>
"""

    def _render_player_actions_block(payload: Optional[dict]) -> str:
        """Render the manual-action summary and per-action counter chips."""
        if not payload:
            return ""
        summary = html.escape(payload.get("summary", "") or "Player has not performed any manual actions this run.")
        counts = payload.get("counts") or {}
        chips = []
        chip_meta = [
            ("deploy_unit", "🚒 Deploy Units"),
            ("remove_unit", "♻️ Remove Units"),
            ("add_fire", "🔥 Ignite Fires"),
        ]
        for key, label in chip_meta:
            value = counts.get(key, 0) or 0
            try:
                value = int(value)
            except (TypeError, ValueError):
                value = 0
            chip_class = "" if value > 0 else "muted"
            chips.append(f"<span class='player-action-chip {chip_class}'>{label}: {value}</span>")
        chips_html = "".join(chips)
        return f"""
<div class="player-actions-block">
<div class="player-actions-header">🙋 Player Manual Actions</div>
<p class="player-actions-summary">{summary}</p>
<div class="player-actions-chips">{chips_html}</div>
</div>
"""

    def _render_threat_chart(points: list[dict]) -> str:
        """Render the threat-level chart, mapping level names to 1-4 when no value is given."""
        if not points:
            return ""
        threat_map = {"CRITICAL": 4, "HIGH": 3, "MODERATE": 2, "LOW": 1}
        normalized = []
        for entry in points:
            tick = entry.get("tick")
            if tick is None:
                continue
            level = (entry.get("threat_level") or "").upper()
            value = entry.get("value")
            if value is None:
                value = threat_map.get(level, 0)
            normalized.append({
                "tick": tick,
                "threat_value": value,
                "threat_label": entry.get("threat_level", level.title()),
            })
        if not normalized:
            return ""
        reverse_map = {v: k.title() for k, v in threat_map.items()}

        def format_label(val):
            return reverse_map.get(int(round(val)), f"{val:.0f}")

        return _render_chart_card(
            normalized,
            "threat_value",
            "🧭 Threat Level",
            "#c084fc",
            # ``or`` so a None/empty label falls back to the numeric level name
            lambda v, pt: pt.get("threat_label") or format_label(v),
            format_label,
        )

    after_action_status = report_payload.get("status", "idle")
    report = report_payload.get("report", {}) or {}
    if after_action_status == "pending":
        report_section = """
<div class="result-report pending">
<div class="report-spinner"></div>
<div class="report-text">
<strong>AI battle report is being generated...</strong>
<p>The AI is consolidating Stage 4 cycle summaries and mission metrics into the final report.</p>
</div>
</div>
"""
    elif after_action_status == "ready" and report:
        summary = html.escape(report.get("summary", ""))
        charts_html = _render_chart_section(report.get("charts") or [])
        player_actions_html = _render_player_actions_block(report.get("player_actions"))
        strengths = _render_report_cards("✅ What Went Well", report.get("strengths", []), "positive")
        improvements = _render_report_cards("⚠️ Needs Improvement", report.get("improvements", []), "risk")
        actions = _render_report_cards("🛠 Actionable Suggestions", report.get("next_actions", []), "action")
        cards = strengths + improvements + actions
        if not cards:
            cards = "<p class='report-empty'>AI has not provided any concrete items yet.</p>"
        report_section = f"""
<div class="result-report ready">
<div class="result-report-header">
<span>AI Battle Report</span>
<span class="report-badge">Complete</span>
</div>
<p class="report-summary">{summary}</p>
{charts_html}
{player_actions_html}
<div class="report-card-grid">{cards}</div>
</div>
"""
    elif after_action_status == "error":
        error_msg = html.escape(report_payload.get("error") or "AI report generation failed.")
        report_section = f"""
<div class="result-report error">
<div class="result-report-header">
<span>AI Battle Report</span>
<span class="report-badge danger">Error</span>
</div>
<p class="report-error">⚠️ {error_msg}</p>
</div>
"""
    else:
        report_section = """
<div class="result-report idle">
<p>Waiting for the AI to finish the battle analysis...</p>
</div>
"""
    return f"""
<div class="game-result-overlay" id="result-overlay">
<div class="game-result-box">
<div class="result-icon" style="color: {outcome_config['color']};">{outcome_config['icon']}</div>
<div class="result-title" style="color: {outcome_config['color']};">{outcome_config['title']}</div>
<div class="result-subtitle">{outcome_config['subtitle']}</div>
{report_section}
<button class="result-ok-btn" onclick="const overlay=document.getElementById('result-overlay');if(overlay){{overlay.style.display='none';}}">OK</button>
</div>
</div>
"""
# =============================================================================
# Gradio Event Handlers
# =============================================================================
def _get_combined_advisor_messages(service) -> list[dict]:
    """Concatenate the advisor's history messages with the current ones.

    Either source may return a falsy value, which is treated as empty. When
    both are empty a single placeholder assistant message is returned.
    """
    past_cycles = service.get_advisor_history_chat_messages() or []
    live_cycle = service.get_advisor_chat_messages() or []
    merged = past_cycles + live_cycle
    if merged:
        return merged
    return [
        {
            "role": "assistant",
            "content": "No AI analysis yet. Press **Start** to begin the advisor cycle."
        }
    ]
def start_or_resume_simulation(
    fire_count: int,
    fire_intensity: float,
    building_count: int,
    max_units: int,
    seed: Optional[int],
    should_resume: bool,
    session_token: Optional[str],
    service: Optional[SimulationService],
    display_cache: Optional[dict],
):
    """Handle the Start/Resume button.

    Resumes when ``should_resume`` is set and the service confirms the token
    can be resumed; otherwise the display cache is cleared and a new run is
    started under a freshly generated token. A missing or non-positive
    ``seed`` is passed on as None.

    Returns a list, in order: main timer (active), report timer (inactive),
    result popup HTML, advisor messages, event log text, start-button update
    (disabled), pause-button update (enabled), status HTML, the grid cell
    updates, then ``False``, the session token, the service and the display
    cache.
    """
    service = get_or_create_service(service)
    if display_cache is None:
        display_cache = _create_display_cache()

    token = (session_token or "").strip()
    # ``and`` short-circuits: the backend is only asked when resume was requested
    if bool(should_resume) and service.can_resume_session(token):
        state = service.resume()
        # A resumed run keeps its existing token
        new_token = token
    else:
        # A new run always gets a fresh token so it cannot match an older
        # paused session.
        display_cache = _reset_advisor_display_cache(display_cache)
        new_token = _generate_session_token()
        actual_seed = None
        if seed and seed > 0:
            actual_seed = int(seed)
        state = service.start(
            seed=actual_seed,
            fire_count=int(fire_count),
            fire_intensity=fire_intensity,
            building_count=int(building_count),
            max_units=int(max_units),
            session_id=new_token,
        )

    grid_updates = get_all_button_updates(state)
    is_thinking = service.is_thinking()
    thinking_stage = service.get_thinking_stage()

    leading = [
        gr.Timer(active=True),  # Start the timer
        gr.Timer(active=False),  # Ensure report poller is off initially
        render_game_result(state.get("status", ""), state.get("after_action_report")),
        _get_combined_advisor_messages(service),
        service.get_event_log_text(),
        gr.update(interactive=False),  # start btn
        gr.update(interactive=True),  # pause btn
        render_status_html(state, is_thinking, thinking_stage),
    ]
    return [*leading, *grid_updates, False, new_token, service, display_cache]
def pause_simulation(
    service: Optional[SimulationService],
    display_cache: Optional[dict],
):
    """Handle the Pause button.

    Pauses the service and returns a list, in order: both timers
    (inactive), result popup HTML, advisor messages, event log text,
    start-button update (enabled so the run can be resumed), pause-button
    update (disabled), status HTML, the grid cell updates, then ``True``,
    the service and the display cache.
    """
    service = get_or_create_service(service)
    display_cache = _create_display_cache() if display_cache is None else display_cache

    state = service.pause()
    grid_updates = get_all_button_updates(state)

    leading = [
        gr.Timer(active=False),  # Stop the timer
        gr.Timer(active=False),
        render_game_result(state.get("status", ""), state.get("after_action_report")),
        _get_combined_advisor_messages(service),
        service.get_event_log_text(),
        gr.update(interactive=True),  # start btn (can resume)
        gr.update(interactive=False),  # pause btn
        render_status_html(state),
    ]
    return [*leading, *grid_updates, True, service, display_cache]
def reset_simulation(
    fire_count: int,
    fire_intensity: float,
    building_count: int,
    max_units: int,
    seed: Optional[int],
    session_token: Optional[str],
    service: Optional[SimulationService],
    display_cache: Optional[dict],
):
    """Handle the Reset button.

    Clears the display cache and resets the service under a newly generated
    session token (the incoming ``session_token`` is not used). A missing or
    non-positive ``seed`` is passed on as None.

    Returns a list, in order: both timers (inactive), "" to clear the result
    popup, advisor messages, event log text, start-button update (enabled),
    pause-button update (disabled), status HTML, the grid cell updates, then
    ``False``, the new token, the service and the display cache.
    """
    service = get_or_create_service(service)
    display_cache = _reset_advisor_display_cache(
        _create_display_cache() if display_cache is None else display_cache
    )

    # Always generate a fresh token on reset to ensure clean state
    new_token = _generate_session_token()
    actual_seed = None
    if seed and seed > 0:
        actual_seed = int(seed)

    state = service.reset(
        seed=actual_seed,
        fire_count=int(fire_count),
        fire_intensity=fire_intensity,
        building_count=int(building_count),
        max_units=int(max_units),
        session_id=new_token,
    )
    grid_updates = get_all_button_updates(state)

    leading = [
        gr.Timer(active=False),  # Stop the timer
        gr.Timer(active=False),
        "",  # Clear result popup
        _get_combined_advisor_messages(service),
        service.get_event_log_text(),
        gr.update(interactive=True),
        gr.update(interactive=False),
        render_status_html(state),
    ]
    return [*leading, *grid_updates, False, new_token, service, display_cache]
def prepare_reset(
    service: Optional[SimulationService],
    display_cache: Optional[dict],
):
    """Shut down the current service (if any) ahead of a reload.

    ``display_cache`` is accepted but not read. Always returns
    ``(None, None)``.
    """
    cleared = (None, None)
    if not service:
        return cleared
    service.shutdown()
    return cleared
def _ensure_display_cache_initialized(display_cache: Optional[dict]) -> dict:
    """Return the given cache unchanged, or a newly created one when it is None."""
    if display_cache is None:
        return _create_display_cache()
    return display_cache
def _render_manual_action_update(service: SimulationService, display_cache: dict):
    """Collect the UI updates that follow a manual grid action.

    Returns a list, in order: result popup HTML, advisor messages, event log
    text, status HTML (including the advisor's thinking state), the grid
    cell updates, then the service and the display cache.
    """
    state = service.get_state()
    grid_updates = get_all_button_updates(state)
    is_thinking = service.is_thinking()
    thinking_stage = service.get_thinking_stage()

    leading = [
        render_game_result(state.get("status", ""), state.get("after_action_report")),
        _get_combined_advisor_messages(service),
        service.get_event_log_text(),
        render_status_html(state, is_thinking, thinking_stage),
    ]
    return [*leading, *grid_updates, service, display_cache]
def _normalize_manual_selection(selection: str) -> str:
    """Map free-form text onto one of the radio labels used by the UI.

    Matching ignores case and surrounding whitespace, both for the short
    aliases ("truck", "heli", "fire", ...) and for the radio labels
    themselves, so "🔥 fire" or " 🚁 Heli " resolve to their label instead
    of silently falling back to a truck.

    Args:
        selection: Alias or label text; empty/None selects the default.

    Returns:
        "🚒 Truck", "🚁 Heli" or "🔥 Fire". Unrecognised input yields
        "🚒 Truck".
    """
    default_label = "🚒 Truck"
    if not selection:
        return default_label
    aliases = {
        "truck": "🚒 Truck",
        "fire_truck": "🚒 Truck",
        "firetruck": "🚒 Truck",
        "🚒": "🚒 Truck",
        "heli": "🚁 Heli",
        "helicopter": "🚁 Heli",
        "🚁": "🚁 Heli",
        "fire": "🔥 Fire",
        "🔥": "🔥 Fire",
    }
    # Register the labels under their lower-cased form so they are matched
    # with the same normalisation as the aliases.
    for label in ("🚒 Truck", "🚁 Heli", "🔥 Fire"):
        aliases[label.lower()] = label
    return aliases.get(selection.strip().lower(), default_label)
def deploy_at_cell(
    x: int,
    y: int,
    selection: str,
    service: Optional[SimulationService],
    display_cache: Optional[dict],
):
    """Apply a manual click on a grid cell.

    Depending on ``selection`` and what is on the cell, this ignites a fire
    (intensity 0.5), removes the unit already standing there, or deploys a
    new fire truck / helicopter. Nothing is changed unless the simulation is
    running. Any action whose result status is not "ok" raises exactly one
    Gradio warning carrying the service's message.

    Args:
        x: Cell column.
        y: Cell row.
        selection: Radio label; "🔥 Fire" adds a fire, "🚒 Truck" deploys a
            fire truck and any other value deploys a helicopter.
        service: Session service, created when None.
        display_cache: Session display cache, created when None.

    Returns:
        The standard manual-action UI update list.
    """
    service = get_or_create_service(service)
    display_cache = _ensure_display_cache_initialized(display_cache)

    # Only allow deployment when simulation is actively running (not paused)
    if not service.is_running():
        gr.Warning("⚠️ Please start the simulation first!")
        return _render_manual_action_update(service, display_cache)

    if selection == "🔥 Fire":
        result = service.add_fire(x, y, intensity=0.5)
    elif service.has_unit_at(x, y):
        # Clicking an occupied cell removes the unit standing there
        result = service.remove_unit(x, y)
    else:
        unit_type_key = "fire_truck" if selection == "🚒 Truck" else "helicopter"
        result = service.deploy_unit(unit_type_key, x, y, "player")

    # One shared failure check, so fire, removal and deployment failures are
    # each reported once and none is skipped.
    if result.get("status") != "ok":
        error_msg = result.get("message", "Unknown error")
        gr.Warning(f"⚠️ {error_msg}")

    return _render_manual_action_update(service, display_cache)
def handle_map_deploy(
    selection: str,
    x: int,
    y: int,
    service: Optional[SimulationService],
    display_cache: Optional[dict],
):
    """Public API handler that deploys a unit or fire at a coordinate.

    The selection text is normalised to a radio label and the coordinates
    are coerced to ``int`` before delegating to ``deploy_at_cell``.
    """
    label = _normalize_manual_selection(selection)
    col, row = int(x), int(y)
    return deploy_at_cell(col, row, label, service, display_cache)
def handle_map_remove(
    x: int,
    y: int,
    service: Optional[SimulationService],
    display_cache: Optional[dict],
):
    """Public API handler that removes the unit at a coordinate.

    Warns (without changing anything) when the simulation is not running or
    no unit stands on the cell, and warns with the service's message when
    the removal itself fails. Always returns the standard manual-action UI
    update list.
    """
    service = get_or_create_service(service)
    display_cache = _ensure_display_cache_initialized(display_cache)

    if not service.is_running():
        gr.Warning("⚠️ Please start the simulation first!")
        return _render_manual_action_update(service, display_cache)

    col, row = int(x), int(y)
    if not service.has_unit_at(col, row):
        gr.Warning("⚠️ No unit found at the requested coordinates.")
        return _render_manual_action_update(service, display_cache)

    outcome = service.remove_unit(col, row)
    if outcome.get("status") != "ok":
        error_msg = outcome.get("message", "Unknown error")
        gr.Warning(f"⚠️ {error_msg}")
    return _render_manual_action_update(service, display_cache)
def handle_map_move(
    source_x: int,
    source_y: int,
    target_x: int,
    target_y: int,
    service: Optional[SimulationService],
    display_cache: Optional[dict],
):
    """Public API handler that moves a unit between two coordinates.

    The move is a removal at the source followed by a deployment at the
    target. If the deployment fails, the unit is redeployed at the source;
    should that rollback fail as well, a second warning is raised so the
    lost unit does not go unnoticed.

    Args:
        source_x: Column of the unit to move.
        source_y: Row of the unit to move.
        target_x: Destination column.
        target_y: Destination row.
        service: Session service, created when None.
        display_cache: Session display cache, created when None.

    Returns:
        The standard manual-action UI update list.
    """
    service = get_or_create_service(service)
    display_cache = _ensure_display_cache_initialized(display_cache)

    if not service.is_running():
        gr.Warning("⚠️ Please start the simulation first!")
        return _render_manual_action_update(service, display_cache)

    sx, sy = int(source_x), int(source_y)
    tx, ty = int(target_x), int(target_y)

    if not service.has_unit_at(sx, sy):
        gr.Warning("⚠️ No unit found at the source coordinates.")
        return _render_manual_action_update(service, display_cache)

    removal = service.remove_unit(sx, sy)
    if removal.get("status") != "ok":
        error_msg = removal.get("message", "Failed to remove unit before moving.")
        gr.Warning(f"⚠️ {error_msg}")
        return _render_manual_action_update(service, display_cache)

    # The removal result may describe the unit in either of two shapes;
    # fall back to a fire truck when neither is present.
    removed_unit = removal.get("unit") or {}
    unit_type = removed_unit.get("type") or removal.get("removed_unit_type") or "fire_truck"

    deploy_result = service.deploy_unit(unit_type, tx, ty, "player_move")
    if deploy_result.get("status") != "ok":
        # Attempt to restore the original placement
        rollback = service.deploy_unit(unit_type, sx, sy, "player_move_rollback")
        error_msg = deploy_result.get("message", "Failed to deploy unit at target location.")
        gr.Warning(f"⚠️ {error_msg}")
        if (rollback or {}).get("status") != "ok":
            # The unit is now on neither cell; tell the player explicitly
            gr.Warning("⚠️ Could not restore the unit to its original position.")

    return _render_manual_action_update(service, display_cache)
def poll_after_action_report(service: Optional[SimulationService]):
    """Poll the after-action report separately from the main simulation timer.

    The report timer is left untouched while the game has ended and the
    report is still "pending"; in every other case it is deactivated.

    Returns a list: the timer update, the rendered result popup and the
    service.
    """
    service = get_or_create_service(service)
    state = service.get_state()
    status = state.get("status", "idle")
    report_payload = state.get("after_action_report")
    after_action_status = (report_payload or {}).get("status", "idle")

    still_pending = status in ["success", "fail"] and after_action_status == "pending"
    report_timer_update = gr.update() if still_pending else gr.Timer(active=False)

    return [
        report_timer_update,
        render_game_result(status, report_payload),
        service,
    ]
def get_all_button_updates(state: dict) -> list:
    """Build one value update per grid button, row by row over the 10x10 grid.

    Only the cell content is used; the background colour returned by
    ``get_cell_info`` is discarded here.
    """
    return [
        gr.update(value=get_cell_info(state, col, row)[0])
        for row in range(10)
        for col in range(10)
    ]
def refresh_display(
    service: Optional[SimulationService],
    display_cache: Optional[dict],
):
    """
    Main timer tick handler: push only the UI pieces that changed.

    Pulls one change report from the service and turns it into updates for
    every bound output. Once a win/lose overlay has already been shown on a
    previous tick, the background (advisor chat, event log, status bar,
    grid) is frozen; the tick on which the overlay first appears still
    renders normally so the user sees the true end state.

    Args:
        service: Session simulation service, or None to create one.
        display_cache: Per-session render cache, or None to create one.
            Mutated in place (overlay freeze state, cached chat messages,
            last advisor signature, last status HTML).

    Returns:
        A list ordered as: main timer, report timer, result popup, advisor
        chat, event log, start button, pause button, status HTML, the 100
        grid-button updates, then ``service`` and ``display_cache``.
    """
    service = get_or_create_service(service)
    if display_cache is None:
        display_cache = _create_display_cache()

    # A single call returns the state snapshot plus per-component change flags.
    changes = service.get_changed_components()
    state = changes["state"]
    status = state.get("status", "idle")
    after_action_status = (state.get("after_action_report") or {}).get("status", "idle")
    game_over = status in ["success", "fail"]

    # Inputs for the AI thinking indicator in the status bar.
    is_thinking = service.is_thinking()
    thinking_stage = service.get_thinking_stage()

    # Track the overlay across ticks: remember it while it is shown, clear
    # the remembered value once it disappears.
    overlay_state = changes.get("result_state", "")
    previous_overlay = display_cache["result_freeze_state"]
    if overlay_state:
        display_cache["result_freeze_state"] = overlay_state
    elif previous_overlay:
        display_cache["result_freeze_state"] = ""
    # Freeze only when an overlay is showing now AND was already showing on
    # the previous tick; the first overlay tick is allowed to render.
    freeze_background = bool(overlay_state) and bool(previous_overlay)

    # Main timer stops when the game ends; the report timer runs only while
    # the after-action report of a finished game is still pending.
    timer_update = gr.Timer(active=False) if game_over else gr.update()
    report_timer_update = gr.Timer(
        active=(game_over and after_action_status == "pending")
    )

    # Result popup - re-rendered only when the result state changed.
    if changes["result_changed"]:
        result_payload = changes.get("result_payload") or {}
        result_popup = render_game_result(
            changes["result_state"], result_payload.get("after_action")
        )
    else:
        result_popup = gr.update()

    # Advisor chatbot - cached history followed by the current cycle.
    advisor_display = gr.skip()
    if not freeze_background:
        if changes["history_changed"]:
            fresh_history = changes.get("advisor_history")
            if fresh_history is not None:
                display_cache["history_messages_cache"] = fresh_history

        fresh_cycle = changes.get("advisor_messages")
        if fresh_cycle is not None:
            display_cache["current_cycle_messages_cache"] = fresh_cycle

        transcript = (display_cache["history_messages_cache"] or []) + (
            display_cache["current_cycle_messages_cache"] or []
        )
        if transcript:
            # Skip the update when the combined chat is unchanged.
            transcript_signature = _chat_signature(transcript)
            if transcript_signature != display_cache["last_advisor_signature"]:
                display_cache["last_advisor_signature"] = transcript_signature
                advisor_display = gr.update(value=transcript)

    # Event log - only when content changed and the background is live.
    if freeze_background or not changes["event_log_changed"]:
        event_log = gr.update()
    else:
        event_log = changes["event_log"]

    # Start/Pause buttons - forced to Start-enabled/Pause-disabled while frozen.
    if freeze_background:
        start_btn_update = gr.update(interactive=True)
        pause_btn_update = gr.update(interactive=False)
    elif changes["buttons_changed"]:
        start_enabled, pause_enabled = changes["button_states"]
        start_btn_update = gr.update(interactive=start_enabled)
        pause_btn_update = gr.update(interactive=pause_enabled)
    else:
        start_btn_update = gr.update()
        pause_btn_update = gr.update()

    # Status bar - compared against the cached HTML to prevent flicker.
    status_html_update = gr.skip()
    if not freeze_background:
        rendered_status = render_status_html(state, is_thinking, thinking_stage)
        if rendered_status != display_cache["last_status_html"]:
            display_cache["last_status_html"] = rendered_status
            status_html_update = rendered_status

    # Grid buttons - full refresh on change, otherwise 100 no-op updates.
    if not freeze_background and changes["grid_changed"]:
        updates = get_all_button_updates(state)
    else:
        updates = [gr.update() for _ in range(100)]

    return [
        timer_update,
        report_timer_update,
        result_popup,
        advisor_display,
        event_log,
        start_btn_update,
        pause_btn_update,
        status_html_update,
    ] + updates + [service, display_cache]
# =============================================================================
# Gradio UI Definition
# =============================================================================
CUSTOM_CSS = """
.advisor-chatbot {
border: 1px solid var(--progress-bg);
border-radius: 12px;
background: var(--status-bg);
padding: 4px;
}
.advisor-chatbot .wrap {
background: transparent !important;
}
.advisor-chatbot .top-panel,
.advisor-chatbot .icon-button-wrapper {
display: none !important;
}
.advisor-chatbot .message {
font-family: "JetBrains Mono", "SFMono-Regular", ui-monospace, monospace !important;
font-size: 13px !important;
line-height: 1.5 !important;
color: var(--status-text);
}
.advisor-chatbot .message .metadata-title {
font-weight: 600;
}
/* Theme-aware CSS variables */
:root {
--status-bg: #f8fafc;
--status-text: #1e293b;
--status-muted: #64748b;
--progress-bg: #e2e8f0;
--overlay-panel-bg: rgba(255, 255, 255, 0.92);
--overlay-panel-border: rgba(15, 23, 42, 0.12);
--overlay-panel-strong-bg: #f1f5f9;
--overlay-subpanel-bg: #ffffff;
--overlay-subpanel-border: rgba(15, 23, 42, 0.1);
--overlay-card-bg: #ffffff;
--overlay-card-border: rgba(15, 23, 42, 0.1);
--overlay-chart-bg: #ffffff;
--overlay-chart-border: rgba(15, 23, 42, 0.12);
--overlay-chip-bg: rgba(248, 113, 113, 0.12);
--overlay-chip-border: rgba(248, 113, 113, 0.25);
--overlay-chip-text: #9a3412;
--overlay-list-bg: #ffffff;
--overlay-list-text: var(--status-text);
}
/* Dark mode overrides */
.dark {
--status-bg: #1a1a2e;
--status-text: #ffffff;
--status-muted: #94a3b8;
--progress-bg: #333333;
--overlay-panel-bg: rgba(23, 27, 45, 0.8);
--overlay-panel-border: rgba(255, 255, 255, 0.08);
--overlay-panel-strong-bg: rgba(23, 27, 45, 0.9);
--overlay-subpanel-bg: rgba(15, 23, 42, 0.75);
--overlay-subpanel-border: rgba(248, 250, 252, 0.2);
--overlay-card-bg: rgba(15, 23, 42, 0.8);
--overlay-card-border: rgba(255, 255, 255, 0.08);
--overlay-chart-bg: rgba(13, 20, 35, 0.85);
--overlay-chart-border: rgba(255, 255, 255, 0.08);
--overlay-chip-bg: rgba(248, 113, 113, 0.08);
--overlay-chip-border: rgba(248, 113, 113, 0.35);
--overlay-chip-text: #fed7aa;
--overlay-list-bg: rgba(15, 23, 42, 0.9);
--overlay-list-text: #e2e8f0;
}
/* Fixed size grid buttons - force square shape */
.grid-btn {
width: 52px !important;
height: 52px !important;
min-width: 52px !important;
min-height: 52px !important;
max-width: 52px !important;
max-height: 52px !important;
padding: 0 !important;
font-size: 16px !important;
border-radius: 5px !important;
margin: 1px !important;
flex-shrink: 0 !important;
flex-grow: 0 !important;
line-height: 52px !important;
box-sizing: border-box !important;
font-weight: bold !important;
text-shadow: 1px 1px 2px rgba(0,0,0,0.5) !important;
}
/* Coordinate label - same size as grid-btn but transparent */
.coord-label {
width: 52px !important;
height: 52px !important;
min-width: 52px !important;
min-height: 52px !important;
max-width: 52px !important;
max-height: 52px !important;
padding: 0 !important;
font-size: 14px !important;
font-weight: bold !important;
border-radius: 5px !important;
margin: 1px !important;
flex-shrink: 0 !important;
flex-grow: 0 !important;
line-height: 52px !important;
box-sizing: border-box !important;
background: transparent !important;
border: none !important;
color: #888 !important;
cursor: default !important;
pointer-events: none !important;
}
/* Fixed row layout - prevent wrapping, tight spacing */
.grid-row {
display: flex !important;
flex-wrap: nowrap !important;
gap: 0 !important;
align-items: center !important;
margin: 0 !important;
padding: 0 !important;
min-height: 0 !important;
line-height: 1 !important;
}
/* Override Gradio's default row gap */
.grid-row.row {
gap: 0 !important;
}
.grid-row > div {
flex-shrink: 0 !important;
flex-grow: 0 !important;
margin: 0 !important;
padding: 0 !important;
}
/* Ensure button containers have no extra spacing */
.grid-row > div > button {
margin: 1px !important;
}
/* Override Gradio column gap for grid container */
.grid-container {
gap: 0 !important;
row-gap: 0 !important;
column-gap: 0 !important;
}
.grid-container.column {
gap: 0 !important;
row-gap: 0 !important;
column-gap: 0 !important;
}
/* Target any parent column that contains grid-row */
div.column:has(.grid-row) {
gap: 0 !important;
row-gap: 0 !important;
column-gap: 0 !important;
}
.log-box textarea {
font-family: "JetBrains Mono", "SFMono-Regular", ui-monospace, monospace !important;
font-size: 13px !important;
min-height: 160px !important;
max-height: 320px !important;
overflow-y: auto !important;
resize: vertical !important;
}
/* Theme-aware legend box */
.legend-box {
background: var(--status-bg);
padding: 10px;
border-radius: 8px;
color: var(--status-text);
border: 1px solid var(--progress-bg);
}
.status-panel {
background: var(--status-bg);
border: 1px solid var(--progress-bg);
border-radius: 12px;
padding: 12px;
color: var(--status-text);
}
.status-panel .status-compact {
margin: 0;
}
/* Subtitle styling */
.subtitle {
color: var(--status-muted) !important;
font-size: 14px !important;
margin-top: -8px !important;
margin-bottom: 12px !important;
}
/* AI Advisor highlight styling */
.ai-advisor-panel {
background: linear-gradient(135deg, rgba(59, 130, 246, 0.1), rgba(147, 51, 234, 0.1));
border: 2px solid rgba(99, 102, 241, 0.3);
border-radius: 12px;
padding: 16px;
}
.dark .ai-advisor-panel {
background: linear-gradient(135deg, rgba(59, 130, 246, 0.15), rgba(147, 51, 234, 0.15));
border-color: rgba(99, 102, 241, 0.4);
}
/* Accordion styling - more compact */
.gradio-accordion {
margin-bottom: 8px !important;
}
/* Control buttons row spacing */
.control-buttons-row {
margin-top: 16px !important;
}
/* How to play inline styling */
.how-to-play {
background: var(--status-bg);
border: 1px solid var(--progress-bg);
border-radius: 8px;
padding: 12px 16px !important;
margin: 8px 0 !important;
font-size: 14px;
}
.how-to-play p {
margin: 4px 0 !important;
}
/* Main heading styling */
h1 {
margin-bottom: 4px !important;
}
h2 {
margin-top: 12px !important;
margin-bottom: 8px !important;
}
/* Section spacing */
.section-gap {
margin-top: 16px !important;
}
/* Compact status bar */
.status-bar {
display: flex;
flex-wrap: wrap;
gap: 12px;
padding: 10px 16px;
background: var(--status-bg);
border-radius: 8px;
font-size: 14px;
align-items: center;
border: 1px solid var(--progress-bg);
}
.status-bar .status-item {
display: flex;
align-items: center;
gap: 6px;
}
.status-bar .status-badge {
padding: 2px 8px;
border-radius: 4px;
font-weight: bold;
font-size: 12px;
}
.status-bar .progress-mini {
width: 80px;
height: 6px;
background: var(--progress-bg);
border-radius: 3px;
overflow: hidden;
}
/* Compact status bar */
.status-compact {
display: flex;
flex-wrap: wrap;
gap: 12px;
padding: 8px 14px;
background: var(--status-bg);
border-radius: 8px;
font-size: 14px;
align-items: center;
border: 1px solid var(--progress-bg);
}
.status-compact .status-item {
display: flex;
align-items: center;
gap: 5px;
}
.status-compact .status-badge {
padding: 2px 8px;
border-radius: 4px;
font-weight: bold;
font-size: 11px;
color: #000;
}
.status-compact .status-health {
flex: 1;
min-width: 120px;
}
.status-compact .progress-mini {
flex: 1;
min-width: 60px;
max-width: 100px;
height: 8px;
background: var(--progress-bg);
border-radius: 4px;
overflow: hidden;
}
/* AI Thinking indicator in status bar */
.status-compact .ai-thinking-inline {
background: linear-gradient(135deg, rgba(99, 102, 241, 0.2), rgba(147, 51, 234, 0.2));
padding: 4px 10px;
border-radius: 6px;
border: 1px solid rgba(99, 102, 241, 0.4);
animation: ai-glow 1.5s ease-in-out infinite;
}
.status-compact .ai-label {
font-weight: bold;
margin-right: 4px;
color: #a78bfa;
}
.status-compact .ai-pulse {
animation: pulse-icon 1s ease-in-out infinite;
}
@keyframes ai-glow {
0%, 100% { box-shadow: 0 0 5px rgba(99, 102, 241, 0.3); }
50% { box-shadow: 0 0 15px rgba(99, 102, 241, 0.6); }
}
@keyframes pulse-icon {
0%, 100% { transform: scale(1); }
50% { transform: scale(1.2); }
}
/* Win/Lose overlay */
.game-result-overlay {
position: fixed;
top: 0;
left: 0;
width: 100%;
height: 100%;
display: flex;
justify-content: center;
align-items: center;
background: rgba(0,0,0,0.7);
z-index: 9999;
}
.game-result-box {
background: var(--status-bg);
padding: 32px 40px;
border-radius: 20px;
text-align: center;
animation: popup 0.3s ease-out;
width: min(90%, 90%);
max-height: 90vh;
overflow-y: auto;
box-shadow: 0 30px 80px rgba(0, 0, 0, 0.45);
}
.result-icon {
font-size: 60px;
margin-bottom: 12px;
}
.result-title {
font-size: 36px;
font-weight: 800;
letter-spacing: 1px;
}
.result-subtitle {
font-size: 18px;
color: var(--status-muted);
margin-bottom: 24px;
}
.result-report {
background: var(--overlay-panel-bg);
border: 1px solid var(--overlay-panel-border);
border-radius: 16px;
padding: 20px 24px;
margin-bottom: 24px;
text-align: left;
}
.result-report-header {
display: flex;
justify-content: space-between;
align-items: center;
font-weight: 600;
margin-bottom: 12px;
}
.report-badge {
font-size: 12px;
text-transform: uppercase;
letter-spacing: 0.5px;
padding: 4px 10px;
border-radius: 999px;
background: rgba(56, 189, 248, 0.15);
color: #22d3ee;
border: 1px solid rgba(34, 211, 238, 0.3);
}
.report-badge.danger {
background: rgba(248, 113, 113, 0.15);
color: #f87171;
border-color: rgba(248, 113, 113, 0.4);
}
.result-report.ready {
background: var(--overlay-panel-strong-bg);
}
.result-report.pending,
.result-report.error {
display: flex;
align-items: center;
gap: 16px;
}
.report-spinner {
width: 32px;
height: 32px;
border: 3px solid rgba(255, 255, 255, 0.2);
border-top-color: #22d3ee;
border-radius: 50%;
animation: report-spin 0.8s linear infinite;
}
@keyframes report-spin {
to { transform: rotate(360deg); }
}
.report-summary {
font-size: 16px;
margin-bottom: 16px;
color: var(--status-text);
}
.player-actions-block {
background: var(--overlay-subpanel-bg);
border: 1px dashed var(--overlay-subpanel-border);
border-radius: 14px;
padding: 16px 18px;
margin-bottom: 18px;
}
.player-actions-header {
font-weight: 700;
letter-spacing: 0.08em;
text-transform: uppercase;
font-size: 13px;
color: #facc15;
margin-bottom: 6px;
}
.player-actions-summary {
margin: 0 0 12px 0;
color: var(--status-text);
font-size: 14px;
}
.player-actions-chips {
display: flex;
flex-wrap: wrap;
gap: 8px;
margin-bottom: 12px;
}
.player-action-chip {
padding: 4px 10px;
border-radius: 999px;
border: 1px solid var(--overlay-chip-border);
background: var(--overlay-chip-bg);
font-size: 12px;
color: var(--overlay-chip-text);
letter-spacing: 0.03em;
}
.player-action-chip.muted {
opacity: 0.4;
}
.player-actions-list {
list-style: none;
margin: 0;
padding: 0;
display: flex;
flex-direction: column;
gap: 8px;
}
.player-actions-list li {
display: flex;
justify-content: space-between;
gap: 12px;
padding: 8px 12px;
border-radius: 10px;
border: 1px solid var(--overlay-panel-border);
background: var(--overlay-list-bg);
font-size: 13px;
color: var(--overlay-list-text);
}
.player-actions-list .action-tick {
font-weight: 600;
color: #38bdf8;
}
.player-actions-list .action-desc {
flex: 1;
text-align: right;
color: var(--overlay-list-text);
}
.player-actions-empty {
margin: 0;
font-size: 13px;
color: var(--status-muted);
text-align: center;
}
.report-card-grid {
display: grid;
grid-template-columns: repeat(auto-fit, minmax(260px, 1fr));
gap: 18px;
}
@media (min-width: 1100px) {
.report-card-grid {
grid-template-columns: repeat(3, minmax(0, 1fr));
}
}
.report-card {
background: var(--overlay-card-bg);
border-radius: 16px;
padding: 18px;
border: 1px solid var(--overlay-card-border);
display: flex;
flex-direction: column;
gap: 12px;
min-height: 220px;
color: var(--status-text);
}
.report-card-header {
display: flex;
align-items: center;
gap: 12px;
text-transform: uppercase;
letter-spacing: 0.08em;
font-size: 13px;
font-weight: 700;
color: var(--status-text);
}
.report-card-icon {
font-size: 22px;
line-height: 1;
}
.report-card-title {
flex: 1;
}
.report-card-body {
flex: 1;
}
.report-card ul {
list-style: disc;
margin: 0 0 0 18px;
padding: 0;
color: var(--status-text);
line-height: 1.45;
}
.report-card ul li {
margin-bottom: 6px;
}
.report-chart-grid {
margin-top: 20px;
display: grid;
grid-template-columns: repeat(auto-fit, minmax(220px, 1fr));
gap: 16px;
}
.report-chart-card {
background: var(--overlay-chart-bg);
border-radius: 14px;
padding: 14px 16px 18px;
border: 1px solid var(--overlay-chart-border);
display: flex;
flex-direction: column;
gap: 8px;
color: var(--status-text);
}
.mini-chart {
width: 100%;
height: 80px;
}
.mini-chart svg {
width: 100%;
height: 80px;
}
.chart-title {
font-size: 13px;
font-weight: 600;
color: var(--status-muted);
letter-spacing: 0.04em;
text-transform: uppercase;
}
.chart-meta {
display: flex;
justify-content: space-between;
font-size: 12px;
color: var(--status-muted);
}
.chart-value {
font-weight: 700;
}
.chart-axis-values,
.chart-x-axis {
display: flex;
justify-content: space-between;
font-size: 11px;
color: var(--status-muted);
}
.chart-axis-values {
margin-top: 4px;
}
.chart-x-axis {
margin-top: 6px;
}
.report-card.positive {
border-color: rgba(34, 197, 94, 0.3);
}
.report-card.risk {
border-color: rgba(248, 113, 113, 0.35);
}
.report-card.action {
border-color: rgba(59, 130, 246, 0.35);
}
.report-error {
color: #f87171;
margin: 0;
}
.report-empty {
color: var(--status-muted);
margin: 0;
}
.result-ok-btn {
width: 100%;
padding: 14px 0;
border: none;
border-radius: 999px;
background: linear-gradient(90deg, #22d3ee, #3b82f6);
color: #0f172a;
font-weight: 700;
font-size: 16px;
cursor: pointer;
transition: transform 0.2s ease, box-shadow 0.2s ease;
}
.result-ok-btn:hover {
transform: translateY(-1px);
box-shadow: 0 10px 25px rgba(34, 211, 238, 0.3);
}
.report-text p {
margin: 4px 0 0 0;
color: var(--status-muted);
}
@keyframes popup {
from { transform: scale(0.8); opacity: 0; }
to { transform: scale(1); opacity: 1; }
}
/* AI Thinking indicator animation */
.ai-thinking {
background: linear-gradient(90deg, rgba(99, 102, 241, 0.2), rgba(147, 51, 234, 0.2));
border-radius: 6px;
padding: 4px 10px !important;
animation: thinking-pulse 1.5s ease-in-out infinite;
}
.thinking-indicator {
color: #a855f7;
font-weight: bold;
font-size: 13px;
}
.dark .thinking-indicator {
color: #c084fc;
}
@keyframes thinking-pulse {
0%, 100% { opacity: 1; background: linear-gradient(90deg, rgba(99, 102, 241, 0.2), rgba(147, 51, 234, 0.2)); }
50% { opacity: 0.7; background: linear-gradient(90deg, rgba(99, 102, 241, 0.4), rgba(147, 51, 234, 0.4)); }
}
/* iOS-style AI Thinking Toast */
.ai-toast-container {
position: absolute;
top: 10px;
left: 50%;
transform: translateX(-50%);
z-index: 100;
animation: toast-slide-in 0.3s ease-out;
}
.ai-toast {
display: flex;
align-items: center;
gap: 12px;
background: rgba(30, 30, 40, 0.95);
backdrop-filter: blur(20px);
-webkit-backdrop-filter: blur(20px);
border-radius: 16px;
padding: 12px 18px;
box-shadow: 0 8px 32px rgba(0, 0, 0, 0.3),
0 0 0 1px rgba(255, 255, 255, 0.1);
min-width: 280px;
}
.ai-toast-icon {
font-size: 28px;
line-height: 1;
}
.ai-toast-content {
flex: 1;
}
.ai-toast-title {
color: #ffffff;
font-weight: 600;
font-size: 15px;
margin-bottom: 2px;
}
.ai-toast-subtitle {
color: rgba(255, 255, 255, 0.6);
font-size: 13px;
}
.ai-toast-spinner {
width: 20px;
height: 20px;
border: 2px solid rgba(255, 255, 255, 0.2);
border-top-color: #a855f7;
border-radius: 50%;
animation: toast-spin 0.8s linear infinite;
}
@keyframes toast-slide-in {
from {
opacity: 0;
transform: translateX(-50%) translateY(-20px);
}
to {
opacity: 1;
transform: translateX(-50%) translateY(0);
}
}
@keyframes toast-spin {
to { transform: rotate(360deg); }
}
/* Grid wrapper for toast positioning */
.grid-wrapper {
position: relative;
}
"""
def create_app() -> gr.Blocks:
    """
    Create the Gradio application.

    Builds the component tree (settings accordion, control buttons, AI
    advisor panel, 10x10 simulation grid, hidden manual-action inputs and
    two timers) and wires every event handler to it.

    Returns:
        The assembled ``gr.Blocks`` app; launching is left to the caller.
    """
    with gr.Blocks() as app:
        # Inject CSS via HTML style tag (most compatible method)
        gr.HTML(f"<style>{CUSTOM_CSS}</style>")
        # Per-session state holders. The service and display cache start as
        # None; the handlers wired below create them on first use.
        session_resume_state = gr.State(False)
        session_token_state = gr.State("")
        service_state = gr.State(None)
        display_cache_state = gr.State(None)
        # Header with title and instructions
        gr.Markdown("# 🔥 Fire Rescue Simulator Game")
        gr.Markdown("*An interactive game where you watch AI Agent autonomously fight fires using MCP tools!*")
        gr.Markdown("""
**🎮 How to Play:**
- Click **Start**
- Turn **Auto-Execute** OFF to manually control actions and deploy units.
- Watch AI's process: **Reasoning → Planning → Execution → Summary**
- **Settings & Controls:** Use the panel below to quickly tune scenario difficulty (fires, buildings, units, randomness) before sending the team in
**🏆 Win:** Extinguish all fires | **💀 Lose:** Building ≤ 50% or Tick ≥ 200 (time out)
""", elem_classes=["how-to-play"])
        # Collapsible Controls Section
        # Slider ranges and defaults all come from SCENARIO_DEFAULTS.
        with gr.Accordion("⚙️ Settings & Controls", open=False):
            with gr.Row():
                with gr.Column(scale=1):
                    fire_count_defaults = SCENARIO_DEFAULTS["fire_count"]
                    fire_count = gr.Slider(
                        minimum=fire_count_defaults.minimum,
                        maximum=fire_count_defaults.maximum,
                        value=fire_count_defaults.value,
                        step=fire_count_defaults.step,
                        label="🔥 Initial Fire Count",
                        info="Number of fire starting points"
                    )
                with gr.Column(scale=1):
                    fire_intensity_defaults = SCENARIO_DEFAULTS["fire_intensity"]
                    fire_intensity = gr.Slider(
                        minimum=fire_intensity_defaults.minimum,
                        maximum=fire_intensity_defaults.maximum,
                        value=fire_intensity_defaults.value,
                        step=fire_intensity_defaults.step,
                        label="🌡️ Fire Intensity",
                        info="Initial fire strength"
                    )
            with gr.Row():
                with gr.Column(scale=1):
                    building_count_defaults = SCENARIO_DEFAULTS["building_count"]
                    building_count = gr.Slider(
                        minimum=building_count_defaults.minimum,
                        maximum=building_count_defaults.maximum,
                        value=building_count_defaults.value,
                        step=building_count_defaults.step,
                        label="🏢 Building Count",
                        info="Number of buildings (connected cluster)"
                    )
                with gr.Column(scale=1):
                    max_units_defaults = SCENARIO_DEFAULTS["max_units"]
                    max_units = gr.Slider(
                        minimum=max_units_defaults.minimum,
                        maximum=max_units_defaults.maximum,
                        value=max_units_defaults.value,
                        step=max_units_defaults.step,
                        label="🚒 Max Units",
                        info="Maximum deployable units"
                    )
            with gr.Row():
                with gr.Column(scale=1):
                    seed_input = gr.Number(
                        value=0, label="Random Seed (0 = random)", precision=0
                    )
        # Control buttons - always visible
        with gr.Row(elem_classes=["control-buttons-row"]):
            start_btn = gr.Button("▶️ Start", variant="primary", scale=1)
            pause_btn = gr.Button("⏸️ Pause", variant="stop", interactive=False, scale=1)
            reset_btn = gr.Button("🔄 Reset", scale=1)
        # Game result popup: the component itself is visible but starts with
        # empty HTML, so nothing shows until a handler writes a result into it.
        result_popup = gr.HTML(value="", visible=True)
        # Store grid buttons for updates as (x, y, button) tuples
        grid_buttons = []
        # Main content: AI Advisor (left) + Simulation Grid (right)
        with gr.Row(elem_classes=["section-gap"]):
            # Left column: AI Advisor - THE STAR OF THE SHOW
            with gr.Column(scale=2, min_width=300):
                # Only used for the heading text below.
                advisor_interval_ticks = 10
                gr.Markdown(
                    f"## 🤖 AI Tactical Advisor · (refreshes every {advisor_interval_ticks} ticks)"
                )
                model_selector = gr.Dropdown(
                    label="🧠 Advisor Model Source",
                    choices=ADVISOR_MODEL_LABELS,
                    value=DEFAULT_ADVISOR_MODEL_CHOICE,
                    interactive=True,
                )
                auto_execute_toggle = gr.Checkbox(
                    label="🎮 Auto-Execute",
                    value=AUTO_EXECUTE_DEFAULT,
                    info="Automatically execute AI recommendations",
                )
                # Placeholder chat content shown before any advisor output exists.
                advisor_initial_messages = [
                    {
                        "role": "assistant",
                        "content": "No AI analysis yet. Press **Start** to begin the advisor cycle.",
                    }
                ]
                with gr.Accordion("📜 AI Analysis History", open=True):
                    gr.Markdown(
                        "⚠️ **Heads-up**: This timeline refreshes whenever a new AI cycle starts. "
                        "Hit `Pause` first if you want to read the full reasoning without it updating mid-run. "
                        "All completed cycles remain in this view—just scroll to review earlier ticks."
                    )
                    advisor_display = gr.Chatbot(
                        value=advisor_initial_messages,
                        height=500,
                        render_markdown=True,
                        show_label=False,
                        layout="panel",
                        elem_classes=["advisor-chatbot"],
                        avatar_images=(None, None),
                    )
                # Collapsible Event Log
                with gr.Accordion("📋 Event Log & Deploy Status", open=False):
                    event_log_display = gr.Textbox(
                        value="No events yet...",
                        label="Events",
                        lines=5,
                        max_lines=10,
                        interactive=False,
                        elem_classes=["log-box"]
                    )
            # Right column: Simulation Grid
            with gr.Column(scale=2):
                gr.Markdown("## 🗺️ Simulation Grid")
                # Click to place selector
                with gr.Row():
                    place_selector = gr.Radio(
                        choices=["🚒 Truck", "🚁 Heli", "🔥 Fire"],
                        value="🚒 Truck",
                        label="Click Map to Place",
                        scale=2
                    )
                # Legend
                gr.HTML("""
<div class="legend-box">
<span style="margin-right: 15px;">🌲 Forest</span>
<span style="margin-right: 15px;">🏢 Building</span>
<span style="margin-right: 15px;">🔥 Fire</span>
<span style="margin-right: 15px;">💨 Smoke</span>
<span style="margin-right: 15px;">🚒 Truck</span>
<span>🚁 Heli</span>
</div>
""")
                # Status display with progress bars - below legend (includes AI thinking indicator)
                status_display = gr.HTML(
                    value=render_status_html({"status": "idle"}),
                    elem_classes=["status-panel"]
                )
                # Grid container with no gap
                with gr.Column(elem_classes=["grid-container"]):
                    # X-axis labels (top row)
                    with gr.Row(elem_classes=["grid-row"]):
                        # Empty corner
                        gr.Button(value="", elem_classes=["coord-label"], interactive=False, min_width=52)
                        # X coordinates 0-9
                        for x in range(10):
                            gr.Button(value=str(x), elem_classes=["coord-label"], interactive=False, min_width=52)
                    # Grid buttons with Y-axis labels.
                    # Buttons are appended row by row (y outer, x inner), the
                    # same order in which get_all_button_updates emits updates.
                    for y in range(10):
                        with gr.Row(elem_classes=["grid-row"]):
                            # Y-axis label
                            gr.Button(value=str(y), elem_classes=["coord-label"], interactive=False, min_width=52)
                            # Grid cells
                            for x in range(10):
                                btn = gr.Button(
                                    value="🌲",
                                    elem_classes=["grid-btn"],
                                    min_width=52,
                                )
                                grid_buttons.append((x, y, btn))
        # Hidden components that only exist to expose MCP-friendly APIs (kept out of UI)
        with gr.Column(visible=False):
            manual_deploy_selection = gr.Textbox(label="deploy_selection", value="🚒 Truck")
            manual_deploy_x = gr.Number(label="deploy_x", value=0, precision=0)
            manual_deploy_y = gr.Number(label="deploy_y", value=0, precision=0)
            manual_move_source_x = gr.Number(label="move_source_x", value=0, precision=0)
            manual_move_source_y = gr.Number(label="move_source_y", value=0, precision=0)
            manual_move_target_x = gr.Number(label="move_target_x", value=0, precision=0)
            manual_move_target_y = gr.Number(label="move_target_y", value=0, precision=0)
            manual_remove_x = gr.Number(label="remove_x", value=0, precision=0)
            manual_remove_y = gr.Number(label="remove_y", value=0, precision=0)
            manual_deploy_trigger = gr.Button("manual_deploy_trigger", visible=False)
            manual_move_trigger = gr.Button("manual_move_trigger", visible=False)
            manual_remove_trigger = gr.Button("manual_remove_trigger", visible=False)
        # Timers: main simulation refresh + after-action poller
        # Both start inactive; handlers return timer updates to switch them.
        timer = gr.Timer(value=1.0, active=False)
        report_timer = gr.Timer(value=1.0, active=False)
        # Collect all button outputs for updates
        all_buttons = [btn for (_, _, btn) in grid_buttons]
        # Output binding shared by the three manual deploy/move/remove endpoints.
        manual_action_outputs = [result_popup, advisor_display, event_log_display, status_display] + all_buttons + [service_state, display_cache_state]
        # Event handlers for simulation controls
        start_btn.click(
            fn=start_or_resume_simulation,
            inputs=[
                fire_count,
                fire_intensity,
                building_count,
                max_units,
                seed_input,
                session_resume_state,
                session_token_state,
                service_state,
                display_cache_state,
            ],
            outputs=[timer, report_timer, result_popup, advisor_display, event_log_display, start_btn, pause_btn, status_display]
            + all_buttons
            + [session_resume_state, session_token_state, service_state, display_cache_state],
        )
        pause_btn.click(
            fn=pause_simulation,
            inputs=[service_state, display_cache_state],
            outputs=[timer, report_timer, result_popup, advisor_display, event_log_display, start_btn, pause_btn, status_display]
            + all_buttons
            + [session_resume_state, service_state, display_cache_state],
        )
        # Reset: the `js` snippet reloads the browser page; prepare_reset
        # returns the new service and display-cache state.
        reset_btn.click(
            fn=prepare_reset,
            inputs=[service_state, display_cache_state],
            outputs=[service_state, display_cache_state],
            queue=False,
            js="() => { window.location.reload(); return []; }",
        )
        # Event handlers for grid buttons (click to place)
        for x, y, btn in grid_buttons:
            btn.click(
                # _x/_y defaults bind this cell's coordinates at definition
                # time; a plain closure would see only the last loop values.
                fn=lambda sel, svc, cache, _x=x, _y=y: deploy_at_cell(_x, _y, sel, svc, cache),
                inputs=[place_selector, service_state, display_cache_state],
                outputs=[result_popup, advisor_display, event_log_display, status_display] + all_buttons + [service_state, display_cache_state],
                api_visibility="private",
            )
        # Timer tick handler - updates all components with change tracking
        # The outputs order must match the list returned by refresh_display.
        timer.tick(
            fn=refresh_display,
            inputs=[service_state, display_cache_state],
            outputs=[timer, report_timer, result_popup, advisor_display, event_log_display, start_btn, pause_btn, status_display]
            + all_buttons
            + [service_state, display_cache_state],
        )
        report_timer.tick(
            fn=poll_after_action_report,
            inputs=[service_state],
            outputs=[report_timer, result_popup, service_state],
        )
        # Auto-execute toggle handler
        def on_auto_execute_toggle(enabled: bool, service: Optional[SimulationService]):
            """Apply the checkbox value to the session service and return the service."""
            service = get_or_create_service(service)
            service.set_auto_execute(enabled)
            return service
        def on_model_choice_change(selection: str, service: Optional[SimulationService]):
            """Switch the advisor model; raise gr.Error if the service reports a non-"ok" status."""
            service = get_or_create_service(service)
            result = service.set_advisor_model_choice(selection)
            if result.get("status") != "ok":
                raise gr.Error(result.get("message", "Failed to switch advisor model"))
            return service
        auto_execute_toggle.change(
            fn=on_auto_execute_toggle,
            inputs=[auto_execute_toggle, service_state],
            outputs=[service_state],
        )
        model_selector.change(
            fn=on_model_choice_change,
            inputs=[model_selector, service_state],
            outputs=[service_state],
        )
        # Public MCP endpoints for manual deploy/move/remove without exposing all grid buttons
        manual_deploy_trigger.click(
            fn=handle_map_deploy,
            inputs=[manual_deploy_selection, manual_deploy_x, manual_deploy_y, service_state, display_cache_state],
            outputs=manual_action_outputs,
            api_name="deploy_unit_manual",
            api_visibility="public",
        )
        manual_move_trigger.click(
            fn=handle_map_move,
            inputs=[
                manual_move_source_x,
                manual_move_source_y,
                manual_move_target_x,
                manual_move_target_y,
                service_state,
                display_cache_state,
            ],
            outputs=manual_action_outputs,
            api_name="move_unit_manual",
            api_visibility="public",
        )
        manual_remove_trigger.click(
            fn=handle_map_remove,
            inputs=[manual_remove_x, manual_remove_y, service_state, display_cache_state],
            outputs=manual_action_outputs,
            api_name="remove_unit_manual",
            api_visibility="public",
        )
        # Page-load hook: _initialize_session_defaults populates the controls,
        # session state, advisor/log/status displays and every grid button.
        app.load(
            fn=_initialize_session_defaults,
            inputs=[service_state, display_cache_state],
            outputs=[
                auto_execute_toggle,
                session_resume_state,
                session_token_state,
                model_selector,
                service_state,
                display_cache_state,
                advisor_display,
                event_log_display,
                status_display,
            ]
            + all_buttons,
        )
    return app
# Module-level app instance, built at import time; launch_simple() serves it.
demo = create_app()
def launch_simple():
    """
    Serve the module-level ``demo`` app (for HF Spaces / local development).

    Listens on all interfaces at port 7860 with the MCP server enabled.
    """
    launch_options = {
        "server_name": "0.0.0.0",
        "server_port": 7860,
        "ssr_mode": False,  # SSR is disabled for better compatibility
        "mcp_server": True,
    }
    demo.launch(**launch_options)
# Start the server only when this file is executed directly as a script.
if __name__ == "__main__":
    launch_simple()