
Creating Pipelines

Learn how to build end-to-end data pipelines that move data from source to destination.

What is a Pipeline?

A pipeline is a complete data flow solution that combines:

  • Source connector - Extracts data from the source system
  • Transformation logic - Processes and shapes the data
  • Destination connector - Loads data into the target system
  • Orchestration - Manages scheduling and execution
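
In code, these four parts fit together roughly as shown below. This is a minimal sketch with hypothetical type and method names (Pipeline, extract, transform, load); the registry's actual interfaces may differ.

interface Pipeline<Source, Dest> {
  extract(): AsyncIterable<Source>;      // source connector: pull records
  transform(record: Source): Dest;       // transformation logic: shape data
  load(records: Dest[]): Promise<void>;  // destination connector: write out
  schedule: string;                      // orchestration: cron expression
}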

1. Clone the Repository

git clone https://github.com/514-labs/registry.git
cd registry

2. Generate Pipeline Scaffold

bash -i <(curl https://registry.514.ai/install.sh) google-analytics-to-clickhouse v1 myusername typescript

The positional arguments are the pipeline name, its version, your author handle, and the implementation language; together they determine where the scaffold is created (see the path in step 3).

3. Define Your Pipeline

Navigate to your pipeline directory:

cd pipeline-registry/google-analytics-to-clickhouse/v1/myusername/typescript/default

4. Configure Source and Destination

Edit the pipeline configuration in _meta/pipeline.json:

{
  "source": {
    "connector": {
      "name": "google-analytics",
      "version": "v4",
      "author": "514-labs"
    }
  },
  "destination": {
    "system": "clickhouse",
    "database": "analytics",
    "table": "ga_events"
  },
  "schedule": {
    "cron": "0 */6 * * *",
    "timezone": "UTC"
  }
}
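
As a sanity check, you can type and load this file from TypeScript. The PipelineConfig interface below is a hypothetical sketch that mirrors the JSON above, not an official registry type. Note that the example cron expression 0 */6 * * * runs the pipeline at the top of every sixth hour.

import { readFileSync } from "node:fs";

// Hypothetical shape mirroring _meta/pipeline.json above.
interface PipelineConfig {
  source: { connector: { name: string; version: string; author: string } };
  destination: { system: string; database: string; table: string };
  schedule: { cron: string; timezone: string };
}

const config: PipelineConfig = JSON.parse(
  readFileSync("_meta/pipeline.json", "utf-8")
);
console.log(`Loading into ${config.destination.database}.${config.destination.table}`);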

5. Implement Transformations

Add your transformation logic in src/transform/:

export function transformGAEvent(event: GAEvent): ClickHouseEvent {
  // GA4 reports the dateHour dimension as a "YYYYMMDDHH" string, which
  // new Date() cannot parse directly, so build an ISO timestamp first.
  const d = event.dateHour;
  return {
    timestamp: new Date(
      `${d.slice(0, 4)}-${d.slice(4, 6)}-${d.slice(6, 8)}T${d.slice(8, 10)}:00:00Z`
    ),
    userId: event.userId,
    sessionId: event.sessionId,
    eventName: event.eventName,
    eventParams: JSON.stringify(event.eventParams),
    // ... additional transformations
  };
}
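
Applying the transform to a batch of fetched events is then a straightforward map (here gaEvents is a hypothetical array of rows returned by the source connector):

// gaEvents: GAEvent[] fetched by the source connector.
const rows: ClickHouseEvent[] = gaEvents.map(transformGAEvent);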

6. Define Schema Mappings

Create schema definitions in schemas/ that describe:

  • Source data structure
  • Transformation rules
  • Destination table schema
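
A hypothetical sketch of such a definition is shown below; the field names and exact format are illustrative only, so follow the conventions used by existing pipelines in schemas/.

// schemas/ga_events.ts -- illustrative shape, not the registry's official format.
export const gaEventsSchema = {
  source: {
    // raw GA fields as delivered by the connector
    dateHour: "string",
    userId: "string",
    eventName: "string",
  },
  transform: {
    timestamp: "parse dateHour (YYYYMMDDHH) into a UTC DateTime",
  },
  destination: {
    table: "ga_events",
    columns: { timestamp: "DateTime", userId: "String", eventName: "String" },
  },
};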

7. Generate Lineage Diagram

pnpm run generate:lineage

8. Test Your Pipeline

pnpm test
pnpm run test:integration
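
For example, a unit test for the transform from step 5 might look like this (assuming a Jest- or Vitest-style runner; adjust the import path to match your scaffold):

import { transformGAEvent } from "../src/transform";

test("maps a GA event to a ClickHouse row", () => {
  const row = transformGAEvent({
    dateHour: "2024010115",
    userId: "u1",
    sessionId: "s1",
    eventName: "page_view",
    eventParams: { page: "/home" },
  });
  expect(row.eventName).toBe("page_view");
  expect(row.timestamp.getUTCHours()).toBe(15);
});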

Best Practices

  • Idempotency - Ensure transformations can be safely re-run (see the sketch after this list)
  • Error handling - Implement robust error recovery
  • Monitoring - Add logging and metrics
  • Schema evolution - Plan for schema changes
  • Performance - Optimize for your data volume
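
To make the idempotency point concrete, one common approach is to derive a deterministic row ID from the event's natural key, so a re-run overwrites rows instead of duplicating them. The sketch below assumes the destination table deduplicates on eventId (for example, a ClickHouse ReplacingMergeTree ordered by that column); the field names come from the step 5 example.

import { createHash } from "node:crypto";

// ClickHouseEvent is the row type from step 5. The same input event always
// yields the same eventId, so re-running the pipeline cannot create duplicates.
function eventId(row: ClickHouseEvent): string {
  return createHash("sha256")
    .update(`${row.sessionId}:${row.eventName}:${row.timestamp.toISOString()}`)
    .digest("hex");
}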

Next Steps