"""Gradio app for a compression-based LLM leaderboard.

The UI has five tabs: a styled leaderboard table, long-context plots, scaling-law
plots, an about page, and a model submission form that posts to a webhook.
"""

import datetime
import os
from collections import defaultdict

import gradio as gr
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import plotly.graph_objects as go
import requests
from dotenv import load_dotenv
from gradio_rangeslider import RangeSlider
from huggingface_hub import HfApi
from huggingface_hub.hf_api import HTTPError
from huggingface_hub.utils import GatedRepoError
from matplotlib.colors import LinearSegmentedColormap
from matplotlib.ticker import ScalarFormatter

from title import css, TITLE_HTML, SUBTITLE_HTML, LINKS_HTML
from data_manager import DataManager, LongContextDataManager

# Kept last on purpose: a star import may shadow names imported above, and the
# original file performed it after every other import.
from longctx_utils import *

load_dotenv()

webhook_url = os.environ.get("WEBHOOK_URL")

# Upper bound (seconds) for the submission webhook call so a stalled endpoint
# cannot block a UI worker forever.
REQUEST_TIMEOUT_SECONDS = 30

NAME_COLUMN = "Name"
PARAMS_COLUMN = "Params (B)"
AVERAGE_COLUMN = "Average (lower=better)"
NO_DATA_MESSAGE = "No data available for the selected models and period."

metric_list = [
    "Compression Ratio (%)",
    "Bits Per Character (BPC)",
    "Bits Per Byte (BPB)",
]

model_size_list = [
    ">20B",
    "~14B",
    # "~9B",
    "~7B",
    "~3B",
    "~1.5B",
    "Other",
]

metric_to_sheet = {
    "Compression Ratio (%)": "cr",
    "Bits Per Character (BPC)": "bpc",
    "Bits Per Byte (BPB)": "bpb",
}

model_size_to_file_name = {
    ">20B": "20b+",
    "~14B": "14b",
    # "~9B": "9b",
    "~7B": "7b",
    "~3B": "3b",
    "~1.5B": "1b5",
    "Other": "other",
}


def read_about_md():
    """Return the contents of ``about.md`` (UTF-8) from the working directory."""
    with open("about.md", "r", encoding="utf-8") as f:
        return f.read()


def read_longctx_about_md():
    """Return the contents of ``longctx_about.md`` (UTF-8) from the working directory."""
    with open("longctx_about.md", "r", encoding="utf-8") as f:
        return f.read()


def update_table(
    data_manager: DataManager,
    period: str,
    models_size: list,
    metric: str,
    visible_columns: list,
    color_columns: list,
    size_range: list,
    midpoint: float = 0.5,
    ascending: bool = True,
    request: gr.Request = None,
):
    """Query the leaderboard data and render it as a colour-graded HTML table.

    Args:
        data_manager: Source of the leaderboard data (``query`` / ``get_available_columns``).
        period: Evaluation period to display.
        models_size: Model-size labels; each must be a key of ``model_size_to_file_name``.
        metric: Metric label; must be a key of ``metric_to_sheet``.
        visible_columns: Dataset columns to show. Names not available for
            ``period`` are dropped before querying.
        color_columns: Any of ``"Average"`` / ``"Individual Tests"``; selects which
            columns receive the background gradient.
        size_range: ``[min, max]`` parameter-count range passed to the query.
        midpoint: Quantile position (0..1) of the value mapped to the middle colour.
        ascending: Accepted for interface compatibility; only logged, not used.
        request: Optional Gradio request; its ``is_dark`` attribute (if any)
            switches the colour palette.

    Returns:
        An HTML string of the styled table, or a plain message when the query
        returns no rows.
    """
    # getattr keeps a request object without ``is_dark`` from raising.
    is_dark_mode = getattr(request, "is_dark", False) if request else False
    print(
        f"Updating - time: {datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')}, period: {period}, models: {models_size}, metric: {metric}, visible_columns: {visible_columns}, color_columns: {color_columns}, size_range: {size_range}, ascending: {ascending}, is_dark: {is_dark_mode}\n"
    )

    target_file_name = [model_size_to_file_name[model] for model in models_size]
    metric_code = metric_to_sheet[metric]

    # Drop column names that are not available in the current period to avoid errors.
    if visible_columns:
        available_columns = data_manager.get_available_columns(period)
        visible_columns = [col for col in visible_columns if col in available_columns]

    filtered_data = data_manager.query(
        period=period,
        metric_code=metric_code,
        param_range=(size_range[0], size_range[1]),
        model_groups=target_file_name,
        visible_columns=visible_columns,
    )

    if len(filtered_data) == 0:
        return NO_DATA_MESSAGE

    colors = ["#2ca02c", "#2b2b2b", "#d62728"] if is_dark_mode else ["#63be7b", "#ffffff", "#f8696b"]

    # Per-column anchors of the gradient: minimum, maximum and the "middle" value.
    vmin, vmax, vmid = {}, {}, {}
    for column in filtered_data.columns:
        if column in [NAME_COLUMN, PARAMS_COLUMN]:
            continue
        col_values = filtered_data[column].dropna()
        if len(col_values) > 1:
            sorted_values = np.sort(col_values)
            vmin[column] = sorted_values[0]
            vmax[column] = sorted_values[-1]
            # Clamp so midpoint == 1.0 (or slightly out-of-range input) cannot index past the end.
            idx = min(max(int(len(sorted_values) * midpoint), 0), len(sorted_values) - 1)
            vmid[column] = sorted_values[idx]

    def custom_background_gradient(series, cmap, vmin_val, vmax_val, vmid_val):
        """Map each value of ``series`` to a CSS background colour via ``cmap``.

        Values are normalised piecewise-linearly so that ``vmin_val`` -> 0,
        ``vmid_val`` -> 0.5 and ``vmax_val`` -> 1.
        """
        if len(series) == 0:
            return series

        def normalize(x):
            if pd.isna(x):
                return 0.5  # Neutral for NaN
            # Degenerate anchors: avoid dividing by a zero-width half-range.
            if vmid_val == vmin_val and x <= vmid_val:
                return 0.0
            if vmid_val == vmax_val and x >= vmid_val:
                return 1.0
            if vmid_val == vmin_val or vmid_val == vmax_val:
                return 0.5
            if x <= vmid_val:
                return 0.5 * (x - vmin_val) / (vmid_val - vmin_val)
            return 0.5 + 0.5 * (x - vmid_val) / (vmax_val - vmid_val)

        normed = series.apply(normalize)
        cmap_colors = [cmap(x) for x in normed]
        return [
            "background-color: rgba({}, {}, {}, {}); color: black;".format(*[int(255 * c) for c in color[:3]], color[3])
            for color in cmap_colors
        ]

    color_columns = color_columns or []
    target_color_columns = []
    if "Average" in color_columns:
        target_color_columns.append(AVERAGE_COLUMN)
    if "Individual Tests" in color_columns:
        target_color_columns.extend(
            [col for col in filtered_data.columns if col not in [NAME_COLUMN, PARAMS_COLUMN, AVERAGE_COLUMN]]
        )

    def color_params_column_dynamic(value):
        """Return the CSS for one cell of the parameter-count column."""
        if not pd.notna(value):
            # An empty string means "no style". A bare word such as "default" is
            # not a CSS declaration and makes pandas raise when rendering.
            return ""
        if is_dark_mode:
            return "background-color: #4b4936; color: #f0f0f0;"
        return "background-color: #fffdd0; color: black;"

    formatter = {col: "{:.3f}" for col in filtered_data.columns if filtered_data[col].dtype in ["float64", "float32"]}

    styler = filtered_data.style.format(formatter)
    styler = styler.map(color_params_column_dynamic, subset=[PARAMS_COLUMN])

    # The colormap does not depend on the column, so build it once.
    custom_cmap = LinearSegmentedColormap.from_list("custom_cmap", colors)
    for column in target_color_columns:
        if column in vmin:
            styler = styler.apply(
                custom_background_gradient,
                cmap=custom_cmap,
                vmin_val=vmin[column],
                vmax_val=vmax[column],
                vmid_val=vmid[column],
                subset=[column],
            )

    styler = styler.hide(axis="index")

    # Fixed pixel widths applied to the first len(widths) columns, by position.
    widths = [250, 80, 80, 70, 70, 70, 70, 70, 70, 70, 70, 70, 70, 70]

    table_styles = [
        {
            "selector": "th",
            "props": [
                ("background-color", "var(--background-fill-secondary)"),
                ("color", "var(--body-text-color)"),
                ("padding", "8px"),
                ("font-weight", "bold"),
            ],
        },
        {
            "selector": "table",
            "props": [("border-collapse", "collapse"), ("border", "1px solid var(--border-color-primary)")],
        },
    ]
    for i, w in enumerate(widths):
        table_styles.append(
            {
                "selector": f"th.col{i}, td.col{i}",
                "props": [
                    ("min-width", f"{w}px"),
                    ("max-width", f"{w}px"),
                    ("text-align", "center"),
                    ("border", "1px solid var(--border-color-primary)"),
                ],
            }
        )

    styler = styler.set_table_styles(table_styles)
    return styler.to_html()


def check_model_exists(model_id):
    """Report whether ``model_id`` can be found on the Hugging Face Hub.

    Returns:
        One of ``"Exists and is accessible"``, ``"Exists but is restricted"``,
        ``"Does not exist"`` or ``"Error: <details>"``.
    """
    api = HfApi()
    try:
        api.model_info(model_id)
    except GatedRepoError:
        return "Exists but is restricted"
    except HTTPError as e:
        # ``response`` may be missing on some errors; treat that as a generic error.
        if getattr(e.response, "status_code", None) == 404:
            return "Does not exist"
        return "Error: " + str(e)
    except ValueError as e:
        # Malformed repo ids are rejected client-side with a ValueError subclass.
        return "Error: " + str(e)
    return "Exists and is accessible"


def submit_model(name):
    """Validate a model id and post it to the submission webhook.

    Args:
        name: Hugging Face repo id typed by the user.

    Returns:
        A Markdown status line starting with ``# SUCCESS`` or ``# ERROR``.
    """
    name = (name or "").strip()
    if not name:
        return "# ERROR: Please enter a model name."

    if "Exists" not in check_model_exists(name):
        return f"# ERROR: Model {name} does not exist on Hugging Face!"

    if not webhook_url:
        return "# ERROR: Submission queue is not configured. Please try again later."

    try:
        response = requests.post(webhook_url, json={"content": name}, timeout=REQUEST_TIMEOUT_SECONDS)
        if response.status_code != 200:
            return f"# ERROR: Failed to submit model {name}. Server returned status code {response.status_code}."
        response_data = response.json()
        if response_data.get("status") == "success":
            return "# SUCCESS: We will check the model as soon as possible. Thank you for your submission!"
        return f"# ERROR: {response_data.get('message', 'Unknown error')}"
    except requests.exceptions.RequestException:
        # Covers connection failures and timeouts, not only HTTP status errors.
        return "# ERROR: Network error while contacting queue. Please try again in a few minutes."
    except Exception as e:
        # Top-level UI boundary: log and show a generic message instead of crashing.
        print(e)
        return "# ERROR: Unexpected error. Please try again later."


def _empty_figure(title, width, height):
    """Return a blank Plotly figure carrying only a centred title."""
    fig = go.Figure()
    fig.update_layout(title={"text": title, "x": 0.5}, width=width, height=height)
    return fig


def _valid_points(df, y_column):
    """Return ``(x, y, names)`` lists for rows with positive params and ``y_column``.

    NaN never compares greater than zero, so the ``> 0`` test also drops NaN.
    """
    x_values = df[PARAMS_COLUMN].astype(float).tolist()
    y_values = df[y_column].astype(float).tolist()
    names = df[NAME_COLUMN].tolist()
    kept = [(x, y, n) for x, y, n in zip(x_values, y_values, names) if x > 0 and y > 0]
    if not kept:
        return [], [], []
    xs, ys, ns = zip(*kept)
    return list(xs), list(ys), list(ns)


def _fit_power_law(x_vals, y_vals):
    """Fit ``log10(y) = slope * log10(x) + intercept`` by least squares.

    Returns:
        ``(slope, intercept, r_squared)``, or ``None`` when the line is not
        determined (fewer than two points, or all x identical).
    """
    log_x = np.log10(np.asarray(x_vals, dtype=float))
    log_y = np.log10(np.asarray(y_vals, dtype=float))
    if log_x.size < 2 or np.ptp(log_x) == 0:
        return None
    slope, intercept = np.polyfit(log_x, log_y, 1)
    log_y_pred = slope * log_x + intercept
    ss_res = np.sum((log_y - log_y_pred) ** 2)
    ss_tot = np.sum((log_y - np.mean(log_y)) ** 2)
    r_squared = 1 - (ss_res / ss_tot) if ss_tot > 0 else 0
    return slope, intercept, r_squared


def _power_law_curve(slope, intercept, log_x_min, log_x_max, pad, num=100):
    """Return ``(x, y)`` lists sampling the fitted line over the padded log10 x-range."""
    fit_x_log = np.linspace(log_x_min - pad, log_x_max + pad, num)
    fit_y_log = slope * fit_x_log + intercept
    return (10**fit_x_log).tolist(), (10**fit_y_log).tolist()


def _log_axis(title, log_min, log_max, **extra):
    """Build a log-axis layout dict spanning ``[log_min, log_max]`` padded by 0.1 decades.

    ``dtick`` is set to a quarter of the span and omitted when the span is
    zero, because a zero tick step is meaningless.
    """
    log_min, log_max = float(log_min), float(log_max)
    axis = dict(
        title=title,
        showgrid=True,
        zeroline=False,
        type="log",
        tickformat=".2f",
        range=[log_min - 0.1, log_max + 0.1],
    )
    span = log_max - log_min
    if span > 0:
        axis["dtick"] = span / 4
    axis.update(extra)
    return axis


def create_scaling_plot(data_manager: DataManager, period: str):
    """Plot average compression ratio against parameter count on log-log axes.

    Queries the ``"cr"`` metric for models in the 0-40 parameter range, draws one
    marker per model and, when at least two distinct sizes exist, a dashed
    power-law fit line.

    Returns:
        A Plotly figure; a titled empty figure when no valid data exists.
    """
    title = "Compression Ratio Scaling Law"
    new_df = data_manager.query(
        period=period,
        metric_code="cr",
        param_range=(0, 40),
        model_groups=None,
        visible_columns=None,
    )
    if len(new_df) == 0:
        return _empty_figure(title, 800, 600)

    # Drop invalid values (NaN, zero, negative); log axes cannot show them.
    x_values, y_values, names = _valid_points(new_df, AVERAGE_COLUMN)
    if not x_values:
        return _empty_figure(title, 800, 600)

    x_min, x_max = np.log10(min(x_values)), np.log10(max(x_values))
    y_min, y_max = np.log10(min(y_values)), np.log10(max(y_values))

    fig = go.Figure()

    # Data points. Plotly hover text uses <br> for line breaks, and
    # <extra></extra> hides the secondary trace-name box.
    # NOTE(review): this markup was reconstructed from a garbled literal; confirm.
    fig.add_trace(
        go.Scatter(
            x=x_values,
            y=y_values,
            mode="markers",
            name="Model",
            marker=dict(size=12, color="#39C5BB", opacity=0.8),
            text=names,
            customdata=list(zip(x_values, y_values)),
            hovertemplate=(
                "%{text}<br>"
                + "Params: %{customdata[0]:.2f}B<br>"
                + "Compression Ratio: %{customdata[1]:.2f}%<br>"
                + "<extra></extra>"
            ),
        )
    )

    # Linear regression in log space: log(y) = a * log(x) + b  =>  y = 10^b * x^a.
    fit = _fit_power_law(x_values, y_values)
    if fit is not None:
        slope, intercept, r_squared = fit
        fit_x, fit_y = _power_law_curve(slope, intercept, x_min, x_max, pad=0.1)
        fit_label = f"Fit: y = {10**intercept:.2f} × x^{slope:.3f} (R² = {r_squared:.3f})"
        fig.add_trace(
            go.Scatter(
                x=fit_x,
                y=fit_y,
                mode="lines",
                name=fit_label,
                line=dict(color="#FF6B6B", width=2, dash="dash"),
                hoverinfo="skip",
            )
        )

    fig.update_layout(
        title={"text": title, "x": 0.5, "xanchor": "center", "yanchor": "top"},
        width=800,
        height=600,
        showlegend=True,
        legend=dict(
            yanchor="top",
            y=0.99,
            xanchor="left",
            x=0.01,
            bgcolor="rgba(255,255,255,0.8)",
        ),
        xaxis=_log_axis("Parameters (B)", x_min, x_max),
        yaxis=_log_axis("Compression Ratio (%)", y_min, y_max, autorange="reversed"),
    )
    return fig


def create_category_scaling_plot(data_manager: DataManager, period: str, selected_datasets: list):
    """Plot a separate scaling-law scatter and fit line for each selected dataset.

    Datasets that are not columns of the queried data, or that have fewer than
    two valid points, are skipped.

    Returns:
        A Plotly figure; a titled empty figure when nothing can be drawn.
    """
    new_df = data_manager.query(
        period=period,
        metric_code="cr",
        param_range=(0, 40),
        model_groups=None,
        visible_columns=None,
    )
    if len(new_df) == 0 or not selected_datasets:
        return _empty_figure("Scaling Law by Dataset", 1000, 700)

    # High-contrast, saturated palette; cycled when there are more datasets than colours.
    color_palette = [
        "#1f77b4",  # blue
        "#ff7f0e",  # orange
        "#2ca02c",  # green
        "#d62728",  # red
        "#9467bd",  # purple
        "#8c564b",  # brown
        "#e377c2",  # pink
        "#17becf",  # cyan
        "#bcbd22",  # yellow-green
        "#7f7f7f",  # grey
    ]

    fig = go.Figure()

    # Collected across datasets to compute the global axis ranges.
    all_x_values = []
    all_y_values = []

    for idx, dataset in enumerate(selected_datasets):
        if dataset not in new_df.columns:
            continue

        x_vals, y_vals, name_vals = _valid_points(new_df, dataset)
        if len(x_vals) < 2:  # at least two points are needed for a fit
            continue

        all_x_values.extend(x_vals)
        all_y_values.extend(y_vals)
        color = color_palette[idx % len(color_palette)]

        # Data points. See the overall plot for the hover markup conventions.
        # NOTE(review): this markup was reconstructed from a garbled literal; confirm.
        fig.add_trace(
            go.Scatter(
                x=x_vals,
                y=y_vals,
                mode="markers",
                name=f"{dataset}",
                marker=dict(size=10, color=color, opacity=0.7),
                text=name_vals,
                customdata=list(zip(x_vals, y_vals)),
                hovertemplate=(
                    f"%{{text}}<br>{dataset}<br>"
                    + "Params: %{customdata[0]:.2f}B<br>"
                    + "CR: %{customdata[1]:.2f}%<br>"
                    + "<extra></extra>"
                ),
                legendgroup=dataset,
            )
        )

        # Fit line; skipped when all models share one size (slope undetermined).
        fit = _fit_power_law(x_vals, y_vals)
        if fit is None:
            continue
        slope, intercept, r_squared = fit
        fit_x, fit_y = _power_law_curve(
            slope, intercept, np.log10(min(x_vals)), np.log10(max(x_vals)), pad=0.05
        )
        fit_label = f"{dataset} fit (slope={slope:.3f}, R²={r_squared:.3f})"
        fig.add_trace(
            go.Scatter(
                x=fit_x,
                y=fit_y,
                mode="lines",
                name=fit_label,
                line=dict(color=color, width=2, dash="dash"),
                hoverinfo="skip",
                legendgroup=dataset,
                showlegend=True,
            )
        )

    if not all_x_values or not all_y_values:
        return _empty_figure("Scaling Law by Dataset - No Valid Data", 1000, 700)

    # Global axis ranges.
    x_min, x_max = np.log10(min(all_x_values)), np.log10(max(all_x_values))
    y_min, y_max = np.log10(min(all_y_values)), np.log10(max(all_y_values))

    fig.update_layout(
        title={"text": "Scaling Law by Dataset", "x": 0.5, "xanchor": "center", "yanchor": "top"},
        width=1000,
        height=700,
        showlegend=True,
        legend=dict(
            yanchor="top",
            y=0.99,
            xanchor="left",
            x=1.02,
            bgcolor="rgba(255,255,255,0.9)",
            font=dict(size=10),
        ),
        xaxis=_log_axis("Parameters (B)", x_min, x_max),
        yaxis=_log_axis("Compression Ratio (%)", y_min, y_max, autorange="reversed"),
        margin=dict(r=250),  # leave room for the legend
    )
    return fig


if __name__ == "__main__":
    data_manager = DataManager("data")
    time_list = data_manager.get_available_periods()
    # Guard against an empty data directory; later code already checks last_period.
    last_period = time_list[-1] if time_list else None

    # Long Context Data
    lc_dm = LongContextDataManager("longctx_data")
    lc_periods = lc_dm.get_available_periods()
    default_lc_period = lc_periods[-1] if lc_periods else None

    MODE_ABS_AVG = "Absolute (Averaged by Model)"
    MODE_ABS_SINGLE = "Absolute (By Dataset)"
    MODE_REL_AVG = "Relative (Averaged by Model)"
    MODE_REL_SINGLE = "Relative (By Dataset)"

    lc_modes = [MODE_ABS_AVG, MODE_ABS_SINGLE, MODE_REL_AVG, MODE_REL_SINGLE]
    default_lc_mode = MODE_ABS_AVG

    # Defaults shared by the initial plot and the "Advanced Settings" widgets.
    DEFAULT_LC_SMOOTH = 32
    DEFAULT_LC_CUTOFF = 0.2
    DEFAULT_LC_OFFSET = 32

    init_lc_choices = lc_dm.get_model_choices(default_lc_period) if default_lc_period else []
    print(init_lc_choices)

    def get_default_model(choices):
        """Return the default model: Qwen3-8B-Base if listed, otherwise the first choice."""
        if not choices:
            return None
        for _, model_name in choices:
            if model_name == "Qwen3-8B-Base":
                return model_name
        return choices[0][1]

    def create_initial_lc_plot():
        """Build the long-context plot shown before any user interaction."""
        if not init_lc_choices:
            return None
        default_model = get_default_model(init_lc_choices)
        data_map = {default_model: lc_dm.get_paths_for_model(default_lc_period, default_model)}
        return draw_long_context_plot(
            default_lc_mode, data_map, None, DEFAULT_LC_CUTOFF, DEFAULT_LC_SMOOTH, DEFAULT_LC_OFFSET, [None, None]
        )

    initial_lc_plot = create_initial_lc_plot()

    # Computed once and reused by the Scaling Law tab.
    initial_scaling_fig = create_scaling_plot(data_manager, last_period) if last_period else go.Figure()

    initial_metric = metric_list[0]
    initial_columns = data_manager.get_available_columns(last_period) if last_period else []
    initial_colors = ["Average", "Individual Tests"]
    initial_size_range = [0, 40]
    initial_data = (
        update_table(
            data_manager, last_period, model_size_list, initial_metric, initial_columns, initial_colors, initial_size_range
        )
        if last_period
        else NO_DATA_MESSAGE
    )

    # Data-source groups.
    code_cols = ["github cpp", "github python", "github javascript"]
    research_cols = ["arxiv physics", "arxiv cs", "arxiv math"]
    writing_cols = ["ao3 english", "github markdown"]
    knowledge_cols = ["bbc news", "wikipedia english"]

    theme = gr.themes.Default()

    with gr.Blocks(theme=theme, css=css) as demo:
        gr.HTML(TITLE_HTML)
        gr.HTML(SUBTITLE_HTML)
        gr.HTML(LINKS_HTML)

        with gr.Tabs() as tabs:
            with gr.Tab("🏆 Leaderboard"):
                with gr.Row():
                    with gr.Column():
                        period_selector = gr.Dropdown(label="Period", choices=time_list, value=last_period)
                        metric_selector = gr.Dropdown(label="Metric", choices=metric_list, value=initial_metric)
                        model_selector = gr.CheckboxGroup(label="Model Size", choices=model_size_list, value=model_size_list)
                        size_range_slider = RangeSlider(minimum=0, maximum=40, value=[0, 40], step=0.1, label="Model Size Range")
                        midpoint_slider = gr.Slider(minimum=0.1, maximum=0.9, value=0.5, step=0.01, label="Color Gradient Midpoint")
                        color_selector = gr.CheckboxGroup(
                            label="Colored Columns", choices=["Average", "Individual Tests"], value=initial_colors
                        )

                    with gr.Column():
                        initial_code = [c for c in code_cols if c in initial_columns]
                        initial_research = [c for c in research_cols if c in initial_columns]
                        initial_writing = [c for c in writing_cols if c in initial_columns]
                        initial_knowledge = [c for c in knowledge_cols if c in initial_columns]

                        with gr.Column(elem_classes=["data-source-box"]):
                            gr.Markdown("Data Sources")

                            # Code
                            with gr.Row():
                                toggle_code = gr.Checkbox(label="💻 Code", value=True, scale=0, min_width=150)
                                colfilter_code = gr.CheckboxGroup(
                                    choices=initial_code,
                                    value=initial_code,
                                    show_label=False,
                                    scale=3,
                                    elem_classes=["aligned-checkboxes"],
                                )

                            # Research
                            with gr.Row():
                                toggle_research = gr.Checkbox(label="🔬 Science", value=True, scale=0, min_width=150)
                                colfilter_research = gr.CheckboxGroup(
                                    choices=initial_research,
                                    value=initial_research,
                                    show_label=False,
                                    scale=3,
                                    elem_classes=["aligned-checkboxes"],
                                )

                            # World knowledge
                            with gr.Row():
                                toggle_knowledge = gr.Checkbox(label="📖 Knowledge", value=True, scale=0, min_width=150)
                                colfilter_knowledge = gr.CheckboxGroup(
                                    choices=initial_knowledge,
                                    value=initial_knowledge,
                                    show_label=False,
                                    scale=3,
                                    elem_classes=["aligned-checkboxes"],
                                )

                            # Writing
                            with gr.Row():
                                toggle_writing = gr.Checkbox(label="✍️ Writing", value=True, scale=0, min_width=150)
                                colfilter_writing = gr.CheckboxGroup(
                                    choices=initial_writing,
                                    value=initial_writing,
                                    show_label=False,
                                    scale=3,
                                    elem_classes=["aligned-checkboxes"],
                                )

                            # Multilingual - coming soon
                            with gr.Row():
                                gr.Checkbox(
                                    label="🌍 Multilingual (Coming Soon)", value=False, interactive=False, scale=0, min_width=250
                                )

                table = gr.HTML(initial_data)

                def update_table_wrapper(
                    period, models_size, metric, code_sel, research_sel, writing_sel, knowledge_sel, color_columns, size_range, midpoint
                ):
                    """Merge the per-group column selections and re-render the table."""
                    visible_columns = (code_sel or []) + (research_sel or []) + (writing_sel or []) + (knowledge_sel or [])
                    return update_table(
                        data_manager, period, models_size, metric, visible_columns, color_columns, size_range, midpoint
                    )

                def update_column_choices(period, cur_code, cur_research, cur_writing, cur_knowledge):
                    """Refresh each group's choices for ``period``, keeping still-valid selections.

                    A group whose selection ends up empty falls back to all of its
                    available columns.
                    """
                    if not period:
                        empty = gr.update(choices=[], value=[])
                        return empty, empty, empty, empty

                    columns = data_manager.get_available_columns(period)
                    updates = []
                    for group_cols, current in (
                        (code_cols, cur_code),
                        (research_cols, cur_research),
                        (writing_cols, cur_writing),
                        (knowledge_cols, cur_knowledge),
                    ):
                        available = [c for c in group_cols if c in columns]
                        selected = [c for c in (current or []) if c in available] or available
                        updates.append(gr.update(choices=available, value=selected))
                    return tuple(updates)

                # Master switch per group.
                def make_toggle_handler(group_cols):
                    """Return a handler that selects all or none of ``group_cols``.

                    Columns are filtered by the currently selected period, not the
                    period shown at startup, so the value always matches the
                    group's current choices.
                    """

                    def handler(enabled, period):
                        if not enabled or not period:
                            return []
                        available = data_manager.get_available_columns(period)
                        return [c for c in group_cols if c in available]

                    return handler

                for toggle, group_cols, group_filter in (
                    (toggle_code, code_cols, colfilter_code),
                    (toggle_research, research_cols, colfilter_research),
                    (toggle_writing, writing_cols, colfilter_writing),
                    (toggle_knowledge, knowledge_cols, colfilter_knowledge),
                ):
                    toggle.change(make_toggle_handler(group_cols), inputs=[toggle, period_selector], outputs=[group_filter])

                shared_inputs = [
                    period_selector,
                    model_selector,
                    metric_selector,
                    colfilter_code,
                    colfilter_research,
                    colfilter_writing,
                    colfilter_knowledge,
                    color_selector,
                    size_range_slider,
                    midpoint_slider,
                ]

                period_selector.change(
                    update_column_choices,
                    inputs=[period_selector, colfilter_code, colfilter_research, colfilter_writing, colfilter_knowledge],
                    outputs=[colfilter_code, colfilter_research, colfilter_writing, colfilter_knowledge],
                )

                # Every control that feeds the table re-renders it on change.
                for component in shared_inputs:
                    component.change(update_table_wrapper, inputs=shared_inputs, outputs=table)

            with gr.Tab("📚 Long Context"):
                gr.Markdown(read_longctx_about_md())

                with gr.Row():
                    with gr.Column(scale=1):
                        lc_period_dropdown = gr.Dropdown(label="Period", choices=lc_periods, value=default_lc_period)
                        lc_mode_radio = gr.Radio(label="Visualization Mode", choices=lc_modes, value=default_lc_mode)

                        gr.Markdown("### Model / Dataset Selection")

                        default_model = get_default_model(init_lc_choices)
                        default_selected_models = [default_model] if default_model else []

                        lc_select_abs = gr.Dropdown(
                            label="Select Models",
                            choices=init_lc_choices,
                            value=default_selected_models,
                            multiselect=True,
                            visible=True,
                        )
                        lc_select_base = gr.Dropdown(
                            label="Baseline Model",
                            choices=init_lc_choices,
                            value=None,
                            multiselect=False,
                            visible=False,
                        )
                        lc_select_comp = gr.Dropdown(
                            label="Comparison Models", choices=init_lc_choices, value=[], multiselect=True, visible=False
                        )

                        # By Dataset mode selectors
                        init_dataset_choices = lc_dm.get_dataset_choices(default_lc_period) if default_lc_period else []
                        default_selected_datasets = [init_dataset_choices[0][1]] if init_dataset_choices else []

                        lc_select_datasets = gr.Dropdown(
                            label="Select Datasets",
                            choices=init_dataset_choices,
                            value=default_selected_datasets,
                            multiselect=True,
                            visible=False,
                        )
                        lc_select_models_single = gr.Dropdown(
                            label="Select Models",
                            choices=init_lc_choices,
                            value=default_selected_models,
                            multiselect=True,
                            visible=False,
                        )
                        lc_select_base_model_single = gr.Dropdown(
                            label="Baseline Model",
                            choices=init_lc_choices,
                            value=None,
                            multiselect=False,
                            visible=False,
                        )
                        lc_select_comp_models_single = gr.Dropdown(
                            label="Comparison Models", choices=init_lc_choices, value=[], multiselect=True, visible=False
                        )

                        with gr.Accordion("Advanced Settings", open=True):
                            lc_smooth = gr.Slider(1, 125, DEFAULT_LC_SMOOTH, step=1, label="Smooth Window")
                            lc_cutoff = gr.Slider(0.1, 1.0, DEFAULT_LC_CUTOFF, step=0.05, label="Cutoff Ratio")
                            lc_offset = gr.Number(DEFAULT_LC_OFFSET, label="Start Offset (Bytes)")
                            with gr.Row():
                                lc_ymin = gr.Textbox(label="Y Min", placeholder="Auto", value="")
                                lc_ymax = gr.Textbox(label="Y Max", placeholder="Auto", value="")

                        lc_btn_plot = gr.Button("Visualize", variant="primary")

                    with gr.Column(scale=3):
                        lc_plot_output = gr.Plot(label="Visualization Result", value=initial_lc_plot)

                def update_lc_inputs(period, mode):
                    """Show the selectors for ``mode`` and reset their choices for ``period``.

                    Returns seven updates, in the order of ``lc_selector_outputs``.
                    """
                    if not period:
                        return tuple([gr.update()] * 7)

                    is_model_agg = "Averaged by Model" in mode
                    is_relative = "Relative" in mode

                    if is_model_agg:
                        # Averaged by Model mode
                        choices = lc_dm.get_model_choices(period)
                        label_suffix = "Models"
                        default_model = get_default_model(choices)

                        if not is_relative:
                            # Absolute (Averaged by Model): preselect the default model.
                            default_selected = [default_model] if default_model else []
                            return (
                                gr.update(visible=True, choices=choices, label=f"Select {label_suffix}", value=default_selected),
                                gr.update(visible=False, choices=choices, value=None),
                                gr.update(visible=False, choices=choices, value=[]),
                                gr.update(visible=False, value=[]),
                                gr.update(visible=False, value=[]),
                                gr.update(visible=False, value=None),
                                gr.update(visible=False, value=[]),
                            )
                        return (
                            gr.update(visible=False, choices=choices, value=[]),
                            gr.update(visible=True, choices=choices, label="Baseline", value=default_model),
                            gr.update(visible=True, choices=choices, label="Comparison", value=[]),
                            gr.update(visible=False, value=[]),
                            gr.update(visible=False, value=[]),
                            gr.update(visible=False, value=None),
                            gr.update(visible=False, value=[]),
                        )

                    # By Dataset mode
                    dataset_choices = lc_dm.get_dataset_choices(period)
                    model_choices = lc_dm.get_model_choices(period)
                    default_model = get_default_model(model_choices)

                    if not is_relative:
                        # Absolute By Dataset: preselect the default model.
                        default_selected = [default_model] if default_model else []
                        return (
                            gr.update(visible=False, value=[]),
                            gr.update(visible=False, value=None),
                            gr.update(visible=False, value=[]),
                            gr.update(visible=True, choices=dataset_choices, value=[]),
                            gr.update(visible=True, choices=model_choices, value=default_selected),
                            gr.update(visible=False, value=None),
                            gr.update(visible=False, value=[]),
                        )
                    # Relative By Dataset: the same datasets are used for all models.
                    return (
                        gr.update(visible=False, value=[]),
                        gr.update(visible=False, value=None),
                        gr.update(visible=False, value=[]),
                        gr.update(visible=True, choices=dataset_choices, value=[]),
                        gr.update(visible=False, value=[]),
                        gr.update(visible=True, choices=model_choices, value=default_model),
                        gr.update(visible=True, choices=model_choices, value=[]),
                    )

                lc_selector_outputs = [
                    lc_select_abs,
                    lc_select_base,
                    lc_select_comp,
                    lc_select_datasets,
                    lc_select_models_single,
                    lc_select_base_model_single,
                    lc_select_comp_models_single,
                ]

                lc_period_dropdown.change(
                    fn=update_lc_inputs,
                    inputs=[lc_period_dropdown, lc_mode_radio],
                    outputs=lc_selector_outputs,
                )
                lc_mode_radio.change(
                    fn=update_lc_inputs,
                    inputs=[lc_period_dropdown, lc_mode_radio],
                    outputs=lc_selector_outputs,
                )

                def run_lc_plot(
                    mode,
                    period,
                    sel_abs,
                    sel_base,
                    sel_comp,
                    sel_datasets,
                    sel_models_single,
                    sel_base_model_single,
                    sel_comp_models_single,
                    smooth,
                    cutoff,
                    offset,
                    ymin,
                    ymax,
                ):
                    """Collect data paths for the current selection and draw the plot.

                    Returns ``None`` (empty plot) when the selection is incomplete or
                    no data paths are found.
                    """
                    data_map = {}
                    baseline_key = None

                    is_model_agg = "Averaged by Model" in mode
                    is_relative = "Relative" in mode

                    if is_model_agg:
                        # Averaged by Model mode
                        if not is_relative:
                            selection = sel_abs
                        else:
                            if not sel_base:
                                return None
                            # sel_comp can be None when the dropdown is cleared.
                            selection = [sel_base] + (sel_comp or [])
                            baseline_key = sel_base

                        if not selection:
                            return None

                        for item in selection:
                            paths = lc_dm.get_paths_for_model(period, item)
                            if paths:
                                data_map[item] = paths
                    elif not is_relative:
                        # Absolute By Dataset
                        if not sel_datasets or not sel_models_single:
                            return None
                        for model_name in sel_models_single:
                            paths = lc_dm.get_paths_for_model_and_datasets(period, model_name, sel_datasets)
                            if paths:
                                data_map[model_name] = paths
                    else:
                        # Relative By Dataset: the same datasets are used for all models.
                        if not sel_datasets or not sel_base_model_single:
                            return None

                        # Baseline model with the selected datasets (averaged).
                        baseline_paths = lc_dm.get_paths_for_model_and_datasets(period, sel_base_model_single, sel_datasets)
                        if baseline_paths:
                            baseline_key = sel_base_model_single
                            data_map[baseline_key] = baseline_paths

                        # Comparison models with the same datasets (averaged).
                        for model_name in sel_comp_models_single or []:
                            paths = lc_dm.get_paths_for_model_and_datasets(period, model_name, sel_datasets)
                            if paths:
                                data_map[model_name] = paths

                    if not data_map:
                        return None

                    def _to_float_or_none(val):
                        """Parse a textbox value; blank or non-numeric input means "auto"."""
                        if val is None:
                            return None
                        s = str(val).strip()
                        if not s:
                            return None
                        try:
                            return float(s)
                        except ValueError:
                            return None

                    y_range = [_to_float_or_none(ymin), _to_float_or_none(ymax)]
                    # A cleared Number field yields None; fall back to the UI default.
                    start_offset = int(offset) if offset is not None else DEFAULT_LC_OFFSET

                    return draw_long_context_plot(mode, data_map, baseline_key, cutoff, smooth, start_offset, y_range)

                lc_btn_plot.click(
                    fn=run_lc_plot,
                    inputs=[
                        lc_mode_radio,
                        lc_period_dropdown,
                        lc_select_abs,
                        lc_select_base,
                        lc_select_comp,
                        lc_select_datasets,
                        lc_select_models_single,
                        lc_select_base_model_single,
                        lc_select_comp_models_single,
                        lc_smooth,
                        lc_cutoff,
                        lc_offset,
                        lc_ymin,
                        lc_ymax,
                    ],
                    outputs=lc_plot_output,
                )

            with gr.Tab("📈 Scaling Law"):
                gr.Markdown("### Compression Ratio Scaling Law")
                gr.Markdown("Explore how compression ratio scales with model parameters across different datasets.")

                # Display modes
                MODE_OVERALL = "📊 Overall (Average)"
                MODE_BY_DATASET = "📈 By Dataset"
                scaling_modes = [MODE_OVERALL, MODE_BY_DATASET]

                # Built from the leaderboard groups so the two lists cannot drift apart.
                all_datasets = code_cols + research_cols + writing_cols + knowledge_cols
                initial_datasets = all_datasets[:4]

                with gr.Row():
                    with gr.Column(scale=1):
                        scaling_period_selector = gr.Dropdown(label="Period", choices=time_list, value=last_period)
                        scaling_mode_radio = gr.Radio(label="Display Mode", choices=scaling_modes, value=MODE_OVERALL)

                        # Dataset selector (hidden initially)
                        scaling_dataset_selector = gr.CheckboxGroup(
                            label="Select Datasets", choices=all_datasets, value=initial_datasets, visible=False
                        )

                    with gr.Column(scale=3):
                        scaling_plot = gr.Plot(initial_scaling_fig)

                def update_scaling_mode_visibility(mode):
                    """Show the dataset selector only in By Dataset mode."""
                    return gr.update(visible=mode == MODE_BY_DATASET)

                def update_scaling_plot_unified(period, mode, datasets):
                    """Redraw the scaling plot for the selected mode."""
                    if mode == MODE_OVERALL:
                        return create_scaling_plot(data_manager, period)
                    return create_category_scaling_plot(data_manager, period, datasets)

                # A mode switch updates both the selector visibility and the chart.
                scaling_mode_radio.change(
                    fn=update_scaling_mode_visibility, inputs=[scaling_mode_radio], outputs=[scaling_dataset_selector]
                )

                scaling_plot_inputs = [scaling_period_selector, scaling_mode_radio, scaling_dataset_selector]
                for component in (scaling_mode_radio, scaling_period_selector, scaling_dataset_selector):
                    component.change(fn=update_scaling_plot_unified, inputs=scaling_plot_inputs, outputs=scaling_plot)

            with gr.Tab("ℹ️ About"):
                gr.Markdown(read_about_md())

            with gr.Tab("🚀 Submit"):
                with gr.Group():
                    with gr.Row():
                        model_name = gr.Textbox(max_lines=1, placeholder="Enter model name...", show_label=False, scale=4)
                        submit = gr.Button("Submit", variant="primary", scale=0)

                output = gr.Markdown("# Enter a public HF repo id, then hit Submit to add it to the evaluation queue.")

                submit.click(fn=submit_model, inputs=model_name, outputs=output)

    demo.launch(share=False)