Batch Processing
This tutorial gives an in-depth overview of the PhotometryPipeline class. PhotometryPipeline runs the same loader, preprocessing, and trial-extraction workflow over many recordings and returns one combined PhotometryData object.
The pipeline is useful when you have a large batch of experiments that share processing and trial-windowing parameters. It handles:
- discovering input files or folders,
- constructing loaders,
- running PhotometryExperiment.preprocess_signal(),
- running PhotometryExperiment.extract_trial_data(),
- passing metadata into trial-level obs columns,
- combining trial data across experiments,
- saving logs, trial-data files, and optional dashboards,
- running custom functions at key points in the workflow.
Setup
from pathlib import Path
import numpy as np
import pandas as pd
from PhoPro import CSVLoader, PhotometryData, PhotometryExperiment, PhotometryPipeline
This tutorial goes over two batch layouts:
- data/pipeline/case1: each experiment has its own folder containing a CSV file and an annotation.json file holding per-experiment metadata: subject ID, age, and sex.
- data/pipeline/case2: all CSV files and their matching JSON annotations live in one directory and share a basename.
# set up reusable paths
CASE1_DIR = Path('data/pipeline/case1')
CASE2_DIR = Path('data/pipeline/case2')
EVENT_COLS = ['trial_cue', 'lever1', 'lever2', 'shock']
print(CASE1_DIR)
print(CASE2_DIR)
data/pipeline/case1
data/pipeline/case2
We will use the same preprocessing and trial extraction settings in several examples.
preprocess_kwargs = dict(
cutoff_frequency=3,
order=4,
correction_method='dF/F',
fit_using='OLS',
)
trial_extraction_kwargs = dict(
align_to='trial_cue',
center_on=['lever1', 'lever2'],
trial_bounds=(-8, 8),
event_tolerences={
'lever1': (2, 4),
'lever2': (2, 4),
'shock': None,
},
trial_normalization='none',
check_overlap=True,
all_events=True,
window_alignment='nearest',
)
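For readers unfamiliar with the correction settings, the sketch below shows one common fiber-photometry convention that the names correction_method='dF/F' and fit_using='OLS' suggest: fit the isosbestic channel to the signal by ordinary least squares, then express the signal as a fractional change from that fit. This is an assumption about what the options mean, not a description of PhoPro's internals. The helper name dff_ols and the synthetic traces are hypothetical, and the low-pass filter implied by cutoff_frequency and order is left out.

```python
import numpy as np

def dff_ols(signal: np.ndarray, isosbestic: np.ndarray) -> np.ndarray:
    """Hypothetical helper: dF/F relative to an OLS fit of the isosbestic channel."""
    # Ordinary least squares for: signal ~ slope * isosbestic + intercept
    slope, intercept = np.polyfit(isosbestic, signal, deg=1)
    fitted = slope * isosbestic + intercept
    # Fractional deviation of the signal from the fitted baseline
    return (signal - fitted) / fitted

# Synthetic traces (illustrative only): a slow decay shared by both channels
t = np.linspace(0.0, 10.0, 201)
isosbestic = 1.0 + 0.5 * np.exp(-t / 5.0)
baseline_only = 2.0 * isosbestic + 0.3   # signal with no transient activity

# With no activity on top of the shared decay, the corrected trace is flat
dff_flat = dff_ols(baseline_only, isosbestic)
```

In this toy case the shared decay is removed entirely, which is the purpose of an isosbestic correction: only deviations that the control channel cannot explain remain.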
1. Creating a Pipeline
A pipeline is configured with a data directory, a target type, and a loader class. target_type='file' means each experiment's data is held in a single file (such as a CSV), while target_type='folder' means each experiment's data is held in a folder (such as the TDT storage format).
pipeline1 = PhotometryPipeline(
data_directory=CASE1_DIR,
target_type='file',
loader_cls=CSVLoader,
recursive=True,
pattern='experiment_*.csv',
)
discover_inputs shows which inputs will become jobs. In case 1, the CSV files are inside experiment subdirectories, so recursive=True is needed.
pipeline1.discover_inputs()
[PosixPath('data/pipeline/case1/experiment_1/experiment_1.csv'),
PosixPath('data/pipeline/case1/experiment_3/experiment_3.csv'),
PosixPath('data/pipeline/case1/experiment_2/experiment_2.csv')]
Case 2 is flat, so the pipeline only needs to search the top-level directory, not nested folders.
pipeline2 = PhotometryPipeline(
data_directory=CASE2_DIR,
target_type='file',
loader_cls=CSVLoader,
recursive=False,
pattern='experiment_*.csv',
)
pipeline2.discover_inputs()
[PosixPath('data/pipeline/case2/experiment_1.csv'),
PosixPath('data/pipeline/case2/experiment_2.csv'),
PosixPath('data/pipeline/case2/experiment_3.csv')]
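The effect of recursive can be pictured with plain pathlib. The sketch below is not PhoPro's discovery code. find_inputs is a hypothetical helper that assumes the pattern is an ordinary glob applied either to the top level only or to every subdirectory, and sorting the result is this sketch's own choice.

```python
import tempfile
from pathlib import Path

def find_inputs(root: Path, pattern: str, recursive: bool) -> list:
    """Hypothetical helper: files under root that match pattern, sorted by path."""
    # rglob descends into subdirectories; glob only looks at root itself
    matches = root.rglob(pattern) if recursive else root.glob(pattern)
    return sorted(p for p in matches if p.is_file())

with tempfile.TemporaryDirectory() as tmp:
    root = Path(tmp)
    # Recreate the nested layout of case 1: one folder per experiment
    for i in (1, 2):
        folder = root / f"experiment_{i}"
        folder.mkdir()
        (folder / f"experiment_{i}.csv").touch()

    flat_names = [p.name for p in find_inputs(root, "experiment_*.csv", recursive=False)]
    nested_names = [p.name for p in find_inputs(root, "experiment_*.csv", recursive=True)]

print(flat_names)    # []
print(nested_names)  # ['experiment_1.csv', 'experiment_2.csv']
```

A non-recursive search of a nested layout finds nothing, which is why case 1 needs recursive=True.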
2. Loader Keyword Arguments
loader_kwargs are passed to the loader constructor for each discovered input. The discovered path itself is supplied positionally (as the first argument) by the pipeline, so loader_kwargs should contain the rest of the loader configuration.
static_loader_kwargs = dict(
time_col='time',
signal_col='raw_signal',
isosbestic_col='raw_isosbestic',
event_cols=EVENT_COLS,
)
static_loader_kwargs
{'time_col': 'time',
'signal_col': 'raw_signal',
'isosbestic_col': 'raw_isosbestic',
'event_cols': ['trial_cue', 'lever1', 'lever2', 'shock']}
A static dictionary is enough when every input uses the same loader settings and no per-file annotation path is needed.
static_trials = pipeline2.run(
loader_kwargs=static_loader_kwargs,
preprocess_kwargs=preprocess_kwargs,
trial_extraction_kwargs=trial_extraction_kwargs,
output_dir=None,
log_file='output/pipeline_static_loader.log',
passdown_metadata=['source'],
)
print(static_trials)
static_trials.obs.head()
Photometry dataset with 60 trials, 320 timepoints, and 6 observations.
| | trial_num | trial_cue | lever1 | lever2 | shock | source |
|---|---|---|---|---|---|---|
| 0 | 1 | -3.50 | 0.0 | NaN | NaN | data/pipeline/case2/experiment_1.csv |
| 1 | 2 | -2.70 | 0.0 | NaN | 1.25 | data/pipeline/case2/experiment_1.csv |
| 2 | 3 | 0.00 | NaN | NaN | NaN | data/pipeline/case2/experiment_1.csv |
| 3 | 4 | -3.95 | 0.0 | NaN | NaN | data/pipeline/case2/experiment_1.csv |
| 4 | 5 | -3.80 | 0.0 | NaN | NaN | data/pipeline/case2/experiment_1.csv |
For loading paired annotation files, loader_kwargs can be a function. The function receives the discovered Path and returns the kwargs for that specific input.
def case1_loader_kwargs(csv: Path) -> dict:
    return dict(
        time_col='time',
        signal_col='raw_signal',
        isosbestic_col='raw_isosbestic',
        event_cols=EVENT_COLS,
        annotation_file=csv.parent / 'annotation.json',
        annotation_handler='json',
    )
case1_loader_kwargs(pipeline1.discover_inputs()[0])
{'time_col': 'time',
'signal_col': 'raw_signal',
'isosbestic_col': 'raw_isosbestic',
'event_cols': ['trial_cue', 'lever1', 'lever2', 'shock'],
'annotation_file': PosixPath('data/pipeline/case1/experiment_1/annotation.json'),
'annotation_handler': 'json'}
That path-aware function lets the pipeline derive the correct annotation file for each CSV in the nested folder layout.
case1_trials = pipeline1.run(
loader_kwargs=case1_loader_kwargs,
preprocess_kwargs=preprocess_kwargs,
trial_extraction_kwargs=trial_extraction_kwargs,
output_dir=None,
log_file='output/pipeline_case1.log',
passdown_metadata=['source', 'subject', 'sex', 'age'],
)
print(case1_trials)
case1_trials.obs.head()
Photometry dataset with 60 trials, 320 timepoints, and 9 observations.
| | trial_num | trial_cue | lever1 | lever2 | shock | source | subject | sex | age |
|---|---|---|---|---|---|---|---|---|---|
| 0 | 1 | -3.50 | 0.0 | NaN | NaN | data/pipeline/case1/experiment_1/experiment_1.csv | animal_1 | male | young |
| 1 | 2 | -2.70 | 0.0 | NaN | 1.25 | data/pipeline/case1/experiment_1/experiment_1.csv | animal_1 | male | young |
| 2 | 3 | 0.00 | NaN | NaN | NaN | data/pipeline/case1/experiment_1/experiment_1.csv | animal_1 | male | young |
| 3 | 4 | -3.95 | 0.0 | NaN | NaN | data/pipeline/case1/experiment_1/experiment_1.csv | animal_1 | male | young |
| 4 | 5 | -3.80 | 0.0 | NaN | NaN | data/pipeline/case1/experiment_1/experiment_1.csv | animal_1 | male | young |
For the single-folder case (case 2), the resolver can use Path.with_suffix to swap the .csv extension for .json.
def case2_loader_kwargs(csv: Path) -> dict:
    return dict(
        time_col='time',
        signal_col='raw_signal',
        isosbestic_col='raw_isosbestic',
        event_cols=EVENT_COLS,
        annotation_file=csv.with_suffix('.json'),
        annotation_handler='json',
    )
case2_trials: PhotometryData = pipeline2.run(
loader_kwargs=case2_loader_kwargs,
preprocess_kwargs=preprocess_kwargs,
trial_extraction_kwargs=trial_extraction_kwargs,
output_dir=None,
log_file='output/pipeline_case2.log',
passdown_metadata=['source', 'subject', 'sex', 'age'],
)
print(case2_trials)
case2_trials.obs.head()
Photometry dataset with 60 trials, 320 timepoints, and 9 observations.
| | trial_num | trial_cue | lever1 | lever2 | shock | source | subject | sex | age |
|---|---|---|---|---|---|---|---|---|---|
| 0 | 1 | -3.50 | 0.0 | NaN | NaN | data/pipeline/case2/experiment_1.csv | animal_1 | male | young |
| 1 | 2 | -2.70 | 0.0 | NaN | 1.25 | data/pipeline/case2/experiment_1.csv | animal_1 | male | young |
| 2 | 3 | 0.00 | NaN | NaN | NaN | data/pipeline/case2/experiment_1.csv | animal_1 | male | young |
| 3 | 4 | -3.95 | 0.0 | NaN | NaN | data/pipeline/case2/experiment_1.csv | animal_1 | male | young |
| 4 | 5 | -3.80 | 0.0 | NaN | NaN | data/pipeline/case2/experiment_1.csv | animal_1 | male | young |
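The two annotation-path rules used by the resolvers in this section are plain pathlib operations. The short example below applies them to the tutorial's own paths. PurePosixPath is used only so the printed result is the same on every operating system.

```python
from pathlib import PurePosixPath

nested_csv = PurePosixPath('data/pipeline/case1/experiment_1/experiment_1.csv')
flat_csv = PurePosixPath('data/pipeline/case2/experiment_1.csv')

# Case 1: the annotation has a fixed name and sits next to the CSV
nested_annotation = nested_csv.parent / 'annotation.json'

# Case 2: the annotation shares the CSV's basename, so only the suffix changes
flat_annotation = flat_csv.with_suffix('.json')

print(nested_annotation)  # data/pipeline/case1/experiment_1/annotation.json
print(flat_annotation)    # data/pipeline/case2/experiment_1.json
```

Note that with_suffix replaces only the final suffix, so a file named rec.v2.csv would map to rec.v2.json.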
3. Multiple Jobs per Input
loader_kwargs may also be a list of dictionaries. A list creates multiple jobs for each discovered input, one per dictionary. This is useful when the same file should be loaded in more than one way, for example when loading multiple boxes from a TDT vault folder or multiple experimental wavelengths from one CSV.
Below is an example of loading a single experiment twice.
single_file_pipeline = PhotometryPipeline(
data_directory=CASE2_DIR,
target_type='file',
loader_cls=CSVLoader,
recursive=False,
# restrict to only experiment_1.csv
pattern='experiment_1.csv',
)
loader_fanout = [
dict(
time_col='time',
signal_col='raw_signal',
isosbestic_col='raw_isosbestic',
event_cols=['trial_cue', 'lever1', 'lever2'],
annotation_file=CASE2_DIR / 'experiment_1.json',
annotation_handler='json',
),
dict(
time_col='time',
signal_col='raw_signal',
isosbestic_col='raw_isosbestic',
event_cols=EVENT_COLS,
annotation_file=CASE2_DIR / 'experiment_1.json',
annotation_handler='json',
),
]
jobs = single_file_pipeline._build_jobs(
inputs=single_file_pipeline.discover_inputs(),
loader_kwargs=loader_fanout,
preprocess_kwargs=preprocess_kwargs,
trial_extraction_kwargs={**trial_extraction_kwargs, 'event_tolerences': {'lever1': (2, 4), 'lever2': (2, 4)}},
)
len(jobs)
2
A callable resolver may also return a list, giving per-input fan-out while still deriving paths from each input.
def fanout_loader_kwargs(csv: Path) -> list[dict]:
    base = dict(
        time_col='time',
        signal_col='raw_signal',
        isosbestic_col='raw_isosbestic',
        annotation_file=csv.with_suffix('.json'),
        annotation_handler='json',
    )
    return [
        base | {'event_cols': ['trial_cue', 'lever1', 'lever2']},
        base | {'event_cols': EVENT_COLS},
    ]
fanout_trials = single_file_pipeline.run(
loader_kwargs=fanout_loader_kwargs,
preprocess_kwargs=preprocess_kwargs,
trial_extraction_kwargs={**trial_extraction_kwargs, 'event_tolerences': {'lever1': (2, 4), 'lever2': (2, 4)}},
output_dir=None,
log_file='output/pipeline_fanout.log',
passdown_metadata=['source', 'subject'],
)
print(fanout_trials)
fanout_trials.obs.head()
Photometry dataset with 40 trials, 320 timepoints, and 6 observations.
| | trial_num | trial_cue | lever1 | lever2 | source | subject |
|---|---|---|---|---|---|---|
| 0 | 1 | -3.50 | 0.0 | NaN | data/pipeline/case2/experiment_1.csv | animal_1 |
| 1 | 2 | -2.70 | 0.0 | NaN | data/pipeline/case2/experiment_1.csv | animal_1 |
| 2 | 3 | 0.00 | NaN | NaN | data/pipeline/case2/experiment_1.csv | animal_1 |
| 3 | 4 | -3.95 | 0.0 | NaN | data/pipeline/case2/experiment_1.csv | animal_1 |
| 4 | 5 | -3.80 | 0.0 | NaN | data/pipeline/case2/experiment_1.csv | animal_1 |
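The three accepted forms of loader_kwargs (a dictionary, a list of dictionaries, or a function of the input path) can all be reduced to "a list of dictionaries per input". The sketch below illustrates that idea under stated assumptions. resolve_loader_kwargs and build_jobs are hypothetical names, and this is not the implementation behind the pipeline's private _build_jobs method.

```python
from pathlib import Path
from typing import Callable, Union

KwargsSpec = Union[dict, list, Callable[[Path], Union[dict, list]]]

def resolve_loader_kwargs(path: Path, spec: KwargsSpec) -> list:
    """Hypothetical helper: turn any accepted loader_kwargs form into a list of dicts."""
    # A callable is resolved against the concrete input path first
    resolved = spec(path) if callable(spec) else spec
    if isinstance(resolved, dict):
        resolved = [resolved]                      # one dict -> one job
    return [dict(kwargs) for kwargs in resolved]   # copy so jobs do not share state

def build_jobs(inputs: list, spec: KwargsSpec) -> list:
    """One (path, kwargs) job per resolved dictionary per input."""
    return [(path, kwargs) for path in inputs
            for kwargs in resolve_loader_kwargs(path, spec)]

inputs = [Path('experiment_1.csv'), Path('experiment_2.csv')]
static = {'time_col': 'time'}
fanout = [{'event_cols': ['trial_cue']}, {'event_cols': ['trial_cue', 'shock']}]

def by_path(csv: Path) -> dict:
    return {'annotation_file': csv.with_suffix('.json')}

n_static = len(build_jobs(inputs, static))   # 2 inputs x 1 dict
n_fanout = len(build_jobs(inputs, fanout))   # 2 inputs x 2 dicts
path_jobs = build_jobs(inputs, by_path)      # kwargs differ per input
```

Under this model the number of jobs is the number of inputs times the number of dictionaries each input resolves to, which matches the two jobs reported for one input and a two-element list.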
4. Validation
The pipeline validates static preprocessing, extraction, dashboard, and resolved loader kwargs before running jobs. This catches misspelled keyword arguments early.
try:
    pipeline2.run(
        loader_kwargs=static_loader_kwargs | {'not_a_loader_argument': True},
        preprocess_kwargs=preprocess_kwargs,
        trial_extraction_kwargs=trial_extraction_kwargs,
        output_dir=None,
    )
except TypeError as err:
    print(err)
Unexpected kwargs for CSVLoader.__init__(): ['not_a_loader_argument']
Callable loader_kwargs are validated after concrete jobs are built, because the pipeline cannot know what a resolver will return until it sees an input path.
def bad_loader_kwargs(csv: Path) -> dict:
    return {'time_col': 'time', 'not_a_loader_argument': True}
try:
    pipeline2.run(
        loader_kwargs=bad_loader_kwargs,
        preprocess_kwargs=preprocess_kwargs,
        trial_extraction_kwargs=trial_extraction_kwargs,
        output_dir=None,
    )
except TypeError as err:
    print(err)
Unexpected kwargs for CSVLoader.__init__(): ['not_a_loader_argument']
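One way such early validation can work is to compare the supplied keys with the target function's signature before calling it. The sketch below uses the standard inspect module. DemoLoader and validate_kwargs are hypothetical stand-ins, and the error text only imitates the message shown above, so it should not be read as PhoPro's actual check.

```python
import inspect

class DemoLoader:
    """Stand-in loader (hypothetical); only its signature matters here."""
    def __init__(self, path, time_col='time', signal_col='signal'):
        self.path, self.time_col, self.signal_col = path, time_col, signal_col

def validate_kwargs(func, kwargs: dict) -> None:
    """Raise TypeError if kwargs contains names that func does not accept."""
    params = inspect.signature(func).parameters
    # A **kwargs catch-all accepts any name, so there is nothing to check
    if any(p.kind is inspect.Parameter.VAR_KEYWORD for p in params.values()):
        return
    unexpected = sorted(name for name in kwargs if name not in params)
    if unexpected:
        raise TypeError(f"Unexpected kwargs for {func.__qualname__}(): {unexpected}")

validate_kwargs(DemoLoader.__init__, {'time_col': 't'})   # passes silently

try:
    validate_kwargs(DemoLoader.__init__, {'time_col': 't', 'not_a_loader_argument': True})
    message = None
except TypeError as err:
    message = str(err)
```

Checking names against the signature costs almost nothing and fails before any file is read, which matters when a batch would otherwise run for a long time before hitting the typo.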
5. Passing Metadata Down to Trials
passdown_metadata copies selected experiment-level metadata keys into every trial row for that experiment. This is usually how subject, sex, age, session, and source information enter PhotometryData.obs.
| | subject | sex | age | source |
|---|---|---|---|---|
| 0 | animal_1 | male | young | data/pipeline/case2/experiment_1.csv |
| 1 | animal_1 | male | young | data/pipeline/case2/experiment_1.csv |
| 2 | animal_1 | male | young | data/pipeline/case2/experiment_1.csv |
| 3 | animal_1 | male | young | data/pipeline/case2/experiment_1.csv |
| 4 | animal_1 | male | young | data/pipeline/case2/experiment_1.csv |
Set passdown_metadata=None if you do not want any experiment metadata copied into trial rows.
no_metadata_trials = single_file_pipeline.run(
loader_kwargs=case2_loader_kwargs,
preprocess_kwargs=preprocess_kwargs,
trial_extraction_kwargs=trial_extraction_kwargs,
output_dir=None,
log_file='output/pipeline_no_metadata.log',
passdown_metadata=None,
)
no_metadata_trials.obs.head()
| | trial_num | trial_cue | lever1 | lever2 | shock |
|---|---|---|---|---|---|
| 0 | 1 | -3.50 | 0.0 | NaN | NaN |
| 1 | 2 | -2.70 | 0.0 | NaN | 1.25 |
| 2 | 3 | 0.00 | NaN | NaN | NaN |
| 3 | 4 | -3.95 | 0.0 | NaN | NaN |
| 4 | 5 | -3.80 | 0.0 | NaN | NaN |
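Conceptually, passing metadata down means broadcasting one experiment-level value into a new column of the trial table. The pandas sketch below illustrates that idea with toy data. pass_down is a hypothetical helper rather than a PhoPro function, and its handling of missing keys (a KeyError) is an assumption of this sketch only.

```python
import pandas as pd

def pass_down(obs: pd.DataFrame, metadata: dict, keys) -> pd.DataFrame:
    """Hypothetical helper: copy selected experiment-level values into every trial row."""
    out = obs.copy()
    for key in keys or []:           # None or an empty list copies nothing
        out[key] = metadata[key]     # a scalar is broadcast to all rows
    return out

# Toy trial table and experiment metadata (illustrative values)
obs = pd.DataFrame({'trial_num': [1, 2, 3], 'lever1': [0.0, 0.0, None]})
metadata = {'subject': 'animal_1', 'sex': 'male', 'age': 'young'}

with_meta = pass_down(obs, metadata, ['subject', 'sex'])
without_meta = pass_down(obs, metadata, None)

print(list(with_meta.columns))     # ['trial_num', 'lever1', 'subject', 'sex']
print(list(without_meta.columns))  # ['trial_num', 'lever1']
```

Because every trial row carries its experiment's values, the combined table can later be grouped or filtered by subject, sex, or age without joining back to per-experiment files.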
6. Experiment IDs and Custom Hooks
run accepts custom functions at several points in the workflow:
- id_builder runs after trial extraction, assigns an experiment ID, and passes that ID into trial_data.obs.
- post_load_operation runs immediately after loading.
- post_preprocess_operation runs immediately after preprocessing.
- post_trial_extraction_operation runs immediately after trial extraction.
The id_builder should accept the experiment object as an argument and return a string. Custom operations should accept the experiment as their only argument, mutate the experiment in place, and return None.
def build_id(exp: PhotometryExperiment) -> str:
    source = Path(exp.metadata['source'])
    return f"{exp.metadata.get('subject', 'unknown')}_{source.stem}"
def trim_first_second(exp: PhotometryExperiment) -> None:
    exp.trim_times_by_values(lower=1.0)
def add_processed_summary(exp: PhotometryExperiment) -> None:
    exp.metadata['processed_signal_mean'] = float(np.nanmean(exp.signal))
def add_trial_labels(exp: PhotometryExperiment) -> None:
    obs = exp.trial_data.obs.copy()
    has_lever1 = obs['lever1'].notna()
    has_lever2 = obs['lever2'].notna()
    has_shock = obs['shock'].notna()
    obs['trial_label'] = 'no_response'
    obs.loc[has_lever2, 'trial_label'] = 'small_reward'
    obs.loc[has_lever1 & ~has_shock, 'trial_label'] = 'large_reward_safe'
    obs.loc[has_lever1 & has_shock, 'trial_label'] = 'large_reward_shock'
    exp.trial_data.obs = obs
hook_trials: PhotometryData = pipeline1.run(
loader_kwargs=case1_loader_kwargs,
preprocess_kwargs=preprocess_kwargs,
trial_extraction_kwargs=trial_extraction_kwargs,
output_dir=None,
log_file='output/pipeline_hooks.log',
passdown_metadata=['source', 'subject', 'processed_signal_mean'],
id_builder=build_id,
post_load_operation=trim_first_second,
post_preprocess_operation=add_processed_summary,
post_trial_extraction_operation=add_trial_labels,
)
hook_trials.obs[['experiment_id', 'subject', 'processed_signal_mean', 'trial_label']].head()
| | experiment_id | subject | processed_signal_mean | trial_label |
|---|---|---|---|---|
| 0 | animal_1_experiment_1 | animal_1 | 0.000039 | large_reward_safe |
| 1 | animal_1_experiment_1 | animal_1 | 0.000039 | large_reward_shock |
| 2 | animal_1_experiment_1 | animal_1 | 0.000039 | no_response |
| 3 | animal_1_experiment_1 | animal_1 | 0.000039 | large_reward_safe |
| 4 | animal_1_experiment_1 | animal_1 | 0.000039 | large_reward_safe |
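The order in which the hooks fire can be made concrete with a toy runner. Everything in the sketch below is hypothetical (ToyExperiment, run_job, and the step names). It follows the order described in this section, and placing id_builder after post_trial_extraction_operation is an assumption, since the text only says that both run after trial extraction.

```python
class ToyExperiment:
    """Stand-in for an experiment object (hypothetical)."""
    def __init__(self, source: str):
        self.metadata = {'source': source}
        self.steps = []   # records the order in which things happen

def run_job(exp, post_load=None, post_preprocess=None,
            post_extract=None, id_builder=None):
    """Toy single-job workflow: each hook fires right after its stage."""
    exp.steps.append('load')
    if post_load is not None:
        post_load(exp)
    exp.steps.append('preprocess')
    if post_preprocess is not None:
        post_preprocess(exp)
    exp.steps.append('extract')
    if post_extract is not None:
        post_extract(exp)
    if id_builder is not None:   # position relative to post_extract is assumed
        exp.metadata['experiment_id'] = id_builder(exp)
    return exp

# Hooks mutate the experiment in place and return None
def mark(name):
    def hook(exp) -> None:
        exp.steps.append(name)
    return hook

exp = run_job(
    ToyExperiment('experiment_1.csv'),
    post_load=mark('post_load'),
    post_preprocess=mark('post_preprocess'),
    post_extract=mark('post_extract'),
    id_builder=lambda e: e.metadata['source'].removesuffix('.csv'),
)
print(exp.steps)
```

The practical consequence is that a hook can rely only on state produced by earlier stages: a post-load hook sees raw data, while a post-extraction hook can read and modify the trial table.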
7. Writing Outputs
When output_dir is supplied, the final combined trial data are written to output_dir / trial_output_file. If log_file is supplied, pipeline progress and job errors are written to that file.
output_trials = single_file_pipeline.run(
loader_kwargs=case2_loader_kwargs,
preprocess_kwargs=preprocess_kwargs,
trial_extraction_kwargs=trial_extraction_kwargs,
output_dir='output/pipeline_output_demo',
log_file='output/pipeline_output_demo.log',
trial_output_file='combined_trials.h5ad',
passdown_metadata=['source', 'subject', 'sex', 'age'],
)
print(output_trials)
print(Path('output/pipeline_output_demo/combined_trials.h5ad').exists())
print(Path('output/pipeline_output_demo.log').exists())
... storing 'source' as categorical
... storing 'subject' as categorical
... storing 'sex' as categorical
... storing 'age' as categorical
Photometry dataset with 20 trials, 320 timepoints, and 9 observations.
True
True
The saved result can be loaded back as a normal PhotometryData object.
loaded_output = PhotometryData.read_h5ad('output/pipeline_output_demo/combined_trials.h5ad')
loaded_output
Photometry dataset with 20 trials, 320 timepoints, and 9 observations.
8. Saving Dashboards
Set save_dashboards=True to save one processed dashboard per experiment. The dashboards are written into output_dir / 'dashboards'. Dashboard keyword arguments are passed to PhotometryExperiment.plot_dashboard.
dashboard_trials = single_file_pipeline.run(
loader_kwargs=case2_loader_kwargs,
preprocess_kwargs=preprocess_kwargs,
trial_extraction_kwargs=trial_extraction_kwargs,
output_dir='output/pipeline_dashboard_demo',
log_file='output/pipeline_dashboard_demo.log',
trial_output_file='dashboard_trials.h5ad',
passdown_metadata=['source', 'subject'],
save_dashboards=True,
dashboard_kwargs={'raw': False, 'downsample': 30},
id_builder=build_id,
)
sorted(Path('output/pipeline_dashboard_demo/dashboards').glob('*.svg'))
... storing 'experiment_id' as categorical
... storing 'source' as categorical
... storing 'subject' as categorical
[PosixPath('output/pipeline_dashboard_demo/dashboards/animal_1_experiment_1.svg')]
9. Low-memory Mode
By default, the pipeline accumulates each experiment's PhotometryData in memory and writes at the end. low_memory_mode=True appends each experiment's trials directly to an .h5ad file as jobs finish, then loads the final result at the end.
Low-memory mode requires a fresh output file. This is intentional, because appending into a pre-existing file could accidentally mix old and new runs.
low_memory_output_dir = Path('output') / f"pipeline_low_memory_{pd.Timestamp.now().strftime('%Y%m%d_%H%M%S_%f')}"
low_memory_trials = pipeline2.run(
loader_kwargs=case2_loader_kwargs,
preprocess_kwargs=preprocess_kwargs,
trial_extraction_kwargs=trial_extraction_kwargs,
output_dir=low_memory_output_dir,
log_file='output/pipeline_low_memory.log',
trial_output_file='trials.h5ad',
low_memory_mode=True,
passdown_metadata=['source', 'subject', 'sex', 'age'],
)
print(low_memory_trials)
print((low_memory_output_dir / 'trials.h5ad').exists())
... storing 'source' as categorical
... storing 'subject' as categorical
... storing 'sex' as categorical
... storing 'age' as categorical
... storing 'source' as categorical
... storing 'subject' as categorical
... storing 'sex' as categorical
... storing 'age' as categorical
... storing 'source' as categorical
... storing 'subject' as categorical
... storing 'sex' as categorical
... storing 'age' as categorical
Photometry dataset with 60 trials, 320 timepoints, and 9 observations.
True
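The "fresh output file" requirement amounts to a guard that runs before any job writes. The sketch below shows one way to express such a guard. require_fresh_output is a hypothetical helper, and both the exception type and the message are assumptions, since this tutorial does not show what the pipeline raises.

```python
import tempfile
from pathlib import Path

def require_fresh_output(path: Path) -> Path:
    """Hypothetical guard: refuse to append into a file left over from an earlier run."""
    if path.exists():
        raise FileExistsError(f"{path} already exists; choose a new output file")
    path.parent.mkdir(parents=True, exist_ok=True)
    return path

with tempfile.TemporaryDirectory() as tmp:
    target = Path(tmp) / 'run_1' / 'trials.h5ad'
    require_fresh_output(target)      # first run: accepted
    target.touch()                    # pretend the run wrote its output
    try:
        require_fresh_output(target)  # second run into the same file
        second_run_rejected = False
    except FileExistsError:
        second_run_rejected = True

print(second_run_rejected)  # True
```

The timestamped output directory used in the example above is a simple way to satisfy this kind of check on every run.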
10. Error Handling and Logs
The pipeline logs errors for individual jobs and continues to later jobs. If every job fails, finalization raises an error because there is no trial data to return. For configuration problems, validation usually raises before any job starts.
with open('output/pipeline_case2.log', 'r') as f:
    log_preview = ''.join(f.readlines()[:12])
print(log_preview)
INFO:PhoPro.core.PhotometryPipeline:Beginning pipeline
INFO:PhoPro.core.PhotometryPipeline:Validating static inputs...
INFO:PhoPro.core.PhotometryPipeline:Discovering inputs...
INFO:PhoPro.core.PhotometryPipeline:Building jobs...
INFO:PhoPro.core.PhotometryPipeline:Validating loader_kwargs per-job...
INFO:PhoPro.core.PhotometryPipeline:Iterating over 3 jobs...
INFO:PhoPro.core.PhotometryPipeline:Processing job data/pipeline/case2/experiment_1.csv (1/3)
INFO:PhoPro.core.PhotometryPipeline:Loading expriment...
INFO:PhoPro.core.PhotometryPipeline:Preprocessing signal...
INFO:PhoPro.core.PhotometryPipeline:Extracting trial data...
INFO:PhoPro.core.PhotometryPipeline:Passing down experiment metadata as columns...
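The "log the failure, keep going, fail only if nothing succeeded" behaviour described in this section is a general batch-processing pattern. The sketch below shows it with the standard logging module. run_jobs, process, the logger name, and the RuntimeError are all hypothetical choices for illustration and are not taken from PhoPro.

```python
import logging

logger = logging.getLogger('pipeline_sketch')

def run_jobs(jobs, process):
    """Run every job; collect results and failures instead of stopping early."""
    results, failed = [], []
    for index, job in enumerate(jobs, start=1):
        logger.info('Processing job %s (%d/%d)', job, index, len(jobs))
        try:
            results.append(process(job))
        except Exception:
            # logger.exception records the traceback along with the message
            logger.exception('Job %s failed; continuing with the next job', job)
            failed.append(job)
    if not results:
        raise RuntimeError('All jobs failed; there is no trial data to return')
    return results, failed

def process(job: str) -> str:
    """Toy job: one input is unreadable, the others succeed."""
    if job == 'bad.csv':
        raise ValueError('unreadable file')
    return job.upper()

results, failed = run_jobs(['a.csv', 'bad.csv', 'b.csv'], process)
print(results)  # ['A.CSV', 'B.CSV']
print(failed)   # ['bad.csv']
```

With this pattern one corrupt recording does not cost the whole batch, but the log still needs to be read afterwards to see which inputs are missing from the combined result.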
11. Choosing Pipeline Settings
A good pipeline setup separates three kinds of decisions:
- Loader kwargs describe how to read the raw file or folder.
- Preprocess kwargs describe how to turn raw continuous traces into processed signals.
- Trial extraction kwargs describe how to slice the processed signal into trials.
Keeping those dictionaries separate makes it easier to reuse the same preprocessing with different trial definitions, or the same trial definition across different file layouts.
def run_case2_with_trial_bounds(bounds: tuple[float, float]) -> PhotometryData:
    extraction = trial_extraction_kwargs | {'trial_bounds': bounds}
    out: PhotometryData = pipeline2.run(
        loader_kwargs=case2_loader_kwargs,
        preprocess_kwargs=preprocess_kwargs,
        trial_extraction_kwargs=extraction,
        output_dir=None,
        log_file=f"output/pipeline_bounds_{bounds[0]}_{bounds[1]}.log",
        passdown_metadata=['source', 'subject'],
    )
    return out
short_trials = run_case2_with_trial_bounds((-4, 4))
short_trials
Photometry dataset with 60 trials, 160 timepoints, and 7 observations.
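The change from 320 to 160 timepoints is consistent with a window whose sample count scales with its duration. The arithmetic below checks this, assuming that trial_bounds is given in seconds and that the trial data are uniformly sampled. The 20 Hz rate is inferred from the printed outputs, not stated by the tutorial, and samples_in_window is a hypothetical helper.

```python
def samples_in_window(bounds: tuple, sampling_rate_hz: float) -> int:
    """Number of samples in a window of (start, stop) seconds at a uniform rate."""
    start, stop = bounds
    return round((stop - start) * sampling_rate_hz)

# Rate implied by the earlier outputs: 320 timepoints over a 16 s window
implied_rate_hz = 320 / (8 - (-8))

full = samples_in_window((-8, 8), implied_rate_hz)
short = samples_in_window((-4, 4), implied_rate_hz)

print(implied_rate_hz)  # 20.0
print(full, short)      # 320 160
```

Halving the window halves the number of timepoints while the trial count stays at 60, because the bounds change how much signal is kept around each event, not which events are found.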
Summary
PhotometryPipeline is designed for reproducible batch workflows. The main pattern is:
- Configure a pipeline with a data directory, target type, loader class, recursion setting, and glob pattern.
- Use discover_inputs to confirm the files or folders that will be processed.
- Provide loader_kwargs as a static dictionary, a list of dictionaries, or a path-aware function.
- Provide preprocess_kwargs and trial_extraction_kwargs for the experiment processing workflow.
- Use passdown_metadata, hooks, logs, output files, dashboards, and low-memory mode as needed.
The result is a single PhotometryData object containing trial data from every successful job.
AI Use Disclaimer
Generative AI was used to assist in the creation of this tutorial. I plan to replace it in the future with a more polished version.