|
|
""" |
|
|
Fire-Rescue - Simulation Core |
|
|
|
|
|
Handles fire spread, unit behavior, and win/lose conditions. |
|
|
""" |
|
|
|
|
|
import random |
|
|
from typing import Optional |
|
|
|
|
|
from config import range_text |
|
|
from models import ( |
|
|
WorldState, |
|
|
Cell, |
|
|
CellType, |
|
|
Unit, |
|
|
UnitType, |
|
|
SimulationStatus, |
|
|
Event, |
|
|
) |
|
|
|
|
|
FIRE_COUNT_RANGE_TEXT = range_text("fire_count") |
|
|
BUILDING_COUNT_RANGE_TEXT = range_text("building_count") |
|
|
MAX_UNITS_RANGE_TEXT = range_text("max_units") |
|
|
|
|
|
|
|
|
class SimulationConfig: |
|
|
"""Configuration parameters for the simulation.""" |
|
|
|
|
|
|
|
|
GRID_WIDTH = 10 |
|
|
GRID_HEIGHT = 10 |
|
|
|
|
|
|
|
|
FIRE_SPREAD_CHANCE = 0.08 |
|
|
FIRE_GROWTH_RATE = 0.02 |
|
|
FIRE_MAX_INTENSITY = 1.0 |
|
|
FIRE_DECAY_RATE = 0.01 |
|
|
|
|
|
|
|
|
DAMAGE_PER_TICK = 0.01 |
|
|
|
|
|
|
|
|
FIRE_TRUCK_RANGE = 1 |
|
|
FIRE_TRUCK_POWER = 0.4 |
|
|
HELICOPTER_RANGE = 2 |
|
|
HELICOPTER_POWER = 0.25 |
|
|
UNIT_COOLDOWN = 1 |
|
|
|
|
|
|
|
|
BUILDING_DAMAGE_THRESHOLD = 0.5 |
|
|
FOREST_DAMAGE_THRESHOLD = 0.8 |
|
|
FIRE_SAFE_THRESHOLD = 0.1 |
|
|
|
|
|
|
|
|
class SimulationEngine: |
|
|
""" |
|
|
Core simulation engine that manages world state updates. |
|
|
""" |
|
|
|
|
|
def __init__(self, config: Optional[SimulationConfig] = None): |
|
|
self.config = config or SimulationConfig() |
|
|
self.world: Optional[WorldState] = None |
|
|
|
|
|
def reset( |
|
|
self, |
|
|
seed: Optional[int] = None, |
|
|
fire_count: int = 4, |
|
|
fire_intensity: float = 0.6, |
|
|
building_count: int = 16, |
|
|
max_units: int = 10 |
|
|
) -> WorldState: |
|
|
f""" |
|
|
Reset and initialize a new simulation. |
|
|
|
|
|
Args: |
|
|
seed: Random seed for reproducibility |
|
|
fire_count: Number of initial fire points ({FIRE_COUNT_RANGE_TEXT}) |
|
|
fire_intensity: Initial fire intensity (0.0-1.0) |
|
|
building_count: Number of buildings to place ({BUILDING_COUNT_RANGE_TEXT}) |
|
|
max_units: Maximum number of deployable units ({MAX_UNITS_RANGE_TEXT}) |
|
|
""" |
|
|
self.world = WorldState( |
|
|
width=self.config.GRID_WIDTH, |
|
|
height=self.config.GRID_HEIGHT, |
|
|
tick=0, |
|
|
status=SimulationStatus.RUNNING, |
|
|
max_ticks=200, |
|
|
max_units=max_units |
|
|
) |
|
|
|
|
|
self.world.initialize_grid( |
|
|
seed=seed, |
|
|
fire_count=fire_count, |
|
|
fire_intensity=fire_intensity, |
|
|
building_count=building_count |
|
|
) |
|
|
self.world.calculate_metrics() |
|
|
|
|
|
return self.world |
|
|
|
|
|
def step(self) -> WorldState: |
|
|
"""Advance simulation by one tick.""" |
|
|
if self.world is None: |
|
|
raise RuntimeError("Simulation not initialized. Call reset() first.") |
|
|
|
|
|
if self.world.status != SimulationStatus.RUNNING: |
|
|
return self.world |
|
|
|
|
|
|
|
|
self._update_units() |
|
|
|
|
|
|
|
|
self._update_fire() |
|
|
|
|
|
|
|
|
self._update_damage() |
|
|
|
|
|
|
|
|
self.world.calculate_metrics() |
|
|
|
|
|
|
|
|
self.world.tick += 1 |
|
|
|
|
|
|
|
|
self._check_end_conditions() |
|
|
|
|
|
return self.world |
|
|
|
|
|
def _update_fire(self): |
|
|
"""Update fire spread and growth.""" |
|
|
new_fires: list[tuple[int, int, float]] = [] |
|
|
|
|
|
for row in self.world.grid: |
|
|
for cell in row: |
|
|
if cell.fire_intensity > 0: |
|
|
|
|
|
cell.fire_intensity = min( |
|
|
cell.fire_intensity + self.config.FIRE_GROWTH_RATE, |
|
|
self.config.FIRE_MAX_INTENSITY |
|
|
) |
|
|
|
|
|
|
|
|
for dx, dy in [(-1, 0), (1, 0), (0, -1), (0, 1)]: |
|
|
nx, ny = cell.x + dx, cell.y + dy |
|
|
neighbor = self.world.get_cell(nx, ny) |
|
|
|
|
|
if neighbor and neighbor.fire_intensity == 0 and not neighbor.is_destroyed(): |
|
|
|
|
|
spread_chance = self.config.FIRE_SPREAD_CHANCE * cell.fire_intensity |
|
|
if random.random() < spread_chance: |
|
|
|
|
|
new_intensity = cell.fire_intensity * 0.5 |
|
|
new_fires.append((nx, ny, new_intensity)) |
|
|
|
|
|
|
|
|
for x, y, intensity in new_fires: |
|
|
cell = self.world.get_cell(x, y) |
|
|
if cell and cell.fire_intensity == 0: |
|
|
cell.fire_intensity = intensity |
|
|
|
|
|
def _update_damage(self): |
|
|
"""Update damage caused by fire.""" |
|
|
for row in self.world.grid: |
|
|
for cell in row: |
|
|
if cell.fire_intensity > 0 and cell.cell_type != CellType.EMPTY: |
|
|
|
|
|
damage = self.config.DAMAGE_PER_TICK * cell.fire_intensity |
|
|
cell.damage = min(cell.damage + damage, 1.0) |
|
|
|
|
|
def _update_units(self): |
|
|
"""Update unit actions (firefighting).""" |
|
|
for unit in self.world.units: |
|
|
|
|
|
if unit.cooldown > 0: |
|
|
unit.cooldown -= 1 |
|
|
continue |
|
|
|
|
|
|
|
|
extinguished = False |
|
|
|
|
|
if unit.unit_type == UnitType.FIRE_TRUCK: |
|
|
extinguished = self._fire_truck_action(unit) |
|
|
elif unit.unit_type == UnitType.HELICOPTER: |
|
|
extinguished = self._helicopter_action(unit) |
|
|
|
|
|
if extinguished: |
|
|
unit.cooldown = self.config.UNIT_COOLDOWN |
|
|
|
|
|
def _fire_truck_action(self, unit: Unit) -> bool: |
|
|
"""Fire truck extinguishes fires within a square radius (Chebyshev distance).""" |
|
|
targets = [] |
|
|
|
|
|
|
|
|
for dx in range(-self.config.FIRE_TRUCK_RANGE, self.config.FIRE_TRUCK_RANGE + 1): |
|
|
for dy in range(-self.config.FIRE_TRUCK_RANGE, self.config.FIRE_TRUCK_RANGE + 1): |
|
|
cell = self.world.get_cell(unit.x + dx, unit.y + dy) |
|
|
if cell and cell.fire_intensity > 0: |
|
|
targets.append(cell) |
|
|
|
|
|
if not targets: |
|
|
return False |
|
|
|
|
|
|
|
|
targets.sort(key=lambda c: c.fire_intensity, reverse=True) |
|
|
target = targets[0] |
|
|
|
|
|
|
|
|
target.fire_intensity = max(0, target.fire_intensity - self.config.FIRE_TRUCK_POWER) |
|
|
|
|
|
return True |
|
|
|
|
|
def _helicopter_action(self, unit: Unit) -> bool: |
|
|
"""Helicopter extinguishes fires within a wider square radius.""" |
|
|
affected = False |
|
|
|
|
|
for dx in range(-self.config.HELICOPTER_RANGE, self.config.HELICOPTER_RANGE + 1): |
|
|
for dy in range(-self.config.HELICOPTER_RANGE, self.config.HELICOPTER_RANGE + 1): |
|
|
cell = self.world.get_cell(unit.x + dx, unit.y + dy) |
|
|
if cell and cell.fire_intensity > 0: |
|
|
cell.fire_intensity = max(0, cell.fire_intensity - self.config.HELICOPTER_POWER) |
|
|
affected = True |
|
|
|
|
|
return affected |
|
|
|
|
|
def _check_end_conditions(self): |
|
|
"""Check win/lose conditions.""" |
|
|
|
|
|
if self.world.tick >= self.world.max_ticks: |
|
|
self.world.status = SimulationStatus.FAIL |
|
|
self.world.recent_events.append(Event( |
|
|
tick=self.world.tick, |
|
|
event_type="simulation_end", |
|
|
details={"reason": "time_limit_exceeded"} |
|
|
)) |
|
|
return |
|
|
|
|
|
|
|
|
if self.world.building_integrity < (1 - self.config.BUILDING_DAMAGE_THRESHOLD): |
|
|
self.world.status = SimulationStatus.FAIL |
|
|
self.world.recent_events.append(Event( |
|
|
tick=self.world.tick, |
|
|
event_type="simulation_end", |
|
|
details={"reason": "building_destroyed"} |
|
|
)) |
|
|
return |
|
|
|
|
|
|
|
|
if self.world.forest_burn_ratio > self.config.FOREST_DAMAGE_THRESHOLD: |
|
|
self.world.status = SimulationStatus.FAIL |
|
|
self.world.recent_events.append(Event( |
|
|
tick=self.world.tick, |
|
|
event_type="simulation_end", |
|
|
details={"reason": "forest_destroyed"} |
|
|
)) |
|
|
return |
|
|
|
|
|
|
|
|
fires = self.world.get_fires() |
|
|
if not fires or all(f.intensity < self.config.FIRE_SAFE_THRESHOLD for f in fires): |
|
|
self.world.status = SimulationStatus.SUCCESS |
|
|
self.world.recent_events.append(Event( |
|
|
tick=self.world.tick, |
|
|
event_type="simulation_end", |
|
|
details={"reason": "fire_contained"} |
|
|
)) |
|
|
|
|
|
def deploy_unit( |
|
|
self, |
|
|
unit_type: str, |
|
|
x: int, |
|
|
y: int, |
|
|
source: str = "player" |
|
|
) -> dict: |
|
|
"""Deploy a new unit at the specified position.""" |
|
|
if self.world is None: |
|
|
return {"status": "error", "message": "Simulation not initialized"} |
|
|
|
|
|
if self.world.status != SimulationStatus.RUNNING: |
|
|
return {"status": "error", "message": "Simulation is not running"} |
|
|
|
|
|
|
|
|
try: |
|
|
utype = UnitType(unit_type) |
|
|
except ValueError: |
|
|
return {"status": "error", "message": f"Invalid unit type: {unit_type}"} |
|
|
|
|
|
|
|
|
if not (0 <= x < self.world.width and 0 <= y < self.world.height): |
|
|
return {"status": "error", "message": f"Position ({x}, {y}) is out of bounds"} |
|
|
|
|
|
cell = self.world.get_cell(x, y) |
|
|
if cell and cell.fire_intensity > 0: |
|
|
return {"status": "error", "message": f"Cannot deploy on burning cell at ({x}, {y})"} |
|
|
|
|
|
if cell and cell.cell_type == CellType.BUILDING: |
|
|
return {"status": "error", "message": f"Cannot deploy on building at ({x}, {y})"} |
|
|
|
|
|
|
|
|
if len(self.world.units) >= self.world.max_units: |
|
|
return {"status": "error", "message": f"Unit limit reached ({self.world.max_units})"} |
|
|
|
|
|
|
|
|
unit = self.world.add_unit(utype, x, y, source) |
|
|
|
|
|
if unit is None: |
|
|
return {"status": "error", "message": "Failed to deploy unit"} |
|
|
|
|
|
return { |
|
|
"status": "ok", |
|
|
"unit": unit.to_dict() |
|
|
} |
|
|
|
|
|
def remove_unit_at(self, x: int, y: int) -> dict: |
|
|
"""Remove a unit at the specified position.""" |
|
|
if self.world is None: |
|
|
return {"status": "error", "message": "Simulation not initialized"} |
|
|
|
|
|
|
|
|
unit_to_remove = None |
|
|
for unit in self.world.units: |
|
|
if unit.x == x and unit.y == y: |
|
|
unit_to_remove = unit |
|
|
break |
|
|
|
|
|
if unit_to_remove is None: |
|
|
return {"status": "error", "message": f"No unit at ({x}, {y})"} |
|
|
|
|
|
|
|
|
self.world.units.remove(unit_to_remove) |
|
|
|
|
|
return { |
|
|
"status": "ok", |
|
|
"message": f"Removed {unit_to_remove.unit_type.value} at ({x}, {y})", |
|
|
"unit": unit_to_remove.to_dict() |
|
|
} |
|
|
|
|
|
def get_state(self) -> dict: |
|
|
"""Get current world state as dictionary.""" |
|
|
if self.world is None: |
|
|
return {"status": "error", "message": "Simulation not initialized"} |
|
|
|
|
|
return self.world.to_dict() |
|
|
|
|
|
|