Many developers find themselves building applications that feel slow or unresponsive when dealing with many users or slow external services. The traditional way of writing code often involves waiting for one task to finish before starting the next, which can waste precious computing resources. This is where a different approach, centered around data streams and change propagation, becomes valuable. It’s a model that helps applications remain responsive and efficient under load.
In Java, this approach builds on Reactive Streams, a small specification that defines how to process asynchronous data streams with non-blocking backpressure. Libraries such as Project Reactor, which the examples below use, implement the specification and provide the tools to build these responsive systems.
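The shape of that specification can be seen without any third-party library, because since Java 9 the JDK mirrors its interfaces in java.util.concurrent.Flow. The following is a minimal sketch, not a compliant implementation: RangePublisher and CollectingSubscriber are hypothetical names invented for illustration, and the code assumes a single thread. It shows the two ideas everything else rests on: nothing is emitted until the subscriber requests it, and the publisher never emits more than was requested.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Flow;

/** Hypothetical publisher that emits a range of integers, strictly on demand. */
final class RangePublisher implements Flow.Publisher<Integer> {
    private final int start;
    private final int count;

    RangePublisher(int start, int count) {
        this.start = start;
        this.count = count;
    }

    @Override
    public void subscribe(Flow.Subscriber<? super Integer> subscriber) {
        subscriber.onSubscribe(new Flow.Subscription() {
            private int next = start;
            private final int end = start + count;
            private long demand;      // items requested but not yet delivered
            private boolean emitting; // guards against re-entrant request() calls
            private boolean done;

            @Override
            public void request(long n) {
                if (done) return;
                if (n <= 0) {
                    done = true;
                    subscriber.onError(new IllegalArgumentException("request must be positive"));
                    return;
                }
                demand = (demand + n < 0) ? Long.MAX_VALUE : demand + n; // cap on overflow
                if (emitting) return; // the loop below will pick up the extra demand
                emitting = true;
                while (demand > 0 && next < end && !done) {
                    demand--;
                    subscriber.onNext(next++);
                }
                emitting = false;
                if (next == end && !done) {
                    done = true;
                    subscriber.onComplete();
                }
            }

            @Override
            public void cancel() {
                done = true;
            }
        });
    }

    public static void main(String[] args) {
        CollectingSubscriber subscriber = new CollectingSubscriber();
        new RangePublisher(1, 5).subscribe(subscriber);
        System.out.println(subscriber.received); // [] : nothing flows without demand
        subscriber.subscription.request(2);
        System.out.println(subscriber.received); // [1, 2]
        subscriber.subscription.request(10);
        System.out.println(subscriber.received + " completed=" + subscriber.completed);
    }
}

/** Hypothetical subscriber that records what it receives and leaves requesting to the caller. */
final class CollectingSubscriber implements Flow.Subscriber<Integer> {
    final List<Integer> received = new ArrayList<>();
    Flow.Subscription subscription;
    boolean completed;

    @Override public void onSubscribe(Flow.Subscription s) { subscription = s; }
    @Override public void onNext(Integer item) { received.add(item); }
    @Override public void onError(Throwable t) { t.printStackTrace(); }
    @Override public void onComplete() { completed = true; }
}
```

A library such as Reactor supplies production-grade versions of these pieces, so application code rarely implements the interfaces directly.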
At the heart of this model are two fundamental types, which Reactor calls Flux and Mono: one for a stream of multiple items and one for a single result or no value at all. Think of the first as a list that arrives over time, and the second as a promise of at most one thing. Neither does anything until you subscribe to it.
Here is a basic example:
import reactor.core.publisher.Flux;
import java.time.Duration;
public class BasicStreams {
    public static void main(String[] args) throws InterruptedException {
        // A stream of three names
        Flux<String> nameStream = Flux.just("Alex", "Jordan", "Casey")
            .delayElements(Duration.ofMillis(100)) // Emit each with a delay
            .map(name -> name.toUpperCase()); // Transform each item
        // Nothing happens yet. The stream is lazy.
        System.out.println("Stream defined, but no data flows.");
        // Only when we subscribe does the data start moving.
        nameStream.subscribe(name -> System.out.println("Received: " + name));
        // Keep the main thread alive long enough to see the output
        Thread.sleep(500);
    }
}
This code defines a stream but doesn’t process it until the subscribe method is called. The map operator transforms each item as it passes through. This declarative style lets you focus on what you want to do, not how the system should execute it step-by-step.
A major advantage of this style is how it manages failure. In a chain of operations, an error is just another type of signal that flows downstream, not an exception that necessarily crashes your program. You can handle it right there in the stream.
For instance, if a network call fails, you can try to get data from a cache instead, or retry the operation.
import reactor.core.publisher.Mono;
import reactor.util.retry.Retry;
import java.time.Duration;
public class ErrorHandling {
    // Simulates a flaky network call that fails roughly 30% of the time
    static Mono<String> fetchFromNetwork(String id) {
        return Mono.fromCallable(() -> {
            if (Math.random() > 0.7) {
                throw new RuntimeException("Network timeout for " + id);
            }
            return "Data for " + id;
        });
    }
    static Mono<String> fetchFromCache(String id) {
        return Mono.just("Cached data for " + id);
    }
    public static void main(String[] args) throws InterruptedException {
        String queryId = "user123";
        Mono<String> resilientData = fetchFromNetwork(queryId)
            // Retry twice with a delay. This must come before onErrorResume:
            // placed after it, the fallback would absorb every error and
            // the retry would never trigger.
            .retryWhen(Retry.fixedDelay(2, Duration.ofSeconds(1)))
            .onErrorResume(throwable -> {
                // Once the retries are exhausted, fall back to the cache
                System.err.println("Failed to fetch from network: " + throwable.getMessage());
                return fetchFromCache(queryId);
            });
        resilientData.subscribe(
            data -> System.out.println("Success: " + data),
            error -> System.err.println("Complete failure: " + error)
        );
        // Retry delays run on a background scheduler, so keep the main thread alive
        Thread.sleep(3000);
    }
}
The onErrorResume operator lets you provide an alternative source of data. The retryWhen operator can be configured with complex logic, like waiting longer between each attempt. This builds resilience directly into your data pipelines.
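To make "waiting longer between each attempt" concrete, here is a plain-Java sketch of an exponential backoff schedule. It is an illustration of the idea only, not Reactor's implementation: BackoffRetry, delayFor, and call are hypothetical names, and the retry loop blocks the calling thread, which a reactive retry would not do. In Reactor itself, Retry.backoff offers a comparable schedule (with jitter) for use with retryWhen.

```java
import java.time.Duration;
import java.util.concurrent.Callable;

final class BackoffRetry {
    /** Delay before retry number `retry` (1-based): base, 2*base, 4*base, ... capped at max. */
    static Duration delayFor(int retry, Duration base, Duration max) {
        if (retry < 1) {
            throw new IllegalArgumentException("retry must be >= 1");
        }
        // Cap the exponent so the factor itself cannot overflow
        long factor = 1L << Math.min(retry - 1, 30);
        long millis;
        try {
            millis = Math.multiplyExact(base.toMillis(), factor);
        } catch (ArithmeticException overflow) {
            return max;
        }
        Duration delay = Duration.ofMillis(millis);
        return delay.compareTo(max) > 0 ? max : delay;
    }

    /** Blocking retry loop: runs the task, sleeping for a growing delay after each failure. */
    static <T> T call(Callable<T> task, int maxRetries, Duration base, Duration max) throws Exception {
        for (int attempt = 0; ; attempt++) {
            try {
                return task.call();
            } catch (Exception e) {
                if (attempt >= maxRetries) {
                    throw e; // retries exhausted: propagate the last error
                }
                Thread.sleep(delayFor(attempt + 1, base, max).toMillis());
            }
        }
    }

    public static void main(String[] args) throws Exception {
        int[] calls = {0};
        String result = call(() -> {
            if (++calls[0] < 3) {
                throw new IllegalStateException("flaky");
            }
            return "ok after " + calls[0] + " calls";
        }, 3, Duration.ofMillis(10), Duration.ofMillis(100));
        System.out.println(result); // ok after 3 calls
    }
}
```

Capping the delay keeps a long outage from pushing the wait time to unreasonable lengths.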
Real-world applications often need to combine data from several independent sources, like fetching a user profile and their recent orders simultaneously. You can merge, pair, or sequence these streams.
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.util.function.Tuple2;
import java.time.Duration;
public class CombiningStreams {
    static Mono<String> getUser() {
        return Mono.just("User: Alex").delayElement(Duration.ofMillis(80));
    }
    static Mono<String> getOrder() {
        return Mono.just("Order: #12345").delayElement(Duration.ofMillis(50));
    }
    public static void main(String[] args) throws InterruptedException {
        // Merge: Get both results as soon as they arrive, interleaving them.
        Flux<String> merged = Flux.merge(getUser(), getOrder());
        merged.subscribe(item -> System.out.println("[Merged] " + item));
        // Zip: Wait for both to complete, then pair them into a single object.
        Mono<Tuple2<String, String>> zipped = Mono.zip(getUser(), getOrder());
        zipped.subscribe(tuple -> System.out.println("[Zipped] User: " + tuple.getT1() + ", Order: " + tuple.getT2()));
        Thread.sleep(200); // Wait for async operations
    }
}
Merging is useful for independent tasks where order doesn’t matter. Zipping is crucial when you need results from multiple operations to compute a final result. Choosing the right combination strategy is key to efficient orchestration.
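For readers who already know CompletableFuture, the difference between pairing and sequencing can be sketched with the JDK alone. This is an analogy under stated assumptions, not Reactor code: CombineSketch and getOrderFor are hypothetical, and CompletableFuture is eager where a Mono is lazy. Pairing corresponds roughly to Mono.zip, while dependent sequencing is what Reactor's flatMap expresses.

```java
import java.util.concurrent.CompletableFuture;

final class CombineSketch {
    static CompletableFuture<String> getUser() {
        return CompletableFuture.supplyAsync(() -> "User: Alex");
    }

    static CompletableFuture<String> getOrder() {
        return CompletableFuture.supplyAsync(() -> "Order: #12345");
    }

    // Hypothetical dependent lookup: it needs the user before it can start
    static CompletableFuture<String> getOrderFor(String user) {
        return CompletableFuture.supplyAsync(() -> "Order for [" + user + "]: #12345");
    }

    /** Pairing: both lookups run independently, and the result needs both. */
    static CompletableFuture<String> paired() {
        return getUser().thenCombine(getOrder(), (user, order) -> user + ", " + order);
    }

    /** Sequencing: the second lookup starts only after the first has produced a value. */
    static CompletableFuture<String> sequenced() {
        return getUser().thenCompose(CombineSketch::getOrderFor);
    }

    public static void main(String[] args) {
        System.out.println(paired().join());    // User: Alex, Order: #12345
        System.out.println(sequenced().join()); // Order for [User: Alex]: #12345
    }
}
```

Pair when the inputs are independent, and sequence only when one call truly needs the other's result, since sequencing adds the two latencies together.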
A critical concept in this model is backpressure. Imagine a fast-producing data source, like a live sensor, sending data to a slow consumer, like a complex analytics service. Without a safety mechanism, the slow consumer would be overwhelmed. Backpressure allows the consumer to signal how much data it can handle.
Let’s see how a subscriber controls the flow.
import reactor.core.publisher.BaseSubscriber;
import reactor.core.publisher.Flux;
import org.reactivestreams.Subscription;
import java.util.concurrent.atomic.AtomicInteger;
public class BackpressureDemo {
    public static void main(String[] args) {
        // A fast source: emits numbers 1 to 100 very quickly
        Flux<Integer> fastSource = Flux.range(1, 100)
            .doOnNext(i -> System.out.println("Produced: " + i))
            .doOnComplete(() -> System.out.println("Production complete."));
        // A slow subscriber
        fastSource.subscribe(new BaseSubscriber<Integer>() {
            final AtomicInteger count = new AtomicInteger();
            @Override
            protected void hookOnSubscribe(Subscription subscription) {
                // Request only 5 items to start
                System.out.println("Subscriber ready. Requesting 5 items.");
                request(5);
            }
            @Override
            protected void hookOnNext(Integer value) {
                // Process slowly
                try { Thread.sleep(200); } catch (InterruptedException e) {}
                System.out.println("Consumed: " + value);
                // After every 5 items processed, request 5 more
                if (count.incrementAndGet() % 5 == 0) {
                    System.out.println("Requesting next 5 items.");
                    request(5);
                }
            }
        });
    }
}
In this example, the subscriber is in full control. It processes five items, then asks for five more, and the producer never sends more than the requested amount. For fast sources that cannot slow down, you can also insert an overflow strategy: onBackpressureBuffer can hold a bounded number of items until the consumer asks for them, and onBackpressureDrop discards items that arrive while the consumer has no outstanding demand.
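The buffering and dropping strategies can be pictured as a small bounded queue sitting between producer and consumer. The class below is a plain-Java sketch of that idea, assuming a policy of discarding new items once the buffer is full. DroppingBuffer is a hypothetical name, and this is not how Reactor implements its operators.

```java
import java.util.ArrayDeque;
import java.util.Queue;

/** Hypothetical bounded buffer that discards incoming items once it is full. */
final class DroppingBuffer<T> {
    private final Queue<T> queue = new ArrayDeque<>();
    private final int capacity;
    private long dropped;

    DroppingBuffer(int capacity) {
        if (capacity <= 0) {
            throw new IllegalArgumentException("capacity must be positive");
        }
        this.capacity = capacity;
    }

    /** Producer side: returns false and counts a drop when the buffer is full. */
    synchronized boolean offer(T item) {
        if (queue.size() >= capacity) {
            dropped++;
            return false;
        }
        queue.add(item);
        return true;
    }

    /** Consumer side: takes the oldest buffered item, or null if the buffer is empty. */
    synchronized T poll() {
        return queue.poll();
    }

    synchronized long dropped() {
        return dropped;
    }

    public static void main(String[] args) {
        DroppingBuffer<Integer> buffer = new DroppingBuffer<>(3);
        for (int i = 1; i <= 5; i++) {
            buffer.offer(i); // items 4 and 5 find the buffer full and are dropped
        }
        System.out.println("Oldest buffered item: " + buffer.poll()); // 1
        System.out.println("Dropped so far: " + buffer.dropped());    // 2
    }
}
```

Dropping suits data where only recent values matter, such as sensor readings. When every item must be processed, a bounded buffer combined with an error on overflow is the safer choice.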
A common point of confusion involves threads. By default, operations may execute on whatever thread the source uses. To prevent a long-running operation from blocking other tasks, you must explicitly switch threads.
import reactor.core.publisher.Flux;
import reactor.core.scheduler.Schedulers;
public class ThreadControl {
    static String blockingIoOperation(int id) {
        try {
            Thread.sleep(1000); // Simulate a blocking call
        } catch (InterruptedException e) {}
        return "Result-" + id;
    }
    public static void main(String[] args) throws InterruptedException {
        System.out.println("Starting on thread: " + Thread.currentThread().getName());
        Flux.range(1, 3)
            .publishOn(Schedulers.boundedElastic()) // Switch to a worker thread pool for I/O
            .map(id -> {
                System.out.println("Map on thread: " + Thread.currentThread().getName());
                return blockingIoOperation(id);
            })
            .subscribeOn(Schedulers.parallel()) // Where the initial subscription is handled
            .subscribe(result -> System.out.println("Got: " + result));
        // Keep main thread alive
        Thread.sleep(4000);
    }
}
The publishOn operator affects all operations downstream of it. The subscribeOn operator sets the thread on which the subscription, and therefore the source's emissions, run; it applies to the whole chain regardless of where it appears, until a publishOn switches threads. Use dedicated thread pools for blocking operations to keep your main event loops free.
There’s an important distinction between two kinds of data sources. A cold source, like a result from a database query, starts fresh for each subscriber. A hot source, like a live message bus, broadcasts ongoing events, and late subscribers might miss the beginning.
import reactor.core.publisher.ConnectableFlux;
import reactor.core.publisher.Flux;
import java.time.Duration;
public class HotAndCold {
    public static void main(String[] args) throws InterruptedException {
        System.out.println("=== Cold Publisher ===");
        // Each subscriber triggers a new, independent sequence
        Flux<Long> coldFlux = Flux.interval(Duration.ofMillis(400)).take(3);
        coldFlux.subscribe(i -> System.out.println("Subscriber A: " + i));
        Thread.sleep(500); // Wait a bit before second subscription
        coldFlux.subscribe(i -> System.out.println("Subscriber B: " + i));
        Thread.sleep(1500);
        System.out.println("\n=== Hot Publisher ===");
        // One source, shared among subscribers
        ConnectableFlux<Long> hotSource = Flux.interval(Duration.ofMillis(400)).publish();
        hotSource.connect(); // Start emitting now, regardless of subscribers
        Thread.sleep(300); // Let the source start emitting
        hotSource.subscribe(i -> System.out.println("Hot Subscriber A: " + i));
        Thread.sleep(500);
        hotSource.subscribe(i -> System.out.println("Hot Subscriber B: " + i)); // Misses the first emissions
        Thread.sleep(1000);
    }
}
Use cold publishers for request-response patterns. Use hot publishers for event broadcasting, like chat messages or stock ticks. The publish() method turns a cold flux into a ConnectableFlux, which only starts when you call connect().
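Stripped of timing and threads, the cold and hot distinction comes down to where the data is produced. The plain-Java sketch below illustrates it with two hypothetical classes, ColdSource and HotSource. It is a simplified model for intuition, not a Reactive Streams implementation.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

final class HotColdSketch {
    /** Cold: every subscriber triggers its own full replay of the data. */
    static final class ColdSource {
        void subscribe(Consumer<Integer> subscriber) {
            for (int i = 1; i <= 3; i++) {
                subscriber.accept(i);
            }
        }
    }

    /** Hot: events go to whoever is subscribed at the moment of emission. */
    static final class HotSource {
        private final List<Consumer<Integer>> subscribers = new ArrayList<>();

        void subscribe(Consumer<Integer> subscriber) {
            subscribers.add(subscriber);
        }

        void emit(int value) {
            for (Consumer<Integer> subscriber : subscribers) {
                subscriber.accept(value);
            }
        }
    }

    public static void main(String[] args) {
        ColdSource cold = new ColdSource();
        cold.subscribe(i -> System.out.println("Cold A: " + i)); // sees 1, 2, 3
        cold.subscribe(i -> System.out.println("Cold B: " + i)); // also sees 1, 2, 3

        HotSource hot = new HotSource();
        hot.subscribe(i -> System.out.println("Hot A: " + i));
        hot.emit(1); // only A is listening
        hot.subscribe(i -> System.out.println("Hot B: " + i));
        hot.emit(2); // both A and B receive this one; B never sees 1
    }
}
```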
Testing this kind of asynchronous code can seem daunting. Fortunately, there are tools built for this purpose. You can verify the exact sequence, timing, and values emitted by a stream.
import reactor.core.publisher.Flux;
import reactor.test.StepVerifier;
import java.time.Duration;
public class TestingStreams {
    public static void main(String[] args) {
        Flux<String> fluxUnderTest = Flux.just("alpha", "beta", "gamma")
            .delayElements(Duration.ofMillis(100))
            .map(String::toUpperCase);
        StepVerifier.create(fluxUnderTest)
            .expectNext("ALPHA")
            .expectNextMatches(s -> s.startsWith("B"))
            .expectNext("GAMMA")
            .expectComplete() // Verify the stream completes normally
            .verify(Duration.ofSeconds(1)); // Timeout the test if it takes too long
    }
}
The StepVerifier subscribes to your stream and lets you assert each event in order. You can also verify errors, specific numbers of items, or that nothing happens for a given period. This is indispensable for creating reliable, deterministic tests.
Most applications have some legacy or library code that is blocking. You can’t always rewrite it immediately. The strategy is to isolate it, run it on a dedicated thread, and wrap it in a reactive type.
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;
import java.time.Duration;
public class WrappingBlockingCode {
    // A pretend blocking legacy method
    static String legacyBlockingMethod() {
        try {
            Thread.sleep(2000); // Simulates a long database call
        } catch (InterruptedException e) {}
        return "Legacy Result";
    }
    public static void main(String[] args) throws InterruptedException {
        System.out.println("Calling blocking code reactively...");
        Mono<String> safeWrapper = Mono.fromCallable(() -> legacyBlockingMethod())
            .subscribeOn(Schedulers.boundedElastic()) // Run on a blocking-friendly thread pool
            .timeout(Duration.ofSeconds(3)); // Add a timeout for safety
        safeWrapper.subscribe(
            result -> System.out.println("Got: " + result),
            error -> System.err.println("Failed: " + error.getMessage())
        );
        // The main thread is free to do other work
        System.out.println("Main thread can continue immediately.");
        Thread.sleep(3000);
    }
}
The fromCallable factory method defers the blocking call until something subscribes. The subscribeOn operator then moves that execution to a thread pool designed for such work, preventing it from stalling other operations. Always consider adding a timeout.
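For communicating with external services over HTTP, the reactive stack provides a non-blocking client, shown here as Spring WebFlux's WebClient. Because it does not hold a thread for each in-flight request, it uses far fewer resources than a blocking client when making many concurrent calls.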
import org.springframework.web.reactive.function.client.WebClient;
import reactor.core.publisher.Mono;
import java.time.Duration;
public class ReactiveWebClientDemo {
    public static void main(String[] args) {
        WebClient client = WebClient.create("https://jsonplaceholder.typicode.com");
        Mono<String> userData = client.get()
            .uri("/users/1")
            .retrieve()
            .bodyToMono(String.class)
            // Guard the preview length in case the body is shorter than 50 characters
            .doOnNext(response -> System.out.println("Received: " + response.substring(0, Math.min(50, response.length())) + "..."))
            .timeout(Duration.ofSeconds(5));
        userData.subscribe(
            data -> System.out.println("Processing complete."),
            error -> System.err.println("Error fetching user: " + error.getMessage())
        );
        // In a real app, you'd often combine multiple such calls.
        // Note: userData is cold, so subscribing to it again through zip sends a second request.
        Mono<String> postData = client.get().uri("/posts/1").retrieve().bodyToMono(String.class);
        Mono.zip(userData, postData)
            .doOnNext(tuple -> System.out.println("Fetched both user and post."))
            .block(); // Demo only: wait for the responses so the JVM does not exit before they arrive
    }
}
The WebClient returns a Mono or Flux, making it easy to integrate into a larger reactive pipeline. You can chain, combine, or transform these network calls just like any other stream.
Finally, data access can also be reactive. While traditional JDBC is blocking, newer drivers, such as those built on R2DBC, allow for non-blocking database communication.
import org.springframework.data.repository.reactive.ReactiveCrudRepository;
import org.springframework.data.r2dbc.repository.Query;
import org.springframework.data.repository.query.Param;
import org.springframework.stereotype.Service;
import reactor.core.publisher.Flux;
import java.time.LocalDate;
// Define a reactive repository interface (User is the application's entity class, not shown here)
public interface ReactiveUserRepository extends ReactiveCrudRepository<User, Long> {
    // Derived query methods
    Flux<User> findByEmail(String email);
    Flux<User> findByActive(boolean active);
    // Custom query
    @Query("SELECT * FROM users WHERE registration_date > :afterDate")
    Flux<User> findUsersRegisteredAfter(@Param("afterDate") LocalDate afterDate);
}
// Usage in a service
@Service
public class UserService {
    private final ReactiveUserRepository userRepository;
    public UserService(ReactiveUserRepository userRepository) {
        this.userRepository = userRepository;
    }
    public Flux<User> getActiveUsers() {
        return userRepository.findByActive(true)
            .doOnNext(user -> System.out.println("Processing user: " + user.getName()));
    }
}
Instead of returning a List or a single object, these repository methods return a Flux or Mono. This means the database results can be processed as they stream in, reducing memory overhead and improving responsiveness. It’s important to check driver support for your specific database.
Adopting this stream-based approach requires a shift in thinking. You move from writing a sequence of commands to declaring a pipeline of transformations for data that may arrive asynchronously. The benefits are significant, especially for modern, distributed applications: efficient resource use, built-in resilience patterns, and scalability under concurrent load. The initial learning curve is repaid by the robustness of the systems you can build. Start with simple streams, master error handling and backpressure, and you’ll find a powerful tool for tackling complex asynchronous challenges.