Example of reactive web application. Java. Spring 5. Reactive Streams. Docker.
This project is intended to demonstrate best practices for building a reactive web application with Spring 5 platform.
In plain terms reactive programming is about non-blocking applications that are asynchronous and message-driven and require a small number of threads to scale vertically (i.e. within the JVM) rather than horizontally (i.e. through clustering).
A key aspect of reactive applications is the concept of backpressure which is a mechanism to ensure producers don’t overwhelm consumers. For example in a pipeline of reactive components extending from the database to the HTTP response when the HTTP connection is too slow the data repository can also slow down or stop completely until network capacity frees up.
Reactive programming also leads to a major shift from imperative to declarative async composition of logic. It is comparable to writing blocking code vs using the CompletableFuture from Java 8 to compose follow-up actions via lambda expressions.
For a longer introduction check the blog series “Notes on Reactive Programming” by Dave Syer.
“We look at Reactive Programming as one of the methodologies or pieces of the puzzle for Reactive [Systems] as a broader term.” Please read the ‘Reactive Manifesto’ and ‘Reactive programming vs. Reactive systems’ for more informations.
What is driving the rise of Reactive in Enterprise Java? Well, it’s not (all) just a technology fad — people jumping on the bandwagon with the shiny new toys. The driver is efficient resource utilization, or in other words, spending less money on servers and data centres. The promise of Reactive is that you can do more with less, specifically you can process higher loads with fewer threads. This is where the intersection of Reactive and non-blocking, asynchronous I/O comes to the foreground. For the right problem, the effects are dramatic. For the wrong problem, the effects might go into reverse (you actually make things worse). Also remember, even if you pick the right problem, there is no such thing as a free lunch, and Reactive doesn’t solve the problems for you, it just gives you a toolbox that you can use to implement solutions.
Spring Framework 5 includes a new spring-webflux module. The module contains support for reactive HTTP and WebSocket clients as well as for reactive server web applications including REST, HTML browser, and WebSocket style interactions.
On the server-side WebFlux supports 2 distinct programming models:
@RestController
public class BlogPostController {
private final BlogPostRepository blogPostRepository;
public BlogPostController(BlogPostRepository blogPostRepository) {
this.blogPostRepository = blogPostRepository;
}
@PostMapping("/blogposts")
Mono<Void> create(@RequestBody Publisher<BlogPost> blogPostStream) {
return this.blogPostRepository.save(blogPostStream).then();
}
@GetMapping("/blogposts")
Flux<BlogPost> list() {
return this.blogPostRepository.findAll();
}
@GetMapping("/blogposts/{id}")
Mono<BlogPost> findById(@PathVariable String id) {
return this.blogPostRepository.findOne(id);
}
}
Functional programming model is not implemented within this application. I am not sure if it is posible to have both models in one application.
Both programming models are executed on the same reactive foundation that adapts non-blocking HTTP runtimes to the Reactive Streams API.
WebFlux includes a functional, reactive WebClient that offers a fully non-blocking and reactive alternative to the RestTemplate. It exposes network input and output as a reactive ClientHttpRequest and ClientHttpRespones where the body of the request and response is a Flux
@RunWith(SpringRunner.class)
@SpringBootTest(webEnvironment = WebEnvironment.RANDOM_PORT)
public class ApplicationIntegrationTest {
WebTestClient webTestClient;
List<BlogPost> expectedBlogPosts;
List<Project> expectedProjects;
@Autowired
BlogPostRepository blogPostRepository;
@Autowired
ProjectRepository projectRepository;
@Before
public void setup() {
webTestClient = WebTestClient.bindToController(new BlogPostController(blogPostRepository), new ProjectController(projectRepository)).build();
expectedBlogPosts = blogPostRepository.findAll().collectList().block();
expectedProjects = projectRepository.findAll().collectList().block();
}
@Test
public void listAllBlogPostsIntegrationTest() {
this.webTestClient.get().uri("/blogposts")
.exchange()
.expectStatus().isOk()
.expectHeader().contentType(MediaType.APPLICATION_JSON_UTF8)
.expectBodyList(BlogPost.class).isEqualTo(expectedBlogPosts);
}
@Test
public void listAllProjectsIntegrationTest() {
this.webTestClient.get().uri("/projects")
.exchange()
.expectStatus().isOk()
.expectHeader().contentType(MediaType.APPLICATION_JSON_UTF8)
.expectBodyList(Project.class).isEqualTo(expectedProjects);
}
@Test
public void streamAllBlogPostsIntegrationTest() throws Exception {
FluxExchangeResult<BlogPost> result = this.webTestClient.get()
.uri("/blogposts")
.accept(TEXT_EVENT_STREAM)
.exchange()
.expectStatus().isOk()
.expectHeader().contentType(TEXT_EVENT_STREAM)
.returnResult(BlogPost.class);
StepVerifier.create(result.getResponseBody())
.expectNext(expectedBlogPosts.get(0), expectedBlogPosts.get(1))
.expectNextCount(1)
.consumeNextWith(blogPost -> assertThat(blogPost.getAuthorId(), endsWith("4")))
.thenCancel()
.verify();
}
...
}
Please note that webClient is requesting Server-Sent Events (text/event-stream). We could stream individual JSON objects (application/stream+json) but that would not be a valid JSON document as a whole and a browser client has no way to consume a stream other than using Server-Sent Events or WebSocket.
Spring Data Kay M1 is the first release ever that comes with support for reactive data access. Its initial set of supported stores — MongoDB, Apache Cassandra and Redis
The repositories programming model is the most high-level abstraction Spring Data users usually deal with. They’re usually comprised of a set of CRUD methods defined in a Spring Data provided interface and domain-specific query methods.
In contrast to the traditional repository interfaces, a reactive repository uses reactive types as return types and can do so for parameter types, too.
public interface BlogPostRepository extends ReactiveSortingRepository<BlogPost, String>{
Flux<BlogPost> findByTitle(Mono<String> title);
}
The application is build by Travis. Pipeline is triggered on every push to master branch.
This application is using embedded mongo database. You do not have to install and run mongo database before you run the application locally.
You can use NON-embedded version of mongo by setting scope of ‘de.flapdoodle.embed.mongo’ to ‘test’. In this case you have to install mongo server locally:
$ brew install mongodb
$ brew services start mongodb
Run it:
$ cd reactive-company
$ ./mvnw spring-boot:run
Run application on local workstation with PCF Dev
You can adopt any CI pipeline you have to deploy your application on any cloud foundry instance, for example:
mvn cf:push [-Dcf.appname] [-Dcf.path] [-Dcf.url] [-Dcf.instances] [-Dcf.memory] [-Dcf.no-start] -Dcf.target=https://api.run.pivotal.io
I am running Docker Community Edition, version: 17.05.0-ce-mac11 (Channel: edge).
A swarm is a cluster of Docker engines, or nodes, where you deploy services. The Docker Engine CLI and API include commands to manage swarm nodes (e.g., add or remove nodes), and deploy and orchestrate services across the swarm. By running script bellow you will initialize a simple swarm with one node, and you will install services:
$ cd reactive-company
$ ./docker-swarm.sh
Portainer is a simple management solution for Docker, and is really simple to deploy:
$ docker service create \
--name portainer \
--publish 9000:9000 \
--constraint 'node.role == manager' \
--mount type=bind,src=/var/run/docker.sock,dst=/var/run/docker.sock \
portainer/portainer \
-H unix:///var/run/docker.sock
Visit http://localhost:9000
$ docker service ls
$ docker service scale stack_reactive-company=2
Now you have two tasks/containers running for this service.
$ docker service logs stack_reactive-company -f
You will be able to determine what task/container handled the request.
When using HTTP/1.1, by default, the TCP connections are left open for reuse. Docker swarm load balancer will not work as expected in this case. You will get routed to the same task of the service every time.
You can use ‘curl’ command line tool (NOT BROWSER) to avoid this problem.
The Swarm load balancer is a basic Layer 4 (TCP) load balancer. Many applications require additional features, like these, to name just a few:
Open your browser and navigate to http://localhost:8080
The response is resolved by HomeController.java and home.html.
Blog posts are fully resolved by the Publisher - Thymeleaf will NOT be executed as a part of the data flow
Projects are fully resolved by the Publisher - Thymeleaf will NOT be executed as a part of the data flow
Open your browser and navigate to http://localhost:8080/stream
This view is resolved by StreamController.java and sse.html template.
@GetMapping(value = "/stream/blog")
public String blog(final Model model) {
final Flux<BlogPost> blogPostStream = this.blogPostRepository.findAll().log();
model.addAttribute("blogPosts", new ReactiveDataDriverContextVariable(blogPostStream, 1000));
return "sse :: #blogTableBody";
}
@GetMapping(value = "/stream/project")
public String project(final Model model) {
final Flux<Project> projectStream = this.projectRepository.findAll().log();
model.addAttribute("projects", new ReactiveDataDriverContextVariable(projectStream, 1000));
return "sse :: #projectTableBody";
}
@GetMapping("/tail/blogposts")
Flux<BlogPost> tail() {
LOG.info("Received request: BlogPost - Tail");
try {
// Using tailable cursor
return this.blogPostRepository.findBy().log();
} finally {
LOG.info("Request pocessed: BlogPost - Tail");
}
}
$ curl http://localhost:8080/blogposts
or
$ curl -v -H "Accept: text/event-stream" http://localhost:8080/blogposts
$ curl http://localhost:8080/projects
or
$ curl -v -H "Accept: text/event-stream" http://localhost:8080/projects
$ curl -v -H "Accept: text/event-stream" http://localhost:8080/tail/blogposts
Run application first (by maven or docker)
$ ./mvnw gatling:execute
By default src/main/test/scala/com/idugalic/RecordedSimulation.scala will be run. The reports will be available in the console and in *html files within the ‘target/gatling/results’ folder
A possible log output we could see is:
As we can see the output of the controller method is evaluated after its execution in a different thread too!
@GetMapping("/blogposts")
Flux<BlogPost> list() {
LOG.info("Received request: BlogPost - List");
try {
return this.blogPostRepository.findAll().log();
} finally {
LOG.info("Request pocessed: BlogPost - List");
}
}
We can no longer think in terms of a linear execution model where one request is handled by one thread. The reactive streams will be handled by a lot of threads in their lifecycle. This complicates things when we migrate from the old MVC framework. We no longer can rely on thread affinity for things like the security context or transaction handling.