Confluent sponsored this post.
Kafka is the de facto standard for collecting and storing event data, but are you comfortable using that data in your applications in real time? It’s trickier than it seems: you need to integrate, secure and scale different systems for event storage, processing and querying. This is why we created ksqlDB, a database purpose-built for stream processing applications.
What Is Stream Processing?
Kafka is great for working with events through its durable, append-only log. Taking action on those streams of events in real time, however, can be a challenge. This is where a stream processor such as ksqlDB comes in. KsqlDB provides a processing model for working with groups of events as if they were in-memory collections and makes them accessible through SQL.
Stream processing deals with events as they come in and pushes results into pipes, known as topics, as they go out. The data is immutable, so applications can read from a topic, compute new information, and then push the result into another topic as needed.
What Is a Streaming Database?
A streaming database provides a single SQL interface that helps you build stream processing applications instead of interacting with many different subsystems. So, instead of dealing with events and topics, you deal with streams and tables. A stream is a topic but with a strongly defined schema. SQL is used to create these streams, define their schemas, insert, filter and transform data. Meanwhile, ksqlDB takes care of all the underlying managerial work of executing those statements so you can focus instead on developing your application.
Creating a New Stream
As implied, to define new streams, you use SQL CREATE commands. While the command itself is relatively simple SQL with a few extensions, it ultimately controls the underlying Kafka topics without the user ever touching Kafka directly.
A stream has a schema and a given key that are the important parts of the command:
CREATE STREAM readings (
    sensor VARCHAR KEY,
    location VARCHAR,
    reading INT
) WITH (
    kafka_topic = 'readings',
    partitions = 3,
    value_format = 'json'
);
Whenever a new stream is established, Kafka creates an empty new topic that’s partitioned accordingly. KsqlDB stores the metadata for these definitions in its own internal topic, which every ksqlDB server reads as a global catalog of objects.
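If you want to verify what was created, the same SQL interface can describe it. As a quick sketch using ksqlDB’s metadata statements, you can list the known streams and topics and inspect the new stream’s schema:

-- List the streams ksqlDB knows about.
SHOW STREAMS;

-- Show the schema, key column and serialization format of the new stream.
DESCRIBE readings;

-- Confirm the backing Kafka topic exists with the requested partitions.
SHOW TOPICS;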
Inserting Rows into a Stream
SQL users will instantly recognize that we use standard INSERT statements to add data to the stream. Again, no knowledge of stream management, topics, partitions, etc., is needed for a developer to start adding data into these streams.
Here is some sample data you can load to follow along as we build our example:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 | INSERT INTO readings (sensor, location, reading) VALUES (‘sensor-1’, ‘wheel’, 45); INSERT INTO readings (sensor, location, reading) VALUES (‘sensor-2’, ‘motor’, 41); INSERT INTO readings (sensor, location, reading) VALUES (‘sensor-1’, ‘wheel’, 42); INSERT INTO readings (sensor, location, reading) VALUES (‘sensor-3’, ‘muffler’, 42); INSERT INTO readings (sensor, location, reading) VALUES (‘sensor-3’, ‘muffler’, 40); INSERT INTO readings (sensor, location, reading) VALUES (‘sensor-4’, ‘motor’, 43); INSERT INTO readings (sensor, location, reading) VALUES (‘sensor-6’, ‘muffler’, 43); INSERT INTO readings (sensor, location, reading) VALUES (‘sensor-5’, ‘wheel’, 41); INSERT INTO readings (sensor, location, reading) VALUES (‘sensor-5’, ‘wheel’, 42); INSERT INTO readings (sensor, location, reading) VALUES (‘sensor-4’, ‘motor’, 41); INSERT INTO readings (sensor, location, reading) VALUES (‘sensor-7’, ‘muffler’, 43); INSERT INTO readings (sensor, location, reading) VALUES (‘sensor-8’, ‘wheel’, 40); INSERT INTO readings (sensor, location, reading) VALUES (‘sensor-9’, ‘motor’, 40); INSERT INTO readings (sensor, location, reading) VALUES (‘sensor-9’, ‘motor’, 44); INSERT INTO readings (sensor, location, reading) VALUES (‘sensor-7’, ‘muffler’, 41); |
The database checks each record against the stream’s schema before creating the record and serializing its content.
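As a hypothetical illustration of that validation step, a statement that references a column the schema does not define is rejected before anything reaches the topic:

-- 'area' is not a column of the readings stream, so ksqlDB rejects
-- the statement and nothing is written to the underlying topic.
INSERT INTO readings (sensor, area, reading) VALUES ('sensor-1', 'wheel', 45);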
While we talk about inserting rows into a stream, remember that ultimately the data is going into a Kafka topic as an event record as well. Applications that already use Kafka can continue to run in parallel to ksqlDB applications.
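To watch those records arrive without leaving SQL, you can run a push query against the stream. This is only a quick sketch: EMIT CHANGES keeps the query open and streams matching rows to the client, and the optional LIMIT ends it after a fixed number of rows.

-- Continuously read rows from the stream as they are appended.
SELECT sensor, location, reading
FROM readings
EMIT CHANGES
LIMIT 5;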
Transforming an Event Stream
Next, we start to add value to the stream by modifying its content and publishing the outcome as a new stream. No low-level custom consumer/producer code is needed to do a straightforward transform with SQL.
This persistent query transforms one stream (readings, above) into a new one called clean, upper-casing the text in one field along the way.
CREATE STREAM clean AS
    SELECT sensor,
           reading,
           UCASE(location) AS location
    FROM readings
    EMIT CHANGES;
This query creates a mini-application that runs as a new stream. As the source stream receives new rows, the persistent query works through each one and writes the result into the new stream. EMIT CHANGES keeps the query running, watching for new events.
Behind the scenes, ksqlDB compiles the query’s physical execution plan as a Kafka Streams topology. Running as a background service, it reacts to new topic records as they arrive. Processing takes place on ksqlDB servers and is horizontally scalable across nodes.
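Those background jobs are visible from the same SQL interface. As a sketch, SHOW QUERIES lists every persistent query and the stream or table it writes to, and EXPLAIN, given one of the reported query IDs (the ID below is only a placeholder), prints the execution plan and its Kafka Streams topology:

-- List all persistent queries, their status and their sink streams/tables.
SHOW QUERIES;

-- Print the execution plan and Kafka Streams topology for one query.
-- Replace CSAS_CLEAN_1 with an actual query ID reported by SHOW QUERIES.
EXPLAIN CSAS_CLEAN_1;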
Filtering Rows out of a Stream
Filtering works much like transforming: a SQL WHERE clause selects which rows pass through. No new application code is needed, and the new stream is created and managed in the same way.
CREATE STREAM high_readings AS
    SELECT sensor, reading, location
    FROM clean
    WHERE reading > 41
    EMIT CHANGES;
As you can see, we are effortlessly chaining together several streams at this point. KsqlDB mechanics propagate your data changes through the chain.
Combining Stream Operations into One
To simplify what we’ve created, we can combine the transform and the filter into a single statement and get rid of the intermediate streams we no longer need.
CREATE STREAM high_pri AS
    SELECT sensor,
           reading,
           UCASE(location) AS location
    FROM readings
    WHERE reading > 41
    EMIT CHANGES;
This bypasses the chain of streams entirely: one persistent query reads from readings and produces the upper-cased, filtered rows in a single hop.
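With the combined stream in place, the intermediate streams can be retired. Here is a sketch of the cleanup, assuming the persistent queries feeding them are terminated first (the query IDs below are placeholders; take the real ones from SHOW QUERIES):

-- Stop the persistent queries feeding the intermediate streams.
TERMINATE CSAS_CLEAN_1;
TERMINATE CSAS_HIGH_READINGS_3;

-- Drop the streams; DELETE TOPIC also removes the backing Kafka topics.
DROP STREAM high_readings DELETE TOPIC;
DROP STREAM clean DELETE TOPIC;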
Managing Partition Keys
In Kafka, partitioning controls data locality (where it resides in the cluster). The choice of how you key your records is crucial, especially if you use Kafka clients to process your data. We defined our key column (sensor) in our first example, but in ksqlDB, we can change this using a PARTITION BY clause.
CREATE STREAM by_location AS
    SELECT *
    FROM high_pri
    PARTITION BY location
    EMIT CHANGES;
All rows with the same location value are now co-located on the same partition, which enables more advanced stateful operations such as streaming joins and incremental aggregations. Tracing the overall workflow from the original readings stream through to by_location, every reading for a given location ends up on the same partition.
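Co-located data makes stateful work a natural next step. As a hedged sketch (the table name and the AVG aggregate are illustrative choices, not part of the example above), a GROUP BY over the re-keyed stream materializes a continuously updated table of average readings per location:

-- A table holds one continuously updated row per key (here, location).
CREATE TABLE avg_readings AS
    SELECT location,
           AVG(reading) AS avg_reading
    FROM by_location
    GROUP BY location
    EMIT CHANGES;

Because the result is a table rather than a stream, each location keeps exactly one current row, updated as new readings flow through the pipeline, and it can be read back with a pull query keyed on location.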
Learn More about ksqlDB
We’ve only scratched the surface of how ksqlDB works, but its constructs are concise, composable and elegant. They should allow you to develop new applications and solutions faster than before, without diluting Kafka’s core concepts.
Follow our stream processing blogs for deeper looks at joins, scaling, fault tolerance, and how time is handled. Each is a fascinating world in its own right. Until then, there’s no substitute for trying ksqlDB yourself.