Java Concurrency Advanced
Virtual threads (Project Loom), final since Java 21 and refined through Java 25, make millions of lightweight threads practical. For applications processing high volumes of I/O-bound operations (database queries, API calls), virtual threads provide large throughput improvements with simpler code than traditional async approaches.
Overview
This guide covers advanced Java concurrency patterns: virtual threads for scalable I/O, structured concurrency for managing concurrent operations as units, CompletableFuture for async composition, reactive streams for backpressure handling, and lock-free algorithms for high-performance scenarios. For foundational concepts like thread safety and synchronization, see Java Concurrency Fundamentals.
Core Principles
- Use Virtual Threads for I/O: Perfect for database queries, HTTP calls, message queues
- Structured Concurrency: Manage concurrent operations as a single unit with clear lifecycle
- CompletableFuture for Async Composition: Chain async operations with error handling
- Reactive Streams for Backpressure: Handle fast producers and slow consumers
- Lock-Free for Performance: Use CAS-based algorithms for high-contention scenarios
Choosing the Right Model
| Workload shape | Recommended model | Why |
|---|---|---|
| Blocking I/O per request | Virtual threads | Keeps code synchronous and easy to reason about while scaling concurrency |
| Fan-out/fan-in orchestration | Structured concurrency (or CompletableFuture fallback) | Represents related subtasks as one logical unit with coordinated cancellation |
| Event streams with backpressure | Reactive streams | Handles mismatched producer/consumer speed explicitly |
| CPU-bound parallel compute | Platform threads/ForkJoinPool | Better control over bounded parallelism for compute-heavy tasks |
Virtual Threads (Java 21+; prefer Java 25)
What Are Virtual Threads?
Virtual threads are lightweight threads managed by the JVM, not the operating system. They allow millions of concurrent tasks without the overhead of platform threads.
Platform threads (traditional Java threads) map 1:1 to operating system threads. OS threads are expensive: each consumes roughly 1MB of stack memory by default, and context switching requires kernel involvement. This limits Java applications to at most a few thousand concurrent platform threads.
Virtual threads break this limitation by running on a small pool of platform "carrier" threads. When a virtual thread blocks on I/O, it's unmounted from its carrier thread, freeing that carrier to run other virtual threads. This means millions of virtual threads can share a handful of carrier threads efficiently.
// Traditional platform thread (expensive: ~1MB stack, OS thread overhead)
Thread platformThread = Thread.ofPlatform()
.name("platform-worker")
.start(() -> processPayment(payment));
// Virtual thread (lightweight: ~few KB, JVM-managed)
Thread virtualThread = Thread.ofVirtual()
.name("virtual-worker")
.start(() -> processPayment(payment)); // Same blocking code works!
When to Use Virtual Threads
Good: Use virtual threads for I/O-bound operations:
- Database queries: Waiting for database response
- HTTP/REST API calls: Network I/O dominates
- Message queue consumers: Blocking on queue.take()
- File I/O operations: Reading/writing files
Bad fit: Avoid virtual threads for:
- CPU-intensive computations: Use platform threads with ForkJoinPool
- Tasks that block while holding synchronized locks: Pins virtual threads to carriers on JDK 21-23 (JEP 491 removes most monitor-related pinning in JDK 24+)
- Thread-local-heavy code: Millions of copies become expensive
Virtual Thread Pinning and Diagnostics
The most common production mistake with virtual threads is accidental pinning: a virtual thread gets stuck on its carrier because it blocks inside a synchronized block (on JDK 21-23) or during a native call. Pinning reduces effective concurrency and can erase the expected throughput gains.
Practical checks:
- Prefer ReentrantLock or lock-free structures in hot paths over coarse synchronized blocks.
- Keep blocking I/O outside long critical sections.
- Inspect thread dumps during load tests and look for many blocked virtual threads sharing few carriers.
- Run controlled benchmarks before/after enabling virtual threads to validate tail latency (P95/P99), not just average latency.
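A minimal way to observe pinning locally (assuming Java 21+; the class and monitor names here are illustrative): block inside a synchronized region on a virtual thread and enable the JDK's pinning diagnostics.

```java
import java.time.Duration;

public class PinningDemo {
    private static final Object MONITOR = new Object();

    // Blocks while holding a monitor: on JDK 21-23 this pins the carrier;
    // run with -Djdk.tracePinnedThreads=full to print the pinning stack.
    // JDK 24+ (JEP 491) no longer pins here; use the JFR event
    // jdk.VirtualThreadPinned to track remaining native-call pinning.
    static void blockInsideMonitor() {
        synchronized (MONITOR) {
            try {
                Thread.sleep(Duration.ofMillis(50));
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
    }

    public static void main(String[] args) throws InterruptedException {
        Thread vt = Thread.ofVirtual().name("pinned-demo").start(PinningDemo::blockInsideMonitor);
        vt.join();
        System.out.println("done");
    }
}
```

Under load, many such pinned threads show up in thread dumps as virtual threads crowded onto few carriers, which is the signal to hunt for the offending critical section.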
Virtual Thread Pool with ExecutorService
import java.util.concurrent.Executors;
import java.util.concurrent.ExecutorService;
@Service
public class PaymentProcessingService {
private final ExecutorService executor;
private final PaymentRepository paymentRepository;
private final AuditLogger auditLogger;
public PaymentProcessingService(
PaymentRepository paymentRepository,
AuditLogger auditLogger) {
this.paymentRepository = paymentRepository;
this.auditLogger = auditLogger;
this.executor = Executors.newVirtualThreadPerTaskExecutor();
}
public CompletableFuture<List<PaymentResult>> processPaymentBatch(
List<Payment> payments) {
List<CompletableFuture<PaymentResult>> futures = payments.stream()
.map(payment -> CompletableFuture.supplyAsync(
() -> processPayment(payment),
executor
))
.toList();
return CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]))
.thenApply(v -> futures.stream()
.map(CompletableFuture::join)
.toList());
}
private PaymentResult processPayment(Payment payment) {
try {
var validatedPayment = paymentRepository.findAndLock(payment.getId());
var result = executePayment(validatedPayment);
auditLogger.log(AuditEventType.PAYMENT_PROCESSED, result);
return result;
} catch (Exception e) {
auditLogger.log(AuditEventType.PAYMENT_FAILED, payment, e);
throw new PaymentProcessingException("Failed to process payment", e);
}
}
@PreDestroy
public void shutdown() {
executor.shutdown();
}
}
Spring Boot Configuration for Virtual Threads
@Configuration
public class VirtualThreadConfig {
@Bean
public TomcatProtocolHandlerCustomizer<?> protocolHandlerVirtualThreadExecutorCustomizer() {
return protocolHandler -> {
protocolHandler.setExecutor(Executors.newVirtualThreadPerTaskExecutor());
};
}
@Bean
public AsyncConfigurer asyncConfigurer() {
return new AsyncConfigurer() {
@Override
public Executor getAsyncExecutor() {
return Executors.newVirtualThreadPerTaskExecutor();
}
};
}
}
# application.yml - Spring Boot 3.2+
spring:
threads:
virtual:
enabled: true
Structured Concurrency (Preview Feature)
What Is Structured Concurrency?
Structured concurrency treats multiple concurrent tasks as a single unit of work with a clear lifecycle. If any subtask fails, all tasks are cancelled.
Traditional concurrent programming with futures leads to "fire and forget" tasks that can leak or fail silently. Structured concurrency enforces a discipline: concurrent tasks must complete before their parent scope exits.
import java.util.concurrent.StructuredTaskScope;
import java.util.concurrent.StructuredTaskScope.Subtask;
public class PaymentOrchestrationService {
private final PaymentValidator validator;
private final FraudDetectionService fraudService;
private final BalanceService balanceService;
public PaymentValidationResult validatePayment(Payment payment) {
try (var scope = new StructuredTaskScope.ShutdownOnFailure()) {
Subtask<ValidationResult> validationTask = scope.fork(
() -> validator.validate(payment)
);
Subtask<FraudCheckResult> fraudTask = scope.fork(
() -> fraudService.checkFraud(payment)
);
Subtask<BalanceCheckResult> balanceTask = scope.fork(
() -> balanceService.checkBalance(payment.getAccountId(), payment.getAmount())
);
// Wait for all to complete (or first failure)
scope.join();
scope.throwIfFailed();
// All tasks succeeded - combine results
return PaymentValidationResult.builder()
.validation(validationTask.get())
.fraudCheck(fraudTask.get())
.balanceCheck(balanceTask.get())
.build();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new PaymentValidationException("Validation interrupted", e);
} catch (ExecutionException e) {
throw new PaymentValidationException("Validation failed", e.getCause());
}
}
}
Structured Concurrency with Deadlines
import java.time.Duration;
import java.time.Instant;
public PaymentResult processPaymentWithTimeout(Payment payment) {
Instant deadline = Instant.now().plus(Duration.ofSeconds(5));
try (var scope = new StructuredTaskScope.ShutdownOnFailure()) {
Subtask<PaymentResult> paymentTask = scope.fork(
() -> paymentRepository.process(payment)
);
Subtask<AuditLogEntry> auditTask = scope.fork(
() -> auditLogger.logPayment(payment)
);
scope.joinUntil(deadline);
scope.throwIfFailed();
return paymentTask.get();
} catch (TimeoutException e) {
throw new PaymentTimeoutException("Payment processing exceeded 5 seconds", e);
} catch (InterruptedException | ExecutionException e) {
throw new PaymentProcessingException("Payment processing failed", e);
}
}
Structured concurrency remains a preview API through JDK 25 (JEP 505; the ShutdownOnFailure style shown above matches the JDK 21-24 previews, while JDK 25 reshapes the API around StructuredTaskScope.open()). Enable it with --enable-preview where required and avoid relying on it in long-lived public APIs.
If your platform policy disallows preview features in production, model the same orchestration pattern with CompletableFuture plus explicit cancellation and timeout handling. Keep the API boundary stable so you can migrate to structured concurrency later without changing service contracts.
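That CompletableFuture fallback can be sketched as follows (the ConcurrentUnit and runAll names are illustrative, not a standard API): run all subtasks concurrently, and let the first failure cancel the siblings, mirroring scope.join() plus scope.throwIfFailed().

```java
import java.util.List;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.function.Supplier;

public class ConcurrentUnit {
    // Runs all tasks concurrently; the first failure cancels the siblings
    // and is rethrown, approximating ShutdownOnFailure semantics.
    public static <T> List<T> runAll(List<Supplier<T>> tasks) {
        List<CompletableFuture<T>> futures = tasks.stream()
                .map(CompletableFuture::supplyAsync)
                .toList();
        // Best-effort cancellation of siblings on the first failure
        futures.forEach(f -> f.whenComplete((value, error) -> {
            if (error != null) {
                futures.forEach(other -> other.cancel(true));
            }
        }));
        try {
            return futures.stream().map(CompletableFuture::join).toList();
        } catch (CompletionException | CancellationException e) {
            throw new IllegalStateException("Subtask failed; unit cancelled", e);
        }
    }
}
```

Unlike real structured concurrency, cancel(true) cannot interrupt a task already running on the common pool mid-computation; it only prevents unstarted work and completes the future, so treat this as an approximation.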
CompletableFuture Advanced Patterns
CompletableFuture represents an asynchronous computation that will complete in the future. Beyond basic chaining, it provides powerful combinators for parallel execution, error recovery, and timeout management.
allOf: Wait for All Futures
public CompletableFuture<EnrichedPayment> enrichPayment(Payment payment) {
CompletableFuture<VendorInfo> vendorFuture = CompletableFuture.supplyAsync(
() -> vendorService.getVendorInfo(payment.getVendorId()),
executor
);
CompletableFuture<AccountInfo> accountFuture = CompletableFuture.supplyAsync(
() -> accountService.getAccountInfo(payment.getAccountId()),
executor
);
CompletableFuture<FxRate> fxRateFuture = CompletableFuture.supplyAsync(
() -> fxService.getRate(payment.getCurrency(), "USD"),
executor
);
return CompletableFuture.allOf(vendorFuture, accountFuture, fxRateFuture)
.thenApply(v -> new EnrichedPayment(
payment,
vendorFuture.join(),
accountFuture.join(),
fxRateFuture.join()
));
}
anyOf: Race Multiple Futures
public CompletableFuture<ExchangeRate> getRateWithFallback(String fromCurrency, String toCurrency) {
CompletableFuture<ExchangeRate> primaryProvider = CompletableFuture.supplyAsync(
() -> primaryRateService.getRate(fromCurrency, toCurrency),
executor
);
CompletableFuture<ExchangeRate> secondaryProvider = CompletableFuture.supplyAsync(
() -> secondaryRateService.getRate(fromCurrency, toCurrency),
executor
);
// Return whichever provider completes first. Caution: anyOf completes
// exceptionally if the first future to finish fails; it does not wait
// for a later success. applyToEither has the same race semantics but
// avoids the unchecked cast that anyOf requires.
return primaryProvider.applyToEither(secondaryProvider, rate -> rate);
}
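A true "first successful" race needs per-future recovery, because both anyOf and applyToEither complete exceptionally whenever the first future to finish fails. A sketch of such a helper (the Races and firstSuccess names are illustrative):

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BiConsumer;

public class Races {
    // Completes with the first successful result; completes exceptionally
    // only if BOTH inputs fail (with the last failure as the cause).
    public static <T> CompletableFuture<T> firstSuccess(
            CompletableFuture<T> a, CompletableFuture<T> b) {
        CompletableFuture<T> result = new CompletableFuture<>();
        AtomicInteger failures = new AtomicInteger();
        BiConsumer<T, Throwable> handler = (value, error) -> {
            if (error == null) {
                result.complete(value);              // first success wins
            } else if (failures.incrementAndGet() == 2) {
                result.completeExceptionally(error); // both inputs failed
            }
        };
        a.whenComplete(handler);
        b.whenComplete(handler);
        return result;
    }
}
```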
Exception Handling Patterns
// exceptionally: Simple fallback value
public CompletableFuture<PaymentResult> processWithFallback(Payment payment) {
return processPaymentAsync(payment)
.exceptionally(throwable -> {
log.error("Payment failed, using cached result", throwable);
return getCachedResult(payment);
});
}
// handle: Transform both success and failure
public CompletableFuture<PaymentResult> processWithTransform(Payment payment) {
return processPaymentAsync(payment)
.handle((result, throwable) -> {
if (throwable != null) {
return PaymentResult.failed(payment, throwable.getMessage());
} else {
return enrichResult(result);
}
});
}
// exceptionallyCompose: Async retry on failure
public CompletableFuture<PaymentResult> processPaymentWithRetry(Payment payment) {
return CompletableFuture.supplyAsync(() -> processPayment(payment), executor)
.exceptionallyCompose(throwable -> {
if (throwable.getCause() instanceof TransientException) {
log.warn("Transient failure, retrying payment {}", payment.getId());
return CompletableFuture.supplyAsync(() -> processPayment(payment), executor);
}
return CompletableFuture.failedFuture(throwable);
});
}
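The single-retry pattern above generalizes to bounded retries with exponential backoff using CompletableFuture.delayedExecutor (a standard API since Java 9). A sketch with illustrative names; the retry policy is an assumption, not a library default:

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;

public class Retries {
    // Retries the task up to attemptsLeft times, doubling the delay between
    // attempts. The final failure is propagated to the caller.
    public static <T> CompletableFuture<T> withRetry(
            Supplier<T> task, int attemptsLeft, long delayMillis) {
        return CompletableFuture.supplyAsync(task)
                .exceptionallyCompose(error -> {
                    if (attemptsLeft <= 1) {
                        return CompletableFuture.failedFuture(error);
                    }
                    // Schedule the next attempt after the backoff delay
                    return CompletableFuture.supplyAsync(
                                    () -> null,
                                    CompletableFuture.delayedExecutor(delayMillis, TimeUnit.MILLISECONDS))
                            .thenCompose(ignored ->
                                    withRetry(task, attemptsLeft - 1, delayMillis * 2));
                });
    }
}
```

In production you would also filter on exception type (retry only TransientException, as above) and cap the total elapsed time, not just the attempt count.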
Timeouts with orTimeout and completeOnTimeout
// orTimeout: Fail on timeout
public CompletableFuture<PaymentResult> processWithTimeout(Payment payment) {
return processPaymentAsync(payment)
.orTimeout(5, TimeUnit.SECONDS)
.exceptionally(throwable -> {
if (throwable instanceof TimeoutException) {
log.error("Payment {} timed out", payment.getId());
return PaymentResult.timeout(payment);
}
throw new PaymentProcessingException("Payment failed", throwable);
});
}
// completeOnTimeout: Provide default value on timeout
public CompletableFuture<ExchangeRate> getRateWithDefaultTimeout(
String fromCurrency, String toCurrency) {
ExchangeRate defaultRate = ExchangeRate.of(fromCurrency, toCurrency, BigDecimal.ONE);
return CompletableFuture.supplyAsync(
() -> rateService.getRate(fromCurrency, toCurrency),
executor
)
.completeOnTimeout(defaultRate, 2, TimeUnit.SECONDS);
}
Chaining Dependent Async Operations
Use thenCompose() (not thenApply()) when the next operation returns a CompletableFuture.
// GOOD: thenCompose for dependent async operations
public CompletableFuture<PaymentResult> processPaymentWorkflow(Payment payment) {
return validatePaymentAsync(payment)
.thenCompose(validatedPayment ->
authorizePaymentAsync(validatedPayment)
)
.thenCompose(authResult ->
capturePaymentAsync(authResult)
)
.thenApply(result -> {
// Sync transformation
return enrichResult(result);
});
}
Reactive Streams with Project Reactor
Reactive Streams provide asynchronous, non-blocking stream processing with backpressure. Project Reactor (used by Spring WebFlux) implements this spec with Flux (0..N elements) and Mono (0..1 element).
When to Use Reactive Streams
Use reactive streams when you need:
- Non-blocking I/O at scale: Thousands of concurrent operations
- Backpressure management: Preventing fast producers from overwhelming slow consumers
- Stream processing: Complex transformations of async data streams
Don't use reactive streams for:
- Simple CRUD applications (virtual threads are simpler)
- Operations that already block (JDBC without R2DBC)
- Small-scale concurrency (complexity not worth it)
Mono: Single Async Value
import reactor.core.publisher.Mono;
@Service
public class ReactivePaymentService {
private final WebClient paymentApiClient;
public Mono<PaymentResult> processPaymentAsync(Payment payment) {
return paymentApiClient.post()
.uri("/payments")
.bodyValue(payment)
.retrieve()
.bodyToMono(PaymentResult.class)
.timeout(Duration.ofSeconds(5))
.doOnSuccess(result -> log.info("Payment processed: {}", result.getId()))
.doOnError(error -> log.error("Payment failed", error));
}
public Mono<EnrichedPayment> enrichPayment(String paymentId) {
return findPaymentById(paymentId)
.flatMap(payment -> {
Mono<VendorInfo> vendorMono = getVendorInfo(payment.getVendorId());
Mono<AccountInfo> accountMono = getAccountInfo(payment.getAccountId());
return Mono.zip(vendorMono, accountMono)
.map(tuple -> new EnrichedPayment(
payment,
tuple.getT1(),
tuple.getT2()
));
});
}
}
Flux: Stream of Async Values
import reactor.core.publisher.Flux;
@Service
public class ReactiveReportService {
public Flux<Payment> streamPayments(LocalDate date) {
return template.select(
Query.query(Criteria.where("created_date").is(date)),
Payment.class
);
}
public Flux<PaymentSummary> generatePaymentSummaries(LocalDate date) {
return streamPayments(date)
.map(payment -> new PaymentSummary(
payment.getId(),
payment.getAmount(),
payment.getStatus()
))
.filter(summary -> summary.getAmount().isGreaterThan(Money.ZERO));
}
public Flux<List<Payment>> streamInBatches(LocalDate date, int batchSize) {
return streamPayments(date)
.buffer(batchSize)
.doOnNext(batch -> log.info("Processing batch of {}", batch.size()));
}
}
Backpressure Strategies
// Buffer: Store excess elements
Flux<Payment> buffered = paymentStream
.onBackpressureBuffer(1000);
// Drop: Discard excess elements
Flux<Payment> dropped = paymentStream
.onBackpressureDrop(payment ->
log.warn("Dropped payment: {}", payment.getId())
);
// Latest: Keep only latest element
Flux<Payment> latest = paymentStream
.onBackpressureLatest();
// Error: Fail when backpressure occurs
Flux<Payment> errorOnBackpressure = paymentStream
.onBackpressureError();
Error Handling in Reactive Streams
public Mono<PaymentResult> processWithErrorHandling(Payment payment) {
return processPaymentAsync(payment)
.onErrorReturn(throwable -> throwable instanceof PaymentValidationException,
PaymentResult.invalid(payment))
.retryWhen(Retry.backoff(3, Duration.ofSeconds(1))
.filter(throwable -> throwable instanceof TransientException)
.doBeforeRetry(signal ->
log.warn("Retrying payment {} (attempt {})",
payment.getId(), signal.totalRetries() + 1)
))
.doOnError(throwable ->
log.error("Payment processing failed permanently", throwable));
}
Lock-Free Algorithms and CAS Operations
Lock-free algorithms achieve thread safety without locks using atomic CPU instructions (Compare-And-Swap, CAS). They avoid lock contention and deadlock risks at the cost of potential spinning.
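The essence of every lock-free algorithm is a read-compute-compareAndSet loop that retries when another thread wins the race. A minimal sketch (the account example is illustrative):

```java
import java.util.concurrent.atomic.AtomicLong;

public class CasLoop {
    private final AtomicLong balance = new AtomicLong();

    // Withdraw 'amount' without locks, never letting the balance go negative.
    public boolean tryWithdraw(long amount) {
        while (true) {
            long current = balance.get();
            if (current < amount) {
                return false;                        // insufficient funds, give up
            }
            if (balance.compareAndSet(current, current - amount)) {
                return true;                         // CAS won: our update is visible
            }
            // CAS lost: another thread updated first; loop and re-read
        }
    }

    public void deposit(long amount) {
        balance.addAndGet(amount);
    }

    public long balance() {
        return balance.get();
    }
}
```

Note the check-then-act (the negative-balance guard) stays correct without a lock precisely because compareAndSet fails whenever the value changed after the read.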
AtomicReference and CAS
import java.time.Duration;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;
@Service
public class LockFreeCache<K, V> {
private static class CacheEntry<V> {
final V value;
final long timestamp;
CacheEntry(V value, long timestamp) {
this.value = value;
this.timestamp = timestamp;
}
}
private final ConcurrentHashMap<K, AtomicReference<CacheEntry<V>>> cache = new ConcurrentHashMap<>();
private final Duration ttl;
public LockFreeCache(Duration ttl) {
this.ttl = ttl;
}
public V get(K key, Supplier<V> loader) {
return cache.computeIfAbsent(key, k -> new AtomicReference<>())
.updateAndGet(current -> {
long now = System.currentTimeMillis();
if (current == null || (now - current.timestamp) > ttl.toMillis()) {
// Under contention updateAndGet retries, so the loader may run
// more than once; keep it idempotent and cheap.
V newValue = loader.get();
return new CacheEntry<>(newValue, now);
}
return current;
}).value;
}
}
Lock-Free Stack with Treiber Algorithm
import java.util.concurrent.atomic.AtomicReference;
public class LockFreeStack<T> {
private static class Node<T> {
final T value;
final Node<T> next;
Node(T value, Node<T> next) {
this.value = value;
this.next = next;
}
}
private final AtomicReference<Node<T>> head = new AtomicReference<>();
public void push(T value) {
while (true) {
Node<T> currentHead = head.get();
Node<T> newHead = new Node<>(value, currentHead);
if (head.compareAndSet(currentHead, newHead)) {
return;
}
}
}
public T pop() {
while (true) {
Node<T> currentHead = head.get();
if (currentHead == null) {
return null;
}
Node<T> newHead = currentHead.next;
if (head.compareAndSet(currentHead, newHead)) {
return currentHead.value;
}
}
}
}
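In Java this stack largely sidesteps the classic ABA problem, because each push allocates a fresh node and the garbage collector prevents premature reuse. Designs that do reuse references can guard against ABA with AtomicStampedReference, which pairs the reference with a version stamp bumped on every update. A sketch (class and method names illustrative):

```java
import java.util.concurrent.atomic.AtomicStampedReference;

public class AbaGuard {
    // Value plus stamp: a value that changed A -> B -> A still fails the
    // CAS of a reader holding the stamp from the first A.
    private final AtomicStampedReference<String> ref =
            new AtomicStampedReference<>("A", 0);

    public boolean replace(String expected, String next) {
        int[] stampHolder = new int[1];
        String current = ref.get(stampHolder);       // read value + stamp atomically
        return current.equals(expected)
                && ref.compareAndSet(current, next, stampHolder[0], stampHolder[0] + 1);
    }

    public String value() {
        return ref.getReference();
    }
}
```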
When to Use Lock-Free Algorithms
Good: Use lock-free atomics for:
- Simple counters, flags, references
- High-contention scenarios where lock overhead is measurable
- Systems requiring progress guarantees
Avoid lock-free algorithms when:
- Operation requires multiple steps
- Complexity isn't justified
- Low contention (locks are simpler)
Common Virtual Thread Pitfalls
1. Blocking Operations Pinning Carrier Threads
When a virtual thread enters a synchronized block and then blocks on I/O, it cannot unmount from its carrier thread on JDK 21-23. JEP 491 fixes this in JDK 24+, but ReentrantLock remains a safe default for code that must run on older releases.
// BAD: Holding synchronized lock causes pinning
public synchronized PaymentResult processPayment(Payment payment) {
var result = externalApi.call(payment); // Virtual thread pinned!
return result;
}
// GOOD: Use ReentrantLock
private final ReentrantLock lock = new ReentrantLock();
public PaymentResult processPaymentOptimized(Payment payment) {
lock.lock();
try {
var result = externalApi.call(payment);
return result;
} finally {
lock.unlock();
}
}
2. Thread-Local Abuse
With millions of virtual threads, thread-local storage explodes in memory.
// BAD: Millions of copies with virtual threads
private static final ThreadLocal<DateFormat> formatter =
ThreadLocal.withInitial(() -> new SimpleDateFormat("yyyy-MM-dd"));
// GOOD: Use thread-safe immutable formatter
private static final DateTimeFormatter FORMATTER =
DateTimeFormatter.ofPattern("yyyy-MM-dd");
// BETTER: Use scoped values (preview since Java 21, final in Java 25)
private static final ScopedValue<RequestContext> REQUEST_CONTEXT =
ScopedValue.newInstance();
Parallel Streams
Parallel streams use the common ForkJoinPool for CPU-intensive operations on large datasets. Use for CPU work on 10,000+ elements.
@Service
public class PaymentReportService {
public PaymentReport generateReport(List<Payment> payments) {
// GOOD: parallelize CPU-intensive aggregation only for large datasets;
// for small ones the fork/join overhead outweighs the gains
var stream = payments.size() > 10_000 ? payments.parallelStream() : payments.stream();
return stream.collect(Collectors.teeing(
Collectors.summingDouble(p -> p.getAmount().doubleValue()),
Collectors.counting(),
(sum, count) -> new PaymentReport(Money.of(sum, "USD"), count)
));
}
// BAD: I/O operations in parallel stream
public List<Payment> loadPayments(List<String> ids) {
return ids.parallelStream()
.map(paymentRepository::findById) // Use virtual threads instead!
.toList();
}
}
Testing Concurrent Code
JCStress for Detecting Concurrency Bugs
JCStress systematically explores different thread interleavings to catch subtle bugs.
// build.gradle
dependencies {
testImplementation 'org.openjdk.jcstress:jcstress-core:0.16'
}
import java.util.concurrent.atomic.AtomicLong;
import org.openjdk.jcstress.annotations.*;
import org.openjdk.jcstress.infra.results.J_Result;
import static org.openjdk.jcstress.annotations.Expect.ACCEPTABLE;
import static org.openjdk.jcstress.annotations.Expect.FORBIDDEN;
@JCStressTest
@Outcome(id = "100", expect = ACCEPTABLE, desc = "All increments applied")
@Outcome(expect = FORBIDDEN, desc = "Lost updates detected")
@State
public class CounterConcurrencyTest {
private final AtomicLong counter = new AtomicLong(0);
@Actor
public void actor1() {
for (int i = 0; i < 50; i++) {
counter.incrementAndGet();
}
}
@Actor
public void actor2() {
for (int i = 0; i < 50; i++) {
counter.incrementAndGet();
}
}
@Arbiter
public void arbiter(J_Result result) {
result.r1 = counter.get();
}
}
Performance Considerations
Virtual Thread Overhead
Virtual threads are cheap (a few KB each vs ~1MB of stack for platform threads), but not free: each still costs allocation and scheduling work. Consider bounded parallelism or chunking for very large datasets.
public void processLargeDataset(List<Payment> payments) {
// GOOD: Reasonable for up to ~1M items
if (payments.size() <= 1_000_000) {
try (var executor = Executors.newVirtualThreadPerTaskExecutor()) {
payments.forEach(payment ->
executor.submit(() -> processPayment(payment))
);
}
}
// GOOD: BETTER: Process in chunks for very large datasets
if (payments.size() > 1_000_000) {
var chunks = Lists.partition(payments, 100_000); // Guava's Lists.partition
for (var chunk : chunks) {
try (var executor = Executors.newVirtualThreadPerTaskExecutor()) {
chunk.forEach(payment ->
executor.submit(() -> processPayment(payment))
);
}
}
}
}
Further Reading
Internal Documentation
- Java Concurrency Fundamentals - Thread safety, synchronization, JMM
- Java General - Modern Java features
- Java Performance - JVM tuning
- Spring Boot General - Virtual thread configuration
External Resources
- JEP 444: Virtual Threads
- JEP 453: Structured Concurrency (Preview)
- Project Loom
- JCStress Documentation
Summary
Key Takeaways
- Use virtual threads for I/O-bound operations - database queries, API calls, message queues
- Structured concurrency manages concurrent tasks as a single unit with automatic cancellation
- CompletableFuture provides composable async operations with error handling and timeouts
- Reactive streams handle backpressure between fast producers and slow consumers
- Lock-free algorithms use CAS for high-performance scenarios
- Avoid synchronized blocks in virtual threads - they pin carrier threads; use ReentrantLock
- ThreadLocal is expensive with virtual threads - use scoped values or avoid entirely
- Parallel streams for CPU work, virtual threads for I/O - know when to use each
- Always handle exceptions in async code - unhandled exceptions are silently swallowed
- Test concurrent code with JCStress - detects subtle concurrency bugs
Next Steps: Review Java Performance for JVM tuning and Spring Boot General for virtual thread configuration.