Using Structured Streaming to Create a Word Count Application
The examples in this section create a Dataset representing a stream of input lines from Kafka and print a running word count of the input to the console. The example is shown in Scala, Java, and Python.
Using Apache Kafka
Example
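The snippets below reference bootstrapServers, subscribeType, topics, and checkpointLocation without defining them; in the upstream Spark examples these values are read from command-line arguments. A minimal sketch with hypothetical values that you would replace with your own:

// Hypothetical placeholder values; substitute your own brokers, topics, and paths.
val bootstrapServers = "localhost:9092"              // comma-separated list of Kafka brokers
val subscribeType = "subscribe"                      // "assign", "subscribe", or "subscribePattern"
val topics = "wordcount-input"                       // comma-separated list of topics
val checkpointLocation = "/tmp/wordcount-checkpoint" // directory for streaming checkpoints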
Scala

import org.apache.spark.sql.SparkSession

val spark = SparkSession
  .builder
  .appName("StructuredKafkaWordCount")
  .getOrCreate()

import spark.implicits._

// Create a Dataset representing the stream of input lines from Kafka
val lines = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", bootstrapServers)
  .option(subscribeType, topics)
  .load()
  .selectExpr("CAST(value AS STRING)")
  .as[String]

// Generate a running word count
val wordCounts = lines.flatMap(_.split(" ")).groupBy("value").count()

// Run the query that prints the running counts to the console
val query = wordCounts.writeStream
  .outputMode("complete")
  .format("console")
  .option("checkpointLocation", checkpointLocation)
  .start()

query.awaitTermination()
Java

import java.util.Arrays;

import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.streaming.StreamingQuery;

SparkSession spark = SparkSession
  .builder()
  .appName("JavaStructuredKafkaWordCount")
  .getOrCreate();

// Create a Dataset representing the stream of input lines from Kafka
Dataset<String> lines = spark
  .readStream()
  .format("kafka")
  .option("kafka.bootstrap.servers", bootstrapServers)
  .option(subscribeType, topics)
  .load()
  .selectExpr("CAST(value AS STRING)")
  .as(Encoders.STRING());

// Generate a running word count
Dataset<Row> wordCounts = lines.flatMap(
    (FlatMapFunction<String, String>) x -> Arrays.asList(x.split(" ")).iterator(),
    Encoders.STRING())
  .groupBy("value").count();

// Run the query that prints the running counts to the console
StreamingQuery query = wordCounts.writeStream()
  .outputMode("complete")
  .format("console")
  .start();

query.awaitTermination();
Python

from pyspark.sql import SparkSession
from pyspark.sql.functions import explode, split

spark = SparkSession\
    .builder\
    .appName("StructuredKafkaWordCount")\
    .getOrCreate()

# Create a DataFrame representing the stream of input lines from Kafka
lines = spark\
    .readStream\
    .format("kafka")\
    .option("kafka.bootstrap.servers", bootstrapServers)\
    .option(subscribeType, topics)\
    .load()\
    .selectExpr("CAST(value AS STRING)")

# Split the lines into words
words = lines.select(
    # explode turns each item in an array into a separate row
    explode(
        split(lines.value, ' ')
    ).alias('word')
)

# Generate a running word count
wordCounts = words.groupBy('word').count()

# Run the query that prints the running counts to the console
query = wordCounts\
    .writeStream\
    .outputMode('complete')\
    .format('console')\
    .start()

query.awaitTermination()
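To run these examples against stock Apache Kafka, the Spark Kafka connector must be on the application classpath. One common way to supply it is the --packages option of spark-submit (the artifact version shown is an assumption; match it to your Spark and Scala versions):

spark-submit --packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.3.0 <your application>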
Using Data Fabric Event Store for Apache Kafka
Example
Scala

val topic: String = "/user/mapr/stream:reviews"

val df1 = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "maprdemo:9092")
  .option("subscribe", topic)
  .option("group.id", "testgroup")
  .option("startingOffsets", "earliest")
  .option("failOnDataLoss", false)
  .option("maxOffsetsPerTrigger", 1000)
  .load()
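Note that load() only defines the streaming DataFrame; no data is read until a query starts. A minimal sketch of one way to inspect the stream, assuming a console sink and a hypothetical checkpoint path:

// Decode the Kafka message payload and print the stream to the console.
// The checkpoint path below is a hypothetical placeholder; substitute your own.
val query = df1
  .selectExpr("CAST(value AS STRING) AS review")
  .writeStream
  .outputMode("append")
  .format("console")
  .option("checkpointLocation", "/tmp/reviews-checkpoint")
  .start()

query.awaitTermination()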