Bulk Loading Data into HBase with Spark
There are two options for bulk loading data into HBase with Spark:
- Basic bulk load functionality: works for cases where your rows have millions of columns and for cases where your columns are not consolidated.
- Thin-record bulk load option: designed for tables that have fewer than 10,000 columns per row. The advantage of this option is higher throughput and less overall load on the Spark shuffle operation.
Both implementations work more or less like the MapReduce bulk load process. A partitioner partitions the RowKeys based on region splits, and the RowKeys are sent to the reducers in order, so that HFiles can be written directly from the reduce phase.
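As a rough, standalone illustration of that shuffle (a hypothetical sketch, not the actual HBase implementation; the SplitPartitioner class and the sample keys are invented for this example), a custom partitioner stands in for the region splits and each partition then receives its keys in sorted order:
import org.apache.spark.Partitioner

// Hypothetical partitioner standing in for HBase region splits:
// row keys below "2" go to partition 0, everything else to partition 1.
class SplitPartitioner extends Partitioner {
  override def numPartitions: Int = 2
  override def getPartition(key: Any): Int =
    if (key.toString < "2") 0 else 1
}

// Each partition receives a contiguous, sorted range of keys,
// which is what allows files to be written out directly, in order.
sc.parallelize(Seq(("3", "foo2.b"), ("1", "foo1"), ("2", "foo1.b")))
  .repartitionAndSortWithinPartitions(new SplitPartitioner())
  .foreachPartition(iter => iter.foreach { case (k, v) => println(s"$k -> $v") })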
In Spark terms, the bulk load is implemented around a Spark repartitionAndSortWithinPartitions followed by a Spark foreachPartition. Here is an example of using the basic bulk load functionality:
Bulk Loading Example
Run the following code in the Spark shell using :paste mode.
import org.apache.hadoop.fs.Path
import org.apache.hadoop.hbase.mapreduce.{LoadIncrementalHFiles, TableInputFormat}
import org.apache.hadoop.hbase.spark._
import org.apache.hadoop.hbase.spark.HBaseRDDFunctions._
import org.apache.hadoop.hbase.util.Bytes._
import org.apache.hadoop.hbase.{HBaseConfiguration, TableName}
import org.apache.spark.sql.SparkSession
import org.apache.hadoop.hbase.client.{HBaseAdmin, HConnectionManager}
val tableName = "table1"
val stagingFolder = "/home/mapr"
val columnFamily1 = "cf1"
@transient val conf = HBaseConfiguration.create()
conf.set(TableInputFormat.INPUT_TABLE, tableName)
conf.set("hbase.zookeeper.quorum", "node1.cluster.com")
conf.setInt("hbase.zookeeper.property.clientPort", 5181)
// Create the HBaseContext after the configuration is fully populated,
// because HBaseContext broadcasts the configuration when it is constructed.
val hbaseContext = new HBaseContext(sc, conf)
val rdd = sc.parallelize(Array(
(toBytes("1"), (toBytes(columnFamily1), toBytes("a"), toBytes("foo1"))),
(toBytes("3"), (toBytes(columnFamily1), toBytes("b"), toBytes("foo2.b")))
))
rdd.hbaseBulkLoad(hbaseContext,
  TableName.valueOf(tableName),
  t => {
    val rowKey = t._1
    val family: Array[Byte] = t._2._1
    val qualifier = t._2._2
    val value: Array[Byte] = t._2._3
    val keyFamilyQualifier = new KeyFamilyQualifier(rowKey, family, qualifier)
    Seq((keyFamilyQualifier, value)).iterator
  },
  stagingFolder)
val connection = HConnectionManager.createConnection(conf)
val table = connection.getTable(TableName.valueOf(tableName))
val load = new LoadIncrementalHFiles(conf)
load.doBulkLoad(
  new Path(stagingFolder),
  connection.getAdmin,
  table,
  connection.getRegionLocator(TableName.valueOf(tableName)))
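After the load completes, you can optionally verify the result by scanning the table through the same connection. This is a minimal verification sketch, not part of the bulk load API, and it assumes the table object from the example above:
import org.apache.hadoop.hbase.CellUtil
import org.apache.hadoop.hbase.client.Scan
import org.apache.hadoop.hbase.util.Bytes
import scala.collection.JavaConverters._

// Optional check: scan the table and print the cells that were just bulk loaded.
val scanner = table.getScanner(new Scan())
scanner.asScala.foreach { result =>
  result.listCells().asScala.foreach { cell =>
    println(Bytes.toString(CellUtil.cloneRow(cell)) + " " +
      Bytes.toString(CellUtil.cloneQualifier(cell)) + " = " +
      Bytes.toString(CellUtil.cloneValue(cell)))
  }
}
scanner.close()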
Required Parameters for Bulk Loading with Spark
The hbaseBulkLoad function takes three required parameters:
- The name of the table you intend to bulk load to.
- A function that converts a record in the RDD to a tuple key-value pair, with the tuple key being a KeyFamilyQualifier object and the value being the cell value. The KeyFamilyQualifier object holds the RowKey, Column Family, and Column Qualifier. The shuffle partitions on the RowKey but sorts by all three values. (A standalone sketch of such a function follows this list.)
- The temporary path for the HFile to be written out to.
Following the Spark bulk load command, use the HBase LoadIncrementalHFiles object to load the newly created HFiles into HBase.
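For reference, here is a minimal, hypothetical converter function of the kind required by the second parameter. It assumes the same (rowKey, (family, qualifier, value)) record layout used in the example above; the name toKeyFamilyQualifier is illustrative only:
import org.apache.hadoop.hbase.spark.KeyFamilyQualifier

// Hypothetical converter: maps one RDD record of the form
// (rowKey, (family, qualifier, value)) to the (KeyFamilyQualifier, value)
// pairs that hbaseBulkLoad expects.
def toKeyFamilyQualifier(
    t: (Array[Byte], (Array[Byte], Array[Byte], Array[Byte]))): Iterator[(KeyFamilyQualifier, Array[Byte])] = {
  val (rowKey, (family, qualifier, value)) = t
  Seq((new KeyFamilyQualifier(rowKey, family, qualifier), value)).iterator
}

Such a function could then be passed directly as the third argument, for example rdd.hbaseBulkLoad(hbaseContext, TableName.valueOf(tableName), toKeyFamilyQualifier, stagingFolder).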
Additional Parameters for Bulk Loading with Spark
The following additional parameters can be set on hbaseBulkLoad:
- Max file size of the HFiles
- A flag to exclude HFiles from compactions
- Column Family settings for compression, bloomType, blockSize, and dataBlockEncoding
The following example sets these additional parameters. Run the code in the Spark shell using :paste mode.
import org.apache.hadoop.fs.Path
import org.apache.hadoop.hbase.client.HConnectionManager
import org.apache.hadoop.hbase.mapreduce.{LoadIncrementalHFiles, TableInputFormat}
import org.apache.hadoop.hbase.spark.HBaseRDDFunctions._
import org.apache.hadoop.hbase.spark.{FamilyHFileWriteOptions, HBaseContext, KeyFamilyQualifier}
import org.apache.hadoop.hbase.util.Bytes
import org.apache.hadoop.hbase.{HBaseConfiguration, HConstants, TableName}
import org.apache.spark.sql.SparkSession
val tableName = "table2"
val stagingFolder = "/home/mapr"
val columnFamily1 = "cf1"
val sc = spark.sparkContext
@transient val conf = HBaseConfiguration.create()
conf.set(TableInputFormat.INPUT_TABLE, tableName)
conf.set("hbase.zookeeper.quorum", "node1.cluster.com")
conf.setInt("hbase.zookeeper.property.clientPort", 5181)
val hbaseContext = new HBaseContext(sc, conf)
val rdd = sc.parallelize(Array(
  (Bytes.toBytes("1"),
    (Bytes.toBytes(columnFamily1), Bytes.toBytes("a"), Bytes.toBytes("foo1"))),
  (Bytes.toBytes("3"),
    (Bytes.toBytes(columnFamily1), Bytes.toBytes("b"), Bytes.toBytes("foo2.b")))))
// Per-column-family HFile write settings: compression, bloom filter type,
// block size, and data block encoding.
val familyHBaseWriterOptions =
  new java.util.HashMap[Array[Byte], FamilyHFileWriteOptions]
val f1Options = new FamilyHFileWriteOptions("GZ", "ROW", 128, "PREFIX")
familyHBaseWriterOptions.put(Bytes.toBytes(columnFamily1), f1Options)
rdd.hbaseBulkLoad(hbaseContext,
  TableName.valueOf(tableName),
  t => {
    val rowKey = t._1
    val family: Array[Byte] = t._2._1
    val qualifier = t._2._2
    val value = t._2._3
    val keyFamilyQualifier = new KeyFamilyQualifier(rowKey, family, qualifier)
    Seq((keyFamilyQualifier, value)).iterator
  },
  stagingFolder,
  familyHBaseWriterOptions,
  compactionExclude = false,
  HConstants.DEFAULT_MAX_FILE_SIZE)
val connection = HConnectionManager.createConnection(conf)
val table = connection.getTable(TableName.valueOf(tableName))
val load = new LoadIncrementalHFiles(conf)
load.doBulkLoad(new Path(stagingFolder),
connection.getAdmin, table, connection.getRegionLocator(TableName.valueOf(tableName)))
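For reference, the four FamilyHFileWriteOptions arguments in the example above are, in order, the compression codec, the bloom filter type, the HFile block size, and the data block encoding. Here is a minimal sketch of an alternative configuration, assuming that same constructor order:
import org.apache.hadoop.hbase.spark.FamilyHFileWriteOptions

// Assumed argument order: compression, bloomType, blockSize, dataBlockEncoding.
// This variant applies no compression, no bloom filter, 64 KB HFile blocks,
// and no data block encoding; it could be put into the options map in place
// of f1Options above.
val plainOptions = new FamilyHFileWriteOptions("NONE", "NONE", 65536, "NONE")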
Thin-Record Bulk Load Example
Run the following code in the Spark shell using :paste mode.
import org.apache.hadoop.fs.Path
import org.apache.hadoop.hbase.client.HConnectionManager
import org.apache.hadoop.hbase.mapreduce.{LoadIncrementalHFiles, TableInputFormat}
import org.apache.hadoop.hbase.spark.HBaseRDDFunctions._
import org.apache.hadoop.hbase.spark.{HBaseContext, _}
import org.apache.hadoop.hbase.util.Bytes
import org.apache.hadoop.hbase.{HBaseConfiguration, TableName}
import org.apache.spark.sql.SparkSession
val tableName = "table3"
val stagingFolder = "/home/mapr"
val columnFamily1 = "cf1"
@transient val conf = HBaseConfiguration.create()
conf.set(TableInputFormat.INPUT_TABLE, tableName)
conf.set("hbase.zookeeper.quorum", "node1.cluster.com")
conf.setInt("hbase.zookeeper.property.clientPort", 5181)
// Create the HBaseContext after the configuration is populated
// (the configuration is broadcast when the context is constructed).
val hbaseContext = new HBaseContext(sc, conf)
val rdd = sc.parallelize(Array(
("1", List(Bytes.toBytes(columnFamily1), Bytes.toBytes("a"), Bytes.toBytes("foo1"))),
("3", List(Bytes.toBytes(columnFamily1), Bytes.toBytes("b"), Bytes.toBytes("foo2.b")))))
rdd.hbaseBulkLoadThinRows(hbaseContext,
  TableName.valueOf(tableName),
  t => {
    val rowKey = t._1
    val familyQualifiersValues = new FamiliesQualifiersValues
    val q = t._2
    val family: Array[Byte] = q.head
    val qualifier = q(1)
    val value: Array[Byte] = q(2)
    println(s"family: ${Bytes.toString(family)}")
    println(s"qualifier: ${Bytes.toString(qualifier)}")
    println(s"value: ${Bytes.toString(value)}")
    familyQualifiersValues += (family, qualifier, value)
    (new ByteArrayWrapper(Bytes.toBytes(rowKey)), familyQualifiersValues)
  },
  stagingFolder,
  new java.util.HashMap[Array[Byte], FamilyHFileWriteOptions],
  compactionExclude = false,
  20) // maximum HFile size, in bytes
val connection = HConnectionManager.createConnection(conf)
val table = connection.getTable(TableName.valueOf(tableName))
val load = new LoadIncrementalHFiles(conf)
load.doBulkLoad(
new Path(stagingFolder),
connection.getAdmin,
table,
connection.getRegionLocator(TableName.valueOf(tableName)))
The big difference in using bulk load for thin rows is that the function returns a tuple, with the first value being the RowKey and the second value being a FamiliesQualifiersValues object. FamiliesQualifiersValues contains all the values for this row for all column families.
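To make that concrete, here is a minimal, hypothetical sketch of a single thin row carrying several cells across two column families in one FamiliesQualifiersValues object (the row key "5" and the column family "cf2" are made up for illustration and would have to exist on the table):
import org.apache.hadoop.hbase.spark.{ByteArrayWrapper, FamiliesQualifiersValues}
import org.apache.hadoop.hbase.util.Bytes

// One logical row ("5") with cells in two column families.
val row = new FamiliesQualifiersValues
row += (Bytes.toBytes("cf1"), Bytes.toBytes("a"), Bytes.toBytes("foo5.a"))
row += (Bytes.toBytes("cf1"), Bytes.toBytes("b"), Bytes.toBytes("foo5.b"))
row += (Bytes.toBytes("cf2"), Bytes.toBytes("c"), Bytes.toBytes("foo5.c"))

// The thin-record converter returns the whole row as a single tuple,
// so all of these cells travel through the shuffle together.
val thinRecord = (new ByteArrayWrapper(Bytes.toBytes("5")), row)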