Environment
Sample Code
Let's import the sample WordCount program and make sure we can get it to compile correctly. We'll then export the JAR file containing this code to the cluster and run it against HDFS.
WordCountMapper
package dev.hadoop.sandbox.counter;

import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class WordCountMapper extends Mapper<LongWritable, Text, Text, IntWritable> {

    // the Writables are reused across map() calls rather than
    // re-allocated for every token
    private final IntWritable one = new IntWritable(1);
    private Text word = new Text();

    public WordCountMapper() {
        System.out.println("Init WordCount Mapper");
    }

    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        // the key is the byte offset of the line; the value is the line itself
        StringTokenizer iter = new StringTokenizer(value.toString());
        while (iter.hasMoreTokens()) {
            word.set(iter.nextToken());
            context.write(word, one); // emit (word, 1) for each token
        }
    }
}
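For each line of input, the mapper tokenizes on whitespace and emits a (word, 1) pair per token; the line "to be or not to be" would produce (to, 1), (be, 1), (or, 1), (not, 1), (to, 1), (be, 1). Note that the same Text and IntWritable instances are reused across calls, a common Hadoop idiom that avoids creating millions of short-lived objects.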
WordCountReducer
package dev.hadoop.sandbox.counter;

import java.io.IOException;
import java.util.Iterator;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class WordCountReducer extends Reducer<Text, IntWritable, Text, IntWritable> {

    private IntWritable result = new IntWritable();

    public WordCountReducer() {
        System.out.println("Init WordCountReducer");
    }

    @Override
    protected void reduce(Text word, Iterable<IntWritable> intOne, Context context)
            throws IOException, InterruptedException {
        // sum the counts for this word across all map outputs
        int sum = 0;
        Iterator<IntWritable> iter = intOne.iterator();
        while (iter.hasNext())
            sum += iter.next().get();
        result.set(sum);
        context.write(word, result); // emit (word, total)
    }
}
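Before deploying anything to the cluster, it's worth sanity-checking the mapper and reducer in isolation. Here's a minimal test sketch, assuming MRUnit (org.apache.mrunit:mrunit, hadoop2 classifier) and JUnit are on the test classpath; the class name WordCountTest is my own:

package dev.hadoop.sandbox.counter;

import java.io.IOException;
import java.util.Arrays;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mrunit.mapreduce.MapDriver;
import org.apache.hadoop.mrunit.mapreduce.ReduceDriver;
import org.junit.Test;

public class WordCountTest {

    @Test
    public void mapperSplitsLineIntoWords() throws IOException {
        // expected outputs are checked in emission order
        MapDriver.newMapDriver(new WordCountMapper())
                .withInput(new LongWritable(0), new Text("to be or not to be"))
                .withOutput(new Text("to"), new IntWritable(1))
                .withOutput(new Text("be"), new IntWritable(1))
                .withOutput(new Text("or"), new IntWritable(1))
                .withOutput(new Text("not"), new IntWritable(1))
                .withOutput(new Text("to"), new IntWritable(1))
                .withOutput(new Text("be"), new IntWritable(1))
                .runTest();
    }

    @Test
    public void reducerSumsCounts() throws IOException {
        ReduceDriver.newReduceDriver(new WordCountReducer())
                .withInput(new Text("be"),
                        Arrays.asList(new IntWritable(1), new IntWritable(1)))
                .withOutput(new Text("be"), new IntWritable(2))
                .runTest();
    }
}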
WordCountRunner
package dev.hadoop.sandbox.counter;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;

public class WordCountRunner {

    public static void main(String... args) throws Throwable {
        Configuration conf = new Configuration();
        // parse generic options (-D, -files, etc.) *before* creating the Job;
        // the Job takes a copy of the Configuration, so changes made to conf
        // after construction would be ignored
        String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
        System.out.println("Input Directory = " + otherArgs[0]);
        System.out.println("Output Directory = " + otherArgs[1]);

        Job job = Job.getInstance(conf, "word count");
        job.setJarByClass(WordCountRunner.class);
        job.setMapperClass(WordCountMapper.class);
        // summing is associative and commutative, so the reducer
        // doubles as a combiner to shrink the shuffle
        job.setCombinerClass(WordCountReducer.class);
        job.setReducerClass(WordCountReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
        FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
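One refinement worth noting before we move on: Hadoop logs a warning at submission time ("Hadoop command-line option parsing not performed", visible in the operational output below) unless the driver implements the Tool interface and is launched through ToolRunner. Here's a sketch of that variant; the class name WordCountTool is my own, but the job wiring is identical:

package dev.hadoop.sandbox.counter;

import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class WordCountTool extends Configured implements Tool {

    @Override
    public int run(String[] args) throws Exception {
        // ToolRunner has already parsed the generic options into the
        // Configuration returned by getConf()
        Job job = Job.getInstance(getConf(), "word count");
        job.setJarByClass(WordCountTool.class);
        job.setMapperClass(WordCountMapper.class);
        job.setCombinerClass(WordCountReducer.class);
        job.setReducerClass(WordCountReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        return job.waitForCompletion(true) ? 0 : 1;
    }

    public static void main(String[] args) throws Exception {
        System.exit(ToolRunner.run(new WordCountTool(), args));
    }
}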
Import the code, and make sure it can compile successfully.
We're going to re-compile this using Maven. Eclipse is useful as an IDE, but it's better to rely on a tool like Maven for managing the project build lifecycle.
Building the Sample
I type this command in a terminal window, from the root of the project:
mvn clean package
This is essentially the same as calling "mvn package" as far as build-lifecycle execution goes, with the added advantage of invoking the clean plugin first. As the name implies, this plugin removes the files and directories generated by Maven during previous builds (by default, the target directory).
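For reference, the only Hadoop dependency the sample needs at compile time is the client library. A minimal sketch of the POM entry, assuming you're building against Hadoop 2.5.2 (the version running on my cluster) from Maven Central:

<dependency>
  <groupId>org.apache.hadoop</groupId>
  <artifactId>hadoop-client</artifactId>
  <version>2.5.2</version>
  <!-- provided: the cluster supplies these classes at runtime -->
  <scope>provided</scope>
</dependency>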
Note that if your project depends on libraries that won't already be on the cluster's classpath, you may need to build a fat (uber) JAR that bundles them, using the Maven Shade plugin:
<plugin>
  <groupId>org.apache.maven.plugins</groupId>
  <artifactId>maven-shade-plugin</artifactId>
  <executions>
    <execution>
      <phase>package</phase>
      <goals>
        <goal>shade</goal>
      </goals>
    </execution>
  </executions>
  <configuration>
    <finalName>uber-${project.artifactId}-${project.version}</finalName>
  </configuration>
</plugin>
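With the plugin bound to the package phase as shown, mvn clean package should also produce a self-contained JAR under target/ (named uber-sandbox-1.0-SNAPSHOT.jar in this project) that bundles the dependencies alongside your own classes.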
Running the JAR
I'm going to copy the JAR onto my NameNode:
scp target/*.jar craig@master:~
Then I SSH into the NameNode and execute the JAR:
hadoop jar ~/sandbox-1.0-SNAPSHOT.jar dev.hadoop.sandbox.counter.WordCountRunner /nyt /out
The two arguments at the end of that command are the input parameters:
/nyt is the input directory
/out is the output directory
If the output directory exists from a prior run of this program, you'll have to delete it using this command:
hdfs dfs -rm -r /out
Or simply specify a new output directory (e.g., /out2).
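If you'd rather not clean up by hand between runs, the runner can delete a stale output directory itself. Here's a minimal sketch using the FileSystem API; this is an optional addition to main(), not part of the original runner:

import org.apache.hadoop.fs.FileSystem;

// inside main(), after parsing otherArgs and before submitting the job:
Path outputPath = new Path(otherArgs[1]);
FileSystem fs = FileSystem.get(conf);
if (fs.exists(outputPath)) {
    fs.delete(outputPath, true); // true = delete recursively
}
FileOutputFormat.setOutputPath(job, outputPath);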
Operational Output
The (partial) operational output from a successful run of the WordCounter looks like this:

craigtrim@CVB:/usr/lib/apache/hadoop/2.5.2/bin$ hadoop jar sandbox-1.0-SNAPSHOT.jar dev.hadoop.sandbox.counter.WordCountRunner /nyt /out
Input Directory = /nyt
Output Directory = /out
2014-11-25 11:14:05,828 INFO [main] Configuration.deprecation (Configuration.java:warnOnceIfDeprecated(1019)) - session.id is deprecated. Instead, use dfs.metrics.session-id
2014-11-25 11:14:05,832 INFO [main] jvm.JvmMetrics (JvmMetrics.java:init(76)) - Initializing JVM Metrics with processName=JobTracker, sessionId=
2014-11-25 11:14:06,188 WARN [main] mapreduce.JobSubmitter (JobSubmitter.java:copyAndConfigureFiles(150)) - Hadoop command-line option parsing not performed. Implement the Tool interface and execute your application with ToolRunner to remedy this.
2014-11-25 11:14:06,389 INFO [main] input.FileInputFormat (FileInputFormat.java:listStatus(281)) - Total input paths to process : 180
2014-11-25 11:14:06,451 INFO [main] mapreduce.JobSubmitter (JobSubmitter.java:submitJobInternal(396)) - number of splits:180
2014-11-25 11:14:06,542 INFO [main] mapreduce.JobSubmitter (JobSubmitter.java:printTokens(479)) - Submitting tokens for job: job_local1043625416_0001
2014-11-25 11:14:06,569 WARN [main] conf.Configuration (Configuration.java:loadProperty(2368)) - file:/tmp/hadoop-craigtrim/mapred/staging/craigtrim1043625416/.staging/job_local1043625416_0001/job.xml:an attempt to override final parameter: mapreduce.job.end-notification.max.retry.interval; Ignoring.
2014-11-25 11:14:06,572 WARN [main] conf.Configuration (Configuration.java:loadProperty(2368)) - file:/tmp/hadoop-craigtrim/mapred/staging/craigtrim1043625416/.staging/job_local1043625416_0001/job.xml:an attempt to override final parameter: mapreduce.job.end-notification.max.attempts; Ignoring.
2014-11-25 11:14:06,662 WARN [main] conf.Configuration (Configuration.java:loadProperty(2368)) - file:/home/craigtrim/HADOOP_DATA_DIR/local/localRunner/craigtrim/job_local1043625416_0001/job_local1043625416_0001.xml:an attempt to override final parameter: mapreduce.job.end-notification.max.retry.interval; Ignoring.
snip ...
2014-11-25 11:14:15,549 INFO [pool-6-thread-1] mapred.Task (Task.java:done(1001)) - Task:attempt_local1043625416_0001_r_000000_0 is done. And is in the process of committing
2014-11-25 11:14:15,552 INFO [pool-6-thread-1] mapred.LocalJobRunner (LocalJobRunner.java:statusUpdate(591)) - 180 / 180 copied.
2014-11-25 11:14:15,552 INFO [pool-6-thread-1] mapred.Task (Task.java:commit(1162)) - Task attempt_local1043625416_0001_r_000000_0 is allowed to commit now
2014-11-25 11:14:15,588 INFO [pool-6-thread-1] output.FileOutputCommitter (FileOutputCommitter.java:commitTask(439)) - Saved output of task 'attempt_local1043625416_0001_r_000000_0' to hdfs://master:9000/out/_temporary/0/task_local1043625416_0001_r_000000
2014-11-25 11:14:15,589 INFO [pool-6-thread-1] mapred.LocalJobRunner (LocalJobRunner.java:statusUpdate(591)) - reduce > reduce
2014-11-25 11:14:15,590 INFO [pool-6-thread-1] mapred.Task (Task.java:sendDone(1121)) - Task 'attempt_local1043625416_0001_r_000000_0' done.
2014-11-25 11:14:15,590 INFO [pool-6-thread-1] mapred.LocalJobRunner (LocalJobRunner.java:run(325)) - Finishing task: attempt_local1043625416_0001_r_000000_0
2014-11-25 11:14:15,590 INFO [Thread-5] mapred.LocalJobRunner (LocalJobRunner.java:runTasks(456)) - reduce task executor complete.
2014-11-25 11:14:15,843 INFO [main] mapreduce.Job (Job.java:monitorAndPrintJob(1362)) - map 100% reduce 100%
2014-11-25 11:14:15,844 INFO [main] mapreduce.Job (Job.java:monitorAndPrintJob(1373)) - Job job_local1043625416_0001 completed successfully
2014-11-25 11:14:15,932 INFO [main] mapreduce.Job (Job.java:monitorAndPrintJob(1380)) - Counters: 38
    File System Counters
        FILE: Number of bytes read=71500803
        FILE: Number of bytes written=129250399
        FILE: Number of read operations=0
        FILE: Number of large read operations=0
        FILE: Number of write operations=0
        HDFS: Number of bytes read=67981524
        HDFS: Number of bytes written=199881
        HDFS: Number of read operations=33667
        HDFS: Number of large read operations=0
        HDFS: Number of write operations=183
    Map-Reduce Framework
        Map input records=4372
        Map output records=83529
        Map output bytes=837472
        Map output materialized bytes=614722
        Input split bytes=17820
        Combine input records=83529
        Combine output records=46932
        Reduce input groups=19209
        Reduce shuffle bytes=614722
        Reduce input records=46932
        Reduce output records=19209
        Spilled Records=93864
        Shuffled Maps =180
        Failed Shuffles=0
        Merged Map outputs=180
        GC time elapsed (ms)=1299
        CPU time spent (ms)=0
        Physical memory (bytes) snapshot=0
        Virtual memory (bytes) snapshot=0
        Total committed heap usage (bytes)=134581059584
    Shuffle Errors
        BAD_ID=0
        CONNECTION=0
        IO_ERROR=0
        WRONG_LENGTH=0
        WRONG_MAP=0
        WRONG_REDUCE=0
    File Input Format Counters
        Bytes Read=503395
    File Output Format Counters
        Bytes Written=199881
Pay particular attention to the counters at the end of the operational output: the mappers emitted 83,529 raw (word, 1) pairs, the combiner collapsed those to 46,932 records before the shuffle, and the final output contains 19,209 distinct words.
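To inspect the actual word counts, read the reducer output back out of HDFS. With the default single reducer, the results land in one part file:

hdfs dfs -cat /out/part-r-00000 | head -20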