arkai2025's picture
refactor(config): centralize scenario configuration defaults
85cbecd
"""
Fire-Rescue MCP Server
Provides MCP tools for fire rescue simulation.
MCP tools only return DATA - all decisions are made by AI.
Core Operations:
- reset_scenario: Initialize a new simulation
- get_world_state: Get current world state snapshot
- deploy_unit: Deploy a firefighting unit
- move_unit: Move an existing unit to a new position
- remove_unit: Remove an existing unit to free a deployment slot
- step_simulation: Advance simulation by ticks
Data Query Tools:
- find_idle_units: Get units not covering any fires
- find_uncovered_fires: Get fires with no unit coverage
- find_building_threats: Get fires near buildings
- analyze_coverage: Get comprehensive coverage data
"""
import sys
import threading
from pathlib import Path
from typing import Optional
from mcp.server.fastmcp import FastMCP
# Add parent directory to path for imports
sys.path.insert(0, str(Path(__file__).parent.parent))
from config import range_text
from models import CellType, UnitType
from simulation import SimulationEngine, SimulationConfig
# Create FastMCP server instance
# Every function decorated with @mcp.tool() below is registered on this object.
mcp = FastMCP("Fire-Rescue Simulation")
# Shared simulation engines keyed by session_id
# Entries are added by attach_engine(), removed by detach_engine() and read by
# get_engine(); the tools themselves never create an engine.
_engines: dict[str, SimulationEngine] = {}
# Held around every read and write of _engines in the three helpers above.
_engine_lock = threading.RLock()
def attach_engine(engine: SimulationEngine, session_id: str) -> None:
    """Register an externally owned engine under a session id.

    Lets callers outside the MCP server share their engine with the tool
    handlers. Any engine previously stored for the same session is replaced.

    Args:
        engine: Engine instance the tools should operate on.
        session_id: Non-empty key identifying the session.

    Raises:
        ValueError: If ``session_id`` is empty (or otherwise falsy).
    """
    if session_id:
        # Mutate the shared registry only while holding the registry lock.
        with _engine_lock:
            _engines[session_id] = engine
        return
    raise ValueError("session_id is required to attach engine")
# Unit effective ranges (matching SimulationConfig, Chebyshev/square distance)
# NOTE(review): these are hard-coded copies; confirm they stay in sync with
# SimulationConfig, which is not visible from this module.
FIRE_TRUCK_RANGE = 1 # Square coverage radius (includes 8 neighbors)
HELICOPTER_RANGE = 2 # Square coverage radius (extends two cells in all directions)
# Text fragments produced by config.range_text() for each scenario parameter.
# They are interpolated into the reset_scenario argument documentation.
# Presumably each describes the allowed range of that parameter - confirm
# against config.range_text.
FIRE_COUNT_RANGE_TEXT = range_text("fire_count")
FIRE_INTENSITY_RANGE_TEXT = range_text("fire_intensity")
BUILDING_COUNT_RANGE_TEXT = range_text("building_count")
MAX_UNITS_RANGE_TEXT = range_text("max_units")
def detach_engine(session_id: str) -> None:
    """Forget the engine registered for a session, if there is one.

    Best-effort: an empty session id or an unknown session is silently
    ignored rather than treated as an error.

    Args:
        session_id: Key of the session whose engine mapping should be dropped.
    """
    if session_id:
        with _engine_lock:
            # pop() with a default never raises for a missing key.
            _engines.pop(session_id, None)
def get_engine(session_id: Optional[str]) -> SimulationEngine:
    """Look up the simulation engine registered for a session.

    Args:
        session_id: Key of the session whose engine is wanted.

    Returns:
        The engine previously stored for ``session_id``.

    Raises:
        ValueError: If ``session_id`` is empty/None, or no engine is
            registered under it.
    """
    if not session_id:
        raise ValueError("session_id is required")

    # Only the registry lookup needs the lock; validation happens outside it.
    with _engine_lock:
        registered = _engines.get(session_id)

    if registered is None:
        raise ValueError(f"No simulation engine attached for session '{session_id}'")
    return registered
def _get_unit_effective_range(unit_type: str) -> int:
"""Get the effective range for a unit type (for coverage analysis)."""
if unit_type == "fire_truck":
return FIRE_TRUCK_RANGE
elif unit_type == "helicopter":
return HELICOPTER_RANGE
return 2 # Default
def _calculate_distance(x1: int, y1: int, x2: int, y2: int) -> int:
"""Calculate Chebyshev distance (square radius) between two points."""
return max(abs(x1 - x2), abs(y1 - y2))
def _is_in_range(ux: int, uy: int, fx: int, fy: int, unit_type: str) -> bool:
    """Tell whether a fire at (fx, fy) is covered by a unit at (ux, uy).

    Args:
        ux: Unit X coordinate.
        uy: Unit Y coordinate.
        fx: Fire X coordinate.
        fy: Fire Y coordinate.
        unit_type: Unit type name, which determines the coverage radius.

    Returns:
        True when the Chebyshev distance between the two positions does not
        exceed the unit type's effective range.
    """
    distance = _calculate_distance(ux, uy, fx, fy)
    return distance <= _get_unit_effective_range(unit_type)
def generate_emoji_map(engine: SimulationEngine) -> str:
    """Render the current world state as a grid of emoji with coordinates.

    Legend (matching Gradio UI):
    - 🌲 Forest (no fire)
    - 🏢 Building (no fire)
    - 🔥 Fire (intensity >= 10%)
    - 💨 Smoke (smoldering, intensity < 10%)
    - 🚒 Fire Truck
    - 🚁 Helicopter
    - ⬜ Any other terrain (no fire)

    Args:
        engine: Engine whose ``world`` is rendered.

    Returns:
        A multi-line string: a header row of X coordinates, one row per Y
        coordinate prefixed with its index, a blank line and a legend.
        Returns "No map available" when the engine has no world yet.
    """
    world = engine.world
    if world is None:
        return "No map available"

    # Group unit type names by occupied cell so each cell needs one lookup.
    units_by_cell: dict = {}
    for unit in world.units:
        units_by_cell.setdefault((unit.x, unit.y), []).append(unit.unit_type.value)

    # Each row starts with f"{y:2} ", which is three columns wide. The header
    # must be padded by the same width, otherwise every X label sits to the
    # left of the column it names.
    row_label_width = len(f"{0:2} ")
    header = " " * row_label_width + "".join(f"{x:2}" for x in range(world.width))
    lines = [header]

    for y in range(world.height):
        symbols = []
        for x in range(world.width):
            cell = world.grid[y][x]
            occupants = units_by_cell.get((x, y))
            # Priority: units > fire > terrain.
            if occupants:
                # A truck wins over a helicopter when both share a cell.
                symbols.append("🚒" if "fire_truck" in occupants else "🚁")
            elif cell.fire_intensity > 0:
                # Matching Gradio: >=10% is drawn as fire, below as smoke.
                symbols.append("🔥" if cell.fire_intensity >= 0.1 else "💨")
            elif cell.cell_type == CellType.BUILDING:
                symbols.append("🏢")
            elif cell.cell_type == CellType.FOREST:
                symbols.append("🌲")
            else:
                symbols.append("⬜")
        lines.append(f"{y:2} " + "".join(symbols))

    # Legend (matching Gradio UI), separated from the grid by a blank line.
    lines.append("")
    lines.append("Legend: 🌲Forest 🏢Building 🔥Fire 💨Smoke 🚒Truck 🚁Heli")
    return "\n".join(lines)
def _resolve_engine(session_id: Optional[str]):
    """Fetch the session's engine in a form convenient for tool handlers.

    Args:
        session_id: Key of the session whose engine is wanted.

    Returns:
        ``(engine, None)`` on success, or ``(None, error)`` where ``error``
        is a ``{"status": "error", "message": ...}`` dict built from the
        ValueError raised by get_engine().
    """
    try:
        engine = get_engine(session_id)
    except ValueError as exc:
        failure = {"status": "error", "message": str(exc)}
        return None, failure
    return engine, None
# Tool description for reset_scenario, built once at import time.
# An f-string placed first in a function body is an ordinary expression, not a
# docstring, so it would leave __doc__ as None and the tool without any
# description. The interpolated text is therefore passed to the decorator.
_RESET_SCENARIO_DESCRIPTION = f"""Reset and initialize a new fire rescue simulation scenario.

Args:
    seed: Random seed for reproducibility (optional)
    fire_count: Number of initial fire points ({FIRE_COUNT_RANGE_TEXT}, default: 10)
    fire_intensity: Initial fire intensity ({FIRE_INTENSITY_RANGE_TEXT}, default: 0.5)
    building_count: Number of buildings to place ({BUILDING_COUNT_RANGE_TEXT}, default: 20)
    max_units: Maximum deployable units ({MAX_UNITS_RANGE_TEXT}, default: 10)

Returns:
    Status, summary, and emoji map of the initial state
"""


@mcp.tool(description=_RESET_SCENARIO_DESCRIPTION)
def reset_scenario(
    seed: Optional[int] = None,
    fire_count: int = 10,
    fire_intensity: float = 0.5,
    building_count: int = 20,
    max_units: int = 10,
    session_id: Optional[str] = None,
) -> dict:
    """Reset and initialize a new fire rescue simulation scenario.

    The description registered with MCP (_RESET_SCENARIO_DESCRIPTION) also
    carries the range text for each numeric argument.

    Args:
        seed: Random seed for reproducibility (optional).
        fire_count: Number of initial fire points (default: 10).
        fire_intensity: Initial fire intensity (default: 0.5).
        building_count: Number of buildings to place (default: 20).
        max_units: Maximum deployable units (default: 10).
        session_id: Session whose attached engine is reset.

    Returns:
        On success a dict with "status", "tick", "grid_size",
        "initial_fires", "buildings", "max_units", "max_ticks" and
        "emoji_map". If no engine is attached for the session, an error dict
        with "status" and "message".
    """
    engine, error = _resolve_engine(session_id)
    if error:
        return error

    world = engine.reset(
        seed=seed,
        fire_count=fire_count,
        fire_intensity=fire_intensity,
        building_count=building_count,
        max_units=max_units,
    )
    fires = world.get_fires()
    return {
        "status": "ok",
        "tick": world.tick,
        "grid_size": f"{world.width}x{world.height}",
        "initial_fires": len(fires),
        "buildings": len(world.building_positions),
        "max_units": world.max_units,
        "max_ticks": world.max_ticks,
        "emoji_map": generate_emoji_map(engine),
    }
@mcp.tool()
def get_world_state(session_id: Optional[str] = None) -> dict:
    """
    Get the current world state snapshot with emoji map visualization.

    Returns complete state of the simulation including:
    - Current tick number
    - Emoji map showing the battlefield visually
    - Fire locations and intensities
    - Deployed units and their positions
    - Building integrity and forest burn ratio
    - Recent events

    The emoji_map provides a visual overview:
    🌲 Forest | 🏢 Building | 🔥 Fire (>=10%) | 💨 Smoke (<10%)
    🚒 Fire Truck | 🚁 Helicopter
    """
    engine, error = _resolve_engine(session_id)
    if error is not None:
        return error

    # Without a world there is nothing to snapshot yet.
    if engine.world is None:
        return {
            "status": "error",
            "message": "Simulation not initialized. Call reset_scenario first."
        }

    snapshot = engine.get_state()
    # Attach the rendered map to the engine's own state dict.
    snapshot["emoji_map"] = generate_emoji_map(engine)
    return snapshot
@mcp.tool()
def deploy_unit(
    unit_type: str,
    x: int,
    y: int,
    source: str = "player",
    session_id: Optional[str] = None,
) -> dict:
    """
    Deploy a firefighting unit at the specified position.

    Args:
        unit_type: Type of unit - "fire_truck" or "helicopter"
        x: X coordinate (0 to grid_width-1)
        y: Y coordinate (0 to grid_height-1)
        source: Who initiated the deployment - "player", "player_accept", "auto_accept_ai"

    Returns:
        Status and details of the deployed unit
    """
    engine, error = _resolve_engine(session_id)
    if error is not None:
        return error

    # The engine validates the request and produces the result dict itself.
    outcome = engine.deploy_unit(unit_type, x, y, source)
    return outcome
@mcp.tool()
def step_simulation(ticks: int = 1, session_id: Optional[str] = None) -> dict:
    """
    Advance the simulation by the specified number of ticks.

    Args:
        ticks: Number of ticks to advance (default: 1)

    Returns:
        Current world state with emoji map after advancing
    """
    engine, error = _resolve_engine(session_id)
    if error is not None:
        return error

    if engine.world is None:
        return {
            "status": "error",
            "message": "Simulation not initialized. Call reset_scenario first."
        }

    # range() yields nothing for zero or negative counts, so such requests
    # simply return the current state unchanged.
    for _tick in range(ticks):
        engine.step()

    snapshot = engine.get_state()
    snapshot["emoji_map"] = generate_emoji_map(engine)
    return snapshot
@mcp.tool()
def move_unit(
    source_x: int,
    source_y: int,
    target_x: int,
    target_y: int,
    session_id: Optional[str] = None,
) -> dict:
    """
    Move an existing unit from source position to target position.
    Useful for repositioning idle units to cover uncovered fires.

    The move is performed as remove-then-deploy. If deployment at the target
    is rejected, the unit is re-deployed at its original position; if that
    also fails, the error message says so explicitly.

    Args:
        source_x: Current X coordinate of the unit
        source_y: Current Y coordinate of the unit
        target_x: New X coordinate to move to
        target_y: New Y coordinate to move to

    Returns:
        Status and details of the move operation
    """
    engine, error = _resolve_engine(session_id)
    if error:
        return error
    if engine.world is None:
        return {
            "status": "error",
            "message": "Simulation not initialized. Call reset_scenario first."
        }

    # Find the first unit standing on the source cell.
    unit_to_move = next(
        (
            unit
            for unit in engine.world.units
            if unit.x == source_x and unit.y == source_y
        ),
        None,
    )
    if unit_to_move is None:
        return {
            "status": "error",
            "message": f"No unit found at source position ({source_x}, {source_y})"
        }

    # Remember the type before removal so the unit can be re-created.
    unit_type = unit_to_move.unit_type.value

    remove_result = engine.remove_unit_at(source_x, source_y)
    if remove_result.get("status") != "ok":
        return {
            "status": "error",
            "message": f"Failed to remove unit: {remove_result.get('message')}"
        }

    deploy_result = engine.deploy_unit(unit_type, target_x, target_y, "ai_move")
    if deploy_result.get("status") != "ok":
        # Roll back, and check the rollback: claiming the unit was restored
        # when it was not would leave the caller with a wrong picture.
        restore_result = engine.deploy_unit(unit_type, source_x, source_y, "ai_restore")
        if restore_result.get("status") != "ok":
            return {
                "status": "error",
                "message": (
                    f"Failed to deploy at target: {deploy_result.get('message')}. "
                    f"Restoring the unit at ({source_x}, {source_y}) also failed: "
                    f"{restore_result.get('message')}. The unit is no longer deployed."
                )
            }
        return {
            "status": "error",
            "message": f"Failed to deploy at target: {deploy_result.get('message')}. Unit restored to original position."
        }

    return {
        "status": "ok",
        "unit_type": unit_type,
        "source": {"x": source_x, "y": source_y},
        "target": {"x": target_x, "y": target_y},
        "message": f"Moved {unit_type} from ({source_x}, {source_y}) to ({target_x}, {target_y})"
    }
@mcp.tool()
def remove_unit(
    x: int,
    y: int,
    session_id: Optional[str] = None,
) -> dict:
    """
    Remove an existing unit at the specified position.
    Use this to free up a deployment slot, then deploy_unit to place a new unit elsewhere.

    Args:
        x: X coordinate of the unit to remove
        y: Y coordinate of the unit to remove

    Returns:
        Status and details of the removed unit

    Example use cases:
    - Remove ineffective truck, then deploy helicopter at better position
    - Free up deployment slot when unit is no longer needed
    - Reposition unit: remove_unit + deploy_unit at new location
    """
    engine, error = _resolve_engine(session_id)
    if error is not None:
        return error

    if engine.world is None:
        return {
            "status": "error",
            "message": "Simulation not initialized. Call reset_scenario first."
        }

    # First unit standing on the requested cell, if any.
    occupant = next(
        (
            candidate
            for candidate in engine.world.units
            if candidate.x == x and candidate.y == y
        ),
        None,
    )
    if occupant is None:
        return {
            "status": "error",
            "message": f"No unit found at position ({x}, {y})"
        }

    # Capture the type name before the engine drops the unit.
    removed_unit_type = occupant.unit_type.value

    outcome = engine.remove_unit_at(x, y)
    if outcome.get("status") != "ok":
        return {
            "status": "error",
            "message": f"Failed to remove unit: {outcome.get('message')}"
        }

    return {
        "status": "ok",
        "removed_unit_type": removed_unit_type,
        "position": {"x": x, "y": y},
        "message": f"Removed {removed_unit_type} at ({x}, {y}). You can now deploy a new unit."
    }
@mcp.tool()
def find_idle_units(session_id: Optional[str] = None) -> dict:
    """
    Find units that are not covering any fires (idle/ineffective units).

    An idle unit is one where NO fires exist within its effective range
    (square radius around the unit, diagonals included):
    - Fire Truck effective range: 1 cell
    - Helicopter effective range: 2 cells

    Returns:
        List of idle units and effective units with their positions
    """
    engine, error = _resolve_engine(session_id)
    if error:
        return error
    if engine.world is None:
        return {
            "status": "error",
            "message": "Simulation not initialized. Call reset_scenario first."
        }

    world = engine.world
    # Only positions matter for the coverage test.
    fire_positions = [(fire.x, fire.y) for fire in world.get_fires()]

    idle_units = []
    effective_units = []
    for unit in world.units:
        unit_type = unit.unit_type.value
        unit_info = {
            "x": unit.x,
            "y": unit.y,
            "type": unit_type,
            "effective_range": _get_unit_effective_range(unit_type)
        }
        # any() stops at the first fire found within range.
        has_fire_in_range = any(
            _is_in_range(unit.x, unit.y, fx, fy, unit_type)
            for fx, fy in fire_positions
        )
        if has_fire_in_range:
            effective_units.append(unit_info)
        else:
            idle_units.append(unit_info)

    return {
        "status": "ok",
        "idle_units": idle_units,
        "idle_count": len(idle_units),
        "effective_units": effective_units,
        "effective_count": len(effective_units),
        "total_units": len(world.units)
    }
@mcp.tool()
def find_uncovered_fires(session_id: Optional[str] = None) -> dict:
    """
    Find fires that have NO unit coverage.

    An uncovered fire is one where NO unit is within effective range
    (square radius around the unit, diagonals included):
    - Fire Truck range: 1 cell
    - Helicopter range: 2 cells

    Returns:
        List of uncovered fires with their positions, intensity, and building threat status
    """
    engine, error = _resolve_engine(session_id)
    if error:
        return error
    if engine.world is None:
        return {
            "status": "error",
            "message": "Simulation not initialized. Call reset_scenario first."
        }

    world = engine.world
    fires = world.get_fires()
    units = world.units
    building_positions = set(world.building_positions)

    uncovered_fires = []
    covered_fires = []
    for fire in fires:
        # Covered as soon as one unit has the fire inside its range.
        is_covered = any(
            _is_in_range(unit.x, unit.y, fire.x, fire.y, unit.unit_type.value)
            for unit in units
        )
        # A fire threatens a building when it is within 2 cells of one.
        threatens_building = any(
            _calculate_distance(fire.x, fire.y, bx, by) <= 2
            for bx, by in building_positions
        )
        fire_info = {
            "x": fire.x,
            "y": fire.y,
            "intensity": round(fire.intensity, 2),
            "threatens_building": threatens_building
        }
        if is_covered:
            covered_fires.append(fire_info)
        else:
            uncovered_fires.append(fire_info)

    return {
        "status": "ok",
        "uncovered_fires": uncovered_fires,
        "uncovered_count": len(uncovered_fires),
        "covered_fires": covered_fires,
        "covered_count": len(covered_fires),
        "total_fires": len(fires),
        # With no fires at all, coverage is reported as complete.
        "coverage_ratio": round(len(covered_fires) / len(fires), 2) if fires else 1.0
    }
@mcp.tool()
def find_building_threats(session_id: Optional[str] = None) -> dict:
    """
    Find fires that are threatening buildings (within 2 cells of any building).

    Returns:
        List of building-threatening fires with their positions, threatened buildings, and coverage status
    """
    engine, error = _resolve_engine(session_id)
    if error is not None:
        return error

    if engine.world is None:
        return {
            "status": "error",
            "message": "Simulation not initialized. Call reset_scenario first."
        }

    world = engine.world
    fires = world.get_fires()
    units = world.units
    building_positions = set(world.building_positions)

    building_threats = []
    for fire in fires:
        # Collect every building within 2 cells of this fire.
        nearby_buildings = []
        for bx, by in building_positions:
            gap = _calculate_distance(fire.x, fire.y, bx, by)
            if gap > 2:
                continue
            nearby_buildings.append({"x": bx, "y": by, "distance": gap})

        # Fires that endanger no building are not reported.
        if not nearby_buildings:
            continue

        # The first unit with the fire in range is reported as covering it.
        responder = next(
            (
                unit
                for unit in units
                if _is_in_range(unit.x, unit.y, fire.x, fire.y, unit.unit_type.value)
            ),
            None,
        )
        if responder is None:
            covering_unit = None
        else:
            covering_unit = {
                "x": responder.x,
                "y": responder.y,
                "type": responder.unit_type.value,
            }

        building_threats.append({
            "fire": {"x": fire.x, "y": fire.y, "intensity": round(fire.intensity, 2)},
            "threatened_buildings": nearby_buildings,
            "is_covered": responder is not None,
            "covering_unit": covering_unit
        })

    uncovered_total = sum(1 for threat in building_threats if not threat["is_covered"])
    return {
        "status": "ok",
        "building_threats": building_threats,
        "total_threats": len(building_threats),
        "uncovered_threats": uncovered_total,
        "building_integrity": round(world.building_integrity, 2)
    }
@mcp.tool()
def analyze_coverage(session_id: Optional[str] = None) -> dict:
    """
    Get comprehensive coverage analysis data.

    This tool combines information from multiple analyses:
    - Idle units (not covering any fire)
    - Uncovered fires (no unit in range)
    - Building threats (fires near buildings)
    - High intensity fires

    Returns:
        Comprehensive data about fires, units, and coverage status
    """
    engine, error = _resolve_engine(session_id)
    if error is not None:
        return error

    world = engine.world
    if world is None:
        return {
            "status": "error",
            "message": "Simulation not initialized. Call reset_scenario first."
        }

    fires = world.get_fires()
    units = world.units
    building_positions = set(world.building_positions)

    # --- Fires: classify each one along three independent axes ---
    high_intensity_fires = []    # intensity above 70%
    building_threat_fires = []   # within 2 cells of a building
    uncovered_fires = []         # no unit has it in range
    for fire in fires:
        fire_summary = {"x": fire.x, "y": fire.y, "intensity": round(fire.intensity, 2)}

        if fire.intensity > 0.7:
            high_intensity_fires.append(fire_summary)

        near_building = any(
            _calculate_distance(fire.x, fire.y, bx, by) <= 2
            for bx, by in building_positions
        )
        if near_building:
            building_threat_fires.append(fire_summary)

        covered = any(
            _is_in_range(unit.x, unit.y, fire.x, fire.y, unit.unit_type.value)
            for unit in units
        )
        if not covered:
            uncovered_fires.append(fire_summary)

    # --- Units: idle when no fire lies within their range ---
    idle_units = []
    effective_units = []
    for unit in units:
        kind = unit.unit_type.value
        unit_summary = {"x": unit.x, "y": unit.y, "type": kind}
        engaged = any(
            _is_in_range(unit.x, unit.y, fire.x, fire.y, kind)
            for fire in fires
        )
        if engaged:
            effective_units.append(unit_summary)
        else:
            idle_units.append(unit_summary)

    # With no fires at all, coverage is reported as complete.
    if fires:
        coverage_ratio = (len(fires) - len(uncovered_fires)) / len(fires)
    else:
        coverage_ratio = 1.0

    fire_report = {
        "total_fires": len(fires),
        "high_intensity_count": len(high_intensity_fires),
        "high_intensity_fires": high_intensity_fires,
        "building_threat_count": len(building_threat_fires),
        "building_threat_fires": building_threat_fires,
        "uncovered_count": len(uncovered_fires),
        "uncovered_fires": uncovered_fires
    }
    unit_report = {
        "deployed": len(units),
        "max_units": world.max_units,
        "available_slots": world.max_units - len(units),
        "idle_count": len(idle_units),
        "idle_units": idle_units,
        "effective_count": len(effective_units),
        "effective_units": effective_units
    }
    return {
        "status": "ok",
        "building_integrity": round(world.building_integrity, 2),
        "coverage_ratio": round(coverage_ratio, 2),
        "fire_analysis": fire_report,
        "unit_analysis": unit_report,
        "emoji_map": generate_emoji_map(engine)
    }
def run_server():
    """Start the MCP server and serve the registered tools.

    Calls ``mcp.run()`` with no arguments, i.e. with FastMCP's default
    transport.
    """
    mcp.run()


# Start the server only when this file is executed as a script.
if __name__ == "__main__":
    run_server()