"""Base class for all EEG processing tasks."""
# Standard library imports
from abc import ABC, abstractmethod
from pathlib import Path
from typing import Any, Dict, Optional
# Third-party imports
import mne # Core EEG processing library for data containers and processing
from autoclean.io.export import save_epochs_to_set, save_raw_to_set
from autoclean.io.import_ import import_eeg
# Local imports

try:
    from autoclean.mixins import DISCOVERED_MIXINS

    if not DISCOVERED_MIXINS:
        print("🚨 CRITICAL ERROR: DISCOVERED_MIXINS is empty!")
        print("Task class will be missing all mixin functionality!")
        print("Check autoclean.mixins package for import errors.")

        # Create a minimal fallback
        class _EmptyMixinFallback:
            def __getattr__(self, name):
                raise AttributeError(
                    f"Method '{name}' not available - mixin discovery failed. "
                    f"Check autoclean.mixins package for import errors."
                )

        DISCOVERED_MIXINS = (_EmptyMixinFallback,)
except ImportError as e:
    print("🚨 CRITICAL ERROR: Could not import DISCOVERED_MIXINS!")
    print(f"Import error: {e}")
    print("Task class will be missing all mixin functionality!")

    # Create a minimal fallback
    class _ImportErrorMixinFallback:
        def __getattr__(self, name):
            raise AttributeError(f"Method '{name}' not available - mixin import failed")

    DISCOVERED_MIXINS = (_ImportErrorMixinFallback,)


class Task(ABC, *DISCOVERED_MIXINS):
    """Base class for all EEG processing tasks.

    This class defines the interface that all specific EEG tasks must implement.
    It provides the basic structure for:

    1. Loading and validating configuration
    2. Importing raw EEG data
    3. Running preprocessing steps
    4. Applying task-specific processing
    5. Saving results

    Inherit from this class to create new tasks in the autoclean.tasks module.

    Notes
    -----
    Abstract base class that enforces a consistent interface across all EEG processing
    tasks through abstract methods and strict type checking. Manages state through
    MNE objects (Raw and Epochs) while maintaining processing history in a dictionary.
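
    Examples
    --------
    A minimal sketch of driving a concrete task; ``MyTask`` is a hypothetical
    subclass (see ``__init__`` for the required configuration fields):

    >>> task = MyTask(config)  # doctest: +SKIP
    >>> task.run()  # doctest: +SKIP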
"""

    def __init__(self, config: Dict[str, Any]):
        """Initialize a new task instance.

        Parameters
        ----------
        config : Dict[str, Any]
            A dictionary containing all configuration settings for the task.
            Must include:

            - run_id (str): Unique identifier for this processing run
            - unprocessed_file (Path): Path to the raw EEG data file
            - task (str): Name of the task (e.g., "rest_eyesopen")

        Notes
        -----
        The base class automatically detects a module-level 'config' variable
        and uses it for self.settings in Python-based tasks.

        Examples
        --------
        >>> # Python task file approach - no __init__ needed!
        >>> config = {'resample': {'enabled': True, 'value': 250}}
        >>> class MyTask(Task):
        ...     def run(self):
        ...         self.import_raw()
        ...         # Processing steps here
        """
        # Auto-detect module-level config for Python tasks
        if not hasattr(self, "settings"):
            # Get the module where this class was defined
            import inspect

            module = inspect.getmodule(self.__class__)
            if module and hasattr(module, "config"):
                self.settings = module.config
            else:
                self.settings = None

        # Extract EEG system from task settings before validation
        config["eeg_system"] = self._extract_eeg_system()

        # Configuration must be validated first as other initializations depend on it
        self.config = self.validate_config(config)

        # Initialize MNE data containers to None
        # These will be populated during the processing pipeline
        self.raw: Optional[mne.io.Raw] = None  # Holds continuous EEG data
        self.original_raw: Optional[mne.io.Raw] = None
        self.epochs: Optional[mne.Epochs] = None  # Holds epoched data segments

        self.flagged = False
        self.flagged_reasons = []

        self.fast_ica: Optional[mne.preprocessing.ICA] = None
        self.final_ica: Optional[mne.preprocessing.ICA] = None
        self.ica_flags = None

    def _extract_eeg_system(self) -> str:
        """Extract the EEG system/montage from task settings.

        Returns
        -------
        str
            The montage name from the task config, or "auto" as a fallback.
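
        Examples
        --------
        With ``settings = {"montage": {"enabled": True, "value": "GSN-HydroCel-129"}}``
        (montage name illustrative), this returns ``"GSN-HydroCel-129"``; when the
        montage step is disabled or absent, it falls back to ``"auto"``.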
"""
if (
self.settings
and "montage" in self.settings
and self.settings["montage"].get("enabled", False)
):
return self.settings["montage"]["value"]
return "auto"

    def import_raw(self) -> None:
        """Import the raw EEG data from file.

        Notes
        -----
        Imports data using the configured import function and flags files with
        a duration of less than 60 seconds. Saves the imported data as a
        post-import stage file.
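
        Examples
        --------
        Typical usage inside a task's ``run`` method (flagging output illustrative):

        >>> task.import_raw()  # doctest: +SKIP
        >>> task.get_flagged_status()  # doctest: +SKIP
        (False, [])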
"""
self.raw = import_eeg(self.config)
if self.raw.duration < 60:
self.flagged = True
self.flagged_reasons = [
f"WARNING: Initial duration ({float(self.raw.duration):.1f}s) less than 1 minute"
]
save_raw_to_set(
raw=self.raw,
autoclean_dict=self.config,
stage="post_import",
flagged=self.flagged,
)

    def import_epochs(self) -> None:
        """Import the epochs from file.

        Notes
        -----
        Imports data using the configured import function and saves the imported
        data as a post-import stage file.
        """
        self.epochs = import_eeg(self.config)
        save_epochs_to_set(
            epochs=self.epochs,
            autoclean_dict=self.config,
            stage="post_import",
            flagged=self.flagged,
        )

    @abstractmethod
    def run(self) -> None:
        """Run the standard EEG preprocessing pipeline.

        Notes
        -----
        Defines the interface for MNE-based preprocessing operations, including
        filtering, resampling, and artifact detection. Concrete implementations
        maintain processing state through modifications to ``self.raw``.

        The specific parameters for each preprocessing step should be
        defined in the task configuration and validated before use.
        """

    def validate_config(self, config: Dict[str, Any]) -> Dict[str, Any]:
        """Validate the complete task configuration.

        Parameters
        ----------
        config : Dict[str, Any]
            The configuration dictionary to validate.
            See the __init__ docstring for required fields.

        Returns
        -------
        Dict[str, Any]
            The validated configuration dictionary.
            May contain additional fields added during validation.

        Notes
        -----
        Implements a two-stage validation pattern: base validation followed by
        task-specific checks. Uses type annotations and runtime checks to ensure
        configuration integrity before processing begins.

        Examples
        --------
        >>> config = {...}  # Your configuration dictionary
        >>> validated_config = task.validate_config(config)
        >>> print(f"Validation successful: {validated_config['task']}")
"""
# Schema definition for base configuration requirements
# All tasks must provide these fields with exact types
required_fields = {
"run_id": str, # Unique identifier for tracking
"unprocessed_file": Path, # Input file path
"task": str, # Task identifier
}
# Two-stage validation: first check existence, then type
for field, field_type in required_fields.items():
# Stage 1: Check field existence
if field not in config:
raise ValueError(f"Missing required field: {field}")
# Stage 2: Validate field type using isinstance for safety
if not isinstance(config[field], field_type):
raise TypeError(
f"Field '{field}' must be of type {field_type.__name__}, "
f"got {type(config[field]).__name__} instead"
)
# No longer validate required_stages - stages are created dynamically when export=True is used
return config

    def get_flagged_status(self) -> tuple[bool, list[str]]:
        """Get the flagged status of the task.

        Returns
        -------
        tuple of (bool, list of str)
            A tuple containing a boolean flag and a list of reasons for flagging.
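
        Examples
        --------
        Output illustrative:

        >>> flagged, reasons = task.get_flagged_status()  # doctest: +SKIP
        >>> flagged  # doctest: +SKIP
        False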
"""
return self.flagged, self.flagged_reasons

    def get_raw(self) -> mne.io.Raw:
        """Get the raw data of the task.

        Returns
        -------
        mne.io.Raw
            The raw data of the task.

        Raises
        ------
        ValueError
            If raw data has not been loaded.
        """
        if self.raw is None:
            raise ValueError("Raw data is not available.")
        return self.raw

    def get_epochs(self) -> mne.Epochs:
        """Get the epochs of the task.

        Returns
        -------
        mne.Epochs
            The epochs of the task.

        Raises
        ------
        ValueError
            If epochs have not been created.
        """
        if self.epochs is None:
            raise ValueError("Epochs are not available.")
        return self.epochs