Concurrency in Java represents one of the most powerful yet challenging aspects of the language. As a developer who has worked extensively with multithreaded applications, I’ve found that design patterns provide essential structures for managing concurrent operations effectively. These patterns help create robust, scalable, and maintainable code while avoiding common pitfalls like race conditions, deadlocks, and thread starvation.
Thread Pool Pattern
The Thread Pool pattern addresses the overhead of creating and destroying threads. Every new thread costs the JVM memory and operating-system resources, so spinning up a fresh thread for each task becomes expensive under load.
I’ve implemented thread pools in numerous high-load applications, and they consistently improve performance by reusing existing threads rather than creating new ones for each task.
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class ThreadPoolExample {
    public static void main(String[] args) {
        // Create a fixed thread pool with 5 worker threads
        ExecutorService executor = Executors.newFixedThreadPool(5);

        // Submit 10 tasks to be executed by the thread pool
        for (int i = 0; i < 10; i++) {
            final int taskId = i;
            executor.submit(() -> {
                System.out.println("Task " + taskId + " executed by " +
                        Thread.currentThread().getName());
                try {
                    // Simulate work
                    Thread.sleep(500);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
                return "Task " + taskId + " completed";
            });
        }

        // Initiate an orderly shutdown
        executor.shutdown();
        try {
            // Wait for all tasks to complete
            if (!executor.awaitTermination(5, TimeUnit.SECONDS)) {
                // Force shutdown if tasks don't complete in time
                executor.shutdownNow();
            }
        } catch (InterruptedException e) {
            executor.shutdownNow();
            Thread.currentThread().interrupt();
        }
    }
}
For more control over thread pool behavior, we can use ThreadPoolExecutor directly:
ThreadPoolExecutor customExecutor = new ThreadPoolExecutor(
        2,                                        // Core pool size
        10,                                       // Maximum pool size
        60, TimeUnit.SECONDS,                     // Keep-alive time for idle threads
        new ArrayBlockingQueue<>(100),            // Work queue
        new ThreadPoolExecutor.CallerRunsPolicy() // Rejection policy
);
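It helps to know how these settings interact: threads beyond the core size are only created once the work queue is full, and the rejection policy only fires once both the queue and the maximum pool are saturated. With CallerRunsPolicy, a rejected task runs on the thread that submitted it, which naturally slows producers down and acts as built-in backpressure. A small sketch of that effect, using deliberately tiny, illustrative sizes:
// Tiny pool and queue so saturation is easy to trigger (illustrative values)
ThreadPoolExecutor tiny = new ThreadPoolExecutor(
        1, 1, 0, TimeUnit.SECONDS,
        new ArrayBlockingQueue<>(1),
        new ThreadPoolExecutor.CallerRunsPolicy());

for (int i = 0; i < 5; i++) {
    final int id = i;
    tiny.execute(() -> {
        // Once the single worker and the queue are full, some of these
        // print the submitting thread's name (e.g. "main") instead of a pool thread
        System.out.println("Task " + id + " on " + Thread.currentThread().getName());
        try {
            Thread.sleep(200);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    });
}
tiny.shutdown();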
In a recent project, I encountered a bottleneck when using a single thread pool for both CPU-intensive and I/O-bound tasks. Separating these concerns into different pools significantly improved application responsiveness:
// For CPU-intensive tasks (one thread per core)
ExecutorService computeExecutor = Executors.newFixedThreadPool(
        Runtime.getRuntime().availableProcessors());

// For I/O-bound tasks (more threads, since they spend most of their time waiting)
ExecutorService ioExecutor = Executors.newFixedThreadPool(30);
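One convenient way to keep that hand-off explicit is CompletableFuture (covered in more depth later in this article): run the blocking call on the I/O pool, then move the heavy processing to the compute pool. A minimal sketch, where fetchFromDatabase, transform, Report, and the query variable are hypothetical application pieces:
// fetchFromDatabase, transform, Report, and query are hypothetical application pieces
CompletableFuture<Report> pipeline = CompletableFuture
        .supplyAsync(() -> fetchFromDatabase(query), ioExecutor)    // blocking I/O on the I/O pool
        .thenApplyAsync(rows -> transform(rows), computeExecutor);  // CPU-heavy step on the compute pool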
Producer-Consumer Pattern
The Producer-Consumer pattern is invaluable when you need to decouple operations that generate data from those that process it. This pattern helps manage resource consumption and provides a natural load-balancing mechanism.
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

public class ProducerConsumerExample {
    private static final BlockingQueue<String> queue = new LinkedBlockingQueue<>(10);

    public static void main(String[] args) {
        // Producer thread
        Thread producer = new Thread(() -> {
            try {
                for (int i = 0; i < 20; i++) {
                    String message = "Message " + i;
                    queue.put(message);
                    System.out.println("Produced: " + message);
                    Thread.sleep(100); // Simulate varying production time
                }
                // Add poison pill to signal end of production
                queue.put("DONE");
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });

        // Consumer thread
        Thread consumer = new Thread(() -> {
            try {
                String message;
                while (!(message = queue.take()).equals("DONE")) {
                    System.out.println("Consumed: " + message);
                    Thread.sleep(300); // Simulate varying consumption time
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });

        producer.start();
        consumer.start();

        try {
            producer.join();
            consumer.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}
In a real-world application, I implemented a log processing system using this pattern. Log entries were produced rapidly by application components and consumed at a measured pace by analytics services:
public class LogProcessor {
    private final BlockingQueue<LogEntry> queue;
    private final ExecutorService producers;
    private final ExecutorService consumers;
    private final int consumerCount;
    private volatile boolean running = true;

    public LogProcessor(int queueCapacity, int producerCount, int consumerCount) {
        this.queue = new ArrayBlockingQueue<>(queueCapacity);
        this.producers = Executors.newFixedThreadPool(producerCount);
        this.consumers = Executors.newFixedThreadPool(consumerCount);
        this.consumerCount = consumerCount;
    }

    public void start() {
        // Start one long-running consumer loop per consumer thread
        for (int i = 0; i < consumerCount; i++) {
            consumers.submit(this::processLogs);
        }
    }

    public Future<?> submitLog(LogEntry entry) {
        return producers.submit(() -> {
            try {
                queue.put(entry);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
    }

    private void processLogs() {
        while (running) {
            try {
                LogEntry entry = queue.poll(100, TimeUnit.MILLISECONDS);
                if (entry != null) {
                    // Process log entry
                    analyzeAndStore(entry);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                break;
            }
        }
    }

    private void analyzeAndStore(LogEntry entry) {
        // Analyze log data and store in database
    }

    public void shutdown() {
        running = false;
        producers.shutdown();
        consumers.shutdown();
    }
}
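Wiring the processor into an application looks roughly like this; the capacities, thread counts, and the LogEntry constructor are illustrative, since LogEntry is the application's own type:
// Queue capacity, producer threads, consumer threads (illustrative values)
LogProcessor processor = new LogProcessor(1000, 4, 2);
processor.start();

// Somewhere in the application code (LogEntry construction is hypothetical)
processor.submitLog(new LogEntry("order-service", "Order 42 created"));

// On application shutdown
processor.shutdown();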
Read-Write Lock Pattern
The Read-Write Lock pattern optimizes concurrency by allowing multiple readers to access a resource simultaneously while ensuring exclusive access for writers. This pattern significantly improves throughput in read-heavy applications.
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.function.Function;

public class ReadWriteLockExample {
    private final Map<String, String> cache = new HashMap<>();
    private final ReadWriteLock lock = new ReentrantReadWriteLock();
    private final Lock readLock = lock.readLock();
    private final Lock writeLock = lock.writeLock();

    public String getValue(String key) {
        readLock.lock();
        try {
            return cache.get(key);
        } finally {
            readLock.unlock();
        }
    }

    public void putValue(String key, String value) {
        writeLock.lock();
        try {
            cache.put(key, value);
        } finally {
            writeLock.unlock();
        }
    }

    public String getOrCompute(String key, Function<String, String> computation) {
        // First try with the read lock (ReentrantReadWriteLock cannot upgrade
        // a read lock to a write lock, so we release it before writing)
        readLock.lock();
        try {
            String value = cache.get(key);
            if (value != null) {
                return value;
            }
        } finally {
            readLock.unlock();
        }

        // If not found, acquire the write lock and compute
        writeLock.lock();
        try {
            // Double-check in case another thread updated while we were waiting
            String value = cache.get(key);
            if (value == null) {
                value = computation.apply(key);
                cache.put(key, value);
            }
            return value;
        } finally {
            writeLock.unlock();
        }
    }
}
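A quick usage sketch shows the pattern's benefit: many threads can read concurrently, and even when they all miss on the same key, the double-check under the write lock means the computation runs only once. The thread count, key, and placeholder value are illustrative:
ReadWriteLockExample configCache = new ReadWriteLockExample();
ExecutorService pool = Executors.newFixedThreadPool(8);

for (int i = 0; i < 8; i++) {
    pool.submit(() ->
            configCache.getOrCompute("db.url", key -> {
                // Printed at most once, even with 8 concurrent callers
                System.out.println("Computing value for " + key);
                return "jdbc:postgresql://localhost/app"; // placeholder value
            }));
}
pool.shutdown();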
A more sophisticated implementation I developed for a distributed caching system included expiration and statistics:
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

public class EnhancedReadWriteCache<K, V> {
    private final Map<K, CacheEntry<V>> cache = new HashMap<>();
    private final ReadWriteLock lock = new ReentrantReadWriteLock();
    private final AtomicLong hits = new AtomicLong();
    private final AtomicLong misses = new AtomicLong();

    public V get(K key) {
        lock.readLock().lock();
        try {
            CacheEntry<V> entry = cache.get(key);
            if (entry != null && !entry.isExpired()) {
                hits.incrementAndGet();
                return entry.getValue();
            } else {
                misses.incrementAndGet();
                return null;
            }
        } finally {
            lock.readLock().unlock();
        }
    }

    public void put(K key, V value, long timeToLiveMs) {
        lock.writeLock().lock();
        try {
            long expirationTime = timeToLiveMs > 0
                    ? System.currentTimeMillis() + timeToLiveMs
                    : Long.MAX_VALUE;
            cache.put(key, new CacheEntry<>(value, expirationTime));
        } finally {
            lock.writeLock().unlock();
        }
    }

    public Map<String, Long> getStatistics() {
        Map<String, Long> stats = new HashMap<>();
        stats.put("hits", hits.get());
        stats.put("misses", misses.get());
        // Read the size under the read lock; a plain HashMap is not safe to
        // read while another thread may be modifying it
        lock.readLock().lock();
        try {
            stats.put("count", (long) cache.size());
        } finally {
            lock.readLock().unlock();
        }
        return stats;
    }

    // Periodically clean expired entries
    public void cleanExpiredEntries() {
        lock.writeLock().lock();
        try {
            Iterator<Map.Entry<K, CacheEntry<V>>> iterator = cache.entrySet().iterator();
            while (iterator.hasNext()) {
                if (iterator.next().getValue().isExpired()) {
                    iterator.remove();
                }
            }
        } finally {
            lock.writeLock().unlock();
        }
    }

    private static class CacheEntry<V> {
        private final V value;
        private final long expirationTime;

        public CacheEntry(V value, long expirationTime) {
            this.value = value;
            this.expirationTime = expirationTime;
        }

        public V getValue() {
            return value;
        }

        public boolean isExpired() {
            return System.currentTimeMillis() > expirationTime;
        }
    }
}
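Something still has to call cleanExpiredEntries periodically. A single-threaded ScheduledExecutorService is a simple way to do that; the one-minute interval here is illustrative:
EnhancedReadWriteCache<String, String> sessionCache = new EnhancedReadWriteCache<>();
ScheduledExecutorService cleaner = Executors.newSingleThreadScheduledExecutor();

// Sweep expired entries once a minute
cleaner.scheduleAtFixedRate(sessionCache::cleanExpiredEntries, 1, 1, TimeUnit.MINUTES);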
Future and Promise Pattern
The Future and Promise pattern provides a powerful model for asynchronous programming. Futures represent values that will be available at a later time, while promises allow you to set those values when they become available.
Java’s CompletableFuture implements this pattern elegantly:
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class FuturePromiseExample {
    private final ExecutorService executor = Executors.newFixedThreadPool(4);

    public CompletableFuture<String> fetchUserData(long userId) {
        return CompletableFuture.supplyAsync(() -> {
            try {
                // Simulate database access
                Thread.sleep(500);
                return "User data for ID: " + userId;
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new CompletionException(e);
            }
        }, executor);
    }

    public CompletableFuture<String> fetchUserPosts(long userId) {
        return CompletableFuture.supplyAsync(() -> {
            try {
                // Simulate service call
                Thread.sleep(700);
                return "Posts for user ID: " + userId;
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new CompletionException(e);
            }
        }, executor);
    }

    public void loadUserProfile(long userId) {
        CompletableFuture<String> userData = fetchUserData(userId);
        CompletableFuture<String> userPosts = fetchUserPosts(userId);

        userData.thenCombine(userPosts, (data, posts) -> {
            // Combine results when both futures complete
            return new UserProfile(data, posts);
        }).thenAccept(profile -> {
            System.out.println("Profile loaded: " + profile);
        }).exceptionally(ex -> {
            System.err.println("Error loading profile: " + ex.getMessage());
            return null;
        });
    }

    public void shutdown() {
        executor.shutdown();
    }

    private static class UserProfile {
        private final String userData;
        private final String userPosts;

        public UserProfile(String userData, String userPosts) {
            this.userData = userData;
            this.userPosts = userPosts;
        }

        @Override
        public String toString() {
            return "UserProfile{userData='" + userData + "', userPosts='" + userPosts + "'}";
        }
    }
}
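supplyAsync covers the future half of the pattern. The promise half is a CompletableFuture you create empty and complete yourself when the value finally arrives, for example when bridging a callback-based client. In this sketch, MessagingClient and its onReply/onError callbacks are hypothetical; the complete and completeExceptionally calls are the real CompletableFuture API:
public CompletableFuture<String> receiveReply(MessagingClient client, String correlationId) {
    // The "promise": created now, completed later by whoever produces the value
    CompletableFuture<String> promise = new CompletableFuture<>();

    // Hypothetical callback-style client API, bridged into the future
    client.onReply(correlationId, reply -> promise.complete(reply));
    client.onError(correlationId, error -> promise.completeExceptionally(error));

    return promise;
}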
In a microservices architecture I developed, we used CompletableFuture to orchestrate parallel API calls efficiently:
public class ServiceOrchestrator {
    private final PaymentService paymentService;
    private final InventoryService inventoryService;
    private final ShippingService shippingService;
    private final NotificationService notificationService;

    public CompletableFuture<OrderResult> processOrder(Order order) {
        // Check inventory and process payment in parallel
        CompletableFuture<Boolean> inventoryCheck =
                inventoryService.checkAvailability(order.getItems());
        CompletableFuture<PaymentResult> paymentProcess =
                paymentService.processPayment(order.getPaymentDetails());

        // Only proceed if both inventory check and payment succeed
        return inventoryCheck.thenCombine(paymentProcess, (itemsAvailable, paymentResult) -> {
            if (!itemsAvailable) {
                throw new OrderException("Items not available");
            }
            if (!paymentResult.isSuccessful()) {
                throw new OrderException("Payment failed: " + paymentResult.getErrorMessage());
            }
            return new OrderConfirmation(order.getId(), paymentResult.getTransactionId());
        }).thenCompose(confirmation -> {
            // Schedule shipping
            return shippingService.scheduleDelivery(order.getItems(), order.getShippingAddress())
                    .thenApply(shipment -> new OrderResult(confirmation, shipment));
        }).whenComplete((result, error) -> {
            // Notify customer regardless of outcome
            if (error != null) {
                notificationService.notifyOrderFailed(order.getCustomerEmail(), error.getMessage());
            } else {
                notificationService.notifyOrderSuccess(order.getCustomerEmail(), result);
            }
        });
    }
}
Thread Local Storage Pattern
The Thread Local Storage pattern provides thread-isolated variables, allowing each thread to have its own independent copy of data. This pattern is especially useful for maintaining context information without explicit parameter passing.
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class ThreadLocalExample {
    private static final ThreadLocal<SimpleDateFormat> dateFormatter =
            ThreadLocal.withInitial(() -> new SimpleDateFormat("yyyy-MM-dd"));
    private static final ThreadLocal<TransactionContext> transactionContext =
            new ThreadLocal<>();

    public static String formatDate(Date date) {
        return dateFormatter.get().format(date);
    }

    public static void startTransaction(String transactionId) {
        transactionContext.set(new TransactionContext(transactionId));
    }

    public static TransactionContext getCurrentTransaction() {
        return transactionContext.get();
    }

    public static void endTransaction() {
        TransactionContext context = transactionContext.get();
        if (context != null) {
            // Clean up transaction resources
            context.close();
            transactionContext.remove(); // Prevent memory leaks
        }
    }

    public static void main(String[] args) {
        ExecutorService executor = Executors.newFixedThreadPool(3);

        for (int i = 0; i < 5; i++) {
            final int id = i;
            executor.submit(() -> {
                try {
                    startTransaction("TXN-" + id);

                    // Perform work using the transaction context
                    TransactionContext tx = getCurrentTransaction();
                    tx.addOperation("QUERY-" + id);

                    // Use thread-local formatter
                    System.out.println(Thread.currentThread().getName() +
                            ": Processing transaction " + tx.getId() +
                            " on " + formatDate(new Date()));

                    Thread.sleep(100); // Simulate work
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                } finally {
                    endTransaction(); // Always clean up
                }
            });
        }

        executor.shutdown();
    }

    private static class TransactionContext implements AutoCloseable {
        private final String id;
        private final List<String> operations = new ArrayList<>();
        private final long startTime;

        public TransactionContext(String id) {
            this.id = id;
            this.startTime = System.currentTimeMillis();
        }

        public String getId() {
            return id;
        }

        public void addOperation(String operation) {
            operations.add(operation);
        }

        @Override
        public void close() {
            long duration = System.currentTimeMillis() - startTime;
            System.out.println("Transaction " + id + " completed with " +
                    operations.size() + " operations in " + duration + "ms");
        }
    }
}
I’ve found ThreadLocal particularly useful in web applications for tracking user sessions across multiple service calls:
public class RequestContextFilter implements Filter {

    @Override
    public void doFilter(ServletRequest request, ServletResponse response,
                         FilterChain chain) throws IOException, ServletException {
        HttpServletRequest httpRequest = (HttpServletRequest) request;
        try {
            // Extract request information
            String requestId = httpRequest.getHeader("X-Request-ID");
            if (requestId == null) {
                requestId = UUID.randomUUID().toString();
            }
            String userId = extractUserIdFromSession(httpRequest);

            // Set up thread local context
            RequestContext context = new RequestContext(requestId, userId);
            RequestContext.setCurrent(context);

            // Add timing information
            context.setStartTime(System.currentTimeMillis());

            // Continue with request processing
            chain.doFilter(request, response);
        } finally {
            // Log request timing
            RequestContext context = RequestContext.getCurrent();
            if (context != null) {
                long duration = System.currentTimeMillis() - context.getStartTime();
                LogManager.getLogger(getClass()).info(
                        "Request {} completed in {}ms", context.getRequestId(), duration);
            }
            // Always clean up thread local
            RequestContext.clear();
        }
    }

    private String extractUserIdFromSession(HttpServletRequest request) {
        HttpSession session = request.getSession(false);
        if (session != null) {
            User user = (User) session.getAttribute("user");
            return user != null ? user.getId() : "anonymous";
        }
        return "anonymous";
    }
}
// Thread-local request context
public class RequestContext {
    private static final ThreadLocal<RequestContext> current = new ThreadLocal<>();

    private final String requestId;
    private final String userId;
    private long startTime;

    public RequestContext(String requestId, String userId) {
        this.requestId = requestId;
        this.userId = userId;
    }

    public static void setCurrent(RequestContext context) {
        current.set(context);
    }

    public static RequestContext getCurrent() {
        return current.get();
    }

    public static void clear() {
        current.remove();
    }

    // Getters and setters
    public String getRequestId() { return requestId; }
    public String getUserId() { return userId; }
    public long getStartTime() { return startTime; }
    public void setStartTime(long startTime) { this.startTime = startTime; }
}
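With the filter installed, code deep inside the request can read the context without it being threaded through every method signature. A brief sketch, where OrderService, its placeOrder method, and the Order type are illustrative (the Log4j 2 logging calls are real API):
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

public class OrderService {
    private static final Logger log = LogManager.getLogger(OrderService.class);

    public void placeOrder(Order order) {
        // No requestId or userId parameters needed; the filter bound them to this thread
        RequestContext ctx = RequestContext.getCurrent();
        log.info("Placing order for user {} (request {})",
                ctx.getUserId(), ctx.getRequestId());
        // ... business logic ...
    }
}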
Actor Pattern
The Actor pattern provides a framework for building concurrent systems around the concept of isolated actors that communicate exclusively through message passing. This isolation eliminates many concurrency issues by avoiding shared mutable state.
While Java doesn’t have built-in actors like Erlang or Scala, we can implement the pattern effectively:
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class ActorSystem {
    private final ExecutorService executor;
    private final Map<String, Actor> actors = new ConcurrentHashMap<>();

    public ActorSystem(int threadPoolSize) {
        this.executor = Executors.newFixedThreadPool(threadPoolSize);
    }

    public void registerActor(String actorId, Actor actor) {
        actors.put(actorId, actor);
        actor.setActorSystem(this);
    }

    public void sendMessage(String actorId, Message message) {
        Actor actor = actors.get(actorId);
        if (actor != null) {
            // Synchronize on the actor so it handles one message at a time,
            // preserving the actor model's guarantee that an actor's internal
            // state is never touched by two threads at once
            executor.execute(() -> {
                synchronized (actor) {
                    actor.receiveMessage(message);
                }
            });
        } else {
            throw new IllegalArgumentException("Actor not found: " + actorId);
        }
    }

    public void shutdown() {
        executor.shutdown();
    }
}

public abstract class Actor {
    private ActorSystem system;

    void setActorSystem(ActorSystem system) {
        this.system = system;
    }

    protected void send(String actorId, Message message) {
        system.sendMessage(actorId, message);
    }

    abstract void receiveMessage(Message message);
}

public interface Message {
    String getType();
    Map<String, Object> getData();
}
Here’s a practical implementation of a worker coordination system using the actor pattern:
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.UUID;

public class WorkerCoordinationExample {
    public static void main(String[] args) throws InterruptedException {
        ActorSystem system = new ActorSystem(10);

        // Register coordinator actor
        CoordinatorActor coordinator = new CoordinatorActor();
        system.registerActor("coordinator", coordinator);

        // Register worker actors, remembering their IDs for the coordinator
        List<String> workerIds = new ArrayList<>();
        for (int i = 0; i < 5; i++) {
            WorkerActor worker = new WorkerActor("worker-" + i);
            system.registerActor(worker.getId(), worker);
            workerIds.add(worker.getId());
        }

        // Start processing
        Map<String, Object> data = new HashMap<>();
        data.put("taskCount", 20);
        data.put("workers", workerIds);
        system.sendMessage("coordinator", new SimpleMessage("START", data));

        // Let the system run for a while
        Thread.sleep(10000);
        system.shutdown();
    }

    static class CoordinatorActor extends Actor {
        private int totalTasks = 0;
        private int pendingTasks = 0;
        private int completedTasks = 0;
        private final List<String> workers = new ArrayList<>();

        @Override
        @SuppressWarnings("unchecked")
        void receiveMessage(Message message) {
            switch (message.getType()) {
                case "START":
                    int taskCount = (Integer) message.getData().get("taskCount");
                    System.out.println("Coordinator starting with " + taskCount + " tasks");
                    totalTasks = taskCount;
                    pendingTasks = taskCount;

                    // Learn about the worker actors from the START message
                    workers.addAll((List<String>) message.getData().get("workers"));

                    // Dispatch initial tasks
                    dispatchTasks();
                    break;
                case "TASK_COMPLETED":
                    String workerId = (String) message.getData().get("workerId");
                    completedTasks++;
                    System.out.println("Task completed by " + workerId +
                            ". Progress: " + completedTasks + "/" + totalTasks);

                    // Dispatch more work if available
                    if (pendingTasks > 0) {
                        Map<String, Object> taskData = new HashMap<>();
                        taskData.put("taskId", "task-" + UUID.randomUUID().toString());
                        send(workerId, new SimpleMessage("EXECUTE_TASK", taskData));
                        pendingTasks--;
                    }
                    break;
            }
        }

        private void dispatchTasks() {
            for (String workerId : workers) {
                if (pendingTasks > 0) {
                    Map<String, Object> taskData = new HashMap<>();
                    taskData.put("taskId", "task-" + UUID.randomUUID().toString());
                    send(workerId, new SimpleMessage("EXECUTE_TASK", taskData));
                    pendingTasks--;
                }
            }
        }
    }

    static class WorkerActor extends Actor {
        private final String id;
        private final Random random = new Random();

        public WorkerActor(String id) {
            this.id = id;
        }

        public String getId() {
            return id;
        }

        @Override
        void receiveMessage(Message message) {
            if ("EXECUTE_TASK".equals(message.getType())) {
                String taskId = (String) message.getData().get("taskId");
                System.out.println(id + " executing " + taskId);
                try {
                    // Simulate task execution time
                    Thread.sleep(500 + random.nextInt(1000));
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
                // Report completion
                Map<String, Object> resultData = new HashMap<>();
                resultData.put("workerId", id);
                resultData.put("taskId", taskId);
                send("coordinator", new SimpleMessage("TASK_COMPLETED", resultData));
            }
        }
    }

    static class SimpleMessage implements Message {
        private final String type;
        private final Map<String, Object> data;

        public SimpleMessage(String type, Map<String, Object> data) {
            this.type = type;
            this.data = new HashMap<>(data);
        }

        @Override
        public String getType() {
            return type;
        }

        @Override
        public Map<String, Object> getData() {
            return data;
        }
    }
}
These six patterns form a comprehensive toolkit for addressing concurrency challenges in Java applications. By understanding when and how to apply each pattern, you can build robust concurrent systems that leverage multi-core architectures while maintaining code clarity and correctness.
The key to successful concurrent programming isn’t just knowing these patterns but understanding how they complement each other. In complex applications, you’ll often find combinations of patterns working together—perhaps using Thread Pools to manage execution, Producer-Consumer for workload distribution, and Thread Local Storage for maintaining context.
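To make that concrete, here is a rough sketch of such a combination, where the class name and the handleRequest body are hypothetical: a bounded queue feeds a fixed pool of workers (Producer-Consumer plus Thread Pool), and a ThreadLocal carries per-request context on whichever worker thread picks the request up.
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class RequestPipeline {
    // Producer-Consumer: incoming request IDs buffered in a bounded queue
    private final BlockingQueue<String> requests = new ArrayBlockingQueue<>(100);
    // Thread Pool: a fixed set of workers drains the queue
    private final ExecutorService workers = Executors.newFixedThreadPool(4);
    // Thread Local Storage: per-request context, visible anywhere on the worker thread
    private static final ThreadLocal<String> currentRequestId = new ThreadLocal<>();

    public void start() {
        for (int i = 0; i < 4; i++) {
            workers.submit(() -> {
                try {
                    while (!Thread.currentThread().isInterrupted()) {
                        String requestId = requests.take();
                        currentRequestId.set(requestId);
                        try {
                            handleRequest(requestId); // stand-in for real business logic
                        } finally {
                            currentRequestId.remove(); // avoid leaking context between tasks
                        }
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
        }
    }

    public void submit(String requestId) throws InterruptedException {
        requests.put(requestId); // blocks when the queue is full (natural backpressure)
    }

    private void handleRequest(String requestId) {
        System.out.println("Handling " + currentRequestId.get() +
                " on " + Thread.currentThread().getName());
    }

    public void shutdown() {
        workers.shutdownNow(); // interrupts workers blocked in take()
    }
}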
As with any pattern, the goal isn’t rigid adherence but thoughtful application to solve specific problems. By mastering these patterns, you’ll develop an intuition for structuring concurrent code that balances performance, readability, and maintainability.