Reference

Core Pipeline Structure

The Bulk FHIR pipeline is implemented as a Python-based data pipeline with the following core components:

  • BulkFHIRPipeline - Abstract base class defining the pipeline interface
  • FHIRGCSIngestor - Handles ingestion of FHIR data into Google Cloud Storage
  • FHIRStoreProcessor - Processes FHIR data from GCS into BigQuery
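
As a rough illustration, the relationship between these components might look like the following sketch; the run() interface and the method bodies are assumptions, not part of this reference:

from abc import ABC, abstractmethod


class BulkFHIRPipeline(ABC):
    """Abstract base class defining the pipeline interface."""

    @abstractmethod
    def run(self) -> None:
        """Execute this pipeline stage."""


class FHIRGCSIngestor(BulkFHIRPipeline):
    """Handles ingestion of FHIR data into Google Cloud Storage."""

    def run(self) -> None:
        ...  # download exported FHIR files into a GCS bucket


class FHIRStoreProcessor(BulkFHIRPipeline):
    """Processes FHIR data from GCS into BigQuery."""

    def run(self) -> None:
        ...  # load staged FHIR resources into BigQuery tables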

CLI Commands

The pipeline exposes the following CLI commands:

trigger-export

Initiates a Bulk FHIR export from the specified EHR provider.

Parameters:

  • --tenant - The tenant name (required)
  • --site - The site name (required)
  • --environment - The environment (development/staging/production) (required)
  • --ehr-provider - The EHR provider to fetch data from (required)
  • --since - The start date in YYYY-MM-DD format (optional)
  • --until - The end date in YYYY-MM-DD format (optional)

cancel-export

Cancels an ongoing Bulk FHIR export.

Parameters:

  • --tenant - The tenant name (required)
  • --site - The site name (required)
  • --environment - The environment (required)
  • --ehr-provider - The EHR provider (required)
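
For example, cancelling an in-flight Epic export (the tenant and site values reuse the placeholders from the examples further below):

python -m pipelines.bulk_fhir_pipeline cancel-export \
  --tenant trially-dogfood \
  --site demo-site \
  --environment staging \
  --ehr-provider epic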

retrieve-and-load

Retrieves and loads FHIR data from a completed export.

Parameters:

  • --tenant - The tenant name (required)
  • --site - The site name (required)
  • --environment - The environment (required)
  • --ehr-provider - The EHR provider (required)
  • --max-threads - The maximum number of threads to use (default: 1)
  • --enable-resource-url-checkpointing - Enable resource URL checkpointing (default: False)

process-fhir-store-to-bq

Processes FHIR data from GCS into BigQuery.

Parameters:

  • --tenant - The tenant name (required)
  • --site - The site name (required)
  • --environment - The environment (required)
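
For example (the tenant and site values reuse the placeholders from the examples further below):

python -m pipelines.bulk_fhir_pipeline process-fhir-store-to-bq \
  --tenant trially-dogfood \
  --site demo-site \
  --environment staging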

API Interactors

Each EHR provider has a specialized API interactor class that implements provider-specific authentication and API interaction patterns. All interactors provide the following common methods:

  • start_bulk_fhir_export() - Initiates a Bulk FHIR export
  • cancel_bulk_fhir_export() - Cancels an ongoing export
  • get_fhir_export_job_status() - Checks the status of an export job
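
A minimal sketch of the shared interface these interactors might implement; the class name and method signatures are assumptions, and only the three method names come from this reference:

from abc import ABC, abstractmethod


class EHRAPIInteractor(ABC):
    """Hypothetical common base for provider-specific interactors."""

    @abstractmethod
    def start_bulk_fhir_export(self, since: str | None = None,
                               until: str | None = None) -> str:
        """Initiate a Bulk FHIR export and return its status URL."""

    @abstractmethod
    def cancel_bulk_fhir_export(self, status_url: str) -> None:
        """Cancel an ongoing export."""

    @abstractmethod
    def get_fhir_export_job_status(self, status_url: str) -> str:
        """Check the status of an export job."""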

Data Flow Architecture

  1. Export Initiation - API call to EHR provider to start bulk export
  2. Wait Sensor - A fixed delay elapses between triggering the export and retrieving the data
  3. Data Retrieval - Download FHIR data files to GCS
  4. BigQuery Loading - Process FHIR data into BigQuery tables
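
The flow can be summarized in a short sketch; the stage functions and the one-hour wait value are illustrative assumptions standing in for the real components:

import time


def start_export() -> str:
    """Step 1: call the EHR provider to start the bulk export."""
    return "https://ehr.example.com/bulk-status/123"  # status URL shape assumed


def download_to_gcs(status_url: str) -> None:
    """Step 3: download the exported FHIR data files into GCS."""


def load_into_bigquery() -> None:
    """Step 4: process the staged FHIR data into BigQuery tables."""


WAIT_SECONDS = 3600  # step 2: fixed delay between trigger and retrieval (value assumed)

status_url = start_export()
time.sleep(WAIT_SECONDS)
download_to_gcs(status_url)
load_into_bigquery()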

Command Line Examples

Trigger an export from Epic with date filters:

python -m pipelines.bulk_fhir_pipeline trigger-export \
  --tenant trially-dogfood \
  --site demo-site \
  --environment staging \
  --ehr-provider epic \
  --since 2023-01-01 \
  --until 2023-01-31

Retrieve and load data with multi-threading:

python -m pipelines.bulk_fhir_pipeline retrieve-and-load \
  --tenant trially-dogfood \
  --site demo-site \
  --environment staging \
  --ehr-provider epic \
  --max-threads 4 \
  --enable-resource-url-checkpointing True

Checkpointing

The pipeline implements checkpointing at two levels:

  1. Export Status URL Checkpointing - Tracks the status URL of active exports
  2. Resource URL Checkpointing - (Optional) Tracks individual resource URLs to prevent re-downloading

When enabled, resource URL checkpointing maintains a database of processed URLs to allow for resumable downloads and prevent duplicate processing.
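
The storage backend is not specified in this reference; the following SQLite-based sketch shows the idea of a processed-URL store (the class name and schema are assumptions):

import sqlite3


class ResourceURLCheckpointStore:
    """Illustrative processed-URL store enabling resumable downloads."""

    def __init__(self, path: str = "checkpoints.db") -> None:
        self.conn = sqlite3.connect(path)
        self.conn.execute(
            "CREATE TABLE IF NOT EXISTS processed_urls (url TEXT PRIMARY KEY)"
        )

    def is_processed(self, url: str) -> bool:
        # Skip URLs that were already downloaded in a previous run
        row = self.conn.execute(
            "SELECT 1 FROM processed_urls WHERE url = ?", (url,)
        ).fetchone()
        return row is not None

    def mark_processed(self, url: str) -> None:
        # Record the URL so a restarted run will not re-download it
        with self.conn:
            self.conn.execute(
                "INSERT OR IGNORE INTO processed_urls (url) VALUES (?)", (url,)
            )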

Known Limitations

  • Only one export URL may be handled at a time; if an export fails and its URL is neither cancelled nor processed, the pipeline gets stuck on that URL
  • Export cancellation may not be effective for all providers
  • Some EHR providers may take extended periods (days to weeks) to process exports
  • Rate limiting varies by provider