Spaces:
Runtime error
Runtime error
| import json | |
| import os | |
| import requests | |
| import sys # Added for command-line arguments | |
| from agent import BasicAgent | |
| # --- Constants --- | |
| DEFAULT_API_URL = "https://agents-course-unit4-scoring.hf.space" | |
| DOWNLOAD_DIR = "downloaded_task_files" # Directory to save downloaded files | |
| QUESTIONS_FILE = "data/questions.json" | |
| # QUESTION_INDEX = 3 # Removed global constant | |
def _download_task_file(task_id, file_name):
    """Download the file attached to a task into DOWNLOAD_DIR.

    Args:
        task_id: Task identifier; used to build the download URL.
        file_name: File name taken from the question entry.

    Returns:
        The local path of the downloaded file, or None if the download
        failed for any reason.
    """
    download_url = f"{DEFAULT_API_URL}/files/{task_id}"
    # file_name comes from the questions data; keep only its final path
    # component so it can never point outside DOWNLOAD_DIR.
    safe_name = os.path.basename(str(file_name))
    if safe_name in ("", ".", ".."):
        safe_name = os.path.basename(str(task_id)) or "task_file"
    local_file_path = os.path.join(DOWNLOAD_DIR, safe_name)
    # Stream into a temporary sibling first so a failed transfer never leaves
    # a truncated file (or clobbers an earlier good one) at the final path.
    partial_path = local_file_path + ".part"
    print(f"Attempting to download file for task {task_id} (using task_id in URL): {file_name} from {download_url}")
    try:
        # The context manager releases the streamed connection even when
        # raise_for_status() or the write loop fails.
        with requests.get(download_url, stream=True, timeout=30) as file_response:
            file_response.raise_for_status()
            with open(partial_path, 'wb') as f:
                for chunk in file_response.iter_content(chunk_size=8192):
                    if chunk:  # skip empty keep-alive chunks
                        f.write(chunk)
        os.replace(partial_path, local_file_path)
        print(f"Successfully downloaded to {local_file_path}")
        return local_file_path
    except requests.exceptions.RequestException as e:
        print(f"Failed to download {file_name}: {e}")
    except Exception as e:
        print(f"An unexpected error occurred downloading {file_name}: {e}")
    # Best-effort cleanup of the partial file; it may not exist at all.
    try:
        os.remove(partial_path)
    except OSError:
        pass
    return None


def run_single_question_test(question_index: int):
    """
    Fetches a single question by index, downloads its associated file (if any),
    runs the BasicAgent on it, and prints the answer.

    Args:
        question_index: 0-based index into the list stored in QUESTIONS_FILE.
            Negative or out-of-range values are reported as an error.

    Returns:
        None. All results and errors are printed; no exception is propagated
        for load, download, or agent failures.
    """
    # Create download directory if it doesn't exist
    try:
        os.makedirs(DOWNLOAD_DIR, exist_ok=True)
        print(f"Ensured download directory exists: {DOWNLOAD_DIR}")
    except OSError as e:
        print(f"Error creating download directory {DOWNLOAD_DIR}: {e}")
        return

    # 1. Load Questions
    try:
        # JSON is UTF-8 by specification; do not rely on the platform default.
        with open(QUESTIONS_FILE, 'r', encoding='utf-8') as f:
            questions_data = json.load(f)
        # Reject negative indexes explicitly: Python would otherwise silently
        # pick a question counted from the end of the list.
        if (
            not questions_data
            or not isinstance(questions_data, list)
            or question_index < 0
            or len(questions_data) <= question_index
        ):
            print(f"Error: Could not load question at index {question_index} (0-indexed) from {QUESTIONS_FILE}. Total questions: {len(questions_data) if isinstance(questions_data, list) else 'N/A'}")
            return
        item = questions_data[question_index]
        print(f"Loaded question {question_index + 1} (from 1-indexed input): {item.get('question')}")
    except FileNotFoundError:
        print(f"Error: Questions file not found at {QUESTIONS_FILE}")
        return
    except json.JSONDecodeError:
        print(f"Error: Could not decode JSON from {QUESTIONS_FILE}")
        return
    except Exception as e:
        print(f"Error loading or parsing {QUESTIONS_FILE}: {e}")
        return

    # 2. Instantiate Agent
    try:
        agent = BasicAgent()
        print("Agent instantiated successfully.")
    except Exception as e:
        print(f"Error instantiating agent: {e}")
        return

    task_id = item.get("task_id")
    question_text = item.get("question")
    file_name = item.get("file_name")
    local_file_path = None

    if not task_id or question_text is None:
        print(f"Skipping item with missing task_id or question: {item}")
        return

    if file_name:
        # None on failure, so the agent never receives a path to a missing file.
        local_file_path = _download_task_file(task_id, file_name)
    else:
        print("No file associated with this question.")

    # 3. Run Agent on the single question
    try:
        print(f"Running agent on question: {question_text}")
        if local_file_path:
            print(f"Providing file: {local_file_path}")
        submitted_answer = agent(question_text, file_path=local_file_path)
        print(f"\n--- Agent's Answer for Task ID: {task_id} ---")
        print(submitted_answer)
        print("--- End of Answer ---")
    except Exception as e:
        print(f"Error running agent on task {task_id}: {e}")
def run_all_questions_test():
    """
    Runs the test for all questions in the QUESTIONS_FILE.

    Loads the question list once to learn how many entries exist, then calls
    run_single_question_test for each 0-based index in order. Errors while
    reading the file are printed rather than raised.

    Returns:
        None.
    """
    print("Attempting to run tests for all questions...")
    try:
        # JSON is UTF-8 by specification; do not rely on the platform default.
        with open(QUESTIONS_FILE, 'r', encoding='utf-8') as f:
            questions_data = json.load(f)

        # Check the type first and emptiness second, so an empty list gets the
        # "No questions found" message instead of the generic load error.
        if not isinstance(questions_data, list):
            print(f"Error: Could not load questions from {QUESTIONS_FILE} or it's not a list.")
            return

        num_questions = len(questions_data)
        if num_questions == 0:
            print(f"No questions found in {QUESTIONS_FILE}.")
            return

        print(f"Found {num_questions} questions to test.")
        for i in range(num_questions):
            print(f"\n--- Running Test for Question {i+1}/{num_questions} ---")
            run_single_question_test(i)
            print(f"--- Finished Test for Question {i+1}/{num_questions} ---\n")
        print("All question tests completed.")
    except FileNotFoundError:
        print(f"Error: Questions file not found at {QUESTIONS_FILE}")
    except json.JSONDecodeError:
        print(f"Error: Could not decode JSON from {QUESTIONS_FILE}")
    except Exception as e:
        print(f"An unexpected error occurred while running all questions: {e}")
if __name__ == "__main__":
    # Command-line entry point: accepts a 1-based question number or 'all'.
    cli_args = sys.argv[1:]

    if not cli_args:
        # No argument given: show usage, then fall back to running everything.
        usage_lines = (
            "Usage: python test_single_q.py <question_number | 'all'>",
            "Example (single question): python test_single_q.py 4",
            "Example (all questions): python test_single_q.py all",
        )
        for usage_line in usage_lines:
            print(usage_line)
        run_all_questions_test()
        sys.exit(0)

    selector = cli_args[0]
    if selector.lower() != "all":
        try:
            requested_number = int(selector)
            if requested_number < 1:
                print("Error: Question number must be a positive integer.")
                sys.exit(1)
            # The user counts from 1; the question list is indexed from 0.
            run_single_question_test(requested_number - 1)
        except ValueError:
            print("Error: Invalid argument. Please enter an integer for a specific question or 'all'.")
            sys.exit(1)
    else:
        run_all_questions_test()