secondme-api / lpm_kernel /api /common /script_executor.py
Gemini
feat: add detailed logging
01d5a5d
import os
import subprocess
import sys
from typing import List, Optional, Dict, Any
from lpm_kernel.common.logging import logger
# Test if logging works properly
logger.debug("DEBUG: ScriptExecutor module loaded")
logger.info("INFO: ScriptExecutor module loaded")
logger.warning("WARNING: ScriptExecutor module loaded")
logger.error("ERROR: ScriptExecutor module loaded")
class ScriptExecutor:
def __init__(self):
# Check if running in Docker environment
self.in_docker = os.getenv("IN_DOCKER_ENV") == "1" or os.path.exists("/.dockerenv")
def execute(
self,
script_path: str,
script_type: str,
args: Optional[List[str]] = None,
shell: bool = False,
log_file: Optional[str] = None,
) -> Dict[str, Any]:
"""
Execute scripts directly
Args:
script_path: Script path or command
script_type: Script type, used for logging
args: Command line arguments
shell: Whether to use shell for execution
log_file: Log file path, if provided will redirect output to this file
Returns:
Execution result
"""
try:
# Build the complete command
if script_path.endswith(".py"):
# Python script
cmd = ["python", "-u", script_path] # Add -u parameter to disable output buffering
elif script_path.endswith(".sh"):
# Shell script
cmd = ["bash", "-x", script_path] # Add -x parameter to display executed commands
else:
# Other commands
cmd = [script_path]
# Add additional parameters
if args:
cmd.extend(args)
logger.info(f"Executing command: {' '.join(cmd)}")
# If logging is needed, ensure the log directory exists
if log_file:
os.makedirs(os.path.dirname(log_file), exist_ok=True)
# Execute command and capture output
process = subprocess.Popen(
cmd,
shell=shell,
env=os.environ.copy(),
stdout=subprocess.PIPE,
stderr=subprocess.STDOUT, # Redirect error output to standard output
bufsize=1, # Line buffering
universal_newlines=True, # Use text mode
)
# Read and process output in real-time
while True:
output = process.stdout.readline()
if output == "" and process.poll() is not None:
break
if output:
# Output to console
print(output.strip())
# If needed, write to log file
if log_file:
with open(log_file, "a", encoding="utf-8") as f:
f.write(output)
# Wait for the process to end and get the return code
return_code = process.wait()
if return_code != 0:
logger.error(f"Command execution failed, return code: {return_code}")
else:
logger.info(f"Command execution successful, return code: {return_code}")
return {
"returncode": return_code,
"error": f"Execution failed, return code: {return_code}"
if return_code != 0
else None,
}
except Exception as e:
error_msg = f"Error occurred while executing {script_type} script: {str(e)}"
logger.error(error_msg)
return {"error": error_msg, "returncode": -1}