Kotlin Concurrency Patterns

Overview

This guide covers advanced concurrency patterns in Kotlin, building on the foundational coroutine concepts covered in Kotlin General. We explore channels for communication between coroutines, actors for encapsulating state, managing shared mutable state safely, concurrent collection patterns, and best practices for building thread-safe concurrent systems.


Core Principles

  1. Structured Concurrency: Parent coroutines manage child lifetimes
  2. Channels: Safe communication between coroutines
  3. Actors: Encapsulate state with single-threaded access
  4. Avoid Shared Mutable State: Prefer immutability and message passing
  5. Mutex for Critical Sections: When shared state is unavoidable
  6. Concurrent Collections: Use thread-safe data structures appropriately
  7. Error Propagation: Handle failures across coroutine boundaries
  8. Cancellation Support: Respect cancellation throughout async operations
  9. Dispatcher Selection: Choose appropriate thread pools
  10. Testing: Use test dispatchers for deterministic testing

Channels for Communication

Channels provide a way for coroutines to communicate by sending and receiving values. A channel is conceptually similar to a blocking queue, but designed for use with coroutines - sending and receiving are suspend functions that don't block threads.

Understanding Channels

Channels solve the problem of communication between concurrent coroutines without shared mutable state. Instead of multiple coroutines accessing shared data (which requires synchronization), coroutines send messages through channels. This follows the principle: "Don't communicate by sharing memory; share memory by communicating."

A channel has two ends: a send end for producing values and a receive end for consuming values. Channels can be buffered (hold a fixed number of elements before suspending senders) or unbuffered (rendezvous - sender and receiver must meet). Channels can also be closed to signal no more elements will be sent.

Hot vs Cold: Unlike Flow (which is cold), channels are hot - they start processing immediately when created. Multiple consumers can receive from the same channel, and each element is delivered to exactly one consumer. For Flow patterns, see our Kotlin Flow guide.

Channel Types and Capacity

// GOOD: Understanding channel types
import kotlinx.coroutines.channels.*

// Rendezvous (unbuffered) - sender and receiver must meet
val rendezvousChannel = Channel<Int>()

// Buffered - holds elements until full
val bufferedChannel = Channel<Int>(capacity = 10)

// Unlimited - never suspends sender (can lead to memory issues)
val unlimitedChannel = Channel<Int>(Channel.UNLIMITED)

// Conflated - keeps only the latest value, drops old ones
val conflatedChannel = Channel<Int>(Channel.CONFLATED)

// Example: Producer-consumer with buffered channel
// (produce defaults to capacity 0, so specify a capacity for buffering)
fun CoroutineScope.produceNumbers() = produce(capacity = 10) {
    var x = 1
    while (true) {
        send(x++) // Suspends when the buffer is full
        delay(100)
    }
}

suspend fun consumeNumbers() = coroutineScope {
    val channel = produceNumbers() // Producer is a child of this scope

    // Consume values
    for (number in channel) {
        println("Received: $number")
        delay(200) // Slower consumer
    }
}

The capacity determines buffering behavior. Rendezvous (capacity 0) requires sender and receiver to synchronize. Buffered allows some decoupling but suspends when full. Unlimited never suspends the sender but can consume unbounded memory. Conflated is useful when you only care about the latest value.
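To make the conflated behavior concrete, here is a small self-contained sketch (using kotlinx.coroutines): the producer finishes before the consumer starts reading, so every send overwrites the buffered value and only the latest survives.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.Channel

fun main() = runBlocking {
    val channel = Channel<Int>(Channel.CONFLATED)

    val producer = launch {
        repeat(5) {
            channel.send(it) // Never suspends - overwrites the buffered value
            delay(10)
        }
        channel.close()
    }
    producer.join() // All sends complete before we start consuming

    // Only the most recent value survives conflation
    for (value in channel) {
        println("Received: $value") // Prints only: Received: 4
    }
}
```

With a buffered or rendezvous channel the consumer would see all five values instead; conflation trades completeness for always-fresh data.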

Pipeline Pattern

Channels enable pipeline patterns where multiple stages process data sequentially. Each stage is a coroutine that receives from an input channel, processes values, and sends to an output channel. This pattern is powerful for stream processing, ETL pipelines, or multi-stage data transformations.

// GOOD: Pipeline pattern with channels
fun CoroutineScope.produceNumbers() = produce<Int> {
    var x = 1
    while (true) send(x++)
}

fun CoroutineScope.square(numbers: ReceiveChannel<Int>) = produce<Int> {
    for (x in numbers) send(x * x)
}

fun CoroutineScope.filter(numbers: ReceiveChannel<Int>) = produce<Int> {
    for (x in numbers) {
        if (x % 2 == 0) send(x)
    }
}

// Usage: Pipeline of operations
suspend fun processData() = coroutineScope {
    val numbers = produceNumbers()
    val squared = square(numbers)
    val filtered = filter(squared)

    // Consume final results
    repeat(10) {
        println(filtered.receive())
    }

    // Cancel pipeline
    coroutineContext.cancelChildren()
}

Each stage runs concurrently, providing natural parallelism. The pipeline automatically applies backpressure - if a downstream stage is slow, upstream stages suspend. This prevents memory issues from buffering too many elements.

Fan-Out and Fan-In

Fan-out means multiple coroutines receive from the same channel, distributing work. Fan-in means multiple coroutines send to the same channel, combining results. These patterns enable load balancing and parallel processing.

// GOOD: Fan-out - multiple workers consuming from one channel
fun CoroutineScope.produceWork() = produce<Int> {
    repeat(100) { send(it) }
}

fun CoroutineScope.launchWorker(
    id: Int,
    channel: ReceiveChannel<Int>
) = launch {
    for (work in channel) {
        println("Worker $id processing $work")
        delay(100) // Simulate work
    }
}

suspend fun fanOutExample() = coroutineScope {
    val workChannel = produceWork()

    // Launch multiple workers
    repeat(5) { workerId ->
        launchWorker(workerId, workChannel)
    }
}

// GOOD: Fan-in - multiple producers sending to one channel
suspend fun fanInExample() = coroutineScope {
    val resultChannel = Channel<Int>()

    // Multiple producers
    repeat(3) { producerId ->
        launch {
            repeat(10) {
                resultChannel.send(producerId * 100 + it)
                delay(50)
            }
        }
    }

    // Single consumer
    launch {
        repeat(30) {
            println("Result: ${resultChannel.receive()}")
        }
        resultChannel.close()
    }
}

Fan-out distributes work across workers, enabling parallel processing. Each worker receives different elements. Fan-in collects results from parallel operations into a single stream. Both patterns are essential for building scalable concurrent systems.


Actors for State Encapsulation

An actor is a coroutine that encapsulates state and processes messages sequentially. Only the actor can access its state, eliminating concurrency issues. External code communicates with the actor by sending messages through a channel. This pattern enforces single-threaded access to mutable state without explicit locking.

Why Actors Matter

Shared mutable state is the root cause of most concurrency bugs - race conditions, deadlocks, and data corruption all stem from multiple threads accessing mutable data. Traditional approaches use locks, which are error-prone (easy to forget, hard to get right) and can cause deadlocks.

Actors eliminate these problems by design. State is private to the actor coroutine. All access goes through message passing. The actor processes one message at a time sequentially, so no synchronization is needed. This model is inspired by Erlang's actor model and provides a clean way to manage concurrent state.

Building an Actor

// GOOD: Actor pattern for managing counter state
sealed class CounterMsg
object IncrementCounter : CounterMsg()
object DecrementCounter : CounterMsg()
class GetCounter(val response: CompletableDeferred<Int>) : CounterMsg()

fun CoroutineScope.counterActor() = actor<CounterMsg> {
    var counter = 0 // Private state - only accessed by this actor

    for (msg in channel) { // Process messages sequentially
        when (msg) {
            is IncrementCounter -> counter++
            is DecrementCounter -> counter--
            is GetCounter -> msg.response.complete(counter)
        }
    }
}

// Usage: Multiple coroutines safely interacting with actor
suspend fun actorExample() = coroutineScope {
    val counter = counterActor()

    // Launch multiple coroutines incrementing counter
    val jobs = List(100) {
        launch {
            repeat(1000) {
                counter.send(IncrementCounter)
            }
        }
    }

    jobs.forEach { it.join() }

    // Get final count
    val response = CompletableDeferred<Int>()
    counter.send(GetCounter(response))
    println("Final count: ${response.await()}") // 100,000 - no lost updates
    counter.close()
}

The actor owns the counter state. External code sends messages (increment, decrement, get). The actor processes messages one at a time, modifying state safely. For responses, use CompletableDeferred to send results back. This pattern scales to complex state management scenarios. For state management in Android, see our Android architecture guide.

Actor vs Mutex

Actors and mutexes both solve the shared state problem but with different approaches. A mutex protects a critical section with a lock - multiple threads compete for access. An actor processes messages sequentially - no competition, no blocking.

Use actors when you have complex state with operations that don't need immediate results. Use mutexes for simple state with quick operations where message-passing overhead isn't justified. Actors scale well and avoid lock-based deadlocks; mutexes are simpler for trivial cases.

// Actor approach: Message-based, no blocking
val actor = counterActor()
actor.send(IncrementCounter) // Non-blocking send
val response = CompletableDeferred<Int>()
actor.send(GetCounter(response))
val count = response.await() // Suspends, doesn't block thread

// Mutex approach: Lock-based
val mutex = Mutex()
var counter = 0

mutex.withLock {
    counter++ // Critical section protected
}

Managing Shared Mutable State

When shared mutable state is unavoidable (e.g., caching, shared counters), you must synchronize access. Kotlin provides several primitives: Mutex for mutual exclusion, Semaphore for limiting concurrency, and atomic operations for lock-free programming.

Mutex for Critical Sections

A Mutex (mutual exclusion) ensures only one coroutine accesses a critical section at a time. Unlike traditional locks which block threads, Mutex.lock() is a suspend function - waiting coroutines don't block threads. Always use withLock to ensure the mutex is released even if an exception occurs.

// GOOD: Mutex for protecting shared state
import kotlinx.coroutines.sync.Mutex
import kotlinx.coroutines.sync.withLock

class SafeCounter {
    private val mutex = Mutex()
    private var count = 0

    suspend fun increment() {
        mutex.withLock {
            count++ // Critical section - only one coroutine at a time
        }
    }

    suspend fun get(): Int = mutex.withLock {
        count // Read also needs protection
    }
}

// Usage
suspend fun concurrentIncrements() = coroutineScope {
    val counter = SafeCounter()

    // 100 coroutines incrementing concurrently
    val jobs = List(100) {
        launch {
            repeat(1000) {
                counter.increment()
            }
        }
    }

    jobs.forEach { it.join() }
    println("Final count: ${counter.get()}") // 100,000 - no lost updates
}

// BAD: No synchronization - race condition
class UnsafeCounter {
    private var count = 0

    fun increment() {
        count++ // Not atomic! Multiple coroutines can interfere
    }
}

The mutex ensures atomicity of the increment operation. Without it, multiple coroutines could read the same value, increment it, and write back, losing updates. Always protect both reads and writes if writes are occurring.
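Besides Mutex, the Semaphore primitive mentioned earlier limits how many coroutines run a section concurrently rather than enforcing exclusive access. A minimal self-contained sketch (kotlinx.coroutines; the delay stands in for real work, and plain vars are safe here because runBlocking runs everything on one thread):

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.sync.Semaphore
import kotlinx.coroutines.sync.withPermit

fun main() = runBlocking {
    val semaphore = Semaphore(permits = 3) // At most 3 coroutines inside at once
    var active = 0
    var maxActive = 0

    val jobs = List(20) {
        launch {
            semaphore.withPermit { // Suspends until a permit is free
                active++
                maxActive = maxOf(maxActive, active)
                delay(50) // Simulate work (e.g. a rate-limited request)
                active--
            }
        }
    }
    jobs.forEach { it.join() }
    println("Max concurrent: $maxActive") // Never exceeds 3
}
```

This is the standard way to cap concurrency for rate-limited APIs or connection pools: withPermit releases the permit even if the block throws.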

Atomic Operations

For simple operations like incrementing counters, atomic operations provide lock-free synchronization. AtomicInteger, AtomicLong, and AtomicReference use CPU-level compare-and-swap instructions for thread-safe updates without locks.

// GOOD: Atomic operations for simple concurrent state
import java.util.concurrent.atomic.AtomicInteger
import java.util.concurrent.atomic.AtomicReference

class AtomicCounter {
    private val count = AtomicInteger(0)

    fun increment() {
        count.incrementAndGet() // Thread-safe, lock-free
    }

    fun get(): Int = count.get()
}

// GOOD: Atomic reference for object updates
class Cache<K, V> {
    private val map = AtomicReference<Map<K, V>>(emptyMap())

    fun put(key: K, value: V) {
        map.updateAndGet { currentMap ->
            currentMap + (key to value) // Creates new immutable map
        }
    }

    fun get(key: K): V? = map.get()[key]
}

// Usage
suspend fun atomicExample() = coroutineScope {
    val counter = AtomicCounter()

    List(100) {
        launch {
            repeat(1000) {
                counter.increment()
            }
        }
    }.forEach { it.join() }

    println("Final count: ${counter.get()}") // 100,000
}

Atomics are faster than mutexes for simple operations because they avoid context switching. However, they only work for single-variable operations. For multi-step operations or complex state updates, use a mutex or actor. For Java concurrency primitives, see our Java concurrency guide.

Thread-Safe Collections

Kotlin/JVM provides concurrent collections from java.util.concurrent for thread-safe data structure access. Use ConcurrentHashMap for maps, CopyOnWriteArrayList for lists with frequent reads and rare writes, and ConcurrentLinkedQueue for queues.

// GOOD: Thread-safe collections
import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.CopyOnWriteArrayList

class UserCache {
    // ConcurrentHashMap - thread-safe, no external synchronization needed
    private val cache = ConcurrentHashMap<String, User>()

    fun addUser(user: User) {
        cache[user.id] = user
    }

    fun getUser(id: String): User? = cache[id]

    // Atomic check-and-update
    fun addIfAbsent(user: User): User? {
        return cache.putIfAbsent(user.id, user)
    }
}

class EventLog {
    // CopyOnWriteArrayList - optimized for reads, copies on write
    private val events = CopyOnWriteArrayList<Event>()

    fun addEvent(event: Event) {
        events.add(event) // Thread-safe
    }

    fun getAllEvents(): List<Event> = events.toList()
}

// BAD: Regular collection with manual synchronization
class UnsafeCache {
    private val cache = mutableMapOf<String, User>()

    @Synchronized // Works but less efficient than ConcurrentHashMap
    fun addUser(user: User) {
        cache[user.id] = user
    }
}

ConcurrentHashMap uses fine-grained locking - different segments can be modified concurrently. It's more efficient than wrapping a regular map with a mutex. CopyOnWriteArrayList creates a new copy on every modification, making it suitable when reads vastly outnumber writes.
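Beyond put and putIfAbsent, ConcurrentHashMap offers per-key atomic read-modify-write via compute, computeIfAbsent, and merge. A small sketch: a plain `map[key] = map[key]!! + 1` is two separate operations and can lose updates under contention, while merge performs the increment atomically for that key.

```kotlin
import java.util.concurrent.ConcurrentHashMap

fun main() {
    val wordCounts = ConcurrentHashMap<String, Int>()
    val words = listOf("a", "b", "a", "c", "a", "b")

    for (word in words) {
        wordCounts.merge(word, 1, Int::plus) // Atomic increment-or-insert
    }

    // computeIfAbsent: atomic lazy initialization per key
    val lengths = ConcurrentHashMap<String, Int>()
    val len = lengths.computeIfAbsent("hello") { it.length }

    println(wordCounts["a"]) // 3
    println(len)             // 5
}
```

The mapping functions passed to compute/merge run while the key's bin is locked, so they must be short and must not touch the same map again.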


Error Handling in Concurrent Code

Exceptions in coroutines propagate through the coroutine hierarchy. Uncaught exceptions cancel the parent coroutine and all siblings. Understanding exception propagation is crucial for building resilient concurrent systems.

Structured Exception Handling

By default, an exception in a child coroutine cancels the parent and all siblings. This is often desired - if one part of a concurrent operation fails, you want to cancel related work. However, you can isolate failures using SupervisorJob or supervisorScope.

// GOOD: Default behavior - exception cancels all siblings
suspend fun defaultExceptionBehavior() {
    try {
        coroutineScope {
            launch {
                delay(100)
                throw Exception("Task 1 failed")
            }
            launch {
                delay(200)
                println("Task 2 completed") // Never prints - cancelled by sibling
            }
        }
    } catch (e: Exception) {
        println("Caught: ${e.message}")
    }
}

// GOOD: SupervisorScope - exceptions don't cancel siblings
suspend fun supervisedExceptions() {
    supervisorScope {
        val job1 = launch {
            delay(100)
            throw Exception("Task 1 failed")
        }
        val job2 = launch {
            delay(200)
            println("Task 2 completed") // Prints - not affected by sibling
        }

        // Handle individual failures
        job1.invokeOnCompletion { throwable ->
            if (throwable != null) {
                println("Task 1 failed: ${throwable.message}")
            }
        }
    }
}

// GOOD: CoroutineExceptionHandler for unhandled exceptions
// Note: the handler only fires for root coroutines - installing it on a child
// inside coroutineScope has no effect, because the exception propagates to the parent
val handler = CoroutineExceptionHandler { _, exception ->
    println("Caught unhandled: $exception")
}

suspend fun withExceptionHandler() {
    val scope = CoroutineScope(SupervisorJob() + handler)
    scope.launch {
        throw Exception("Unhandled exception") // Delivered to the handler
    }.join()
}

Use coroutineScope when you want failures to cancel related work (all-or-nothing). Use supervisorScope when concurrent tasks are independent and one failure shouldn't affect others. For exception handling strategies, see our Java error handling guide.

Try-Catch in Async Coroutines

The async builder doesn't throw exceptions immediately - it stores them in the Deferred and throws when you call await(). This means you must handle exceptions where you await results.

// GOOD: Handling async exceptions
// Note: supervisorScope is used here deliberately - inside coroutineScope a
// failed async also cancels the scope itself, so the exception is rethrown
// at the end even if await() is wrapped in try-catch
suspend fun asyncExceptionHandling() = supervisorScope {
    val deferred = async {
        delay(100)
        throw Exception("Async task failed")
    }

    try {
        deferred.await() // Exception thrown here
    } catch (e: Exception) {
        println("Caught: ${e.message}")
    }
}

// GOOD: Multiple async operations with individual error handling
suspend fun multipleAsyncWithErrors() = supervisorScope {
    val results = listOf(
        async { fetchData(1) },
        async { fetchData(2) },
        async { fetchData(3) }
    )

    results.map { deferred ->
        try {
            Result.success(deferred.await())
        } catch (e: Exception) {
            Result.failure(e)
        }
    }
}

This pattern allows handling each async operation's result or failure independently. Be careful with unawaited results: inside supervisorScope, an exception in an async you never await is silently dropped, while inside coroutineScope it still fails the whole scope.


Cancellation and Cleanup

Coroutines can be cancelled, and cancellation is cooperative - coroutines must check for cancellation and respond appropriately. Always write cancellable code and clean up resources on cancellation.

Checking for Cancellation

Suspend functions in kotlinx.coroutines automatically check for cancellation. For long-running computations without suspension points, explicitly check isActive or call ensureActive().

// GOOD: Cooperative cancellation
// In a plain suspend function, use the coroutineContext property to reach
// the cancellation state (ensureActive/isActive are extensions on it)
suspend fun cancellableComputation() {
    var result = 0L
    for (i in 1..1_000_000) {
        coroutineContext.ensureActive() // Throws CancellationException if cancelled
        result += i
    }
    println("Result: $result")
}

// GOOD: Checking isActive
suspend fun cancellableLoop() {
    while (coroutineContext.isActive) {
        // Work
        delay(100) // Suspension point - checks cancellation automatically
    }
}

// BAD: Non-cancellable computation
suspend fun nonCancellableComputation() {
    var result = 0L
    for (i in 1..1_000_000) {
        result += i // No cancellation check - blocks cancellation
    }
}

delay(), yield(), and other suspending functions check for cancellation. If you have long computations, periodically call ensureActive() or check isActive.

Resource Cleanup

Use try-finally or use to ensure resources are cleaned up even on cancellation. Cancellation throws CancellationException, which is caught by finally blocks.

// GOOD: Resource cleanup with try-finally
suspend fun withResourceCleanup() {
    val resource = acquireResource()
    try {
        // Use resource
        doWork(resource)
    } finally {
        resource.release() // Always executed, even on cancellation
    }
}

// GOOD: Using 'use' for automatic cleanup
suspend fun withUseBlock() {
    FileInputStream("data.txt").use { stream ->
        // Use stream
        // Automatically closed on normal completion or cancellation
    }
}

// GOOD: Cleanup in actor
fun CoroutineScope.resourceActor() = actor<Message> {
    val resource = acquireResource()
    try {
        for (msg in channel) {
            // Process messages
        }
    } finally {
        resource.release() // Cleanup when actor is cancelled
    }
}

The finally block executes even if the coroutine is cancelled. For Closeable resources, the use extension function automatically calls close() in a finally block.


Structured Concurrency In-Depth

Structured concurrency is the foundational principle of Kotlin coroutines. It establishes a parent-child relationship between coroutines, ensuring that parent coroutines wait for all children to complete and propagate cancellation through the hierarchy. This prevents common concurrency issues like leaked coroutines, missed exceptions, and resource leaks.

Coroutine Scope Hierarchy

Every coroutine runs within a CoroutineScope, which defines the lifecycle boundary for coroutines. A scope provides context (dispatcher, exception handler, job) and enforces that parent coroutines manage their children. When a parent scope is cancelled, all children are automatically cancelled recursively.

The scope hierarchy forms a tree structure. Parent scopes wait for child coroutines to complete before completing themselves. If any child fails with an uncaught exception, the parent is notified and can cancel remaining children. This structured approach eliminates the chaos of unstructured concurrency where coroutines can "escape" and run indefinitely.
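A minimal self-contained sketch of this guarantee: the coroutineScope call below does not return until every child launched inside it has completed, and the ordering is visible in the log.

```kotlin
import kotlinx.coroutines.*

fun main() = runBlocking {
    val log = mutableListOf<String>()

    coroutineScope { // Suspends here until both children complete
        launch {
            delay(100)
            log += "slow child done"
        }
        launch {
            delay(10)
            log += "fast child done"
        }
        log += "scope body done" // Runs first - launch doesn't wait
    }

    log += "scope returned" // Only reached after all children finished
    println(log)
    // [scope body done, fast child done, slow child done, scope returned]
}
```

If either child had failed instead, coroutineScope would cancel the other child and rethrow, which is the behavior the builders below let you choose between.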

CoroutineScope Builders

Kotlin provides several scope builders, each with different semantics for exception handling and cancellation. Understanding when to use each is essential for writing correct concurrent code.

coroutineScope creates a scope that suspends until all children complete. If any child fails, the scope cancels remaining children and rethrows the exception. Use this when operations are interdependent - if one fails, cancel everything.

supervisorScope creates a scope where child failures don't affect siblings. Each child's failure is isolated. The parent only fails if it throws an exception directly. Use this for independent operations where one failure shouldn't cancel others.

withContext creates a scope with a modified context (typically a different dispatcher). It inherits cancellation from the parent and propagates exceptions normally. Use this to switch dispatchers for specific operations.

// GOOD: coroutineScope - failures cancel all children
suspend fun loadUserData(userId: String): UserData {
    return coroutineScope {
        // If any async operation fails, all others are cancelled
        val profileDeferred = async { loadProfile(userId) }
        val settingsDeferred = async { loadSettings(userId) }
        val preferencesDeferred = async { loadPreferences(userId) }

        try {
            UserData(
                profile = profileDeferred.await(),
                settings = settingsDeferred.await(),
                preferences = preferencesDeferred.await()
            )
        } catch (e: Exception) {
            // One failed - all were cancelled
            log.error("Failed to load user data: ${e.message}")
            throw e
        }
    }
}

// GOOD: supervisorScope - failures are independent
suspend fun loadDashboard(): Dashboard {
    return supervisorScope {
        // Each operation is independent
        // If one fails, others continue
        val userStats = async {
            try { loadUserStats() }
            catch (e: Exception) { null }
        }
        val recentActivity = async {
            try { loadRecentActivity() }
            catch (e: Exception) { emptyList() }
        }
        val notifications = async {
            try { loadNotifications() }
            catch (e: Exception) { emptyList() }
        }

        Dashboard(
            stats = userStats.await(),
            activity = recentActivity.await(),
            notifications = notifications.await()
        )
    }
}

// GOOD: withContext - switch dispatcher temporarily
suspend fun processData(data: ByteArray): ProcessedData {
    // IO work on IO dispatcher
    val parsed = withContext(Dispatchers.IO) {
        parseData(data)
    }

    // CPU-intensive work on Default dispatcher
    val transformed = withContext(Dispatchers.Default) {
        transformData(parsed)
    }

    return transformed
}

// BAD: Mixing scope types incorrectly
suspend fun incorrectScoping() {
    supervisorScope {
        // supervisorScope used but treating failures as fatal
        val result = async { riskyOperation() }.await()
        // If this throws, supervisorScope benefits are lost
    }
}

The key distinction: coroutineScope is for "all-or-nothing" operations where partial failure is unacceptable. supervisorScope is for "best-effort" operations where partial success is acceptable. Use withContext purely for dispatcher switching without changing exception semantics.

Cancellation Propagation

Cancellation flows from parent to children automatically. When you cancel a parent job, all child jobs are cancelled recursively. Children can also cancel themselves without affecting the parent. This bidirectional relationship is carefully designed to prevent orphaned coroutines.

Cancellation is cooperative - coroutines must check for cancellation at suspension points. Most kotlinx.coroutines functions do this automatically, but long-running computations need explicit checks.

// GOOD: Parent cancellation cascades to children
suspend fun cancellationExample() {
    val job = CoroutineScope(Dispatchers.Default).launch {
        launch {
            repeat(10) {
                println("Child 1: $it")
                delay(100)
            }
        }
        launch {
            repeat(10) {
                println("Child 2: $it")
                delay(100)
            }
        }
    }

    delay(350)
    job.cancel() // Cancels parent and both children
    job.join()
}

// GOOD: Child cancellation doesn't affect parent
suspend fun childCancellation() = coroutineScope {
    val child1 = launch {
        repeat(10) {
            println("Child 1: $it")
            delay(100)
        }
    }
    val child2 = launch {
        repeat(10) {
            println("Child 2: $it")
            delay(100)
        }
    }

    delay(250)
    child1.cancel() // Only cancels child1, child2 continues
    child2.join()
}

// GOOD: Explicit cancellation check in computation
suspend fun longComputation(): BigDecimal = withContext(Dispatchers.Default) {
    var result = BigDecimal.ZERO
    for (i in 1..10_000_000) {
        if (i % 1000 == 0) ensureActive() // Check cancellation periodically
        result += BigDecimal(i)
    }
    result
}

// BAD: Non-cooperative cancellation (blocks cancellation)
suspend fun nonCooperativeWork() = withContext(Dispatchers.Default) {
    var result = 0
    // This loop blocks cancellation - no suspension points or checks
    for (i in 1..10_000_000) {
        result += i
    }
    result
}

When implementing custom suspending functions that perform long computations, always include periodic cancellation checks. Use ensureActive() for immediate cancellation or check isActive for graceful shutdown. This ensures your functions respect structured concurrency principles.

Job Relationships and SupervisorJob

Jobs form the backbone of the coroutine hierarchy. A Job represents a cancellable piece of work with a lifecycle (New, Active, Completing, Completed, Cancelling, Cancelled). Child jobs are linked to their parent job, enabling cancellation propagation.

A regular Job propagates child failures to the parent - if a child fails, the parent and all siblings are cancelled. A SupervisorJob isolates child failures - children can fail independently without affecting the parent or siblings. Use SupervisorJob for the root of independent task hierarchies.

// GOOD: Regular Job - child failure cancels parent
suspend fun regularJobExample() {
    val job = Job()
    val scope = CoroutineScope(job + Dispatchers.Default)

    scope.launch {
        delay(100)
        println("Task 1 completed")
    }

    scope.launch {
        delay(50)
        throw Exception("Task 2 failed")
        // This exception cancels the parent job and task 1
    }

    job.join()
    println("Parent job cancelled: ${job.isCancelled}") // true
}

// GOOD: SupervisorJob - child failures are isolated
suspend fun supervisorJobExample() {
    val supervisorJob = SupervisorJob()
    val scope = CoroutineScope(supervisorJob + Dispatchers.Default)

    scope.launch {
        delay(100)
        println("Task 1 completed") // This completes normally
    }

    scope.launch {
        delay(50)
        throw Exception("Task 2 failed")
        // This exception doesn't affect task 1 or parent
    }

    delay(200)
    println("Parent job cancelled: ${supervisorJob.isCancelled}") // false
}

// GOOD: Custom scope with supervisor job for background tasks
class BackgroundTaskManager {
    private val supervisorJob = SupervisorJob()
    private val scope = CoroutineScope(supervisorJob + Dispatchers.Default)

    fun startTask(id: String, work: suspend () -> Unit) {
        scope.launch {
            try {
                log.info("Starting task $id")
                work()
                log.info("Task $id completed")
            } catch (e: CancellationException) {
                log.info("Task $id cancelled")
                throw e
            } catch (e: Exception) {
                log.error("Task $id failed", e)
                // Exception doesn't cancel other tasks
            }
        }
    }

    fun shutdown() {
        supervisorJob.cancel()
    }
}

// GOOD: Mixing supervisor and regular scopes
suspend fun mixedScopes() {
    supervisorScope {
        // Independent top-level tasks
        launch {
            coroutineScope {
                // Interdependent sub-tasks
                launch { task1() }
                launch { task2() }
                // If task1 or task2 fails, both are cancelled
                // But this doesn't affect the sibling launch below
            }
        }
        launch {
            independentTask()
        }
    }
}

The pattern of using SupervisorJob for the root scope and regular jobs for subtasks is common. It creates fault isolation at the top level while maintaining all-or-nothing semantics for related operations. This is particularly useful in Android ViewModels where you want independent UI operations.


Flow Operators and Transformations

Flow is Kotlin's reactive streams library for asynchronous data streams. Unlike channels (hot), flows are cold - they don't start emitting until collected. Flows support rich transformation operators for mapping, filtering, combining, and processing asynchronous sequences. Understanding these operators is essential for building reactive data pipelines.

Core Flow Operations

Flows provide a declarative API for stream transformations. Operators are applied to the flow and return a new flow - they don't modify the original. Collections trigger execution of the entire flow pipeline. Intermediate operators are lazy; terminal operators trigger execution.

The fundamental operators mirror collection operations but work asynchronously. map transforms each element, filter selects elements, flatMapMerge flattens nested flows, and reduce/fold aggregate values. Understanding these building blocks enables composing complex reactive pipelines.

// GOOD: Basic flow transformations
fun getPaymentFlow(): Flow<Payment> = flow {
    // Emit payments from database
    val payments = database.getAllPayments()
    payments.forEach { emit(it) }
}

suspend fun processPayments() {
    getPaymentFlow()
        .filter { it.status == PaymentStatus.COMPLETED }
        .map { it.amount }
        .collect { amount ->
            println("Amount: $amount")
        }
}

// GOOD: Complex transformation pipeline
fun observeAccountBalance(accountId: String): Flow<BigDecimal> {
    return accountUpdatesFlow(accountId)
        .map { update -> update.accountId }
        .distinctUntilChanged() // Only emit when account ID changes
        .flatMapLatest { id -> loadAccountBalance(id) }
        .catch { e ->
            log.error("Error loading balance", e)
            emit(BigDecimal.ZERO) // Fallback on error
        }
        .flowOn(Dispatchers.IO) // Execute upstream on IO dispatcher
}

// GOOD: Combining multiple flows
suspend fun observePaymentStatus() {
    val statusFlow = paymentStatusFlow()
    val userFlow = currentUserFlow()

    statusFlow.combine(userFlow) { status, user ->
        PaymentStatusUpdate(
            status = status,
            userId = user.id,
            timestamp = System.currentTimeMillis()
        )
    }.collect { update ->
        updateUI(update)
    }
}
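The aggregation operators mentioned earlier, reduce and fold, are terminal operators: calling them runs the whole pipeline and returns a single value. A short self-contained sketch:

```kotlin
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.runBlocking

fun main() = runBlocking {
    // reduce: combines elements pairwise; throws on an empty flow
    val sum = flowOf(1, 2, 3, 4)
        .reduce { acc, value -> acc + value }
    println(sum) // 10

    // fold: takes an initial value, so it also works on empty flows
    val product = flowOf(1, 2, 3, 4)
        .fold(initial = 1) { acc, value -> acc * value }
    println(product) // 24
}
```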

Flows are cold by default - each collector triggers independent execution of the flow. Use shareIn or stateIn to create hot flows that share execution among multiple collectors. For reactive UI patterns with Flow, see our Android data guide.
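As a sketch of that cold/hot difference (kotlinx.coroutines; SharingStarted.Eagerly is chosen here just for brevity), the cold flow below re-runs its builder for every collector until stateIn shares a single upstream execution:

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

fun main() = runBlocking {
    var upstreamRuns = 0
    val cold = flow {
        upstreamRuns++
        emit(42)
    }

    // Cold: every collect re-runs the flow builder
    cold.collect { }
    cold.collect { }
    println("Cold runs: $upstreamRuns") // 2

    // Hot: stateIn starts one shared upstream in this scope
    val hot: StateFlow<Int> = cold.stateIn(
        scope = this,
        started = SharingStarted.Eagerly,
        initialValue = 0
    )
    delay(50) // Give the shared upstream time to emit
    println("Hot value: ${hot.value}, runs: $upstreamRuns") // value 42, runs 3

    coroutineContext.cancelChildren() // Stop sharing so runBlocking can return
}
```

Any number of additional collectors of `hot` would now observe the same shared value without re-running the upstream.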

Buffering and Conflation

By default, flow collectors process elements sequentially - the next element isn't emitted until the previous one is processed. Buffering allows concurrent processing: emissions and collections happen in parallel. Conflation skips intermediate values when the collector is slow, keeping only the latest.

buffer creates a buffer between emitter and collector, allowing them to proceed independently. Useful when emission and collection have different speeds. conflate keeps only the latest value, dropping intermediates. Useful for UI updates where only the latest state matters. collectLatest cancels the previous collection when a new value arrives.

// GOOD: Understanding flow timing without buffering
suspend fun sequentialFlow() {
flow {
repeat(5) {
emit(it)
println("Emitted $it at ${System.currentTimeMillis()}")
}
}.collect {
println("Collecting $it at ${System.currentTimeMillis()}")
delay(100) // Slow collector blocks next emission
println("Processed $it")
}
}

// GOOD: Buffering for concurrent emission and collection
suspend fun bufferedFlow() {
flow {
repeat(5) {
emit(it)
println("Emitted $it")
}
}
.buffer() // Emissions continue while collector processes
.collect {
println("Collecting $it")
delay(100) // Slow collection doesn't block emissions
println("Processed $it")
}
}

// GOOD: Conflation for fast emissions, slow collection
suspend fun conflatedFlow() {
flow {
repeat(10) {
emit(it)
delay(50) // Fast emissions
}
}
.conflate() // Drop intermediate values if collector is slow
.collect {
println("Collecting $it")
delay(200) // Slow collector receives only latest
}
// May print: 0, 4, 7, 9 (skips intermediate values)
}

// GOOD: collectLatest for cancellable collection
suspend fun latestCollection() {
    flow {
        repeat(5) {
            emit(it)
            delay(100)
        }
    }.collectLatest { value ->
        println("Processing $value")
        delay(200) // Long processing
        println("Finished $value") // May not print if next value arrives
    }
    // Only the last value completes processing fully
}

// GOOD: Buffering in reactive UI updates
fun observeSearchResults(query: Flow<String>): Flow<List<Result>> {
    return query
        .debounce(300) // Wait for typing to stop
        .distinctUntilChanged() // Only search when query changes
        .flatMapLatest { q -> searchDatabase(q) }
        .buffer() // Buffer results to avoid blocking emissions
        .flowOn(Dispatchers.IO)
}

Choosing the right strategy depends on your use case. No buffering ensures sequential processing - every element is processed. Buffer enables parallelism - producer and consumer work concurrently. Conflate prioritizes latest data - useful for real-time updates where intermediate values are irrelevant. collectLatest cancels previous work - ideal for search-as-you-type where only the latest query matters.
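The timing difference between sequential and buffered collection is easy to measure. This runnable sketch (ours, not from the guide; timings are approximate and machine-dependent) overlaps a 100 ms producer step with a 100 ms consumer step:

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
import kotlin.system.measureTimeMillis

// Emits 3 values, each taking ~100 ms to produce
private fun numbers(): Flow<Int> = flow {
    repeat(3) {
        delay(100)
        emit(it)
    }
}

suspend fun sequentialMillis(): Long = measureTimeMillis {
    // Producer waits for each 100 ms consumer step: ~600 ms total
    numbers().collect { delay(100) }
}

suspend fun bufferedMillis(): Long = measureTimeMillis {
    // Producer runs ahead while the consumer processes: ~400 ms total
    numbers().buffer().collect { delay(100) }
}

fun main() = runBlocking {
    val seq = sequentialMillis()
    val buf = bufferedMillis()
    println("sequential=${seq}ms buffered=${buf}ms")
    check(buf < seq) // buffering overlaps emission and collection
}
```

With three elements the saving is modest, but the gap grows linearly with the number of elements when producer and consumer speeds are comparable.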

FlatMap Variants

Flow provides several flatMap variants for handling nested flows. These operators flatten flows-of-flows into a single stream but differ in concurrency semantics. Understanding the differences is crucial for correct behavior and performance.

flatMapConcat processes nested flows sequentially - waits for each inner flow to complete before starting the next. No concurrency, preserves order. flatMapMerge processes nested flows concurrently - multiple inner flows emit simultaneously. High concurrency, no order guarantee. flatMapLatest cancels the previous inner flow when a new one arrives - only the latest inner flow emits. Useful for cancellable operations like search.

// GOOD: flatMapConcat - sequential processing
suspend fun sequentialFlatMap() {
    flowOf(1, 2, 3)
        .flatMapConcat { value ->
            flow {
                emit("$value-A")
                delay(100)
                emit("$value-B")
            }
        }
        .collect { println(it) }
    // Prints: 1-A, 1-B, 2-A, 2-B, 3-A, 3-B (in order)
}

// GOOD: flatMapMerge - concurrent processing
suspend fun concurrentFlatMap() {
    flowOf(1, 2, 3)
        .flatMapMerge(concurrency = 3) { value ->
            flow {
                emit("$value-A")
                delay(100)
                emit("$value-B")
            }
        }
        .collect { println(it) }
    // Prints: 1-A, 2-A, 3-A, 1-B, 2-B, 3-B (interleaved)
}

// GOOD: flatMapLatest - cancellable operations
fun searchFlow(queryFlow: Flow<String>): Flow<List<Result>> {
    return queryFlow
        .debounce(300)
        .flatMapLatest { query ->
            // Previous search is cancelled when new query arrives
            searchDatabase(query)
        }
}

// GOOD: Practical example - loading related data
fun loadUserWithDetails(userIds: Flow<String>): Flow<UserDetails> {
    return userIds
        .flatMapMerge(concurrency = 5) { userId ->
            flow {
                val user = loadUser(userId)
                val profile = loadProfile(userId)
                val settings = loadSettings(userId)
                emit(UserDetails(user, profile, settings))
            }
        }
        .flowOn(Dispatchers.IO)
}

// GOOD: Handling errors in flatMap
fun loadDataWithRetry(ids: Flow<String>): Flow<Data> {
    return ids
        .flatMapMerge { id ->
            flow {
                val data = retryWithBackoff {
                    loadData(id)
                }
                emit(data)
            }.catch { e ->
                log.error("Failed to load $id", e)
                // Emit fallback or skip
            }
        }
}

Choosing the right variant: Use flatMapConcat when order matters and operations must be sequential. Use flatMapMerge for maximum concurrency when order doesn't matter - great for parallel data loading. Use flatMapLatest for cancellable operations where only the latest result is relevant - perfect for reactive search, autocomplete, or following the latest user selection.


StateFlow vs SharedFlow vs Flow

Kotlin provides three related but distinct types for reactive streams: Flow (cold, unicast), SharedFlow (hot, multicast, no state), and StateFlow (hot, multicast, stateful). Understanding when to use each is essential for building reactive applications.

Cold vs Hot Flows

Cold flows (Flow) are lazy - they don't produce values until collected. Each collector triggers independent execution of the flow. Think of them as functions that return a sequence - calling the function executes it. Cold flows are ideal for operations like database queries, network requests, or any computation that should run per-consumer.

Hot flows (SharedFlow, StateFlow) are active - they emit values regardless of collectors. Multiple collectors share the same execution. Think of them as broadcasts - the source emits once, all listeners receive. Hot flows are ideal for events, state updates, or any scenario where multiple observers need the same data without triggering duplicate work.
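The cold/hot distinction can be observed directly. In this minimal sketch (the helper names are ours), a cold flow's builder runs once per collector, while a hot SharedFlow with replay = 1 hands its latest value to a collector that arrives after emission:

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

// Counts how many times a cold flow's builder block runs: once per collector
suspend fun coldExecutions(collectors: Int): Int {
    var executions = 0
    val cold = flow {
        executions++
        emit(Unit)
    }
    repeat(collectors) { cold.collect { } }
    return executions
}

// A hot SharedFlow emits independently of collectors; with replay = 1,
// a late collector still receives the most recent value
suspend fun hotReplay(): Int {
    val hot = MutableSharedFlow<Int>(replay = 1)
    hot.emit(42)       // no collectors yet - value kept in the replay cache
    return hot.first() // late collector receives the replayed 42
}

fun main() = runBlocking {
    println("cold builder ran ${coldExecutions(3)} times") // 3 - once per collect
    println("late collector saw ${hotReplay()}")           // 42
}
```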

StateFlow for State Management

StateFlow is a hot flow that always holds a value (state). It emits the current value immediately to new collectors and emits subsequent updates. StateFlow conflates values - it only keeps the latest state, skipping intermediates if collectors are slow - and it also skips updates that are equal to the current value. StateFlow never completes - it represents ongoing state.

StateFlow is perfect for UI state, configuration settings, or any value that represents current state rather than events. Because it always has a value, collectors don't need to handle "no value yet" scenarios. The conflation behavior ensures UI always sees the latest state without processing stale intermediates.

// GOOD: StateFlow for UI state
class PaymentViewModel : ViewModel() {
    private val _uiState = MutableStateFlow<PaymentUiState>(PaymentUiState.Idle)
    val uiState: StateFlow<PaymentUiState> = _uiState.asStateFlow()

    fun processPayment(payment: Payment) {
        viewModelScope.launch {
            _uiState.value = PaymentUiState.Loading

            try {
                val result = paymentRepository.process(payment)
                _uiState.value = PaymentUiState.Success(result)
            } catch (e: Exception) {
                _uiState.value = PaymentUiState.Error(e.message ?: "Unknown error")
            }
        }
    }
}

// UI collects state
lifecycleScope.launch {
    viewModel.uiState.collect { state ->
        when (state) {
            is PaymentUiState.Idle -> showIdleView()
            is PaymentUiState.Loading -> showLoadingSpinner()
            is PaymentUiState.Success -> showSuccessMessage(state.result)
            is PaymentUiState.Error -> showError(state.message)
        }
    }
}

// GOOD: Converting cold Flow to hot StateFlow
class UserRepository(
    private val userDao: UserDao,
    private val userId: String,
    private val repositoryScope: CoroutineScope
) {
    // Database flow is cold - executes query per collector
    private val userFlow: Flow<User?> = userDao.observeUser(userId)

    // Convert to hot StateFlow with initial value
    val currentUser: StateFlow<User?> = userFlow
        .stateIn(
            scope = repositoryScope,
            started = SharingStarted.WhileSubscribed(5000), // Keep active 5s after last subscriber
            initialValue = null
        )
}

// GOOD: Exposing read-only StateFlow
class ConfigurationManager {
    private val _config = MutableStateFlow(Configuration.default())
    val config: StateFlow<Configuration> = _config.asStateFlow()

    fun updateConfig(update: Configuration) {
        _config.value = update
    }
}

// BAD: Exposing mutable StateFlow (breaks encapsulation)
class BadViewModel {
    val state = MutableStateFlow(UiState.Idle) // Anyone can modify!
}

The key pattern: Use MutableStateFlow internally, expose StateFlow (read-only) publicly. This encapsulates state mutation while allowing external observation. For StateFlow in Android MVVM architecture, see our Android architecture guide.

SharedFlow for Events

SharedFlow is a hot flow for broadcasts without state. It emits values to all active collectors but doesn't remember past emissions (unless configured with replay). SharedFlow never completes and can be configured to buffer or drop values when collectors are slow.

SharedFlow is ideal for events - one-time occurrences like button clicks, navigation events, or error notifications. Unlike StateFlow, SharedFlow doesn't conflate by default, ensuring every event is delivered. The replay parameter controls how many past emissions new collectors receive (0 = no replay, typical for events).

// GOOD: SharedFlow for navigation events
class NavigationManager {
    private val _navigationEvents = MutableSharedFlow<NavigationEvent>()
    val navigationEvents: SharedFlow<NavigationEvent> = _navigationEvents.asSharedFlow()

    suspend fun navigateTo(destination: Destination) {
        _navigationEvents.emit(NavigationEvent.Navigate(destination))
    }

    suspend fun navigateBack() {
        _navigationEvents.emit(NavigationEvent.Back)
    }
}

// Multiple collectors can observe
lifecycleScope.launch {
    navigationManager.navigationEvents.collect { event ->
        when (event) {
            is NavigationEvent.Navigate -> navController.navigate(event.destination)
            is NavigationEvent.Back -> navController.popBackStack()
        }
    }
}

// GOOD: SharedFlow for error events
class ErrorHandler {
    private val _errors = MutableSharedFlow<ErrorEvent>(
        replay = 0, // No replay - events are one-time
        extraBufferCapacity = 64, // Buffer capacity for slow collectors
        onBufferOverflow = BufferOverflow.DROP_OLDEST
    )
    val errors: SharedFlow<ErrorEvent> = _errors.asSharedFlow()

    fun reportError(error: Throwable) {
        // tryEmit doesn't suspend - suitable for error reporting from anywhere
        _errors.tryEmit(ErrorEvent(error.message ?: "Unknown error"))
    }
}

// GOOD: SharedFlow with replay for recent events
class NotificationManager {
    private val _notifications = MutableSharedFlow<Notification>(
        replay = 5, // New collectors see last 5 notifications
        extraBufferCapacity = 10
    )
    val notifications: SharedFlow<Notification> = _notifications.asSharedFlow()

    suspend fun sendNotification(notification: Notification) {
        _notifications.emit(notification)
    }
}

// BAD: Using StateFlow for events (events get conflated/lost)
class BadEventHandler {
    private val _events = MutableStateFlow<Event?>(null)
    val events: StateFlow<Event?> = _events.asStateFlow()

    fun sendEvent(event: Event) {
        _events.value = event
        // If two events are sent quickly, the first might be lost!
    }
}

The distinction between StateFlow and SharedFlow boils down to semantics: StateFlow is for "what is the current state?" (conflation is desired), SharedFlow is for "what just happened?" (all events must be delivered). Choose based on whether missed intermediate values matter.

Converting Between Flow Types

You can convert between cold and hot flows using operators. shareIn converts a cold flow to hot SharedFlow. stateIn converts a cold flow to hot StateFlow. Both require a scope and a sharing strategy that determines when to start/stop the upstream flow.

Sharing strategies control lifecycle: Eagerly starts immediately and never stops. Lazily starts on first subscriber and never stops. WhileSubscribed starts on first subscriber, stops after last subscriber (with optional timeout). Choose based on resource management needs.

// GOOD: Converting cold Flow to hot SharedFlow
class PaymentRepository {
    private val paymentUpdates: Flow<Payment> = callbackFlow {
        val listener = object : PaymentListener {
            override fun onPaymentUpdate(payment: Payment) {
                trySend(payment)
            }
        }
        gateway.registerListener(listener)
        awaitClose { gateway.unregisterListener(listener) }
    }

    // Share among multiple collectors
    val sharedPaymentUpdates: SharedFlow<Payment> = paymentUpdates
        .shareIn(
            scope = repositoryScope,
            started = SharingStarted.WhileSubscribed(5000),
            replay = 0
        )
}

// GOOD: Converting cold Flow to StateFlow with initial value
class LocationManager {
    private val locationFlow: Flow<Location> = callbackFlow {
        val callback = { location: Location -> trySend(location) }
        locationService.requestUpdates(callback)
        awaitClose { locationService.removeUpdates(callback) }
    }

    val currentLocation: StateFlow<Location?> = locationFlow
        .stateIn(
            scope = managerScope,
            started = SharingStarted.WhileSubscribed(),
            initialValue = null
        )
}

// GOOD: Sharing strategy comparison
fun demonstrateSharingStrategies() {
    // Eagerly: Starts immediately, never stops (even with no collectors)
    val eagerFlow = coldFlow.shareIn(
        scope,
        SharingStarted.Eagerly,
        replay = 1
    )

    // Lazily: Starts on first collector, never stops
    val lazyFlow = coldFlow.shareIn(
        scope,
        SharingStarted.Lazily,
        replay = 1
    )

    // WhileSubscribed: Starts on first collector, stops after last (with timeout)
    val whileSubscribedFlow = coldFlow.shareIn(
        scope,
        SharingStarted.WhileSubscribed(stopTimeoutMillis = 5000),
        replay = 1
    )
}

// GOOD: Practical example - caching API responses
class ApiRepository {
    private val _dataFlow: Flow<Data> = flow {
        emit(fetchFromApi())
    }.catch { e ->
        log.error("API call failed", e)
        emit(Data.empty())
    }

    // Cache API response and share among collectors
    val data: StateFlow<Data?> = _dataFlow
        .stateIn(
            scope = repositoryScope,
            started = SharingStarted.WhileSubscribed(5000),
            initialValue = null
        )
}

The choice of sharing strategy impacts resource usage. Eagerly keeps the upstream active always - use for critical data sources that should stay alive. Lazily starts on first use but never stops - use for data needed throughout app lifecycle. WhileSubscribed is most resource-efficient - stops the upstream when unused, ideal for most scenarios.


Coroutine Context and Dispatchers

Every coroutine has a CoroutineContext - an indexed set of elements that define the coroutine's behavior. The context includes the Job (lifecycle), CoroutineDispatcher (threading), CoroutineName (debugging), and CoroutineExceptionHandler (error handling). Understanding context composition and inheritance is key to managing coroutine behavior.

Understanding CoroutineContext

A CoroutineContext is an immutable indexed set - like a map where keys are types. You can combine contexts with +, and elements with the same key are replaced (right-hand side wins). Child coroutines inherit the parent's context, with modifications applied.

The context determines where the coroutine runs (dispatcher), how it's named (for debugging), what job it's part of (for cancellation), and how exceptions are handled. Modifying context elements allows fine-grained control over coroutine behavior without invasive changes.

// GOOD: Understanding context composition
fun demonstrateContext() {
    val context = Dispatchers.IO + CoroutineName("MyCoroutine")
    // Context contains: Dispatcher, CoroutineName

    val job = Job()
    val fullContext = context + job
    // Context now contains: Dispatcher, CoroutineName, Job

    val newContext = fullContext + Dispatchers.Default
    // Dispatcher replaced, others unchanged
}

// GOOD: Context inheritance
suspend fun contextInheritance() = coroutineScope {
    // Parent context: Default dispatcher
    withContext(Dispatchers.Default) {
        println("Parent dispatcher: ${coroutineContext[CoroutineDispatcher]}")

        launch {
            // Child inherits Default dispatcher
            println("Child dispatcher: ${coroutineContext[CoroutineDispatcher]}")
        }

        launch(Dispatchers.IO) {
            // Child overrides with IO dispatcher
            println("Modified child dispatcher: ${coroutineContext[CoroutineDispatcher]}")
        }
    }
}

// GOOD: Accessing context elements
suspend fun accessContext() {
    val job = coroutineContext[Job]
    val dispatcher = coroutineContext[CoroutineDispatcher]
    val name = coroutineContext[CoroutineName]

    println("Running in: $name on $dispatcher")
}

// GOOD: Custom context element
data class UserId(val id: String) : AbstractCoroutineContextElement(UserId) {
    companion object Key : CoroutineContext.Key<UserId>
}

suspend fun withUserId() {
    withContext(UserId("user123")) {
        val userId = coroutineContext[UserId]
        println("Current user: ${userId?.id}")
    }
}

Context composition is powerful for cross-cutting concerns. You can inject custom context elements (like user ID, trace ID, or transaction ID) and access them throughout the coroutine hierarchy without passing parameters. This is particularly useful for logging, tracing, and security contexts.
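To show the "no parameter passing" point concretely, here is a runnable sketch with a hypothetical trace-ID element (our names, mirroring the UserId example above): a logging helper deep in the call stack reads the element from the inherited context.

```kotlin
import kotlin.coroutines.*
import kotlinx.coroutines.*

// Hypothetical trace-ID context element for cross-cutting logging
data class TraceId(val id: String) : AbstractCoroutineContextElement(TraceId) {
    companion object Key : CoroutineContext.Key<TraceId>
}

// Deep in the call stack, read the element from coroutineContext -
// no trace parameter is threaded through the intermediate calls
suspend fun traceTag(): String = coroutineContext[TraceId]?.id ?: "no-trace"

suspend fun businessLogic(): String = "[${traceTag()}] processing payment"

fun main() = runBlocking {
    // The TraceId is inherited by everything inside this withContext block
    val line = withContext(TraceId("req-42")) { businessLogic() }
    println(line) // [req-42] processing payment
}
```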

Dispatchers Deep Dive

Dispatchers determine which thread pool executes the coroutine. Choosing the right dispatcher is crucial for performance and correctness. Kotlin provides several built-in dispatchers, each optimized for different workloads.

Dispatchers.Main runs coroutines on the UI thread (Android, Swing, JavaFX). Use for UI updates - the UI framework requires UI operations on the main thread. Dispatchers.IO uses a thread pool optimized for blocking I/O (network, database, file). Threads can block without affecting other work. Dispatchers.Default uses a CPU-bound thread pool for computation (JSON parsing, sorting, algorithms). Dispatchers.Unconfined starts in the caller's thread but resumes in whatever thread the suspending function uses - avoid except for special cases.

// GOOD: Dispatcher usage patterns
class DataProcessor {
    suspend fun processUserData(userId: String): Result {
        // IO for database read
        val userData = withContext(Dispatchers.IO) {
            database.loadUser(userId)
        }

        // Default for CPU-intensive parsing
        val parsed = withContext(Dispatchers.Default) {
            parseComplexData(userData)
        }

        // Main for UI update (Android)
        withContext(Dispatchers.Main) {
            updateUI(parsed)
        }

        return Result.success(parsed)
    }
}

// GOOD: Limiting concurrency on IO dispatcher
class ApiClient {
    // Limit concurrent network requests
    private val networkDispatcher = Dispatchers.IO.limitedParallelism(10)

    suspend fun fetchData(urls: List<String>): List<Data> {
        return coroutineScope {
            urls.map { url ->
                async(networkDispatcher) {
                    httpClient.get(url)
                }
            }.awaitAll()
        }
    }
}

// GOOD: Custom dispatcher for specialized workload
class ImageProcessor {
    // Dedicated thread pool for image processing
    private val imageDispatcher = Executors.newFixedThreadPool(4).asCoroutineDispatcher()

    suspend fun processImage(image: Bitmap): Bitmap = withContext(imageDispatcher) {
        // CPU-intensive image transformations on dedicated threads
        val filtered = applyFilters(image)
        val resized = resize(filtered)
        compress(resized)
    }

    fun shutdown() {
        imageDispatcher.close()
    }
}

// BAD: Blocking calls on Dispatchers.Default
suspend fun blockingOnDefault() = withContext(Dispatchers.Default) {
    // This blocks a CPU-bound thread!
    val result = httpClient.get("https://api.example.com").readText()
    result
}

// GOOD: Blocking calls on Dispatchers.IO
suspend fun blockingOnIO() = withContext(Dispatchers.IO) {
    // IO dispatcher is designed for blocking calls
    val result = httpClient.get("https://api.example.com").readText()
    result
}

A common mistake is using the wrong dispatcher. Blocking I/O on Dispatchers.Default starves CPU-bound work - the thread pool is sized for CPU cores, not blocking. Conversely, CPU-intensive work on Dispatchers.IO is inefficient - the pool is large to accommodate blocking but threads doing CPU work don't benefit. Always match the dispatcher to the workload type.

Context Switching

Use withContext to switch dispatchers mid-coroutine. This is essential for moving between I/O, computation, and UI work. Context switching is lightweight - the coroutine suspends, switches threads, and resumes - with far less overhead than hand-offs between raw threads in traditional threaded programming.

Context switching enables clean separation of concerns: perform I/O on Dispatchers.IO, process data on Dispatchers.Default, update UI on Dispatchers.Main, all within a single coroutine. This makes code more modular and easier to optimize - simply move work to the appropriate dispatcher.

// GOOD: Context switching for layered operations
suspend fun loadAndDisplayData() {
    // Start on IO for database query
    val rawData = withContext(Dispatchers.IO) {
        database.query("SELECT * FROM payments")
    }

    // Switch to Default for transformation
    val transformed = withContext(Dispatchers.Default) {
        rawData.map { row ->
            Payment(
                id = row.getString("id"),
                amount = BigDecimal(row.getString("amount")),
                // ... parse all fields
            )
        }.filter { it.status == PaymentStatus.COMPLETED }
    }

    // Switch to Main for UI update
    withContext(Dispatchers.Main) {
        paymentListView.updateData(transformed)
    }
}

// GOOD: Parallel processing with context
suspend fun processMultipleFiles(files: List<File>): List<Result> = coroutineScope {
    files.map { file ->
        async(Dispatchers.IO) {
            // Read on IO
            val content = file.readText()

            // Switch to Default for processing
            withContext(Dispatchers.Default) {
                processContent(content)
            }
        }
    }.awaitAll()
}

// GOOD: Avoiding unnecessary context switches
suspend fun optimizedDataLoad(): Data {
    return withContext(Dispatchers.IO) {
        // All IO operations in one context
        val user = loadUser()
        val profile = loadProfile(user.id)
        val settings = loadSettings(user.id)
        Data(user, profile, settings)
    }
}

// BAD: Excessive context switching
suspend fun excessiveSwitching() {
    withContext(Dispatchers.IO) { task1() }
    withContext(Dispatchers.Default) { task2() }
    withContext(Dispatchers.IO) { task3() }
    withContext(Dispatchers.Default) { task4() }
    // Too many switches - group operations by dispatcher
}

While context switching is cheap, it's not free. Avoid unnecessary switches - group operations by dispatcher when possible. Switch contexts at logical boundaries: after I/O completes, before computation, before UI updates. This improves performance and makes code structure clearer.


Further Reading

Internal Documentation

External Resources


Summary

Key Takeaways

  1. Channels - Message passing between coroutines without shared state
  2. Actors - Encapsulate state with sequential message processing
  3. Mutex - Protect critical sections when shared state is necessary
  4. Atomics - Lock-free operations for simple concurrent counters
  5. Concurrent Collections - Use ConcurrentHashMap and CopyOnWriteArrayList
  6. Structured Concurrency - Parent manages child lifetimes
  7. Exception Handling - Understand propagation and use supervisorScope when needed
  8. Cancellation - Write cooperative cancellable code
  9. Resource Cleanup - Always use try-finally or use for cleanup
  10. Testing - Use test dispatchers for deterministic concurrent tests

Next Steps: Review Kotlin Performance for optimization patterns and Kotlin Testing for testing concurrent code.