Loading Data from HPE Data Fabric Database as an Apache Spark RDD
You can use the following APIs to load JSON data from an HPE Data Fabric Database table into an Apache Spark RDD of JSON documents:
SparkContext object:
def loadFromMapRDB[T](table: String): RDD[T]
MapRDBJavaSparkContext object:
mapRDBSparkContext.loadFromMapRDB(tableName: String, clazz: Class)
The following example creates a userprofilesRDD by calling loadFromMapRDB from SparkContext (Scala) or MapRDBJavaSparkContext (Java) and supplying the table path ("/tmp/user_profiles"):
// Scala
import com.mapr.db.spark._
val userprofilesRDD = sc.loadFromMapRDB("/tmp/user_profiles")
// Java
import com.mapr.db.spark.api.java.MapRDBJavaSparkContext;
import org.apache.spark.api.java.JavaRDD;
MapRDBJavaSparkContext mapRDBSparkContext = new MapRDBJavaSparkContext(sc);
JavaRDD userprofilesRDD = mapRDBSparkContext.loadFromMapRDB("/tmp/user_profiles");
The following example creates a usersInfo RDD by calling loadFromMapRDB from SparkContext (Scala) or MapRDBJavaSparkContext (Java) and supplying the table path ("/tmp/UserInfo"):
// Scala
import com.mapr.db.spark._
val usersInfo = sc.loadFromMapRDB("/tmp/UserInfo")
// Java
import com.mapr.db.spark.api.java.MapRDBJavaSparkContext;
MapRDBJavaRDD<OJAIDocument> usersInfo = mapRDBSparkContext.loadFromMapRDB("/tmp/UserInfo");
The usersInfo data contains the following information:
- Address (map type)
- Date of birth (date type)
- First name (string type)
- Interests (array of strings)
- Last name (string type)
The following prints the fields and shows the output for a sample user:
// Scala
usersInfo.foreach(println(_))
// Java
usersInfo.foreach(System.out::println);
{
"address":
{"Pin":95035,"city":"milpitas","street":"350 holger way"},
"dob":"1947-11-29",
"first_name":"David",
"interests":["football","books","movies"],
"last_name":"Jones"
}
The following example uses cogroup to join the JSON documents from two tables, with address.city as the join key:
// Scala
import com.mapr.db.spark._
val maprd1 = sc.loadFromMapRDB("/tmp/user_profiles")
val maprd2 = sc.loadFromMapRDB("/tmp/user_income")
val collection = maprd1.map(a => (a.`address.city`[String],a))
.cogroup(maprd2.map(a=>(a.`address.city`[String],a)))
.map(a => (a._1,a._2._1.size,a._2._2.size)).collect
// Java
import com.mapr.db.spark.api.java.MapRDBJavaSparkContext;
import scala.Tuple2;
import scala.Tuple3;
import java.util.Collection;
import java.util.List;
MapRDBJavaRDD<OJAIDocument> maprd1 = mapRDBSparkContext.loadFromMapRDB("/tmp/user_profiles");
MapRDBJavaRDD<OJAIDocument> maprd2 = mapRDBSparkContext.loadFromMapRDB("/tmp/user_income");
List collection = maprd1.mapToPair(a -> new Tuple2<>(a.getString("address.city"), a))
.cogroup(maprd2.mapToPair(a -> new Tuple2<>(a.getString("address.city"), a)))
.map(a -> new Tuple3<>(a._1, ((Collection<?>)a._2._1).size(), ((Collection<?>)a._2._2).size()))
.collect();
The result, collection, contains one tuple per city with the number of matching documents in the user_profiles and user_income HPE Data Fabric Database tables.
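To make the shape of that result concrete, the following plain-Java sketch reproduces the per-key counting that the cogroup-and-size step performs. It does not use Spark or the connector: the class name CogroupCountSketch, the method countsByCity, and the sample city values are all hypothetical, and two in-memory lists of address.city values stand in for the two tables.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Plain-Java stand-in for the cogroup example; no Spark or connector classes are used.
class CogroupCountSketch {

    // For each city, counts[0] is the number of documents from the first list and
    // counts[1] the number from the second list, mirroring
    // (key, leftGroup.size, rightGroup.size) in the cogroup result.
    static Map<String, int[]> countsByCity(List<String> profileCities,
                                           List<String> incomeCities) {
        Map<String, int[]> counts = new LinkedHashMap<>();
        for (String city : profileCities) {
            counts.computeIfAbsent(city, k -> new int[2])[0]++;
        }
        for (String city : incomeCities) {
            counts.computeIfAbsent(city, k -> new int[2])[1]++;
        }
        return counts;
    }

    public static void main(String[] args) {
        // Hypothetical address.city values, one per document in each table.
        List<String> profiles = List.of("milpitas", "milpitas", "fremont");
        List<String> incomes = List.of("milpitas", "san jose");
        countsByCity(profiles, incomes).forEach((city, c) ->
            System.out.println(city + " -> (" + c[0] + ", " + c[1] + ")"));
    }
}
```

A city that appears in only one table still produces an entry, with a count of 0 for the other table, which is the same behavior cogroup has for keys present on only one side.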
The following example adds a new field into all the JSON documents:
// Scala
import com.mapr.db.spark._
val maprd = sc.loadFromMapRDB("/tmp/user_profiles")
// Keep the result as an RDD (no collect) so that saveToMapRDB can write it back.
val documents = maprd.map(a => { a.`address.country` = "USA"; a })
documents.saveToMapRDB("/tmp/cleaned_user_profiles")
// Java
import com.mapr.db.spark.api.java.MapRDBJavaSparkContext;
import org.apache.spark.api.java.JavaRDD;
MapRDBJavaRDD<OJAIDocument> maprd = mapRDBSparkContext.loadFromMapRDB("/tmp/user_profiles");
JavaRDD<OJAIDocument> documents = maprd.map(a -> { a.set("address.country", "USA"); return a; });
mapRDBSparkContext.saveToMapRDB(documents, "/tmp/cleaned_user_profiles");
Improving Performance by Using Projection Pushdown and Filter Pushdown
To improve performance, you can supply a WHERE clause and projection fields to the
loadFromMapRDB API. In the following example, a condition is supplied to
the loadFromMapRDB function and only certain fields are specified in the
SELECT clause:
// Scala
import com.mapr.db.spark._
val userprofilesRDD = sc.loadFromMapRDB("/tmp/user_profiles")
.where([condition])
.select("address",
"first_name",
"_id",
"last_name")
// Java
import com.mapr.db.spark.api.java.MapRDBJavaSparkContext;
import org.ojai.store.QueryCondition;
MapRDBJavaSparkContext mapRDBSparkContext = new MapRDBJavaSparkContext(spark.sparkContext());
MapRDBJavaRDD userprofilesRDD = mapRDBSparkContext.loadFromMapRDB("/tmp/user_profiles")
.where([condition])
.select("address",
"first_name",
"_id",
"last_name");
The data is loaded based on the condition. The condition is pushed down to the server, and the server returns data based on the filtering. Only the fields specified in the SELECT clause are projected.
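As a rough mental model of what where and select do together, the sketch below applies a condition and then a projection to in-memory documents in plain Java. It is only an illustration under stated assumptions: PushdownSketch and whereSelect are hypothetical names, documents are modeled as maps, and the work happens locally here, whereas the connector evaluates the condition and projection on the server so that unneeded data is never transferred.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Predicate;

// Local illustration of filter-then-project semantics; not the connector API.
class PushdownSketch {

    // Keeps only documents that satisfy the condition, then copies only the
    // requested fields from each surviving document.
    static List<Map<String, Object>> whereSelect(List<Map<String, Object>> docs,
                                                 Predicate<Map<String, Object>> condition,
                                                 String... fields) {
        List<Map<String, Object>> result = new ArrayList<>();
        for (Map<String, Object> doc : docs) {
            if (!condition.test(doc)) {
                continue; // filtered out: never reaches the result
            }
            Map<String, Object> projected = new LinkedHashMap<>();
            for (String field : fields) {
                if (doc.containsKey(field)) {
                    projected.put(field, doc.get(field));
                }
            }
            result.add(projected);
        }
        return result;
    }
}
```

Calling whereSelect with a salary condition and the fields "_id" and "first_name" returns one map per matching document containing only those two fields; any other field, including salary itself, is dropped unless it is listed.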
In the following example, the WHERE clause is used as a filter condition:
// Scala
import com.mapr.db.spark._
val userprofilesRDD = sc.loadFromMapRDB("/tmp/user_profiles")
.where(field("salary") >= 100)
// Java
import com.mapr.db.MapRDB;
import com.mapr.db.spark.api.java.MapRDBJavaSparkContext;
import org.ojai.store.QueryCondition;
MapRDBJavaSparkContext mapRDBSparkContext = new MapRDBJavaSparkContext(spark.sparkContext());
MapRDBJavaRDD userprofilesRDD = mapRDBSparkContext.loadFromMapRDB("/tmp/user_profiles")
.where(MapRDB.newCondition().is("salary", QueryCondition.Op.GREATER_OR_EQUAL, 100).build());
The userprofilesRDD includes only those documents with a salary field greater than or equal to 100.
By filtering on the _id field, you can retrieve the document for a given key:
// Scala
import com.mapr.db.spark._
val userprofilesRDD = sc.loadFromMapRDB("/tmp/user_profiles")
.where(field("_id") === "k2")
// Java
import com.mapr.db.MapRDB;
import com.mapr.db.spark.api.java.MapRDBJavaSparkContext;
import org.ojai.store.QueryCondition;
MapRDBJavaSparkContext mapRDBSparkContext = new MapRDBJavaSparkContext(spark.sparkContext());
MapRDBJavaRDD userprofilesRDD = mapRDBSparkContext.loadFromMapRDB("/tmp/user_profiles")
.where(MapRDB.newCondition().is("_id", QueryCondition.Op.EQUAL, "k2").build());
WHERE Clause Semantics
The loadFromMapRDB API supports a WHERE clause to push down the filter to
the JSON document API, ensuring that only relevant documents are propagated to the RDD.
You can use two options to provide the filter condition:
- Scala domain-specific language (DSL)
- QueryCondition (from the OJAI API)
Following is an example of using loadFromMapRDB and supplying a condition
by using Scala DSL:
val isDoe = field("last_name") === "Doe"
val userprofilesRDD = sc.loadFromMapRDB("/tmp/user_profiles").where(isDoe)
For more information about using Scala DSL, see Scala DSL for Specifying Filter Conditions.
Following is an example of passing the condition using the QueryCondition
API:
// Scala
val maprd = sc.loadFromMapRDB(tableName)
.where(MapRDB.newCondition()
.is("_id", QueryCondition.Op.EQUAL, "k2")
.build())
// Java
MapRDBJavaRDD rdd = mapRDBSparkContext.loadFromMapRDB(tableName)
.where(MapRDB.newCondition().is("_id", QueryCondition.Op.EQUAL, "k2").build());
For more information about QueryCondition, see Querying with Conditions.