SparkSQL and DataFrames

The HPE Ezmeral Data Fabric Database Binary Connector for Apache Spark leverages the DataSource API (SPARK-3247) introduced in Spark 1.2.0. The connector bridges the gap between the simple HBase key-value store and complex relational SQL queries, enabling users to perform complex data analytics on top of HPE Ezmeral Data Fabric Database binary tables using Spark. An HBase DataFrame is a standard Spark DataFrame and can interact with any other data source, such as Hive, ORC, Parquet, or JSON. The HPE Ezmeral Data Fabric Database Binary Connector for Apache Spark applies critical techniques such as partition pruning, column pruning, predicate pushdown, and data locality.

To use the HPE Ezmeral Data Fabric Database Binary Connector for Apache Spark, you need to define a catalog for the schema mapping between HPE Ezmeral Data Fabric Database binary tables and Spark tables, prepare the data and populate the HPE Ezmeral Data Fabric Database binary table, and then load the HBase DataFrame. After that, users can run language-integrated queries and access records in an HPE Ezmeral Data Fabric Database binary table with SQL queries. The following examples illustrate the basic procedure.

Define Catalog Example

The catalog defines a mapping between HPE Ezmeral Data Fabric Database binary tables and Spark tables. The catalog has two critical parts: the rowkey definition, and the mapping between each table column in Spark and the column family and column qualifier in the HPE Ezmeral Data Fabric Database binary table. The following example defines a schema for an HPE Ezmeral Data Fabric Database binary table named my_table, with row key key and a number of columns (col1 - col8). Note that the rowkey also has to be defined in detail as a column (col0), which has a specific cf (rowkey).

def catalog = s"""{
       |"table":{"namespace":"default", "name":"/path_to/my_table"},
       |"rowkey":"key",
       |"columns":{
         |"col0":{"cf":"rowkey", "col":"key", "type":"string"},
         |"col1":{"cf":"cf1", "col":"col1", "type":"boolean"},
         |"col2":{"cf":"cf2", "col":"col2", "type":"double"},
         |"col3":{"cf":"cf3", "col":"col3", "type":"float"},
         |"col4":{"cf":"cf4", "col":"col4", "type":"int"},
         |"col5":{"cf":"cf5", "col":"col5", "type":"bigint"},
         |"col6":{"cf":"cf6", "col":"col6", "type":"smallint"},
         |"col7":{"cf":"cf7", "col":"col7", "type":"string"},
         |"col8":{"cf":"cf8", "col":"col8", "type":"tinyint"}
       |}
     |}""".stripMargin

Save the DataFrame Example

The data prepared by the user is a local Scala collection of 256 HBaseRecord objects. The sc.parallelize(data) call distributes the data to form an RDD. toDF returns a DataFrame. The write function returns a DataFrameWriter used to write the DataFrame to an external storage system (here, HPE Ezmeral Data Fabric Database). Given a DataFrame with the specified schema catalog, the save function creates an HPE Ezmeral Data Fabric Database binary table with five (5) regions and saves the DataFrame into it.

case class HBaseRecord(
   col0: String,
   col1: Boolean,
   col2: Double,
   col3: Float,
   col4: Int,       
   col5: Long,
   col6: Short,
   col7: String,
   col8: Byte)

object HBaseRecord
{                                                                                                             
   def apply(i: Int, t: String): HBaseRecord = {
      val s = s"""row${"%03d".format(i)}"""       
      HBaseRecord(s,
      i % 2 == 0,
      i.toDouble,
      i.toFloat,  
      i,
      i.toLong,
      i.toShort,  
      s"String$i: $t",      
      i.toByte)
  }
}

val data = (0 to 255).map { i =>  HBaseRecord(i, "extra")}

sc.parallelize(data).toDF.write
  .options(Map(
    HBaseTableCatalog.tableCatalog -> catalog,
    HBaseTableCatalog.newTable -> "5"))
  .format("org.apache.hadoop.hbase.spark")
  .save()
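
The snippets on this page assume that a SparkContext (sc) and an SQLContext (sqlContext) are available (as in the Spark shell) and that the connector classes and Spark implicits are in scope. The following is a minimal, hedged sketch of the assumed imports; the package names follow the open-source HBase Spark connector layout and may differ in your distribution.

// Hedged sketch of the imports assumed by the examples on this page.
// Package names follow the open-source HBase Spark connector layout;
// verify them against the connector version you are using.
import org.apache.spark.sql.{DataFrame, SQLContext}
import org.apache.hadoop.hbase.spark.datasources.{HBaseSparkConf, HBaseTableCatalog}

import sqlContext.implicits._   // enables toDF and the $"col" column syntax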

Load the DataFrame Example

In the withCatalog function, sqlContext is an instance of SQLContext, which is the entry point for working with structured data (rows and columns) in Spark. read returns a DataFrameReader that can be used to read data into a DataFrame. The options function adds input options for the underlying data source to the DataFrameReader. The format function specifies the input data source format for the DataFrameReader. The load() function loads the input as a DataFrame. The DataFrame df returned by the withCatalog function can be used to access the HPE Ezmeral Data Fabric Database binary table, as shown in the Language Integrated Query and SQL Query examples.

def withCatalog(cat: String): DataFrame = {
  sqlContext
  .read
  .options(Map(HBaseTableCatalog.tableCatalog->cat))
  .format("org.apache.hadoop.hbase.spark")
  .load()
}
val df = withCatalog(catalog)
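
As a quick check (not part of the original example), standard DataFrame operations can be used to inspect what was loaded:

// Inspect the schema derived from the catalog and preview a few rows.
df.printSchema()
df.show(5)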

Language Integrated Query Example

A DataFrame supports various operations, such as join, sort, select, filter, orderBy, and so on. In the following example, df.filter filters rows using the given column expression, and select projects a set of columns: col0, col1, and col4. A sort sketch follows the example.
val s = df.filter(($"col0" <= "row050" && $"col0" > "row040") ||
  $"col0" === "row005" ||
  $"col0" <= "row005")
  .select("col0", "col1", "col4")
s.show
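
The other operations mentioned above, such as sort and orderBy, work the same way. The following is a minimal sketch that sorts by col4 in descending order; the column names come from the catalog defined earlier.

// Minimal sketch: sort by col4 (descending) and display the first ten rows.
val sorted = df.orderBy($"col4".desc).select("col0", "col4")
sorted.show(10)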

SQL Query Example

registerTempTable registers the df DataFrame as a temporary table with the name table1. The lifetime of this temporary table is tied to the SQLContext that was used to create df. The sqlContext.sql function allows the user to execute SQL queries against it.
df.registerTempTable("table1")
sqlContext.sql("select count(col1) from table1").show
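
Any SQL supported by the SQLContext can be run against the temporary table. For example, the following sketch runs a projection with a WHERE clause, which benefits from the column pruning and predicate pushdown described at the top of this page:

// Additional example: projection plus filter against the temporary table.
sqlContext.sql("select col0, col4 from table1 where col4 > 100").show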

Query with Different Timestamps

In HBaseSparkConf, you can set four parameters related to timestamps and versions:
  • TIMESTAMP
  • MIN_TIMESTAMP
  • MAX_TIMESTAMP
  • MAX_VERSIONS
With MIN_TIMESTAMP and MAX_TIMESTAMP, you can query records with different timestamps or within a time range; a MAX_VERSIONS sketch appears at the end of this section. In the following examples, substitute concrete values for tsSpecified and oldMs. The first example shows how to load the df DataFrame with a specific timestamp, where tsSpecified is supplied by the user. HBaseTableCatalog defines the schema mapping between the HBase table and the Spark relation, and writeCatalog is the catalog that holds this schema mapping.
val df = sqlContext.read
  .options(Map(
    HBaseTableCatalog.tableCatalog -> writeCatalog,
    HBaseSparkConf.TIMESTAMP -> tsSpecified.toString))
  .format("org.apache.hadoop.hbase.spark")
  .load()
The following example shows how to load the df DataFrame with a time range, where oldMs is supplied by the user.
val df = sqlContext.read
  .options(Map(
    HBaseTableCatalog.tableCatalog -> writeCatalog,
    HBaseSparkConf.MIN_TIMESTAMP -> "0",
    HBaseSparkConf.MAX_TIMESTAMP -> oldMs.toString))
  .format("org.apache.hadoop.hbase.spark")
  .load()
After loading the df DataFrame, users can query the data.
df.registerTempTable("table")
sqlContext.sql("select count(col1) from table").show
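
MAX_VERSIONS controls how many versions of each cell are read. The following is a hedged sketch, assuming your connector exposes HBaseSparkConf.MAX_VERSIONS as listed above; it combines the time range options with a maximum of three versions.

// Hedged sketch: read up to three versions of each cell within the time range.
// HBaseSparkConf.MAX_VERSIONS is assumed to be available in your connector version.
val dfVersions = sqlContext.read
  .options(Map(
    HBaseTableCatalog.tableCatalog -> writeCatalog,
    HBaseSparkConf.MIN_TIMESTAMP -> "0",
    HBaseSparkConf.MAX_TIMESTAMP -> oldMs.toString,
    HBaseSparkConf.MAX_VERSIONS -> "3"))
  .format("org.apache.hadoop.hbase.spark")
  .load()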