Transactions ensure data consistency. Here are production patterns with Drizzle ORM.
Basic Transaction
import { db } from "@/lib/db";
import { accounts, transfers } from "@/lib/db/schema";
import { eq, sql } from "drizzle-orm";
async function transferFunds(
fromId: string,
toId: string,
amount: number,
) {
return db.transaction(async (tx) => {
// Debit source account
const [from] = await tx
.update(accounts)
.set({ balance: sql`${accounts.balance} - ${amount}` })
.where(eq(accounts.id, fromId))
.returning();
if (from.balance < 0) {
tx.rollback();
throw new Error("Insufficient funds");
}
// Credit destination account
await tx
.update(accounts)
.set({ balance: sql`${accounts.balance} + ${amount}` })
.where(eq(accounts.id, toId));
// Record transfer
await tx.insert(transfers).values({
fromAccountId: fromId,
toAccountId: toId,
amount,
createdAt: new Date(),
});
return { success: true, newBalance: from.balance };
});
}
Transaction With Retry Logic
interface RetryOptions {
maxRetries?: number;
baseDelay?: number;
retryOn?: (error: unknown) => boolean;
}
async function withRetry<T>(
fn: () => Promise<T>,
options: RetryOptions = {},
): Promise<T> {
const {
maxRetries = 3,
baseDelay = 100,
retryOn = isRetryableError,
} = options;
let lastError: unknown;
for (let attempt = 0; attempt <= maxRetries; attempt++) {
try {
return await fn();
} catch (error) {
lastError = error;
if (attempt === maxRetries || !retryOn(error)) {
throw error;
}
// Exponential backoff with jitter
const delay = baseDelay * 2 ** attempt + Math.random() * baseDelay;
await new Promise((resolve) => setTimeout(resolve, delay));
}
}
throw lastError;
}
function isRetryableError(error: unknown): boolean {
if (error instanceof Error) {
// PostgreSQL serialization failure or deadlock
const message = error.message.toLowerCase();
return (
message.includes("serialization failure") ||
message.includes("deadlock detected") ||
message.includes("could not serialize access")
);
}
return false;
}
// Usage
async function safeTransfer(fromId: string, toId: string, amount: number) {
return withRetry(
() => transferFunds(fromId, toId, amount),
{ maxRetries: 3 },
);
}
Optimistic Locking
import { db } from "@/lib/db";
import { products } from "@/lib/db/schema";
import { eq, and } from "drizzle-orm";
async function updateProduct(
id: string,
data: { name: string; price: number },
expectedVersion: number,
) {
return db.transaction(async (tx) => {
const [updated] = await tx
.update(products)
.set({
...data,
version: expectedVersion + 1,
updatedAt: new Date(),
})
.where(
and(
eq(products.id, id),
eq(products.version, expectedVersion), // Only update if version matches
),
)
.returning();
if (!updated) {
throw new Error(
"Conflict: The record was modified by another user. Please refresh and try again.",
);
}
return updated;
});
}
Unit of Work Pattern
type UnitOfWorkFn<T> = (uow: UnitOfWork) => Promise<T>;
class UnitOfWork {
constructor(private tx: typeof db) {}
get accounts() {
return {
findById: (id: string) =>
this.tx.query.accounts.findFirst({
where: eq(accounts.id, id),
}),
update: (id: string, data: Partial<typeof accounts.$inferInsert>) =>
this.tx.update(accounts).set(data).where(eq(accounts.id, id)),
create: (data: typeof accounts.$inferInsert) =>
this.tx.insert(accounts).values(data).returning(),
};
}
get transfers() {
return {
create: (data: typeof transfers.$inferInsert) =>
this.tx.insert(transfers).values(data).returning(),
findByAccount: (accountId: string) =>
this.tx.query.transfers.findMany({
where: eq(transfers.fromAccountId, accountId),
}),
};
}
}
async function executeUnitOfWork<T>(fn: UnitOfWorkFn<T>): Promise<T> {
return db.transaction(async (tx) => {
const uow = new UnitOfWork(tx as typeof db);
return fn(uow);
});
}
// Usage
async function createAccountWithInitialDeposit(
name: string,
initialBalance: number,
) {
return executeUnitOfWork(async (uow) => {
const [account] = await uow.accounts.create({
name,
balance: initialBalance,
version: 1,
});
await uow.transfers.create({
fromAccountId: "system",
toAccountId: account.id,
amount: initialBalance,
createdAt: new Date(),
});
return account;
});
}
Saga Pattern for Distributed Operations
interface SagaStep<T> {
name: string;
execute: (context: T) => Promise<T>;
compensate: (context: T) => Promise<void>;
}
async function executeSaga<T>(
initialContext: T,
steps: SagaStep<T>[],
): Promise<T> {
const completedSteps: SagaStep<T>[] = [];
let context = initialContext;
for (const step of steps) {
try {
context = await step.execute(context);
completedSteps.push(step);
} catch (error) {
// Compensate in reverse order
for (const completed of completedSteps.reverse()) {
try {
await completed.compensate(context);
} catch (compensateError) {
console.error(
`Compensation failed for ${completed.name}:`,
compensateError,
);
// Log for manual intervention
}
}
throw error;
}
}
return context;
}
// Usage: order placement saga
interface OrderContext {
orderId?: string;
paymentId?: string;
items: { productId: string; quantity: number }[];
}
const placeOrderSaga: SagaStep<OrderContext>[] = [
{
name: "reserve-inventory",
execute: async (ctx) => {
// Reserve items in inventory
await reserveInventory(ctx.items);
return ctx;
},
compensate: async (ctx) => {
await releaseInventory(ctx.items);
},
},
{
name: "process-payment",
execute: async (ctx) => {
const paymentId = await chargePayment(ctx.items);
return { ...ctx, paymentId };
},
compensate: async (ctx) => {
if (ctx.paymentId) await refundPayment(ctx.paymentId);
},
},
{
name: "create-order",
execute: async (ctx) => {
const orderId = await createOrder(ctx.items, ctx.paymentId!);
return { ...ctx, orderId };
},
compensate: async (ctx) => {
if (ctx.orderId) await cancelOrder(ctx.orderId);
},
},
];
Need Reliable Data Systems?
We build transaction-safe applications for critical business operations. Contact us to discuss your project.