HPE Ezmeral Data Fabric Database JSON MapReduce: Sample App
This sample application reads records (JSON documents) from a source JSON table, aggregates the data within those records, creates new JSON documents that contain the aggregated records, and inserts the new documents into a destination JSON table. Each source record contains the name of an author and one book that the author has written; the application combines all of an author's books into a single document.
The documents in the source table have the following structure:
{
  "_id" : <string or binary>,
  "authorid" : "<string>",
  "name" : "<string>",
  "book" : {
    "id" : <int>,
    "title" : "<string>"
  }
}
The documents that the application inserts into the destination table have the following structure, where the _id field holds the value of the authorid field from the source records and the books field holds the list of that author's books:
{
  "_id" : "<string>",
  "books" : [
    {
      "id" : <int>,
      "title" : "<string>"
    },
    ...
  ]
}
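For example, given these two hypothetical source records for the same author (all values are made up for illustration):
{ "_id" : "r1", "authorid" : "a001", "name" : "Jane Doe", "book" : { "id" : 1, "title" : "First Book" } }
{ "_id" : "r2", "authorid" : "a001", "name" : "Jane Doe", "book" : { "id" : 2, "title" : "Second Book" } }
the application writes one destination document:
{ "_id" : "a001", "books" : [ { "id" : 1, "title" : "First Book" }, { "id" : 2, "title" : "Second Book" } ] }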
Prerequisites
- Ensure that your user ID has the readAce and writeAce permissions on the volumes where you plan to create the source and destination tables.
- Create the source JSON table. You can create the source table and populate it with sample records by running sample_dataset.txt from the mapr dbshell utility:
  $ mapr dbshell < sample_dataset.txt
- Create the destination JSON table. A simple way to create this table is to use the create command in the HPE Ezmeral Data Fabric Database Shell (JSON Tables) utility, as in the sketch after this list.
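For illustration, a dbshell session that creates a destination table; the table path /apps/books_by_author is a made-up example, and the prompt shown may differ depending on your user:
$ mapr dbshell
maprdb root:> create /apps/books_by_author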
Compiling and Running
To compile the application, include the Hadoop and HPE Ezmeral Data Fabric JARs on the classpath, and set the native library path when you run the main class:
javac -cp <classpath> <java source file(s)>
java -cp <classpath>:. -Djava.library.path=/opt/mapr/lib <main class> <command line arguments>
To run the application, supply the paths and names of the source and destination tables as arguments:
CombineBookList <source_table> <destination_table>
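As a concrete sketch, the commands might look like the following. The table paths /apps/authors and /apps/books_by_author are made-up examples, and the mapr classpath command is used here to supply the cluster classpath; adjust both for your installation:
$ javac -cp $(mapr classpath) CombineBookList.java
$ java -cp $(mapr classpath):. -Djava.library.path=/opt/mapr/lib CombineBookList /apps/authors /apps/books_by_author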
Code Walkthrough
private static Job createSubmittableJob(Configuration conf, String[] otherArgs)
    throws IOException {
  srcTable = otherArgs[0];
  destTable = otherArgs[1];
  Job job = new Job(conf, NAME + "_" + destTable);
  job.setJarByClass(CombineBookList.class);
  // Read input from the source JSON table
  MapRDBMapReduceUtil.configureTableInputFormat(job, srcTable);
  job.setMapperClass(CombineBookListMapper.class);
  MapRDBMapReduceUtil.setMapOutputKeyValueClass(job);
  // Write output to the destination JSON table
  MapRDBMapReduceUtil.configureTableOutputFormat(job, destTable);
  job.setReducerClass(CombineBookListReducer.class);
  MapRDBMapReduceUtil.setOutputKeyValueClass(job);
  job.setNumReduceTasks(1);
  return job;
}
The createSubmittableJob() method uses methods that are in the MapRDBMapReduceUtil class to perform the following tasks:
- Set the input format to the default table input format. You can call the configureTableInputFormat() method, passing in the job and also passing in the path and name of the source table:
  MapRDBMapReduceUtil.configureTableInputFormat(job, srcTable);
  The default behavior is to do the following:
  - Set the serialization class for Document and Value objects. These interfaces are part of the OJAI (Open JSON Application Interface) API.
  - Set the field INPUT_TABLE in TableInputFormat to the path and name of the source table, and pass this value to the configuration for the MapReduce application.
  - Set the input format class for the job to TableInputFormat.
- Set the type for keys and values that are output from the mapper. You can call the setMapOutputKeyValueClass() method to use the default types:
  MapRDBMapReduceUtil.setMapOutputKeyValueClass(job);
- Set the output format to the default table output format. You can call the configureTableOutputFormat() method, passing in the job and also passing in the path and name of the destination table, which must already exist at runtime:
  MapRDBMapReduceUtil.configureTableOutputFormat(job, destTable);
  The default behavior is to do the following:
  - Set the field OUTPUT_TABLE in TableOutputFormat to the path and name of the destination table, and pass this value to the configuration for the MapReduce application.
  - Set the output format class for the job to TableOutputFormat.
- Set the type of the keys and values that are output from the reducer. You can call the setOutputKeyValueClass() method to use the default types:
  MapRDBMapReduceUtil.setOutputKeyValueClass(job);
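For context, here is a minimal sketch of a main() method that could drive this job. The use of GenericOptionsParser and the usage message are assumptions for illustration; they are not shown in the sample above:
public static void main(String[] args) throws Exception {
  Configuration conf = new Configuration();
  // Strip generic Hadoop options, leaving the source and destination table paths
  String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
  if (otherArgs.length != 2) {
    System.err.println("Usage: CombineBookList <source_table> <destination_table>");
    System.exit(2);
  }
  Job job = createSubmittableJob(conf, otherArgs);
  // Submit the job and block until it completes
  System.exit(job.waitForCompletion(true) ? 0 : 1);
}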
The map() method in the mapper class CombineBookListMapper receives the value of the _id field in a document as the key (a Value) and the JSON document with that _id field value as a Document. The mapper does nothing with the input key. For each record, the mapper writes the value of the authorid field and the full JSON document itself to the context.
public static class CombineBookListMapper extends Mapper<Value, Document, Value, Document> {
  @Override
  public void map(Value key, Document record, Context context) throws IOException, InterruptedException {
    // Re-key each record by its authorid so the reducer receives all of an author's records together
    context.write(record.getValue("authorid"), record);
  }
}
Both the Value and Document interfaces are part of the OJAI (Open JSON Application Interface) API; refer to the OJAI API javadoc for details.
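As a brief sketch of how these two interfaces relate (the field names and values here are made up for illustration):
Document doc = MapRDB.newDocument()
    .set("authorid", "a001")
    .set("name", "Jane Doe");
Value v = doc.getValue("authorid");   // a type-tagged view of the field
String authorId = v.getString();      // extract the underlying Java string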
The reduce() method in the reducer class CombineBookListReducer takes the map output key, which is the value of the authorid field, and the map output values, which are an iterable of Document objects that each contain a full record. For each author ID, the reducer creates a new document. For each document in the iterable, the reducer extracts the value of the book field and adds that value to the list books within the new JSON document.
public static class CombineBookListReducer extends Reducer<Value, Document, Value, Document> {
  @Override
  public void reduce(Value key, Iterable<Document> values,
      Context context) throws IOException, InterruptedException {
    Document d = MapRDB.newDocument();
    List<Document> books = new ArrayList<Document>();
    // Collect every book written by this author
    for (Document b : values) {
      books.add((Document) b.getValue("book"));
    }
    d.setId(key);           // the author ID becomes the _id of the new document
    d.set("books", books);
    context.write(key, d);
  }
}
The MapRDB class is part of the HPE Ezmeral Data Fabric Database JSON API, not the HPE Ezmeral Data Fabric Database JSON MapReduce API.
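For completeness, these are the imports that the classes shown above rely on. The package names follow the Hadoop, OJAI, and HPE Ezmeral Data Fabric client APIs, but verify them against the libraries in your installation:
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.util.GenericOptionsParser;  // used by the driver sketch above
import org.ojai.Document;
import org.ojai.Value;

import com.mapr.db.MapRDB;
import com.mapr.db.mapreduce.MapRDBMapReduceUtil;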