Showing posts with label mapreduce. Show all posts

Tuesday, March 24, 2015

Hadoop Architecture

The Map/Reduce Paradigm


How many times does this pattern occur in the data?


Mapping




Introduction:
  1. Each Map task outputs data in the form of Key/Value pair.
    1. mapreduce.tasktracker.map.tasks.maximum: 8
      1. The maximum number of map tasks that will be run simultaneously by a task tracker
    2. mapreduce.map.memory.mb: 128
      1. The amount of memory to request from the scheduler for each map task.
  2. The output is stored in a Ring Buffer rather than being written directly to the disk.
  3. When the Ring Buffer reaches 80% capacity, the content is "spilled" to disk.
    1. This process will create multiple files on the datanode (shuffle spill files).
    2. mapreduce.map.sort.spill.percent: 0.80
      1. The soft limit in the serialization buffer. Once reached, a thread will begin to spill the contents to disk in the background. Note that collection will not block if this threshold is exceeded while a spill is already in progress, so spills may be larger than this threshold when it is set to less than .5
  4. Hadoop will merge all the spill files on a given datanode into a single file
    1. This single file is both sorted and partitioned based on number of reducers.
    2. mapreduce.task.io.sort.mb: 512
      1. The total amount of buffer memory to use while sorting files, in megabytes. By default, gives each merge stream 1MB, which should minimize seeks.
    3. mapreduce.task.io.sort.factor: 64
      1. The number of streams to merge at once while sorting files. This determines the number of open file handles.
    4. mapreduce.reduce.shuffle.input.buffer.percent: 0.70
      1. The percentage of memory to be allocated from the maximum heap size to storing map outputs during the shuffle.
    5. mapreduce.reduce.input.buffer.percent: 0.70
      1. The percentage of memory, relative to the maximum heap size, to retain map outputs during the reduce. When the shuffle is concluded, any remaining map outputs in memory must consume less than this threshold before the reduce can begin.
    6. mapreduce.reduce.shuffle.parallelcopies: 128
      1. The default number of parallel transfers run by reduce during the copy (shuffle) phase.
    7. mapreduce.reduce.memory.mb: 1024
      1. The amount of memory to request from the scheduler for each reduce task.
    8. mapreduce.reduce.shuffle.merge.percent: 0.66
      1. The usage threshold at which an in-memory merge will be initiated, expressed as a percentage of the total memory allocated to storing in-memory map outputs, as defined by mapreduce.reduce.shuffle.input.buffer.percent.
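
The merge step above (several sorted spill files combined into one file that is sorted and partitioned by reducer) can be sketched in plain Java. This is a minimal illustration, not Hadoop's merge code: the class SpillMergeSketch and its methods are hypothetical names, the "spill files" are in-memory lists, and the partition function uses the hash-modulo scheme of Hadoop's default HashPartitioner applied to String keys instead of Text keys. A real merge works on on-disk segments and opens at most mapreduce.task.io.sort.factor streams at a time.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.Iterator;
import java.util.List;
import java.util.PriorityQueue;

// Hypothetical illustration only: not Hadoop's actual merge implementation.
final class SpillMergeSketch {

    // Assign a key to a reducer by hash-modulo; the mask keeps the result non-negative.
    static int partitionFor(String key, int numReducers) {
        return (key.hashCode() & Integer.MAX_VALUE) % numReducers;
    }

    // One heap entry: the current key of a run plus an iterator over the rest of that run.
    private static final class Head {
        final String key;
        final Iterator<String> rest;

        Head(String key, Iterator<String> rest) {
            this.key = key;
            this.rest = rest;
        }
    }

    // K-way merge of runs that are each already sorted by (partition, key).
    static List<String> merge(List<List<String>> sortedRuns, int numReducers) {
        Comparator<Head> order = Comparator
                .comparingInt((Head h) -> partitionFor(h.key, numReducers))
                .thenComparing(h -> h.key);
        PriorityQueue<Head> heap = new PriorityQueue<>(order);

        // Seed the heap with the first key of every non-empty run.
        for (List<String> run : sortedRuns) {
            Iterator<String> it = run.iterator();
            if (it.hasNext()) {
                heap.add(new Head(it.next(), it));
            }
        }

        // Repeatedly take the smallest head and refill the heap from the same run.
        List<String> merged = new ArrayList<>();
        while (!heap.isEmpty()) {
            Head smallest = heap.poll();
            merged.add(smallest.key);
            if (smallest.rest.hasNext()) {
                heap.add(new Head(smallest.rest.next(), smallest.rest));
            }
        }
        return merged;
    }
}
```

With a single reducer every key lands in partition 0, so the result is simply the keys in sorted order. With more reducers, all keys for partition 0 come first, then partition 1, and so on, which is what lets each reducer fetch one contiguous slice of the file.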



Ring Buffer


The Ring Buffer (aka Circular Buffer) is a key concept in the MapReduce ecosystem.

We have two major challenges in any map/reduce program:

  1. We are dealing with a massive amount of data
    1. If this isn't true, we don't need to use map/reduce
  2. The result of the map tasks can neither be written constantly to disk nor held entirely in memory
    1. Writing every record to disk would be too slow
    2. Most systems would not have a sufficient amount of memory

We have to use a combination of disks/memory efficiently.

The circular buffer is fast. Writing to memory is much faster than doing an I/O to disk. Flushing the data is only performed when needed.
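
As a rough sketch of the idea, the following plain-Java class keeps records in a fixed-size array, wraps around when it reaches the end, and "spills" once a soft limit is reached. Everything here is an assumption made for illustration: the class name RingBufferSketch, the record-count threshold (Hadoop measures bytes, not records), and the synchronous spill into an in-memory list (Hadoop spills to disk from a background thread while the map task keeps writing, and sorts each spill).

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of a circular buffer with a spill threshold; not Hadoop code.
final class RingBufferSketch {
    private final String[] slots;
    private final int spillThreshold;  // buffered record count that triggers a spill
    private int head = 0;              // index of the oldest record not yet spilled
    private int size = 0;              // number of records currently buffered
    private final List<List<String>> spills = new ArrayList<>(); // stands in for spill files

    RingBufferSketch(int capacity, double spillPercent) {
        this.slots = new String[capacity];
        this.spillThreshold = Math.max(1, (int) (capacity * spillPercent));
    }

    // Write a record into the next free slot, wrapping past the end of the array.
    void collect(String record) {
        slots[(head + size) % slots.length] = record;
        size++;
        if (size >= spillThreshold) {
            spill();
        }
    }

    // Drain the buffered records in arrival order; the freed slots are reused by later writes.
    void spill() {
        if (size == 0) {
            return;
        }
        List<String> run = new ArrayList<>(size);
        for (int i = 0; i < size; i++) {
            run.add(slots[(head + i) % slots.length]);
        }
        head = (head + size) % slots.length;
        size = 0;
        spills.add(run);
    }

    List<List<String>> spills() {
        return spills;
    }

    int buffered() {
        return size;
    }
}
```

With a capacity of 10 and a spill percent of 0.80, the first spill happens after the eighth record. The next eight records start at slot 8 and wrap around to slot 0, so memory is reused without ever shifting data.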

Continuous logging can fill up space on the systems, causing other programs to also run out of space and fail. In such cases, either logs have to be manually removed or a log rotation policy has to be implemented.



References

  1. Hadoop Internals
    1. One of the best all-in-one overviews of Hadoop Architecture I have read.
    2. The documentation appears to be up to date with YARN and other ecosystem improvements.
  2. Advantages of a Ring Buffer
    1. Map Tasks write to ring (aka Circular) buffers while executing
    2. This article is unrelated to Hadoop, but a knowledge of how this buffer works will aid in understanding mapred-site.xml configuration parameters
      1. Property: mapreduce.map.sort.spill.percent
      2. Description: The soft limit in the serialization buffer. Once reached, a thread will begin to spill the contents to disk in the background. Note that collection will not block if this threshold is exceeded while a spill is already in progress, so spills may be larger than this threshold when it is set to less than .5
      3. Default Value: 0.80
  3. [Quora] Apache Spark vs Hadoop
    1. A good discussion of both the map-side and reduce-side differences.  
    2. Helpful for an understanding of Hadoop's design independent of Spark.

Tuesday, November 25, 2014

Running the WordCount Program

Environment

  1. Configuring a Hadoop Development Environment



Sample Code


Let's import the sample WordCount program, and make sure we can get this to compile correctly.  We'll export the JAR file containing this code to our HDFS cluster.

WordCountMapper

 package dev.hadoop.sandbox.counter;  

 import java.io.IOException;  
 import java.util.StringTokenizer;  
 import org.apache.hadoop.io.IntWritable;  
 import org.apache.hadoop.io.LongWritable;  
 import org.apache.hadoop.io.Text;  
 import org.apache.hadoop.mapreduce.Mapper;  

 public class WordCountMapper extends Mapper<LongWritable, Text, Text, IntWritable> {  
      
      private final IntWritable     one        = new IntWritable(1);  
      private Text                  word       = new Text();  
     
      public WordCountMapper() {  
           System.out.println("Init WordCount Mapper");  
      }  
     
      @Override  
      protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {  
           StringTokenizer iter = new StringTokenizer(value.toString());  
           while (iter.hasMoreTokens()) {  
                word.set(iter.nextToken());  
                context.write(word, one);  
           }  
      }  
 }  

WordCountReducer

 package dev.hadoop.sandbox.counter;  

 import java.io.IOException;  
 import java.util.Iterator;  
 import org.apache.hadoop.io.IntWritable;  
 import org.apache.hadoop.io.Text;  
 import org.apache.hadoop.mapreduce.Reducer;  

 public class WordCountReducer extends Reducer<Text, IntWritable, Text, IntWritable> {  
     
      private IntWritable     result     = new IntWritable();  
      
      public WordCountReducer() {  
           System.out.println("Init WordCountReducer");  
      }  
     
      @Override  
      protected void reduce(Text word, Iterable<IntWritable> intOne, Context context) throws IOException, InterruptedException {  
           int sum = 0;  
           Iterator<IntWritable> iter = intOne.iterator();  
           while (iter.hasNext())  
                sum += iter.next().get();  
           result.set(sum);  
           context.write(word, result);  
      }  
 }  
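
To see what the mapper and reducer compute without a cluster, the same data flow can be simulated in plain Java. This is only a sketch under stated assumptions: LocalWordCountSketch and its methods are hypothetical names, Hadoop's Text and IntWritable are replaced by String and Integer, and the shuffle is a single in-memory TreeMap instead of the spill, sort and copy machinery described in the architecture post.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.StringTokenizer;
import java.util.TreeMap;

// Hypothetical, Hadoop-free simulation of the word count data flow.
final class LocalWordCountSketch {

    // Map step: emit one (word, 1) pair per whitespace-delimited token.
    static List<Map.Entry<String, Integer>> map(String line) {
        List<Map.Entry<String, Integer>> pairs = new ArrayList<>();
        StringTokenizer tokens = new StringTokenizer(line);
        while (tokens.hasMoreTokens()) {
            pairs.add(new SimpleEntry<String, Integer>(tokens.nextToken(), 1));
        }
        return pairs;
    }

    // Shuffle step: group the values by key; the TreeMap keeps the keys sorted.
    static Map<String, List<Integer>> shuffle(List<Map.Entry<String, Integer>> pairs) {
        Map<String, List<Integer>> grouped = new TreeMap<>();
        for (Map.Entry<String, Integer> pair : pairs) {
            grouped.computeIfAbsent(pair.getKey(), k -> new ArrayList<>()).add(pair.getValue());
        }
        return grouped;
    }

    // Reduce step: sum all the counts that arrived for one word.
    static int reduce(List<Integer> counts) {
        int sum = 0;
        for (int count : counts) {
            sum += count;
        }
        return sum;
    }

    // Run the three steps end to end over a list of input lines.
    static Map<String, Integer> wordCount(List<String> lines) {
        List<Map.Entry<String, Integer>> pairs = new ArrayList<>();
        for (String line : lines) {
            pairs.addAll(map(line));
        }
        Map<String, Integer> totals = new TreeMap<>();
        for (Map.Entry<String, List<Integer>> group : shuffle(pairs).entrySet()) {
            totals.put(group.getKey(), reduce(group.getValue()));
        }
        return totals;
    }
}
```

For the two input lines "the cat" and "the dog the", wordCount returns {cat=1, dog=1, the=3}.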

WordCountRunner

 package dev.hadoop.sandbox.counter;

 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.IntWritable;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
 import org.apache.hadoop.util.GenericOptionsParser;

 public class WordCountRunner {

      public static void main(String... args) throws Exception {
           Configuration conf = new Configuration();
           // Parse generic Hadoop options (-D, -files, ...) before the Job takes its copy of the configuration
           String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
           if (otherArgs.length < 2) {
                System.err.println("Usage: WordCountRunner <input path> <output path>");
                System.exit(2);
           }
           // Job.getInstance replaces the deprecated Job constructor
           Job job = Job.getInstance(conf, "word count");
           job.setJarByClass(WordCountRunner.class);
           job.setMapperClass(WordCountMapper.class);
           // Summing is associative and commutative, so the reducer can also serve as the combiner
           job.setCombinerClass(WordCountReducer.class);
           job.setReducerClass(WordCountReducer.class);
           job.setOutputKeyClass(Text.class);
           job.setOutputValueClass(IntWritable.class);
           FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
           FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
           System.exit(job.waitForCompletion(true) ? 0 : 1);
      }
 }


Import the code, and make sure it can compile successfully.



We're going to re-compile this using Maven.  Eclipse is useful as an IDE, but it's better to rely on a tool like Maven for managing the project build cycles.


Building the Sample


I type this command on the terminal window:
mvn clean package

This runs the same build lifecycle as "mvn package", but invokes the clean plugin first. As the name implies, that plugin removes the files and directories generated by earlier Maven builds, so the package is built from a clean state.

Note that if your POM file declares dependencies on other artifacts, you may need to build a fat (uber) JAR that bundles them:
<plugin>
 <groupId>org.apache.maven.plugins</groupId>
 <artifactId>maven-shade-plugin</artifactId>
 <executions>
  <execution>
   <phase>package</phase>
   <goals>
    <goal>shade</goal>
   </goals>
  </execution>
 </executions>
 <configuration>
  <finalName>uber-${project.artifactId}-${project.version}</finalName>
 </configuration>
</plugin>
This is the JAR that you'll want to copy to the NameNode and execute.


Running the JAR


I'm going to copy the JAR onto my NameNode:
scp target/*.jar craig@master:~

Then SSH into my NameNode and execute the JAR
hadoop jar ~/sandbox-1.0-SNAPSHOT.jar dev.hadoop.sandbox.counter.WordCountRunner /nyt /out

The two arguments after the class name are:
/nyt is the input directory
/out is the output directory 

If the output directory exists from a prior run of this program, you'll have to delete it using this command:
hdfs dfs -rm -r /out

Or simply specify a new output directory (eg. /out2)

Operational Output

The (partial) operational output from a successful run of the WordCounter looks like this:
 craigtrim@CVB:/usr/lib/apache/hadoop/2.5.2/bin$ hadoop jar sandbox-1.0-SNAPSHOT.jar dev.hadoop.sandbox.counter.WordCountRunner /nyt /out  
 Input Directory = /nyt  
 Output Directory = /out  
 2014-11-25 11:14:05,828 INFO [main] Configuration.deprecation (Configuration.java:warnOnceIfDeprecated(1019)) - session.id is deprecated. Instead, use dfs.metrics.session-id  
 2014-11-25 11:14:05,832 INFO [main] jvm.JvmMetrics (JvmMetrics.java:init(76)) - Initializing JVM Metrics with processName=JobTracker, sessionId=  
 2014-11-25 11:14:06,188 WARN [main] mapreduce.JobSubmitter (JobSubmitter.java:copyAndConfigureFiles(150)) - Hadoop command-line option parsing not performed. Implement the Tool interface and execute your application with ToolRunner to remedy this.  
 2014-11-25 11:14:06,389 INFO [main] input.FileInputFormat (FileInputFormat.java:listStatus(281)) - Total input paths to process : 180  
 2014-11-25 11:14:06,451 INFO [main] mapreduce.JobSubmitter (JobSubmitter.java:submitJobInternal(396)) - number of splits:180  
 2014-11-25 11:14:06,542 INFO [main] mapreduce.JobSubmitter (JobSubmitter.java:printTokens(479)) - Submitting tokens for job: job_local1043625416_0001  
 2014-11-25 11:14:06,569 WARN [main] conf.Configuration (Configuration.java:loadProperty(2368)) - file:/tmp/hadoop-craigtrim/mapred/staging/craigtrim1043625416/.staging/job_local1043625416_0001/job.xml:an attempt to override final parameter: mapreduce.job.end-notification.max.retry.interval; Ignoring.  
 2014-11-25 11:14:06,572 WARN [main] conf.Configuration (Configuration.java:loadProperty(2368)) - file:/tmp/hadoop-craigtrim/mapred/staging/craigtrim1043625416/.staging/job_local1043625416_0001/job.xml:an attempt to override final parameter: mapreduce.job.end-notification.max.attempts; Ignoring.  
 2014-11-25 11:14:06,662 WARN [main] conf.Configuration (Configuration.java:loadProperty(2368)) - file:/home/craigtrim/HADOOP_DATA_DIR/local/localRunner/craigtrim/job_local1043625416_0001/job_local1043625416_0001.xml:an attempt to override final parameter: mapreduce.job.end-notification.max.retry.interval; Ignoring.  
snip ...
 2014-11-25 11:14:15,549 INFO [pool-6-thread-1] mapred.Task (Task.java:done(1001)) - Task:attempt_local1043625416_0001_r_000000_0 is done. And is in the process of committing  
 2014-11-25 11:14:15,552 INFO [pool-6-thread-1] mapred.LocalJobRunner (LocalJobRunner.java:statusUpdate(591)) - 180 / 180 copied.  
 2014-11-25 11:14:15,552 INFO [pool-6-thread-1] mapred.Task (Task.java:commit(1162)) - Task attempt_local1043625416_0001_r_000000_0 is allowed to commit now  
 2014-11-25 11:14:15,588 INFO [pool-6-thread-1] output.FileOutputCommitter (FileOutputCommitter.java:commitTask(439)) - Saved output of task 'attempt_local1043625416_0001_r_000000_0' to hdfs://master:9000/out/_temporary/0/task_local1043625416_0001_r_000000  
 2014-11-25 11:14:15,589 INFO [pool-6-thread-1] mapred.LocalJobRunner (LocalJobRunner.java:statusUpdate(591)) - reduce > reduce  
 2014-11-25 11:14:15,590 INFO [pool-6-thread-1] mapred.Task (Task.java:sendDone(1121)) - Task 'attempt_local1043625416_0001_r_000000_0' done.  
 2014-11-25 11:14:15,590 INFO [pool-6-thread-1] mapred.LocalJobRunner (LocalJobRunner.java:run(325)) - Finishing task: attempt_local1043625416_0001_r_000000_0  
 2014-11-25 11:14:15,590 INFO [Thread-5] mapred.LocalJobRunner (LocalJobRunner.java:runTasks(456)) - reduce task executor complete.  
 2014-11-25 11:14:15,843 INFO [main] mapreduce.Job (Job.java:monitorAndPrintJob(1362)) - map 100% reduce 100%  
 2014-11-25 11:14:15,844 INFO [main] mapreduce.Job (Job.java:monitorAndPrintJob(1373)) - Job job_local1043625416_0001 completed successfully  
 2014-11-25 11:14:15,932 INFO [main] mapreduce.Job (Job.java:monitorAndPrintJob(1380)) - Counters: 38  
      File System Counters  
           FILE: Number of bytes read=71500803  
           FILE: Number of bytes written=129250399  
           FILE: Number of read operations=0  
           FILE: Number of large read operations=0  
           FILE: Number of write operations=0  
           HDFS: Number of bytes read=67981524  
           HDFS: Number of bytes written=199881  
           HDFS: Number of read operations=33667  
           HDFS: Number of large read operations=0  
           HDFS: Number of write operations=183  
      Map-Reduce Framework  
           Map input records=4372  
           Map output records=83529  
           Map output bytes=837472  
           Map output materialized bytes=614722  
           Input split bytes=17820  
           Combine input records=83529  
           Combine output records=46932  
           Reduce input groups=19209  
           Reduce shuffle bytes=614722  
           Reduce input records=46932  
           Reduce output records=19209  
           Spilled Records=93864  
           Shuffled Maps =180  
           Failed Shuffles=0  
           Merged Map outputs=180  
           GC time elapsed (ms)=1299  
           CPU time spent (ms)=0  
           Physical memory (bytes) snapshot=0  
           Virtual memory (bytes) snapshot=0  
           Total committed heap usage (bytes)=134581059584  
      Shuffle Errors  
           BAD_ID=0  
           CONNECTION=0  
           IO_ERROR=0  
           WRONG_LENGTH=0  
           WRONG_MAP=0  
           WRONG_REDUCE=0  
      File Input Format Counters   
           Bytes Read=503395  
      File Output Format Counters   
           Bytes Written=199881  

Pay particular attention to the text at the end of the operational output.
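
As a worked example of reading those counters, the small class below copies four values from the output above and derives two ratios: how much the combiner shrank the map output, and how often each distinct word occurs on average. CounterMathSketch is a hypothetical name, and the interpretation in the comments is my reading of the counters, not something the log itself states.

```java
// Hypothetical helper: the constants are copied from the counters printed above.
final class CounterMathSketch {
    static final long MAP_OUTPUT_RECORDS = 83_529L;     // (word, 1) pairs emitted by all mappers
    static final long COMBINE_INPUT_RECORDS = 83_529L;  // every map output record went through the combiner
    static final long COMBINE_OUTPUT_RECORDS = 46_932L; // records left after per-map-task summing
    static final long REDUCE_OUTPUT_RECORDS = 19_209L;  // distinct words in the final output
    static final long SPILLED_RECORDS = 93_864L;

    // Fraction of map output records that the combiner removed before the shuffle.
    static double combinerReduction() {
        return 1.0 - (double) COMBINE_OUTPUT_RECORDS / COMBINE_INPUT_RECORDS;
    }

    // Average number of occurrences per distinct word across the whole input.
    static double averageOccurrencesPerWord() {
        return (double) MAP_OUTPUT_RECORDS / REDUCE_OUTPUT_RECORDS;
    }

    public static void main(String[] args) {
        System.out.printf("Combiner removed %.1f%% of map output records%n",
                100.0 * combinerReduction());
        System.out.printf("Average occurrences per distinct word: %.2f%n",
                averageOccurrencesPerWord());
        // Spilled Records is exactly twice Combine output records in this run.
        System.out.println("Spilled / combine output = "
                + (double) SPILLED_RECORDS / COMBINE_OUTPUT_RECORDS);
    }
}
```

The combiner removed 83529 - 46932 = 36597 records, a little under 44% of the map output, before anything was shuffled to the reducer. Spilled Records (93864) is exactly 2 x 46932, which would be consistent with each combined record being written to disk once on the map side and once on the reduce side, although that reading is an assumption.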