Writing a Structured Spark Stream to HPE Ezmeral Data Fabric Database JSON Table

The example in this section writes a structured Spark stream to an HPE Ezmeral Data Fabric Database JSON table.

Example

To write a structured Spark stream to an HPE Ezmeral Data Fabric Database JSON table, specify the sink format as MapRDBSourceConfig.Format in Java and Scala, or as com.mapr.db.spark.streaming in Python, and then set the tablePath, idFieldPath, createTable, bulkMode, and sampleSize options on the stream writer.
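The writer functions below all take a streaming DataFrame named df as input. For reference, here is a minimal sketch of how such a DataFrame might be obtained, assuming a Kafka-compatible source; the broker address and topic name are placeholders, not values from this example:

import org.apache.spark.sql.{DataFrame, SparkSession}

val spark: SparkSession = SparkSession.builder().appName("MapRDBStreamWriter").getOrCreate()

// Read a streaming DataFrame from a Kafka-compatible source. With the
// HPE Ezmeral Data Fabric Event Store, the subscribe option would instead
// take a "/stream:topic" path and no broker list is needed.
val df: DataFrame = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "broker:9092") // assumed broker address
  .option("subscribe", "sensor-topic")              // assumed topic name
  .load()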

Scala

import com.mapr.db.spark.streaming.MapRDBSourceConfig
import org.apache.spark.sql.streaming.DataStreamWriter
import org.apache.spark.sql.{DataFrame, Row, SparkSession}

def dataStreamWriter(spark: SparkSession, df: DataFrame): DataStreamWriter[Row] = {
  import spark.implicits._

  // Rename the value column to _id; idFieldPath points the connector at the
  // field that supplies the document key.
  df.select($"value" as "_id")
    .writeStream
    .format(MapRDBSourceConfig.Format)
    .option(MapRDBSourceConfig.TablePathOption, "/table/path")
    .option(MapRDBSourceConfig.IdFieldPathOption, "_id")
    .option(MapRDBSourceConfig.CreateTableOption, true)
    .option(MapRDBSourceConfig.BulkModeOption, true)
    .option(MapRDBSourceConfig.SampleSizeOption, 1000)
    .outputMode("append")
}
Java

import com.mapr.db.spark.streaming.MapRDBSourceConfig;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.streaming.DataStreamWriter;

DataStreamWriter<Row> dataStreamWriter(Dataset<Row> df) {
    // Cast the value column to a string and rename it to _id; idFieldPath
    // points the connector at the field that supplies the document key.
    return df.selectExpr("CAST(value AS STRING) as _id")
             .writeStream()
             .format(MapRDBSourceConfig.Format())
             .option(MapRDBSourceConfig.TablePathOption(), "/table/path")
             .option(MapRDBSourceConfig.IdFieldPathOption(), "_id")
             .option(MapRDBSourceConfig.CreateTableOption(), true)
             .option(MapRDBSourceConfig.BulkModeOption(), true)
             .option(MapRDBSourceConfig.SampleSizeOption(), 1000)
             .outputMode("append");
}
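In the Java version, the MapRDBSourceConfig fields are invoked as methods (for example, MapRDBSourceConfig.Format()) because MapRDBSourceConfig is a Scala object, whose values are exposed to Java code as accessor methods.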
Python

from pyspark.sql import *

def data_stream_writer_func(df, checkpoint_dir, table_path):
    # Cast the value column to a string and rename it to _id; idFieldPath
    # points the connector at the field that supplies the document key.
    return df.selectExpr("CAST(value AS STRING) as _id") \
             .writeStream \
             .format("com.mapr.db.spark.streaming") \
             .option("checkpointLocation", checkpoint_dir) \
             .option("tablePath", table_path) \
             .option("idFieldPath", "_id") \
             .option("createTable", True) \
             .option("bulkMode", True) \
             .option("sampleSize", 1000)