Using the Custom Partitioner with the HPE Ezmeral Data Fabric Database OJAI Connector for Apache Spark
In any distributed computing system, partitioning data is crucial for achieving the best performance. Apache Spark provides a mechanism to register a custom partitioner for partitioning data in the pipeline. The HPE Ezmeral Data Fabric Database OJAI Connector for Apache Spark includes a custom partitioner that you can use to optimally partition data in an RDD.
The HPE Ezmeral Data Fabric Database OJAI Connector for Apache Spark's custom partitioner takes the following classes as keys:
- String
- ByteBuffer (as a serializable ByteBuffer)
You can register this custom partitioner with either the partitionBy function or the repartitionAndSortWithinPartitions function.
The connector supports two versions of the custom partitioner. One version takes an HPE Ezmeral Data Fabric Database JSON table as input. The partition information of the table is used to partition the data, so the saveToMapRDB call can use a bulkInsert to store the data. The bulkInsert option requires that the data already be sorted on the _id key.
The other version of the custom partitioner takes an array of splits as an input.
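As a quick illustration (a minimal sketch; /srctable is assumed to exist, and the split points are hypothetical), both versions are created through the same factory call:
val fromTable = MapRDBSpark.newPartitioner[String]("/srctable")   // Version 1: partitions mirror an existing table's tablets
val fromSplits = MapRDBSpark.newPartitioner[String](Array("id1", "id4", "id7"))   // Version 2: partitions derived from an explicit array of splits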
Specifying tablename for the Partitioner
If you already have a table that has been created and partitioned based on a set of keys, you can specify that the RDD be partitioned in the same way (using the same set of keys). In the following example, the existing table /dsttable serves as the reference partitioner for the data loaded from /srctable:
sc.loadFromMapRDB("/srctable")
.keyBy(doc => doc._id[String])
.repartitionAndSortWithinPartitions(MapRDBSpark.newPartitioner[String]("/dsttable"))
.saveToMapRDB("/dsttable", createTable = false, bulkInsert = true)
Specifying a String Seq as an Array of Splits
In the following example, the first line creates an array of splits containing id1, id4, and id7. The rest of the example repartitions the RDD based on the array of splits:
val dstSplits: Array[String] = (1 to 9 by 3).map("id" + _).toArray
val partitionRDD = sc.loadFromMapRDB("/srctable")
.keyBy(doc => doc._id[String])
.repartitionAndSortWithinPartitions(MapRDBSpark.newPartitioner[String](dstSplits))
.saveToMapRDB("/dsttable", createTable = true, bulkInsert = true)
Specifying a ByteBuffer Seq as an Array of Splits
Suppose you have an array of byte buffers to use as the array of splits for the partitioner. You must convert the byte buffers to serializable byte buffers first:
// Convert each ByteBuffer to a serializable ByteBuffer
val dstSplits = arrayOfByteBuffer.map(x => MapRDBSpark.serializableBinaryValue(x))
sc.loadFromMapRDB("/srctable")
// Key by a binary field, retrieved as a serializable ByteBuffer
.keyBy(doc => doc.getBinarySerializable(binaryField))
.repartitionAndSortWithinPartitions(MapRDBSpark.newPartitioner(dstSplits))
.saveToMapRDB("/dsttable", createTable = true, bulkInsert = true)
Specifying tablename for the Partitioner with ByteBuffer as Id Fields
Suppose you have a table whose keys are binary (ByteBuffer), and you have an RDD keyed by such binary values. You can repartition the RDD based on the partitions of the table. The following example reads the documents from /srctable, but you could provide any table. The second line specifies a keyBy call on an ID that is binary serializable. In the last line, the RDD, now keyed by serializable ByteBuffers, is repartitioned using the partitions of /dsttable:
sc.loadFromMapRDB("/srctable")
.keyBy(doc => doc.getIdBinarySerializable())
.repartitionAndSortWithinPartitions(MapRDBSpark.newPartitioner[ByteBuffer]("/dsttable"))
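As in the earlier examples, you can follow the repartitioning with a saveToMapRDB call; because the partitions mirror the tablets of /dsttable and the keys are sorted within each partition, the write can use bulkInsert = true.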
The type parameter that you pass to newPartitioner should match the key type of the PairedRDD on which the partitioning is specified. If the IDs are serializable ByteBuffers, specify ByteBuffer; otherwise, specify String.
After the data is partitioned with the custom partitioner, all downstream transformations should be non-partition-changing transformations. Here is the code for passing a partitioner for an RDD:
user_profiles.repartitionAndSortWithinPartitions(MapRDBSpark.newPartitioner[String](<table-name>))
Or you can use the partitionBy function on the RDD:
user_profiles.partitionBy(MapRDBSpark.newPartitioner[String](<table-name>))
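A transformation is non-partition-changing if Spark can keep the RDD's partitioner through it. For example, in core Spark, mapValues preserves the partitioner while map discards it. The sketch below assumes user_profiles is a pair RDD keyed by String and that /dsttable exists:
val partitioned = user_profiles.partitionBy(MapRDBSpark.newPartitioner[String]("/dsttable"))
// mapValues preserves the partitioner, so the partitioning work is not lost
val kept = partitioned.mapValues(identity)
// map may change the keys, so Spark drops the partitioner
// (dropped.partitioner is None), undoing the custom partitioning
val dropped = partitioned.map { case (k, v) => (k, v) }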
The key of the data for this partitioner must be of the same type as the key of the supplied table. The partitioner yields a single partition if the table is not pre-split; otherwise, the number of partitions is calculated from the table's existing tablet information.
For a table created with the bulkInsert option set to true, one of the following applies:
- If the table is pre-split, then the resulting number of partitions can be greater than 1.
- If the table is not pre-split, then the result is a single partition, unless partition information is available from the RDD lineage.
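To confirm how many partitions a table-backed partitioner yields, you can inspect it directly. Like any Spark Partitioner, it exposes numPartitions (a quick check, assuming /dsttable exists):
val partitioner = MapRDBSpark.newPartitioner[String]("/dsttable")
// A pre-split table yields more than one partition; a non-pre-split table yields 1
println(partitioner.numPartitions)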