Kafka Event Streaming AI and Automation
Explore how to use ChatGPT to create an IoT Kafka event consumer, and API Logic Server declarative logic to produce events when temperature readings fall outside a defined range.
Apache Kafka has emerged as a clear leader in corporate architecture for moving from data at rest (DB transactions) to event streaming. There are many presentations that explain how Kafka works and how to scale this technology stack (either on-premises or in the cloud). The next phase of this project is building a microservice, with ChatGPT's help, that consumes messages and then enriches, transforms, and persists them. In this example, we will be consuming input from an IoT device (Raspberry Pi) that sends a JSON temperature reading every few seconds.
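For context, a minimal sketch of what the device-side producer might look like is shown below. This is illustrative only: the topic name "TempReadings", the JSON field names, and the sensor-read function are assumptions, not the article's code.

import json
import time

from confluent_kafka import Producer

producer = Producer({"bootstrap.servers": "localhost:9092"})

def read_temperature() -> int:
    # Placeholder for the actual sensor read on the Raspberry Pi
    return 98

while True:
    # JSON keys mirror the Temperature table columns shown later in the article
    payload = {"DeviceId": 1, "TempReading": read_temperature()}
    producer.produce("TempReadings", key=str(payload["DeviceId"]), value=json.dumps(payload))
    producer.poll(0)   # serve delivery callbacks
    time.sleep(5)      # send a reading every few seconds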
Consume a Message
As each Kafka event message is produced (and logged), a Kafka microservice consumer is ready to handle each message. I asked ChatGPT to generate some Python code, and it gave me the basics to poll and read from the named "topic." What I got was a pretty good start for consuming a topic, key, and JSON payload. ChatGPT also created code to persist this to a database using SQLAlchemy. I then wanted to transform the JSON payload and use API Logic Server (ALS, an open-source project on GitHub) rules to unwrap the JSON, validate, calculate, and produce a new set of message payloads whenever the source temperature falls outside a given range.
ChatGPT: “design a Python Event Streaming Kafka Consumer interface”
Note: ChatGPT selected the Confluent Kafka libraries (and their Docker Kafka container); you can modify the code to use other Python Kafka libraries.
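The generated consumer code is not reproduced in full here, but a minimal sketch of the kind of polling loop it produced might look like the following. The topic and consumer-group names are illustrative assumptions, and the actual ChatGPT output may differ.

import json

from confluent_kafka import Consumer

consumer = Consumer({
    "bootstrap.servers": "localhost:9092",
    "group.id": "temperature-consumer",   # illustrative group id
    "auto.offset.reset": "earliest",
})
consumer.subscribe(["TempReadings"])      # illustrative topic name

try:
    while True:
        msg = consumer.poll(timeout=1.0)
        if msg is None:
            continue
        if msg.error():
            print(f"Consumer error: {msg.error()}")
            continue
        message_data = json.loads(msg.value().decode("utf-8"))
        print(f"key={msg.key()}, value={message_data}")
        # enrich / transform / persist message_data here (see below)
finally:
    consumer.close()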
SQLAlchemy Model
Using API Logic Server (ALS: a Python open-source platform), we connect to a MySQL database. ALS will read the tables and create a SQLAlchemy ORM model, a react-admin user interface, a SAFRS JSON:API with OpenAPI (Swagger) documentation, and a running REST web service for each ORM endpoint. The new Temperature table will hold the timestamp, the IoT device ID, and the temperature reading. Here we use the ALS command line utility to create the ORM model:
ApiLogicServer create --project_name=iot --db_url=mysql+pymysql://root:password@127.0.0.1:3308/iot
The API Logic Server generated class used to hold our Temperature values:
class Temperature(SAFRSBase, Base):
    __tablename__ = 'Temperature'
    _s_collection_name = 'Temperature'  # type: ignore
    __bind_key__ = 'None'

    Id = Column(Integer, primary_key=True)
    DeviceId = Column(Integer, nullable=False)
    TempReading = Column(Integer, nullable=False)
    CreateDT = Column(TIMESTAMP, server_default=text("CURRENT_TIMESTAMP"), nullable=False)
    KafkaMessageSent = Column(Boolean, default=text("False"))
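Because ALS also exposes each table as a REST endpoint, the persisted readings can be inspected over HTTP. A quick sketch, assuming the ALS default port of 5656 and the standard /api prefix:

import requests

# Query the generated JSON:API endpoint for the Temperature table
resp = requests.get("http://localhost:5656/api/Temperature")
print(resp.json())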
Changes
So instead of saving the raw Kafka JSON consumer message in the SQL database (and firing rules to do the work), we unwrap the JSON payload (util.row_to_entity) and insert the values into the Temperature table. We let the declarative rules handle each temperature reading.
entity = models.Temperature()
util.row_to_entity(message_data, entity)
session.add(entity)
When the consumer receives the message, it will add it to the session, which will trigger the commit_event rule (below).
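Putting those pieces together, the consumer's message handler might look roughly like this. The handle_message name is hypothetical; models, util, and session are the ALS project objects used above.

import json

def handle_message(msg):
    # Parse the Kafka payload and map it onto the ORM row
    message_data = json.loads(msg.value().decode("utf-8"))
    entity = models.Temperature()
    util.row_to_entity(message_data, entity)  # copy JSON attributes onto the entity
    session.add(entity)
    session.commit()  # the commit fires the commit_event rule described next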
Declarative Logic: Produce a Message
Using API Logic Server (an automation framework built using SQLAlchemy, Flask, and the LogicBank spreadsheet-like rules engine: formula, sum, count, copy, constraint, event, etc.), we add a declarative commit_event rule on the ORM entity Temperature. As each message is persisted to the Temperature table, the commit_event rule is called. If the temperature reading exceeds MAX_TEMP or falls below MIN_TEMP, we send a Kafka message on the topic "TempRangeAlert". We also add a constraint to make sure we receive data within a normal range (32-132F). We will let another event consumer handle the alert message.
from confluent_kafka import Producer

conf = {'bootstrap.servers': 'localhost:9092'}
producer = Producer(conf)

MAX_TEMP = arg.MAX_TEMP or 102
MIN_TEMP = arg.MIN_TEMP or 78

def produce_message(
        row: models.Temperature,
        old_row: models.Temperature,
        logic_row: LogicRow):
    if logic_row.isInserted() and row.TempReading > MAX_TEMP:
        producer.produce(topic="TempRangeAlert",
                         key=str(row.Id),
                         value=f"The temperature {row.TempReading}F exceeds {MAX_TEMP}F on Device {row.DeviceId}")
        row.KafkaMessageSent = True
    if logic_row.isInserted() and row.TempReading < MIN_TEMP:
        producer.produce(topic="TempRangeAlert",
                         key=str(row.Id),
                         value=f"The temperature {row.TempReading}F is less than {MIN_TEMP}F on Device {row.DeviceId}")
        row.KafkaMessageSent = True
Rules.constraint(models.Temperature,
                 as_expression=lambda row: 32 <= row.TempReading <= 132,
                 error_message="Temperature {row.TempReading} is out of range")
Rules.commit_event(models.Temperature, calling=produce_message)
We only produce an alert message if the temperature reading is greater than MAX_TEMP or less than MIN_TEMP. The constraint checks the temperature range before the commit event is called (note that rules are always unordered and can be introduced as specifications change).
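To see the constraint in action, here is a small sketch (using the models and session objects from the project) of a reading outside 32-132F being rejected before any alert is produced:

try:
    bad = models.Temperature(DeviceId=1, TempReading=20)  # below the 32F floor
    session.add(bad)
    session.commit()
except Exception as e:   # LogicBank raises a constraint exception at commit time
    session.rollback()
    print(f"Rejected: {e}")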
TDD Behave Testing
Using TDD (Test-Driven Development), we can write a Behave test to insert records directly into the Temperature table and then check the returned KafkaMessageSent value. Behave begins with a Feature/Scenario (.feature file). For each scenario, we write a corresponding Python class using Behave decorators.
Feature Definition
Feature: TDD Temperature Example

  Scenario: Temperature Processing
    Given A Kafka Message Normal (Temperature)
    When Transactions normal temperature is submitted
    Then Check KafkaMessageSent Flag is False

  Scenario: Temperature Processing
    Given A Kafka Message Abnormal (Temperature)
    When Transactions abnormal temperature is submitted
    Then Check KafkaMessageSent Flag is True
TDD Python Class
from behave import *
import safrs

from database import models  # ALS-generated SQLAlchemy models

db = safrs.DB
session = db.session

def insertTemperature(temp: int) -> bool:
    entity = models.Temperature()
    entity.TempReading = temp
    entity.DeviceId = 999      # test device id (DeviceId is an Integer column)
    session.add(entity)
    session.commit()           # commit fires the rules that set KafkaMessageSent
    return entity.KafkaMessageSent

@given('A Kafka Message Normal (Temperature)')
def step_impl(context):
    context.temp = 98          # within the default MIN_TEMP (78) .. MAX_TEMP (102) range
    assert True

@when('Transactions normal temperature is submitted')
def step_impl(context):
    context.response_text = insertTemperature(context.temp)

@then('Check KafkaMessageSent Flag is False')
def step_impl(context):
    assert context.response_text == False
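The abnormal scenario needs matching step implementations. A sketch that mirrors the normal path above, using 105 as a reading that exceeds the default MAX_TEMP of 102 while still passing the 32-132 constraint:

@given('A Kafka Message Abnormal (Temperature)')
def step_impl(context):
    context.temp = 105

@when('Transactions abnormal temperature is submitted')
def step_impl(context):
    context.response_text = insertTemperature(context.temp)

@then('Check KafkaMessageSent Flag is True')
def step_impl(context):
    assert context.response_text == True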
Summary
Using ChatGPT to generate the Kafka message code for both the consumer and the producer is a good starting point (install Confluent's Kafka Docker container to run it locally). Combining that with API Logic Server's declarative logic rules lets us add formulas, constraints, and events to the normal flow of transactions into our SQL database, and produce (and transform) new Kafka messages. ChatGPT and declarative logic are the next level of "paired programming."