Saving an Apache Spark RDD to an HPE Ezmeral Data Fabric Database JSON Table
Saving an RDD[OJAIDocument] to HPE Ezmeral Data Fabric Database
The HPE Ezmeral Data Fabric Database OJAI Connector for Apache Spark provides the following API to save an RDD[OJAIDocument] to an HPE Ezmeral Data Fabric Database table:

def saveToMapRDB(tablename: String, createTable: Boolean = false, bulkInsert: Boolean = false, idFieldPath: String = DocumentConstants.ID_KEY): Unit
For Java applications, the following APIs are available on the MapRDBJavaSparkContext object:

def saveToMapRDB[D](javaRDD: JavaRDD[D], tableName: String, createTable: Boolean, bulkInsert: Boolean, idField: String): Unit
def saveRowRDDToMapRDB(javaRDD: JavaRDD[Row], tableName: String, createTable: Boolean, bulkInsert: Boolean, idField: String): Unit
def saveToMapRDB[K, V <: AnyRef](javaPairRDD: JavaPairRDD[K, V], keyClazz: Class[K], valueClazz: Class[V], tableName: String, createTable: Boolean, bulkInsert: Boolean): Unit
In the following example, address and first_name data is loaded from the "/tmp/user_profiles" table, stored as an RDD (userprofilesRDD), and then saved to the "/tmp/user_firstname_and_address" table:

import com.mapr.db.spark._
val userprofilesRDD = sc.loadFromMapRDB("/tmp/user_profiles")
.where("condition")
.select("address", "first_name")
userprofilesRDD.saveToMapRDB("/tmp/user_firstname_and_address")
import com.mapr.db.spark.api.java.MapRDBJavaSparkContext;
import com.mapr.db.spark.api.java.MapRDBJavaRDD;
import com.mapr.db.spark.impl.OJAIDocument;

MapRDBJavaSparkContext mapRDBSparkContext = new MapRDBJavaSparkContext(sc);
MapRDBJavaRDD<OJAIDocument> userprofilesRDD = mapRDBSparkContext.loadFromMapRDB("/tmp/user_profiles")
    .where("condition")
    .select("address", "first_name");
mapRDBSparkContext.saveToMapRDB(userprofilesRDD, "/tmp/user_firstname_and_address", true, false, "_id");
The HPE Ezmeral Data Fabric Database OJAI Connector for Apache Spark also provides the following API to insert an RDD[OJAIDocument] into an HPE Ezmeral Data Fabric Database table. The insertToMapRDB API is available starting in the EEP 4.1.0 release:

def insertToMapRDB(tablename: String, createTable: Boolean = false, bulkInsert: Boolean = false, idFieldPath: String = DocumentConstants.ID_KEY): Unit
import com.mapr.db.spark._
val userprofilesRDD = sc.loadFromMapRDB("/tmp/user_profiles")
userprofilesRDD.insertToMapRDB(tablename, createTable = true, bulkInsert = false, idFieldPath = "_id")
import com.mapr.db.spark.api.java.MapRDBJavaSparkContext;
import com.mapr.db.spark.api.java.MapRDBJavaRDD;
import com.mapr.db.spark.impl.OJAIDocument;
import org.apache.spark.sql.SparkSession;

MapRDBJavaSparkContext mapRDBSparkContext = new MapRDBJavaSparkContext(spark.sparkContext());
MapRDBJavaRDD<OJAIDocument> userprofilesRDD = mapRDBSparkContext.loadFromMapRDB("/tmp/user_profiles");
mapRDBSparkContext.insertToMapRDB(userprofilesRDD, tablename);
The insertToMapRDB API throws an exception if a row with the same ID already exists (see the sketch following the parameter table below). This API supports the following parameters:
Parameter | Default | Description
---|---|---
tableName | Not applicable | The name of the HPE Ezmeral Data Fabric Database table in which you are saving the document.
createTable | false | Creates the table before saving the documents. Note that if the table already exists and createTable is set to true, the API throws an exception.
idFieldPath | _id | Specifies the key to be used for the document.
bulkInsert | false | Loads a group of rows simultaneously. bulkInsert is similar to a bulk load in MapReduce.
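For example, the following sketch contrasts insertToMapRDB and saveToMapRDB using the userprofilesRDD loaded above; the /tmp/users_copy path is hypothetical:

import com.mapr.db.spark._
// First run: create the (hypothetical) table and insert every document.
userprofilesRDD.insertToMapRDB("/tmp/users_copy", createTable = true)
// Inserting the same RDD again throws an exception, because documents
// with these _id values already exist:
// userprofilesRDD.insertToMapRDB("/tmp/users_copy")
// saveToMapRDB replaces the existing documents instead of throwing:
userprofilesRDD.saveToMapRDB("/tmp/users_copy")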
For Java applications, the APIs on the MapRDBJavaSparkContext object support the following parameters:

Parameter | Default | Description
---|---|---
RDD (JavaRDD or JavaPairRDD) | Not applicable | Specifies the RDD that you are saving to the HPE Ezmeral Data Fabric Database table.
tableName | Not applicable | Specifies the name of the HPE Ezmeral Data Fabric Database table in which you are saving the document.
createTable | false | Creates the table before saving the document. Note that if the table already exists and createTable is set to true, the API throws an exception.
idFieldPath | _id | Specifies the key to be used for the document.
bulkInsert | false | Loads a group of rows simultaneously. bulkInsert is similar to a bulk load in MapReduce.
keyClazz (only for JavaPairRDD) | Not applicable | Specifies the class type of the key in the JavaPairRDD that you are saving to the HPE Ezmeral Data Fabric Database table.
valueClazz (only for JavaPairRDD) | Not applicable | Specifies the class type of the value in the JavaPairRDD that you are saving to the HPE Ezmeral Data Fabric Database table.
The saveToMapRDB method works with both JavaRDD and JavaPairRDD; for a JavaPairRDD, the key and value class types are passed through the keyClazz and valueClazz parameters. For saving a JavaRDD[Row], use the saveRowRDDToMapRDB method.
The following example uses the idFieldPath parameter to specify that the user_id field supplies the document key, and explicitly sets the bulkInsert value when saving the RDD:
import com.mapr.db.spark._
userprofilesRDD.saveToMapRDB("/tmp/user_firstname_and_address",
  idFieldPath = "user_id",
  bulkInsert = false)
import com.mapr.db.spark.api.java.MapRDBJavaSparkContext;
MapRDBJavaSparkContext mapRDBSparkContext = new MapRDBJavaSparkContext(spark.sparkContext());
mapRDBSparkContext.saveToMapRDB(userprofilesRDD, "/tmp/user_firstname_and_address", false, false, "user_id");
The following example saves an RDD of Person objects into the newly created /tmp/UserInfo table:
import org.apache.spark.{SparkConf, SparkContext}
import com.mapr.db.spark._

val sparkConf = new SparkConf().setAppName("json app").setMaster("local[*]")
val sc = new SparkContext(sparkConf)
val people = sc.parallelize(getUsers())
people.saveToMapRDB("/tmp/UserInfo", createTable = true)
import com.mapr.db.spark.api.java.MapRDBJavaSparkContext;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;

SparkConf sparkConf = new SparkConf().setAppName("json app").setMaster("local[*]");
JavaSparkContext jsc = new JavaSparkContext(sparkConf);
MapRDBJavaSparkContext mapRDBSparkContext = new MapRDBJavaSparkContext(jsc.sc());
// getUsers() is shown below in Scala; a Java version would return a List<Person>.
JavaRDD<Person> rdd = jsc.parallelize(getUsers());
mapRDBSparkContext.saveToMapRDB(rdd, "/tmp/UserInfo", true);
The following example shows the getUsers function that creates the Person objects (a sketch of the Person case class follows the function):
def getUsers(): Array[Person] = {
val users: Array[Person] =
Array(
Person("DavUSCalif", "David", "Jones",
ODate.parse("1947-11-29"),
Seq("football", "books", "movies"),
Map("city" -> "milpitas", "street" -> "350 holger way", "Pin" -> 95035)),
Person("PetUSUtah", "Peter", "pan",
ODate.parse("1974-1-29"),
Seq("boxing", "music", "movies"),
Map("city" -> "salt lake", "street" -> "351 lake way", "Pin" -> 89898)),
Person("JamUSAriz", "James", "junior",
ODate.parse("1968-10-2"),
Seq("tennis", "painting", "music"),
Map("city" -> "phoenix", "street" -> "358 pond way", "Pin" -> 67765)),
Person("JimUSCalif", "Jimmy", "gill",
ODate.parse("1976-1-9"),
Seq("cricket", "sketching"),
Map("city" -> "san jose", "street" -> "305 city way", "Pin" -> 95652)),
Person("IndUSCalif", "Indiana", "Jones",
ODate.parse("1987-5-4"),
Seq("squash", "comics", "movies"),
Map("city" -> "sunnyvale", "street" -> "35 town way", "Pin" -> 95985)))
users
}
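The Person case class itself is defined in Creating an RDD of a Class. A minimal sketch consistent with the calls above might look like this; the field names here are illustrative assumptions:

import org.ojai.types.ODate

// Hypothetical definition; see Creating an RDD of a Class for the actual one.
case class Person(_id: String, firstName: String, lastName: String,
                  dob: ODate, interests: Seq[String],
                  address: Map[String, Any])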
Saving a JSON Document to HPE Ezmeral Data Fabric Database
To save a JSON document using the HPE Ezmeral Data Fabric Database OJAI Connector for Apache Spark, you must first convert the JSON document into an OJAI document and then save the RDD, as shown in the following example:
import com.mapr.db.spark._
val documents = sc.parallelize((1 to 10)
  .map(i => s"""{"_id": "$i", "test": "$i"}"""))
val maprd = documents.map(a => MapRDBSpark.newDocument(a))
maprd.saveToMapRDB("/tmp/testData")
import com.mapr.db.spark.MapRDBSpark;
import com.mapr.db.spark.api.java.MapRDBJavaSparkContext;
import com.mapr.db.spark.impl.OJAIDocument;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import java.util.Arrays;

JavaRDD<String> documents = JavaSparkContext.fromSparkContext(sc)
    .parallelize(Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10))
    .map(i -> "{\"_id\": \"" + i + "\", \"test\": \"" + i + "\"}");
JavaRDD<OJAIDocument> maprd = documents.map(MapRDBSpark::newDocument);
mapRDBSparkContext.saveToMapRDB(maprd, "/tmp/testData");
An _id field is required to save JSON data into a table, so an _id field must be present. If you only need to convert the JSON data to an OJAI document (without saving to HPE Ezmeral Data Fabric Database), the _id field is not required. If the HPE Ezmeral Data Fabric Database table already contains a record with the same _id value, HPE Ezmeral Data Fabric Database replaces the record; otherwise, it inserts a new record.

Just as you can load a JSON document into a Scala bean class (see Creating an RDD of a Class), you can save an RDD of Scala class objects in an HPE Ezmeral Data Fabric Database JSON table. saveToMapRDB converts any bean object to a JSON document and saves it.
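For example, a minimal round-trip sketch (the /tmp/UserInfoCopy path is hypothetical) loads a table into Person objects and saves the bean RDD to a new table:

import com.mapr.db.spark._

// Load documents as Person objects, then save the bean RDD to a new table.
val persons = sc.loadFromMapRDB[Person]("/tmp/UserInfo")
persons.saveToMapRDB("/tmp/UserInfoCopy", createTable = true)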
Table Splits and saveToMapRDB
If the createTable
parameter is set to true, saveToMapRDB
can use the partition information from the RDD's lineage to create the splits for a new
table:
sc.loadFromMapRDB("/tmp/user_profiles").saveToMapRDB("/userProfiles", createTable = true)
Suppose the /tmp/user_profiles table has five splits. saveToMapRDB uses this information to create the /userProfiles table with the same number and range of splits. You can also supply this information explicitly by using MapRDBSpark.newPartitioner:
sc.loadFromMapRDB("/tmp/user_profiles")
  .keyBy(doc => doc.get("_id"))
  // Partition the data by the splits of an existing table:
  .repartitionAndSortWithinPartitions(MapRDBSpark.newPartitioner[String]("/profiles"))
  .saveToMapRDB("/userProfiles", createTable = true)
For more information about partitioning, see Using the Custom Partitioner with the HPE Ezmeral Data Fabric Database OJAI Connector for Apache Spark.