
KSQL — Useful language elements

These are quick notes to help you understand the ksqlDB query language (KSQL) and start writing queries for Kafka stream processing. For full details, refer to the official documentation: https://docs.ksqldb.io/en/0.7.1-ksqldb/developer-guide/syntax-reference/

Streams

CREATE STREAM student
(
`name` VARCHAR,
`id` VARCHAR,
`department` VARCHAR,
`courses` ARRAY<
STRUCT<
`id` VARCHAR,
`name` VARCHAR
>
>
) WITH (KAFKA_TOPIC='student_info', VALUE_FORMAT='JSON');
  • This statement is like a SQL DDL command: it defines the schema of a stream over a topic. No data is processed until a query is run against the stream.
  • A Kafka topic holds events, which are records with a timestamp (ROWTIME), a key, and a value.
  • Each stream is associated with an underlying Kafka topic, mentioned in the WITH clause
  • We list the fields of interest that we want to pull from the value of the underlying Kafka topic, along with their data types. All data types are listed here: https://docs.ksqldb.io/en/latest/reference/sql/data-types/
  • Use STRUCT to model nested data.
  • VALUE_FORMAT specifies the serialization format of the message value in the topic.
  • If unquoted, a field name must be a valid SQL identifier, and ksqlDB converts it to uppercase. When you use backticked identifiers, ksqlDB captures the case exactly, and any future references to the identifier become case-sensitive.
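As a quick check of the schema and quoting rules above, the following sketch inspects the stream and reads a few rows from it. It assumes the student stream from the statement above already exists; the LIMIT clause is only there so that the query returns.

```sql
-- Show the registered columns and their types.
DESCRIBE student;

-- Backticked columns must be referenced with exactly the same case.
SELECT `name`, `department` FROM student EMIT CHANGES LIMIT 5;

-- An unquoted identifier is upper-cased, so the statement below would refer to
-- a column called NAME, which the stream above does not define.
-- SELECT name FROM student EMIT CHANGES LIMIT 5;
```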
CREATE STREAM student_filtered [WITH (KAFKA_TOPIC = 'topic')] AS SELECT
`id`,
`department`,
-- `courses` is an array, so select an element before dereferencing
-- (arrays are 1-indexed in recent ksqlDB releases)
`courses`[1]->`id` AS `course_id`,
`courses`[1]->`name` AS `course_name`,
STRUCT(
`full_name` := `name`,
`first_name` := REGEXP_EXTRACT('(.*) (.*)', `name`, 1),
`last_name` := REGEXP_EXTRACT('(.*) (.*)', `name`, 2)
) AS `names`
FROM student
WHERE (`department` = 'IT' OR `department` = 'Humanities')
EMIT CHANGES;
  • This statement creates a stream and a persistent query in ksqlDB. If the optional [WITH (KAFKA_TOPIC)] clause is not used, ksqlDB automatically creates a Kafka topic backing this stream. The results of the persistent query are streamed into this topic.
  • Dereference struct using the -> operator
  • Use aliases to rename fields using the AS clause
  • Instantiate a new struct using STRUCT() syntax used above.
  • REGEXP_EXTRACT is a scalar function in ksqlDB. You can list all available functions with the command SHOW FUNCTIONS;
  • Provide filtering expression in the WHERE clause.
  • EMIT CHANGES indicates that the query is continuous and outputs all changes.
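When every element of the `courses` array is needed rather than a single one, one option is to flatten the array first and dereference afterwards. The sketch below does this in two steps with the built-in EXPLODE table function. The stream names student_courses and student_courses_flat are hypothetical and chosen only for illustration.

```sql
-- Step 1: emit one output row per element of the `courses` array.
CREATE STREAM student_courses AS SELECT
  `id`,
  EXPLODE(`courses`) AS `course`
FROM student
EMIT CHANGES;

-- Step 2: dereference the fields of each exploded struct.
CREATE STREAM student_courses_flat AS SELECT
  `id`,
  `course`->`id` AS `course_id`,
  `course`->`name` AS `course_name`
FROM student_courses
EMIT CHANGES;
```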

Tables

  • A table is a mutable, partitioned collection that models change over time.
  • Each row in a table must have a non-null key associated with it.
  • We can do aggregations within a time slice of a window using tables.
  • We can join two streams based on stream columns.
  • A table is backed by a Kafka topic, to which changes to each key are emitted.
  • We cannot run a CSAS (CREATE STREAM AS SELECT) statement directly on a table. To process a table's data further as a stream, first define a stream on the table's Kafka topic using a CREATE STREAM statement, then run the CSAS statement on that stream.
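The last point can be sketched as follows. Everything named here is an assumption made for illustration: a non-windowed table whose changelog topic is 'DEPARTMENT_COUNTS', JSON values with a `student_count` field, and the grouping column `department` stored in the record key. The KEY keyword is how newer ksqlDB releases declare a key column; older releases expose the key differently.

```sql
-- Declare a stream over the table's topic (hypothetical topic and columns).
CREATE STREAM department_counts_stream (
  `department` VARCHAR KEY,
  `student_count` BIGINT
) WITH (KAFKA_TOPIC='DEPARTMENT_COUNTS', VALUE_FORMAT='JSON');

-- The table's change events can now be processed with a CSAS statement.
CREATE STREAM busy_departments AS SELECT
  `department`,
  `student_count`
FROM department_counts_stream
WHERE `student_count` > 100
EMIT CHANGES;
```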
CREATE TABLE test AS SELECT
`name`, `labels`,
COUNT(*) AS `event_count`, -- example aggregate; substitute any aggregate function
AS_VALUE(`name`) AS `name_val`,
AS_VALUE(`labels`) AS `labels_val`,
TIMESTAMPTOSTRING(WINDOWSTART, 'yyyy-MM-dd HH:mm:ss Z') AS `window_start`,
TIMESTAMPTOSTRING(WINDOWEND, 'yyyy-MM-dd HH:mm:ss Z') AS `window_end`
FROM stream_name
WINDOW HOPPING (SIZE 15 MINUTES, ADVANCE BY 30 SECONDS)
WHERE (`name` = 'abc')
GROUP BY `name`, `labels`
HAVING COUNT(*) > 5
EMIT CHANGES;
  • By default, the fields used in GROUP BY go into the key of the table. To also have them in the value, use the AS_VALUE function.
  • Windowing adds two system columns to the data, WINDOWSTART and WINDOWEND, which provide the window bounds. These are epoch timestamps in milliseconds. Use the TIMESTAMPTOSTRING function to convert them to a readable format.
  • ksqlDB supports these aggregate functions: https://docs.ksqldb.io/en/latest/developer-guide/ksqldb-reference/aggregate-functions/
  • There are three types of windows in ksqlDB: tumbling, hopping, and session. https://docs.ksqldb.io/en/latest/concepts/time-and-windows-in-ksqldb-queries/
  • EMIT CHANGES emits every change made to a key as it happens. Use EMIT FINAL to suppress results until the window closes. For EMIT FINAL to work, we need to set the ksqlDB property 'ksql.suppress.enabled' = 'true'
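To make the window types and the two EMIT modes concrete, here is a minimal sketch of each window clause. It reuses the stream_name placeholder from the statement above; the aliases, sizes, and the table name final_counts are illustrative assumptions. Whether 'ksql.suppress.enabled' can be set per session or has to go into the server configuration may depend on the ksqlDB version.

```sql
-- Tumbling: fixed-size windows that do not overlap.
SELECT `name`, COUNT(*) AS `cnt`
FROM stream_name
WINDOW TUMBLING (SIZE 15 MINUTES)
GROUP BY `name`
EMIT CHANGES;

-- Hopping: fixed-size windows that overlap; a new one starts every 30 seconds.
SELECT `name`, COUNT(*) AS `cnt`
FROM stream_name
WINDOW HOPPING (SIZE 15 MINUTES, ADVANCE BY 30 SECONDS)
GROUP BY `name`
EMIT CHANGES;

-- Session: a window per key that ends after 5 minutes without new events.
SELECT `name`, COUNT(*) AS `cnt`
FROM stream_name
WINDOW SESSION (5 MINUTES)
GROUP BY `name`
EMIT CHANGES;

-- EMIT FINAL: only one result per key and window, once the window has closed.
CREATE TABLE final_counts AS SELECT
  `name`, COUNT(*) AS `cnt`
FROM stream_name
WINDOW TUMBLING (SIZE 15 MINUTES)
GROUP BY `name`
EMIT FINAL;
```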

Useful Commands

  • Print the events in the underlying topic
PRINT topic_name;
PRINT topic_name FROM BEGINNING; -- prints from beginning
  • Push query to continuously print results from a stream or table to the console
SELECT * FROM stream_or_table_name EMIT CHANGES;
  • List the entities of a given type in ksqlDB
SHOW QUERIES;
SHOW STREAMS;
SHOW TABLES;
SHOW PROPERTIES;
SHOW FUNCTIONS;
  • Terminate a persistent query running in ksqlDB
TERMINATE query_id;
  • Drop a stream or table. If DELETE TOPIC is added, the underlying Kafka topic is deleted as well. Before dropping a stream or table, we must terminate all persistent queries associated with it.
DROP STREAM stream_name;
DROP TABLE table_name DELETE TOPIC;
  • SET: change a ksqlDB property
SET 'auto.offset.reset' = 'earliest';
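Put together, a typical cleanup session might look like the sketch below. The query ID CSAS_STUDENT_FILTERED_0 is hypothetical; the real ID should be copied from the output of SHOW QUERIES.

```sql
-- Read topics from the beginning for queries started in this session.
SET 'auto.offset.reset' = 'earliest';

-- Find the persistent query that writes to the stream.
SHOW QUERIES;

-- Stop the query first (hypothetical ID), then drop the stream and its topic.
TERMINATE CSAS_STUDENT_FILTERED_0;
DROP STREAM student_filtered DELETE TOPIC;

-- Restore the default offset behavior.
UNSET 'auto.offset.reset';
```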