# Asynchronous Programming in Java
Asynchronous programming enables applications to perform non-blocking operations, allowing a single thread to initiate work and continue processing other tasks while waiting for results. Java has evolved from raw threads and callbacks to powerful abstractions like CompletableFuture and, in Java 21, virtual threads via Project Loom. Mastering these patterns is essential for building responsive, scalable services.
## Traditional Multithreading vs Asynchronous Models
Traditional multithreading assigns one OS thread per task. This is intuitive but expensive: each thread consumes ~1 MB of stack memory, and context switching under load adds significant overhead. Asynchronous models decouple task submission from execution, allowing a small thread pool to handle many concurrent operations.
The key trade-off: traditional threading is easier to reason about sequentially, while async models require careful handling of continuation logic but scale much better under I/O-bound workloads.
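To ground the comparison, here is a minimal sketch of the traditional thread-per-task model that the rest of this article improves on (class name is illustrative):

```java
public class ThreadPerTaskDemo {
    public static void main(String[] args) throws InterruptedException {
        // One platform (OS) thread per task: simple and sequential to read,
        // but each thread costs roughly a megabyte of stack plus a kernel
        // scheduling entity — this does not scale to thousands of tasks.
        Thread worker = new Thread(() -> {
            try {
                Thread.sleep(100); // simulate blocking I/O
                System.out.println("Task done on " + Thread.currentThread().getName());
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        worker.start();
        worker.join(); // caller blocks until the task finishes
    }
}
```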
## Thread Pools and Execution Contexts
Rather than creating a new thread per task, thread pools reuse a fixed set of threads. Java's ExecutorService is the primary abstraction.
Common pool types provided by Executors:
| Factory Method | Best For |
|---|---|
| `newFixedThreadPool(n)` | CPU-bound tasks with known parallelism |
| `newCachedThreadPool()` | Short-lived, bursty I/O tasks |
| `newSingleThreadExecutor()` | Sequential background tasks |
| `newScheduledThreadPool(n)` | Periodic or delayed tasks |
| `newVirtualThreadPerTaskExecutor()` | Java 21+ I/O-bound tasks |
Example 1 — Fixed thread pool with Future:
```java
import java.util.concurrent.*;
import java.util.List;
import java.util.ArrayList;

public class ThreadPoolDemo {
    public static void main(String[] args) throws InterruptedException, ExecutionException {
        ExecutorService executor = Executors.newFixedThreadPool(4);
        List<Future<Integer>> futures = new ArrayList<>();
        for (int i = 0; i < 10; i++) {
            final int taskId = i;
            Future<Integer> future = executor.submit(() -> {
                // Simulate computation
                Thread.sleep(100);
                return taskId * taskId;
            });
            futures.add(future);
        }
        int total = 0;
        for (Future<Integer> f : futures) {
            total += f.get(); // blocks until each task completes
        }
        System.out.println("Total: " + total);
        executor.shutdown();
        executor.awaitTermination(5, TimeUnit.SECONDS);
    }
}
```

The formula for sizing a CPU-bound thread pool is `N_threads = N_cpu`. For I/O-bound work, use `N_threads = N_cpu * (1 + wait_time / compute_time)`.
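A quick worked example of these sizing formulas. The 90 ms wait / 10 ms compute split is an assumed illustrative figure; with 8 cores it gives `8 * (1 + 90/10) = 80` threads for an I/O-bound pool:

```java
public class PoolSizing {
    public static void main(String[] args) {
        int nCpu = Runtime.getRuntime().availableProcessors();
        // Assumed illustrative figures: 90 ms waiting on I/O per 10 ms of compute
        double waitTime = 90.0, computeTime = 10.0;
        // I/O-bound: N_threads = N_cpu * (1 + wait/compute)
        int ioPoolSize = (int) (nCpu * (1 + waitTime / computeTime));
        System.out.println("CPU-bound pool size: " + nCpu);
        System.out.println("I/O-bound pool size: " + ioPoolSize);
    }
}
```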
## Callback Pattern
Callbacks were the first attempt to avoid blocking the calling thread. A callback is a function passed to an async operation; it is invoked when the result is ready.
Callback hell emerges when callbacks nest inside callbacks. Error handling must be duplicated at each level, and the code becomes deeply indented and hard to follow. This is sometimes called the "pyramid of doom."
Example 2 — Callback-style async using CompletableFuture.runAsync:
```java
import java.util.concurrent.CompletableFuture;

public class CallbackDemo {
    public static void main(String[] args) throws Exception {
        // Simulating callback-style composition — notice the nesting
        CompletableFuture.supplyAsync(() -> fetchUser(1))
            .thenAccept(user -> {
                System.out.println("Got user: " + user);
                CompletableFuture.supplyAsync(() -> fetchOrders(user))
                    .thenAccept(orders -> {
                        System.out.println("Got orders: " + orders);
                        CompletableFuture.supplyAsync(() -> fetchPrices(orders))
                            .thenAccept(prices -> {
                                // deeply nested — callback hell
                                System.out.println("Got prices: " + prices);
                            });
                    });
            });
        Thread.sleep(2000); // wait for async work
    }

    static String fetchUser(int id) { return "User#" + id; }
    static String fetchOrders(String user) { return "Orders[" + user + "]"; }
    static String fetchPrices(String orders) { return "Prices{" + orders + "}"; }
}
```

`CompletableFuture` was introduced specifically to flatten this pyramid into a readable pipeline.
## CompletableFuture API
CompletableFuture<T> is a promise-based abstraction that supports fluent chaining, combining multiple futures, and structured error handling.
Example 3 — Chaining with thenApply, thenCompose, and thenCombine:
```java
import java.util.concurrent.*;

public class CompletableFutureChaining {
    static ExecutorService pool = Executors.newFixedThreadPool(4);

    public static void main(String[] args) throws Exception {
        CompletableFuture<String> result =
            CompletableFuture.supplyAsync(() -> fetchUser(42), pool)     // Step 1
                .thenApply(user -> user.toUpperCase())                   // Step 2: transform
                .thenCompose(user ->                                     // Step 3: dependent async call
                    CompletableFuture.supplyAsync(() -> fetchOrders(user), pool)
                )
                .thenCombine(                                            // Step 4: parallel combination
                    CompletableFuture.supplyAsync(() -> fetchInventory(), pool),
                    (orders, inventory) -> "Orders: " + orders + " | Inv: " + inventory
                );
        System.out.println(result.get(5, TimeUnit.SECONDS));
        pool.shutdown();
    }

    static String fetchUser(int id) { return "user-" + id; }
    static String fetchOrders(String user) { return "orders[" + user + "]"; }
    static String fetchInventory() { return "inventory[42 items]"; }
}
```

Example 4 — Error handling with `exceptionally`, `handle`, and `whenComplete`:
```java
import java.util.concurrent.CompletableFuture;

public class ErrorHandlingDemo {
    public static void main(String[] args) throws Exception {
        // exceptionally: provide a fallback value on failure
        // (the explicit <String> witness is required because the lambda only
        // throws and never returns, so the type cannot be inferred)
        CompletableFuture<String> cf1 = CompletableFuture
            .<String>supplyAsync(() -> { throw new RuntimeException("fetch failed"); })
            .exceptionally(ex -> {
                System.err.println("Caught: " + ex.getMessage());
                return "default-value";
            });

        // handle: runs on both success and failure; can transform either
        CompletableFuture<String> cf2 = CompletableFuture
            .supplyAsync(() -> "raw-data")
            .handle((result, ex) -> {
                if (ex != null) return "error: " + ex.getMessage();
                return result.toUpperCase();
            });

        // whenComplete: side-effect only (logging), does not transform
        CompletableFuture<String> cf3 = CompletableFuture
            .supplyAsync(() -> "done")
            .whenComplete((result, ex) -> {
                if (ex != null) System.err.println("Failed: " + ex);
                else System.out.println("Succeeded with: " + result);
            });

        System.out.println(cf1.get());
        System.out.println(cf2.get());
        cf3.get();
    }
}
```

## Virtual Threads (Java 21 — Project Loom)
Virtual threads are lightweight threads managed by the JVM rather than the OS. Thousands or even millions of virtual threads can run concurrently on a small pool of carrier (OS) threads, making blocking calls cheap.
When a virtual thread blocks (e.g., on a database call), the JVM unmounts it from its carrier thread, which is then free to run another virtual thread. When the blocking operation completes, the virtual thread is remounted.
Example 5 — Virtual thread executor (Java 21):
```java
import java.util.concurrent.*;
import java.util.List;
import java.util.ArrayList;

public class VirtualThreadsDemo {
    public static void main(String[] args) throws Exception {
        // Creates a new virtual thread per submitted task — no pool to size
        try (ExecutorService executor = Executors.newVirtualThreadPerTaskExecutor()) {
            List<Future<String>> futures = new ArrayList<>();
            for (int i = 0; i < 10_000; i++) {
                final int id = i;
                futures.add(executor.submit(() -> {
                    // Blocking I/O is fine — virtual thread will be unmounted
                    Thread.sleep(50);
                    return "result-" + id;
                }));
            }
            long successCount = futures.stream()
                .filter(f -> {
                    try { f.get(); return true; }
                    catch (Exception e) { return false; }
                })
                .count();
            System.out.println("Completed: " + successCount + " tasks");
        }
    }
}
```

Virtual threads shine for I/O-bound workloads (HTTP calls, database queries). They are not beneficial for CPU-bound tasks, where you still want platform threads limited to `N_cpu`.
Example 6 — Combining multiple async operations with allOf and anyOf:
```java
import java.util.concurrent.*;
import java.util.Arrays;
import java.util.List;

public class AllOfAnyOfDemo {
    public static void main(String[] args) throws Exception {
        CompletableFuture<String> cf1 = CompletableFuture.supplyAsync(() -> fetchFromServiceA());
        CompletableFuture<String> cf2 = CompletableFuture.supplyAsync(() -> fetchFromServiceB());
        CompletableFuture<String> cf3 = CompletableFuture.supplyAsync(() -> fetchFromServiceC());

        // allOf: wait for ALL to complete
        CompletableFuture<Void> allDone = CompletableFuture.allOf(cf1, cf2, cf3);
        allDone.thenRun(() -> {
            try {
                // get() cannot block here because allOf has already completed
                List<String> results = Arrays.asList(cf1.get(), cf2.get(), cf3.get());
                System.out.println("All results: " + results);
            } catch (InterruptedException | ExecutionException e) {
                System.err.println("Failed to collect results: " + e.getMessage());
            }
        }).get(10, TimeUnit.SECONDS);

        // anyOf: respond as soon as ONE completes (fastest wins)
        CompletableFuture<Object> firstDone = CompletableFuture.anyOf(
            CompletableFuture.supplyAsync(() -> fetchFromServiceA()),
            CompletableFuture.supplyAsync(() -> fetchFromServiceB())
        );
        System.out.println("First result: " + firstDone.get(5, TimeUnit.SECONDS));
    }

    static String fetchFromServiceA() { return "A"; }
    static String fetchFromServiceB() { return "B"; }
    static String fetchFromServiceC() { return "C"; }
}
```

## Event Loops and Non-blocking I/O
Event loop architectures (popularized by Node.js and Netty) use a single thread to multiplex many I/O channels via OS-level selectors (epoll, kqueue). Java NIO provides Selector and Channel abstractions for this pattern.
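A full event loop runs `select()` forever and dispatches each ready channel; the following sketch shows a single pass of that loop with Java NIO, binding to an ephemeral port so it runs standalone (class name and the 100 ms poll timeout are illustrative):

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.channels.*;

public class SelectorSketch {
    public static void main(String[] args) throws IOException {
        Selector selector = Selector.open();
        ServerSocketChannel server = ServerSocketChannel.open();
        server.bind(new InetSocketAddress(0)); // ephemeral port
        server.configureBlocking(false);       // selectors require non-blocking channels
        server.register(selector, SelectionKey.OP_ACCEPT);

        // One pass of the event loop: wait up to 100 ms for readiness events
        int ready = selector.select(100);
        System.out.println("Ready channels: " + ready);
        for (SelectionKey key : selector.selectedKeys()) {
            if (key.isAcceptable()) {
                SocketChannel client = server.accept(); // non-blocking accept
                if (client != null) {
                    client.configureBlocking(false);
                    client.register(selector, SelectionKey.OP_READ);
                }
            }
        }
        selector.selectedKeys().clear(); // a real loop clears keys each iteration

        server.close();
        selector.close();
    }
}
```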
Example 7 — Timeout handling with orTimeout and completeOnTimeout:
```java
import java.util.concurrent.*;

public class TimeoutDemo {
    public static void main(String[] args) {
        // orTimeout: completes exceptionally with TimeoutException after the deadline
        CompletableFuture<String> cf1 = CompletableFuture
            .supplyAsync(() -> slowOperation())
            .orTimeout(1, TimeUnit.SECONDS)
            .exceptionally(ex -> {
                System.err.println("Timed out: " + ex.getMessage());
                return "timeout-fallback";
            });

        // completeOnTimeout: completes normally with a default value after the deadline
        CompletableFuture<String> cf2 = CompletableFuture
            .supplyAsync(() -> slowOperation())
            .completeOnTimeout("default-value", 1, TimeUnit.SECONDS);

        try {
            System.out.println("cf1: " + cf1.get());
            System.out.println("cf2: " + cf2.get());
        } catch (Exception e) {
            System.err.println("Unexpected: " + e.getMessage());
        }
    }

    static String slowOperation() {
        try { Thread.sleep(3000); } catch (InterruptedException e) { Thread.currentThread().interrupt(); }
        return "slow-result";
    }
}
```

## Reactive Programming Basics
Reactive programming treats data as streams with backpressure support. The two dominant Java libraries are Project Reactor (used in Spring WebFlux) and RxJava.
Backpressure is the mechanism by which a slow consumer signals a fast producer to slow down, preventing unbounded memory growth. CompletableFuture has no backpressure; for streaming pipelines with flow control, Reactor's Flux or RxJava's Flowable are the right tools.
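The JDK's own `java.util.concurrent.Flow` API (Java 9+) implements the request-n protocol that Reactor and RxJava build on, so it can illustrate backpressure without third-party dependencies. A minimal sketch using `SubmissionPublisher`, where the subscriber pulls one item at a time (class name and item count are illustrative):

```java
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.CountDownLatch;

public class FlowBackpressureDemo {
    public static void main(String[] args) throws InterruptedException {
        CountDownLatch done = new CountDownLatch(1);
        try (SubmissionPublisher<Integer> publisher = new SubmissionPublisher<>()) {
            publisher.subscribe(new Flow.Subscriber<Integer>() {
                private Flow.Subscription subscription;

                public void onSubscribe(Flow.Subscription s) {
                    this.subscription = s;
                    s.request(1); // backpressure: ask for one item at a time
                }
                public void onNext(Integer item) {
                    System.out.println("Consumed: " + item);
                    subscription.request(1); // request the next item only when ready
                }
                public void onError(Throwable t) { done.countDown(); }
                public void onComplete() { done.countDown(); }
            });
            for (int i = 1; i <= 3; i++) {
                publisher.submit(i); // blocks the producer if the subscriber's buffer fills
            }
        } // close() signals onComplete once all items are delivered
        done.await();
    }
}
```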
Example 8 — Real-world async API calls with retry logic:
```java
import java.util.concurrent.*;
import java.util.function.Supplier;

public class AsyncRetryDemo {
    private static final ExecutorService POOL = Executors.newFixedThreadPool(4);

    /**
     * Retries a CompletableFuture-returning supplier up to maxRetries times
     * with exponential back-off.
     */
    public static <T> CompletableFuture<T> withRetry(
            Supplier<CompletableFuture<T>> action,
            int maxRetries,
            long baseDelayMs) {
        CompletableFuture<T> result = action.get();
        for (int attempt = 1; attempt <= maxRetries; attempt++) {
            final long delay = baseDelayMs * (1L << (attempt - 1)); // 2^(attempt-1)
            result = result.exceptionallyCompose(ex -> {
                System.err.println("Attempt failed: " + ex.getMessage() + ". Retrying in " + delay + "ms");
                // delayedExecutor returns an Executor that defers submitted tasks by the given delay
                Executor delayed = CompletableFuture.delayedExecutor(delay, TimeUnit.MILLISECONDS, POOL);
                return CompletableFuture.runAsync(() -> { }, delayed)
                        .thenCompose(ignored -> action.get());
            });
        }
        return result;
    }

    public static void main(String[] args) throws Exception {
        CompletableFuture<String> apiCall = withRetry(
            () -> CompletableFuture.supplyAsync(() -> callExternalApi(), POOL),
            3,
            200
        );
        System.out.println(apiCall.get(10, TimeUnit.SECONDS));
        POOL.shutdown();
    }

    static int attempts = 0;
    static String callExternalApi() {
        if (++attempts < 3) throw new RuntimeException("Service unavailable");
        return "API response on attempt " + attempts;
    }
}
```
## Best Practices

- **Never block inside an async chain.** Calling `get()` or `join()` inside a `thenApply` callback ties up a pool thread and can cause deadlocks. Reserve blocking calls for the terminal step.
- **Size thread pools correctly.** CPU-bound: `N_threads = N_cpu`. I/O-bound: `N_threads = N_cpu * (1 + W/C)`, where W is average wait time and C is average compute time.
- **Do not use `ForkJoinPool.commonPool()` for I/O-bound work.** `CompletableFuture.supplyAsync(task)` without an executor uses the common pool, which is sized for CPU work. Always pass an explicit I/O executor.
- **Always handle exceptions.** An unhandled exception in a `CompletableFuture` is silently swallowed unless you call `get()`, add `exceptionally`, or set a default uncaught exception handler.
- **Prefer virtual threads for I/O-bound work (Java 21+).** `Executors.newVirtualThreadPerTaskExecutor()` eliminates the need to tune pool sizes for I/O workloads and simplifies code by allowing straightforward blocking calls.
- **Propagate cancellation.** If a `CompletableFuture` is cancelled, explicitly cancel any chained futures or downstream resources using `cancel(true)`.
- **Use `orTimeout` instead of `get(timeout, unit)`.** `orTimeout` makes the timeout part of the pipeline by completing the future exceptionally, while `get(timeout, unit)` only times out the calling thread and leaves the future running.
- **Avoid shared mutable state in lambdas.** Each stage in a `CompletableFuture` chain may execute on a different thread. Rely on immutable data or thread-safe structures.
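The common-pool pitfall above can be made concrete with a small sketch that prints which thread runs each task (executor size and class name are illustrative):

```java
import java.util.concurrent.*;

public class ExplicitExecutorDemo {
    public static void main(String[] args) throws Exception {
        // Hypothetical dedicated I/O pool; the size is illustrative
        ExecutorService ioPool = Executors.newFixedThreadPool(16);

        // Default: runs on ForkJoinPool.commonPool(), which is sized for CPU work
        CompletableFuture<String> onCommon =
            CompletableFuture.supplyAsync(() -> Thread.currentThread().getName());

        // Preferred for I/O: pass the executor explicitly
        CompletableFuture<String> onIoPool =
            CompletableFuture.supplyAsync(() -> Thread.currentThread().getName(), ioPool);

        System.out.println("common pool thread: " + onCommon.get());
        System.out.println("i/o pool thread:    " + onIoPool.get());
        ioPool.shutdown();
    }
}
```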
## Related Concepts
- High-Performance Streaming & MapReduce — applying async patterns to stream processing pipelines
- Serverless Containers — async invocation models in serverless environments