SparkSQL and DataFrames
The HPE Data Fabric Database Binary Connector for Apache Spark leverages the DataSource API (SPARK-3247) introduced in Spark 1.2.0. The connector bridges the gap between the simple HBase key-value store and complex relational SQL queries, enabling users to perform complex analytical work on top of HPE Data Fabric Database binary tables using Spark. An HBase DataFrame is a standard Spark DataFrame and can interact with any other data source, such as Hive, ORC, Parquet, and JSON. The HPE Data Fabric Database Binary Connector for Apache Spark applies critical techniques such as partition pruning, column pruning, predicate pushdown, and data locality.
To use the HPE Data Fabric Database Binary Connector for Apache Spark, define a catalog for the schema mapping between HPE Data Fabric Database binary tables and Spark tables, prepare the data and populate the HPE Data Fabric Database binary table, and then load the HBase DataFrame. After that, you can run language-integrated queries and SQL queries against records in the HPE Data Fabric Database binary table. The following examples illustrate the basic procedure.
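The examples below assume a Spark shell (or application) where sc and sqlContext are already available and the connector is on the classpath. The following is a minimal setup sketch; the import paths are assumptions based on the Apache hbase-spark module and may differ in your connector version.
// Setup sketch: import paths are assumptions; verify the package names
// against your connector version.
import org.apache.hadoop.hbase.spark.datasources.{HBaseSparkConf, HBaseTableCatalog}
import org.apache.spark.sql.DataFrame

import sqlContext.implicits._   // enables .toDF and the $"col" column syntax used below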
Define Catalog Example
The catalog defines a mapping between HPE Data Fabric Database binary tables and Spark tables. There are two critical parts of this catalog. One is the rowkey definition. The other is the mapping between the table columns in Spark and the column families and column qualifiers in the HPE Data Fabric Database binary table. The following example defines a schema for an HPE Data Fabric Database binary table named my_table, with row key key and a number of columns (col1 - col8). Note that the rowkey also has to be defined in detail as a column (col0), which has a specific cf (rowkey).
def catalog = s"""{
       |"table":{"namespace":"default", "name":"/path_to/my_table"},
       |"rowkey":"key",
       |"columns":{
         |"col0":{"cf":"rowkey", "col":"key", "type":"string"},
         |"col1":{"cf":"cf1", "col":"col1", "type":"boolean"},
         |"col2":{"cf":"cf2", "col":"col2", "type":"double"},
         |"col3":{"cf":"cf3", "col":"col3", "type":"float"},
         |"col4":{"cf":"cf4", "col":"col4", "type":"int"},
         |"col5":{"cf":"cf5", "col":"col5", "type":"bigint"},
         |"col6":{"cf":"cf6", "col":"col6", "type":"smallint"},
         |"col7":{"cf":"cf7", "col":"col7", "type":"string"},
         |"col8":{"cf":"cf8", "col":"col8", "type":"tinyint"}
       |}
     |}""".stripMargin
Save the DataFrame Example
The data prepared by the user is a local Scala collection of 256 HBaseRecord objects. The sc.parallelize(data) function distributes the data to form an RDD, and toDF returns a DataFrame. The write function returns a DataFrameWriter used to write the DataFrame to external storage systems (in this case, HPE Data Fabric Database). Given a DataFrame with a specified schema catalog, the save function creates an HPE Data Fabric Database binary table with five regions and saves the DataFrame inside it.
// One row of the table: col0 holds the row key; col1 - col8 map to the column
// families and qualifiers declared in the catalog above.
case class HBaseRecord(
   col0: String,
   col1: Boolean,
   col2: Double,
   col3: Float,
   col4: Int,
   col5: Long,
   col6: Short,
   col7: String,
   col8: Byte)
object HBaseRecord
{
   // Build a sample record from an index and an extra string; the row key is
   // zero-padded so that rows sort lexicographically (row000, row001, ...).
   def apply(i: Int, t: String): HBaseRecord = {
      val s = s"""row${"%03d".format(i)}"""
      HBaseRecord(s,
      i % 2 == 0,
      i.toDouble,
      i.toFloat,
      i,
      i.toLong,
      i.toShort,
      s"String$i: $t",
      i.toByte)
   }
}
// Create 256 sample records, distribute them as an RDD, convert to a DataFrame,
// and save it; HBaseTableCatalog.newTable -> "5" creates the table with five regions.
val data = (0 to 255).map { i => HBaseRecord(i, "extra") }
sc.parallelize(data).toDF.write.options(Map(
  HBaseTableCatalog.tableCatalog -> catalog,
  HBaseTableCatalog.newTable -> "5")
).format("org.apache.hadoop.hbase.spark")
 .save()
Load the DataFrame Example
In the withCatalog function, sqlContext is a variable of SQLContext, which is the entry point for working with structured data (rows and columns) in Spark. read returns a DataFrameReader that can be used to read data as a DataFrame. The options function adds input options for the underlying data source to the DataFrameReader, the format function specifies the input data source format, and the load() function loads the input as a DataFrame. The DataFrame df returned by the withCatalog function can be used to access the HPE Data Fabric Database binary table, as shown in the Language Integrated Query and SQL Query examples.
def withCatalog(cat: String): DataFrame = {
  sqlContext
  .read
  .options(Map(HBaseTableCatalog.tableCatalog->cat))
  .format("org.apache.hadoop.hbase.spark")
  .load()
}
val df = withCatalog(catalog)
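Once loaded, the DataFrame can be inspected with standard Spark DataFrame methods before running queries, for example:
df.printSchema()   // shows col0 - col8 with the Spark SQL types declared in the catalog
df.show(5)         // prints the first five rows read from the HPE Data Fabric Database binary table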
Language Integrated Query Example
A DataFrame supports language-integrated operations such as join, sort, select, filter, orderBy, and so on. In the following example, df.filter filters rows using the given SQL expression, and select selects a set of columns: col0, col1, and col4.
val s = df.filter(($"col0" <= "row050" && $"col0" > "row040") ||
  $"col0" === "row005" ||
  $"col0" <= "row005")
  .select("col0", "col1", "col4")
s.show
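The other operations mentioned above, such as orderBy, compose in the same way. The following brief sketch sorts the filtered result, using only columns defined in the catalog:
// Sort the filtered rows by col4 in descending order, then count them.
val sorted = s.orderBy($"col4".desc)
sorted.show
println(sorted.count)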
SQL Query Example
registerTempTable registers the df DataFrame as a temporary table with the name table1. The lifetime of this temporary table is tied to the SQLContext that was used to create df. The sqlContext.sql function allows the user to execute SQL queries.
df.registerTempTable("table1")
sqlContext.sql("select count(col1) from table1").show
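Any SQL that references the mapped columns can be run against the temporary table; for example, a filtered selection over a row-key range:
// Select columns for rows whose row key (col0) falls within a range.
sqlContext.sql("select col0, col7 from table1 where col0 > 'row100' and col0 <= 'row110'").show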
Query with Different Timestamps
In HBaseSparkConf, you can set four parameters related to timestamps: TIMESTAMP, MIN_TIMESTAMP, MAX_TIMESTAMP, and MAX_VERSIONS. With MIN_TIMESTAMP and MAX_TIMESTAMP, you can query records with specific timestamps or time ranges. Use concrete values in place of tsSpecified and oldMs in the following examples. The first example shows how to load the df DataFrame with a specific timestamp, where tsSpecified is specified by the user. HBaseTableCatalog defines the mapping between the HBase table and the relation schema, and writeCatalog defines the catalog for the schema mapping.
val df = sqlContext.read
  .options(Map(
    HBaseTableCatalog.tableCatalog -> writeCatalog,
    HBaseSparkConf.TIMESTAMP -> tsSpecified.toString))
  .format("org.apache.hadoop.hbase.spark")
  .load()
The second example shows how to load the df DataFrame with a time range, where oldMs is specified by the user.
val df = sqlContext.read
  .options(Map(
    HBaseTableCatalog.tableCatalog -> writeCatalog,
    HBaseSparkConf.MIN_TIMESTAMP -> "0",
    HBaseSparkConf.MAX_TIMESTAMP -> oldMs.toString))
  .format("org.apache.hadoop.hbase.spark")
  .load()
After loading the df DataFrame, users can query the data.
df.registerTempTable("table")
sqlContext.sql("select count(col1) from table").show
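The fourth parameter, MAX_VERSIONS, limits how many versions of each cell the underlying scan may return. The following is a minimal sketch, under the assumption that it is passed as a read option in the same way as the timestamp parameters; verify this behavior against your connector version.
// Sketch: request up to three versions per cell (assumes HBaseSparkConf.MAX_VERSIONS
// is accepted as a read option like the timestamp settings above).
val multiVersionDF = sqlContext.read
  .options(Map(
    HBaseTableCatalog.tableCatalog -> writeCatalog,
    HBaseSparkConf.MAX_VERSIONS -> "3"))
  .format("org.apache.hadoop.hbase.spark")
  .load()
multiVersionDF.show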