SparkSQL and DataFrames
The HPE Data Fabric Database Binary Connector for Apache Spark leverages the DataSource API (SPARK-3247) introduced in Spark 1.2.0. The connector bridges the gap between the simple HBase key-value store and complex relational SQL queries, enabling users to perform complex analytical work on top of HPE Data Fabric Database binary tables using Spark. An HBase DataFrame is a standard Spark DataFrame and can interact with any other data source, such as Hive, ORC, Parquet, or JSON. The HPE Data Fabric Database Binary Connector for Apache Spark applies critical optimizations such as partition pruning, column pruning, predicate pushdown, and data locality.
To use the HPE Data Fabric Database Binary Connector for Apache Spark, define the catalog for the schema mapping between HPE Data Fabric Database binary tables and Spark tables, prepare the data and populate the HPE Data Fabric Database binary table, and then load the HBase DataFrame. After that, you can run language-integrated queries and SQL queries against records in the HPE Data Fabric Database binary table. The following examples illustrate the basic procedure.
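The examples that follow assume the connector is on the classpath and that imports similar to the following are in scope. Package names can vary by connector version, so treat this as a sketch rather than a definitive list.
import org.apache.hadoop.hbase.spark.datasources.{HBaseSparkConf, HBaseTableCatalog}
import org.apache.spark.sql.DataFrame
import sqlContext.implicits._  // required for toDF and the $"col" column syntax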
Define Catalog Example
The catalog defines a mapping between HPE Data Fabric Database binary tables and Spark
tables. There are two critical parts of this catalog. One is the rowkey definition. The
other is the mapping between each table column in Spark and the column family and column
qualifier in the HPE Data Fabric Database binary table. The following example defines a schema for an HPE Data Fabric Database
binary table named my_table, with row key key and a
number of columns (col1 - col8). Note that the rowkey also has to be
defined in detail as a column (col0), which has a specific cf
(rowkey).
def catalog = s"""{
|"table":{"namespace":"default", "name":"/path_to/my_table"},
|"rowkey":"key",
|"columns":{
|"col0":{"cf":"rowkey", "col":"key", "type":"string"},
|"col1":{"cf":"cf1", "col":"col1", "type":"boolean"},
|"col2":{"cf":"cf2", "col":"col2", "type":"double"},
|"col3":{"cf":"cf3", "col":"col3", "type":"float"},
|"col4":{"cf":"cf4", "col":"col4", "type":"int"},
|"col5":{"cf":"cf5", "col":"col5", "type":"bigint"},
|"col6":{"cf":"cf6", "col":"col6", "type":"smallint"},
|"col7":{"cf":"cf7", "col":"col7", "type":"string"},
|"col8":{"cf":"cf8", "col":"col8", "type":"tinyint"}
|}
|}""".stripMargin
Save the DataFrame Example
The data prepared by the user is a local Scala collection that contains 256 HBaseRecord objects. The
sc.parallelize(data) call distributes the data to form an RDD.
toDF returns a DataFrame. The write function returns a
DataFrameWriter used to write the DataFrame to an external storage system (in this case, HPE Data Fabric Database).
Given a DataFrame with a specified schema catalog, the save function
creates an HPE Data Fabric Database binary table with five (5) regions and saves the DataFrame in it.
case class HBaseRecord(
  col0: String,
  col1: Boolean,
  col2: Double,
  col3: Float,
  col4: Int,
  col5: Long,
  col6: Short,
  col7: String,
  col8: Byte)

object HBaseRecord {
  def apply(i: Int, t: String): HBaseRecord = {
    val s = s"""row${"%03d".format(i)}"""
    HBaseRecord(s,
      i % 2 == 0,
      i.toDouble,
      i.toFloat,
      i,
      i.toLong,
      i.toShort,
      s"String$i: $t",
      i.toByte)
  }
}
val data = (0 to 255).map { i => HBaseRecord(i, "extra")}
sc.parallelize(data).toDF.write.options(Map(
  HBaseTableCatalog.tableCatalog -> catalog,
  HBaseTableCatalog.newTable -> "5")
).format("org.apache.hadoop.hbase.spark")
  .save()
Load the DataFrame Example
In the withCatalog function, sqlContext is a variable of type
SQLContext, which is the entry point for working with structured data
(rows and columns) in Spark. read returns a DataFrameReader that can be
used to read data into a DataFrame. The options function adds input options
for the underlying data source to the DataFrameReader. The format function
specifies the input data source format for the DataFrameReader. The load()
function loads the input as a DataFrame. The DataFrame df returned by the
withCatalog function can be used to access the HPE Data Fabric Database binary table, as
shown in the Language Integrated Query and SQL Query examples.
def withCatalog(cat: String): DataFrame = {
  sqlContext
    .read
    .options(Map(HBaseTableCatalog.tableCatalog -> cat))
    .format("org.apache.hadoop.hbase.spark")
    .load()
}
val df = withCatalog(catalog)
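If you want to verify that the catalog mapping was applied, you can inspect the resulting schema and preview a few rows:
df.printSchema
df.show(5)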
Language Integrated Query Example
A DataFrame supports standard operations such as join, sort,
select, filter, orderBy, and so on. In
the following example, df.filter filters rows using the given column
expression, and select selects a set of columns: col0,
col1, and col4.
val s = df.filter(($"col0" <= "row050" && $"col0" > "row040") ||
  $"col0" === "row005" ||
  $"col0" <= "row005")
  .select("col0", "col1", "col4")
s.show
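Because the HBase DataFrame is a standard Spark DataFrame, it can also be combined with data from other sources, such as JSON or Parquet, as noted in the introduction. The following is a minimal sketch; the JSON path and its col0 field are hypothetical and serve only to illustrate the join.
val jsonDF = sqlContext.read.json("/tmp/extra.json")  // hypothetical JSON file that contains a col0 field
val joined = df.join(jsonDF, Seq("col0"))             // join on the row-key column
joined.show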
SQL Query Example
registerTempTable registers the df DataFrame as a temporary
table using the table name table1. The lifetime of this temporary table is
tied to the SQLContext that was used to create df.
The sqlContext.sql function allows the user to execute SQL queries.
df.registerTempTable("table1")
sqlContext.sql("select count(col1) from table1").show
Query with Different Timestamps
In HBaseSparkConf, you can set four parameters related to timestamps: TIMESTAMP, MIN_TIMESTAMP, MAX_TIMESTAMP, and MAX_VERSIONS.
With MIN_TIMESTAMP and MAX_TIMESTAMP, you can query
records with different timestamps or within specific time ranges. Use concrete values
in place of tsSpecified and oldMs in the following
examples. The first example shows how to load the df DataFrame with a specific
timestamp, where tsSpecified is specified by the user.
HBaseTableCatalog defines the schema mapping between the HBase table and the Spark relation.
writeCatalog defines the catalog for this schema mapping.
val df = sqlContext.read
  .options(Map(
    HBaseTableCatalog.tableCatalog -> writeCatalog,
    HBaseSparkConf.TIMESTAMP -> tsSpecified.toString)
  ).format("org.apache.hadoop.hbase.spark")
  .load()
The second example shows how to load the df DataFrame with a time
range, where oldMs is specified by the user.
val df = sqlContext.read
  .options(Map(
    HBaseTableCatalog.tableCatalog -> writeCatalog,
    HBaseSparkConf.MIN_TIMESTAMP -> "0",
    HBaseSparkConf.MAX_TIMESTAMP -> oldMs.toString)
  ).format("org.apache.hadoop.hbase.spark")
  .load()
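The fourth parameter, MAX_VERSIONS, limits how many versions of each cell are read. The following is a sketch only; maxVersions is a hypothetical placeholder for a concrete user-supplied value, and the exact constant name can vary by connector version.
val df = sqlContext.read
  .options(Map(
    HBaseTableCatalog.tableCatalog -> writeCatalog,
    HBaseSparkConf.MAX_VERSIONS -> maxVersions.toString,  // maxVersions is a placeholder value
    HBaseSparkConf.MIN_TIMESTAMP -> "0",
    HBaseSparkConf.MAX_TIMESTAMP -> oldMs.toString)
  ).format("org.apache.hadoop.hbase.spark")
  .load()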
After loading the df DataFrame, users can query the data.
df.registerTempTable("table")
sqlContext.sql("select count(col1) from table").show