Saving an Apache Spark Dataset to an HPE Ezmeral Data Fabric Database JSON Table
Starting in the EEP 4.1.0 release, the HPE Ezmeral Data Fabric Database OJAI Connector for Apache Spark provides the following API to save a Dataset to an HPE Ezmeral Data Fabric Database table:
To save a Dataset from Scala, apply the following method on the Dataset object:
def saveToMapRDB(tableName: String, idFieldPath: String = "_id",
    createTable: Boolean = false, bulkInsert: Boolean = false): Unit
import org.apache.spark.sql.SparkSession
import com.mapr.db.spark.sql._
val ds = spark.loadFromMapRDB("/tmp/user_profiles")
ds.saveToMapRDB(tableName, createTable = true)
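All parameters other than tableName have defaults, so only the destination table is required. The following sketch is illustrative only: the destination table path /tmp/user_profiles_copy and the chosen options are assumptions, not part of the original example.

import org.apache.spark.sql.SparkSession
import com.mapr.db.spark.sql._

val ds = spark.loadFromMapRDB("/tmp/user_profiles")
// Hypothetical destination table: create it if it does not exist
// and load the data using a bulk insert.
ds.saveToMapRDB("/tmp/user_profiles_copy",
  createTable = true,
  bulkInsert = true)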
To save a Dataset from Java, apply the following method on a MapRDBJavaSession object:
def saveToMapRDB[T](ds: Dataset[T], tableName: String, idFieldPath: String,
    createTable: Boolean, bulkInsert: Boolean): Unit
import com.mapr.db.spark.sql.api.java.MapRDBJavaSession;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

MapRDBJavaSession maprSession = new MapRDBJavaSession(spark);
Dataset<Row> ds = maprSession.loadFromMapRDB("/tmp/user_profiles");
maprSession.saveToMapRDB(ds, tableName, "_id", true, false);
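MapRDBJavaSession wraps an existing SparkSession, so Java applications pass the Dataset to the method explicitly instead of relying on the Scala implicits shown in the previous example.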
The HPE Ezmeral Data Fabric Database OJAI Connector for Apache Spark also provides the following API to insert a Dataset into an HPE Ezmeral Data Fabric Database table:
In Scala:
import com.mapr.db.spark.sql._
ds.insertToMapRDB(tableName, idFieldPath, bulkInsert)
In Java:
import com.mapr.db.spark.sql.api.java.MapRDBJavaSession;
maprSession.insertToMapRDB(ds, tableName, idFieldPath, bulkInsert);
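Because insertToMapRDB fails on duplicate row IDs (see the note below), callers may want to guard the call. A minimal Scala sketch, assuming the destination table already exists; the table path /tmp/user_profiles_copy is hypothetical:

import scala.util.{Failure, Success, Try}
import com.mapr.db.spark.sql._

// insertToMapRDB throws if any document's _id already exists in the table.
Try(ds.insertToMapRDB("/tmp/user_profiles_copy", "_id", false)) match {
  case Success(_) => println("All rows inserted.")
  case Failure(e) => println(s"Insert failed (possibly a duplicate _id): ${e.getMessage}")
}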
NOTE: The insertToMapRDB API throws an exception if a row with the same ID already exists.

Word Count Example Using HPE Ezmeral Data Fabric Database OJAI Connector
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
// scalastyle:off println
package org.apache.spark.examples.maprdbconnector
import org.apache.spark.sql.SparkSession
import com.mapr.db.spark.sql._
object MaprDBJsonConnectorWordCount {

  def main(args: Array[String]): Unit = {
    parseArgs(args)

    val pathToFileWithData = args(0)
    val tableName = args(1)
    val tableNameWithResult = args(2)

    val spark = SparkSession
      .builder()
      .appName("OJAI MaprDB connector wordcount example")
      .getOrCreate()

    import spark.implicits._

    // Read the input file and save it to a MapR Database JSON table,
    // creating the table if it does not exist.
    val wordSequenceDS = importDataIntoSeq(pathToFileWithData).toDS()
    wordSequenceDS.saveToMapRDB(tableName, createTable = true)

    // Load the table back, split the "words" field (column index 1)
    // into individual words, and count the occurrences of each word.
    val dfWithDataFromMaprDB = spark.loadFromMapRDB(tableName)
      .flatMap(line => line.getAs[String](1).split(" "))
      .groupBy("value")
      .count()

    println("Dataset with counted words:")
    dfWithDataFromMaprDB.show()

    // Use each word as the document _id and save the counts to the result table.
    dfWithDataFromMaprDB.withColumn("_id", $"value")
      .saveToMapRDB(tableNameWithResult, createTable = true)
    println("Dataset with counted words was saved into the MaprDB table.")

    spark.stop()
  }
  private def parseArgs(args: Array[String]): Unit = {
    if (args.length != 3) {
      printUsage()
      System.exit(1)
    }
  }

  private def printUsage(): Unit = {
    val usage =
      """OJAI MaprDB connector wordcount example
        |Usage:
        |1) path to the file with data (words.txt can be used for the test);
        |2) name of the MaprDB table where data from file will be saved;
        |3) name of the MaprDB table where result will be saved;
        |""".stripMargin
    println(usage)
  }
  private def importDataIntoSeq(filePath: String): Seq[Word] = {
    // Each input line has the form "<id> <word> <word> ...": the first token
    // becomes the document _id and the rest becomes the "words" field.
    scala.io.Source.fromFile(filePath)
      .getLines()
      .map(line => {
        val wordWithId = line.split(" ")
        Word(wordWithId(0), wordWithId.drop(1).mkString(" "))
      }).toSeq
  }

  private case class Word(_id: String, words: String)
}
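To run the example, package it and submit it with three arguments: the input file, the table for the raw lines, and the table for the word counts. The invocation below is a hypothetical sketch; the jar name, master, and table paths are placeholders, not values from the original example.

spark-submit \
  --class org.apache.spark.examples.maprdbconnector.MaprDBJsonConnectorWordCount \
  --master yarn \
  spark-examples.jar \
  words.txt /tmp/wordcount_data /tmp/wordcount_result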