
Java Concurrency Advanced

Why Virtual Threads Matter

Virtual threads (Project Loom), finalized in Java 21 and refined through Java 25, make millions of lightweight threads practical. For applications processing high volumes of I/O-bound operations (database queries, API calls), virtual threads deliver large throughput gains with simpler code than traditional async approaches.

Overview

This guide covers advanced Java concurrency patterns: virtual threads for scalable I/O, structured concurrency for managing concurrent operations as units, CompletableFuture for async composition, reactive streams for backpressure handling, and lock-free algorithms for high-performance scenarios. For foundational concepts like thread safety and synchronization, see Java Concurrency Fundamentals.


Core Principles

  1. Use Virtual Threads for I/O: Perfect for database queries, HTTP calls, message queues
  2. Structured Concurrency: Manage concurrent operations as a single unit with clear lifecycle
  3. CompletableFuture for Async Composition: Chain async operations with error handling
  4. Reactive Streams for Backpressure: Handle fast producers and slow consumers
  5. Lock-Free for Performance: Use CAS-based algorithms for high-contention scenarios

Choosing the Right Model

  • Blocking I/O per request: Virtual threads. Keeps code synchronous and easy to reason about while scaling concurrency.
  • Fan-out/fan-in orchestration: Structured concurrency (or CompletableFuture fallback). Represents related subtasks as one logical unit with coordinated cancellation.
  • Event streams with backpressure: Reactive streams. Handles mismatched producer/consumer speed explicitly.
  • CPU-bound parallel compute: Platform threads/ForkJoinPool. Better control over bounded parallelism for compute-heavy tasks.

Virtual Threads (Java 21+; prefer Java 25)

What Are Virtual Threads?

Virtual threads are lightweight threads managed by the JVM, not the operating system. They allow millions of concurrent tasks without the overhead of platform threads.

Platform threads (traditional Java threads) map 1:1 to operating system threads. OS threads are expensive - each consumes ~1MB of stack memory and context switching requires kernel involvement. This limits Java applications to thousands of concurrent platform threads.

Virtual threads break this limitation by running on a small pool of platform "carrier" threads. When a virtual thread blocks on I/O, it's unmounted from its carrier thread, freeing that carrier to run other virtual threads. This means millions of virtual threads can share a handful of carrier threads efficiently.
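To see unmounting in practice, here is a self-contained sketch (assuming Java 21+): it starts thousands of virtual threads that each block briefly, and the whole batch finishes in roughly one blocking interval rather than count × interval, because blocked virtual threads release their carriers.

```java
import java.time.Duration;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;

public class VirtualThreadDemo {

    // Launch `count` virtual threads that each block for `blockMillis` and
    // return how many completed. Because blocked virtual threads unmount
    // from their carrier threads, total wall time stays close to a single
    // blocking interval instead of count * blockMillis.
    public static int runBlockingTasks(int count, long blockMillis) throws InterruptedException {
        CountDownLatch done = new CountDownLatch(count);
        AtomicInteger completed = new AtomicInteger();
        for (int i = 0; i < count; i++) {
            Thread.ofVirtual().name("vt-", i).start(() -> {
                try {
                    Thread.sleep(Duration.ofMillis(blockMillis)); // unmounts the carrier
                    completed.incrementAndGet();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                } finally {
                    done.countDown();
                }
            });
        }
        done.await();
        return completed.get();
    }

    public static void main(String[] args) throws InterruptedException {
        long start = System.nanoTime();
        int completed = runBlockingTasks(10_000, 100);
        long elapsedMs = (System.nanoTime() - start) / 1_000_000;
        System.out.println(completed + " tasks in " + elapsedMs + " ms");
    }
}
```

Creating 10,000 platform threads for the same workload would exhaust memory or hit OS limits long before this completes.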

// Traditional platform thread (expensive: ~1MB stack, OS thread overhead)
Thread platformThread = Thread.ofPlatform()
    .name("platform-worker")
    .start(() -> processPayment(payment));

// Virtual thread (lightweight: ~few KB, JVM-managed)
Thread virtualThread = Thread.ofVirtual()
    .name("virtual-worker")
    .start(() -> processPayment(payment)); // Same blocking code works!

When to Use Virtual Threads

Good: Use virtual threads for I/O-bound operations:

  • Database queries: Waiting for database response
  • HTTP/REST API calls: Network I/O dominates
  • Message queue consumers: Blocking on queue.take()
  • File I/O operations: Reading/writing files

Bad fit: Avoid virtual threads for:

  • CPU-intensive computations: Use platform threads with ForkJoinPool
  • Tasks that block while holding synchronized locks: Pins virtual threads to carriers on JDK 21-23 (largely fixed by JEP 491 in JDK 24+)
  • Thread-local-heavy code: Millions of copies become expensive
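The I/O-vs-CPU distinction above can be measured directly. This hedged micro-benchmark sketch (timings are illustrative and machine-dependent) runs the same batch of simulated blocking calls on a small fixed platform pool and on a virtual-thread-per-task executor:

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class IoBoundComparison {

    // Run `tasks` simulated I/O calls (sleep) on the given executor and
    // return elapsed wall time in milliseconds. The try-with-resources
    // close() waits for all submitted tasks to finish (Java 19+).
    public static long timeBlockingTasks(ExecutorService executor, int tasks, long ioMillis)
            throws InterruptedException {
        long start = System.nanoTime();
        try (executor) {
            for (int i = 0; i < tasks; i++) {
                executor.submit(() -> {
                    try {
                        Thread.sleep(ioMillis); // stand-in for a DB/HTTP call
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                });
            }
        }
        return (System.nanoTime() - start) / 1_000_000;
    }

    public static void main(String[] args) throws InterruptedException {
        // 200 tasks x 50 ms of "I/O": the fixed pool serializes waiting,
        // the virtual-thread executor overlaps it.
        long fixed = timeBlockingTasks(Executors.newFixedThreadPool(8), 200, 50);
        long virtual = timeBlockingTasks(Executors.newVirtualThreadPerTaskExecutor(), 200, 50);
        System.out.printf("fixed(8): %d ms, virtual: %d ms%n", fixed, virtual);
    }
}
```

For CPU-bound work the gap disappears, since throughput is then bounded by cores, not by waiting.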

Virtual Thread Pinning and Diagnostics

The most common production mistake with virtual threads is accidental pinning: a virtual thread gets stuck on its carrier thread because it blocks while inside a monitor lock (synchronized) or a native call. On JDK 21-23, synchronized is the main culprit; JDK 24's JEP 491 removes most synchronized-related pinning, though native frames can still pin. Pinning reduces concurrency and can erase expected throughput gains.

Practical checks:

  • Prefer ReentrantLock or lock-free structures in hot paths over coarse synchronized blocks.
  • Keep blocking I/O outside long critical sections.
  • Inspect thread dumps during load tests and look for many blocked virtual threads sharing few carriers.
  • Run controlled benchmarks before/after enabling virtual threads to validate tail latency (P95/P99), not just average latency.

Virtual Thread Pool with ExecutorService

import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

@Service
public class PaymentProcessingService {

    private final ExecutorService executor;
    private final PaymentRepository paymentRepository;
    private final AuditLogger auditLogger;

    public PaymentProcessingService(
            PaymentRepository paymentRepository,
            AuditLogger auditLogger) {
        this.paymentRepository = paymentRepository;
        this.auditLogger = auditLogger;
        this.executor = Executors.newVirtualThreadPerTaskExecutor();
    }

    public CompletableFuture<List<PaymentResult>> processPaymentBatch(
            List<Payment> payments) {

        List<CompletableFuture<PaymentResult>> futures = payments.stream()
            .map(payment -> CompletableFuture.supplyAsync(
                () -> processPayment(payment),
                executor
            ))
            .toList();

        return CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]))
            .thenApply(v -> futures.stream()
                .map(CompletableFuture::join)
                .toList());
    }

    private PaymentResult processPayment(Payment payment) {
        try {
            var validatedPayment = paymentRepository.findAndLock(payment.getId());
            var result = executePayment(validatedPayment);
            auditLogger.log(AuditEventType.PAYMENT_PROCESSED, result);
            return result;
        } catch (Exception e) {
            auditLogger.log(AuditEventType.PAYMENT_FAILED, payment, e);
            throw new PaymentProcessingException("Failed to process payment", e);
        }
    }

    @PreDestroy
    public void shutdown() {
        executor.shutdown();
    }
}

Spring Boot Configuration for Virtual Threads

@Configuration
public class VirtualThreadConfig {

    @Bean
    public TomcatProtocolHandlerCustomizer<?> protocolHandlerVirtualThreadExecutorCustomizer() {
        return protocolHandler -> {
            protocolHandler.setExecutor(Executors.newVirtualThreadPerTaskExecutor());
        };
    }

    @Bean
    public AsyncConfigurer asyncConfigurer() {
        return new AsyncConfigurer() {
            @Override
            public Executor getAsyncExecutor() {
                return Executors.newVirtualThreadPerTaskExecutor();
            }
        };
    }
}

# application.yml - Spring Boot 3.2+
spring:
  threads:
    virtual:
      enabled: true

Structured Concurrency (Preview Feature)

What Is Structured Concurrency?

Structured concurrency treats multiple concurrent tasks as a single unit of work with a clear lifecycle. If any subtask fails, all tasks are cancelled.

Traditional concurrent programming with futures leads to "fire and forget" tasks that can leak or fail silently. Structured concurrency enforces a discipline: concurrent tasks must complete before their parent scope exits.

import java.util.concurrent.ExecutionException;
import java.util.concurrent.StructuredTaskScope;
import java.util.concurrent.StructuredTaskScope.Subtask;

public class PaymentOrchestrationService {

    private final PaymentValidator validator;
    private final FraudDetectionService fraudService;
    private final BalanceService balanceService;

    public PaymentValidationResult validatePayment(Payment payment) {
        try (var scope = new StructuredTaskScope.ShutdownOnFailure()) {

            Subtask<ValidationResult> validationTask = scope.fork(
                () -> validator.validate(payment)
            );

            Subtask<FraudCheckResult> fraudTask = scope.fork(
                () -> fraudService.checkFraud(payment)
            );

            Subtask<BalanceCheckResult> balanceTask = scope.fork(
                () -> balanceService.checkBalance(payment.getAccountId(), payment.getAmount())
            );

            // Wait for all to complete (or first failure)
            scope.join();
            scope.throwIfFailed();

            // All tasks succeeded - combine results
            return PaymentValidationResult.builder()
                .validation(validationTask.get())
                .fraudCheck(fraudTask.get())
                .balanceCheck(balanceTask.get())
                .build();

        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new PaymentValidationException("Validation interrupted", e);
        } catch (ExecutionException e) {
            throw new PaymentValidationException("Validation failed", e.getCause());
        }
    }
}

Structured Concurrency with Deadlines

import java.time.Duration;
import java.time.Instant;
import java.util.concurrent.TimeoutException;

public PaymentResult processPaymentWithTimeout(Payment payment) {
    Instant deadline = Instant.now().plus(Duration.ofSeconds(5));

    try (var scope = new StructuredTaskScope.ShutdownOnFailure()) {

        Subtask<PaymentResult> paymentTask = scope.fork(
            () -> paymentRepository.process(payment)
        );

        Subtask<AuditLogEntry> auditTask = scope.fork(
            () -> auditLogger.logPayment(payment)
        );

        scope.joinUntil(deadline);
        scope.throwIfFailed();

        return paymentTask.get();

    } catch (TimeoutException e) {
        throw new PaymentTimeoutException("Payment processing exceeded 5 seconds", e);
    } catch (InterruptedException | ExecutionException e) {
        throw new PaymentProcessingException("Payment processing failed", e);
    }
}
Preview Feature

Structured concurrency is still a preview API in current JDKs, including Java 25, and its surface has changed between preview rounds: newer previews replace StructuredTaskScope.ShutdownOnFailure with a StructuredTaskScope.open(...) plus Joiner shape. Enable with --enable-preview where required, pin examples to the JDK you actually run, and avoid relying on it for long-lived public APIs.

If your platform policy disallows preview features in production, model the same orchestration pattern with CompletableFuture plus explicit cancellation and timeout handling. Keep the API boundary stable so you can migrate to structured concurrency later without changing service contracts.
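The fallback shape can be sketched with plain CompletableFuture. This is an illustrative approximation of ShutdownOnFailure semantics, not the structured-concurrency API itself: both subtasks run concurrently, the first failure cancels the sibling (best effort), and an overall deadline bounds the whole unit.

```java
import java.time.Duration;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Supplier;

public class CompletableFutureOrchestration {

    // Run both suppliers concurrently; if either fails, cancel the other
    // and surface the failure, approximating ShutdownOnFailure semantics.
    public static <A, B> Map.Entry<A, B> bothOrFail(
            Supplier<A> first, Supplier<B> second,
            Executor executor, Duration timeout) throws Exception {

        CompletableFuture<A> fa = CompletableFuture.supplyAsync(first, executor);
        CompletableFuture<B> fb = CompletableFuture.supplyAsync(second, executor);

        // Propagate the first failure to the sibling as a cancellation.
        fa.whenComplete((r, t) -> { if (t != null) fb.cancel(true); });
        fb.whenComplete((r, t) -> { if (t != null) fa.cancel(true); });

        try {
            CompletableFuture.allOf(fa, fb).get(timeout.toMillis(), TimeUnit.MILLISECONDS);
            return Map.entry(fa.join(), fb.join());
        } catch (TimeoutException e) {
            fa.cancel(true); // deadline exceeded: tear down the whole unit
            fb.cancel(true);
            throw e;
        }
    }
}
```

Note that cancellation here is cooperative: the losing task only stops promptly if its code responds to interruption, which structured concurrency on virtual threads handles more reliably.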


CompletableFuture Advanced Patterns

CompletableFuture represents an asynchronous computation that will complete in the future. Beyond basic chaining, it provides powerful combinators for parallel execution, error recovery, and timeout management.

allOf: Wait for All Futures

public CompletableFuture<EnrichedPayment> enrichPayment(Payment payment) {

    CompletableFuture<VendorInfo> vendorFuture = CompletableFuture.supplyAsync(
        () -> vendorService.getVendorInfo(payment.getVendorId()),
        executor
    );

    CompletableFuture<AccountInfo> accountFuture = CompletableFuture.supplyAsync(
        () -> accountService.getAccountInfo(payment.getAccountId()),
        executor
    );

    CompletableFuture<FxRate> fxRateFuture = CompletableFuture.supplyAsync(
        () -> fxService.getRate(payment.getCurrency(), "USD"),
        executor
    );

    return CompletableFuture.allOf(vendorFuture, accountFuture, fxRateFuture)
        .thenApply(v -> new EnrichedPayment(
            payment,
            vendorFuture.join(),
            accountFuture.join(),
            fxRateFuture.join()
        ));
}

anyOf: Race Multiple Futures

public CompletableFuture<ExchangeRate> getRateWithFallback(String fromCurrency, String toCurrency) {
    CompletableFuture<ExchangeRate> primaryProvider = CompletableFuture.supplyAsync(
        () -> primaryRateService.getRate(fromCurrency, toCurrency),
        executor
    );

    CompletableFuture<ExchangeRate> secondaryProvider = CompletableFuture.supplyAsync(
        () -> secondaryRateService.getRate(fromCurrency, toCurrency),
        executor
    );

    // anyOf returns CompletableFuture<Object> and completes with whichever
    // future finishes FIRST, including failures. Cast the result; use
    // exceptionallyCompose if you need "first successful" semantics instead.
    return CompletableFuture.anyOf(primaryProvider, secondaryProvider)
        .thenApply(result -> (ExchangeRate) result);
}
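Because anyOf also races failures, "try the primary, fall back to the secondary" is usually expressed with exceptionallyCompose instead. A minimal self-contained sketch (the suppliers are placeholders for real providers):

```java
import java.util.concurrent.CompletableFuture;
import java.util.function.Supplier;

public class FirstSuccessful {

    // Try the primary source; if it fails, fall back to the secondary.
    // Unlike anyOf, a fast failure from the primary does not win the race.
    public static <T> CompletableFuture<T> withFallback(
            Supplier<T> primary, Supplier<T> secondary) {
        return CompletableFuture.supplyAsync(primary)
            .exceptionallyCompose(t -> CompletableFuture.supplyAsync(secondary));
    }
}
```

The trade-off versus anyOf is latency: the secondary only starts after the primary has failed, rather than racing from the start.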

Exception Handling Patterns

// exceptionally: Simple fallback value
public CompletableFuture<PaymentResult> processWithFallback(Payment payment) {
    return processPaymentAsync(payment)
        .exceptionally(throwable -> {
            log.error("Payment failed, using cached result", throwable);
            return getCachedResult(payment);
        });
}

// handle: Transform both success and failure
public CompletableFuture<PaymentResult> processWithTransform(Payment payment) {
    return processPaymentAsync(payment)
        .handle((result, throwable) -> {
            if (throwable != null) {
                return PaymentResult.failed(payment, throwable.getMessage());
            } else {
                return enrichResult(result);
            }
        });
}

// exceptionallyCompose: Async retry on failure
public CompletableFuture<PaymentResult> processPaymentWithRetry(Payment payment) {
    return CompletableFuture.supplyAsync(() -> processPayment(payment), executor)
        .exceptionallyCompose(throwable -> {
            if (throwable.getCause() instanceof TransientException) {
                log.warn("Transient failure, retrying payment {}", payment.getId());
                return CompletableFuture.supplyAsync(() -> processPayment(payment), executor);
            }
            return CompletableFuture.failedFuture(throwable);
        });
}
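The difference between exceptionally (failure-only fallback) and handle (always runs, sees both arms) is easiest to verify with a self-contained example using plain completed and failed futures:

```java
import java.util.concurrent.CompletableFuture;

public class ErrorHandlingDemo {

    // exceptionally: only invoked on failure, maps Throwable -> fallback.
    public static String withFallback(CompletableFuture<String> source) {
        return source.exceptionally(t -> "fallback").join();
    }

    // handle: always invoked with (result, throwable); must cover both arms.
    public static String withHandle(CompletableFuture<String> source) {
        return source.handle((r, t) -> t != null ? "failed" : r.toUpperCase()).join();
    }
}
```

Use exceptionally when success values pass through untouched; use handle when the success path also needs transformation.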

Timeouts with orTimeout and completeOnTimeout

// orTimeout: Fail on timeout
public CompletableFuture<PaymentResult> processWithTimeout(Payment payment) {
    return processPaymentAsync(payment)
        .orTimeout(5, TimeUnit.SECONDS)
        .exceptionally(throwable -> {
            if (throwable instanceof TimeoutException) {
                log.error("Payment {} timed out", payment.getId());
                return PaymentResult.timeout(payment);
            }
            throw new PaymentProcessingException("Payment failed", throwable);
        });
}

// completeOnTimeout: Provide default value on timeout
public CompletableFuture<ExchangeRate> getRateWithDefaultTimeout(
        String fromCurrency, String toCurrency) {

    ExchangeRate defaultRate = ExchangeRate.of(fromCurrency, toCurrency, BigDecimal.ONE);

    return CompletableFuture.supplyAsync(
            () -> rateService.getRate(fromCurrency, toCurrency),
            executor
        )
        .completeOnTimeout(defaultRate, 2, TimeUnit.SECONDS);
}
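The two timeout behaviours above can be exercised without any payment infrastructure; this self-contained sketch simulates the slow call with a sleep:

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class TimeoutDemo {

    static CompletableFuture<String> slowCall(long millis) {
        return CompletableFuture.supplyAsync(() -> {
            try {
                Thread.sleep(millis); // stand-in for a slow remote call
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            return "done";
        });
    }

    // orTimeout fails the future with TimeoutException after the deadline.
    public static boolean timesOut(long workMillis, long timeoutMillis) {
        try {
            slowCall(workMillis).orTimeout(timeoutMillis, TimeUnit.MILLISECONDS).join();
            return false;
        } catch (CompletionException e) {
            return e.getCause() instanceof TimeoutException;
        }
    }

    // completeOnTimeout substitutes a default value instead of failing.
    public static String orDefault(long workMillis, long timeoutMillis) {
        return slowCall(workMillis)
            .completeOnTimeout("default", timeoutMillis, TimeUnit.MILLISECONDS)
            .join();
    }
}
```

Note that neither operator interrupts the underlying work; the slow task keeps running even after its result has been discarded.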

Chaining Dependent Async Operations

Use thenCompose() (not thenApply()) when the next operation returns a CompletableFuture.

// GOOD: thenCompose for dependent async operations
public CompletableFuture<PaymentResult> processPaymentWorkflow(Payment payment) {
    return validatePaymentAsync(payment)
        .thenCompose(validatedPayment ->
            authorizePaymentAsync(validatedPayment)
        )
        .thenCompose(authResult ->
            capturePaymentAsync(authResult)
        )
        .thenApply(result -> {
            // Sync transformation
            return enrichResult(result);
        });
}
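Why thenCompose matters is visible in the types: with an async function, thenApply nests the future while thenCompose flattens it. A minimal self-contained contrast:

```java
import java.util.concurrent.CompletableFuture;

public class ComposeVsApply {

    static CompletableFuture<Integer> asyncDouble(int n) {
        return CompletableFuture.supplyAsync(() -> n * 2);
    }

    public static void main(String[] args) {
        // thenApply with an async function nests the future:
        CompletableFuture<CompletableFuture<Integer>> nested =
            CompletableFuture.completedFuture(21).thenApply(ComposeVsApply::asyncDouble);

        // thenCompose flattens it to a single future:
        CompletableFuture<Integer> flat =
            CompletableFuture.completedFuture(21).thenCompose(ComposeVsApply::asyncDouble);

        System.out.println(nested.join().join()); // caller must join twice
        System.out.println(flat.join());
    }
}
```

If you find yourself writing join() inside a chain to unwrap a nested future, a thenApply should probably have been a thenCompose.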

Reactive Streams with Project Reactor

Reactive Streams provide asynchronous, non-blocking stream processing with backpressure. Project Reactor (used by Spring WebFlux) implements this spec with Flux (0..N elements) and Mono (0..1 element).

When to Use Reactive Streams

Use reactive streams when you need:

  • Non-blocking I/O at scale: Thousands of concurrent operations
  • Backpressure management: Preventing fast producers from overwhelming slow consumers
  • Stream processing: Complex transformations of async data streams

Don't use reactive streams for:

  • Simple CRUD applications (virtual threads are simpler)
  • Operations that already block (JDBC without R2DBC)
  • Small-scale concurrency (complexity not worth it)

Mono: Single Async Value

import reactor.core.publisher.Mono;

@Service
public class ReactivePaymentService {

    private final WebClient paymentApiClient;

    public Mono<PaymentResult> processPaymentAsync(Payment payment) {
        return paymentApiClient.post()
            .uri("/payments")
            .bodyValue(payment)
            .retrieve()
            .bodyToMono(PaymentResult.class)
            .timeout(Duration.ofSeconds(5))
            .doOnSuccess(result -> log.info("Payment processed: {}", result.getId()))
            .doOnError(error -> log.error("Payment failed", error));
    }

    public Mono<EnrichedPayment> enrichPayment(String paymentId) {
        return findPaymentById(paymentId)
            .flatMap(payment -> {
                Mono<VendorInfo> vendorMono = getVendorInfo(payment.getVendorId());
                Mono<AccountInfo> accountMono = getAccountInfo(payment.getAccountId());

                return Mono.zip(vendorMono, accountMono)
                    .map(tuple -> new EnrichedPayment(
                        payment,
                        tuple.getT1(),
                        tuple.getT2()
                    ));
            });
    }
}

Flux: Stream of Async Values

import reactor.core.publisher.Flux;

@Service
public class ReactiveReportService {

    public Flux<Payment> streamPayments(LocalDate date) {
        return template.select(
            Query.query(Criteria.where("created_date").is(date)),
            Payment.class
        );
    }

    public Flux<PaymentSummary> generatePaymentSummaries(LocalDate date) {
        return streamPayments(date)
            .map(payment -> new PaymentSummary(
                payment.getId(),
                payment.getAmount(),
                payment.getStatus()
            ))
            .filter(summary -> summary.getAmount().isGreaterThan(Money.ZERO));
    }

    public Flux<List<Payment>> streamInBatches(LocalDate date, int batchSize) {
        return streamPayments(date)
            .buffer(batchSize)
            .doOnNext(batch -> log.info("Processing batch of {}", batch.size()));
    }
}

Backpressure Strategies

// Buffer: Store excess elements
Flux<Payment> buffered = paymentStream
    .onBackpressureBuffer(1000);

// Drop: Discard excess elements
Flux<Payment> dropped = paymentStream
    .onBackpressureDrop(payment ->
        log.warn("Dropped payment: {}", payment.getId())
    );

// Latest: Keep only latest element
Flux<Payment> latest = paymentStream
    .onBackpressureLatest();

// Error: Fail when backpressure occurs
Flux<Payment> errorOnBackpressure = paymentStream
    .onBackpressureError();
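Underneath all of these strategies sits the Reactive Streams protocol, which the JDK ships as java.util.concurrent.Flow. This self-contained sketch (no Reactor dependency) shows the contract directly: the subscriber requests one element at a time, and the publisher may never push more than was requested.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;

public class FlowBackpressureDemo {

    // A subscriber that requests exactly one element at a time: this
    // pull-based demand signalling is the backpressure contract that
    // Reactor's operators build on.
    public static List<Integer> consumeOneByOne(int count) throws InterruptedException {
        List<Integer> received = new CopyOnWriteArrayList<>();
        CountDownLatch done = new CountDownLatch(1);

        try (SubmissionPublisher<Integer> publisher = new SubmissionPublisher<>()) {
            publisher.subscribe(new Flow.Subscriber<Integer>() {
                private Flow.Subscription subscription;

                @Override public void onSubscribe(Flow.Subscription s) {
                    subscription = s;
                    s.request(1);            // demand exactly one element
                }
                @Override public void onNext(Integer item) {
                    received.add(item);
                    subscription.request(1); // pull the next one
                }
                @Override public void onError(Throwable t) { done.countDown(); }
                @Override public void onComplete() { done.countDown(); }
            });

            for (int i = 0; i < count; i++) {
                publisher.submit(i);         // blocks when the buffer is full
            }
        } // close() completes the subscriber after delivery
        done.await();
        return received;
    }
}
```

submit() blocking on a full buffer is itself a backpressure strategy: the producer is slowed to the consumer's pace rather than buffering without bound.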

Error Handling in Reactive Streams

public Mono<PaymentResult> processWithErrorHandling(Payment payment) {
    return processPaymentAsync(payment)
        .onErrorReturn(throwable -> throwable instanceof PaymentValidationException,
            PaymentResult.invalid(payment))
        .retryWhen(Retry.backoff(3, Duration.ofSeconds(1))
            .filter(throwable -> throwable instanceof TransientException)
            .doBeforeRetry(signal ->
                log.warn("Retrying payment {} (attempt {})",
                    payment.getId(), signal.totalRetries() + 1)
            ))
        .doOnError(throwable ->
            log.error("Payment processing failed permanently", throwable));
}

Lock-Free Algorithms and CAS Operations

Lock-free algorithms achieve thread safety without locks using atomic CPU instructions (Compare-And-Swap, CAS). They avoid lock contention and deadlock risks at the cost of potential spinning.
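The core idiom is the CAS retry loop: read the current value, compute the desired value, and attempt an atomic swap; if another thread won the race, re-read and try again. A minimal self-contained example tracking a running maximum without locks:

```java
import java.util.concurrent.atomic.AtomicLong;

public class CasMaxTracker {

    private final AtomicLong max = new AtomicLong(Long.MIN_VALUE);

    // Classic CAS retry loop: read, decide, attempt to swap. A failed
    // compareAndSet means another thread updated max concurrently, so
    // re-read and retry; no thread ever blocks.
    public void record(long value) {
        while (true) {
            long current = max.get();
            if (value <= current) {
                return;                        // nothing to update
            }
            if (max.compareAndSet(current, value)) {
                return;                        // we won the race
            }
            // CAS failed: loop with a fresh read of max
        }
    }

    public long get() {
        return max.get();
    }
}
```

The loop is correct because each retry re-reads the latest value, so a stale decision can never be installed; at worst a thread spins briefly under contention.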

AtomicReference and CAS

import java.time.Duration;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;

public class LockFreeCache<K, V> {

    private static class CacheEntry<V> {
        final V value;
        final long timestamp;

        CacheEntry(V value, long timestamp) {
            this.value = value;
            this.timestamp = timestamp;
        }
    }

    private final ConcurrentHashMap<K, AtomicReference<CacheEntry<V>>> cache = new ConcurrentHashMap<>();
    private final Duration ttl;

    public LockFreeCache(Duration ttl) {
        this.ttl = ttl;
    }

    public V get(K key, Supplier<V> loader) {
        // Note: under contention updateAndGet may retry, invoking the loader
        // more than once - loaders should be side-effect free.
        return cache.computeIfAbsent(key, k -> new AtomicReference<>())
            .updateAndGet(current -> {
                long now = System.currentTimeMillis();

                if (current == null || (now - current.timestamp) > ttl.toMillis()) {
                    V newValue = loader.get();
                    return new CacheEntry<>(newValue, now);
                }

                return current;
            }).value;
    }
}

Lock-Free Stack with Treiber Algorithm

import java.util.concurrent.atomic.AtomicReference;

public class LockFreeStack<T> {

    private static class Node<T> {
        final T value;
        final Node<T> next;

        Node(T value, Node<T> next) {
            this.value = value;
            this.next = next;
        }
    }

    private final AtomicReference<Node<T>> head = new AtomicReference<>();

    public void push(T value) {
        while (true) {
            Node<T> currentHead = head.get();
            Node<T> newHead = new Node<>(value, currentHead);

            // CAS succeeds only if no other thread changed head meanwhile
            if (head.compareAndSet(currentHead, newHead)) {
                return;
            }
        }
    }

    public T pop() {
        while (true) {
            Node<T> currentHead = head.get();

            if (currentHead == null) {
                return null;
            }

            Node<T> newHead = currentHead.next;

            if (head.compareAndSet(currentHead, newHead)) {
                return currentHead.value;
            }
        }
    }
}

When to Use Lock-Free Algorithms

Good: Use lock-free atomics for:

  • Simple counters, flags, references
  • High-contention scenarios where lock overhead is measurable
  • Systems requiring progress guarantees

Avoid lock-free algorithms when:

  • Operation requires multiple steps
  • Complexity isn't justified
  • Low contention (locks are simpler)

Common Virtual Thread Pitfalls

1. Blocking Operations Pinning Carrier Threads

When a virtual thread enters a synchronized block and then blocks on I/O, it cannot unmount from its carrier thread. (This applies to JDK 21-23; JDK 24's JEP 491 removes most synchronized-related pinning, but the ReentrantLock pattern below is safe on every version.)

// BAD: Holding a synchronized lock causes pinning (JDK 21-23)
public synchronized PaymentResult processPayment(Payment payment) {
    var result = externalApi.call(payment); // Virtual thread pinned!
    return result;
}

// GOOD: Use ReentrantLock
private final ReentrantLock lock = new ReentrantLock();

public PaymentResult processPaymentOptimized(Payment payment) {
    lock.lock();
    try {
        return externalApi.call(payment);
    } finally {
        lock.unlock();
    }
}

2. Thread-Local Abuse

With millions of virtual threads, thread-local storage explodes in memory.

// BAD: Millions of copies with virtual threads
private static final ThreadLocal<DateFormat> formatter =
    ThreadLocal.withInitial(() -> new SimpleDateFormat("yyyy-MM-dd"));

// GOOD: Use a thread-safe immutable formatter
private static final DateTimeFormatter FORMATTER =
    DateTimeFormatter.ofPattern("yyyy-MM-dd");

// BETTER: Use scoped values (preview since Java 21, finalized in Java 25)
private static final ScopedValue<RequestContext> REQUEST_CONTEXT =
    ScopedValue.newInstance();

Parallel Streams

Parallel streams use the common ForkJoinPool for CPU-intensive operations on large datasets. Use for CPU work on 10,000+ elements.

@Service
public class PaymentReportService {

    public PaymentReport generateReport(List<Payment> payments) {
        var reportCollector = Collectors.teeing(
            Collectors.summingDouble((Payment p) -> p.getAmount().doubleValue()),
            Collectors.counting(),
            (sum, count) -> new PaymentReport(Money.of(sum, "USD"), count)
        );

        // GOOD: CPU-intensive aggregation with large dataset
        if (payments.size() > 10_000) {
            return payments.parallelStream().collect(reportCollector);
        }

        // Small dataset - sequential is faster
        return payments.stream().collect(reportCollector);
    }

    // BAD: I/O operations in parallel stream
    public List<Payment> loadPayments(List<String> ids) {
        return ids.parallelStream()
            .map(paymentRepository::findById) // Use virtual threads instead!
            .toList();
    }
}
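A useful sanity check when parallelizing a stream: for an associative, stateless reduction, parallel and sequential execution must produce identical results; only the execution strategy differs. A self-contained sketch:

```java
import java.util.List;
import java.util.stream.IntStream;

public class ParallelStreamDemo {

    // An associative, stateless reduction is safe to parallelize: the
    // ForkJoinPool splits the source, reduces chunks independently, and
    // combines partial sums without any shared mutable state.
    public static long sumOfSquares(List<Integer> values, boolean parallel) {
        var stream = parallel ? values.parallelStream() : values.stream();
        return stream.mapToLong(v -> (long) v * v).sum();
    }

    public static void main(String[] args) {
        List<Integer> data = IntStream.rangeClosed(1, 100_000).boxed().toList();
        System.out.println(sumOfSquares(data, false) == sumOfSquares(data, true));
    }
}
```

Operations with side effects or order dependence (forEach into a shared list, stateful lambdas) break this equivalence, which is the usual source of parallel-stream bugs.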

Testing Concurrent Code

JCStress for Detecting Concurrency Bugs

JCStress systematically explores different thread interleavings to catch subtle bugs.

// build.gradle
dependencies {
    testImplementation 'org.openjdk.jcstress:jcstress-core:0.16'
}

@JCStressTest
@Outcome(id = "100", expect = ACCEPTABLE, desc = "All increments applied")
@Outcome(expect = FORBIDDEN, desc = "Lost updates detected")
@State
public class CounterConcurrencyTest {

    private final AtomicLong counter = new AtomicLong(0);

    @Actor
    public void actor1() {
        for (int i = 0; i < 50; i++) {
            counter.incrementAndGet();
        }
    }

    @Actor
    public void actor2() {
        for (int i = 0; i < 50; i++) {
            counter.incrementAndGet();
        }
    }

    @Arbiter
    public void arbiter(J_Result result) { // J_Result holds a long
        result.r1 = counter.get();
    }
}

Performance Considerations

Virtual Thread Overhead

Virtual threads are lightweight (~1KB vs ~1MB for platform threads), but not free. Consider bounded parallelism for very large datasets.

public void processLargeDataset(List<Payment> payments) {
    // GOOD: Reasonable for up to ~1M items
    if (payments.size() <= 1_000_000) {
        try (var executor = Executors.newVirtualThreadPerTaskExecutor()) {
            payments.forEach(payment ->
                executor.submit(() -> processPayment(payment))
            );
        }
    }

    // BETTER: Process in chunks for very large datasets
    if (payments.size() > 1_000_000) {
        var chunks = Lists.partition(payments, 100_000); // Guava
        for (var chunk : chunks) {
            try (var executor = Executors.newVirtualThreadPerTaskExecutor()) {
                chunk.forEach(payment ->
                    executor.submit(() -> processPayment(payment))
                );
            }
        }
    }
}


Summary

Key Takeaways

  1. Use virtual threads for I/O-bound operations - database queries, API calls, message queues
  2. Structured concurrency manages concurrent tasks as a single unit with automatic cancellation
  3. CompletableFuture provides composable async operations with error handling and timeouts
  4. Reactive streams handle backpressure between fast producers and slow consumers
  5. Lock-free algorithms use CAS for high-performance scenarios
  6. Avoid synchronized blocks in virtual threads - they pin carrier threads; use ReentrantLock
  7. ThreadLocal is expensive with virtual threads - use scoped values or avoid entirely
  8. Parallel streams for CPU work, virtual threads for I/O - know when to use each
  9. Always handle exceptions in async code - unhandled exceptions are silently swallowed
  10. Test concurrent code with JCStress - detects subtle concurrency bugs

Next Steps: Review Java Performance for JVM tuning and Spring Boot General for virtual thread configuration.