import pandas as pd
import gradio as gr
import os
import requests
from dotenv import load_dotenv
from matplotlib.colors import LinearSegmentedColormap
import plotly.graph_objects as go
import numpy as np
from huggingface_hub import HfApi
from huggingface_hub.hf_api import HTTPError
from huggingface_hub.utils import GatedRepoError
from gradio_rangeslider import RangeSlider
import datetime
from title import css, TITLE_HTML, SUBTITLE_HTML, LINKS_HTML
from data_manager import DataManager, LongContextDataManager
import matplotlib.pyplot as plt
from matplotlib.ticker import ScalarFormatter
from collections import defaultdict
from longctx_utils import *
load_dotenv()
# Webhook endpoint used by the Submit tab (see submit_model); None when unset.
webhook_url = os.environ.get("WEBHOOK_URL")
# Metrics offered in the leaderboard dropdown.
metric_list = [
    "Compression Ratio (%)",
    "Bits Per Character (BPC)",
    "Bits Per Byte (BPB)",
]
# Model-size buckets offered in the leaderboard checkbox group.
model_size_list = [
    ">20B",
    "~14B",
    # "~9B",
    "~7B",
    "~3B",
    "~1.5B",
    "Other",
]
# Display metric name -> metric code understood by DataManager.query.
metric_to_sheet = {
    "Compression Ratio (%)": "cr",
    "Bits Per Character (BPC)": "bpc",
    "Bits Per Byte (BPB)": "bpb",
}
# Display size bucket -> file-name suffix of the corresponding data group.
model_size_to_file_name = {
    ">20B": "20b+",
    "~14B": "14b",
    # "~9B": "9b",
    "~7B": "7b",
    "~3B": "3b",
    "~1.5B": "1b5",
    "Other": "other",
}
def read_about_md():
    """Return the contents of ``about.md`` (UTF-8) for the About tab."""
    with open("about.md", "r", encoding="utf-8") as about_file:
        content = about_file.read()
    return content
def read_longctx_about_md():
    """Return the contents of ``longctx_about.md`` (UTF-8) for the Long Context tab."""
    with open("longctx_about.md", "r", encoding="utf-8") as md_file:
        content = md_file.read()
    return content
def update_table(
    data_manager: DataManager,
    period: str,
    models_size: list,
    metric: str,
    visible_columns: list,
    color_columns: list,
    size_range: list,
    midpoint: float = 0.5,
    ascending: bool = True,
    request: gr.Request = None,
):
    """Render the leaderboard table as styled HTML for the current filters.

    Args:
        data_manager: Data source for leaderboard rows.
        period: Evaluation period key.
        models_size: Selected size buckets (keys of ``model_size_to_file_name``).
        metric: Display metric name (key of ``metric_to_sheet``).
        visible_columns: Dataset columns to show; filtered against the
            period's available columns before querying to avoid KeyErrors.
        color_columns: Column groups that get the diverging color gradient
            ("Average" and/or "Individual Tests").
        size_range: [min, max] parameter count range in billions.
        midpoint: Quantile in (0, 1) used as the neutral point of the gradient.
        ascending: Currently only logged, not applied; kept for backward
            compatibility with existing callers.
        request: Gradio request, used only to detect dark mode.

    Returns:
        HTML string of the styled table, or a plain message when no rows match.
    """
    is_dark_mode = request.is_dark if request else False
    print(
        f"Updating - time: {datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')}, period: {period}, models: {models_size}, metric: {metric}, visible_columns: {visible_columns}, color_columns: {color_columns}, size_range: {size_range}, ascending: {ascending}, is_dark: {is_dark_mode}\n"
    )
    target_file_name = [model_size_to_file_name[model] for model in models_size]
    metric_code = metric_to_sheet[metric]
    # Drop column names that don't exist for this period.
    if visible_columns:
        available_columns = data_manager.get_available_columns(period)
        visible_columns = [col for col in visible_columns if col in available_columns]
    filtered_data = data_manager.query(
        period=period,
        metric_code=metric_code,
        param_range=(size_range[0], size_range[1]),
        model_groups=target_file_name,
        visible_columns=visible_columns,
    )
    if len(filtered_data) == 0:
        return "No data available for the selected models and period."
    # Diverging gradient endpoints: green (low/good) -> neutral -> red (high/bad).
    colors = ["#2ca02c", "#2b2b2b", "#d62728"] if is_dark_mode else ["#63be7b", "#ffffff", "#f8696b"]
    # Per-column min / max / quantile-midpoint used to normalize the gradient.
    vmin, vmax, vmid = {}, {}, {}
    for column in filtered_data.columns:
        if column in ["Name", "Params (B)"]:
            continue
        col_values = filtered_data[column].dropna()
        if len(col_values) > 1:
            sorted_values = np.sort(col_values)
            vmin[column] = sorted_values.min()
            vmax[column] = sorted_values.max()
            # Clamp so a midpoint of 1.0 cannot index one past the end.
            idx = min(int(len(sorted_values) * midpoint), len(sorted_values) - 1)
            vmid[column] = sorted_values[idx]

    def custom_background_gradient(series, cmap, vmin_val, vmax_val, vmid_val):
        # Map values to [0, 1] piecewise-linearly around the midpoint, then
        # to a CSS background color via the colormap.
        if len(series) == 0:
            return series

        def normalize(x):
            if pd.isna(x):
                return 0.5  # Neutral for NaN
            if vmid_val == vmin_val and x <= vmid_val:
                return 0.0
            if vmid_val == vmax_val and x >= vmid_val:
                return 1.0
            if vmid_val == vmin_val or vmid_val == vmax_val:
                return 0.5
            if x <= vmid_val:
                return 0.5 * (x - vmin_val) / (vmid_val - vmin_val)
            else:
                return 0.5 + 0.5 * (x - vmid_val) / (vmax_val - vmid_val)

        normed = series.apply(normalize)
        cmap_colors = [cmap(x) for x in normed]
        return ["background-color: rgba({}, {}, {}, {}); color: black;".format(*[int(255 * c) for c in color[:3]], color[3]) for color in cmap_colors]

    target_color_columns = []
    if "Average" in color_columns:
        target_color_columns.append("Average (lower=better)")
    if "Individual Tests" in color_columns:
        target_color_columns.extend([col for col in filtered_data.columns if col not in ["Name", "Params (B)", "Average (lower=better)"]])

    def color_params_column_dynamic(value):
        # Highlight the "Params (B)" column; NaN cells get no styling.
        if not pd.notna(value):
            return ""  # was "default", which is not valid CSS
        if is_dark_mode:
            return "background-color: #4b4936; color: #f0f0f0;"
        else:
            return "background-color: #fffdd0; color: black;"

    formatter = {col: "{:.3f}" for col in filtered_data.columns if filtered_data[col].dtype in ["float64", "float32"]}
    styler = filtered_data.style.format(formatter)
    styler = styler.map(color_params_column_dynamic, subset=["Params (B)"])
    for column in target_color_columns:
        if column in vmin:
            custom_cmap = LinearSegmentedColormap.from_list("custom_cmap", colors)
            styler = styler.apply(
                custom_background_gradient, cmap=custom_cmap, vmin_val=vmin[column], vmax_val=vmax[column], vmid_val=vmid[column], subset=[column]
            )
    styler = styler.hide(axis="index")
    # Fixed pixel widths: wide "Name" column, then narrow numeric columns.
    widths = [250, 80, 80, 70, 70, 70, 70, 70, 70, 70, 70, 70, 70, 70]
    table_styles = []
    table_styles.append(
        {
            "selector": "th",
            "props": [
                ("background-color", "var(--background-fill-secondary)"),
                ("color", "var(--body-text-color)"),
                ("padding", "8px"),
                ("font-weight", "bold"),
            ],
        }
    )
    table_styles.append({"selector": "table", "props": [("border-collapse", "collapse"), ("border", "1px solid var(--border-color-primary)")]})
    for i, w in enumerate(widths):
        table_styles.append(
            {
                "selector": f"th.col{i}, td.col{i}",
                "props": [
                    ("min-width", f"{w}px"),
                    ("max-width", f"{w}px"),
                    ("text-align", "center"),
                    ("border", "1px solid var(--border-color-primary)"),
                ],
            }
        )
    styler = styler.set_table_styles(table_styles)
    return styler.to_html()
def check_model_exists(model_id):
    """Check whether ``model_id`` is a reachable repo on the Hugging Face Hub.

    Returns one of:
        "Exists and is accessible", "Exists but is restricted",
        "Does not exist", or "Error: <details>".
    """
    api = HfApi()
    try:
        api.model_info(model_id)  # result unused; we only care whether it raises
        return "Exists and is accessible"
    except GatedRepoError:
        # GatedRepoError subclasses HTTPError, so it must be caught first.
        return "Exists but is restricted"
    except HTTPError as e:
        # e.response can be None for transport-level failures.
        if e.response is not None and e.response.status_code == 404:
            return "Does not exist"
        else:
            return "Error: " + str(e)
def submit_model(name):
    """Validate ``name`` on the HF Hub and forward it to the submission webhook.

    Returns a Markdown status string for the Submit tab.
    """
    if "Exists" not in check_model_exists(name):
        return f"# ERROR: Model {name} does not exist on Hugging Face!"
    try:
        response = requests.post(webhook_url, json={"content": name})
        if response.status_code == 200:
            response_data = response.json()
            if response_data.get("status") == "success":
                return "# SUCCESS: We will check the model as soon as possible. Thank you for your submission!"
            else:
                return f"# ERROR: {response_data.get('message', 'Unknown error')}"
        else:
            return f"# ERROR: Failed to submit model {name}. Server returned status code {response.status_code}."
    except requests.exceptions.RequestException:
        # requests.post raises RequestException subclasses (ConnectionError,
        # Timeout, ...) on network failure. The previous handler caught
        # HTTPError, which only raise_for_status() produces, so it never fired.
        return "# ERROR: Network error while contacting queue. Please try again in a few minutes."
    except Exception as e:
        print(e)
        # "# " prefix added for consistency with the other Markdown messages.
        return "# ERROR: Unexpected error. Please try again later."
def create_scaling_plot(data_manager: DataManager, period: str):
    """Scatter of average compression ratio vs. parameters (log-log) with a
    power-law fit ``y = 10^b * x^a`` obtained by linear regression in log space.

    Returns an empty titled figure when there is no data or no valid points.
    """
    new_df = data_manager.query(
        period=period,
        metric_code="cr",
        param_range=(0, 40),
        model_groups=None,
        visible_columns=None,
    )
    if len(new_df) == 0:
        fig = go.Figure()
        fig.update_layout(title={"text": "Compression Ratio Scaling Law", "x": 0.5}, width=800, height=600)
        return fig
    x_values = new_df["Params (B)"].astype(float).tolist()
    y_values = new_df["Average (lower=better)"].astype(float).tolist()
    names = new_df["Name"].tolist()
    # Drop invalid points (NaN, zero, negative): log axes need positive values.
    valid_data = [(x, y, n) for x, y, n in zip(x_values, y_values, names) if x > 0 and y > 0 and not np.isnan(x) and not np.isnan(y)]
    if len(valid_data) == 0:
        fig = go.Figure()
        fig.update_layout(title={"text": "Compression Ratio Scaling Law", "x": 0.5}, width=800, height=600)
        return fig
    x_values, y_values, names = zip(*valid_data)
    x_values, y_values, names = list(x_values), list(y_values), list(names)
    x_min, x_max = np.log10(min(x_values)), np.log10(max(x_values))
    y_min, y_max = np.log10(min(y_values)), np.log10(max(y_values))
    x_dtick = (x_max - x_min) / 4
    y_dtick = (y_max - y_min) / 4
    # Linear regression in log space: log(y) = a*log(x) + b  =>  y = 10^b * x^a
    log_x = np.log10(np.array(x_values))
    log_y = np.log10(np.array(y_values))
    slope, intercept = np.polyfit(log_x, log_y, 1)
    # R-squared of the fit (guard against zero variance).
    log_y_pred = slope * log_x + intercept
    ss_res = np.sum((log_y - log_y_pred) ** 2)
    ss_tot = np.sum((log_y - np.mean(log_y)) ** 2)
    r_squared = 1 - (ss_res / ss_tot) if ss_tot > 0 else 0
    # Sample the fit line (a straight line in log space).
    fit_x_log = np.linspace(x_min - 0.1, x_max + 0.1, 100)
    fit_y_log = slope * fit_x_log + intercept
    fit_x = 10**fit_x_log
    fit_y = 10**fit_y_log
    fig = go.Figure()
    # Data points. NOTE: the hover template was previously split by literal
    # newlines (a syntax error); reconstructed with Plotly's <br> line breaks
    # and <extra></extra> to suppress the secondary hover box.
    fig.add_trace(
        go.Scatter(
            x=x_values,
            y=y_values,
            mode="markers",
            name="Model",
            marker=dict(size=12, color="#39C5BB", opacity=0.8),
            text=names,
            customdata=list(zip(x_values, y_values)),
            hovertemplate=(
                "%{text}<br>"
                + "Params: %{customdata[0]:.2f}B<br>"
                + "Compression Ratio: %{customdata[1]:.2f}%"
                + "<extra></extra>"
            ),
        )
    )
    # Fitted power-law line.
    fit_label = f"Fit: y = {10**intercept:.2f} × x^{slope:.3f} (R² = {r_squared:.3f})"
    fig.add_trace(
        go.Scatter(
            x=fit_x.tolist(),
            y=fit_y.tolist(),
            mode="lines",
            name=fit_label,
            line=dict(color="#FF6B6B", width=2, dash="dash"),
            hoverinfo="skip",
        )
    )
    fig.update_layout(
        title={"text": "Compression Ratio Scaling Law", "x": 0.5, "xanchor": "center", "yanchor": "top"},
        width=800,
        height=600,
        showlegend=True,
        legend=dict(
            yanchor="top",
            y=0.99,
            xanchor="left",
            x=0.01,
            bgcolor="rgba(255,255,255,0.8)",
        ),
        xaxis=dict(
            title="Parameters (B)",
            showgrid=True,
            zeroline=False,
            type="log",
            dtick=x_dtick,
            tickformat=".2f",
            range=[x_min - 0.1, x_max + 0.1],
        ),
        yaxis=dict(
            title="Compression Ratio (%)",
            showgrid=True,
            zeroline=False,
            type="log",
            dtick=y_dtick,
            tickformat=".2f",
            range=[y_min - 0.1, y_max + 0.1],
            # Lower compression ratio is better, so better models plot higher.
            autorange="reversed",
        ),
    )
    return fig
def create_category_scaling_plot(data_manager: DataManager, period: str, selected_datasets: list):
    """Draw one scatter + power-law fit line per selected dataset column.

    Args:
        data_manager: Data source for leaderboard rows.
        period: Evaluation period key.
        selected_datasets: Dataset column names to plot individually.

    Returns an empty titled figure when there is no data / no selection /
    no valid points.
    """
    new_df = data_manager.query(
        period=period,
        metric_code="cr",
        param_range=(0, 40),
        model_groups=None,
        visible_columns=None,
    )
    if len(new_df) == 0 or not selected_datasets:
        fig = go.Figure()
        fig.update_layout(title={"text": "Scaling Law by Dataset", "x": 0.5}, width=1000, height=700)
        return fig
    # High-contrast, saturated palette; cycled when there are more datasets.
    color_palette = [
        "#1f77b4",  # blue
        "#ff7f0e",  # orange
        "#2ca02c",  # green
        "#d62728",  # red
        "#9467bd",  # purple
        "#8c564b",  # brown
        "#e377c2",  # pink
        "#17becf",  # cyan
        "#bcbd22",  # yellow-green
        "#7f7f7f",  # gray
    ]
    fig = go.Figure()
    # Collected across datasets to compute the global axis ranges.
    all_x_values = []
    all_y_values = []
    # One scatter trace + one fit trace per dataset.
    for idx, dataset in enumerate(selected_datasets):
        if dataset not in new_df.columns:
            continue
        # Extract this dataset's values.
        x_values = new_df["Params (B)"].astype(float).tolist()
        y_values = new_df[dataset].astype(float).tolist()
        names = new_df["Name"].tolist()
        # Drop invalid points (NaN, zero, negative): log axes need positives.
        valid_data = [(x, y, n) for x, y, n in zip(x_values, y_values, names) if x > 0 and y > 0 and not np.isnan(x) and not np.isnan(y)]
        if len(valid_data) < 2:  # need at least 2 points to fit a line
            continue
        x_vals, y_vals, name_vals = zip(*valid_data)
        x_vals, y_vals, name_vals = list(x_vals), list(y_vals), list(name_vals)
        all_x_values.extend(x_vals)
        all_y_values.extend(y_vals)
        color = color_palette[idx % len(color_palette)]
        # Linear regression in log space: log(y) = slope*log(x) + intercept.
        log_x = np.log10(np.array(x_vals))
        log_y = np.log10(np.array(y_vals))
        slope, intercept = np.polyfit(log_x, log_y, 1)
        # R-squared of the fit (guard against zero variance).
        log_y_pred = slope * log_x + intercept
        ss_res = np.sum((log_y - log_y_pred) ** 2)
        ss_tot = np.sum((log_y - np.mean(log_y)) ** 2)
        r_squared = 1 - (ss_res / ss_tot) if ss_tot > 0 else 0
        # Sample the fit line over this dataset's own x range.
        x_min_local, x_max_local = np.log10(min(x_vals)), np.log10(max(x_vals))
        fit_x_log = np.linspace(x_min_local - 0.05, x_max_local + 0.05, 100)
        fit_y_log = slope * fit_x_log + intercept
        fit_x = 10**fit_x_log
        fit_y = 10**fit_y_log
        # Data points. NOTE: the hover template was previously split by literal
        # newlines (a syntax error); reconstructed with Plotly's <br> breaks
        # and <extra></extra> to suppress the secondary hover box.
        fig.add_trace(
            go.Scatter(
                x=x_vals,
                y=y_vals,
                mode="markers",
                name=f"{dataset}",
                marker=dict(size=10, color=color, opacity=0.7),
                text=name_vals,
                customdata=list(zip(x_vals, y_vals)),
                hovertemplate=(
                    f"%{{text}}<br>{dataset}<br>"
                    + "Params: %{customdata[0]:.2f}B<br>"
                    + "CR: %{customdata[1]:.2f}%"
                    + "<extra></extra>"
                ),
                legendgroup=dataset,
            )
        )
        # Fitted power-law line, grouped with its scatter in the legend.
        fit_label = f"{dataset} fit (slope={slope:.3f}, R²={r_squared:.3f})"
        fig.add_trace(
            go.Scatter(
                x=fit_x.tolist(),
                y=fit_y.tolist(),
                mode="lines",
                name=fit_label,
                line=dict(color=color, width=2, dash="dash"),
                hoverinfo="skip",
                legendgroup=dataset,
                showlegend=True,
            )
        )
    if not all_x_values or not all_y_values:
        fig = go.Figure()
        fig.update_layout(title={"text": "Scaling Law by Dataset - No Valid Data", "x": 0.5}, width=1000, height=700)
        return fig
    # Global axis ranges over all plotted datasets.
    x_min, x_max = np.log10(min(all_x_values)), np.log10(max(all_x_values))
    y_min, y_max = np.log10(min(all_y_values)), np.log10(max(all_y_values))
    x_dtick = (x_max - x_min) / 4
    y_dtick = (y_max - y_min) / 4
    fig.update_layout(
        title={"text": "Scaling Law by Dataset", "x": 0.5, "xanchor": "center", "yanchor": "top"},
        width=1000,
        height=700,
        showlegend=True,
        legend=dict(
            yanchor="top",
            y=0.99,
            xanchor="left",
            x=1.02,
            bgcolor="rgba(255,255,255,0.9)",
            font=dict(size=10),
        ),
        xaxis=dict(
            title="Parameters (B)",
            showgrid=True,
            zeroline=False,
            type="log",
            dtick=x_dtick,
            tickformat=".2f",
            range=[x_min - 0.1, x_max + 0.1],
        ),
        yaxis=dict(
            title="Compression Ratio (%)",
            showgrid=True,
            zeroline=False,
            type="log",
            dtick=y_dtick,
            tickformat=".2f",
            range=[y_min - 0.1, y_max + 0.1],
            # Lower compression ratio is better, so better models plot higher.
            autorange="reversed",
        ),
        margin=dict(r=250),  # reserve space for the external legend
    )
    return fig
if __name__ == "__main__":
    data_manager = DataManager("data")
    time_list = data_manager.get_available_periods()
    # Default to the most recent period.
    last_period = time_list[-1]
    # Long Context Data
    lc_dm = LongContextDataManager("longctx_data")
    lc_periods = lc_dm.get_available_periods()
    default_lc_period = lc_periods[-1]
    # Visualization modes for the Long Context tab.
    MODE_ABS_AVG = "Absolute (Averaged by Model)"
    MODE_ABS_SINGLE = "Absolute (By Dataset)"
    MODE_REL_AVG = "Relative (Averaged by Model)"
    MODE_REL_SINGLE = "Relative (By Dataset)"
    lc_modes = [MODE_ABS_AVG, MODE_ABS_SINGLE, MODE_REL_AVG, MODE_REL_SINGLE]
    default_lc_mode = MODE_ABS_AVG
    # (display_name, model_name) pairs available for the default period.
    init_lc_choices = lc_dm.get_model_choices(default_lc_period)
    print(init_lc_choices)
def get_default_model(choices):
    """Return the preferred default model name from ``choices``.

    ``choices`` is a list of ``(display_name, model_name)`` pairs.
    Prefers "Qwen3-8B-Base" when present; otherwise returns the first
    entry's model name. Returns None for an empty list.
    """
    if not choices:
        return None
    preferred = next(
        (model for _, model in choices if model == "Qwen3-8B-Base"),
        None,
    )
    return preferred if preferred is not None else choices[0][1]
def create_initial_lc_plot():
    """Build the Long Context plot shown before any user interaction.

    Uses the default model (Qwen3-8B-Base when available) in the default
    absolute/averaged mode; returns None when no models are available.
    """
    if not init_lc_choices:
        return None
    default_model = get_default_model(init_lc_choices)
    data_map = {}
    paths = lc_dm.get_paths_for_model(default_lc_period, default_model)
    data_map[default_model] = paths
    # draw_long_context_plot comes from longctx_utils (star import);
    # args mirror the Advanced Settings defaults: cutoff=0.2, smooth=32,
    # offset=32, auto y-range.
    return draw_long_context_plot(default_lc_mode, data_map, None, 0.2, 32, 32, [None, None])
initial_lc_plot = create_initial_lc_plot()
initial_fig = create_scaling_plot(data_manager, last_period) if last_period else go.Figure()
initial_metric = metric_list[0]
initial_columns = data_manager.get_available_columns(last_period)
initial_colors = ["Average", "Individual Tests"]
initial_size_range = [0, 40]
initial_data = update_table(data_manager, last_period, model_size_list, initial_metric, initial_columns, initial_colors, initial_size_range)
theme = gr.themes.Default()
with gr.Blocks(theme=theme, css=css) as demo:
    gr.HTML(TITLE_HTML)
    gr.HTML(SUBTITLE_HTML)
    gr.HTML(LINKS_HTML)
    with gr.Tabs() as tabs:
        with gr.Tab("🏆 Leaderboard"):
            with gr.Row():
                with gr.Column():
                    # Filter controls for the leaderboard table.
                    period_selector = gr.Dropdown(label="Period", choices=time_list, value=last_period)
                    metric_selector = gr.Dropdown(label="Metric", choices=metric_list, value=initial_metric)
                    model_selector = gr.CheckboxGroup(label="Model Size", choices=model_size_list, value=model_size_list)
                    size_range_slider = RangeSlider(minimum=0, maximum=40, value=[0, 40], step=0.1, label="Model Size Range")
                    midpoint_slider = gr.Slider(minimum=0.1, maximum=0.9, value=0.5, step=0.01, label="Color Gradient Midpoint")
                    color_selector = gr.CheckboxGroup(label="Colored Columns", choices=["Average", "Individual Tests"], value=initial_colors)
                with gr.Column():
                    # Data-source column groups.
                    code_cols = ["github cpp", "github python", "github javascript"]
                    research_cols = ["arxiv physics", "arxiv cs", "arxiv math"]
                    writing_cols = ["ao3 english", "github markdown"]
                    knowledge_cols = ["bbc news", "wikipedia english"]
                    # Only offer columns that exist for the initial period.
                    initial_code = [c for c in code_cols if c in initial_columns]
                    initial_research = [c for c in research_cols if c in initial_columns]
                    initial_writing = [c for c in writing_cols if c in initial_columns]
                    initial_knowledge = [c for c in knowledge_cols if c in initial_columns]
                    with gr.Column(elem_classes=["data-source-box"]):
                        gr.Markdown("Data Sources")
                        # Code
                        with gr.Row():
                            toggle_code = gr.Checkbox(label="💻 Code", value=True, scale=0, min_width=150)
                            colfilter_code = gr.CheckboxGroup(
                                choices=initial_code, value=initial_code, show_label=False, scale=3, elem_classes=["aligned-checkboxes"]
                            )
                        # Research
                        with gr.Row():
                            toggle_research = gr.Checkbox(label="🔬 Science", value=True, scale=0, min_width=150)
                            colfilter_research = gr.CheckboxGroup(
                                choices=initial_research, value=initial_research, show_label=False, scale=3, elem_classes=["aligned-checkboxes"]
                            )
                        # World knowledge
                        with gr.Row():
                            toggle_knowledge = gr.Checkbox(label="📖 Knowledge", value=True, scale=0, min_width=150)
                            colfilter_knowledge = gr.CheckboxGroup(
                                choices=initial_knowledge, value=initial_knowledge, show_label=False, scale=3, elem_classes=["aligned-checkboxes"]
                            )
                        # Writing
                        with gr.Row():
                            toggle_writing = gr.Checkbox(label="✍️ Writing", value=True, scale=0, min_width=150)
                            colfilter_writing = gr.CheckboxGroup(
                                choices=initial_writing, value=initial_writing, show_label=False, scale=3, elem_classes=["aligned-checkboxes"]
                            )
                        # Multilingual - placeholder, not yet available
                        with gr.Row():
                            gr.Checkbox(label="🌍 Multilingual (Coming Soon)", value=False, interactive=False, scale=0, min_width=250)
            table = gr.HTML(initial_data)
def update_table_wrapper(
    period, models_size, metric, code_sel, research_sel, writing_sel, knowledge_sel, color_columns, size_range, midpoint
):
    """Merge the four data-source selections and re-render the table HTML."""
    visible_columns = code_sel + research_sel + writing_sel + knowledge_sel
    return update_table(data_manager, period, models_size, metric, visible_columns, color_columns, size_range, midpoint)
def update_column_choices(period, cur_code, cur_research, cur_writing, cur_knowledge):
    """Refresh the four data-source CheckboxGroups when the period changes.

    Each group's current selection is kept where still available; a group
    whose selection would become empty falls back to selecting everything.
    Returns four gr.update objects: (code, research, writing, knowledge).
    """
    if not period:
        empty = gr.update(choices=[], value=[])
        return empty, empty, empty, empty
    columns = data_manager.get_available_columns(period)
    new_code = [c for c in code_cols if c in columns]
    new_research = [c for c in research_cols if c in columns]
    new_writing = [c for c in writing_cols if c in columns]
    new_knowledge = [c for c in knowledge_cols if c in columns]
    # Intersect current selections with the new period's columns.
    sel_code = [c for c in cur_code if c in new_code] if cur_code else new_code
    sel_research = [c for c in cur_research if c in new_research] if cur_research else new_research
    sel_writing = [c for c in cur_writing if c in new_writing] if cur_writing else new_writing
    sel_knowledge = [c for c in cur_knowledge if c in new_knowledge] if cur_knowledge else new_knowledge
    # Never leave a group empty after a period switch.
    if not sel_code:
        sel_code = new_code
    if not sel_research:
        sel_research = new_research
    if not sel_writing:
        sel_writing = new_writing
    if not sel_knowledge:
        sel_knowledge = new_knowledge
    return (
        gr.update(choices=new_code, value=sel_code),
        gr.update(choices=new_research, value=sel_research),
        gr.update(choices=new_writing, value=sel_writing),
        gr.update(choices=new_knowledge, value=sel_knowledge),
    )
# 总开关功能
def toggle_group(enabled, group_cols, available_cols):
valid_cols = [c for c in group_cols if c in available_cols]
return valid_cols if enabled else []
# NOTE(review): the group toggles filter against initial_columns, not the
# currently selected period's columns — presumably acceptable because the
# column sets rarely differ across periods; verify if periods diverge.
toggle_code.change(lambda enabled: toggle_group(enabled, code_cols, initial_columns), inputs=[toggle_code], outputs=[colfilter_code])
toggle_research.change(
    lambda enabled: toggle_group(enabled, research_cols, initial_columns), inputs=[toggle_research], outputs=[colfilter_research]
)
toggle_writing.change(
    lambda enabled: toggle_group(enabled, writing_cols, initial_columns), inputs=[toggle_writing], outputs=[colfilter_writing]
)
toggle_knowledge.change(
    lambda enabled: toggle_group(enabled, knowledge_cols, initial_columns), inputs=[toggle_knowledge], outputs=[colfilter_knowledge]
)
# Every table-affecting input, in the order update_table_wrapper expects.
shared_inputs = [
    period_selector,
    model_selector,
    metric_selector,
    colfilter_code,
    colfilter_research,
    colfilter_writing,
    colfilter_knowledge,
    color_selector,
    size_range_slider,
    midpoint_slider,
]
# A period change first refreshes the available column choices...
period_selector.change(
    update_column_choices,
    inputs=[period_selector, colfilter_code, colfilter_research, colfilter_writing, colfilter_knowledge],
    outputs=[colfilter_code, colfilter_research, colfilter_writing, colfilter_knowledge],
)
# ...and every filter change re-renders the table.
period_selector.change(update_table_wrapper, inputs=shared_inputs, outputs=table)
model_selector.change(update_table_wrapper, inputs=shared_inputs, outputs=table)
metric_selector.change(update_table_wrapper, inputs=shared_inputs, outputs=table)
colfilter_code.change(update_table_wrapper, inputs=shared_inputs, outputs=table)
colfilter_research.change(update_table_wrapper, inputs=shared_inputs, outputs=table)
colfilter_writing.change(update_table_wrapper, inputs=shared_inputs, outputs=table)
colfilter_knowledge.change(update_table_wrapper, inputs=shared_inputs, outputs=table)
color_selector.change(update_table_wrapper, inputs=shared_inputs, outputs=table)
size_range_slider.change(update_table_wrapper, inputs=shared_inputs, outputs=table)
midpoint_slider.change(update_table_wrapper, inputs=shared_inputs, outputs=table)
with gr.Tab("📚 Long Context"):
    gr.Markdown(read_longctx_about_md())
    with gr.Row():
        with gr.Column(scale=1):
            lc_period_dropdown = gr.Dropdown(label="Period", choices=lc_periods, value=default_lc_period)
            lc_mode_radio = gr.Radio(label="Visualization Mode", choices=lc_modes, value=default_lc_mode)
            gr.Markdown("### Model / Dataset Selection")
            default_model = get_default_model(init_lc_choices)
            default_selected_models = [default_model] if default_model else []
            # Selectors for the "Averaged by Model" modes (only one set of
            # selectors is visible at a time; update_lc_inputs switches them).
            lc_select_abs = gr.Dropdown(
                label="Select Models", choices=init_lc_choices, value=default_selected_models, multiselect=True, visible=True
            )
            lc_select_base = gr.Dropdown(
                label="Baseline Model",
                choices=init_lc_choices,
                value=None,
                multiselect=False,
                visible=False,
            )
            lc_select_comp = gr.Dropdown(label="Comparison Models", choices=init_lc_choices, value=[], multiselect=True, visible=False)
            # By Dataset mode selectors
            init_dataset_choices = lc_dm.get_dataset_choices(default_lc_period) if default_lc_period else []
            default_selected_datasets = [init_dataset_choices[0][1]] if init_dataset_choices else []
            lc_select_datasets = gr.Dropdown(
                label="Select Datasets", choices=init_dataset_choices, value=default_selected_datasets, multiselect=True, visible=False
            )
            lc_select_models_single = gr.Dropdown(
                label="Select Models", choices=init_lc_choices, value=default_selected_models, multiselect=True, visible=False
            )
            lc_select_base_model_single = gr.Dropdown(
                label="Baseline Model",
                choices=init_lc_choices,
                value=None,
                multiselect=False,
                visible=False,
            )
            lc_select_comp_models_single = gr.Dropdown(
                label="Comparison Models", choices=init_lc_choices, value=[], multiselect=True, visible=False
            )
            # Smoothing / trimming knobs passed through to draw_long_context_plot.
            with gr.Accordion("Advanced Settings", open=True):
                lc_smooth = gr.Slider(1, 125, 32, step=1, label="Smooth Window")
                lc_cutoff = gr.Slider(0.1, 1.0, 0.2, step=0.05, label="Cutoff Ratio")
                lc_offset = gr.Number(32, label="Start Offset (Bytes)")
                with gr.Row():
                    lc_ymin = gr.Textbox(label="Y Min", placeholder="Auto", value="")
                    lc_ymax = gr.Textbox(label="Y Max", placeholder="Auto", value="")
            lc_btn_plot = gr.Button("Visualize", variant="primary")
        with gr.Column(scale=3):
            lc_plot_output = gr.Plot(label="Visualization Result", value=initial_lc_plot)
def update_lc_inputs(period, mode):
    """Show/hide and repopulate the seven long-context selectors for ``mode``.

    Returns a 7-tuple of gr.update objects, in the fixed output order:
    (lc_select_abs, lc_select_base, lc_select_comp, lc_select_datasets,
     lc_select_models_single, lc_select_base_model_single,
     lc_select_comp_models_single).
    """
    if not period:
        return tuple([gr.update()] * 7)
    is_model_agg = "Averaged by Model" in mode
    # NOTE(review): is_single_dataset is computed but never used.
    is_single_dataset = "By Dataset" in mode
    is_relative = "Relative" in mode

    # NOTE(review): shadows the outer helper of the same name with
    # essentially identical behavior; kept as-is.
    def get_default_model(choices):
        """Prefer "Qwen3-8B-Base"; otherwise return the first model name."""
        if not choices:
            return None
        for display_name, model_name in choices:
            if model_name == "Qwen3-8B-Base":
                return model_name
        return choices[0][1] if choices else None

    if is_model_agg:
        # Averaged by Model mode - use existing logic
        choices = lc_dm.get_model_choices(period)
        label_suffix = "Models"
        if not is_relative:
            # Absolute (Averaged by Model) - default selection is Qwen3-8B-Base
            default_model = get_default_model(choices)
            default_selected = [default_model] if default_model else []
            return (
                gr.update(visible=True, choices=choices, label=f"Select {label_suffix}", value=default_selected),
                gr.update(visible=False, choices=choices, value=None),
                gr.update(visible=False, choices=choices, value=[]),
                gr.update(visible=False, value=[]),
                gr.update(visible=False, value=[]),
                gr.update(visible=False, value=None),
                gr.update(visible=False, value=[]),
            )
        else:
            default_baseline = get_default_model(choices)
            return (
                gr.update(visible=False, choices=choices, value=[]),
                gr.update(visible=True, choices=choices, label=f"Baseline", value=default_baseline),
                gr.update(visible=True, choices=choices, label=f"Comparison", value=[]),
                gr.update(visible=False, value=[]),
                gr.update(visible=False, value=[]),
                gr.update(visible=False, value=None),
                gr.update(visible=False, value=[]),
            )
    else:
        # By Dataset mode
        dataset_choices = lc_dm.get_dataset_choices(period)
        model_choices = lc_dm.get_model_choices(period)
        if not is_relative:
            # Absolute By Dataset - default selection is Qwen3-8B-Base
            default_model = get_default_model(model_choices)
            default_selected = [default_model] if default_model else []
            return (
                gr.update(visible=False, value=[]),
                gr.update(visible=False, value=None),
                gr.update(visible=False, value=[]),
                gr.update(visible=True, choices=dataset_choices, value=[]),
                gr.update(visible=True, choices=model_choices, value=default_selected),
                gr.update(visible=False, value=None),
                gr.update(visible=False, value=[]),
            )
        else:
            # Relative By Dataset - use same datasets for all models
            default_baseline = get_default_model(model_choices)
            return (
                gr.update(visible=False, value=[]),
                gr.update(visible=False, value=None),
                gr.update(visible=False, value=[]),
                gr.update(visible=True, choices=dataset_choices, value=[]),
                gr.update(visible=False, value=[]),
                gr.update(visible=True, choices=model_choices, value=default_baseline),
                gr.update(visible=True, choices=model_choices, value=[]),
            )
# Re-populate all seven selectors whenever the period or mode changes.
lc_period_dropdown.change(
    fn=update_lc_inputs,
    inputs=[lc_period_dropdown, lc_mode_radio],
    outputs=[
        lc_select_abs,
        lc_select_base,
        lc_select_comp,
        lc_select_datasets,
        lc_select_models_single,
        lc_select_base_model_single,
        lc_select_comp_models_single,
    ],
)
lc_mode_radio.change(
    fn=update_lc_inputs,
    inputs=[lc_period_dropdown, lc_mode_radio],
    outputs=[
        lc_select_abs,
        lc_select_base,
        lc_select_comp,
        lc_select_datasets,
        lc_select_models_single,
        lc_select_base_model_single,
        lc_select_comp_models_single,
    ],
)
def run_lc_plot(
    mode,
    period,
    sel_abs,
    sel_base,
    sel_comp,
    sel_datasets,
    sel_models_single,
    sel_base_model_single,
    sel_comp_models_single,
    smooth,
    cutoff,
    offset,
    ymin,
    ymax,
):
    """Collect data file paths for the current selection and draw the plot.

    Returns None (leaving the plot unchanged) when the selection is
    incomplete or no matching data files are found.
    """
    data_map = {}  # model label -> list of data file paths
    baseline_key = None  # set only in the "Relative" modes
    is_model_agg = "Averaged by Model" in mode
    is_relative = "Relative" in mode
    if is_model_agg:
        # Averaged by Model mode - existing logic
        if not is_relative:
            selection = sel_abs
        else:
            if not sel_base:
                return None
            # Baseline goes first so it anchors the relative comparison.
            selection = [sel_base] + sel_comp
            baseline_key = sel_base
        if not selection:
            return None
        for item in selection:
            paths = lc_dm.get_paths_for_model(period, item)
            if paths:
                data_map[item] = paths
    else:
        # By Dataset mode
        if not is_relative:
            # Absolute By Dataset
            if not sel_datasets or not sel_models_single:
                return None
            for model_name in sel_models_single:
                paths = lc_dm.get_paths_for_model_and_datasets(period, model_name, sel_datasets)
                if paths:
                    data_map[model_name] = paths
        else:
            # Relative By Dataset - use same datasets for all models
            if not sel_datasets or not sel_base_model_single:
                return None
            # Baseline model with selected datasets (averaged)
            baseline_paths = lc_dm.get_paths_for_model_and_datasets(period, sel_base_model_single, sel_datasets)
            if baseline_paths:
                baseline_key = sel_base_model_single
                data_map[baseline_key] = baseline_paths
            # Comparison models with same datasets (averaged)
            if sel_comp_models_single:
                for model_name in sel_comp_models_single:
                    paths = lc_dm.get_paths_for_model_and_datasets(period, model_name, sel_datasets)
                    if paths:
                        data_map[model_name] = paths
    if not data_map:
        return None

    def _to_float_or_none(val):
        # Parse a Y-axis textbox: empty or invalid input means "auto".
        if val is None:
            return None
        s = str(val).strip()
        if not s:
            return None
        try:
            return float(s)
        except ValueError:
            return None

    ymin = _to_float_or_none(ymin)
    ymax = _to_float_or_none(ymax)
    y_range = [ymin, ymax]
    return draw_long_context_plot(mode, data_map, baseline_key, cutoff, smooth, int(offset), y_range)
# The Visualize button passes every selector's state; run_lc_plot decides
# which of them apply based on the mode.
lc_btn_plot.click(
    fn=run_lc_plot,
    inputs=[
        lc_mode_radio,
        lc_period_dropdown,
        lc_select_abs,
        lc_select_base,
        lc_select_comp,
        lc_select_datasets,
        lc_select_models_single,
        lc_select_base_model_single,
        lc_select_comp_models_single,
        lc_smooth,
        lc_cutoff,
        lc_offset,
        lc_ymin,
        lc_ymax,
    ],
    outputs=lc_plot_output,
)
with gr.Tab("📈 Scaling Law"):
    gr.Markdown("### Compression Ratio Scaling Law")
    gr.Markdown("Explore how compression ratio scales with model parameters across different datasets.")
    # Display modes
    MODE_OVERALL = "📊 Overall (Average)"
    MODE_BY_DATASET = "📈 By Dataset"
    scaling_modes = [MODE_OVERALL, MODE_BY_DATASET]
    # All selectable dataset columns.
    all_datasets = [
        "github cpp",
        "github python",
        "github javascript",
        "arxiv physics",
        "arxiv cs",
        "arxiv math",
        "ao3 english",
        "github markdown",
        "bbc news",
        "wikipedia english",
    ]
    initial_datasets = all_datasets[:4]
    with gr.Row():
        with gr.Column(scale=1):
            scaling_period_selector = gr.Dropdown(label="Period", choices=time_list, value=last_period)
            scaling_mode_radio = gr.Radio(label="Display Mode", choices=scaling_modes, value=MODE_OVERALL)
            # Dataset selector (hidden until "By Dataset" mode is chosen)
            scaling_dataset_selector = gr.CheckboxGroup(
                label="Select Datasets", choices=all_datasets, value=initial_datasets, visible=False
            )
        with gr.Column(scale=3):
            initial_scaling_fig = create_scaling_plot(data_manager, last_period) if last_period else go.Figure()
            scaling_plot = gr.Plot(initial_scaling_fig)

    def update_scaling_mode_visibility(mode):
        """Show the dataset selector only in By-Dataset mode."""
        is_by_dataset = mode == MODE_BY_DATASET
        return gr.update(visible=is_by_dataset)

    def update_scaling_plot_unified(period, mode, datasets):
        """Dispatch to the overall or per-dataset scaling plot."""
        if mode == MODE_OVERALL:
            return create_scaling_plot(data_manager, period)
        else:  # MODE_BY_DATASET
            return create_category_scaling_plot(data_manager, period, datasets)

    # Mode change updates both the selector visibility and the plot.
    scaling_mode_radio.change(fn=update_scaling_mode_visibility, inputs=[scaling_mode_radio], outputs=[scaling_dataset_selector])
    scaling_mode_radio.change(
        fn=update_scaling_plot_unified,
        inputs=[scaling_period_selector, scaling_mode_radio, scaling_dataset_selector],
        outputs=scaling_plot,
    )
    # Period change updates the plot.
    scaling_period_selector.change(
        fn=update_scaling_plot_unified,
        inputs=[scaling_period_selector, scaling_mode_radio, scaling_dataset_selector],
        outputs=scaling_plot,
    )
    # Dataset selection change updates the plot.
    scaling_dataset_selector.change(
        fn=update_scaling_plot_unified,
        inputs=[scaling_period_selector, scaling_mode_radio, scaling_dataset_selector],
        outputs=scaling_plot,
    )
with gr.Tab("ℹ️ About"):
    gr.Markdown(read_about_md())
with gr.Tab("🚀 Submit"):
    # Single-line input + button; submit_model validates the repo id and
    # forwards it to the evaluation-queue webhook.
    with gr.Group():
        with gr.Row():
            model_name = gr.Textbox(max_lines=1, placeholder="Enter model name...", show_label=False, scale=4)
            submit = gr.Button("Submit", variant="primary", scale=0)
    output = gr.Markdown("# Enter a public HF repo id, then hit Submit to add it to the evaluation queue.")
    submit.click(fn=submit_model, inputs=model_name, outputs=output)
demo.launch(share=False)