System Design · Topic 9 of 16

Message Queues & Streaming

150 XP

Why Decouple with Queues?

Direct synchronous calls between services create tight coupling — the caller blocks, both services must be healthy simultaneously, and failure propagates instantly. Message queues and event streams break this dependency.

The four decoupling benefits:

1. Load Leveling

Without queue:
Traffic spike: 10,000 req/s → Order Service (capacity: 1,000/s) → OVERLOADED, 503s

With queue:
Traffic spike: 10,000 req/s → Queue (absorbs spike) → Order Service (1,000/s) → fine
Queue drains over 10 seconds. No data lost. No 503s.
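The "drains over 10 seconds" figure is simple arithmetic. A back-of-envelope sketch (the function name and parameters are illustrative, not from any library):

```typescript
// Spike: N messages arrive over the spike duration; the service keeps
// consuming at its fixed rate the whole time. Total time to empty the
// queue = spike duration + time to drain the leftover backlog.
function drainTimeSeconds(
  spikeMessages: number,
  spikeDurationSec: number,
  consumeRatePerSec: number
): number {
  // Messages already processed while the spike was still arriving
  const consumedDuringSpike = consumeRatePerSec * spikeDurationSec;
  const backlog = Math.max(0, spikeMessages - consumedDuringSpike);
  return spikeDurationSec + backlog / consumeRatePerSec;
}

drainTimeSeconds(10_000, 1, 1_000);  // → 10 (seconds)
```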

2. Async Work Offloading

User registers → immediately return 201 Created → queue sends welcome email asynchronously.

Sending email via SMTP takes 200–500ms. Don’t make the user wait for it. Queue it, return fast, send in background.
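The shape of this pattern, sketched with an in-memory array standing in for a real broker (`registerUser` and `sendWelcomeEmail` are hypothetical stand-ins, not a real framework API):

```typescript
type Job = { type: 'welcome-email'; email: string };

const jobQueue: Job[] = [];
const sentEmails: string[] = [];

// Stand-in for the slow 200-500ms SMTP call
async function sendWelcomeEmail(email: string): Promise<void> {
  sentEmails.push(email);
}

// Request handler: enqueue and return immediately -- the user never waits on SMTP
function registerUser(email: string): { status: number } {
  jobQueue.push({ type: 'welcome-email', email });
  return { status: 201 };  // 201 Created; the email goes out later
}

// Background worker: drains the queue independently of request latency
async function drainQueue(): Promise<void> {
  while (jobQueue.length > 0) {
    const job = jobQueue.shift()!;
    await sendWelcomeEmail(job.email);
  }
}
```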

3. Durability

If Order Service goes down for 30 minutes, orders queued during that time are processed when it recovers. With synchronous calls, those orders would have been 503 errors.

4. Fan-out (Multiple Consumers)

One event triggers multiple independent actions:

OrderPlaced event:
  → EmailService (send confirmation)
  → InventoryService (decrement stock)
  → AnalyticsService (record sale)
  → FraudService (run fraud check)

All four happen independently. OrderService doesn't know about any of them.
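A minimal in-process sketch of the fan-out idea — real systems use a fanout exchange or a Kafka topic, but the dispatch logic reduces to this:

```typescript
type OrderPlaced = { orderId: string; total: number };
type Handler = (event: OrderPlaced) => void;

const subscribers: Handler[] = [];
const subscribe = (h: Handler) => { subscribers.push(h); };

// The producer publishes once; every subscriber receives its own copy
const publish = (event: OrderPlaced) => {
  for (const handle of subscribers) handle(event);
};

const actions: string[] = [];
subscribe(e => actions.push(`email: confirm ${e.orderId}`));
subscribe(e => actions.push(`inventory: decrement for ${e.orderId}`));
subscribe(e => actions.push(`analytics: record sale of ${e.total}`));
subscribe(e => actions.push(`fraud: check ${e.orderId}`));

publish({ orderId: 'ord-123', total: 9900 });
// actions now holds four entries -- one per independent consumer
```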

Message Queue vs Event Stream: The Core Conceptual Difference

This distinction trips up most candidates who haven’t worked with both:

MESSAGE QUEUE (RabbitMQ, SQS):
- Message is consumed and DELETED
- Each message processed by EXACTLY ONE consumer
- Queue depth decreases as messages are consumed
- "Work queue" / "task queue" mental model

  Producer → [msg1][msg2][msg3] → Consumer A gets msg1 (deleted)
                                 → Consumer B gets msg2 (deleted)

EVENT STREAM (Kafka):
- Events are RETAINED for a configurable period (hours, days, forever)
- Multiple independent consumer groups each read ALL events
- Offset (position) is tracked per consumer group, not deleted from stream
- "Log" / "append-only ledger" mental model

  Producer → [e1][e2][e3][e4][e5]...
  Consumer Group A: reads e1,e2,e3,e4,e5 (at offset 5)
  Consumer Group B: reads e1,e2 (at offset 2, still catching up)
  Consumer Group C: starts at e1 (new consumer, replays history)

Use a queue when: Each job should be done once by one worker (email sending, order processing, image resizing).

Use a stream when: Multiple independent systems need to react to the same events, or you need replay capability (analytics, audit logs, event sourcing).


Kafka Deep Dive

Kafka is a distributed, partitioned, replicated commit log. Every design decision traces back to this description.

Topics, Partitions, and Offsets

Topic: "orders"  (3 partitions, replication factor 3)

Partition 0: [e0][e1][e2][e3][e4]...
Partition 1: [e0][e1][e2]...
Partition 2: [e0][e1][e2][e3][e4][e5]...

Each event has:
- partition: which partition it lives in
- offset: sequential integer within that partition
- key: optional; determines partition assignment
- value: bytes (usually Avro, Protobuf, or JSON)
- timestamp: event creation time
- headers: optional key-value metadata

How partitioning works:

// Kafka producer partition assignment:
// If key is provided: hash(key) % numPartitions → deterministic partition
// If no key: round-robin across partitions

producer.send({
  topic: 'orders',
  messages: [{
    key: order.userId,        // All orders from same user → same partition
    value: JSON.stringify(order),
    headers: { 'schema-version': '2' }
  }]
});

Using userId as the key ensures all orders for a given user land in the same partition — preserving per-user ordering without global ordering.
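A simplified stand-in for this assignment logic — Kafka’s real default partitioner uses murmur2, but the property that matters is the same: the mapping from key to partition is deterministic:

```typescript
// Toy partitioner: hash(key) % numPartitions.
// (Illustrative rolling hash; Kafka's default partitioner uses murmur2.)
function assignPartition(key: string, numPartitions: number): number {
  let hash = 0;
  for (const ch of key) {
    hash = (hash * 31 + ch.charCodeAt(0)) >>> 0;  // unsigned 32-bit rolling hash
  }
  return hash % numPartitions;
}

// Same key, same partition -- every time:
assignPartition('user-42', 4) === assignPartition('user-42', 4);  // true
```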

Consumer Groups

Topic "orders" — 4 partitions
Consumer Group "order-processor"

Case 1: 2 consumers in group
  Consumer A → Partition 0, Partition 1
  Consumer B → Partition 2, Partition 3

Case 2: 4 consumers in group (1:1 — optimal)
  Consumer A → Partition 0
  Consumer B → Partition 1
  Consumer C → Partition 2
  Consumer D → Partition 3

Case 3: 6 consumers, 4 partitions
  Consumer A → Partition 0
  Consumer B → Partition 1
  Consumer C → Partition 2
  Consumer D → Partition 3
  Consumer E → IDLE (no partition assigned)
  Consumer F → IDLE

Maximum parallelism = number of partitions

Critical insight: Kafka’s maximum consumer parallelism is bounded by partition count. If you need 100x parallelism, you need 100 partitions. Partition count cannot be decreased after creation (only increased), so planning upfront matters.
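One rough way to plan upfront, sketched as arithmetic (the headroom factor is an illustrative choice, not a Kafka setting):

```typescript
// Back-of-envelope partition sizing: enough partitions for peak consumer
// parallelism, plus headroom -- because the count can only ever grow.
function partitionsNeeded(
  peakMessagesPerSec: number,
  perConsumerMessagesPerSec: number,
  headroomFactor = 2
): number {
  const parallelism = peakMessagesPerSec / perConsumerMessagesPerSec;
  return Math.ceil(parallelism * headroomFactor);
}

partitionsNeeded(10_000, 500);  // → 40
```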

Replication and Durability

Topic "orders", replication factor 3:

Broker 1 (Leader, Partition 0):  [e0][e1][e2][e3]
Broker 2 (Follower):             [e0][e1][e2][e3]  ← ISR (In-Sync Replica)
Broker 3 (Follower):             [e0][e1][e2]       ← ISR (slightly behind)

ISR = In-Sync Replicas: followers that are caught up within replica.lag.time.max.ms

// Producer durability settings
producer.send({
  topic: 'orders',
  messages: [{ value: JSON.stringify(order) }],
  acks: -1,           // Wait for ALL ISR replicas to acknowledge
                      // acks=0: fire and forget (fastest, no guarantee)
                      // acks=1: leader only (can lose data if the leader crashes before replication)
                      // acks=-1 / "all": every ISR acked (strongest guarantee; the default since Kafka 3.0)
});

// Consumer commit strategy (kafkajs-style)
await consumer.subscribe({ topics: ['orders'], fromBeginning: false });

await consumer.run({
  autoCommit: false,
  eachMessage: async ({ topic, partition, message }) => {
    await processOrder(message.value);

    // Commit AFTER processing (at-least-once delivery)
    await consumer.commitOffsets([{
      topic,
      partition,
      offset: (Number(message.offset) + 1).toString()
    }]);
  }
});

Log Compaction

Kafka supports two retention modes:

  1. Time/size-based retention (default): keep messages for 7 days, then delete. Used for event streams where you only care about recent history.

  2. Log compaction: keep only the most recent message per key. Used when the topic represents current state (like a changelog for a database).

Before compaction (topic: user-settings):
  key=user:42, value={"theme":"dark"}         offset 0
  key=user:99, value={"theme":"light"}        offset 1
  key=user:42, value={"theme":"light"}        offset 2  ← supersedes offset 0
  key=user:42, value={"notifications":"on"}   offset 3  ← supersedes offset 2

After compaction:
  key=user:99, value={"theme":"light"}        offset 1
  key=user:42, value={"notifications":"on"}   offset 3

A compacted topic is a materialised view of the latest state per entity key — ideal for changelog topics, cache rebuilding, and state stores. Note that compaction discards superseded history, so a topic meant to serve as a full event-sourcing log needs long (or unlimited) retention instead.
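Compaction can be modelled as a pure function over an in-memory log — a toy version of the log cleaner, not Kafka’s actual implementation:

```typescript
type LogRecord = { key: string; value: string; offset: number };

// Toy log cleaner: keep only the highest-offset record per key,
// then restore offset order (compaction preserves relative order).
function compact(log: LogRecord[]): LogRecord[] {
  const latest = new Map<string, LogRecord>();
  for (const record of log) latest.set(record.key, record);  // later offsets win
  return [...latest.values()].sort((a, b) => a.offset - b.offset);
}

const compacted = compact([
  { key: 'user:42', value: '{"theme":"dark"}', offset: 0 },
  { key: 'user:99', value: '{"theme":"light"}', offset: 1 },
  { key: 'user:42', value: '{"theme":"light"}', offset: 2 },
  { key: 'user:42', value: '{"notifications":"on"}', offset: 3 },
]);
// compacted keeps only offsets 1 and 3, matching the example above
```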

Throughput Numbers

  • Kafka: on the order of 1M messages/second on a single broker (small messages, heavily batched)
  • Kafka: ~2 million messages/second on a 3-broker cluster with replication (LinkedIn benchmark)
  • Typical message size 1 KB → roughly 1 GB/s throughput on modern hardware
  • End-to-end latency: under 10ms for a well-tuned single partition

RabbitMQ Deep Dive

RabbitMQ is a message broker implementing AMQP (Advanced Message Queuing Protocol). It routes messages with fine-grained control unavailable in Kafka.

Exchanges and Routing

Messages in RabbitMQ go through an exchange before reaching queues. The exchange type determines routing:

Producer → Exchange → (routing logic) → Queue → Consumer

Exchange types:
  direct:  route by exact routing key match
  topic:   route by pattern match (*.error.*, orders.#)
  fanout:  broadcast to all bound queues
  headers: route by message header values (rarely used)

// RabbitMQ setup: fanout exchange (pub/sub pattern)
const channel = await connection.createChannel();

await channel.assertExchange('order-events', 'fanout', { durable: true });

// Multiple queues bound to same fanout exchange
await channel.assertQueue('email-service-queue', { durable: true });
await channel.assertQueue('inventory-queue', { durable: true });
await channel.assertQueue('analytics-queue', { durable: true });

channel.bindQueue('email-service-queue', 'order-events', '');
channel.bindQueue('inventory-queue', 'order-events', '');
channel.bindQueue('analytics-queue', 'order-events', '');

// Publish: goes to ALL three queues
channel.publish('order-events', '', Buffer.from(JSON.stringify(order)));

// Topic exchange: route by pattern
await channel.assertExchange('logs', 'topic', { durable: true });

// Routing key: "service.severity"
channel.publish('logs', 'payments.error', Buffer.from(message));
channel.publish('logs', 'auth.warning', Buffer.from(message));
channel.publish('logs', 'orders.info', Buffer.from(message));

// Consumers bind with patterns
channel.bindQueue('critical-alerts', 'logs', '*.error');     // all errors
channel.bindQueue('payment-logs', 'logs', 'payments.*');     // all payment events
channel.bindQueue('all-logs', 'logs', '#');                  // everything

Acknowledgements and Reliability

// Consumer with manual acknowledgement
channel.consume('orders', async (msg) => {
  if (msg === null) return;
  
  try {
    await processOrder(JSON.parse(msg.content.toString()));
    channel.ack(msg);                    // Message deleted from queue
  } catch (error) {
    if (isTransientError(error)) {
      channel.nack(msg, false, true);    // Requeue: (allUpTo=false, requeue=true)
    } else {
      channel.nack(msg, false, false);   // Don't requeue → goes to DLQ
    }
  }
}, { noAck: false });                    // Manual ack mode

noAck: true (auto-ack): Message deleted from queue the moment it’s delivered to consumer. If consumer crashes during processing — message is lost. Use only when loss is acceptable.

noAck: false (manual ack): Message stays in queue (marked “unacknowledged”) until consumer acks or nacks. If consumer crashes — message is redelivered. Correct for at-least-once delivery.

Dead-Letter Queues (DLQ)

Messages that fail processing repeatedly need to go somewhere — not back into the main queue forever.

// Configure DLQ at queue declaration time
await channel.assertQueue('orders', {
  durable: true,
  arguments: {
    'x-dead-letter-exchange': 'orders-dlx',      // DLQ exchange
    'x-dead-letter-routing-key': 'failed',
    'x-message-ttl': 30000,                       // Messages expire after 30s
    'x-max-length': 10000                          // Max queue depth
  }
});

await channel.assertExchange('orders-dlx', 'direct', { durable: true });
await channel.assertQueue('orders-dlq', { durable: true });
channel.bindQueue('orders-dlq', 'orders-dlx', 'failed');

// Now nacked messages (with requeue=false) → DLQ automatically
// Alarm on DLQ depth; human reviews failed messages

AWS SQS Deep Dive

SQS is a managed queue service — no servers to maintain, and it scales transparently with demand.

Standard vs FIFO Queues

Feature     | Standard                             | FIFO
Throughput  | Nearly unlimited                     | 3,000 msg/s with batching, 300 msg/s otherwise
Ordering    | Best-effort (not guaranteed)         | Strict FIFO per message group
Delivery    | At-least-once (duplicates possible)  | Exactly-once (deduplication)
Price       | Lower                                | Higher

// SQS Standard: for high-throughput work queues
const params = {
  QueueUrl: 'https://sqs.us-east-1.amazonaws.com/123/orders',
  MessageBody: JSON.stringify(order),
  MessageAttributes: {
    'EventType': { DataType: 'String', StringValue: 'OrderPlaced' }
  }
};
await sqs.sendMessage(params);

// SQS FIFO: for ordered processing (financial transactions)
const fifoParams = {
  QueueUrl: 'https://sqs.us-east-1.amazonaws.com/123/payments.fifo',
  MessageBody: JSON.stringify(payment),
  MessageGroupId: payment.accountId,          // Ordering per account
  MessageDeduplicationId: payment.idempotencyKey  // Dedup window: 5 minutes
};
await sqs.sendMessage(fifoParams);

Visibility Timeout

SQS’s mechanism for at-least-once delivery:

Consumer polls SQS → gets message → message becomes INVISIBLE (timeout: 30s)

Consumer processes successfully → DELETES message → gone from queue

(Alternative) Consumer crashes → 30s timeout expires → message becomes VISIBLE again
→ another consumer picks it up

// Extending visibility timeout for long-running jobs
const message = await sqs.receiveMessage({ QueueUrl, MaxNumberOfMessages: 1 });
const receiptHandle = message.Messages![0].ReceiptHandle;

// If processing will take > 30s, extend before timeout
const extender = setInterval(async () => {
  await sqs.changeMessageVisibility({
    QueueUrl,
    ReceiptHandle: receiptHandle,
    VisibilityTimeout: 60  // reset the clock: message stays invisible for 60s from now
  });
}, 20_000);  // extend every 20s

try {
  await processLongRunningJob(message.Messages![0].Body);
  await sqs.deleteMessage({ QueueUrl, ReceiptHandle: receiptHandle });
} finally {
  clearInterval(extender);
}

SQS Dead-Letter Queue

// SQS DLQ: messages that fail maxReceiveCount times → moved to DLQ automatically
const mainQueue = await sqs.createQueue({
  QueueName: 'orders',
  Attributes: {
    RedrivePolicy: JSON.stringify({
      deadLetterTargetArn: 'arn:aws:sqs:us-east-1:123:orders-dlq',
      maxReceiveCount: '3'    // After 3 failed attempts → DLQ
    }),
    VisibilityTimeout: '30'
  }
});

Delivery Guarantees

The three delivery semantics and how each is achieved:

At-Most-Once

Message may be lost but never processed twice. Used when loss is preferable to duplicates (analytics counters, fire-and-forget notifications).

Producer sends → no ack waiting
Consumer receives → processes immediately (auto-ack) → no replay on failure

How to implement in Kafka:

// Producer: acks=0 (fire and forget)
// Consumer: autoCommit=true, autoCommitInterval=100ms
// Risk: commits offset before processing completes → message "lost" on crash
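The failure mode can be shown with a toy simulation — plain arrays and a counter standing in for the broker and the offset store:

```typescript
// Simulated at-most-once: the offset is committed on delivery, BEFORE
// processing. A crash mid-processing loses that message for good.
function consumeAtMostOnce(
  log: string[],
  crashAtOffset: number
): { processed: string[]; committedOffset: number } {
  const processed: string[] = [];
  let committedOffset = 0;
  for (const msg of log) {
    committedOffset += 1;                         // auto-commit on delivery
    if (committedOffset - 1 === crashAtOffset) {
      break;                                      // simulated crash during processing
    }
    processed.push(msg);                          // "processing" succeeds
  }
  return { processed, committedOffset };
}

// Crash while handling offset 1: 'b' is lost; a restart resumes at offset 2.
consumeAtMostOnce(['a', 'b', 'c'], 1);  // → { processed: ['a'], committedOffset: 2 }
```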

At-Least-Once

Message is guaranteed to be delivered but may be processed more than once (duplicates possible on retry). The practical default for most systems.

Producer: wait for broker ack before proceeding
Consumer: only commit offset AFTER successful processing
Retry: network failure → producer retries → may produce duplicate if ack was lost in transit

How to implement in Kafka:

// Producer: acks=-1, retries=MAX_INT, enableIdempotence=true
// Consumer: disable auto-commit, commit only after processing
consumer.run({
  autoCommit: false,
  eachBatch: async ({ batch, resolveOffset, heartbeat, commitOffsetsIfNecessary }) => {
    for (const message of batch.messages) {
      await processMessage(message);
      resolveOffset(message.offset);   // mark this offset as processed
      await heartbeat();
    }
    await commitOffsetsIfNecessary();  // commit only after processing
  }
});

Key: Consumers must be idempotent — processing the same message twice should have the same effect as processing it once.

Exactly-Once

The holy grail. Each message processed exactly once, even with retries. The hardest guarantee to achieve.

Kafka exactly-once (within Kafka ecosystem):

// Transactional producer: write + offset commit in a single atomic transaction
// (kafkajs-style; the transactionalId must be stable across restarts)
const producer = kafka.producer({
  transactionalId: 'order-processor-1',
  idempotent: true,
  maxInFlightRequests: 1
});
await producer.connect();

const tx = await producer.transaction();
try {
  await tx.send({ topic: 'output', messages: [{ value: result }] });
  await tx.sendOffsets({
    consumerGroupId: 'my-group',
    topics: [{ topic: 'input', partitions: [{ partition: 0, offset: '42' }] }]
  });
  await tx.commit();  // both the output message AND the input offset are written atomically
} catch (err) {
  await tx.abort();   // neither is written
  throw err;
}

Exactly-once end-to-end (including external systems like databases): requires idempotent writes to the external system using a unique message ID as the idempotency key.

// Even with exactly-once in Kafka, database writes need idempotency
await db.query(`
  INSERT INTO processed_orders (order_id, result, kafka_offset)
  VALUES ($1, $2, $3)
  ON CONFLICT (kafka_offset) DO NOTHING   -- idempotent: duplicate message = no-op
`, [orderId, result, offset]);

Ordering Guarantees

System        | Ordering Guarantee
Kafka         | Per partition (strict), not across partitions
SQS Standard  | No ordering guarantee
SQS FIFO      | Per MessageGroupId
RabbitMQ      | Per queue with a single consumer (multiple consumers break ordering)

Kafka ordering pattern:

// All events for the same entity → same partition via key
producer.send({
  topic: 'user-events',
  messages: [{
    key: userId,            // hash(userId) % numPartitions → same partition
    value: JSON.stringify(event)
  }]
});

// Single consumer per partition within consumer group → strict ordering per user

If you need global ordering across all events: single partition — but this limits throughput to one consumer and one broker.


Idempotent Consumers

With at-least-once delivery, consumers receive duplicate messages on retries, network partitions, and rebalances. They must handle this gracefully.

async function processOrderEvent(event: OrderEvent, offset: string): Promise<void> {
  // Option 1: Check-then-insert with unique constraint
  const inserted = await db.query(`
    INSERT INTO order_events_processed (event_id, processed_at)
    VALUES ($1, NOW())
    ON CONFLICT (event_id) DO NOTHING
    RETURNING event_id
  `, [event.id]);
  
  if (inserted.rowCount === 0) {
    // Already processed — skip safely
    return;
  }
  
  // Option 2: Idempotent operation (same result regardless of repetition)
  await db.query(`
    INSERT INTO inventory (product_id, reserved)
    VALUES ($1, $2)
    ON CONFLICT (product_id)
    DO UPDATE SET reserved = inventory.reserved + EXCLUDED.reserved
    WHERE NOT EXISTS (
      SELECT 1 FROM reservations WHERE order_id = $3
    )
  `, [productId, quantity, orderId]);
}

Design principle: Model your consumer operations as upserts, not inserts. Use the message ID or a business key as the idempotency anchor.


Backpressure

When consumers are slower than producers, the queue grows unboundedly — eventually exhausting memory or disk.

Producer: 10,000 msg/s
Consumer: 1,000 msg/s
Queue growth: 9,000 msg/s → fills up in hours

Strategies:

  1. Reject at producer: producer gets an error when queue is full → client retries with backoff (HTTP 503 with Retry-After header)
  2. Scale consumers: add more consumer instances (works for parallel-safe processing)
  3. Drop with sampling: shed load intelligently (e.g., analytics events, not payment events)
  4. Adaptive rate limiting: slow down producer based on queue depth metrics

// Producer-side backpressure: check queue depth before publishing
async function publishWithBackpressure(message: Message): Promise<void> {
  const queueDepth = await getQueueDepth('orders');
  
  if (queueDepth > HIGH_WATERMARK) {
    // Emit metric, alert on-call
    metrics.increment('queue.backpressure.applied');
    throw new ServiceUnavailableError('Queue at capacity. Retry in 30s.', 30);
  }
  
  await queue.publish(message);
}

The Outbox Pattern: Transactional Messaging

The dual-write problem: updating your database AND publishing to a queue in the same operation — without a distributed transaction:

// What you want to do:
BEGIN TRANSACTION
  INSERT INTO orders (id, status) VALUES ('ord-123', 'confirmed');
  queue.publish('orders', { id: 'ord-123', status: 'confirmed' });
COMMIT

// Problem: database commits but queue publish fails → order exists but no event
// Or: queue published but database rolls back → ghost event for non-existent order

The outbox pattern solves this by writing the event to the database as part of the same transaction, then having a separate process relay it to the queue:

-- Both writes in one ACID transaction
BEGIN;

INSERT INTO orders (id, status, created_at)
VALUES ('ord-123', 'confirmed', NOW());

INSERT INTO outbox (id, topic, payload, created_at, status)
VALUES (
  gen_random_uuid(),
  'orders',
  '{"id":"ord-123","status":"confirmed"}',
  NOW(),
  'PENDING'
);

COMMIT;

// Outbox relay process (runs separately, e.g., every 100ms)
async function relayOutboxMessages(): Promise<void> {
  const messages = await db.query(`
    SELECT id, topic, payload FROM outbox
    WHERE status = 'PENDING'
    ORDER BY created_at ASC
    LIMIT 100
    FOR UPDATE SKIP LOCKED      -- Prevents concurrent relay processes from picking same rows
  `);
  
  for (const msg of messages.rows) {
    await queue.publish(msg.topic, msg.payload);
    
    await db.query(`
      UPDATE outbox SET status = 'SENT', sent_at = NOW()
      WHERE id = $1
    `, [msg.id]);
  }
}

Alternative: Kafka Connect’s Debezium connector reads the database’s change data capture (CDC) stream (PostgreSQL WAL, MySQL binlog) and publishes changes to Kafka automatically — zero application code needed for the relay.


Event Sourcing (Introduction)

Event sourcing stores the history of state changes (events) as the source of truth, not current state snapshots.

// Traditional: store current state
// orders table: { id: 'ord-123', status: 'shipped', total: 9900 }

// Event sourcing: store events
// order_events table:
// { orderId: 'ord-123', type: 'OrderPlaced',   payload: {...}, timestamp: t1 }
// { orderId: 'ord-123', type: 'PaymentTaken',  payload: {...}, timestamp: t2 }
// { orderId: 'ord-123', type: 'OrderShipped',  payload: {...}, timestamp: t3 }

// Current state = replay all events from beginning
function rehydrateOrder(events: OrderEvent[]): Order {
  return events.reduce((order, event) => {
    switch (event.type) {
      case 'OrderPlaced': return { ...order, status: 'placed', ...event.payload };
      case 'PaymentTaken': return { ...order, status: 'paid', ...event.payload };
      case 'OrderShipped': return { ...order, status: 'shipped', ...event.payload };
      default: return order;
    }
  }, {} as Order);
}

Kafka is often used as the event store in event sourcing architectures: the event topic is given long (or unlimited) retention so the full history can be replayed, while compacted topics hold only the latest snapshot per entity key. Projections (read models) are built by consuming the event stream.
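A projection can be sketched as a fold over the event list (names and the status mapping are illustrative):

```typescript
type OrderEvent = {
  orderId: string;
  type: 'OrderPlaced' | 'PaymentTaken' | 'OrderShipped';
};

// Fold the event stream into a read model: latest status per order
function buildStatusProjection(events: OrderEvent[]): Map<string, string> {
  const statusFor = {
    OrderPlaced: 'placed',
    PaymentTaken: 'paid',
    OrderShipped: 'shipped',
  } as const;
  const projection = new Map<string, string>();
  for (const event of events) {
    projection.set(event.orderId, statusFor[event.type]);  // later events win
  }
  return projection;
}

buildStatusProjection([
  { orderId: 'ord-123', type: 'OrderPlaced' },
  { orderId: 'ord-123', type: 'PaymentTaken' },
]).get('ord-123');  // → 'paid'
```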


Kafka vs SQS vs RabbitMQ: Comparison Table

Feature             | Kafka                               | SQS Standard                  | SQS FIFO             | RabbitMQ
Type                | Event stream                        | Queue                         | Queue                | Message broker
Throughput          | 1M+ msg/s                           | Nearly unlimited              | 3K msg/s             | ~50K msg/s (cluster)
Retention           | Configurable (days/forever)         | 14 days max                   | 14 days max          | Until consumed
Replay              | ✅ Yes (by offset)                   | ❌ No                         | ❌ No                | ❌ No
Multiple consumers  | ✅ Consumer groups (each gets all)   | ✅ Competing (each gets some)  | ✅ Competing          | ✅ Competing
Ordering            | Per-partition                       | No guarantee                  | Per message group    | Per queue
Delivery guarantee  | At-least-once (exactly-once w/ tx)  | At-least-once                 | Exactly-once         | At-least-once
Managed             | Confluent Cloud / MSK               | AWS managed                   | AWS managed          | Self-hosted or CloudAMQP
Routing             | By key (partition)                  | Basic                         | By group             | Sophisticated (exchanges)
Protocol            | Custom (Kafka protocol)             | HTTP/HTTPS                    | HTTP/HTTPS           | AMQP
Latency             | <10ms                               | <10ms                         | <10ms                | <1ms
Use case            | Event streaming, CQRS, CDC          | Async work queues             | Ordered processing   | Complex routing, RPC patterns

Interview Checklist

Fundamentals

  • What is the difference between a message queue and an event stream?
  • When would you use Kafka over SQS? Give 3 concrete scenarios
  • Explain at-most-once, at-least-once, and exactly-once delivery. How is each achieved?
  • What is a dead-letter queue? Why is it essential in production?

Kafka

  • Explain topics, partitions, and offsets. Why can’t you decrease partition count?
  • How do consumer groups work? What is the maximum parallelism for a topic?
  • What is the ISR (In-Sync Replica) set? What does acks=-1 guarantee?
  • Explain log compaction and when you’d use it
  • How does Kafka achieve exactly-once semantics? What are the limitations?

RabbitMQ

  • What are the four exchange types? Give a use case for each
  • What is the difference between ack, nack(requeue=true), and nack(requeue=false)?
  • How does the visibility timeout (SQS) compare to RabbitMQ’s unacknowledged state?

Patterns

  • What is the dual-write problem? How does the outbox pattern solve it?
  • How does Debezium/CDC eliminate the need for an outbox relay process?
  • Explain idempotent consumers. How would you implement one in TypeScript?
  • What is backpressure? How do you handle it in a Kafka consumer?

Architecture

  • Design the event pipeline for an e-commerce checkout flow
  • How would you implement effectively exactly-once email sending using SQS + DLQ?
  • Design a system where 5 independent services must react to every user signup event
  • A Kafka consumer group is falling behind (growing lag). What are the options?

Senior-Level

  • Why is global ordering expensive in distributed systems? How does Kafka’s per-partition ordering balance consistency and throughput?
  • Design an event sourcing system using Kafka as the event store. How do you build read models?
  • You need to process 10M events/day with <5 minute end-to-end latency. Walk through the Kafka architecture
  • How would you migrate from RabbitMQ to Kafka without downtime?