Integrate Spark with Kafka
Starting from EEP-5.0.0, Spark can be integrated with Kafka 1.0. You can configure a Spark application to produce Kafka messages.
About this task
NOTE
Starting from EEP-8.0.0, HPE Data Fabric does not support spark-streaming-kafka-producer. To learn about Kafka integration on Apache Spark 3.1.2 and later in HPE Data Fabric, see the Structured Streaming + Kafka Integration Guide (Kafka broker version 0.10.0 or higher).
NOTE
Starting from HPE Data Fabric 7.8.0, the configuration option use.brokers is available. This option is a boolean flag that specifies the type of server used to load the Apache Kafka client. By default, it is set to false. Set it to true if you are using Apache Kafka brokers as the server that loads the Apache Kafka client. For more information, see Configuration Parameters.
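The following Scala sketch shows one way such a flag might be supplied. It assumes that use.brokers is passed like other Kafka client configuration parameters, through the producer properties; the broker addresses and serializers are illustrative only, and the Configuration Parameters page remains the authoritative reference for how to set this option.

  import java.util.Properties
  import org.apache.kafka.clients.producer.KafkaProducer

  // Hedged sketch: assumes use.brokers is accepted alongside standard client properties.
  val props = new Properties()
  props.put("bootstrap.servers", "broker1:9092,broker2:9092") // illustrative broker list
  props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
  props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")
  props.put("use.brokers", "true") // load the Apache Kafka client against Apache Kafka brokers

  val producer = new KafkaProducer[String, String](props)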
Procedure
- Add the following dependency (a sample sbt declaration appears after this procedure):

  groupId = org.apache.spark
  artifactId = spark-streaming-kafka-producer_<scala_version>
  version = <spark_version>-mapr-<mapr_eco_version>
- When you write the Spark program, import and use classes from:

  org.apache.spark.streaming.kafka.producer._
  org.apache.spark.streaming.dstream.DStream

  The import of org.apache.spark.streaming.kafka.producer._ adds the following method to DStream:

  sendToKafka(topic: String, conf: ProducerConf)
- In the code below, calling sendToKafka will send numMessages messages to the set of topics specified by the topics parameter (a self-contained version of this example appears after this procedure):

  val producerConf = new ProducerConf(
    bootstrapServers = kafkaBrokers.split(",").toList)

  val items = (0 until numMessages.toInt).map(i => Item(i, i).toString)
  val defaultRDD: RDD[String] = ssc.sparkContext.parallelize(items)
  val dStream: DStream[String] = new ConstantInputDStream[String](ssc, defaultRDD)

  dStream.foreachRDD(_.sendToKafka(topics, producerConf))
  dStream.count().print()
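For step 1, a minimal sbt form of the dependency might look like the following. The repository URL and the use of %% for the Scala version suffix are assumptions; replace the version placeholder with the Spark and EEP versions on your cluster.

  // build.sbt sketch (assumption: artifacts are pulled from the MapR Maven repository)
  resolvers += "mapr-releases" at "https://repository.mapr.com/maven/"

  // %% appends the Scala version suffix, matching spark-streaming-kafka-producer_<scala_version>
  libraryDependencies += "org.apache.spark" %% "spark-streaming-kafka-producer" %
    "<spark_version>-mapr-<mapr_eco_version>" // substitute real versions for your EEP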
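The snippet in step 3 omits the surrounding application. A self-contained sketch is shown below; the object name, command-line arguments, Item case class, and streaming batch interval are illustrative assumptions rather than part of the original example.

  import org.apache.spark.SparkConf
  import org.apache.spark.rdd.RDD
  import org.apache.spark.streaming.{Seconds, StreamingContext}
  import org.apache.spark.streaming.dstream.{ConstantInputDStream, DStream}
  import org.apache.spark.streaming.kafka.producer._

  object SparkKafkaProducerExample {

    // Illustrative payload type; the original snippet assumes an Item class exists.
    case class Item(id: Int, value: Int)

    def main(args: Array[String]): Unit = {
      // Hypothetical arguments: <kafkaBrokers> <topics> <numMessages>
      val Array(kafkaBrokers, topics, numMessages) = args

      val sparkConf = new SparkConf().setAppName("SparkKafkaProducerExample")
      val ssc = new StreamingContext(sparkConf, Seconds(2)) // batch interval is illustrative

      // ProducerConf is provided by spark-streaming-kafka-producer.
      val producerConf = new ProducerConf(
        bootstrapServers = kafkaBrokers.split(",").toList)

      val items = (0 until numMessages.toInt).map(i => Item(i, i).toString)
      val defaultRDD: RDD[String] = ssc.sparkContext.parallelize(items)
      val dStream: DStream[String] = new ConstantInputDStream[String](ssc, defaultRDD)

      // sendToKafka is made available by the producer import.
      dStream.foreachRDD(_.sendToKafka(topics, producerConf))
      dStream.count().print()

      ssc.start()
      ssc.awaitTermination()
    }
  }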