KSQL Demo

This demo creates a stream, runs a non-persistent query, and then runs a persistent query.

Setup

Complete the following steps to prepare your environment for querying:

  1. Create a default stream using /sample-stream:
    maprcli stream create -path /sample-stream \
        -produceperm p -consumeperm p -topicperm p
  2. Run the following script to generate test data and write it to an HPE Ezmeral Data Fabric Streams topic:
    ./bin/ksql-datagen quickstart=pageviews format=delimited \
        topic=/sample-stream:pageviews maxInterval=10000
  3. Start the KSQL CLI, then create a KSQL stream and a KSQL table:
    ./bin/ksql http://<ksql-server>:8084
    ksql> CREATE STREAM pageviews
            (viewtime BIGINT,
             userid VARCHAR,
             pageid VARCHAR)
          WITH (KAFKA_TOPIC='/sample-stream:pageviews',
                VALUE_FORMAT='DELIMITED');

    ksql> CREATE TABLE PAGEVIEWS_TABLE
          WITH (KAFKA_TOPIC='/sample-stream:pageviews_table')
          AS
            SELECT userid,
                   MAX(viewtime)
            FROM pageviews
            GROUP BY userid;
  4. Run the SHOW TABLES command to list your KSQL tables:
    ksql> SHOW TABLES;
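
To see what the CREATE TABLE statement in step 3 computes, the following shell sketch reproduces the aggregation locally on three made-up rows. It is an illustration only and does not talk to KSQL: the function names sample_pageviews and max_viewtime_per_user and the row values are hypothetical, and the rows assume the comma-delimited viewtime,userid,pageid layout declared for the pageviews stream.

```shell
# Hypothetical sample rows in the layout declared for the pageviews
# stream: viewtime,userid,pageid (values invented for illustration).
sample_pageviews() {
  printf '%s\n' \
    '1001,User_1,Page_10' \
    '1005,User_2,Page_11' \
    '1009,User_1,Page_12'
}

# Keep the largest viewtime seen for each userid, which mirrors
# SELECT userid, MAX(viewtime) FROM pageviews GROUP BY userid.
max_viewtime_per_user() {
  awk -F, '
    { if (!($2 in latest) || $1 + 0 > latest[$2] + 0) latest[$2] = $1 }
    END { for (user in latest) print user "," latest[user] }
  ' | sort
}

sample_pageviews | max_viewtime_per_user
```

For the sample rows, this prints User_1,1009 and User_2,1005, one per line, because only the largest viewtime for each userid is kept.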

Run a Non-persistent Query

For a non-persistent query in KSQL 6.0, run:
ksql> SELECT * FROM PAGEVIEWS_TABLE WHERE userid='User_1';

Run a Persistent Query

For a persistent query, do the following:

  1. Create the topic, /sample-stream:input-topic:
    maprcli stream topic create -path /sample-stream -topic input-topic
  2. Create a KSQL input stream:
    ksql> CREATE STREAM stream1 (message VARCHAR)
          WITH (kafka_topic='/sample-stream:input-topic', value_format='DELIMITED');
  3. Create a persistent query that filters the input stream:
    ksql> CREATE STREAM stream2
          WITH (kafka_topic='/sample-stream:output-topic', value_format='DELIMITED')
          AS SELECT * FROM stream1 WHERE LEN(message) > 2;
  4. List your queries:
    ksql> SHOW QUERIES;
  5. Start the console producer with the provided script:
    /opt/mapr/kafka/kafka-<version>/bin/kafka-console-producer.sh \
        --broker-list fake.server.id:9092 --topic /sample-stream:input-topic
  6. In a separate terminal, start the console consumer with the provided script:
    /opt/mapr/kafka/kafka-<version>/bin/kafka-console-consumer.sh \
        --bootstrap-server fake.server.id:9092 \
        --topic /sample-stream:output-topic
  7. In the console producer, enter some messages:
    >Hi
    >Hello
    >No
    >Yes
  8. Check the console consumer output. Only the messages longer than two characters pass the filter:
    Hello
    Yes
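
The filter in the persistent query can be checked locally with a small shell sketch. It is an illustration only and does not use KSQL or the console scripts: the function name filter_messages is hypothetical, and the shell's string length stands in for LEN(message).

```shell
# Pass through only the lines longer than two characters, mirroring
# WHERE LEN(message) > 2 in the stream2 query.
filter_messages() {
  while IFS= read -r message; do
    if [ "${#message}" -gt 2 ]; then
      printf '%s\n' "$message"
    fi
  done
}

# The same four messages entered in the console producer step.
printf '%s\n' Hi Hello No Yes | filter_messages
```

Hi and No are exactly two characters long, so they are dropped, and the sketch prints Hello and Yes.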

Auxiliary Scripts Location

The kafka-console-producer.sh and kafka-console-consumer.sh scripts are packaged with Kafka. After Kafka is installed, you can find them in:
/opt/mapr/kafka/kafka-<version>/bin/