Spaces:
Sleeping
Sleeping
| import os | |
| import subprocess | |
| import sys | |
| from typing import List, Optional, Dict, Any | |
| from lpm_kernel.common.logging import logger | |
| # Test if logging works properly | |
| logger.debug("DEBUG: ScriptExecutor module loaded") | |
| logger.info("INFO: ScriptExecutor module loaded") | |
| logger.warning("WARNING: ScriptExecutor module loaded") | |
| logger.error("ERROR: ScriptExecutor module loaded") | |
| class ScriptExecutor: | |
| def __init__(self): | |
| # Check if running in Docker environment | |
| self.in_docker = os.getenv("IN_DOCKER_ENV") == "1" or os.path.exists("/.dockerenv") | |
| def execute( | |
| self, | |
| script_path: str, | |
| script_type: str, | |
| args: Optional[List[str]] = None, | |
| shell: bool = False, | |
| log_file: Optional[str] = None, | |
| ) -> Dict[str, Any]: | |
| """ | |
| Execute scripts directly | |
| Args: | |
| script_path: Script path or command | |
| script_type: Script type, used for logging | |
| args: Command line arguments | |
| shell: Whether to use shell for execution | |
| log_file: Log file path, if provided will redirect output to this file | |
| Returns: | |
| Execution result | |
| """ | |
| try: | |
| # Build the complete command | |
| if script_path.endswith(".py"): | |
| # Python script | |
| cmd = ["python", "-u", script_path] # Add -u parameter to disable output buffering | |
| elif script_path.endswith(".sh"): | |
| # Shell script | |
| cmd = ["bash", "-x", script_path] # Add -x parameter to display executed commands | |
| else: | |
| # Other commands | |
| cmd = [script_path] | |
| # Add additional parameters | |
| if args: | |
| cmd.extend(args) | |
| logger.info(f"Executing command: {' '.join(cmd)}") | |
| # If logging is needed, ensure the log directory exists | |
| if log_file: | |
| os.makedirs(os.path.dirname(log_file), exist_ok=True) | |
| # Execute command and capture output | |
| process = subprocess.Popen( | |
| cmd, | |
| shell=shell, | |
| env=os.environ.copy(), | |
| stdout=subprocess.PIPE, | |
| stderr=subprocess.STDOUT, # Redirect error output to standard output | |
| bufsize=1, # Line buffering | |
| universal_newlines=True, # Use text mode | |
| ) | |
| # Read and process output in real-time | |
| while True: | |
| output = process.stdout.readline() | |
| if output == "" and process.poll() is not None: | |
| break | |
| if output: | |
| # Output to console | |
| print(output.strip()) | |
| # If needed, write to log file | |
| if log_file: | |
| with open(log_file, "a", encoding="utf-8") as f: | |
| f.write(output) | |
| # Wait for the process to end and get the return code | |
| return_code = process.wait() | |
| if return_code != 0: | |
| logger.error(f"Command execution failed, return code: {return_code}") | |
| else: | |
| logger.info(f"Command execution successful, return code: {return_code}") | |
| return { | |
| "returncode": return_code, | |
| "error": f"Execution failed, return code: {return_code}" | |
| if return_code != 0 | |
| else None, | |
| } | |
| except Exception as e: | |
| error_msg = f"Error occurred while executing {script_type} script: {str(e)}" | |
| logger.error(error_msg) | |
| return {"error": error_msg, "returncode": -1} | |