Reference
Core Pipeline Structure
The Bulk FHIR pipeline is implemented as a Python-based data pipeline with the following core components:
- BulkFHIRPipeline - Abstract base class defining the pipeline interface
- FHIRGCSIngestor - Handles ingestion of FHIR data into Google Cloud Storage
- FHIRStoreProcessor - Processes FHIR data from GCS into BigQuery
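The relationship between these components can be sketched as an abstract base class with two concrete implementations. The class names come from this document; the `run` method and the bodies are illustrative assumptions, not the actual interface:

```python
from abc import ABC, abstractmethod


class BulkFHIRPipeline(ABC):
    """Abstract base class defining the pipeline interface (sketch)."""

    @abstractmethod
    def run(self) -> None:
        """Execute this pipeline stage."""


class FHIRGCSIngestor(BulkFHIRPipeline):
    """Ingests FHIR data into Google Cloud Storage (illustrative stub)."""

    def run(self) -> None:
        # Real implementation: download export files and upload them to a GCS bucket.
        print("ingesting FHIR files into GCS")


class FHIRStoreProcessor(BulkFHIRPipeline):
    """Processes FHIR data from GCS into BigQuery (illustrative stub)."""

    def run(self) -> None:
        # Real implementation: read NDJSON from GCS and load it into BigQuery tables.
        print("loading FHIR data into BigQuery")
```

The abstract base class lets the CLI dispatch to either stage through a single `run()` entry point.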
CLI Commands
The pipeline exposes the following CLI commands:
trigger-export
Initiates a Bulk FHIR export from the specified EHR provider.
Parameters:
- --tenant - The tenant name (required)
- --site - The site name (required)
- --environment - The environment (development/staging/production) (required)
- --ehr-provider - The EHR provider to fetch data from (required)
- --since - The start date in YYYY-MM-DD format (optional)
- --until - The end date in YYYY-MM-DD format (optional)
cancel-export
Cancels an ongoing Bulk FHIR export.
Parameters:
- --tenant - The tenant name (required)
- --site - The site name (required)
- --environment - The environment (required)
- --ehr-provider - The EHR provider (required)
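Following the pattern of the examples later in this document, a cancel-export invocation might look like this (the tenant and site values are illustrative, reused from the other examples):

```shell
python -m pipelines.bulk_fhir_pipeline cancel-export \
    --tenant trially-dogfood \
    --site demo-site \
    --environment staging \
    --ehr-provider epic
```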
retrieve-and-load
Retrieves and loads FHIR data from a completed export.
Parameters:
- --tenant - The tenant name (required)
- --site - The site name (required)
- --environment - The environment (required)
- --ehr-provider - The EHR provider (required)
- --max-threads - The maximum number of threads to use (default: 1)
- --enable-resource-url-checkpointing - Enable resource URL checkpointing (default: False)
process-fhir-store-to-bq
Processes FHIR data from GCS into BigQuery.
Parameters:
- --tenant - The tenant name (required)
- --site - The site name (required)
- --environment - The environment (required)
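A sample invocation, using the same illustrative tenant and site as the examples later in this document:

```shell
python -m pipelines.bulk_fhir_pipeline process-fhir-store-to-bq \
    --tenant trially-dogfood \
    --site demo-site \
    --environment staging
```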
API Interactors
Each EHR provider has a specialized API interactor class that implements provider-specific authentication and API interaction patterns. All interactors provide the following common methods:
- start_bulk_fhir_export() - Initiates a Bulk FHIR export
- cancel_bulk_fhir_export() - Cancels an ongoing export
- get_fhir_export_job_status() - Checks the status of an export job
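The shared interactor surface can be sketched as a Protocol. The three method names come from this document; the return types, status strings, and the in-memory stand-in are illustrative assumptions:

```python
from typing import Protocol


class EHRAPIInteractor(Protocol):
    """Common interface implemented by each provider-specific interactor (sketch)."""

    def start_bulk_fhir_export(self) -> str:
        """Initiate a Bulk FHIR export; assumed here to return a status URL."""
        ...

    def cancel_bulk_fhir_export(self) -> None:
        """Cancel an ongoing export."""
        ...

    def get_fhir_export_job_status(self) -> str:
        """Check the status of an export job."""
        ...


class FakeInteractor:
    """Minimal in-memory stand-in used only to illustrate the interface."""

    def __init__(self) -> None:
        self._status = "not-started"

    def start_bulk_fhir_export(self) -> str:
        self._status = "in-progress"
        return "https://ehr.example.com/status/123"

    def cancel_bulk_fhir_export(self) -> None:
        self._status = "cancelled"

    def get_fhir_export_job_status(self) -> str:
        return self._status
```

Because the common methods are structural, the pipeline can treat any provider's interactor interchangeably while each class handles its own authentication internally.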
Data Flow Architecture
- Export Initiation - API call to EHR provider to start bulk export
- Wait Sensor - A fixed delay between export initiation and data retrieval
- Data Retrieval - Download FHIR data files to GCS
- BigQuery Loading - Process FHIR data into BigQuery tables
Command Line Examples
Trigger an export from Epic with date filters:
python -m pipelines.bulk_fhir_pipeline trigger-export \
--tenant trially-dogfood \
--site demo-site \
--environment staging \
--ehr-provider epic \
--since 2023-01-01 \
--until 2023-01-31
Retrieve and load data with multi-threading:
python -m pipelines.bulk_fhir_pipeline retrieve-and-load \
--tenant trially-dogfood \
--site demo-site \
--environment staging \
--ehr-provider epic \
--max-threads 4 \
--enable-resource-url-checkpointing True
Checkpointing
The pipeline implements checkpointing at two levels:
- Export Status URL Checkpointing - Tracks the status URL of active exports
- Resource URL Checkpointing - (Optional) Tracks individual resource URLs to prevent re-downloading
When enabled, resource URL checkpointing maintains a database of processed URLs to allow for resumable downloads and prevent duplicate processing.
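A minimal sketch of resource URL checkpointing, assuming a simple JSON-file-backed store (the pipeline's actual storage mechanism may differ):

```python
import json
from pathlib import Path


class ResourceURLCheckpoint:
    """Tracks processed resource URLs so downloads can resume (sketch)."""

    def __init__(self, path: Path) -> None:
        self._path = path
        # Reload previously processed URLs so a restarted run can skip them.
        if path.exists():
            self._seen = set(json.loads(path.read_text()))
        else:
            self._seen = set()

    def is_processed(self, url: str) -> bool:
        return url in self._seen

    def mark_processed(self, url: str) -> None:
        # Persist after every URL so an interrupted run loses at most one entry.
        self._seen.add(url)
        self._path.write_text(json.dumps(sorted(self._seen)))


def urls_to_download(checkpoint: ResourceURLCheckpoint, urls: list[str]) -> list[str]:
    """Filter out URLs that a previous run already downloaded."""
    return [u for u in urls if not checkpoint.is_processed(u)]
```

Filtering the pending URL list through the checkpoint before each retrieval pass is what makes downloads resumable and prevents duplicate processing.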
Known Limitations
- Only one export status URL is tracked at a time; if an export fails and its URL is neither cancelled nor processed, the checkpoint remains stuck on that URL
- Export cancellation may not be effective for all providers
- Some EHR providers may take extended periods (days to weeks) to process exports
- Rate limiting varies by provider