from pathlib import Path
import os
import uuid
from datetime import datetime
from typing import Optional

from sqlalchemy import select

from lpm_kernel.common.logging import logger
from lpm_kernel.models.memory import Memory
from lpm_kernel.common.repository.database_session import DatabaseSession
from lpm_kernel.file_data.process_factory import ProcessorFactory
from lpm_kernel.file_data.document_service import DocumentService
from lpm_kernel.file_data.document_dto import CreateDocumentRequest
from .process_status import ProcessStatus


class StorageService:
    def __init__(self, config):
        self.config = config

        # get the raw content directory configuration
        raw_content_dir = config.get("USER_RAW_CONTENT_DIR", "resources/raw_content")
        base_dir = config.get("LOCAL_BASE_DIR", ".")

        logger.info(f"Initializing storage service, base_dir: {base_dir}")
        logger.info(f"Raw content directory configuration: {raw_content_dir}")

        # if the path is not absolute, build the full path based on base_dir
        if not os.path.isabs(raw_content_dir):
            # substitute environment-variable placeholders
            raw_content_dir = raw_content_dir.replace("${RESOURCE_DIR}", "resources")
            raw_content_dir = raw_content_dir.replace(
                "${RAW_CONTENT_DIR}", "resources/raw_content"
            )
            # build the full path based on base_dir
            raw_content_dir = os.path.join(base_dir, raw_content_dir)
            logger.info(f"Building complete path: {raw_content_dir}")

        # convert to a Path object and ensure the directory exists
        self.base_path = Path(raw_content_dir).resolve()
        self.base_path.mkdir(parents=True, exist_ok=True)
        logger.info(f"Storage path created: {self.base_path}")

        self.document_service = DocumentService()

    def check_file_exists(self, filename: str, filesize: int) -> Optional[Memory]:
        """Check whether a file already exists.

        Args:
            filename: file name
            filesize: file size in bytes

        Returns:
            Memory: the matching Memory object if the file exists; otherwise None
        """
        db = DatabaseSession()
        with db._session_factory() as session:
            # look for a record with the same file name and size
            query = select(Memory).where(
                Memory.name == filename, Memory.size == filesize
            )
            result = session.execute(query)
            memory = result.scalar_one_or_none()

            if memory:
                logger.info(f"Found duplicate file: {filename}, size: {filesize}")
                # verify that the file actually exists on disk
                if os.path.exists(memory.path):
                    return memory
                logger.warning(
                    f"File in database does not exist on disk: {memory.path}"
                )
            return None

    def save_file(self, file, metadata=None):
        """Save a file and process the resulting document.

        Args:
            file: uploaded file object
            metadata: file metadata

        Returns:
            tuple: (Memory object, Document object)

        Raises:
            ValueError: if the file already exists
        """
        logger.info(f"Starting to save file: {file.filename}")
        logger.debug(f"File metadata: {metadata}")

        try:
            # determine the file size
            file.seek(0, os.SEEK_END)
            filesize = file.tell()
            file.seek(0)  # reset the file pointer to the start

            # check whether the file already exists
            existing_memory = self.check_file_exists(file.filename, filesize)
            if existing_memory:
                raise ValueError(f"File '{file.filename}' already exists")

            # save the file to disk
            filepath, filename, filesize = self._save_file_to_disk(file)
            logger.info(f"File saved to disk: {filepath}, size: {filesize} bytes")

            # create the Memory record
            memory = None
            document = None
            db = DatabaseSession()
            session = db._session_factory()
            try:
                # create and persist the Memory record
                memory = Memory(
                    name=filename,
                    size=filesize,
                    path=str(filepath),
                    metadata=metadata or {},
                )
                session.add(memory)
                session.commit()
                logger.info(f"Memory record created successfully: {memory.id}")

                # process the document
                document = self._process_document(filepath, metadata)
                if document:
                    memory.document_id = document.id
                    session.add(memory)
                    session.commit()
                    logger.info(
                        f"Memory record updated, associated document ID: {document.id}"
                    )

                # refresh the memory object so all fields are up to date
                session.refresh(memory)
            except Exception as e:
                session.rollback()
                logger.error(f"Database operation failed: {str(e)}", exc_info=True)
                raise
            finally:
                session.close()

            return memory, document

        except Exception as e:
            logger.error(f"Error occurred during file saving: {str(e)}", exc_info=True)
            raise

    def _save_file_to_disk(self, file):
        """Save a file to disk.

        Args:
            file: uploaded file object

        Returns:
            tuple: (file path, file name, file size)
        """
        try:
            # ensure the storage directory exists
            self.base_path.mkdir(parents=True, exist_ok=True)
            logger.debug(f"Ensuring storage directory exists: {self.base_path}")

            # build the target file name and path
            filename = file.filename
            filepath = self.base_path / filename
            logger.info(f"Preparing to save file to: {filepath}")

            # save the file
            file.save(str(filepath))
            filesize = os.path.getsize(filepath)
            logger.info(f"File saved successfully: {filepath}, size: {filesize} bytes")

            return filepath, filename, filesize

        except Exception as e:
            logger.error(f"Failed to save file to disk: {str(e)}", exc_info=True)
            raise

    def _process_document(self, filepath, metadata=None):
        """Process a document and create the corresponding Document record.

        Args:
            filepath: file path
            metadata: file metadata

        Returns:
            Document: the created Document object, or None if processing fails
        """
        try:
            logger.info(f"Starting to process document: {filepath}")
            doc = ProcessorFactory.auto_detect_and_process(str(filepath))
            logger.info(
                f"Document processing completed, type: {doc.mime_type}, size: {doc.document_size}"
            )

            request = CreateDocumentRequest(
                name=doc.name,
                title=metadata.get("name", doc.name) if metadata else doc.name,
                mime_type=doc.mime_type,
                user_description=metadata.get("description", "Uploaded document")
                if metadata
                else "Uploaded document",
                document_size=doc.document_size,
                url=str(filepath),
                raw_content=doc.raw_content,
                extract_status=doc.extract_status,
                embedding_status=ProcessStatus.INITIALIZED,
            )

            saved_doc = self.document_service.create_document(request)
            logger.info(f"Document record created: {saved_doc.id}")
            return saved_doc

        except Exception as e:
            logger.error(f"Document processing failed: {str(e)}", exc_info=True)
            return None
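

# --- Illustrative usage sketch (not part of the original module) ---
# A minimal example of how StorageService might be exercised, assuming the
# upload object is a Flask/werkzeug-style FileStorage (anything exposing
# .filename, .seek/.tell, and .save()) and that a plain dict stands in for
# the config. The config values, file name, and metadata below are
# illustrative assumptions, and a configured database is still required
# for save_file to succeed.
if __name__ == "__main__":
    from werkzeug.datastructures import FileStorage

    service = StorageService(
        {"USER_RAW_CONTENT_DIR": "resources/raw_content", "LOCAL_BASE_DIR": "."}
    )

    # wrap a local file as an upload-like object and save it
    with open("example.txt", "rb") as fh:
        upload = FileStorage(stream=fh, filename="example.txt")
        memory, document = service.save_file(
            upload, metadata={"name": "Example", "description": "Demo upload"}
        )
        print(memory.id, document.id if document else None)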