Bulk Loading Data into HBase with Spark
There are two options for bulk loading data into HBase with Spark:
- Basic bulk load functionality
  - The basic bulk load functionality works for cases where your rows have millions of columns and cases where your columns are not consolidated.
- Thin-record bulk load option
  - The thin-record bulk load option with Spark is designed for tables that have fewer than 10,000 columns per row. The advantage of this option is higher throughput and less overall load on the Spark shuffle operation.
Both implementations work more or less like the MapReduce bulk load process. A partitioner partitions the RowKeys based on region splits, and the RowKeys are sent to the reducers in order, so that HFiles can be written directly from the reduce phase.
In Spark terms, the bulk load is implemented around a Spark repartitionAndSortWithinPartitions followed by a Spark foreachPartition.
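The following is a minimal sketch of that underlying Spark pattern only; it does not touch HBase and assumes the Spark shell's sc. The HashPartitioner is a stand-in for the partitioner that hbaseBulkLoad builds from the table's region splits, and the println is a stand-in for the HFile writer.

// Sketch of the underlying pattern: repartition-and-sort by key, then process each
// partition's records in sorted order. hbaseBulkLoad replaces the HashPartitioner with a
// region-split-based partitioner and replaces the println with an HFile writer.
import org.apache.spark.HashPartitioner

val records = sc.parallelize(Seq(("3", "foo2.b"), ("1", "foo1")))
records
  .repartitionAndSortWithinPartitions(new HashPartitioner(2)) // keys arrive sorted within each partition
  .foreachPartition { partition =>
    partition.foreach { case (rowKey, value) => println(s"$rowKey -> $value") }
  }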
Bulk Loading Example
Here is an example of using the basic bulk load functionality. Run it in the Spark shell using :paste mode:
import org.apache.hadoop.fs.Path
import org.apache.hadoop.hbase.mapreduce.{LoadIncrementalHFiles, TableInputFormat}
import org.apache.hadoop.hbase.spark._
import org.apache.hadoop.hbase.spark.HBaseRDDFunctions._
import org.apache.hadoop.hbase.util.Bytes._
import org.apache.hadoop.hbase.{HBaseConfiguration, TableName}
import org.apache.spark.sql.SparkSession
import org.apache.hadoop.hbase.client.{HBaseAdmin, HConnectionManager}
    val tableName = "table1"
    val stagingFolder = "/home/mapr"
    val columnFamily1 = "cf1"
    @transient val conf = HBaseConfiguration.create()
    conf.set(TableInputFormat.INPUT_TABLE, tableName)
    conf.set("hbase.zookeeper.quorum", "node1.cluster.com")
    conf.setInt("hbase.zookeeper.property.clientPort", 5181)
    // Create the HBaseContext after the configuration is complete; the constructor
    // broadcasts the configuration to the executors.
    val hbaseContext = new HBaseContext(sc, conf)
    val rdd = sc.parallelize(Array(
      (toBytes("1"), (toBytes(columnFamily1), toBytes("a"), toBytes("foo1"))),
      (toBytes("3"), (toBytes(columnFamily1), toBytes("b"), toBytes("foo2.b")))
    ))
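    // Write the RDD contents out as HFiles under stagingFolder, partitioned and sorted for HBase.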
    rdd.hbaseBulkLoad(hbaseContext,
      TableName.valueOf(tableName),
      t => {
        val rowKey = t._1
        val family: Array[Byte] = t._2._1
        val qualifier = t._2._2
        val value: Array[Byte] = t._2._3
        val keyFamilyQualifier= new KeyFamilyQualifier(rowKey, family, qualifier)
        Seq((keyFamilyQualifier, value)).iterator
      },
      stagingFolder)
    val connection = HConnectionManager.createConnection(conf)
    val table = connection.getTable(TableName.valueOf(tableName))
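    // Load the newly created HFiles from stagingFolder into the live table.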
    val load = new LoadIncrementalHFiles(conf)
    load.doBulkLoad(
      new Path(stagingFolder),
      connection.getAdmin,
      table,
      connection.getRegionLocator(TableName.valueOf(tableName)))
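As an optional sanity check (not part of the original example), you can read one of the loaded cells back through the same connection and table objects created above:

    // Hypothetical verification step: read back one bulk-loaded cell.
    import org.apache.hadoop.hbase.client.Get
    import org.apache.hadoop.hbase.util.Bytes

    val result = table.get(new Get(Bytes.toBytes("1")))
    // Expect "foo1", the value written above for row "1", column cf1:a.
    println(Bytes.toString(result.getValue(Bytes.toBytes(columnFamily1), Bytes.toBytes("a"))))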
Required Parameters for Bulk Loading with Spark
The hbaseBulkLoad function takes three required parameters:
- The name of the table you intend to bulk load to.
- A function that converts a record in the RDD to a tuple key-value pair, with the tuple key being a KeyFamilyQualifier object and the value being the cell value. The KeyFamilyQualifier object holds the RowKey, Column Family, and Column Qualifier. The shuffle partitions on the RowKey but sorts by all three values (a sketch of a record that emits more than one cell follows this list).
- The temporary path for the HFile to be written out to.

Following the Spark bulk load command, use the HBase LoadIncrementalHFiles object to load the newly created HFiles into HBase.
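Because the conversion function returns an iterator, a single RDD record can emit more than one cell. A minimal sketch, reusing sc, hbaseContext, tableName, and columnFamily1 from the example above; the row key "5" and the staging path are hypothetical:

    // Sketch: one RDD record that expands to several cells in the same row.
    val multiColumnRdd = sc.parallelize(Array(
      (toBytes("5"), Seq(("a", "v1"), ("b", "v2")))))
    multiColumnRdd.hbaseBulkLoad(hbaseContext,
      TableName.valueOf(tableName),
      t => {
        val rowKey = t._1
        // One record produces one (KeyFamilyQualifier, value) pair per column.
        t._2.map { case (qualifier, value) =>
          (new KeyFamilyQualifier(rowKey, toBytes(columnFamily1), toBytes(qualifier)), toBytes(value))
        }.iterator
      },
      "/home/mapr/staging2") // hypothetical temporary path for the generated HFiles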
Additional Parameters for Bulk Loading with Spark
In addition to the three required parameters, the following optional parameters can be set on hbaseBulkLoad:
- The maximum file size of the HFiles
- A flag to exclude the generated HFiles from compactions
- Column Family settings for compression, bloomType, blockSize, and dataBlockEncoding
Here is an example that uses these additional parameters. Run it in the Spark shell using :paste mode:
import org.apache.hadoop.fs.Path
import org.apache.hadoop.hbase.client.HConnectionManager
import org.apache.hadoop.hbase.mapreduce.{LoadIncrementalHFiles, TableInputFormat}
import org.apache.hadoop.hbase.spark.HBaseRDDFunctions._
import org.apache.hadoop.hbase.spark.{FamilyHFileWriteOptions, HBaseContext, KeyFamilyQualifier}
import org.apache.hadoop.hbase.util.Bytes
import org.apache.hadoop.hbase.{HBaseConfiguration, HConstants, TableName}
import org.apache.spark.sql.SparkSession
    val tableName = "table2"
    val stagingFolder = "/home/mapr"
    val columnFamily1 = "cf1"
    val sc = spark.sparkContext
    @transient val conf = HBaseConfiguration.create()
    conf.set(TableInputFormat.INPUT_TABLE, tableName)
    conf.set("hbase.zookeeper.quorum", "node1.cluster.com")
    conf.setInt("hbase.zookeeper.property.clientPort", 5181)
    val hbaseContext = new HBaseContext(sc, conf)
    val rdd = sc.parallelize(Array(
      (Bytes.toBytes("1"),
        (Bytes.toBytes(columnFamily1),
          Bytes.toBytes("a"), Bytes.toBytes("foo1"))),
      (Bytes.toBytes("3"),
        (Bytes.toBytes(columnFamily1),
          Bytes.toBytes("b"),
          Bytes.toBytes("foo2.b")))))
    val familyHBaseWriterOptions =
      new java.util.HashMap[Array[Byte], FamilyHFileWriteOptions]
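    // FamilyHFileWriteOptions arguments: compression, bloomType, blockSize, dataBlockEncoding.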
    val f1Options = new FamilyHFileWriteOptions("GZ", "ROW", 128, "PREFIX")
    familyHBaseWriterOptions.put(Bytes.toBytes(columnFamily1), f1Options)
    rdd.hbaseBulkLoad(hbaseContext,
      TableName.valueOf(tableName),
      t => {
        val rowKey = t._1
        val family:Array[Byte] = t._2._1
        val qualifier = t._2._2
        val value = t._2._3
        val keyFamilyQualifier= new KeyFamilyQualifier(rowKey, family, qualifier)
        Seq((keyFamilyQualifier, value)).iterator
      },
      stagingFolder,
      familyHBaseWriterOptions,
      false, // compactionExclude
      HConstants.DEFAULT_MAX_FILE_SIZE) // maximum HFile size
    val connection = HConnectionManager.createConnection(conf)
    val table = connection.getTable(TableName.valueOf(tableName))
    val load = new LoadIncrementalHFiles(conf)
    load.doBulkLoad(new Path(stagingFolder),
      connection.getAdmin, table, connection.getRegionLocator(TableName.valueOf(tableName)))
Thin-Record Bulk Load Example
Run the following example in the Spark shell using :paste mode:
import org.apache.hadoop.fs.Path
import org.apache.hadoop.hbase.client.HConnectionManager
import org.apache.hadoop.hbase.mapreduce.{LoadIncrementalHFiles, TableInputFormat}
import org.apache.hadoop.hbase.spark.HBaseRDDFunctions._
import org.apache.hadoop.hbase.spark.{HBaseContext, _}
import org.apache.hadoop.hbase.util.Bytes
import org.apache.hadoop.hbase.{HBaseConfiguration, TableName}
import org.apache.spark.sql.SparkSession
    val tableName = "table3"
    val stagingFolder = "/home/mapr"
    val columnFamily1 = "cf1"
    @transient val conf = HBaseConfiguration.create()
    conf.set(TableInputFormat.INPUT_TABLE, tableName)
    conf.set("hbase.zookeeper.quorum", "node1.cluster.com")
    conf.setInt("hbase.zookeeper.property.clientPort", 5181)
    // Create the HBaseContext after the configuration is complete; the constructor
    // broadcasts the configuration to the executors.
    val hbaseContext = new HBaseContext(sc, conf)
    val rdd = sc.parallelize(Array(
      ("1",  List(Bytes.toBytes(columnFamily1), Bytes.toBytes("a"), Bytes.toBytes("foo1"))),
      ("3", List(Bytes.toBytes(columnFamily1), Bytes.toBytes("b"), Bytes.toBytes("foo2.b")))))
    rdd.hbaseBulkLoadThinRows(hbaseContext,
      TableName.valueOf(tableName),
      t => {
        val rowKey = t._1
        val familyQualifiersValues = new FamiliesQualifiersValues
        val q = t._2
        val family:Array[Byte] = q.head
        val qualifier = q(1)
        val value:Array[Byte] = q(2)
        println(s"family: $family")
        println(s"qualifier: $qualifier")
        println(s"value: $value")
        familyQualifiersValues += (family, qualifier, value)
        (new ByteArrayWrapper(Bytes.toBytes(rowKey)), familyQualifiersValues)
      },
      stagingFolder,
      new java.util.HashMap[Array[Byte], FamilyHFileWriteOptions],
      false, // compactionExclude
      20)    // maximum HFile size
    val connection = HConnectionManager.createConnection(conf)
    val table = connection.getTable(TableName.valueOf(tableName))
    val load = new LoadIncrementalHFiles(conf)
    load.doBulkLoad(
      new Path(stagingFolder),
      connection.getAdmin,
      table,
      connection.getRegionLocator(TableName.valueOf(tableName)))

The big difference in using bulk load for thin rows is that the function returns a tuple with the first value being the RowKey and the second value being an object of FamiliesQualifiersValues. FamiliesQualifiersValues contains all the values for this row for all column families.
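For a row that carries several columns, the same FamiliesQualifiersValues object simply accumulates every cell before the tuple is returned. A minimal sketch of such a conversion function, assuming the imports from the example above; the record layout of (rowKey, List of (qualifier, value) pairs) and the cf1 family name are illustrative:

    // Sketch: one thin-row record with several columns for the same RowKey.
    def toThinRow(record: (String, List[(String, String)])): (ByteArrayWrapper, FamiliesQualifiersValues) = {
      val familyQualifiersValues = new FamiliesQualifiersValues
      record._2.foreach { case (qualifier, value) =>
        // Every column of the row is accumulated into the same FamiliesQualifiersValues object.
        familyQualifiersValues += (Bytes.toBytes("cf1"), Bytes.toBytes(qualifier), Bytes.toBytes(value))
      }
      (new ByteArrayWrapper(Bytes.toBytes(record._1)), familyQualifiersValues)
    }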