Asynchronous Programming in Java

Asynchronous programming enables applications to perform non-blocking operations, allowing a single thread to initiate work and continue processing other tasks while waiting for results. Java has evolved from raw threads and callbacks to powerful abstractions like CompletableFuture and, in Java 21, virtual threads via Project Loom. Mastering these patterns is essential for building responsive, scalable services.


Traditional Multithreading vs Asynchronous Models

Traditional multithreading assigns one OS thread per task. This is intuitive but expensive: each thread consumes ~1 MB of stack memory, and context switching under load adds significant overhead. Asynchronous models decouple task submission from execution, allowing a small thread pool to handle many concurrent operations.

The key trade-off: traditional threading is easier to reason about sequentially, while async models require careful handling of continuation logic but scale much better under I/O-bound workloads.
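To make the trade-off concrete, here is a minimal sketch contrasting the two models (the download helper, its URLs, and its timing are illustrative, not a real HTTP client):

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class BlockingVsAsyncDemo {

    // Hypothetical I/O-bound task: sleeps to stand in for network latency
    static String download(String url) {
        try { Thread.sleep(100); } catch (InterruptedException e) { Thread.currentThread().interrupt(); }
        return "payload from " + url;
    }

    public static void main(String[] args) throws Exception {
        // Traditional model: a dedicated OS thread blocks for the whole call
        Thread worker = new Thread(() -> System.out.println(download("https://example.com/a")));
        worker.start();
        worker.join();

        // Async model: submission returns immediately; a small shared pool does the work
        ExecutorService pool = Executors.newFixedThreadPool(2);
        CompletableFuture<String> cf =
            CompletableFuture.supplyAsync(() -> download("https://example.com/b"), pool);
        System.out.println("submitted, caller is free to do other work");
        System.out.println(cf.join()); // block only at the terminal step
        pool.shutdown();
    }
}
```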


Thread Pools and Execution Contexts

Rather than creating a new thread per task, thread pools reuse a fixed set of threads. Java's ExecutorService is the primary abstraction.

Common pool types provided by Executors:

Factory Method                         Best For
newFixedThreadPool(n)                  CPU-bound tasks with known parallelism
newCachedThreadPool()                  Short-lived, bursty I/O tasks
newSingleThreadExecutor()              Sequential background tasks
newScheduledThreadPool(n)              Periodic or delayed tasks
newVirtualThreadPerTaskExecutor()      Java 21+ I/O-bound tasks

Example 1 — Fixed thread pool with Future:

java
import java.util.concurrent.*;
import java.util.List;
import java.util.ArrayList;

public class ThreadPoolDemo {

    public static void main(String[] args) throws InterruptedException, ExecutionException {
        ExecutorService executor = Executors.newFixedThreadPool(4);

        List<Future<Integer>> futures = new ArrayList<>();

        for (int i = 0; i < 10; i++) {
            final int taskId = i;
            Future<Integer> future = executor.submit(() -> {
                // Simulate computation
                Thread.sleep(100);
                return taskId * taskId;
            });
            futures.add(future);
        }

        int total = 0;
        for (Future<Integer> f : futures) {
            total += f.get(); // blocks until each task completes
        }

        System.out.println("Total: " + total);
        executor.shutdown();
        executor.awaitTermination(5, TimeUnit.SECONDS);
    }
}

The formula for sizing a CPU-bound thread pool is N_threads = N_cpu. For I/O-bound work, use N_threads = N_cpu * (1 + wait_time / compute_time).
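Applying these formulas is simple arithmetic; the wait and compute times below are assumed values for a hypothetical task that spends 90 ms waiting per 10 ms of computation:

```java
public class PoolSizing {

    public static void main(String[] args) {
        int nCpu = Runtime.getRuntime().availableProcessors();

        // CPU-bound: one thread per core
        int cpuBound = nCpu;

        // I/O-bound: N_cpu * (1 + wait_time / compute_time)
        long waitMs = 90, computeMs = 10;                                  // assumed profile
        int ioBound = (int) (nCpu * (1 + (double) waitMs / computeMs));    // = nCpu * 10

        System.out.println("CPU-bound pool size: " + cpuBound);
        System.out.println("I/O-bound pool size: " + ioBound);
    }
}
```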


Callback Pattern

Callbacks were the first attempt to avoid blocking the calling thread. A callback is a function passed to an async operation; it is invoked when the result is ready.

Callback hell emerges when callbacks nest inside callbacks. Error handling must be duplicated at each level, and the code becomes deeply indented and hard to follow. This is sometimes called the "pyramid of doom."

Example 2 — Callback-style composition using CompletableFuture.supplyAsync:

java
import java.util.concurrent.CompletableFuture;

public class CallbackDemo {

    public static void main(String[] args) throws Exception {
        // Simulating callback-style composition — notice the nesting
        CompletableFuture.supplyAsync(() -> fetchUser(1))
            .thenAccept(user -> {
                System.out.println("Got user: " + user);
                CompletableFuture.supplyAsync(() -> fetchOrders(user))
                    .thenAccept(orders -> {
                        System.out.println("Got orders: " + orders);
                        CompletableFuture.supplyAsync(() -> fetchPrices(orders))
                            .thenAccept(prices -> {
                                // deeply nested — callback hell
                                System.out.println("Got prices: " + prices);
                            });
                    });
            });

        Thread.sleep(2000); // wait for async work
    }

    static String fetchUser(int id) { return "User#" + id; }
    static String fetchOrders(String user) { return "Orders[" + user + "]"; }
    static String fetchPrices(String orders) { return "Prices{" + orders + "}"; }
}

CompletableFuture was introduced specifically to flatten this pyramid into a readable pipeline.
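For comparison, here is the nested pipeline from Example 2 flattened with thenCompose, reusing the same stub fetch helpers:

```java
import java.util.concurrent.CompletableFuture;

public class FlattenedPipelineDemo {

    public static void main(String[] args) {
        // The nested callbacks from Example 2, rewritten as a flat pipeline
        String prices = CompletableFuture.supplyAsync(() -> fetchUser(1))
            .thenCompose(user -> CompletableFuture.supplyAsync(() -> fetchOrders(user)))
            .thenCompose(orders -> CompletableFuture.supplyAsync(() -> fetchPrices(orders)))
            .join(); // block once, at the end of the chain

        System.out.println("Got prices: " + prices);
    }

    static String fetchUser(int id) { return "User#" + id; }
    static String fetchOrders(String user) { return "Orders[" + user + "]"; }
    static String fetchPrices(String orders) { return "Prices{" + orders + "}"; }
}
```

Each dependent call sits at the same indentation level, and error handling can be attached once at the end of the chain instead of at every nesting level.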


CompletableFuture API

CompletableFuture<T> is a promise-based abstraction that supports fluent chaining, combining multiple futures, and structured error handling.

Example 3 — Chaining with thenApply, thenCompose, and thenCombine:

java
import java.util.concurrent.*;

public class CompletableFutureChaining {

    static ExecutorService pool = Executors.newFixedThreadPool(4);

    public static void main(String[] args) throws Exception {
        CompletableFuture<String> result =
            CompletableFuture.supplyAsync(() -> fetchUser(42), pool)          // Step 1
                .thenApply(user -> user.toUpperCase())                         // Step 2: transform
                .thenCompose(user ->                                            // Step 3: dependent async call
                    CompletableFuture.supplyAsync(() -> fetchOrders(user), pool)
                )
                .thenCombine(                                                   // Step 4: parallel combination
                    CompletableFuture.supplyAsync(() -> fetchInventory(), pool),
                    (orders, inventory) -> "Orders: " + orders + " | Inv: " + inventory
                );

        System.out.println(result.get(5, TimeUnit.SECONDS));
        pool.shutdown();
    }

    static String fetchUser(int id) { return "user-" + id; }
    static String fetchOrders(String user) { return "orders[" + user + "]"; }
    static String fetchInventory() { return "inventory[42 items]"; }
}

Example 4 — Error handling with exceptionally, handle, and whenComplete:

java
import java.util.concurrent.CompletableFuture;

public class ErrorHandlingDemo {

    public static void main(String[] args) throws Exception {

        // exceptionally: provide a fallback value on failure
        CompletableFuture<String> cf1 = CompletableFuture
            // explicit type witness: the lambda always throws, so String cannot be inferred
            .<String>supplyAsync(() -> { throw new RuntimeException("fetch failed"); })
            .exceptionally(ex -> {
                System.err.println("Caught: " + ex.getMessage());
                return "default-value";
            });

        // handle: runs on both success and failure; can transform either
        CompletableFuture<String> cf2 = CompletableFuture
            .supplyAsync(() -> "raw-data")
            .handle((result, ex) -> {
                if (ex != null) return "error: " + ex.getMessage();
                return result.toUpperCase();
            });

        // whenComplete: side-effect only (logging), does not transform
        CompletableFuture<String> cf3 = CompletableFuture
            .supplyAsync(() -> "done")
            .whenComplete((result, ex) -> {
                if (ex != null) System.err.println("Failed: " + ex);
                else System.out.println("Succeeded with: " + result);
            });

        System.out.println(cf1.get());
        System.out.println(cf2.get());
        cf3.get();
    }
}

Virtual Threads (Java 21 — Project Loom)

Virtual threads are lightweight threads managed by the JVM rather than the OS. Thousands or even millions of virtual threads can run concurrently on a small pool of carrier (OS) threads, making blocking calls cheap.

When a virtual thread blocks (e.g., on a database call), the JVM unmounts it from its carrier thread, which is then free to run another virtual thread. When the blocking operation completes, the virtual thread is remounted.

Example 5 — Virtual thread executor (Java 21):

java
import java.util.concurrent.*;
import java.util.List;
import java.util.ArrayList;

public class VirtualThreadsDemo {

    public static void main(String[] args) throws Exception {
        // Creates a new virtual thread per submitted task — no pool to size
        try (ExecutorService executor = Executors.newVirtualThreadPerTaskExecutor()) {

            List<Future<String>> futures = new ArrayList<>();

            for (int i = 0; i < 10_000; i++) {
                final int id = i;
                futures.add(executor.submit(() -> {
                    // Blocking I/O is fine — virtual thread will be unmounted
                    Thread.sleep(50);
                    return "result-" + id;
                }));
            }

            long successCount = futures.stream()
                .filter(f -> {
                    try { f.get(); return true; }
                    catch (Exception e) { return false; }
                })
                .count();

            System.out.println("Completed: " + successCount + " tasks");
        }
    }
}

Virtual threads shine for I/O-bound workloads (HTTP calls, database queries). They are not beneficial for CPU-bound tasks, where you still want platform threads limited to N_cpu.
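Besides the executor factory shown above, Java 21 also lets you start a virtual thread directly through the Thread.ofVirtual() builder; a minimal sketch:

```java
public class VirtualThreadBuilderDemo {

    public static void main(String[] args) throws InterruptedException {
        // Start a single named virtual thread directly, without an executor
        Thread vt = Thread.ofVirtual()
            .name("worker-1")
            .start(() -> System.out.println("virtual: " + Thread.currentThread().isVirtual()));
        vt.join();
    }
}
```

The builder form is handy for one-off tasks; for fan-out workloads like Example 5, the per-task executor remains the idiomatic choice because it integrates with Future and try-with-resources shutdown.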

Example 6 — Combining multiple async operations with allOf and anyOf:

java
import java.util.concurrent.*;
import java.util.Arrays;
import java.util.List;

public class AllOfAnyOfDemo {

    public static void main(String[] args) throws Exception {
        CompletableFuture<String> cf1 = CompletableFuture.supplyAsync(() -> fetchFromServiceA());
        CompletableFuture<String> cf2 = CompletableFuture.supplyAsync(() -> fetchFromServiceB());
        CompletableFuture<String> cf3 = CompletableFuture.supplyAsync(() -> fetchFromServiceC());

        // allOf: wait for ALL to complete
        CompletableFuture<Void> allDone = CompletableFuture.allOf(cf1, cf2, cf3);
        allDone.thenRun(() -> {
            // All three futures are complete at this point, so join() returns immediately
            List<String> results = Arrays.asList(cf1.join(), cf2.join(), cf3.join());
            System.out.println("All results: " + results);
        }).get(10, TimeUnit.SECONDS);

        // anyOf: respond as soon as ONE completes (fastest wins)
        CompletableFuture<Object> firstDone = CompletableFuture.anyOf(
            CompletableFuture.supplyAsync(() -> fetchFromServiceA()),
            CompletableFuture.supplyAsync(() -> fetchFromServiceB())
        );
        System.out.println("First result: " + firstDone.get(5, TimeUnit.SECONDS));
    }

    static String fetchFromServiceA() { return "A"; }
    static String fetchFromServiceB() { return "B"; }
    static String fetchFromServiceC() { return "C"; }
}

Event Loops and Non-blocking I/O

Event loop architectures (popularized by Node.js and Netty) use a single thread to multiplex many I/O channels via OS-level selectors (epoll, kqueue). Java NIO provides Selector and Channel abstractions for this pattern.
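The selector pattern described above can be sketched as a minimal single-threaded loop that multiplexes an accept event and a read event on one Selector (the port is ephemeral and the "ping" payload is illustrative):

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.nio.charset.StandardCharsets;
import java.util.Iterator;

public class SelectorDemo {

    public static void main(String[] args) throws IOException {
        Selector selector = Selector.open();

        // Non-blocking server socket, registered for accept readiness
        ServerSocketChannel server = ServerSocketChannel.open();
        server.bind(new InetSocketAddress("localhost", 0)); // ephemeral port
        server.configureBlocking(false);
        server.register(selector, SelectionKey.OP_ACCEPT);
        int port = ((InetSocketAddress) server.getLocalAddress()).getPort();

        // A client (blocking, for brevity) connects and sends one message
        SocketChannel client = SocketChannel.open(new InetSocketAddress("localhost", port));
        client.write(ByteBuffer.wrap("ping".getBytes(StandardCharsets.UTF_8)));

        // The event loop: ONE thread multiplexes accept and read events
        boolean done = false;
        while (!done) {
            selector.select(); // blocks until at least one channel is ready
            Iterator<SelectionKey> keys = selector.selectedKeys().iterator();
            while (keys.hasNext()) {
                SelectionKey key = keys.next();
                keys.remove();
                if (key.isAcceptable()) {
                    SocketChannel conn = server.accept();
                    conn.configureBlocking(false);
                    conn.register(selector, SelectionKey.OP_READ);
                } else if (key.isReadable()) {
                    SocketChannel conn = (SocketChannel) key.channel();
                    ByteBuffer buf = ByteBuffer.allocate(64);
                    conn.read(buf);
                    buf.flip();
                    System.out.println("server read: " + StandardCharsets.UTF_8.decode(buf));
                    conn.close();
                    done = true;
                }
            }
        }
        client.close();
        server.close();
        selector.close();
    }
}
```

A production event loop (as in Netty) adds write-interest handling, partial-read buffering, and graceful shutdown, but the core structure is the same select-dispatch cycle.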

Example 7 — Timeout handling with orTimeout and completeOnTimeout:

java
import java.util.concurrent.*;

public class TimeoutDemo {

    public static void main(String[] args) {

        // orTimeout: completes exceptionally with TimeoutException after deadline
        CompletableFuture<String> cf1 = CompletableFuture
            .supplyAsync(() -> slowOperation())
            .orTimeout(1, TimeUnit.SECONDS)
            .exceptionally(ex -> {
                System.err.println("Timed out: " + ex.getMessage());
                return "timeout-fallback";
            });

        // completeOnTimeout: completes normally with a default value after deadline
        CompletableFuture<String> cf2 = CompletableFuture
            .supplyAsync(() -> slowOperation())
            .completeOnTimeout("default-value", 1, TimeUnit.SECONDS);

        try {
            System.out.println("cf1: " + cf1.get());
            System.out.println("cf2: " + cf2.get());
        } catch (Exception e) {
            System.err.println("Unexpected: " + e.getMessage());
        }
    }

    static String slowOperation() {
        try { Thread.sleep(3000); } catch (InterruptedException e) { Thread.currentThread().interrupt(); }
        return "slow-result";
    }
}

Reactive Programming Basics

Reactive programming treats data as streams with backpressure support. The two dominant Java libraries are Project Reactor (used in Spring WebFlux) and RxJava.

Backpressure is the mechanism by which a slow consumer signals a fast producer to slow down, preventing unbounded memory growth. CompletableFuture has no backpressure; for streaming pipelines with flow control, Reactor's Flux or RxJava's Flowable are the right tools.
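Reactor and RxJava are external dependencies, but the request-n backpressure protocol they implement can be illustrated with the JDK's own java.util.concurrent.Flow API (Java 9+); here the subscriber pulls exactly one item at a time:

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;

public class FlowBackpressureDemo {

    public static void main(String[] args) throws InterruptedException {
        CountDownLatch done = new CountDownLatch(1);

        Flow.Subscriber<Integer> subscriber = new Flow.Subscriber<>() {
            private Flow.Subscription subscription;

            @Override public void onSubscribe(Flow.Subscription s) {
                this.subscription = s;
                s.request(1); // backpressure: ask for ONE item up front
            }
            @Override public void onNext(Integer item) {
                System.out.println("consumed: " + item);
                subscription.request(1); // pull the next item only when ready
            }
            @Override public void onError(Throwable t) { done.countDown(); }
            @Override public void onComplete() {
                System.out.println("stream complete");
                done.countDown();
            }
        };

        try (SubmissionPublisher<Integer> publisher = new SubmissionPublisher<>()) {
            publisher.subscribe(subscriber);
            for (int i = 1; i <= 3; i++) publisher.submit(i); // blocks if the buffer fills
        } // close() signals onComplete after delivery
        done.await();
    }
}
```

Because the producer may only emit what has been requested, a slow consumer bounds the in-flight work instead of letting items pile up in memory.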

Example 8 — Real-world async API calls with retry logic:

java
import java.util.concurrent.*;
import java.util.function.Supplier;

public class AsyncRetryDemo {

    private static final ExecutorService POOL = Executors.newFixedThreadPool(4);

    /**
     * Retries a CompletableFuture-returning supplier up to maxRetries times
     * with exponential back-off.
     */
    public static <T> CompletableFuture<T> withRetry(
            Supplier<CompletableFuture<T>> action,
            int maxRetries,
            long baseDelayMs) {

        CompletableFuture<T> result = action.get();

        for (int attempt = 1; attempt <= maxRetries; attempt++) {
            final long delay = baseDelayMs * (1L << (attempt - 1)); // 2^(attempt-1)
            result = result.exceptionallyCompose(ex -> {
                System.err.println("Attempt failed: " + ex.getMessage() + ". Retrying in " + delay + "ms");
                // delayedExecutor returns an Executor that defers each submitted task
                Executor delayed = CompletableFuture.delayedExecutor(delay, TimeUnit.MILLISECONDS, POOL);
                return CompletableFuture.supplyAsync(() -> (Void) null, delayed) // wait out the back-off
                    .thenCompose(ignored -> action.get());
            });
        }
        return result;
    }

    public static void main(String[] args) throws Exception {
        CompletableFuture<String> apiCall = withRetry(
            () -> CompletableFuture.supplyAsync(() -> callExternalApi(), POOL),
            3,
            200
        );

        System.out.println(apiCall.get(10, TimeUnit.SECONDS));
        POOL.shutdown();
    }

    static int attempts = 0;
    static String callExternalApi() {
        if (++attempts < 3) throw new RuntimeException("Service unavailable");
        return "API response on attempt " + attempts;
    }
}

Async Error Handling Flowchart

(Figure not included in this extraction.)

Best Practices

  1. Never block inside an async chain. Calling get() or join() inside a thenApply callback ties up a pool thread and can cause deadlocks. Reserve blocking calls for the terminal step.
  2. Size thread pools correctly. CPU-bound: N_threads = N_cpu. I/O-bound: N_threads = N_cpu * (1 + W/C) where W is average wait time and C is average compute time.
  3. Do not use ForkJoinPool.commonPool() for I/O-bound work. CompletableFuture.supplyAsync(task) without an executor uses the common pool, which is sized for CPU work. Always pass an explicit I/O executor.
  4. Always handle exceptions. An exception in a CompletableFuture stage is stored, not thrown: it surfaces only when you call get() or join(), or when you attach a handler such as exceptionally, handle, or whenComplete. Without one of these, the failure vanishes silently.
  5. Prefer virtual threads for I/O-bound work (Java 21+). Executors.newVirtualThreadPerTaskExecutor() eliminates the need to tune pool sizes for I/O workloads and simplifies code by allowing straightforward blocking calls.
  6. Propagate cancellation. Cancelling a CompletableFuture completes it with a CancellationException, but it does not interrupt a task already running and does not cancel upstream stages; explicitly cancel chained futures and release downstream resources yourself.
  7. Prefer orTimeout over get(timeout, unit). orTimeout makes the deadline part of the future itself, so downstream stages and error handlers observe the TimeoutException, whereas get(timeout, unit) only bounds how long the calling thread waits and leaves the pipeline running.
  8. Avoid shared mutable state in lambdas. Each stage in a CompletableFuture chain may execute on a different thread. Rely on immutable data or thread-safe structures.
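Rules 1, 3, and 4 above can be illustrated together: pass an explicit executor for I/O work, keep intermediate stages non-blocking, and defer the single blocking call to the terminal step (the pool size and timings here are illustrative):

```java
import java.util.concurrent.*;

public class BestPracticesDemo {

    public static void main(String[] args) throws Exception {
        // Rule 3: a dedicated executor for I/O, instead of ForkJoinPool.commonPool()
        ExecutorService ioPool = Executors.newFixedThreadPool(8);

        CompletableFuture<String> pipeline =
            CompletableFuture.supplyAsync(() -> slowFetch(), ioPool)
                // Rule 1: every intermediate stage stays non-blocking
                .thenApply(String::toUpperCase)
                // Rule 4: an error handler is always attached
                .exceptionally(ex -> "fallback");

        // Rule 1: the one blocking call is the terminal step, with a deadline
        System.out.println(pipeline.get(5, TimeUnit.SECONDS));
        ioPool.shutdown();
    }

    // Hypothetical I/O call; the sleep stands in for network latency
    static String slowFetch() {
        try { Thread.sleep(100); } catch (InterruptedException e) { Thread.currentThread().interrupt(); }
        return "data";
    }
}
```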