Reactive Messaging Examples for Quarkus
The articles service stores data in a Postgres database. Messages are sent between the microservices via Kafka.
Quarkus provides several different reactive messaging capabilities. The open-source project cloud-native-starter uses some of these capabilities in a sample application, which is described in this article.
There are several easy-to-follow Quarkus guides on the topic ‘reactive messaging’. Tutorials are great, but for me the best way to learn new technologies is to use them in simple applications. That’s why I’ve come up with a simple scenario.
Sample Application
The sample comes with a simple web application that displays links to articles with author information. The web application invokes the web-API service, which implements a backend-for-frontend pattern and invokes the articles and authors microservices. The articles service stores data in a Postgres database. Messages are sent between the microservices via Kafka. This diagram describes the high-level architecture:
One benefit of reactive models is the ability to update web applications by sending messages, rather than polling for updates. This is more efficient and improves the user experience.
The following video shows how articles can be created via the REST API. The web application receives notifications and adds new articles to the page.
The next diagram explains the flow which I’ll go through in more detail in this article.
- The ‘submission’ API client invokes a REST endpoint of the ‘articles’ service to create a new article.
- After the article has been created, a message is sent to Kafka.
- The ‘web-API’ service has subscribed to Kafka messages so that a listener is invoked.
- When new articles are created, events are streamed to the web-app.
Let’s take a closer look at the technologies used.
Sending In-Memory Messages via Vert.x Event Bus
The ‘articles’ and the ‘web-API’ service have been implemented in Java with Quarkus. In both cases, I have used a clean architecture approach where the code of the microservice is organized into three packages. These packages are rather independent of each other and could be exchanged with other implementations.
- API: Contains the REST endpoints and handles incoming and outgoing messages.
- Business: Contains the business logic of the microservice and business entities.
- Data: Contains the code to access databases or other microservices.
After a new article has been stored in the Postgres database, a message is sent to Kafka. This is triggered by the business logic, but the actual code resides in the API layer. That’s why the business layer needs to send the message to the API layer first.
Quarkus provides a mechanism that lets beans interact via asynchronous messages, which enforces loose coupling. Check out the guide Asynchronous messaging between beans. This functionality is provided by Eclipse Vert.x, which comes with Quarkus.
Here is the code that sends the event in memory to the API layer:
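import javax.inject.Inject;
import io.vertx.axle.core.eventbus.EventBus;
...
@Inject
EventBus bus;
...
private void sendMessageToKafka(Article article) {
    // Publish the article id in memory to the listener in the API layer
    bus.publish("com.ibm.articles.apis.NewArticleCreatedListener", article.id);
}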
In the API layer the event can be consumed (see code):
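import io.quarkus.vertx.ConsumeEvent;
...
// The address must match the one used in bus.publish(...)
@ConsumeEvent("com.ibm.articles.apis.NewArticleCreatedListener")
public void sendMessageToKafka(String articleId) {
    ...
}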
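To make the publish/consume pattern concrete without a Quarkus runtime, here is a minimal plain-JDK sketch of an address-based bus. It is not the Vert.x API: the class TinyEventBus and its methods are hypothetical stand-ins, and delivery here is synchronous, whereas the Vert.x event bus delivers messages asynchronously.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.Consumer;

// Hypothetical stand-in for an address-based event bus (NOT the Vert.x API).
class TinyEventBus {
    private final Map<String, List<Consumer<String>>> handlers = new ConcurrentHashMap<>();

    // Register a handler for an address (roughly the role @ConsumeEvent plays).
    void consumer(String address, Consumer<String> handler) {
        handlers.computeIfAbsent(address, a -> new CopyOnWriteArrayList<>()).add(handler);
    }

    // Deliver the message to every handler on the address; returns how many were called.
    int publish(String address, String message) {
        List<Consumer<String>> registered = handlers.getOrDefault(address, List.of());
        registered.forEach(handler -> handler.accept(message));
        return registered.size();
    }

    public static void main(String[] args) {
        TinyEventBus bus = new TinyEventBus();
        List<String> received = new ArrayList<>();
        String address = "com.ibm.articles.apis.NewArticleCreatedListener";
        bus.consumer(address, received::add);   // API layer listens
        bus.publish(address, "article-1");      // business layer publishes
        System.out.println(received);
    }
}
```

The point of the indirection is that the publisher only knows an address string, not the class that handles the message, which is what keeps the business and API layers independent.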
Eclipse MicroProfile supports another mechanism for in-memory messages. I didn’t use it in this case because I couldn’t get it to work. For me, the @Outgoing annotation only worked on methods that are triggered either by an incoming event or by the platform (e.g. @PostConstruct).
In my case, I have to trigger this functionality from business logic. I’m not sure whether this is a missing feature in MicroProfile, a defect, or user error. I’m trying to find this out.
The documentation mentions another reason to use the Vert.x event bus: “The asynchronous message passing feature allows replying to messages which are not supported by MicroProfile Reactive Messaging. However, it is limited to single-event behavior (no stream) and local messages.”
Sending Kafka Messages via Kafka API
Next, the API layer of the ‘articles’ service needs to send the message to Kafka. To set up Kafka in Kubernetes, follow the instructions from my previous article Accessing Apache Kafka from Quarkus.
Eclipse MicroProfile Reactive Messaging provides the same @Outgoing annotation to do this, but I couldn’t get it to work since I have to trigger this functionality manually. I want to find out the reason for this as well.
As a workaround, I used the Kafka API instead. The usage is pretty straightforward. Unfortunately, it looks like the configuration is not read from the same ‘application.properties’ file that MicroProfile uses. Instead, I had to do this in the code:
import java.util.HashMap;
import java.util.Map;
import javax.annotation.PostConstruct;
import javax.inject.Inject;
import org.eclipse.microprofile.config.inject.ConfigProperty;
import io.vertx.core.Vertx;
import io.vertx.kafka.client.producer.KafkaProducer;
import io.vertx.kafka.client.producer.KafkaProducerRecord;
...
@Inject
Vertx vertx;

// The bootstrap server is still read from the MicroProfile configuration
@ConfigProperty(name = "kafka.bootstrap.servers")
String kafkaBootstrapServer;

KafkaProducer<String, String> producer;
...
@PostConstruct
void initKafkaClient() {
    // Producer settings are assembled in code rather than in application.properties
    Map<String, String> config = new HashMap<>();
    config.put("bootstrap.servers", kafkaBootstrapServer);
    config.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    config.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    producer = KafkaProducer.create(vertx, config);
}
...
public void sendMessageToKafka(String articleId) {
    try {
        KafkaProducerRecord<String, String> record = KafkaProducerRecord.create("new-article-created", articleId);
        producer.write(record, done -> System.out.println("Kafka message sent: new-article-created - " + articleId));
    } catch (Exception e) {
        // allow to run this functionality if Kafka hasn't been set up
    }
}
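Because the producer settings are assembled by hand, it can help to isolate that step so it can be checked without a broker. The following is a rough plain-JDK sketch under that assumption: the class ProducerConfigSketch, its method fromProperties, and the host name in main are hypothetical, while the configuration keys are the ones used above.

```java
import java.io.IOException;
import java.io.StringReader;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

// Hypothetical helper: builds the producer settings map from a Properties source.
class ProducerConfigSketch {
    static final String STRING_SERIALIZER =
            "org.apache.kafka.common.serialization.StringSerializer";

    static Map<String, String> fromProperties(Properties props) {
        String servers = props.getProperty("kafka.bootstrap.servers");
        if (servers == null || servers.isBlank()) {
            // Fail early instead of creating a producer that can never connect
            throw new IllegalStateException("kafka.bootstrap.servers is not set");
        }
        Map<String, String> config = new HashMap<>();
        config.put("bootstrap.servers", servers.trim());
        config.put("key.serializer", STRING_SERIALIZER);
        config.put("value.serializer", STRING_SERIALIZER);
        return config;
    }

    public static void main(String[] args) throws IOException {
        Properties props = new Properties();
        // Example value only; the host name is an assumption
        props.load(new StringReader("kafka.bootstrap.servers=my-cluster-kafka:9092\n"));
        System.out.println(fromProperties(props));
    }
}
```

The resulting map has the same shape as the one passed to KafkaProducer.create in the code above.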
Sending and Receiving Messages via MicroProfile
Next, the ‘web-API’ service needs to receive this message from Kafka. This part can be implemented very easily with MicroProfile Reactive Messaging via the annotation @Incoming. Here is the code:
import org.eclipse.microprofile.reactive.messaging.Incoming;
import org.eclipse.microprofile.reactive.messaging.Outgoing;
import io.smallrye.reactive.messaging.annotations.Broadcast;
...
// Receives from the Kafka channel and forwards to the in-memory stream
@Incoming("new-article-created")
@Outgoing("stream-new-article")
@Broadcast
public String process(String articleId) {
    System.out.println("Kafka message received: new-article-created - " + articleId);
    return articleId;
}
In the next step, notifications about new articles need to be streamed to the web application. To do this, the event is forwarded to the streaming endpoint via the MicroProfile annotation @Outgoing and the SmallRye annotation @Broadcast.
Check out the guide Using Apache Kafka with reactive messaging for more information about these annotations. From a developer-experience perspective, this is as easy as it gets. I especially like that the same annotations can be used for Kafka channels as well as in-memory messaging.
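The idea behind broadcasting, that every subscriber of a channel sees every message, can be illustrated with the JDK's own java.util.concurrent.Flow API. This is only a sketch of the concept and not how SmallRye implements @Broadcast; the class names BroadcastSketch and Collector are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.TimeUnit;

// Plain-JDK illustration of broadcasting; all names here are hypothetical.
class BroadcastSketch {

    // Collects every item it receives; stands in for one connected client.
    static class Collector implements Flow.Subscriber<String> {
        final List<String> items = new CopyOnWriteArrayList<>();
        final CountDownLatch done = new CountDownLatch(1);

        @Override public void onSubscribe(Flow.Subscription s) { s.request(Long.MAX_VALUE); }
        @Override public void onNext(String item) { items.add(item); }
        @Override public void onError(Throwable t) { done.countDown(); }
        @Override public void onComplete() { done.countDown(); }
    }

    // Publishes each id to all collectors, then waits until each saw the end of the stream.
    static List<Collector> broadcast(List<String> articleIds, int subscribers) {
        List<Collector> collectors = new ArrayList<>();
        try (SubmissionPublisher<String> publisher = new SubmissionPublisher<>()) {
            for (int i = 0; i < subscribers; i++) {
                Collector collector = new Collector();
                collectors.add(collector);
                publisher.subscribe(collector);
            }
            articleIds.forEach(publisher::submit);
        } // close() completes the stream for every subscriber
        for (Collector collector : collectors) {
            try {
                collector.done.await(5, TimeUnit.SECONDS);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
        return collectors;
    }

    public static void main(String[] args) {
        for (Collector collector : broadcast(List.of("1", "2"), 2)) {
            System.out.println(collector.items);
        }
    }
}
```

In the sample application the subscribers would be the open streaming connections of the web clients, each of which needs to learn about every new article.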
Sending Events to Web Applications via Server-Sent Events
The last step is to stream the events to web applications. This is done via Server-Sent Events, which are very easy to implement with Quarkus. The streaming endpoint receives the messages via @Channel and forwards them via @Produces(MediaType.SERVER_SENT_EVENTS) and @SseElementType (see code):
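import javax.inject.Inject;
import javax.ws.rs.GET;
import javax.ws.rs.Path;
import javax.ws.rs.Produces;
import javax.ws.rs.core.MediaType;
import org.reactivestreams.Publisher;
import io.smallrye.reactive.messaging.annotations.Channel;
import org.jboss.resteasy.annotations.SseElementType;
...
public class NewArticlesStream {

    // In-memory channel fed by the @Outgoing("stream-new-article") method
    @Inject
    @Channel("stream-new-article") Publisher<String> newArticles;

    @GET
    @Path("/server-sent-events")
    @Produces(MediaType.SERVER_SENT_EVENTS)
    @SseElementType("text/plain")
    public Publisher<String> stream() {
        return newArticles;
    }
}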
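It may help to see what actually travels over the wire. In the Server-Sent Events format, each message consists of one or more "data:" lines followed by a blank line. The sketch below formats a payload that way in plain Java; the class SseFrame is hypothetical and only illustrates the format. It is not what RESTEasy does internally.

```java
import java.util.stream.Collectors;

// Hypothetical helper that illustrates the Server-Sent Events text format.
class SseFrame {

    // One "data:" line per payload line, terminated by a blank line.
    static String format(String payload) {
        return payload.lines()
                .map(line -> "data: " + line)
                .collect(Collectors.joining("\n", "", "\n\n"));
    }

    public static void main(String[] args) {
        // An article id as it would be streamed to the browser
        System.out.print(format("42"));
    }
}
```

Since the sample only sends the article id as text/plain, every event is a single "data:" line.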
In the web application, the events can be consumed via EventSource. In my case, I only send the id of the article and refresh the list of displayed articles (see code). Alternatively, I could also send the complete article information in the event.
let source = new EventSource(this.$store.state.endpoints.api + "server-sent-events");
let that = this;
source.onmessage = function (event) {
    that.readArticles();
};
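For readers who want to see roughly what EventSource does with the incoming text before it calls onmessage, here is a simplified Java sketch of the parsing step. It is an illustration under stated assumptions: the class SseParser is hypothetical, only "\n" line endings and the "data" field are handled, and other fields such as "event", "id", and "retry" are ignored.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified parser for the Server-Sent Events text format.
class SseParser {

    // Returns the data payload of every complete event in the given text.
    static List<String> parse(String stream) {
        List<String> events = new ArrayList<>();
        StringBuilder data = new StringBuilder();
        boolean hasData = false;
        for (String line : stream.split("\n", -1)) {
            if (line.isEmpty()) {
                // A blank line ends the current event
                if (hasData) {
                    events.add(data.toString());
                }
                data.setLength(0);
                hasData = false;
            } else if (line.startsWith("data:")) {
                String value = line.substring("data:".length());
                if (value.startsWith(" ")) {
                    value = value.substring(1); // one leading space is not part of the value
                }
                if (hasData) {
                    data.append('\n'); // multiple data lines are joined with newlines
                }
                data.append(value);
                hasData = true;
            }
            // Comment lines (starting with ':') and other fields are skipped in this sketch
        }
        return events;
    }

    public static void main(String[] args) {
        System.out.println(parse("data: 1\n\ndata: 2\n\n"));
    }
}
```

An event that is not yet terminated by a blank line is not returned, which mirrors how the browser only dispatches complete events.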
Next Steps
If you want to learn more about reactive programming and reactive messaging, try out the code yourself.