Big data comprises datasets that are massive, varied, complex, and can't be handled traditionally. Big data can include both structured and unstructured data, and it is often stored in data lakes or data warehouses. As organizations grow, big data becomes increasingly more crucial for gathering business insights and analytics. The Big Data Zone contains the resources you need for understanding data storage, data modeling, ELT, ETL, and more.
The Evolution of Data Pipelines
Evolution of Data Partitioning: Traditional vs. Modern Data Lakes
A poison pill is a message deliberately sent to a Kafka topic, designed to consistently fail when consumed, regardless of the number of consumption attempts. Poison Pill scenarios are frequently underestimated and can arise if not properly accounted for. Neglecting to address them can result in severe disruptions to the seamless operation of an event-driven system. The poison pill for various reasons: The failure of deserialization of the consumed bytes from the Kafka topic on the consumer side. Incompatible serializer and deserializer between the message producer and consumer Corrupted records Data/Message was still being produced to the same Kafka topic even if the producer altered the key or value serializer. A different producer began publishing messages to the Kafka topic using a different key or value serializer. The consumer configured the wrong key or value deserializer, which is not at all compatible with the serializer on the message producer side. The consequences of poison pills if not handled properly: Consumer shutdown. When a consumer receives a poison pill message from the topic, it stops processing and terminates. If we surround the message consumption code with a try/catch block inside the consumer, log files get flooded with error messages and stack traces and, eventually excessive disk space consumption on the system or nodes in the cluster. The poison pill message will block the partition of the topic, stopping the processing of any additional messages. As a result, the processing of the message will be tried again, most likely extremely quickly, placing a heavy demand on the system’s resources. To prevent poison messages in Apache Kafka, we need to design our Kafka consumer application and Kafka topic-handling strategy in a way that can handle and mitigate the impact of problematic or malicious messages. Proper serialization: Use a well-defined and secure serialization format for your messages, such as Avro or JSON Schema. This can help prevent issues related to the deserialization of malformed or malicious messages by consumers. Message validation: Ensure that messages being produced to Kafka topics are validated to meet expected formats and constraints before they are published. This can be done by implementing strict validation rules or schemas for the messages. Messages that do not conform to these rules should be rejected at the producer level. Timeouts and deadlines: Set timeouts and processing deadlines for your consumers. If a message takes too long to process, consider it a potential issue and handle it accordingly. This can help prevent consumers from getting stuck on problematic messages. Consumer restart strategies: Consider implementing strategies for automatically restarting consumers who encounter errors or become unresponsive. Tools like Apache Kafka Streams and Kafka Consumer Groups provide mechanisms for handling consumer failures and rebalancing partitions. Versioned topics: When evolving your message schemas, create new versions of topics rather than modifying existing ones. This allows for backward compatibility and prevents consumers from breaking due to changes in message structure. When message loss is unacceptable, a code fix will be required to specifically handle the poison pill message. Besides, we can configure a dead letter queue (DLQ) and send the poison poll messages to it for retrying or analyzing the root cause. If the message or event loss is acceptable to a certain extent, then by executing the built-in kafka-consumer-groups.sh script from the terminal, we can reset the consumer offset either to “–to-latest” or to a specific time. Thus, by executing this, all the messages, including the poison pill will be skipped that have not been consumed so far. But we need to make sure that the consumer group is not active. Otherwise, the offsets of a consumer or consumer group won’t be changed. Shell kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group ourMessageConsumerGroup --reset-offsets --to-latest –-topic myTestTopic –execute Or a specific time Shell kafka-consumer-groups.sh –-bootstrap-server localhost:9092 –-group ourMessageConsumerGroup –-reset-offsets –to-datetime 2023-07-20T00:00:00.000 –-topic myTestTopic –execute Hope you have enjoyed this read. Please like and share if you feel this composition is valuable.
Apache Kafka serves thousands of enterprises as the mission-critical and scalable real-time data fabric for machine learning infrastructures. The evolution of Generative AI (GenAI) with large language models (LLM) like ChatGPT changed how people think about intelligent software and automation. This blog post explains the relationship between data streaming and GenAI and shows the enormous opportunities and some early adopters of GenAI beyond the buzz. Generative AI (GenAI) and Data Streaming Let’s set the context first to have the same understanding of the buzzwords. [Note: My ChatGPT iPhone app generated this section. I slightly shortened and adjusted the content afterward. GenAI is perfect for summarizing existing content so that authors can spend time on new content (that ChatGPT does not know yet).] Natural Language Processing (NLP) ChatGPT, What Is NLP? NLP stands for Natural Language Processing. It is a subfield of artificial intelligence (AI) that focuses on interactions between computers and human language. NLP enables computers to understand, interpret, and generate human language in a meaningful and valuable way. Natural Language Processing involves a range of tasks, including: Text parsing and tokenization: Breaking down text into individual words or tokens. Named Entity Recognition (NER): Identifying and classifying named entities such as people, organizations, locations, and dates in the text. Sentiment analysis: Determining the sentiment or emotional tone expressed in text, whether positive, negative, or neutral. Machine translation: Translating text from one language to another. Question answering: Building systems that can understand and answer questions posed in natural language. Text generation: Creating human-like text or generating responses to prompts. NLP is crucial in applications such as chatbots, virtual assistants, language translation, information retrieval, sentiment analysis, and more. GenAI = Next Generation NLP (And More) ChatGPT, What Is Generative AI? Generative AI is a branch of artificial intelligence focused on creating models and systems capable of generating new content, such as images, text, music, or even entire virtual worlds. These models are trained on large datasets and learn patterns and structures to generate new outputs similar to the training data. That’s why the widespread buzzword is the Large Language Model (LLM). Generative AI is used for next-generation NLP and uses techniques such as generative adversarial networks (GANs), variational autoencoders (VAEs), and recurrent neural networks (RNNs). Generative AI has applications in various fields and industries, including art, design, entertainment, and scientific research. Apache Kafka for Data Streaming ChatGPT, What Is Apache Kafka? Apache Kafka is an open-source distributed streaming platform and became the de facto standard for event streaming. It was developed by the Apache Software Foundation and is widely used for building real-time data streaming applications and event-driven architectures. Kafka provides a scalable and fault-tolerant system for handling high volumes of streaming data. Kafka has a thriving ecosystem with various tools and frameworks that integrate with it, such as Apache Spark, Apache Flink, and others. Apache Kafka is widely adopted in use cases that require real-time data streaming, such as data pipelines, event sourcing, log aggregation, messaging systems, and more. Why Apache Kafka and GenAI? Generative AI (GenAI) is the next-generation NLP engine that helps many projects in the real world for service desk automation, customer conversation with a chatbot, content moderation in social networks, and many other use cases. Apache Kafka became the predominant orchestration layer in these machine learning platforms for integrating various data sources, processing at scale, and real-time model inference. Data streaming with Kafka already powers many GenAI infrastructures and software products. Very different scenarios are possible: Data streaming as data fabric for the entire machine learning infrastructure Model scoring with stream processing for real-time productions Generation of streaming data pipelines with input text or speech Real-time online training of large language models Let’s explore these opportunities for data streaming with Kafka and GenAI in more detail. Real-Time Kafka Data Hub for GenAI and Other Microservices in the Enterprise Architecture I already explored in 2017 (!) how “How to Build and Deploy Scalable Machine Learning in Production with Apache Kafka”. At that time, real-world examples came from tech giants like Uber, Netflix, and Paypal. Today, Apache Kafka is the de facto standard for building scalable and reliable machine learning infrastructures across any enterprise and industry, including: Data integration from various sources (sensors, logs, databases, message brokers, APIs, etc.) using Kafka Connect connectors, fully-managed SaaS integrations, or any kind of HTTP REST API or programming language. Data processing leveraging stream processing for cost-efficient streaming ETL such as filtering, aggregations, and more advanced calculations while the data is in motion (so that any downstream application gets accurate information) Data ingestion for near real-time data sharing with various data warehouses and data lakes so that each analytics platform can use its product and tools. Building scalable and reliable end-to-end pipelines is today’s sweet spot of data streaming with Apache Kafka in the AI and Machine Learning space. Model Scoring With Stream Processing for Real-Time Predictions at Any Scale Deploying an analytic model in a Kafka application is the solution to provide real-time predictions at any scale with low latency. This is one of the biggest problems in the AI space, as data scientists primarily focus on historical data and batch model training in data lakes. However, the model scoring for predictions needs to provide much better SLAs regarding scalability, reliability, and latency. Hence, more and more companies separate model training from model scoring and deploy the analytic model within a stream processor such as Kafka Streams, KSQL, or Apache Flink: Dedicated model servers usually only support batch and request-response (e.g., via HTTP or gRPC). Fortunately, many solutions now also provide native integration with the Kafka protocol. Development Tools for Generating Kafka-Native Data Pipelines From Input Text or Speech Almost every software vendor discusses GenAI to enhance its development environments and user interfaces. For instance, GitHub is a platform and cloud-based service for software development and version control using Git. But their latest innovation is “the AI-Powered Developer Platform to Build, Scale, and Deliver Secure Software”: Github CoPilot X. Cloud providers like AWS provide similar tools. Similarly, look at any data infrastructure vendor like Databricks or Snowflake. The latest conferences and announcements focus on embedded capabilities around large language models and GenAI in their solutions. The same will be true for many data streaming platforms and cloud services. Low-code/no-code tools will add capabilities to generate data pipelines from input text. One of the most straightforward applications that I see coming is generating SQL code out of user text. For instance, “Consume data from Oracle table customer, aggregate the payments by customer, and ingest it into Snowflake.” This could create SQL code for stream processing technologies like KSQL or FlinkSQL. Developer experience, faster time-to-market, and support less technical personas are enormous advantages for embedding GenAI into Kafka development environments. Real-Time Training of Large Language Models (LLM) AI and Machine Learning are still batch-based systems almost all of the time. Model training takes at least hours. This is not ideal, as many GenAI use cases require accurate and updated information. Imagine googling for information today, and you could not find data from the past week. It is impossible to use such a service in many scenarios! Similarly, if I ask ChatGPT today (July 2023): “What is GenAI?” — I get the following response: As of my last update in September 2021, there is no specific information on an entity called “GenAi.” It’s possible that something new has emerged since then. Could you provide more context or clarify your question so I can better assist you? The faster your machine learning infrastructure ingests data into model training, the better. My colleague Michael Drogalis wrote an excellent deep-technical blog post: “GPT-4 + Streaming Data = Real-Time Generative AI” to explore this topic more thoroughly. This architecture is compelling because the chatbot will always have your latest information whenever you prompt it. For instance, if your flight gets delayed or your terminal changes, the chatbot will know about it during your chat session. This is entirely distinct from current approaches where the chat session must be reloaded or wait a few hours/days for new data to arrive. LLM + Vector Database + Kafka = Real-Time GenAI Real-time model training is still a novel approach. Many machine learning algorithms are not ready for continuous online model training today. Combining Kafka with a vector database enables the use of a batch-trained LLM together with real-time updates feeding up-to-date information into the LLM. Nobody will accept an LLM like ChatGPT in a few years, giving you answers like, “I don’t have this information; my model was trained a week ago.” It does not matter if you choose a brand new vector database like Pinecone or leverage the new vector capabilities of your installed Oracle or MongoDB storage. Feed data into the vector database in real-time with Kafka Connect and combine with a mature LLM to enable real-time GenAI with context-specific recommendations. Real-World Case Studies for Kafka and GenAI This section explores how companies across different industries, such as the carmaker BMW, the online travel and booking Expedia, and the dating app Tinder, leverage the combination of data streaming with GenAI for reliable real-time conversational AI, NLP, and chatbots leveraging Kafka. But technologies like ChatGPT make it much easier to adopt GenAI in real-world projects with much faster time-to-market and less cost and risk. Let’s explore a few of these success stories for embedding NLP and GenAI into data-streaming enterprise architectures. Disclaimer: As I want to show real-world case studies instead of visionary outlooks, I show several examples deployed in production in the last few years. Hence, the analytic models do not use GenAI, LLM, or ChatGPT as we know it from the press today. But the principles are precisely the same. The only difference is that you could use a cutting-edge model like ChatGPT with much improved and context-specific responses today. Expedia: Conversations Platform for Better Customer Experience Expedia is a leading online travel and booking. They have many use cases for machine learning. One of my favorite examples is their Conversations Platform, built on Kafka and Confluent Cloud to provide an elastic cloud-native application. The goal of Expedia’s Conversations Platform was simple: Enable millions of travelers to have natural language conversations with an automated agent via text, Facebook, or their channel of choice. Let them book trips, make changes or cancellations, and ask questions: “How long is my layover?” “Does my hotel have a pool?” “How much will I get charged to bring my golf clubs?” Then, take all that is known about that customer across all of Expedia’s brands and apply machine learning models to immediately give customers what they are looking for in real time and automatically, whether a straightforward answer or a complex new itinerary. Real-Time Orchestration Realized in Four Months Such a platform is no place for batch jobs, back-end processing, or offline APIs. To quickly make decisions that incorporate contextual information, the platform needs data in near real-time, and it needs it from a wide range of services and systems. Meeting these needs meant architecting the Conversations Platform around a central nervous system based on Confluent Cloud and Apache Kafka. Kafka made it possible to orchestrate data from loosely coupled systems, enrich data as it flows between them so that by the time it reaches its destination, it is ready to be acted upon, and surface aggregated data for analytics and reporting. Expedia built this platform from zero to production in four months. That’s the tremendous advantage of using a fully managed serverless event streaming platform as the foundation. The project team can focus on the business logic. The Covid pandemic proved the idea of an elastic platform: Companies were hit with a tidal wave of customer questions, cancellations, and re-bookings. Throughout this once-in-a-lifetime event, the Conversations Platform proved up to the challenge, auto-scaling as necessary and taking off much of the load of live agents. Expedia’s Migration From MQ to Kafka as Foundation for Real-Time Machine Learning and Chatbots As part of their conversations platform, Expedia needed to modernize their IT infrastructure, as Ravi Vankamamidi, Director of Technology at Expedia Group, explained in a Kafka Summit keynote. Expedia’s old legacy chatbot service relied on a legacy messaging system. This service was a question-and-answer board with a very limited scope for booking scenarios. This service could handle two-party conversations. It could not scale to bring all different systems into one architecture to build a powerful chatbot that is helpful for customer conversations. Expedia needed a service that was closer to travel assistance. It needed to handle context-specific, multi-party, multi-channel conversations. Hence, features such as natural language processing, translation, and real-time analytics are required. The full service needs to be scalable across multiple brands. Therefore, a fast and highly scalable platform with order guarantees, exact-once-semantics (EOS), and real-time data processing was needed. The Kafka-native event streaming platform powered by Confluent was the best choice and met all requirements. The new conversations platform doubled the Net Promoter Score (NPS) one year after the rollout. The new platform proved the business value of the new platform quickly. BMW: GenAI for Contract Intelligence, Workplace Assistance, and Machine Translation The automotive company BMW presented innovative NLP services at the Kafka Summit in 2019. It is no surprise that a carmaker has various NLP scenarios. These include digital contract intelligence, workplace assistance, machine translation, and customer conversations. The latter contains multiple use cases for conversational AI: Service desk automation Speech analysis of customer interaction center (CIC) calls to improve the quality Self-service using smart knowledge bases Agent support Chatbots The text and speech data is structured, enriched, contextualized, summarized, and translated to build real-time decision support applications. Kafka is a crucial component of BMW’s ML and NLP architecture. The real-time integration and data correlation enable interactive and interoperable data consumption and usage: BMW explained the key advantages of leveraging Kafka and its streaming processing library Kafka Streams as the real-time integration and orchestration platform: Flexible integration: Multiple supported interfaces for different deployment scenarios, including various machine learning technologies, programming languages, and cloud providers Modular end-to-end pipelines: Services can be connected to provide full-fledged NLP applications. Configurability: High agility for each deployment scenario Tinder: Intelligent Content Moderation, Matching, and Recommendations With Kafka and GenAI The dating app Tinder is a great example where I can think of tens of use cases for NLP. Tinder talked at a past Kafka Summit about their Kafka-powered machine learning platform. Tinder is a massive user of Kafka and its ecosystem for various use cases, including content moderation, matching, recommendations, reminders, and user reactivation. They used Kafka Streams as a Kafka-native stream processing engine for metadata processing and correlation in real-time at scale: A critical use case in any dating or social platform is content moderation for detecting fakes, filtering sexual content, and other inappropriate things. Content moderation combines NLP and text processing (e.g., for chat messages) with image processing (e.g., selfie uploads) or processes the metadata with Kafka and stores the linked content in a data lake. Both leverage Deep Learning to process high volumes of text and images. Here is what content moderation looks like in Tinder’s Kafka architecture: Plenty of ways exist to process text, images, and videos with the Kafka ecosystem. Chatbots could also play a key role “in the other way round.” More and more dating apps (and other social networks) fight against spam, fraud, and automated chatbots. Like building a chatbot, a chatbot detection system can analyze the data streams to block a dating app’s chatbot. Kafka as Real-Time Data Fabric for Future GenAI Initiatives Real-time data beats slow data. Generative AI only adds value if it provides accurate and up-to-date information. Data streaming technologies such as Apache Kafka and Apache Flink enable building a reliable, scalable real-time infrastructure for GenAI. Additionally, the event-based heart of the enterprise architecture guarantees data consistency between real-time and non-real-time systems (near real-time, batch, request-response). The early adopters like BWM, Expedia, and Tinder proved that Generative AI integrated into a Kafka architecture adds enormous business value. The evolution of AI models with ChatGPT et al. makes the use case even more compelling across every industry. How do you build conversational AI, chatbots, and other GenAI applications leveraging Apache Kafka? What technologies and architectures do you use? Are data streaming and Kafka part of the architecture? Let’s connect on LinkedIn and discuss it! Stay informed about new blog posts by subscribing to my newsletter.
Tokens are essential for secure digital access, but what if you need to revoke them? Despite our best efforts, there are times when tokens can be compromised. This may occur due to coding errors, accidental logging, zero-day vulnerabilities, and other factors. Token revocation is a critical aspect of modern security, ensuring that access remains in the right hands and unauthorized users are kept out. In this article, we'll explore how different methods, such as centralized control and distributed Kafka handling, play a vital role in keeping your systems and data safe. Access/Refresh Tokens I described more about using JWTs in this article. JWTs allow you to eliminate the use of centralized token storage and verify tokens in the middleware layer of each microservice. To mitigate the risks associated with token compromises, the lifetime of an Access Token is made equal to a small value of time (e.g., 15 minutes). In the worst case, after the token is leaked, it is valid for another 15 minutes, after which its exp will be less than the current time, and the token will be rejected by any microservice. To prevent users from being logged out every 15 minutes, a Refresh Token is added to the Access Token. This way, the user receives an Access Token/Refresh Token pair after successful authentication. When the Access Token's lifetime expires and the user receives a 401 Unauthorized response, they should request the /refresh-token endpoint, passing the Refresh Token value as a parameter and receiving a new Access Token/Refresh Token pair in response. The previous Refresh Token becomes inactive. This process reduces risk and does not negatively impact user experience. Revocation But there are cases when it is necessary to revoke tokens instantly. This can happen in financial services or, for example, in a user's account when he wants to log out from all devices. Here, we can't do it without token revocation. But how to implement a mechanism for revoking JWTs, which are by nature decentralized and stored on the user's devices? Centralized-Approach The most obvious and easiest way is to organize a centralized storage. It will be a blacklist of tokens, and each auth middleware layer will, besides signature validation and verification of token claims, go to this centralized repository to check whether the token is in the blacklist. And if it is, reject it. The token revocation event itself is quite rare (compared to the number of authorization requests), so the blacklist will be small. Moreover, there is no point in storing tokens in the database forever since they have an exp claim, and after this value, they will no longer be valid. If a token in your system is issued with a lifetime of 30 minutes, then you can store revoked tokens in the database for 30 minutes. Advantages Simplicity: This approach simplifies token revocation management compared to other solutions. Fine-grained control: You have fine-grained control over which tokens are revoked and when. Considerations Single point of failure: The centralized token revocation service can become a single point of failure. You should implement redundancy or failover mechanisms to mitigate this risk. Network overhead: Microservices need to communicate with the central service, which can introduce network overhead. Consider the impact on latency and design accordingly. Security: Ensure that the central token revocation service is securely implemented and protected against unauthorized access. This approach offers centralized control and simplicity in token revocation management, which can be beneficial for certain use cases, especially when fine-grained control over revocation is required. However, it does introduce some network communication overhead and requires careful consideration of security and redundancy. Decentralized-Approach (Kafka-Based) A more advanced approach, without a single point of failure, can be implemented with Kafka. Kafka is a distributed and reliable message log by nature. It permits multiple independent listeners and retention policy configurations to store only actual values. Consequently, a blacklist of revoked tokens can be stored in Kafka. When a token requires revocation, the corresponding service generates an event and adds it to Kafka. Middleware services include a Kafka listener that receives this event and stores it in memory. When authorizing a request, in addition to verifying the token's validity, there is no need to contact a centralized service. Revoked tokens are stored in memory, and locating the token in a suitable data structure is a quick process (if we use HashMap, it will be O(1)). It's unnecessary to store tokens in memory forever either, and they should be periodically deleted after their lifetime. But what if our service restarts and memory is cleared? The Kafka listener allows you to read messages from the beginning. When the microservice is brought back up, it will once again pull all messages from Kafka and use the actual blacklist. Advantages Decentralized: Using a distributed message broker like Kafka allows you to implement token revocation in a decentralized manner. Microservices can independently subscribe to the revocation messages without relying on a central authority. Scalability: Kafka is designed for high throughput and scalability. It can handle a large volume of messages, making it suitable for managing token revocations across microservices in a distributed system. Durability: Kafka retains messages for a configurable retention period. This ensures that revoked tokens are stored long enough to cover their validity period. Resilience: The approach allows microservices to handle token revocation even if they restart or experience temporary downtime. They can simply re-consume the Kafka messages upon recovery. Considerations Complexity: Implementing token revocation with Kafka adds complexity to your system. You need to ensure that all microservices correctly handle Kafka topics, subscribe to revocation messages, and manage in-memory token revocation lists. Latency: There might be a slight latency between the time a token is revoked and the time when microservices consume and process the revocation message. During this window, a revoked token could still be accepted. Scalability challenges: As your system grows, managing a large number of revocation messages and in-memory lists across multiple microservices can become challenging. You might need to consider more advanced strategies for partitioning and managing Kafka topics. The choice between the centralized token revocation approach and the Kafka-based approach depends on your specific use case, system complexity, and preferences. The centralized approach offers simplicity and fine-grained control but introduces network overhead and potential single points of failure. The Kafka-based approach provides decentralization, scalability, and resilience but is more complex to implement and maintain. Conclusion In a world where digital security is paramount, token revocation stands as a critical defense. Whether you prefer centralized control or the distributed handling of Kafka, the core message remains clear: Token revocation is a vital part of robust security. By effectively managing and revoking tokens, organizations can fortify their defenses, safeguard sensitive data, and ensure that access remains in the right hands. As we wrap up our discussion on token revocation, remember that proactive security measures are a must in today's digital landscape. So, embrace token revocation to protect what matters most in our interconnected world.
In this tutorial, developers, solution architects, and data engineers can learn how to build high-performance, scalable, and fault-tolerant applications that react to real-time data using Kafka and Hazelcast. We will be using Wikimedia as a real-time data source. Wikimedia provides various streams and APIs (Application Programming Interfaces) to access real-time data about edits and changes made to their projects. For example, this source provides a continuous stream of updates on recent changes, such as new edits or additions to Wikipedia articles. Developers and solution architects often use such streams to monitor and analyze the activity on Wikimedia projects in real-time or to build applications that rely on this data, like this tutorial. Kafka is great for event streaming architectures, continuous data integration (ETL), and messaging systems of record (database). Hazelcast is a unified real-time stream data platform that enables instant action on data in motion by combining stream processing and a fast data store for low-latency querying, aggregation, and stateful computation against event streams and traditional data sources. It allows you to build resource-efficient, real-time applications quickly. You can deploy it at any scale from small edge devices to a large cluster of cloud instances. In this tutorial, we will guide you through setting up and integrating Kafka and Hazelcast to enable real-time data ingestion and processing for reliable streaming processing. By the end, you will have a deep understanding of how to leverage the combined capabilities of Hazelcast and Kafka to unlock the potential of streaming processing and instant action for your applications. So, let's get started! Wikimedia Event Streams in Motion First, let’s understand what we are building: Most of us use or read Wikipedia, so let’s use Wikipedia's recent changes as an example. Wikipedia receives changes from multiple users in real time, and these changes contain details about the change such as title, request_id, URI, domain, stream, topic, type, user, topic, title_url, bot, server_name, and parsedcomment. We will read recent changes from Wikimedia Event Streams. Event Streams is a web service that exposes streams of structured event data in real time. It does it over HTTP with chunked transfer encoding in accordance with the Server-Sent Events protocol (SSE). Event Streams can be accessed directly through HTTP, but they are more often used through a client library. An example of this is a “recentchange”. But what if you want to process or enrich changes in real time? For example, what if you want to determine if a recent change is generated by a bot or human? How can you do this in real time? There are actually multiple options, but here we’ll show you how to use Kafka to transport data and how to use Hazelcast for real-time stream processing for simplicity and performance. Here’s a quick diagram of the data pipeline architecture: Prerequisites If you are new to Kafka or you’re just getting started, I recommend you start with Kafka Documentation. If you are new to Hazelcast or you’re just getting started, I recommend you start with Hazelcast Documentation. For Kafka, you need to download Kafka, start the environment, create a topic to store events, write some events to your topic, and finally read these events. Here’s a Kafka Quick Start. For Hazelcast, you can use either the Platform or the Cloud. I will use a local cluster. Step #1: Start Kafka Run the following commands to start all services in the correct order: Markdown # Start the ZooKeeper service $ bin/zookeeper-server-start.sh config/zookeeper.properties Open another terminal session and run: Markdown # Start the Kafka broker service $ bin/kafka-server-start.sh config/server.properties Once all services have successfully launched, you will have a basic Kafka environment running and ready to use. Step #2: Create a Java Application Project The pom.xml should include the following dependencies in order to run Hazelcast and connect to Kafka: XML <dependencies> <dependency> <groupId>com.hazelcast</groupId> <artifactId>hazelcast</artifactId> <version>5.3.1</version> </dependency> <dependency> <groupId>com.hazelcast.jet</groupId> <artifactId>hazelcast-jet-kafka</artifactId> <version>5.3.1</version> </dependency> </dependencies> Step #3: Create a Wikimedia Publisher Class Basically, the class reads from a URL connection, creates a Kafka Producer, and sends messages to a Kafka topic: Java public static void main(String[] args) throws Exception { String topicName = "events"; URLConnection conn = new URL ("https://stream.wikimedia.org/v2/stream/recentchange").openConnection(); BufferedReader reader = new BufferedReader(new InputStreamReader(conn.getInputStream(), StandardCharsets.UTF_8)); try (KafkaProducer<Long, String> producer = new KafkaProducer<>(kafkaProps())) { for (long eventCount = 0; ; eventCount++) { String event = reader.readLine(); producer.send(new ProducerRecord<>(topicName, eventCount, event)); System.out.format("Published '%s' to Kafka topic '%s'%n", event, topicName); Thread.sleep(20 * (eventCount % 20)); } } } private static Properties kafkaProps() { Properties props = new Properties(); props.setProperty("bootstrap.servers", "127.0.0.1:9092"); props.setProperty("key.serializer", LongSerializer.class.getCanonicalName()); props.setProperty("value.serializer", StringSerializer.class.getCanonicalName()); return props; } Step #4: Create a Main Stream Processing Class This class creates a pipeline that reads from a Kafka source using the same Kafka topic, and then it filters out messages that were created by bots (bot:true), keeping only messages created by humans. It sends the output to a logger: Java public static void main(String[] args) { Pipeline p = Pipeline.create(); p.readFrom(KafkaSources.kafka(kafkaProps(), "events")) .withNativeTimestamps(0) .filter(event-> Objects.toString(event.getValue()).contains("bot\":false")) .writeTo(Sinks.logger()); JobConfig cfg = new JobConfig().setName("kafka-traffic-monitor"); HazelcastInstance hz = Hazelcast.bootstrappedInstance(); hz.getJet().newJob(p, cfg); } private static Properties kafkaProps() { Properties props = new Properties(); props.setProperty("bootstrap.servers", "127.0.0.1:9092"); props.setProperty("key.deserializer", LongDeserializer.class.getCanonicalName()); props.setProperty("value.deserializer", StringDeserializer.class.getCanonicalName()); props.setProperty("auto.offset.reset", "earliest"); return props; } Step #5: Enriching a Stream If you want to enrich real-time messages with batch or static data such as location details, labels, or some features, you can follow the next step: Create a Hazelcast Map and load static data into it. Use the Map to enrich the Message stream using mapUsingIMap. Conclusion In this post, we explained how to build a real-time application to process Wikimedia streams using Kafka and Hazelcast. Hazelcast allows you to quickly build resource-efficient, real-time applications. You can deploy it at any scale, from small-edge devices to a large cluster of cloud instances. A cluster of Hazelcast nodes shares the data storage and computational load, which can dynamically scale up and down. Referring to the Wikimedia example, it means that this solution is reliable, even when there are significantly higher volumes of users making changes to Wikimedia. We look forward to your feedback and comments about this blog post!
In the world of business, a comparison of Snowflake and Data Bricks is important because it improves data analysis and business management. Organizations, companies, and businesses need a strategy to gather all the data in one place that is to be analyzed. Cloud-based data systems Snowflake and Data Bricks are industry leaders. However, it is important to understand which data platform is the best for your company. Both Snowflake and Data bricks provide the quantity, speed, and quality that business applications require, but there are some variations and some parallels. The founder of Apache Spark founded the enterprise software Business Database. It is famous for using aspects of data lakes and data warehouses in a lake house architecture. Data warehouse business Snowflake provides cloud-based storage and gives services with less difficulty. It provides secure access to data and requires minimal maintenance. In this article, you will get a detailed comparison between Snowflake and Data Bricks. Here, you will get the benefits of each product so you can decide which one is the best for your company or business. Let’s start and take a look at their introduction: What Is Snowflake? Snowflake is a fully managed service that provides unlimited workloads for simple integration, loading, analysis, and sharing of data. Data lakes, data engineering, data application development, data science and security, and the use of shared data are its typical uses. Snowflake naturally separates computing and storage. With this architecture, you can give your user's data workload access to a copy of your data without any negative performance. It enables you to run your data solutions across multiple locations and clouds. It offers many options for interacting with many Snowflake users and also shares datasets and data services. Features Decision-Making Data-Driven You can eliminate data storage and give everyone in the business access to useful insights with the help of Snowflakes. It is important to make partner relationships, optimize pricing, reduce costs, and increase sales. Improving Speed and Quality of Analytics You can strengthen your analytics pipeline with Snowflake by switching from nightly batch loads to real-time data streams. You can make your business secure control access to your data warehouse and improve the quality of analytics at work. Improved Data Exchange You can create your own data exchange with Snowflake. It allows for to secure transfer of live and regulated data. It develops strong data connections with partners, clients, and other businessmen. It allows you to take a full view of your customer and provides information about customer characteristics and interests, occupations, and other useful things. Useful Products and User Experiences You can understand user behavior and products with Snowflake. You can use the entire dataset to satisfy customers, expand your product line, and drive data science. Better Security Compliance and cyber security data can be centralized in a secure data lake. Fast incident response is guaranteed by Snowflake Data Lakes. Aggregates large amounts of log data in one place and helps to get a complete picture of an incident quickly. It combines semi-structured logs and structured enterprise data into a single data lake. Through Snowflake, you can easily edit or change data after it is imported. What Are Data Boxes? Apache Spark powers Data Bricks, a cloud-based data platform. It focuses on big data analytics and collaborations. You can provide a complete data science workspace for this. Business analysts, Data Scientists, and Data Engineers communicate using Data bricks’ machine, learning runtime, controlled ML flow, and collaborative notebooks. Data Frames and Spark SQL libraries allow you to deal with structured data, which are stored in Data Bricks. In addition to creating Artificial intelligence, Data bricks help to draw conclusions from your existing data. Data Bricks offers many libraries and machine learning, including TensorFlow, PyTorch, and others, for building and training machine learning models. Many business clients use Data Bricks to accomplish different production processes across many sectors like healthcare, media and entertainment, finance, retail, and more. Features Delta Lake Data Bricks is a transactional storage layer that is open source and designed to be used for data lifecycle. This layer is used to provide data reliability to your existing data lake. Interactive Notebooks If you have the right language and tools, you can access your data quickly. You can easily analyze it and build models with others. You can share fresh and useful insights. Scala, R, SQL, and Python are just a few languages supported by Data Bricks. Machine Learning Data Bricks give you access to the pre-configured machine learning environment and provide access to Tensor Flow, Scikit-Learn, and Pytorch. You can share and monitor experiments, manage models, and replicate runs from a single central repository. Improved Spark Engine Data Bricks provides you latest versions of Apache Spark. If you get access to multiple cloud service providers, you can quickly set up clusters and build a managed Apache Spark environment. Clusters can be tuned with Data bricks. There is no need for constant monitoring and maintaining performance. Difference Between Snowflake and Data Bricks Architecture Snowflake is an ANSI SQL-based serviceless system with completely separate storage and compute processing layers. In Snowflake, each virtual warehouse locally uses massively parallel processing (MPP) to execute queries. Snowflake uses micro partitions for internal data organization in a compressed columnar format that is stored in the cloud. Snowflakes maintains all aspects of data management, including file size, compression, structure, metadata, statistics, and other items that are not visible to users and only to SQL queries. Virtual warehouses, which are compute clusters consisting of many MPP nodes, are used to perform all processing within Snowflake. Both Snowflake and Data Bricks are SaaS solutions. However, Data Bricks has a very different architecture than those built on Spark. The multi-language engine called Spark can be deployed in the cloud and is based on single nodes or clusters. Data Bricks currently uses AWS, GCP, and Azure, as well as Snowflake. Its structure is made of a control plane and a data plane. All processed data resides in the data plane, while all back-end services managed by Data Bricks Serverless Computing reside in a control plane. Serverless computing enables administrators to create serverless SQL endpoints that are fully managed by Data bricks and offer instant computing. While computational resources for the majority of other Data bricks calculations are shared within a cloud account or traditional data plane, these resources are shared in a serverless data plane. The architecture of Data bricks consists of several main parts: Data bricks Delta Lake Data Brick's Delta Engine ML Flow Data Structure We can save semi-structured and structured files by using Snowflake without the need for an ETL tool to sort data before importing it into EDW. Snowflake immediately transforms the data into its structured form when it is collected. Unlike Data Lake, Snowflake doesn’t require you to structure your unstructured data before you can load and interact with it. You can also use Data Bricks as an ETL tool to structure your unstructured data so it can be used by other means like Snowflake. In the debate between Data Bricks and Snowflake, Data Bricks dominates Snowflake in terms of data structure. Ownership of Data Snowflake has separate processing and storage layers, which allows it to grow independently on the cloud. Snowflake secures access to data and machine resources using role-based access control (RBAC) techniques. Data Bricks’ data processing and storage layers are fully decoupled, unlike the decoupled layers in Snowflake. Users can put their data anywhere in any format, and Data Bricks will handle it efficiently because it is primarily a data application. If we make a comparison between Data Bricks and Snowflake, we clearly see that Data Bricks is easy to use and process data. Data Protection Time travel and failsafe are two unique features of Snowflake. Snowflake's time travel function keeps the data in a state before the update. While enterprise clients can choose a period of up to 90 days, time travel is often limited to one day. Databases, schemas, and tables can all use this capability. When the time travel retention period expires, a 7-day fail-safe period begins, designed to protect and restore previous data. Data bricks work like Snowflake's time travel feature, also Delta Lakes. Data stored in Delta Lake is automatically versioned, allowing users to retrieve previous data versions for future use. Data bricks run on Spark, and because Spark is built on object-level storage, Data bricks never store any data. This is one of its main advantages. It also shows that Data bricks can handle the use cases of on-premise systems. Security Snowflake automatically controls all the data. All communication between the control plane and the data plane takes place within the cloud provider's private network, and all data stored within the data bricks is secured. Both options offer RBAC (Role-Based Access Control). Snowflake and Data Bricks adhere to multiple laws and certifications, including SOC 2 Type II, ISO 27001, HIPAA, and GDPR. However, Data bricks operate on top of object-level storage such as AWS S3, Azure Blob Storage, Google Cloud Storage, etc. Unlike Snowflake, it doesn't have a storage layer. Performance It is difficult to compare Snowflake and Data Bricks in terms of performance. In the case of head-to-head comparison, Snowflake and Data Bricks support slightly different use cases and are not superior to others. Snowflake may be a preferred option because it optimizes all storage for accessing data at the time of ingestion. Use the Case BI and SQL use cases are well supported by Data Bricks and Snowflake. Snowflake provides JDBC and ODBC drivers that are easy to integrate with other software. Given that users do not need to manage the program, it is popular for its use cases in BI and businesses choosing a straightforward analytics platform. The open-source Delta Lake released by Data Bricks meanwhile adds an extra layer of stability to their data lake. Users can send SQL queries to Delta Lake with excellent performance. Given its variety and advanced technology, Data Bricks is known for its use cases that minimize vendor lock-in, are better suited for ML workloads, and support tech giants. Result The best data analysis tools include Snowflake and Data Bricks. Each has advantages and disadvantages. Usage patterns, data volumes, workloads, and data strategy come into play when deciding which platform is ideal for your business. Snowflake is best suited for people who have experience with SQL and for general data manipulation and analysis. Streaming, ML, AI, and data science workloads are better suited to Data Bricks due to its Spark engine, which supports the use of multiple languages. To catch up with other languages, Snowflake has introduced support for Python, Java, and Scala. Some claim that Snowflake reduces storage during ingestion, so it's better for interactive queries. Additionally, it excels in generating reports and dashboards and managing BI workloads. In terms of data warehousing, it performs well. However, some users have noted that it suffers from large amounts of data, similar to what is seen in streaming applications. Snowflake's victory in direct competition is based on data warehousing skills. However, Data Bricks is not actually a data warehouse. Its data platform is more comprehensive and has superior ELT, data science, and machine learning capabilities than Snowflake. Users do not control the cost of managed object storage where they store their data. Data leaks and data processing are important topics. However, it is specifically targeted at data scientists and highly skilled analysts. Finally, the success of Data bricks for a technical audience. Both tech-savvy and non-tech-savvy users can easily use Snowflake. Almost all of the data management features offered by Snowflake are available through Data Bricks and more. But it's more difficult to do, involves a higher learning curve, and requires more maintenance. However, it can handle a much larger range of data workloads and languages. And those familiar with Apache Spark will gravitate towards Data bricks. Snowflake is ideal for users who want to quickly install a good data warehouse and analytics platform without getting bogged down in setup, data science details, or manual setup. It also doesn't claim that Snowflake is a simple tool for new users. Absolutely not. It is not as advanced as Data bricks. That platform is more suitable for complex data engineering, ETL, data science, and streaming applications. Snowflake is a data warehouse for analytics that stores production data. Additionally, it is beneficial for individuals who want to start small and ramp up gradually, as well as for beginners.
In today's data-driven world, efficient data processing plays a pivotal role in the success of any project. Apache Spark, a robust open-source data processing framework, has emerged as a game-changer in this domain. In this article, we will delve into strategies to ensure that your data pipeline is resource-efficient, cost-effective, and time-efficient. This guide is valuable for both data beginners and professionals, serving as a checklist for optimization opportunities. While most of the advice here applies universally to Spark usage, we'll point out specific differences that may affect your application's behavior when using Scala or Python API or writing Spark SQL. Optimizing Data Input Make Use of Data Format In most cases, the data being processed is stored in a columnar format. While this format may not be ideal when you only need to retrieve a few rows from a large partition, it truly excels in analytical use cases. This is particularly beneficial when you have to examine the majority of input rows but are solely interested in a specific subset of columns, which can often number in the hundreds or even thousands within a table. Spark takes full advantage of this storage property by exclusively reading the columns that are involved in subsequent computations. To ensure efficiency, it's crucial to review your execution plan and verify that the earliest project statements are selecting only the columns you will require moving forward. This practice avoids the unnecessary burden of Spark reading hundreds of columns only to discard them later in the process. Don’t Forget About Partitions Now that we've optimized our column selection, let's delve into the efficient retrieval of the necessary rows. As previously mentioned, columnar storage isn't the most efficient choice when you only need to access a few rows out of a massive dataset, and this remains true. However, there are strategies to enhance this situation by cleverly organizing your data. Imagine you're working with sales data collected in hourly intervals from multiple city supermarkets. In your data pipeline, you avoid mixing data from different shops and hours, which is a good practice. Early in the pipeline, you likely perform filtering, but you can take this a step further to prevent Spark from reading superfluous data entirely. By partitioning your data based on both the hour and supermarket_id, Spark can skip irrelevant data with a simple check against the Metastore. However, exercise caution regarding the cardinality of your partitions. If possible, keep the cardinality of your partitioning column in the single or double digits; this makes it an excellent candidate for partitioning. However, if it could potentially reach tens of thousands, consider devising effective aggregation strategies. Returning to the earlier example, envision a supermarket chain like Walmart, where using supermarket_id for partitioning might result in over 10,000 partitions. In such cases, opting for a different partitioning approach, such as using the state, may be more efficient. For instance, in the United States, you can reasonably assume that the cardinality won't exceed 50. To ensure that filter pushdown operates as expected, review your execution plan. In the initial lines where the dataset is loaded, you'll find information like PushedFilters: [*EqualTo(state, CA)]. This confirms that your filtering optimizations are being implemented as intended. Efficient Data Processing Reduce Shuffles Shuffle operations are expensive, so aim to minimize their impact. Reduce the data size before shuffling, apply filtering operations, and consider using "byKey" methods like reduceByKey or aggregateByKey to avoid shuffles whenever possible. You probably won’t be able to get rid of all the shuffles altogether, so consider smart partitioning — it can save resources in future operations. Use Broadcast Continuing our discussion on shuffling, it's essential to consider the option of broadcasting one of your datasets if it's sufficiently small. Broadcasting it to all workers can eliminate the need for a costly shuffle when joining it with your larger "big data" portion. However, it's crucial to navigate the nuances between Spark SQL and the Spark Dataset API in this context. In the Dataset API, using the broadcast method will attempt to broadcast your variable, but it will throw an exception if it doesn't fit in memory. On the other hand, in Spark SQL, the /* BROADCAST(dataset) */ notation functions as a hint. If broadcasting fails to fit in memory, Spark will revert to a regular join. In situations where your supposedly small broadcast dataset unexpectedly grows in size, the outcomes differ between the two approaches. With the Dataset API, you'll notice job failures in Spark, which, while disruptive, clearly highlight the issue. In contrast, in Spark SQL, jobs may become slower, offering greater stability but potentially obscuring the problem for some time. Therefore, understanding these distinctions is crucial when deciding on the broadcasting strategy for your specific use case. Avoid Skews Now, let's delve into the critical aspect of dataset sizes and address the issue of data skews. The most obvious way to identify them is by looking at job processing stats. If you see huge differences between median and max time or input size, or out of 1000 jobs 998 finish in minutes, but 2 leftovers are taking more than an hour — that’s a good indicator of a skew. You can see a skew example below: The longest-running task takes a staggering 7 seconds and reads a massive 116 MB of inputs, whereas the median task processes in just 50 milliseconds and handles a mere 4 KB of data. And here’s an example of a “fixed” pipeline - now the max is just 2 seconds and 10 Mb — still much bigger than a median, but nowhere near the scale it previously was on: Data skews occur when one group being shuffled is disproportionately larger than the others. This can lead to issues such as Out-of-Memory (OOM) errors on a worker node if it cannot accommodate the entire chunk in memory. More commonly, it results in data spillover to disk, causing extended serialization times and excessive time spent in garbage collection (GC). To address data skews, consider techniques like key salting. While we won't delve into the details here, key salting involves introducing a random component to problematic keys and redistributing the data based on the modified keys. This can effectively mitigate skew-related performance bottlenecks. Note: Spark 3.2.0 has Adaptive Query Execution (AQE) enabled by default which should help deal with skewness, but if you see weird task stats - try going with manual salting. Cache Wisely It's important to understand that when you interact with the Spark API, creating new datasets and saving them to variables doesn't actually save any state; instead, you're storing a Directed Acyclic Graph (DAG) of computations required to obtain specific results. Consequently, when you reuse the same variable, Spark will redundantly recompute the same data multiple times. To address this redundancy, it's advisable to cache your dataset when you observe this recomputation pattern. Spark provides various caching options, allowing you to store the dataset either in memory or serialize it to disk. While in-memory caching offers the fastest access, keep in mind that Spark monitors memory consumption and may evict datasets from memory if necessary, as they can be recomputed when needed later. Therefore, it's important to strike a balance and avoid attempting to cache everything all the time, as doing so may not yield the expected performance improvements. Instead, judiciously apply caching to specific datasets that are reused frequently to optimize your Spark application's efficiency. Data Schema and Processing Once More About Formats While it might initially seem enticing to adopt a "flexible" schema, such as storing complex objects as JSON strings, it's essential to consider the long-term implications, especially as your data matures and stabilizes. At this point, it becomes advisable to select the most suitable data format. Opting for native arrays or maps consistently delivers the best performance. This choice offers several advantages, including substantial resource savings by eliminating the need for repetitive deserialization. Additionally, it avoids the overhead of instantiating costly Serialization/Deserialization (SerDe) libraries within User-Defined Functions (UDFs). Furthermore, embracing native arrays or maps can provide added benefits in the future. As Spark continues to evolve and integrate with diverse data formats, you may unlock further optimizations. For instance, this could involve features like the pushdown of reading specific keys within a map, enhancing your application's efficiency and reducing processing overhead. By selecting the optimal data format, you not only improve your current performance but also position yourself for potential performance enhancements in the evolving Spark ecosystem. Avoid UDFs I strongly recommend minimizing the use of UDFs whenever possible, particularly if the same functionality can be achieved using native Spark SQL functions. It's crucial to understand that native Spark functions, while somewhat limited in their capabilities, should be your primary choice, and the utilization of UDFs should be a last resort. There are two key reasons for this recommendation: representation and potential optimizations. Spark manages in-memory data using its proprietary native format. When you call a UDF, each row of data must be transferred to the Java Virtual Machine (JVM) for execution of the function, and afterward, the result is serialized back into Spark's native format. As you can imagine, this process incurs significant computational costs and resource overhead. Moreover, UDFs present a challenge for Spark's optimizers. These optimizers lack visibility into the inner workings of UDFs, rendering them opaque. Consequently, Spark's built-in optimization techniques cannot be applied to UDFs, limiting the potential for performance improvements. By prioritizing native Spark SQL functions and minimizing the reliance on UDFs, you not only reduce computational expenses but also leverage Spark's optimization capabilities to enhance the efficiency of your data processing workflows. Choosing Execution Mode Batch or Streaming? As a final point, I'd like to recommend considering the most suitable execution mode for your needs: batch or streaming. This might initially seem like a significant shift, and indeed, it can be, but the Spark API is designed to abstract away many of the complexities associated with this choice. While Spark is inherently well-tuned for batch datasets (to the extent that even streaming is essentially micro-batching), it's typically the default choice. However, it's worth exploring streaming if your data processing primarily involves small aggregations, where most of the processing can be accommodated within the memory space of just one row. Additionally, if the "side" datasets you need to join with are relatively small, streaming could be a viable option. The decision between batch and streaming should take into account user expectations regarding data output and the significance of data arrival time as a metric. In essence, this choice revolves around aligning your data processing approach with the specific demands and preferences of your users and the importance of real-time data arrival in your application. In this article, I’ve tried briefly summarizing different ways to optimize your Spark application, reducing costs and processing times. While this isn’t an exhaustive list of all possible issues and mitigations, it gives you a good starting point for where to look. The last word of advice before you start your optimization journey would be to not rely on gut feeling, but benchmark your application and look into stats and metrics to know where the actual issues lie.
Spark is one of the most popular and widely used big data processing frameworks in the world. It has a large open-source community, with continuous development, updates, and improvements being made to the platform. Spark has gained popularity due to its ability to perform in-memory data processing, which significantly accelerated the data processing times compared to traditional batch processing systems like Hadoop MapReduce. However, all that glitters is not gold. Spark is well known for being one of the best data processing frameworks available in the market, thanks to its capacity to process batch data, but when it comes to streaming data, Spark can be challenging if you don’t have previous experience working with any streaming framework. The difficulty of learning Spark Streaming, like any other streaming framework, can vary depending on an individual’s background, experience, and familiarity with related concepts. When it comes to streaming data, it doesn’t matter if we have prior experience or if we are just starting. We need to know/learn about distributed systems, event-driven concepts, real-time data processing concepts, and, of course, the syntax for the specific framework that is being used. Whether you are an experienced engineer or you are just getting started, you need to know that you don’t have to worry about Spark Streaming. There is a solution that increases the performance of your streaming pipeline, decreases the complexity of your code, and matches perfectly with Spark. The solution is a streaming database. First things first. You may be wondering what a streaming database is. In a nutshell, a streaming database is a type of database that is designed to handle continuous and real-time data streams. There are many streaming databases in the market, but in this article, we are going to learn how one of the best streaming databases (RisingWave) can level up your streaming pipeline with less code. Streaming Pipeline Architecture With Spark Streaming Every company has its own needs, and the technologies vary depending on the specific use case, but in general, we can find the following architecture in any streaming pipeline. Data Producers Data producers (IoT devices, sensors, server logs, clickstream data, app activity) are the data sources that continuously generate data events into the pipeline. These data events are time-stamped and represent individual pieces of data that need to be processed, analyzed, or stored in real time. Stream Processors Stream processors (Kafka, Flink, Spark Streaming, Amazon Kinesis, Google Cloud Dataflow) are the main components responsible for ingesting data as it flows through the pipeline. Stream processors play a crucial role in achieving low-latency and high-throughput data processing in streaming applications. Data Transformations Data transformation (Spark, Databricks, Upsolver, Google Cloud BigQuery, AWS Glue) is the process of modifying, enriching, or reformatting the real-time data as it flows through the pipeline. Most of the time, the transformations are done in a Spark-friendly environment. Data transformations are the most difficult part to develop in every streaming pipeline. The reason is not because there is a lack of technologies but because there is a lack of knowledge on how to cleanse data, perform aggregations, filter, map, join, and improve the application performance according to the cluster’s hardware (Spark tuning). Data Consumers Data consumers (databases, data lakes, real-time dashboards) are the destinations of the processed data that flows through the pipeline. Once the data has been ingested, processed, and transformed in real-time, it is now sent to the data consumers for further analysis, storage, or, in most cases, visualization. Streaming Pipeline Architecture With RisingWave Now that we know the architecture of a streaming pipeline let’s learn how we can improve it by implementing a streaming database, in this case, RisingWave. As you can see, the data producers and data consumers are the same, but the stream processor has been replaced by the streaming database RisingWave. This small change is a huge improvement to the streaming pipeline. These are some of the improvements the pipeline got thanks to the implementation of a streaming database: The streaming and processing steps are done together. Spark can be used as a complement in the case a second processing is needed, and the use case requires Spark. The complexity of the streaming pipeline has been reduced. Streaming frameworks like Flink or Spark Streaming require a deep knowledge level, but streaming databases ease the complexity. Easier to maintain, develop, and test thanks to the database optimizer that RisingWave has. Building a Streaming Pipeline The Spark Streaming Approach Now that we have learned how to level up our streaming pipeline using a streaming database let’s review how easy it is to build a streaming pipeline using a streaming database versus all the challenges we face when the pipeline is developed without a streaming database. Let’s start by understanding how a streaming pipeline is developed without a streaming database and using Spark Streaming to stream and process the data. The first stage in any streaming database is the data producer. In this case, we will be processing the data from an IoT sensor located in a house. The dataset can be found on Kaggle as Room Occupancy Detection Data (IoT sensor), and it sensors the temperature, humidity, light, CO2 level, humidity ratio, and occupancy of the room every minute. This is an example of how the dataset looks like. The sensor delivers a new record every minute, so we need to process the streaming data. Let’s start building the Spark Streaming by adding all the libraries needed and declaring the schema for the streaming data. Python from pyspark.sql import SparkSession from pyspark.sql.types import StructType, IntegerType, TimestampType, DoubleType from pyspark.sql.functions import count, to_date, avg, round spark = SparkSession.builder.master("local[*]").appName("IoT Streaming Pipeline").getOrCreate() sensor_schema = StructType() \ .add("timestamp", TimestampType(), True) \ .add("temperature", DoubleType(), True) \ .add("humidity", DoubleType(), True) \ .add("light", DoubleType(), True) \ .add("co2", DoubleType(), True) \ .add("humidity_ratio", DoubleType(), True) \ .add("occupancy", IntegerType(), True) Now that we have the schema defined, we can declare the streaming data frame, and we will add another column to cast the timestamp column as a date type. Python sensor_streaming_df = spark \ .readStream \ .format("csv") \ .schema(sensor_schema) \ .option("header", True) \ .option("maxFilesPerTrigger", 1) \ .load("data/") sensor_streaming_df = sensor_streaming_df.withColumn("date", to_date("timestamp")) It’s important to understand how streaming data frames work. So, if you’re unfamiliar with the term, I highly recommend you read this article to get a better idea. It’s also important to mention that we need to define the specific options we need depending on the use case. In this case, the IoT sensor delivers data every minute, so we will be getting a new file every minute, and we need to read one file at a time. That’s the reason why we set the option “maxFilesPerTrigger” to 1. We are all set to start reading the streaming data, and we can visualize it by writing it to the console using the below command and waiting until all the data is processed. Python query = sensor_streaming_df.select("*").writeStream.format("console").start() query.awaitTermination() The data from the IoT sensor is streamed, processed, and printed in batches. Every batch is sent by the sensor, and Spark Streaming processes it. This is what the output looks like. Now, it’s time to apply some data transformations and do some aggregations. Since the sensor sends data every minute, let’s calculate the daily average temperature in Celsius, convert it to Fahrenheit, and then calculate the daily average value for the other metrics, too, using Spark Streaming. Python query = sensor_streaming_df\ .groupby("date")\ .agg(count("*").alias("records_count"), round(avg("temperature"), 2).alias("avg_c_temp"), round((9/5 * avg("temperature")) + 32, 2).alias("avg_f_temp"), round(avg("humidity"), 2).alias("avg_humidity"), round(avg("light"), 2).alias("avg_light"), round(avg("co2"), 2).alias("avg_co2"), round(avg("humidity_ratio"), 2).alias("avg_hum_ratio"))\ .writeStream.format("console").outputMode("complete").start() query.awaitTermination() Spark Streaming reads the stream batches and updates the aggregation values. Once all the stream data has been delivered, we can see the final results as below. The Streaming Database (RisingWave) Approach We have learned how a streaming database is built using Spark Streaming. Now it’s time to build the same streaming pipeline but level it up using the streaming database RisingWave (if you want to know how to get started with RisingWave, I recommend you to go through their getting started website). For this pipeline, we have the same IoT data producer; the data is streamed via a Kafka topic, and the streaming database processes the data. Let’s start by ingesting the data into the streaming database RisingWave. To start ingesting the data, we need to create a data source. The streaming data is delivered to the streaming database via the Kafka topic IoT, so let's establish the connection between the Kafka topic and the data source. Python CREATE SOURCE IF NOT EXISTS iot_stream ( timestamp timestamp, temperature double, humidity double, light double, co2 double, humidity_ratio double, occupancy double ) WITH ( connector = 'kafka', topic = 'iot', properties.bootstrap.server='127.0.0.1:9092', scan.startup.mode = 'earliest' ) ROW FORMAT JSON; Now that we have created the data source, we need to create the materialized view that will allow us to query the data, and here is where the magic happens. We can create the materialized view with the same transformations as we did with Spark Streaming, but easier and faster, thanks to the fact that RisingWave has a database optimizer that abstracts the complexity from the user. Python CREATE MATERIALIZED VIEW iot_sensor AS SELECT timestamp::date AS date, COUNT(*) AS records_count, ROUND(AVG(temperature)) AS avg_c_temp, ROUND((9/5.0 * AVG(temperature)) + 32) AS avg_f_temp, ROUND(AVG(humidity)) AS avg_humidity, ROUND(AVG(light)) AS avg_light, ROUND(AVG(co2)) AS avg_co2, ROUND(AVG(humidity_ratio)) AS avg_hum_ratio FROM iot_stream GROUP BY timestamp::date; With the materialized view created, we can see how the data is being streamed and processed at the same time. Once all the data has been streamed, we can query the materialized view, and we can see how the data has been processed in real time. Now, the data is ready to be delivered to the data consumers. RisingWave allows us to deliver the data in multiple ways using data sinks, or we can use Spark to store the data in multiple formats such as parquet, CSV, JSON, orc, etc. As you can see, using a streaming database reduces the development process difficulty, improves the performance and capabilities of the streaming database, and fits perfectly with the Spark architecture used in most of the streaming pipelines nowadays. Conclusion In this article, we learned how the streaming database RisingWave is the perfect match for any streaming pipeline because of its streaming and processing capabilities. RisingWave complements the Spark architecture and reduces the complexity and skill level needed when a streaming pipeline is built using only Spark Streaming, such as Spark tunning, to improve the performance of a Spark application. All of this is thanks to the database optimizer implemented that gets rid of the complexity of the streaming process from the user.
This article outlines a solution for streaming events from Kafka, forwarding them to Redis using its Stream API, and reading individual streams from Redis via its streaming API. The added complexity in this scenario is the need to stream events from an HTTP endpoint using Server-Sent Events (SSE) while ensuring that only events relevant to a specific client ID are processed and sent. Problem Statement Many companies have an existing Kafka infrastructure where events are being produced. Our goal is to set up a system that subscribes to Kafka messages but only processes events relevant to a specific client ID. These filtered events should be forwarded to Redis using its Stream API. Additionally, we need to establish an HTTP endpoint for Server-Sent Events (SSE) that allows the specified client to receive real-time event updates. Solution Architecture Overview The architecture consists of the following components: Kafka: A distributed event streaming platform that allows you to publish and subscribe to streams of records (events). Spring Boot: A framework for building Java applications. We'll use it to create Kafka consumers and Redis Stream producers. Subscribes to Kafka messages, filters events based on the client ID, and forwards relevant events to Redis Streams. Redis: A high-performance, in-memory data store. We'll use its Streams feature to handle event streams. Stores the streamed events using its Streams API. Docker: A containerization platform. We'll use Docker and Docker-Compose to create containers for Kafka, Redis, and our Spring Boot application. We will utilize this for a local POT, POC. HTTP Server-Sent Events (SSE) Endpoint: Provides real-time event updates to the client, filtering events based on the client ID. Redis Streams Redis Streams is a feature in Redis that provides a way to handle real-time data streams with various use cases. Here are some scenarios where you might want to use Redis Streams: Real-Time Event Processing: Redis Streams are excellent for processing and storing real-time events. You can use it for things like logging, monitoring, tracking user activities, or any use case that involves handling a continuous stream of events. Task Queues: If you need a reliable and distributed task queue, Redis Streams can be a great choice. It allows you to push tasks into a stream and have multiple consumers process those tasks concurrently. Activity Feeds: If you're building a social network or any application that requires activity feeds, Redis Streams can efficiently handle the feed data, ensuring fast access and scalability. Message Brokering: Redis Streams can serve as a lightweight message broker for microservices or other distributed systems. It can handle message routing and ensure that messages are delivered to interested consumers. Real-Time Analytics: When you need to analyze data in real-time, Redis Streams can be useful for storing the incoming data and then processing and aggregating it using Redis capabilities. IoT Data Ingestion: If you're dealing with data from Internet of Things (IoT) devices, Redis Streams can handle the high-throughput and real-time nature of the data generated by these devices. Logging and Audit Trails: Redis Streams can be used to store logs or audit trails in real-time, making it easy to analyze and troubleshoot issues. Stream Processing: If you need to process a continuous stream of data in a specific order (for example, financial transactions or sensor readings), Redis Streams can help you manage the data in the order it was received. Prerequisites Docker and Docker Compose are installed. Basic understanding of Spring Boot, Redis Streams, and Kafka. Java 17 or higher An HTTP client of your choice. I used [httpie] Of course, you can get the code here. Steps 1. Set Up Docker Compose for the Backend Infrastructure and the Spring Boot App Create a `docker-compose.yml` file to define the services: YAML version: '3.8' services: zookeeper: image: confluentinc/cp-zookeeper:latest environment: ZOOKEEPER_CLIENT_PORT: 2181 ZOOKEEPER_TICK_TIME: 2000 ports: - 22181:2181 networks: node_net: ipv4_address: 172.28.1.81 kafka: image: confluentinc/cp-kafka:latest depends_on: - zookeeper ports: - 29092:29092 - 9092:9092 - 9093:9093 environment: KAFKA_BROKER_ID: 1 KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181 KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092,PLAINTEXT_HOST://localhost:29092,,EXTERNAL://172.28.1.93:9093 KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT,EXTERNAL:PLAINTEXT KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1 networks: node_net: ipv4_address: 172.28.1.93 cache: image: redis:6.2-alpine #image: redis:5.0.3-alpine restart: always ports: - '6379:6379' #command: redis-server --save 20 1 --loglevel warning --requirepass eYVX7EwVmmxKPCDmwMtyKVge8oLd2t81 command: redis-server /usr/local/etc/redis/redis.conf --loglevel verbose --save 20 1 volumes: - cache:/data - ./redis.conf:/usr/local/etc/redis/redis.conf - $PWD/redis-data:/var/lib/redis #environment: # - REDIS_REPLICATION_MODE=master networks: node_net: ipv4_address: 172.28.1.79 volumes: cache: driver: local networks: node_net: ipam: driver: default config: - subnet: 172.28.0.0/16 Create the yaml for the application 'sse-demo.yml'. YAML version: "3.8" services: sse-demo: image: "sse/spring-sse-demo:latest" ports: - "8080:8080" #- "51000-52000:51000-52000" env_file: - local.env environment: - REDIS_REPLICATION_MODE=master - SPRING_PROFILES_ACTIVE=default - REDIS_HOST=172.28.1.79 - REDIS_PORT=6379 - KAFKA_BOOTSTRAP_SERVERS=172.28.1.93:9093 networks: node_net: ipv4_address: 172.28.1.12 networks: node_net: external: name: docker_node_net 2. Create Spring Boot Application Create a Spring Boot application with the required dependencies: Shell git checkout https://github.com/glawson6/spring-sse-demo.git In `pom.xml,` add the necessary dependencies for Kafka and Redis integration. 3. Implement Kafka Consumer and Redis Stream Producer Create a Kafka consumer that listens to Kafka events and sends them to Redis Streams. This component also consumes the Redis streams for the HTTP clients: Java package com.taptech.sse.event; import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.ObjectMapper; import com.taptech.sse.utils.DurationSupplier; import com.taptech.sse.utils.ObjectMapperFactory; import com.taptech.sse.config.SSEProperties; import jakarta.annotation.PostConstruct; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.boot.context.event.ApplicationStartedEvent; import org.springframework.context.event.EventListener; import org.springframework.data.redis.connection.stream.*; import org.springframework.data.redis.core.ReactiveStringRedisTemplate; import reactor.core.Disposable; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; import reactor.kafka.receiver.KafkaReceiver; import reactor.kafka.receiver.ReceiverRecord; import reactor.util.function.Tuple2; import reactor.util.function.Tuples; import reactor.util.retry.Retry; import java.nio.ByteBuffer; import java.time.Duration; import java.time.LocalDateTime; import java.time.temporal.ChronoUnit; import java.util.function.Function; public class DefaultEventReceiverService implements EventReceiverService { private static final Logger logger = LoggerFactory.getLogger(DefaultEventReceiverService.class); public final static String TEST_STREAM = "test.stream"; public final static String TEST_STREAM_KEY = "test.stream.key"; public final static String TEST_STREAM_VALUE = "test.stream.value"; private static final String EMPTY_STR = ""; public static final String CLIENT_STREAM_STARTED = "client.stream.started"; public static final String HASH_PREFIX = "hash."; private static ObjectMapper objectMapper = ObjectMapperFactory.createObjectMapper(ObjectMapperFactory.Scope.SINGLETON); ReactiveStringRedisTemplate redisTemplate; KafkaReceiver<String, String> kafkaReceiver; SSEProperties sseProperties; StreamReadOptions streamReadOptions; public DefaultEventReceiverService(ReactiveStringRedisTemplate redisTemplate, KafkaReceiver<String, String> kafkaReceiver, SSEProperties sseProperties) { this.redisTemplate = redisTemplate; this.kafkaReceiver = kafkaReceiver; this.sseProperties = sseProperties; this.streamReadOptions = StreamReadOptions.empty().autoAcknowledge() .block(Duration.of(sseProperties.getClientHoldSeconds(), ChronoUnit.SECONDS)); } static final Function<String,String> calculateHashKey = str -> new StringBuilder(HASH_PREFIX).append(str).toString(); @PostConstruct public void init() { this.redisTemplate.opsForValue().append(TEST_STREAM_KEY, TEST_STREAM_VALUE).subscribe(); } @EventListener(ApplicationStartedEvent.class) public Disposable startKafkaConsumer() { logger.info("############# Starting Kafka listener....."); return kafkaReceiver.receive() .doOnError(error -> logger.error("Error receiving event, will retry", error)) .retryWhen(Retry.fixedDelay(Long.MAX_VALUE, Duration.ofSeconds(sseProperties.getTopicRetryDelaySeconds()))) .doOnNext(record -> logger.info("Received event: key {}", record.key())) .filterWhen(record -> checkIfStreamBeingAccessed(record)) .concatMap(this::handleEvent) .subscribe(record -> record.receiverOffset().acknowledge()); } Mono<Boolean> checkIfStreamBeingAccessed(ReceiverRecord<String,String> record){ return this.redisTemplate.opsForHash().hasKey(calculateHashKey.apply(record.key()), CLIENT_STREAM_STARTED) .doOnNext(val -> logger.info("key => {}'s stream is being accessed {}",record.key(),val)); } public Mono<ReceiverRecord<String, String>> handleEvent(ReceiverRecord<String, String> record) { return Mono.just(record) .flatMap(this::produce) .doOnError(ex -> logger.warn("Error processing event: key {}", record.key(), ex)) .onErrorResume(ex -> Mono.empty()) .doOnNext(rec -> logger.debug("Successfully processed event: key {}", record.key())) .then(Mono.just(record)); } public Mono<Tuple2<RecordId, ReceiverRecord<String, String>>> produce(ReceiverRecord<String, String> recRecord) { ObjectRecord<String, String> record = StreamRecords.newRecord() .ofObject(recRecord.value()) .withStreamKey(recRecord.key()); return this.redisTemplate.opsForStream().add(record) .map(recId -> Tuples.of(recId, recRecord)); } Function<ObjectRecord<String, String>, NotificationEvent> convertToNotificationEvent() { return (record) -> { NotificationEvent event = null; try { event = objectMapper.readValue(record.getValue(), NotificationEvent.class); } catch (JsonProcessingException e) { e.printStackTrace(); event = new NotificationEvent(); } return event; }; } private Mono<String> createGroup(String workspaceId){ return redisTemplate.getConnectionFactory().getReactiveConnection().streamCommands() .xGroupCreate(ByteBuffer.wrap(workspaceId.getBytes()), workspaceId, ReadOffset.from("0-0"), true) .doOnError((error) -> { if (logger.isDebugEnabled()){ logger.debug("Could not create group.",error); } }) .map(okStr -> workspaceId) .onErrorResume((error) -> Mono.just(workspaceId)); } private Flux<NotificationEvent> findClientNotificationEvents(Consumer consumer, StreamOffset<String> streamOffset, DurationSupplier booleanSupplier){ return this.redisTemplate.opsForStream().read(String.class, consumer, streamReadOptions, streamOffset) .map(convertToNotificationEvent()) .repeat(booleanSupplier); } public Flux<NotificationEvent> consume(final String clientId){ return Flux.from(createGroup(clientId)) .flatMap(id -> addIdToStream(clientId)) .map(id -> Tuples.of(StreamOffset.create(clientId, ReadOffset.lastConsumed()), Consumer.from(clientId, clientId), new DurationSupplier(Duration.of(sseProperties.getClientHoldSeconds(), ChronoUnit.SECONDS), LocalDateTime.now()))) .flatMap(tuple3 -> findClientNotificationEvents(tuple3.getT2(), tuple3.getT1(), tuple3.getT3())); } private Mono<String> addIdToStream(String id) { return this.redisTemplate.opsForHash().put(calculateHashKey.apply(id), CLIENT_STREAM_STARTED, Boolean.TRUE.toString()).map(val -> id); } public Flux<Boolean> deleteWorkspaceStream(String workspaceId){ StreamOffset<String> streamOffset = StreamOffset.create(workspaceId, ReadOffset.lastConsumed()); StreamReadOptions streamReadOptions = StreamReadOptions.empty().noack(); Consumer consumer = Consumer.from(workspaceId, workspaceId); return this.redisTemplate.opsForStream().read(String.class, consumer, streamReadOptions, streamOffset) .flatMap(objRecord -> this.redisTemplate.opsForStream().delete(workspaceId,objRecord.getId()).map(val -> objRecord)) .flatMap(objRecord -> this.redisTemplate.opsForHash().delete(workspaceId)); } @Override public Flux<String> consumeString(String clientId) { return this.redisTemplate.opsForStream().read(String.class, StreamOffset.latest(clientId)).map(ObjectRecord::getValue); } } 4. Configure Services in Spring Boot Properties files cache.provider.name=redis cache.host=${REDIS_HOST:localhost} cache.port=${REDIS_PORT:6379} cache.password=password # Producer properties spring.kafka.producer.bootstrap-servers=127.0.0.1:9092 spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer spring.kafka.producer.group-id=group_id spring.kafka.boostrap.servers=${KAFKA_BOOTSTRAP_SERVERS:loclahost:9093} # Common Kafka Properties auto.create.topics.enable=true sse.client-hold-seconds=${SSE_CLIENT_HOLD_SECONDS:120} logging.level.root=INFO logging.level.com.taptech.sse=DEBUG 5. Build Docker Image for Spring Boot App Using the `kubernetes-maven-plugin` from jkube, create the image for your Spring Boot application: Shell ./mvnw clean package -Dmaven.test.skip=true k8s:build 6. Start the Services and Run the Application From the src/test/resources/docker directory Start the services: Shell ./startServices.sh Start the app: Shell ./start-sse-demo.sh 7. Connect to a Stream Using One of the IDS in the client-ids.json File Shell http --stream GET http://localhost:8080/sse clientId==dd07bd51-1ab0-4e69-a0ff-f625fa9e7fc0 8. Generate Some Events You can do an HTTP POST to http://localhost:8080/sse/generateNE Shell http POST http://localhost:8080/sse/generateNE After this, watch as your HTTP client receives an event for the clientId that it is subscribed to. Discussion Why would you use Kafka and Redis? Does not Kafka offer this alone? Many companies have invested in Kafka as a backend message provider between their systems. Kafka in itself does not handle message selection very easily. Message selection is not a typical feature provided natively by Kafka for a couple of reasons: Data Size and Latency: Kafka is designed for high-throughput, low-latency message processing. Its architecture focuses on distributing messages to a large number of consumers quickly. Introducing message selection based on arbitrary conditions can slow down the overall processing and introduce latency, which goes against Kafka's primary design goals. Idempotency: Kafka relies on the concept of idempotent producers and consumers. This means that if a consumer or producer retries a message due to a failure, it should not result in duplicate processing. Introducing selective message retrieval would complicate this idempotency guarantee, potentially leading to unintended duplicate processing. Consumer Offset Tracking: Kafka maintains consumer offsets, allowing consumers to keep track of the last processed message. If message selection is introduced, offsets become less straightforward, as some messages might be skipped based on selection criteria. Decoupled Architecture: Kafka is designed to decouple producers from consumers. Producers are unaware of consumer behavior, and consumers can independently decide what messages they want to consume. Message selection would break this decoupling, as producers would need to know which messages to produce based on specific consumer needs. Consumer Flexibility: Kafka consumers can be highly flexible in terms of message processing. They can be designed to filter, transform, and aggregate messages based on their own criteria. Introducing message selection at the Kafka level would limit this flexibility and make the system less adaptable to changing consumer requirements. Scaling and Parallelism: Kafka's scalability and parallelism benefits come from the ability to distribute messages across multiple partitions and allow multiple consumers to process messages in parallel. Selective message retrieval would complicate this parallelism, making it harder to distribute work efficiently. While Kafka itself doesn't provide native message selection features, it's essential to design the consumers to handle message filtering and selection if needed. Consumers can be designed to filter and process messages based on specific criteria, ensuring that only relevant messages are processed within the consumer application. This approach allows Kafka to maintain its core design principles while still providing the flexibility needed for various message-processing scenarios. Kafka could not essentially solve the problem in an easy way, which lead to pushing the messages to another persistent space that could easily select based on known criteria. This requirement leads to the decision to use Redis and allow pushing messages directly to Redis. A decision was made to limit the events being pushed into Redis based on whether there was a client actually expecting a message. If there were no clients, then Kafka messages were being filtered out. Java .filterWhen(record -> checkIfStreamBeingAccessed(record)) The client registers the id so that the Kafka listener will push the to the Redis stream. Java .flatMap(id -> addIdToStream(clientId)) Conclusion By following the steps outlined in this document, we have successfully implemented an event streaming architecture that takes events from Kafka, filters them based on a specific client ID, and forwards the relevant events to Redis using its Stream API. The SSE endpoint allows clients to receive real-time event updates tailored to their respective client IDs. This solution provides an efficient and scalable way to handle event streaming for targeted clients.
Kafka Non-Blocking Retries Non Blocking retries in Kafka are done via configuring retry topics for the main topic. An Additional Dead Letter Topic can also be configured if required. Events will be forwarded to DLT if all retries are exhausted. A lot of resources are available in the public domain to understand the technicalities. Kafka Consumer Non-Blocking Retry: Spring Retry Topics Spring Retry Kafka Consumer What To Test? It can be a challenging job when it comes to writing integration tests for the retry mechanism in your code. How do you test that the event has been retried for the required number of times? How do you test that retries are only performed when certain exceptions occur and not for others? How do you test if another retry is not done if the exception is resolved in the previous retry? How do you test that the nth attempt in the retry succeeds after (n-1) retry attempts have failed? How to test if the event has been sent to the Dead Letter Queue when all the retry attempts have been exhausted? Let’s see with some code. You can find a lot of good articles which show how to set up Non-Blocking retries using Spring Kafka. One such implementation is given below. This is accomplished using the @RetryableTopic and @DltHandler annotations from Spring-Kafka. Setting up the Retryable Consumer Java @Slf4j @Component @RequiredArgsConstructor public class CustomEventConsumer { private final CustomEventHandler handler; @RetryableTopic(attempts = "${retry.attempts}", backoff = @Backoff( delayExpression = "${retry.delay}", multiplierExpression = "${retry.delay.multiplier}" ), topicSuffixingStrategy = TopicSuffixingStrategy.SUFFIX_WITH_INDEX_VALUE, dltStrategy = FAIL_ON_ERROR, autoStartDltHandler = "true", autoCreateTopics = "false", include = {CustomRetryableException.class}) @KafkaListener(topics = "${topic}", id = "${default-consumer-group:default}") public void consume(CustomEvent event, @Header(KafkaHeaders.RECEIVED_TOPIC) String topic) { try { log.info("Received event on topic {}", topic); handler.handleEvent(event); } catch (Exception e) { log.error("Error occurred while processing event", e); throw e; } } @DltHandler public void listenOnDlt(@Payload CustomEvent event) { log.error("Received event on dlt."); handler.handleEventFromDlt(event); } } If you notice in the above code snippet, the include parameter contains CustomRetryableException.class. This tells the consumer to retry only in case CustomRetryableException is thrown by the CustomEventHandler#handleEvent method. You can add as many as you like. There is an exclude parameter as well, but any one of them can be used at a time. The event processing should be retried for a maximum of ${retry.attempts} times before publishing to the DLT. Setting up Test Infra To write integration tests, you need to make sure that you have a functioning Kafka broker (embedded preferred) and a fully functioning publisher. Let's set up our infrastructure: Java @EnableKafka @SpringBootTest @DirtiesContext(classMode = DirtiesContext.ClassMode.AFTER_CLASS) @EmbeddedKafka(partitions = 1, brokerProperties = {"listeners=" + "${kafka.broker.listeners}", "port=" + "${kafka.broker.port}"}, controlledShutdown = true, topics = {"test", "test-retry-0", "test-retry-1", "test-dlt"} ) @ActiveProfiles("test") class DocumentEventConsumerIntegrationTest { @Autowired private KafkaTemplate<String, CustomEvent> testKafkaTemplate; // tests } ** Configurations are imported from the application-test.yml file. When using an embedded kafka broker, it is important to mention the topics to be created. They will not be created automatically. In this case, we are creating four topics, namely "test", "test-retry-0", "test-retry-1", "test-dlt" We have set out the maximum retry attempts to three. Each topic corresponds to each of the retry attempts. So events should be forwarded to DLT if three retries are exhausted. Test Cases Retry should not be done if consumption is successful on the first attempt. This can be tested by the fact that the CustomEventHandler#handleEvent method is called only once. Further tests on Log statements can also be added. Java @Test void test_should_not_retry_if_consumption_is_successful() throws ExecutionException, InterruptedException { CustomEvent event = new CustomEvent("Hello"); // GIVEN doNothing().when(customEventHandler).handleEvent(any(CustomEvent.class)); // WHEN testKafkaTemplate.send("test", event).get(); // THEN verify(customEventHandler, timeout(2000).times(1)).handleEvent(any(CustomEvent.class)); verify(customEventHandler, timeout(2000).times(0)).handleEventFromDlt(any(CustomEvent.class)); } Retry should not be done if a non-retryable exception is raised. In this case, the CustomEventHandler#handleEvent method should be invoked only once: Java @Test void test_should_not_retry_if_non_retryable_exception_raised() throws ExecutionException, InterruptedException { CustomEvent event = new CustomEvent("Hello"); // GIVEN doThrow(CustomNonRetryableException.class).when(customEventHandler).handleEvent(any(CustomEvent.class)); // WHEN testKafkaTemplate.send("test", event).get(); // THEN verify(customEventHandler, timeout(2000).times(1)).handleEvent(any(CustomEvent.class)); verify(customEventHandler, timeout(2000).times(0)).handleEventFromDlt(any(CustomEvent.class)); } Retry for the maximum configured number of times if a RetryableException is thrown and subsequently should be published to Dead Letter Topic when retries are exhausted. In this case, the CustomEventHandler#handleEvent method should be invoked three (maxRetries) times and CustomEventHandler#handleEventFromDlt method should be invoked once. Java @Test void test_should_retry_maximum_times_and_publish_to_dlt_if_retryable_exception_raised() throws ExecutionException, InterruptedException { CustomEvent event = new CustomEvent("Hello"); // GIVEN doThrow(CustomRetryableException.class).when(customEventHandler).handleEvent(any(CustomEvent.class)); // WHEN testKafkaTemplate.send("test", event).get(); // THEN verify(customEventHandler, timeout(10000).times(maxRetries)).handleEvent(any(CustomEvent.class)); verify(customEventHandler, timeout(2000).times(1)).handleEventFromDlt(any(CustomEvent.class)); } **A considerable timeout has been added in the verification stage so that exponential back-off delay can be taken into consideration before the test is completed. This is important and may result in an assertion failure if not set properly. Should be retried until RetryableException is resolved And should not continue retrying if a non-retryable exception is raised or consumption eventually succeeds. The test has been set up such as to throw a RetryableException first and then throw a NonRetryable exception, such that retry is done once. Java @Test void test_should_retry_until_retryable_exception_is_resolved_by_non_retryable_exception() throws ExecutionException, InterruptedException { CustomEvent event = new CustomEvent("Hello"); // GIVEN doThrow(CustomRetryableException.class).doThrow(CustomNonRetryableException.class).when(customEventHandler).handleEvent(any(CustomEvent.class)); // WHEN testKafkaTemplate.send("test", event).get(); // THEN verify(customEventHandler, timeout(10000).times(2)).handleEvent(any(CustomEvent.class)); verify(customEventHandler, timeout(2000).times(0)).handleEventFromDlt(any(CustomEvent.class)); } Java @Test void test_should_retry_until_retryable_exception_is_resolved_by_successful_consumption() throws ExecutionException, InterruptedException { CustomEvent event = new CustomEvent("Hello"); // GIVEN doThrow(CustomRetryableException.class).doNothing().when(customEventHandler).handleEvent(any(CustomEvent.class)); // WHEN testKafkaTemplate.send("test", event).get(); // THEN verify(customEventHandler, timeout(10000).times(2)).handleEvent(any(CustomEvent.class)); verify(customEventHandler, timeout(2000).times(0)).handleEventFromDlt(any(CustomEvent.class)); } Conclusion So, you can see that the integration test is a mix and match of strategies, timeouts, delays, and verifications so as to foolproof the retry mechanism of your Kafka Event-Driven Architecture. Kudos. Feel Free to suggest improvements and reach out to me on LinkedIn. The full code can be found here.
As our digital universe continues to expand at an exponential rate, we find ourselves standing at the precipice of a paradigm shift driven by the rise of streaming data and real-time analytics. From social media updates to IOT sensor data, from market trends to weather predictions, data streams are burgeoning, and developers are seizing the opportunity to extract valuable insights instantaneously. Such a dynamic landscape opens up myriad use cases spanning across industries, offering a more reactive, informed, and efficient way of decision-making. Among these use cases, one that stands out for its universal applicability and growing relevance is 'real-time alerting.' This feature has the potential to revolutionize various sectors by providing proactive responses to potential issues, thereby fostering efficiency, productivity, and safety. From identifying potential system failures in IT infrastructure to triggering alerts for abnormal health readings in medical devices, let's delve deeper into how real-time alerting can be leveraged to transform the way we interact with data. Simplifying the Complex Historically, the development of real-time solutions posed significant challenges. The primary issues centered around managing continuous data streams, ensuring exactly one semantics such that the same alerts don't get triggered repeatedly, processing them in real-time, and dealing with high volumes of data. Traditional batch processing systems were ill-equipped to handle such tasks as they were designed for static, finite data sets. Furthermore, creating stream processing applications requires specialized knowledge and expertise in dealing with complex architectures, resource management, and in-depth programming capabilities, making it an arduous task for many organizations. However, the advent of no-code tooling has significantly simplified the creation of real-time solutions. Azure Stream Analytics provides an easy-to-use, fully managed event-processing engine that helps run real-time analytics and complex event computations on streaming data. It abstracts away the complexity of underlying infrastructure, thereby enabling developers to focus on defining processing logic to manipulate streaming data. The zero code integration with inputs and outputs also makes it easy to quickly connect to read from sources and write to destinations such as Azure Event Hubs. Furthermore, integration with Azure Logic Apps provides a straightforward way to create workflows and automate tasks. The tool's visual designer allows users to design and automate scalable workflows with minimum coding effort, thus accelerating the process of building, deploying, and managing real-time solutions. Such advancements have democratized access to real-time analytics, making it a more feasible proposition for a broader range of organizations and use cases. Overview of the Real-Time Alerting Solution Our real-time alerting solution leverages the power of Azure's integrated suite of services to provide a seamless, efficient, and effective workflow. It consists of four key components. Firstly, Azure Event Hubs serve as the initial source of streaming input data, accommodating millions of events per second from any connected device or service. This continuous inflow of data then feeds into the second component, Azure Stream Analytics. As the stream processing engine, Azure Stream Analytics evaluates the incoming data based on defined queries and detects when specific conditions are met. When a defined condition is triggered, the processed output is sent to another Azure Event Hub. This triggers the third component, a Logic App that's configured to spring into action whenever a new message arrives in the Event Hub. Finally, the Logic App sends out the alert notification via the desired channel - be it email, SMS, or other communication platform. Each of these components plays a crucial role in delivering a robust, scalable, and efficient real-time alerting solution, catering to diverse use cases and ensuring that critical information is relayed immediately. Exactly Once Processing Guarantees In the world of stream processing and real-time alerting, the semantics of data processing - specifically, 'exactly once' versus 'at least once' - play a crucial role in the accuracy and efficiency of the system. 'At least once' semantics guarantee that every message will be processed, but not that it will be processed only once. This approach can lead to the same data point being processed multiple times, particularly in the event of network hiccups, system failures, or node restarts. While this ensures no data loss, it can also lead to duplicate outputs. In a real-time alerting scenario, this could result in the same alert being triggered and sent multiple times for the same data point, which can lead to confusion, redundancy, and a diminished user experience. On the other hand, 'exactly once' semantics ensures that each data point is processed precisely one time - no more, no less. This eliminates the risk of duplicate processing and hence duplicate alerts. In the case of a job or node restart, the system maintains the state in such a way that it knows exactly where it left off, and it can resume processing without repeating any data points. Azure Stream Analytics provides 'exactly once' semantics out of the box when reading from and writing to Azure Event Hubs, ensuring the accuracy and reliability of the real-time alerting solution we build. Defining the Streaming Business Logic Azure Stream Analytics provides an intuitive interface where you simply need to select your Azure subscription, the Event Hubs namespace, and the instance you wish to connect to. The platform abstracts away all the complexity, handling the intricacies of establishing and managing connections, security, and permissions. This seamless integration empowers even non-technical users to tap into the power of streaming analytics, transforming data into actionable insights in real time. Once you've connected your Event Hub input in Azure Stream Analytics, the exploration and manipulation of your streaming data become a breeze. A distinct advantage of this tool is that it provides an immediate visual representation of your incoming data stream, allowing you to easily understand its structure and content. The power of this interface doesn't stop there. By using the intuitive drag-and-drop no-code editor, you can build sophisticated business logic with ease. Want to enrich your streaming data with reference data? Simply drag your reference data source into the editor and connect it with your streaming data. This visual interface allows you to join, filter, aggregate, or perform other transformations without the need for complex SQL statements or code scripts. Moreover, Azure Stream Analytics offers a unique feature to validate the output of each step in your processing logic. This enables you to ensure that your transformations are accurate and produce the expected results before they are sent to the output sink, thereby reducing the risk of inaccurate data causing confusion or triggering false alerts. This combination of visualization, intuitive design, and validation makes Azure Stream Analytics a powerful streaming processing engine. Source Once you start your job, the compute engine will continuously read from the Azure Event Hub, execute the business logic you've implemented, and produce output events to the Azure Event Hub instance configured. Now that we have real-time data being evaluated in a continuous mode for the condition, the next step is to see how we can leverage Logic Apps to send an alert to an endpoint of our choice. Using Logic Apps to Send Notifications Creating a Logic App with Azure Event Hub messages as triggers is a straightforward process that empowers you to automate and optimize your workflows. The process begins within the Logic Apps Designer. First, you create a new Logic App and choose the "When an event is pushed to an Event Hub" trigger. This will allow your app to react to new messages arriving in your specified Event Hub. After selecting the trigger, you will need to provide connection parameters, including the name of your Azure subscription, the Event Hub namespace, and the specific Event Hub instance you wish to monitor. With this in place, your Logic App is now configured to spring into action whenever a new message arrives in your Event Hub. Azure Logic Apps provides extensive connectivity options, supporting numerous services out-of-the-box, which allow them to trigger a variety of actions based on the incoming data. You can add an action in your Logic App workflow to send an email using built-in connectors like Office 365 Outlook or Gmail. You simply need to specify the necessary parameters, such as the recipient's email, subject, and body text. In addition, Azure Logic Apps can also be configured to send SMS alerts by leveraging SMS gateway services like Twilio. You can add an action to send an SMS with Twilio, specify the necessary parameters, such as the recipient's number and the message text, and your Logic App will then send an SMS alert whenever the specified trigger event occurs. These are just a couple of examples of the vast number of alerting endpoints supported by Azure Logic Apps. The platform includes built-in support for numerous services across various domains, including storage services, social media platforms, other Azure services, and more, all to ensure that your real-time alerting system can be as versatile and comprehensive as your use-case demands. Conclusion The advent of no-code tools such as Azure Stream Analytics and Logic Apps has profoundly transformed the way we approach building real-time alerting solutions. By abstracting away the complex underlying details, these tools have democratized access to real-time analytics, empowering both technical and non-technical users alike to easily harness the power of streaming data. Developers can now connect to streaming data sources, visualize and manipulate data streams, and even apply sophisticated business logic, all with a simple drag-and-drop interface. Meanwhile, Logic Apps can be configured to trigger actions, like sending emails or SMS, based on the insights derived from the data. Importantly, this democratization does not come at the cost of performance, scale, or reliability. The solutions built with these tools can handle massive volumes of data, process it in real time, and deliver reliable alerts, all while ensuring 'exactly once' semantics. As we move forward in the age of data-driven decision-making, these advancements will continue to empower more people and organizations to derive immediate, actionable insights from their data, revolutionizing industries, and our everyday lives.
Miguel Garcia
VP of Engineering,
Nextail Labs
Gautam Goswami
Founder,
DataView
Alexander Eleseev
Full Stack Developer,
First Line Software
Ben Herzberg
Chief Scientist,
Satori