Writing a Structured Spark Stream to HPE Ezmeral Data Fabric Database JSON Table
The example in this section writes a structured Spark stream to an HPE Ezmeral Data Fabric Database JSON table.
Example
To write a structured Spark stream to an HPE Ezmeral Data Fabric Database JSON table, specify MapRDBSourceConfig.Format as the stream format in Java and Scala, or com.mapr.db.spark.streaming in Python, and set the tablePath, idFieldPath, createTable, bulkMode, and sampleSize options, as shown in the Scala, Java, and Python examples below.
Scala
import com.mapr.db.spark.streaming.MapRDBSourceConfig
import org.apache.spark.sql.streaming.DataStreamWriter
import org.apache.spark.sql.{DataFrame, Row, SparkSession}

def dataStreamWriter(spark: SparkSession, df: DataFrame): DataStreamWriter[Row] = {
  import spark.implicits._
  // Cast the value column to a string, use it as the document _id,
  // and configure the HPE Ezmeral Data Fabric Database JSON table sink.
  df.select($"value" cast "string" as "_id")
    .writeStream
    .format(MapRDBSourceConfig.Format)
    .option(MapRDBSourceConfig.TablePathOption, "/table/path")
    .option(MapRDBSourceConfig.IdFieldPathOption, "value")
    .option(MapRDBSourceConfig.CreateTableOption, true)
    .option(MapRDBSourceConfig.BulkModeOption, true)
    .option(MapRDBSourceConfig.SampleSizeOption, 1000)
    .outputMode("append")
}
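
The function above only builds a DataStreamWriter; the stream still has to be started. The following sketch shows one way a driver might use it, reading from a Kafka source and starting the query with a checkpoint location. The broker address, topic name, and checkpoint path are placeholders for illustration, not values required by HPE Ezmeral Data Fabric Database; the sketch reuses the imports and the dataStreamWriter function from the Scala example above.

// Driver sketch (hypothetical source and paths).
val spark = SparkSession.builder()
  .appName("WriteStreamToJsonTable")
  .getOrCreate()

// Read a streaming DataFrame, here assumed to come from a Kafka topic.
val input: DataFrame = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "broker1:9092") // placeholder broker
  .option("subscribe", "input-topic")                // placeholder topic
  .load()

// Add a checkpoint location, start the query, and block until it terminates.
val query = dataStreamWriter(spark, input)
  .option("checkpointLocation", "/path/to/checkpoints") // placeholder path
  .start()
query.awaitTermination()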
Java
import com.mapr.db.spark.streaming.MapRDBSourceConfig;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.streaming.DataStreamWriter;

DataStreamWriter<Row> dataStreamWriter(Dataset<Row> df) {
  // Cast the value column to a string, use it as the document _id,
  // and configure the HPE Ezmeral Data Fabric Database JSON table sink.
  return df.selectExpr("CAST(value AS STRING) as _id")
      .writeStream()
      .format(MapRDBSourceConfig.Format())
      .option(MapRDBSourceConfig.TablePathOption(), "/table/path")
      .option(MapRDBSourceConfig.IdFieldPathOption(), "value")
      .option(MapRDBSourceConfig.CreateTableOption(), true)
      .option(MapRDBSourceConfig.BulkModeOption(), true)
      .option(MapRDBSourceConfig.SampleSizeOption(), 1000)
      .outputMode("append");
}
Python
from pyspark.sql import *

def data_stream_writer_func(df, checkpoint_dir, table_path):
    # Cast the value column to a string, use it as the document _id,
    # and configure the HPE Ezmeral Data Fabric Database JSON table sink.
    return df.selectExpr("CAST(value AS STRING) as _id") \
        .writeStream \
        .format("com.mapr.db.spark.streaming") \
        .option("checkpointLocation", checkpoint_dir) \
        .option("tablePath", table_path) \
        .option("idFieldPath", "value") \
        .option("createTable", True) \
        .option("bulkMode", True) \
        .option("sampleSize", 1000)