Skip to main content
Featured

Event-Driven Architecture with TypeScript

November 23, 2025By Steve Winter29 min read
...
code

Build scalable, decoupled systems with event-driven architecture. Production-ready event bus, domain events, outbox pattern, and dead letter queues.

Language: TypeScript
Difficulty: advanced

Event-Driven Architecture with TypeScript

Event-driven architecture (EDA) enables loosely coupled, scalable systems where components communicate through events rather than direct calls. This is essential for microservices, real-time systems, and complex workflows.

Why Event-Driven Architecture?

Benefits:

  • ✅ Loose coupling between services
  • ✅ Scalability (horizontal scaling of event consumers)
  • ✅ Resilience (services can fail independently)
  • ✅ Audit trail (events are recorded)
  • ✅ Temporal decoupling (async processing)

Use cases:

  • User signup workflows
  • Order processing
  • Notification systems
  • Audit logging
  • Data synchronization
  • Webhook dispatching

Core Implementation

1. Event Types and Base Classes

// lib/events/base.ts

/**
 * Base event interface that all domain events must implement
 */
export interface DomainEvent {
  readonly eventId: string;
  readonly eventType: string;
  readonly occurredAt: Date;
  readonly aggregateId: string;
  readonly aggregateType: string;
  readonly version: number;
  readonly metadata?: Record<string, any>;
}

/**
 * Event handler interface
 */
export interface EventHandler<T extends DomainEvent = DomainEvent> {
  eventType: string;
  handle(event: T): Promise<void>;
}

/**
 * Event bus interface
 */
export interface EventBus {
  publish(event: DomainEvent): Promise<void>;
  publishBatch(events: DomainEvent[]): Promise<void>;
  subscribe<T extends DomainEvent>(
    eventType: string,
    handler: EventHandler<T>
  ): void;
}

/**
 * Base abstract class for domain events
 */
export abstract class BaseDomainEvent implements DomainEvent {
  readonly eventId: string;
  readonly occurredAt: Date;
  readonly version: number = 1;

  constructor(
    public readonly eventType: string,
    public readonly aggregateId: string,
    public readonly aggregateType: string,
    public readonly metadata?: Record<string, any>
  ) {
    this.eventId = crypto.randomUUID();
    this.occurredAt = new Date();
  }
}

2. Domain Event Examples

// lib/events/user-events.ts
import { BaseDomainEvent } from './base';

/**
 * User registered event
 */
export class UserRegisteredEvent extends BaseDomainEvent {
  constructor(
    public readonly userId: string,
    public readonly email: string,
    public readonly name: string,
    public readonly tier: 'free' | 'pro' | 'team',
    metadata?: Record<string, any>
  ) {
    super('user.registered', userId, 'User', metadata);
  }
}

/**
 * User upgraded event
 */
export class UserUpgradedEvent extends BaseDomainEvent {
  constructor(
    public readonly userId: string,
    public readonly fromTier: string,
    public readonly toTier: string,
    public readonly amount: number,
    metadata?: Record<string, any>
  ) {
    super('user.upgraded', userId, 'User', metadata);
  }
}

/**
 * User deleted event
 */
export class UserDeletedEvent extends BaseDomainEvent {
  constructor(
    public readonly userId: string,
    public readonly email: string,
    public readonly reason?: string,
    metadata?: Record<string, any>
  ) {
    super('user.deleted', userId, 'User', metadata);
  }
}

3. In-Memory Event Bus (for Development)

// lib/events/in-memory-event-bus.ts
import { DomainEvent, EventBus, EventHandler } from './base';

/**
 * Simple in-memory event bus for development and testing
 * For production, use Redis, RabbitMQ, or AWS SNS/SQS
 */
export class InMemoryEventBus implements EventBus {
  private handlers = new Map<string, EventHandler[]>();
  private eventLog: DomainEvent[] = [];

  /**
   * Subscribe to an event type
   */
  subscribe<T extends DomainEvent>(
    eventType: string,
    handler: EventHandler<T>
  ): void {
    if (!this.handlers.has(eventType)) {
      this.handlers.set(eventType, []);
    }
    this.handlers.get(eventType)!.push(handler as EventHandler);
    console.log(`[EventBus] Handler subscribed to ${eventType}`);
  }

  /**
   * Publish a single event
   */
  async publish(event: DomainEvent): Promise<void> {
    console.log(`[EventBus] Publishing event: ${event.eventType}`, {
      eventId: event.eventId,
      aggregateId: event.aggregateId,
    });

    // Store in event log
    this.eventLog.push(event);

    // Get handlers for this event type
    const handlers = this.handlers.get(event.eventType) || [];

    // Execute handlers in parallel
    const promises = handlers.map(async (handler) => {
      try {
        await handler.handle(event);
        console.log(`[EventBus] Handler executed: ${handler.constructor.name}`);
      } catch (error) {
        console.error(
          `[EventBus] Handler failed: ${handler.constructor.name}`,
          error
        );
        // In production, send to dead letter queue
        await this.handleFailedEvent(event, handler, error);
      }
    });

    await Promise.allSettled(promises);
  }

  /**
   * Publish multiple events in batch
   */
  async publishBatch(events: DomainEvent[]): Promise<void> {
    console.log(`[EventBus] Publishing batch of ${events.length} events`);
    await Promise.all(events.map((event) => this.publish(event)));
  }

  /**
   * Handle failed event processing
   * In production, send to dead letter queue
   */
  private async handleFailedEvent(
    event: DomainEvent,
    handler: EventHandler,
    error: unknown
  ): Promise<void> {
    // Store in dead letter queue for retry/investigation
    console.error('[EventBus] Event sent to DLQ', {
      eventId: event.eventId,
      eventType: event.eventType,
      handler: handler.constructor.name,
      error: error instanceof Error ? error.message : 'Unknown error',
    });
  }

  /**
   * Get event log (for debugging/testing)
   */
  getEventLog(): DomainEvent[] {
    return [...this.eventLog];
  }

  /**
   * Clear event log (for testing)
   */
  clearEventLog(): void {
    this.eventLog = [];
  }
}

4. Redis-Based Event Bus (Production)

// lib/events/redis-event-bus.ts
import { createClient, RedisClientType } from 'redis';
import { DomainEvent, EventBus, EventHandler } from './base';

/**
 * Redis-based event bus for production use
 * Uses Redis Streams for reliable event delivery
 */
export class RedisEventBus implements EventBus {
  private redis: RedisClientType;
  private handlers = new Map<string, EventHandler[]>();
  private consumerGroup = 'event-handlers';
  private consumerName = `consumer-${process.pid}`;

  constructor(private redisUrl: string) {
    this.redis = createClient({ url: redisUrl });
  }

  /**
   * Initialize connection and create consumer group
   */
  async initialize(): Promise<void> {
    await this.redis.connect();
    console.log('[RedisEventBus] Connected to Redis');

    // Create consumer groups for each event type
    for (const eventType of this.handlers.keys()) {
      try {
        await this.redis.xGroupCreate(
          `events:${eventType}`,
          this.consumerGroup,
          '0',
          { MKSTREAM: true }
        );
      } catch (error) {
        // Group already exists, ignore
      }
    }
  }

  /**
   * Subscribe to an event type
   */
  subscribe<T extends DomainEvent>(
    eventType: string,
    handler: EventHandler<T>
  ): void {
    if (!this.handlers.has(eventType)) {
      this.handlers.set(eventType, []);
      // Start consuming events for this type
      this.startConsumer(eventType);
    }
    this.handlers.get(eventType)!.push(handler as EventHandler);
    console.log(`[RedisEventBus] Handler subscribed to ${eventType}`);
  }

  /**
   * Publish event to Redis Stream
   */
  async publish(event: DomainEvent): Promise<void> {
    const streamKey = `events:${event.eventType}`;
    const eventData = {
      eventId: event.eventId,
      eventType: event.eventType,
      aggregateId: event.aggregateId,
      aggregateType: event.aggregateType,
      occurredAt: event.occurredAt.toISOString(),
      version: event.version.toString(),
      payload: JSON.stringify(event),
    };

    await this.redis.xAdd(streamKey, '*', eventData);
    console.log(`[RedisEventBus] Published event: ${event.eventType}`, {
      eventId: event.eventId,
      streamKey,
    });
  }

  /**
   * Publish batch of events
   */
  async publishBatch(events: DomainEvent[]): Promise<void> {
    const pipeline = this.redis.multi();

    for (const event of events) {
      const streamKey = `events:${event.eventType}`;
      const eventData = {
        eventId: event.eventId,
        eventType: event.eventType,
        aggregateId: event.aggregateId,
        aggregateType: event.aggregateType,
        occurredAt: event.occurredAt.toISOString(),
        version: event.version.toString(),
        payload: JSON.stringify(event),
      };
      pipeline.xAdd(streamKey, '*', eventData);
    }

    await pipeline.exec();
    console.log(`[RedisEventBus] Published batch of ${events.length} events`);
  }

  /**
   * Start consuming events from stream
   */
  private async startConsumer(eventType: string): Promise<void> {
    const streamKey = `events:${eventType}`;

    // Consume events in a loop
    while (true) {
      try {
        const results = await this.redis.xReadGroup(
          this.consumerGroup,
          this.consumerName,
          [{ key: streamKey, id: '>' }],
          { COUNT: 10, BLOCK: 5000 }
        );

        if (!results || results.length === 0) continue;

        for (const stream of results) {
          for (const message of stream.messages) {
            await this.processMessage(eventType, message.id, message.message);
          }
        }
      } catch (error) {
        console.error(`[RedisEventBus] Consumer error:`, error);
        await new Promise((resolve) => setTimeout(resolve, 1000));
      }
    }
  }

  /**
   * Process a single message from stream
   */
  private async processMessage(
    eventType: string,
    messageId: string,
    data: any
  ): Promise<void> {
    const handlers = this.handlers.get(eventType) || [];
    const event = JSON.parse(data.payload) as DomainEvent;

    try {
      // Execute all handlers
      await Promise.all(
        handlers.map((handler) => handler.handle(event))
      );

      // Acknowledge message
      await this.redis.xAck(
        `events:${eventType}`,
        this.consumerGroup,
        messageId
      );
    } catch (error) {
      console.error('[RedisEventBus] Handler failed', {
        eventId: event.eventId,
        error,
      });

      // Send to dead letter queue
      await this.redis.xAdd(
        `events:dlq:${eventType}`,
        '*',
        {
          ...data,
          error: error instanceof Error ? error.message : 'Unknown error',
          failedAt: new Date().toISOString(),
        }
      );
    }
  }

  /**
   * Graceful shutdown
   */
  async close(): Promise<void> {
    await this.redis.quit();
    console.log('[RedisEventBus] Connection closed');
  }
}

5. Event Handlers

// lib/events/handlers/send-welcome-email-handler.ts
import { EventHandler } from '../base';
import { UserRegisteredEvent } from '../user-events';

export class SendWelcomeEmailHandler implements EventHandler<UserRegisteredEvent> {
  eventType = 'user.registered';

  async handle(event: UserRegisteredEvent): Promise<void> {
    console.log(`[SendWelcomeEmail] Sending welcome email to ${event.email}`);

    // Send email via email service
    await this.sendEmail(event.email, event.name, event.tier);

    console.log(`[SendWelcomeEmail] Email sent successfully`);
  }

  private async sendEmail(
    email: string,
    name: string,
    tier: string
  ): Promise<void> {
    // Implementation: Call email service (SendGrid, Mailgun, etc.)
    // For example:
    // await emailService.send({
    //   to: email,
    //   template: 'welcome',
    //   data: { name, tier }
    // });

    // Simulate email sending
    await new Promise((resolve) => setTimeout(resolve, 100));
  }
}
// lib/events/handlers/track-signup-handler.ts
import { EventHandler } from '../base';
import { UserRegisteredEvent } from '../user-events';
import { trackSignup } from '@/lib/analytics';

export class TrackSignupHandler implements EventHandler<UserRegisteredEvent> {
  eventType = 'user.registered';

  async handle(event: UserRegisteredEvent): Promise<void> {
    console.log(`[TrackSignup] Tracking signup for user ${event.userId}`);

    // Track in Google Analytics
    trackSignup(event.tier, {
      method: event.metadata?.authMethod || 'email',
      source: event.metadata?.source || 'signup',
      campaign: event.metadata?.campaign,
    });

    console.log(`[TrackSignup] Signup tracked successfully`);
  }
}
// lib/events/handlers/create-stripe-customer-handler.ts
import { EventHandler } from '../base';
import { UserRegisteredEvent } from '../user-events';

export class CreateStripeCustomerHandler implements EventHandler<UserRegisteredEvent> {
  eventType = 'user.registered';

  async handle(event: UserRegisteredEvent): Promise<void> {
    console.log(`[CreateStripeCustomer] Creating Stripe customer for ${event.email}`);

    // Create Stripe customer
    const customerId = await this.createStripeCustomer(
      event.email,
      event.name,
      event.userId
    );

    // Store customerId in database
    await this.storeCustomerId(event.userId, customerId);

    console.log(`[CreateStripeCustomer] Stripe customer created: ${customerId}`);
  }

  private async createStripeCustomer(
    email: string,
    name: string,
    userId: string
  ): Promise<string> {
    // Implementation: Call Stripe API
    // const customer = await stripe.customers.create({
    //   email,
    //   name,
    //   metadata: { userId }
    // });
    // return customer.id;

    // Simulate API call
    await new Promise((resolve) => setTimeout(resolve, 200));
    return `cus_${Math.random().toString(36).substring(7)}`;
  }

  private async storeCustomerId(userId: string, customerId: string): Promise<void> {
    // Update user record with Stripe customer ID
    // await db.query('UPDATE users SET stripe_customer_id = $1 WHERE id = $2', [customerId, userId]);
  }
}

6. Outbox Pattern for Transactional Events

// lib/events/outbox.ts
import { DomainEvent } from './base';
import { DatabasePool } from '@/lib/db-pool';

/**
 * Outbox pattern for ensuring events are published after transaction commits
 * Solves the dual-write problem (database + event bus)
 */
export class EventOutbox {
  constructor(private db: DatabasePool) {}

  /**
   * Store event in outbox table (within transaction)
   */
  async storeEvent(event: DomainEvent): Promise<void> {
    await this.db.query(
      `INSERT INTO event_outbox (
        event_id,
        event_type,
        aggregate_id,
        aggregate_type,
        payload,
        occurred_at,
        status
      ) VALUES ($1, $2, $3, $4, $5, $6, $7)`,
      [
        event.eventId,
        event.eventType,
        event.aggregateId,
        event.aggregateType,
        JSON.stringify(event),
        event.occurredAt,
        'pending',
      ]
    );
  }

  /**
   * Poll outbox for pending events and publish them
   * Run this in a background job every few seconds
   */
  async publishPendingEvents(eventBus: any): Promise<void> {
    // Lock and fetch pending events
    const result = await this.db.query<{
      event_id: string;
      payload: string;
    }>(
      `UPDATE event_outbox
       SET status = 'processing', processed_at = NOW()
       WHERE event_id IN (
         SELECT event_id FROM event_outbox
         WHERE status = 'pending'
         ORDER BY occurred_at
         LIMIT 100
         FOR UPDATE SKIP LOCKED
       )
       RETURNING event_id, payload`
    );

    const events = result.rows;

    for (const row of events) {
      const event = JSON.parse(row.payload) as DomainEvent;

      try {
        // Publish to event bus
        await eventBus.publish(event);

        // Mark as published
        await this.db.query(
          `UPDATE event_outbox SET status = 'published' WHERE event_id = $1`,
          [row.event_id]
        );
      } catch (error) {
        // Mark as failed
        await this.db.query(
          `UPDATE event_outbox
           SET status = 'failed', error = $2
           WHERE event_id = $1`,
          [row.event_id, error instanceof Error ? error.message : 'Unknown error']
        );
      }
    }

    console.log(`[EventOutbox] Published ${events.length} events`);
  }
}

/**
 * Database schema for outbox table:
 *
 * CREATE TABLE event_outbox (
 *   event_id UUID PRIMARY KEY,
 *   event_type VARCHAR(255) NOT NULL,
 *   aggregate_id VARCHAR(255) NOT NULL,
 *   aggregate_type VARCHAR(100) NOT NULL,
 *   payload JSONB NOT NULL,
 *   occurred_at TIMESTAMPTZ NOT NULL,
 *   status VARCHAR(20) NOT NULL DEFAULT 'pending',
 *   processed_at TIMESTAMPTZ,
 *   error TEXT,
 *   created_at TIMESTAMPTZ DEFAULT NOW()
 * );
 *
 * CREATE INDEX idx_outbox_status ON event_outbox(status, occurred_at);
 */

7. Usage Example

// app/api/users/register/route.ts
import { NextRequest, NextResponse } from 'next/server';
import { getDbPool } from '@/lib/db-pool';
import { getEventBus } from '@/lib/events';
import { UserRegisteredEvent } from '@/lib/events/user-events';
import { EventOutbox } from '@/lib/events/outbox';

const db = getDbPool();
const eventBus = getEventBus();
const outbox = new EventOutbox(db);

export async function POST(request: NextRequest) {
  try {
    const { email, name, tier } = await request.json();

    // Use transaction + outbox pattern
    await db.transaction(async (client) => {
      // 1. Create user in database
      const result = await client.query(
        `INSERT INTO users (email, name, tier, created_at)
         VALUES ($1, $2, $3, NOW())
         RETURNING id`,
        [email, name, tier]
      );
      const userId = result.rows[0].id;

      // 2. Create event
      const event = new UserRegisteredEvent(
        userId,
        email,
        name,
        tier,
        {
          source: 'api',
          userAgent: request.headers.get('user-agent') || undefined,
        }
      );

      // 3. Store event in outbox (within same transaction)
      await outbox.storeEvent(event);

      // Transaction commits here
    });

    // Background job will pick up and publish the event
    // This ensures event is only published if transaction succeeds

    return NextResponse.json({ success: true }, { status: 201 });
  } catch (error) {
    console.error('[API] Registration failed:', error);
    return NextResponse.json(
      { error: 'Registration failed' },
      { status: 500 }
    );
  }
}

8. Event Bus Initialization

// lib/events/index.ts
import { EventBus } from './base';
import { InMemoryEventBus } from './in-memory-event-bus';
import { RedisEventBus } from './redis-event-bus';
import { SendWelcomeEmailHandler } from './handlers/send-welcome-email-handler';
import { TrackSignupHandler } from './handlers/track-signup-handler';
import { CreateStripeCustomerHandler } from './handlers/create-stripe-customer-handler';

let eventBus: EventBus | null = null;

/**
 * Get or create event bus singleton
 */
export function getEventBus(): EventBus {
  if (!eventBus) {
    // Use Redis in production, in-memory for development
    if (process.env.REDIS_URL) {
      const redisEventBus = new RedisEventBus(process.env.REDIS_URL);
      redisEventBus.initialize();
      eventBus = redisEventBus;
    } else {
      eventBus = new InMemoryEventBus();
    }

    // Register handlers
    registerHandlers(eventBus);
  }

  return eventBus;
}

/**
 * Register all event handlers
 */
function registerHandlers(bus: EventBus): void {
  // User registration handlers
  bus.subscribe('user.registered', new SendWelcomeEmailHandler());
  bus.subscribe('user.registered', new TrackSignupHandler());
  bus.subscribe('user.registered', new CreateStripeCustomerHandler());

  console.log('[EventBus] All handlers registered');
}

/**
 * Close event bus (on shutdown)
 */
export async function closeEventBus(): Promise<void> {
  if (eventBus && 'close' in eventBus) {
    await (eventBus as any).close();
    eventBus = null;
  }
}

Testing

// __tests__/events/user-events.test.ts
import { InMemoryEventBus } from '@/lib/events/in-memory-event-bus';
import { UserRegisteredEvent } from '@/lib/events/user-events';
import { EventHandler } from '@/lib/events/base';

describe('Event System', () => {
  let eventBus: InMemoryEventBus;

  beforeEach(() => {
    eventBus = new InMemoryEventBus();
  });

  it('should publish and handle events', async () => {
    const handlerSpy = jest.fn();

    // Mock handler
    const mockHandler: EventHandler = {
      eventType: 'user.registered',
      handle: handlerSpy,
    };

    // Subscribe
    eventBus.subscribe('user.registered', mockHandler);

    // Publish event
    const event = new UserRegisteredEvent('user-123', 'test@example.com', 'Test', 'free');
    await eventBus.publish(event);

    // Assert handler was called
    expect(handlerSpy).toHaveBeenCalledWith(event);
  });

  it('should store events in event log', async () => {
    const event = new UserRegisteredEvent('user-123', 'test@example.com', 'Test', 'free');
    await eventBus.publish(event);

    const log = eventBus.getEventLog();
    expect(log).toHaveLength(1);
    expect(log[0].eventType).toBe('user.registered');
  });
});

Best Practices

✅ DO

  1. Use idempotent handlers - Events may be delivered multiple times
  2. Include event version - For schema evolution
  3. Store events immutably - Never modify published events
  4. Use outbox pattern - For transactional consistency
  5. Add metadata - For debugging and tracing
  6. Handle failures gracefully - Dead letter queues for failed events
  7. Monitor event processing - Track latency and failures

❌ DON'T

  1. Don't rely on event order - Events may arrive out of order
  2. Don't put large payloads in events - Store in DB, pass ID in event
  3. Don't couple handlers - Each handler should be independent
  4. Don't make handlers slow - Offload heavy work to background jobs
  5. Don't skip the outbox - Always use outbox for transactional events

Summary

Event-driven architecture enables scalable, decoupled systems ✅ Domain events represent state changes in your system ✅ Event bus coordinates event publishing and consumption ✅ Outbox pattern ensures transactional consistency ✅ Multiple handlers can react to the same event independently ✅ Redis Streams provide reliable, scalable event delivery

Production checklist:

  • [ ] Event bus configured (Redis/RabbitMQ/SNS)
  • [ ] Outbox table created
  • [ ] Background job polling outbox
  • [ ] Event handlers registered
  • [ ] Dead letter queue configured
  • [ ] Monitoring and alerting set up
  • [ ] Retry logic implemented

Further reading: