|
|
""" |
|
|
Fire-Rescue MCP Server |
|
|
|
|
|
Provides MCP tools for fire rescue simulation. |
|
|
MCP tools only return DATA - all decisions are made by AI. |
|
|
|
|
|
Core Operations: |
|
|
- reset_scenario: Initialize a new simulation |
|
|
- get_world_state: Get current world state snapshot |
|
|
- deploy_unit: Deploy a firefighting unit |
|
|
- move_unit: Move an existing unit to a new position
- remove_unit: Remove the unit at a position to free a deployment slot
|
|
- step_simulation: Advance simulation by ticks |
|
|
|
|
|
Data Query Tools: |
|
|
- find_idle_units: Get units not covering any fires |
|
|
- find_uncovered_fires: Get fires with no unit coverage |
|
|
- find_building_threats: Get fires near buildings |
|
|
- analyze_coverage: Get comprehensive coverage data |
|
|
""" |
|
|
|
|
|
import sys |
|
|
import threading |
|
|
from pathlib import Path |
|
|
from typing import Optional |
|
|
|
|
|
from mcp.server.fastmcp import FastMCP |
|
|
|
|
|
|
|
|
sys.path.insert(0, str(Path(__file__).parent.parent)) |
|
|
|
|
|
from config import range_text |
|
|
from models import CellType, UnitType |
|
|
from simulation import SimulationEngine, SimulationConfig |
|
|
|
|
|
|
|
|
# Shared FastMCP server instance; the @mcp.tool() decorators in this module
# register their functions on it and run_server() starts it.
mcp = FastMCP("Fire-Rescue Simulation")


# Simulation engines keyed by session id. Entries are added by
# attach_engine(), removed by detach_engine() and looked up by get_engine().
_engines: dict[str, SimulationEngine] = {}
# Held around every read and write of _engines in this module.
_engine_lock = threading.RLock()
|
|
|
|
|
|
|
|
def attach_engine(engine: SimulationEngine, session_id: str) -> None:
    """
    Register an externally created engine under a session id.

    Any engine already stored for the same session id is replaced.

    Args:
        engine: The simulation engine the MCP tools should operate on.
        session_id: Non-empty key used later by get_engine().

    Raises:
        ValueError: If session_id is empty or None.
    """
    if session_id:
        with _engine_lock:
            _engines[session_id] = engine
        return

    raise ValueError("session_id is required to attach engine")
|
|
|
|
|
|
|
|
# Coverage radii in grid cells, returned by _get_unit_effective_range().
FIRE_TRUCK_RANGE = 1
HELICOPTER_RANGE = 2
# Range descriptions produced by config.range_text(); they are interpolated
# into the argument documentation of reset_scenario.
# NOTE(review): presumably human-readable "min-max" text - confirm in config.
FIRE_COUNT_RANGE_TEXT = range_text("fire_count")
FIRE_INTENSITY_RANGE_TEXT = range_text("fire_intensity")
BUILDING_COUNT_RANGE_TEXT = range_text("building_count")
MAX_UNITS_RANGE_TEXT = range_text("max_units")
|
|
|
|
|
|
|
|
def detach_engine(session_id: str) -> None:
    """
    Forget the engine registered for a session, if there is one.

    This is best-effort: an empty session id or an unknown session is
    silently ignored.

    Args:
        session_id: Key that was used with attach_engine().
    """
    if session_id:
        with _engine_lock:
            _engines.pop(session_id, None)
|
|
|
|
|
|
|
|
def get_engine(session_id: Optional[str]) -> SimulationEngine:
    """
    Look up the simulation engine attached to a session.

    Args:
        session_id: Key that was used with attach_engine().

    Returns:
        The engine stored for that session.

    Raises:
        ValueError: If session_id is empty/None, or no engine is attached
            for it.
    """
    if not session_id:
        raise ValueError("session_id is required")

    with _engine_lock:
        attached = _engines.get(session_id)

    if attached is not None:
        return attached
    raise ValueError(f"No simulation engine attached for session '{session_id}'")
|
|
|
|
|
|
|
|
def _get_unit_effective_range(unit_type: str) -> int: |
|
|
"""Get the effective range for a unit type (for coverage analysis).""" |
|
|
if unit_type == "fire_truck": |
|
|
return FIRE_TRUCK_RANGE |
|
|
elif unit_type == "helicopter": |
|
|
return HELICOPTER_RANGE |
|
|
return 2 |
|
|
|
|
|
|
|
|
def _calculate_distance(x1: int, y1: int, x2: int, y2: int) -> int: |
|
|
"""Calculate Chebyshev distance (square radius) between two points.""" |
|
|
return max(abs(x1 - x2), abs(y1 - y2)) |
|
|
|
|
|
|
|
|
def _is_in_range(ux: int, uy: int, fx: int, fy: int, unit_type: str) -> bool:
    """
    Tell whether a fire at (fx, fy) is covered by a unit at (ux, uy).

    Args:
        ux, uy: Unit position.
        fx, fy: Fire position.
        unit_type: Unit type name, which determines the coverage radius.

    Returns:
        True when the Chebyshev distance between the two positions does
        not exceed the unit type's effective range.
    """
    reach = _get_unit_effective_range(unit_type)
    separation = _calculate_distance(ux, uy, fx, fy)
    return separation <= reach
|
|
|
|
|
|
|
|
def generate_emoji_map(engine: SimulationEngine) -> str:
    """
    Render the engine's current world as a text grid of emoji.

    The first line is a header of column indices. Each following line is
    a row index followed by one symbol per cell. The output ends with an
    empty line and a legend line.

    Symbol chosen per cell (first match wins):
    - 🚒 a fire truck stands on the cell
    - 🚁 only other units (helicopters) stand on the cell
    - 🔥 Fire (intensity >= 10%)
    - 💨 Smoke (smoldering, intensity above 0 but < 10%)
    - 🏢 Building (no fire)
    - 🌲 Forest (no fire)
    - ⬜ any other cell type

    Args:
        engine: Simulation engine whose ``world`` is drawn.

    Returns:
        The rendered map, or "No map available" when the engine has no world.
    """
    world = engine.world
    if world is None:
        return "No map available"

    # Collect the unit-type names present on every occupied cell.
    occupants_by_cell = {}
    for unit in world.units:
        occupants_by_cell.setdefault((unit.x, unit.y), []).append(unit.unit_type.value)

    def symbol_for(x, y):
        """Pick the emoji for one cell; units hide fire, fire hides terrain."""
        cell = world.grid[y][x]
        occupants = occupants_by_cell.get((x, y))
        if occupants is not None:
            return "🚒" if "fire_truck" in occupants else "🚁"
        if cell.fire_intensity > 0:
            return "🔥" if cell.fire_intensity >= 0.1 else "💨"
        if cell.cell_type == CellType.BUILDING:
            return "🏢"
        if cell.cell_type == CellType.FOREST:
            return "🌲"
        return "⬜"

    column_header = " " + "".join(f"{x:2}" for x in range(world.width))
    rendered = [column_header]
    for y in range(world.height):
        row_symbols = "".join(symbol_for(x, y) for x in range(world.width))
        rendered.append(f"{y:2} " + row_symbols)

    rendered.extend(["", "Legend: 🌲Forest 🏢Building 🔥Fire 💨Smoke 🚒Truck 🚁Heli"])
    return "\n".join(rendered)
|
|
|
|
|
|
|
|
def _resolve_engine(session_id: Optional[str]):
    """
    Fetch the session's engine without raising, for use in tool handlers.

    Args:
        session_id: Session key passed to get_engine().

    Returns:
        ``(engine, None)`` on success, or ``(None, error_dict)`` where
        error_dict has "status": "error" and the lookup failure message.
    """
    try:
        engine = get_engine(session_id)
    except ValueError as exc:
        failure = {"status": "error", "message": str(exc)}
        return None, failure
    return engine, None
|
|
|
|
|
|
|
|
def reset_scenario(
    seed: Optional[int] = None,
    fire_count: int = 10,
    fire_intensity: float = 0.5,
    building_count: int = 20,
    max_units: int = 10,
    session_id: Optional[str] = None,
) -> dict:
    # NOTE: the documentation for this tool is attached right after the
    # function definition, because it interpolates the configured ranges and
    # Python never stores an f-string literal as a docstring.
    engine, error = _resolve_engine(session_id)
    if error:
        return error

    world = engine.reset(
        seed=seed,
        fire_count=fire_count,
        fire_intensity=fire_intensity,
        building_count=building_count,
        max_units=max_units,
    )
    fires = world.get_fires()

    return {
        "status": "ok",
        "tick": world.tick,
        "grid_size": f"{world.width}x{world.height}",
        "initial_fires": len(fires),
        "buildings": len(world.building_positions),
        "max_units": world.max_units,
        "max_ticks": world.max_ticks,
        "emoji_map": generate_emoji_map(engine),
    }


# A formatted string literal placed as the first statement of a function is
# just an expression, so __doc__ would stay None and the tool would be
# registered without a description. Set __doc__ explicitly, then register the
# function so FastMCP sees the text.
reset_scenario.__doc__ = f"""
    Reset and initialize a new fire rescue simulation scenario.

    Args:
        seed: Random seed for reproducibility (optional)
        fire_count: Number of initial fire points ({FIRE_COUNT_RANGE_TEXT}, default: 10)
        fire_intensity: Initial fire intensity ({FIRE_INTENSITY_RANGE_TEXT}, default: 0.5)
        building_count: Number of buildings to place ({BUILDING_COUNT_RANGE_TEXT}, default: 20)
        max_units: Maximum deployable units ({MAX_UNITS_RANGE_TEXT}, default: 10)
        session_id: Session whose attached simulation engine is reset (required)

    Returns:
        Status, summary, and emoji map of the initial state
    """
reset_scenario = mcp.tool()(reset_scenario)
|
|
|
|
|
|
|
|
@mcp.tool()
def get_world_state(session_id: Optional[str] = None) -> dict:
    """
    Return a snapshot of the current world state plus an emoji map.

    The snapshot is whatever the engine's get_state() reports (tick number,
    fire locations and intensities, deployed units and their positions,
    building integrity and forest burn ratio, recent events), with an
    additional "emoji_map" entry that shows the battlefield visually.

    Emoji map symbols:
        🌲 Forest | 🏢 Building | 🔥 Fire (>=10%) | 💨 Smoke (<10%)
        🚒 Fire Truck | 🚁 Helicopter

    Args:
        session_id: Session whose attached simulation engine is queried.

    Returns:
        The state dict, or an error dict when the session is unknown or the
        simulation has not been initialized yet.
    """
    engine, error = _resolve_engine(session_id)
    if error:
        return error

    if engine.world is None:
        not_ready = {
            "status": "error",
            "message": "Simulation not initialized. Call reset_scenario first.",
        }
        return not_ready

    snapshot = engine.get_state()
    snapshot["emoji_map"] = generate_emoji_map(engine)
    return snapshot
|
|
|
|
|
|
|
|
@mcp.tool()
def deploy_unit(
    unit_type: str,
    x: int,
    y: int,
    source: str = "player",
    session_id: Optional[str] = None,
) -> dict:
    """
    Place a firefighting unit on the grid.

    Args:
        unit_type: Type of unit - "fire_truck" or "helicopter"
        x: X coordinate (0 to grid_width-1)
        y: Y coordinate (0 to grid_height-1)
        source: Who initiated the deployment - "player", "player_accept", "auto_accept_ai"
        session_id: Session whose attached simulation engine is used.

    Returns:
        The engine's deployment result (status and details of the deployed
        unit), or an error dict when the session cannot be resolved.
    """
    engine, error = _resolve_engine(session_id)
    # All validation of the unit type and position is delegated to the engine.
    return error if error else engine.deploy_unit(unit_type, x, y, source)
|
|
|
|
|
|
|
|
@mcp.tool()
def step_simulation(ticks: int = 1, session_id: Optional[str] = None) -> dict:
    """
    Run the simulation forward and report the resulting state.

    The engine's step() is called once per requested tick.

    Args:
        ticks: Number of ticks to advance (default: 1)
        session_id: Session whose attached simulation engine is advanced.

    Returns:
        Current world state with emoji map after advancing, or an error
        dict when the session is unknown or the simulation is not
        initialized.
    """
    engine, error = _resolve_engine(session_id)
    if error:
        return error

    if engine.world is None:
        not_ready = {
            "status": "error",
            "message": "Simulation not initialized. Call reset_scenario first.",
        }
        return not_ready

    for _tick in range(ticks):
        engine.step()

    snapshot = engine.get_state()
    snapshot["emoji_map"] = generate_emoji_map(engine)
    return snapshot
|
|
|
|
|
|
|
|
@mcp.tool()
def move_unit(
    source_x: int,
    source_y: int,
    target_x: int,
    target_y: int,
    session_id: Optional[str] = None,
) -> dict:
    """
    Move an existing unit from source position to target position.
    Useful for repositioning idle units to cover uncovered fires.

    The move is implemented as "remove at source, deploy at target". If the
    deployment at the target fails, the unit is re-deployed at the source
    position and the returned message states whether that restore worked.

    Args:
        source_x: Current X coordinate of the unit
        source_y: Current Y coordinate of the unit
        target_x: New X coordinate to move to
        target_y: New Y coordinate to move to
        session_id: Session whose attached simulation engine is used.

    Returns:
        Status and details of the move operation
    """
    engine, error = _resolve_engine(session_id)
    if error:
        return error

    if engine.world is None:
        return {
            "status": "error",
            "message": "Simulation not initialized. Call reset_scenario first.",
        }

    # The unit type must be read before removal so the same kind of unit can
    # be deployed at the target (or restored at the source).
    unit_to_move = next(
        (
            unit
            for unit in engine.world.units
            if unit.x == source_x and unit.y == source_y
        ),
        None,
    )
    if unit_to_move is None:
        return {
            "status": "error",
            "message": f"No unit found at source position ({source_x}, {source_y})",
        }

    unit_type = unit_to_move.unit_type.value

    remove_result = engine.remove_unit_at(source_x, source_y)
    if remove_result.get("status") != "ok":
        return {
            "status": "error",
            "message": f"Failed to remove unit: {remove_result.get('message')}",
        }

    deploy_result = engine.deploy_unit(unit_type, target_x, target_y, "ai_move")
    if deploy_result.get("status") != "ok":
        failure = f"Failed to deploy at target: {deploy_result.get('message')}."
        # Roll back, and check the outcome: claiming the unit was restored
        # when the rollback itself failed would hide a lost unit.
        restore_result = engine.deploy_unit(
            unit_type, source_x, source_y, "ai_restore"
        )
        if restore_result.get("status") == "ok":
            message = f"{failure} Unit restored to original position."
        else:
            message = (
                f"{failure} Restoring the unit at ({source_x}, {source_y}) "
                f"also failed: {restore_result.get('message')}. "
                "The unit is no longer deployed."
            )
        return {"status": "error", "message": message}

    return {
        "status": "ok",
        "unit_type": unit_type,
        "source": {"x": source_x, "y": source_y},
        "target": {"x": target_x, "y": target_y},
        "message": f"Moved {unit_type} from ({source_x}, {source_y}) to ({target_x}, {target_y})",
    }
|
|
|
|
|
|
|
|
@mcp.tool()
def remove_unit(
    x: int,
    y: int,
    session_id: Optional[str] = None,
) -> dict:
    """
    Take the unit at the given position off the grid.

    Removing a unit frees a deployment slot, so deploy_unit can then place
    a new unit elsewhere.

    Args:
        x: X coordinate of the unit to remove
        y: Y coordinate of the unit to remove
        session_id: Session whose attached simulation engine is used.

    Returns:
        Status and details of the removed unit

    Example use cases:
    - Remove ineffective truck, then deploy helicopter at better position
    - Free up deployment slot when unit is no longer needed
    - Reposition unit: remove_unit + deploy_unit at new location
    """
    engine, error = _resolve_engine(session_id)
    if error:
        return error

    if engine.world is None:
        not_ready = {
            "status": "error",
            "message": "Simulation not initialized. Call reset_scenario first.",
        }
        return not_ready

    # Look the unit up first so its type can be reported after removal.
    target = next(
        (unit for unit in engine.world.units if unit.x == x and unit.y == y),
        None,
    )
    if target is None:
        return {
            "status": "error",
            "message": f"No unit found at position ({x}, {y})",
        }

    kind = target.unit_type.value

    outcome = engine.remove_unit_at(x, y)
    if outcome.get("status") == "ok":
        return {
            "status": "ok",
            "removed_unit_type": kind,
            "position": {"x": x, "y": y},
            "message": f"Removed {kind} at ({x}, {y}). You can now deploy a new unit.",
        }

    return {
        "status": "error",
        "message": f"Failed to remove unit: {outcome.get('message')}",
    }
|
|
|
|
|
|
|
|
@mcp.tool()
def find_idle_units(session_id: Optional[str] = None) -> dict:
    """
    Find units that are not covering any fires (idle/ineffective units).

    An idle unit is one where NO fires exist within its effective range.
    Range is measured as Chebyshev distance (a square around the unit):
    - Fire Truck effective range: 1 cell (FIRE_TRUCK_RANGE)
    - Helicopter effective range: 2 cells (HELICOPTER_RANGE)

    Args:
        session_id: Session whose attached simulation engine is queried.

    Returns:
        List of idle units and effective units with their positions, type
        and effective_range, plus idle/effective/total counts
    """
    engine, error = _resolve_engine(session_id)
    if error:
        return error

    if engine.world is None:
        return {
            "status": "error",
            "message": "Simulation not initialized. Call reset_scenario first.",
        }

    world = engine.world
    # Only positions matter for coverage; fire intensity is irrelevant here.
    fire_positions = [(fire.x, fire.y) for fire in world.get_fires()]

    idle_units = []
    effective_units = []

    for unit in world.units:
        unit_type = unit.unit_type.value
        unit_info = {
            "x": unit.x,
            "y": unit.y,
            "type": unit_type,
            "effective_range": _get_unit_effective_range(unit_type),
        }

        has_fire_in_range = any(
            _is_in_range(unit.x, unit.y, fx, fy, unit_type)
            for fx, fy in fire_positions
        )
        if has_fire_in_range:
            effective_units.append(unit_info)
        else:
            idle_units.append(unit_info)

    return {
        "status": "ok",
        "idle_units": idle_units,
        "idle_count": len(idle_units),
        "effective_units": effective_units,
        "effective_count": len(effective_units),
        "total_units": len(world.units),
    }
|
|
|
|
|
|
|
|
@mcp.tool()
def find_uncovered_fires(session_id: Optional[str] = None) -> dict:
    """
    Find fires that have NO unit coverage.

    An uncovered fire is one where NO unit is within effective range.
    Range is measured as Chebyshev distance (a square around the unit):
    - Fire Truck range: 1 cell (FIRE_TRUCK_RANGE)
    - Helicopter range: 2 cells (HELICOPTER_RANGE)

    A fire is flagged as threatening a building when any building lies
    within 2 cells of it.

    Args:
        session_id: Session whose attached simulation engine is queried.

    Returns:
        List of uncovered fires with their positions, intensity, and building
        threat status, plus the covered fires, counts and coverage_ratio
        (1.0 when there are no fires)
    """
    engine, error = _resolve_engine(session_id)
    if error:
        return error

    if engine.world is None:
        return {
            "status": "error",
            "message": "Simulation not initialized. Call reset_scenario first.",
        }

    world = engine.world
    fires = world.get_fires()
    units = world.units
    building_positions = set(world.building_positions)

    uncovered_fires = []
    covered_fires = []

    for fire in fires:
        is_covered = any(
            _is_in_range(unit.x, unit.y, fire.x, fire.y, unit.unit_type.value)
            for unit in units
        )
        threatens_building = any(
            _calculate_distance(fire.x, fire.y, bx, by) <= 2
            for bx, by in building_positions
        )

        fire_info = {
            "x": fire.x,
            "y": fire.y,
            "intensity": round(fire.intensity, 2),
            "threatens_building": threatens_building,
        }

        if is_covered:
            covered_fires.append(fire_info)
        else:
            uncovered_fires.append(fire_info)

    return {
        "status": "ok",
        "uncovered_fires": uncovered_fires,
        "uncovered_count": len(uncovered_fires),
        "covered_fires": covered_fires,
        "covered_count": len(covered_fires),
        "total_fires": len(fires),
        # With no fires at all, everything counts as covered.
        "coverage_ratio": round(len(covered_fires) / len(fires), 2) if fires else 1.0,
    }
|
|
|
|
|
|
|
|
@mcp.tool()
def find_building_threats(session_id: Optional[str] = None) -> dict:
    """
    List the fires that endanger buildings (any building within 2 cells).

    For every such fire the result names the buildings in reach with their
    distance, and the first deployed unit found covering the fire, if any.

    Args:
        session_id: Session whose attached simulation engine is queried.

    Returns:
        List of building-threatening fires with their positions, threatened
        buildings, and coverage status, plus the threat counts and the
        current building integrity
    """
    engine, error = _resolve_engine(session_id)
    if error:
        return error

    if engine.world is None:
        not_ready = {
            "status": "error",
            "message": "Simulation not initialized. Call reset_scenario first.",
        }
        return not_ready

    world = engine.world
    fires = world.get_fires()
    units = world.units
    building_positions = set(world.building_positions)

    building_threats = []
    for fire in fires:
        nearby_buildings = [
            {"x": bx, "y": by, "distance": dist}
            for bx, by in building_positions
            if (dist := _calculate_distance(fire.x, fire.y, bx, by)) <= 2
        ]
        # Fires with no building in reach are not threats and are skipped.
        if not nearby_buildings:
            continue

        guard = next(
            (
                unit
                for unit in units
                if _is_in_range(unit.x, unit.y, fire.x, fire.y, unit.unit_type.value)
            ),
            None,
        )
        if guard is None:
            covering_unit = None
        else:
            covering_unit = {"x": guard.x, "y": guard.y, "type": guard.unit_type.value}

        building_threats.append({
            "fire": {"x": fire.x, "y": fire.y, "intensity": round(fire.intensity, 2)},
            "threatened_buildings": nearby_buildings,
            "is_covered": guard is not None,
            "covering_unit": covering_unit,
        })

    unguarded_total = sum(1 for threat in building_threats if not threat["is_covered"])

    return {
        "status": "ok",
        "building_threats": building_threats,
        "total_threats": len(building_threats),
        "uncovered_threats": unguarded_total,
        "building_integrity": round(world.building_integrity, 2),
    }
|
|
|
|
|
|
|
|
@mcp.tool()
def analyze_coverage(session_id: Optional[str] = None) -> dict:
    """
    Produce one combined report on fires, units and coverage.

    The report merges several analyses:
    - Idle units (not covering any fire)
    - Uncovered fires (no unit in range)
    - Building threats (fires within 2 cells of a building)
    - High intensity fires (intensity above 0.7)

    Args:
        session_id: Session whose attached simulation engine is queried.

    Returns:
        Comprehensive data about fires, units, and coverage status,
        including building integrity, coverage ratio (1.0 when there are no
        fires) and an emoji map
    """
    engine, error = _resolve_engine(session_id)
    if error:
        return error

    if engine.world is None:
        not_ready = {
            "status": "error",
            "message": "Simulation not initialized. Call reset_scenario first.",
        }
        return not_ready

    world = engine.world
    fires = world.get_fires()
    units = world.units
    building_positions = set(world.building_positions)

    fire_total = len(fires)
    deployed = len(units)
    unit_cap = world.max_units
    free_slots = world.max_units - deployed

    # Classify every fire; one fire may land in several of these lists.
    hot_fires = []
    threat_fires = []
    open_fires = []
    for fire in fires:
        entry = {"x": fire.x, "y": fire.y, "intensity": round(fire.intensity, 2)}

        if fire.intensity > 0.7:
            hot_fires.append(entry)

        if any(
            _calculate_distance(fire.x, fire.y, bx, by) <= 2
            for bx, by in building_positions
        ):
            threat_fires.append(entry)

        if not any(
            _is_in_range(unit.x, unit.y, fire.x, fire.y, unit.unit_type.value)
            for unit in units
        ):
            open_fires.append(entry)

    # Split units into those with at least one fire in range and the rest.
    fire_cells = [(fire.x, fire.y) for fire in fires]
    idle = []
    effective = []
    for unit in units:
        entry = {"x": unit.x, "y": unit.y, "type": unit.unit_type.value}
        engaged = any(
            _is_in_range(unit.x, unit.y, fx, fy, unit.unit_type.value)
            for fx, fy in fire_cells
        )
        (effective if engaged else idle).append(entry)

    if fires:
        coverage_ratio = (len(fires) - len(open_fires)) / len(fires)
    else:
        coverage_ratio = 1.0

    fire_section = {
        "total_fires": fire_total,
        "high_intensity_count": len(hot_fires),
        "high_intensity_fires": hot_fires,
        "building_threat_count": len(threat_fires),
        "building_threat_fires": threat_fires,
        "uncovered_count": len(open_fires),
        "uncovered_fires": open_fires,
    }
    unit_section = {
        "deployed": deployed,
        "max_units": unit_cap,
        "available_slots": free_slots,
        "idle_count": len(idle),
        "idle_units": idle,
        "effective_count": len(effective),
        "effective_units": effective,
    }

    return {
        "status": "ok",
        "building_integrity": round(world.building_integrity, 2),
        "coverage_ratio": round(coverage_ratio, 2),
        "fire_analysis": fire_section,
        "unit_analysis": unit_section,
        "emoji_map": generate_emoji_map(engine),
    }
|
|
|
|
|
|
|
|
def run_server():
    """
    Start serving the registered MCP tools.

    Delegates to ``mcp.run()`` with no arguments, i.e. FastMCP's default
    transport (stdio).
    """
    mcp.run()
|
|
|
|
|
|
|
|
# Start the server only when this file is executed as a script, so importing
# the module (e.g. to call attach_engine) does not launch it.
if __name__ == "__main__":
    run_server()
|
|
|