Skip to main content
Back to Blog
Tutorials
3 min read
December 29, 2024

How to Implement Data Pipelines and ETL in Next.js

Build data pipelines in Next.js with ETL patterns for extracting data from APIs, transforming it, loading into databases, and scheduling with cron.

Ryel Banfield

Founder & Lead Developer

ETL (Extract, Transform, Load) pipelines process data from external sources into your application. Here is how to build them.

Pipeline Framework

// lib/pipeline.ts
/** Lifecycle states a step can be reported in via `StepResult.status`. */
export type StepStatus = "pending" | "running" | "completed" | "failed" | "skipped";

/**
 * One unit of work in a pipeline. Steps are chained: the output type of one
 * step is the input type of the next.
 */
export interface PipelineStep<TInput = unknown, TOutput = unknown> {
  // Human-readable name, used in logs and in StepResult.step.
  name: string;
  // Performs the work; receives the previous step's output and the shared context.
  execute: (input: TInput, context: PipelineContext) => Promise<TOutput>;
  // Extra attempts after the first failure (defaults to 0 in Pipeline.execute).
  retries?: number;
  // Delay in ms between attempts (defaults to 1000 in Pipeline.execute).
  retryDelay?: number;
  // When this returns true, the step is skipped and its input is passed
  // through unchanged to the next step.
  skipIf?: (input: TInput, context: PipelineContext) => boolean;
}

/** Shared per-run state handed to every step. */
export interface PipelineContext {
  // Unique id for this run (pipeline name + start timestamp).
  pipelineId: string;
  startedAt: Date;
  // Scratch space for steps to share data out-of-band (e.g. counters).
  metadata: Record<string, unknown>;
  // Appends a timestamped line to the run log and echoes it to the console.
  log: (message: string) => void;
}

/** Outcome of a single step, collected into the pipeline result. */
export interface StepResult {
  step: string;
  status: StepStatus;
  // Wall-clock milliseconds, including any retry delays.
  duration: number;
  // Error message, present only when status is "failed".
  error?: string;
  recordsProcessed?: number;
}

/**
 * A named, sequential pipeline. Steps run in order; each step's output is
 * fed to the next step as input. A step may be retried or skipped; the first
 * step that exhausts its retries aborts the whole run.
 */
export class Pipeline {
  private steps: PipelineStep[] = [];
  private name: string;

  constructor(name: string) {
    this.name = name;
  }

  /** Appends a step; returns `this` so calls can be chained. */
  addStep<TInput, TOutput>(step: PipelineStep<TInput, TOutput>) {
    // Widen to the untyped PipelineStep: the chain's types are only checked
    // at the call site, not across steps.
    this.steps.push(step as PipelineStep);
    return this;
  }

  /**
   * Runs all steps in order.
   *
   * @param initialInput - Input handed to the first step (defaults to null).
   * @returns Overall success flag, per-step results, total duration in ms,
   *          and the timestamped log lines collected during the run.
   *          (Previously the log lines were collected but discarded.)
   */
  async execute(initialInput: unknown = null): Promise<{
    success: boolean;
    results: StepResult[];
    duration: number;
    logs: string[];
  }> {
    const pipelineStart = Date.now();
    const results: StepResult[] = [];
    const logs: string[] = [];

    const context: PipelineContext = {
      pipelineId: `${this.name}-${Date.now()}`,
      startedAt: new Date(),
      metadata: {},
      log: (msg) => {
        logs.push(`[${new Date().toISOString()}] ${msg}`);
        console.log(`[Pipeline:${this.name}] ${msg}`);
      },
    };

    let currentInput = initialInput;

    for (const step of this.steps) {
      const stepStart = Date.now();

      // Conditional skip: the current input passes through unchanged.
      if (step.skipIf?.(currentInput, context)) {
        results.push({
          step: step.name,
          status: "skipped",
          duration: 0,
        });
        continue;
      }

      context.log(`Starting step: ${step.name}`);
      const maxRetries = step.retries ?? 0;

      for (let attempt = 0; attempt <= maxRetries; attempt++) {
        try {
          currentInput = await step.execute(currentInput, context);
          // Compute the duration once so the log and the result agree.
          const duration = Date.now() - stepStart;
          results.push({
            step: step.name,
            status: "completed",
            duration,
          });
          context.log(`Completed step: ${step.name} in ${duration}ms`);
          break;
        } catch (error) {
          if (attempt === maxRetries) {
            // Retries exhausted: record the failure and abort the pipeline.
            const errorMsg = error instanceof Error ? error.message : "Unknown error";
            context.log(`Failed step: ${step.name} — ${errorMsg}`);
            results.push({
              step: step.name,
              status: "failed",
              duration: Date.now() - stepStart,
              error: errorMsg,
            });
            return {
              success: false,
              results,
              duration: Date.now() - pipelineStart,
              logs,
            };
          }
          context.log(`Retrying step: ${step.name} (attempt ${attempt + 2}/${maxRetries + 1})`);
          await new Promise((r) => setTimeout(r, step.retryDelay ?? 1000));
        }
      }
    }

    return {
      success: true,
      results,
      duration: Date.now() - pipelineStart,
      logs,
    };
  }
}

Example: Sync Products from External API

// pipelines/sync-products.ts
import { Pipeline, type PipelineStep } from "@/lib/pipeline";

/** Product record as delivered by the external API (snake_case fields). */
interface RawProduct {
  id: string;
  name: string;
  // Price in integer cents, as sent by the API.
  price_cents: number;
  category_id: string;
  description: string;
  in_stock: boolean;
  // NOTE(review): presumably an ISO-8601 timestamp — confirm against the API.
  updated_at: string;
}

/** Product record in the internal (camelCase) shape produced by the transform step. */
interface TransformedProduct {
  // The API's `id`, kept for upsert matching.
  externalId: string;
  name: string;
  // Decimal price in major units (cents / 100).
  price: number;
  category: string;
  description: string;
  available: boolean;
  lastSynced: Date;
}

/**
 * Extract step: pulls every page of products from the external API
 * (100 per page), retrying the whole step up to 3 times on failure.
 */
const extractStep: PipelineStep<null, RawProduct[]> = {
  name: "Extract products from API",
  retries: 3,
  retryDelay: 2000,
  execute: async (_input, context) => {
    const collected: RawProduct[] = [];
    let page = 1;

    // Follow the API's pagination flag until it reports no further pages.
    for (;;) {
      const response = await fetch(
        `${process.env.PRODUCTS_API_URL}/products?page=${page}&limit=100`,
        { headers: { Authorization: `Bearer ${process.env.PRODUCTS_API_KEY}` } }
      );
      if (!response.ok) throw new Error(`API returned ${response.status}`);

      const payload = await response.json();
      collected.push(...payload.products);
      if (!payload.has_more) break;
      page++;
    }

    context.log(`Extracted ${collected.length} products`);
    // Share the count with later steps via the pipeline context.
    context.metadata.totalExtracted = collected.length;
    return collected;
  },
};

/**
 * Transform step: normalizes raw API products into the internal shape —
 * cents → decimal price, trimmed name, description capped at 500 chars.
 * Records with a blank name or non-positive price are dropped.
 */
const transformStep: PipelineStep<RawProduct[], TransformedProduct[]> = {
  name: "Transform product data",
  execute: async (products, context) => {
    const transformed = products
      // Trim before validating: a whitespace-only name would previously pass
      // the truthiness check and produce a product with an empty name.
      .filter((p) => p.name?.trim() && p.price_cents > 0)
      .map((p) => ({
        externalId: p.id,
        name: p.name.trim(),
        price: p.price_cents / 100,
        category: p.category_id,
        description: p.description?.slice(0, 500) ?? "",
        available: p.in_stock,
        lastSynced: new Date(),
      }));

    const removed = products.length - transformed.length;
    if (removed > 0) {
      context.log(`Filtered out ${removed} invalid products`);
    }

    return transformed;
  },
};

/**
 * Load step: writes transformed products to the database in batches of 50,
 * reporting how many rows were inserted vs. updated.
 */
const loadStep: PipelineStep<TransformedProduct[], { inserted: number; updated: number }> = {
  name: "Load into database",
  retries: 2,
  execute: async (products, context) => {
    const BATCH_SIZE = 50;
    let inserted = 0;
    let updated = 0;

    for (let offset = 0; offset < products.length; offset += BATCH_SIZE) {
      const batch = products.slice(offset, offset + BATCH_SIZE);

      for (const product of batch) {
        // Upsert pattern — wire this up to your ORM:
        // const existing = await db.select().from(productsTable).where(eq(productsTable.externalId, product.externalId));
        // if (existing.length > 0) {
        //   await db.update(productsTable).set(product).where(eq(productsTable.externalId, product.externalId));
        //   updated++;
        // } else {
        //   await db.insert(productsTable).values(product);
        //   inserted++;
        // }
        inserted++; // Placeholder
      }

      context.log(`Processed batch ${Math.ceil(offset / BATCH_SIZE) + 1}`);
    }

    return { inserted, updated };
  },
};

export function createProductSyncPipeline() {
  return new Pipeline("product-sync")
    .addStep(extractStep)
    .addStep(transformStep)
    .addStep(loadStep);
}

Cron Job API Route

// app/api/cron/sync-products/route.ts
import { NextRequest, NextResponse } from "next/server";
import { createProductSyncPipeline } from "@/pipelines/sync-products";

/**
 * Cron-triggered endpoint that runs the product-sync pipeline.
 * Callers must present the shared CRON_SECRET as a bearer token
 * (Vercel Cron or an external scheduler).
 */
export async function POST(request: NextRequest) {
  const authHeader = request.headers.get("authorization");
  const expected = `Bearer ${process.env.CRON_SECRET}`;
  if (authHeader !== expected) {
    return NextResponse.json({ error: "Unauthorized" }, { status: 401 });
  }

  const result = await createProductSyncPipeline().execute();

  // Same body shape on success and failure; only the status code differs.
  const payload = {
    success: result.success,
    results: result.results,
    duration: result.duration,
  };

  if (!result.success) {
    console.error("Pipeline failed:", result.results);
    return NextResponse.json(payload, { status: 500 });
  }

  return NextResponse.json(payload);
}

Vercel Cron Config

// vercel.json
{
  "crons": [
    {
      "path": "/api/cron/sync-products",
      "schedule": "0 */6 * * *"
    }
  ]
}

Need Data Integration Solutions?

We build ETL pipelines, API integrations, and data synchronization systems. Contact us to discuss your data needs.

ETL · data pipelines · batch processing · cron · Next.js · tutorial

Ready to Start Your Project?

RCB Software builds world-class websites and applications for businesses worldwide.

Get in Touch

Related Articles