# file_data/service.py
import os
from pathlib import Path
from typing import Dict, List, Optional

from sqlalchemy import select

from lpm_kernel.common.repository.database_session import DatabaseSession
from lpm_kernel.common.repository.vector_store_factory import VectorStoreFactory
from lpm_kernel.configs.logging import get_train_process_logger
from lpm_kernel.file_data.document_dto import CreateDocumentRequest, DocumentDTO
from lpm_kernel.file_data.exceptions import FileProcessingError
from lpm_kernel.kernel.l0_base import InsightKernel, SummaryKernel
from lpm_kernel.models.memory import Memory

from .document import Document
from .document_repository import DocumentRepository
from .dto.chunk_dto import ChunkDTO
from .embedding_service import EmbeddingService
from .process_factory import ProcessorFactory
from .process_status import ProcessStatus

logger = get_train_process_logger()


class DocumentService:
    def __init__(self):
        self._repository = DocumentRepository()
        self._insight_kernel = InsightKernel()
        self._summary_kernel = SummaryKernel()
        self.vector_store = VectorStoreFactory.get_instance()
        self.embedding_service = EmbeddingService()

    def create_document(self, data: CreateDocumentRequest) -> Document:
        """
        Create a new document.

        Args:
            data (CreateDocumentRequest): document creation request

        Returns:
            Document: the created document object
        """
        doc = Document(
            name=data.name,
            title=data.title,
            mime_type=data.mime_type,
            user_description=data.user_description,
            url=str(data.url) if data.url else None,
            document_size=data.document_size,
            extract_status=data.extract_status,
            embedding_status=ProcessStatus.INITIALIZED,
            raw_content=data.raw_content,
        )
        return self._repository.create(doc)

    def list_documents(self) -> List[Document]:
        """
        Get all documents.

        Returns:
            List[Document]: list of document objects
        """
        return self._repository.list()
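    # Example (illustrative sketch, not part of the service API): creating a
    # document by hand through the module-level ``document_service`` singleton
    # defined at the bottom of this file. The field values are placeholders.
    #
    #     from lpm_kernel.file_data.document_dto import CreateDocumentRequest
    #     from lpm_kernel.file_data.process_status import ProcessStatus
    #     from lpm_kernel.file_data.service import document_service
    #
    #     request = CreateDocumentRequest(
    #         name="notes.txt",
    #         title="notes.txt",
    #         mime_type="text/plain",
    #         user_description="Manually uploaded document",
    #         document_size=1024,
    #         url="/data/uploads/notes.txt",
    #         raw_content="...",
    #         extract_status=ProcessStatus.INITIALIZED,
    #         embedding_status=ProcessStatus.INITIALIZED,
    #     )
    #     doc = document_service.create_document(request)
    #     all_docs = document_service.list_documents()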
    def scan_directory(
        self, directory_path: str, recursive: bool = False
    ) -> List[DocumentDTO]:
        """
        Scan a directory and process the files it contains.

        Args:
            directory_path (str): directory to scan
            recursive (bool, optional): whether to scan recursively. Defaults to False.

        Returns:
            List[DocumentDTO]: DTOs of the documents that were processed and saved

        Raises:
            FileProcessingError: if the directory does not exist or processing fails
        """
        path = Path(directory_path)
        if not path.is_dir():
            raise FileProcessingError(f"{directory_path} is not a directory")

        documents_dtos: List[DocumentDTO] = []
        pattern = "**/*" if recursive else "*"

        # list all files
        files = list(path.glob(pattern))
        logger.info(f"Found files: {files}")

        for file_path in files:
            if file_path.is_file():
                try:
                    logger.info(f"Processing file: {file_path}")
                    doc = ProcessorFactory.auto_detect_and_process(str(file_path))

                    # build a CreateDocumentRequest so the document is persisted
                    request = CreateDocumentRequest(
                        name=doc.name,
                        title=doc.name,
                        mime_type=doc.mime_type,
                        user_description="Auto scanned document",
                        document_size=doc.document_size,
                        url=str(file_path.absolute()),
                        raw_content=doc.raw_content,
                        extract_status=doc.extract_status,
                        embedding_status=ProcessStatus.INITIALIZED,
                    )
                    saved_doc = self.create_document(request)
                    documents_dtos.append(saved_doc.to_dto())
                    logger.info(f"Successfully processed and saved: {file_path}")
                except Exception:
                    # log the full traceback and continue with the next file
                    logger.exception(f"Error processing file {file_path}")
                    continue

        logger.info(f"Total documents processed and saved: {len(documents_dtos)}")
        return documents_dtos

    def _analyze_document(self, doc: DocumentDTO) -> DocumentDTO:
        """
        Analyze a single document.

        Args:
            doc (DocumentDTO): document to analyze

        Returns:
            DocumentDTO: the updated document

        Raises:
            Exception: if analysis fails
        """
        try:
            # generate insight
            insight_result = self._insight_kernel.analyze(doc)
            # generate summary
            summary_result = self._summary_kernel.analyze(
                doc, insight_result["insight"]
            )
            # update database
            updated_doc = self._repository.update_document_analysis(
                doc.id, insight_result, summary_result
            )
            return updated_doc
        except Exception as e:
            logger.error(f"Document {doc.id} analysis failed: {str(e)}", exc_info=True)
            # mark the document as failed
            self._update_analyze_status_failed(doc.id)
            raise

    def analyze_document(self, document_id: int) -> DocumentDTO:
        """
        Analyze a single document by ID.

        Args:
            document_id (int): ID of the document to analyze

        Returns:
            DocumentDTO: the analyzed document

        Raises:
            ValueError: if the document is not found
            Exception: if analysis fails
        """
        try:
            # get document
            document = self._repository.find_one(document_id)
            if not document:
                raise ValueError(f"Document not found with id: {document_id}")

            # perform analysis
            return self._analyze_document(document)
        except ValueError as e:
            logger.error(f"Document {document_id} not found: {str(e)}")
            raise
        except Exception as e:
            logger.error(
                f"Error analyzing document {document_id}: {str(e)}", exc_info=True
            )
            self._update_analyze_status_failed(document_id)
            raise

    def _update_analyze_status_failed(self, doc_id: int) -> None:
        """Mark a document's analyze status as FAILED."""
        try:
            with self._repository._db.session() as session:
                document = session.get(self._repository.model, doc_id)
                if document:
                    document.analyze_status = ProcessStatus.FAILED
                    session.commit()
                    logger.debug(
                        f"Updated analyze status for document {doc_id} to FAILED"
                    )
                else:
                    logger.warning(f"Document not found with id: {doc_id}")
        except Exception as e:
            logger.error(f"Error updating document analyze status: {str(e)}")

    def check_all_documents_embeding_status(self) -> bool:
        """
        Check whether any documents still need embedding.

        Returns:
            bool: True if there are documents that need embedding, False otherwise
        """
        try:
            unembedding_docs = self._repository.find_unembedding()
            return len(unembedding_docs) > 0
        except Exception as e:
            logger.error(
                f"Error checking documents embedding status: {str(e)}",
                exc_info=True,
            )
            raise
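    # Example (illustrative sketch): a typical ingest-and-analyze pass using
    # the methods above. "/data/uploads" is a placeholder path.
    #
    #     dtos = document_service.scan_directory("/data/uploads", recursive=True)
    #     for dto in dtos:
    #         document_service.analyze_document(dto.id)
    #     if document_service.check_all_documents_embeding_status():
    #         ...  # some documents still need embeddings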
    def analyze_all_documents(self) -> List[DocumentDTO]:
        """
        Analyze all unanalyzed documents.

        Returns:
            List[DocumentDTO]: documents that were analyzed successfully

        Raises:
            Exception: if the batch analysis itself fails
        """
        try:
            # get all unanalyzed documents
            unanalyzed_docs = self._repository.find_unanalyzed()
            analyzed_docs = []
            success_count = 0
            error_count = 0

            for doc in unanalyzed_docs:
                try:
                    analyzed_doc = self._analyze_document(doc)
                    analyzed_docs.append(analyzed_doc)
                    success_count += 1
                except Exception as e:
                    error_count += 1
                    logger.error(f"Document {doc.id} processing failed: {str(e)}")
                    continue

            logger.info(
                f"Batch analysis finished: {success_count} succeeded, {error_count} failed"
            )
            return analyzed_docs
        except Exception as e:
            logger.error(
                f"Error occurred during batch analysis: {str(e)}", exc_info=True
            )
            raise

    def get_document_l0(self, document_id: int) -> Dict:
        """
        Get a document's chunks and their embeddings (L0 data).

        Args:
            document_id (int): document ID

        Returns:
            Dict: of the form
                {
                    "document_id": int,
                    "chunks": List[Dict],
                    "total_chunks": int
                }

        Raises:
            FileProcessingError: if the document does not exist
        """
        try:
            # get the document
            document = self._repository.find_one(document_id)
            if not document:
                raise FileProcessingError(f"Document not found: {document_id}")

            # get the document's chunks
            chunks = self.get_document_chunks(document_id)
            if not chunks:
                return {"document_id": document_id, "chunks": [], "total_chunks": 0}

            # get the chunk embeddings
            all_chunk_embeddings = self.get_chunk_embeddings_by_document_id(document_id)

            # assemble the L0 data
            l0_data = {
                "document_id": document_id,
                "chunks": [
                    {
                        "id": chunk.id,
                        "content": chunk.content,
                        "has_embedding": chunk.has_embedding,
                        "embedding": all_chunk_embeddings.get(chunk.id),
                        "tags": chunk.tags,
                        "topic": chunk.topic,
                    }
                    for chunk in chunks
                ],
                "total_chunks": len(chunks),
            }
            return l0_data
        except FileProcessingError:
            raise
        except Exception as e:
            logger.error(f"Error getting L0 data for document {document_id}: {str(e)}")
            raise FileProcessingError(f"Failed to get L0 data: {str(e)}")

    def get_document_chunks(self, document_id: int) -> List[ChunkDTO]:
        """
        Get a document's chunks.

        Args:
            document_id (int): document ID

        Returns:
            List[ChunkDTO]: the document's chunks; each ChunkDTO includes its
                embedding information when available
        """
        try:
            document = self._repository.find_one(document_id=document_id)
            if not document:
                logger.info(f"Document not found with id: {document_id}")
                return []

            chunks = self._repository.find_chunks(document_id=document_id)
            logger.info(f"Found {len(chunks)} chunks for document {document_id}")

            for chunk in chunks:
                chunk.length = len(chunk.content) if chunk.content else 0
                if chunk.has_embedding:
                    chunk.embedding = (
                        self.embedding_service.get_chunk_embedding_by_chunk_id(chunk.id)
                    )
            return chunks
        except Exception as e:
            logger.error(f"Error getting chunks for document {document_id}: {str(e)}")
            return []

    # def save_chunk(self, chunk: Chunk) -> None:
    #     """
    #     Args:
    #         chunk (Chunk): chunk object
    #
    #     Raises:
    #         Exception: error occurred
    #     """
    #     try:
    #         # create ChunkModel instance
    #         chunk_model = ChunkModel(
    #             document_id=chunk.document_id,
    #             content=chunk.content,
    #             tags=chunk.tags,
    #             topic=chunk.topic,
    #         )
    #         # save to db
    #         self._repository.save_chunk(chunk_model)
    #         logger.debug(f"Saved chunk for document {chunk.document_id}")
    #     except Exception as e:
    #         logger.error(f"Error saving chunk: {str(e)}")
    #         raise
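    # Example (illustrative sketch): reading back a document's L0 data. The
    # document ID is a placeholder.
    #
    #     l0 = document_service.get_document_l0(42)
    #     for chunk in l0["chunks"]:
    #         print(chunk["id"], chunk["topic"], chunk["has_embedding"])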
    def list_documents_with_l0(self) -> List[Dict]:
        """
        Get all documents together with their L0 data.

        Returns:
            List[Dict]: one dict per document, each with an "l0_data" key
        """
        # 1. get all basic data
        documents = self.list_documents()
        logger.info(f"list_documents len: {len(documents)}")

        # 2. attach each document's L0 data
        documents_with_l0 = []
        for doc in documents:
            doc_dict = doc.to_dict()
            try:
                l0_data = self.get_document_l0(doc.id)
                doc_dict["l0_data"] = l0_data
                logger.info(f"Successfully got L0 data for document {doc.id}")
            except Exception as e:
                logger.error(f"Error getting L0 data for document {doc.id}: {str(e)}")
                doc_dict["l0_data"] = None
            documents_with_l0.append(doc_dict)

        return documents_with_l0

    def get_document_by_id(self, document_id: int) -> Optional[Document]:
        """
        Get a document by ID.

        Args:
            document_id (int): document ID

        Returns:
            Optional[Document]: the document, or None if not found
        """
        try:
            return self._repository.find_one(document_id)
        except Exception as e:
            logger.error(f"Error getting document by id {document_id}: {str(e)}")
            return None

    def generate_document_chunk_embeddings(self, document_id: int) -> List[ChunkDTO]:
        """
        Generate embeddings for a document's chunks.

        Args:
            document_id (int): document ID

        Returns:
            List[ChunkDTO]: the processed chunks

        Raises:
            Exception: if embedding generation fails
        """
        try:
            chunks_dtos = self._repository.find_chunks(document_id)
            if not chunks_dtos:
                logger.info(f"No chunks found for document {document_id}")
                return []

            # generate the embeddings
            processed_chunks = self.embedding_service.generate_chunk_embeddings(
                chunks_dtos
            )

            # persist the embedding status
            for chunk_dto in processed_chunks:
                if chunk_dto.has_embedding:
                    self._repository.update_chunk_embedding_status(chunk_dto.id, True)

            return processed_chunks
        except Exception as e:
            logger.error(f"Error processing chunk embeddings: {str(e)}")
            raise

    def get_chunk_embeddings_by_document_id(
        self, document_id: int
    ) -> Dict[int, List[float]]:
        """
        Get the embeddings of a document's chunks.

        Args:
            document_id (int): document ID

        Returns:
            Dict[int, List[float]]: mapping from chunk_id to embedding

        Raises:
            Exception: if the lookup fails
        """
        try:
            # collect the chunk IDs
            chunks = self._repository.find_chunks(document_id)
            chunk_ids = [str(chunk.id) for chunk in chunks]

            # fetch the embeddings from ChromaDB
            embeddings = {}
            if chunk_ids:
                results = self.embedding_service.chunk_collection.get(
                    ids=chunk_ids, include=["embeddings", "documents"]
                )
                # build the chunk_id -> embedding mapping
                for i, chunk_id in enumerate(results["ids"]):
                    embeddings[int(chunk_id)] = results["embeddings"][i]

            return embeddings
        except Exception as e:
            logger.error(
                f"Error getting chunk embeddings for document {document_id}: {str(e)}"
            )
            raise

    def process_document_embedding(self, document_id: int) -> Optional[List[float]]:
        """
        Generate the document-level embedding.

        Args:
            document_id (int): document ID

        Returns:
            Optional[List[float]]: the document embedding, or None if the
                document has no content

        Raises:
            ValueError: if the document does not exist
            Exception: if embedding generation fails
        """
        try:
            document = self._repository.find_one(document_id)
            if not document:
                raise ValueError(f"Document not found with id: {document_id}")

            if not document.raw_content:
                logger.warning(
                    f"Document {document_id} has no content to process embedding"
                )
                self._repository.update_embedding_status(
                    document_id, ProcessStatus.FAILED
                )
                return None

            # generate the document embedding
            embedding = self.embedding_service.generate_document_embedding(document)

            if embedding is not None:
                self._repository.update_embedding_status(
                    document_id, ProcessStatus.SUCCESS
                )
            else:
                self._repository.update_embedding_status(
                    document_id, ProcessStatus.FAILED
                )

            return embedding
        except Exception as e:
            logger.error(f"Error processing document embedding: {str(e)}")
            self._repository.update_embedding_status(document_id, ProcessStatus.FAILED)
            raise
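    # Example (illustrative sketch): the full embedding pass for one document,
    # chunk-level first, then document-level. The document ID is a placeholder.
    #
    #     chunks = document_service.generate_document_chunk_embeddings(42)
    #     doc_vector = document_service.process_document_embedding(42)
    #     by_chunk = document_service.get_chunk_embeddings_by_document_id(42)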
    def get_document_embedding(self, document_id: int) -> Optional[List[float]]:
        """
        Get the document-level embedding.

        Args:
            document_id (int): document ID

        Returns:
            Optional[List[float]]: the document embedding, or None if not found

        Raises:
            Exception: if the lookup fails
        """
        try:
            results = self.embedding_service.document_collection.get(
                ids=[str(document_id)], include=["embeddings"]
            )
            if results and results["embeddings"]:
                return results["embeddings"][0]
            return None
        except Exception as e:
            logger.error(f"Error getting document embedding: {str(e)}")
            raise

    def delete_file_by_name(self, filename: str) -> bool:
        """
        Delete a file and all of its associated records and embeddings.

        Args:
            filename (str): name of the file to delete

        Returns:
            bool: True on success, False if the file record was not found

        Raises:
            Exception: if deletion fails
        """
        logger.info(f"Starting to delete file: {filename}")
        try:
            # 1. look up the memory record
            db = DatabaseSession()
            memory = None
            document_id = None
            with db._session_factory() as session:
                query = select(Memory).where(Memory.name == filename)
                result = session.execute(query)
                memory = result.scalar_one_or_none()

                if not memory:
                    logger.warning(f"File record not found: {filename}")
                    return False

                # get the related document_id and file path
                document_id = memory.document_id
                file_path = memory.path

                # 2. delete the memory record
                session.delete(memory)
                session.commit()
                logger.info(f"Deleted record from memories table: {filename}")

            # if there is no related document, only delete the physical file
            if not document_id:
                if os.path.exists(file_path):
                    os.remove(file_path)
                    logger.info(f"Deleted physical file: {file_path}")
                return True

            # 3. get the document object
            document = self._repository.get_by_id(document_id)
            if not document:
                logger.warning(
                    f"Corresponding document record not found, ID: {document_id}"
                )
                # no document record, so just delete the physical file
                if os.path.exists(file_path):
                    os.remove(file_path)
                    logger.info(f"Deleted physical file: {file_path}")
                return True

            # 4. get all chunks
            chunks = self._repository.find_chunks(document_id)

            # 5. delete the document embedding from ChromaDB
            try:
                self.embedding_service.document_collection.delete(
                    ids=[str(document_id)]
                )
                logger.info(
                    f"Deleted document embedding from ChromaDB, ID: {document_id}"
                )
            except Exception as e:
                logger.error(f"Error deleting document embedding: {str(e)}")

            # 6. delete all chunk embeddings from ChromaDB
            if chunks:
                try:
                    chunk_ids = [str(chunk.id) for chunk in chunks]
                    self.embedding_service.chunk_collection.delete(ids=chunk_ids)
                    logger.info(
                        f"Deleted {len(chunk_ids)} chunk embeddings from ChromaDB"
                    )
                except Exception as e:
                    logger.error(f"Error deleting chunk embeddings: {str(e)}")

            # 7. delete the chunk and document records from the database
            with db._session_factory() as session:
                from lpm_kernel.file_data.models import ChunkModel

                session.query(ChunkModel).filter(
                    ChunkModel.document_id == document_id
                ).delete()
                session.commit()
                logger.info("Deleted all related chunks")

                # delete the document record
                doc_entity = session.get(Document, document_id)
                if doc_entity:
                    session.delete(doc_entity)
                    session.commit()
                    logger.info(
                        f"Deleted document record from database, ID: {document_id}"
                    )

            # 8. delete the physical file
            if os.path.exists(file_path):
                os.remove(file_path)
                logger.info(f"Deleted physical file: {file_path}")

            return True
        except Exception as e:
            logger.error(f"Error deleting file: {str(e)}", exc_info=True)
            raise
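    # Example (illustrative sketch): removing an uploaded file and every trace
    # of it (memory record, document row, chunk records, embeddings, file on
    # disk). The filename is a placeholder.
    #
    #     if document_service.delete_file_by_name("notes.txt"):
    #         logger.info("notes.txt fully removed")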
    def fix_missing_document_analysis(self) -> int:
        """
        Fix documents with missing insights or summaries.

        Returns:
            int: number of documents fixed
        """
        try:
            # find all documents that have analysis issues
            docs = self._repository.list()
            fixed_count = 0

            for doc in docs:
                needs_fixing = False

                # check whether the document still needs analysis
                if not doc.analyze_status or doc.analyze_status != ProcessStatus.SUCCESS:
                    needs_fixing = True
                    logger.info(
                        f"Document {doc.id} needs analysis (status: {doc.analyze_status})"
                    )
                # check whether the document is missing insights or summaries
                elif not doc.insight or not doc.summary:
                    needs_fixing = True
                    logger.info(f"Document {doc.id} has missing insight or summary")

                # re-run analysis for documents that need fixing
                if needs_fixing:
                    try:
                        self.analyze_document(doc.id)
                        fixed_count += 1
                        logger.info(f"Fixed document {doc.id} analysis")
                    except Exception as e:
                        logger.error(
                            f"Error fixing document {doc.id} analysis: {str(e)}"
                        )

            logger.info(f"Fixed {fixed_count} documents with missing analysis")
            return fixed_count
        except Exception as e:
            logger.error(f"Error in fix_missing_document_analysis: {str(e)}")
            raise FileProcessingError(f"Failed to fix document analysis: {str(e)}")

    def verify_document_embeddings(self, verbose: bool = True) -> Dict:
        """
        Verify all document embeddings and return statistics.

        Args:
            verbose (bool): whether to log detailed information

        Returns:
            Dict: statistics about document embeddings
        """
        try:
            docs = self._repository.list()
            results = {
                "total_documents": len(docs),
                "documents_with_embedding": 0,
                "documents_without_embedding": 0,
                "documents_with_content": 0,
                "documents_without_content": 0,
                "documents_with_summary": 0,
                "documents_without_summary": 0,
                "documents_with_insight": 0,
                "documents_without_insight": 0,
                "documents_needing_repair": 0,
            }
            documents_needing_repair = []

            for doc in docs:
                # check whether the document has content
                if doc.raw_content:
                    results["documents_with_content"] += 1
                else:
                    results["documents_without_content"] += 1

                # check whether the document has a summary
                if doc.summary:
                    results["documents_with_summary"] += 1
                else:
                    results["documents_without_summary"] += 1

                # check whether the document has an insight
                if doc.insight:
                    results["documents_with_insight"] += 1
                else:
                    results["documents_without_insight"] += 1

                # check whether an embedding exists in ChromaDB
                embedding = self.get_document_embedding(doc.id)
                if embedding is not None:
                    results["documents_with_embedding"] += 1
                    if verbose:
                        logger.info(
                            f"Document {doc.id}: '{doc.name}' has embedding of dimension {len(embedding)}"
                        )
                else:
                    results["documents_without_embedding"] += 1
                    if verbose:
                        logger.warning(
                            f"Document {doc.id}: '{doc.name}' missing embedding"
                        )

                # a document needs repair if it has content but is missing its
                # embedding or analysis
                if doc.raw_content and (
                    embedding is None or not doc.summary or not doc.insight
                ):
                    documents_needing_repair.append(doc.id)
                    results["documents_needing_repair"] += 1

            # log the statistics
            logger.info(f"Document embedding verification results: {results}")
            if documents_needing_repair and verbose:
                logger.info(f"Documents needing repair: {documents_needing_repair}")

            return results
        except Exception as e:
            logger.error(
                f"Error verifying document embeddings: {str(e)}", exc_info=True
            )
            raise


# create the shared service instance
document_service = DocumentService()

# use elsewhere via:
#   from lpm_kernel.file_data.service import document_service
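# Example (illustrative sketch): a one-off maintenance pass that verifies the
# embeddings and repairs any documents with missing analysis.
#
#     stats = document_service.verify_document_embeddings(verbose=False)
#     if stats["documents_needing_repair"]:
#         document_service.fix_missing_document_analysis()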