Showing posts with label run a map-reduce job in apache hadoop. Show all posts
Showing posts with label run a map-reduce job in apache hadoop. Show all posts

Tuesday, November 25, 2014

Running the WordCount Program

Environment

  1. Configuring a Hadoop Development Environment



Sample Code


Let's import the sample WordCount program, and make sure we can get this to compile correctly.  We'll export the JAR file containing this code to our HDFS cluster.

WordCountMapper

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
 package dev.hadoop.sandbox.counter;  

 import java.io.IOException;  
 import java.util.StringTokenizer;  
 import org.apache.hadoop.io.IntWritable;  
 import org.apache.hadoop.io.LongWritable;  
 import org.apache.hadoop.io.Text;  
 import org.apache.hadoop.mapreduce.Mapper;  

 public class WordCountMapper extends Mapper<LongWritable, Text, Text, IntWritable> {  
      
      private final IntWritable     one        = new IntWritable(1);  
      private Text                  word       = new Text();  
     
      public WordCountMapper() {  
           System.out.println("Init WordCount Mapper");  
      }  
     
      @Override  
      protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {  
           StringTokenizer iter = new StringTokenizer(value.toString());  
           while (iter.hasMoreTokens()) {  
                word.set(iter.nextToken());  
                context.write(word, one);  
           }  
      }  
 }  

WordCountReducer

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
 package dev.hadoop.sandbox.counter;  

 import java.io.IOException;  
 import java.util.Iterator;  
 import org.apache.hadoop.io.IntWritable;  
 import org.apache.hadoop.io.Text;  
 import org.apache.hadoop.mapreduce.Reducer;  

 public class WordCountReducer extends Reducer<Text, IntWritable, Text, IntWritable> {  
     
      private IntWritable     result     = new IntWritable();  
      
      public WordCountReducer() {  
           System.out.println("Init WordCountReducer");  
      }  
     
      @Override  
      protected void reduce(Text word, Iterable<IntWritable> intOne, Context context) throws IOException, InterruptedException {  
           int sum = 0;  
           Iterator<IntWritable> iter = intOne.iterator();  
           while (iter.hasNext())  
                sum += iter.next().get();  
           result.set(sum);  
           context.write(word, result);  
      }  
 }  

WordCountRunner

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
 package dev.hadoop.sandbox.counter;  

 import org.apache.hadoop.conf.Configuration;  
 import org.apache.hadoop.fs.Path;  
 import org.apache.hadoop.io.IntWritable;  
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapreduce.Job;  
 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;  
 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;  
 import org.apache.hadoop.util.GenericOptionsParser;  

 public class WordCountRunner {  

      public static void main(String... args) throws Throwable {  
           Configuration conf = new Configuration();  
           Job job = new Job(conf, "word count");  
           String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();  
           job.setCombinerClass(WordCountReducer.class);  
           job.setReducerClass(WordCountReducer.class);  
           job.setMapperClass(WordCountMapper.class);  
           job.setJarByClass(WordCountRunner.class);  
           job.setOutputKeyClass(Text.class);  
           job.setOutputValueClass(IntWritable.class);  
           FileInputFormat.addInputPath(job, new Path(otherArgs[0]));  
           FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));  
           System.exit(job.waitForCompletion(true) ? 0 : 1);  
      }  
 }  


Import the code, and make sure it can compile successfully.

It should look something like this:


We're going to re-compile this using Maven.  Eclipse is useful as an IDE, but it's better to rely on a tool like Maven for managing the project build cycles.


Building the Sample


I type this command on the terminal window:
mvn clean package

This is essentially the same as calling "mvn package" as far as build cycle execution, with the added advantage of invoking the clean plugin. As the name might imply, this plugin attempts to clean the files and directories generated by Maven during its build.

Note that if your POM file contains dependencies on other POM files, you may need to build a far JAR:
<plugin>
 <groupId>org.apache.maven.plugins</groupId>
 <artifactId>maven-shade-plugin</artifactId>
 <executions>
  <execution>
   <phase>package</phase>
   <goals>
    <goal>shade</goal>
   </goals>
  </execution>
 </executions>
 <configuration>
  <finalName>uber-${artifactId}-${version}</finalName>
 </configuration>
</plugin>
This is the JAR that you'll want to copy to the NameNode and execute.


Running the JAR


I'm going to copy the JAR onto my NameNode:
scp target/*.jar craig@master:~

Then SSH into my NameNode and execute the JAR
hadoop jar ~/sandbox-1.0-SNAPSHOT.jar dev.hadoop.sandbox.counter.WordCountRunner /nyt /out

I've colour-coded the input parameters:
/nyt is the input directory
/out is the output directory 

If the output directory exists from a prior run of this program, you'll have to delete it using this command:
hdfs dfs -rm -r /out

Or simply specify a new output directory (eg. /out2)

Operational Output

The (partial) operational output from a successful run of the WordCounter looks like this:
 craigtrim@CVB:/usr/lib/apache/hadoop/2.5.2/bin$ hadoop jar sandbox-1.0-SNAPSHOT.jar dev.hadoop.sandbox.counter.WordCountRunner /nyt /out  
 Input Directory = /nyt  
 Output Directory = /out  
 2014-11-25 11:14:05,828 INFO [main] Configuration.deprecation (Configuration.java:warnOnceIfDeprecated(1019)) - session.id is deprecated. Instead, use dfs.metrics.session-id  
 2014-11-25 11:14:05,832 INFO [main] jvm.JvmMetrics (JvmMetrics.java:init(76)) - Initializing JVM Metrics with processName=JobTracker, sessionId=  
 2014-11-25 11:14:06,188 WARN [main] mapreduce.JobSubmitter (JobSubmitter.java:copyAndConfigureFiles(150)) - Hadoop command-line option parsing not performed. Implement the Tool interface and execute your application with ToolRunner to remedy this.  
 2014-11-25 11:14:06,389 INFO [main] input.FileInputFormat (FileInputFormat.java:listStatus(281)) - Total input paths to process : 180  
 2014-11-25 11:14:06,451 INFO [main] mapreduce.JobSubmitter (JobSubmitter.java:submitJobInternal(396)) - number of splits:180  
 2014-11-25 11:14:06,542 INFO [main] mapreduce.JobSubmitter (JobSubmitter.java:printTokens(479)) - Submitting tokens for job: job_local1043625416_0001  
 2014-11-25 11:14:06,569 WARN [main] conf.Configuration (Configuration.java:loadProperty(2368)) - file:/tmp/hadoop-craigtrim/mapred/staging/craigtrim1043625416/.staging/job_local1043625416_0001/job.xml:an attempt to override final parameter: mapreduce.job.end-notification.max.retry.interval; Ignoring.  
 2014-11-25 11:14:06,572 WARN [main] conf.Configuration (Configuration.java:loadProperty(2368)) - file:/tmp/hadoop-craigtrim/mapred/staging/craigtrim1043625416/.staging/job_local1043625416_0001/job.xml:an attempt to override final parameter: mapreduce.job.end-notification.max.attempts; Ignoring.  
 2014-11-25 11:14:06,662 WARN [main] conf.Configuration (Configuration.java:loadProperty(2368)) - file:/home/craigtrim/HADOOP_DATA_DIR/local/localRunner/craigtrim/job_local1043625416_0001/job_local1043625416_0001.xml:an attempt to override final parameter: mapreduce.job.end-notification.max.retry.interval; Ignoring.  
snip ...
 2014-11-25 11:14:15,549 INFO [pool-6-thread-1] mapred.Task (Task.java:done(1001)) - Task:attempt_local1043625416_0001_r_000000_0 is done. And is in the process of committing  
 2014-11-25 11:14:15,552 INFO [pool-6-thread-1] mapred.LocalJobRunner (LocalJobRunner.java:statusUpdate(591)) - 180 / 180 copied.  
 2014-11-25 11:14:15,552 INFO [pool-6-thread-1] mapred.Task (Task.java:commit(1162)) - Task attempt_local1043625416_0001_r_000000_0 is allowed to commit now  
 2014-11-25 11:14:15,588 INFO [pool-6-thread-1] output.FileOutputCommitter (FileOutputCommitter.java:commitTask(439)) - Saved output of task 'attempt_local1043625416_0001_r_000000_0' to hdfs://master:9000/out/_temporary/0/task_local1043625416_0001_r_000000  
 2014-11-25 11:14:15,589 INFO [pool-6-thread-1] mapred.LocalJobRunner (LocalJobRunner.java:statusUpdate(591)) - reduce > reduce  
 2014-11-25 11:14:15,590 INFO [pool-6-thread-1] mapred.Task (Task.java:sendDone(1121)) - Task 'attempt_local1043625416_0001_r_000000_0' done.  
 2014-11-25 11:14:15,590 INFO [pool-6-thread-1] mapred.LocalJobRunner (LocalJobRunner.java:run(325)) - Finishing task: attempt_local1043625416_0001_r_000000_0  
 2014-11-25 11:14:15,590 INFO [Thread-5] mapred.LocalJobRunner (LocalJobRunner.java:runTasks(456)) - reduce task executor complete.  
 2014-11-25 11:14:15,843 INFO [main] mapreduce.Job (Job.java:monitorAndPrintJob(1362)) - map 100% reduce 100%  
 2014-11-25 11:14:15,844 INFO [main] mapreduce.Job (Job.java:monitorAndPrintJob(1373)) - Job job_local1043625416_0001 completed successfully  
 2014-11-25 11:14:15,932 INFO [main] mapreduce.Job (Job.java:monitorAndPrintJob(1380)) - Counters: 38  
      File System Counters  
           FILE: Number of bytes read=71500803  
           FILE: Number of bytes written=129250399  
           FILE: Number of read operations=0  
           FILE: Number of large read operations=0  
           FILE: Number of write operations=0  
           HDFS: Number of bytes read=67981524  
           HDFS: Number of bytes written=199881  
           HDFS: Number of read operations=33667  
           HDFS: Number of large read operations=0  
           HDFS: Number of write operations=183  
      Map-Reduce Framework  
           Map input records=4372  
           Map output records=83529  
           Map output bytes=837472  
           Map output materialized bytes=614722  
           Input split bytes=17820  
           Combine input records=83529  
           Combine output records=46932  
           Reduce input groups=19209  
           Reduce shuffle bytes=614722  
           Reduce input records=46932  
           Reduce output records=19209  
           Spilled Records=93864  
           Shuffled Maps =180  
           Failed Shuffles=0  
           Merged Map outputs=180  
           GC time elapsed (ms)=1299  
           CPU time spent (ms)=0  
           Physical memory (bytes) snapshot=0  
           Virtual memory (bytes) snapshot=0  
           Total committed heap usage (bytes)=134581059584  
      Shuffle Errors  
           BAD_ID=0  
           CONNECTION=0  
           IO_ERROR=0  
           WRONG_LENGTH=0  
           WRONG_MAP=0  
           WRONG_REDUCE=0  
      File Input Format Counters   
           Bytes Read=503395  
      File Output Format Counters   
           Bytes Written=199881  

Pay particular attention to the text at the end of the operational output.