Configure Spark to Produce HPE Ezmeral Data Fabric Streams Messages
Using the Kafka 0.9 API, you can configure a Spark application to produce Data Fabric Streams messages.
Procedure
- Add the following dependency:

  groupId = org.apache.spark
  artifactId = spark-streaming-kafka-producer_2.11
  version = <spark_version>-mapr-<mapr_eco_version>
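  For example, in an sbt build the same coordinates could be declared as follows. This is a sketch, not an official build snippet; substitute your actual <spark_version> and <mapr_eco_version> values:

    // Hypothetical build.sbt entry using the coordinates above
    libraryDependencies += "org.apache.spark" % "spark-streaming-kafka-producer_2.11" % "<spark_version>-mapr-<mapr_eco_version>"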
  NOTE: If you would like to use the Streaming Producer Examples, you must add the appropriate Spark streaming Kafka producer JAR from the HPE Maven repository to the Spark classpath (/opt/mapr/spark/spark-<spark_version>/jars/).
- When you write the Spark program, import and use classes from org.apache.spark.streaming.kafka.producer._ and org.apache.spark.streaming.dstream. The import of org.apache.spark.streaming.dstream.DStream adds the following method to DStream:

  sendToKafka(topic: String, conf: ProducerConf)
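  Put together, the imports at the top of the program might look like this (a minimal sketch based on the package names above; the wildcard import is assumed to bring ProducerConf and the sendToKafka implicits into scope):

    import org.apache.spark.rdd.RDD
    import org.apache.spark.streaming.dstream.{ConstantInputDStream, DStream}
    import org.apache.spark.streaming.kafka.producer._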
  In the code below, calling sendToKafka will send numMessages messages to the set of topics specified by the topics parameter:

    val producerConf = new ProducerConf(bootstrapServers = kafkaBrokers.split(",").toList)
      .withKeySerializer("org.apache.kafka.common.serialization.ByteArraySerializer")
      .withValueSerializer("org.apache.kafka.common.serialization.StringSerializer")

    val items = (0 until numMessages.toInt).map(i => Item(i, i).toString)
    val defaultRDD: RDD[String] = ssc.sparkContext.parallelize(items)
    val dStream: DStream[String] = new ConstantInputDStream[String](ssc, defaultRDD)

    dStream.foreachRDD(_.sendToKafka(topics, producerConf))
    dStream.count().print()
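  The snippet assumes several names defined elsewhere in the application: kafkaBrokers, numMessages, topics, the Item class, and the StreamingContext ssc. A minimal surrounding setup could look like the following sketch; the Item case class, the hard-coded values, and the topic path are illustrative assumptions, not part of the product API:

    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.{Seconds, StreamingContext}

    // Hypothetical payload type; any type whose toString yields the message body works.
    case class Item(id: Int, value: Int)

    object ProducerExample {
      def main(args: Array[String]): Unit = {
        // Illustrative values; a real job would typically read these from args or config.
        val kafkaBrokers = "broker1:9092,broker2:9092"
        val numMessages  = "100"
        val topics       = "/sample-stream:sample-topic" // assumed Data Fabric Streams topic path

        val sparkConf = new SparkConf().setAppName("DataFabricStreamsProducerExample")
        val ssc = new StreamingContext(sparkConf, Seconds(2))

        // Build producerConf, items, defaultRDD, and dStream exactly as shown above, then:
        ssc.start()
        ssc.awaitTermination()
      }
    }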
  The org.apache.kafka.common.serialization.ByteArraySerializer and org.apache.kafka.common.serialization.StringSerializer serializers are used by default, so unless you want to use different serializers, the withKeySerializer and withValueSerializer calls are not necessary.
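  In that case, the configuration reduces to the constructor call alone, taken from the example above:

    val producerConf = new ProducerConf(bootstrapServers = kafkaBrokers.split(",").toList)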