"""Basic steps mixin for autoclean tasks."""
from typing import List, Optional, Union
import mne
from autoclean.utils.logging import message
[docs]
class BasicStepsMixin:
"""Mixin class providing basic signal processing steps for autoclean tasks."""
[docs]
def run_basic_steps(
self,
data: Union[mne.io.Raw, mne.Epochs, None] = None,
use_epochs: bool = False,
stage_name: str = "post_basic_steps",
export: bool = False,
) -> Union[mne.io.Raw, mne.Epochs]:
"""Runs all basic preprocessing steps sequentially based on configuration.
The steps included are:
1. Resample Data
2. Filter Data
3. Drop Outer Layer Channels
4. Assign EOG Channels
5. Trim Edges
6. Crop Duration
Each step's execution depends on its 'enabled' status in the configuration.
Parameters
----------
data : Optional
The data object to process. If None, uses self.raw or self.epochs.
use_epochs : bool, Optional
If True and data is None, uses self.epochs instead of self.raw.
stage_name : str, Optional
Name of the processing stage for export. Default is "post_basic_steps".
export : bool, Optional
If True, exports the processed data to the stage directory. Default is False.
Returns
-------
inst : instance of mne.io.Raw or mne.io.Epochs
The data object after applying all enabled basic processing steps.
"""
message("header", "Running basic preprocessing steps...")
# Start with the correct data object
processed_data = self._get_data_object(data, use_epochs)
# 1. Resample
processed_data = self.resample_data(data=processed_data, use_epochs=use_epochs)
# 2. Filter
processed_data = self.filter_data(data=processed_data, use_epochs=use_epochs)
# 3. Drop Outer Layer
processed_data = self.drop_outer_layer(
data=processed_data, use_epochs=use_epochs
)
# 4. Assign EOG Channels
processed_data = self.assign_eog_channels(
data=processed_data, use_epochs=use_epochs
)
# 6. Trim Edges
processed_data = self.trim_edges(data=processed_data, use_epochs=use_epochs)
# 7. Crop Duration
processed_data = self.crop_duration(data=processed_data, use_epochs=use_epochs)
message("info", "Basic preprocessing steps completed successfully.")
# Update instance data
self._update_instance_data(data, processed_data, use_epochs)
# Store a copy of the pre-cleaned raw data for comparison
self.original_raw = self.raw.copy()
# Export if requested
self._auto_export_if_enabled(processed_data, stage_name, export)
return processed_data
[docs]
def filter_data(
self,
data: Union[mne.io.Raw, mne.Epochs, None] = None,
use_epochs: bool = False,
l_freq: Optional[float] = None,
h_freq: Optional[float] = None,
notch_freqs: Optional[List[float]] = None,
notch_widths: Optional[Union[float, List[float]]] = None,
method: Optional[str] = None,
phase: Optional[str] = None,
fir_window: Optional[str] = None,
verbose: Optional[bool] = None,
) -> Union[mne.io.Raw, mne.Epochs]:
"""Apply filtering to EEG data within the AutoClean pipeline.
This method wraps the standalone :func:`autoclean.filter_data` function
with pipeline integration including configuration management, metadata
tracking, and automatic export functionality.
Parameters override configuration values when provided. If not provided,
values are read from the task configuration using the existing
``_check_step_enabled`` system.
Parameters
----------
data : mne.io.Raw, mne.Epochs, or None, default None
Input data. If None, uses ``self.raw`` or ``self.epochs`` based on
``use_epochs`` parameter.
use_epochs : bool, default False
If True and data is None, uses ``self.epochs`` instead of ``self.raw``.
l_freq : float or None, optional
Low cutoff frequency for highpass filtering in Hz. Overrides config if provided.
h_freq : float or None, optional
High cutoff frequency for lowpass filtering in Hz. Overrides config if provided.
notch_freqs : list of float or None, optional
Frequencies to notch filter in Hz. Overrides config if provided.
notch_widths : float, list of float, or None, optional
Width of notch filters in Hz. Overrides config if provided.
method : str or None, optional
Filtering method ('fir' or 'iir'). Overrides config if provided.
phase : str or None, optional
Filter phase ('zero', 'zero-double', 'minimum'). Overrides config if provided.
fir_window : str or None, optional
FIR window function. Overrides config if provided.
verbose : bool or None, optional
Control verbosity. Overrides config if provided.
Returns
-------
filtered_data : mne.io.Raw or mne.Epochs
Filtered data object. Also updates ``self.raw`` or ``self.epochs``
and triggers metadata tracking and export if configured.
See Also
--------
autoclean.filter_data : The underlying standalone filtering function
"""
data = self._get_data_object(data, use_epochs)
# Use existing config system
is_enabled, config_value = self._check_step_enabled("filtering")
if not is_enabled:
message("info", "Filtering step is disabled in configuration")
return data
# Get config defaults
filter_args = config_value.get("value", {})
# Apply parameter overrides (only if explicitly provided)
final_l_freq = l_freq if l_freq is not None else filter_args.get("l_freq")
final_h_freq = h_freq if h_freq is not None else filter_args.get("h_freq")
final_notch_freqs = (
notch_freqs if notch_freqs is not None else filter_args.get("notch_freqs")
)
final_notch_widths = (
notch_widths
if notch_widths is not None
else filter_args.get("notch_widths", 0.5)
)
final_method = (
method if method is not None else filter_args.get("method", "fir")
)
final_phase = phase if phase is not None else filter_args.get("phase", "zero")
final_fir_window = (
fir_window
if fir_window is not None
else filter_args.get("fir_window", "hamming")
)
final_verbose = verbose if verbose is not None else filter_args.get("verbose")
# Check if any filtering is requested
if final_l_freq is None and final_h_freq is None and final_notch_freqs is None:
message("warning", "No filter parameters provided, skipping filtering")
return data
message("header", "Filtering data...")
# Call standalone function
from autoclean.functions.preprocessing.filtering import (
filter_data as standalone_filter_data,
)
filtered_data = standalone_filter_data(
data=data,
l_freq=final_l_freq,
h_freq=final_h_freq,
notch_freqs=final_notch_freqs,
notch_widths=final_notch_widths,
method=final_method,
phase=final_phase,
fir_window=final_fir_window,
verbose=final_verbose,
)
# Pipeline integration with result-based metadata
self._update_instance_data(data, filtered_data, use_epochs)
self._save_raw_result(filtered_data, "post_filter")
# Use actual results in metadata
metadata = {
"original_sfreq": data.info["sfreq"],
"filtered_sfreq": filtered_data.info["sfreq"],
"original_n_channels": len(data.ch_names),
"filtered_n_channels": len(filtered_data.ch_names),
"applied_l_freq": final_l_freq,
"applied_h_freq": final_h_freq,
"applied_notch_freqs": final_notch_freqs,
"applied_notch_widths": final_notch_widths,
"method": final_method,
"phase": final_phase,
"fir_window": final_fir_window,
"original_data_type": type(data).__name__,
"result_data_type": type(filtered_data).__name__,
}
self._update_metadata("step_filter_data", metadata)
return filtered_data
[docs]
def resample_data(
self,
data: Union[mne.io.Raw, mne.Epochs, None] = None,
target_sfreq: Optional[float] = None,
stage_name: str = "post_resample",
use_epochs: bool = False,
npad: Optional[str] = None,
window: Optional[str] = None,
n_jobs: Optional[int] = None,
pad: Optional[str] = None,
verbose: Optional[bool] = None,
) -> Union[mne.io.Raw, mne.Epochs]:
"""Apply resampling to EEG data within the AutoClean pipeline.
This method wraps the standalone :func:`autoclean.resample_data` function
with pipeline integration including configuration management, metadata
tracking, and automatic export functionality.
Parameters override configuration values when provided. If not provided,
values are read from the task configuration using the existing
``_check_step_enabled`` system.
Parameters
----------
data : mne.io.Raw, mne.Epochs, or None, default None
Input data. If None, uses ``self.raw`` or ``self.epochs`` based on
``use_epochs`` parameter.
target_sfreq : float or None, optional
Target sampling frequency in Hz. Overrides config if provided.
stage_name : str, default "post_resample"
Name for saving the resampled data.
use_epochs : bool, default False
If True and data is None, uses ``self.epochs`` instead of ``self.raw``.
npad : str or None, optional
Padding parameter. Overrides config if provided.
window : str or None, optional
Window function. Overrides config if provided.
n_jobs : int or None, optional
Number of parallel jobs. Overrides config if provided.
pad : str or None, optional
Padding mode. Overrides config if provided.
verbose : bool or None, optional
Control verbosity. Overrides config if provided.
Returns
-------
resampled_data : mne.io.Raw or mne.Epochs
Resampled data object. Also updates ``self.raw`` or ``self.epochs``
and triggers metadata tracking and export if configured.
See Also
--------
autoclean.resample_data : The underlying standalone resampling function
"""
data = self._get_data_object(data, use_epochs)
# Use existing config system
if target_sfreq is None:
is_enabled, config_value = self._check_step_enabled("resample_step")
if not is_enabled:
message("info", "Resampling step is disabled in configuration")
return data
target_sfreq = config_value.get("value", None)
if target_sfreq is None:
message(
"warning",
"Target sampling frequency not specified, skipping resampling",
)
return data
# Get config defaults and apply overrides
config_args = {}
if hasattr(self, "config") and "resample_step" in self.config.get(
"tasks", {}
).get(self.config.get("task", ""), {}).get("settings", {}):
config_args = self.config["tasks"][self.config["task"]]["settings"][
"resample_step"
].get("value", {})
final_npad = npad if npad is not None else config_args.get("npad", "auto")
final_window = (
window if window is not None else config_args.get("window", "auto")
)
final_n_jobs = n_jobs if n_jobs is not None else config_args.get("n_jobs", 1)
final_pad = pad if pad is not None else config_args.get("pad", "auto")
final_verbose = verbose if verbose is not None else config_args.get("verbose")
# Check if resampling is needed
current_sfreq = data.info["sfreq"]
if abs(current_sfreq - target_sfreq) < 0.01:
message(
"info",
f"Data already at target frequency ({target_sfreq} Hz), skipping resampling",
)
return data
message(
"header", f"Resampling data from {current_sfreq} Hz to {target_sfreq} Hz..."
)
# Call standalone function
from autoclean.functions.preprocessing.resampling import (
resample_data as standalone_resample_data,
)
resampled_data = standalone_resample_data(
data=data,
sfreq=target_sfreq,
npad=final_npad,
window=final_window,
n_jobs=final_n_jobs,
pad=final_pad,
verbose=final_verbose,
)
message("info", f"Data successfully resampled to {target_sfreq} Hz")
# Pipeline integration with result-based metadata
self._update_instance_data(data, resampled_data, use_epochs)
self._save_raw_result(resampled_data, stage_name)
# Use actual results in metadata
metadata = {
"original_sfreq": current_sfreq,
"target_sfreq": target_sfreq,
"actual_sfreq": resampled_data.info["sfreq"],
"original_n_samples": (
data.get_data().shape[1]
if hasattr(data, "get_data")
else len(data.times)
),
"resampled_n_samples": (
resampled_data.get_data().shape[1]
if hasattr(resampled_data, "get_data")
else len(resampled_data.times)
),
"npad": final_npad,
"window": final_window,
"n_jobs": final_n_jobs,
"pad": final_pad,
"original_data_type": type(data).__name__,
"result_data_type": type(resampled_data).__name__,
}
self._update_metadata("step_resample_data", metadata)
return resampled_data
[docs]
def rereference_data(
self,
data: Union[mne.io.Raw, mne.Epochs, None] = None,
ref_type: str = None,
use_epochs: bool = False,
stage_name: str = "post_rereference",
) -> Union[mne.io.Raw, mne.Epochs]:
"""Rereference raw or epoched data based on configuration settings.
This method can work with self.raw, self.epochs, or a provided data object.
It checks the rereference_step toggle in the configuration if no ref_type is provided.
Parameters
----------
data : Optional
The raw data to rereference. If None, uses self.raw or self.epochs.
use_epochs : bool, Optional
If True and data is None, uses self.epochs instead of self.raw.
ref_type : str, Optional
The type of reference to use. If None, reads from config.
stage_name : str, Optional
Name for saving the rereferenced data (default: "post_rereference").
Returns
-------
inst : instance of mne.io.Raw or mne.io.Epochs
The rereferenced data object (same type as input)
Examples
--------
>>> #Inside a task class that uses the autoclean framework
>>> self.rereference_data()
See Also
--------
:py:meth:`mne.io.Raw.set_eeg_reference` : For MNE's raw data rereferencing functionality
:py:meth:`mne.Epochs.set_eeg_reference` : For MNE's epochs rereferencing functionality
"""
message("header", "Rereferencing data...")
data = self._get_data_object(data, use_epochs)
if not isinstance(
data, (mne.io.base.BaseRaw, mne.Epochs)
): # pylint: disable=isinstance-second-argument-not-valid-type
raise TypeError("Data must be an MNE Raw or Epochs object")
if ref_type is None:
is_enabled, config_value = self._check_step_enabled("reference_step")
if not is_enabled:
message("info", "Rereferencing step is disabled in configuration")
return data
ref_type = config_value.get("value", None)
if ref_type is None:
message(
"warning",
"Rereferencing value not specified, skipping rereferencing",
)
return data
# Call standalone function
from autoclean.functions.preprocessing.referencing import (
rereference_data as standalone_rereference_data,
)
rereferenced_data = standalone_rereference_data(
data=data,
ref_channels=ref_type,
projection=False if ref_type == "average" else True,
verbose=False,
)
# Pipeline integration
self._update_instance_data(data, rereferenced_data, use_epochs)
self._save_raw_result(rereferenced_data, stage_name)
metadata = {
"new_ref_type": ref_type,
}
self._update_metadata("step_rereference_data", metadata)
return rereferenced_data
[docs]
def drop_outer_layer(
self,
data: Union[mne.io.Raw, mne.Epochs, None] = None,
stage_name: str = "post_outerlayer",
use_epochs: bool = False,
) -> Union[mne.io.Raw, mne.Epochs]:
"""Drop outer layer channels based on configuration settings.
Parameters
----------
data : Optional
The data object to process. If None, uses self.raw or self.epochs.
stage_name : str, Optional
Name for saving the processed data (default: "post_outerlayer").
use_epochs : bool, Optional
If True and data is None, uses self.epochs instead of self.raw.
Returns:
inst : instance of mne.io.Raw or mne.io.Epochs
The data object with outer layer channels removed.
"""
data = self._get_data_object(data, use_epochs)
if not isinstance(
data, (mne.io.base.BaseRaw, mne.Epochs)
): # pylint: disable=isinstance-second-argument-not-valid-type
raise TypeError("Data must be an MNE Raw or Epochs object")
is_enabled, config_value = self._check_step_enabled("drop_outerlayer")
if not is_enabled:
message("info", "Drop Outer Layer step is disabled in configuration")
return data
outer_layer_channels = config_value.get("value", [])
if not outer_layer_channels:
message("warning", "Outer layer channels not specified, skipping step")
return data
# Ensure channels exist in the data before attempting to drop
channels_to_drop = [ch for ch in outer_layer_channels if ch in data.ch_names]
if not channels_to_drop:
message(
"info",
"Specified outer layer channels not found in data, skipping drop.",
)
return data
message(
"header", f"Dropping outer layer channels: {', '.join(channels_to_drop)}"
)
processed_data = data.copy().drop_channels(channels_to_drop)
message("info", f"Channels dropped: {', '.join(channels_to_drop)}")
if isinstance(processed_data, (mne.io.Raw, mne.io.base.BaseRaw)):
self._save_raw_result(processed_data, stage_name)
metadata = {
"dropped_outer_layer_channels": channels_to_drop,
"original_channel_count": len(data.ch_names),
"new_channel_count": len(processed_data.ch_names),
}
self._update_metadata("step_drop_outerlayer", metadata)
self._update_instance_data(data, processed_data, use_epochs)
return processed_data
[docs]
def assign_eog_channels(
self,
data: Union[mne.io.Raw, mne.Epochs, None] = None,
use_epochs: bool = False,
) -> Union[mne.io.Raw, mne.Epochs]:
"""Assign EOG channel types based on configuration settings.
Parameters
----------
data : Optional
The data object to process. If None, uses self.raw or self.epochs.
use_epochs : bool, Optional
If True and data is None, uses self.epochs instead of self.raw.
Returns:
inst : instance of mne.io.Raw or mne.io.Epochs
The data object with EOG channels assigned.
"""
data = self._get_data_object(data, use_epochs)
if not isinstance(
data, (mne.io.base.BaseRaw, mne.Epochs)
): # pylint: disable=isinstance-second-argument-not-valid-type
raise TypeError("Data must be an MNE Raw or Epochs object")
is_enabled, config_value = self._check_step_enabled("eog_step")
if not is_enabled:
message("info", "EOG Assignment step is disabled in configuration")
return data
eog_channel_indices = config_value.get("value", [])
if not eog_channel_indices:
message("warning", "EOG channel indices not specified, skipping step")
return data
# Assuming value is a list of indices or names, convert indices to names if needed
# The example uses formatting `f"E{ch}"`, suggesting indices are expected.
# Adapt this logic based on how channel names vs indices are stored in config.
# For simplicity, assuming names or indices directly map to existing channel names for now.
# A more robust implementation might handle various naming conventions.
eog_channels_to_set = [
ch
for idx, ch in enumerate(data.ch_names)
if idx + 1 in eog_channel_indices or ch in eog_channel_indices
] # Handling both indices (1-based) and names
eog_channels_map = {
ch: "eog" for ch in eog_channels_to_set if ch in data.ch_names
}
if not eog_channels_map:
message(
"warning", "Specified EOG channels not found in data, skipping step."
)
return data
message(
"header",
f"Assigning EOG channel types for: {', '.join(eog_channels_map.keys())}",
)
# Process a copy to avoid modifying the original data object directly
processed_data = data.copy()
processed_data.set_channel_types(eog_channels_map)
message(
"info",
f"EOG channel types assigned for: {', '.join(eog_channels_map.keys())}",
)
# Note: set_channel_types modifies in place, but we operate on a copy.
# No need to save intermediate step here unless explicitly required,
# as channel type changes don't alter the data matrix itself.
metadata = {"assigned_eog_channels": list(eog_channels_map.keys())}
self._update_metadata("step_assign_eog_channels", metadata)
# Even though set_channel_types modifies inplace on the copy,
# we still call update_instance_data to potentially update self.raw/self.epochs
self._update_instance_data(data, processed_data, use_epochs)
return processed_data
[docs]
def trim_edges(
self,
data: Union[mne.io.Raw, mne.Epochs, None] = None,
stage_name: str = "post_trim",
use_epochs: bool = False,
) -> Union[mne.io.Raw, mne.Epochs]:
"""Trim data edges based on configuration settings.
Parameters
----------
data : Optional
The data object to process. If None, uses self.raw or self.epochs.
stage_name : str, Optional
Name for saving the processed data (default: "post_trim").
use_epochs : bool, Optional
If True and data is None, uses self.epochs instead of self.raw.
Returns:
inst : instance of mne.io.Raw or mne.io.Epochs
The data object with edges trimmed.
"""
data = self._get_data_object(data, use_epochs)
if not isinstance(
data, (mne.io.base.BaseRaw, mne.Epochs)
): # pylint: disable=isinstance-second-argument-not-valid-type
raise TypeError("Data must be an MNE Raw or Epochs object")
is_enabled, config_value = self._check_step_enabled("trim_step")
if not is_enabled:
message("info", "Edge Trimming step is disabled in configuration")
return data
trim_duration_sec = config_value.get("value", None)
if trim_duration_sec is None or trim_duration_sec <= 0:
message(
"warning",
"Invalid or zero trim duration specified, skipping edge trimming",
)
return data
original_start_time = data.times[0]
original_end_time = data.times[-1]
original_duration = original_end_time - original_start_time
if 2 * trim_duration_sec >= original_duration:
message(
"error",
f"Total trim duration ({2 * trim_duration_sec}s) is greater than or equal to data "
f"duration ({original_duration}s). Cannot trim.",
)
# Consider raising an error or just returning data
return data # Return original data to avoid erroring out pipeline
tmin = original_start_time + trim_duration_sec
tmax = original_end_time - trim_duration_sec
message(
"header",
f"Trimming {trim_duration_sec}s from each end (new range: {tmin:.3f}s to {tmax:.3f}s)",
)
processed_data = data.copy().crop(tmin=tmin, tmax=tmax)
new_duration = processed_data.times[-1] - processed_data.times[0]
message("info", f"Data trimmed. New duration: {new_duration:.3f}s")
if isinstance(processed_data, (mne.io.Raw, mne.io.base.BaseRaw)):
self._save_raw_result(processed_data, stage_name)
metadata = {
"trim_duration": trim_duration_sec,
"original_start_time": original_start_time,
"original_end_time": original_end_time,
"new_start_time": tmin,
"new_end_time": tmax,
"original_duration": original_duration,
"new_duration": new_duration,
}
self._update_metadata("step_trim_edges", metadata)
self._update_instance_data(data, processed_data, use_epochs)
return processed_data
[docs]
def crop_duration(
self,
data: Union[mne.io.Raw, mne.Epochs, None] = None,
stage_name: str = "post_crop",
use_epochs: bool = False,
) -> Union[mne.io.Raw, mne.Epochs]:
"""Crop data duration based on configuration settings.
Parameters
----------
data : Optional
The data object to process. If None, uses self.raw or self.epochs.
stage_name : str, Optional
Name for saving the processed data (default: "post_crop").
use_epochs : bool, Optional
If True and data is None, uses self.epochs instead of self.raw.
Returns:
inst : instance of mne.io.Raw or mne.io.Epochs
The data object cropped to the specified duration.
"""
data = self._get_data_object(data, use_epochs)
if not isinstance(
data, (mne.io.base.BaseRaw, mne.Epochs)
): # pylint: disable=isinstance-second-argument-not-valid-type
raise TypeError("Data must be an MNE Raw or Epochs object")
is_enabled, config_value = self._check_step_enabled("crop_step")
if not is_enabled:
message("info", "Duration Cropping step is disabled in configuration")
return data
crop_times = config_value.get("value", {})
start_time_sec = crop_times.get("start", None)
end_time_sec = crop_times.get("end", None)
if start_time_sec is None and end_time_sec is None:
message(
"warning", "Crop start and end times not specified, skipping cropping"
)
return data
# Use data's bounds if start or end is None
tmin = start_time_sec if start_time_sec is not None else data.times[0]
tmax = end_time_sec if end_time_sec is not None else data.times[-1]
# Validate crop times against data bounds
original_start = data.times[0]
original_end = data.times[-1]
# Adjust tmin/tmax if they fall outside the data range
tmin = max(tmin, original_start)
tmax = min(tmax, original_end)
if tmin >= tmax:
message(
"error",
f"Invalid crop range: start time ({tmin:.3f}s) is not before end time ({tmax:.3f}s)"
f"after adjusting to data bounds. Skipping crop.",
)
return data
message(
"header", f"Cropping data duration to range: {tmin:.3f}s to {tmax:.3f}s"
)
processed_data = data.copy().crop(tmin=tmin, tmax=tmax)
new_duration = processed_data.times[-1] - processed_data.times[0]
message("info", f"Data cropped. New duration: {new_duration:.3f}s")
if isinstance(processed_data, (mne.io.Raw, mne.io.base.BaseRaw)):
self._save_raw_result(processed_data, stage_name)
metadata = {
"crop_duration": start_time_sec,
"crop_start": tmin,
"crop_end": tmax,
"original_duration": original_end - original_start,
"new_duration": new_duration,
}
self._update_metadata("step_crop_duration", metadata)
self._update_instance_data(data, processed_data, use_epochs)
return processed_data