Event Consumer

A service that consumes messages from a broker (Kafka, SQS, RabbitMQ, Pub/Sub, or similar). It reads messages, processes them, and often updates state and produces downstream messages. The “public interface” is the topic or queue and the schema of the messages on it.

This pattern has problems the API provider and API consumer patterns don’t have: ordering, replay, poison messages, dead-letter queues, and delivery semantics (at-most-once, at-least-once, exactly-once-with-effort).

What needs to be covered

| Layer | Concern | Test type |
| --- | --- | --- |
| Message handler | Pure transformation per message | Solitary unit tests |
| Idempotency | Same message twice produces the same effect | In-process component tests |
| Poison message handling | Malformed message goes to DLQ, doesn’t crash the consumer | In-process component tests |
| Ordering | Out-of-order messages produce documented outcomes | In-process component tests |
| Backpressure | Consumer slows when downstream is slow | Resilience component tests |
| Broker contract | Topic, schema, headers | Contract tests |
| Broker client | Real protocol behavior, offset commits, consumer group rebalancing | Adapter integration tests against a real broker container |
[Figure: Event Consumer: layers and the tests that cover each. Six architectural layers stacked top to bottom. The first five (message handler logic, idempotency and ordering, dead-letter and poison-message handling, backpressure, broker client) sit inside the component boundary; the external broker and schema registry sit below the dashed boundary, drawn with a dashed border. Solitary unit tests cover handler logic. Component tests cover idempotency, dead-letter handling, ordering, and backpressure with the broker doubled. Adapter integration tests pin the broker protocol against a real broker container. Broker contract tests pin the topic, schema, and headers. An out-of-band synthetic publish confirms the doubles still match the real broker.]

Positive test cases

Common cases to consider, not an exhaustive list. Drop items that don’t apply and add ones the pattern doesn’t mention but your component needs.

  • Well-formed message: produces the expected state change and the documented downstream events.
  • Batch processing: batches are processed per the documented policy.
  • Replay from offset: reproduces the same end state.
  • Schema versions: every documented version is accepted.
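The replay case above can be sketched as running the same message log through a fresh projection twice and asserting the end states match. A minimal sketch; `OrderProjection`, `replayFrom`, and the event shape are hypothetical names for illustration, not taken from the examples later in this section.

```typescript
// Hypothetical projection that folds order events into a running total per order.
type OrderEvent = { eventId: string; orderId: string; amountCents: number };

class OrderProjection {
  private totals = new Map<string, number>();

  apply(e: OrderEvent): void {
    this.totals.set(e.orderId, (this.totals.get(e.orderId) ?? 0) + e.amountCents);
  }

  totalFor(orderId: string): number {
    return this.totals.get(orderId) ?? 0;
  }
}

// Replay the log from a given offset into a fresh projection.
function replayFrom(log: OrderEvent[], offset: number): OrderProjection {
  const projection = new OrderProjection();
  for (const e of log.slice(offset)) projection.apply(e);
  return projection;
}

const log: OrderEvent[] = [
  { eventId: "evt-1", orderId: "ord-001", amountCents: 4250 },
  { eventId: "evt-2", orderId: "ord-001", amountCents: 1000 },
];

// Two replays from the same offset must reach the same end state.
const first = replayFrom(log, 0);
const second = replayFrom(log, 0);
console.log(first.totalFor("ord-001") === second.totalFor("ord-001")); // true
```

The point of the assertion is determinism: if the handler reads wall-clock time or external mutable state, the two replays diverge and the test fails.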

Negative test cases

Common cases to consider, not an exhaustive list. Drop items that don’t apply and add ones the pattern doesn’t mention but your component needs.

  • Malformed message: routes to the DLQ with a correlation ID; the consumer survives.
  • Duplicate delivery: absorbed by idempotency.
  • Out-of-order delivery: follows the documented behavior.
  • Mid-batch downstream failure: the offset is left uncommitted.
  • Schema-version skew: handled per the documented policy.
  • Slow downstream: applies backpressure rather than OOM.
  • Consumer-group rebalance during processing: no in-flight messages are stranded.
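The poison-message case from the list above can be sketched as a handler that parses before it processes and routes parse failures to an in-memory DLQ. The names (`DeadLetterQueue`, `Consumer`, `correlationId`) are illustrative assumptions, not from the examples later in this section.

```typescript
type DeadLetter = { correlationId: string; rawPayload: string; reason: string };

// Hypothetical in-memory dead-letter queue for a component test.
class DeadLetterQueue {
  readonly entries: DeadLetter[] = [];
  publish(entry: DeadLetter): void {
    this.entries.push(entry);
  }
}

class Consumer {
  processedCount = 0;
  constructor(private dlq: DeadLetterQueue) {}

  // Never throws: a malformed payload goes to the DLQ with its correlation ID,
  // and the consumer moves on to the next message.
  handle(rawPayload: string, correlationId: string): void {
    let parsed: { orderId?: unknown };
    try {
      parsed = JSON.parse(rawPayload);
    } catch (err) {
      this.dlq.publish({ correlationId, rawPayload, reason: String(err) });
      return;
    }
    if (typeof parsed.orderId !== "string") {
      this.dlq.publish({ correlationId, rawPayload, reason: "missing orderId" });
      return;
    }
    this.processedCount += 1; // stand-in for real processing
  }
}

const dlq = new DeadLetterQueue();
const consumer = new Consumer(dlq);

consumer.handle("{not json", "corr-1");             // malformed: dead-lettered
consumer.handle('{"orderId":"ord-001"}', "corr-2"); // well-formed: processed

console.log(dlq.entries.length, consumer.processedCount); // 1 1
```

A component test then asserts both halves: the DLQ entry carries the correlation ID, and the consumer kept processing afterward.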

Test double validation

The broker double used in component tests is validated by adapter integration tests against a real broker container the team controls (Kafka in Docker, Redpanda in Docker, ElasticMQ for SQS). These tests exercise the broker client adapter against that controlled instance and assert that the adapter speaks the protocol correctly; they assert nothing about which messages the broker returns or in what order, because that is the broker’s behavior, not the adapter’s. The schema registry double is validated by contract tests pinning each schema version, plus a post-deploy check against the real registry. A post-deploy synthetic check publishes a known message to the real topic in a non-prod environment.
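One way to picture what the component tests run against: an in-memory broker double that holds messages per topic and records offset commits, so tests can assert redelivery semantics. `InMemoryBroker` and its methods are hypothetical, a sketch of one possible double, and the adapter integration tests described above are what keep a double like this honest against the real broker.

```typescript
// Hypothetical in-memory broker double: stores messages per topic and records
// offset commits so component tests can assert on them.
class InMemoryBroker {
  private topics = new Map<string, string[]>();
  readonly committedOffsets = new Map<string, number>();

  publish(topic: string, payload: string): void {
    const msgs = this.topics.get(topic) ?? [];
    msgs.push(payload);
    this.topics.set(topic, msgs);
  }

  // Deliver messages from the last committed offset onward.
  poll(topic: string): { offset: number; payload: string }[] {
    const from = this.committedOffsets.get(topic) ?? 0;
    const msgs = this.topics.get(topic) ?? [];
    return msgs.slice(from).map((payload, i) => ({ offset: from + i, payload }));
  }

  // Commit marks everything up to and including `offset` as processed.
  commit(topic: string, offset: number): void {
    this.committedOffsets.set(topic, offset + 1);
  }
}

const broker = new InMemoryBroker();
broker.publish("payments", "evt-1");
broker.publish("payments", "evt-2");

// Consume the first message and commit it; a crash before the second message
// means the next poll redelivers only the uncommitted one (at-least-once).
const firstPoll = broker.poll("payments");
broker.commit("payments", firstPoll[0].offset);

const redelivered = broker.poll("payments");
console.log(redelivered.map((m) => m.payload)); // [ "evt-2" ]
```

The double deliberately models only commit-and-redeliver; rebalancing and real wire protocol stay with the adapter integration tests.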

Pipeline placement

Handler unit tests and component tests run in CI Stage 1. Adapter integration tests against a team-controlled broker container run in CI Stage 1 or Stage 2. Adapter integration tests against a managed broker the team can’t pin to a known state run out-of-band on a schedule, alongside the post-deploy synthetic check.

Example: idempotency under duplicate delivery

Money.usd takes minor units (cents); 4250 represents $42.50.

Java (JUnit with AssertJ):

@Test
void same_message_processed_twice_creates_one_payment_record() {
  PaymentEvent event = new PaymentEvent(
      "evt-9f12", OrderId.of("ord-001"), Money.usd(4250));
  PaymentRepo repo = new InMemoryPaymentRepo();
  PaymentEventHandler handler = new PaymentEventHandler(repo);

  handler.handle(event);
  handler.handle(event);

  assertThat(repo.findByEventId("evt-9f12")).hasSize(1);
  assertThat(repo.totalForOrder(OrderId.of("ord-001"))).isEqualTo(Money.usd(4250));
}

C# (xUnit with FluentAssertions):

[Fact]
public void Same_message_processed_twice_creates_one_payment_record()
{
    var evt = new PaymentEvent("evt-9f12", OrderId.Of("ord-001"), Money.Usd(4250));
    var repo = new InMemoryPaymentRepo();
    var handler = new PaymentEventHandler(repo);

    handler.Handle(evt);
    handler.Handle(evt);

    repo.FindByEventId("evt-9f12").Should().HaveCount(1);
    repo.TotalForOrder(OrderId.Of("ord-001")).Should().Be(Money.Usd(4250));
}

TypeScript (Jest):

test("same message processed twice creates one payment record", () => {
  const event = new PaymentEvent(
    "evt-9f12", OrderId.of("ord-001"), Money.usd(4250));
  const repo = new InMemoryPaymentRepo();
  const handler = new PaymentEventHandler(repo);

  handler.handle(event);
  handler.handle(event);

  expect(repo.findByEventId("evt-9f12")).toHaveLength(1);
  expect(repo.totalForOrder(OrderId.of("ord-001"))).toEqual(Money.usd(4250));
});