Working with Complex JSON Document Types

The HPE Ezmeral Data Fabric Database OJAI Connector for Apache Spark provides APIs to process JSON documents loaded from HPE Ezmeral Data Fabric Database.

Suppose you want to calculate the number of users located in each city:

Scala:

import com.mapr.db.spark._

val customerprofilesRDD = sc.loadFromMapRDB("/tmp/user_profiles")
val numberOfCustaccCities = customerprofilesRDD.map(a => (a.`address.city`[String], a))
                            .groupByKey()
                            .map(a => (a._1, a._2.size))

Java:

import com.mapr.db.spark.api.java.MapRDBJavaSparkContext;
import com.mapr.db.spark.api.java.MapRDBJavaRDD;
import com.mapr.db.spark.impl.OJAIDocument;
import org.apache.spark.api.java.JavaRDD;
import scala.Tuple2;
import java.util.Collection;

MapRDBJavaRDD<OJAIDocument> customerprofilesRDD = mapRDBSparkContext.loadFromMapRDB("/tmp/user_profiles");
JavaRDD<Tuple2<String, Integer>> numberOfCustaccCities = customerprofilesRDD.mapToPair
   (a -> new Tuple2<>(a.getString("address.city"), a)).groupByKey()
   .map(a -> new Tuple2<>(a._1, ((Collection<?>) a._2).size()));
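For a simple per-city count, a reduceByKey variant (a sketch, not part of the original example) is more efficient than groupByKey, because only (city, count) pairs cross the shuffle rather than whole documents:

val cityCounts = customerprofilesRDD
                 .map(a => (a.`address.city`[String], 1))
                 .reduceByKey(_ + _)   // aggregates counts map-side before shuffling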

If you do not provide an explicit cast, the object is returned as AnyRef. To access methods specific to a class, such as String or Integer, you can cast the value to that type later in the process.
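For example, you can carry the value as AnyRef and cast only at the point where a class-specific method is needed (a minimal sketch against the same user_profiles table):

val cities = customerprofilesRDD.map(a => a.`address.city`)        // no cast: returned as AnyRef
val upper = cities.map(c => c.asInstanceOf[String].toUpperCase)    // cast before using String methods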

Now suppose you want to collect all the addresses (address is of type Map) of all customers:

Scala:

import com.mapr.db.spark._

val customerprofilesRDD = sc.loadFromMapRDB("/tmp/user_profiles")
val customersAddress = customerprofilesRDD.map(a => a.address).collect

Java:

import com.mapr.db.spark.api.java.MapRDBJavaSparkContext;
import java.util.List;
import java.util.Map;

MapRDBJavaRDD<OJAIDocument> customerprofilesRDD = mapRDBSparkContext.loadFromMapRDB("/tmp/user_profiles");
List<Map<String, Object>> customersAddress = customerprofilesRDD.map(a -> a.getMap("address")).collect();

In the Scala example, customersAddress contains all of the addresses, but each address is returned as an AnyRef object.

The HPE Ezmeral Data Fabric Database OJAI Connector for Apache Spark introduces three new classes to wrap complex JSON types:

Class           Type
DBMapValue      Map[String, AnyRef]
DBArrayValue    Array[AnyRef]
DBBinaryValue   ByteBuffer

These classes are not exposed; however, you can access the underlying elements of DBArrayValue and DBMapValue by using the same functions as in Seq and Map. DBArrayValue works like a sequence, while DBMapValue works like a map.
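For example (a sketch, assuming each address map has a city entry), the usual Map operations such as contains and apply work directly on the returned value:

val cities = customerprofilesRDD
             .map(a => a.address[Map[String, AnyRef]])               // backed by DBMapValue
             .filter(addr => addr != null && addr.contains("city"))  // Map-style membership test
             .map(addr => addr("city"))                              // Map-style lookup
             .collect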

DBBinaryValue is a wrapper class around ByteBuffer. ByteBuffer is not serializable, so you will get serialization errors if you use a ByteBuffer directly in Spark code. You must ensure that byte buffers are converted to DBBinaryValue or serialized byte buffers. The HPE Ezmeral Data Fabric Database OJAI Connector for Apache Spark provides an API to convert ByteBuffers to serializable byte buffers.

Accessing Values in a Map

DBMapValue is a type of Map[String, AnyRef]. Any function that you can use to access values in a Map can also be used to access values in a DBMapValue. In the following example, customerAddress contains the addresses of the customers who reside in San Jose. customerAddress is an Array[DBMapValue]:

val customerAddress = maprd.map(a => a.address[Map[String, AnyRef]])
            .filter(a => a != null &&
              a.get("city").contains("San Jose"))
            .collect

This example can also be written in a more functional style, replacing the explicit null check with Option:

val customerAddress = maprd.map(a => a.address[Map[String, AnyRef]])
            .filter(a => Option(a).map(a =>
              a.get("city").contains("San Jose")).getOrElse(false))
            .collect
NOTE
You can push the condition specified in the filter down to the HPE Ezmeral Data Fabric Database table scan by using the where clause.
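A sketch of such a pushdown, assuming the connector's field-based condition syntax:

import com.mapr.db.spark._

val sanJoseProfiles = sc.loadFromMapRDB("/tmp/user_profiles")
                        .where(field("address.city") === "San Jose")   // evaluated during the table scan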

Accessing the Array JSON Object

This example uses a sequence to access the Array JSON object:

val custInterests = maprd.map(a => a.interests[Seq[AnyRef]])
                    .filter(a => a != null && a(0) == "sports")
                    .collect
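Other Seq operations apply in the same way; for example (a sketch), taking the first two interests of each customer:

val firstTwoInterests = maprd.map(a => a.interests[Seq[AnyRef]])
                        .filter(_ != null)
                        .map(_.take(2))   // Seq-style slicing on the array field
                        .collect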

ByteBuffer Serialization

The HPE Ezmeral Data Fabric Database OJAI Connector for Apache Spark provides the following API to enable serialization of the ByteBuffer:

MapRDBSpark.serializableBinaryValue(byteBuffer)

The following example shows an array of byte buffers or binary values that are converted to serialized byte buffers by using MapRDBSpark.serializableBinaryValue:

val dstSplits = arrayOfByteBuffer.map(x => MapRDBSpark.serializableBinaryValue(x))
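Because the converted values are serializable, they can safely cross task boundaries; a minimal sketch:

val binRDD = sc.parallelize(dstSplits)   // DBBinaryValue elements serialize cleanly
val count = binRDD.count()               // raw ByteBuffers here would hit Spark serialization errors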