# Utilities for cleaning Russian text, normalising transaction tables and
# forecasting income / expense / net series.
import json
import logging
import re
import sys
from typing import Optional, Tuple

import numpy as np
import pandas as pd

# statsmodels is treated as optional (like Prophet below): without it
# fit_and_forecast() degrades to its mean-of-recent-values baseline.
try:
    from statsmodels.tsa.holtwinters import ExponentialSmoothing, Holt

    _HAS_STATSMODELS = True
except ImportError:
    ExponentialSmoothing = Holt = None
    _HAS_STATSMODELS = False

# Any failure while importing Prophet is treated as "not installed".
try:
    from prophet import Prophet

    _HAS_PROPHET = True
except Exception:
    _HAS_PROPHET = False

_log = logging.getLogger(__name__)

# Matches every character that is NOT Cyrillic, a digit, a space, a newline
# or one of the listed punctuation marks.
_KEEP = re.compile(r"[^А-Яа-яЁё0-9 ,.!?:;()«»\"'–—\-•\n]")

# Accepted header spellings (lower-cased) -> canonical column name.
_COLUMN_ALIASES = {
    "date": "date",
    "дата": "date",
    "amount": "amount",
    "сумма": "amount",
    "category": "category",
    "категория": "category",
    "type": "type",
    "тип": "type",
}
_REQUIRED_COLUMNS = frozenset({"date", "amount", "type"})

_EXPENSE_LABELS = frozenset({"expense", "расход", "расходы", "-", "e", "exp"})
_INCOME_LABELS = frozenset({"income", "доход", "доходы", "+", "i", "inc"})

# Legacy and current pandas spellings of the two frequencies used here.
_MONTH_END_ALIASES = frozenset({"M", "ME"})
_YEAR_END_ALIASES = frozenset({"A", "Y", "YE", "A-DEC", "Y-DEC", "YE-DEC"})


def clean_ru(text: str) -> str:
    """Keep only Cyrillic letters, digits and basic punctuation.

    Every other character is replaced by a space, then all whitespace runs
    (including newlines) are collapsed to a single space and the result is
    stripped. ``None`` or an empty string yields ``""``.
    """
    kept = _KEEP.sub(" ", text or "")
    return re.sub(r"\s+", " ", kept).strip()


def normalize_columns(df: pd.DataFrame) -> pd.DataFrame:
    """Return a copy of *df* with canonical column names and parsed values.

    Headers are matched case-insensitively (English or Russian) and renamed
    to ``date``, ``amount``, ``category`` and ``type``. Rows whose date cannot
    be parsed are dropped, unparseable amounts become ``0.0`` and a missing
    ``category`` column is filled with a placeholder.

    Raises:
        ValueError: if ``date``, ``amount`` or ``type`` is absent.
    """
    work = df.copy()
    # str() keeps non-string labels (ints, tuples, ...) from crashing the
    # lookup; they simply never match an alias.
    work.columns = [
        _COLUMN_ALIASES.get(str(col).strip().lower(), col)
        for col in work.columns
    ]

    missing = _REQUIRED_COLUMNS - set(map(str, work.columns))
    if missing:
        raise ValueError(f"Отсутствуют колонки: {', '.join(sorted(missing))}")

    work["date"] = pd.to_datetime(work["date"], errors="coerce")
    work = work.dropna(subset=["date"])
    work["amount"] = pd.to_numeric(work["amount"], errors="coerce").fillna(0.0)
    if "category" not in work.columns:
        work["category"] = "Без категории"
    return work


def is_expense(t: str) -> bool:
    """Return True when *t* (case/whitespace-insensitive) labels an expense."""
    return str(t).strip().lower() in _EXPENSE_LABELS


def is_income(t: str) -> bool:
    """Return True when *t* (case/whitespace-insensitive) labels an income."""
    return str(t).strip().lower() in _INCOME_LABELS


def _freq_offset(freq):
    """Translate a frequency alias into a pandas DateOffset object.

    Month-end and year-end aliases are mapped explicitly so that both the
    legacy ("M", "A") and the newer ("ME", "YE") spellings work regardless of
    the installed pandas version; anything else is handed to pandas as is.
    """
    if isinstance(freq, str):
        if freq in _MONTH_END_ALIASES:
            return pd.offsets.MonthEnd()
        if freq in _YEAR_END_ALIASES:
            return pd.offsets.YearEnd()
    return pd.tseries.frequencies.to_offset(freq)


def _is_yearly(freq) -> bool:
    """Return True for annual aliases ("A...", "Y...")."""
    return str(freq).startswith(("A", "Y"))


def _future_index(history: pd.Series, steps: int, offset) -> pd.DatetimeIndex:
    """Build the *steps* period labels that follow the end of *history*.

    An empty history is anchored at today's date (normalised to midnight).
    """
    if len(history):
        anchor = history.index[-1]
    else:
        anchor = pd.Timestamp.today().normalize()
    return pd.date_range(anchor + offset, periods=steps, freq=offset)


def prepare_components_series(
    df: pd.DataFrame, freq: str = "M"
) -> Tuple[pd.Series, pd.Series, pd.Series]:
    """Aggregate transactions into income, expense and net series.

    Args:
        df: transaction table accepted by :func:`normalize_columns`.
        freq: resampling frequency alias (month end by default).

    Returns:
        ``(income, expense, net)``. Expenses are stored as non-positive
        numbers (``-abs(amount)``), ``net = income + expense`` and all three
        share one gap-free index named ``period_end``.

    Raises:
        ValueError: if *df* is ``None``/empty or lacks required columns.
    """
    if df is None or df.empty:
        raise ValueError("Пустая таблица транзакций.")

    offset = _freq_offset(freq)
    work = normalize_columns(df)
    # astype(bool) keeps the masks boolean even when no rows survived.
    work["is_expense"] = work["type"].map(is_expense).astype(bool)
    work["is_income"] = work["type"].map(is_income).astype(bool)

    income_rows = work.loc[work["is_income"]].set_index("date")["amount"]
    inc = income_rows.resample(offset).sum().sort_index()

    expense_rows = work.loc[work["is_expense"]].set_index("date")["amount"]
    exp = expense_rows.abs().mul(-1).resample(offset).sum().sort_index()

    populated = [s for s in (inc, exp) if not s.empty]
    if populated:
        # Put both series on one continuous range so they add up cleanly.
        first = min(s.index.min() for s in populated)
        last = max(s.index.max() for s in populated)
        full_idx = pd.date_range(first, last, freq=offset)
        inc = inc.reindex(full_idx, fill_value=0.0)
        exp = exp.reindex(full_idx, fill_value=0.0)

    net = inc + exp
    inc.index.name = exp.index.name = net.index.name = "period_end"
    return inc, exp, net


def _prophet_forecast(
    history: pd.Series, steps: int, offset, yearly: bool
) -> pd.Series:
    """Fit Prophet on *history* and return its ``yhat`` for *steps* periods."""
    frame = history.reset_index()
    frame.columns = ["ds", "y"]
    model = Prophet(
        yearly_seasonality=not yearly,
        weekly_seasonality=False,
        daily_seasonality=False,
        seasonality_mode="additive",
    )
    model.fit(frame)
    future = model.make_future_dataframe(periods=steps, freq=offset)
    predicted = model.predict(future).tail(steps)
    # Label the values with the same index the other code paths produce, so
    # the forecast lines up with the history regardless of the model used.
    return pd.Series(
        predicted["yhat"].to_numpy(),
        index=_future_index(history, steps, offset),
        name="forecast",
    )


def _smoothing_forecast(
    history: pd.Series, steps: int, offset, yearly: bool
) -> pd.Series:
    """Forecast with Holt, or Holt-Winters given 24+ non-yearly points."""
    if not yearly and len(history) >= 24:
        model = ExponentialSmoothing(
            history,
            trend="add",
            seasonal="add",
            seasonal_periods=12,
            initialization_method="estimated",
        )
    else:
        model = Holt(history, initialization_method="estimated")

    fc = model.fit(optimized=True).forecast(steps)
    if not isinstance(fc.index, pd.DatetimeIndex) or len(fc.index) != steps:
        # The model did not return usable dates; rebuild them.
        return pd.Series(
            np.asarray(fc),
            index=_future_index(history, steps, offset),
            name="forecast",
        )
    return fc.rename("forecast")


def fit_and_forecast(
    history: pd.Series, steps: int, freq: str, method: str = "auto"
) -> pd.Series:
    """Forecast *steps* periods ahead of *history*.

    Strategy, in order:

    1. Fewer than 3 observations: repeat the last value (``0.0`` if empty).
    2. Prophet, when ``method == "prophet"``, or when ``method == "auto"``
       and the history is long enough (5+ yearly or 18+ other points) and
       Prophet is installed.
    3. Holt / Holt-Winters exponential smoothing (needs statsmodels).
    4. Mean of the last (up to) 6 observations.

    A failing or unavailable model is logged and the next strategy is tried.

    Args:
        history: observed values indexed by period-end timestamps.
        steps: number of future periods.
        freq: frequency alias of *history* (e.g. "M"/"ME", "A"/"YE").
        method: "auto", "prophet", or anything else to skip Prophet.

    Returns:
        Series named ``"forecast"`` indexed by the future period ends.
    """
    offset = _freq_offset(freq)
    yearly = _is_yearly(freq)

    if len(history) < 3:
        last = float(history.iloc[-1]) if len(history) else 0.0
        return pd.Series(
            [last] * steps,
            index=_future_index(history, steps, offset),
            name="forecast",
        )

    if method == "prophet":
        use_prophet = _HAS_PROPHET
        if not use_prophet:
            _log.warning("Prophet requested but not available; skipping it")
    elif method == "auto":
        min_points = 5 if yearly else 18
        use_prophet = _HAS_PROPHET and len(history) >= min_points
    else:
        use_prophet = False

    if use_prophet:
        try:
            return _prophet_forecast(history, steps, offset, yearly)
        except Exception as exc:  # best effort: fall through to smoothing
            _log.warning("Prophet forecast failed: %s", exc)

    if _HAS_STATSMODELS:
        try:
            return _smoothing_forecast(history, steps, offset, yearly)
        except Exception as exc:  # best effort: fall through to baseline
            _log.warning("Exponential smoothing failed: %s", exc)
    else:
        _log.warning("statsmodels is not installed; using mean baseline")

    window = min(6, len(history))
    baseline = float(history.tail(window).mean()) if window else 0.0
    return pd.Series(
        [baseline] * steps,
        index=_future_index(history, steps, offset),
        name="forecast",
    )


def current_month_snapshot(df: pd.DataFrame) -> dict:
    """Summarise the most recent calendar month present in *df*.

    Returns:
        ``{}`` when there is no usable data, otherwise a dict with ``month``
        (``"YYYY-MM"``), ``income_total``, ``expense_total`` (non-positive),
        ``net`` and ``top_expense_categories`` - up to five
        ``(category, total)`` pairs, largest spending first.

    Raises:
        ValueError: if required columns are missing.
    """
    if df is None or df.empty:
        return {}

    table = normalize_columns(df)
    table["is_income"] = table["type"].apply(is_income)
    table["is_expense"] = table["type"].apply(is_expense)

    months = table["date"].dt.to_period("M")
    latest = months.max()
    current = table[months == latest].copy()
    if current.empty:
        return {}

    income_total = float(current.loc[current["is_income"], "amount"].sum())
    spent = current.loc[current["is_expense"], "amount"].abs().sum()
    expense_total = -float(spent)

    expenses = current.loc[current["is_expense"], ["category", "amount"]].copy()
    expenses["amount"] = -expenses["amount"].abs()
    # Totals are negative, so an ascending sort puts the biggest spend first.
    per_category = expenses.groupby("category")["amount"].sum()
    top = per_category.sort_values().head(5)

    return {
        "month": str(latest),
        "income_total": income_total,
        "expense_total": expense_total,
        "net": income_total + expense_total,
        "top_expense_categories": [
            (str(name), float(total)) for name, total in top.items()
        ],
    }


def read_json_stdin() -> dict:
    """Parse JSON from stdin; empty or whitespace-only input gives ``{}``."""
    raw = sys.stdin.read()
    return json.loads(raw.strip() or "{}")


def write_json_stdout(obj) -> None:
    """Write *obj* to stdout as JSON (non-ASCII kept verbatim) and flush."""
    sys.stdout.write(json.dumps(obj, ensure_ascii=False))
    sys.stdout.flush()