Event-Driven Architecture

Event-Driven Architecture (EDA) structures applications around the production, detection, and consumption of events. An event represents a significant change in state or an occurrence within a system. Rather than services directly calling each other, they communicate by publishing events that other services independently consume and react to.

This architectural style promotes loose coupling between components. Event producers don't need to know who consumes their events, and consumers don't need to know the source. Services can be added, modified, or removed without affecting others, as long as the event contracts remain stable. This independence enables teams to work autonomously and systems to evolve without coordinated deployments.

The shift from request-response to event notification changes how we think about system design. Instead of orchestrating workflows by calling services in sequence, systems react to events as they occur. This reactive model naturally handles asynchronous operations, provides better resilience when services are temporarily unavailable, and enables real-time processing of business events.

Event Types and Classification

Events serve different purposes and have different characteristics. Understanding these distinctions helps in designing appropriate event schemas and choosing correct processing strategies.

Domain Events

Domain events capture significant business occurrences that domain experts care about. They represent facts that have happened in the business domain and are named in past tense to emphasize their immutable, historical nature.

Domain events are the core of event-driven domain models. They express the ubiquitous language of the domain, making the system's behavior explicit and auditable. When a customer opens an account, transfers money, or updates their profile, these are domain events.

/**
 * Domain event representing account creation.
 * Named in past tense to indicate something that has already occurred.
 * Contains all information needed for consumers to react appropriately.
 */
public record AccountOpenedEvent(
    String accountId,
    String customerId,
    AccountType accountType,
    Money initialDeposit,
    Instant openedAt,
    String openedBy
) implements DomainEvent {

    public AccountOpenedEvent {
        // Validate invariants
        if (initialDeposit.isNegative()) {
            throw new IllegalArgumentException("Initial deposit cannot be negative");
        }
    }
}

// Publishing domain events from aggregate roots
@Entity
public class Account {
    @Transient
    private final List<DomainEvent> domainEvents = new ArrayList<>();

    public static Account open(String customerId, Money initialDeposit) {
        Account account = new Account();
        account.customerId = customerId;
        account.balance = initialDeposit;
        account.status = AccountStatus.ACTIVE;

        // Record the event - will be published after transaction commits
        account.registerEvent(new AccountOpenedEvent(
            account.id,
            customerId,
            account.type,
            initialDeposit,
            Instant.now(),
            SecurityContext.getCurrentUser()
        ));

        return account;
    }

    private void registerEvent(DomainEvent event) {
        this.domainEvents.add(event);
    }

    public List<DomainEvent> getDomainEvents() {
        return Collections.unmodifiableList(domainEvents);
    }
}

Domain events should contain enough information for consumers to process them without additional queries. If a payment is processed, the event should include payment amount, currency, timestamp, and relevant identifiers. Consumers shouldn't need to call back to the producer to understand what happened.

Integration Events

Integration events facilitate communication between bounded contexts or separate systems. While domain events are internal to a bounded context, integration events cross boundaries and may require translation or adaptation.

Integration events often have different schemas than domain events because external consumers have different needs. The internal domain model might be too detailed or use terminology specific to one context. Integration events provide a published contract that external systems depend on, requiring careful versioning and backward compatibility.

/**
 * Integration event published for external consumption.
 * Simplified schema suitable for cross-system communication.
 */
public record PaymentProcessedIntegrationEvent(
    String eventId,    // Unique event identifier for idempotency
    String eventType,  // "payment.processed"
    int version,       // Schema version for evolution
    Instant timestamp,
    PaymentEventData data
) {
    public record PaymentEventData(
        String paymentId,
        String customerId,
        BigDecimal amount,
        String currency,
        String status
    ) {}
}

// Translating between domain and integration events
@Service
public class PaymentEventPublisher {
    private final EventPublisher publisher;

    @EventHandler
    public void handlePaymentProcessed(PaymentProcessedDomainEvent domainEvent) {
        // Translate domain event to integration event
        PaymentProcessedIntegrationEvent integrationEvent =
            new PaymentProcessedIntegrationEvent(
                UUID.randomUUID().toString(),
                "payment.processed",
                1, // Schema version
                domainEvent.timestamp(),
                new PaymentEventData(
                    domainEvent.paymentId(),
                    domainEvent.customerId(),
                    domainEvent.amount().value(),
                    domainEvent.amount().currency(),
                    domainEvent.status().toString()
                )
            );

        publisher.publish("payment-events", integrationEvent);
    }
}

Integration events form a published contract. Once external systems depend on them, changes require careful migration. Use versioning to evolve schemas while maintaining backward compatibility (see Event Versioning).

Notification Events

Notification events inform interested parties that something happened without providing extensive details. They're lightweight signals that trigger consumers to query for current state or take simple actions.

Notification events work well when the full details aren't needed or when data is sensitive. A "CustomerAddressChanged" notification might only contain customer ID, letting the notification service query for the new address when sending mail. This avoids distributing potentially sensitive address data to all event consumers.

/**
 * Lightweight notification event.
 * Contains minimal data - consumers query if they need details.
 */
public record CustomerProfileUpdatedNotification(
    String customerId,
    Instant updatedAt,
    ProfileSection section // What changed: ADDRESS, CONTACT_INFO, etc.
) {}

@Service
public class NotificationEventConsumer {
    private final CustomerService customerService;
    private final MailingListService mailingListService;

    @EventHandler
    public void handleProfileUpdated(CustomerProfileUpdatedNotification event) {
        // Event doesn't contain the actual changes
        // Consumer queries for current state if needed
        if (event.section() == ProfileSection.ADDRESS) {
            CustomerProfile profile = customerService.getProfile(event.customerId());
            mailingListService.updateAddress(event.customerId(), profile.getAddress());
        }
    }
}

The trade-off with notification events is additional load on the source system - consumers query back for details. This coupling can create problems if many consumers query simultaneously after an event. Consider whether full-data events or read replicas would better serve high-volume scenarios.

Event Storming and Modeling

Event storming is a collaborative workshop technique for discovering domain events and designing event-driven systems. It brings together domain experts, developers, and stakeholders to explore the domain through events.

The process starts by identifying domain events - the things that happen in the business. Participants write events on sticky notes (traditionally orange) in past tense and place them on a timeline. This creates a shared understanding of the business process flow.

After identifying events, the group explores commands (blue sticky notes) that trigger events, actors (yellow) who issue commands, aggregates (yellow boxes) that handle commands, and policies/reactions (purple) that automatically trigger when events occur.

Event storming surfaces questions and assumptions early. When modeling account opening, you might discover that "Account Opened" means different things to different stakeholders - to compliance it means KYC passed, to operations it means the database record exists, to marketing it means the customer can receive promotions. These insights lead to clearer event definitions and better-designed boundaries.

The output of event storming sessions includes:

  • Event catalog: Comprehensive list of domain events
  • Bounded contexts: Natural groupings where certain events and concepts belong
  • Process flows: How events cascade and trigger other actions
  • Consistency boundaries: What must happen atomically vs. eventually

Use event storming when starting new projects, exploring unfamiliar domains, or redesigning existing systems. The collaborative discovery process uncovers complexity that interviews or requirements documents miss.

Event Sourcing Patterns

Event sourcing stores all changes to application state as a sequence of events rather than storing just the current state. The event stream becomes the source of truth. Current state is derived by replaying events from the beginning.

Traditional CRUD systems overwrite previous state. When a customer changes their address, the old address is lost unless explicitly historized. Event sourcing preserves complete history naturally - you have the AddressChanged events with old and new values.

Contrast traditional state storage with event sourcing. In traditional systems, each state change overwrites previous data, losing historical information. Event sourcing appends immutable events to an append-only log, preserving every state transition. Current state is reconstructed by replaying this event stream.

This fundamental shift in data storage provides powerful capabilities. You can answer questions like "What was the account balance on January 15th?" by replaying events up to that date. When investigating a bug, replay events through fixed code to see if the issue would have occurred. For regulatory compliance, the complete audit trail shows exactly what happened, when, and in what order - essential for banking applications where every transaction must be traceable.

/**
 * Event-sourced aggregate stores events, not current state.
 * State is reconstructed by replaying events.
 */
public class Account {
    private String accountId;
    private BigDecimal balance;
    private AccountStatus status;
    private List<AccountEvent> uncommittedEvents = new ArrayList<>();

    // Private constructor - use static factory methods
    private Account() {}

    /**
     * Create new account by applying AccountOpened event.
     * Event is stored, not the resulting state.
     */
    public static Account open(String customerId, Money initialDeposit) {
        Account account = new Account();
        account.apply(new AccountOpenedEvent(
            UUID.randomUUID().toString(),
            customerId,
            initialDeposit,
            Instant.now()
        ));
        return account;
    }

    /**
     * Deposit money by applying event.
     * Business logic validates, then event is applied and stored.
     */
    public void deposit(Money amount) {
        if (status != AccountStatus.ACTIVE) {
            throw new AccountNotActiveException();
        }

        apply(new MoneyDepositedEvent(
            accountId,
            amount,
            Instant.now()
        ));
    }

    /**
     * Apply event to mutate internal state.
     * Called when reconstructing from history or applying new events.
     */
    private void apply(AccountEvent event) {
        switch (event) {
            case AccountOpenedEvent e -> {
                this.accountId = e.accountId();
                this.balance = e.initialDeposit().value();
                this.status = AccountStatus.ACTIVE;
            }
            case MoneyDepositedEvent e -> {
                this.balance = this.balance.add(e.amount().value());
            }
            case MoneyWithdrawnEvent e -> {
                this.balance = this.balance.subtract(e.amount().value());
            }
            case AccountClosedEvent e -> {
                this.status = AccountStatus.CLOSED;
            }
        }

        uncommittedEvents.add(event);
    }

    /**
     * Reconstruct account state by replaying all historical events.
     */
    public static Account fromHistory(List<AccountEvent> history) {
        Account account = new Account();
        history.forEach(account::apply);
        account.uncommittedEvents.clear(); // Historical events already persisted
        return account;
    }
}

Event stores append events to streams identified by aggregate ID. Each stream is the complete history for one aggregate instance. Loading an aggregate means reading its stream and replaying events to reconstruct state.

@Repository
public class EventSourcedAccountRepository {
    private final EventStore eventStore;

    public void save(Account account) {
        List<AccountEvent> events = account.getUncommittedEvents();

        // Append new events to the stream
        eventStore.appendToStream(
            "account-" + account.getId(),
            account.getVersion(), // Optimistic concurrency check
            events
        );

        account.markEventsAsCommitted();
    }

    public Account findById(String accountId) {
        // Read all events for this aggregate
        List<AccountEvent> events = eventStore.readStream("account-" + accountId);

        if (events.isEmpty()) {
            throw new AccountNotFoundException(accountId);
        }

        // Reconstruct state by replaying events
        return Account.fromHistory(events);
    }
}

Snapshots for Performance

Replaying thousands of events to reconstruct state is slow. Snapshots periodically capture current state so reconstruction only needs to replay events since the last snapshot.

public class SnapshotCapableRepository {
    private final EventStore eventStore;
    private final SnapshotStore snapshotStore;
    private static final int SNAPSHOT_FREQUENCY = 100;

    public Account findById(String accountId) {
        // Try to load the latest snapshot first
        Optional<AccountSnapshot> snapshot = snapshotStore.getLatest(accountId);

        if (snapshot.isPresent()) {
            // Replay only the events recorded after the snapshot
            List<AccountEvent> events = eventStore.readStream(
                "account-" + accountId,
                snapshot.get().version() + 1
            );
            return Account.fromSnapshot(snapshot.get(), events);
        }

        // No snapshot - replay all events from the beginning
        return Account.fromHistory(eventStore.readStream("account-" + accountId));
    }

    public void save(Account account) {
        List<AccountEvent> events = account.getUncommittedEvents();
        long newVersion = account.getVersion() + events.size();

        eventStore.appendToStream("account-" + account.getId(), account.getVersion(), events);

        // Create snapshot periodically
        if (newVersion % SNAPSHOT_FREQUENCY == 0) {
            snapshotStore.save(account.toSnapshot());
        }

        account.markEventsAsCommitted();
    }
}

Snapshots are optimizations - you can always replay from the beginning if a snapshot is corrupted. Snapshot frequency balances write overhead (creating snapshots) against read performance (fewer events to replay).
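The read-side effect of the frequency choice is easy to quantify: with a snapshot every N events, loading an aggregate replays at most N-1 events regardless of total history length. A minimal sketch of that arithmetic (class and method names are illustrative, not part of any library):

```java
// With snapshots taken every snapshotFrequency events, reconstruction loads
// the latest snapshot and replays only the events written since it.
public class SnapshotMath {

    /** Events that must be replayed on top of the latest snapshot. */
    public static long eventsToReplay(long version, int snapshotFrequency) {
        return version % snapshotFrequency;
    }
}
```

For an aggregate at version 10,250 with snapshots every 100 events, only 50 events are replayed instead of all 10,250.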

Event Sourcing Benefits and Challenges

Benefits:

  • Complete audit trail: Every state change is recorded with when and why it happened
  • Temporal queries: Reconstruct state at any point in time by replaying events up to that moment
  • Event replay: Fix bugs by replaying events through corrected logic
  • Analytics: Events are perfect for business intelligence and analytics
  • Debugging: Reproduce bugs by replaying the exact sequence of events

Challenges:

  • Complexity: More complex than simple CRUD
  • Event schema evolution: Events are permanent - schema changes require careful migration
  • Eventual consistency: Read models are eventually consistent with event store
  • Deleting data: GDPR "right to be forgotten" is difficult when events are immutable
  • Performance: Replaying events requires snapshots and optimization

Event sourcing fits naturally in domains with audit requirements, complex state transitions, or where temporal queries matter. It's overkill for simple CRUD entities that don't need history.
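The temporal-query capability described above reduces to a simple replay loop: apply only the events that occurred up to the cutoff. This sketch uses a simplified balance-delta event as a stand-in for the aggregate events shown earlier:

```java
import java.math.BigDecimal;
import java.time.Instant;
import java.util.List;

// Minimal temporal-query sketch: reconstruct a balance as of a cutoff time
// by replaying only events up to that moment. BalanceEvent is a simplified,
// illustrative stand-in for the deposit/withdrawal events above.
public class TemporalQuery {

    public record BalanceEvent(Instant occurredAt, BigDecimal delta) {}

    public static BigDecimal balanceAsOf(List<BalanceEvent> history, Instant cutoff) {
        BigDecimal balance = BigDecimal.ZERO;
        for (BalanceEvent event : history) {
            if (event.occurredAt().isAfter(cutoff)) {
                break; // stream is ordered; stop at the cutoff
            }
            balance = balance.add(event.delta());
        }
        return balance;
    }
}
```

Answering "What was the balance on January 15th?" is then a call to balanceAsOf with that date, with no special historization tables required.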

Message Brokers and Event Streaming

Event-driven systems need infrastructure to transport events from producers to consumers. Message brokers and event streaming platforms provide reliable, scalable event delivery.

Apache Kafka

Kafka is a distributed event streaming platform that stores events in durable, ordered logs called topics. Topics are partitioned for scalability and parallelism. Each partition is an ordered sequence of events that can be read independently.

Kafka's architecture separates storage from processing. Events are retained for a configured period (days or weeks), allowing consumers to replay historical events. New consumers can join and process all historical events, unlike traditional message queues where messages are deleted after delivery.

/**
 * Kafka producer configuration and event publishing.
 */
@Configuration
public class KafkaProducerConfig {

    @Bean
    public ProducerFactory<String, AccountEvent> producerFactory() {
        Map<String, Object> config = new HashMap<>();
        config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);

        // Reliability configurations
        config.put(ProducerConfig.ACKS_CONFIG, "all"); // Wait for all in-sync replicas
        config.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true); // No duplicate writes from producer retries
        config.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, 5);

        return new DefaultKafkaProducerFactory<>(config);
    }

    @Bean
    public KafkaTemplate<String, AccountEvent> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }
}

@Service
public class AccountEventPublisher {
    private final KafkaTemplate<String, AccountEvent> kafka;

    public void publish(AccountEvent event) {
        // Use aggregate ID as partition key to ensure ordering
        // All events for same account go to same partition
        kafka.send("account-events", event.accountId(), event)
            .whenComplete((result, ex) -> {
                if (ex != null) {
                    log.error("Failed to publish event: {}", event, ex);
                    // Handle failure: retry, dead letter queue, alert
                } else {
                    log.debug("Published event to partition {} at offset {}",
                        result.getRecordMetadata().partition(),
                        result.getRecordMetadata().offset());
                }
            });
    }
}

Partition keys determine which partition an event goes to. All events with the same key go to the same partition, providing ordering guarantees. For account events, use account ID as the key - all events for account-123 are ordered, but events for different accounts may be interleaved.
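The key-to-partition mapping can be sketched as hashing the key modulo the partition count. Kafka's default partitioner actually uses a murmur2 hash, so real partition numbers will differ; the class below is purely illustrative:

```java
// Simplified sketch of key-based partitioning: the same key always maps to
// the same partition, which is what gives per-key ordering. Kafka's default
// partitioner uses murmur2; hashCode() here is an illustrative stand-in.
public class KeyPartitioner {

    public static int partitionFor(String key, int numPartitions) {
        // Mask the sign bit so the modulo result is never negative
        return (key.hashCode() & 0x7fffffff) % numPartitions;
    }
}
```

Because partitionFor("account-123", 3) is deterministic, every event for account-123 lands on the same partition and is consumed in order; events for other accounts may land elsewhere and interleave.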

/**
 * Kafka consumer with offset management.
 */
@Service
public class AccountEventConsumer {

    @KafkaListener(
        topics = "account-events",
        groupId = "notification-service",
        concurrency = "3" // Run 3 consumer threads
    )
    public void handleAccountEvent(
            ConsumerRecord<String, AccountEvent> record,
            Acknowledgment acknowledgment) {

        AccountEvent event = record.value();
        String accountId = record.key();

        try {
            processEvent(event);

            // Manually commit offset after successful processing
            // Ensures event won't be reprocessed if this consumer crashes
            acknowledgment.acknowledge();

        } catch (Exception e) {
            log.error("Failed to process event for account {}: {}",
                accountId, event, e);
            // Don't acknowledge - event will be reprocessed
            // Or send to dead letter topic for manual investigation
        }
    }

    private void processEvent(AccountEvent event) {
        // Idempotent processing - handle duplicates gracefully
        // Event might be redelivered after crash/rebalance
    }
}

Kafka consumer groups enable parallel processing. Each partition is consumed by one consumer in the group, so events in a partition are processed sequentially, but different partitions process in parallel. With 3 partitions and 3 consumers, each consumer handles one partition. With 6 partitions and 3 consumers, each handles two.
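The partition-spreading arithmetic above can be sketched with a simple round-robin assignment. Kafka's real assignors live in the client library and handle rebalancing; this class only illustrates how partitions divide across a group:

```java
import java.util.ArrayList;
import java.util.List;

// Illustrative round-robin assignment of partitions to consumers in a group.
// Kafka's actual assignors (range, round-robin, sticky) are configurable and
// also handle rebalances; this sketch shows only the steady-state split.
public class PartitionAssignment {

    /** Returns, for each consumer index, the partitions it owns. */
    public static List<List<Integer>> assign(int partitions, int consumers) {
        List<List<Integer>> result = new ArrayList<>();
        for (int c = 0; c < consumers; c++) {
            result.add(new ArrayList<>());
        }
        for (int p = 0; p < partitions; p++) {
            result.get(p % consumers).add(p); // round-robin over consumers
        }
        return result;
    }
}
```

With assign(6, 3) each consumer owns two partitions; with assign(3, 3) each owns one. A fourth consumer added to a 3-partition group would sit idle, which is why partition count caps useful parallelism.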

RabbitMQ

RabbitMQ is a message broker that routes messages from producers to consumers through exchanges and queues. Unlike Kafka's pull model, RabbitMQ pushes messages to consumers. Messages are typically deleted after acknowledgment, making RabbitMQ suitable for task distribution rather than event streaming.

/**
 * RabbitMQ configuration with exchanges and queues.
 */
@Configuration
public class RabbitMQConfig {

    @Bean
    public TopicExchange accountExchange() {
        return new TopicExchange("account.exchange", true, false);
    }

    @Bean
    public Queue notificationQueue() {
        return QueueBuilder.durable("account.notifications")
            .withArgument("x-dead-letter-exchange", "account.dlx")
            .build();
    }

    @Bean
    public Queue auditQueue() {
        return QueueBuilder.durable("account.audit").build();
    }

    /**
     * Bind queues to exchange with routing patterns.
     * Notification queue gets only account.opened events.
     * Audit queue gets all account.* events.
     */
    @Bean
    public Binding notificationBinding() {
        return BindingBuilder
            .bind(notificationQueue())
            .to(accountExchange())
            .with("account.opened"); // Routing key pattern
    }

    @Bean
    public Binding auditBinding() {
        return BindingBuilder
            .bind(auditQueue())
            .to(accountExchange())
            .with("account.*"); // Wildcard - matches all account events
    }
}

@Service
public class AccountEventPublisher {
    private final RabbitTemplate rabbitTemplate;

    public void publishAccountOpened(AccountOpenedEvent event) {
        rabbitTemplate.convertAndSend(
            "account.exchange",
            "account.opened", // Routing key
            event,
            message -> {
                // Add message properties
                message.getMessageProperties().setDeliveryMode(MessageDeliveryMode.PERSISTENT);
                message.getMessageProperties().setContentType("application/json");
                return message;
            }
        );
    }
}

RabbitMQ's exchange types provide flexible routing:

  • Direct: Route to queues where routing key exactly matches binding key
  • Topic: Pattern-based routing with wildcards (* matches one word, # matches zero or more)
  • Fanout: Broadcast to all bound queues regardless of routing key
  • Headers: Route based on message header attributes
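The topic-exchange wildcard rules above can be sketched as a small matcher over dot-separated words. The broker implements this internally; this class exists only to make the `*` vs `#` semantics concrete:

```java
// Illustrative sketch of RabbitMQ topic matching: '*' matches exactly one
// dot-separated word, '#' matches zero or more words. Not a broker API.
public class TopicMatcher {

    public static boolean matches(String pattern, String routingKey) {
        return matches(pattern.split("\\."), 0, routingKey.split("\\."), 0);
    }

    private static boolean matches(String[] pat, int pi, String[] key, int ki) {
        if (pi == pat.length) {
            return ki == key.length; // pattern exhausted: key must be too
        }
        if (pat[pi].equals("#")) {
            // '#' may consume zero or more remaining words
            for (int skip = ki; skip <= key.length; skip++) {
                if (matches(pat, pi + 1, key, skip)) {
                    return true;
                }
            }
            return false;
        }
        if (ki == key.length) {
            return false; // words left in pattern but key is exhausted
        }
        if (pat[pi].equals("*") || pat[pi].equals(key[ki])) {
            return matches(pat, pi + 1, key, ki + 1);
        }
        return false;
    }
}
```

So "account.*" matches "account.opened" but not "account.us.opened", while "account.#" matches both - the distinction that decides which binding key the audit queue above should use.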

Choose RabbitMQ when you need sophisticated routing, task distribution, or request-response patterns over messaging. Choose Kafka for event streaming, event sourcing, or when consumers need to replay historical events.

Event Versioning and Schema Evolution

Events become part of your system's contract. Once published, consumers depend on them. Changing event schemas risks breaking consumers. Versioning strategies enable evolution while maintaining compatibility.

Versioned Event Types

Include version information in the event type itself. Consumers explicitly handle versions they understand.

/**
 * Versioned events with explicit version in type.
 */
public sealed interface PaymentProcessedEvent {

    record PaymentProcessedV1(
        String paymentId,
        String customerId,
        BigDecimal amount,
        String currency
    ) implements PaymentProcessedEvent {}

    record PaymentProcessedV2(
        String paymentId,
        String customerId,
        Money amount,         // Changed to Money type
        String currency,
        PaymentMethod method, // New field
        Instant processedAt   // New field
    ) implements PaymentProcessedEvent {}
}

@Service
public class PaymentEventConsumer {

    @EventHandler
    public void handlePaymentProcessed(PaymentProcessedEvent event) {
        switch (event) {
            case PaymentProcessedV1 v1 -> handleV1(v1);
            case PaymentProcessedV2 v2 -> handleV2(v2);
        }
    }

    private void handleV1(PaymentProcessedV1 event) {
        // Process old event format
        // May need to fill in default values for missing fields
    }

    private void handleV2(PaymentProcessedV2 event) {
        // Process new event format with all fields
    }
}

This approach makes version handling explicit but requires consumers to update when new versions appear. Producers can publish multiple versions during migration periods, letting consumers upgrade gradually.

Schema Registry

Schema registries (like Confluent Schema Registry, AWS Glue Schema Registry) store event schemas centrally and enforce compatibility rules. Producers and consumers fetch schemas from the registry, ensuring they use compatible versions.

/**
 * Using Avro with Schema Registry for compatibility enforcement.
 */
@Configuration
public class KafkaAvroConfig {

    @Bean
    public ProducerFactory<String, SpecificRecord> avroProducerFactory() {
        Map<String, Object> config = new HashMap<>();
        config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializer.class);

        // Schema registry enforces compatibility
        config.put("schema.registry.url", "http://localhost:8081");

        return new DefaultKafkaProducerFactory<>(config);
    }
}

Schema registries enforce compatibility modes:

  • Backward: New schema can read old data (safe for consumers)
  • Forward: Old schema can read new data (safe for producers)
  • Full: Both backward and forward compatible
  • None: No compatibility checks

Backward compatibility is most common: add optional fields, remove optional fields, but never remove required fields or change field types. This lets old consumers continue processing new events.
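In practice, backward compatibility means a consumer built against the new schema must tolerate old events where the added optional field is absent. A minimal sketch, using a plain map as a stand-in for deserialized event data (the field name and default are illustrative):

```java
import java.util.Map;

// Backward-compatible reading sketch: a consumer on the v2 schema fills in
// a default when the optional 'method' field (added in v2) is missing from
// an older event. Field names and the UNKNOWN default are illustrative.
public class BackwardCompatibleReader {

    public record PaymentView(String paymentId, String method) {}

    public static PaymentView read(Map<String, String> fields) {
        return new PaymentView(
            fields.get("paymentId"),                 // required in all versions
            fields.getOrDefault("method", "UNKNOWN") // optional, added in v2
        );
    }
}
```

Old events deserialize with method defaulting to UNKNOWN; new events carry the real value. Serialization frameworks like Avro apply exactly this defaulting automatically when the schema declares a default.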

Upcasting Events

Upcasting transforms old event versions to new versions when reading from storage. The event store contains historical events in various versions, but application code works with the latest version.

/**
 * Upcaster transforms old events to current version.
 */
public class PaymentEventUpcaster {

    public PaymentProcessedV2 upcast(PaymentProcessedEvent event) {
        return switch (event) {
            case PaymentProcessedV1 v1 -> new PaymentProcessedV2(
                v1.paymentId(),
                v1.customerId(),
                new Money(v1.amount(), v1.currency()),
                v1.currency(),
                PaymentMethod.UNKNOWN, // Default for missing field
                Instant.now()          // Approximate timestamp
            );
            case PaymentProcessedV2 v2 -> v2; // Already latest version
        };
    }
}

public class EventSourcedPaymentRepository {
    private final EventStore eventStore;
    private final PaymentEventUpcaster upcaster;

    public Payment findById(String paymentId) {
        List<PaymentProcessedEvent> events = eventStore.readStream("payment-" + paymentId);

        // Upcast all events to latest version
        List<PaymentProcessedV2> currentEvents = events.stream()
            .map(upcaster::upcast)
            .toList();

        return Payment.fromHistory(currentEvents);
    }
}

Upcasting centralizes version handling, letting business logic work with a single version. The cost is complexity in the upcasting layer and potential data loss when approximating missing fields.

Exactly-Once Processing and Idempotency

Distributed systems cannot guarantee messages are delivered exactly once. Messages may be delivered multiple times due to retries, network issues, or consumer crashes. Consumers must be idempotent - processing the same message multiple times produces the same result as processing it once.
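The idempotency property itself is easy to state in code: applying an operation twice must leave the same result as applying it once. A trivial demonstration (method names are illustrative):

```java
// Idempotency in miniature: set-to-value is idempotent, add-delta is not.
// Applying setTo twice yields the same result as once; addDelta drifts.
public class IdempotencyDemo {

    public static int setTo(int current, int target) {
        return target; // idempotent: result independent of how often applied
    }

    public static int addDelta(int current, int delta) {
        return current + delta; // not idempotent: each application accumulates
    }
}
```

This is the property every consumer in the sections below is engineered to satisfy, whether by tracking processed IDs or by design.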

Idempotent Message Processing

Store processed message IDs to detect duplicates. Before processing, check if the message was already handled.

/**
 * Idempotent event consumer using database to track processed events.
 */
@Service
public class IdempotentEventConsumer {
    private final ProcessedEventRepository processedEvents;
    private final NotificationService notificationService;

    @Transactional
    public void handleAccountOpened(AccountOpenedEvent event) {
        String eventId = event.eventId();

        // Check if already processed
        if (processedEvents.existsById(eventId)) {
            log.debug("Event {} already processed, skipping", eventId);
            return;
        }

        // Process the event
        notificationService.sendWelcomeEmail(event.customerId());

        // Mark as processed in same transaction
        processedEvents.save(new ProcessedEvent(eventId, Instant.now()));
    }
}

The database query and event processing happen in the same transaction. If processing succeeds but the consumer crashes before committing the transaction, the event will be redelivered and reprocessed. The duplicate check prevents double-processing.

Natural Idempotency

Design operations to be naturally idempotent rather than tracking processed IDs. Update operations using absolute values instead of deltas are naturally idempotent.

// Not idempotent - applying twice adds $200 instead of $100
public void updateBalance(String accountId, BigDecimal delta) {
    Account account = repository.findById(accountId);
    account.setBalance(account.getBalance().add(delta));
    repository.save(account);
}

// Idempotent - applying twice sets balance to $500 both times
public void setBalance(String accountId, BigDecimal newBalance, long expectedVersion) {
    Account account = repository.findById(accountId);

    // Optimistic locking prevents conflicting updates
    if (account.getVersion() != expectedVersion) {
        throw new ConcurrentModificationException();
    }

    account.setBalance(newBalance);
    account.incrementVersion();
    repository.save(account);
}

Database unique constraints provide natural idempotency for insert operations. If an event triggers a user creation, put the event ID in a unique column. Duplicate events fail the unique constraint and are safely ignored.

@Entity
public class Customer {
    @Id
    private String customerId;

    // Event ID that created this customer
    @Column(unique = true, nullable = false)
    private String createdByEventId;

    // Other fields...
}

@Service
public class CustomerEventHandler {
    private final CustomerRepository customerRepository;

    @Transactional
    public void handleCustomerRegistered(CustomerRegisteredEvent event) {
        try {
            Customer customer = new Customer();
            customer.setCustomerId(event.customerId());
            customer.setCreatedByEventId(event.eventId()); // Unique constraint
            customer.setEmail(event.email());

            customerRepository.save(customer);

        } catch (DataIntegrityViolationException e) {
            // Duplicate event - customer already created
            log.debug("Customer {} already exists, event {} already processed",
                event.customerId(), event.eventId());
        }
    }
}

Dead Letter Queues and Error Handling

Not all events can be successfully processed. Messages might be malformed, reference data that doesn't exist, or trigger bugs in consumer code. Dead letter queues (DLQ) capture failed messages for manual investigation.

/**
 * Error handling with retry and dead letter queue.
 */
@Service
public class ResilientEventConsumer {
    private final AccountService accountService;
    private final KafkaTemplate<String, DeadLetterRecord<AccountEvent>> deadLetterProducer;

    @KafkaListener(topics = "account-events", groupId = "payment-service")
    public void handleAccountEvent(
            ConsumerRecord<String, AccountEvent> record,
            Acknowledgment ack) {

        try {
            AccountEvent event = record.value();
            processEvent(event);
            ack.acknowledge();

        } catch (RecoverableException e) {
            // Transient error - don't acknowledge, will retry
            log.warn("Recoverable error processing event, will retry: {}", e.getMessage());
            // Message will be redelivered

        } catch (UnrecoverableException e) {
            // Permanent error - send to DLQ
            log.error("Unrecoverable error, sending to DLQ", e);
            sendToDeadLetterQueue(record, e);
            ack.acknowledge(); // Don't reprocess
        }
    }

    private void sendToDeadLetterQueue(
            ConsumerRecord<String, AccountEvent> record,
            Exception error) {

        DeadLetterRecord<AccountEvent> dlr = new DeadLetterRecord<>(
            record.value(),
            record.topic(),
            record.partition(),
            record.offset(),
            error.getClass().getName(),
            error.getMessage(),
            Instant.now()
        );

        deadLetterProducer.send("account-events.dlq", dlr);
    }
}

Dead letter queues should include metadata about the failure: original topic, partition, offset, error type, and timestamp. This helps diagnose issues and potentially replay messages after fixing bugs.

Monitor DLQ depth. Growing dead letter queues indicate systemic problems. Investigate failures, fix bugs, and consider replaying valid messages that failed due to transient issues.

Event Choreography vs Orchestration

Events can coordinate workflows in two ways: choreography (decentralized) and orchestration (centralized).

Choreography

In choreography, services react to events independently. There's no central coordinator - the workflow emerges from services publishing and consuming events.

// Order service publishes OrderCreated
@Service
public class OrderService {
    public Order createOrder(OrderRequest request) {
        Order order = new Order(request);
        orderRepository.save(order);

        eventPublisher.publish(new OrderCreatedEvent(
            order.getId(),
            order.getCustomerId(),
            order.getItems()
        ));

        return order;
    }
}

// Payment service reacts to OrderCreated
@Service
public class PaymentService {
    @EventHandler
    public void handleOrderCreated(OrderCreatedEvent event) {
        Payment payment = processPayment(event);

        eventPublisher.publish(new PaymentProcessedEvent(
            payment.getId(),
            event.orderId(),
            payment.getAmount()
        ));
    }
}

// Inventory service reacts to PaymentProcessed
@Service
public class InventoryService {
    @EventHandler
    public void handlePaymentProcessed(PaymentProcessedEvent event) {
        reserveItems(event.orderId());

        eventPublisher.publish(new ItemsReservedEvent(
            event.orderId()
        ));
    }
}

Choreography is decentralized and loosely coupled. Services don't know about each other - they only react to events. Adding new participants means subscribing to relevant events without modifying existing services.

The downside is reduced observability. The complete workflow isn't explicit anywhere; you must trace through multiple services to understand the flow. When something goes wrong, diagnosing which service failed and why requires distributed tracing and correlation IDs.
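The usual mechanism is to stamp every event with a correlation ID: the first event in a workflow mints one, and every downstream handler copies it into the events it publishes, so logs and traces across services can be joined on that single value. A minimal sketch (the event and field names here are illustrative, not the ones used in the examples above):

```java
import java.util.UUID;

/** Events in one workflow share a correlationId so they can be traced together. */
record OrderCreated(String orderId, String correlationId) {}
record PaymentProcessed(String paymentId, String orderId, String correlationId) {}

public class CorrelationDemo {

    /** The first event in a workflow mints a fresh correlation ID. */
    public static OrderCreated newOrder(String orderId) {
        return new OrderCreated(orderId, UUID.randomUUID().toString());
    }

    /** Downstream handlers copy the ID from the event they consumed. */
    public static PaymentProcessed handle(OrderCreated event, String paymentId) {
        return new PaymentProcessed(paymentId, event.orderId(), event.correlationId());
    }
}
```

With the ID propagated this way, a log search or tracing query on one correlation ID reconstructs the whole choreographed flow.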

Orchestration

Orchestration uses a central coordinator (orchestrator or saga) that explicitly manages the workflow. The orchestrator tells each service what to do and handles the overall process.

/**
 * Order saga orchestrates the entire order fulfillment workflow.
 */
@Service
public class OrderFulfillmentSaga {
    private final PaymentService paymentService;
    private final InventoryService inventoryService;
    private final ShippingService shippingService;
    private final SagaRepository sagaRepository;

    public void fulfillOrder(String orderId) {
        SagaInstance saga = new SagaInstance(orderId);
        sagaRepository.save(saga);

        try {
            // Step 1: Process payment
            saga.recordStep("processing-payment");
            Payment payment = paymentService.processPayment(orderId);
            saga.recordStep("payment-completed", payment.getId());

            // Step 2: Reserve inventory
            saga.recordStep("reserving-inventory");
            Reservation reservation = inventoryService.reserve(orderId);
            saga.recordStep("inventory-reserved", reservation.getId());

            // Step 3: Create shipment
            saga.recordStep("creating-shipment");
            Shipment shipment = shippingService.createShipment(orderId);
            saga.recordStep("shipment-created", shipment.getId());

            saga.markCompleted();

        } catch (PaymentFailedException e) {
            saga.markFailed("payment-failed");
            // No compensations needed - nothing succeeded yet

        } catch (InventoryException e) {
            saga.markFailed("inventory-failed");
            // Compensate: refund payment
            paymentService.refund(saga.getPaymentId());

        } catch (ShippingException e) {
            saga.markFailed("shipping-failed");
            // Compensate: release inventory and refund payment
            inventoryService.release(saga.getReservationId());
            paymentService.refund(saga.getPaymentId());
        }

        sagaRepository.save(saga);
    }
}

Orchestration makes workflows explicit and easier to understand. The saga code shows the complete flow and handles compensations in one place. This simplifies debugging and monitoring.

The trade-off is coupling. The orchestrator knows about all participants and must change when new steps are added. The orchestrator becomes a potential bottleneck and single point of failure, requiring high availability.

Choose choreography when services are truly independent and you value loose coupling over explicit workflow visibility. Choose orchestration when workflows are complex, require compensation, or need centralized monitoring and control. See Saga Pattern for detailed compensation strategies.

CQRS with Events

Command Query Responsibility Segregation (CQRS) separates write and read models. Commands modify state; queries return data. Events connect the two - the write model publishes events that the read model consumes to build optimized projections.

CQRS recognizes that write operations and read operations have fundamentally different requirements. Writes need to enforce business rules, maintain consistency, and validate invariants. Reads need speed, convenience, and data shaped for specific use cases. By separating these concerns, each model can be optimized independently.

The separation enables powerful optimization strategies. The write model uses normalized database schemas optimized for consistency and transactions. Read models use denormalized schemas optimized for query performance - you might have one read model with account data joined with customer names for display, another with account data aggregated for analytics, and a third indexed in Elasticsearch for full-text search. Each read model independently subscribes to the event stream and builds exactly the data structure it needs.

/**
 * Write model - handles commands, publishes events.
 */
@Service
public class AccountCommandService {
    private final AccountRepository accountRepository;
    private final EventPublisher eventPublisher;

    public void openAccount(OpenAccountCommand command) {
        Account account = Account.open(
            command.customerId(),
            command.initialDeposit()
        );

        accountRepository.save(account);

        eventPublisher.publish(new AccountOpenedEvent(
            account.getId(),
            account.getCustomerId(),
            account.getBalance()
        ));
    }
}

/**
 * Read model - consumes events, builds optimized query view.
 */
@Service
public class AccountQueryService {
    private final AccountSummaryRepository summaryRepository;
    private final CustomerServiceClient customerClient;

    /**
     * Fast query from denormalized read model.
     * Includes customer name from a different service.
     */
    public AccountSummary getAccountSummary(String accountId) {
        return summaryRepository.findById(accountId)
            .orElseThrow(() -> new AccountNotFoundException(accountId));
    }

    /**
     * Update read model by consuming events.
     * Denormalize customer data to avoid joins.
     */
    @EventHandler
    public void handleAccountOpened(AccountOpenedEvent event) {
        // Fetch customer name to denormalize in read model
        Customer customer = customerClient.getCustomer(event.customerId());

        AccountSummary summary = new AccountSummary(
            event.accountId(),
            event.customerId(),
            customer.getName(), // Denormalized
            event.balance(),
            AccountStatus.ACTIVE
        );

        summaryRepository.save(summary);
    }

    @EventHandler
    public void handleMoneyDeposited(MoneyDepositedEvent event) {
        AccountSummary summary = summaryRepository.findById(event.accountId())
            .orElseThrow();

        summary.addToBalance(event.amount());
        summaryRepository.save(summary);
    }
}

CQRS with events enables multiple read models optimized for different queries. One read model might denormalize data for fast dashboard queries, another might structure it for reporting, and a third might feed search indexes.

/**
 * Multiple read models for different query patterns.
 */

// Read model 1: Fast account summary queries
@Entity
public class AccountSummary {
    @Id
    private String accountId;
    private String customerId;
    private String customerName; // Denormalized
    private BigDecimal balance;
    private AccountStatus status;
}

// Read model 2: Transaction history for reporting
@Entity
public class TransactionHistory {
    @Id
    private String transactionId;
    private String accountId;
    private TransactionType type;
    private BigDecimal amount;
    private Instant timestamp;
    private String description;
}

// Read model 3: Search index for full-text search
public class AccountSearchDocument {
    private String accountId;
    private String customerName;
    private String accountType;
    private List<String> tags;
    // Optimized for Elasticsearch/Solr
}

Each read model consumes the same events but projects them differently. The write model remains focused on enforcing business rules. Read models can be rebuilt by replaying events if corrupted or when new query patterns emerge.
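Rebuilding a projection amounts to clearing its store and re-applying every event from the beginning of the stream. A minimal in-memory sketch of that idea (the event and class names are illustrative, not taken from the examples above):

```java
import java.math.BigDecimal;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Illustrative event types for a balance projection.
sealed interface Event permits Opened, Deposited {}
record Opened(String accountId, BigDecimal initial) implements Event {}
record Deposited(String accountId, BigDecimal amount) implements Event {}

public class BalanceProjection {
    private final Map<String, BigDecimal> balances = new HashMap<>();

    /** Apply a single event to the projection's state. */
    public void apply(Event e) {
        if (e instanceof Opened o) {
            balances.put(o.accountId(), o.initial());
        } else if (e instanceof Deposited d) {
            balances.merge(d.accountId(), d.amount(), BigDecimal::add);
        }
    }

    /** Rebuild from scratch by replaying the full event stream. */
    public void rebuild(List<Event> stream) {
        balances.clear();
        stream.forEach(this::apply);
    }

    public BigDecimal balanceOf(String accountId) {
        return balances.getOrDefault(accountId, BigDecimal.ZERO);
    }
}
```

Because `rebuild` starts from an empty state, replaying the same stream always yields the same projection, which is what makes corrupted read models recoverable and new query patterns cheap to add.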

The eventual consistency between write and read models requires UX considerations. After creating an account, it might not immediately appear in search results. Show "Processing..." states or redirect to detail pages rather than lists to hide the delay.
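One way to bound the perceived lag is read-your-writes tracking: the write side returns the stream position of the event it appended, and the client (or API gateway) waits until the read model reports it has processed at least that position before querying. A minimal sketch of the tracking piece, with hypothetical method names:

```java
/** Tracks how far a read model has processed the event stream. */
public class ProjectionProgress {
    private volatile long appliedPosition = -1;

    /** Called by the projection after applying the event at this position. */
    public void markApplied(long position) {
        appliedPosition = position;
    }

    /** True once the read model has caught up to the caller's write. */
    public boolean caughtUpTo(long writePosition) {
        return appliedPosition >= writePosition;
    }
}
```

A client would poll `caughtUpTo` (or block with a timeout) before issuing the query, falling back to the "Processing..." state if the projection lags too long.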

Further Reading

  • Books:

    • "Designing Data-Intensive Applications" by Martin Kleppmann - Chapter 11 on stream processing covers event-driven architecture comprehensively
    • "Event-Driven Architecture: How SOA Enables the Real-Time Enterprise" by Hugh Taylor et al.
    • "Enterprise Integration Patterns" by Gregor Hohpe and Bobby Woolf - Foundational messaging patterns
  • Domain-Driven Design & Event Sourcing:

    • "Domain-Driven Design Distilled" by Vaughn Vernon - Chapters on events and event sourcing
    • "Implementing Domain-Driven Design" by Vaughn Vernon - Practical event sourcing implementation
    • "Versioning in an Event Sourced System" by Greg Young - Comprehensive guide to event versioning
  • Event Storming:

    • "Introducing EventStorming" by Alberto Brandolini - Complete guide from the creator of event storming
    • EventStorming.com - Resources, patterns, and examples
  • Technology-Specific:

  • Patterns & Best Practices: