Consolidating a Noisy Kafka Topic with a Debounced Aggregator
When an order is short on stock at a warehouse, the system creates an internal transfer request to move inventory from another location. The existing flow publishes one Kafka message per order, which triggers one transfer request per order.
During peak hours, a single warehouse can generate hundreds of these messages in minutes. Each one spawns a small transfer note. The warehouse team ends up processing dozens of tiny requests for the same destination when a single consolidated batch would do.
Before: Order A001 → transfer #1 for WH-42
        Order A002 → transfer #2 for WH-42
        Order A003 → transfer #3 for WH-42
        ...           (100 transfers in 5 min)
After:  WH-42 (1 consolidated batch) → downstream
The fix: a debounced aggregator between the per-order topic and downstream systems. It buffers by warehouse and publishes consolidated batches when the timing rules are met.
Part 1: Problem and Architecture⌗
Problem Statement⌗
The core issue is not just “too many messages.” It is that each order-level event immediately becomes warehouse-level operational work. During noisy periods this creates:
- many tiny transfer notes for the same destination,
- high processing overhead in downstream systems,
- unstable load spikes instead of predictable batch-shaped traffic.
We want to preserve correctness while changing the shape of traffic from “per-order stream” to “per-warehouse batches.”
High-Level Flow⌗
The aggregator is a separate service consuming the existing topic, so upstream producers stay untouched.
The service runs two phases:
- Ingest phase: Kafka message -> UPSERT into MySQL buffer (`PENDING`).
- Flush phase: poller claims ready rows, aggregates them, sends one batch message, marks rows `SENT`.
Kafka remains the source of truth. The MySQL table is a derived buffer for controlled batch construction.
Ingest Phase⌗
Ingest is intentionally simple: one row per (partition_key, location_id, order_id), enforced by a unique key that the UPSERT below relies on.
If the order appears again while still pending, we refresh its payload and timestamp.
INSERT INTO event_buffer_table
(partition_key, location_id, order_id, payload, status)
VALUES (?, ?, ?, ?, 'PENDING')
ON DUPLICATE KEY UPDATE
payload = IF(status = 'PENDING', VALUES(payload), payload),
updated_at = IF(status = 'PENDING', NOW(), updated_at);
Offset commits are manual (`enable.auto.commit: false`) and happen only after the DB write succeeds.
If the service crashes before the commit, Kafka replays the message, and the UPSERT makes re-processing idempotent.
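As a sketch of that ordering guarantee, a minimal ingest loop might look like this, assuming confluent-kafka and PyMySQL; the topic name, connection details, and payload field names are illustrative, not taken from the real system:

```python
# Ingest phase sketch: DB write first, offset commit second (at-least-once).
# Assumes confluent-kafka and PyMySQL; topic, DSN, and payload fields are illustrative.
import json

import pymysql
from confluent_kafka import Consumer

UPSERT_SQL = """
    INSERT INTO event_buffer_table
        (partition_key, location_id, order_id, payload, status)
    VALUES (%s, %s, %s, %s, 'PENDING')
    ON DUPLICATE KEY UPDATE
        payload    = IF(status = 'PENDING', VALUES(payload), payload),
        updated_at = IF(status = 'PENDING', NOW(), updated_at)
"""

consumer = Consumer({
    "bootstrap.servers": "localhost:9092",
    "group.id": "transfer-aggregator",
    "enable.auto.commit": False,   # manual commits only
    "auto.offset.reset": "earliest",
})
consumer.subscribe(["order-transfer-requests"])  # hypothetical topic name

db = pymysql.connect(host="localhost", db="aggregator", user="app", password="...")

while True:
    msg = consumer.poll(1.0)
    if msg is None or msg.error():
        continue  # sketch only: real code would log and handle errors
    event = json.loads(msg.value())
    with db.cursor() as cur:
        cur.execute(UPSERT_SQL, (
            event["partition_key"], event["location_id"],
            event["order_id"], json.dumps(event["payload"]),
        ))
    db.commit()  # 1) row is durable in the buffer
    consumer.commit(message=msg, asynchronous=False)  # 2) only then advance the offset
```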
Flush Phase⌗
A poller (e.g. every 30s) flushes a warehouse bucket when either condition is true:
- Debounce idle: no new update for 5 minutes.
- Hard window: oldest pending row is 30 minutes old.
The flush runs in two stages:
- Claim rows with a generated `batch_id` (bounded by `max_batch_size`).
- Send + mark sent for claimed batches.
-- Step 1. Claim phase: find flushable warehouse buckets (PENDING only)
SELECT location_id
FROM event_buffer_table
WHERE status = 'PENDING'
GROUP BY location_id
HAVING MAX(updated_at) < NOW() - INTERVAL 5 MINUTE
OR MIN(created_at) < NOW() - INTERVAL 30 MINUTE;
-- Step 2. Claim phase: assign immutable batch_id for one bucket
SET @batch_id := UUID();
UPDATE event_buffer_table
SET status = 'CLAIMED', batch_id = @batch_id, claimed_at = NOW()
WHERE status = 'PENDING' AND location_id = ?
ORDER BY created_at ASC
LIMIT 500;  -- the max_batch_size cap
-- Step 3. Flush phase (separate worker/tick): load all claimed rows
SELECT batch_id, payload
FROM event_buffer_table
WHERE status = 'CLAIMED'
ORDER BY claimed_at ASC, batch_id ASC;
-- Step 4. Process claimed batches in application code, then mark rows as SENT.
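Tying the four steps together, one flush tick could look like the sketch below, again using PyMySQL and confluent-kafka. The `MAX_BATCH_SIZE` constant mirrors `max_batch_size`; the output topic name, connection details, and consolidated message shape are assumptions:

```python
# One flush tick: claim bounded buckets (Steps 1-2), send + mark SENT (Steps 3-4).
import json
import uuid

import pymysql
from confluent_kafka import Producer

MAX_BATCH_SIZE = 500  # the claim cap referred to as max_batch_size

producer = Producer({"bootstrap.servers": "localhost:9092"})
db = pymysql.connect(host="localhost", db="aggregator", user="app",
                     password="...", autocommit=False)

def flush_tick():
    with db.cursor() as cur:
        # Step 1: buckets satisfying debounce-idle OR hard-window.
        cur.execute(
            """SELECT location_id FROM event_buffer_table
               WHERE status = 'PENDING'
               GROUP BY location_id
               HAVING MAX(updated_at) < NOW() - INTERVAL 5 MINUTE
                   OR MIN(created_at) < NOW() - INTERVAL 30 MINUTE""")
        buckets = [row[0] for row in cur.fetchall()]

        # Step 2: claim each bucket under an immutable batch_id, capped per tick.
        for location_id in buckets:
            cur.execute(
                """UPDATE event_buffer_table
                   SET status = 'CLAIMED', batch_id = %s, claimed_at = NOW()
                   WHERE status = 'PENDING' AND location_id = %s
                   ORDER BY created_at ASC LIMIT %s""",
                (str(uuid.uuid4()), location_id, MAX_BATCH_SIZE))
        db.commit()

        # Step 3: load everything CLAIMED, including leftovers from failed ticks.
        cur.execute(
            """SELECT batch_id, location_id, payload FROM event_buffer_table
               WHERE status = 'CLAIMED'
               ORDER BY claimed_at ASC, batch_id ASC""")
        batches = {}
        for batch_id, location_id, payload in cur.fetchall():
            batches.setdefault((batch_id, location_id), []).append(json.loads(payload))

        # Step 4: one consolidated message per batch, then mark rows SENT.
        for (batch_id, location_id), items in batches.items():
            producer.produce("warehouse-transfer-batches",  # hypothetical topic
                             key=location_id,
                             value=json.dumps({"batch_id": batch_id, "items": items}))
            producer.flush()  # block for broker ack (sketch skips delivery callbacks)
            cur.execute("UPDATE event_buffer_table SET status = 'SENT' WHERE batch_id = %s",
                        (batch_id,))
            db.commit()  # a crash before this line leaves rows CLAIMED for retry
```

A crash between the produce and the SENT update re-sends the same `batch_id` on the next tick, which is exactly the at-least-once behavior the failure-mode analysis below leans on.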
Part 2: Failure Modes and How We Handle Them⌗
Ingest Failure Modes⌗
What if Kafka offset is committed before DB write is durable?⌗
Why dangerous: A crash after the offset commit but before the DB commit silently drops the message: Kafka considers it consumed, but the buffer never recorded it.
Design response: Manual offset commit strictly after successful DB write.
Why this works: Crash before commit means Kafka replay; crash after commit means row is already durable.
Tradeoff: End-to-end processing latency is tied to DB write latency.
Flush Failure Modes⌗
What if many messages pile up inside a 30-minute window and we crash with an OOM?⌗
Why dangerous: Waiting for only a hard 30-minute window can let one bucket grow too large. If the process dies, recovery can become slow and bursty.
Design response: Debounce idle flushes earlier (5-minute quiet period), and claim is capped by max_batch_size.
Why this works: Work is split across multiple smaller batches instead of one giant flush. Latency remains bounded, and memory pressure is reduced.
Tradeoff: More frequent flushes can slightly reduce maximum batch compactness.
What if a warehouse is so noisy that debounce never triggers?⌗
Why dangerous: With a constant trickle, updated_at keeps moving and the idle condition never becomes true.
Rows can stay pending indefinitely.
Design response: Keep a hard 30-minute window from the oldest pending row.
Why this works: Even without quiet periods, each bucket is forced to progress. Debounce optimizes for quiet queues; hard window guarantees liveness for noisy queues.
Tradeoff: Some flushes happen before an ideal batch size is reached.
What if sending succeeds but service crashes before marking sent, then reruns?⌗
Why dangerous: Without stable identity, retries can regroup orders inconsistently (A with B first, then A with C later), causing consistency pain downstream.
Design response: Assign batch_id during claim and never mutate it until status is SENT.
If retry happens, the same claimed rows are re-sent with the same batch_id.
Why this works: batch_id becomes an idempotency key for downstream dedupe.
We keep at-least-once delivery while avoiding duplicated business effects.
Tradeoff: Downstream consumers must enforce dedupe on batch_id.
What if flush queries become slow under heavy load?⌗
Why dangerous: Large unbounded flushes increase lock time, query latency, and tail risk.
Design response: Two-stage claim/flush with bounded claim size (max_batch_size), and retry from CLAIMED state.
If Kafka is temporarily unavailable after claim, rows stay CLAIMED and are picked up on the next tick.
Why this works: Each tick does bounded work. Previously claimed but unsent batches are naturally retried.
Tradeoff: A single warehouse may need multiple ticks before all rows are drained.
What if flush jobs run at the same time in a multi-pod deployment?⌗
Why dangerous: Two pods can claim and flush concurrently. Because the Step 3 query loads every CLAIMED row regardless of which pod claimed it, both pods can send the same batches, and claim ordering, contention, and retry timing become unpredictable. Higher isolation levels such as repeatable read can reduce anomalies, but they still do not express an explicit global "single runner" intent.
Design response: Acquire a short-lived Redis lock key before each flush tick (see the sketch below). We cap a flush tick at 1 minute, so the lock key TTL is also 1 minute. Only the pod that acquires the key executes the flush job; the others skip that tick.
Why this works: It enforces one active flush worker at a time across pods while automatically recovering if a pod crashes (key expires quickly).
Tradeoff: Flush cadence now depends on lock health and Redis availability.
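A sketch of the lock with redis-py; the key name and the token scheme are illustrative:

```python
# Single-runner guard: only the pod that wins SET NX EX runs the flush tick.
import uuid

import redis

r = redis.Redis(host="localhost", port=6379)

LOCK_KEY = "aggregator:flush-lock"  # hypothetical key name
LOCK_TTL_SECONDS = 60               # matches the 1-minute flush timeout

def run_exclusive(flush_tick):
    token = str(uuid.uuid4())
    # SET key value NX EX 60 succeeds only if the key does not already exist.
    if not r.set(LOCK_KEY, token, nx=True, ex=LOCK_TTL_SECONDS):
        return  # another pod holds the lock; skip this tick
    try:
        flush_tick()  # e.g. the bounded tick from the earlier sketch
    finally:
        # Release only our own token so we never delete a lock that
        # expired and was re-acquired by another pod.
        if r.get(LOCK_KEY) == token.encode():
            r.delete(LOCK_KEY)
```

The check-then-delete release is not atomic; a stricter implementation releases via a short Lua script, but the TTL already bounds how long a stale lock can stall flushes.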
What if the buffer table keeps growing forever?⌗
Why dangerous: Naive bulk DELETE for old rows causes heavy InnoDB fragmentation and throughput degradation.
Design response: MySQL monthly range partitioning plus scheduled partition drop, guarded by “no unsent rows” check.
Why this works: Dropping a partition is metadata-driven and avoids large row-by-row delete churn.
Tradeoff: Partition management adds operational discipline and guard checks.
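The guarded drop could be scheduled roughly like this sketch, assuming the table is range-partitioned by month on created_at and partitions are named pYYYYMM (both assumptions):

```python
# Guarded partition drop: never drop a month that still has unsent rows.
import pymysql

db = pymysql.connect(host="localhost", db="aggregator", user="app", password="...")

def drop_partition_if_clean(partition_name, month_start, month_end):
    with db.cursor() as cur:
        # Guard check: refuse to drop while any row in the month is not SENT.
        cur.execute(
            """SELECT COUNT(*) FROM event_buffer_table
               WHERE created_at >= %s AND created_at < %s
                 AND status <> 'SENT'""",
            (month_start, month_end))
        (unsent,) = cur.fetchone()
        if unsent:
            return False  # keep the partition; investigate the stragglers
        # Metadata-only operation: no row-by-row DELETE churn or fragmentation.
        # partition_name comes from our own scheduler, never from user input.
        cur.execute(f"ALTER TABLE event_buffer_table DROP PARTITION {partition_name}")
    return True

# e.g. drop_partition_if_clean("p202501", "2025-01-01", "2025-02-01")
```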
Downstream Contract⌗
Downstream consumes one consolidated message per warehouse flush:
- key: `location_id`
- identity: `batch_id`
- payload: `order_ids`, aggregated `items`, and `flushed_at`
Downstream must deduplicate by batch_id to keep processing idempotent.
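On the consumer side, the dedupe can be a single keyed insert. A sketch with PyMySQL, where the `processed_batches` table (primary key on `batch_id`) is an assumption:

```python
# Downstream idempotency: handle each consolidated batch at most once per batch_id.
import json

import pymysql

db = pymysql.connect(host="localhost", db="warehouse", user="app", password="...")

def handle_batch(value: bytes):
    batch = json.loads(value)  # {"batch_id", "order_ids", "items", "flushed_at"}
    with db.cursor() as cur:
        # INSERT IGNORE against the primary key is the dedupe gate:
        # a replayed batch_id inserts zero rows and is skipped.
        cur.execute(
            "INSERT IGNORE INTO processed_batches (batch_id) VALUES (%s)",
            (batch["batch_id"],))
        if cur.rowcount == 0:
            db.commit()
            return  # duplicate delivery of an already-processed batch
        # ... create one consolidated transfer from batch["order_ids"] / batch["items"] ...
    db.commit()
```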
TL;DR⌗
- Problem: Per-order events created noisy, fragmented warehouse operations.
- Mechanism: Debounced ingest/flush pipeline with hard-window fallback, capped claims, and single-run Redis lock in multi-pod mode.
- Guarantee: At-least-once delivery with stable `batch_id` for downstream idempotent handling.
- Operations: Partition-based cleanup keeps the buffer healthy over time.