# Author: Kevin Xie
# Commit: Upload main processing scripts for this repo
# Revision: 577fb61
import pandas as pd
import json
from .CONSTANTS import *
class ExcelProcessor:
    """Read model and task data from an Excel workbook and export JSON files.

    The workbook is opened once in ``__init__``; every other method parses
    individual sheets from it. Sheet and column names used by the methods
    (e.g. "Models (Simplified)", "Task-all", "Task Type") are fixed strings
    that must exist in the workbook.
    """

    # Upper bounds (exclusive) and labels of the parameter-size ranges used
    # for the hidden "Model: Size Range" column. Anything at or above the last
    # bound is labelled ">80".
    _SIZE_BUCKETS = ((5, "0-5"), (10, "5-10"), (40, "10-40"), (80, "40-80"))

    # Model names (as listed in the models sheet) whose score columns are
    # named differently in the task sheets.
    _COLUMN_ALIASES = {
        "gpt-35-turbo-0125": "gpt-35-turbo",
        "gpt-4o-0806": "gpt-4o",
        "gemini-2.0-flash-001": "gemini-2.0-flash",
        "gemini-1.5-pro-002": "gemini-1.5-pro",
        "gpt-oss-20b": "gpt-oss-20b-high",
        "gpt-oss-120b": "gpt-oss-120b-high",
    }

    def __init__(self, excel_path, invalid_models=None):
        """Initialize the ExcelProcessor with an Excel file.

        Args:
            excel_path (str): Path to the Excel file containing model and task data.
            invalid_models (container, optional): Model names to exclude from
                evaluation. ``None`` (the default) excludes nothing.
        """
        self.sheet_path = excel_path
        self.excel_data = self.load_excel()
        self.model_sheet = self.load_sheet("Models (Simplified)")
        # Stored exactly as given (possibly None) so the attribute and the
        # message below keep reporting what the caller passed in.
        self.invalid_models = invalid_models
        print("You have excluded the following models: ", self.invalid_models)
        # Every model in the models sheet that was not explicitly excluded.
        self.valid_models = self.get_valid_models(self.invalid_models)

    def load_excel(self):
        """Load the Excel file into a pandas ExcelFile object.

        Returns:
            pd.ExcelFile: The loaded Excel file object.
        """
        return pd.ExcelFile(self.sheet_path)

    def load_sheet(self, sheet_name):
        """Load a specific sheet from the Excel file.

        Args:
            sheet_name (str): Name of the sheet to load.

        Returns:
            pd.DataFrame: The loaded sheet as a pandas DataFrame.
        """
        return self.excel_data.parse(sheet_name)

    def get_valid_models(self, invalid_models=None):
        """Get all models from the Models sheet, excluding invalid ones.

        Args:
            invalid_models (container, optional): Model names to leave out.
                ``None`` means no model is excluded.

        Returns:
            list: Names from the "Name" column of the models sheet, in sheet
            order, that are not in ``invalid_models``.
        """
        # The default of None previously crashed on the membership test.
        excluded = () if invalid_models is None else invalid_models
        return [name for name in self.model_sheet["Name"] if name not in excluded]

    def get_valid_columns(self, sheet_name):
        """Get all named columns from a specified sheet.

        Args:
            sheet_name (str): Name of the sheet to analyze.

        Returns:
            list: Column names with surrounding whitespace stripped, skipping
            the "Unnamed: N" placeholders pandas assigns to header-less
            columns. Non-string headers are returned unchanged.
        """
        valid_columns = []
        for column in self.load_sheet(sheet_name).columns:
            if not isinstance(column, str):
                # Numeric/date headers are real headers, never placeholders,
                # and have no .split/.strip.
                valid_columns.append(column)
            elif column.split(' ')[0] != "Unnamed:":
                valid_columns.append(column.strip())
        return valid_columns

    @staticmethod
    def _map_size(param_size):
        """Map a parameter size (in billions) to a predefined range label.

        Args:
            param_size: The size cell value: a number, "/" or "Unknown".

        Returns:
            str: "None" for "/", "Unknown" for "Unknown", otherwise one of
            "0-5", "5-10", "10-40", "40-80" or ">80".
        """
        if param_size == "/":
            return "None"
        if param_size == "Unknown":
            return "Unknown"
        size = int(param_size)
        for upper_bound, label in ExcelProcessor._SIZE_BUCKETS:
            if size < upper_bound:
                return label
        return ">80"

    @staticmethod
    def _parse_score(cell):
        """Convert one score cell to a float rounded to two decimals.

        Text cells are expected to start with the score followed by a space
        (anything after the first space is ignored). Numeric cells are used
        directly.

        Raises:
            ValueError: If the cell is empty (NaN/None) or not a number.
        """
        if isinstance(cell, str):
            return round(float(cell.split(" ")[0]), 2)
        if pd.isna(cell):
            raise ValueError(f"Missing score value: {cell!r}")
        return round(float(cell), 2)

    def get_model_information(self,
                              sheet_name="Models (Simplified)",
                              name_column="Name",
                              domain_column="Domain",
                              license_column="License",
                              size_column="Size (B)",
                              ):
        """Extract model information from the Models sheet.

        Only models listed in ``self.valid_models`` are included; every other
        row is reported with an "Invalid model" message and skipped. Included
        models are numbered consecutively from "0" in sheet order.

        Args:
            sheet_name (str, optional): Name of the sheet containing model info.
                Defaults to "Models (Simplified)".
            name_column (str, optional): Column name containing model names.
                Defaults to "Name".
            domain_column (str, optional): Column name containing model domains.
                Defaults to "Domain".
            license_column (str, optional): Column name containing license info.
                Defaults to "License".
            size_column (str, optional): Column name containing model sizes.
                Defaults to "Size (B)".

        Returns:
            tuple: A tuple containing 7 dictionaries, each keyed by the
            model's position as a string:
                - model_name_info: Model names
                - domain_info: Model domains mapped using DOMAIN_MAPPING
                - license_info: License information (abbreviated if needed)
                - accessibility_info: Accessibility mapped using LICENSE_MAPPING
                - displayed_size_info: Raw size values for display
                - hidden_size_info: Size ranges for filtering
                - T_info: Position markers for the leaderboard

        Raises:
            KeyError: If a domain or license is missing from its mapping.
        """
        model_sheet = self.load_sheet(sheet_name)
        valid_names = set(self.valid_models)

        T_info = {}
        model_name_info = {}
        domain_info = {}
        license_info = {}
        accessibility_info = {}
        displayed_size_info = {}  # shown on leaderboard
        hidden_size_info = {}  # hidden column

        position = 0
        rows = zip(model_sheet[name_column],
                   model_sheet[domain_column],
                   model_sheet[license_column],
                   model_sheet[size_column])
        for name, domain, license_name, size in rows:
            if name not in valid_names:
                print("Invalid model: ", name)
                continue
            key = str(position)
            # U+1F536 (large orange diamond). Written as a single code point
            # rather than two lone surrogates: the JSON written with the
            # default ensure_ascii=True is identical, but the string can now
            # also be encoded as UTF-8 (e.g. when printed).
            T_info[key] = "\U0001F536"
            model_name_info[key] = name
            domain_info[key] = DOMAIN_MAPPING[domain]
            if license_name == "PhysioNet Credentialed Health Data License 1.5.0":
                # Abbreviate license name to fit on leaderboard
                license_info[key] = "PhysioNet 1.5.0"
            else:
                license_info[key] = license_name
            # Accessibility is looked up with the full, unabbreviated name.
            accessibility_info[key] = LICENSE_MAPPING[license_name]
            displayed_size_info[key] = size
            hidden_size_info[key] = self._map_size(size)
            position += 1

        return (model_name_info, domain_info, license_info, accessibility_info,
                displayed_size_info, hidden_size_info, T_info)

    def get_sheet_information(self, sheets_list, task_names_list, task_types_list):
        """Extract task performance information from specified sheets.

        For every sheet, rows are read from the second row onward until a row
        whose "Task Type" cell is "-". Rows whose mapped task name is
        "Average score" are skipped.

        Args:
            sheets_list (list): List of sheet names to process.
            task_names_list (list): For each sheet, the name of the column
                holding the task names.
            task_types_list (list): List of task types ('ext', 'gen', etc.) for each sheet.

        Returns:
            dict: Dictionary mapping task names to model performance data.
            Format: {task_name: {model_index: performance_score}}, where
            model_index is the model's position in ``self.valid_models``
            as a string.

        Raises:
            KeyError: If a task name is missing from TASK_MAPPING or a model
                column is missing from a sheet.
            ValueError: If a score cell is empty or not numeric.
        """
        task_info = {}
        for idx, sheet in enumerate(sheets_list):
            task_type = task_types_list[idx]
            task_column = task_names_list[idx]
            model_sheet = self.load_sheet(sheet)

            # Resolve each model's score column once per sheet instead of once
            # per cell. 'ext' and 'gen' sheets read the column pandas labels
            # "<name>.1", i.e. the second column carrying that header.
            suffix = '.1' if task_type in ('ext', 'gen') else ''
            column_names = []
            for model in self.valid_models:
                base_name = model.strip()
                column_names.append(
                    self._COLUMN_ALIASES.get(base_name, base_name) + suffix)

            for row, row_type in enumerate(model_sheet['Task Type']):
                if row == 0:
                    continue
                # A "-" marks the end of the data rows.
                if row_type == "-":
                    break
                if not column_names:
                    continue
                # Translate the sheet's task name to its simplified version.
                task = TASK_MAPPING[model_sheet[task_column][row]]
                if task == "Average score":
                    continue
                scores = task_info.setdefault(task, {})
                for position, column_name in enumerate(column_names):
                    scores[str(position)] = self._parse_score(
                        model_sheet[column_name][row])
        return task_info

    def add_average_performance(self, task_info):
        """Calculate average performance across all tasks for each model.

        The number of models is taken from the first task; every task is
        expected to hold a score for each of those model indices.

        Args:
            task_info (dict): Dictionary containing task performance data.
                Format: {task_name: {model_index: performance_score}}

        Returns:
            dict: Dictionary mapping model indices to average performance
            scores, rounded to two decimals and stored as strings.
            Format: {model_index: average_score}. Empty when ``task_info``
            is empty.
        """
        if not task_info:
            # Previously an empty input left the model count unbound.
            return {}
        per_task_scores = list(task_info.values())
        model_count = len(per_task_scores[0])
        average_performance_info = {}
        for i in range(model_count):
            key = str(i)
            total = sum(float(scores[key]) for scores in per_task_scores)
            average_performance_info[key] = str(
                round(total / len(per_task_scores), 2))
        return average_performance_info

    def create_leaderboards(
            self,
            sheet_names_list=None,
            task_names_list=("Task-Classification", "Task-Extraction", "Task-Generation"),
            task_types_list=("cls", "ext", "gen"),
            output_path=None):
        """Create a leaderboard JSON file from Excel data.

        Args:
            sheet_names_list (list): List of sheet names to process. Required.
            task_names_list (sequence, optional): For each sheet, the name of
                the column holding the task names.
            task_types_list (sequence, optional): List of task types for each sheet.
            output_path (str): Path where the JSON file should be saved. Required.

        Raises:
            TypeError: If ``sheet_names_list`` or ``output_path`` is not given.

        Note:
            Creates one leaderboard per call (CoT, Direct, or Few-Shot).
            The output JSON contains model information, task performance, and metadata.
        """
        # Fail before any sheet is parsed rather than deep inside the export.
        if sheet_names_list is None:
            raise TypeError("create_leaderboards() requires sheet_names_list")
        if output_path is None:
            raise TypeError("create_leaderboards() requires output_path")

        (model_info, domain_info, license_info, accessibility_info,
         displayed_size_info, hidden_size_info, T_info) = self.get_model_information()
        task_info = self.get_sheet_information(
            sheet_names_list, task_names_list, task_types_list)
        average_performance_info = self.add_average_performance(task_info)

        # Key order is the column order of the resulting leaderboard.
        data = {
            "T": T_info,
            "Model": model_info,
            "Model: Domain": domain_info,
            "Model: License": license_info,
            "Model: Accessibility": accessibility_info,
            "Size (B)": displayed_size_info,
            "Model: Size Range": hidden_size_info,
            "Average Performance": average_performance_info,
        }
        for task, scores in task_info.items():
            data[task] = scores

        with open(output_path, 'w', encoding="utf-8") as file:
            json.dump(data, file, indent=4)

    def create_task_information(self, output_path: str):
        """Create a JSON file containing detailed task information.

        Args:
            output_path (str): Path where the task information JSON should be saved.

        Note:
            Extracts task metadata from the "Task-all" sheet including language,
            task type, clinical context, data access requirements, applications,
            and clinical stage information. If a task name occurs in several
            rows, values from later rows overwrite earlier ones.
        """
        task_sheet = self.load_sheet("Task-all")
        info = {}
        # The "Task name" column contains all of the task names.
        for idx, task in enumerate(task_sheet["Task name"]):
            entry = info.setdefault(task, {})
            data_access = task_sheet["Data Access\nOpen Access (OA) / \nRegulated (R) / \nPhysionet (P) / \nn2c2 (N)"][idx]
            entry["Language"] = task_sheet["Language"][idx].strip()
            entry["Task Type"] = task_sheet["Task Type - fine grained"][idx].strip()
            entry["Clinical Context"] = task_sheet["Clinical context"][idx].strip()
            entry["Data Access"] = DATA_ACCESS_MAP[data_access.strip()]
            entry['Applications'] = task_sheet['Clinical Application'][idx].strip()
            entry['Clinical Stage'] = task_sheet['Clinical Stage'][idx].strip()

        with open(output_path, 'w', encoding="utf-8") as file:
            json.dump(info, file, indent=4)