Environment
- Configuring a Hadoop Development Environment
Sample Code
Let's import the sample WordCount program and make sure it compiles correctly. We'll then export a JAR file containing this code and run it on our HDFS cluster.
WordCountMapper
package dev.hadoop.sandbox.counter;

import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class WordCountMapper extends Mapper<LongWritable, Text, Text, IntWritable> {

    private final IntWritable one = new IntWritable(1);
    private final Text word = new Text();

    public WordCountMapper() {
        System.out.println("Init WordCount Mapper");
    }

    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        // Emit (token, 1) for every whitespace-delimited token in the input line
        StringTokenizer iter = new StringTokenizer(value.toString());
        while (iter.hasMoreTokens()) {
            word.set(iter.nextToken());
            context.write(word, one);
        }
    }
}
WordCountReducer
package dev.hadoop.sandbox.counter;

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class WordCountReducer extends Reducer<Text, IntWritable, Text, IntWritable> {

    private final IntWritable result = new IntWritable();

    public WordCountReducer() {
        System.out.println("Init WordCountReducer");
    }

    @Override
    protected void reduce(Text word, Iterable<IntWritable> counts, Context context) throws IOException, InterruptedException {
        // Sum the counts emitted by the mappers (and, since this class also
        // serves as the combiner, any partially aggregated counts)
        int sum = 0;
        for (IntWritable count : counts) {
            sum += count.get();
        }
        result.set(sum);
        context.write(word, result);
    }
}
WordCountRunner
package dev.hadoop.sandbox.counter;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;

public class WordCountRunner {

    public static void main(String... args) throws Throwable {
        Configuration conf = new Configuration();

        // Parse generic Hadoop options (-D, -files, etc.) before creating the
        // Job, which copies the Configuration; parsing afterwards would have
        // no effect on the job
        String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();

        Job job = Job.getInstance(conf, "word count");
        job.setJarByClass(WordCountRunner.class);
        job.setMapperClass(WordCountMapper.class);
        job.setCombinerClass(WordCountReducer.class);
        job.setReducerClass(WordCountReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        System.out.println("Input Directory = " + otherArgs[0]);
        System.out.println("Output Directory = " + otherArgs[1]);

        FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
        FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));

        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
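One aside before building: the operational output further below includes the warning "Hadoop command-line option parsing not performed. Implement the Tool interface and execute your application with ToolRunner to remedy this." A sketch of what a Tool-based runner might look like is shown here; WordCountTool is a hypothetical name of my own, and the class is functionally equivalent to WordCountRunner:

package dev.hadoop.sandbox.counter;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

// Hypothetical Tool-based variant of WordCountRunner; ToolRunner parses the
// generic options and hands us a ready-to-use Configuration via getConf()
public class WordCountTool extends Configured implements Tool {

    @Override
    public int run(String[] args) throws Exception {
        Job job = Job.getInstance(getConf(), "word count");
        job.setJarByClass(WordCountTool.class);
        job.setMapperClass(WordCountMapper.class);
        job.setCombinerClass(WordCountReducer.class);
        job.setReducerClass(WordCountReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        return job.waitForCompletion(true) ? 0 : 1;
    }

    public static void main(String[] args) throws Exception {
        System.exit(ToolRunner.run(new Configuration(), new WordCountTool(), args));
    }
}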
Import the code and make sure it compiles successfully.
We're going to recompile this using Maven. Eclipse is useful as an IDE, but it's better to rely on a tool like Maven for managing the project's build lifecycle.
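For reference, a minimal POM for this sample might look like the following sketch. The groupId is an assumption on my part; the artifactId and version match the sandbox-1.0-SNAPSHOT.jar built below, and the hadoop-client version should match your cluster (2.5.2 here):

<project xmlns="http://maven.apache.org/POM/4.0.0">
  <modelVersion>4.0.0</modelVersion>
  <groupId>dev.hadoop</groupId><!-- assumed; use your own -->
  <artifactId>sandbox</artifactId>
  <version>1.0-SNAPSHOT</version>
  <packaging>jar</packaging>

  <dependencies>
    <!-- hadoop-client pulls in the MapReduce and HDFS client APIs;
         provided scope, since the cluster supplies the Hadoop JARs at runtime -->
    <dependency>
      <groupId>org.apache.hadoop</groupId>
      <artifactId>hadoop-client</artifactId>
      <version>2.5.2</version>
      <scope>provided</scope>
    </dependency>
  </dependencies>
</project>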
Building the Sample
I type this command in the terminal window:
mvn clean package
This is essentially the same as calling "mvn package" as far as build-cycle execution goes, with the added advantage of also invoking the clean plugin. As the name implies, this plugin removes the files and directories generated by Maven during previous builds.
Note that if your POM declares dependencies beyond Hadoop itself, you may need to build a fat (uber) JAR so that those classes are bundled with your job:
<plugin>
  <groupId>org.apache.maven.plugins</groupId>
  <artifactId>maven-shade-plugin</artifactId>
  <executions>
    <execution>
      <phase>package</phase>
      <goals>
        <goal>shade</goal>
      </goals>
    </execution>
  </executions>
  <configuration>
    <finalName>uber-${artifactId}-${version}</finalName>
  </configuration>
</plugin>
If you build one, the uber JAR is the one you'll want to copy to the NameNode and execute.
Running the JAR
I'm going to copy the JAR onto my NameNode:
scp target/*.jar craig@master:~
Then I SSH into my NameNode and execute the JAR:
hadoop jar ~/sandbox-1.0-SNAPSHOT.jar dev.hadoop.sandbox.counter.WordCountRunner /nyt /out
The two input parameters are:
/nyt is the input directory
/out is the output directory
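If you haven't staged any input yet, plain-text files can be loaded into HDFS with something like the following; the local ./articles path is a placeholder for wherever your source files live:

hadoop fs -mkdir -p /nyt
hadoop fs -put ./articles/*.txt /nyt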
If the output directory exists from a prior run of this program, you'll have to delete it first:
hadoop fs -rm -r /out
Or simply specify a new output directory (e.g. /out2).
Operational Output
The (partial) operational output from a successful run of the WordCount job looks like this:
craigtrim@CVB:/usr/lib/apache/hadoop/2.5.2/bin$ hadoop jar sandbox-1.0-SNAPSHOT.jar dev.hadoop.sandbox.counter.WordCountRunner /nyt /out
Input Directory = /nyt
Output Directory = /out
2014-11-25 11:14:05,828 INFO [main] Configuration.deprecation (Configuration.java:warnOnceIfDeprecated(1019)) - session.id is deprecated. Instead, use dfs.metrics.session-id
2014-11-25 11:14:05,832 INFO [main] jvm.JvmMetrics (JvmMetrics.java:init(76)) - Initializing JVM Metrics with processName=JobTracker, sessionId=
2014-11-25 11:14:06,188 WARN [main] mapreduce.JobSubmitter (JobSubmitter.java:copyAndConfigureFiles(150)) - Hadoop command-line option parsing not performed. Implement the Tool interface and execute your application with ToolRunner to remedy this.
2014-11-25 11:14:06,389 INFO [main] input.FileInputFormat (FileInputFormat.java:listStatus(281)) - Total input paths to process : 180
2014-11-25 11:14:06,451 INFO [main] mapreduce.JobSubmitter (JobSubmitter.java:submitJobInternal(396)) - number of splits:180
2014-11-25 11:14:06,542 INFO [main] mapreduce.JobSubmitter (JobSubmitter.java:printTokens(479)) - Submitting tokens for job: job_local1043625416_0001
2014-11-25 11:14:06,569 WARN [main] conf.Configuration (Configuration.java:loadProperty(2368)) - file:/tmp/hadoop-craigtrim/mapred/staging/craigtrim1043625416/.staging/job_local1043625416_0001/job.xml:an attempt to override final parameter: mapreduce.job.end-notification.max.retry.interval; Ignoring.
2014-11-25 11:14:06,572 WARN [main] conf.Configuration (Configuration.java:loadProperty(2368)) - file:/tmp/hadoop-craigtrim/mapred/staging/craigtrim1043625416/.staging/job_local1043625416_0001/job.xml:an attempt to override final parameter: mapreduce.job.end-notification.max.attempts; Ignoring.
2014-11-25 11:14:06,662 WARN [main] conf.Configuration (Configuration.java:loadProperty(2368)) - file:/home/craigtrim/HADOOP_DATA_DIR/local/localRunner/craigtrim/job_local1043625416_0001/job_local1043625416_0001.xml:an attempt to override final parameter: mapreduce.job.end-notification.max.retry.interval; Ignoring.
snip ...
2014-11-25 11:14:15,549 INFO [pool-6-thread-1] mapred.Task (Task.java:done(1001)) - Task:attempt_local1043625416_0001_r_000000_0 is done. And is in the process of committing
2014-11-25 11:14:15,552 INFO [pool-6-thread-1] mapred.LocalJobRunner (LocalJobRunner.java:statusUpdate(591)) - 180 / 180 copied.
2014-11-25 11:14:15,552 INFO [pool-6-thread-1] mapred.Task (Task.java:commit(1162)) - Task attempt_local1043625416_0001_r_000000_0 is allowed to commit now
2014-11-25 11:14:15,588 INFO [pool-6-thread-1] output.FileOutputCommitter (FileOutputCommitter.java:commitTask(439)) - Saved output of task 'attempt_local1043625416_0001_r_000000_0' to hdfs://master:9000/out/_temporary/0/task_local1043625416_0001_r_000000
2014-11-25 11:14:15,589 INFO [pool-6-thread-1] mapred.LocalJobRunner (LocalJobRunner.java:statusUpdate(591)) - reduce > reduce
2014-11-25 11:14:15,590 INFO [pool-6-thread-1] mapred.Task (Task.java:sendDone(1121)) - Task 'attempt_local1043625416_0001_r_000000_0' done.
2014-11-25 11:14:15,590 INFO [pool-6-thread-1] mapred.LocalJobRunner (LocalJobRunner.java:run(325)) - Finishing task: attempt_local1043625416_0001_r_000000_0
2014-11-25 11:14:15,590 INFO [Thread-5] mapred.LocalJobRunner (LocalJobRunner.java:runTasks(456)) - reduce task executor complete.
2014-11-25 11:14:15,843 INFO [main] mapreduce.Job (Job.java:monitorAndPrintJob(1362)) - map 100% reduce 100%
2014-11-25 11:14:15,844 INFO [main] mapreduce.Job (Job.java:monitorAndPrintJob(1373)) - Job job_local1043625416_0001 completed successfully
2014-11-25 11:14:15,932 INFO [main] mapreduce.Job (Job.java:monitorAndPrintJob(1380)) - Counters: 38
    File System Counters
        FILE: Number of bytes read=71500803
        FILE: Number of bytes written=129250399
        FILE: Number of read operations=0
        FILE: Number of large read operations=0
        FILE: Number of write operations=0
        HDFS: Number of bytes read=67981524
        HDFS: Number of bytes written=199881
        HDFS: Number of read operations=33667
        HDFS: Number of large read operations=0
        HDFS: Number of write operations=183
    Map-Reduce Framework
        Map input records=4372
        Map output records=83529
        Map output bytes=837472
        Map output materialized bytes=614722
        Input split bytes=17820
        Combine input records=83529
        Combine output records=46932
        Reduce input groups=19209
        Reduce shuffle bytes=614722
        Reduce input records=46932
        Reduce output records=19209
        Spilled Records=93864
        Shuffled Maps =180
        Failed Shuffles=0
        Merged Map outputs=180
        GC time elapsed (ms)=1299
        CPU time spent (ms)=0
        Physical memory (bytes) snapshot=0
        Virtual memory (bytes) snapshot=0
        Total committed heap usage (bytes)=134581059584
    Shuffle Errors
        BAD_ID=0
        CONNECTION=0
        IO_ERROR=0
        WRONG_LENGTH=0
        WRONG_MAP=0
        WRONG_REDUCE=0
    File Input Format Counters
        Bytes Read=503395
    File Output Format Counters
        Bytes Written=199881
Pay particular attention to the counters at the end of the operational output; they confirm the job completed successfully and show how many records flowed through each stage: 83,529 map output records were combined down to 46,932, and the reducer wrote 19,209 records (one per distinct word) to HDFS.
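As a final sanity check, you can inspect the reducer output directly; with a single reducer, the results land in a part-r-00000 file under the output directory:

hadoop fs -ls /out
hadoop fs -cat /out/part-r-00000 | head -20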