Loading Data from HPE Ezmeral Data Fabric Database as an Apache Spark RDD
You can use the following APIs to load JSON-format data from an HPE Ezmeral Data Fabric Database table into an Apache Spark RDD of JSON documents:

From the SparkContext object (Scala):
def loadFromMapRDB[T](table: String): RDD[T]

From the MapRDBJavaSparkContext object (Java):
mapRDBSparkContext.loadFromMapRDB(tableName: String, clazz: Class)
The following example creates a userprofilesRDD by calling loadFromMapRDB from SparkContext (Scala) or MapRDBJavaSparkContext (Java) and supplying the table path ("/tmp/user_profiles"):
import com.mapr.db.spark._
val userprofilesRDD = sc.loadFromMapRDB("/tmp/user_profiles")
import com.mapr.db.spark.api.java.MapRDBJavaSparkContext;
MapRDBJavaSparkContext mapRDBSparkContext = new MapRDBJavaSparkContext(sc);
MapRDBJavaRDD<OJAIDocument> userprofilesRDD = mapRDBSparkContext.loadFromMapRDB("/tmp/user_profiles");
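You can also bind each document to a custom type instead of the generic OJAIDocument, by passing a type parameter (Scala) or the clazz argument (Java). The following is a minimal Scala sketch, assuming a hypothetical UserProfile case class whose field names match the document fields:

import com.mapr.db.spark._

// Hypothetical case class; its fields must match the JSON document fields.
case class UserProfile(_id: String, first_name: String, last_name: String)

// The type parameter T of loadFromMapRDB[T] selects the target class.
val typedRDD = sc.loadFromMapRDB[UserProfile]("/tmp/user_profiles")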
The following example creates a usersInfo RDD by calling loadFromMapRDB from SparkContext (Scala) or MapRDBJavaSparkContext (Java) and supplying the table path ("/tmp/UserInfo"):
import com.mapr.db.spark._
val usersInfo = sc.loadFromMapRDB("/tmp/UserInfo")
import com.mapr.db.spark.api.java.MapRDBJavaSparkContext;
MapRDBJavaRDD<OJAIDocument> usersInfo = mapRDBSparkContext.loadFromMapRDB("/tmp/UserInfo");
The usersInfo data contains the following information:
- Address (map type)
- Date of birth (date type)
- First name (string type)
- Interests (string type)
- Last name (string type)
The following prints the fields of each document; the output for a sample user is shown below:
usersInfo.foreach(println(_))
usersInfo.foreach(System.out::println);
{
"address":
{"Pin":95035,"city":"milpitas","street":"350 holger way"},
"dob":"1947-11-29",
"first_name":"David",
"interests":["football","books","movies"],
"last_name":"Jones"
}
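You can also read individual fields from each OJAIDocument by using the typed accessor syntax that appears in the join example below. The following is a minimal sketch that extracts the first_name field; it assumes the same accessor syntax applies to any scalar field:

import com.mapr.db.spark._

// Extract one string field from every document in the RDD.
val firstNames = usersInfo.map(a => a.`first_name`[String]).collect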
The following example shows a join operation performed on two tables of JSON documents, using address.city as the join key:
import com.mapr.db.spark._
val maprd1 = sc.loadFromMapRDB("/tmp/user_profiles")
val maprd2 = sc.loadFromMapRDB("/tmp/user_income")
val collection = maprd1.map(a => (a.`address.city`[String],a))
.cogroup(maprd2.map(a=>(a.`address.city`[String],a)))
.map(a => (a._1,a._2._1.size,a._2._2.size)).collect
import com.mapr.db.spark.api.java.MapRDBJavaSparkContext;
import scala.Tuple2;
import scala.Tuple3;
import java.util.Collection;
import java.util.List;

MapRDBJavaRDD<OJAIDocument> maprd1 = mapRDBSparkContext.loadFromMapRDB("/tmp/user_profiles");
MapRDBJavaRDD<OJAIDocument> maprd2 = mapRDBSparkContext.loadFromMapRDB("/tmp/user_income");
List<Tuple3<String, Integer, Integer>> collection =
    maprd1.mapToPair(a -> new Tuple2<>(a.getString("address.city"), a))
          .cogroup(maprd2.mapToPair(a -> new Tuple2<>(a.getString("address.city"), a)))
          .map(a -> new Tuple3<>(a._1, ((Collection<?>) a._2._1).size(), ((Collection<?>) a._2._2).size()))
          .collect();
The resulting collection contains, for each city, the number of users in the user_profiles and user_income HPE Ezmeral Data Fabric Database tables.
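To inspect the result locally, you can print the collected tuples; in the Scala version, each entry has the form (city, count in user_profiles, count in user_income):

// Each tuple is (city, user_profiles count, user_income count).
collection.foreach(println)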
The following example adds a new field to all the JSON documents and saves the result to a new table:
import com.mapr.db.spark._
val maprd = sc.loadFromMapRDB("/tmp/user_profiles")
// Keep the result as an RDD; the connector provides saveToMapRDB on RDDs of OJAI documents.
val documents = maprd.map(a => { a.`address.country` = "USA"; a })
documents.saveToMapRDB("/tmp/cleaned_user_profiles")
import com.mapr.db.spark.api.java.MapRDBJavaSparkContext;
MapRDBJavaRDD<OJAIDocument> maprd = mapRDBSparkContext.loadFromMapRDB("/tmp/user_profiles");
JavaRDD<OJAIDocument> documents = maprd.map(a -> { a.set("address.country", "USA"); return a; });
mapRDBSparkContext.saveToMapRDB(documents, "/tmp/cleaned_user_profiles");
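In the Scala API, saveToMapRDB also takes optional parameters, such as a createTable flag that creates the destination table if it does not already exist. A minimal sketch, using the documents RDD from the Scala example above and assuming the createTable option of the Scala connector:

// Create the destination table if it does not already exist (assumed option).
documents.saveToMapRDB("/tmp/cleaned_user_profiles", createTable = true)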
Improving Performance by Using Projection Pushdown and Filter Pushdown
To improve performance, you can supply a WHERE clause and projection fields to the loadFromMapRDB API. In the following example, a condition is supplied to the loadFromMapRDB function, and only certain fields are specified in the SELECT clause:
import com.mapr.db.spark._
val userprofilesRDD = sc.loadFromMapRDB("/tmp/user_profiles")
.where([condition])
.select("address",
"first_name",
"_id",
"last_name")
import com.mapr.db.spark.api.java.MapRDBJavaSparkContext;
import org.ojai.store.QueryCondition;
MapRDBJavaSparkContext mapRDBSparkContext = new MapRDBJavaSparkContext(spark.sparkContext());
MapRDBJavaRDD<OJAIDocument> userprofilesRDD = mapRDBSparkContext.loadFromMapRDB("/tmp/user_profiles")
.where([condition])
.select("address",
"first_name",
"_id",
"last_name");
The data is loaded based on the condition. The condition is pushed down to the server, and the server returns data based on the filtering. Only the fields specified in the SELECT clause are projected.
In the following example, the WHERE clause is used as a filter condition:
import com.mapr.db.spark._
val userprofilesRDD = sc.loadFromMapRDB("/tmp/user_profiles")
.where(field("salary") >= 100)
import com.mapr.db.spark.api.java.MapRDBJavaSparkContext;
import com.mapr.db.MapRDB;
import org.ojai.store.QueryCondition;

MapRDBJavaSparkContext mapRDBSparkContext = new MapRDBJavaSparkContext(spark.sparkContext());
MapRDBJavaRDD<OJAIDocument> userprofilesRDD = mapRDBSparkContext.loadFromMapRDB("/tmp/user_profiles")
    .where(MapRDB.newCondition().is("salary", QueryCondition.Op.GREATER_OR_EQUAL, 100).build());
The userprofilesRDD includes only those documents with a salary field greater than or equal to 100.
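Filter conditions can also be combined. The following sketch assumes the and combinator described in the Scala DSL documentation referenced later in this section:

import com.mapr.db.spark._

// Combine two conditions with the DSL's `and` combinator (assumed available).
val filteredRDD = sc.loadFromMapRDB("/tmp/user_profiles")
                    .where(field("salary") >= 100 and field("last_name") === "Jones")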
By specifying the _id field, you can find and retrieve a row for a given key:
import com.mapr.db.spark._
val userprofilesRDD = sc.loadFromMapRDB("/tmp/user_profiles")
.where(field("_id") === "k2")
import com.mapr.db.spark.api.java.MapRDBJavaSparkContext;
import com.mapr.db.MapRDB;
import org.ojai.store.QueryCondition;

MapRDBJavaSparkContext mapRDBSparkContext = new MapRDBJavaSparkContext(spark.sparkContext());
MapRDBJavaRDD<OJAIDocument> userprofilesRDD = mapRDBSparkContext.loadFromMapRDB("/tmp/user_profiles")
    .where(MapRDB.newCondition().is("_id", QueryCondition.Op.EQUAL, "k2").build());
WHERE Clause Semantics
The loadFromMapRDB API supports a WHERE clause to push down the filter to the JSON document API, ensuring that only relevant documents are propagated to the RDD.
You can use two options to provide the filter condition:
- Scala domain-specific language (DSL)
- QueryCondition (from the OJAI API)
Following is an example of using loadFromMapRDB and supplying a condition by using the Scala DSL:
val isDoe = field("last_name") === "Doe"
val userprofilesRDD = sc.loadFromMapRDB("/tmp/user_profiles").where(isDoe)
For more information about using Scala DSL, see Scala DSL for Specifying Filter Conditions.
Following is an example of passing the condition by using the QueryCondition API:
val maprd = sc.loadFromMapRDB(tableName)
.where(MapRDB.newCondition()
.is("_id", QueryCondition.Op.EQUAL, "k2")
.build())
MapRDBJavaRDD<OJAIDocument> rdd = mapRDBJavaSparkContext.loadFromMapRDB(tableName)
    .where(MapRDB.newCondition().is("_id", QueryCondition.Op.EQUAL, "k2").build());
For more information about QueryCondition, see Querying with Conditions.