Saving an Apache Spark Dataset to an HPE Ezmeral Data Fabric Database JSON Table

Starting in the EEP 4.1.0 release, the HPE Ezmeral Data Fabric Database OJAI Connector for Apache Spark provides the following API to save a Dataset to an HPE Ezmeral Data Fabric Database table:

To save a Dataset, apply the following method to the Dataset object:
def saveToMapRDB(tableName: String, idFieldPath: String = "_id",
            createTable: Boolean = false, bulkInsert: Boolean = false): Unit
            
import org.apache.spark.sql.SparkSession
import com.mapr.db.spark.sql._
            
val ds = spark.loadFromMapRDB("/tmp/user_profiles")
ds.saveToMapRDB(tableName, createTable = true)
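To key the rows by a field other than _id, or to speed up an initial load, pass the remaining parameters explicitly. The following is a minimal sketch; the table path and the user_id field are illustrative assumptions, not part of the API:
import com.mapr.db.spark.sql._

// Illustrative sketch: key the rows by the (assumed) user_id field and
// enable bulkInsert for a one-time load into a newly created table.
ds.saveToMapRDB("/tmp/user_profiles_copy", idFieldPath = "user_id",
            createTable = true, bulkInsert = true)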
To save a Dataset from Java, apply the following method to a MapRDBJavaSession object:
def saveToMapRDB[T](ds: Dataset[T], tableName: String, idFieldPath: String,
            createTable: Boolean, bulkInsert: Boolean): Unit
            
import com.mapr.db.spark.sql.api.java.MapRDBJavaSession;
import org.apache.spark.sql.SparkSession;
            
MapRDBJavaSession maprSession = new MapRDBJavaSession(spark);
Dataset<Row> ds = maprSession.loadFromMapRDB("/tmp/user_profiles");
maprSession.saveToMapRDB(ds, tableName, true);

The HPE Ezmeral Data Fabric Database OJAI Connector for Apache Spark also provides the following API to insert a Dataset into an HPE Ezmeral Data Fabric Database table:

To insert a Dataset, apply the following method to the Dataset object:
import com.mapr.db.spark.sql._

ds.insertToMapRDB(tableName, idFieldPath, bulkInsert)
To insert a Dataset from Java, apply the following method to a MapRDBJavaSession object:
import com.mapr.db.spark.sql.api.java.MapRDBJavaSession;

maprSession.insertToMapRDB(ds, tableName, idFieldPath, bulkInsert)
NOTE: The insertToMapRDB API throws an exception if a row with the same ID already exists.
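Because of this, a caller that cannot guarantee unique row keys may want to surface the failure explicitly. The following is a minimal sketch using scala.util.Try; the handling shown is an application-level assumption, not part of the connector API:
import scala.util.{Failure, Success, Try}

// Minimal sketch: report a failed insert (for example, a duplicate _id)
// instead of letting the exception terminate the job unexplained.
Try(ds.insertToMapRDB(tableName, idFieldPath, bulkInsert)) match {
  case Success(_) => println(s"Inserted rows into $tableName")
  case Failure(e) => println(s"Insert into $tableName failed: ${e.getMessage}")
}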

Word Count Example Using the HPE Ezmeral Data Fabric Database OJAI Connector

The following complete example reads words from a file, saves them to one table, counts the occurrences of each word, and saves the counts to a second table:
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

// scalastyle:off println
package org.apache.spark.examples.maprdbconnector

import org.apache.spark.sql.SparkSession

import com.mapr.db.spark.sql._

object MaprDBJsonConnectorWordCount {

  def main(args: Array[String]): Unit = {

    parseArgs(args)

    val pathToFileWithData = args(0)
    val tableName = args(1)
    val tableNameWithResult = args(2)

    val spark = SparkSession
      .builder()
      .appName("OJAI MaprDB connector wordcount example")
      .getOrCreate()

    import spark.implicits._
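    // Read the input file into a Dataset of Word(_id, words) rows.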
    val wordSequenceDS = importDataIntoSeq(pathToFileWithData).toDS()

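    // Save the words to the first table; createTable = true creates it
    // (and fails if the table already exists).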
    wordSequenceDS.saveToMapRDB(tableName, createTable = true)

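    // Load the table back, split each row's words, and count occurrences.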
    val dfWithDataFromMaprDB = spark.loadFromMapRDB(tableName)
      .flatMap(line => line.getAs[String]("words").split(" "))
      .groupBy("value")
      .count()

    println("Dataset with counted words:")
    dfWithDataFromMaprDB.show()

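    // Use each word as the row _id and save the counts to the result table.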
    dfWithDataFromMaprDB.withColumn("_id", $"value")
      .saveToMapRDB(tableNameWithResult, createTable = true)
    println("Dataset with counted words was saved into the MaprDB table.")

    spark.stop()
  }

  private def parseArgs(args: Array[String]): Unit = {
    if (args.length != 3) {
      printUsage()
      System.exit(1)
    }
  }

  private def printUsage(): Unit = {
    val usage =
      """OJAI MaprDB connector wordcount example
        |Usage:
        |1) path to the file with data (words.txt can be used for the test);
        |2) name of the MaprDB table where data from file will be saved;
        |3) name of the MaprDB table where result will be saved;
        |""".stripMargin

    println(usage)
  }

  private def importDataIntoSeq(filePath: String): Seq[Word] = {
    // Each input line is expected to start with an ID, followed by
    // space-separated words.
    scala.io.Source.fromFile(filePath)
      .getLines()
      .map(line => {
        val wordWithId = line.split(" ")
        Word(wordWithId(0), wordWithId.drop(1).mkString(" "))
      }).toSeq
  }

  private case class Word(_id: String, words: String)

}