Unbounded Stream Processing Using Apache Beam
Today, we are going to build a simple WordCount data pipeline using Apache Kafka for unbounded sources.
For the last two weeks, I have been experimenting with the Apache Beam API. I read the excellent documentation provided by Beam, and it helped me understand the basics. I recommend readers go through it before we move ahead.
Introduction
In this post, we will build a simple WordCount data pipeline that uses Apache Kafka as an unbounded source. We could use any message broker for this application, such as Google Cloud Pub/Sub; Beam has built-in IO connectors for many messaging systems. At the end of the pipeline, we will write the result to a text file.
Before we jump into the code, we need to be aware of certain streaming concepts such as windowing, triggers, and processing time vs. event time. I recommend reading the articles Streaming 101 and Streaming 102 by Tyler Akidau.
Alright, let's go ahead and do the setup!
Setup
- Set up a Java environment. We are going to use Beam's Java API.
- Install Zookeeper and Apache Kafka. If you are too lazy to do that, go here. (Don't worry about YARN; just spin up Zookeeper and Kafka with the "bin/grid start all" and "bin/grid stop all" commands.)
- Add a $KAFKA_HOME variable to your .bashrc/.zshrc file and restart your terminal session:
Shell
export KAFKA_HOME=<install folder>/hello-samza/deploy/kafka
- Clone this Git repo here
- Install Python
After things are wired up, we will create a Kafka topic to push messages to. Use the commands below:
# create topic by name "messenger"
./bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic messenger
# to list the existing topic
./bin/kafka-topics.sh --list --zookeeper localhost:2181
We will use Beam's Direct Runner as the execution engine for this example. Go to the project home directory and execute this command to start the pipeline:
mvn compile exec:java -Dexec.mainClass=com.sunil.WindowedWordCount -Pdirect-runner -Dexec.args="--output=<output folder>"
That's it! The pipeline job is listening to the Kafka topic and ready to process the data.
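For reference, the --output argument (along with the Kafka bootstrap servers and input topic) is surfaced through a custom PipelineOptions interface that the pipeline code reads later via getBootstrap(), getInputTopic(), and getOutput(). Below is a minimal sketch of what such an interface might look like; the annotations and default values are assumptions on my part, and the actual interface lives in the repo.

public interface WindowedWordCountOptions extends StreamingOptions {
    // Extending StreamingOptions is assumed so that options.setStreaming(true) in main() works.

    @Description("Kafka bootstrap servers")
    @Default.String("localhost:9092")   // assumed default; adjust to your broker
    String getBootstrap();
    void setBootstrap(String value);

    @Description("Kafka topic to read messages from")
    @Default.String("messenger")        // assumed default; the topic created above
    String getInputTopic();
    void setInputTopic(String value);

    @Description("Folder where the windowed output files are written")
    String getOutput();
    void setOutput(String value);
}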
Execute this command to push an event to the Kafka topic "messenger":
cd scripts
python3 GenMessage.py <name> <message (optional)> <epoch time in ms (optional)>
# example: python3 GenMessage.py sunil
# {"name":"sunil","message": "This message is from sunil","ets": 1580054229156}
Let's Talk About Code Now!
In this example, we are going to count the number of words in a given window (say, a 1-hour window). Windows in Beam are based on event time, i.e., the time derived from the message timestamp rather than the system timestamp (processing time). Based on the event time, Beam puts each message into the appropriate window.
When the watermark (heuristic-based) passes the end of a window, i.e., all the data for that window is expected to have arrived in the system, the window closes. Triggers provide flexible ways to kick off the computation on the events accumulated in a window (more on this here). In our case, the trigger fires after the window closes.
The remaining operations, such as GroupBy and CountPerKey (map-reduce), can then be performed on this result. The output of the map-reduce operation is persisted to a file partitioned by window timestamp.
Figure 1. Unbounded Stream processing on event time and windowing operation
Our main class looks something like this:
public class WindowedWordCount {

    static void runWithOptions(WindowedWordCountOptions options) {
        Pipeline pipeline = Pipeline.create(options);
        Duration WINDOW_TIME = Duration.standardMinutes(1);
        Duration ALLOWED_LATENESS = Duration.standardMinutes(1);

        // Register a coder so Beam knows how to (de)serialize Record elements.
        CoderRegistry cr = pipeline.getCoderRegistry();
        cr.registerCoderForClass(Record.class, new RecordSerializableCoder());

        pipeline.apply(
                KafkaIO.<Long, Record>read()
                        .withBootstrapServers(options.getBootstrap())
                        .withTopic(options.getInputTopic())
                        .withKeyDeserializer(LongDeserializer.class)
                        .withValueDeserializer(RecordDeserializer.class)
                        // Derive record timestamps and watermarks from the message's event time.
                        .withTimestampPolicyFactory((tp, previousWatermark) -> new CustomFieldTimePolicy(previousWatermark))
                        .withoutMetadata())
                .apply(Values.<Record>create())
                .apply("append event time for PCollection records",
                        WithTimestamps.of((Record rec) -> new Instant(rec.getEts())))
                .apply("extract message string", MapElements
                        .into(TypeDescriptors.strings())
                        .via(Record::getMessage))
                .apply("apply window", Window.<String>into(FixedWindows.of(WINDOW_TIME))
                        .withAllowedLateness(ALLOWED_LATENESS)
                        .triggering(AfterWatermark.pastEndOfWindow())
                        .accumulatingFiredPanes())
                .apply("count words", new CountWords())
                .apply("format result to String", MapElements
                        .into(TypeDescriptors.strings())
                        .via((KV<String, Long> rec) -> rec.getKey() + ":" + rec.getValue()))
                .apply("Write it to a text file", new WriteOneFilePerWindow(options.getOutput()));

        pipeline.run();
    }

    public static void main(String[] args) {
        WindowedWordCountOptions options = PipelineOptionsFactory.fromArgs(args).as(WindowedWordCountOptions.class);
        options.setStreaming(true);
        runWithOptions(options);
    }
}
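The CountWords transform applied above is a small composite PTransform that splits each message into words and counts occurrences per word within the current window. The actual implementation is in the repo; here is a minimal sketch of one way it could be written using standard Beam transforms (imports omitted, as in the other listings):

public static class CountWords
        extends PTransform<PCollection<String>, PCollection<KV<String, Long>>> {

    @Override
    public PCollection<KV<String, Long>> expand(PCollection<String> messages) {
        return messages
                // Split each message into individual words.
                .apply("split into words", FlatMapElements
                        .into(TypeDescriptors.strings())
                        .via((String line) -> Arrays.asList(line.split("[^\\p{L}0-9']+"))))
                // Drop empty tokens produced by the split.
                .apply("drop empty words", Filter.by((String word) -> !word.isEmpty()))
                // Count occurrences of each word; with windowing applied upstream,
                // counts are produced per window.
                .apply("count per word", Count.perElement());
    }
}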
Event Time and Watermark
A watermark is the system's notion of when all the data in a certain window can be expected to have arrived. Generally, watermarks are derived from the source system itself, i.e., the Kafka consumer in our case. We have to configure the Kafka connector to use the message's event time for watermark generation instead of processing time, which is the default.
In Beam's KafkaIO connector, we can set this configuration using the withTimestampPolicyFactory method. Here we provide a custom policy to override the default behavior. An incorrect configuration would lead to no results being emitted from the pipeline, which can be hard to debug.
/**
 * Custom TimestampPolicy for the Kafka source to manage the record timestamp
 * and watermark as it pulls data from the broker.
 */
public class CustomFieldTimePolicy extends TimestampPolicy<Long, Record> {

    protected Instant currentWatermark;

    public CustomFieldTimePolicy(Optional<Instant> previousWatermark) {
        currentWatermark = previousWatermark.orElse(BoundedWindow.TIMESTAMP_MIN_VALUE);
    }

    @Override
    public Instant getTimestampForRecord(PartitionContext ctx, KafkaRecord<Long, Record> record) {
        // Use the event time embedded in the message as the record timestamp.
        currentWatermark = new Instant(record.getKV().getValue().getTimestamp());
        return currentWatermark;
    }

    @Override
    public Instant getWatermark(PartitionContext ctx) {
        return currentWatermark;
    }
}
Another important part of processing events in event time is setting the event timestamp for each message in the PCollection, as shown below:
... ...
.apply("append event time for PCollection records", WithTimestamps.of((Record rec) -> new Instant(rec.getEts())))
... ...
Beam uses this timestamp to assign each element to the appropriate window.
You are likely to run into errors (or see no output at all) if you forget to set the withTimestampPolicyFactory policy, because our windows use event time while the Kafka connector defaults to processing time, which is always the latest; the event time carried in the message is at least a few milliseconds older than the processing time.
Conclusion
I hope you have learned how to process an unbounded stream in event time using Beam and Kafka. I have skipped many parts of the pipeline code, such as encoders/decoders, serde for Kafka messages, writing one file per window, and other PTransforms, which are documented in the Beam Javadocs. Please explore the Git repo at your leisure. Happy coding!
Further Reading
Making Sense of Stream Processing