# src/autoclean/utils/bids.py
# pylint: disable=line-too-long
"""
This module contains functions for converting EEG data to BIDS format.
"""
import hashlib
import json
import traceback
from contextlib import contextmanager # Imported for dummy lock
from pathlib import Path
from typing import Optional
import pandas as pd
from mne.io.constants import FIFF
from mne_bids import BIDSPath, update_sidecar_json, write_raw_bids
from autoclean.utils.logging import message


def step_convert_to_bids(
    raw,
    output_dir,
    task="rest",
    participant_id=None,
    line_freq=60.0,
    overwrite=False,
    events=None,
    event_id=None,
    study_name="EEG Study",
    autoclean_dict: Optional[dict] = None,
):
"""
Converts a single EEG data file into BIDS format with default/dummy metadata.
Handles concurrent access to participants.tsv using a threading.Lock passed
via autoclean_dict. Ensures specific column order and dtype=object for the TSV.
Parameters
----------
raw : mne.io.Raw
The raw data to convert to BIDS.
output_dir : str
The directory where the BIDS dataset will be created.
task : str
The task name for BIDS.
participant_id : str
The participant ID (if None, generated from filename).
line_freq : float
The power line frequency.
overwrite : bool
Whether to overwrite existing BIDS files.
events : mne.events_data
The events array.
event_id : dict
The event_id dictionary.
study_name : str
The name of the study for dataset_description.json.
autoclean_dict : dict
The run configuration, MUST include 'participants_tsv_lock' (a threading.Lock) for concurrent safety.
Returns
-------
bids_path : BIDSPath
The BIDS path of the converted file.
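
    Examples
    --------
    A minimal usage sketch (the file name, output directory, and lock setup
    below are illustrative, not part of the pipeline)::

        import threading

        import mne

        raw = mne.io.read_raw_fif("sub01_rest_raw.fif", preload=True)
        bids_path, derivatives_dir = step_convert_to_bids(
            raw,
            output_dir="bids_dataset",
            task="rest",
            autoclean_dict={"participants_tsv_lock": threading.Lock()},
        )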
"""
    file_path = raw.filenames[0]
    file_name = Path(file_path).name
    # Retrieve Lock from autoclean_dict if available for thread-safe TSV access.
    lock = None
    lock_valid = False
    if autoclean_dict and "participants_tsv_lock" in autoclean_dict:
        retrieved_lock = autoclean_dict["participants_tsv_lock"]
        # Validate the lock object based on expected methods and type name ('lock').
        if (
            hasattr(retrieved_lock, "acquire")
            and hasattr(retrieved_lock, "release")
            and retrieved_lock.__class__.__name__ == "lock"
        ):
            lock = retrieved_lock
            lock_valid = True
            message(
                "debug",
                "Successfully validated threading.Lock object from autoclean_dict.",
            )
        else:
            message(
                "warning",
                f"participants_tsv_lock found in autoclean_dict but is not a valid threading.Lock object "
                f"(type: {type(retrieved_lock).__name__}, value: {retrieved_lock!r}). "
                "Proceeding without lock.",
            )
    if not lock_valid:
        message(
            "warning",
            "participants_tsv_lock not found or invalid. Concurrent writes to participants.tsv may be unsafe.",  # pylint: disable=line-too-long
        )

        # Use a dummy context manager if no valid lock is found to allow execution.
        @contextmanager
        def dummy_lock():
            yield

        lock_context = dummy_lock()
    else:
        lock_context = lock  # Use the actual lock context.
    bids_root = Path(output_dir)
    bids_root.mkdir(parents=True, exist_ok=True)
    # Define participants file path and the desired column order.
    participants_file = bids_root / "participants.tsv"
    desired_column_order = [
        "participant_id",
        "file_name",
        "bids_path",
        "age",
        "sex",
        "group",
        "hand",
        "weight",
        "height",
        "eegid",
        "file_hash",
    ]
    # Determine participant ID (generate if not provided).
    if participant_id is None:
        participant_id = step_sanitize_id(file_path)
    subject_id = str(participant_id)
    # Set default metadata values.
    session = None
    run = None
    age = "n/a"
    sex = "n/a"
    group = "n/a"
    # Sanitize task name for BIDS compliance (no underscores, hyphens, or slashes).
    bids_task = task.replace("_", "").replace("-", "").replace("/", "")
    # Create BIDSPath object.
    bids_path = BIDSPath(
        subject=subject_id,
        session=session,
        task=bids_task,
        run=run,
        datatype="eeg",
        root=bids_root,
        suffix="eeg",
    )
    fif_file = Path(file_path)
    # Calculate file hash.
    try:
        file_hash = hashlib.sha256(fif_file.read_bytes()).hexdigest()
    except Exception as e:
        message("error", f"Failed to read {fif_file} for hashing: {e}")
        raise
    # Prepare MNE Raw object metadata for BIDS conversion.
    raw.info["subject_info"] = {"id": int(subject_id)}
    raw.info["line_freq"] = line_freq
    for ch in raw.info["chs"]:
        ch["unit"] = FIFF.FIFF_UNIT_V
    # Prepare arguments for mne_bids.write_raw_bids.
    bids_kwargs = {
        "raw": raw,
        "bids_path": bids_path,
        "overwrite": overwrite,
        "verbose": False,
        "format": "BrainVision",
        "events": events,
        "event_id": event_id,
        "allow_preload": True,
    }
    # Create derivatives directory structure (outside the lock).
    derivatives_dir = bids_root / "derivatives" / f"sub-{subject_id}" / "eeg"
    derivatives_dir.mkdir(parents=True, exist_ok=True)
    message("info", f"Created derivatives directory structure at {derivatives_dir}")
    # --- Critical section: accessing shared BIDS files ---
    # Use the lock (real or dummy) to protect file access.
    message("debug", f"Acquiring participants.tsv lock for {file_name}...")
    with lock_context:
        message("debug", f"Acquired participants.tsv lock for {file_name}.")
        # Ensure participants.tsv exists with correct headers and dtype=object
        # *before* calling mne_bids, which might interact with it.
        try:
            if not participants_file.exists():
                message(
                    "info",
                    f"Creating participants.tsv with headers at {participants_file}",
                )
                header_df = pd.DataFrame(columns=desired_column_order, dtype=object)
                header_df.to_csv(participants_file, sep="\t", index=False, na_rep="n/a")
        except Exception as header_err:
            message("error", f"Failed to create participants.tsv header: {header_err}")
            raise
        # Call mne_bids to write the core BIDS data.
        try:
            write_raw_bids(**bids_kwargs)
            message("success", f"Converted {fif_file.name} to BIDS format.")
            # Update sidecar JSON with additional info.
            entries = {"Manufacturer": "Unknown", "PowerLineFrequency": line_freq}
            sidecar_path = bids_path.copy().update(extension=".json")
            update_sidecar_json(bids_path=sidecar_path, entries=entries)
        except Exception as e:
            message("error", f"Failed to write BIDS for {fif_file.name}: {e}")
            print(f"Detailed error: {e}")
            traceback.print_exc()
            raise
        # --- Update participants.tsv with custom/calculated metadata ---
        try:
            # Read the potentially modified participants.tsv, enforcing object dtype.
            try:
                dtype_mapping = {col: object for col in desired_column_order}
                # Read assuming all desired columns should exist; add missing ones later.
                # na_filter=False prevents 'NA' strings from becoming NaN when object dtype is used.
                participants_df = pd.read_csv(
                    participants_file, sep="\t", dtype=dtype_mapping, na_filter=False
                )
                # Validate and fix columns after reading.
                missing_cols = [
                    col
                    for col in desired_column_order
                    if col not in participants_df.columns
                ]
                if missing_cols:
                    message(
                        "warning",
                        f"participants.tsv is missing columns: {missing_cols}. Adding them with 'n/a'.",
                    )
                    for col in missing_cols:
                        participants_df[col] = "n/a"
                    participants_df = participants_df.astype(
                        {col: object for col in missing_cols}
                    )
                # Handle cases where the file might be corrupted or unexpectedly empty.
                if participants_df.empty and participants_file.stat().st_size > 0:
                    message(
                        "warning",
                        "participants.tsv exists but pandas read an empty DataFrame. Recreating.",
                    )
                    participants_df = pd.DataFrame(
                        columns=desired_column_order, dtype=object
                    )
                elif (
                    not participants_df.empty
                    and "participant_id" not in participants_df.columns
                ):
                    message(
                        "warning",
                        "participants.tsv is missing 'participant_id'. Recreating.",
                    )
                    participants_df = pd.DataFrame(
                        columns=desired_column_order, dtype=object
                    )
            except pd.errors.EmptyDataError:
                # Handle the case where mne_bids might have left the file empty.
                message(
                    "warning",
                    "participants.tsv is empty after MNE-BIDS write. Starting with headers.",
                )
                participants_df = pd.DataFrame(
                    columns=desired_column_order, dtype=object
                )
            except Exception as pd_read_err:  # pylint: disable=broad-except
                message(
                    "error",
                    f"Error reading participants.tsv after MNE-BIDS write: {pd_read_err}. Attempting overwrite.",  # pylint: disable=line-too-long
                )
                participants_df = pd.DataFrame(
                    columns=desired_column_order, dtype=object
                )
            # Prepare the entry for the current participant.
            new_entry = {
                "participant_id": f"sub-{subject_id}",
                "file_name": file_name,
                "bids_path": str(bids_path.match()[0]),
                "age": age,
                "sex": sex,
                "group": group,
                # Add standard optional BIDS columns with 'n/a' if not provided elsewhere.
                "hand": "n/a",
                "weight": "n/a",
                "height": "n/a",
                "eegid": fif_file.stem,
                "file_hash": file_hash,
            }
            # Update the existing row or append a new one.
            participant_col_id = f"sub-{subject_id}"
            if participant_col_id not in participants_df["participant_id"].values:
                # Append a new row using pd.concat for better type handling.
                new_row_df = pd.DataFrame([new_entry]).astype(dtype=object)
                participants_df = pd.concat(
                    [participants_df, new_row_df], ignore_index=True
                )
                message(
                    "debug",
                    f"Appended new entry for {participant_col_id} to participants.tsv.",
                )
            else:
                # Update the existing row.
                message(
                    "debug",
                    f"Participant {participant_col_id} already exists. Updating row.",
                )
                idx = participants_df.index[
                    participants_df["participant_id"] == participant_col_id
                ].tolist()
                if idx:
                    row_index = idx[0]
                    for key, value in new_entry.items():
                        if key in participants_df.columns:
                            # Ensure value assignment respects the object dtype.
                            participants_df.loc[row_index, key] = (
                                str(value) if value is not None else "n/a"
                            )
                        else:
                            message(
                                "warning",
                                f"Column '{key}' not found in participants.tsv during update for {participant_col_id}.",  # pylint: disable=line-too-long
                            )
                else:
                    # Fallback if the index search fails.
                    message(
                        "warning",
                        f"Could not find index for existing participant {participant_col_id}. Appending instead.",  # pylint: disable=line-too-long
                    )
                    new_row_df = pd.DataFrame([new_entry]).astype(dtype=object)
                    participants_df = pd.concat(
                        [participants_df, new_row_df], ignore_index=True
                    )
            # Ensure no duplicate participant IDs remain.
            participants_df.drop_duplicates(
                subset="participant_id", keep="last", inplace=True
            )
            # Ensure final DataFrame columns match the desired order, preserving extras.
            # Note: this assumes desired_column_order contains all keys from new_entry that should be primary columns.  # pylint: disable=line-too-long
            final_columns = desired_column_order + [
                col
                for col in participants_df.columns
                if col not in desired_column_order
            ]
            participants_df = participants_df[final_columns]
            # Write the updated DataFrame back to the TSV.
            participants_df.to_csv(
                participants_file, sep="\t", index=False, na_rep="n/a"
            )
            message("debug", f"Updated participants.tsv for {file_name}")
            # Create metadata JSON files if they don't exist.
            dataset_description_file = bids_root / "dataset_description.json"
            if not dataset_description_file.exists():
                step_create_dataset_desc(bids_root, study_name=study_name)
            participants_json_file = bids_root / "participants.json"
            if not participants_json_file.exists():
                step_create_participants_json(bids_root)
        except Exception as update_err:
            message(
                "error",
                f"Failed during participants.tsv update or associated file creation: {update_err}",
            )
            traceback.print_exc()
            raise
    # Lock is automatically released when exiting the 'with' block.
    message("debug", f"Released participants.tsv lock for {file_name}.")
    return bids_path, derivatives_dir


def step_sanitize_id(filename):
    """
    Generates a reproducible numeric participant ID from a filename using MD5 hashing.

    Parameters
    ----------
    filename : str
        The filename to generate a participant ID from.

    Returns
    -------
    int
        A deterministic integer ID in the range [0, 1000000).
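
    Examples
    --------
    The mapping is deterministic, so the same filename always yields the
    same ID (the file name below is hypothetical; the exact value depends
    on the MD5 digest)::

        pid = step_sanitize_id("sub01_rest_raw.fif")
        assert pid == step_sanitize_id("sub01_rest_raw.fif")
        assert 0 <= pid < 1000000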
"""

    def filename_to_number(filename, max_value=1000000):
        # Generate MD5 hash of the filename.
        hash_object = hashlib.md5(filename.encode())
        # Convert first 8 bytes of hash to an integer.
        hash_int = int.from_bytes(hash_object.digest()[:8], "big")
        # Scale to the desired range using modulo.
        return hash_int % max_value

    basename = Path(filename).stem
    participant_id = filename_to_number(basename)
    message("info", f"Generated participant ID for {basename}: {participant_id}")
    return participant_id


def step_create_dataset_desc(output_path, study_name):
    """
    Creates BIDS dataset_description.json file.

    Parameters
    ----------
    output_path : pathlib.Path
        The path to the BIDS root directory.
    study_name : str
        The name of the study.
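
    Examples
    --------
    A minimal sketch, assuming "bids_dataset" is an existing directory::

        step_create_dataset_desc(Path("bids_dataset"), study_name="EEG Study")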
"""
    dataset_description = {
        "Name": study_name,
        "BIDSVersion": "1.6.0",  # Specify BIDS version used.
        "DatasetType": "raw",
    }
    filepath = output_path / "dataset_description.json"
    try:
        with open(filepath, "w", encoding="utf-8") as f:
            json.dump(dataset_description, f, indent=4)
        message("success", f"Created {filepath.name}")
    except Exception as e:  # pylint: disable=broad-except
        message("error", f"Failed to create {filepath.name}: {e}")


def step_create_participants_json(output_path):
    """
    Creates BIDS participants.json sidecar file describing participants.tsv columns.

    Parameters
    ----------
    output_path : pathlib.Path
        The path to the BIDS root directory.
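
    Examples
    --------
    A minimal sketch, assuming the BIDS root already exists::

        step_create_participants_json(Path("bids_dataset"))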
"""
    # Describes columns in participants.tsv, including standard and custom ones.
    participants_json = {
        "participant_id": {"Description": "Unique participant identifier"},
        "file_name": {"Description": "Original source filename"},
        "bids_path": {"Description": "Relative path to the primary BIDS data file"},
        "age": {"Description": "Age of the participant", "Units": "years"},
        "sex": {
            "Description": "Biological sex of the participant",
            "Levels": {
                "M": "Male",
                "F": "Female",
                "O": "Other",
                "n/a": "Not available",
            },
        },
        "group": {"Description": "Participant group membership", "Levels": {}},
        "hand": {
            "Description": "Dominant hand of the participant",
            "Levels": {
                "L": "Left",
                "R": "Right",
                "A": "Ambidextrous",
                "n/a": "Not available",
            },
        },
        "weight": {"Description": "Weight of the participant", "Units": "kg"},
        "height": {"Description": "Height of the participant", "Units": "m"},
        "eegid": {"Description": "Original participant identifier/source file stem"},
        "file_hash": {"Description": "SHA256 hash of the original source file"},
    }
    filepath = output_path / "participants.json"
    try:
        with open(filepath, "w", encoding="utf-8") as f:
            json.dump(participants_json, f, indent=4)
        message("success", f"Created {filepath.name}")
    except Exception as e:  # pylint: disable=broad-except
        message("error", f"Failed to create {filepath.name}: {e}")