Java virtual threads represent a paradigm shift in how we approach concurrency in Java applications. As a developer who has worked extensively with traditional thread models, I’m excited to share practical techniques for implementing virtual threads to build highly scalable applications.
Virtual threads arrived with Java 19 as a preview feature and were finalized in Java 21, offering a lightweight alternative to platform threads. They enable a high degree of concurrency without the resource overhead traditionally associated with thread creation.
Basic Virtual Thread Creation
Creating virtual threads is straightforward. Unlike platform threads where we need to consider resource limitations, virtual threads are designed to be created abundantly.
public void runSimpleTask() {
    Thread vThread = Thread.startVirtualThread(() -> {
        System.out.println("Running in a virtual thread");
        performWork();
    });
    // If needed, we can wait for completion
    try {
        vThread.join();
    } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
    }
}
The beauty of virtual threads lies in their simplicity. We can create thousands or even millions of them without pooling them or tuning pool sizes: each virtual thread costs only a small amount of heap for its stack, rather than a dedicated OS thread.
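To make that concrete, here is a minimal sketch (the 100,000-thread count and one-second sleep are arbitrary, using java.util.stream.IntStream and java.time.Duration) that starts a large number of sleeping virtual threads and waits for them; attempting the same with platform threads would likely exhaust memory or OS thread limits on most machines.

public void startManyVirtualThreads() throws InterruptedException {
    // Start 100,000 virtual threads that each sleep for one second
    List<Thread> threads = IntStream.range(0, 100_000)
        .mapToObj(i -> Thread.ofVirtual().start(() -> {
            try {
                Thread.sleep(Duration.ofSeconds(1));
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }))
        .toList();
    // Join them all; this completes in roughly one second with modest memory use
    for (Thread thread : threads) {
        thread.join();
    }
}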
Working with Virtual Thread Executors
For more controlled execution, we can use the executor framework that’s been enhanced to support virtual threads:
public void executeMultipleTasks(List<Runnable> tasks) {
    try (ExecutorService executor = Executors.newVirtualThreadPerTaskExecutor()) {
        for (Runnable task : tasks) {
            executor.submit(task);
        }
    } // close() waits for all submitted tasks to finish, then shuts the executor down
}
This approach creates a new virtual thread for each submitted task. The executor handles the lifecycle management, and the try-with-resources pattern ensures proper cleanup.
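One small addition worth knowing: virtual threads are unnamed by default, which makes logs and thread dumps harder to read. A thread factory fixes that. The sketch below (the "order-task-" prefix is just an example) pairs a named virtual-thread factory with a per-task executor:

public void executeWithNamedThreads(List<Runnable> tasks) {
    // Each virtual thread gets a name like "order-task-0", "order-task-1", ...
    ThreadFactory factory = Thread.ofVirtual().name("order-task-", 0).factory();
    try (ExecutorService executor = Executors.newThreadPerTaskExecutor(factory)) {
        for (Runnable task : tasks) {
            executor.submit(task);
        }
    }
}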
Implementing Structured Concurrency
Structured concurrency provides a more organized approach to handling multiple concurrent operations. It ensures that no subtask outlives its parent scope, preventing thread leaks and orphaned work. Note that StructuredTaskScope is still a preview API as of Java 21, so the following example requires --enable-preview.
public List<ProductInfo> fetchProductDetails(List<String> productIds) throws Exception {
    try (var scope = new StructuredTaskScope.ShutdownOnFailure()) {
        List<StructuredTaskScope.Subtask<ProductInfo>> tasks = productIds.stream()
            .map(id -> scope.fork(() -> fetchProductInfo(id)))
            .toList();
        scope.join();          // Wait for all tasks
        scope.throwIfFailed(); // Propagate any exception
        return tasks.stream()
            .map(StructuredTaskScope.Subtask::get)
            .toList();
    }
}

private ProductInfo fetchProductInfo(String id) {
    // API call or database query
    return new ProductInfo(id, "Product " + id, 99.99);
}
This pattern is particularly useful when implementing aggregation services that need to collect data from multiple sources.
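The complementary policy, StructuredTaskScope.ShutdownOnSuccess, is handy when several sources can answer the same question and you only need the fastest one. A sketch under the same preview API (fetchFromReplica and the replica list are hypothetical helpers):

public ProductInfo fetchFromFastestReplica(String productId, List<String> replicas)
        throws InterruptedException, ExecutionException {
    try (var scope = new StructuredTaskScope.ShutdownOnSuccess<ProductInfo>()) {
        for (String replica : replicas) {
            scope.fork(() -> fetchFromReplica(replica, productId)); // hypothetical helper
        }
        scope.join();
        return scope.result(); // first successful result; remaining subtasks are cancelled
    }
}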
Optimizing I/O-Bound Operations
Virtual threads shine when dealing with I/O-bound operations. With platform threads, every blocked call ties up a scarce OS thread, so we had to size and tune dedicated thread pools to keep enough requests in flight. A virtual thread simply unmounts from its carrier while it waits, so blocking becomes cheap and the code can stay straightforwardly synchronous.
public Map<String, String> fetchMultipleApis(List<String> urls) {
    Map<String, String> results = new ConcurrentHashMap<>();
    // Launch a virtual thread for each URL
    List<Thread> threads = urls.stream()
        .map(url -> Thread.startVirtualThread(() -> {
            try {
                String response = httpGet(url);
                results.put(url, response);
            } catch (IOException e) {
                results.put(url, "Error: " + e.getMessage());
            }
        }))
        .toList();
    // Wait for all threads to complete
    threads.forEach(thread -> {
        try {
            thread.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    });
    return results;
}
private String httpGet(String urlString) throws IOException {
    // URL(String) is deprecated since Java 20; build the URL through URI instead
    URL url = URI.create(urlString).toURL();
    HttpURLConnection connection = (HttpURLConnection) url.openConnection();
    connection.setRequestMethod("GET");
    try (BufferedReader reader = new BufferedReader(
            new InputStreamReader(connection.getInputStream()))) {
        return reader.lines().collect(Collectors.joining("\n"));
    }
}
This code can effortlessly handle hundreds of concurrent HTTP requests without complicated thread management.
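If you prefer the java.net.http.HttpClient introduced in Java 11, the same pattern works just as well: the blocking send(...) call simply parks the calling virtual thread until the response arrives. A minimal sketch of an equivalent helper:

private final HttpClient httpClient = HttpClient.newBuilder()
    .connectTimeout(Duration.ofSeconds(10))
    .build();

private String httpGetWithClient(String url) throws IOException, InterruptedException {
    HttpRequest request = HttpRequest.newBuilder(URI.create(url)).GET().build();
    // Blocking here is fine: only this virtual thread waits, not an OS thread
    HttpResponse<String> response = httpClient.send(request, HttpResponse.BodyHandlers.ofString());
    return response.body();
}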
Efficient Database Connection Handling
Database operations typically involve waiting for I/O, making them perfect candidates for virtual threads:
public void processLargeDataset(List<Long> recordIds) {
    CountDownLatch latch = new CountDownLatch(recordIds.size());
    for (Long id : recordIds) {
        Thread.startVirtualThread(() -> {
            try {
                processRecord(id);
            } finally {
                latch.countDown();
            }
        });
    }
    try {
        latch.await();
        System.out.println("All records processed");
    } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
    }
}
private void processRecord(Long id) {
    try (Connection conn = dataSource.getConnection();
         PreparedStatement stmt = conn.prepareStatement("SELECT * FROM records WHERE id = ?")) {
        stmt.setLong(1, id);
        try (ResultSet rs = stmt.executeQuery()) {
            while (rs.next()) {
                // Process each record
                String data = rs.getString("data");
                transformAndSave(id, data);
            }
        }
    } catch (SQLException e) {
        logger.error("Error processing record {}", id, e);
    }
}
The key insight is that blocking on a JDBC call no longer wastes an OS thread, so we don't need a large platform-thread pool just to keep queries in flight. The database connection pool is still a hard limit, though: launching a million virtual threads does not create a million connections, so concurrency must ultimately be bounded to what the pool and the database can sustain.
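The usual way to enforce that bound without going back to a platform-thread pool is a Semaphore sized to the connection pool. A minimal sketch, assuming a pool of 50 connections (the number is illustrative):

// Permits should match the connection pool size so threads queue here, not at the pool
private final Semaphore dbPermits = new Semaphore(50);

private void processRecordBounded(Long id) throws InterruptedException {
    dbPermits.acquire();
    try {
        processRecord(id);
    } finally {
        dbPermits.release();
    }
}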
Implementing Robust Error Handling
Error handling in concurrent applications is critical. With virtual threads, we can implement sophisticated retry mechanisms:
public class RetryableTask<T> {
    // One shared executor for all retryable tasks; creating (and never closing)
    // a new executor on every call would leak it
    private static final ExecutorService VIRTUAL_EXECUTOR = Executors.newVirtualThreadPerTaskExecutor();

    private final Callable<T> task;
    private final int maxAttempts;
    private final long initialDelayMs;
    private final double backoffFactor;

    public RetryableTask(Callable<T> task, int maxAttempts, long initialDelayMs, double backoffFactor) {
        this.task = task;
        this.maxAttempts = maxAttempts;
        this.initialDelayMs = initialDelayMs;
        this.backoffFactor = backoffFactor;
    }

    public CompletableFuture<T> execute() {
        return CompletableFuture.supplyAsync(() -> {
            int attempt = 0;
            long delayMs = initialDelayMs;
            while (true) {
                try {
                    return task.call();
                } catch (Exception e) {
                    attempt++;
                    if (attempt >= maxAttempts) {
                        throw new RuntimeException("Task failed after " + maxAttempts + " attempts", e);
                    }
                    try {
                        Thread.sleep(delayMs);
                        delayMs = (long) (delayMs * backoffFactor);
                    } catch (InterruptedException ie) {
                        Thread.currentThread().interrupt();
                        throw new RuntimeException("Retry interrupted", ie);
                    }
                }
            }
        }, VIRTUAL_EXECUTOR);
    }
}
Usage example:
RetryableTask<String> task = new RetryableTask<>(
    () -> callExternalService("api.example.com/data"),
    5,     // 5 attempts
    1000,  // 1 second initial delay
    2.0    // Double the delay after each attempt
);

task.execute()
    .thenAccept(System.out::println)
    .exceptionally(ex -> {
        System.err.println("All retries failed: " + ex.getMessage());
        return null;
    });
This pattern is invaluable when working with external services that might experience temporary failures.
Scaling to Millions of Concurrent Operations
One of the most powerful aspects of virtual threads is their ability to scale to millions of concurrent operations. Here’s a pattern for processing a massive number of tasks:
public void processMillionItems(List<Item> items) {
    AtomicInteger completed = new AtomicInteger(0);
    AtomicInteger failed = new AtomicInteger(0);
    int total = items.size();
    // Process in batches to avoid overwhelming memory
    int batchSize = 10_000;
    for (int i = 0; i < total; i += batchSize) {
        int end = Math.min(i + batchSize, total);
        List<Item> batch = items.subList(i, end);
        CountDownLatch batchLatch = new CountDownLatch(batch.size());
        for (Item item : batch) {
            Thread.startVirtualThread(() -> {
                try {
                    processItem(item);
                    completed.incrementAndGet();
                } catch (Exception e) {
                    failed.incrementAndGet();
                    logger.error("Failed to process item: {}", item.getId(), e);
                } finally {
                    batchLatch.countDown();
                }
            });
        }
        try {
            batchLatch.await();
            System.out.printf("Progress: %d/%d completed, %d failed%n",
                completed.get(), total, failed.get());
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new RuntimeException("Processing interrupted", e);
        }
    }
}
This approach allows processing millions of items while maintaining control over memory usage by processing in manageable batches.
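The batch barrier doesn't have to be hand-rolled with a CountDownLatch: ExecutorService.invokeAll(...) blocks until every task in the collection completes, which expresses the same idea more directly. A sketch of the per-batch step under that alternative:

private void processBatch(List<Item> batch) throws InterruptedException {
    try (ExecutorService executor = Executors.newVirtualThreadPerTaskExecutor()) {
        List<Callable<Void>> work = batch.stream()
            .<Callable<Void>>map(item -> () -> {
                processItem(item);
                return null;
            })
            .toList();
        // invokeAll returns only when every task in the batch has finished (or failed)
        executor.invokeAll(work);
    }
}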
Monitoring Virtual Thread Performance
While virtual threads are lightweight, monitoring their behavior is still essential:
public class VirtualThreadMonitor {
    private static final AtomicLong createdThreads = new AtomicLong();
    private static final AtomicLong completedThreads = new AtomicLong();
    private static final AtomicLong failedThreads = new AtomicLong();
    private static final AtomicLong activeThreads = new AtomicLong();
    private static final Map<String, Histogram> durationHistograms = new ConcurrentHashMap<>();
    // Single shared executor; creating a new one per call and never closing it would leak
    private static final ExecutorService VIRTUAL_EXECUTOR = Executors.newVirtualThreadPerTaskExecutor();

    public static <T> CompletableFuture<T> monitoredTask(String taskType, Callable<T> task) {
        return CompletableFuture.supplyAsync(() -> {
            createdThreads.incrementAndGet();
            activeThreads.incrementAndGet();
            long startTime = System.nanoTime();
            try {
                T result = task.call();
                recordSuccess(taskType, startTime);
                return result;
            } catch (Exception e) {
                recordFailure(taskType, startTime);
                throw new CompletionException(e);
            } finally {
                activeThreads.decrementAndGet();
            }
        }, VIRTUAL_EXECUTOR);
    }
    private static void recordSuccess(String taskType, long startTime) {
        completedThreads.incrementAndGet();
        recordDuration(taskType, startTime);
    }

    private static void recordFailure(String taskType, long startTime) {
        failedThreads.incrementAndGet();
        recordDuration(taskType, startTime);
    }

    private static void recordDuration(String taskType, long startTime) {
        long durationMs = (System.nanoTime() - startTime) / 1_000_000;
        durationHistograms.computeIfAbsent(taskType, k -> new Histogram(2))
            .recordValue(durationMs);
    }

    public static String getStats() {
        StringBuilder sb = new StringBuilder();
        sb.append("Virtual Thread Stats:\n");
        sb.append("Created: ").append(createdThreads.get()).append("\n");
        sb.append("Completed: ").append(completedThreads.get()).append("\n");
        sb.append("Failed: ").append(failedThreads.get()).append("\n");
        sb.append("Active: ").append(activeThreads.get()).append("\n\n");
        sb.append("Duration by task type (ms):\n");
        durationHistograms.forEach((type, histogram) -> {
            sb.append(type).append(":\n");
            sb.append("  Min: ").append(histogram.getMinValue()).append("\n");
            sb.append("  Mean: ").append(String.format("%.2f", histogram.getMean())).append("\n");
            sb.append("  Max: ").append(histogram.getMaxValue()).append("\n");
            sb.append("  p99: ").append(histogram.getValueAtPercentile(99)).append("\n");
        });
        return sb.toString();
    }
    // Simple histogram class for illustration; in production, prefer a library such as HdrHistogram
    private static class Histogram {
        private final List<Long> values = Collections.synchronizedList(new ArrayList<>());
        private final int significantDigits; // kept for API parity with HdrHistogram; unused in this simple version

        public Histogram(int significantDigits) {
            this.significantDigits = significantDigits;
        }

        public void recordValue(long value) {
            values.add(value);
        }

        public long getMinValue() {
            return values.stream().min(Long::compare).orElse(0L);
        }

        public long getMaxValue() {
            return values.stream().max(Long::compare).orElse(0L);
        }

        public double getMean() {
            return values.stream().mapToLong(l -> l).average().orElse(0);
        }

        public long getValueAtPercentile(double percentile) {
            List<Long> sorted = new ArrayList<>(values);
            if (sorted.isEmpty()) {
                return 0L; // guard against indexing into an empty histogram
            }
            Collections.sort(sorted);
            int index = (int) Math.ceil(percentile / 100.0 * sorted.size()) - 1;
            return sorted.get(Math.max(0, index));
        }
    }
}
This monitoring framework provides insights into thread creation, completion rates, and execution durations by task type.
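Beyond application-level counters, the JDK itself emits JFR events for virtual threads, including jdk.VirtualThreadStart, jdk.VirtualThreadEnd, and jdk.VirtualThreadPinned for threads pinned to their carrier. A minimal sketch, assuming JDK 21+, that streams pinning events in-process (runWorkload is a hypothetical placeholder for the code under observation):

import jdk.jfr.consumer.RecordingStream;

public class PinningWatcher {
    public static void main(String[] args) {
        try (RecordingStream rs = new RecordingStream()) {
            // Report any virtual thread pinned to its carrier (e.g. blocking inside synchronized)
            rs.enable("jdk.VirtualThreadPinned").withStackTrace();
            rs.onEvent("jdk.VirtualThreadPinned", event ->
                System.out.println("Virtual thread pinned for "
                    + event.getDuration().toMillis() + " ms"));
            rs.startAsync();
            runWorkload(); // hypothetical: the application work to observe
        }
    }

    private static void runWorkload() {
        // ... application code under observation ...
    }
}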
I’ve found virtual threads to be particularly valuable in API-heavy applications where we need to make multiple concurrent service calls. Before virtual threads, I would carefully tune thread pools to avoid resource exhaustion. Now, I simply create a virtual thread for each task, resulting in cleaner, more maintainable code.
The transition to virtual threads doesn’t require a complete rewrite of existing applications. In many cases, you can replace your executor services with virtual thread executors and immediately gain scalability benefits.
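In many codebases the change is literally a one-line swap of the factory method; the task-submission code stays the same. A sketch of the before and after, shown as two alternatives:

// Before: a pool sized to protect a limited number of platform threads
ExecutorService pooled = Executors.newFixedThreadPool(200);

// After: one cheap virtual thread per task, no pool sizing to tune
ExecutorService perTask = Executors.newVirtualThreadPerTaskExecutor();

Two caveats are worth checking before flipping the switch: code that relied on the pool size as an implicit throttle now needs an explicit limit such as the Semaphore shown earlier, and heavy ThreadLocal caching loses its value when every task runs on a fresh thread.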
Virtual threads in Java represent a significant evolution in concurrent programming. By implementing these techniques, we can build applications that scale efficiently to handle thousands or millions of concurrent operations, all while maintaining code simplicity and readability.
The real power of virtual threads comes from the realization that we can now think about concurrency in terms of the logical structure of our problem rather than the physical limitations of our hardware. This shift in mindset allows us to write more natural, direct code that better represents the concurrent nature of the systems we build.