Creating Pipelines
Learn how to build end-to-end data pipelines that move data from source to destination.
What is a Pipeline?
A pipeline is a complete data flow solution that combines:
- Source connector - Extracts data from the source system
- Transformation logic - Processes and shapes the data
- Destination connector - Loads data into the target system
- Orchestration - Manages scheduling and execution
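The way these parts fit together can be sketched in a few lines of TypeScript. This is only an illustration: the `Source`, `Destination`, and `Transform` types and the `runPipeline` function are hypothetical names, not the registry's actual connector API, and orchestration (scheduling) is left out.

```typescript
// Hypothetical interfaces for illustration; the registry's real connector APIs may differ.
interface Source<T> {
  extract(): Promise<T[]>;
}

interface Destination<T> {
  // Resolves to the number of rows written.
  load(rows: T[]): Promise<number>;
}

type Transform<In, Out> = (record: In) => Out;

// Runs one extract -> transform -> load pass and reports how many rows were written.
async function runPipeline<In, Out>(
  source: Source<In>,
  transform: Transform<In, Out>,
  destination: Destination<Out>,
): Promise<number> {
  const records = await source.extract();
  const rows = records.map((record) => transform(record));
  return destination.load(rows);
}

// In-memory stand-ins so the sketch can run without external systems.
const sink: string[] = [];
const demoSource: Source<number> = { extract: async () => [1, 2, 3] };
const demoDestination: Destination<string> = {
  load: async (rows) => {
    sink.push(...rows);
    return rows.length;
  },
};
```

Calling `runPipeline(demoSource, (n) => "row-" + n, demoDestination)` would pass each extracted number through the transform and append the results to `sink`. A scheduler would then invoke `runPipeline` on whatever cadence the pipeline configuration specifies.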
1. Clone the Repository
git clone https://github.com/514-labs/registry.git
cd registry
2. Generate Pipeline Scaffold
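bash -i <(curl https://registry.514.ai/install.sh) google-analytics-to-clickhouse v1 myusername typescript
The trailing arguments match the path segments of the generated directory used in step 3: pipeline name, version, author, and language. Replace myusername with your own author name.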
3. Define Your Pipeline
Navigate to your pipeline directory:
cd pipeline-registry/google-analytics-to-clickhouse/v1/myusername/typescript/default
4. Configure Source and Destination
Edit the pipeline configuration in _meta/pipeline.json:
{
  "source": {
    "connector": {
      "name": "google-analytics",
      "version": "v4",
      "author": "514-labs"
    }
  },
  "destination": {
    "system": "clickhouse",
    "database": "analytics",
    "table": "ga_events"
  },
  "schedule": {
    "cron": "0 */6 * * *",
    "timezone": "UTC"
  }
}
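In this example the cron expression 0 */6 * * * runs the pipeline at minute 0 of every sixth hour (00:00, 06:00, 12:00, and 18:00 UTC). To catch mistakes before a run, a small check along the following lines may help. It is a sketch only: the `PipelineConfig` type is inferred from the example above, `validatePipelineConfig` is a hypothetical helper rather than part of the registry tooling, and treating every field as required is an assumption.

```typescript
// Shape inferred from the example configuration; the real schema may allow more fields.
interface PipelineConfig {
  source: { connector: { name: string; version: string; author: string } };
  destination: { system: string; database: string; table: string };
  schedule: { cron: string; timezone: string };
}

// Returns a list of problems; an empty list means the config passed these basic checks.
function validatePipelineConfig(config: PipelineConfig): string[] {
  const problems: string[] = [];
  // Assumption: every string field shown in the example is required.
  const required: Array<[string, string]> = [
    ["source.connector.name", config.source.connector.name],
    ["source.connector.version", config.source.connector.version],
    ["source.connector.author", config.source.connector.author],
    ["destination.system", config.destination.system],
    ["destination.database", config.destination.database],
    ["destination.table", config.destination.table],
    ["schedule.timezone", config.schedule.timezone],
  ];
  for (const [path, value] of required) {
    if (value.trim() === "") problems.push(`${path} must not be empty`);
  }
  // A standard cron expression has five whitespace-separated fields.
  const cronFields = config.schedule.cron.trim().split(/\s+/);
  if (cronFields.length !== 5) {
    problems.push(`schedule.cron must have 5 fields, found ${cronFields.length}`);
  }
  return problems;
}

const exampleConfig: PipelineConfig = {
  source: { connector: { name: "google-analytics", version: "v4", author: "514-labs" } },
  destination: { system: "clickhouse", database: "analytics", table: "ga_events" },
  schedule: { cron: "0 */6 * * *", timezone: "UTC" },
};
```

The check only counts cron fields and does not validate their contents, so an expression such as 99 * * * * would still pass.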
5. Implement Transformations
Add your transformation logic in src/transform/:
export function transformGAEvent(event: GAEvent): ClickHouseEvent {
  return {
    // Assumes event.dateHour is in a format the Date constructor can parse, such as ISO 8601
    timestamp: new Date(event.dateHour),
    userId: event.userId,
    sessionId: event.sessionId,
    eventName: event.eventName,
    // Serialize the nested parameters so they fit in a single string column
    eventParams: JSON.stringify(event.eventParams),
    // ... additional transformations
  };
}
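One detail to watch in the transformation above is the timestamp. If event.dateHour arrives in Google Analytics' compact YYYYMMDDHH form (an assumption, since the GAEvent type is not shown here), it is not an ISO 8601 string and the Date constructor is not guaranteed to parse it. A hypothetical helper such as `parseDateHour` below could handle that case. It treats the value as UTC, which is also an assumption: the hour may actually be in the reporting timezone of the Analytics property.

```typescript
// Parses a compact "YYYYMMDDHH" string (for example "2024011513") into a Date.
// Assumption: the value is interpreted as UTC.
function parseDateHour(dateHour: string): Date {
  const match = /^(\d{4})(\d{2})(\d{2})(\d{2})$/.exec(dateHour);
  if (match === null) {
    throw new Error(`Unexpected dateHour format: ${dateHour}`);
  }
  const year = Number(match[1]);
  const month = Number(match[2]);
  const day = Number(match[3]);
  const hour = Number(match[4]);
  const parsed = new Date(Date.UTC(year, month - 1, day, hour));
  // Date.UTC silently rolls over out-of-range values (month 13, day 32, hour 24),
  // so reject any input whose components changed during construction.
  if (
    parsed.getUTCMonth() !== month - 1 ||
    parsed.getUTCDate() !== day ||
    parsed.getUTCHours() !== hour
  ) {
    throw new Error(`dateHour is out of range: ${dateHour}`);
  }
  return parsed;
}
```

With this helper, the timestamp line would become `timestamp: parseDateHour(event.dateHour)`. Throwing on malformed input keeps bad rows from being loaded silently with an invalid date.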
6. Define Schema Mappings
Create schema definitions in schemas/ that describe:
- Source data structure
- Transformation rules
- Destination table schema
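As a rough illustration of what such definitions capture, the three items above can be expressed as one mapping per field. The `FieldMapping` format below is hypothetical and is not the layout the registry expects in schemas/. The column names follow the transformation example, and the ClickHouse types (DateTime, String) are assumed choices.

```typescript
// Hypothetical mapping format; the registry's actual schema files may look different.
interface FieldMapping {
  sourceField: string;       // field name in the source data
  destinationColumn: string; // column name in the destination table
  destinationType: string;   // ClickHouse column type (assumed)
  rule: string;              // short description of the transformation rule
}

const gaEventMappings: FieldMapping[] = [
  { sourceField: "dateHour", destinationColumn: "timestamp", destinationType: "DateTime", rule: "parse into a timestamp" },
  { sourceField: "userId", destinationColumn: "userId", destinationType: "String", rule: "copy" },
  { sourceField: "sessionId", destinationColumn: "sessionId", destinationType: "String", rule: "copy" },
  { sourceField: "eventName", destinationColumn: "eventName", destinationType: "String", rule: "copy" },
  { sourceField: "eventParams", destinationColumn: "eventParams", destinationType: "String", rule: "serialize as JSON" },
];

// Renders the destination side of the mappings as a comma-separated column list.
function toColumnList(mappings: FieldMapping[]): string {
  return mappings
    .map((mapping) => `${mapping.destinationColumn} ${mapping.destinationType}`)
    .join(", ");
}
```

Keeping source field, rule, and destination column in one record makes it easier to see which destination columns are affected when a source field changes.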
7. Generate Lineage Diagram
pnpm run generate:lineage
8. Test Your Pipeline
pnpm test
pnpm run test:integration
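The test runner behind these commands is not specified here, so the following is a framework-free sketch of the kind of table-driven check a unit test for a transformation might contain. `serializeParams` is a stand-in for the function under test, and `TestCase` and `runCases` are hypothetical names.

```typescript
// Stand-in for the function under test; in the scaffold it would be imported from src/transform/.
function serializeParams(params: Record<string, unknown>): string {
  return JSON.stringify(params);
}

interface TestCase {
  name: string;
  input: Record<string, unknown>;
  expected: string;
}

const cases: TestCase[] = [
  { name: "flat params", input: { page: "/home" }, expected: '{"page":"/home"}' },
  { name: "empty params", input: {}, expected: "{}" },
  { name: "numeric values", input: { value: 3 }, expected: '{"value":3}' },
];

// Runs every case and returns the names of the ones that failed.
function runCases(testCases: TestCase[]): string[] {
  return testCases
    .filter((testCase) => serializeParams(testCase.input) !== testCase.expected)
    .map((testCase) => testCase.name);
}
```

An empty result from `runCases(cases)` means every case passed. In a real test file the same cases would typically be wrapped in the assertions of whichever test framework the scaffold uses.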
Best Practices
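- Idempotency - Ensure transformations can be safely re-run without duplicating or corrupting data
- Error handling - Retry transient failures and record the inputs that fail so they can be reprocessed
- Monitoring - Add logging and metrics for each run, such as rows read, rows written, and duration
- Schema evolution - Plan for schema changes in both the source and the destination
- Performance - Optimize for your data volume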
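Idempotency is the easiest of these to show in code. One common approach, sketched below, is to derive a deterministic key for each row and upsert by that key, so that re-running a load does not create duplicates. The `EventRow` type, the key fields, and the in-memory Map standing in for the destination table are all assumptions made for illustration.

```typescript
interface EventRow {
  timestamp: string;
  userId: string;
  sessionId: string;
  eventName: string;
}

// Deterministic key: the same event always maps to the same key.
// Assumption: these four fields together identify an event uniquely.
function eventKey(row: EventRow): string {
  return [row.timestamp, row.userId, row.sessionId, row.eventName].join("|");
}

// Upserts rows by key into an in-memory stand-in for the destination table
// and returns the resulting row count.
function loadIdempotently(table: Map<string, EventRow>, rows: EventRow[]): number {
  for (const row of rows) {
    table.set(eventKey(row), row);
  }
  return table.size;
}

const batch: EventRow[] = [
  { timestamp: "2024-01-15T13:00:00Z", userId: "u1", sessionId: "s1", eventName: "page_view" },
  { timestamp: "2024-01-15T13:00:00Z", userId: "u2", sessionId: "s2", eventName: "click" },
];
```

Loading `batch` into the same Map twice leaves two rows rather than four. The key choice matters: if two genuinely distinct events can share all four fields, they would be collapsed into one row, so a real pipeline may need a finer-grained identifier.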
Next Steps
- Explore the pipeline specifications
- Learn about lineage tracking
- Browse existing pipelines in the registry