"""Compute reliability indices (SAIFI, SAIDI, CAIDI, MAIFI) from outage event CSV. Usage (programmatic): from scripts.compute_reliability import compute_reliability summary = compute_reliability('data/data_1.csv', total_customers=500000) Usage (CLI): python scripts/compute_reliability.py --input data/data_1.csv --total-customers 500000 Assumptions & mapping (from inspected CSV): - Outage start: `OutageDateTime` - Outage end: prefer `CloseEventDateTime`, else `LastRestoDateTime`, else `FirstRestoDateTime` - Customers affected: prefer `AffectedCustomer` column; else sum `AffectedCustomer1..5`; else `AllStepCusXTime` or `AllStepCusXTime1..5` fallback. - Planned outages: rows with `EventType` containing 'แผน' (e.g., 'แผนดับไฟ') are considered planned and can be excluded by default. - Date format is day-first like '10-01-2025 10:28:00'. Outputs saved to `outputs/reliability_summary.csv` and breakdown CSVs. """ from __future__ import annotations import argparse from typing import Optional, Dict import pandas as pd import numpy as np from pathlib import Path DATE_COLS = ['OutageDateTime', 'FirstRestoDateTime', 'LastRestoDateTime', 'CreateEventDateTime', 'CloseEventDateTime'] def parse_dates(df: pd.DataFrame) -> pd.DataFrame: for c in DATE_COLS: if c in df.columns: # many dates are in format dd-mm-YYYY HH:MM:SS df[c] = pd.to_datetime(df[c], dayfirst=True, errors='coerce') return df def coalesce_end_time(row: pd.Series) -> pd.Timestamp | None: for c in ('CloseEventDateTime', 'LastRestoDateTime', 'FirstRestoDateTime', 'CreateEventDateTime'): if c in row and pd.notna(row[c]): return row[c] return pd.NaT def estimate_customers(row: pd.Series) -> float: # Prefer AffectedCustomer if present and numeric def to_num(x): try: if pd.isna(x) or x == '': return np.nan return float(x) except Exception: return np.nan cols = row.index # Try AffectedCustomer if 'AffectedCustomer' in cols: v = to_num(row['AffectedCustomer']) if not np.isnan(v): return v # Sum AffectedCustomer1..5 acs = [] for i in range(1, 6): k = f'AffectedCustomer{i}' if k in cols: acs.append(to_num(row[k])) acs = [x for x in acs if not np.isnan(x)] if acs: return float(sum(acs)) # Try AllStepCusXTime or AllStepCusXTime1..5 if 'AllStepCusXTime' in cols: v = to_num(row['AllStepCusXTime']) if not np.isnan(v): return v asts = [] for i in range(1, 6): k = f'AllStepCusXTime{i}' if k in cols: asts.append(to_num(row[k])) asts = [x for x in asts if not np.isnan(x)] if asts: return float(sum(asts)) # As last resort, try numeric columns near end: Capacity(kVA) or Load(MW) are not customer counts return np.nan def flag_planned(event_type: Optional[str]) -> bool: if pd.isna(event_type): return False s = str(event_type) # In this dataset planned outages use Thai word 'แผน' if 'แผน' in s: return True # else treat as unplanned return False def compute_reliability( input_csv: str | Path, total_customers: Optional[float] = None, customers_map: Optional[Dict[str, float]] = None, exclude_planned: bool = True, momentary_threshold_min: float = 1.0, groupby_cols: list[str] | None = None, out_dir: str | Path | None = 'outputs', ) -> Dict[str, pd.DataFrame]: """Reads CSV and computes reliability indices. Returns a dict of DataFrames: overall, by_group. 
""" input_csv = Path(input_csv) out_dir = Path(out_dir) out_dir.mkdir(parents=True, exist_ok=True) df = pd.read_csv(input_csv, dtype=str) # parse dates df = parse_dates(df) # coalesce end time df['OutageStart'] = df.get('OutageDateTime') df['OutageEnd'] = df.apply(coalesce_end_time, axis=1) # compute duration in minutes df['DurationMin'] = (pd.to_datetime(df['OutageEnd']) - pd.to_datetime(df['OutageStart'])).dt.total_seconds() / 60.0 # customers affected df['CustomersAffected'] = df.apply(estimate_customers, axis=1) # flag planned df['IsPlanned'] = df['EventType'].apply(flag_planned) if 'EventType' in df.columns else False if exclude_planned: df_work = df[~df['IsPlanned']].copy() else: df_work = df.copy() # Fill missing durations (negative or NaN) with 0 df_work['DurationMin'] = df_work['DurationMin'].fillna(0) df_work.loc[df_work['DurationMin'] < 0, 'DurationMin'] = 0 # ensure numeric customers df_work['CustomersAffected'] = pd.to_numeric(df_work['CustomersAffected'], errors='coerce').fillna(0) # Choose grouping if groupby_cols is None: groupby_cols = [] # helper to compute indices given total customers def compute_from_df(dfall: pd.DataFrame, cust_total: float) -> Dict[str, float]: total_interruptions = dfall['CustomersAffected'].sum() total_customer_minutes = (dfall['CustomersAffected'] * dfall['DurationMin']).sum() # momentary interruptions: durations less than threshold momentary_interruptions = dfall.loc[dfall['DurationMin'] < momentary_threshold_min, 'CustomersAffected'].sum() saifi = total_interruptions / cust_total if cust_total and cust_total > 0 else np.nan saidi = total_customer_minutes / cust_total if cust_total and cust_total > 0 else np.nan caidi = (saidi / saifi) if (saifi and saifi > 0) else np.nan maifi = momentary_interruptions / cust_total if cust_total and cust_total > 0 else np.nan return { 'TotalInterruptions': total_interruptions, 'TotalCustomerMinutes': total_customer_minutes, 'MomentaryInterruptions': momentary_interruptions, 'SAIFI': saifi, 'SAIDI': saidi, 'CAIDI': caidi, 'MAIFI': maifi, } results = {} if customers_map is not None: # customers_map expects keys matching grouping (e.g., Feeder or AffectedAreaID). 
        # The overall total is the sum of the map values unless total_customers
        # overrides it.
        total_customers_map_sum = sum(customers_map.values())
        overall = compute_from_df(
            df_work,
            total_customers_map_sum if total_customers is None else total_customers,
        )
        results['overall'] = pd.DataFrame([overall])

        # If no grouping was supplied, fall back to Feeder, then AffectedAreaID.
        if not groupby_cols:
            if 'Feeder' in df_work.columns:
                groupby_cols = ['Feeder']
            elif 'AffectedAreaID' in df_work.columns:
                groupby_cols = ['AffectedAreaID']

        if groupby_cols:
            rows = []
            for key, sub in df_work.groupby(groupby_cols):
                # key is a tuple for multi-column groupings, a scalar otherwise.
                keyname = '_'.join(map(str, key)) if isinstance(key, tuple) else str(key)
                metrics = compute_from_df(sub, customers_map.get(keyname, np.nan))
                metrics['Group'] = keyname
                rows.append(metrics)
            results['by_group'] = pd.DataFrame(rows)
        else:
            results['by_group'] = pd.DataFrame()
    else:
        # Without a customers_map, total_customers is required for the
        # per-customer indices.
        if total_customers is None:
            raise ValueError(
                'Either total_customers or customers_map must be provided '
                'to compute per-customer indices'
            )
        overall = compute_from_df(df_work, float(total_customers))
        results['overall'] = pd.DataFrame([overall])

        # Per-group breakdowns: without per-group customer counts we can report
        # raw interruption and customer-minute totals, but not per-customer
        # normalized indices.
        if groupby_cols:
            rows = []
            for key, sub in df_work.groupby(groupby_cols):
                keyname = '_'.join(map(str, key)) if isinstance(key, tuple) else str(key)
                rows.append({
                    'Group': keyname,
                    'TotalInterruptions': sub['CustomersAffected'].sum(),
                    'TotalCustomerMinutes': (sub['CustomersAffected'] * sub['DurationMin']).sum(),
                    'Events': len(sub),
                })
            results['by_group'] = pd.DataFrame(rows)
        else:
            results['by_group'] = pd.DataFrame()

    # Save the cleaned events and the index summaries.
    results['raw'] = df_work
    results['raw'].to_csv(out_dir / 'events_cleaned.csv', index=False)
    results['overall'].to_csv(out_dir / 'reliability_overall.csv', index=False)
    results['by_group'].to_csv(out_dir / 'reliability_by_group.csv', index=False)
    return results


if __name__ == '__main__':
    parser = argparse.ArgumentParser()
    parser.add_argument('--input', '-i', required=True, help='Input CSV file')
    parser.add_argument('--total-customers', type=float,
                        help='Total customers served (required if no customers map is supplied)')
    parser.add_argument('--exclude-planned', action=argparse.BooleanOptionalAction, default=True,
                        help='Exclude planned outages (default: True; pass --no-exclude-planned to include them)')
    parser.add_argument('--momentary-threshold-min', type=float, default=1.0,
                        help='Duration in minutes below which an interruption counts as momentary')
    parser.add_argument('--groupby', nargs='*', default=['Feeder'],
                        help='Columns to group by for the breakdown (default: Feeder)')
    args = parser.parse_args()
    compute_reliability(
        args.input,
        total_customers=args.total_customers,
        exclude_planned=args.exclude_planned,
        momentary_threshold_min=args.momentary_threshold_min,
        groupby_cols=args.groupby,
    )
    print('Wrote outputs to outputs/ (events_cleaned.csv, reliability_overall.csv, reliability_by_group.csv)')
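
# A sketch of building a customers_map from a per-feeder CSV (the file name and
# column names here are hypothetical):
#
#   cust = pd.read_csv('data/customers_by_feeder.csv')  # columns: Feeder, Customers
#   cmap = dict(zip(cust['Feeder'], cust['Customers'].astype(float)))
#   compute_reliability('data/data_1.csv', customers_map=cmap, groupby_cols=['Feeder'])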