Writing a Spark Stream Word Count Application to HPE Ezmeral Data Fabric Database
The example in this section is a Spark Structured Streaming word count application that writes its results to HPE Ezmeral Data Fabric Database. The application reads lines from a stream using the Kafka source, counts the words, and saves the running counts to a table. The same example is shown in Scala, Java, and Python.
Example
Scala

import org.apache.spark.sql.SparkSession
import com.mapr.db.spark.streaming.MapRDBSourceConfig

val spark = SparkSession
  .builder
  .appName("StructuredKafkaWordCount")
  .getOrCreate()

import spark.implicits._

// Create a Dataset representing the stream of input lines from Kafka
val lines = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", bootstrapServers)
  .option(subscribeType, topics)
  .load()
  .selectExpr("CAST(value AS STRING)")
  .as[String]

// Generate a running word count
val wordCounts = lines.flatMap(_.split(" ")).groupBy("value").count()

// Run the query that saves the result to MapR-DB; the "value" column
// supplies the _id field of each document
val query = wordCounts.writeStream
  .format(MapRDBSourceConfig.Format)
  .option(MapRDBSourceConfig.TablePathOption, resultTable)
  .option(MapRDBSourceConfig.CreateTableOption, true)
  .option(MapRDBSourceConfig.IdFieldPathOption, "value")
  .outputMode("complete")
  .start()

query.awaitTermination()
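The examples assume that bootstrapServers, subscribeType, topics, and resultTable are already in scope. In a complete application they typically come from the command line; a minimal sketch inside main(args: Array[String]), where the argument order is an assumption:

// Hypothetical argument handling; adapt to however your application
// receives its configuration.
val Array(bootstrapServers, subscribeType, topics, resultTable) = args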
Java

import java.util.Arrays;

import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.streaming.StreamingQuery;
import com.mapr.db.spark.streaming.MapRDBSourceConfig;

SparkSession spark = SparkSession
  .builder()
  .appName("JavaStructuredKafkaWordCount")
  .getOrCreate();

// Create a Dataset representing the stream of input lines from Kafka
Dataset<String> lines = spark
  .readStream()
  .format("kafka")
  .option("kafka.bootstrap.servers", bootstrapServers)
  .option(subscribeType, topics)
  .load()
  .selectExpr("CAST(value AS STRING)")
  .as(Encoders.STRING());

// Generate a running word count
Dataset<Row> wordCounts = lines.flatMap(
  (FlatMapFunction<String, String>) x -> Arrays.asList(x.split(" ")).iterator(),
  Encoders.STRING()).groupBy("value").count();

// Run the query that saves the result to MapR-DB; note that the call
// chain must not be broken by a semicolon before start()
StreamingQuery query = wordCounts.writeStream()
  .format(MapRDBSourceConfig.Format())
  .option(MapRDBSourceConfig.TablePathOption(), resultTable)
  .option(MapRDBSourceConfig.CreateTableOption(), true)
  .option(MapRDBSourceConfig.IdFieldPathOption(), "value")
  .outputMode("complete")
  .start();

query.awaitTermination();
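Once the query is running, you can inspect the output by loading the result table back as a DataFrame. This is a minimal Scala sketch that assumes the MapR Database OJAI connector is on the classpath; loadFromMapRDB is provided by the connector's SparkSession implicits:

// Load the word-count table for inspection (for example, from a
// separate spark-shell session while the streaming query is active).
import com.mapr.db.spark.sql._

val results = spark.loadFromMapRDB(resultTable)
results.show()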
Python

from pyspark.sql import SparkSession
from pyspark.sql.functions import explode, split

spark = SparkSession \
    .builder \
    .appName("StructuredKafkaWordCount") \
    .getOrCreate()

# Create a DataFrame representing the stream of input lines from Kafka
lines = spark \
    .readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", bootstrapServers) \
    .option(subscribeType, topics) \
    .load() \
    .selectExpr("CAST(value AS STRING)")

# Split the lines into words; explode turns each item in an array
# into a separate row
words = lines.select(
    explode(
        split(lines.value, ' ')
    ).alias('word')
)

# Generate a running word count
wordCounts = words.groupBy('word').count()

# Run the query that saves the result to MapR-DB; the _id field is taken
# from the 'word' column produced by groupBy('word')
query = wordCounts \
    .writeStream \
    .format("com.mapr.db.spark.streaming") \
    .option("tablePath", table_path) \
    .option("createTable", True) \
    .option("idFieldPath", "word") \
    .outputMode('complete') \
    .start()

query.awaitTermination()
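None of the examples set a checkpoint location, so the running counts cannot be recovered if the query restarts. For fault tolerance you would normally add Spark's standard checkpointLocation option to the writer; a minimal Scala sketch, where the checkpoint path is an assumption:

// Add a checkpoint directory so the query can recover its state after
// a restart; the path shown here is only an example.
val query = wordCounts.writeStream
  .format(MapRDBSourceConfig.Format)
  .option(MapRDBSourceConfig.TablePathOption, resultTable)
  .option(MapRDBSourceConfig.CreateTableOption, true)
  .option(MapRDBSourceConfig.IdFieldPathOption, "value")
  .option("checkpointLocation", "/apps/wordcount-checkpoint")
  .outputMode("complete")
  .start()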