
# src/autoclean/utils/database.py
"""Database utilities for the autoclean package using SQLite."""

import json
import sqlite3
import threading
from datetime import datetime
from pathlib import Path
from typing import Any, Dict, Optional

from autoclean.utils.logging import message

# Global lock for thread safety
_db_lock = threading.Lock()

# Global database path
DB_PATH = None


def set_database_path(path: Path) -> None:
    """Set the global database path.

    Parameters
    ----------
    path : Path
        The path to the autoclean directory.
    """
    global DB_PATH  # pylint: disable=global-statement
    DB_PATH = path
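
# Illustrative sketch (not part of the package API): the expected setup order
# is to set the global path first, then create the table. The directory used
# here is hypothetical.
def _example_configure_database() -> None:
    """Set the database path, then ensure the pipeline_runs table exists."""
    set_database_path(Path("/tmp/autoclean"))  # hypothetical directory
    manage_database(operation="create_collection")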
class DatabaseError(Exception):
    """Custom exception for database operations."""

    def __init__(self, error_message: str):
        self.message = error_message
        super().__init__(self.message)


class RecordNotFoundError(Exception):
    """Custom exception for when a database record is not found."""

    def __init__(self, error_message: str):
        self.message = error_message
        super().__init__(self.message)


def _serialize_for_json(obj: Any) -> Any:
    """Convert objects to a JSON-serializable format.

    Parameters
    ----------
    obj : Any
        Object to serialize.

    Returns
    -------
    Any
        JSON-serializable object.
    """
    if isinstance(obj, Path):
        return str(obj)
    if isinstance(obj, dict):
        # Preserve all dictionary keys and values, converting
        # non-serializable values to strings
        return {k: _serialize_for_json(v) for k, v in obj.items()}
    if isinstance(obj, (list, tuple)):
        return [_serialize_for_json(item) for item in obj]
    try:
        json.dumps(obj)
        return obj
    except (TypeError, OverflowError):
        return str(obj)
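
# Illustrative sketch (not part of the package API): _serialize_for_json walks
# nested containers, so Path values buried in a metadata dict come out as plain
# strings that json.dumps accepts. The sample values below are hypothetical.
def _example_serialize_metadata() -> str:
    """Serialize a metadata dict containing a Path and a datetime."""
    metadata = {
        "unprocessed_file": Path("/data/sub-01_raw.fif"),  # Path -> str
        "started_at": datetime.now(),  # not JSON-serializable -> str(...)
        "n_channels": 64,  # already serializable, passed through unchanged
    }
    return json.dumps(_serialize_for_json(metadata))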
def get_run_record(run_id: str) -> dict:
    """Get a run record from the database by run ID.

    Parameters
    ----------
    run_id : str
        The string ID of the run to retrieve.

    Returns
    -------
    run_record : dict
        The run record for ``run_id``.

    Raises
    ------
    RecordNotFoundError
        If no record exists for the given run ID.
    """
    run_record = manage_database(operation="get_record", run_record={"run_id": run_id})
    return run_record
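
# Illustrative sketch (not part of the package API): get_run_record raises
# RecordNotFoundError rather than returning None, so callers that prefer a
# None sentinel should catch it. The run ID below is hypothetical.
def _example_fetch_run() -> Optional[dict]:
    """Fetch a run record, returning None when it does not exist."""
    try:
        return get_run_record("run-0001")  # hypothetical run ID
    except RecordNotFoundError:
        return None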
def _validate_metadata(metadata: dict) -> bool:
    """Validate metadata structure and types.

    Parameters
    ----------
    metadata : dict
        The metadata to validate.

    Returns
    -------
    bool
        True if the metadata is valid, False otherwise.
    """
    if not isinstance(metadata, dict):
        return False
    return all(isinstance(k, str) for k in metadata.keys())


def _get_db_connection(db_path: Path) -> sqlite3.Connection:
    """Get a database connection with proper configuration.

    Parameters
    ----------
    db_path : Path
        Path to the SQLite database file.

    Returns
    -------
    sqlite3.Connection
        Configured database connection.
    """
    conn = sqlite3.connect(str(db_path))
    conn.row_factory = sqlite3.Row  # Enable row factory for dict-like access
    return conn
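
# Illustrative sketch (not part of the package API): sqlite3.Row lets rows be
# read by column name and converted to plain dicts, which is what the "get_*"
# operations below rely on.
def _example_row_access(db_path: Path) -> list:
    """Read all pipeline_runs rows as dicts using the configured connection."""
    conn = _get_db_connection(db_path)
    try:
        cursor = conn.execute("SELECT * FROM pipeline_runs")
        return [dict(row) for row in cursor.fetchall()]
    finally:
        conn.close()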
def manage_database(
    operation: str,
    run_record: Optional[Dict[str, Any]] = None,
    update_record: Optional[Dict[str, Any]] = None,
) -> Any:
    """Manage database operations with thread safety.

    Parameters
    ----------
    operation : str
        Operations can be:

        - **create_collection**: Create a new collection.
        - **store**: Store a new record.
        - **update**: Update an existing record.
        - **update_status**: Update the status of an existing record.
        - **drop_collection**: Drop the collection.
        - **get_collection**: Get the collection.
        - **get_record**: Get a record from the collection.
    run_record : dict, optional
        The record to store or look up.
    update_record : dict, optional
        The record updates.

    Returns
    -------
    Any
        Operation-specific return value.
    """
    if DB_PATH is None:
        raise DatabaseError(
            "Database path is not set; call set_database_path() first"
        )

    db_path = DB_PATH / "pipeline.db"
    db_path.parent.mkdir(parents=True, exist_ok=True)

    with _db_lock:  # Ensure only one thread can access the database at a time
        conn = None
        try:
            conn = _get_db_connection(db_path)
            cursor = conn.cursor()

            if operation == "create_collection":
                # Create table only if it doesn't exist
                cursor.execute(
                    """
                    CREATE TABLE IF NOT EXISTS pipeline_runs (
                        id INTEGER PRIMARY KEY AUTOINCREMENT,
                        run_id TEXT UNIQUE NOT NULL,
                        created_at TEXT NOT NULL,
                        task TEXT,
                        unprocessed_file TEXT,
                        status TEXT,
                        success BOOLEAN,
                        json_file TEXT,
                        report_file TEXT,
                        metadata TEXT,
                        error TEXT
                    )
                    """
                )
                conn.commit()
                message("info", f"✓ Ensured 'pipeline_runs' table exists in {db_path}")

            elif operation == "store":
                if not run_record:
                    raise ValueError("Missing run_record for store operation")

                # Convert metadata to a JSON string, handling Path objects
                metadata_json = json.dumps(
                    _serialize_for_json(run_record.get("metadata", {}))
                )

                cursor.execute(
                    """
                    INSERT INTO pipeline_runs (
                        run_id, created_at, task, unprocessed_file,
                        status, success, json_file, report_file, metadata
                    ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)
                    """,
                    (
                        run_record["run_id"],
                        run_record.get("timestamp", datetime.now().isoformat()),
                        run_record.get("task"),
                        (
                            str(run_record.get("unprocessed_file"))
                            if run_record.get("unprocessed_file")
                            else None
                        ),
                        run_record.get("status"),
                        run_record.get("success", False),
                        (
                            str(run_record.get("json_file"))
                            if run_record.get("json_file")
                            else None
                        ),
                        (
                            str(run_record.get("report_file"))
                            if run_record.get("report_file")
                            else None
                        ),
                        metadata_json,
                    ),
                )
                conn.commit()
                record_id = cursor.lastrowid
                message("info", f"✓ Stored new record with ID: {record_id}")
                return record_id

            elif operation in ("update", "update_status"):
                if not update_record or "run_id" not in update_record:
                    raise ValueError("Missing run_id in update_record")

                run_id = update_record["run_id"]

                # Check that the record exists before updating it
                cursor.execute(
                    "SELECT * FROM pipeline_runs WHERE run_id = ?", (run_id,)
                )
                if not cursor.fetchone():
                    raise RecordNotFoundError(f"No record found for run_id: {run_id}")

                if operation == "update_status":
                    cursor.execute(
                        """
                        UPDATE pipeline_runs
                        SET status = ?
                        WHERE run_id = ?
                        """,
                        (
                            f"{update_record['status']} at {datetime.now().isoformat()}",
                            run_id,
                        ),
                    )
                else:
                    update_components = []
                    update_values = []

                    # Merge new metadata into the stored metadata, if provided
                    if "metadata" in update_record:
                        metadata_to_update = update_record["metadata"]
                        if not _validate_metadata(metadata_to_update):
                            raise ValueError("Invalid metadata structure for update")

                        # Fetch the existing metadata and merge the new fragment
                        cursor.execute(
                            "SELECT metadata FROM pipeline_runs WHERE run_id = ?",
                            (run_id,),
                        )
                        row_with_metadata = cursor.fetchone()
                        existing_metadata_str = (
                            row_with_metadata["metadata"] if row_with_metadata else "{}"
                        )
                        current_metadata = json.loads(existing_metadata_str or "{}")
                        current_metadata.update(
                            _serialize_for_json(metadata_to_update)
                        )

                        update_components.append("metadata = ?")
                        update_values.append(json.dumps(current_metadata))

                    # Handle the remaining fields present in update_record
                    for key, value in update_record.items():
                        if key in ("run_id", "metadata"):
                            continue
                        update_components.append(f"{key} = ?")
                        update_values.append(
                            str(value) if isinstance(value, Path) else value
                        )

                    # Only run the UPDATE statement if there is something to set
                    if update_components:
                        # run_id is the final parameter, for the WHERE clause
                        update_values.append(run_id)
                        set_clause_sql = ", ".join(update_components)
                        query = (
                            f"UPDATE pipeline_runs SET {set_clause_sql} "
                            "WHERE run_id = ?"
                        )
                        cursor.execute(query, tuple(update_values))
                    else:
                        message(
                            "debug",
                            f"'update' for run_id '{run_id}' contained no fields "
                            f"to set; update_record: {update_record}",
                        )

                conn.commit()
                message("debug", f"Record {operation} successful for run_id: {run_id}")

            elif operation == "drop_collection":
                cursor.execute("DROP TABLE IF EXISTS pipeline_runs")
                conn.commit()
                message("warning", f"'pipeline_runs' table dropped from {db_path}")

            elif operation == "get_collection":
                cursor.execute("SELECT * FROM pipeline_runs")
                return [dict(row) for row in cursor.fetchall()]

            elif operation == "get_record":
                if not run_record or "run_id" not in run_record:
                    raise ValueError("Missing run_id in run_record")

                cursor.execute(
                    "SELECT * FROM pipeline_runs WHERE run_id = ?",
                    (run_record["run_id"],),
                )
                record = cursor.fetchone()
                if not record:
                    raise RecordNotFoundError(
                        f"No record found for run_id: {run_record['run_id']}"
                    )

                # Convert the row to a dict and parse the metadata JSON
                record_dict = dict(record)
                if record_dict.get("metadata"):
                    record_dict["metadata"] = json.loads(record_dict["metadata"])
                return record_dict

            else:
                raise ValueError(f"Unknown operation: {operation}")

        except RecordNotFoundError:
            # Let "record not found" propagate with its specific type so callers
            # can distinguish it from genuine database failures
            raise
        except Exception as e:
            error_context = {
                "operation": operation,
                "timestamp": datetime.now().isoformat(),
                "error": str(e),
            }
            message("error", f"Database operation failed: {error_context}")
            raise DatabaseError(f"Operation '{operation}' failed: {e}") from e
        finally:
            # Close the connection on every path, including early returns and errors
            if conn is not None:
                conn.close()