# lpm_kernel/file_data/memory_service.py
# feat: add detailed logging (commit 01d5a5d)
from pathlib import Path
import os
import uuid
from datetime import datetime
from lpm_kernel.common.logging import logger
from lpm_kernel.models.memory import Memory
from lpm_kernel.common.repository.database_session import DatabaseSession
from lpm_kernel.file_data.process_factory import ProcessorFactory
from lpm_kernel.file_data.document_service import DocumentService
from lpm_kernel.file_data.document_dto import CreateDocumentRequest
from .process_status import ProcessStatus
from sqlalchemy import select
class StorageService:
    """Persist uploaded files on disk and track them as Memory/Document records.

    Responsibilities:
      * resolve and create the raw-content storage directory from config,
      * detect duplicate uploads (same name + size) via the Memory table,
      * write the file to disk, create a Memory row, and hand the file to the
        document-processing pipeline, linking the resulting Document back to
        the Memory row.
    """

    def __init__(self, config):
        """Initialize the storage root and the document service.

        Args:
            config: mapping with optional keys ``USER_RAW_CONTENT_DIR`` and
                ``LOCAL_BASE_DIR``. Relative content dirs are resolved against
                the base dir; ``${RESOURCE_DIR}``/``${RAW_CONTENT_DIR}``
                placeholders are replaced with their literal defaults.
        """
        self.config = config
        # get raw content directory configuration
        raw_content_dir = config.get("USER_RAW_CONTENT_DIR", "resources/raw_content")
        base_dir = config.get("LOCAL_BASE_DIR", ".")
        logger.info(f"Initializing storage service, base_dir: {base_dir}")
        logger.info(f"Raw content directory configuration: {raw_content_dir}")
        # if path is not absolute, build full path based on base_dir
        if not os.path.isabs(raw_content_dir):
            # replace ${...} placeholders with their literal defaults (only a
            # fixed set is supported; arbitrary env expansion is not performed)
            raw_content_dir = raw_content_dir.replace("${RESOURCE_DIR}", "resources")
            raw_content_dir = raw_content_dir.replace(
                "${RAW_CONTENT_DIR}", "resources/raw_content"
            )
            # build full path based on base_dir
            raw_content_dir = os.path.join(base_dir, raw_content_dir)
            logger.info(f"Building complete path: {raw_content_dir}")
        # convert to Path object and ensure directory exists
        self.base_path = Path(raw_content_dir).resolve()
        self.base_path.mkdir(parents=True, exist_ok=True)
        logger.info(f"Storage path created: {self.base_path}")
        self.document_service = DocumentService()

    def check_file_exists(self, filename: str, filesize: int):
        """Check whether a file with this name and size is already stored.

        Args:
            filename: file name as supplied by the uploader.
            filesize: file size in bytes.

        Returns:
            Memory: the existing record if one is found AND its file is still
            present on disk; otherwise ``None`` (including the stale-record
            case, which is logged as a warning).
        """
        db = DatabaseSession()
        with db._session_factory() as session:
            # find record with same file name and size
            query = select(Memory).where(
                Memory.name == filename, Memory.size == filesize
            )
            # Use .scalars().first() rather than scalar_one_or_none():
            # duplicate (name, size) rows can exist (e.g. a stale record whose
            # file was deleted, then re-uploaded), and scalar_one_or_none()
            # would raise MultipleResultsFound in that case.
            memory = session.execute(query).scalars().first()
            if memory:
                # BUGFIX: original message logged the literal "(unknown)"
                # instead of the filename.
                logger.info(f"Found duplicate file: {filename}, size: {filesize}")
                # check if file really exists on disk before treating it as a
                # duplicate; a stale DB row must not block a fresh upload
                if os.path.exists(memory.path):
                    return memory
                logger.warning(
                    f"File in database does not exist on disk: {memory.path}"
                )
            return None

    def save_file(self, file, metadata=None):
        """Save an uploaded file and run document processing on it.

        Args:
            file: uploaded file object (assumed Werkzeug-style: exposes
                ``.filename``, ``.seek``/``.tell`` and ``.save(path)`` —
                TODO confirm against the upload route).
            metadata: optional mapping with keys like ``name``/``description``
                used when building the Document record.

        Returns:
            tuple: (Memory object, Document object). The Document element is
            ``None`` when processing failed (the Memory record is still kept).

        Raises:
            ValueError: if a file with the same name and size already exists.

        NOTE(review): the exists-check and the insert are not atomic — two
        concurrent uploads of the same file can both pass the check. Needs a
        DB-level unique constraint to be fully safe.
        """
        logger.info(f"Starting to save file: {file.filename}")
        logger.debug(f"File metadata: {metadata}")
        try:
            # determine file size by seeking to the end, then rewind
            file.seek(0, os.SEEK_END)
            filesize = file.tell()
            file.seek(0)  # reset file pointer to start position
            # check if file already exists
            existing_memory = self.check_file_exists(file.filename, filesize)
            if existing_memory:
                raise ValueError(f"File '{file.filename}' already exists")
            # save file to disk
            filepath, filename, filesize = self._save_file_to_disk(file)
            logger.info(f"File saved to disk: {filepath}, size: {filesize} bytes")
            memory = None
            document = None
            db = DatabaseSession()
            # context manager closes the session on exit (replaces the manual
            # try/finally: session.close() form)
            with db._session_factory() as session:
                try:
                    # create and save Memory record
                    # NOTE(review): "metadata" is a reserved attribute on
                    # SQLAlchemy declarative models — presumably the Memory
                    # model maps this kwarg to a differently-named column;
                    # verify against the model definition.
                    memory = Memory(
                        name=filename,
                        size=filesize,
                        path=str(filepath),
                        metadata=metadata or {},
                    )
                    session.add(memory)
                    session.commit()
                    logger.info(f"Memory record created successfully: {memory.id}")
                    # process document; failure returns None and leaves the
                    # Memory record without an associated document
                    document = self._process_document(filepath, metadata)
                    if document:
                        memory.document_id = document.id
                        session.add(memory)
                        session.commit()
                        logger.info(
                            f"Memory record updated, associated document ID: {document.id}"
                        )
                    # refresh memory object to ensure all fields are up to date
                    session.refresh(memory)
                except Exception as e:
                    session.rollback()
                    logger.error(f"Database operation failed: {str(e)}", exc_info=True)
                    raise
            return memory, document
        except Exception as e:
            logger.error(f"Error occurred during file saving: {str(e)}", exc_info=True)
            raise

    def _save_file_to_disk(self, file):
        """Save the uploaded file under the storage root.

        Args:
            file: uploaded file object (must expose ``.filename`` and
                ``.save(path)``).

        Returns:
            tuple: (file path, sanitized file name, file size in bytes).
        """
        try:
            # ensure directory exists
            self.base_path.mkdir(parents=True, exist_ok=True)
            logger.debug(f"Ensuring storage directory exists: {self.base_path}")
            # SECURITY: strip any directory components from the client-supplied
            # name so a value like "../../etc/passwd" cannot escape base_path
            filename = Path(file.filename).name
            filepath = self.base_path / filename
            logger.info(f"Preparing to save file to: {filepath}")
            # save file (overwrites silently if the path already exists; the
            # duplicate check in save_file only guards same name AND size)
            file.save(str(filepath))
            filesize = os.path.getsize(filepath)
            logger.info(f"File saved successfully: {filepath}, size: {filesize} bytes")
            return filepath, filename, filesize
        except Exception as e:
            logger.error(f"Failed to save file to disk: {str(e)}", exc_info=True)
            raise

    def _process_document(self, filepath, metadata=None):
        """Run the processing pipeline on a saved file and record the result.

        Args:
            filepath: path of the file on disk.
            metadata: optional mapping; ``name`` overrides the document title
                and ``description`` overrides the user description.

        Returns:
            Document: the created Document record, or ``None`` if processing
            failed (best-effort by design — the caller keeps the Memory row).
        """
        try:
            logger.info(f"Starting to process document: {filepath}")
            doc = ProcessorFactory.auto_detect_and_process(str(filepath))
            logger.info(
                f"Document processing completed, type: {doc.mime_type}, size: {doc.document_size}"
            )
            request = CreateDocumentRequest(
                name=doc.name,
                title=metadata.get("name", doc.name) if metadata else doc.name,
                mime_type=doc.mime_type,
                user_description=metadata.get("description", "Uploaded document")
                if metadata
                else "Uploaded document",
                document_size=doc.document_size,
                url=str(filepath),
                raw_content=doc.raw_content,
                extract_status=doc.extract_status,
                embedding_status=ProcessStatus.INITIALIZED,
            )
            saved_doc = self.document_service.create_document(request)
            logger.info(f"Document record created: {saved_doc.id}")
            return saved_doc
        except Exception as e:
            # deliberate best-effort: log and swallow so a processing failure
            # does not undo the already-saved file and Memory record
            logger.error(f"Document processing failed: {str(e)}", exc_info=True)
            return None