Spaces:
Sleeping
Sleeping
import gradio as gr
import json
from datetime import datetime
import random
import os
from huggingface_hub import Repository
# Fixed seed: the A/B assignment and pair order below are deterministic
# per process restart.
random.seed(1234)
import subprocess
# Set Git user information so the Repository push below can commit.
subprocess.run(["git", "config", "--global", "user.email", "[email protected]"])
subprocess.run(["git", "config", "--global", "user.name", "czyang"])
# Write token for the results dataset; expected in the environment as HF_TOKEN.
hf_token = os.getenv("HF_TOKEN")
print("HF Token is none?", hf_token is None)
# Initialize the repository: clone the response dataset into ./user_responses
# so submissions can be appended locally and pushed back to the Hub.
DATASET_REPO_URL = "https://huggingface.co/datasets/czyang/Foley-User-Study-Response-V2"
repo = Repository(
    local_dir="user_responses",
    clone_from=DATASET_REPO_URL,
    use_auth_token=hf_token
)
def prepare_test_cases(json_path="videos/videos.json"):
    """Load the video metadata and attach a randomized A/B assignment.

    For each video id, the two candidate clips ('ours' vs. 'foleycrafter')
    are shuffled into the 'Video 1' / 'Video 2' slots so raters cannot
    infer which system produced which clip.

    Args:
        json_path: Path to the JSON file mapping video ids to clip paths.
            Defaults to the location used by the deployed study.

    Returns:
        The loaded dict, with 'Video 1' and 'Video 2' keys added per video.
    """
    with open(json_path, "r") as f:
        video_dict = json.load(f)
    for video_id in video_dict:
        # A single shuffle fully randomizes the pair order; the original
        # coin-flip branch before the shuffle was redundant dead work.
        video_list = [video_dict[video_id]['ours'],
                      video_dict[video_id]['foleycrafter']]
        random.shuffle(video_list)
        video_dict[video_id]['Video 1'] = video_list[0]
        video_dict[video_id]['Video 2'] = video_list[1]
    return video_dict
# Build the randomized test cases once at startup, and shuffle the order in
# which the video pairs are presented to the rater.
video_dict = prepare_test_cases()
video_ids = list(video_dict.keys())
random.shuffle(video_ids)
# Per-pair questions, in the order they are rendered and later unpacked
# (semantic, sync, quality, overall). Items 0 and 3 are templates that get
# the pair's audio prompt substituted in.
questions = [
    "Which video's audio best matches the sound of {}?",
    "In which video is the timing of the audio best synchronized with what you can see in the video?",
    "Which video has audio that sounds cleaner and more high definition? Please ignore the type of sound and whether it's timed to the video, focus only on the audio quality.",
    "Assuming the video is meant to sound like {}, which video has the best audio overall?"
]
# JSONL file (inside the cloned dataset repo) to which responses are appended.
submissions_file = "user_responses/response.jsonl"
def has_already_submitted(user_id, submissions_path="user_responses/response.jsonl"):
    """Return True if a response with this user id is already recorded.

    Args:
        user_id: Session-derived id to look for in the response log.
        submissions_path: JSONL file to scan; defaults to the same path as
            the module-level ``submissions_file``.

    Returns:
        True when any well-formed line carries a matching ``u_id``.
    """
    if not os.path.exists(submissions_path):
        return False
    with open(submissions_path, "r") as f:
        for line in f:
            try:
                submission = json.loads(line)
            except json.JSONDecodeError:
                # A corrupted/partial line must not break duplicate
                # detection for every future submission; skip it.
                continue
            if submission.get("u_id") == user_id:
                return True
    return False
# Save responses
def save_responses(unique_submission, *responses):
    """Validate and persist one participant's answers.

    Args:
        unique_submission: When True, reject a second submission from the
            same session id.
        *responses: One radio value ("Video 1"/"Video 2") per question per
            video pair, followed by the hidden session-info dict produced
            by ``predict`` as the final element.

    Returns:
        A status message string shown to the participant.
    """
    timestamp = datetime.now().isoformat()
    # The last input is the hidden session-info JSON; the rest are answers.
    info = responses[-1]
    responses = responses[:-1]
    unique_id = info["session_id"]
    user_id = f"{unique_id}"
    # Check for unique submission
    if unique_submission and has_already_submitted(user_id):
        return "You have already submitted responses. Thank you for participating!"
    # Initialize the result dictionary
    result = {
        "u_id": user_id,
        "timestamp": timestamp,
        "responses": []
    }
    for index, video_id in enumerate(video_ids):
        start_idx = index * len(questions)
        end_idx = start_idx + len(questions)
        response = responses[start_idx:end_idx]
        # Refuse partial submissions: every radio must carry a value.
        if any(r is None for r in response):
            return "Please answer all questions before submitting."
        # Map each "Video 1"/"Video 2" answer back to the underlying clip
        # path so the stored record names the file the rater preferred.
        pair_response = {
            video_id: {
                'semantic': video_dict[video_id][response[0]],
                'sync': video_dict[video_id][response[1]],
                'quality': video_dict[video_id][response[2]],
                'overall': video_dict[video_id][response[3]],
            }
        }
        result["responses"].append(pair_response)
    # BUG FIX: the previous key, ``lambda x: x.keys()``, compared dict-view
    # objects, whose ``<`` is *subset* testing (a partial order) — the sort
    # result was arbitrary. Sort by each entry's single video-id key.
    result["responses"].sort(key=lambda entry: next(iter(entry)))
    # Save response locally and push to Hugging Face Hub
    with open(submissions_file, "a") as f:
        f.write(json.dumps(result) + "\n")
    # Push changes to the Hugging Face dataset repo
    repo.push_to_hub()
    return "All responses saved! Thank you for participating!"
def create_interface(unique_submission=False):
    """Build the Gradio Blocks UI for the video-comparison study.

    Args:
        unique_submission: Forwarded to ``save_responses``; when True, each
            session id may submit only once.

    Returns:
        The constructed ``gr.Blocks`` demo (not yet launched).
    """
    with gr.Blocks() as demo:
        gr.Markdown("# Human Preference Study: Video Comparison")
        gr.Markdown("""
In this study, you will watch (and listen to) pairs of videos side by side.
Please watch and **listen** to each pair of videos carefully and answer the three associated questions.
**Headphones are recommended!**
""")
        # Display video pairs and questions
        responses = []
        for index, video_id in enumerate(video_ids):
            video1 = video_dict[video_id]['Video 1']
            video2 = video_dict[video_id]['Video 2']
            # Prompt text substituted into question templates 0 and 3.
            audio_prompt = video_dict[video_id]['audio prompt']
            gr.Markdown(f"### Video Pair {index + 1}")
            with gr.Row():
                gr.Video(video1, label="Video 1")
                gr.Video(video2, label="Video 2")
            with gr.Column():
                # Four radios per pair, appended in the same order that
                # save_responses unpacks: semantic, sync, quality, overall.
                responses.append(gr.Radio(["Video 1", "Video 2"], label=questions[0].format(audio_prompt), value=None))
                responses.append(gr.Radio(["Video 1", "Video 2"], label=questions[1], value=None))
                responses.append(gr.Radio(["Video 1", "Video 2"], label=questions[2], value=None))
                responses.append(gr.Radio(["Video 1", "Video 2"], label=questions[3].format(audio_prompt), value=None))
            gr.Markdown("---")
        # Hidden JSON component populated with client/session metadata when
        # the page loads; passed to save_responses as the final input.
        info = gr.JSON(visible=False)
        demo.load(predict, None, info)
        submit_btn = gr.Button("Submit")
        result_message = gr.Textbox(label="Message (please only submit once)", interactive=False)
        submit_btn.click(
            fn=lambda *args: save_responses(unique_submission, *args),
            inputs=responses+[info],
            outputs=result_message
        )
    return demo
def predict(request: gr.Request):
    """Capture client metadata for the current Gradio session.

    The returned dict is stored in the hidden JSON component created in
    ``create_interface`` and submitted along with the answers so each
    response can be tied to a session.

    Args:
        request: The incoming Gradio request for the page load.

    Returns:
        Dict with the client ip, user agent, raw headers, and session hash.
    """
    headers = request.headers
    host = request.client.host
    # .get avoids a KeyError for clients that send no User-Agent header.
    user_agent = headers.get("user-agent", "")
    session_id = request.session_hash
    return {
        "ip": host,
        "user_agent": user_agent,
        "headers": headers,
        "session_id": session_id
    }
if __name__ == "__main__":
    # unique_submission is hard-coded to True (one submission per session
    # id); no command-line flag parsing actually happens in this script.
    demo = create_interface(unique_submission=True)
    demo.launch(share=True)