Coding Interview: Design a Rate Limiter

February 14, 2026 · By CTO · 21 min read

Coding interview question for implementing a production-grade rate limiter, covering sliding window algorithms, distributed coordination, and API design patterns.

Role: Mid-Level / Senior Engineer
Level: Mid
Type: Coding

Implement a rate limiter that can be used to protect API endpoints. Start with a single-server implementation, then extend it to work in a distributed environment. This question bridges coding and system design, testing both implementation skills and architectural thinking.

Interview Format (60 minutes)

Time Allocation:

  • Requirements and API design: 5-10 minutes
  • Core algorithm implementation: 20-25 minutes
  • Distributed extension: 15-20 minutes
  • Testing and edge cases: 5-10 minutes

Step 1: Requirements and API Design (5-10 min)

Clarifying Questions

Good questions to ask:

  • What are we rate limiting? (API requests per client)
  • How do we identify clients? (API key, IP address, or user ID)
  • What's the expected rate limit? (e.g., 100 requests per minute)
  • Should limits be configurable per endpoint? (yes)
  • What happens when rate limited? (return 429 Too Many Requests)
  • Do we need distributed support? (yes, across multiple servers)
  • Precision requirements? (some burstiness acceptable)

Agreed requirements:

  1. Configurable rate limits per client per endpoint
  2. Return 429 Too Many Requests when limit exceeded
  3. Include rate limit headers in responses
  4. Work across multiple application servers
  5. Minimal latency impact (<5ms overhead)

Interface Design

class RateLimiter:
    def is_allowed(self, client_id: str, endpoint: str) -> RateLimitResult:
        """
        Check if a request should be allowed.

        Returns:
            RateLimitResult with:
                - allowed: bool
                - remaining: int (requests left in window)
                - reset_at: float (timestamp when window resets)
                - retry_after: float (seconds until next allowed request)
        """
        pass

HTTP Response Headers:

X-RateLimit-Limit: 100
X-RateLimit-Remaining: 42
X-RateLimit-Reset: 1707900000
Retry-After: 30  (only on 429 responses)

Good candidate discusses:

  • Where the rate limiter sits (middleware)
  • Response headers for client transparency
  • Different rate limits for different tiers (free vs paid)

Step 2: Core Algorithm Implementation (20-25 min)

Algorithm Options

Walk the candidate through the trade-offs before they pick one to implement.

1. Fixed Window Counter

Window: [00:00 - 01:00] → count = 87
Window: [01:00 - 02:00] → count = 0 (reset)

Problem: 80 requests at 0:59, 80 requests at 1:01
= 160 requests in 2 seconds, but both windows pass

Pros: Simple, low memory
Cons: Burst at window boundaries (up to 2x allowed rate)
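
Even this simplest option is worth being able to write quickly. A minimal in-memory sketch (the class and method names here are illustrative, not part of the question's required interface):

```python
import time


class FixedWindowCounter:
    """Minimal sketch: one counter per client, reset each fixed window."""

    def __init__(self, max_requests: int, window_seconds: int):
        self.max_requests = max_requests
        self.window_seconds = window_seconds
        self.counters = {}  # {client_id: (window_start, count)}

    def is_allowed(self, client_id: str) -> bool:
        now = time.time()
        window_start = int(now // self.window_seconds) * self.window_seconds
        stored_window, count = self.counters.get(client_id, (window_start, 0))
        if stored_window != window_start:
            count = 0  # a new window has begun: reset the counter
        if count >= self.max_requests:
            return False
        self.counters[client_id] = (window_start, count + 1)
        return True
```

The boundary-burst problem above is exactly what this sketch cannot prevent: the counter resets fully at each window edge.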

2. Sliding Window Log

import time
from collections import deque


class SlidingWindowLog:
    def __init__(self, max_requests: int, window_seconds: int):
        self.max_requests = max_requests
        self.window_seconds = window_seconds
        self.requests = {}  # client_id -> deque of timestamps, oldest first

    def is_allowed(self, client_id: str) -> RateLimitResult:
        now = time.time()
        window_start = now - self.window_seconds

        if client_id not in self.requests:
            self.requests[client_id] = deque()

        # Drop expired entries; deque.popleft() is O(1), unlike list.pop(0)
        timestamps = self.requests[client_id]
        while timestamps and timestamps[0] < window_start:
            timestamps.popleft()

        if len(timestamps) < self.max_requests:
            timestamps.append(now)
            return RateLimitResult(
                allowed=True,
                remaining=self.max_requests - len(timestamps),
                reset_at=timestamps[0] + self.window_seconds
            )

        return RateLimitResult(
            allowed=False,
            remaining=0,
            reset_at=timestamps[0] + self.window_seconds,
            retry_after=timestamps[0] + self.window_seconds - now
        )

Pros: Precise, no boundary issues
Cons: High memory (stores every timestamp)

3. Sliding Window Counter (Recommended)

from dataclasses import dataclass
from typing import Dict, Tuple
import time
import math


@dataclass
class RateLimitResult:
    allowed: bool
    remaining: int
    reset_at: float
    retry_after: float = 0


class SlidingWindowCounter:
    """
    Combines fixed window efficiency with sliding window accuracy.
    Uses weighted average of current and previous window counts.
    """

    def __init__(self, max_requests: int, window_seconds: int):
        self.max_requests = max_requests
        self.window_seconds = window_seconds
        # {client_id: (current_window_start, current_count, prev_count)}
        self.counters: Dict[str, Tuple[float, int, int]] = {}

    def _get_window_start(self, now: float) -> float:
        """Get the start timestamp of the current fixed window."""
        return math.floor(now / self.window_seconds) * self.window_seconds

    def is_allowed(self, client_id: str) -> RateLimitResult:
        now = time.time()
        current_window = self._get_window_start(now)
        window_elapsed = now - current_window
        weight = window_elapsed / self.window_seconds

        if client_id not in self.counters:
            self.counters[client_id] = (current_window, 0, 0)

        stored_window, current_count, prev_count = self.counters[client_id]

        # Window has advanced
        if stored_window < current_window:
            if current_window - stored_window >= 2 * self.window_seconds:
                # Two or more full windows have passed; previous window is empty
                prev_count = 0
            else:
                prev_count = current_count
            current_count = 0
            stored_window = current_window

        # Weighted count: full current window + proportional previous window
        estimated_count = current_count + prev_count * (1 - weight)
        reset_at = current_window + self.window_seconds

        if estimated_count < self.max_requests:
            current_count += 1
            self.counters[client_id] = (stored_window, current_count, prev_count)
            remaining = max(0, int(self.max_requests - estimated_count - 1))
            return RateLimitResult(
                allowed=True,
                remaining=remaining,
                reset_at=reset_at
            )

        return RateLimitResult(
            allowed=False,
            remaining=0,
            reset_at=reset_at,
            retry_after=reset_at - now
        )

Why sliding window counter is best:

  • Low memory (2 counters per client per endpoint vs storing every timestamp)
  • Good accuracy (weighted average smooths boundary bursts)
  • Easy to implement in Redis (atomic operations on counters)
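
The weighted estimate itself is easy to sanity-check in isolation. A standalone helper mirroring the arithmetic above (the numbers are made up):

```python
def weighted_estimate(current_count: int, prev_count: int,
                      window_elapsed: float, window_seconds: float) -> float:
    """Sliding-window estimate: the full current count plus the previous
    window's count, scaled by how much of it still overlaps the window."""
    weight = window_elapsed / window_seconds
    return current_count + prev_count * (1 - weight)


# 20s into a 60s window, with 90 requests last window and 30 so far:
# 30 + 90 * (40/60) = 90.0 — this client is right at a limit of 90.
assert round(weighted_estimate(30, 90, 20, 60), 6) == 90.0
```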

Token Bucket (Alternative)

class TokenBucket:
    """
    Allows controlled bursts while maintaining average rate.
    Tokens are added at a fixed rate, consumed per request.
    """

    def __init__(self, capacity: int, refill_rate: float):
        """
        Args:
            capacity: Maximum burst size
            refill_rate: Tokens added per second
        """
        self.capacity = capacity
        self.refill_rate = refill_rate
        self.buckets: Dict[str, Tuple[float, float]] = {}  # {id: (tokens, last_refill)}

    def is_allowed(self, client_id: str) -> RateLimitResult:
        now = time.time()

        if client_id not in self.buckets:
            self.buckets[client_id] = (self.capacity, now)

        tokens, last_refill = self.buckets[client_id]

        # Add tokens based on elapsed time
        elapsed = now - last_refill
        tokens = min(self.capacity, tokens + elapsed * self.refill_rate)

        if tokens >= 1:
            tokens -= 1
            self.buckets[client_id] = (tokens, now)
            return RateLimitResult(
                allowed=True,
                remaining=int(tokens),
                reset_at=now + (self.capacity - tokens) / self.refill_rate
            )

        # Calculate when next token will be available
        wait_time = (1 - tokens) / self.refill_rate
        return RateLimitResult(
            allowed=False,
            remaining=0,
            reset_at=now + wait_time,
            retry_after=wait_time
        )

When to use token bucket: When you want to allow controlled bursts (e.g., a client can burst 10 requests instantly, but averages 100/minute).
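
The refill arithmetic behind that behaviour can be checked in isolation. A standalone helper mirroring the refill line above (the numbers are illustrative):

```python
def tokens_after(capacity: float, refill_rate: float,
                 tokens: float, elapsed: float) -> float:
    """Tokens available after `elapsed` seconds, capped at capacity."""
    return min(capacity, tokens + elapsed * refill_rate)


# A bucket of 10 refilling at 100/60 tokens per second averages 100/minute.
rate = 100 / 60

# A client that just drained the bucket recovers ~1.67 tokens per second...
assert round(tokens_after(10, rate, tokens=0.0, elapsed=1.0), 2) == 1.67

# ...and is back at full burst capacity (capped) within a few seconds.
assert tokens_after(10, rate, tokens=0.0, elapsed=7.0) == 10
```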

Strong candidate discusses:

  • Trade-offs between algorithms
  • Memory usage implications
  • Precision vs efficiency
  • Burst handling

Step 3: Distributed Extension (15-20 min)

The Problem

Server A: client has 90 requests (limit 100)
Server B: client has 90 requests (limit 100)
Total: 180 requests — over the limit!

Redis-Based Distributed Rate Limiter

import redis
import time
import math


class DistributedRateLimiter:
    """
    Distributed sliding window counter using Redis.
    All operations are atomic via Lua scripting.
    """

    def __init__(self, redis_client: redis.Redis,
                 max_requests: int, window_seconds: int):
        self.redis = redis_client
        self.max_requests = max_requests
        self.window_seconds = window_seconds

        # Lua script for atomic check-and-increment
        self.lua_script = self.redis.register_script("""
            local key = KEYS[1]
            local now = tonumber(ARGV[1])
            local window = tonumber(ARGV[2])
            local limit = tonumber(ARGV[3])

            local window_start = math.floor(now / window) * window
            local prev_window_start = window_start - window
            local elapsed = now - window_start
            local weight = elapsed / window

            -- Keys for current and previous windows
            local curr_key = key .. ":" .. tostring(window_start)
            local prev_key = key .. ":" .. tostring(prev_window_start)

            -- Get counts
            local curr_count = tonumber(redis.call("GET", curr_key) or "0")
            local prev_count = tonumber(redis.call("GET", prev_key) or "0")

            -- Weighted estimate
            local estimated = curr_count + prev_count * (1 - weight)

            if estimated < limit then
                -- Increment and set expiry
                redis.call("INCR", curr_key)
                redis.call("EXPIRE", curr_key, window * 2)

                local remaining = limit - estimated - 1
                local reset_at = window_start + window
                return {1, math.max(0, math.floor(remaining)), reset_at}
            else
                local reset_at = window_start + window
                local retry_after = reset_at - now
                return {0, 0, reset_at, math.ceil(retry_after)}
            end
        """)

    def is_allowed(self, client_id: str,
                   endpoint: str = "default") -> RateLimitResult:
        key = f"ratelimit:{endpoint}:{client_id}"
        now = time.time()

        result = self.lua_script(
            keys=[key],
            args=[now, self.window_seconds, self.max_requests]
        )

        allowed = bool(result[0])
        remaining = int(result[1])
        reset_at = float(result[2])
        retry_after = float(result[3]) if len(result) > 3 else 0

        return RateLimitResult(
            allowed=allowed,
            remaining=remaining,
            reset_at=reset_at,
            retry_after=retry_after
        )

Why Lua scripts:

  • Atomic execution (no race conditions)
  • Single round-trip to Redis
  • All logic runs server-side

Strong candidate explains:

  • Why atomicity matters (race conditions between check and increment)
  • Why Lua vs MULTI/EXEC (Lua can contain conditional logic)
  • Redis key expiry for automatic cleanup

Middleware Integration

from functools import wraps
from flask import request, jsonify, make_response


def rate_limit(max_requests: int, window_seconds: int):
    """Decorator for rate-limited endpoints."""

    limiter = DistributedRateLimiter(
        redis_client=redis_pool,
        max_requests=max_requests,
        window_seconds=window_seconds
    )

    def decorator(f):
        @wraps(f)
        def wrapped(*args, **kwargs):
            # Identify client (API key > user ID > IP)
            client_id = (
                request.headers.get("X-API-Key") or
                getattr(request, "user_id", None) or
                request.remote_addr
            )

            endpoint = request.endpoint
            result = limiter.is_allowed(client_id, endpoint)

            if not result.allowed:
                response = make_response(
                    jsonify({"error": "Rate limit exceeded"}), 429
                )
                response.headers["Retry-After"] = str(int(result.retry_after))
            else:
                response = make_response(f(*args, **kwargs))

            # Always include rate limit headers
            response.headers["X-RateLimit-Limit"] = str(max_requests)
            response.headers["X-RateLimit-Remaining"] = str(result.remaining)
            response.headers["X-RateLimit-Reset"] = str(int(result.reset_at))

            return response

        return wrapped
    return decorator


# Usage
@app.route("/api/messages", methods=["POST"])
@rate_limit(max_requests=100, window_seconds=60)
def send_message():
    return jsonify({"status": "sent"})

Handling Redis Failures

class ResilientRateLimiter:
    """Falls back to local rate limiting if Redis is unavailable."""

    def __init__(self, redis_client, max_requests, window_seconds):
        self.distributed = DistributedRateLimiter(
            redis_client, max_requests, window_seconds
        )
        # NOTE: the fallback applies the full limit on each server; in
        # production, divide by the server count for a stricter local limit
        self.local = SlidingWindowCounter(max_requests, window_seconds)
        self.use_local = False

    def is_allowed(self, client_id: str,
                   endpoint: str = "default") -> RateLimitResult:
        if self.use_local:
            # Sticky fallback: a separate health check must reset
            # use_local once Redis recovers
            return self.local.is_allowed(f"{endpoint}:{client_id}")

        try:
            return self.distributed.is_allowed(client_id, endpoint)
        except redis.ConnectionError:
            self.use_local = True
            return self.local.is_allowed(f"{endpoint}:{client_id}")

Strong candidate discusses:

  • Fail-open vs fail-closed trade-off
  • Local fallback with adjusted limits
  • Circuit breaker pattern for Redis connection
  • Health check to recover from fallback mode
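
The health-check recovery can be sketched as a small cooldown gate the resilient limiter consults before each Redis call. `FallbackGate` and its method names are hypothetical, not a known library API:

```python
import time
from typing import Optional


class FallbackGate:
    """Sketch of a recovery gate: after a Redis failure, stay on the local
    limiter, but allow a probe of the distributed path once the cooldown
    elapses so the system recovers automatically."""

    def __init__(self, retry_seconds: float = 5.0):
        self.retry_seconds = retry_seconds
        self.failed_at: Optional[float] = None

    def record_failure(self) -> None:
        self.failed_at = time.time()

    def record_success(self) -> None:
        self.failed_at = None

    def should_try_distributed(self) -> bool:
        if self.failed_at is None:
            return True
        # Probe Redis again once the cooldown has elapsed
        return time.time() - self.failed_at >= self.retry_seconds
```

The limiter would call `record_failure()` on a `ConnectionError`, `record_success()` on any successful Redis call, and check `should_try_distributed()` instead of a sticky boolean.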

Step 4: Testing and Edge Cases (5-10 min)

Test Cases

import pytest
from freezegun import freeze_time


class TestSlidingWindowCounter:
    def setup_method(self):
        self.limiter = SlidingWindowCounter(
            max_requests=10, window_seconds=60
        )

    def test_allows_requests_under_limit(self):
        for i in range(10):
            result = self.limiter.is_allowed("client1")
            assert result.allowed is True
            assert result.remaining == 9 - i

    def test_blocks_requests_over_limit(self):
        for _ in range(10):
            self.limiter.is_allowed("client1")

        result = self.limiter.is_allowed("client1")
        assert result.allowed is False
        assert result.remaining == 0
        assert result.retry_after > 0

    def test_different_clients_independent(self):
        for _ in range(10):
            self.limiter.is_allowed("client1")

        # Client 2 should still be allowed
        result = self.limiter.is_allowed("client2")
        assert result.allowed is True

    @freeze_time("2026-01-01 00:00:00")
    def test_window_resets(self):
        for _ in range(10):
            self.limiter.is_allowed("client1")

        # Advance time past window
        with freeze_time("2026-01-01 00:01:01"):
            result = self.limiter.is_allowed("client1")
            assert result.allowed is True

    def test_sliding_window_weighted_count(self):
        """Previous window requests are weighted by time remaining."""
        # Fill previous window
        with freeze_time("2026-01-01 00:00:00"):
            for _ in range(10):
                self.limiter.is_allowed("client1")

        # 50 seconds into new window (previous window weight = 1/6)
        # Estimated: 0 (current) + 10 * (10/60) = ~1.67
        # Should still allow ~8 more requests
        with freeze_time("2026-01-01 00:01:50"):
            result = self.limiter.is_allowed("client1")
            assert result.allowed is True

    def test_concurrent_requests(self):
        """Verify thread safety."""
        import threading

        allowed_count = 0
        lock = threading.Lock()

        def make_request():
            nonlocal allowed_count
            result = self.limiter.is_allowed("client1")
            if result.allowed:
                with lock:
                    allowed_count += 1

        threads = [threading.Thread(target=make_request)
                   for _ in range(20)]
        for t in threads:
            t.start()
        for t in threads:
            t.join()

        # Note: without locking in the limiter itself,
        # this may allow more than 10.
        # This test demonstrates the need for thread safety.
        assert allowed_count <= 12  # Allow some slack for race conditions
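
One remedy for the race that last test demonstrates is to serialise access with a lock. A sketch under that assumption (`ThreadSafeLimiter` is illustrative, not part of the required interface):

```python
import threading
from typing import Callable


class ThreadSafeLimiter:
    """Sketch: serialise calls to a limiter that is not itself thread-safe.
    `inner_is_allowed` stands in for e.g. SlidingWindowCounter.is_allowed."""

    def __init__(self, inner_is_allowed: Callable[[str], bool]):
        self._inner = inner_is_allowed
        self._lock = threading.Lock()

    def is_allowed(self, client_id: str) -> bool:
        # Only one thread runs the check-then-increment at a time,
        # so the limit cannot be exceeded by a race
        with self._lock:
            return self._inner(client_id)
```

With this wrapper, the concurrent test above can assert `allowed_count == 10` exactly rather than allowing slack.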

Edge Cases to Discuss

Strong candidates identify:

  1. Clock skew in distributed systems — use Redis server time, not local time
  2. Hot keys — a single high-traffic client hammering one Redis key
  3. Burst after long idle — first request after window reset gets full quota
  4. Multiple rate limits — per-second, per-minute, per-hour (layered)
  5. Graceful degradation — what happens when Redis is down?
  6. Key expiry and memory — ensure Redis keys are cleaned up

Production Considerations

# Layered rate limits for production
RATE_LIMITS = {
    "default": [
        {"max": 10, "window": 1},      # 10 per second (burst protection)
        {"max": 100, "window": 60},     # 100 per minute
        {"max": 5000, "window": 3600},  # 5000 per hour
    ],
    "auth/login": [
        {"max": 5, "window": 60},       # 5 per minute (brute force protection)
        {"max": 20, "window": 3600},    # 20 per hour
    ],
    "upload": [
        {"max": 5, "window": 60},       # 5 per minute
    ]
}
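
Enforcement then consults every layer and rejects if any limit is hit. The decision reduces to the following (`check_layers` is an illustrative helper; the counts would come from one per-window limiter per layer):

```python
from typing import Dict, List


def check_layers(layers: List[Dict[str, int]],
                 current_counts: List[int]) -> bool:
    """Allow a request only if the client is under the limit in every
    layer; current_counts[i] is the client's count in layer i's window."""
    return all(count < layer["max"]
               for layer, count in zip(layers, current_counts))


default_layers = [
    {"max": 10, "window": 1},
    {"max": 100, "window": 60},
    {"max": 5000, "window": 3600},
]

# Under every limit: allowed.
assert check_layers(default_layers, [3, 42, 900]) is True
# At the per-second burst limit, even though minute/hour are fine: blocked.
assert check_layers(default_layers, [10, 42, 900]) is False
```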

Evaluation Rubric

Strong Performance (Hire)

  • Discusses multiple algorithms with trade-offs
  • Clean implementation with proper data structures
  • Handles the distributed case with atomic operations
  • Considers failure modes and resilience
  • Writes meaningful tests
  • Thinks about production concerns (monitoring, tiered limits)
  • Code is readable and well-structured

Adequate Performance (Maybe)

  • Implements a working rate limiter (any algorithm)
  • Understands the distributed challenge
  • Mentions Redis but implementation has race conditions
  • Limited testing
  • Can be guided toward better solutions

Weak Performance (No Hire)

  • Can't implement a basic counter-based limiter
  • Doesn't understand why distributed coordination is needed
  • No consideration of thread safety or race conditions
  • Can't reason about trade-offs between algorithms
  • Poor code quality (naming, structure)

Follow-up Questions

For mid-level candidates:

  • How would you add per-endpoint rate limits?
  • What if different API tiers have different limits?
  • How would you test this under load?

For senior candidates:

  • How would you implement adaptive rate limiting that adjusts based on server load?
  • Design a rate limiting system for a multi-region deployment
  • How would you handle rate limiting for WebSocket connections?
  • How would you implement a fair queuing system instead of hard rejection?

For staff+ candidates:

  • Design a centralised rate limiting service for a microservices architecture
  • How would you implement cost-based rate limiting (different endpoints have different costs)?
  • How would you handle rate limiting during a partial outage?

This question tests algorithmic thinking, systems knowledge, and production engineering skills. A strong candidate will progress naturally from a clean local implementation to a distributed solution while discussing trade-offs at each step.