Skip to main content

Event-Driven Architecture on AWS

Overview

AWS provides a comprehensive suite of messaging and event services that enable event-driven architectures at scale. These services facilitate asynchronous communication between distributed components, allowing systems to be loosely coupled, resilient, and independently scalable.

This guide focuses on the AWS-specific implementations and integrations of event-driven patterns. For foundational event-driven architecture concepts - including event types, event storming, event sourcing, CQRS, choreography vs orchestration, and message broker patterns - see the comprehensive Event-Driven Architecture guide. This document covers how to apply those patterns using AWS services.

Event-driven systems replace synchronous request-response patterns with asynchronous message passing. Instead of services calling each other directly, they publish messages to intermediary services (queues, topics, event buses) that deliver messages to consumers. This decoupling provides several critical benefits:

Resilience: When a consumer service is down or slow, messages accumulate in queues rather than causing failures in producers. Services can process messages when they recover, preventing cascading failures.

Scalability: Producers and consumers scale independently based on their own workloads. If message processing is slow, add more consumer instances without touching producer code. If message volume increases, producers can continue publishing while the queue absorbs bursts.

Flexibility: Multiple consumers can independently process the same events without coordination. Adding new functionality means adding new consumers, not modifying existing services. Services can be deployed, updated, or removed without impacting others as long as message contracts remain stable.

Temporal decoupling: Producers don't wait for consumers to process messages. Operations complete immediately after publishing messages, improving response times for users while work happens asynchronously in the background.

Core Principles

  1. Choose the right service for your pattern - SQS for task queues, SNS for pub-sub fan-out, EventBridge for event routing and filtering, Kinesis for real-time streaming
  2. Design for idempotency - Messages may be delivered multiple times; ensure processing the same message twice produces the same result
  3. Implement proper error handling - Use dead-letter queues for failed messages, exponential backoff for retries, and separate retry and poison message handling
  4. Maintain message ordering when needed - Use SQS FIFO queues or Kinesis partition keys when order matters; standard queues provide better throughput when order doesn't matter
  5. Monitor message flow - Track queue depth, age of oldest message, dead-letter queue growth, and consumer lag to identify bottlenecks and failures

Messaging Services Comparison

AWS offers four primary services for event-driven communication, each optimized for different patterns and use cases. Understanding when to use each service is critical for building appropriate solutions.

SQS (Simple Queue Service)

Purpose: Point-to-point messaging for distributing tasks across multiple workers.

When to use:

  • Distributing work across multiple consumers (job processing, background tasks)
  • Decoupling services to handle load spikes (orders, payment processing)
  • Buffering requests between services with different processing speeds
  • Ensuring tasks complete even when consumers are temporarily unavailable

Key characteristics:

  • Standard queues: At-least-once delivery, best-effort ordering, unlimited throughput
  • FIFO queues: Exactly-once processing, strict ordering, 300 TPS (3,000 TPS with batching)
  • Messages retained 4 days default (1 minute to 14 days configurable)
  • Consumers poll queues (pull model)
  • Visibility timeout prevents duplicate processing
  • Dead-letter queues for failed messages

Primary use case: Task queues where each message should be processed by exactly one consumer.

SNS (Simple Notification Service)

Purpose: Pub-sub messaging for broadcasting messages to multiple subscribers simultaneously.

When to use:

  • Broadcasting events to multiple consumers (order placed → inventory, shipping, analytics)
  • Fan-out pattern where one message triggers multiple actions
  • Mobile push notifications, SMS, email notifications
  • Integrating with AWS services (Lambda, SQS, HTTP endpoints)

Key characteristics:

  • Push model (SNS pushes to subscribers)
  • Multiple subscription types (SQS, Lambda, HTTP, email, SMS, mobile push)
  • Message filtering based on attributes
  • FIFO topics for ordered pub-sub (300 TPS)
  • No message persistence (messages lost if no subscribers)

Primary use case: Broadcasting the same message to multiple independent consumers who process it differently.

EventBridge

Purpose: Event bus for content-based routing, filtering, and transformation of events.

When to use:

  • Content-based routing (route events to different targets based on event content)
  • Integrating with SaaS applications (Salesforce, Shopify, Auth0)
  • Schema discovery and validation (schema registry)
  • Event archiving and replay for debugging or analytics
  • Complex event filtering beyond simple attribute matching
  • Cross-account and cross-region event delivery

Key characteristics:

  • Rules match event patterns and route to targets
  • Schema registry automatically discovers and versions event schemas
  • Archive and replay events for testing and recovery
  • 100+ SaaS integrations
  • 300 destinations per rule
  • Transformations before delivery

Primary use case: Complex event routing, filtering, and integration scenarios requiring advanced capabilities beyond basic pub-sub.

Kinesis

Purpose: Real-time streaming platform for ordered, high-throughput data processing.

When to use:

  • Real-time analytics and metrics aggregation
  • Log and event data collection from multiple sources
  • Ordered processing of events (by partition key)
  • Multiple consumers reading the same stream simultaneously
  • Replay capability for reprocessing historical data

Key characteristics:

  • Data Streams: Real-time streaming with ordering per shard
  • Data Firehose: Managed delivery to S3, Redshift, Elasticsearch, Splunk
  • Messages retained 24 hours default (up to 365 days)
  • Multiple consumers can read the same stream
  • Shards provide ordering and parallelism
  • Enhanced fan-out for dedicated throughput per consumer

Primary use case: Real-time data streaming with ordered processing and replay capability.

Service Selection Matrix

RequirementSQSSNSEventBridgeKinesis Streams
PatternPoint-to-point queuePub-sub broadcastEvent routing & filteringOrdered streaming
ConsumersOne per messageMany simultaneousMany based on rulesMany reading same stream
OrderingFIFO: strict
Standard: best-effort
FIFO: strict
Standard: none
No guaranteeStrict per shard
DeliveryPull (polling)PushPushPull (polling)
ThroughputStandard: unlimited
FIFO: 3K TPS
Standard: 100K/topic
FIFO: 300 TPS
Unlimited1 MB/sec per shard
Retention1 min - 14 daysNone (immediate)Archive: indefinite24 hrs - 365 days
FilteringNoBasic attributesAdvanced pattern matchingNo (in stream)
ReplayNoNoYes (from archive)Yes (from stream)
Cost ModelPer requestPer message + deliveryPer eventPer shard-hour + PUT

Decision guidelines:

Use SQS Standard when you need simple task distribution with high throughput and order doesn't matter. Example: processing image uploads, sending emails, background job processing.

Use SQS FIFO when you need guaranteed ordering and exactly-once processing. Example: bank transactions for a single account, order processing, inventory updates.

Use SNS when one event should trigger multiple independent actions simultaneously. Example: order placed → update inventory, charge payment, send confirmation, trigger analytics.

Use SNS → SQS fan-out when you need reliable pub-sub with individual consumer processing control. Each subscriber gets its own SQS queue, enabling independent retry logic and rate limiting.

Use EventBridge when you need sophisticated routing based on event content, SaaS integration, or schema validation. Example: route different order types to different processors, integrate Stripe events, archive for compliance.

Use Kinesis when you need ordered processing of high-volume streams with replay capability. Example: clickstream analytics, IoT sensor data, application logs, metrics aggregation.

For foundational patterns like choreography vs orchestration, event sourcing, and CQRS, see Event-Driven Architecture.

SQS (Simple Queue Service)

SQS provides managed message queues for decoupling services and distributing work across multiple consumers. Queues act as buffers between producers and consumers, allowing them to operate at different rates and providing resilience when consumers are unavailable.

Standard vs FIFO Queues

SQS offers two queue types with fundamentally different characteristics and trade-offs:

Standard Queues provide maximum throughput with at-least-once delivery and best-effort ordering:

  • Unlimited throughput: No practical limit on messages per second
  • At-least-once delivery: Messages may be delivered more than once (idempotent processing required)
  • Best-effort ordering: Messages generally arrive in order but occasionally may be out of sequence
  • Use when: High throughput is critical and your processing is naturally idempotent

FIFO Queues provide strict ordering and exactly-once processing with lower throughput:

  • 300 TPS per action (3,000 TPS with batching via 10 messages per request)
  • Exactly-once processing: Each message delivered once and remains available until consumer deletes it
  • Strict ordering: Messages delivered in the exact order sent within a message group
  • Use when: Order matters or you need exactly-once guarantees (financial transactions, ordered workflows)

When FIFO is required: Use FIFO when the order of operations matters for correctness. Bank account transactions must process in order - depositing $100 then withdrawing $50 is different from withdrawing $50 then depositing $100 if the balance starts at $40. Order processing workflows often require FIFO - you can't ship an order before confirming payment.

When Standard suffices: Use Standard when messages are independent or naturally idempotent. Processing image uploads doesn't require order. Sending welcome emails can handle duplicates by checking if already sent. Background analytics jobs are inherently idempotent.

The throughput difference is significant. Standard queues handle millions of messages per second. FIFO queues handle 300 TPS (or 3,000 TPS with batching), which equals 25.9 million messages per day - sufficient for many workloads but not for high-volume scenarios.

Message Visibility and Processing

When a consumer retrieves a message, it becomes invisible to other consumers for a visibility timeout period. This prevents multiple consumers from processing the same message simultaneously. If the consumer doesn't delete the message before the timeout expires, it becomes visible again for reprocessing.

Setting visibility timeout: The visibility timeout should exceed your maximum expected processing time. If processing takes 45 seconds on average, set visibility timeout to 60-90 seconds to provide a buffer. If set too short, messages become visible while still processing, leading to duplicate processing. If set too long, failed messages wait unnecessarily before retry.

/**
* Spring Boot SQS consumer with visibility timeout handling.
*/
@Configuration
public class SQSListenerConfig {

@Bean
public QueueMessagingTemplate queueMessagingTemplate(
AmazonSQSAsync amazonSQS) {
return new QueueMessagingTemplate(amazonSQS);
}
}

@Service
public class OrderMessageConsumer {

private static final int MAX_PROCESSING_TIME_SECONDS = 45;

/**
* Process order messages with visibility timeout management.
* Visibility timeout set to 90s to allow for maximum processing time.
*/
@SqsListener(
value = "${aws.sqs.order-queue}",
deletionPolicy = SqsMessageDeletionPolicy.ON_SUCCESS
)
public void processOrder(
@Payload OrderMessage message,
@Header("MessageId") String messageId,
Acknowledgment acknowledgment) {

try {
log.info("Processing order message: {} (ID: {})",
message.orderId(), messageId);

// Process the order
orderService.processOrder(message);

// Message automatically deleted on success (deletionPolicy = ON_SUCCESS)
log.info("Successfully processed order: {}", message.orderId());

} catch (RecoverableException e) {
// Transient error - let visibility timeout expire for retry
log.warn("Recoverable error processing order {}: {}",
message.orderId(), e.getMessage());
// Don't delete message - it will become visible again
throw e; // Propagate to trigger retry

} catch (UnrecoverableException e) {
// Permanent error - send to DLQ
log.error("Unrecoverable error processing order {}",
message.orderId(), e);
// Message will be deleted and move to DLQ after max retries
throw e;
}
}
}

Extending visibility timeout: For long-running tasks, extend visibility timeout while processing to prevent the message from becoming visible before completion:

/**
* Extend visibility timeout for long-running processing.
*/
@Service
public class LongRunningProcessor {

private final AmazonSQS sqs;

@SqsListener(value = "${aws.sqs.video-processing-queue}")
public void processVideo(
@Payload VideoProcessingMessage message,
@Header("ReceiptHandle") String receiptHandle,
@Header("ApproximateReceiveCount") int receiveCount) {

String queueUrl = queueUrlResolver.getQueueUrl();

try {
// Initial visibility: 300 seconds
// Video processing might take 10-15 minutes

for (int step = 0; step < 5; step++) {
// Extend visibility by 300s before each step
sqs.changeMessageVisibility(
new ChangeMessageVisibilityRequest()
.withQueueUrl(queueUrl)
.withReceiptHandle(receiptHandle)
.withVisibilityTimeout(300)
);

processVideoStep(message, step);
}

// Delete after successful completion
sqs.deleteMessage(queueUrl, receiptHandle);

} catch (Exception e) {
log.error("Failed to process video: {}", message.videoId(), e);
// Let visibility timeout expire for retry
}
}
}

Long Polling vs Short Polling

SQS supports two polling mechanisms with different cost and latency characteristics:

Short Polling (default): Returns immediately, even if the queue is empty. If messages exist, returns up to the maximum requested. If the queue is empty, returns empty response immediately.

Long Polling: Waits up to 20 seconds for messages to arrive if the queue is empty. Only returns early if messages arrive. Reduces API calls and costs when queues have intermittent traffic.

/**
* Configure long polling for cost efficiency.
*/
@Configuration
public class SQSConfig {

@Bean
public AmazonSQSAsync amazonSQS() {
return AmazonSQSAsyncClientBuilder.standard()
.withRegion(Regions.US_EAST_1)
.build();
}

@Bean
public QueueMessagingTemplate queueMessagingTemplate(AmazonSQSAsync sqs) {
QueueMessagingTemplate template = new QueueMessagingTemplate(sqs);

// Set long polling wait time (1-20 seconds)
template.setReceiveMessageWaitTimeSeconds(20);

return template;
}
}

When to use long polling: Always use long polling unless you have specific reasons not to. Long polling reduces empty responses, lowers costs, and doesn't impact latency when messages are available. The 20-second wait only occurs when the queue is empty.

Cost impact: Short polling generates many empty responses when traffic is low, increasing costs. With one consumer polling every second on an empty queue, that's 86,400 requests per day. With long polling at 20 seconds, it's only 4,320 requests - a 95% reduction.

Dead-Letter Queues

Dead-letter queues (DLQ) capture messages that fail processing after a configured number of attempts. This prevents poison messages (messages that always fail) from blocking queue processing indefinitely.

Configuring dead-letter queues:

/**
* Create SQS queue with dead-letter queue configuration.
*/
@Configuration
public class QueueConfiguration {

private final AmazonSQS sqs;

@Bean
public void configureOrderQueue() {
// Create DLQ first
CreateQueueResult dlqResult = sqs.createQueue(
new CreateQueueRequest()
.withQueueName("order-processing-dlq.fifo")
.withAttributes(Map.of(
"FifoQueue", "true",
"ContentBasedDeduplication", "true"
))
);

String dlqArn = sqs.getQueueAttributes(
dlqResult.getQueueUrl(),
List.of("QueueArn")
).getAttributes().get("QueueArn");

// Create main queue with DLQ configuration
sqs.createQueue(
new CreateQueueRequest()
.withQueueName("order-processing.fifo")
.withAttributes(Map.of(
"FifoQueue", "true",
"ContentBasedDeduplication", "true",
"VisibilityTimeout", "90",
"MessageRetentionPeriod", "345600", // 4 days
// Configure dead-letter queue
"RedrivePolicy", new ObjectMapper().writeValueAsString(
Map.of(
"deadLetterTargetArn", dlqArn,
"maxReceiveCount", 3 // Move to DLQ after 3 failed attempts
)
)
))
);
}
}

DLQ monitoring and processing: Monitor DLQ depth and age of oldest message. Growing DLQs indicate systematic problems requiring investigation. Messages in DLQs represent business operations that failed - they may need manual intervention or indicate bugs requiring fixes.

/**
* Monitor and process dead-letter queue messages.
*/
@Service
public class DeadLetterQueueMonitor {

private final AmazonSQS sqs;
private final AlertService alertService;

@Value("${aws.sqs.order-processing-dlq}")
private String dlqUrl;

/**
* Monitor DLQ depth and alert when threshold exceeded.
*/
@Scheduled(fixedDelay = 300000) // Every 5 minutes
public void monitorDLQ() {
GetQueueAttributesResult result = sqs.getQueueAttributes(
new GetQueueAttributesRequest()
.withQueueUrl(dlqUrl)
.withAttributeNames("ApproximateNumberOfMessages",
"ApproximateAgeOfOldestMessage")
);

Map<String, String> attributes = result.getAttributes();
int messageCount = Integer.parseInt(
attributes.getOrDefault("ApproximateNumberOfMessages", "0"));

if (messageCount > 10) {
alertService.alert(
"DLQ has {} messages requiring investigation",
messageCount
);
}

// Alert if messages older than 1 hour
int oldestAge = Integer.parseInt(
attributes.getOrDefault("ApproximateAgeOfOldestMessage", "0"));
if (oldestAge > 3600) {
alertService.alert(
"DLQ has messages older than 1 hour - investigation needed"
);
}
}

/**
* Process or inspect DLQ messages for manual intervention.
*/
public void inspectDLQMessages() {
ReceiveMessageResult result = sqs.receiveMessage(
new ReceiveMessageRequest()
.withQueueUrl(dlqUrl)
.withMaxNumberOfMessages(10)
.withWaitTimeSeconds(0) // Short poll for inspection
);

for (Message message : result.getMessages()) {
log.error("DLQ Message: {}", message.getBody());
log.error("Attributes: {}", message.getAttributes());

// Analyze failure reason from attributes
String receiveCount = message.getAttributes()
.get("ApproximateReceiveCount");
String firstReceiveTime = message.getAttributes()
.get("ApproximateFirstReceiveTimestamp");

// Decision: retry, discard, or manual fix
// Can be moved back to main queue after fixing root cause
}
}

/**
* Redrive messages from DLQ back to main queue after fixing issues.
*/
public void redriveMessages(String mainQueueUrl, int maxMessages) {
for (int i = 0; i < maxMessages; i++) {
ReceiveMessageResult result = sqs.receiveMessage(
new ReceiveMessageRequest()
.withQueueUrl(dlqUrl)
.withMaxNumberOfMessages(1)
);

if (result.getMessages().isEmpty()) {
break;
}

Message message = result.getMessages().get(0);

// Send to main queue for reprocessing
sqs.sendMessage(mainQueueUrl, message.getBody());

// Delete from DLQ
sqs.deleteMessage(dlqUrl, message.getReceiptHandle());

log.info("Redrove message {} from DLQ to main queue",
message.getMessageId());
}
}
}

maxReceiveCount selection: Set maxReceiveCount based on expected failure scenarios. For transient errors (network timeouts, temporary service unavailability), 3-5 retries is reasonable. For operations that should rarely fail, 2-3 retries may suffice. Too many retries delay problem detection; too few send recoverable failures to DLQ prematurely.

Message Delay and Delay Queues

SQS supports delaying message delivery, either per message or for an entire queue:

Delay per message: Set DelaySeconds when sending to delay that specific message.

Delay queue: Set DelaySeconds on the queue to delay all messages.

/**
* Using message delays for scheduled tasks and rate limiting.
*/
@Service
public class ScheduledTaskPublisher {

private final AmazonSQS sqs;

/**
* Publish reminder message to be processed in 24 hours.
*/
public void scheduleReminder(String userId, String message) {
sqs.sendMessage(
new SendMessageRequest()
.withQueueUrl(reminderQueueUrl)
.withMessageBody(new ReminderMessage(userId, message).toJson())
// Delay 24 hours (86400 seconds)
.withDelaySeconds(86400)
);
}

/**
* Implement exponential backoff by increasing delay on retry.
*/
public void retryWithBackoff(ProcessingTask task, int attemptNumber) {
// Exponential backoff: 5s, 10s, 20s, 40s, ...
int delaySeconds = (int) Math.min(
5 * Math.pow(2, attemptNumber),
900 // Max 15 minutes
);

sqs.sendMessage(
new SendMessageRequest()
.withQueueUrl(retryQueueUrl)
.withMessageBody(task.toJson())
.withDelaySeconds(delaySeconds)
);
}
}

Use cases for delays:

  • Scheduled tasks (send reminder in 1 hour)
  • Exponential backoff for failed operations
  • Rate limiting (process at most X per minute)
  • Deferred cleanup operations

Limitations: Maximum delay is 15 minutes (900 seconds). For longer delays, consider using EventBridge Scheduler or Step Functions with wait states.

Batch Operations

SQS supports batch operations for sending, receiving, and deleting messages, reducing API calls and costs:

/**
* Batch operations for improved performance and cost.
*/
@Service
public class BatchOrderProcessor {

private final AmazonSQS sqs;

/**
* Send multiple messages in a single batch request.
* Up to 10 messages per batch.
*/
public void publishOrderBatch(List<Order> orders) {
List<SendMessageBatchRequestEntry> entries = orders.stream()
.map(order -> new SendMessageBatchRequestEntry()
.withId(order.getId())
.withMessageBody(toJson(order))
.withMessageGroupId(order.getCustomerId()) // For FIFO
.withMessageDeduplicationId(order.getId()) // For FIFO
)
.toList();

// Send in batches of 10
Lists.partition(entries, 10).forEach(batch -> {
SendMessageBatchResult result = sqs.sendMessageBatch(
new SendMessageBatchRequest()
.withQueueUrl(queueUrl)
.withEntries(batch)
);

// Check for failures
if (!result.getFailed().isEmpty()) {
log.error("Failed to send {} messages",
result.getFailed().size());
result.getFailed().forEach(failure ->
log.error("Message ID {}: {} - {}",
failure.getId(),
failure.getCode(),
failure.getMessage())
);
}
});
}

/**
* Receive and process messages in batch.
* Up to 10 messages per receive.
*/
public void processBatch() {
ReceiveMessageResult result = sqs.receiveMessage(
new ReceiveMessageRequest()
.withQueueUrl(queueUrl)
.withMaxNumberOfMessages(10) // Receive up to 10
.withWaitTimeSeconds(20) // Long polling
);

List<Message> messages = result.getMessages();
if (messages.isEmpty()) {
return;
}

// Process messages
List<ProcessingResult> results = messages.stream()
.map(this::processMessage)
.toList();

// Batch delete successful messages
List<DeleteMessageBatchRequestEntry> deleteEntries = results.stream()
.filter(ProcessingResult::isSuccess)
.map(r -> new DeleteMessageBatchRequestEntry()
.withId(r.getMessageId())
.withReceiptHandle(r.getReceiptHandle())
)
.toList();

if (!deleteEntries.isEmpty()) {
sqs.deleteMessageBatch(
new DeleteMessageBatchRequest()
.withQueueUrl(queueUrl)
.withEntries(deleteEntries)
);
}
}
}

Batch benefits: Batching reduces API calls by up to 10x, reducing costs and improving throughput. Sending 1,000 messages individually requires 1,000 API calls. Batching reduces this to 100 calls. At scale, this significantly impacts both cost and performance.

For Spring Boot integration patterns, see Spring Boot Resilience for retry strategies and circuit breakers with SQS.

SNS (Simple Notification Service)

SNS implements the publish-subscribe pattern, allowing messages to be broadcast to multiple subscribers simultaneously. When a message is published to an SNS topic, all subscribed endpoints receive a copy. This enables fan-out architectures where one event triggers multiple independent actions.

Topics and Subscriptions

An SNS topic is a communication channel that acts as an access point for publishers and subscribers. Publishers send messages to topics; subscribers receive messages from topics. Each topic can have multiple subscribers, and each subscriber receives every published message (subject to filter policies).

SNS subscription types:

  • SQS: Delivers messages to SQS queues (most common for reliable processing)
  • Lambda: Invokes Lambda functions directly
  • HTTP/HTTPS: Posts messages to web endpoints
  • Email/Email-JSON: Sends emails (primarily for alerts and notifications)
  • SMS: Sends text messages
  • Mobile push: Sends push notifications to mobile devices
/**
* Creating SNS topics and subscriptions.
*/
@Configuration
public class SNSConfiguration {

private final AmazonSNS sns;
private final AmazonSQS sqs;

@Bean
public String orderEventsTopic() {
CreateTopicResult result = sns.createTopic("order-events");
return result.getTopicArn();
}

/**
* Subscribe SQS queues to SNS topic for fan-out pattern.
*/
@Bean
public void subscribeQueues(String topicArn) {
// Subscribe inventory queue
String inventoryQueueArn = getQueueArn(inventoryQueueUrl);
sns.subscribe(topicArn, "sqs", inventoryQueueArn);

// Grant SNS permission to send to SQS
sqs.setQueueAttributes(
new SetQueueAttributesRequest()
.withQueueUrl(inventoryQueueUrl)
.withAttributes(Map.of(
"Policy", createSQSPolicy(inventoryQueueArn, topicArn)
))
);

// Subscribe analytics queue
String analyticsQueueArn = getQueueArn(analyticsQueueUrl);
sns.subscribe(topicArn, "sqs", analyticsQueueArn);
sqs.setQueueAttributes(
new SetQueueAttributesRequest()
.withQueueUrl(analyticsQueueUrl)
.withAttributes(Map.of(
"Policy", createSQSPolicy(analyticsQueueArn, topicArn)
))
);
}

private String createSQSPolicy(String queueArn, String topicArn) {
return String.format("""
{
"Statement": [{
"Effect": "Allow",
"Principal": {"Service": "sns.amazonaws.com"},
"Action": "sqs:SendMessage",
"Resource": "%s",
"Condition": {
"ArnEquals": {"aws:SourceArn": "%s"}
}
}]
}
""", queueArn, topicArn);
}
}

SNS to SQS Fan-Out Pattern

The most common SNS pattern is SNS-to-SQS fan-out, where SNS broadcasts to multiple SQS queues. This combines SNS's broadcast capability with SQS's reliable processing and provides several benefits:

Decoupled processing: Each subscriber processes messages independently at its own pace. If one service is slow or down, others continue unaffected.

Independent retry logic: Each SQS queue has its own dead-letter queue and retry configuration. Failures in one consumer don't affect others.

Scalability: Each consumer scales independently based on queue depth and processing needs.

Message persistence: Messages persist in queues even if consumers are down, unlike direct SNS subscriptions where offline subscribers lose messages.

/**
* Publishing events to SNS topic for fan-out.
*/
@Service
public class OrderEventPublisher {

private final AmazonSNS sns;

@Value("${aws.sns.order-events-topic-arn}")
private String topicArn;

/**
* Publish order created event to all subscribers.
*/
public void publishOrderCreated(Order order) {
OrderCreatedEvent event = new OrderCreatedEvent(
order.getId(),
order.getCustomerId(),
order.getItems(),
order.getTotalAmount(),
Instant.now()
);

PublishRequest request = new PublishRequest()
.withTopicArn(topicArn)
.withMessage(toJson(event))
// Message attributes for filtering
.withMessageAttributes(Map.of(
"eventType", messageAttribute("order.created"),
"customerId", messageAttribute(order.getCustomerId()),
"totalAmount", messageAttribute(String.valueOf(order.getTotalAmount())),
"priority", messageAttribute(order.isPriority() ? "high" : "normal")
));

PublishResult result = sns.publish(request);
log.info("Published order created event: {} (MessageId: {})",
order.getId(), result.getMessageId());
}

private MessageAttributeValue messageAttribute(String value) {
return new MessageAttributeValue()
.withDataType("String")
.withStringValue(value);
}
}

Message Filtering

SNS subscription filter policies allow subscribers to receive only messages matching specific criteria. This reduces unnecessary message delivery and processing.

/**
* Configure subscription with filter policy.
*/
@Configuration
public class FilteredSubscriptions {

private final AmazonSNS sns;

/**
* High-priority orders go to expedited processing queue.
*/
public void subscribeHighPriorityQueue(
String topicArn,
String queueArn) {

SubscribeResult result = sns.subscribe(
topicArn, "sqs", queueArn);

String subscriptionArn = result.getSubscriptionArn();

// Only receive high-priority messages
String filterPolicy = """
{
"priority": ["high"],
"eventType": ["order.created", "order.updated"]
}
""";

sns.setSubscriptionAttributes(
new SetSubscriptionAttributesRequest()
.withSubscriptionArn(subscriptionArn)
.withAttributeName("FilterPolicy")
.withAttributeValue(filterPolicy)
);
}

/**
* Large orders (>$1000) go to approval workflow.
*/
public void subscribeLargeOrderQueue(
String topicArn,
String queueArn) {

SubscribeResult result = sns.subscribe(
topicArn, "sqs", queueArn);

// Numeric filter for large amounts
String filterPolicy = """
{
"totalAmount": [{"numeric": [">=", 1000]}]
}
""";

sns.setSubscriptionAttributes(
new SetSubscriptionAttributesRequest()
.withSubscriptionArn(result.getSubscriptionArn())
.withAttributeName("FilterPolicy")
.withAttributeValue(filterPolicy)
);
}
}

Filter policy operators:

  • Exact match: {"eventType": ["order.created"]}
  • Multiple values (OR): {"eventType": ["order.created", "order.updated"]}
  • Anything-but: {"status": [{"anything-but": ["cancelled"]}]}
  • Numeric comparison: {"amount": [{"numeric": [">", 100]}]}
  • Prefix matching: {"customerId": [{"prefix": "VIP-"}]}

When to use filtering: Use filtering when subscribers need only a subset of messages. Without filtering, all messages reach all queues, and consumers must filter in code. SNS filtering reduces unnecessary queue writes, storage costs, and processing overhead.

For more on pub-sub patterns and choreography, see Event-Driven Architecture.

EventBridge

EventBridge is AWS's serverless event bus service that provides advanced event routing, filtering, transformation, and integration capabilities beyond SNS. EventBridge is designed for complex event-driven architectures requiring content-based routing, SaaS integration, and schema management.

Event Buses and Rules

EventBridge uses event buses to receive events and rules to route events to targets. Each AWS account has a default event bus for AWS service events. Custom event buses receive events from your applications or partner SaaS providers.

Rules define event patterns to match and targets to invoke when patterns match. One rule can route to up to 5 targets, and one event can match multiple rules.

/**
* Publishing events to EventBridge.
*/
@Service
public class EventBridgePublisher {

private final EventBridgeClient eventBridge;

@Value("${aws.eventbridge.bus-name}")
private String eventBusName;

/**
* Publish domain event to EventBridge.
*/
public void publishOrderEvent(OrderEvent event) {
PutEventsRequestEntry entry = PutEventsRequestEntry.builder()
.eventBusName(eventBusName)
.source("order-service") // Source identifier
.detailType("Order Created") // Event type
.detail(toJson(event)) // Event payload
.build();

PutEventsRequest request = PutEventsRequest.builder()
.entries(entry)
.build();

PutEventsResponse response = eventBridge.putEvents(request);

// Check for failures
if (response.failedEntryCount() > 0) {
response.entries().stream()
.filter(e -> e.errorCode() != null)
.forEach(e -> log.error("Failed to publish event: {} - {}",
e.errorCode(), e.errorMessage()));
}
}

/**
* Batch publish multiple events.
*/
public void publishBatch(List<OrderEvent> events) {
List<PutEventsRequestEntry> entries = events.stream()
.map(event -> PutEventsRequestEntry.builder()
.eventBusName(eventBusName)
.source("order-service")
.detailType(event.getClass().getSimpleName())
.detail(toJson(event))
.build())
.toList();

// Batch up to 10 events per request
Lists.partition(entries, 10).forEach(batch -> {
PutEventsRequest request = PutEventsRequest.builder()
.entries(batch)
.build();

eventBridge.putEvents(request);
});
}
}

Event Pattern Matching

EventBridge rules use event patterns to match events. Patterns are JSON documents specifying which events to match based on content.

// Rule 1: Match high-value orders
{
"source": ["order-service"],
"detail-type": ["Order Created"],
"detail": {
"totalAmount": [{"numeric": [">", 1000]}],
"customerId": [{"prefix": "PREMIUM-"}]
}
}

// Rule 2: Match payment failures for retry
{
"source": ["payment-service"],
"detail-type": ["Payment Failed"],
"detail": {
"errorType": ["insufficient_funds", "card_declined"],
"retryable": [true]
}
}

// Rule 3: Match any order event for analytics
{
"source": ["order-service"],
"detail-type": [{"prefix": "Order"}]
}

Pattern matching capabilities:

  • Exact matching: "status": ["confirmed"]
  • Prefix matching: "customerId": [{"prefix": "VIP-"}]
  • Suffix matching: "email": [{"suffix": "@premium.com"}]
  • Anything-but: "status": [{"anything-but": ["cancelled"]}]
  • Exists: "discountCode": [{"exists": true}]
  • Numeric: "amount": [{"numeric": [">=", 100, "<", 1000]}]
  • OR conditions: Multiple values in array
  • AND conditions: Multiple fields in detail object
/**
* Create EventBridge rule with pattern matching.
*/
@Configuration
public class EventBridgeRules {

private final EventBridgeClient eventBridge;

/**
* Route high-value orders to approval workflow.
*/
@Bean
public void createHighValueOrderRule() {
String eventPattern = """
{
"source": ["order-service"],
"detail-type": ["Order Created"],
"detail": {
"totalAmount": [{"numeric": [">", 10000]}]
}
}
""";

// Create rule
PutRuleRequest ruleRequest = PutRuleRequest.builder()
.name("high-value-orders")
.description("Route orders >$10k to approval workflow")
.eventBusName("order-events")
.eventPattern(eventPattern)
.state(RuleState.ENABLED)
.build();

PutRuleResponse ruleResponse = eventBridge.putRule(ruleRequest);
String ruleArn = ruleResponse.ruleArn();

// Add target: Step Functions state machine
Target target = Target.builder()
.id("approval-workflow")
.arn(approvalStateMachineArn)
.roleArn(eventBridgeRoleArn) // IAM role to invoke Step Functions
.build();

PutTargetsRequest targetsRequest = PutTargetsRequest.builder()
.rule("high-value-orders")
.eventBusName("order-events")
.targets(target)
.build();

eventBridge.putTargets(targetsRequest);
}
}

Schema Registry

EventBridge Schema Registry automatically discovers event schemas from events flowing through event buses and maintains a versioned schema registry. This provides documentation, validation, and code generation capabilities.

Schema discovery analyzes events and infers schemas automatically. As events flow through the bus, EventBridge detects new event types and versions, creating schema registry entries.

Schema versioning tracks changes over time. When an event schema changes, EventBridge creates a new version. Consumers can see schema evolution and understand compatibility.

Code generation creates strongly-typed code bindings from schemas for Java, Python, and TypeScript, enabling compile-time type checking and IDE autocomplete for event structures.

/**
* Using generated schema classes for type safety.
*/
// Generated from EventBridge Schema Registry
@Data
@JsonDeserialize
public class OrderCreatedEvent {
private String orderId;
private String customerId;
private List<OrderItem> items;
private BigDecimal totalAmount;
private Instant timestamp;

@Data
public static class OrderItem {
private String productId;
private int quantity;
private BigDecimal price;
}
}

@Service
public class TypeSafeEventPublisher {

/**
* Publish using generated schema class for type safety.
*/
public void publishOrder(Order order) {
// Compile-time type checking
OrderCreatedEvent event = new OrderCreatedEvent();
event.setOrderId(order.getId());
event.setCustomerId(order.getCustomerId());
event.setItems(order.getItems().stream()
.map(this::toEventItem)
.toList());
event.setTotalAmount(order.getTotal());
event.setTimestamp(Instant.now());

eventBridge.putEvents(
PutEventsRequest.builder()
.entries(PutEventsRequestEntry.builder()
.source("order-service")
.detailType("Order Created")
.detail(toJson(event))
.build())
.build()
);
}
}

Schema registry benefits:

  • Documentation: Schemas serve as living documentation of event structures
  • Validation: Validate events against schemas before publishing
  • Discovery: Teams discover available events and their structures
  • Code generation: Generate type-safe code for producers and consumers
  • Breaking change detection: Identify incompatible schema changes

Enable schema discovery on your event bus to automatically populate the schema registry:

@Configuration
public class SchemaDiscovery {

/**
* Enable schema discovery for custom event bus.
*/
public void enableSchemaDiscovery() {
// Schema discovery enabled in AWS Console or CloudFormation
// Automatically discovers schemas from events
}
}

Event Archive and Replay

EventBridge can archive events for compliance, debugging, or replay. Archives store events indefinitely and allow replaying archived events to event buses for reprocessing or testing.

Use cases for archive and replay:

  • Compliance: Retain events for audit trails
  • Debugging: Replay production events in test environments
  • Reprocessing: Replay events after fixing bugs in consumers
  • Testing: Replay historical events to test new consumers
  • Analytics: Process historical events with new analytics logic
/**
* Create archive and replay events.
*/
@Configuration
public class EventBridgeArchive {

private final EventBridgeClient eventBridge;

/**
* Create archive for all order events.
*/
public void createArchive() {
CreateArchiveRequest request = CreateArchiveRequest.builder()
.archiveName("order-events-archive")
.eventSourceArn(eventBusArn)
.description("Archive all order events for compliance")
// Optional: Event pattern to filter which events to archive
.eventPattern("""
{
"source": ["order-service"]
}
""")
.retentionDays(0) // 0 = indefinite retention
.build();

eventBridge.createArchive(request);
}

/**
* Replay archived events from a time range.
*/
public void replayEvents(Instant startTime, Instant endTime) {
StartReplayRequest request = StartReplayRequest.builder()
.replayName("order-events-replay-" + Instant.now().toEpochMilli())
.eventSourceArn(archiveArn)
.eventStartTime(startTime)
.eventEndTime(endTime)
.destination(ReplayDestination.builder()
.arn(testEventBusArn) // Replay to test bus
.build())
.build();

StartReplayResponse response = eventBridge.startReplay(request);
log.info("Started replay: {}", response.replayArn());

// Monitor replay progress
monitorReplay(response.replayArn());
}

private void monitorReplay(String replayArn) {
DescribeReplayRequest request = DescribeReplayRequest.builder()
.replayName(extractReplayName(replayArn))
.build();

DescribeReplayResponse response = eventBridge.describeReplay(request);

log.info("Replay state: {}", response.state());
log.info("Events replayed: {}", response.eventLastReplayedTime());
}
}

For choosing between SNS and EventBridge, use SNS for simple fan-out with basic filtering. Use EventBridge for content-based routing, schema validation, SaaS integration, or when you need archive and replay capabilities.

Kinesis Data Streams

Kinesis Data Streams provides real-time streaming for ordered, high-throughput data processing. Unlike SQS (pull-based queues) and SNS (push-based broadcast), Kinesis stores ordered streams of records that multiple consumers can read independently and simultaneously.

Streams and Shards

A Kinesis stream consists of one or more shards. Each shard provides a fixed unit of capacity:

  • Write: 1,000 records/second or 1 MB/second
  • Read: 2,000 records/second or 2 MB/second (shared across all consumers)
  • Enhanced fan-out: 2 MB/second per consumer (dedicated throughput)

Shards provide ordering and parallelism. Records with the same partition key always go to the same shard, maintaining order for that key. Different partition keys may go to different shards, enabling parallel processing.

Partition keys determine which shard a record goes to. Kinesis uses the partition key to hash records to shards. All records with the same partition key go to the same shard, providing ordering guarantees.

/**
* Producing records to Kinesis with partition keys.
*/
@Service
public class KinesisEventProducer {

private final KinesisClient kinesis;

@Value("${aws.kinesis.stream-name}")
private String streamName;

/**
* Publish order event with customer ID as partition key.
* All events for same customer maintain order.
*/
public void publishOrderEvent(OrderEvent event) {
byte[] data = toJson(event).getBytes(StandardCharsets.UTF_8);

PutRecordRequest request = PutRecordRequest.builder()
.streamName(streamName)
// Partition key ensures ordering per customer
.partitionKey(event.getCustomerId())
.data(SdkBytes.fromByteArray(data))
.build();

PutRecordResponse response = kinesis.putRecord(request);

log.info("Published to shard {} at sequence {}",
response.shardId(),
response.sequenceNumber());
}

/**
* Batch publish for higher throughput.
* Up to 500 records per batch.
*/
public void publishBatch(List<OrderEvent> events) {
List<PutRecordsRequestEntry> records = events.stream()
.map(event -> PutRecordsRequestEntry.builder()
.partitionKey(event.getCustomerId())
.data(SdkBytes.fromByteArray(
toJson(event).getBytes(StandardCharsets.UTF_8)))
.build())
.toList();

// Batch up to 500 records
Lists.partition(records, 500).forEach(batch -> {
PutRecordsRequest request = PutRecordsRequest.builder()
.streamName(streamName)
.records(batch)
.build();

PutRecordsResponse response = kinesis.putRecords(request);

// Check for failures
if (response.failedRecordCount() > 0) {
log.error("Failed to publish {} records",
response.failedRecordCount());

// Retry failed records
retryFailedRecords(batch, response.records());
}
});
}
}

Consuming Records

Kinesis consumers read records from shards using either the Kinesis Client Library (KCL) or AWS SDK. KCL handles shard discovery, load balancing across instances, and checkpointing automatically.

/**
* Consuming Kinesis records with KCL 2.x.
*/
@Service
public class KinesisEventConsumer implements ShardRecordProcessor {

private String shardId;

@Override
public void initialize(InitializationInput input) {
this.shardId = input.shardId();
log.info("Initializing processor for shard: {}", shardId);
}

@Override
public void processRecords(ProcessRecordsInput input) {
List<KinesisClientRecord> records = input.records();

log.info("Processing {} records from shard {}",
records.size(), shardId);

for (KinesisClientRecord record : records) {
try {
processRecord(record);
} catch (Exception e) {
log.error("Failed to process record: {} from shard: {}",
record.sequenceNumber(), shardId, e);
// Handle error: skip, retry, or send to DLQ
}
}

// Checkpoint after successful batch processing
// Checkpointing saves progress so records aren't reprocessed
try {
input.checkpointer().checkpoint();
} catch (Exception e) {
log.error("Failed to checkpoint", e);
}
}

private void processRecord(KinesisClientRecord record) {
String data = StandardCharsets.UTF_8.decode(
record.data()).toString();

OrderEvent event = fromJson(data, OrderEvent.class);

// Process event idempotently
// May be redelivered after consumer restart
orderService.processOrder(event);
}

@Override
public void shardEnded(ShardEndedInput input) {
log.info("Shard {} ended, checkpointing", shardId);
try {
input.checkpointer().checkpoint();
} catch (Exception e) {
log.error("Failed to checkpoint at shard end", e);
}
}
}

/**
* Configure KCL scheduler.
*/
@Configuration
public class KinesisConfig {

@Bean
public Scheduler kinesisScheduler() {
KinesisClientUtil clientUtil = new KinesisClientUtil();
KinesisAsyncClient kinesisClient = clientUtil.buildAsyncClient();
DynamoDbAsyncClient dynamoClient = clientUtil.buildDynamoDBClient();
CloudWatchAsyncClient cloudWatchClient =
clientUtil.buildCloudWatchClient();

ConfigsBuilder configsBuilder = new ConfigsBuilder(
streamName,
applicationName,
kinesisClient,
dynamoClient,
cloudWatchClient,
UUID.randomUUID().toString(),
new KinesisEventProcessorFactory()
);

return new Scheduler(
configsBuilder.checkpointConfig(),
configsBuilder.coordinatorConfig(),
configsBuilder.leaseManagementConfig(),
configsBuilder.lifecycleConfig(),
configsBuilder.metricsConfig(),
configsBuilder.processorConfig(),
configsBuilder.retrievalConfig()
);
}
}

KCL features:

  • Automatic shard discovery: KCL discovers new shards and rebalances consumers
  • Load balancing: Distributes shards across consumer instances
  • Checkpointing: Saves processing progress in DynamoDB
  • Enhanced fan-out: Dedicated 2 MB/sec per consumer
  • Lease coordination: Manages which instance processes which shard

Kinesis Data Firehose

Kinesis Data Firehose is a fully managed service for loading streaming data into data stores (S3, Redshift, Elasticsearch, Splunk). It handles buffering, compression, encryption, and transformation automatically.

When to use Firehose: Use Firehose when your destination is a data store or analytics service. Firehose eliminates the need to write consumer code for common destinations. Use Data Streams when you need custom processing logic or multiple independent consumers.

/**
* Sending data to Kinesis Data Firehose.
*/
@Service
public class FirehoseProducer {

private final FirehoseClient firehose;

/**
* Send records to Firehose for delivery to S3.
*/
public void sendToFirehose(List<AnalyticsEvent> events) {
List<Record> records = events.stream()
.map(event -> Record.builder()
.data(SdkBytes.fromUtf8String(toJson(event) + "\n"))
.build())
.toList();

PutRecordBatchRequest request = PutRecordBatchRequest.builder()
.deliveryStreamName("analytics-events")
.records(records)
.build();

PutRecordBatchResponse response = firehose.putRecordBatch(request);

if (response.failedPutCount() > 0) {
log.error("Failed to send {} records to Firehose",
response.failedPutCount());
}
}
}

Firehose features:

  • Automatic buffering (60-900 seconds or 1-128 MB)
  • Compression (GZIP, ZIP, Snappy)
  • Encryption (server-side encryption)
  • Data transformation (Lambda functions)
  • Format conversion (JSON to Parquet/ORC)
  • Error handling (failed records to S3)

For stream processing patterns and real-time analytics, see Performance Testing for load testing streaming workloads.

Integration Patterns

Combining AWS messaging services creates powerful integration patterns. Understanding these patterns helps design robust, scalable architectures.

SNS to SQS Fan-Out

The most common pattern: SNS broadcasts to multiple SQS queues. Each queue represents an independent consumer with its own processing logic, retry behavior, and error handling.

Benefits:

  • Each consumer processes independently at its own rate
  • Failures in one consumer don't affect others
  • Each queue has independent retry and DLQ configuration
  • Easy to add new consumers without modifying existing ones
  • Messages persist in queues even if consumers are down

Lambda Event Source Mapping

Lambda can poll SQS queues, Kinesis streams, and DynamoDB streams automatically, triggering function invocations as messages arrive.

/**
* Lambda function triggered by SQS queue.
*/
public class OrderProcessorLambda implements RequestHandler<SQSEvent, Void> {

private final OrderService orderService;

@Override
public Void handleRequest(SQSEvent event, Context context) {
LambdaLogger logger = context.getLogger();

for (SQSEvent.SQSMessage message : event.getRecords()) {
try {
OrderEvent order = fromJson(
message.getBody(),
OrderEvent.class
);

orderService.processOrder(order);

logger.log("Processed order: " + order.getOrderId());

} catch (Exception e) {
logger.log("Failed to process message: " +
message.getMessageId() + " - " + e.getMessage());

// Lambda will retry based on SQS visibility timeout
// After maxReceiveCount, message moves to DLQ
throw new RuntimeException("Processing failed", e);
}
}

return null;
}
}

Lambda + SQS configuration:

  • Batch size: 1-10 messages per invocation (1-10,000 for FIFO)
  • Batch window: Wait up to 5 minutes to collect full batch
  • Concurrency: Lambda scales up to 1,000 concurrent executions
  • Partial batch responses: Report which messages succeeded/failed
  • Error handling: Failed messages return to queue or go to DLQ

Lambda + Kinesis configuration:

  • Batch size: Up to 10,000 records
  • Parallelization: Process multiple shards concurrently
  • Error handling: Retry entire batch or skip failed records
  • Bisect on error: Split failed batches to identify problem records

EventBridge to Step Functions

EventBridge can trigger Step Functions workflows, enabling complex orchestrations in response to events.

This pattern works well for long-running workflows with compensation logic (sagas). EventBridge decouples event detection from workflow execution, and Step Functions orchestrates multi-step processes with error handling.

Kinesis to Lambda to S3/DynamoDB

Real-time stream processing pipeline: Kinesis captures events, Lambda processes them, results stored in S3 or DynamoDB.

Use cases:

  • Real-time metrics dashboards
  • Log aggregation and analysis
  • Clickstream analytics
  • IoT sensor data processing

For architectural patterns and service decomposition, see Microservices Architecture.

Error Handling and Idempotency

Distributed messaging systems require careful error handling and idempotent processing to ensure reliability and correctness.

Retry Strategies

Messages may fail processing due to transient errors (network timeouts, temporary service unavailability) or permanent errors (malformed messages, business logic violations). Differentiate between these to avoid wasting resources on unrecoverable failures.

/**
* Classify errors and apply appropriate retry strategies.
*/
@Service
public class ResilientMessageProcessor {

private final OrderService orderService;
private final AmazonSQS sqs;

@SqsListener("${aws.sqs.order-queue}")
public void processOrder(
@Payload String messageBody,
@Header("ReceiptHandle") String receiptHandle,
@Header("ApproximateReceiveCount") int receiveCount) {

try {
OrderMessage order = fromJson(messageBody, OrderMessage.class);
orderService.processOrder(order);

// Success - message auto-deleted

} catch (NetworkException | TimeoutException e) {
// Transient error - likely recoverable with retry
handleTransientError(e, receiveCount, receiptHandle);

} catch (ValidationException | IllegalArgumentException e) {
// Permanent error - retrying won't help
handlePermanentError(e, messageBody, receiptHandle);

} catch (Exception e) {
// Unknown error - treat as transient but log for investigation
log.error("Unknown error processing message", e);
handleTransientError(e, receiveCount, receiptHandle);
}
}

private void handleTransientError(
Exception e,
int receiveCount,
String receiptHandle) {

if (receiveCount >= 3) {
log.error("Max retries exceeded, message will go to DLQ", e);
// Don't delete - let maxReceiveCount move to DLQ
throw new RuntimeException("Max retries exceeded", e);
}

// Calculate exponential backoff
int backoffSeconds = (int) Math.min(
5 * Math.pow(2, receiveCount - 1),
900 // Max 15 minutes
);

// Extend visibility timeout for backoff
sqs.changeMessageVisibility(
new ChangeMessageVisibilityRequest()
.withQueueUrl(queueUrl)
.withReceiptHandle(receiptHandle)
.withVisibilityTimeout(backoffSeconds)
);

log.warn("Transient error, retry in {}s: {}",
backoffSeconds, e.getMessage());
}

private void handlePermanentError(
Exception e,
String messageBody,
String receiptHandle) {

log.error("Permanent error, sending to DLQ immediately: {}",
messageBody, e);

// Log for investigation
auditLog.logPermanentFailure(messageBody, e);

// Delete from main queue (will trigger DLQ if configured)
throw new UnrecoverableException("Permanent error", e);
}
}

Exponential backoff: Increase delay between retries exponentially (5s, 10s, 20s, 40s, ...). This gives transient issues time to resolve and prevents overwhelming downstream services.

Retry budgets: Limit total retry time to prevent indefinite retries. After maximum retry time or attempts, send to DLQ for manual investigation.

For detailed retry patterns and circuit breakers, see Spring Boot Resilience.

Idempotent Processing

Messages may be delivered multiple times. Consumers must be idempotent - processing the same message multiple times produces the same result as processing it once.

Strategies for idempotency:

1. Deduplication using message IDs:

/**
* Track processed message IDs to prevent duplicate processing.
*/
@Service
public class IdempotentOrderProcessor {

private final ProcessedMessageRepository processedMessages;

@Transactional
public void processOrder(OrderMessage message, String messageId) {
// Check if already processed
if (processedMessages.existsById(messageId)) {
log.debug("Message {} already processed, skipping", messageId);
return;
}

// Process the order
Order order = createOrder(message);
orderRepository.save(order);

// Mark as processed in same transaction
processedMessages.save(new ProcessedMessage(
messageId,
Instant.now(),
message.getOrderId()
));
}
}

2. Natural idempotency using unique constraints:

/**
* Use database unique constraints for natural idempotency.
*/
@Entity
public class Order {
@Id
private String orderId;

// Message ID that created this order
@Column(unique = true, nullable = false)
private String createdByMessageId;

// Other fields...
}

@Service
public class NaturallyIdempotentProcessor {

@Transactional
public void processOrder(OrderMessage message, String messageId) {
try {
Order order = new Order();
order.setOrderId(message.getOrderId());
order.setCreatedByMessageId(messageId); // Unique constraint
order.setCustomerId(message.getCustomerId());
// Set other fields...

orderRepository.save(order);

} catch (DataIntegrityViolationException e) {
// Duplicate - order already created by this message
log.debug("Order already exists for message {}", messageId);
// This is success - idempotent operation
}
}
}

3. Idempotent operations using absolute values:

// NOT idempotent - applying twice increments by $200
public void incrementBalance(String accountId, BigDecimal amount) {
Account account = accountRepository.findById(accountId);
account.setBalance(account.getBalance().add(amount));
accountRepository.save(account);
}

// Idempotent - applying twice sets to $500 both times
public void setBalance(
String accountId,
BigDecimal newBalance,
long expectedVersion) {

Account account = accountRepository.findById(accountId);

// Optimistic locking prevents conflicting updates
if (account.getVersion() != expectedVersion) {
throw new OptimisticLockException();
}

account.setBalance(newBalance);
account.incrementVersion();
accountRepository.save(account);
}

4. Idempotency tokens:

/**
* Use idempotency tokens for API calls.
*/
@Service
public class PaymentProcessor {

private final IdempotencyTokenRepository tokenRepository;

@Transactional
public PaymentResult processPayment(
PaymentRequest request,
String idempotencyToken) {

// Check if this token was already processed
Optional<IdempotencyRecord> existing =
tokenRepository.findById(idempotencyToken);

if (existing.isPresent()) {
// Return cached result from previous processing
log.info("Returning cached result for token {}",
idempotencyToken);
return existing.get().getResult();
}

// Process payment
PaymentResult result = chargePayment(request);

// Store result with token
tokenRepository.save(new IdempotencyRecord(
idempotencyToken,
result,
Instant.now()
));

return result;
}
}

For transactional patterns and consistency, see Database Design and Event Sourcing.

Monitoring and Observability

Effective monitoring detects issues before they impact users and provides visibility into message flow through the system.

Key Metrics to Monitor

SQS Metrics:

  • ApproximateNumberOfMessagesVisible: Messages in queue awaiting processing
  • ApproximateAgeOfOldestMessage: How long oldest message has waited
  • NumberOfMessagesSent: Message publishing rate
  • NumberOfMessagesReceived: Message consumption rate
  • NumberOfMessagesDeleted: Successfully processed messages
  • ApproximateNumberOfMessagesNotVisible: Messages currently being processed
/**
* CloudWatch metrics for SQS monitoring.
*/
@Service
public class SQSMonitor {

private final AmazonSQS sqs;
private final CloudWatchClient cloudWatch;

@Scheduled(fixedDelay = 60000) // Every minute
public void publishQueueMetrics() {
GetQueueAttributesResult result = sqs.getQueueAttributes(
new GetQueueAttributesRequest()
.withQueueUrl(queueUrl)
.withAttributeNames(
"ApproximateNumberOfMessages",
"ApproximateAgeOfOldestMessage",
"ApproximateNumberOfMessagesNotVisible"
)
);

Map<String, String> attrs = result.getAttributes();

// Visible messages (waiting to be processed)
int visibleMessages = Integer.parseInt(
attrs.getOrDefault("ApproximateNumberOfMessages", "0"));

// Age of oldest message (seconds)
int oldestAge = Integer.parseInt(
attrs.getOrDefault("ApproximateAgeOfOldestMessage", "0"));

// Messages being processed
int inFlight = Integer.parseInt(
attrs.getOrDefault("ApproximateNumberOfMessagesNotVisible", "0"));

// Publish to CloudWatch
cloudWatch.putMetricData(
PutMetricDataRequest.builder()
.namespace("CustomMetrics/SQS")
.metricData(
MetricDatum.builder()
.metricName("QueueDepth")
.value((double) visibleMessages)
.unit(StandardUnit.COUNT)
.timestamp(Instant.now())
.dimensions(Dimension.builder()
.name("QueueName")
.value(queueName)
.build())
.build(),
MetricDatum.builder()
.metricName("OldestMessageAge")
.value((double) oldestAge)
.unit(StandardUnit.SECONDS)
.timestamp(Instant.now())
.dimensions(Dimension.builder()
.name("QueueName")
.value(queueName)
.build())
.build()
)
.build()
);

// Alert if queue depth or age exceeds thresholds
if (visibleMessages > 1000) {
alertService.alert(
"Queue {} depth {} exceeds threshold",
queueName, visibleMessages
);
}

if (oldestAge > 300) { // 5 minutes
alertService.alert(
"Queue {} oldest message age {} seconds exceeds threshold",
queueName, oldestAge
);
}
}
}

SNS Metrics:

  • NumberOfMessagesPublished: Messages published to topic
  • NumberOfNotificationsDelivered: Successfully delivered notifications
  • NumberOfNotificationsFailed: Failed deliveries

EventBridge Metrics:

  • Invocations: Events matched by rules
  • TriggeredRules: Rules that matched events
  • FailedInvocations: Failed target invocations
  • ThrottledRules: Rules throttled due to rate limits

Kinesis Metrics:

  • IncomingRecords: Records written to stream
  • IncomingBytes: Data volume written
  • GetRecords.Records: Records read by consumers
  • GetRecords.IteratorAgeMilliseconds: Consumer lag
  • WriteProvisionedThroughputExceeded: Write throttling
  • ReadProvisionedThroughputExceeded: Read throttling

Distributed Tracing

Use AWS X-Ray or OpenTelemetry to trace events through the system. Propagate trace context through message attributes to correlate distributed operations.

/**
* Propagate trace context through SQS messages.
*/
@Service
public class TracedMessagePublisher {

private final AmazonSQS sqs;

public void publishWithTracing(OrderEvent event) {
// Get current trace context
TraceID traceId = AWSXRay.getTraceEntity().getTraceId();
String segmentId = AWSXRay.getTraceEntity().getId();

SendMessageRequest request = new SendMessageRequest()
.withQueueUrl(queueUrl)
.withMessageBody(toJson(event))
// Propagate trace context in message attributes
.withMessageAttributes(Map.of(
"TraceId", messageAttribute(traceId.toString()),
"ParentId", messageAttribute(segmentId),
"OrderId", messageAttribute(event.getOrderId())
));

sqs.sendMessage(request);
}

@SqsListener("${aws.sqs.order-queue}")
public void consumeWithTracing(
@Payload OrderEvent event,
@Headers Map<String, Object> headers) {

// Extract trace context from message attributes
String traceId = (String) headers.get("TraceId");
String parentId = (String) headers.get("ParentId");

// Continue trace in consumer
Subsegment subsegment = AWSXRay.beginSubsegment("ProcessOrder");
subsegment.putMetadata("orderId", event.getOrderId());

try {
orderService.processOrder(event);
subsegment.putAnnotation("result", "success");
} catch (Exception e) {
subsegment.addException(e);
subsegment.putAnnotation("result", "error");
throw e;
} finally {
AWSXRay.endSubsegment();
}
}
}

For comprehensive observability patterns, see Observability Overview, Logging, Metrics, and Tracing. For AWS-specific observability, see AWS Observability.

Anti-Patterns

Common mistakes that lead to unreliable, inefficient, or expensive messaging systems:

1. Not Using Dead-Letter Queues

Problem: Poison messages block queue processing indefinitely. One bad message prevents all subsequent messages from processing.

Solution: Configure dead-letter queues with appropriate maxReceiveCount. Failed messages move to DLQ after retries, allowing processing to continue. Monitor DLQs and investigate failures.

2. Synchronous Processing of Async Messages

Problem: Waiting for downstream operations to complete while processing messages increases visibility timeout requirements and reduces throughput.

// Bad: Synchronous processing
@SqsListener("order-queue")
public void processOrder(OrderMessage message) {
// Wait for payment to complete (5 seconds)
paymentService.chargeCard(message);

// Wait for inventory update (3 seconds)
inventoryService.reserve(message);

// Wait for email to send (2 seconds)
emailService.sendConfirmation(message);

// Total: 10 seconds per message, low throughput
}

Solution: Process asynchronously. Acknowledge message after publishing downstream events, not after full completion.

// Good: Asynchronous processing
@SqsListener("order-queue")
public void processOrder(OrderMessage message) {
// Persist order immediately
Order order = orderRepository.save(message.toOrder());

// Publish events for async processing
eventPublisher.publish(new PaymentRequiredEvent(order));
eventPublisher.publish(new InventoryReservationEvent(order));
eventPublisher.publish(new OrderConfirmationEvent(order));

// Acknowledge quickly, work happens asynchronously
}

3. Not Implementing Idempotency

Problem: Messages delivered multiple times cause duplicate operations - charging customers twice, creating duplicate records, double-counting analytics.

Solution: Implement idempotency using message IDs, unique constraints, or idempotency tokens (see Idempotent Processing).

4. Using Wrong Queue Type

Problem: Using FIFO queues when order doesn't matter sacrifices 99.7% of throughput (300 TPS vs unlimited). Using standard queues when order matters causes incorrect business logic.

Solution: Use standard queues unless strict ordering or exactly-once processing is required. Most workloads don't require FIFO - analyze your requirements carefully.

5. Not Monitoring Queue Depth and Age

Problem: Growing queues and increasing message age indicate consumer problems, but without monitoring, issues go unnoticed until queues have millions of messages.

Solution: Monitor ApproximateNumberOfMessagesVisible and ApproximateAgeOfOldestMessage. Alert when thresholds exceeded. Auto-scale consumers based on queue depth.

6. Small Batch Sizes

Problem: Receiving one message at a time increases API calls and costs. Processing 1,000 messages requires 1,000 API calls at $0.0000004 each.

Solution: Use batch operations. Receive up to 10 messages per API call, reducing costs by 90% and improving throughput.

7. Short Visibility Timeouts

Problem: Messages become visible again while still processing, causing duplicate processing and wasted resources.

Solution: Set visibility timeout longer than maximum expected processing time. Extend visibility for long-running tasks. Monitor ApproximateReceiveCount to detect timeout issues.

8. Not Using Long Polling

Problem: Short polling returns immediately when queue is empty, generating many empty responses and increasing costs.

Solution: Always use long polling (20 seconds) to reduce API calls and costs without impacting latency.

9. Tight Coupling via Events

Problem: Publishing internal domain objects as events couples consumers to producer implementation. Changes to domain objects break consumers.

Solution: Publish integration events with stable contracts. Translate domain events to integration events at the boundary (see Event Types).

10. Ignoring Failed Messages in DLQ

Problem: Dead-letter queues fill up with failed messages that never get investigated or reprocessed. These represent failed business operations.

Solution: Monitor DLQ depth. Alert when messages appear. Investigate root causes. Fix bugs and replay valid messages. See Dead-Letter Queues.

Further Reading