package org.myorg;

import java.io.IOException;
import java.util.*;

import org.apache.hadoop.conf.*;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapreduce.*;
import org.apache.hadoop.util.*;
import org.apache.hadoop.fs.*;

import org.apache.hive.hcatalog.data.HCatRecord;
import org.apache.hive.hcatalog.data.DefaultHCatRecord;
import org.apache.hive.hcatalog.mapreduce.HCatInputFormat;
import org.apache.hive.hcatalog.mapreduce.HCatOutputFormat;
import org.apache.hive.hcatalog.data.schema.HCatSchema;
import org.apache.hive.hcatalog.mapreduce.OutputJobInfo;

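/**
 * Example MapReduce job that reads rows from an HCatalog table, groups them by
 * the integer value in column index 2, counts the rows in each group, and
 * writes (group value, count) records to a second HCatalog table.
 *
 * Usage: hadoop jar <jar> org.myorg.HCatalogMRTest <input_table> <output_table>
 */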
public class HCatalogMRTest {

    public static class Map extends
            Mapper<WritableComparable, HCatRecord, IntWritable, IntWritable> {

        int keyv;

        @Override
        protected void map(WritableComparable key, HCatRecord value, Context context)
                throws IOException, InterruptedException {
            // The grouping column is assumed to be the third column (index 2)
            // of the input table and must hold an int.
            keyv = (Integer) value.get(2);
            context.write(new IntWritable(keyv), new IntWritable(1));
        }
    }

    public static class Reduce extends Reducer<IntWritable, IntWritable,
            WritableComparable, HCatRecord> {

        @Override
        protected void reduce(IntWritable key, Iterable<IntWritable> values, Context context)
                throws IOException, InterruptedException {
            // Count how many input rows fell into this group.
            int sum = 0;
            for (IntWritable ignored : values) {
                sum++;
            }
            // Emit a two-column record: (group value, count). The key is not
            // used by HCatOutputFormat, so null is passed.
            HCatRecord record = new DefaultHCatRecord(2);
            record.set(0, key.get());
            record.set(1, sum);
            context.write(null, record);
        }
    }

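    /**
     * Driver: parses the table-name arguments, wires up the mapper and reducer,
     * and configures HCatInputFormat/HCatOutputFormat from the HCatalog metastore.
     */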
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        args = new GenericOptionsParser(conf, args).getRemainingArgs();

        // Get the input and output table names as arguments
        String inputTableName = args[0];
        String outputTableName = args[1];
        // Assume the default database
        String dbName = null;

        Job job = Job.getInstance(conf, "HCatalogMRTest");
        // Read from the input table via HCatInputFormat; the table schema is
        // fetched from the HCatalog metastore.
        HCatInputFormat.setInput(job.getConfiguration(), dbName, inputTableName);
        HCatSchema si = HCatInputFormat.getTableSchema(job.getConfiguration());
        System.err.println("INFO: input schema read from the source table: " + si);

        job.setInputFormatClass(HCatInputFormat.class);
        job.setJarByClass(HCatalogMRTest.class);
        job.setMapperClass(Map.class);
        job.setReducerClass(Reduce.class);
        job.setMapOutputKeyClass(IntWritable.class);
        job.setMapOutputValueClass(IntWritable.class);
        job.setOutputKeyClass(WritableComparable.class);
        job.setOutputValueClass(DefaultHCatRecord.class);
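        // Initialize HCatOutputFormat: point it at the output table, read that
        // table's schema from the metastore, and set it as the write schema.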
        HCatOutputFormat.setOutput(job, OutputJobInfo.create(dbName, outputTableName, null));
        HCatSchema s = HCatOutputFormat.getTableSchema(job.getConfiguration());
        System.err.println("INFO: output schema explicitly set for writing:" + s);
        HCatOutputFormat.setSchema(job, s);
        job.setOutputFormatClass(HCatOutputFormat.class);
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
