Configure Spark 2.2.1 and later to Consume HPE Data Fabric Streams Messages
Using the Kafka 0.10 API, you can configure a Spark application to query HPE Data Fabric Streams for new messages at a given interval. This information is for Spark 2.2.1 and later users.
Procedure
- Install the Data Fabric core Kafka package, if you have not already done so.
- Copy the Kafka client jar into the Spark jars directory as shown below:
    cp /opt/mapr/lib/kafka-clients-<version>.jar SPARK_HOME/jars
- Add the following dependency:
    groupId = org.apache.spark
    artifactId = spark-streaming-kafka-0-10_2.11
    version = <spark_version>-mapr-<mapr_eco_version>
  NOTE: To use the Streaming Producer Examples, you must add the appropriate Spark streaming Kafka producer jar from the MapR Maven repository to the Spark classpath (/opt/mapr/spark/spark-<spark_version>/jars/).
- Consider the following when you write the Spark application:
  - Verify that it meets the following requirements:
    - Imports and uses classes from org.apache.spark.streaming.kafka010. The following code snippet imports three classes.
        import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies}
    - Defines key and value deserializers in the kafkaParams map.
        val kafkaParams = Map[String, String](
          ConsumerConfig.GROUP_ID_CONFIG -> groupId,
          ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG ->
            "org.apache.kafka.common.serialization.StringDeserializer",
          ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG ->
            "org.apache.kafka.common.serialization.StringDeserializer",
          ConsumerConfig.AUTO_OFFSET_RESET_CONFIG -> offsetReset)
    - Does not configure a broker address or ZooKeeper, because HPE Data Fabric Streams requires neither.
  - Optionally, define a value for spark.streaming.kafka.consumer.poll.ms in the Spark configuration.
    NOTE: The spark.streaming.kafka.consumer.poll.ms option sets the poll timeout. If you do not configure it, the value of the spark.network.timeout property is used. If spark.network.timeout is also not set, the default is 120 seconds.
        val sparkConf = new SparkConf()
          .setAppName("v09DirectKafkaWordCount")
          .set("spark.streaming.kafka.consumer.poll.ms", pollTimeout)
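The fallback order for the poll timeout described in the note can be sketched as a small helper. This is an illustration only, not code taken from the sample program or from Spark's internals: the object name PollTimeoutSketch and the method effectivePollTimeoutMs are hypothetical, and the helper assumes that spark.network.timeout is read in seconds when no unit is given.

```scala
import org.apache.spark.SparkConf

// Hypothetical helper that mirrors the fallback order described in the note.
object PollTimeoutSketch {
  def effectivePollTimeoutMs(conf: SparkConf): Long =
    conf.getLong(
      "spark.streaming.kafka.consumer.poll.ms",
      // Fall back to spark.network.timeout (converted to milliseconds),
      // or to 120 seconds if that property is not set either.
      conf.getTimeAsSeconds("spark.network.timeout", "120s") * 1000L)
}
```

For example, a configuration that sets only spark.network.timeout to 30s resolves to 30000 milliseconds, while an explicit spark.streaming.kafka.consumer.poll.ms value always takes precedence.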
Example: https://github.com/mapr/spark/blob/3.5.1.0-eep-930/examples/src/main/scala/org/apache/spark/examples/streaming/V010DirectKafkaWordCount.scala is a sample consumer program.
The KafkaUtils.createDirectStream method creates an input stream to read HPE Data Fabric Streams messages. The ConsumerStrategies.Subscribe method creates the consumerStrategy that limits the set of topics the stream subscribes to. This set is derived from the topics parameter passed into the program. Using LocationStrategies.PreferConsistent distributes partitions evenly across available executors.
    val consumerStrategy = ConsumerStrategies.Subscribe[String, String](topicsSet, kafkaParams)
    val messages = KafkaUtils.createDirectStream[String, String](
      ssc, LocationStrategies.PreferConsistent, consumerStrategy)
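The pieces described in this topic can be combined into one minimal application, sketched below. It is not the linked sample program: the object name DirectStreamSketch, the buildKafkaParams helper, the command-line argument order, and the 2-second batch interval are all assumptions made for illustration. The sketch also assumes that the Spark master is supplied by spark-submit and that topics are passed as a comma-separated list.

```scala
import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies}

// Illustrative sketch only; object, method, and argument names are hypothetical.
object DirectStreamSketch {

  // Builds consumer settings with deserializers but no broker or ZooKeeper address.
  def buildKafkaParams(groupId: String, offsetReset: String): Map[String, String] = {
    val stringDeserializer = "org.apache.kafka.common.serialization.StringDeserializer"
    Map[String, String](
      ConsumerConfig.GROUP_ID_CONFIG -> groupId,
      ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG -> stringDeserializer,
      ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG -> stringDeserializer,
      ConsumerConfig.AUTO_OFFSET_RESET_CONFIG -> offsetReset)
  }

  def main(args: Array[String]): Unit = {
    if (args.length < 4) {
      System.err.println(
        "Usage: DirectStreamSketch <groupId> <offsetReset> <pollTimeoutMs> <topics>")
      sys.exit(1)
    }
    val Array(groupId, offsetReset, pollTimeout, topics) = args.take(4)

    val sparkConf = new SparkConf()
      .setAppName("DirectStreamSketch")
      .set("spark.streaming.kafka.consumer.poll.ms", pollTimeout)
    // The 2-second batch interval is an arbitrary choice for this sketch.
    val ssc = new StreamingContext(sparkConf, Seconds(2))

    val topicsSet = topics.split(",").toSet
    val kafkaParams = buildKafkaParams(groupId, offsetReset)

    val consumerStrategy = ConsumerStrategies.Subscribe[String, String](topicsSet, kafkaParams)
    val messages = KafkaUtils.createDirectStream[String, String](
      ssc, LocationStrategies.PreferConsistent, consumerStrategy)

    // Count the words in each batch and print the first few results.
    messages.map(_.value())
      .flatMap(_.split(" "))
      .map(word => (word, 1L))
      .reduceByKey(_ + _)
      .print()

    ssc.start()
    ssc.awaitTermination()
  }
}
```

Keeping the consumer settings in a separate helper makes it easy to confirm that no broker address is set before the stream is created.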