Java design patterns serve as essential templates for solving common problems in event-driven architectures. I’ve spent years implementing these patterns across various projects, and I’ve found that mastering them can significantly improve system responsiveness, maintainability, and scalability.
Observer Pattern
The Observer pattern establishes a one-to-many relationship between objects. When one object (the subject) changes state, all its dependents (observers) receive automatic notifications.
public interface EventListener {
void onEvent(Event event);
}
public class EventPublisher {
private final List<EventListener> listeners = new CopyOnWriteArrayList<>();
public void subscribe(EventListener listener) {
listeners.add(listener);
}
public void unsubscribe(EventListener listener) {
listeners.remove(listener);
}
public void publish(Event event) {
listeners.forEach(listener -> listener.onEvent(event));
}
}
I find this pattern particularly useful for building user interfaces where multiple components need to respond to a single action. For example, when a user updates their profile, we might need to refresh their avatar, update navigation elements, and log the change—all handled by different observers.
The use of CopyOnWriteArrayList
is crucial for thread safety, as it creates a new copy of the underlying array whenever the collection is modified, allowing safe iteration even during concurrent modifications.
Mediator Pattern
The Mediator pattern reduces direct connections between components by forcing them to communicate through a central point.
public class EventMediator {
private final Map<Class<?>, List<Consumer<?>>> handlers = new HashMap<>();
public <T> void register(Class<T> eventType, Consumer<T> handler) {
handlers.computeIfAbsent(eventType, k -> new ArrayList<>()).add(handler);
}
@SuppressWarnings("unchecked")
public <T> void dispatch(T event) {
List<Consumer<?>> eventHandlers = handlers.getOrDefault(event.getClass(), Collections.emptyList());
eventHandlers.forEach(handler -> ((Consumer<T>)handler).accept(event));
}
}
This pattern excels in complex systems where many components need to interact. Instead of each component knowing about others, they only need to know about the mediator. In practice, I’ve implemented this in chat applications where messages, status updates, and typing indicators all flow through a central mediator.
A practical enhancement is to make the mediator asynchronous:
public <T> void dispatchAsync(T event) {
CompletableFuture.runAsync(() -> dispatch(event));
}
Command Pattern
The Command pattern encapsulates a request as an object, allowing for parameterization of clients with different requests, queuing of requests, and logging of operations.
public interface Command {
void execute();
void undo();
}
public class CommandBus {
private final Queue<Command> commandQueue = new ConcurrentLinkedQueue<>();
private final Deque<Command> executedCommands = new LinkedList<>();
public void dispatch(Command command) {
commandQueue.offer(command);
processQueue();
}
private void processQueue() {
Command command;
while ((command = commandQueue.poll()) != null) {
command.execute();
executedCommands.push(command);
}
}
public void undoLast() {
if (!executedCommands.isEmpty()) {
Command command = executedCommands.pop();
command.undo();
}
}
}
I’ve used this pattern extensively in workflow engines where operations like “approve document,” “reject application,” or “escalate issue” are represented as commands. The undo functionality is particularly valuable for implementing transactional behavior or providing user-facing “undo” features.
Saga Pattern
The Saga pattern manages failures in distributed transactions by organizing multiple local transactions, each publishing events and responding to events.
public class OrderProcessingSaga {
private final PaymentService paymentService;
private final InventoryService inventoryService;
private final ShippingService shippingService;
public CompletableFuture<OrderResult> process(Order order) {
return validateOrder(order)
.thenCompose(valid -> reserveInventory(order))
.thenCompose(reserved -> processPayment(order))
.thenCompose(paymentResult -> arrangeShipping(order))
.thenApply(shippingResult -> completeOrder(order))
.exceptionally(e -> {
compensate(order, e);
return new OrderResult(OrderStatus.FAILED);
});
}
private void compensate(Order order, Throwable cause) {
// Determine which steps completed and perform compensating actions
if (cause instanceof PaymentFailedException) {
inventoryService.releaseReservation(order.getId());
} else if (cause instanceof ShippingFailedException) {
paymentService.refund(order.getId());
inventoryService.releaseReservation(order.getId());
}
}
}
This pattern shines in e-commerce systems where an order might involve inventory checks, payment processing, and shipping arrangements across different services. Each step must be compensated if a later step fails.
I’ve found that maintaining a state machine for each saga instance helps track progress and determine appropriate compensating actions when failures occur.
CQRS Pattern
Command Query Responsibility Segregation (CQRS) separates read and write operations, allowing for independent optimization of each path.
public class OrderService {
private final CommandBus commandBus;
private final QueryBus queryBus;
public void createOrder(CreateOrderCommand command) {
commandBus.dispatch(command);
}
public OrderSummary getOrderSummary(GetOrderQuery query) {
return queryBus.execute(query);
}
}
public class OrderCommandHandler {
private final OrderRepository repository;
private final EventPublisher eventPublisher;
public void handle(CreateOrderCommand command) {
Order order = new Order(command.getCustomerId(), command.getItems());
repository.save(order);
eventPublisher.publish(new OrderCreatedEvent(order.getId()));
}
}
public class OrderQueryHandler {
private final OrderReadModel readModel;
public OrderSummary handle(GetOrderQuery query) {
return readModel.getOrderSummary(query.getOrderId());
}
}
I’ve implemented CQRS in high-traffic systems where read operations vastly outnumber writes. By separating concerns, we can scale read and write sides independently and optimize database schemas for each use case.
The real power comes when combining CQRS with event sourcing, using events to update read models asynchronously.
Event Sourcing Pattern
Event Sourcing persists all changes to application state as a sequence of events, which can be used to reconstruct past states and serve as a complete audit log.
public abstract class AggregateRoot {
private final List<DomainEvent> uncommittedEvents = new ArrayList<>();
protected void apply(DomainEvent event) {
uncommittedEvents.add(event);
when(event);
}
protected abstract void when(DomainEvent event);
public List<DomainEvent> getUncommittedEvents() {
return new ArrayList<>(uncommittedEvents);
}
public void clearUncommittedEvents() {
uncommittedEvents.clear();
}
}
public class Order extends AggregateRoot {
private String id;
private OrderStatus status;
private List<OrderItem> items;
public Order(String customerId, List<OrderItem> items) {
apply(new OrderCreatedEvent(UUID.randomUUID().toString(), customerId, items));
}
public void pay(String paymentId) {
if (status != OrderStatus.CREATED) {
throw new IllegalStateException("Order is not in CREATED state");
}
apply(new OrderPaidEvent(id, paymentId));
}
@Override
protected void when(DomainEvent event) {
if (event instanceof OrderCreatedEvent e) {
this.id = e.orderId();
this.status = OrderStatus.CREATED;
this.items = e.items();
} else if (event instanceof OrderPaidEvent) {
this.status = OrderStatus.PAID;
}
}
}
In my experience, event sourcing provides unparalleled audit capabilities and temporal query options. For financial systems, being able to reconstruct the exact state at any point in time is invaluable for compliance and debugging.
Adding event versioning and schema evolution strategies is essential for long-lived event-sourced systems:
public interface EventUpgrader<T extends DomainEvent> {
DomainEvent upgrade(T oldEvent);
}
public class EventStore {
private final Map<Class<?>, EventUpgrader<?>> upgraders = new HashMap<>();
@SuppressWarnings("unchecked")
public List<DomainEvent> getEventsForAggregate(String aggregateId) {
List<StoredEvent> storedEvents = repository.getEvents(aggregateId);
return storedEvents.stream()
.map(stored -> {
DomainEvent event = deserialize(stored);
EventUpgrader<DomainEvent> upgrader =
(EventUpgrader<DomainEvent>) upgraders.get(event.getClass());
return upgrader != null ? upgrader.upgrade(event) : event;
})
.collect(Collectors.toList());
}
}
Reactor Pattern
The Reactor pattern handles service requests delivered concurrently by one or more inputs, dispatching them synchronously to associated request handlers.
public class EventReactor {
private final Selector selector;
private final Map<SelectionKey, EventHandler> handlers = new HashMap<>();
public EventReactor() throws IOException {
this.selector = Selector.open();
}
public void registerHandler(SelectableChannel channel, int ops, EventHandler handler) throws IOException {
channel.configureBlocking(false);
SelectionKey key = channel.register(selector, ops);
handlers.put(key, handler);
}
public void run() {
try {
while (!Thread.interrupted()) {
selector.select();
Set<SelectionKey> selectedKeys = selector.selectedKeys();
Iterator<SelectionKey> it = selectedKeys.iterator();
while (it.hasNext()) {
SelectionKey key = it.next();
if (key.isValid()) {
EventHandler handler = handlers.get(key);
handler.handleEvent(key);
}
it.remove();
}
}
} catch (IOException e) {
// Log and handle the exception
}
}
}
public interface EventHandler {
void handleEvent(SelectionKey key) throws IOException;
}
public class AcceptEventHandler implements EventHandler {
private final ServerSocketChannel serverChannel;
private final EventReactor reactor;
@Override
public void handleEvent(SelectionKey key) throws IOException {
SocketChannel client = serverChannel.accept();
if (client != null) {
reactor.registerHandler(client, SelectionKey.OP_READ, new ReadEventHandler());
}
}
}
I’ve applied this pattern in network servers and high-performance I/O systems. The Reactor pattern is the foundation of frameworks like Netty and is integral to non-blocking servers.
A modern implementation might use Java NIO.2 with the AsynchronousChannelGroup
:
public class ModernEventReactor {
private final AsynchronousChannelGroup group;
public ModernEventReactor(int threadCount) throws IOException {
this.group = AsynchronousChannelGroup.withFixedThreadPool(
threadCount, Executors.defaultThreadFactory());
}
public AsynchronousServerSocketChannel createServerChannel(int port) throws IOException {
AsynchronousServerSocketChannel channel =
AsynchronousServerSocketChannel.open(group);
channel.bind(new InetSocketAddress(port));
return channel;
}
public void accept(AsynchronousServerSocketChannel channel,
CompletionHandler<AsynchronousSocketChannel, Void> handler) {
channel.accept(null, handler);
}
}
Practical Application and Best Practices
When implementing these patterns in production systems, I follow several best practices:
-
Start simple with the Observer pattern for basic event notification before progressing to more complex patterns.
-
Use typed events to ensure compile-time safety:
public abstract class DomainEvent {
private final String id;
private final Instant timestamp;
protected DomainEvent() {
this.id = UUID.randomUUID().toString();
this.timestamp = Instant.now();
}
public String getId() { return id; }
public Instant getTimestamp() { return timestamp; }
}
- Implement idempotency for event handlers to handle retries and duplicate events:
public class IdempotentOrderHandler {
private final Set<String> processedEvents = Collections.synchronizedSet(new HashSet<>());
public void handle(OrderCreatedEvent event) {
if (processedEvents.add(event.getId())) {
// Process the event
}
}
}
- Use event schemas with versioning for long-term maintainability:
@JsonTypeInfo(use = JsonTypeInfo.Id.NAME)
@JsonIgnoreProperties(ignoreUnknown = true)
public abstract class BaseEvent {
private final int version;
protected BaseEvent(int version) {
this.version = version;
}
public int getVersion() {
return version;
}
}
- Implement backpressure mechanisms for high-throughput systems:
public class BackpressuredEventBus {
private final Executor executor;
private final Semaphore semaphore;
public BackpressuredEventBus(int maxConcurrent) {
this.executor = Executors.newCachedThreadPool();
this.semaphore = new Semaphore(maxConcurrent);
}
public void publish(Event event) {
try {
semaphore.acquire();
executor.execute(() -> {
try {
// Process event
} finally {
semaphore.release();
}
});
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
}
Integrating these patterns has transformed my approach to building distributed systems. By separating concerns, enabling asynchronous processing, and designing for resilience, I’ve created systems that can handle higher loads, recover from failures gracefully, and adapt to changing requirements with minimal disruption.
The combination of Command pattern for operations, Event Sourcing for persistence, CQRS for scalability, and Saga pattern for distributed transactions creates a powerful foundation for modern applications. When implemented thoughtfully, these patterns create systems that are not just robust but also a joy to maintain and extend.