Integrate Spark with Kafka
Beginning with EEP-5.0.0, Spark can be integrated with Kafka 1.0. You can configure a Spark application to produce Kafka messages.
About this task
NOTE
Starting from EEP-8.0.0, HPE Ezmeral Data Fabric does not support spark-streaming-kafka-producer. To learn about Kafka integration on Apache Spark 3.1.2 and later in HPE Ezmeral Data Fabric, see the Structured Streaming + Kafka Integration Guide (Kafka broker version 0.10.0 or higher).
NOTE
Starting from HPE Ezmeral Data Fabric 7.8.0, the configuration option use.brokers is available. This option is a boolean flag that specifies the type of server used to load the Apache Kafka client. By default, this value is set to false. Set this value to true if you are using Apache Kafka brokers as the server to load the Apache Kafka client. For more information, see Configuration Parameters.
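As the first note above indicates, on Apache Spark 3.1.2 and later Kafka output is handled through Structured Streaming rather than spark-streaming-kafka-producer. The following is a minimal sketch of that approach; the broker address, topic name, and checkpoint path are placeholders, and the built-in rate source stands in for your real input:

import org.apache.spark.sql.SparkSession

object StructuredKafkaSinkExample {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("StructuredKafkaSinkExample")
      .getOrCreate()

    // Placeholder input: the built-in rate source generates (timestamp, value) rows.
    val input = spark.readStream
      .format("rate")
      .option("rowsPerSecond", 10)
      .load()

    // The Kafka sink expects a string or binary "value" column (and optionally "key").
    val toKafka = input.selectExpr("CAST(value AS STRING) AS value")

    // The broker list, topic, and checkpoint location below are placeholders.
    val query = toKafka.writeStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "broker1:9092")
      .option("topic", "example-topic")
      .option("checkpointLocation", "/tmp/kafka-sink-checkpoint")
      .start()

    query.awaitTermination()
  }
}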
Procedure
-
Add the following dependency:
groupId = org.apache.spark
artifactId = spark-streaming-kafka-producer_<scala_version>
version = <spark_version>-mapr-<mapr_eco_version>
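If you build with sbt, the same dependency can be declared as shown below. The resolver URL is the Maven repository commonly used for Data Fabric (MapR) artifacts; verify it for your environment, and replace the placeholder version tokens with the Scala, Spark, and EEP versions on your cluster:

// build.sbt -- placeholder version tokens, substitute the values for your cluster
resolvers += "mapr-releases" at "https://repository.mapr.com/maven/"
libraryDependencies += "org.apache.spark" % "spark-streaming-kafka-producer_<scala_version>" % "<spark_version>-mapr-<mapr_eco_version>"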
-
When you write the Spark program, import and use classes from:
org.apache.spark.streaming.kafka.producer._
org.apache.spark.streaming.dstream.DStream
Together, these imports make the following method available on DStream:
sendToKafka(topic: String, conf: ProducerConf)
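In a Scala source file, these imports appear as follows; the topic name in the commented call is illustrative:

import org.apache.spark.streaming.kafka.producer._
import org.apache.spark.streaming.dstream.DStream

// After the imports, a DStream[String] named messages could be sent with:
// messages.sendToKafka("example-topic", producerConf)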
-
In the code below, calling sendToKafka sends numMessages messages to the set of topics specified by the topics parameter:
val producerConf = new ProducerConf(
  bootstrapServers = kafkaBrokers.split(",").toList)
val items = (0 until numMessages.toInt).map(i => Item(i, i).toString)
val defaultRDD: RDD[String] = ssc.sparkContext.parallelize(items)
val dStream: DStream[String] = new ConstantInputDStream[String](ssc, defaultRDD)
dStream.foreachRDD(_.sendToKafka(topics, producerConf))
dStream.count().print()
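The snippet above assumes a surrounding application that defines the streaming context ssc, the Item class, and the kafkaBrokers, topics, and numMessages parameters. A fuller, self-contained sketch under those assumptions follows; the application name, batch interval, Item fields, and argument handling are illustrative and not part of the documented API:

import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.dstream.{ConstantInputDStream, DStream}
import org.apache.spark.streaming.kafka.producer._

// Illustrative payload type; the documented example only requires that Item(i, i) has a usable toString.
case class Item(id: Int, value: Int) {
  override def toString: String = s"$id:$value"
}

object KafkaProducerExample {
  def main(args: Array[String]): Unit = {
    // Illustrative arguments: comma-separated broker list, target topic(s), message count.
    val Array(kafkaBrokers, topics, numMessages) = args

    val sparkConf = new SparkConf().setAppName("KafkaProducerExample")
    val ssc = new StreamingContext(sparkConf, Seconds(2))

    val producerConf = new ProducerConf(
      bootstrapServers = kafkaBrokers.split(",").toList)

    val items = (0 until numMessages.toInt).map(i => Item(i, i).toString)
    val defaultRDD: RDD[String] = ssc.sparkContext.parallelize(items)
    val dStream: DStream[String] = new ConstantInputDStream[String](ssc, defaultRDD)

    // Send each batch to the configured topics and print the per-batch record count.
    dStream.foreachRDD(_.sendToKafka(topics, producerConf))
    dStream.count().print()

    ssc.start()
    ssc.awaitTermination()
  }
}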