Skip to content

PhotometryPipeline

A highly flexible class for bulk processing photometry data. Supports custom operations and subclasses of PhotometryData and PhotometryExperiment.


Example Usage

This example is for a risky decision-making task in rats, stored in the TDT format with multiple experiments per TDT folder.

Setup

from pyFiberPhotometry import PhotometryExperiment, PhotometryData, PhotometryPipeline, TDTLoader

# --- loader params ---
# Loader settings that are identical for every box.
shared_loader_kwargs = {
    'event_labels': ['Lrg', 'Sml', 'Hsl', 'Zap'],
    'signal_label': '_465',
    'isosbestic_label': '_405',
    'downsample': 10,
    'annotation_file': 'annotations.json',
    'annotation_handler': 'json',
}

# One loader-kwargs dict per box: a list of dicts makes the pipeline create
# one job per dict for each discovered input.
loader_kwargs = [
    {'box': box_label, **shared_loader_kwargs}
    for box_label in ('A', 'B')
]

# --- preprocess params ---
# Keyword arguments forwarded to PhotometryExperiment.preprocess_signal().
preprocess_kwargs = {
    'cutoff_frequency': 3.0,
    'order': 4,
    'correction_method': 'dF/F',
    'signal_normalization': 'none',
    'fit_using': 'OLS',
    'maxiter': 1000,
    'c': 3,
    'artifact_detector': None,
    'artifact_corrector': None,
}

# --- trial extraction params ---
# Keyword arguments forwarded to PhotometryExperiment.extract_trial_data().
# NOTE(review): 'event_tolerences' is kept exactly as spelled in the original
# example; presumably it matches the parameter name -- confirm before renaming.
trial_extraction_kwargs = {
    'align_to': 'Hsl',
    'center_on': ['Lrg', 'Sml'],
    'trial_bounds': (-23.0, 5.0),
    'baseline_bounds': (-5, -1),
    'event_tolerences': {
        'Lrg': (5, 18),
        'Sml': (5, 18),
        'Zap': (4.5, 18.5),
    },
    'trial_normalization': 'zero',
    'check_overlap': False,
    'time_error_threshold': 0.01,
    'event_conflict_logic': 'first',
}

# --- uid builder ---
def RDT_id_builder(exp: PhotometryExperiment) -> str:
    """Build an experiment ID string from ``exp.metadata``.

    The ID has the form ``<rat>_<current>uA_Box<box>_<stripped_date>_<tag>``,
    where ``<tag>`` is the text after the last ``'-'`` in the ``'source'``
    entry. Any missing metadata key is replaced by an ``'Unknown...'``
    placeholder.

    Args:
        exp (PhotometryExperiment): Experiment whose ``metadata`` mapping
            supplies the ID components.

    Returns:
        str: The assembled experiment ID.
    """
    meta = exp.metadata
    rat = meta.get('rat', 'UnknownRat')
    current = meta.get('current', 'UnknownCurrent')
    box = meta.get('box', 'UnknownBox')
    date = meta.get('stripped_date', 'UnknownDate')
    # keep only the final dash-separated piece of the source name
    source_tag = meta.get('source', 'UnknownSource').split('-')[-1]
    return f"{rat}_{current}uA_Box{box}_{date}_{source_tag}"

# --- post loading operation ---
# removes last dummy trial
def RDT_post_load(exp: PhotometryExperiment) -> None:
    """Drop the final ``'Hsl'`` event from ``exp.events`` in place.

    Experiments without an ``'Hsl'`` entry are left untouched.

    Args:
        exp (PhotometryExperiment): Experiment whose ``events`` mapping is
            modified.
    """
    if 'Hsl' not in exp.events:
        return

    hsl_events = exp.events['Hsl']
    # the last entry is a dummy trial, so keep everything before it
    exp.events['Hsl'] = hsl_events[:-1]

Run

# Build the pipeline: inputs are folders matching 'Emely*' that sit directly
# under the data directory (recursive=False, so no recursive search).
pipeline = PhotometryPipeline(
    # where and what to discover
    data_directory='database/NAc_Young_RDT_Photometry',
    target_type='folder',
    pattern='Emely*',
    recursive=False,
    # classes used to load and represent each experiment
    loader_cls=TDTLoader,
    experiment_cls=PhotometryExperiment,
    data_cls=PhotometryData,
)

# Process every discovered input; the returned object holds the trials
# accumulated across all jobs.
result = pipeline.run(
    # per-stage keyword arguments
    loader_kwargs=loader_kwargs,
    preprocess_kwargs=preprocess_kwargs,
    trial_extraction_kwargs=trial_extraction_kwargs,
    # outputs
    output_dir='/pipeline_RDT',
    log_file='pipeline.log',
    # metadata keys copied into trial_data.obs, plus the experiment ID builder
    passdown_metadata=['rat', 'current', 'box', 'source'],
    id_builder=RDT_id_builder,
    # custom hook run right after each experiment is loaded
    post_load_operation=RDT_post_load,
)

PhotometryPipeline(data_directory, target_type, loader_cls, experiment_cls=PhotometryExperiment, data_cls=PhotometryData, recursive=False, pattern=None)

Initialize a generic directory-level photometry pipeline.

Parameters:

  • data_directory (str | Path) –

    Root directory containing candidate input files or folders to process.

  • target_type (Literal['file', 'folder']) –

    File type to treat as a pipeline input. Must be either 'file' or 'folder'.

  • loader_cls (type[PhotometryLoader]) –

    Loader class to use.

  • experiment_cls (type[PhotometryExperiment], default: PhotometryExperiment ) –

    Experiment class to use.

  • data_cls (type[PhotometryData], default: PhotometryData ) –

    Trial-data class to use.

  • recursive (bool, default: False ) –

    Whether to recursively search under data_directory when discovering inputs.

  • pattern (str | None, default: None ) –

    Optional glob pattern used to filter discovered inputs. If None, all children under data_directory are considered.

Raises:

  • ValueError

    If data_directory does not exist or is not a directory.

Source code in pyFiberPhotometry/core/PhotometryPipeline.py
def __init__(
        self,
        data_directory: str | Path,
        target_type: Literal['file', 'folder'],
        loader_cls: type[PhotometryLoader],
        experiment_cls: type[PhotometryExperiment] = PhotometryExperiment,
        data_cls: type[PhotometryData] = PhotometryData,
        recursive: bool = False,
        pattern: str | None = None,
        ) -> None:
    """Set up a generic directory-level photometry pipeline.

    The data directory is expanded (``~``) and checked; every argument is
    then stored on the instance under its own name.

    Args:
        data_directory (str | Path): Root directory holding the candidate
            input files or folders.
        target_type (Literal['file', 'folder']): Kind of entry treated as a
            pipeline input, either ``'file'`` or ``'folder'``. The value is
            stored as given; it is not validated here.
        loader_cls (type[PhotometryLoader]): Loader class to use.
        experiment_cls (type[PhotometryExperiment]): Experiment class to use.
        data_cls (type[PhotometryData]): Trial-data class to use.
        recursive (bool): Whether input discovery searches recursively
            under ``data_directory``.
        pattern (str | None): Optional glob pattern used to filter
            discovered inputs. With ``None``, all children under
            ``data_directory`` are considered.

    Raises:
        ValueError: If ``data_directory`` does not exist or is not a
            directory.
    """
    root = Path(data_directory).expanduser()

    # reject a root that is missing or that is not a directory
    if not root.exists():
        raise ValueError(f"Data directory {root} does not exist.")
    if not root.is_dir():
        raise ValueError(f"Data directory {root} is not a directory.")

    # input discovery settings
    self.data_directory = root
    self.target_type = target_type
    self.recursive = recursive
    self.pattern = pattern

    # classes used to load and represent experiments
    self.loader_cls = loader_cls
    self.experiment_cls = experiment_cls
    self.data_cls = data_cls

discover_inputs()

Discover candidate inputs under self.data_directory.

Source code in pyFiberPhotometry/core/PhotometryPipeline.py
def discover_inputs(self) -> list[Path]:
    """Discover candidate inputs under ``self.data_directory``.

    Candidates come from ``rglob`` when ``self.recursive`` is set, otherwise
    from ``glob`` (with a pattern) or ``iterdir`` (without one). When
    ``self.pattern`` is ``None`` the recursive search uses ``'*'``.

    Returns:
        list[Path]: The candidates accepted by ``self._matches_input_kind``,
        in the order the directory search yields them.
    """
    root = self.data_directory
    glob_pattern = self.pattern

    if glob_pattern is not None:
        search = root.rglob if self.recursive else root.glob
        found = search(glob_pattern)
    elif self.recursive:
        found = root.rglob('*')
    else:
        found = root.iterdir()

    return [entry for entry in found if self._matches_input_kind(entry)]

run(loader_kwargs, preprocess_kwargs, trial_extraction_kwargs, output_dir=None, log_file=None, trial_output_file='trials.h5ad', low_memory_mode=False, passdown_metadata=['source'], id_builder=None, post_load_operation=None, post_preprocess_operation=None, post_trial_extraction_operation=None)

Run the batch processing pipeline over all discovered inputs.

For each discovered input, the pipeline constructs a loader, loads an experiment, preprocesses the continuous signal, extracts trial-wise data, optionally applies custom hook operations, passes selected metadata down into trial_data.obs, and accumulates the resulting trial-data objects in memory or on disk.

Parameters:

  • output_dir (str | Path | None, default: None ) –

    Directory where outputs should be written. If None, nothing is saved.

  • loader_kwargs (dict[str, Any] | list[dict[str, Any]]) –

    Keyword arguments passed to loader_cls for each job, excluding the discovered input path that the pipeline supplies positionally. Pass a list of dictionaries to create multiple jobs per input, which is useful for file formats that store data from multiple experiments within a single folder/file.

  • preprocess_kwargs (dict[str, Any]) –

    Keyword arguments passed to PhotometryExperiment.preprocess_signal().

  • trial_extraction_kwargs (dict[str, Any]) –

    Keyword arguments passed to PhotometryExperiment.extract_trial_data().

  • log_file (str | None, default: None ) –

    Optional path to a log file. If provided, logging is configured to write to that file. Default None.

  • trial_output_file (str | None, default: 'trials.h5ad' ) –

    Name of the output .h5ad file written under output_dir for accumulated trial data.

  • low_memory_mode (bool, default: False ) –

    If True, accumulate trial data directly on disk instead of keeping the full combined object in memory.

  • passdown_metadata (list[str] | None, default: ['source'] ) –

    Metadata keys from exp.metadata to copy into exp.trial_data.obs for each processed experiment. If None, no metadata columns are added.

  • id_builder (Callable[[type[PhotometryExperiment]], str] | None, default: None ) –

    Optional callable used to construct and assign an experiment ID after loading. Automatically passes the experiment ID to trial_data.obs.

  • post_load_operation (Callable[[type[PhotometryExperiment]], None] | None, default: None ) –

    Optional callable run immediately after an experiment is loaded.

  • post_preprocess_operation (Callable[[type[PhotometryExperiment]], None] | None, default: None ) –

    Optional callable run immediately after preprocessing completes.

  • post_trial_extraction_operation (Callable[[type[PhotometryExperiment]], None] | None, default: None ) –

    Optional callable run immediately after trial extraction completes.

Returns:

  • type[PhotometryData]

    type[PhotometryData]: The trial-data object containing all extracted trials.

Raises:

  • TypeError

    If provided kwargs do not match the accepted signatures of the loader, preprocessing, or trial-extraction methods.

  • ValueError

    If the output configuration is invalid or if no inputs are discovered.

Source code in pyFiberPhotometry/core/PhotometryPipeline.py
def run(
        self,

        # args
        loader_kwargs: dict[str, Any] | list[dict[str, Any]],
        preprocess_kwargs: dict[str, Any],
        trial_extraction_kwargs: dict[str, Any],

        # I/O
        output_dir: str | Path | None = None,
        log_file: str | None = None,
        trial_output_file: str | None = 'trials.h5ad',

        # pipeline params
        low_memory_mode: bool = False,

        # misc
        passdown_metadata: list[str] | None = ['source'],
        id_builder: Callable[[type[PhotometryExperiment]], str] | None = None,
        post_load_operation: Callable[[type[PhotometryExperiment]], None] | None = None,
        post_preprocess_operation: Callable[[type[PhotometryExperiment]], None] | None = None,
        post_trial_extraction_operation: Callable[[type[PhotometryExperiment]], None] | None = None,
        ) -> type[PhotometryData]:
    """Run the batch processing pipeline over all discovered inputs.

    For each discovered input, the pipeline constructs a loader, loads an
    experiment, preprocesses the continuous signal, extracts trial-wise
    data, optionally applies custom hook operations, passes selected
    metadata down into ``trial_data.obs``, and accumulates the resulting
    trial-data objects in memory or on disk.

    An exception raised while processing a single job is logged with its
    traceback and counted; the pipeline then moves on to the next job
    instead of aborting.

    Args:
        loader_kwargs (dict[str, Any] | list[dict[str, Any]]): Keyword
            arguments passed to ``loader_cls`` for each job, excluding the
            discovered input path that the pipeline supplies positionally.
            Pass a list of dictionaries to create multiple jobs per input,
            which is useful for file formats that store data from multiple
            experiments within a single folder/file.
        preprocess_kwargs (dict[str, Any]): Keyword arguments passed to
            ``PhotometryExperiment.preprocess_signal()``.
        trial_extraction_kwargs (dict[str, Any]): Keyword arguments passed
            to ``PhotometryExperiment.extract_trial_data()``.
        output_dir (str | Path | None): Directory where outputs should be
            written. If ``None``, nothing is saved.
        log_file (str | None): Optional path to a log file. If provided,
            root logging is reconfigured (``force=True``) to write to that
            file, overwriting any existing content. Default ``None``.
        trial_output_file (str | None): Name of the output ``.h5ad`` file
            written under ``output_dir`` for accumulated trial data. If
            ``None``, no trial output path is built, exactly as when
            ``output_dir`` is ``None``.
        low_memory_mode (bool): If ``True``, accumulate trial data
            directly on disk instead of keeping the full combined object in
            memory.
        passdown_metadata (list[str] | None): Metadata keys from
            ``exp.metadata`` to copy into ``exp.trial_data.obs`` for each
            processed experiment. If ``None``, no metadata columns are
            added.
        id_builder (Callable[[type[PhotometryExperiment]], str] | None):
            Optional callable used to construct an experiment ID, which is
            assigned to ``exp.id`` and written to the ``'experiment_id'``
            column of ``trial_data.obs``. It runs after trial extraction.
        post_load_operation (Callable[[type[PhotometryExperiment]], None] | None):
            Optional callable run immediately after an experiment is loaded.
        post_preprocess_operation (Callable[[type[PhotometryExperiment]], None] | None):
            Optional callable run immediately after preprocessing completes.
        post_trial_extraction_operation (Callable[[type[PhotometryExperiment]], None] | None):
            Optional callable run immediately after trial extraction completes.

    Returns:
        type[PhotometryData]: The trial-data object containing all extracted
        trials, as returned by ``self._finalize_result``.

    Raises:
        TypeError: If provided kwargs do not match the accepted signatures
            of the loader, preprocessing, or trial-extraction methods.
        ValueError: If the output configuration is invalid or if no inputs
            are discovered.
    """

    # --- set up logger ---
    logger = logging.getLogger(__name__)
    if log_file is not None:
        logging.basicConfig(filename=log_file, filemode='w', level=logging.INFO, force=True)
    logger.info('Beginning pipeline')

    # --- coerce inputs ---
    # A trial output path only exists when both the directory and the file
    # name are given; joining with a None file name would raise TypeError.
    trial_output_path = None
    if output_dir is not None:
        output_dir = Path(output_dir)
        if trial_output_file is not None:
            trial_output_path = os.path.join(output_dir, trial_output_file)

    # --- validate inputs ---
    logger.info('Validating inputs...')
    self._validate_output_dir(output_dir)
    self._validate_low_memory_mode(low_memory_mode, trial_output_path)
    self._validate_all_kwargs(loader_kwargs, preprocess_kwargs, trial_extraction_kwargs,)

    # --- construct jobs ---
    logger.info('Discovering inputs...')
    inputs = self.discover_inputs()
    if not inputs:
        raise ValueError('No inputs discovered!')

    logger.info('Building jobs...')
    jobs = self._build_jobs(
        inputs=inputs,
        loader_kwargs=loader_kwargs,
        preprocess_kwargs=preprocess_kwargs,
        trial_extraction_kwargs=trial_extraction_kwargs
    )

    # --- set up job iteration ---
    n_jobs = len(jobs)
    # reset the accumulated trial data from any previous run
    self.trial_data: type[PhotometryData] = None
    n_errors = 0
    n_processed = 0

    # --- iterate over jobs ---
    logger.info(f'Iterating over {n_jobs} jobs...\n')
    for i, job in enumerate(jobs):
        try:
            logger.info(f'Processing job {job["input"]} ({i+1}/{n_jobs})')

            # 1. load experiment
            logger.info('Loading expriment...')
            exp = self._load_experiment(job['input'], job['loader_kwargs'])

            if post_load_operation is not None:
                logger.info('Running custom post loading operation...')
                post_load_operation(exp)

            # 2. run preprocess
            logger.info('Preprocessing signal...')
            self._run_preprocessing(exp, job['preprocess_kwargs'])

            if post_preprocess_operation is not None:
                logger.info('Running custom post preprocessing operation...')
                post_preprocess_operation(exp)

            # 3. run extract trial data
            logger.info('Extracting trial data...')
            self._run_trial_extraction(exp, job['trial_extraction_kwargs'])

            if post_trial_extraction_operation is not None:
                logger.info('Running custom post trial extraction operation...')
                post_trial_extraction_operation(exp)

            # warn if any trial windows were invalid; .get() keeps a job from
            # failing when the experiment never recorded the key
            invalid_windows = exp.metadata.get('invalid_windows')
            if invalid_windows is not None:
                logger.warning(
                    f'Invalid trial windows at indexes {invalid_windows} '
                    f'have been dropped.'
                )

            # assign uid if function provided
            if id_builder is not None:
                logger.info('Assiging experiment ID...')
                exp.id = id_builder(exp)
                exp.trial_data.obs['experiment_id'] = exp.id
                logger.info(f'Expriment ID = {str(exp.id)}')

            # pass down specified metadata
            if passdown_metadata is not None:
                logger.info('Passing down experiment metadata as columns...')
                exp.trial_data.add_obs_columns(add_from=exp.metadata, keys=passdown_metadata)

            # accumulate object
            logger.info('Accumulating result...')
            self._accumulate_result(exp, trial_output_path, low_memory_mode)

            # log expriment info
            logger.info(
                f'Finished processing experiment with '
                f'{exp.trial_data.n_trials} trials x {exp.trial_data.n_times} timepoints.'
            )

            # clean up before next iteration
            logger.info(f'Job {i+1} complete.\n')
            del exp
            n_processed += 1

        # handle errors: one failing job must not abort the whole batch
        except Exception as e:
            logger.error(f'Error processing {job["input"]}: \n\t {e}\n', exc_info=True)
            n_errors += 1

    # --- finalize ---
    logger.info('Finalizng results...')
    trial_data = self._finalize_result(trial_output_path, low_memory_mode)

    logger.info(
        f'Processing pipeline complete with {n_errors} errors '
        f'and {n_processed} successes out of {n_jobs} jobs.'
    )

    return trial_data