Configure Spark to Produce HPE Ezmeral Data Fabric Streams Messages

Using the Kafka 0.9 API, you can configure a Spark application to produce Data Fabric Streams messages.

Procedure

  1. Add the following dependency:
    groupId = org.apache.spark
    artifactId = spark-streaming-kafka-producer_2.11
    version = <spark_version>-mapr-<mapr_eco_version>
    NOTE
    To use the Streaming Producer examples, you must add the appropriate Spark streaming Kafka producer jar from the HPE Maven repository to the Spark classpath (/opt/mapr/spark/spark-<spark_version>/jars/).
  2. When you write the Spark program, import and use classes from org.apache.spark.streaming.kafka.producer._ and org.apache.spark.streaming.dstream.
    The import of org.apache.spark.streaming.dstream.DStream adds the following method from DStream:
    sendToKafka(topic: String, conf: ProducerConf)
    In the code below, calling sendToKafka will send numMessages messages to the set of topics specified by the topics parameter.
    val producerConf = new ProducerConf(bootstrapServers = kafkaBrokers.split(",").toList)
       .withKeySerializer("org.apache.kafka.common.serialization.ByteArraySerializer")
       .withValueSerializer("org.apache.kafka.common.serialization.StringSerializer")
                            
    val items = (0 until numMessages.toInt).map(i => Item(i, i).toString)
    val defaultRDD: RDD[String] = ssc.sparkContext.parallelize(items)
    val dStream: DStream[String] = new ConstantInputDStream[String](ssc, defaultRDD)
                            
    dStream.foreachRDD(_.sendToKafka(topics, producerConf))
    dStream.count().print()

    org.apache.kafka.common.serialization.ByteArraySerializer and org.apache.kafka.common.serialization.StringSerializer are the default key and value serializers. If you do not need a different serializer, you can omit the withKeySerializer and withValueSerializer calls.
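
    For projects built with sbt, the dependency from step 1 might be declared roughly as follows. This is a sketch rather than an official build file: the resolver name and URL are assumptions and should be checked against your environment, and the version placeholders must be replaced with the values for your installation.

```scala
// build.sbt (illustrative fragment)

// Assumption: the MapR/HPE artifacts are served from this Maven repository.
resolvers += "mapr-releases" at "https://repository.mapr.com/maven/"

// Coordinates taken from step 1. The artifact name already carries the
// Scala suffix (_2.11), so a single % is used instead of %%.
// Replace <spark_version> and <mapr_eco_version> before building.
libraryDependencies += "org.apache.spark" % "spark-streaming-kafka-producer_2.11" % "<spark_version>-mapr-<mapr_eco_version>"
```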

Example

Source code for a sample producer program can be found at https://github.com/mapr/spark/blob/2.2.1-mapr-1803/examples/src/main/scala/org/apache/spark/examples/streaming/KafkaProducerExample.scala
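
The fragment in step 2 leaves out the surrounding setup. The following is a minimal sketch of how a complete program might look, assuming the standard Spark Streaming entry points (SparkConf, StreamingContext). The object name ProducerSketch, the helper buildItems, the Item definition, the two-second batch interval, and the argument order are illustrative assumptions, not part of the product documentation.

```scala
import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.dstream.{ConstantInputDStream, DStream}
import org.apache.spark.streaming.kafka.producer._

// Hypothetical payload type; step 2 only shows Item(i, i).toString being used.
case class Item(id: Int, value: Int) {
  override def toString: String = s"""{"id":"$id","value":"$value"}"""
}

object ProducerSketch {

  // Builds the string payloads that are sent in every batch.
  def buildItems(n: Int): Seq[String] =
    (0 until n).map(i => Item(i, i).toString)

  def main(args: Array[String]): Unit = {
    // Assumed argument order: broker list, topic, number of messages.
    val Array(kafkaBrokers, topics, numMessages) = args

    val sparkConf = new SparkConf().setAppName("ProducerSketch")
    // Assumed batch interval of two seconds.
    val ssc = new StreamingContext(sparkConf, Seconds(2))

    // Same producer configuration as in step 2.
    val producerConf = new ProducerConf(bootstrapServers = kafkaBrokers.split(",").toList)
      .withKeySerializer("org.apache.kafka.common.serialization.ByteArraySerializer")
      .withValueSerializer("org.apache.kafka.common.serialization.StringSerializer")

    // A constant input stream re-emits the same RDD on every batch.
    val defaultRDD: RDD[String] = ssc.sparkContext.parallelize(buildItems(numMessages.toInt))
    val dStream: DStream[String] = new ConstantInputDStream[String](ssc, defaultRDD)

    // Send each batch to the stream topic and print the batch size.
    dStream.foreachRDD(_.sendToKafka(topics, producerConf))
    dStream.count().print()

    ssc.start()
    ssc.awaitTermination()
  }
}
```

Because ConstantInputDStream repeats the same RDD for every batch, a program like this keeps sending the same numMessages messages until it is stopped.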