The HCatInputFormat and HCatOutputFormat interfaces are used to read data from HDFS and, after processing, write the resultant data back to HDFS using a MapReduce job. Let us elaborate on the input and output format interfaces.
The HCatInputFormat is used with MapReduce jobs to read data from HCatalog-managed tables. HCatInputFormat exposes a Hadoop 0.20 MapReduce API for reading data as if it had been published to a table.
The HCatInputFormat API includes the following methods −

Sr.No. | Method Name & Description |
---|---|
1 | `public static HCatInputFormat setInput(Job job, String dbName, String tableName) throws IOException`<br/>Set inputs to use for the job. It queries the metastore with the given input specification and serializes matching partitions into the job configuration for MapReduce tasks. |
2 | `public static HCatInputFormat setInput(Configuration conf, String dbName, String tableName) throws IOException`<br/>Set inputs to use for the job. It queries the metastore with the given input specification and serializes matching partitions into the job configuration for MapReduce tasks. |
3 | `public HCatInputFormat setFilter(String filter) throws IOException`<br/>Set a filter on the input table. |
4 | `public HCatInputFormat setProperties(Properties properties) throws IOException`<br/>Set properties for the input format. |
To use HCatInputFormat to read data, first instantiate an InputJobInfo with the necessary information from the table being read and then call setInput with the InputJobInfo.
You can use the setOutputSchema method to include a projection schema that specifies the output fields. If a schema is not specified, all the columns in the table will be returned. You can use the getTableSchema method to determine the table schema for a specified input table.
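The following is a minimal sketch of this read-side setup in a driver. The database and table names (mydb, mytable) are placeholders, and the HCatalog 0.5.0-era API used by the GroupByAge example later in this section is assumed; the exact signatures of getTableSchema and setOutputSchema differ slightly in later releases.

```java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hcatalog.data.schema.HCatSchema;
import org.apache.hcatalog.mapreduce.HCatInputFormat;
import org.apache.hcatalog.mapreduce.InputJobInfo;

public class ReadDriverSketch {
   public static void main(String[] args) throws Exception {
      Configuration conf = new Configuration();
      Job job = new Job(conf, "hcat-read-sketch");

      // Query the metastore for mydb.mytable (placeholder names) and serialize
      // the matching partitions into the job configuration (null = no partition filter).
      HCatInputFormat.setInput(job, InputJobInfo.create("mydb", "mytable", null));
      job.setInputFormatClass(HCatInputFormat.class);

      // Fetch the table schema; pass a projection schema to setOutputSchema to
      // restrict the fields returned (omit the call to read every column).
      HCatSchema tableSchema = HCatInputFormat.getTableSchema(job);
      HCatInputFormat.setOutputSchema(job, tableSchema);

      // ... mapper, reducer, output classes and job submission go here,
      // exactly as in the full GroupByAge example below.
   }
}
```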
HCatOutputFormat is used with MapReduce jobs to write data to HCatalog-managed tables. HCatOutputFormat exposes a Hadoop 0.20 MapReduce API for writing data to a table. When a MapReduce job uses HCatOutputFormat to write output, the default OutputFormat configured for the table is used and the new partition is published to the table after the job completes.
The HCatOutputFormat API includes the following methods −

Sr.No. | Method Name & Description |
---|---|
1 | `public static void setOutput(Configuration conf, Credentials credentials, OutputJobInfo outputJobInfo) throws IOException`<br/>Set the information about the output to write for the job. It queries the metadata server to find the StorageHandler to use for the table. It throws an error if the partition is already published. |
2 | `public static void setSchema(Configuration conf, HCatSchema schema) throws IOException`<br/>Set the schema for the data being written out to the partition. The table schema is used by default for the partition if this is not called. |
3 | `public RecordWriter<WritableComparable<?>, HCatRecord> getRecordWriter(TaskAttemptContext context) throws IOException, InterruptedException`<br/>Get the record writer for the job. It uses the StorageHandler's default OutputFormat to get the record writer. |
4 | `public OutputCommitter getOutputCommitter(TaskAttemptContext context) throws IOException, InterruptedException`<br/>Get the output committer for this output format. It ensures that the output is committed correctly. |
The first call on HCatOutputFormat must be setOutput; any other call will throw an exception saying the output format is not initialized.
The schema for the data being written out is specified by the setSchema method. You must call this method, providing the schema of the data you are writing. If your data has the same schema as the table schema, you can use HCatOutputFormat.getTableSchema() to get the table schema and then pass that along to setSchema().
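Here is a minimal sketch of that initialization order on the write side, again assuming the 0.5.0-era API used by the example program below and placeholder names (mydb, out_table); the null third argument to OutputJobInfo.create denotes an unpartitioned write.

```java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hcatalog.data.schema.HCatSchema;
import org.apache.hcatalog.mapreduce.HCatOutputFormat;
import org.apache.hcatalog.mapreduce.OutputJobInfo;

public class WriteDriverSketch {
   public static void main(String[] args) throws Exception {
      Configuration conf = new Configuration();
      Job job = new Job(conf, "hcat-write-sketch");

      // setOutput must be the first HCatOutputFormat call; it looks up the
      // StorageHandler for mydb.out_table (placeholder names, null = unpartitioned).
      HCatOutputFormat.setOutput(job, OutputJobInfo.create("mydb", "out_table", null));

      // The data written here is assumed to have the same layout as the table,
      // so the table schema is fetched and passed straight to setSchema.
      HCatSchema schema = HCatOutputFormat.getTableSchema(job);
      HCatOutputFormat.setSchema(job, schema);
      job.setOutputFormatClass(HCatOutputFormat.class);

      // ... mapper, reducer and key/value classes go here, as in the full example below.
   }
}
```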
The following MapReduce program reads data from one table, which it assumes has an integer in its second column ("column 1"), and counts how many instances of each distinct value it finds. That is, it is the equivalent of "select col1, count(*) from $table group by col1;".
For example, if the values in the second column are {1, 1, 1, 3, 3, 5}, then the program will produce the following output of values and counts −
```
1, 3
3, 2
5, 1
```
Let us now take a look at the program code −
```java
import java.io.IOException;
import java.util.Iterator;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.util.GenericOptionsParser;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

import org.apache.hcatalog.common.HCatConstants;
import org.apache.hcatalog.data.DefaultHCatRecord;
import org.apache.hcatalog.data.HCatRecord;
import org.apache.hcatalog.data.schema.HCatSchema;
import org.apache.hcatalog.mapreduce.HCatInputFormat;
import org.apache.hcatalog.mapreduce.HCatOutputFormat;
import org.apache.hcatalog.mapreduce.InputJobInfo;
import org.apache.hcatalog.mapreduce.OutputJobInfo;

public class GroupByAge extends Configured implements Tool {

   // Mapper: emits (value of column 1, 1) for every input record.
   public static class Map extends Mapper<WritableComparable, HCatRecord, IntWritable, IntWritable> {
      int age;

      @Override
      protected void map(WritableComparable key, HCatRecord value, Context context)
            throws IOException, InterruptedException {
         age = (Integer) value.get(1);
         context.write(new IntWritable(age), new IntWritable(1));
      }
   }

   // Reducer: counts the occurrences of each distinct value and writes an HCatRecord (value, count).
   public static class Reduce extends Reducer<IntWritable, IntWritable, WritableComparable, HCatRecord> {

      @Override
      protected void reduce(IntWritable key, Iterable<IntWritable> values, Context context)
            throws IOException, InterruptedException {
         int sum = 0;
         Iterator<IntWritable> iter = values.iterator();
         while (iter.hasNext()) {
            sum++;
            iter.next();
         }
         HCatRecord record = new DefaultHCatRecord(2);
         record.set(0, key.get());
         record.set(1, sum);
         context.write(null, record);
      }
   }

   public int run(String[] args) throws Exception {
      Configuration conf = getConf();
      args = new GenericOptionsParser(conf, args).getRemainingArgs();

      String serverUri = args[0];
      String inputTableName = args[1];
      String outputTableName = args[2];
      String dbName = null;

      String principalID = System.getProperty(HCatConstants.HCAT_METASTORE_PRINCIPAL);
      if (principalID != null)
         conf.set(HCatConstants.HCAT_METASTORE_PRINCIPAL, principalID);

      Job job = new Job(conf, "GroupByAge");

      // Initialize HCatInputFormat with the input table.
      HCatInputFormat.setInput(job, InputJobInfo.create(dbName, inputTableName, null));
      job.setInputFormatClass(HCatInputFormat.class);

      job.setJarByClass(GroupByAge.class);
      job.setMapperClass(Map.class);
      job.setReducerClass(Reduce.class);
      job.setMapOutputKeyClass(IntWritable.class);
      job.setMapOutputValueClass(IntWritable.class);
      job.setOutputKeyClass(WritableComparable.class);
      job.setOutputValueClass(DefaultHCatRecord.class);

      // Initialize HCatOutputFormat with the output table and write using the table schema.
      HCatOutputFormat.setOutput(job, OutputJobInfo.create(dbName, outputTableName, null));
      HCatSchema s = HCatOutputFormat.getTableSchema(job);
      System.err.println("INFO: output schema explicitly set for writing: " + s);
      HCatOutputFormat.setSchema(job, s);
      job.setOutputFormatClass(HCatOutputFormat.class);

      return (job.waitForCompletion(true) ? 0 : 1);
   }

   public static void main(String[] args) throws Exception {
      int exitCode = ToolRunner.run(new GroupByAge(), args);
      System.exit(exitCode);
   }
}
```
Before compiling the above program, you have to download some jars and add them to the classpath for this application. You need the Hive and HCatalog jars (hcatalog-core-0.5.0.jar, hive-metastore-0.10.0.jar, libthrift-0.7.0.jar, hive-exec-0.10.0.jar, libfb303-0.7.0.jar, jdo2-api-2.3-ec.jar, slf4j-api-1.6.1.jar).
Use the following commands to copy those jar files from the local file system to HDFS and add them to the classpath.
```
bin/hadoop fs -copyFromLocal $HCAT_HOME/share/hcatalog/hcatalog-core-0.5.0.jar /tmp
bin/hadoop fs -copyFromLocal $HIVE_HOME/lib/hive-metastore-0.10.0.jar /tmp
bin/hadoop fs -copyFromLocal $HIVE_HOME/lib/libthrift-0.7.0.jar /tmp
bin/hadoop fs -copyFromLocal $HIVE_HOME/lib/hive-exec-0.10.0.jar /tmp
bin/hadoop fs -copyFromLocal $HIVE_HOME/lib/libfb303-0.7.0.jar /tmp
bin/hadoop fs -copyFromLocal $HIVE_HOME/lib/jdo2-api-2.3-ec.jar /tmp
bin/hadoop fs -copyFromLocal $HIVE_HOME/lib/slf4j-api-1.6.1.jar /tmp

export LIB_JARS=hdfs:///tmp/hcatalog-core-0.5.0.jar,hdfs:///tmp/hive-metastore-0.10.0.jar,hdfs:///tmp/libthrift-0.7.0.jar,hdfs:///tmp/hive-exec-0.10.0.jar,hdfs:///tmp/libfb303-0.7.0.jar,hdfs:///tmp/jdo2-api-2.3-ec.jar,hdfs:///tmp/slf4j-api-1.6.1.jar
```
Compile the program and package it into a jar (here assumed to be named groupbyage.jar), then run it with a command of the following form, shipping the jars collected in $LIB_JARS via -libjars and passing the metastore URI, the input table, and the output table as arguments.

```
$HADOOP_HOME/bin/hadoop jar groupbyage.jar GroupByAge -libjars $LIB_JARS <serverUri> <inputTable> <outputTable>
```
Now, check your output directory on HDFS (for example, /user/tmp/hive) for the output files (part_0000, part_0001); the results are also published as a new partition of the output table.