SparkSQL and DataFrames
The HPE Ezmeral Data Fabric Database Binary Connector for Apache Spark leverages the Spark DataSource API (SPARK-3247) introduced in Spark 1.2.0. The connector bridges the gap between the simple HBase key-value store and complex relational SQL queries, enabling users to perform complex data analytics on top of HPE Ezmeral Data Fabric Database binary tables using Spark. An HBase DataFrame is a standard Spark DataFrame and can interact with any other data source, such as Hive, ORC, Parquet, or JSON. The HPE Ezmeral Data Fabric Database Binary Connector for Apache Spark applies critical techniques such as partition pruning, column pruning, predicate pushdown, and data locality.
To use the HPE Ezmeral Data Fabric Database Binary Connector for Apache Spark, you define a catalog for the schema mapping between HPE Ezmeral Data Fabric Database binary tables and Spark tables, prepare the data and populate the HPE Ezmeral Data Fabric Database binary table, and then load the HBase DataFrame. After that, users can run language-integrated queries and SQL queries against records in an HPE Ezmeral Data Fabric Database binary table. The following examples illustrate the basic procedure.
Define Catalog Example
The catalog defines a mapping between HPE Ezmeral Data Fabric Database binary tables and Spark tables. There are two critical parts of this catalog: one is the rowkey definition, and the other is the mapping between a table column in Spark and the column family and column qualifier in the HPE Ezmeral Data Fabric Database binary table. The following example defines a schema for an HPE Ezmeral Data Fabric Database binary table named my_table, with row key key and a number of columns (col1 - col8). Note that the rowkey also has to be defined in detail as a column (col0), which has a specific cf (rowkey).
def catalog = s"""{
    |"table":{"namespace":"default", "name":"/path_to/my_table"},
    |"rowkey":"key",
    |"columns":{
    |"col0":{"cf":"rowkey", "col":"key", "type":"string"},
    |"col1":{"cf":"cf1", "col":"col1", "type":"boolean"},
    |"col2":{"cf":"cf2", "col":"col2", "type":"double"},
    |"col3":{"cf":"cf3", "col":"col3", "type":"float"},
    |"col4":{"cf":"cf4", "col":"col4", "type":"int"},
    |"col5":{"cf":"cf5", "col":"col5", "type":"bigint"},
    |"col6":{"cf":"cf6", "col":"col6", "type":"smallint"},
    |"col7":{"cf":"cf7", "col":"col7", "type":"string"},
    |"col8":{"cf":"cf8", "col":"col8", "type":"tinyint"}
    |}
    |}""".stripMargin
Save the DataFrame Example
Data prepared by the user is a local Scala collection that has 256 HBaseRecord objects. The sc.parallelize(data) function distributes the data to form an RDD. toDF returns a DataFrame. The write function returns a DataFrameWriter used to write the DataFrame to external storage systems (in this case, HPE Ezmeral Data Fabric Database). Given a DataFrame with a specified schema catalog, the save function creates an HPE Ezmeral Data Fabric Database binary table with five (5) regions and saves the DataFrame inside.
// Sample record; col0 doubles as the row key defined in the catalog.
case class HBaseRecord(
  col0: String,
  col1: Boolean,
  col2: Double,
  col3: Float,
  col4: Int,
  col5: Long,
  col6: Short,
  col7: String,
  col8: Byte)

object HBaseRecord {
  // Builds a record keyed by a zero-padded row id, for example "row001".
  def apply(i: Int, t: String): HBaseRecord = {
    val s = s"""row${"%03d".format(i)}"""
    HBaseRecord(s,
      i % 2 == 0,
      i.toDouble,
      i.toFloat,
      i,
      i.toLong,
      i.toShort,
      s"String$i: $t",
      i.toByte)
  }
}
val data = (0 to 255).map { i => HBaseRecord(i, "extra")}
// Distribute the records as an RDD, convert to a DataFrame, and write it
// through the connector. HBaseTableCatalog.newTable -> "5" creates the
// table with five regions.
sc.parallelize(data).toDF.write.options(Map(
  HBaseTableCatalog.tableCatalog -> catalog,
  HBaseTableCatalog.newTable -> "5")
).format("org.apache.hadoop.hbase.spark")
  .save()
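If the table already exists, a similar write can append additional rows. The following is a minimal sketch that assumes the newTable option is only needed when the table is being created and can therefore be omitted:
// Hypothetical follow-up write; assumes the table created above already
// exists, so HBaseTableCatalog.newTable is omitted.
val moreData = (256 to 511).map { i => HBaseRecord(i, "extra") }

sc.parallelize(moreData).toDF.write.options(Map(
  HBaseTableCatalog.tableCatalog -> catalog)
).format("org.apache.hadoop.hbase.spark")
  .save()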
Load the DataFrame Example
In the withCatalog function, sqlContext is an instance of SQLContext, which is the entry point for working with structured data (rows and columns) in Spark. read returns a DataFrameReader that can be used to read data into a DataFrame. The options function adds input options for the underlying data source to the DataFrameReader. The format function specifies the input data source format for the DataFrameReader. The load() function loads the input as a DataFrame. The DataFrame df returned by the withCatalog function can be used to access the HPE Ezmeral Data Fabric Database binary table, as shown in the Language Integrated Query and SQL Query examples.
def withCatalog(cat: String): DataFrame = {
  sqlContext
    .read
    .options(Map(HBaseTableCatalog.tableCatalog -> cat))
    .format("org.apache.hadoop.hbase.spark")
    .load()
}
val df = withCatalog(catalog)
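A quick way to confirm that the catalog mapping was applied is to inspect the returned DataFrame with standard Spark calls, for example:
df.printSchema()   // column names and types derived from the catalog
df.show(5)         // first five rows read from the binary table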
Language Integrated Query Example
Because df is a standard Spark DataFrame, it supports language-integrated operations such as join, sort, select, filter, orderBy, and so on. In the following example, df.filter filters rows using the given SQL expression, and select selects a set of columns: col0, col1, and col4.
val s = df.filter(($"col0" <= "row050" && $"col0" > "row040") ||
  $"col0" === "row005" ||
  $"col0" <= "row005")
  .select("col0", "col1", "col4")
s.show
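Aggregations work the same way on df. The following is a minimal sketch using standard Spark SQL functions rather than connector-specific APIs:
import org.apache.spark.sql.functions.{avg, count}

// Count rows and average col4 for each value of the boolean column col1.
df.groupBy("col1")
  .agg(count("col0").alias("rows"), avg("col4").alias("avg_col4"))
  .show()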
SQL Query Example
registerTempTable registers the df DataFrame as a temporary table using the table name table1. The lifetime of this temporary table is tied to the SQLContext that was used to create df. The sqlContext.sql function allows the user to execute SQL queries.
df.registerTempTable("table1")
sqlContext.sql("select count(col1) from table1").show
Query with Different Timestamps
In HBaseSparkConf, you can set four parameters related to timestamps: TIMESTAMP, MIN_TIMESTAMP, MAX_TIMESTAMP, and MAX_VERSIONS. Using MIN_TIMESTAMP and MAX_TIMESTAMP, you can query records with different timestamps or time ranges. When running the following examples, use a concrete value in place of tsSpecified and oldMs. The first example shows how to load the df DataFrame with a specified timestamp, where tsSpecified is supplied by the user. HBaseTableCatalog defines the schema mapping between the HBase table and the Spark relation, and writeCatalog is the catalog that defines that schema mapping.
val df = sqlContext.read
  .options(Map(
    HBaseTableCatalog.tableCatalog -> writeCatalog,
    HBaseSparkConf.TIMESTAMP -> tsSpecified.toString)
  ).format("org.apache.hadoop.hbase.spark")
  .load()
The second example shows how to load the df DataFrame with a time range, where oldMs is supplied by the user.
val df = sqlContext.read
  .options(Map(
    HBaseTableCatalog.tableCatalog -> writeCatalog,
    HBaseSparkConf.MIN_TIMESTAMP -> "0",
    HBaseSparkConf.MAX_TIMESTAMP -> oldMs.toString)
  ).format("org.apache.hadoop.hbase.spark")
  .load()
After loading the df DataFrame, users can query the data.
df.registerTempTable("table")
sqlContext.sql("select count(col1) from table").show