Native Avro Support
The HPE Ezmeral Data Fabric Database Binary Connector for Apache Spark supports several data formats, including Avro and JSON. The following use case shows how Spark supports Avro. You can persist Avro records directly into HPE Ezmeral Data Fabric Database binary tables; internally, the Avro schema is converted to a native Spark Catalyst data type automatically. Note that both the key and the value parts of an HPE Ezmeral Data Fabric Database binary table can be defined in Avro format.
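To see what this conversion produces, the following sketch parses a small Avro schema and prints the resulting Catalyst type. It assumes the connector package exposes the SchemaConverters helper it uses internally (as the upstream Spark-HBase connector does); the exact entry point may differ:

```scala
// Sketch: preview the Avro-to-Catalyst mapping. SchemaConverters here is an
// assumption; the connector performs an equivalent conversion internally.
import org.apache.avro.Schema
import org.apache.spark.sql.execution.datasources.hbase.SchemaConverters

val userSchema: Schema = new Schema.Parser().parse(
  """{"namespace": "example.avro", "type": "record", "name": "User",
    | "fields": [
    |   {"name": "name", "type": "string"},
    |   {"name": "favorite_number", "type": ["int", "null"]}
    | ]}""".stripMargin)

// Prints the Catalyst struct the record maps to, for example:
// StructType(StructField(name,StringType,false), StructField(favorite_number,IntegerType,true))
println(SchemaConverters.toSqlType(userSchema).dataType)
```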
- Define the catalog for schema mapping. The catalog describes an HPE Ezmeral Data Fabric Database binary table at /path_to/avro_table, with a row key named key and one column, col1. The row key must also be defined in detail as a column (col0) that has a specific column family, cf, of rowkey. col1 is typed binary because it holds the serialized Avro record:

  ```scala
  def catalog = s"""{
      |"table":{"namespace":"default", "name":"/path_to/avro_table"},
      |"rowkey":"key",
      |"columns":{
        |"col0":{"cf":"rowkey", "col":"key", "type":"string"},
        |"col1":{"cf":"cf1", "col":"col1", "type":"binary"}
      |}
    |}""".stripMargin
  ```
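  Because the catalog is plain JSON, it can be sanity-checked before it is handed to the connector. A minimal sketch, assuming the json4s library that ships with Spark:

  ```scala
  import org.json4s._
  import org.json4s.jackson.JsonMethods._

  // Parse the catalog string and confirm the expected mappings are present.
  val parsedCatalog = parse(catalog)
  assert((parsedCatalog \ "rowkey") == JString("key"))
  assert((parsedCatalog \ "columns" \ "col1" \ "type") == JString("binary"))
  ```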
- Prepare the data. schemaString is defined first and then parsed to obtain avroSchema, which is used to generate AvroHBaseRecord objects. data is a local Scala collection that holds 256 AvroHBaseRecord objects:

  ```scala
  import org.apache.avro.Schema
  import org.apache.avro.generic.GenericData
  import org.apache.spark.sql.execution.datasources.hbase._

  // Each record pairs a string row key (col0) with the Avro-serialized value (col1).
  case class AvroHBaseRecord(col0: String, col1: Array[Byte])

  object AvroHBaseRecord {
    val schemaString =
      s"""{"namespace": "example.avro",
         |  "type": "record", "name": "User",
         |  "fields": [
         |    {"name": "name", "type": "string"},
         |    {"name": "favorite_number", "type": ["int", "null"]},
         |    {"name": "favorite_color", "type": ["string", "null"]},
         |    {"name": "favorite_array", "type": {"type": "array", "items": "string"}},
         |    {"name": "favorite_map", "type": {"type": "map", "values": "int"}}
         |  ]}""".stripMargin

    val avroSchema: Schema = {
      val p = new Schema.Parser
      p.parse(schemaString)
    }

    def apply(i: Int): AvroHBaseRecord = {
      val user = new GenericData.Record(avroSchema)
      user.put("name", s"name${"%03d".format(i)}")
      user.put("favorite_number", i)
      user.put("favorite_color", s"color${"%03d".format(i)}")
      val favoriteArray = new GenericData.Array[String](2, avroSchema.getField("favorite_array").schema())
      favoriteArray.add(s"number${i}")
      favoriteArray.add(s"number${i + 1}")
      user.put("favorite_array", favoriteArray)
      import collection.JavaConverters._
      val favoriteMap = Map[String, Int]("key1" -> i, "key2" -> (i + 1)).asJava
      user.put("favorite_map", favoriteMap)
      val avroByte = AvroSedes.serialize(user, avroSchema)
      AvroHBaseRecord(s"name${"%03d".format(i)}", avroByte)
    }
  }

  val data = (0 to 255).map { i => AvroHBaseRecord(i) }
  ```
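  As an optional check, one generated record can be decoded back with the standard Avro APIs. This is only a sketch; it assumes AvroSedes writes the standard Avro binary encoding:

  ```scala
  import org.apache.avro.generic.{GenericDatumReader, GenericRecord}
  import org.apache.avro.io.DecoderFactory

  val sample = AvroHBaseRecord(1)
  val reader = new GenericDatumReader[GenericRecord](AvroHBaseRecord.avroSchema)
  // Decode the serialized bytes of col1 back into a GenericRecord.
  val decoded = reader.read(null, DecoderFactory.get().binaryDecoder(sample.col1, null))
  println(decoded.get("name"))   // name001
  ```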
- Save the DataFrame. Given a DataFrame with the specified schema catalog, the following example creates an HPE Ezmeral Data Fabric Database binary table with five regions and saves the DataFrame into it:

  ```scala
  sc.parallelize(data).toDF.write
    .options(Map(
      HBaseTableCatalog.tableCatalog -> catalog,
      HBaseTableCatalog.newTable -> "5"))
    .format("org.apache.spark.sql.execution.datasources.hbase")
    .save()
  ```
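  To confirm the write before attaching the Avro schema, the table can be read back with the same binary catalog (a sketch using only the options shown above); at this point col1 still surfaces as raw bytes:

  ```scala
  val raw = sqlContext.read
    .options(Map(HBaseTableCatalog.tableCatalog -> catalog))
    .format("org.apache.spark.sql.execution.datasources.hbase")
    .load()

  raw.printSchema()    // col1 is BinaryType until the Avro schema is applied
  println(raw.count()) // expect 256 rows
  ```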
- Load the DataFrame. In the withCatalog function, read returns a DataFrameReader that can be used to read data in as a DataFrame. The options function adds input options for the underlying data source to the DataFrameReader. Two options are set: avroSchema is set to AvroHBaseRecord.schemaString, and HBaseTableCatalog.tableCatalog is set to the catalog passed in as cat. The load() function loads input in as a DataFrame. The DataFrame df returned by the withCatalog function can be used to access the HPE Ezmeral Data Fabric Database binary table. Note that in avroCatalog, col1 is mapped with "avro":"avroSchema" instead of "type":"binary", which tells the connector to decode the column with the supplied Avro schema:

  ```scala
  def avroCatalog = s"""{
      |"table":{"namespace":"default", "name":"/path_to/avro_table"},
      |"rowkey":"key",
      |"columns":{
        |"col0":{"cf":"rowkey", "col":"key", "type":"string"},
        |"col1":{"cf":"cf1", "col":"col1", "avro":"avroSchema"}
      |}
    |}""".stripMargin

  def withCatalog(cat: String): DataFrame = {
    sqlContext
      .read
      .options(Map(
        "avroSchema" -> AvroHBaseRecord.schemaString,
        HBaseTableCatalog.tableCatalog -> cat))
      .format("org.apache.spark.sql.execution.datasources.hbase")
      .load()
  }

  val df = withCatalog(avroCatalog)
  ```
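  Because col1 carries an attached Avro schema, it is surfaced as a Catalyst struct rather than raw bytes. A quick sketch to make the conversion visible:

  ```scala
  df.printSchema()  // col1 expands into name, favorite_number, favorite_color, ...

  // Nested Avro fields are addressable with ordinary column paths.
  df.select("col0", "col1.name", "col1.favorite_color").show(5)
  ```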
- Query data using SQL. After loading the df DataFrame, you can query the data. registerTempTable registers the df DataFrame as a temporary table named avrotable. The sqlContext.sql function lets you execute SQL queries against it:

  ```scala
  df.registerTempTable("avrotable")
  val c = sqlContext.sql("select count(1) from avrotable")
  ```
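  The query returns a DataFrame; call show (or collect) to materialize the result. Because col1 was decoded into a struct, nested Avro fields can be referenced in SQL as well. A sketch:

  ```scala
  c.show()   // prints the row count (256)

  // Filter on a nested field of the Avro value.
  sqlContext.sql(
    "select col1.name, col1.favorite_number from avrotable " +
    "where col1.favorite_number > 250").show()
  ```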