Working with Complex JSON Document Types
The HPE Data Fabric Database OJAI Connector for Apache Spark provides APIs to process JSON documents loaded from HPE Data Fabric Database.
Suppose you want to calculate the number of users located in each city:
Scala:

```scala
import com.mapr.db.spark.sql._

val customerprofilesRDD = sc.loadFromMapRDB("/tmp/user_profiles")
val numberOfCustaccCities = customerprofilesRDD.map(a => (a.`address.city`[String], a))
                                               .groupByKey()
                                               .map(a => (a._1, a._2.size))
```

Java:

```java
import com.mapr.db.spark.api.java.MapRDBJavaSparkContext;
import scala.Tuple2;
import java.util.Collection;

MapRDBJavaRDD<OJAIDocument> customerprofilesRDD = mapRDBSparkContext.loadFromMapRDB("/tmp/user_profiles");
JavaRDD numberOfCustaccCities = customerprofilesRDD
        .mapToPair(a -> new Tuple2<>(a.getString("address.city"), a))
        .groupByKey()
        .map(a -> new Tuple2<>(a._1, ((Collection<?>) a._2).size()));
```

If you have not provided an explicit cast, then the object is returned as
AnyRef. To access methods specific to a class, such as
String or Integer, you can cast it to a specific type
later in the process.
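For example, here is a minimal sketch of this pattern, reusing the table and field names from the examples above: the field is read without a type parameter, so it comes back as AnyRef, and is cast to String only when String-specific methods are needed.

```scala
import com.mapr.db.spark.sql._

val profilesRDD = sc.loadFromMapRDB("/tmp/user_profiles")

// Without a type parameter, the field value is returned as AnyRef ...
val cities = profilesRDD.map(a => a.`address.city`)

// ... so cast it to String later, when String-specific methods are needed.
val upperCaseCities = cities.map(c => c.asInstanceOf[String].toUpperCase).collect
```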
Now suppose you want to collect all the addresses (address is of type
Map) of all customers:
Scala:

```scala
import com.mapr.db.spark.sql._

val customerprofilesRDD = sc.loadFromMapRDB("/tmp/user_profiles")
val customersAddress = customerprofilesRDD.map(a => a.address).collect
```

Java:

```java
import com.mapr.db.spark.api.java.MapRDBJavaSparkContext;
import java.util.List;

MapRDBJavaRDD<OJAIDocument> customerprofilesRDD = mapRDBSparkContext.loadFromMapRDB("/tmp/user_profiles");
List<String> customersAddress = customerprofilesRDD.map(a -> a.getString("address")).collect();
```

customersAddress contains all of the addresses, but is returned as an
AnyRef object.
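If you want the addresses back as maps rather than as AnyRef, one option (a sketch that reuses the typed field access shown later in this topic) is to supply the type when the field is extracted:

```scala
import com.mapr.db.spark.sql._

val customerprofilesRDD = sc.loadFromMapRDB("/tmp/user_profiles")

// Supplying the type up front returns each address as a Map[String, AnyRef]
// instead of an untyped AnyRef.
val customersAddress = customerprofilesRDD
  .map(a => a.address[Map[String, AnyRef]])
  .collect
```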
The HPE Data Fabric Database OJAI Connector for Apache Spark introduces three new classes to wrap complex JSON types:
| Class | Type |
|---|---|
| DBMapValue | Map[String, AnyRef] |
| DBArrayValue | Array[AnyRef] |
| DBBinaryValue | ByteBuffer |
These classes are not exposed; however, you can access the underlying elements of
DBArrayValue and DBMapValue by using the same functions as
in Seq and Map. DBArrayValue works like a
sequence, while DBMapValue works like a map.
DBBinaryValue is a wrapper class around ByteBuffer. ByteBuffer itself is not
serializable, so you will get serialization errors if you use ByteBuffer
directly in Spark code. You must ensure that byte buffers are converted to
DBBinaryValue or to serialized byte buffers. The HPE Data Fabric Database OJAI
Connector for Apache Spark provides an API, shown in ByteBuffer Serialization
below, to convert ByteBuffers to serializable byte buffers.
Accessing Values in a Map
DBMapValue is a type of Map[String, AnyRef]. Any function that you can use to
access values in a Map can also be used to access values in a
DBMapValue. In the following example,
customerAddress contains the addresses of the customers who reside in San
Jose. customerAddress is an Array[DBMapValue]:
```scala
val customerAddress = maprd.map(a => a.address[Map[String, AnyRef]])
                           .filter(a => a != null && a.get("city").contains("San Jose"))
                           .collect
```

This example can also be written in Scala in a more functional style, using Option instead of an explicit null check:

```scala
val customerAddress = maprd.map(a => a.address[Map[String, AnyRef]])
                           .filter(a => Option(a).map(a => a.get("city").contains("San Jose")).getOrElse(false))
                           .collect
```

Accessing the Array JSON Object
This example uses a sequence to access the Array JSON object:
```scala
val custInterests = maprd.map(a => a.interests[Seq[AnyRef]])
                         .filter(a => a != null && a(0) == "sports")
                         .collect
```

ByteBuffer Serialization
The HPE Data Fabric Database OJAI Connector for Apache Spark provides the following API to enable
serialization of the ByteBuffer:
```scala
MapRDBSpark.serializableBinaryValue(byteBuffer)
```

The following example shows an array of byte buffers, or binary values, being converted to serialized byte buffers by using MapRDBSpark.serializableBinaryValue:

```scala
val dstSplits = arrayOfByteBuffer.map(x => MapRDBSpark.serializableBinaryValue(x))
```
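As a usage sketch (the sample input array and the import path for MapRDBSpark are assumptions here, not taken from this topic), the converted values can then be distributed in an RDD without serialization errors:

```scala
import java.nio.ByteBuffer
import com.mapr.db.spark.MapRDBSpark

// Hypothetical input: plain ByteBuffers, which Spark cannot serialize directly.
val arrayOfByteBuffer: Array[ByteBuffer] =
  Array(ByteBuffer.wrap("key1".getBytes), ByteBuffer.wrap("key2".getBytes))

// Convert each buffer to a serializable DBBinaryValue ...
val dstSplits = arrayOfByteBuffer.map(x => MapRDBSpark.serializableBinaryValue(x))

// ... so the values can safely be used in Spark operations.
val splitsRDD = sc.parallelize(dstSplits)
```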