finwise-ml / common.py
minstradamus's picture
Update common.py
7fa9875 verified
import re
import json
from typing import Optional, Tuple
import numpy as np
import pandas as pd
from statsmodels.tsa.holtwinters import ExponentialSmoothing, Holt
# Prophet is an optional dependency: remember whether the import succeeded so
# the forecasting code can decide at runtime whether it may be used.
# NOTE(review): this catches Exception rather than ImportError, presumably so
# that a present-but-broken installation also counts as unavailable - confirm.
try:
    from prophet import Prophet
    _HAS_PROPHET = True
except Exception:
    _HAS_PROPHET = False
# Matches every character that is NOT a Cyrillic letter, a digit, a space, a
# newline, or one of the whitelisted punctuation marks.
_KEEP = re.compile(r"[^А-Яа-яЁё0-9 ,.!?:;()«»\"'–—\-•\n]")


def clean_ru(text: str) -> str:
    """Strip non-Russian noise from *text* and collapse whitespace.

    Characters outside the ``_KEEP`` whitelist are replaced by spaces, then
    every whitespace run is squeezed to one space and the ends are trimmed.
    A falsy *text* (``None`` or ``""``) yields an empty string.
    """
    filtered = _KEEP.sub(" ", text if text else "")
    # After filtering, the only whitespace left is " " and "\n", so
    # split/join gives the same result as a regex collapse plus strip().
    return " ".join(filtered.split())
def normalize_columns(df: pd.DataFrame) -> pd.DataFrame:
    """Return a copy of *df* with canonical column names and cleaned values.

    String column labels are matched case-insensitively, ignoring surrounding
    whitespace, against English and Russian aliases and renamed to ``date``,
    ``amount``, ``category`` and ``type``. Labels that are not strings are
    left untouched rather than raising.

    After renaming:

    * ``date`` is parsed with ``pd.to_datetime``; rows that fail to parse
      are dropped.
    * ``amount`` is converted with ``pd.to_numeric``; unparseable values
      become ``0.0``.
    * a ``category`` column is added with a placeholder if none exists.

    The input frame is never modified.

    Raises:
        ValueError: if ``date``, ``amount`` or ``type`` is missing after
            renaming.
    """
    aliases = {
        "date": "date",
        "дата": "date",
        "amount": "amount",
        "сумма": "amount",
        "category": "category",
        "категория": "category",
        "type": "type",
        "тип": "type",
    }

    work = df.copy()

    # Build the whole rename map first; only string labels can be aliases,
    # so integer/tuple labels are skipped instead of crashing on .lower().
    renames = {}
    for col in work.columns:
        if not isinstance(col, str):
            continue
        canonical = aliases.get(col.strip().lower())
        if canonical is not None:
            renames[col] = canonical
    work = work.rename(columns=renames)

    required = {"date", "amount", "type"}
    missing = required - set(map(str, work.columns))
    if missing:
        raise ValueError(f"Отсутствуют колонки: {', '.join(sorted(missing))}")

    work["date"] = pd.to_datetime(work["date"], errors="coerce")
    work = work.dropna(subset=["date"])
    work["amount"] = pd.to_numeric(work["amount"], errors="coerce").fillna(0.0)
    if "category" not in work.columns:
        work["category"] = "Без категории"
    return work
# Labels (already lower-cased and trimmed) that mark a row as an expense.
_EXPENSE_LABELS = frozenset({"expense", "расход", "расходы", "-", "e", "exp"})


def is_expense(t: str) -> bool:
    """Tell whether the transaction type label *t* denotes an expense.

    The value is stringified, trimmed and lower-cased before it is compared
    with the known English/Russian labels and shorthand markers.
    """
    label = str(t).strip().lower()
    return label in _EXPENSE_LABELS
def is_income(t: str) -> bool:
    """Tell whether the transaction type label *t* denotes income.

    The value is stringified, trimmed and lower-cased before it is compared
    with the known English/Russian labels and shorthand markers. The plural
    "доходы" is accepted for parity with ``is_expense``, which accepts
    "расходы"; otherwise such rows would match neither classifier.
    """
    t = str(t).strip().lower()
    return t in {"income", "доход", "доходы", "+", "i", "inc"}
def prepare_components_series(
    df: pd.DataFrame, freq: str = "M"
) -> Tuple[pd.Series, pd.Series, pd.Series]:
    """Aggregate transactions into income, expense and net series.

    The table is normalised with ``normalize_columns``, rows are classified
    with ``is_income`` / ``is_expense``, and amounts are summed per *freq*
    bucket. Expense amounts are forced negative (``-abs(amount)``); income
    amounts are summed as given. When at least one of the two series has
    data, both are reindexed onto one continuous date range and gaps are
    filled with ``0.0``.

    Returns:
        ``(income, expense, net)`` where ``net = income + expense``; all
        three indexes are named ``"period_end"``.

    Raises:
        ValueError: if *df* is ``None`` or empty (plus whatever
            ``normalize_columns`` raises).
    """
    if df is None or df.empty:
        raise ValueError("Пустая таблица транзакций.")

    table = normalize_columns(df)
    table["is_expense"] = table["type"].apply(is_expense)
    table["is_income"] = table["type"].apply(is_income)

    income_amounts = table.loc[table["is_income"]].set_index("date")["amount"]
    inc = income_amounts.resample(freq).sum().sort_index()

    expense_amounts = table.loc[table["is_expense"]].set_index("date")["amount"]
    exp = expense_amounts.abs().mul(-1).resample(freq).sum().sort_index()

    # Align both components on the union of their covered periods.
    populated = [series for series in (inc, exp) if not series.empty]
    if populated:
        start = min(series.index.min() for series in populated)
        end = max(series.index.max() for series in populated)
        full_idx = pd.date_range(start, end, freq=freq)
        inc = inc.reindex(full_idx, fill_value=0.0)
        exp = exp.reindex(full_idx, fill_value=0.0)

    net = inc + exp
    for series in (inc, exp, net):
        series.index.name = "period_end"
    return inc, exp, net
def _is_annual_freq(freq: str) -> bool:
    """Return True for year-based aliases such as "A", "A-DEC", "Y" or "YE"."""
    return freq.startswith(("A", "Y"))


def _future_index(history: pd.Series, steps: int, freq: str) -> pd.DatetimeIndex:
    """Build the *steps* timestamps that follow the last point of *history*."""
    anchor = (
        history.index[-1]
        if len(history)
        else pd.Timestamp.today().normalize()
    )
    start = anchor + pd.tseries.frequencies.to_offset(freq)
    return pd.date_range(start, periods=steps, freq=freq)


def fit_and_forecast(
    history: pd.Series, steps: int, freq: str, method: str = "auto"
) -> pd.Series:
    """Forecast the next *steps* periods of *history*.

    Strategy, in order:

    1. Fewer than three observations: repeat the last value (``0.0`` when
       *history* is empty).
    2. Prophet, when ``method == "prophet"``, or when ``method == "auto"``,
       Prophet imported successfully and the history is long enough
       (>= 5 points for annual data, >= 18 otherwise). Any failure falls
       through to the next step.
    3. Holt-Winters with additive trend and 12-period additive seasonality
       for non-annual data with >= 24 points; otherwise Holt's linear trend.
    4. If that fails too, the mean of the last (up to) six observations.

    Args:
        history: observed values; the forecast continues from its last
            index label.
        steps: number of future periods to produce.
        freq: pandas offset alias of *history*. Aliases starting with "A"
            or "Y" are treated as annual.
        method: ``"auto"``, ``"prophet"``, or any other value to skip
            Prophet.

    Returns:
        A series named ``"forecast"`` with *steps* values. Every branch
        indexes it by the periods that follow *history* at *freq*.
    """
    if len(history) < 3:
        last = float(history.iloc[-1]) if len(history) else 0.0
        return pd.Series(
            [last] * steps,
            index=_future_index(history, steps, freq),
            name="forecast",
        )

    annual = _is_annual_freq(freq)

    use_prophet = False
    if method == "prophet":
        use_prophet = True
    elif method == "auto":
        min_points = 5 if annual else 18
        use_prophet = _HAS_PROPHET and (len(history) >= min_points)

    if use_prophet:
        try:
            dfp = history.reset_index()
            dfp.columns = ["ds", "y"]
            m = Prophet(
                yearly_seasonality=not annual,
                weekly_seasonality=False,
                daily_seasonality=False,
                seasonality_mode="additive",
            )
            m.fit(dfp)
            # Use the caller's own alias so Prophet predicts on the same
            # grid that the returned index describes.
            future = m.make_future_dataframe(periods=steps, freq=freq)
            fcst = m.predict(future).tail(steps)
            # Index by the periods following the history (not by
            # end-of-period timestamps carrying 23:59:59.999999999) so the
            # result aligns with the history and with the other branches.
            return pd.Series(
                fcst["yhat"].to_numpy(),
                index=_future_index(history, steps, freq),
                name="forecast",
            )
        except Exception:
            # Best effort: Prophet missing or failing must not abort the
            # forecast; continue with exponential smoothing below.
            pass

    # Holt / Holt-Winters
    try:
        if not annual and len(history) >= 24:
            model = ExponentialSmoothing(
                history,
                trend="add",
                seasonal="add",
                seasonal_periods=12,
                initialization_method="estimated",
            )
        else:
            model = Holt(history, initialization_method="estimated")
        fc = model.fit(optimized=True).forecast(steps)
        if not isinstance(fc.index, pd.DatetimeIndex) or len(fc.index) != steps:
            # The model could not produce a usable date index; rebuild it.
            return pd.Series(
                np.asarray(fc),
                index=_future_index(history, steps, freq),
                name="forecast",
            )
        return fc.rename("forecast")
    except Exception:
        # Last resort: flat forecast at the recent average level.
        baseline = float(history.tail(6).mean())
        return pd.Series(
            [baseline] * steps,
            index=_future_index(history, steps, freq),
            name="forecast",
        )
def current_month_snapshot(df: pd.DataFrame) -> dict:
    """Summarise the most recent calendar month present in *df*.

    The table is normalised with ``normalize_columns`` and rows are
    classified with ``is_income`` / ``is_expense``. Only rows falling in the
    latest month found in the data are considered.

    Returns:
        ``{}`` when *df* is ``None``/empty or no row falls in the latest
        month; otherwise a dict with keys ``month`` (string form of the
        monthly period), ``income_total``, ``expense_total`` (negative of
        the summed absolute expense amounts), ``net`` and
        ``top_expense_categories`` (up to five ``(category, total)`` pairs
        ordered from the most negative total).
    """
    if df is None or df.empty:
        return {}

    table = normalize_columns(df)
    table["is_income"] = table["type"].apply(is_income)
    table["is_expense"] = table["type"].apply(is_expense)

    months = table["date"].dt.to_period("M")
    latest = months.max()
    cur = table[months == latest].copy()
    if cur.empty:
        return {}

    income_rows = cur["is_income"]
    expense_rows = cur["is_expense"]

    income_total = float(cur.loc[income_rows, "amount"].sum())
    expense_total = -float(cur.loc[expense_rows, "amount"].abs().sum())

    # Per-category spend as negative numbers, so an ascending sort puts the
    # largest spenders first.
    spend = cur.loc[expense_rows, ["category", "amount"]].copy()
    spend["amount"] = -spend["amount"].abs()
    by_category = spend.groupby("category")["amount"].sum()
    top = by_category.sort_values().head(5)

    top_pairs = []
    for category, total in top.items():
        top_pairs.append((str(category), float(total)))

    return {
        "month": str(latest),
        "income_total": income_total,
        "expense_total": expense_total,
        "net": income_total + expense_total,
        "top_expense_categories": top_pairs,
    }
def read_json_stdin() -> dict:
    """Read all of standard input and parse it as JSON.

    Empty or whitespace-only input (for example a lone trailing newline from
    a shell pipe) is treated as an empty JSON object instead of raising.

    Raises:
        json.JSONDecodeError: if the input is non-blank but not valid JSON.
    """
    import sys

    raw = sys.stdin.read()
    # Strip only to test for blankness; the JSON parser itself already
    # ignores surrounding whitespace.
    return json.loads(raw if raw.strip() else "{}")
def write_json_stdout(obj) -> None:
    """Serialise *obj* as JSON to standard output and flush.

    Non-ASCII characters are written verbatim (``ensure_ascii=False``) and
    no trailing newline is added.
    """
    from sys import stdout

    payload = json.dumps(obj, ensure_ascii=False)
    stdout.write(payload)
    stdout.flush()