Working with Complex JSON Document Types
The HPE Ezmeral Data Fabric Database OJAI Connector for Apache Spark provides APIs to process JSON documents loaded from HPE Ezmeral Data Fabric Database.
Suppose you want to calculate the number of users located in each city:
Scala:

import com.mapr.db.spark._

val customerprofilesRDD = sc.loadFromMapRDB("/tmp/user_profiles")
// Key each profile by its address.city field, group by city, and count the profiles per city
val numberOfCustaccCities = customerprofilesRDD.map(a => (a.`address.city`[String], a))
                                                .groupByKey()
                                                .map(a => (a._1, a._2.size))
Java:

import com.mapr.db.spark.api.java.MapRDBJavaSparkContext;
import com.mapr.db.spark.api.java.MapRDBJavaRDD;
import com.mapr.db.spark.impl.OJAIDocument;
import org.apache.spark.api.java.JavaRDD;
import scala.Tuple2;
import java.util.Collection;

MapRDBJavaRDD<OJAIDocument> customerprofilesRDD = mapRDBSparkContext.loadFromMapRDB("/tmp/user_profiles");
JavaRDD<Tuple2<String, Integer>> numberOfCustaccCities = customerprofilesRDD
    .mapToPair(a -> new Tuple2<>(a.getString("address.city"), a))
    .groupByKey()
    .map(a -> new Tuple2<>(a._1, ((Collection<?>) a._2).size()));
If you have not provided an explicit cast, the object is returned as AnyRef. To access methods specific to a class, such as String or Integer, you can cast it to a specific type later in the process.
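For example, the following minimal sketch (assuming the user_profiles table loaded above and a string-valued address.city field) contrasts accessing a field with and without an explicit cast:

import com.mapr.db.spark._

val customerprofilesRDD = sc.loadFromMapRDB("/tmp/user_profiles")

// Without a cast, the field value comes back as AnyRef
val cityAsAnyRef = customerprofilesRDD.map(a => a.`address.city`)

// With an explicit cast, the field value comes back as String,
// so String-specific methods such as toUpperCase are available
val cityUpperCase = customerprofilesRDD.map(a => a.`address.city`[String].toUpperCase)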
Now suppose you want to collect all the addresses (address is of type Map) of all customers:
Scala:

import com.mapr.db.spark._

val customerprofilesRDD = sc.loadFromMapRDB("/tmp/user_profiles")
val customersAddress = customerprofilesRDD.map(a => a.address).collect
Java:

import com.mapr.db.spark.api.java.MapRDBJavaSparkContext;
import java.util.List;

MapRDBJavaRDD<OJAIDocument> customerprofilesRDD = mapRDBSparkContext.loadFromMapRDB("/tmp/user_profiles");
List<String> customersAddress = customerprofilesRDD.map(a -> a.getString("address")).collect();
customersAddress contains all of the addresses, but it is returned as an AnyRef object.
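As a minimal sketch (reusing the Scala customerprofilesRDD from above), you can add the cast while mapping so that the collected addresses come back as typed maps rather than AnyRef:

// Cast the address field to Map[String, AnyRef] while mapping,
// so the collected result is an array of typed maps
val typedAddresses = customerprofilesRDD.map(a => a.address[Map[String, AnyRef]]).collect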
The HPE Ezmeral Data Fabric Database OJAI Connector for Apache Spark introduces three new classes to wrap complex JSON types:
| Class | Type |
|---|---|
| DBMapValue | Map[String, AnyRef] |
| DBArrayValue | Array[AnyRef] |
| DBBinaryValue | ByteBuffer |
These classes are not exposed; however, you can access the underlying elements of DBArrayValue and DBMapValue by using the same functions as in Seq and Map. DBArrayValue works like a sequence, while DBMapValue works like a map. DBBinaryValue is a class wrapper around ByteBuffer.
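For instance, here is a minimal sketch (assuming the customerprofilesRDD loaded earlier) that applies ordinary Map and Seq operations to a map field and an array field:

// Map-style access on the address field
val cities = customerprofilesRDD.map(a => a.address[Map[String, AnyRef]])
                                .filter(_ != null)
                                .map(address => address.get("city"))

// Seq-style access on the interests field
val interestCounts = customerprofilesRDD.map(a => a.interests[Seq[AnyRef]])
                                        .filter(_ != null)
                                        .map(interests => interests.size)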
ByteBuffer is not serializable, so you will get serialization errors if you use a ByteBuffer in Spark code. You must ensure that byte buffers are converted to DBBinaryValue or serialized byte buffers. The HPE Ezmeral Data Fabric Database OJAI Connector for Apache Spark provides an API to convert ByteBuffers to serializable byte buffers.
Accessing Values in a Map
DBMapValue is a type of Map[String, AnyRef]. Any functions that you can use to access values in a Map, you can also use to access values in DBMapValue. In the following example, customerAddress contains the addresses of the customers who reside in San Jose. customerAddress is an Array[DBMapValue]:
val customerAddress = maprd.map(a => a.address[Map[String, AnyRef]])
                           .filter(a => a != null && a.get("city").contains("San Jose"))
                           .collect
This example can also be written in Scala using a functional approach as follows:
val customerAddress = maprd.map(a => a.address[Map[String, AnyRef]])
                           .filter(a => Option(a).map(a => a.get("city").contains("San Jose")).getOrElse(false))
                           .collect
Accessing the Array JSON Object
This example uses a sequence to access the Array JSON object:
val custInterests = maprd.map(a => a.interests[Seq[AnyRef]])
                         .filter(a => a != null && a(0) == "sports")
                         .collect
ByteBuffer Serialization
The HPE Ezmeral Data Fabric Database OJAI Connector for Apache Spark provides the following API to enable serialization of a ByteBuffer:
MapRDBSpark.serializableBinaryValue(byteBuffer)
The following example shows an array of byte buffers or binary values being converted to serialized byte buffers by using MapRDBSpark.serializableBinaryValue:
val dstSplits = arrayOfByteBuffer.map(x => MapRDBSpark.serializableBinaryValue(x))
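Here is a slightly fuller sketch of the same idea (the buffer contents and the com.mapr.db.spark.MapRDBSpark import path are illustrative assumptions) showing ByteBuffers being wrapped before they are handed to Spark:

import java.nio.ByteBuffer
import com.mapr.db.spark.MapRDBSpark  // assumed location of the MapRDBSpark helper object

// Plain ByteBuffers would cause serialization errors inside Spark
val arrayOfByteBuffer = Array(ByteBuffer.wrap("row1".getBytes), ByteBuffer.wrap("row2".getBytes))

// Wrap each ByteBuffer in a serializable binary value
val dstSplits = arrayOfByteBuffer.map(x => MapRDBSpark.serializableBinaryValue(x))

// The wrapped values can now safely cross the driver/executor boundary
val binaryRDD = sc.parallelize(dstSplits)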