Architecture Design Document: Multi-tenant Data Sync Pipeline
Executive Summary
This document outlines the architecture for an automated data synchronization pipeline that replicates data from multiple tenant PostgreSQL databases to BigQuery. The system utilizes Apache Airflow for orchestration, dlt (data load tool) for ETL operations, and Cloud Run for scalable execution. The pipeline includes comprehensive validation to ensure data integrity and consistency between source and target systems.
Motivation and Goals
Why This Matters
- Enables automated matching capabilities across our platform
- Reduces manual data processing overhead
- Establishes foundation for tenant-specific analytics
- Improves data accessibility while maintaining security boundaries
- Provides reliable testing environment for safe development
Goals
- Automated daily full data sync for each tenant
- Robust validation of data integrity
- Minimal operational overhead
- Cost-effective execution using serverless infrastructure
- Comprehensive test coverage for multi-tenant scenarios
Non-Goals
- Real-time data replication
- Partial/incremental updates
- Historical data versioning
Proposal
High-Level Architecture
```mermaid
graph LR
    subgraph Production
        A[Tenant-Specific DAGs]
        A --> D[DLT Tasks]
        D --> F[Validation Task]
    end
```
Key Components
- DAG Generator
  - Queries the tenant registry for active accounts
  - Generates tenant-specific DAGs dynamically
  - Environment-aware configuration for test/prod databases
  - Testing: Validates DAG generation with mock tenants
- ETL Task
  - Runs on Cloud Run for scalability
  - Uses dlt for data extraction and loading
  - Implements dlt's 'staging-optimized' replace strategy
  - Testing: Uses a dedicated test database with sample data
- Validation Task (see the sketch after this list)
  - Validates record counts
  - Optional MD5 checksum validation
  - Optional schema validation
  - Testing: Verifies data integrity across test scenarios
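To make the validation step concrete, the sketch below compares row counts between a tenant's Postgres table and the corresponding BigQuery table. It is a minimal sketch, not the actual implementation: the helper name `validate_row_counts`, the dataset layout, and the choice of psycopg2 and google-cloud-bigquery clients are assumptions. Checksum and schema checks would follow the same compare-source-to-target pattern.

```python
# Hypothetical validation helper: compares Postgres and BigQuery row counts
# for one tenant table. Library and naming choices are illustrative only.
import psycopg2
from google.cloud import bigquery


def validate_row_counts(tenant_id: str, db_connection: str, table: str,
                        bq_project: str, bq_dataset: str) -> bool:
    """Raise if source and target row counts differ for `table`."""
    # Source count from the tenant's Postgres database
    with psycopg2.connect(db_connection) as conn:
        with conn.cursor() as cur:
            cur.execute(f"SELECT COUNT(*) FROM {table}")
            source_count = cur.fetchone()[0]

    # Target count from the tenant's BigQuery dataset
    client = bigquery.Client(project=bq_project)
    query = f"SELECT COUNT(*) AS cnt FROM `{bq_project}.{bq_dataset}.{table}`"
    target_count = next(iter(client.query(query).result())).cnt

    if source_count != target_count:
        raise ValueError(
            f"{tenant_id}.{table}: source has {source_count} rows, "
            f"target has {target_count}"
        )
    return True
```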
Development and Testing Infrastructure
- Separate PostgreSQL container hosting the application database used for testing
- Factory patterns for generating test data (a minimal sketch follows this list)
- Environment-aware configuration switching
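As referenced above, the following is a minimal sketch of a test-data factory seeding the test Postgres container. It assumes a plain-Python factory rather than a specific library; the `Customer` shape, `customer_factory`, and `seed_tenant_db` are hypothetical names, not existing code.

```python
# Hypothetical test-data factory for the dedicated test Postgres container.
from dataclasses import dataclass
import psycopg2


@dataclass
class Customer:
    tenant_id: str
    name: str
    email: str


def customer_factory(tenant_id: str, n: int) -> list[Customer]:
    """Generate n deterministic customers for a tenant."""
    return [
        Customer(tenant_id=tenant_id, name=f"Customer {i}",
                 email=f"customer{i}@{tenant_id}.example.com")
        for i in range(n)
    ]


def seed_tenant_db(db_connection: str, customers: list[Customer]) -> None:
    """Insert factory-generated rows into the test tenant database."""
    with psycopg2.connect(db_connection) as conn:
        with conn.cursor() as cur:
            for c in customers:
                cur.execute(
                    "INSERT INTO customers (tenant_id, name, email) VALUES (%s, %s, %s)",
                    (c.tenant_id, c.name, c.email),
                )
```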
Implementation Details
```python:dags/tenant_sync_dag.py
from datetime import datetime

from airflow import DAG
from airflow.models import Variable
from airflow.providers.postgres.hooks.postgres import PostgresHook
# CloudRunOperator is assumed to be the operator that triggers the ETL
# service; substitute your project's Cloud Run operator import here.


def get_tenant_connections() -> dict:
    """Returns a mapping of tenant_ids to their database connections."""
    env = Variable.get('ENVIRONMENT', 'development')
    if env == 'development':
        return {
            'test_tenant': 'postgresql://test_user:test_pass@test-postgres:5432/test_tenant_db'
        }
    # In production, query the tenant registry for active accounts
    tenant_registry = PostgresHook(postgres_conn_id='tenant_registry')
    tenant_connections = {}
    with tenant_registry.get_conn() as conn:
        with conn.cursor() as cur:
            cur.execute(
                "SELECT tenant_id, db_host, db_name FROM tenant_registry WHERE is_active = true"
            )
            for tenant_id, db_host, db_name in cur.fetchall():
                tenant_connections[tenant_id] = (
                    f'postgresql://prod_user:prod_pass@{db_host}/{db_name}'
                )
    return tenant_connections


def create_tenant_dag(tenant_id: str, db_connection: str) -> DAG:
    """Creates an environment-aware DAG instance for one tenant."""
    dag = DAG(
        f'sync_tenant_{tenant_id}',
        schedule_interval='@daily',
        start_date=datetime(2024, 1, 1),
        catchup=False,
        tags=['tenant_sync'],
    )
    with dag:
        sync_task = CloudRunOperator(
            task_id='sync_data',
            project_id='your-project',
            location='us-central1',
            service_name='etl-service',
            env_vars={
                'TENANT_ID': tenant_id,
                'DB_CONNECTION': db_connection,
                'REPLACE_STRATEGY': 'staging-optimized',
            },
        )
    return dag


# Create DAGs for each tenant
tenant_connections = get_tenant_connections()
for tenant_id, db_connection in tenant_connections.items():
    globals()[f'dag_{tenant_id}'] = create_tenant_dag(tenant_id, db_connection)
```
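The Cloud Run side of the sync is not shown above. The sketch below outlines how the ETL task could consume the `TENANT_ID`, `DB_CONNECTION`, and `REPLACE_STRATEGY` environment variables and run a dlt pipeline as a full refresh. It assumes a recent dlt version where `sql_database` is a core source and where the replace strategy can be supplied through configuration; the entrypoint name, file path, and dataset naming are illustrative assumptions rather than the final implementation.

```python
# etl_service/main.py (hypothetical Cloud Run entrypoint for the dlt task)
import os

import dlt
from dlt.sources.sql_database import sql_database


def run_sync() -> None:
    tenant_id = os.environ["TENANT_ID"]
    db_connection = os.environ["DB_CONNECTION"]

    # dlt reads the replace strategy from configuration; mapping the DAG's
    # REPLACE_STRATEGY into dlt's destination config is assumed here.
    os.environ.setdefault(
        "DESTINATION__REPLACE_STRATEGY",
        os.environ.get("REPLACE_STRATEGY", "staging-optimized"),
    )

    pipeline = dlt.pipeline(
        pipeline_name=f"sync_{tenant_id}",
        destination="bigquery",
        dataset_name=f"tenant_{tenant_id}",
    )

    # Full refresh: every table is replaced on each daily run
    source = sql_database(credentials=db_connection)
    load_info = pipeline.run(source, write_disposition="replace")
    print(load_info)


if __name__ == "__main__":
    run_sync()
```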
Pros
- Serverless execution minimizes infrastructure costs
- Full refresh ensures data consistency
- Comprehensive validation ensures data quality
- Automated tenant onboarding
- Safe testing environment for development
Cons
- dlt limitation with empty tables
- Full refresh may be resource intensive
- Potential for longer execution times with large datasets
- Additional infrastructure needed for test environment