# src/autoclean/step_functions/reports.py
# pylint: disable=too-many-lines
"""Visualization and reporting functions.
The reporting mixins provide the same functionality with improved integration
with the Task class and better configuration handling.
This module provides functions for generating visualizations and reports
from EEG processing results. It includes:
- Run summary reports
- Data quality visualizations
- Artifact detection plots
- Processing stage comparisons
The functions generate clear, publication-ready figures and detailed
HTML reports documenting the processing pipeline results.
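
Examples
--------
A typical end-of-run sequence; the run ID below is illustrative.

>>> summary = create_json_summary("run_123")
>>> generate_bad_channels_tsv(summary)
>>> update_task_processing_log(summary)
>>> create_run_report("run_123")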
"""
import os
import shutil
import traceback
from datetime import datetime
from pathlib import Path
from typing import Any, Dict, Optional
import matplotlib
import pandas as pd
# ReportLab imports for PDF generation
from reportlab.lib import colors
from reportlab.lib.pagesizes import letter
from reportlab.lib.styles import ParagraphStyle, getSampleStyleSheet
from reportlab.lib.units import inch
from reportlab.platypus import Paragraph, SimpleDocTemplate, Spacer
from reportlab.platypus import Table as ReportLabTable
from reportlab.platypus import TableStyle
from autoclean.utils.database import get_run_record, manage_database
from autoclean.utils.logging import message
__all__ = [
"create_run_report",
"update_task_processing_log",
"create_json_summary",
"generate_bad_channels_tsv",
]
# Force matplotlib to use non-interactive backend for async operations
matplotlib.use("Agg")
def create_run_report(run_id: str, autoclean_dict: Optional[dict] = None) -> Optional[Path]:
"""
Creates a pdf report summarizing the run.
Parameters
----------
run_id : str
The run ID to generate a report for
autoclean_dict : dict
The autoclean dictionary
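
Examples
--------
A minimal usage sketch; the run ID and the ``derivatives_dir`` value are
illustrative, not taken from a real run.

>>> pdf_path = create_run_report(
...     run_id="run_123",
...     autoclean_dict={"derivatives_dir": "/path/to/derivatives"},
... )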
"""
if not run_id:
message("error", "No run ID provided")
return
run_record = get_run_record(run_id)
if not run_record or "metadata" not in run_record:
message("error", "No metadata found for run ID")
return
# Early validation of required metadata sections
required_sections = ["step_prepare_directories"]
missing_sections = [
section
for section in required_sections
if section not in run_record["metadata"]
]
if missing_sections:
message(
"error",
f"Missing required metadata sections: {', '.join(missing_sections)}",
)
return
# Check if JSON summary exists and use it if available
json_summary = None
if "json_summary" in run_record["metadata"]:
json_summary = run_record["metadata"]["json_summary"]
message("info", "Using JSON summary for report generation")
# If no JSON summary is available, warn and continue with an empty summary
if not json_summary:
message(
"warning", "No json summary found, run report may be missing or incomplete"
)
json_summary = {}
# Resolve the derivatives path from the configuration (if provided)
derivatives_path = None
try:
if autoclean_dict:
derivatives_path = autoclean_dict["derivatives_dir"]
except Exception as e: # pylint: disable=broad-except
message(
"warning",
f"Failed to get derivatives path: {str(e)} : Saving only to metadata directory",
)
derivatives_path = None
# Get metadata directory from step_prepare_directories
metadata_dir = Path(run_record["metadata"]["step_prepare_directories"]["metadata"])
metadata_dir.mkdir(parents=True, exist_ok=True)
# Create PDF filename
pdf_path = metadata_dir / f"{run_record['report_file']}"
# Initialize the PDF document
doc = SimpleDocTemplate(
str(pdf_path),
pagesize=letter,
rightMargin=24,
leftMargin=24,
topMargin=24,
bottomMargin=24,
)
# Get styles
styles = getSampleStyleSheet()
# Custom styles for better visual hierarchy
title_style = ParagraphStyle(
"CustomTitle",
parent=styles["Title"],
fontSize=14,
spaceAfter=6,
textColor=colors.HexColor("#2C3E50"),
alignment=1,
)
heading_style = ParagraphStyle(
"CustomHeading",
parent=styles["Heading1"],
fontSize=10,
spaceAfter=4,
textColor=colors.HexColor("#34495E"),
alignment=1,
)
normal_style = ParagraphStyle(
"CustomNormal",
parent=styles["Normal"],
fontSize=7,
spaceAfter=2,
textColor=colors.HexColor("#2C3E50"),
)
# steps_style = ParagraphStyle(
# "Steps",
# parent=normal_style,
# fontSize=7,
# leading=10,
# spaceBefore=1,
# spaceAfter=1,
# )
# Define frame style for main content
frame_style = TableStyle(
[
("GRID", (0, 0), (-1, -1), 0.5, colors.HexColor("#BDC3C7")),
("BACKGROUND", (0, 0), (-1, -1), colors.HexColor("#ECF0F1")),
("ALIGN", (0, 0), (-1, -1), "CENTER"),
("VALIGN", (0, 0), (-1, -1), "MIDDLE"),
("TOPPADDING", (0, 0), (-1, -1), 8),
("BOTTOMPADDING", (0, 0), (-1, -1), 8),
]
)
# Common table style
table_style = TableStyle(
[
("GRID", (0, 0), (-1, -1), 0.5, colors.HexColor("#BDC3C7")),
("FONTSIZE", (0, 0), (-1, -1), 7),
("ALIGN", (0, 0), (-1, -1), "LEFT"),
("FONTNAME", (0, 0), (-1, 0), "Helvetica-Bold"),
("BACKGROUND", (0, 0), (-1, 0), colors.HexColor("#F5F6FA")),
("TEXTCOLOR", (0, 0), (-1, 0), colors.HexColor("#2C3E50")),
("TOPPADDING", (0, 0), (-1, -1), 2),
("BOTTOMPADDING", (0, 0), (-1, -1), 2),
("LEFTPADDING", (0, 0), (-1, -1), 6),
("RIGHTPADDING", (0, 0), (-1, -1), 6),
]
)
# Create story (content) for the PDF
story = []
# Title and Basic Info
title = "EEG Processing Report"
story.append(Paragraph(title, title_style))
# Add status-colored subtitle
status_color = (
colors.HexColor("#2ECC71")
if run_record.get("success", False)
else colors.HexColor("#E74C3C")
)
subtitle_style = ParagraphStyle(
"CustomSubtitle",
parent=heading_style,
textColor=status_color,
spaceAfter=2,
)
status_text = "SUCCESS" if run_record.get("success", False) else "FAILED"
subtitle = f"Run ID: {run_id} - {status_text}"
story.append(Paragraph(subtitle, subtitle_style))
# Add timestamp
timestamp_style = ParagraphStyle(
"Timestamp",
parent=normal_style,
textColor=colors.HexColor("#7F8C8D"),
alignment=1,
spaceAfter=8,
)
timestamp = f"Generated: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}"
story.append(Paragraph(timestamp, timestamp_style))
# Tables layout with better styling - NOW 2 COLUMNS
data = [
[
Paragraph("Import Information", heading_style),
Paragraph(
"Processing Details", heading_style
), # Changed from "Preprocessing Parameters"
]
]
# Left column: Import info with colored background
try:
import_info = []
if json_summary and "import_details" in json_summary:
# Use data from JSON summary
import_details = json_summary["import_details"]
# Get values and format them safely
duration = import_details.get("duration")
duration_str = (
f"{duration:.1f} sec" if isinstance(duration, (int, float)) else "N/A"
)
sample_rate = import_details.get("sample_rate")
sample_rate_str = (
f"{sample_rate} Hz" if isinstance(sample_rate, (int, float)) else "N/A"
)
import_info.extend(
[
["File", import_details.get("basename", "N/A")],
["Duration", duration_str],
["Sample Rate", sample_rate_str],
["Channels", str(import_details.get("net_nbchan_orig", "N/A"))],
]
)
else:
# Fall back to direct metadata access
raw_info = run_record["metadata"].get("import_eeg", {})
if not raw_info:
raw_info = {"message": "Step import metadata not available"}
# Get values and format them safely
duration = raw_info.get("durationSec")
duration_str = (
f"{duration:.1f} sec" if isinstance(duration, (int, float)) else "N/A"
)
sample_rate = raw_info.get("sampleRate")
sample_rate_str = (
f"{sample_rate} Hz" if isinstance(sample_rate, (int, float)) else "N/A"
)
import_info.extend(
[
["File", raw_info.get("unprocessedFile", "N/A")],
["Duration", duration_str],
["Sample Rate", sample_rate_str],
["Channels", str(raw_info.get("channelCount", "N/A"))],
]
)
if not import_info:
import_info = [["No import data available", "N/A"]]
except Exception as e: # pylint: disable=broad-except
message("warning", f"Error processing import information: {str(e)}")
import_info = [["Error processing import data", "N/A"]]
import_table = ReportLabTable(import_info, colWidths=[0.7 * inch, 1.3 * inch])
import_table.setStyle(
TableStyle(
[
*table_style._cmds,
(
"BACKGROUND",
(0, 0),
(-1, -1),
colors.HexColor("#F8F9F9"),
),
]
)
)
# Right column: Processing Details
processing_details_info = []
try:
# Check if JSON summary and processing_details exist
if json_summary and "processing_details" in json_summary:
current_processing_details = json_summary["processing_details"]
# Ensure current_processing_details is a dictionary
if isinstance(current_processing_details, dict):
# Filter
l_freq = current_processing_details.get("l_freq", "N/A")
h_freq = current_processing_details.get("h_freq", "N/A")
filter_display = "N/A"
if l_freq != "N/A" or h_freq != "N/A":
filter_display = f"{l_freq if l_freq is not None else 'N/A'}-{h_freq if h_freq is not None else 'N/A'} Hz"
processing_details_info.append(["Filter", filter_display])
# Notch
notch_freqs_list = current_processing_details.get("notch_freqs", [])
notch_freq_display = "N/A"
actual_notch_freqs = []
if isinstance(notch_freqs_list, list):
actual_notch_freqs = [
str(f) for f in notch_freqs_list if f is not None
]
elif isinstance(
notch_freqs_list, (int, float, str)
) and notch_freqs_list not in [
None,
"",
]: # Handle scalar if somehow it's not a list
actual_notch_freqs = [str(notch_freqs_list)]
if actual_notch_freqs:
notch_freq_display = f"{', '.join(actual_notch_freqs)} Hz"
processing_details_info.append(["Notch", notch_freq_display])
# Resample Rate
resample_rate = current_processing_details.get("target_sfreq")
resample_display = (
f"{resample_rate} Hz" if resample_rate is not None else "N/A"
)
processing_details_info.append(["Resample Rate", resample_display])
# Trim Duration
trim_duration = current_processing_details.get("trim_duration")
if trim_duration is not None:
processing_details_info.append(
["Trim Duration", f"{trim_duration} sec"]
)
# Crop Info
crop_s = current_processing_details.get("crop_start")
crop_e = current_processing_details.get("crop_end")
crop_d = current_processing_details.get(
"crop_duration"
) # from create_json_summary
crop_display_val = None
if crop_s is not None and crop_e is not None:
crop_display_val = f"{crop_s:.2f}s to {crop_e:.2f}s"
processing_details_info.append(["Crop Window", crop_display_val])
elif (
crop_d is not None
): # if specific start/end aren't there, maybe overall duration is
crop_display_val = f"{crop_d:.2f} sec"
processing_details_info.append(["Crop Duration", crop_display_val])
# EOG Channels
eog_channels = current_processing_details.get("eog_channels", [])
if (
isinstance(eog_channels, list) and eog_channels
): # Only show if present and not empty
processing_details_info.append(
["EOG Channels", ", ".join(eog_channels)]
)
# Dropped Outer Layer Channels
dropped_ch_list = current_processing_details.get("dropped_channels", [])
# Ensure dropped_ch_list is a list before calling len()
num_dropped = (
len(dropped_ch_list) if isinstance(dropped_ch_list, list) else 0
)
if num_dropped > 0: # Only show if channels were actually dropped
processing_details_info.append(
["Outer Chans Dropped", str(num_dropped)]
)
# Reference Type
ref_type = current_processing_details.get("ref_type")
if ref_type: # Only show if present
processing_details_info.append(["Reference", str(ref_type)])
else:
message(
"debug",
f"json_summary['processing_details'] is not a dictionary: {type(current_processing_details)}",
)
else:
# This case means json_summary or json_summary["processing_details"] is missing.
# The 'if not processing_details_info:' check below will handle it.
message(
"debug",
"processing_details not found in json_summary. 'Processing Details' section will be sparse or N/A.",
)
except Exception as e: # pylint: disable=broad-except
message("warning", f"Error populating 'Processing Details' section: {str(e)}")
processing_details_info = [
["Error processing details", str(e)]
] # Show error in report
if not processing_details_info: # If after all attempts, it's still empty
processing_details_info = [["Processing data N/A", ""]]
processing_details_table = ReportLabTable(
processing_details_info, colWidths=[1.2 * inch, 2.1 * inch]
)
processing_details_table.setStyle(
TableStyle(
[
*table_style._cmds,
(
"BACKGROUND",
(0, 0),
(-1, -1),
colors.HexColor(
"#EFF8F9"
), # Light blue background for this section
),
]
)
)
# Add tables to main layout with spacing
data.append([import_table, processing_details_table])
main_table = ReportLabTable(
data, colWidths=[2.5 * inch, 3.5 * inch]
) # Adjusted for 2 columns
main_table.setStyle(
TableStyle(
[
("ALIGN", (0, 0), (-1, -1), "CENTER"),
("VALIGN", (0, 0), (-1, -1), "TOP"),
("LEFTPADDING", (0, 0), (-1, -1), 10),
("RIGHTPADDING", (0, 0), (-1, -1), 10),
("TOPPADDING", (0, 0), (-1, 0), 0),
("BOTTOMPADDING", (0, 0), (-1, 0), 12),
]
)
)
# Add main content in a frame
frame_data = [[main_table]]
frame = ReportLabTable(frame_data, colWidths=[6.5 * inch])
frame.setStyle(frame_style)
story.append(frame)
story.append(Spacer(1, 0.2 * inch))
# Processing Steps Section
story.append(Paragraph("Processing Steps", heading_style))
# Get processing steps from metadata
steps_data = []
try:
# Enumerate processing steps recorded in the run metadata
for step_name in run_record["metadata"]:
if step_name.startswith("step_") and step_name not in [
"step_prepare_directories",
]:
# Format step name for display
display_name = step_name.replace("step_", "").replace("_", " ").title()
steps_data.append([display_name])
except Exception as e: # pylint: disable=broad-except
message("warning", f"Error processing steps data: {str(e)}")
steps_data = [["Error processing steps"]]
if not steps_data:
steps_data = [["No processing steps data available"]]
# Create steps table with background styling
steps_table = ReportLabTable(
[[Paragraph("Processing Step", heading_style)]] + steps_data,
colWidths=[6 * inch],
)
steps_table.setStyle(
TableStyle(
[
("GRID", (0, 0), (-1, -1), 0.5, colors.HexColor("#BDC3C7")),
("FONTSIZE", (0, 0), (-1, -1), 7),
("ALIGN", (0, 0), (-1, -1), "LEFT"),
("FONTNAME", (0, 0), (-1, 0), "Helvetica-Bold"),
("BACKGROUND", (0, 0), (-1, 0), colors.HexColor("#F5F6FA")),
("BACKGROUND", (0, 1), (-1, -1), colors.HexColor("#F8F9F9")),
("TEXTCOLOR", (0, 0), (-1, 0), colors.HexColor("#2C3E50")),
("TOPPADDING", (0, 0), (-1, -1), 2),
("BOTTOMPADDING", (0, 0), (-1, -1), 2),
("LEFTPADDING", (0, 0), (-1, -1), 6),
("RIGHTPADDING", (0, 0), (-1, -1), 6),
]
)
)
story.append(steps_table)
story.append(Spacer(1, 0.2 * inch))
# Bad Channels Section
story.append(Paragraph("Bad Channels", heading_style))
# Get bad channels from metadata
bad_channels_data = []
try:
# First try to get bad channels from JSON summary
if json_summary and "channel_dict" in json_summary:
channel_dict = json_summary["channel_dict"]
# Add each category of bad channels
for category, channels in channel_dict.items():
if (
category != "removed_channels" and channels
): # Skip the combined list
display_category = (
category.replace("step_", "").replace("_", " ").title()
)
bad_channels_data.append([display_category, ", ".join(channels)])
# Add total count
if "removed_channels" in channel_dict:
total_removed = len(channel_dict["removed_channels"])
if (
"import_details" in json_summary
and "net_nbchan_orig" in json_summary["import_details"]
):
total_channels = json_summary["import_details"]["net_nbchan_orig"]
percentage = (
(total_removed / total_channels) * 100 if total_channels else 0
)
bad_channels_data.append(
[
"Total Removed",
f"{total_removed} / {total_channels} ({percentage:.1f}%)",
]
)
else:
bad_channels_data.append(["Total Removed", str(total_removed)])
else:
# Fall back to metadata
# Look for bad channels in various metadata sections
for step_name, step_data in run_record["metadata"].items():
if isinstance(step_data, dict) and "bads" in step_data:
display_name = (
step_name.replace("step_", "").replace("_", " ").title()
)
if isinstance(step_data["bads"], list) and step_data["bads"]:
bad_channels_data.append(
[display_name, ", ".join(step_data["bads"])]
)
except Exception as e: # pylint: disable=broad-except
message("warning", f"Error processing bad channels data: {str(e)}")
bad_channels_data = [["Error processing bad channels", "N/A"]]
if not bad_channels_data:
bad_channels_data = [["No bad channels data available", "N/A"]]
# Create bad channels table with background styling
bad_channels_table = ReportLabTable(
[[Paragraph("Source", heading_style), Paragraph("Bad Channels", heading_style)]]
+ bad_channels_data,
colWidths=[3 * inch, 3 * inch],
)
bad_channels_table.setStyle(
TableStyle(
[
("GRID", (0, 0), (-1, -1), 0.5, colors.HexColor("#BDC3C7")),
("FONTSIZE", (0, 0), (-1, -1), 7),
("ALIGN", (0, 0), (-1, -1), "LEFT"),
("FONTNAME", (0, 0), (-1, 0), "Helvetica-Bold"),
("BACKGROUND", (0, 0), (-1, 0), colors.HexColor("#F5F6FA")),
("BACKGROUND", (0, 1), (-1, -1), colors.HexColor("#EFF8F9")),
("TEXTCOLOR", (0, 0), (-1, 0), colors.HexColor("#2C3E50")),
("TOPPADDING", (0, 0), (-1, -1), 2),
("BOTTOMPADDING", (0, 0), (-1, -1), 2),
("LEFTPADDING", (0, 0), (-1, -1), 6),
("RIGHTPADDING", (0, 0), (-1, -1), 6),
]
)
)
story.append(bad_channels_table)
story.append(Spacer(1, 0.2 * inch))
# Results Summary Section
story.append(Paragraph("Results Summary", heading_style))
# Get results summary from metadata
results_data = []
try:
# First try to get results from JSON summary
if json_summary:
# Add processing state
if "proc_state" in json_summary:
results_data.append(["Processing State", json_summary["proc_state"]])
# Add exclusion category if any
if "exclude_category" in json_summary and json_summary["exclude_category"]:
results_data.append(
["Exclusion Category", json_summary["exclude_category"]]
)
# Add export details
if "export_details" in json_summary:
export_details = json_summary["export_details"]
if (
"initial_n_epochs" in export_details
and "final_n_epochs" in export_details
):
initial = export_details["initial_n_epochs"]
final = export_details["final_n_epochs"]
percentage = (final / initial) * 100 if initial else 0
results_data.append(
["Epochs Retained", f"{final} / {initial} ({percentage:.1f}%)"]
)
# For duration, use the actual epoch duration values
if (
"initial_duration" in export_details
and "final_duration" in export_details
):
initial = export_details["initial_duration"]
final = export_details["final_duration"]
# Calculate the actual duration based on epochs and epoch length
if "epoch_length" in export_details:
epoch_length = export_details["epoch_length"]
if (
"initial_n_epochs" in export_details
and "final_n_epochs" in export_details
):
initial_epochs = export_details["initial_n_epochs"]
final_epochs = export_details["final_n_epochs"]
# Recalculate durations based on epoch count and length
initial_duration = initial_epochs * epoch_length
final_duration = final_epochs * epoch_length
percentage = (
(final_duration / initial_duration) * 100
if initial_duration
else 0
)
results_data.append(
[
"Duration Retained",
f"{final_duration:.1f}s / {initial_duration:.1f}s ({percentage:.1f}%)", # pylint: disable=line-too-long
]
)
else:
# Use the values directly from export_details if epoch_length is not available # pylint: disable=line-too-long
percentage = (final / initial) * 100 if initial else 0
results_data.append(
[
"Duration Retained",
f"{final:.1f}s / {initial:.1f}s ({percentage:.1f}%)",
]
)
# Add ICA details
if "ica_details" in json_summary:
ica_details = json_summary["ica_details"]
if "proc_removeComps" in ica_details:
removed_comps = ica_details["proc_removeComps"]
if isinstance(removed_comps, list):
results_data.append(
[
"Removed ICA Components",
", ".join(map(str, removed_comps)),
]
)
else:
# Fall back to metadata (not yet implemented; results summary left empty)
pass
except Exception as e: # pylint: disable=broad-except
message("warning", f"Error processing results data: {str(e)}")
results_data = [["Error processing results", "N/A"]]
if not results_data:
results_data = [["No results data available", "N/A"]]
# Create results table with background styling
results_table = ReportLabTable(
[[Paragraph("Metric", heading_style), Paragraph("Value", heading_style)]]
+ results_data,
colWidths=[3 * inch, 3 * inch],
)
results_table.setStyle(
TableStyle(
[
("GRID", (0, 0), (-1, -1), 0.5, colors.HexColor("#BDC3C7")),
("FONTSIZE", (0, 0), (-1, -1), 7),
("ALIGN", (0, 0), (-1, -1), "LEFT"),
("FONTNAME", (0, 0), (-1, 0), "Helvetica-Bold"),
("BACKGROUND", (0, 0), (-1, 0), colors.HexColor("#F5F6FA")),
("BACKGROUND", (0, 1), (-1, -1), colors.HexColor("#F5EEF8")),
("TEXTCOLOR", (0, 0), (-1, 0), colors.HexColor("#2C3E50")),
("TOPPADDING", (0, 0), (-1, -1), 2),
("BOTTOMPADDING", (0, 0), (-1, -1), 2),
("LEFTPADDING", (0, 0), (-1, -1), 6),
("RIGHTPADDING", (0, 0), (-1, -1), 6),
]
)
)
story.append(results_table)
story.append(Spacer(1, 0.2 * inch))
# Output Files Section
story.append(Paragraph("Output Files", heading_style))
# Get output files from JSON summary
output_files_data = []
try:
if json_summary and "outputs" in json_summary:
outputs = json_summary["outputs"]
for output_file in outputs:
output_files_data.append([output_file])
elif derivatives_path and Path(derivatives_path).exists():
# If no JSON summary, try to get files directly from derivatives directory
files = list(Path(derivatives_path).glob("*"))
for file in files:
if file.is_file():
output_files_data.append([file.name])
except Exception as e: # pylint: disable=broad-except
message("warning", f"Error processing output files: {str(e)}")
output_files_data = [["Error processing output files"]]
if not output_files_data:
output_files_data = [["No output files available"]]
# Create output files table with background styling
output_files_table = ReportLabTable(
[[Paragraph("File Name", heading_style)]] + output_files_data,
colWidths=[6 * inch],
)
output_files_table.setStyle(
TableStyle(
[
("GRID", (0, 0), (-1, -1), 0.5, colors.HexColor("#BDC3C7")),
("FONTSIZE", (0, 0), (-1, -1), 7),
("ALIGN", (0, 0), (-1, -1), "LEFT"),
("FONTNAME", (0, 0), (-1, 0), "Helvetica-Bold"),
("BACKGROUND", (0, 0), (-1, 0), colors.HexColor("#F5F6FA")),
("BACKGROUND", (0, 1), (-1, -1), colors.HexColor("#EFF8F9")),
("TEXTCOLOR", (0, 0), (-1, 0), colors.HexColor("#2C3E50")),
("TOPPADDING", (0, 0), (-1, -1), 2),
("BOTTOMPADDING", (0, 0), (-1, -1), 2),
("LEFTPADDING", (0, 0), (-1, -1), 6),
("RIGHTPADDING", (0, 0), (-1, -1), 6),
]
)
)
story.append(output_files_table)
story.append(Spacer(1, 0.2 * inch))
# Add footer with run information
footer_style = ParagraphStyle(
"Footer",
parent=normal_style,
fontSize=6,
textColor=colors.HexColor("#7F8C8D"),
alignment=1,
spaceBefore=12,
)
footer_text = (
f"Run ID: {run_id} | "
f"Task: {run_record.get('task', 'N/A')} | "
f"Timestamp: {run_record.get('timestamp', 'N/A')}"
)
story.append(Paragraph(footer_text, footer_style))
# Build the PDF
doc.build(story)
message("success", f"Report saved to {pdf_path}")
# If derivatives path is available, also save there
if derivatives_path:
try:
shutil.copy(pdf_path, derivatives_path)
message("success", f"Report saved to {derivatives_path}")
except Exception as e: # pylint: disable=broad-except
message("warning", f"Could not save to derivatives: {str(e)}")
return pdf_path
def update_task_processing_log(
summary_dict: Dict[str, Any], flagged_reasons: Optional[list[str]] = None
):
"""Update the task-specific processing log CSV file with processing details.
This function is called by the Pipeline upon exiting the run.
Parameters
----------
summary_dict : dict
The summary dictionary containing processing details
flagged_reasons : list
Any flags found during the run. Flags are stored in the task instance.
See Also
--------
autoclean.step_functions.reports.create_json_summary : Create a JSON summary of the run metadata
Notes
-----
Although there are safeguards to ensure updates even upon run failure, it may still fail.
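
Examples
--------
A minimal usage sketch; the keys shown are the required top-level keys and
the values are illustrative only.

>>> summary = {
...     "output_dir": "/path/to/output",
...     "task": "resting_state",
...     "timestamp": "2024-01-01T00:00:00",
...     "run_id": "run_123",
...     "proc_state": "postcomps",
...     "basename": "sub-01_task-rest_eeg.raw",
...     "bids_subject": "01",
... }
>>> update_task_processing_log(summary, flagged_reasons=["excessive bad channels"])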
"""
try:
# Validate required top-level keys
required_keys = [
"output_dir",
"task",
"timestamp",
"run_id",
"proc_state",
"basename",
"bids_subject",
]
for key in required_keys:
if key not in summary_dict:
message("error", f"Missing required key in summary_dict: {key}")
return
# Define CSV path
csv_path = (
Path(summary_dict["output_dir"])
/ f"{summary_dict['task']}_processing_log.csv"
)
# Safe dictionary access function
def safe_get(d, *keys, default=""):
"""Safely access nested dictionary keys"""
current = d
for key in keys:
if not isinstance(current, dict):
return default
current = current.get(key, {})
# Handle case where the value is a dict - return default instead
if isinstance(current, dict):
return default
return current if current is not None else default
# Function to calculate bad trials safely
def calculate_bad_trials():
try:
initial_epochs = safe_get(
summary_dict, "export_details", "initial_n_epochs", default=0
)
final_epochs = safe_get(
summary_dict, "export_details", "final_n_epochs", default=0
)
# Convert to integers safely
if isinstance(initial_epochs, (int, float, str)):
initial_epochs = int(float(initial_epochs)) if initial_epochs else 0
else:
initial_epochs = 0
if isinstance(final_epochs, (int, float, str)):
final_epochs = int(float(final_epochs)) if final_epochs else 0
else:
final_epochs = 0
# Calculate bad trials
return initial_epochs - final_epochs
except Exception: # pylint: disable=broad-except
return 0 # Default to 0 if calculation fails
# Calculate retention ratios safely (returned as a proportion string, e.g. "0.95")
def safe_percentage(numerator, denominator, default=""):
try:
num = float(numerator)
denom = float(denominator)
return str(num / denom) if denom != 0 else default
except (ValueError, TypeError):
return default
# Combine flags into a single string
flags = "; ".join(flagged_reasons) if flagged_reasons else ""
# Extract details from summary_dict with safe access
details = {
"timestamp": summary_dict.get("timestamp", ""),
"study_user": os.getenv("USERNAME", "unknown"),
"run_id": summary_dict.get("run_id", ""),
"proc_state": summary_dict.get("proc_state", ""),
"subj_basename": Path(summary_dict.get("basename", "")).stem,
"bids_subject": summary_dict.get("bids_subject", ""),
"task": summary_dict.get("task", ""),
"flags": flags, # Add the new flagged column
"net_nbchan_orig": str(
safe_get(summary_dict, "import_details", "net_nbchan_orig", default="")
),
"net_nbchan_post": str(
safe_get(summary_dict, "export_details", "net_nbchan_post", default="")
),
"proc_badchans": str(
safe_get(summary_dict, "channel_dict", "removed_channels", default="")
),
"proc_filt_lowcutoff": str(
safe_get(summary_dict, "processing_details", "l_freq", default="")
),
"proc_filt_highcutoff": str(
safe_get(summary_dict, "processing_details", "h_freq", default="")
),
"proc_filt_notch": str(
safe_get(summary_dict, "processing_details", "notch_freqs", default="")
),
"proc_filt_notch_width": str(
safe_get(summary_dict, "processing_details", "notch_widths", default="")
),
"proc_sRate_raw": str(
safe_get(summary_dict, "import_details", "sample_rate", default="")
),
"proc_sRate1": str(
safe_get(summary_dict, "export_details", "srate_post", default="")
),
"proc_xmax_raw": str(
safe_get(summary_dict, "import_details", "duration", default="")
),
"proc_xmax_post": str(
safe_get(summary_dict, "export_details", "final_duration", default="")
),
}
# Add calculated fields
details.update(
{
"proc_xmax_percent": safe_percentage(
safe_get(
summary_dict, "export_details", "final_duration", default=""
),
safe_get(summary_dict, "import_details", "duration", default=""),
),
"epoch_length": str(
safe_get(summary_dict, "export_details", "epoch_length", default="")
),
"epoch_limits": str(
safe_get(summary_dict, "export_details", "epoch_limits", default="")
),
"epoch_trials": str(
safe_get(
summary_dict, "export_details", "initial_n_epochs", default=""
)
),
"epoch_badtrials": str(calculate_bad_trials()),
"epoch_percent": safe_percentage(
safe_get(
summary_dict, "export_details", "final_n_epochs", default=""
),
safe_get(
summary_dict, "export_details", "initial_n_epochs", default=""
),
),
}
)
details.update(
{
"proc_nComps": str(
safe_get(summary_dict, "ica_details", "proc_nComps", default="")
),
"proc_removeComps": str(
safe_get(
summary_dict, "ica_details", "proc_removeComps", default=""
)
),
"exclude_category": summary_dict.get("exclude_category", ""),
}
)
# Handle CSV operations with appropriate error handling
if csv_path.exists():
try:
# Read existing CSV
df = pd.read_csv(
csv_path, dtype=str
) # Force all columns to be string type
# Ensure all columns exist in DataFrame
for col in details.keys():
if col not in df.columns:
df[col] = ""
# Update or append entry
subj_basename = details.get("subj_basename", "")
if subj_basename and subj_basename in df["subj_basename"].values:
# Update existing row
df.loc[
df["subj_basename"] == subj_basename,
list(details.keys()),
] = list(
details.values()
) # Use list of values instead of pd.Series which can cause index mismatch
else:
# Append new entry
df = pd.concat([df, pd.DataFrame([details])], ignore_index=True)
except Exception as csv_err: # pylint: disable=broad-except
message("error", f"Error processing existing CSV: {str(csv_err)}")
# Create new DataFrame as fallback
df = pd.DataFrame([details], dtype=str)
else:
# Create new DataFrame with all columns as string type
df = pd.DataFrame([details], dtype=str)
# Save updated CSV with error handling
try:
# Ensure directory exists
csv_path.parent.mkdir(parents=True, exist_ok=True)
df.to_csv(csv_path, index=False)
message(
"success",
f"Updated processing log for {details['subj_basename']} in {csv_path}",
)
except Exception as save_err: # pylint: disable=broad-except
message("error", f"Error saving CSV: {str(save_err)}")
return
# Update run record with CSV path
try:
metadata = {
"processing_log": {
"creationDateTime": datetime.now().isoformat(),
"csv_path": str(csv_path),
}
}
manage_database(
operation="update",
update_record={
"run_id": summary_dict.get("run_id", ""),
"metadata": metadata,
},
)
except Exception as db_err: # pylint: disable=broad-except
message("error", f"Error updating database: {str(db_err)}")
except Exception as e: # pylint: disable=broad-except
message(
"error",
f"Error updating processing log: {str(e)}\n{traceback.format_exc()}",
)
def create_json_summary(run_id: str) -> dict:
"""
Creates a JSON summary of the run metadata.
The main purpose of this is to create a summary of the run for the autoclean report.
Parameters
----------
run_id : str
The run ID to create a JSON summary for
Returns
-------
summary_dict : dict
The JSON summary of the run metadata
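
Examples
--------
A minimal usage sketch; the run ID is illustrative. The run record must
already contain ``step_create_bids_path`` and ``import_eeg`` metadata,
otherwise an empty dict is returned.

>>> summary = create_json_summary("run_123")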
"""
run_record = get_run_record(run_id)
if not run_record:
message("error", f"No run record found for run ID: {run_id}")
return {}
metadata = run_record.get("metadata", {})
# Create a JSON summary of the metadata
if "step_create_bids_path" in run_record["metadata"]:
bids_info = run_record["metadata"]["step_create_bids_path"]
derivatives_dir = Path(bids_info["derivatives_dir"])
else:
message(
"warning",
"Failed to create json summary -> Could not find bids info in metadata.",
)
return {}
outputs = [file.name for file in derivatives_dir.iterdir() if file.is_file()]
# Determine processing state and exclusion category
proc_state = "postcomps"
exclude_category = ""
if not run_record.get("success", False):
error_msg = run_record.get("error", "").lower()
if "line noise" in error_msg:
proc_state = "LINE NOISE"
exclude_category = "Excessive Line Noise"
elif "insufficient data" in error_msg:
proc_state = "INSUFFICIENT_DATA"
exclude_category = "Insufficient Data"
else:
proc_state = "ERROR"
exclude_category = f"Processing Error: {error_msg[:100]}"
# FIND BAD CHANNELS
channel_dict = {}
if "step_clean_bad_channels" in metadata:
channel_dict["step_clean_bad_channels"] = metadata["step_clean_bad_channels"][
"bads"
]
channel_dict["uncorrelated_channels"] = metadata["step_clean_bad_channels"][
"uncorrelated_channels"
]
channel_dict["deviation_channels"] = metadata["step_clean_bad_channels"][
"deviation_channels"
]
channel_dict["ransac_channels"] = metadata["step_clean_bad_channels"][
"ransac_channels"
]
flagged_chs_file = None
for file_name in outputs:
if file_name.endswith("FlaggedChs.tsv"):
flagged_chs_file = file_name
break
if flagged_chs_file:
with open(derivatives_dir / flagged_chs_file, "r", encoding="utf8") as f:
# Skip the header line
next(f)
# Read each line and extract the label and channel name
for line in f:
parts = line.strip().split("\t")
if len(parts) == 2:
label, channel = parts
if label not in channel_dict:
channel_dict[label] = []
channel_dict[label].append(channel)
# Get all bad channels
bad_channels = [
channel for channels in channel_dict.values() for channel in channels
]
# Remove duplicates while preserving order
unique_bad_channels = []
for channel in bad_channels:
if channel not in unique_bad_channels:
unique_bad_channels.append(channel)
channel_dict["removed_channels"] = unique_bad_channels
output_dir = Path(metadata["step_prepare_directories"]["bids"]).parent
# FIND IMPORT DETAILS
import_details = {}
dropped_channels = []
if "step_drop_outerlayer" in metadata:
try:
dropped_channels = metadata["step_drop_outerlayer"][
"dropped_outer_layer_channels"
]
if dropped_channels is None:
dropped_channels = []
import_details["dropped_channels"] = dropped_channels
except Exception as e: # pylint: disable=broad-except
message("error", f"Failed to load dropped channels: {str(e)}")
if "import_eeg" in metadata:
import_details["sample_rate"] = metadata["import_eeg"]["sampleRate"]
import_details["net_nbchan_orig"] = metadata["import_eeg"]["channelCount"]
import_details["duration"] = metadata["import_eeg"]["durationSec"]
import_details["basename"] = metadata["import_eeg"]["unprocessedFile"]
original_channel_count = int(metadata["import_eeg"]["channelCount"]) - len(
dropped_channels
)
else:
message("error", "No import details found")
return {}
# FIND PROCESSING DETAILS - use verified applied values, not requested parameters
processing_details = {}
if "step_filter_data" in metadata:
# Use actual applied values, not filter_args (which are requested values)
filter_metadata = metadata["step_filter_data"]
processing_details["h_freq"] = filter_metadata.get("applied_h_freq")
processing_details["l_freq"] = filter_metadata.get("applied_l_freq")
processing_details["notch_freqs"] = filter_metadata.get("applied_notch_freqs")
processing_details["notch_widths"] = filter_metadata.get("applied_notch_widths")
# Also record verification of what was actually achieved
processing_details["filtered_sfreq"] = filter_metadata.get("filtered_sfreq")
processing_details["filtered_n_channels"] = filter_metadata.get(
"filtered_n_channels"
)
if "step_resample_data" in metadata:
# Use actual achieved sample rate, not target (which is requested value)
resample_metadata = metadata["step_resample_data"]
processing_details["target_sfreq"] = resample_metadata.get("target_sfreq")
processing_details["actual_sfreq"] = resample_metadata.get("actual_sfreq")
# Record verification metrics
processing_details["resampled_n_samples"] = resample_metadata.get(
"resampled_n_samples"
)
if "step_trim_edges" in metadata:
processing_details["trim_duration"] = metadata["step_trim_edges"][
"trim_duration"
]
if "step_crop_duration" in metadata:
processing_details["crop_duration"] = metadata["step_crop_duration"][
"crop_duration"
]
processing_details["crop_start"] = metadata["step_crop_duration"]["crop_start"]
processing_details["crop_end"] = metadata["step_crop_duration"]["crop_end"]
if "step_assign_eog_channels" in metadata:
processing_details["eog_channels"] = metadata["step_assign_eog_channels"][
"assigned_eog_channels"
]
if "step_drop_outerlayer" in metadata:
processing_details["dropped_channels"] = metadata["step_drop_outerlayer"][
"dropped_outer_layer_channels"
]
processing_details["original_channel_count"] = metadata["step_drop_outerlayer"][
"original_channel_count"
]
processing_details["new_channel_count"] = metadata["step_drop_outerlayer"][
"new_channel_count"
]
if "step_rereference_data" in metadata:
processing_details["ref_type"] = metadata["step_rereference_data"][
"new_ref_type"
]
# FIND EXPORT DETAILS - Quality Control Integrity
# Priority order: 1) Actual measured values from exported files (best QC)
# 2) Verified values from processing steps
# 3) Calculated values (flagged as such for transparency)
export_details = {}
if "save_epochs_to_set" in metadata:
save_epochs_to_set = metadata["save_epochs_to_set"]
epoch_length = save_epochs_to_set["tmax"] - save_epochs_to_set["tmin"]
export_details["epoch_length"] = epoch_length
export_details["final_n_epochs"] = save_epochs_to_set["n_epochs"]
# Use actual duration from final exported data (true QC verification)
if "actual_duration" in save_epochs_to_set:
export_details["final_duration"] = save_epochs_to_set["actual_duration"]
export_details["final_duration_verified"] = True
else:
# Fallback: calculate expected duration (mark as calculated, not measured)
export_details["final_duration"] = (
epoch_length * save_epochs_to_set["n_epochs"]
)
export_details["final_duration_calculated"] = True
# Use actual channel count from the final exported data, not calculations
if (
"save_epochs_to_set" in metadata
and "n_channels" in metadata["save_epochs_to_set"]
):
# Use actual channel count from exported file (true QC verification)
export_details["net_nbchan_post"] = metadata["save_epochs_to_set"][
"n_channels"
]
elif original_channel_count and unique_bad_channels:
# Fallback: calculate based on removed channels (mark as calculated)
export_details["net_nbchan_post"] = original_channel_count - len(
unique_bad_channels
)
export_details["net_nbchan_post_calculated"] = True
else:
# Last resort: use original count (mark as unverified)
export_details["net_nbchan_post"] = original_channel_count
export_details["net_nbchan_post_unverified"] = True
if "step_create_regular_epochs" in metadata:
epoch_metadata = metadata["step_create_regular_epochs"]
elif "step_create_eventid_epochs" in metadata:
epoch_metadata = metadata["step_create_eventid_epochs"]
else:
message(
"warning",
"No epoch creation details found. Processing details may be missing",
)
epoch_metadata = None
if epoch_metadata is not None:
export_details["initial_n_epochs"] = epoch_metadata["initial_epoch_count"]
export_details["initial_duration"] = epoch_metadata["initial_duration"]
# Use actual sample rate from final exported data (best QC verification)
if (
"save_epochs_to_set" in metadata
and "actual_sfreq" in metadata["save_epochs_to_set"]
):
export_details["srate_post"] = metadata["save_epochs_to_set"][
"actual_sfreq"
]
elif (
"step_resample_data" in metadata
and "actual_sfreq" in metadata["step_resample_data"]
):
# Second choice: actual sample rate from resampling step
export_details["srate_post"] = metadata["step_resample_data"][
"actual_sfreq"
]
else:
# Fallback: calculate from epoch metadata (mark as calculated)
export_details["srate_post"] = (
epoch_metadata["single_epoch_samples"]
) / epoch_metadata["single_epoch_duration"]
export_details["srate_post_calculated"] = True
export_details["epoch_limits"] = [
epoch_metadata["tmin"],
epoch_metadata["tmax"],
]
ica_details = {}
if "step_detect_dense_oscillatory_artifacts" in metadata:
ref_artifacts = metadata["step_detect_dense_oscillatory_artifacts"][
"artifacts_detected"
]
processing_details["ref_artifacts"] = ref_artifacts
summary_dict = {
"run_id": run_id,
"task": run_record["task"],
"bids_subject": f"sub-{bids_info['bids_subject']}",
"timestamp": run_record["created_at"],
"basename": import_details["basename"],
"proc_state": proc_state,
"exclude_category": exclude_category,
"import_details": import_details,
"processing_details": processing_details,
"export_details": export_details,
"ica_details": ica_details,
"channel_dict": channel_dict,
"outputs": outputs,
"output_dir": str(output_dir),
"derivatives_dir": str(derivatives_dir),
}
message("success", f"Created JSON summary for run {run_id}")
# Add metadata to database
manage_database(
operation="update",
update_record={"run_id": run_id, "metadata": {"json_summary": summary_dict}},
)
return summary_dict
def generate_bad_channels_tsv(summary_dict: Dict[str, Any]) -> None:
"""
Generates a tsv file containing the bad channels and reasons for flagging for the run.
Parameters
----------
summary_dict : dict
The summary dictionary containing the run metadata
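
Examples
--------
A minimal usage sketch; only the keys this function reads are shown, with
illustrative channel names and paths.

>>> generate_bad_channels_tsv(
...     {
...         "run_id": "run_123",
...         "derivatives_dir": "/path/to/derivatives",
...         "channel_dict": {
...             "noisy_channels": ["E17"],
...             "ransac_channels": ["E128"],
...         },
...     }
... )

The resulting ``FlaggedChs.tsv`` has two tab-separated columns, ``label``
and ``channel``, with one row per flagged channel.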
"""
try:
channel_dict = summary_dict["channel_dict"]
except Exception as e: # pylint: disable=broad-except
message(
"warning",
f"Could not generate bad channels tsv -> No channel dict found in summary dict: {str(e)}", # pylint: disable=line-too-long
)
return
try:
noisy_channels = channel_dict.get("noisy_channels", [])
uncorrelated_channels = channel_dict.get("uncorrelated_channels", [])
deviation_channels = channel_dict.get("deviation_channels", [])
bridged_channels = channel_dict.get("bridged_channels", [])
rank_channels = channel_dict.get("rank_channels", [])
ransac_channels = channel_dict.get("ransac_channels", [])
except Exception as e: # pylint: disable=broad-except
message(
"warning",
f"Could not generate bad channels tsv -> Failed to fetch bad channels: {str(e)}",
)
return
with open(
f"{summary_dict['derivatives_dir']}/FlaggedChs.tsv", "w", encoding="utf8"
) as f:
f.write("label\tchannel\n")
for channel in noisy_channels:
f.write("Noisy\t" + channel + "\n")
for channel in uncorrelated_channels:
f.write("Uncorrelated\t" + channel + "\n")
for channel in deviation_channels:
f.write("Deviation\t" + channel + "\n")
for channel in ransac_channels:
f.write("Ransac\t" + channel + "\n")
for channel in bridged_channels:
f.write("Bridged\t" + channel + "\n")
for channel in rank_channels:
f.write("Rank\t" + channel + "\n")
message("success", f"Bad channels tsv generated for {summary_dict['run_id']}")