Integrate Spark with Kafka

From EEP-5.0.0, Spark can be integrated with Kafka-1.0. You can configure a Spark application to produce Kafka messages.

About this task

NOTE
Starting from EEP-8.0.0, HPE Ezmeral Data Fabric does not support spark-streaming-kafka-producer. To learn about Kafka integration on Apache Spark 3.1.2 and later in HPE Ezmeral Data Fabric, see Structured Streaming + Kafka Integration Guide (Kafka broker version 0.10.0 or higher).
NOTE
Starting from HPE Ezmeral Data Fabric 7.8.0, the configuration option use.brokers is available. This option is a boolean flag that specifies the type of server you are using to load the Apache Kafka client. By default, this value is set to false. Set this value to true if you are using Apache Kafka brokers as a server to load the Apache Kafka client.

For more information, see Configuration Parameters.

Procedure

  1. Add the following dependency:
    groupId = org.apache.spark
    artifactId = spark-streaming-kafka-producer_2.11
    version = <spark_version>-mapr-<mapr_eco_version>
  2. When you write the Spark program, import and use classes from:
    org.apache.spark.streaming.kafka.producer._ 
    org.apache.spark.streaming.dstream.
    The import of org.apache.spark.streaming.stream.DStream adds the following method from DStream:
    sendToKafka(topic: String, conf: ProducerConf)
  3. In the code below, calling sendToKafka will send numMessages messages to the set of topics specified by the topics parameter:
    val producerConf = new ProducerConf(
    bootstrapServers = kafkaBrokers.split(",").toList)
                            
    val items = (0 until numMessages.toInt).map(i => Item(i, i).toString)
    val defaultRDD: RDD[String] = ssc.sparkContext.parallelize(items)
    val dStream: DStream[String] = new ConstantInputDStream[String](ssc, defaultRDD)
                            
    dStream.foreachRDD(_.sendToKafka(topics, producerConf))
    dStream.count().print()

Example

Source code for a sample producer program can be found at https://github.com/mapr/spark/blob/2.2.1-mapr-1803/examples/src/main/scala/org/apache/spark/examples/streaming/KafkaProducerExample.scala