# arkai2025's picture
# refactor(config): centralize scenario configuration defaults
# 85cbecd
"""
Fire-Rescue - Simulation Core
Handles fire spread, unit behavior, and win/lose conditions.
"""
import random
from typing import Optional
from config import range_text
from models import (
WorldState,
Cell,
CellType,
Unit,
UnitType,
SimulationStatus,
Event,
)
# Range descriptions for the scenario parameters, looked up once at import
# time via config.range_text (presumably human-readable text such as a
# "min-max" string -- confirm against config). Used below when describing the
# reset() arguments.
FIRE_COUNT_RANGE_TEXT, BUILDING_COUNT_RANGE_TEXT, MAX_UNITS_RANGE_TEXT = map(
    range_text,
    ("fire_count", "building_count", "max_units"),
)
class SimulationConfig:
    """Tunable constants for the simulation.

    Everything is a plain class attribute, so values can be read either from
    the class itself or from an instance.
    """

    # --- Grid size (cells) ---
    GRID_WIDTH: int = 10
    GRID_HEIGHT: int = 10

    # --- Fire spread parameters (balanced for playability) ---
    # Base chance per tick to spread to an adjacent cell (kept low).
    FIRE_SPREAD_CHANCE: float = 0.08
    # Amount a burning cell's intensity grows per tick (kept low).
    FIRE_GROWTH_RATE: float = 0.02
    # Upper bound for a cell's fire intensity.
    FIRE_MAX_INTENSITY: float = 1.0
    # Natural decay (very slow).
    FIRE_DECAY_RATE: float = 0.01

    # --- Damage parameters ---
    # Damage dealt by fire per tick (kept low).
    DAMAGE_PER_TICK: float = 0.01

    # --- Unit parameters (stronger firefighting) ---
    # Square coverage radius: 1 covers the unit's cell and its 8 neighbours.
    FIRE_TRUCK_RANGE: int = 1
    # Fire intensity removed per fire-truck action.
    FIRE_TRUCK_POWER: float = 0.4
    # Square coverage radius: extends two cells in every direction.
    HELICOPTER_RANGE: int = 2
    # Fire intensity removed per helicopter action (weaker, wider coverage).
    HELICOPTER_POWER: float = 0.25
    # Ticks between unit actions (low value = faster response).
    UNIT_COOLDOWN: int = 1

    # --- Win/lose thresholds ---
    # Fail if building integrity drops below 50%.
    BUILDING_DAMAGE_THRESHOLD: float = 0.5
    # Fail if more than 80% of the forest has burned.
    FOREST_DAMAGE_THRESHOLD: float = 0.8
    # Win if every fire is below this intensity.
    FIRE_SAFE_THRESHOLD: float = 0.1
class SimulationEngine:
    """
    Core simulation engine that manages world state updates.

    Call reset() to create a world, then step() once per tick. deploy_unit()
    and remove_unit_at() change the unit roster, and get_state() returns the
    world as a dictionary.
    """

    # Fire spreads only to the four orthogonal neighbours (no diagonals).
    _SPREAD_OFFSETS = ((-1, 0), (1, 0), (0, -1), (0, 1))

    def __init__(self, config: "Optional[SimulationConfig]" = None):
        """Create an engine; no world exists until reset() is called.

        Args:
            config: Tuning parameters. A falsy value falls back to a default
                SimulationConfig().
        """
        self.config = config or SimulationConfig()
        self.world: Optional[WorldState] = None

    def reset(
        self,
        seed: Optional[int] = None,
        fire_count: int = 4,
        fire_intensity: float = 0.6,
        building_count: int = 16,
        max_units: int = 10
    ) -> "WorldState":
        """
        Reset and initialize a new simulation.

        Args:
            seed: Random seed for reproducibility (forwarded to
                WorldState.initialize_grid).
            fire_count: Number of initial fire points (allowed range is
                described by FIRE_COUNT_RANGE_TEXT).
            fire_intensity: Initial fire intensity (0.0-1.0).
            building_count: Number of buildings to place (allowed range is
                described by BUILDING_COUNT_RANGE_TEXT).
            max_units: Maximum number of deployable units (allowed range is
                described by MAX_UNITS_RANGE_TEXT).

        Returns:
            The newly created world, which is also stored on self.world.
        """
        # NOTE: the docstring above must stay a plain string literal. An
        # f-string in that position is an ordinary expression statement, so
        # the method would end up with __doc__ = None.
        self.world = WorldState(
            width=self.config.GRID_WIDTH,
            height=self.config.GRID_HEIGHT,
            tick=0,
            status=SimulationStatus.RUNNING,
            max_ticks=200,
            max_units=max_units
        )
        self.world.initialize_grid(
            seed=seed,
            fire_count=fire_count,
            fire_intensity=fire_intensity,
            building_count=building_count
        )
        self.world.calculate_metrics()
        return self.world

    def step(self) -> "WorldState":
        """Advance simulation by one tick.

        Returns:
            The world after the tick. If the simulation is no longer
            running, the world is returned unchanged.

        Raises:
            RuntimeError: If reset() has not been called yet.
        """
        if self.world is None:
            raise RuntimeError("Simulation not initialized. Call reset() first.")
        if self.world.status != SimulationStatus.RUNNING:
            return self.world

        # 1. Units perform actions FIRST (so deployed units work immediately)
        self._update_units()
        # 2. Fire spreads and grows
        self._update_fire()
        # 3. Fire causes damage
        self._update_damage()
        # 4. Recalculate metrics
        self.world.calculate_metrics()
        # 5. Increment tick
        self.world.tick += 1
        # 6. Check win/lose conditions
        self._check_end_conditions()
        return self.world

    def _update_fire(self):
        """Update fire spread and growth.

        Every burning cell first grows (capped at FIRE_MAX_INTENSITY) and
        then gets one chance per orthogonal neighbour to ignite it. New
        ignitions are collected during the scan and applied afterwards, so a
        cell ignited this tick neither grows nor spreads until the next one.
        """
        cfg = self.config
        ignitions: list[tuple[int, int, float]] = []

        for row in self.world.grid:
            for cell in row:
                if not cell.fire_intensity > 0:
                    continue

                # Fire grows
                cell.fire_intensity = min(
                    cell.fire_intensity + cfg.FIRE_GROWTH_RATE,
                    cfg.FIRE_MAX_INTENSITY
                )

                # Both values depend only on the (already grown) source
                # cell, so compute them once instead of once per neighbour.
                spread_chance = cfg.FIRE_SPREAD_CHANCE * cell.fire_intensity
                ignition_intensity = cell.fire_intensity * 0.5

                # Fire spreads to neighbors
                for dx, dy in self._SPREAD_OFFSETS:
                    nx, ny = cell.x + dx, cell.y + dy
                    neighbor = self.world.get_cell(nx, ny)
                    # The random draw comes last so it is only consumed for
                    # neighbours that can actually catch fire.
                    if (
                        neighbor
                        and neighbor.fire_intensity == 0
                        and not neighbor.is_destroyed()
                        and random.random() < spread_chance
                    ):
                        ignitions.append((nx, ny, ignition_intensity))

        # Apply new fires. Several sources may have picked the same cell;
        # the first entry wins and later ones are skipped.
        for x, y, intensity in ignitions:
            target = self.world.get_cell(x, y)
            if target and target.fire_intensity == 0:
                target.fire_intensity = intensity

    def _update_damage(self):
        """Update damage caused by fire.

        Burning non-empty cells take damage proportional to their fire
        intensity; damage is capped at 1.0.
        """
        for row in self.world.grid:
            for cell in row:
                if cell.fire_intensity > 0 and cell.cell_type != CellType.EMPTY:
                    damage = self.config.DAMAGE_PER_TICK * cell.fire_intensity
                    cell.damage = min(cell.damage + damage, 1.0)

    def _update_units(self):
        """Update unit actions (firefighting).

        A unit with remaining cooldown only counts down this tick and does
        not act. A unit that reduced at least one fire is put back on
        cooldown for UNIT_COOLDOWN ticks.
        """
        for unit in self.world.units:
            # Decrease cooldown
            if unit.cooldown > 0:
                unit.cooldown -= 1
                continue

            extinguished = False
            if unit.unit_type == UnitType.FIRE_TRUCK:
                extinguished = self._fire_truck_action(unit)
            elif unit.unit_type == UnitType.HELICOPTER:
                extinguished = self._helicopter_action(unit)

            if extinguished:
                unit.cooldown = self.config.UNIT_COOLDOWN

    def _fire_truck_action(self, unit: "Unit") -> bool:
        """Fire truck extinguishes fires within a square radius (Chebyshev distance).

        Only the single most intense burning cell in range is targeted; its
        intensity is reduced by FIRE_TRUCK_POWER (never below 0).

        Returns:
            True if a burning cell was in range, otherwise False.
        """
        reach = self.config.FIRE_TRUCK_RANGE
        targets = []
        # Check cells within square range (including diagonals)
        for dx in range(-reach, reach + 1):
            for dy in range(-reach, reach + 1):
                cell = self.world.get_cell(unit.x + dx, unit.y + dy)
                if cell and cell.fire_intensity > 0:
                    targets.append(cell)

        if not targets:
            return False

        # Target highest intensity fire. max() returns the first maximal
        # element, which matches what a stable descending sort would pick.
        target = max(targets, key=lambda c: c.fire_intensity)
        target.fire_intensity = max(0, target.fire_intensity - self.config.FIRE_TRUCK_POWER)
        return True

    def _helicopter_action(self, unit: "Unit") -> bool:
        """Helicopter extinguishes fires within a wider square radius.

        Every burning cell in range is reduced by HELICOPTER_POWER (never
        below 0).

        Returns:
            True if at least one burning cell was in range, otherwise False.
        """
        reach = self.config.HELICOPTER_RANGE
        affected = False
        for dx in range(-reach, reach + 1):
            for dy in range(-reach, reach + 1):
                cell = self.world.get_cell(unit.x + dx, unit.y + dy)
                if cell and cell.fire_intensity > 0:
                    cell.fire_intensity = max(0, cell.fire_intensity - self.config.HELICOPTER_POWER)
                    affected = True
        return affected

    def _end_simulation(self, status, reason: str) -> None:
        """Set the final status and record a "simulation_end" event."""
        self.world.status = status
        self.world.recent_events.append(Event(
            tick=self.world.tick,
            event_type="simulation_end",
            details={"reason": reason}
        ))

    def _check_end_conditions(self):
        """Check win/lose conditions.

        Conditions are evaluated in a fixed order and the first match ends
        the simulation: time limit, building damage, forest damage (all
        FAIL), then fire containment (SUCCESS). A failure condition
        therefore takes precedence over containment on the same tick.
        """
        world = self.world
        cfg = self.config

        # Check time limit
        if world.tick >= world.max_ticks:
            self._end_simulation(SimulationStatus.FAIL, "time_limit_exceeded")
            return

        # Check building damage
        if world.building_integrity < (1 - cfg.BUILDING_DAMAGE_THRESHOLD):
            self._end_simulation(SimulationStatus.FAIL, "building_destroyed")
            return

        # Check forest damage
        if world.forest_burn_ratio > cfg.FOREST_DAMAGE_THRESHOLD:
            self._end_simulation(SimulationStatus.FAIL, "forest_destroyed")
            return

        # Check if all fires are extinguished (or below the safe threshold)
        fires = world.get_fires()
        if not fires or all(f.intensity < cfg.FIRE_SAFE_THRESHOLD for f in fires):
            self._end_simulation(SimulationStatus.SUCCESS, "fire_contained")

    def deploy_unit(
        self,
        unit_type: str,
        x: int,
        y: int,
        source: str = "player"
    ) -> dict:
        """Deploy a new unit at the specified position.

        Args:
            unit_type: String value of a UnitType member.
            x: Target column.
            y: Target row.
            source: Forwarded to WorldState.add_unit.

        Returns:
            {"status": "ok", "unit": <unit dict>} on success, otherwise
            {"status": "error", "message": <reason>}.
        """
        if self.world is None:
            return {"status": "error", "message": "Simulation not initialized"}
        if self.world.status != SimulationStatus.RUNNING:
            return {"status": "error", "message": "Simulation is not running"}

        # Parse unit type
        try:
            utype = UnitType(unit_type)
        except ValueError:
            return {"status": "error", "message": f"Invalid unit type: {unit_type}"}

        # Check position validity first for better error messages
        if not (0 <= x < self.world.width and 0 <= y < self.world.height):
            return {"status": "error", "message": f"Position ({x}, {y}) is out of bounds"}

        cell = self.world.get_cell(x, y)
        if cell:
            if cell.fire_intensity > 0:
                return {"status": "error", "message": f"Cannot deploy on burning cell at ({x}, {y})"}
            if cell.cell_type == CellType.BUILDING:
                return {"status": "error", "message": f"Cannot deploy on building at ({x}, {y})"}

        # Check unit limit
        if len(self.world.units) >= self.world.max_units:
            return {"status": "error", "message": f"Unit limit reached ({self.world.max_units})"}

        # Try to add unit
        unit = self.world.add_unit(utype, x, y, source)
        if unit is None:
            return {"status": "error", "message": "Failed to deploy unit"}

        return {
            "status": "ok",
            "unit": unit.to_dict()
        }

    def remove_unit_at(self, x: int, y: int) -> dict:
        """Remove a unit at the specified position.

        Only the first unit found at (x, y) is removed.

        Returns:
            {"status": "ok", "message": ..., "unit": <unit dict>} on
            success, otherwise {"status": "error", "message": <reason>}.
        """
        if self.world is None:
            return {"status": "error", "message": "Simulation not initialized"}

        # Find unit at position
        unit_to_remove = next(
            (unit for unit in self.world.units if unit.x == x and unit.y == y),
            None,
        )
        if unit_to_remove is None:
            return {"status": "error", "message": f"No unit at ({x}, {y})"}

        # Remove the unit
        self.world.units.remove(unit_to_remove)
        return {
            "status": "ok",
            "message": f"Removed {unit_to_remove.unit_type.value} at ({x}, {y})",
            "unit": unit_to_remove.to_dict()
        }

    def get_state(self) -> dict:
        """Get current world state as dictionary.

        Returns an error dictionary if reset() has not been called yet.
        """
        if self.world is None:
            return {"status": "error", "message": "Simulation not initialized"}
        return self.world.to_dict()