Let me tell you how I learned to think about my code differently. I used to write instructions line by line, waiting for each step to finish before starting the next. It felt orderly. Then I built a system that needed to handle thousands of simultaneous requests. My orderly approach fell apart. That’s when I discovered a different way of building software. This approach doesn’t just wait around. It reacts.
This is reactive programming. Instead of telling the computer exactly what to do and when, you describe what should happen when data becomes available. You set up pipelines for information to flow through. When a piece of data arrives at the start, it travels through your predefined logic, transforming as it goes, until it reaches its destination. Nothing happens until you start listening for that data. This model is perfect for today’s applications that need to be responsive under heavy load.
Let me show you the tools. In Java, we often use Project Reactor. Its two fundamental building blocks are Flux and Mono. Think of a Flux as a pipe that can carry zero, one, or many items. A Mono is a pipe that carries either zero items or exactly one item. You declare what happens inside these pipes.
Flux<Integer> numberStream = Flux.just(1, 2, 3, 4, 5);
Flux<String> messageStream = numberStream
.filter(num -> num % 2 == 0) // Let only even numbers pass
.map(num -> "Even number: " + num) // Transform each number
.delayElements(Duration.ofMillis(50)); // Space them out
messageStream.subscribe(
message -> System.out.println("Received: " + message),
error -> System.err.println("Something broke: " + error),
() -> System.out.println("The stream is finished.")
);
Creating these streams does nothing by itself. It’s like drawing a blueprint for a factory assembly line. The machinery only starts when the subscribe() method is called. This lazy nature is powerful. It allows you to build complex processing logic that remains idle until needed, saving resources.
One of the first real problems I hit was overflow. What if the data source produces items ten times faster than my code can process them? In a traditional setup, this would queue items in memory until the system runs out and crashes. Reactive systems have a built-in conversation to prevent this, called backpressure.
The consumer can tell the producer, “I’m ready for N more items.” This control keeps the system stable.
Flux.range(1, 10000) // A fast producer
.onBackpressureBuffer(20) // Hold up to 20 items in a waiting area
.subscribe(
item -> {
// Simulate slow processing
Thread.sleep(10);
System.out.println("Processing: " + item);
},
error -> System.err.println("Error: " + error),
() -> System.out.println("Done"),
subscription -> {
// This is key: Ask for 5 items to start
subscription.request(5);
// Later, you could request more when ready
}
);
If the buffer fills up, you can define strategies. You can drop new items, emit an error, or use other tactics. This gives you a safety valve for handling unexpected traffic spikes without bringing your service down.
Applications are rarely simple. You often need data from several places—a user profile from one service, their recent orders from another, maybe notifications from a third. You need to combine these separate streams of data. Reactor provides operators for this.
// Get user and order independently, as Mono streams
Mono<User> userMono = userService.fetchUser("user123");
Mono<Order> orderMono = orderService.fetchLatestOrder("user123");
// Combine and wait for BOTH results
Mono<UserOrderView> combinedView = Mono.zip(userMono, orderMono)
.map(tuple -> {
User user = tuple.getT1();
Order order = tuple.getT2();
return new UserOrderView(user, order);
});
// For ongoing streams, merge them as they arrive
Flux<LogEntry> appLogs = applicationService.getLogStream();
Flux<LogEntry> systemLogs = systemMonitor.getLogStream();
Flux<LogEntry> allLogs = Flux.merge(appLogs, systemLogs);
The zip operator is like a friend who waits for everyone to arrive before opening the gifts. The merge operator is like a receptionist directing people from two doors into a single room as they walk in. Choosing the right one depends on whether you need synchronization or just a combined flow.
Things go wrong. A network call fails. A database times out. In a reactive stream, an error is just another type of signal that travels downstream. If unhandled, it will cancel the entire subscription. You must plan for failure.
Flux<String> robustDataStream = externalDataService.getDataFeed()
.timeout(Duration.ofSeconds(4)) // Fail if too slow
.onErrorResume(TimeoutException.class, timeoutEx -> {
// If we timeout, switch to a cached data stream
log.warn("Service slow, using cache.");
return cacheService.getCachedFeed();
})
.onErrorResume(ServiceException.class, serviceEx -> {
// For specific errors, maybe try a backup service
return backupService.getDataFeed();
})
.retryWhen(Retry.fixedDelay(3, Duration.ofSeconds(2)));
The onErrorResume operator is your contingency plan. It catches an error and switches to a different, safe data source. The retryWhen operator is your persistence. It can resubscribe to the original source after a delay, which is great for temporary glitches. This approach keeps your application resilient.
The real power comes from the dozens of small, focused operators that transform data inside the stream. You can filter, collect, group, or reduce items without any messy external state management.
Flux<Sale> salesThisHour = salesRepository.findByTimestampAfter(startTime);
// Group sales into batches of 10
Flux<List<Sale>> batches = salesThisHour.buffer(10);
// Only take one sale per second to throttle volume
Flux<Sale> sampledSales = salesThisHour.sample(Duration.ofSeconds(1));
// Calculate running total
Mono<Double> totalRevenue = salesThisHour
.map(Sale::getAmount)
.reduce(0.0, Double::sum);
I think of these operators as specialized tools on a workshop wall. Need to group items? Grab buffer. Need to skip duplicates in a row? distinctUntilChanged is your tool. They let you express complex data processing logic in a clean, declarative way.
Where does your code actually run? By default, everything happens on the thread that calls subscribe(). For non-blocking operations, this is fine. But if you need to call a slow, traditional library (like a blocking JDBC driver), you must move that work off the main reactive threads.
Flux.fromIterable(customerIds)
.parallel(4) // Process up to 4 items concurrently
.runOn(Schedulers.parallel()) // Use a pool for CPU work
.map(id -> performCpuHeavyCalculation(id))
.sequential() // Go back to a single stream
.publishOn(Schedulers.boundedElastic()) // Switch to a pool for I/O
.flatMap(id -> Mono.fromCallable(() -> blockingDatabaseCall(id)))
.subscribe();
Schedulers.parallel() is for fast, CPU-intensive tasks. Schedulers.boundedElastic() is your go-to for wrapping old, blocking calls. It creates threads as needed, with a limit, so you won’t overwhelm your system. Telling your pipeline which scheduler to use for each stage is a critical skill.
Testing this async code felt strange at first. You can’t just use assertEquals on a Flux. The test might finish before the data arrives. Reactor provides a tool called StepVerifier. It lets you describe what you expect to happen in the stream, then it plays the stream and checks your expectations.
@Test
void testDataTransformation() {
Flux<String> testFlux = Flux.just("alpha", "bravo", "charlie")
.map(String::toUpperCase);
StepVerifier.create(testFlux)
.expectNext("ALPHA")
.expectNext("BRAVO")
.expectNext("CHARLIE")
.expectComplete() // Verify the stream completes normally
.verify(); // Trigger the subscription and verification
}
@Test
void testSlowStream() {
// A stream that would take 2 hours in real time
Flux<Long> hourlyTick = Flux.interval(Duration.ofHours(1)).take(2);
StepVerifier.withVirtualTime(() -> hourlyTick)
.expectSubscription()
.thenAwait(Duration.ofHours(2)) // Simulate waiting 2 hours instantly
.expectNext(0L, 1L)
.expectComplete()
.verify();
}
The virtual time feature is a game-changer. It lets you test timers, delays, and intervals without actually sitting around waiting. Your tests stay fast and reliable.
We don’t build systems from scratch. You’ll have to integrate with libraries that use old-style, blocking APIs or callbacks. You need safe bridges between the old world and the new reactive world.
// Wrapping a blocking call
Mono<DatabaseRecord> recordMono = Mono.fromCallable(() -> {
return jdbcTemplate.queryForObject(sql, rowMapper); // This blocks!
})
.subscribeOn(Schedulers.boundedElastic()); // Do it on a safe thread
// Turning a callback-based API into a Mono
Mono<ApiResponse> monoResponse = Mono.create(sink -> {
legacyAsyncClient.makeRequest(params, new Callback() {
@Override
public void onSuccess(Response r) { sink.success(r); }
@Override
public void onFailure(Exception e) { sink.error(e); }
});
});
The Mono.fromCallable wrapper defers the blocking call until subscription and runs it on the scheduler you specify. This containment prevents the blocking operation from stalling your core reactive threads. It’s how you keep the system responsive.
For web applications, Spring WebFlux provides a reactive foundation. Your controller methods now return Mono or Flux types. This allows the entire request-handling chain, from the network card to your database call, to be non-blocking.
@RestController
public class ProductController {
@GetMapping("/products/{id}")
public Mono<ProductDto> getProduct(@PathVariable String id) {
return productService.findByIdReactive(id)
.map(ProductMapper::toDto)
.timeout(Duration.ofSeconds(3));
}
@GetMapping(value = "/products/stream", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public Flux<ProductUpdateDto> streamProductUpdates() {
return productUpdateService.getLiveUpdateStream();
}
}
When this endpoint is called, Spring subscribes to the returned Mono for you. The framework handles the subscription, the waiting, and sending the response when the Mono provides its value. The streaming endpoint opens a long-lived connection, sending new product updates as they happen in real time.
In a distributed system, a single slow or failing service can cause cascading failures. A circuit breaker is a pattern that wraps calls to a service. If failures reach a threshold, it “opens the circuit” and fails immediately for a period, giving the failing service time to recover.
// Configure a circuit breaker
CircuitBreakerConfig config = CircuitBreakerConfig.custom()
.slidingWindowSize(20)
.failureRateThreshold(50.0f) // Open circuit if 50% of calls fail
.waitDurationInOpenState(Duration.ofSeconds(60))
.build();
CircuitBreaker cb = CircuitBreaker.of("inventoryService", config);
// Apply it to a reactive call
Flux<Inventory> safeInventoryCall = inventoryService.getStockLevels()
.transformDeferred(CircuitBreakerOperator.of(cb))
.onErrorResume(CallNotPermittedException.class, ex -> {
// Circuit is open! Return a safe fallback.
return Flux.just(Inventory.getDefaultStock());
});
This pattern doesn’t prevent the initial failure, but it stops your system from hammering a broken service. It fails fast and provides a fallback, maintaining overall responsiveness. It’s a vital part of building robust reactive systems.
Shifting to this reactive mindset changed how I design software. It’s less about writing a sequence of commands and more about defining the rules of engagement for data. You build pipelines, plan for failure, and control flow. The initial effort feels significant, but the result is an application that uses resources efficiently, scales elegantly, and stays responsive under conditions that would cripple a traditional design. It’s a powerful way to build for the modern world.