Event-Driven Architecture with TypeScript
Build scalable, decoupled systems with event-driven architecture. Production-ready event bus, domain events, outbox pattern, and dead letter queues.
Event-driven architecture (EDA) builds loosely coupled, scalable systems in which components communicate by publishing and consuming events rather than calling each other directly. It is a good fit for microservices, real-time systems, and multi-step workflows.
Why Event-Driven Architecture?
Benefits:
- ✅ Loose coupling between services
- ✅ Scalability (horizontal scaling of event consumers)
- ✅ Resilience (services can fail independently)
- ✅ Audit trail (events are recorded)
- ✅ Temporal decoupling (async processing)
Use cases:
- User signup workflows
- Order processing
- Notification systems
- Audit logging
- Data synchronization
- Webhook dispatching
Core Implementation
1. Event Types and Base Classes
// lib/events/base.ts
/**
* Base event interface that all domain events must implement
*/
export interface DomainEvent {
readonly eventId: string;
readonly eventType: string;
readonly occurredAt: Date;
readonly aggregateId: string;
readonly aggregateType: string;
readonly version: number;
readonly metadata?: Record<string, any>;
}
/**
* Event handler interface
*/
export interface EventHandler<T extends DomainEvent = DomainEvent> {
eventType: string;
handle(event: T): Promise<void>;
}
/**
* Event bus interface
*/
export interface EventBus {
publish(event: DomainEvent): Promise<void>;
publishBatch(events: DomainEvent[]): Promise<void>;
subscribe<T extends DomainEvent>(
eventType: string,
handler: EventHandler<T>
): void;
}
/**
* Base abstract class for domain events
*/
export abstract class BaseDomainEvent implements DomainEvent {
readonly eventId: string;
readonly occurredAt: Date;
readonly version: number = 1;
constructor(
public readonly eventType: string,
public readonly aggregateId: string,
public readonly aggregateType: string,
public readonly metadata?: Record<string, any>
) {
this.eventId = crypto.randomUUID();
this.occurredAt = new Date();
}
}
2. Domain Event Examples
// lib/events/user-events.ts
import { BaseDomainEvent } from './base';
/**
* User registered event
*/
export class UserRegisteredEvent extends BaseDomainEvent {
constructor(
public readonly userId: string,
public readonly email: string,
public readonly name: string,
public readonly tier: 'free' | 'pro' | 'team',
metadata?: Record<string, any>
) {
super('user.registered', userId, 'User', metadata);
}
}
/**
* User upgraded event
*/
export class UserUpgradedEvent extends BaseDomainEvent {
constructor(
public readonly userId: string,
public readonly fromTier: string,
public readonly toTier: string,
public readonly amount: number,
metadata?: Record<string, any>
) {
super('user.upgraded', userId, 'User', metadata);
}
}
/**
* User deleted event
*/
export class UserDeletedEvent extends BaseDomainEvent {
constructor(
public readonly userId: string,
public readonly email: string,
public readonly reason?: string,
metadata?: Record<string, any>
) {
super('user.deleted', userId, 'User', metadata);
}
}
3. In-Memory Event Bus (for Development)
// lib/events/in-memory-event-bus.ts
import { DomainEvent, EventBus, EventHandler } from './base';
/**
* Simple in-memory event bus for development and testing
* For production, use Redis, RabbitMQ, or AWS SNS/SQS
*/
export class InMemoryEventBus implements EventBus {
private handlers = new Map<string, EventHandler[]>();
private eventLog: DomainEvent[] = [];
/**
* Subscribe to an event type
*/
subscribe<T extends DomainEvent>(
eventType: string,
handler: EventHandler<T>
): void {
if (!this.handlers.has(eventType)) {
this.handlers.set(eventType, []);
}
this.handlers.get(eventType)!.push(handler as EventHandler);
console.log(`[EventBus] Handler subscribed to ${eventType}`);
}
/**
* Publish a single event
*/
async publish(event: DomainEvent): Promise<void> {
console.log(`[EventBus] Publishing event: ${event.eventType}`, {
eventId: event.eventId,
aggregateId: event.aggregateId,
});
// Store in event log
this.eventLog.push(event);
// Get handlers for this event type
const handlers = this.handlers.get(event.eventType) || [];
// Execute handlers in parallel
const promises = handlers.map(async (handler) => {
try {
await handler.handle(event);
console.log(`[EventBus] Handler executed: ${handler.constructor.name}`);
} catch (error) {
console.error(
`[EventBus] Handler failed: ${handler.constructor.name}`,
error
);
// In production, send to dead letter queue
await this.handleFailedEvent(event, handler, error);
}
});
await Promise.allSettled(promises);
}
/**
* Publish multiple events in batch
*/
async publishBatch(events: DomainEvent[]): Promise<void> {
console.log(`[EventBus] Publishing batch of ${events.length} events`);
await Promise.all(events.map((event) => this.publish(event)));
}
/**
* Handle failed event processing
* In production, send to dead letter queue
*/
private async handleFailedEvent(
event: DomainEvent,
handler: EventHandler,
error: unknown
): Promise<void> {
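// This in-memory bus only logs the failure. A production bus would persist
// the event to a dead letter queue for retry and investigation.
console.error('[EventBus] Handler failed; event would go to DLQ', {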
eventId: event.eventId,
eventType: event.eventType,
handler: handler.constructor.name,
error: error instanceof Error ? error.message : 'Unknown error',
});
}
/**
* Get event log (for debugging/testing)
*/
getEventLog(): DomainEvent[] {
return [...this.eventLog];
}
/**
* Clear event log (for testing)
*/
clearEventLog(): void {
this.eventLog = [];
}
}
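The reason `publish` catches errors per handler and then awaits `Promise.allSettled` is failure isolation: one failing handler should not stop the others. The stand-alone sketch below reduces that idea to a single function. `SketchEvent`, `Handler`, and `dispatch` are hypothetical names used only for illustration and are not part of the listing above.

```typescript
// Hypothetical, stripped-down types for illustration only.
type SketchEvent = { eventType: string; aggregateId: string };
type Handler = (event: SketchEvent) => Promise<void>;

// Runs every handler and returns how many of them failed. A rejection from
// one handler does not prevent the remaining handlers from running.
async function dispatch(event: SketchEvent, handlers: Handler[]): Promise<number> {
  const results = await Promise.allSettled(handlers.map((handler) => handler(event)));
  return results.filter((result) => result.status === 'rejected').length;
}

// Three handlers for the same event; the middle one always fails.
const calls: string[] = [];
const handlers: Handler[] = [
  async () => { calls.push('email'); },
  async () => { throw new Error('analytics is down'); },
  async () => { calls.push('billing'); },
];

const failedCount = dispatch(
  { eventType: 'user.registered', aggregateId: 'user-1' },
  handlers
);
```

In this sketch the email and billing handlers still run even though the analytics handler rejects, which is the behaviour the in-memory bus aims for.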
4. Redis-Based Event Bus (Production)
// lib/events/redis-event-bus.ts
import { createClient, RedisClientType } from 'redis';
import { DomainEvent, EventBus, EventHandler } from './base';
/**
* Redis-based event bus for production use
* Uses Redis Streams for reliable event delivery
*/
export class RedisEventBus implements EventBus {
private redis: RedisClientType;
private handlers = new Map<string, EventHandler[]>();
private consumerGroup = 'event-handlers';
private consumerName = `consumer-${process.pid}`;
constructor(private redisUrl: string) {
this.redis = createClient({ url: redisUrl });
}
/**
* Initialize connection and create consumer group
*/
async initialize(): Promise<void> {
await this.redis.connect();
console.log('[RedisEventBus] Connected to Redis');
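// Create a consumer group for each event type that already has handlers.
// Handlers must therefore be subscribed before initialize() is called.
for (const eventType of this.handlers.keys()) {
try {
await this.redis.xGroupCreate(
`events:${eventType}`,
this.consumerGroup,
'0',
{ MKSTREAM: true }
);
} catch (error) {
// BUSYGROUP means the group already exists; anything else is a real error
if (!(error instanceof Error && error.message.includes('BUSYGROUP'))) {
throw error;
}
}
}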
}
/**
* Subscribe to an event type
*/
subscribe<T extends DomainEvent>(
eventType: string,
handler: EventHandler<T>
): void {
if (!this.handlers.has(eventType)) {
this.handlers.set(eventType, []);
// Start consuming events for this type
this.startConsumer(eventType);
}
this.handlers.get(eventType)!.push(handler as EventHandler);
console.log(`[RedisEventBus] Handler subscribed to ${eventType}`);
}
/**
* Publish event to Redis Stream
*/
async publish(event: DomainEvent): Promise<void> {
const streamKey = `events:${event.eventType}`;
const eventData = {
eventId: event.eventId,
eventType: event.eventType,
aggregateId: event.aggregateId,
aggregateType: event.aggregateType,
occurredAt: event.occurredAt.toISOString(),
version: event.version.toString(),
payload: JSON.stringify(event),
};
await this.redis.xAdd(streamKey, '*', eventData);
console.log(`[RedisEventBus] Published event: ${event.eventType}`, {
eventId: event.eventId,
streamKey,
});
}
/**
* Publish batch of events
*/
async publishBatch(events: DomainEvent[]): Promise<void> {
// multi() queues the commands; exec() sends them as one MULTI/EXEC transaction
const pipeline = this.redis.multi();
for (const event of events) {
const streamKey = `events:${event.eventType}`;
const eventData = {
eventId: event.eventId,
eventType: event.eventType,
aggregateId: event.aggregateId,
aggregateType: event.aggregateType,
occurredAt: event.occurredAt.toISOString(),
version: event.version.toString(),
payload: JSON.stringify(event),
};
pipeline.xAdd(streamKey, '*', eventData);
}
await pipeline.exec();
console.log(`[RedisEventBus] Published batch of ${events.length} events`);
}
/**
* Start consuming events from stream
*/
private async startConsumer(eventType: string): Promise<void> {
const streamKey = `events:${eventType}`;
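// Consume events in a loop. Note: a blocking read (BLOCK) holds the
// connection it runs on, so other commands sharing this.redis (including
// publish) can wait up to the BLOCK timeout. Consider a dedicated connection
// (for example this.redis.duplicate()) for consumers in production.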
while (true) {
try {
const results = await this.redis.xReadGroup(
this.consumerGroup,
this.consumerName,
[{ key: streamKey, id: '>' }],
{ COUNT: 10, BLOCK: 5000 }
);
if (!results || results.length === 0) continue;
for (const stream of results) {
for (const message of stream.messages) {
await this.processMessage(eventType, message.id, message.message);
}
}
} catch (error) {
console.error(`[RedisEventBus] Consumer error:`, error);
await new Promise((resolve) => setTimeout(resolve, 1000));
}
}
}
/**
* Process a single message from stream
*/
private async processMessage(
eventType: string,
messageId: string,
data: any
): Promise<void> {
const streamKey = `events:${eventType}`;
const handlers = this.handlers.get(eventType) || [];
// JSON.parse returns occurredAt as an ISO string, so convert it back to a Date
const parsed = JSON.parse(data.payload);
const event: DomainEvent = { ...parsed, occurredAt: new Date(parsed.occurredAt) };
try {
// Execute all handlers; if any handler rejects, the message counts as failed
await Promise.all(
handlers.map((handler) => handler.handle(event))
);
} catch (error) {
console.error('[RedisEventBus] Handler failed', {
eventId: event.eventId,
error,
});
// Send to dead letter queue
await this.redis.xAdd(
`events:dlq:${eventType}`,
'*',
{
...data,
error: error instanceof Error ? error.message : 'Unknown error',
failedAt: new Date().toISOString(),
}
);
}
// Acknowledge in both cases: a failed message now lives in the DLQ, and
// leaving it unacknowledged would keep it in the group's pending list
await this.redis.xAck(streamKey, this.consumerGroup, messageId);
}
/**
* Graceful shutdown
*/
async close(): Promise<void> {
await this.redis.quit();
console.log('[RedisEventBus] Connection closed');
}
}
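Redis Stream entries are flat maps of string fields, which is why `publish` stringifies the event into a `payload` field. One consequence is that a `Date` does not survive the round trip: `JSON.parse` returns `occurredAt` as a string. The sketch below shows one way to handle this with two pure helper functions. `StoredEvent`, `toStreamFields`, and `fromStreamFields` are hypothetical names for this illustration, not part of the Redis client or of the listing above.

```typescript
// Hypothetical event shape mirroring the DomainEvent fields used above.
interface StoredEvent {
  eventId: string;
  eventType: string;
  aggregateId: string;
  aggregateType: string;
  occurredAt: Date;
  version: number;
}

// Flattens an event into string fields suitable for a stream entry.
function toStreamFields(event: StoredEvent): Record<string, string> {
  return {
    eventId: event.eventId,
    eventType: event.eventType,
    occurredAt: event.occurredAt.toISOString(),
    payload: JSON.stringify(event), // the Date becomes an ISO-8601 string here
  };
}

// Rebuilds the event from a stream entry, converting occurredAt back to a Date.
function fromStreamFields(fields: Record<string, string>): StoredEvent {
  const raw = JSON.parse(fields.payload) as Omit<StoredEvent, 'occurredAt'> & {
    occurredAt: string;
  };
  return { ...raw, occurredAt: new Date(raw.occurredAt) };
}
```

Handlers that call `event.occurredAt.getTime()` or similar only work on the consumer side if this conversion happens somewhere between reading the entry and invoking the handler.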
5. Event Handlers
// lib/events/handlers/send-welcome-email-handler.ts
import { EventHandler } from '../base';
import { UserRegisteredEvent } from '../user-events';
export class SendWelcomeEmailHandler implements EventHandler<UserRegisteredEvent> {
eventType = 'user.registered';
async handle(event: UserRegisteredEvent): Promise<void> {
console.log(`[SendWelcomeEmail] Sending welcome email to ${event.email}`);
// Send email via email service
await this.sendEmail(event.email, event.name, event.tier);
console.log(`[SendWelcomeEmail] Email sent successfully`);
}
private async sendEmail(
email: string,
name: string,
tier: string
): Promise<void> {
// Implementation: Call email service (SendGrid, Mailgun, etc.)
// For example:
// await emailService.send({
// to: email,
// template: 'welcome',
// data: { name, tier }
// });
// Simulate email sending
await new Promise((resolve) => setTimeout(resolve, 100));
}
}
// lib/events/handlers/track-signup-handler.ts
import { EventHandler } from '../base';
import { UserRegisteredEvent } from '../user-events';
import { trackSignup } from '@/lib/analytics';
export class TrackSignupHandler implements EventHandler<UserRegisteredEvent> {
eventType = 'user.registered';
async handle(event: UserRegisteredEvent): Promise<void> {
console.log(`[TrackSignup] Tracking signup for user ${event.userId}`);
// Track in Google Analytics
trackSignup(event.tier, {
method: event.metadata?.authMethod || 'email',
source: event.metadata?.source || 'signup',
campaign: event.metadata?.campaign,
});
console.log(`[TrackSignup] Signup tracked successfully`);
}
}
// lib/events/handlers/create-stripe-customer-handler.ts
import { EventHandler } from '../base';
import { UserRegisteredEvent } from '../user-events';
export class CreateStripeCustomerHandler implements EventHandler<UserRegisteredEvent> {
eventType = 'user.registered';
async handle(event: UserRegisteredEvent): Promise<void> {
console.log(`[CreateStripeCustomer] Creating Stripe customer for ${event.email}`);
// Create Stripe customer
const customerId = await this.createStripeCustomer(
event.email,
event.name,
event.userId
);
// Store customerId in database
await this.storeCustomerId(event.userId, customerId);
console.log(`[CreateStripeCustomer] Stripe customer created: ${customerId}`);
}
private async createStripeCustomer(
email: string,
name: string,
userId: string
): Promise<string> {
// Implementation: Call Stripe API
// const customer = await stripe.customers.create({
// email,
// name,
// metadata: { userId }
// });
// return customer.id;
// Simulate API call
await new Promise((resolve) => setTimeout(resolve, 200));
return `cus_${Math.random().toString(36).substring(7)}`;
}
private async storeCustomerId(userId: string, customerId: string): Promise<void> {
// Update user record with Stripe customer ID
// await db.query('UPDATE users SET stripe_customer_id = $1 WHERE id = $2', [customerId, userId]);
}
}
6. Outbox Pattern for Transactional Events
// lib/events/outbox.ts
import { DomainEvent } from './base';
import { DatabasePool } from '@/lib/db-pool';
/**
* Outbox pattern for ensuring events are published after transaction commits
* Solves the dual-write problem (database + event bus)
*/
export class EventOutbox {
constructor(private db: DatabasePool) {}
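/**
* Store event in outbox table.
* Pass the transaction's client as `executor` so the insert commits or rolls
* back with the caller's transaction. By default it runs on the pool, which
* typically means a different connection, outside that transaction.
*/
async storeEvent(
event: DomainEvent,
executor: Pick<DatabasePool, 'query'> = this.db
): Promise<void> {
await executor.query(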
`INSERT INTO event_outbox (
event_id,
event_type,
aggregate_id,
aggregate_type,
payload,
occurred_at,
status
) VALUES ($1, $2, $3, $4, $5, $6, $7)`,
[
event.eventId,
event.eventType,
event.aggregateId,
event.aggregateType,
JSON.stringify(event),
event.occurredAt,
'pending',
]
);
}
/**
* Poll outbox for pending events and publish them
* Run this in a background job every few seconds
*/
async publishPendingEvents(
eventBus: { publish(event: DomainEvent): Promise<void> }
): Promise<void> {
// Lock and fetch pending events
const result = await this.db.query<{
event_id: string;
payload: string;
}>(
`UPDATE event_outbox
SET status = 'processing', processed_at = NOW()
WHERE event_id IN (
SELECT event_id FROM event_outbox
WHERE status = 'pending'
ORDER BY occurred_at
LIMIT 100
FOR UPDATE SKIP LOCKED
)
RETURNING event_id, payload`
);
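const events = result.rows;
let published = 0;
for (const row of events) {
// Some drivers (for example node-postgres) return JSONB columns already parsed
const raw = typeof row.payload === 'string' ? JSON.parse(row.payload) : row.payload;
// occurredAt is an ISO string after JSON serialization; restore the Date
// so that buses calling event.occurredAt.toISOString() keep working
const event: DomainEvent = { ...raw, occurredAt: new Date(raw.occurredAt) };
try {
// Publish to event bus
await eventBus.publish(event);
// Mark as published
await this.db.query(
`UPDATE event_outbox SET status = 'published' WHERE event_id = $1`,
[row.event_id]
);
published++;
} catch (error) {
// Mark as failed
await this.db.query(
`UPDATE event_outbox
SET status = 'failed', error = $2
WHERE event_id = $1`,
[row.event_id, error instanceof Error ? error.message : 'Unknown error']
);
}
}
console.log(`[EventOutbox] Published ${published} of ${events.length} events`);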
}
}
/**
* Database schema for outbox table:
*
* CREATE TABLE event_outbox (
* event_id UUID PRIMARY KEY,
* event_type VARCHAR(255) NOT NULL,
* aggregate_id VARCHAR(255) NOT NULL,
* aggregate_type VARCHAR(100) NOT NULL,
* payload JSONB NOT NULL,
* occurred_at TIMESTAMPTZ NOT NULL,
* status VARCHAR(20) NOT NULL DEFAULT 'pending',
* processed_at TIMESTAMPTZ,
* error TEXT,
* created_at TIMESTAMPTZ DEFAULT NOW()
* );
*
* CREATE INDEX idx_outbox_status ON event_outbox(status, occurred_at);
*/
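`publishPendingEvents` is meant to run in a background job every few seconds. A minimal sketch of such a job is shown below, assuming a plain in-process timer is acceptable. `startPolling` is a hypothetical helper, not part of the listing above. It schedules the next run only after the previous one has finished, so two runs never overlap, and a failed run does not stop the loop.

```typescript
// Hypothetical helper: runs `task` repeatedly, waiting `intervalMs` after each
// run completes. Returns a function that stops the loop.
function startPolling(task: () => Promise<void>, intervalMs: number): () => void {
  let stopped = false;
  let timer: ReturnType<typeof setTimeout> | undefined;

  const tick = async (): Promise<void> => {
    if (stopped) return;
    try {
      await task();
    } catch (error) {
      // A failed run must not kill the loop; the next tick tries again.
      console.error('[OutboxPoller] Run failed:', error);
    }
    // Schedule the next run only after this one has finished.
    if (!stopped) timer = setTimeout(tick, intervalMs);
  };

  void tick();

  return () => {
    stopped = true;
    if (timer !== undefined) clearTimeout(timer);
  };
}
```

With the classes above, usage might look like `const stop = startPolling(() => outbox.publishPendingEvents(eventBus), 2000);`, with `stop()` called during shutdown. The 2000 ms interval is an arbitrary example value.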
7. Usage Example
// app/api/users/register/route.ts
import { NextRequest, NextResponse } from 'next/server';
import { getDbPool } from '@/lib/db-pool';
import { getEventBus } from '@/lib/events';
import { UserRegisteredEvent } from '@/lib/events/user-events';
import { EventOutbox } from '@/lib/events/outbox';
const db = getDbPool();
const eventBus = getEventBus();
const outbox = new EventOutbox(db);
export async function POST(request: NextRequest) {
try {
const { email, name, tier } = await request.json();
// Use transaction + outbox pattern
await db.transaction(async (client) => {
// 1. Create user in database
const result = await client.query(
`INSERT INTO users (email, name, tier, created_at)
VALUES ($1, $2, $3, NOW())
RETURNING id`,
[email, name, tier]
);
const userId = result.rows[0].id;
// 2. Create event
const event = new UserRegisteredEvent(
userId,
email,
name,
tier,
{
source: 'api',
userAgent: request.headers.get('user-agent') || undefined,
}
);
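// 3. Store event in outbox through the transaction's client, so the outbox
// row commits or rolls back together with the user row. (A write that goes
// through the pool instead would typically use a different connection and
// not be part of this transaction.)
await client.query(
`INSERT INTO event_outbox (
event_id, event_type, aggregate_id, aggregate_type,
payload, occurred_at, status
) VALUES ($1, $2, $3, $4, $5, $6, 'pending')`,
[
event.eventId,
event.eventType,
event.aggregateId,
event.aggregateType,
JSON.stringify(event),
event.occurredAt,
]
);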
// Transaction commits here
});
// Background job will pick up and publish the event
// This ensures event is only published if transaction succeeds
return NextResponse.json({ success: true }, { status: 201 });
} catch (error) {
console.error('[API] Registration failed:', error);
return NextResponse.json(
{ error: 'Registration failed' },
{ status: 500 }
);
}
}
8. Event Bus Initialization
// lib/events/index.ts
import { EventBus } from './base';
import { InMemoryEventBus } from './in-memory-event-bus';
import { RedisEventBus } from './redis-event-bus';
import { SendWelcomeEmailHandler } from './handlers/send-welcome-email-handler';
import { TrackSignupHandler } from './handlers/track-signup-handler';
import { CreateStripeCustomerHandler } from './handlers/create-stripe-customer-handler';
let eventBus: EventBus | null = null;
/**
* Get or create event bus singleton
*/
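export function getEventBus(): EventBus {
if (!eventBus) {
// Use Redis in production, in-memory for development
if (process.env.REDIS_URL) {
const redisEventBus = new RedisEventBus(process.env.REDIS_URL);
eventBus = redisEventBus;
// Register handlers first: initialize() creates consumer groups only for
// event types that already have handlers
registerHandlers(eventBus);
// initialize() is async and getEventBus() is not, so log connection
// errors instead of leaving an unhandled promise rejection
redisEventBus.initialize().catch((error) => {
console.error('[EventBus] Redis initialization failed:', error);
});
} else {
eventBus = new InMemoryEventBus();
registerHandlers(eventBus);
}
}
return eventBus;
}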
/**
* Register all event handlers
*/
function registerHandlers(bus: EventBus): void {
// User registration handlers
bus.subscribe('user.registered', new SendWelcomeEmailHandler());
bus.subscribe('user.registered', new TrackSignupHandler());
bus.subscribe('user.registered', new CreateStripeCustomerHandler());
console.log('[EventBus] All handlers registered');
}
/**
* Close event bus (on shutdown)
*/
export async function closeEventBus(): Promise<void> {
if (eventBus instanceof RedisEventBus) {
await eventBus.close();
eventBus = null;
}
}
Testing
// __tests__/events/user-events.test.ts
import { InMemoryEventBus } from '@/lib/events/in-memory-event-bus';
import { UserRegisteredEvent } from '@/lib/events/user-events';
import { EventHandler } from '@/lib/events/base';
describe('Event System', () => {
let eventBus: InMemoryEventBus;
beforeEach(() => {
eventBus = new InMemoryEventBus();
});
it('should publish and handle events', async () => {
const handlerSpy = jest.fn();
// Mock handler
const mockHandler: EventHandler = {
eventType: 'user.registered',
handle: handlerSpy,
};
// Subscribe
eventBus.subscribe('user.registered', mockHandler);
// Publish event
const event = new UserRegisteredEvent('user-123', 'test@example.com', 'Test', 'free');
await eventBus.publish(event);
// Assert handler was called
expect(handlerSpy).toHaveBeenCalledWith(event);
});
it('should store events in event log', async () => {
const event = new UserRegisteredEvent('user-123', 'test@example.com', 'Test', 'free');
await eventBus.publish(event);
const log = eventBus.getEventLog();
expect(log).toHaveLength(1);
expect(log[0].eventType).toBe('user.registered');
});
});
Best Practices
✅ DO
- Use idempotent handlers - Events may be delivered multiple times
- Include event version - For schema evolution
- Store events immutably - Never modify published events
- Use outbox pattern - For transactional consistency
- Add metadata - For debugging and tracing
- Handle failures gracefully - Dead letter queues for failed events
- Monitor event processing - Track latency and failures
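As an illustration of the first point, the sketch below wraps a handler so that a redelivered event is skipped. `makeIdempotent` and `SketchEvent` are hypothetical names, and the in-memory `Set` is only a stand-in: a real system would need a durable store, for example a table with a unique constraint on the event ID, which would also cover concurrent duplicate deliveries that this simple check does not.

```typescript
// Hypothetical minimal event shape for illustration.
type SketchEvent = { eventId: string; eventType: string };

// Wraps a handler so that an event ID that was already processed is skipped.
function makeIdempotent(
  handle: (event: SketchEvent) => Promise<void>,
  processed: Set<string> = new Set()
): (event: SketchEvent) => Promise<void> {
  return async (event) => {
    if (processed.has(event.eventId)) return; // duplicate delivery: do nothing
    await handle(event);
    // Recorded only after success, so a failed attempt can be retried later.
    processed.add(event.eventId);
  };
}
```

Wrapping a handler such as the welcome email sender this way would mean a redelivered `user.registered` event does not send a second email, as long as the set of processed IDs survives between deliveries.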
❌ DON'T
- Don't rely on event order - Events may arrive out of order
- Don't put large payloads in events - Store in DB, pass ID in event
- Don't couple handlers - Each handler should be independent
- Don't make handlers slow - Offload heavy work to background jobs
- Don't skip the outbox - Always use outbox for transactional events
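One way to cope with out-of-order delivery is to make the consumer ignore events that are older than what it has already applied. The sketch below assumes each event carries a per-aggregate `sequence` counter. That field is hypothetical and is not the `version` field of `DomainEvent`, which the listings above use for schema evolution. `TierChanged` and `TierProjection` are likewise illustrative names.

```typescript
// `sequence` is a hypothetical per-aggregate counter assigned by the producer.
type TierChanged = { aggregateId: string; sequence: number; tier: string };

// Keeps the latest known tier per user and ignores stale or duplicate events.
class TierProjection {
  private state = new Map<string, { sequence: number; tier: string }>();

  // Returns true if the event was applied, false if it was ignored.
  apply(event: TierChanged): boolean {
    const current = this.state.get(event.aggregateId);
    if (current && current.sequence >= event.sequence) {
      return false; // an equal or newer event was already applied
    }
    this.state.set(event.aggregateId, { sequence: event.sequence, tier: event.tier });
    return true;
  }

  tierOf(aggregateId: string): string | undefined {
    return this.state.get(aggregateId)?.tier;
  }
}
```

If the upgrade event (sequence 2) arrives before the original registration event (sequence 1), the projection keeps the upgraded tier instead of overwriting it with the older value.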
Summary
- ✅ Event-driven architecture enables scalable, decoupled systems
- ✅ Domain events represent state changes in your system
- ✅ Event bus coordinates event publishing and consumption
- ✅ Outbox pattern ensures transactional consistency
- ✅ Multiple handlers can react to the same event independently
- ✅ Redis Streams provide reliable, scalable event delivery
Production checklist:
- [ ] Event bus configured (Redis/RabbitMQ/SNS)
- [ ] Outbox table created
- [ ] Background job polling outbox
- [ ] Event handlers registered
- [ ] Dead letter queue configured
- [ ] Monitoring and alerting set up
- [ ] Retry logic implemented
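The retry item in the checklist is not covered by the listings above. A minimal sketch of one common approach, retrying with exponential backoff before giving up, is shown below. `withRetry` and its default values are hypothetical and would need tuning for a real workload.

```typescript
// Hypothetical helper: retries `operation` up to `maxAttempts` times, doubling
// the delay after each failed attempt. Rethrows the last error when all
// attempts fail, so the caller can route the event to a dead letter queue.
async function withRetry<T>(
  operation: () => Promise<T>,
  maxAttempts = 3,
  baseDelayMs = 100
): Promise<T> {
  let lastError: unknown;
  for (let attempt = 1; attempt <= maxAttempts; attempt++) {
    try {
      return await operation();
    } catch (error) {
      lastError = error;
      if (attempt < maxAttempts) {
        // Delays of baseDelayMs, 2x, 4x, ... between attempts
        const delayMs = baseDelayMs * 2 ** (attempt - 1);
        await new Promise((resolve) => setTimeout(resolve, delayMs));
      }
    }
  }
  throw lastError;
}
```

A bus could wrap each handler call, for example `await withRetry(() => handler.handle(event))`, and send the event to the dead letter queue only when the final attempt also fails. Because retries mean a handler may run more than once for the same event, this works best with idempotent handlers.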