Request Coalescing in a Kafka Consumer
Overview
Discord’s post on how they store trillions of messages covers request coalescing as a way to collapse duplicate reads under concurrent load. The same problem appeared in a Kafka consumer I built for processing user-change events.
Each event represents a change to user-related state and is produced to a Kafka
topic keyed by accountId. All events for the same account land on the same
partition; each pod owns one or more partitions. For every message, the consumer
calls an internal API to fetch the current account state and writes the result
to Spanner.
The constraint: the internal API has a 1500 TPS ceiling shared across the entire company. Without coalescing, events for the same account arriving 2–3 seconds apart each trigger a separate API call, often before the upstream state has fully settled. Those calls usually return the same effective account state, but each one still consumes part of the shared rate limit. At scale, unnecessary duplicate calls erode the shared budget for every other team consuming the same API.
Architecture
        User-change events
      from upstream pipeline
                │
                │  upstream state settles after ~20s
                ▼
┌───────────────────────────────┐
│          Kafka topic          │
│        key = accountId        │
└───────────────┬───────────────┘
                │
                │  same accountId → same partition
                ▼
┌───────────────────────────────┐
│         Consumer pod          │
│                               │
│  account X message            │
│       │                       │
│       ▼                       │
│  singleflight.Do("account-X") │
│       │                       │
│       ├─ later account X      │
│       │  messages wait here   │
│       │                       │
│       ▼                       │
│  wait 30s                     │
│       │                       │
│       ▼                       │
│  call Internal API once       │
│       │                       │
│       ▼                       │
│  write result to Spanner      │
│       │                       │
│       ▼                       │
│  commit Kafka offset          │
│                               │
│  account Y uses another group │
│  and can run in parallel      │
└───────────────────────────────┘
                │
                ├──────────────► Internal API
                │                source of truth
                │                1500 TPS shared
                │
                └──────────────► Cloud Spanner
                                 durable write target
Stack
- Go: consumer service
- golang.org/x/sync/singleflight: per-accountId call deduplication
- Apache Kafka: event stream, partitioned by accountId
- Cloud Spanner: write target; chosen for strong consistency, managed replication, and lower operational overhead for a small team
Highlights
Kafka partitioning is load-bearing, not incidental.
Routing by accountId as the message key guarantees all events for one account
hit the same partition and are processed by one consumer instance. This matters
because singleflight is in-process. It does not deduplicate work across pods.
Without this partitioning strategy, two pods could independently fire API calls
for the same account at the same time, and singleflight would not help. The
Kafka key is therefore part of the coalescing design, not just a stream-routing
detail.
The 30-second sleep is deliberate, not a workaround.
The upstream pipeline takes roughly 20 seconds to apply business logic and
commit state. The consumer waits 30 seconds before calling the API, giving the
upstream time to settle.
Messages for the same accountId that arrive during this wait join the in-flight
singleflight call and block. They never reach the API independently. When the
call completes, all of them share the result.
Coalesced messages become eligible to commit after the shared call resolves.
Messages waiting on the same singleflight call are not acknowledged until the
shared API call and Spanner write complete. In the production implementation,
offset handling is coordinated outside the fetch function, so the consumer
avoids committing messages before the shared result has been durably written.
This keeps the processing model simple: fetch once, write once, then acknowledge
the messages that depended on that result.
Coalescing protects a shared rate limit, not just throughput.
The internal API enforces a 1500 TPS ceiling across the whole company. Each
unnecessary duplicate call consumes capacity that other teams depend on.
singleflight makes the consumer a cooperative user of that shared resource
rather than a potential source of contention. After deployment, API call volume
dropped 36% — measured via consumer metrics sent to Datadog.
Different accountIds run in parallel.
Calls within the singleflight group are keyed by accountId. An in-flight call
for account X does not block processing for account Y. Each account gets its own
in-flight call, its own 30-second window, and its own API call.
Code
Illustrative only — this is not the production implementation.
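package consumer

import (
	"context"
	"time"

	"golang.org/x/sync/singleflight"
)

// group coalesces in-flight work per accountID within this process.
var group singleflight.Group

// processMessage handles one Kafka message. AccountData,
// fetchFromInternalAPI, and writeToSpanner are assumed to be defined
// elsewhere, with fetchFromInternalAPI returning (*AccountData, error).
func processMessage(ctx context.Context, accountID string) error {
	_, err, _ := group.Do(accountID, func() (interface{}, error) {
		// Only the first caller for this accountID runs this function.
		// Later callers for the same accountID block inside Do and share
		// its result. Note that ctx here belongs to that first caller.

		// Wait for the upstream pipeline to settle before fetching.
		timer := time.NewTimer(30 * time.Second)
		defer timer.Stop()
		select {
		case <-ctx.Done():
			return nil, ctx.Err()
		case <-timer.C:
		}

		accountData, err := fetchFromInternalAPI(ctx, accountID)
		if err != nil {
			return nil, err
		}
		// Write inside the shared call so that coalesced messages
		// produce one fetch and one Spanner write.
		return nil, writeToSpanner(ctx, accountData)
	})
	if err != nil {
		return err
	}
	// Caller commits its Kafka offset after this returns.
	// In production, offset handling is coordinated outside this function.
	return nil
}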
Status
This pattern is currently running in production. API call reduction is tracked as an ongoing Datadog metric.