import pandas as pd
import numpy as np
from sklearn.ensemble import IsolationForest
from sklearn.neighbors import LocalOutlierFactor
from sklearn.preprocessing import StandardScaler
from typing import Tuple

try:
    import tensorflow as tf
    from tensorflow.keras import layers, models
    TF_AVAILABLE = True
except ImportError:
    TF_AVAILABLE = False


def parse_datetime_cols(df: pd.DataFrame) -> pd.DataFrame:
    for c in ['OutageDateTime', 'FirstRestoDateTime', 'LastRestoDateTime']:
        if c in df.columns:
            df[c + '_dt'] = pd.to_datetime(df[c], format='%d-%m-%Y %H:%M:%S', errors='coerce')
    return df


def feature_engineer(df: pd.DataFrame) -> pd.DataFrame:
    df = df.copy()
    df = parse_datetime_cols(df)
    # Duration in minutes between outage and last restore
    if 'OutageDateTime_dt' in df.columns and 'LastRestoDateTime_dt' in df.columns:
        df['duration_min'] = (df['LastRestoDateTime_dt'] - df['OutageDateTime_dt']).dt.total_seconds() / 60.0
    else:
        df['duration_min'] = np.nan
    # Coerce load/capacity/step-duration/customer columns to numeric
    for col in ['Load(MW)', 'Capacity(kVA)', 'FirstStepDuration', 'LastStepDuration', 'AffectedCustomer']:
        if col in df.columns:
            df[col + '_num'] = pd.to_numeric(df[col], errors='coerce')
        else:
            df[col + '_num'] = np.nan
    # Time of day
    if 'OutageDateTime_dt' in df.columns:
        df['hour'] = df['OutageDateTime_dt'].dt.hour
    else:
        df['hour'] = np.nan
    # Device type: frequency encoding (a compact alternative to one-hot)
    if 'OpDeviceType' in df.columns:
        filled = df['OpDeviceType'].fillna('NA')
        freq = filled.value_counts()
        # Map the NaN-filled series so missing devices get the 'NA' count, not 0
        df['device_freq'] = filled.map(lambda x: freq.get(x, 0))
    else:
        df['device_freq'] = 0
    # Coordinates: parse '"x,y"' strings into separate float columns
    if 'OpDeviceXYcoord' in df.columns:
        def parse_xy(s):
            try:
                s = str(s).strip().strip('"')
                x, y = s.split(',')
                return float(x), float(y)
            except Exception:
                return (np.nan, np.nan)
        xs, ys = zip(*df['OpDeviceXYcoord'].map(parse_xy))
        df['x'] = xs
        df['y'] = ys
    else:
        df['x'] = np.nan
        df['y'] = np.nan
    return df


def build_feature_matrix(df: pd.DataFrame) -> Tuple[np.ndarray, list, pd.DataFrame, StandardScaler]:
    df_fe = feature_engineer(df)
    features = ['duration_min', 'Load(MW)_num', 'AffectedCustomer_num', 'hour', 'device_freq', 'x', 'y']
    X = df_fe[features].copy()
    # Median-impute missing values; columns that are entirely NaN fall back to 0
    X = X.fillna(X.median()).fillna(0.0)
    scaler = StandardScaler()
    Xs = scaler.fit_transform(X)
    return Xs, features, df_fe, scaler


def run_isolation_forest(X: np.ndarray, contamination: float = 0.05, random_state: int = 42):
    iso = IsolationForest(contamination=contamination, random_state=random_state)
    preds = iso.fit_predict(X)  # IsolationForest returns -1 for outliers, 1 for inliers
    scores = iso.decision_function(X)
    return preds, scores


def run_lof(X: np.ndarray, contamination: float = 0.05, n_neighbors: int = 20):
    lof = LocalOutlierFactor(n_neighbors=n_neighbors, contamination=contamination)
    preds = lof.fit_predict(X)
    # negative_outlier_factor_: the lower the value, the more abnormal the point
    scores = lof.negative_outlier_factor_
    return preds, scores


def run_autoencoder(X: np.ndarray, contamination: float = 0.05, latent_dim: int = 4,
                    epochs: int = 50, batch_size: int = 32):
    if not TF_AVAILABLE:
        raise ImportError("TensorFlow not available. Install tensorflow to use the autoencoder.")
    input_dim = X.shape[1]
    # Build a symmetric encoder/decoder around a small latent bottleneck
    encoder = models.Sequential([
        layers.Input(shape=(input_dim,)),
        layers.Dense(16, activation='relu'),
        layers.Dense(latent_dim, activation='relu'),
    ])
    decoder = models.Sequential([
        layers.Input(shape=(latent_dim,)),
        layers.Dense(16, activation='relu'),
        layers.Dense(input_dim, activation='linear'),
    ])
    autoencoder = models.Sequential([encoder, decoder])
    autoencoder.compile(optimizer='adam', loss='mse')
    # Train the network to reconstruct the (already scaled) inputs
    autoencoder.fit(X, X, epochs=epochs, batch_size=batch_size, verbose=0, validation_split=0.1)
    # Per-row reconstruction error
    reconstructed = autoencoder.predict(X, verbose=0)
    mse = np.mean((X - reconstructed) ** 2, axis=1)
    # Threshold chosen so that roughly `contamination` of rows exceed it
    threshold = np.percentile(mse, (1 - contamination) * 100)
    preds = np.where(mse > threshold, -1, 1)  # -1 for outliers, 1 for inliers
    return preds, mse


def explain_anomalies(df_fe: pd.DataFrame, explain_features=None):
    # explain_features: numeric columns on which to compute z-scores
    if explain_features is None:
        explain_features = ['duration_min', 'Load(MW)_num', 'AffectedCustomer_num', 'hour', 'device_freq']
    df_num = df_fe[explain_features].astype(float).fillna(df_fe[explain_features].median()).fillna(0.0)
    means = df_num.mean()
    stds = df_num.std().replace(0, 1.0)  # avoid division by zero for constant columns
    z = (df_num - means) / stds
    # Build an explanation string per row from the 3 largest absolute z-scores
    explanations = []
    for i, row in z.iterrows():
        abs_row = row.abs()
        top = abs_row.sort_values(ascending=False).head(3)
        parts = []
        for feat in top.index:
            val = row[feat]
            sign = 'high' if val > 0 else 'low' if val < 0 else 'normal'
            parts.append(f"{feat} {sign} (z={val:.2f})")
        explanations.append('; '.join(parts))
    return z, explanations


def detect_anomalies(df: pd.DataFrame, contamination: float = 0.05, algorithm: str = 'iso+lof') -> pd.DataFrame:
    Xs, features, df_fe, scaler = build_feature_matrix(df)
    if algorithm == 'autoencoder':
        preds, scores = run_autoencoder(Xs, contamination=contamination)
        res = df.copy().reset_index(drop=True)
        res['auto_pred'] = preds
        res['auto_score'] = scores
        res['final_flag'] = res['auto_pred'] == -1
    else:
        preds_iso, scores_iso = run_isolation_forest(Xs, contamination=contamination)
        preds_lof, scores_lof = run_lof(Xs, contamination=contamination)
        res = df.copy().reset_index(drop=True)
        res['iso_pred'] = preds_iso
        res['iso_score'] = scores_iso
        res['lof_pred'] = preds_lof
        res['lof_score'] = scores_lof
        # Ensemble: flag only rows that both detectors mark as outliers (-1)
        res['ensemble_flag'] = (res['iso_pred'] == -1) & (res['lof_pred'] == -1)
        # Map the requested algorithm ('iso', 'lof', or 'iso+lof') onto final_flag
        if algorithm == 'iso':
            res['final_flag'] = res['iso_pred'] == -1
        elif algorithm == 'lof':
            res['final_flag'] = res['lof_pred'] == -1
        elif algorithm == 'iso+lof':
            res['final_flag'] = res['ensemble_flag']
        else:
            raise ValueError(f"Unknown algorithm: {algorithm}")
    # Explainability (shared by all algorithms): attach per-feature z-scores
    z_df, explanations = explain_anomalies(df_fe)
    for col in z_df.columns:
        res[f'z_{col}'] = z_df[col].values
    res['explanation'] = explanations
    return res