Wednesday, November 26, 2014

Adding a new DataNode to the Cluster

This article is part of the Hadoop Masterpage.

If you're initializing a cluster for the first time, it's likely that you'll want to start small with just one or two slaves (DataNodes).

Over time, as you operate your cluster, gaining more experience and confidence, you'll likely want to add more slaves.

I started with a simple NameNode and dual-DataNode cluster configuration.  This article details the steps I took to add an extra node to my cluster.


  1. Clone an existing VM
  2. Update the /etc/hosts file
    1. Add the new slave and IP address
    2. Copy this file to each DataNode
  3. Start the DataNode

Clone an existing DataNode

If you're cloning a DataNode that's already been used within a cluster, you'll want to clean out the $HADOOP_DATA_DIR. An easy way to do this is simply to delete the existing directory and re-create it (see "Configuring your Base" for more information on this step):

 sudo rm -rf $HADOOP_DATA_DIR
 mkdir -p $HADOOP_DATA_DIR/data  
 mkdir -p $HADOOP_DATA_DIR/name  
 mkdir -p $HADOOP_DATA_DIR/local  
 sudo chmod 755 $HADOOP_DATA_DIR  
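
Because rm -rf on a shell variable is unforgiving when that variable is unset, the cleanup can be wrapped in a small guard. This is only a minimal sketch: the function name reset_data_dir is hypothetical, and it leaves out the sudo calls used above, so it assumes the current user owns the directory.

```shell
# reset_data_dir DIR: wipe and re-create the DataNode storage layout.
# (Hypothetical helper, not part of Hadoop.)
reset_data_dir() {
  local dir="$1"
  # Refuse an empty or root path: an unset $HADOOP_DATA_DIR would otherwise
  # turn "mkdir -p $HADOOP_DATA_DIR/data" into "mkdir -p /data".
  if [ -z "$dir" ] || [ "$dir" = "/" ]; then
    echo "refusing to reset '$dir'" >&2
    return 1
  fi
  rm -rf "$dir" &&
    mkdir -p "$dir/data" "$dir/name" "$dir/local" &&
    chmod 755 "$dir"
}
```

It would be called as reset_data_dir "$HADOOP_DATA_DIR" on the VM that is about to be cloned.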

If you plan to clone more than one node, I recommend cloning the additional nodes from this step onward.

Once my data directory is cleared, I use VirtualBox to create my clone:
 VBoxManage clonevm "%~1" --name "%~2" --register --basefolder %vm%  

Substitute the node names for the parameters above. Reference "VirtualBox for Virtualization" for automated cloning recipes.

Updating the Hosts File

On your NameNode, update the hosts file:
 sudo gedit /etc/hosts  

and add the node name and IP address for each newly created slave node.

My /etc/hosts file now looks like this (IP addresses omitted):
 localhost
 CVB
 master
 slave1
 slave2
 slave3
 slave4
 slave5
 dev
 # The following lines are desirable for IPv6 capable hosts  
 ::1   ip6-localhost ip6-loopback  
 fe00::0 ip6-localnet  
 ff00::0 ip6-mcastprefix  
 ff02::1 ip6-allnodes  
 ff02::2 ip6-allrouters  

The new lines in my file are the entries for the newly added slaves.  The information in your file will not be identical.

Copying the Host File

Do I need to copy the hosts file on my NameNode to each DataNode in the cluster?

Yes, you should do this.  There are cases where DataNodes will talk to each other.  DataNodes do this when they are replicating data.  Also, when adding a new DataNode to an existing cluster, data re-balancing (addressed below) will occur.  This requires DataNodes to address one another.

It is important that all DataNodes be able to address one another in a consistent fashion.  I recommend maintaining (and editing) the master copy of the hosts file on the NameNode.

On my NameNode, I've created a script in my home directory that will copy the hosts file to each DataNode in my cluster:

 cd ~  

Copy this script with the appropriate modifications:
 # copy the master hosts file to slave1 .. slave5
 for i in {1..5}; do
   scp /etc/hosts craigtrim@slave$i:/etc/hosts
 done

Don't forget to make your shell script executable:
 chmod +x  

Operational Output

When I execute this script, it simply reports back to me that all the files were copied successfully:
 craigtrim@CVB:~$ ./  
 hosts                     100% 355   0.4KB/s  00:00    
 hosts                     100% 355   0.4KB/s  00:00    
 hosts                     100% 355   0.4KB/s  00:00    
 hosts                     100% 355   0.4KB/s  00:00    
 hosts                     100% 355   0.4KB/s  00:00    

It doesn't hurt to log on to at least one of the slaves and verify that the file was copied correctly (IP addresses are omitted from the listing):
 craigtrim@CVB:~$ ssh slave3
 Welcome to Ubuntu 14.04.1 LTS (GNU/Linux 3.13.0-32-generic x86_64)
  * Documentation:
 229 packages can be updated.
 87 updates are security updates.
 Last login: Wed Nov 26 14:26:43 2014 from master
 craigtrim@CVB:~$ cat /etc/hosts
 localhost
 CVB
 master
 slave1
 slave2
 slave3
 slave4
 slave5
 dev
 # The following lines are desirable for IPv6 capable hosts  
 ::1   ip6-localhost ip6-loopback  
 fe00::0 ip6-localnet  
 ff00::0 ip6-mcastprefix  
 ff02::1 ip6-allnodes  
 ff02::2 ip6-allrouters  

Once you gain confidence that the script is doing what it's supposed to, you can likely skip this verification step in the future.
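
If you'd rather not log on to each slave by hand, the same check can be scripted by comparing checksums. The following is a sketch under stated assumptions: verify_copy and checksum are hypothetical helper names, and REMOTE_CAT is an illustrative override (defaulting to ssh) that exists only so the loop can be tried without a network.

```shell
# Print the CRC of whatever arrives on stdin.
checksum() { cksum | awk '{print $1}'; }

# verify_copy FILE HOST...
# Compares the local FILE against the file at the same path on each HOST.
verify_copy() {
  local file="$1"; shift
  local want got host status=0
  want=$(checksum < "$file")
  for host in "$@"; do
    # REMOTE_CAT is an assumption for illustration; by default this runs
    # "ssh HOST cat FILE" and checksums the result locally.
    got=$(${REMOTE_CAT:-ssh} "$host" cat "$file" | checksum)
    if [ "$got" = "$want" ]; then
      echo "$host: OK"
    else
      echo "$host: MISMATCH"
      status=1
    fi
  done
  return $status
}
```

On the NameNode this might be run as verify_copy /etc/hosts slave1 slave2 slave3 slave4 slave5. An unreachable slave shows up as a mismatch, because an empty reply never matches the local checksum.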

Updating the Slaves File

This section is almost identical to the one above. Just as we maintain a master /etc/hosts on the designated NameNode, we'll maintain our master slaves file there.

Let's go ahead and edit it, and add the two new slaves (DataNodes):
 gedit slaves  

My slaves file now looks like this:

The file simply lists one slave hostname per line, with the new slaves added. That's about as simple as it gets. Use of the hosts file prevents us from having to enter actual IP addresses in this file.

In like manner, we can also write a script to copy our slaves file to each DataNode in the cluster.  I call this script "", make it executable in the same manner as the last script we wrote, and enter this:
 # copy the master slaves file to slave1 .. slave5
 for i in {1..5}; do
   scp $HADOOP_CONF_DIR/slaves craigtrim@slave$i:$HADOOP_CONF_DIR
 done

Operational Output

The script output contains nothing surprising:
 craigtrim@CVB:~$ ./  
 slaves                    100%  35   0.0KB/s  00:00    
 slaves                    100%  35   0.0KB/s  00:00    
 slaves                    100%  35   0.0KB/s  00:00    
 slaves                    100%  35   0.0KB/s  00:00    
 slaves                    100%  35   0.0KB/s  00:00    
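
Since the two copy scripts differ only in the file they push, they could be folded into one function that reads its host list from the slaves file instead of a hard-coded range. This is a sketch rather than the scripts I actually run: push_file is a hypothetical name, COPY_CMD is an illustrative override (defaulting to scp), and it copies as the current user rather than spelling out craigtrim@.

```shell
# push_file SRC DEST SLAVES_FILE
# Copies SRC to DEST on every host listed in SLAVES_FILE (one per line).
push_file() {
  local src="$1" dest="$2" slaves="$3" host failed=0
  while IFS= read -r host; do
    # skip blank lines and comment lines
    case $host in ''|'#'*) continue ;; esac
    # stdin is redirected so scp/ssh cannot swallow the rest of the host list
    if ! ${COPY_CMD:-scp} "$src" "$host:$dest" < /dev/null; then
      echo "copy to $host failed" >&2
      failed=1
    fi
  done < "$slaves"
  return $failed
}
```

For example, push_file /etc/hosts /etc/hosts "$HADOOP_CONF_DIR/slaves" would replace the first script, and push_file "$HADOOP_CONF_DIR/slaves" "$HADOOP_CONF_DIR" "$HADOOP_CONF_DIR/slaves" the second. Adding a node then means editing the slaves file only.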

Starting Up

If the entire cluster has been stopped, you should just start the cluster using the start-dfs and the start-yarn shell scripts, as described in this article "Flipping the Switch".

If your cluster is already operational, and you want to "hot deploy" a new DataNode, then execute this command on the DataNode: --config $HADOOP_CONF_DIR --script hdfs start datanode  

This works. Once I start up the DataNode, it registers with the NameNode and begins heartbeating to it, as the startup log shows:
 2014-11-26 13:08:56,141 INFO [main] datanode.DataNode ( - STARTUP_MSG:   
 STARTUP_MSG: Starting DataNode  
 STARTUP_MSG:  host = CVB/  
 STARTUP_MSG:  args = []  
 STARTUP_MSG:  version = 2.5.2  
 STARTUP_MSG:  classpath = /usr/lib/apache/hadoop/2.5.2/conf:/usr/lib/apache/hadoop/2.5.2/share/hadoop/common/lib/commons-net-3.1.jar:/usr/lib/apache/hadoop/2.5.2/share/hadoop/common/lib/mockito-all-1.8.5.jar:/usr/lib/apache/hadoop/2.5.2/share/hadoop/common/lib/jersey-core-1.9.jar:/usr/lib/apache/hadoop/2.5.2/share/hadoop/common/lib/commons-cli-1.2.jar:/usr/lib/apache/hadoop/2.5.2/share/hadoop/common/lib/commons-lang-2.6.jar:/usr/lib/apache/hadoop/2.5.2/share/hadoop/common/lib/commons-math3-3.1.1.jar:/usr/lib/apache/hadoop/2.5.2/share/hadoop/common/lib/stax-api-1.0-2.jar:/usr/lib/apache/hadoop/2.5.2/share/hadoop/common/lib/jettison-1.1.jar:/usr/lib/apache/hadoop/2.5.2/share/hadoop/common/lib/hamcrest-core-1.3.jar:/usr/lib/apache/hadoop/2.5.2/share/hadoop/common/lib/junit-4.11.jar:/usr/lib/apache/hadoop/2.5.2/share/hadoop/common/lib/activation-1.1.jar:/usr/lib/apache/hadoop/2.5.2/share/hadoop/common/lib/protobuf-java-2.5.0.jar:/usr/lib/apache/hadoop/2.5.2/share/hadoop/common/lib/snappy-java-*.jar:/usr/lib/apache/hadoop/2.5.2/contrib/capacity-scheduler/*.jar:/usr/lib/apache/hadoop/2.5.2/contrib/capacity-scheduler/*.jar  
 STARTUP_MSG:  build = -r cc72e9b000545b86b75a61f4835eb86d57bfafc0; compiled by 'jenkins' on 2014-11-14T23:45Z  
 STARTUP_MSG:  java = 1.8.0_25  
 2014-11-26 13:08:56,157 INFO [main] datanode.DataNode ( - registered UNIX signal handlers for [TERM, HUP, INT]  
 2014-11-26 13:08:56,699 WARN [main] impl.MetricsConfig ( - Cannot locate configuration: tried,  
 2014-11-26 13:08:56,794 INFO [main] impl.MetricsSystemImpl ( - Scheduled snapshot period at 10 second(s).  
  2014-11-26 13:08:56,794 INFO [main] impl.MetricsSystemImpl ( - DataNode metrics system started  
 2014-11-26 13:08:56,799 INFO [main] datanode.DataNode (<init>(291)) - Configured hostname is CVB  
 2014-11-26 13:08:56,800 INFO [main] datanode.DataNode ( - Starting DataNode with maxLockedMemory = 0  
 2014-11-26 13:08:56,825 INFO [main] datanode.DataNode ( - Opened streaming server at /  
 2014-11-26 13:08:56,829 INFO [main] datanode.DataNode (<init>(75)) - Balancing bandwith is 1048576 bytes/s  
 2014-11-26 13:08:56,829 INFO [main] datanode.DataNode (<init>(76)) - Number threads for balancing is 5  
 2014-11-26 13:08:57,069 INFO [main] mortbay.log ( - Logging to org.slf4j.impl.Log4jLoggerAdapter(org.mortbay.log) via org.mortbay.log.Slf4jLog  
 2014-11-26 13:08:57,073 INFO [main] http.HttpRequestLog ( - Http request log for http.requests.datanode is not defined  
 2014-11-26 13:08:57,084 INFO [main] http.HttpServer2 ( - Added global filter 'safety' (class=org.apache.hadoop.http.HttpServer2$QuotingInputFilter)  
 2014-11-26 13:08:57,086 INFO [main] http.HttpServer2 ( - Added filter static_user_filter (class=org.apache.hadoop.http.lib.StaticUserWebFilter$StaticUserFilter) to context datanode  
 2014-11-26 13:08:57,087 INFO [main] http.HttpServer2 ( - Added filter static_user_filter (class=org.apache.hadoop.http.lib.StaticUserWebFilter$StaticUserFilter) to context static  
 2014-11-26 13:08:57,087 INFO [main] http.HttpServer2 ( - Added filter static_user_filter (class=org.apache.hadoop.http.lib.StaticUserWebFilter$StaticUserFilter) to context logs  
 2014-11-26 13:08:57,103 INFO [main] http.HttpServer2 ( - addJerseyResourcePackage: packageName=org.apache.hadoop.hdfs.server.datanode.web.resources;org.apache.hadoop.hdfs.web.resources, pathSpec=/webhdfs/v1/*  
 2014-11-26 13:08:57,106 INFO [main] http.HttpServer2 ( - Jetty bound to port 50075  
 2014-11-26 13:08:57,106 INFO [main] mortbay.log ( - jetty-6.1.26  
 2014-11-26 13:08:57,394 INFO [main] mortbay.log ( - Started HttpServer2$SelectChannelConnectorWithSafeStartup@  
 2014-11-26 13:08:57,559 INFO [main] datanode.DataNode ( - dnUserName = craigtrim  
 2014-11-26 13:08:57,559 INFO [main] datanode.DataNode ( - supergroup = supergroup  
 2014-11-26 13:08:57,608 INFO [main] ipc.CallQueueManager (<init>(53)) - Using callQueue class java.util.concurrent.LinkedBlockingQueue  
 2014-11-26 13:08:57,626 INFO [Socket Reader #1 for port 50020] ipc.Server ( - Starting Socket Reader #1 for port 50020  
 2014-11-26 13:08:57,660 INFO [main] datanode.DataNode ( - Opened IPC server at /  
 2014-11-26 13:08:57,673 INFO [main] datanode.DataNode ( - Refresh request received for nameservices: null  
 2014-11-26 13:08:57,698 INFO [main] datanode.DataNode ( - Starting BPOfferServices for nameservices: <default>  
 2014-11-26 13:08:57,708 INFO [DataNode: [[[DISK]file:/home/craigtrim/HADOOP_DATA_DIR/data/]] heartbeating to master/] datanode.DataNode ( - Block pool <registering> (Datanode Uuid unassigned) service to master/ starting to offer service  
 2014-11-26 13:08:57,713 INFO [IPC Server Responder] ipc.Server ( - IPC Server Responder: starting  
 2014-11-26 13:08:57,713 INFO [IPC Server listener on 50020] ipc.Server ( - IPC Server listener on 50020: starting  
 2014-11-26 13:08:57,947 INFO [DataNode: [[[DISK]file:/home/craigtrim/HADOOP_DATA_DIR/data/]] heartbeating to master/] common.Storage ( - Data-node version: -55 and name-node layout version: -57  
 2014-11-26 13:08:57,962 INFO [DataNode: [[[DISK]file:/home/craigtrim/HADOOP_DATA_DIR/data/]] heartbeating to master/] common.Storage ( - Lock on /home/craigtrim/HADOOP_DATA_DIR/data/in_use.lock acquired by nodename 5450@CVB  
 2014-11-26 13:08:57,964 INFO [DataNode: [[[DISK]file:/home/craigtrim/HADOOP_DATA_DIR/data/]] heartbeating to master/] common.Storage ( - Storage directory /home/craigtrim/HADOOP_DATA_DIR/data is not formatted  
 2014-11-26 13:08:57,964 INFO [DataNode: [[[DISK]file:/home/craigtrim/HADOOP_DATA_DIR/data/]] heartbeating to master/] common.Storage ( - Formatting ...  
 2014-11-26 13:08:58,000 INFO [DataNode: [[[DISK]file:/home/craigtrim/HADOOP_DATA_DIR/data/]] heartbeating to master/] common.Storage ( - Analyzing storage directories for bpid BP-1847084755-  
 2014-11-26 13:08:58,000 INFO [DataNode: [[[DISK]file:/home/craigtrim/HADOOP_DATA_DIR/data/]] heartbeating to master/] common.Storage ( - Locking is disabled  
 2014-11-26 13:08:58,001 INFO [DataNode: [[[DISK]file:/home/craigtrim/HADOOP_DATA_DIR/data/]] heartbeating to master/] common.Storage ( - Storage directory /home/craigtrim/HADOOP_DATA_DIR/data/current/BP-1847084755- is not formatted.  
 2014-11-26 13:08:58,001 INFO [DataNode: [[[DISK]file:/home/craigtrim/HADOOP_DATA_DIR/data/]] heartbeating to master/] common.Storage ( - Formatting ...  
 2014-11-26 13:08:58,001 INFO [DataNode: [[[DISK]file:/home/craigtrim/HADOOP_DATA_DIR/data/]] heartbeating to master/] common.Storage ( - Formatting block pool BP-1847084755- directory /home/craigtrim/HADOOP_DATA_DIR/data/current/BP-1847084755-  
 2014-11-26 13:08:58,004 INFO [DataNode: [[[DISK]file:/home/craigtrim/HADOOP_DATA_DIR/data/]] heartbeating to master/] common.Storage ( - Restored 0 block files from trash.  
 2014-11-26 13:08:58,006 INFO [DataNode: [[[DISK]file:/home/craigtrim/HADOOP_DATA_DIR/data/]] heartbeating to master/] datanode.DataNode ( - Setting up storage: nsid=1137176454;bpid=BP-1847084755-;lv=-55;nsInfo=lv=-57;cid=CID-822421ef-e4d6-49d7-9b25-e0b81ae32b7d;nsid=1137176454;c=0;bpid=BP-1847084755-;dnuuid=null  
 2014-11-26 13:08:58,009 INFO [DataNode: [[[DISK]file:/home/craigtrim/HADOOP_DATA_DIR/data/]] heartbeating to master/] datanode.DataNode ( - Generated and persisted new Datanode UUID 68bd5310-85cb-49bf-9bfa-a34be7531d58  
 2014-11-26 13:08:58,026 INFO [DataNode: [[[DISK]file:/home/craigtrim/HADOOP_DATA_DIR/data/]] heartbeating to master/] impl.FsDatasetImpl (<init>(214)) - Added volume - /home/craigtrim/HADOOP_DATA_DIR/data/current, StorageType: DISK  
 2014-11-26 13:08:58,036 INFO [DataNode: [[[DISK]file:/home/craigtrim/HADOOP_DATA_DIR/data/]] heartbeating to master/] impl.FsDatasetImpl ( - Registered FSDatasetState MBean  
 2014-11-26 13:08:58,041 INFO [DataNode: [[[DISK]file:/home/craigtrim/HADOOP_DATA_DIR/data/]] heartbeating to master/] datanode.DirectoryScanner ( - Periodic Directory Tree Verification scan starting at 1417045060041 with interval 21600000  
 2014-11-26 13:08:58,042 INFO [DataNode: [[[DISK]file:/home/craigtrim/HADOOP_DATA_DIR/data/]] heartbeating to master/] impl.FsDatasetImpl ( - Adding block pool BP-1847084755-  
 2014-11-26 13:08:58,044 INFO [Thread-30] impl.FsDatasetImpl ( - Scanning block pool BP-1847084755- on volume /home/craigtrim/HADOOP_DATA_DIR/data/current...  
 2014-11-26 13:08:58,061 INFO [Thread-30] impl.FsDatasetImpl ( - Time taken to scan block pool BP-1847084755- on /home/craigtrim/HADOOP_DATA_DIR/data/current: 17ms  
 2014-11-26 13:08:58,062 INFO [DataNode: [[[DISK]file:/home/craigtrim/HADOOP_DATA_DIR/data/]] heartbeating to master/] impl.FsDatasetImpl ( - Total time to scan all replicas for block pool BP-1847084755- 19ms  
 2014-11-26 13:08:58,062 INFO [Thread-32] impl.FsDatasetImpl ( - Adding replicas to map for block pool BP-1847084755- on volume /home/craigtrim/HADOOP_DATA_DIR/data/current...  
 2014-11-26 13:08:58,063 INFO [Thread-32] impl.FsDatasetImpl ( - Time to add replicas to map for block pool BP-1847084755- on volume /home/craigtrim/HADOOP_DATA_DIR/data/current: 0ms  
 2014-11-26 13:08:58,063 INFO [DataNode: [[[DISK]file:/home/craigtrim/HADOOP_DATA_DIR/data/]] heartbeating to master/] impl.FsDatasetImpl ( - Total time to add all replicas to map: 2ms  
 2014-11-26 13:08:58,067 INFO [DataNode: [[[DISK]file:/home/craigtrim/HADOOP_DATA_DIR/data/]] heartbeating to master/] datanode.DataNode ( - Block pool BP-1847084755- (Datanode Uuid null) service to master/ beginning handshake with NN  
 2014-11-26 13:08:58,084 INFO [DataNode: [[[DISK]file:/home/craigtrim/HADOOP_DATA_DIR/data/]] heartbeating to master/] datanode.DataNode ( - Block pool Block pool BP-1847084755- (Datanode Uuid null) service to master/ successfully registered with NN  
 2014-11-26 13:08:58,085 INFO [DataNode: [[[DISK]file:/home/craigtrim/HADOOP_DATA_DIR/data/]] heartbeating to master/] datanode.DataNode ( - For namenode master/ using DELETEREPORT_INTERVAL of 300000 msec BLOCKREPORT_INTERVAL of 21600000msec CACHEREPORT_INTERVAL of 10000msec Initial delay: 0msec; heartBeatInterval=3000  
 2014-11-26 13:08:58,127 INFO [DataNode: [[[DISK]file:/home/craigtrim/HADOOP_DATA_DIR/data/]] heartbeating to master/] datanode.DataNode ( - Namenode Block pool BP-1847084755- (Datanode Uuid 68bd5310-85cb-49bf-9bfa-a34be7531d58) service to master/ trying to claim ACTIVE state with txid=1193283  
 2014-11-26 13:08:58,127 INFO [DataNode: [[[DISK]file:/home/craigtrim/HADOOP_DATA_DIR/data/]] heartbeating to master/] datanode.DataNode ( - Acknowledging ACTIVE Namenode Block pool BP-1847084755- (Datanode Uuid 68bd5310-85cb-49bf-9bfa-a34be7531d58) service to master/  
 2014-11-26 13:08:58,153 INFO [DataNode: [[[DISK]file:/home/craigtrim/HADOOP_DATA_DIR/data/]] heartbeating to master/] datanode.DataNode ( - Sent 1 blockreports 0 blocks total. Took 1 msec to generate and 24 msecs for RPC and NN processing. Got back commands org.apache.hadoop.hdfs.server.protocol.FinalizeCommand@7c7470f4  
 2014-11-26 13:08:58,154 INFO [DataNode: [[[DISK]file:/home/craigtrim/HADOOP_DATA_DIR/data/]] heartbeating to master/] datanode.DataNode ( - Got finalize command for block pool BP-1847084755-  
 2014-11-26 13:08:58,160 INFO [DataNode: [[[DISK]file:/home/craigtrim/HADOOP_DATA_DIR/data/]] heartbeating to master/] util.GSet ( - Computing capacity for map BlockMap  
 2014-11-26 13:08:58,160 INFO [DataNode: [[[DISK]file:/home/craigtrim/HADOOP_DATA_DIR/data/]] heartbeating to master/] util.GSet ( - VM type    = 64-bit  
 2014-11-26 13:08:58,161 INFO [DataNode: [[[DISK]file:/home/craigtrim/HADOOP_DATA_DIR/data/]] heartbeating to master/] util.GSet ( - 0.5% max memory 889 MB = 4.4 MB  
 2014-11-26 13:08:58,161 INFO [DataNode: [[[DISK]file:/home/craigtrim/HADOOP_DATA_DIR/data/]] heartbeating to master/] util.GSet ( - capacity   = 2^19 = 524288 entries  
 2014-11-26 13:08:58,162 INFO [DataNode: [[[DISK]file:/home/craigtrim/HADOOP_DATA_DIR/data/]] heartbeating to master/] datanode.BlockPoolSliceScanner (<init>(186)) - Periodic Block Verification Scanner initialized with interval 504 hours for block pool BP-1847084755-  
 2014-11-26 13:08:58,166 INFO [DataNode: [[[DISK]file:/home/craigtrim/HADOOP_DATA_DIR/data/]] heartbeating to master/] datanode.DataBlockScanner ( - Added bpid=BP-1847084755- to blockPoolScannerMap, new size=1  
 2014-11-26 13:09:01,671 INFO [DataXceiver for client at / [Receiving block BP-1847084755-]] datanode.DataNode ( - Receiving BP-1847084755- src: / dest: /  
 2014-11-26 13:09:01,671 INFO [DataXceiver for client at / [Receiving block BP-1847084755-]] datanode.DataNode ( - Receiving BP-1847084755- src: / dest: /  
 2014-11-26 13:09:01,672 INFO [DataXceiver for client at / [Receiving block BP-1847084755-]] datanode.DataNode ( - Receiving BP-1847084755- src: / dest: /  
 2014-11-26 13:09:01,675 INFO [DataXceiver for client at / [Receiving block BP-1847084755-]] datanode.DataNode ( - Receiving BP-1847084755- src: / dest: /  
 2014-11-26 13:09:01,702 INFO [DataXceiver for client at / [Receiving block BP-1847084755-]] datanode.DataNode ( - Received BP-1847084755- src: / dest: / of size 923  
 2014-11-26 13:09:01,702 INFO [DataXceiver for client at / [Receiving block BP-1847084755-]] datanode.DataNode ( - Received BP-1847084755- src: / dest: / of size 1367  
 2014-11-26 13:09:01,703 INFO [DataXceiver for client at / [Receiving block BP-1847084755-]] datanode.DataNode ( - Received BP-1847084755- src: / dest: / of size 1902  
 2014-11-26 13:09:01,702 INFO [DataXceiver for client at / [Receiving block BP-1847084755-]] datanode.DataNode ( - Received BP-1847084755- src: / dest: / of size 1980  
 2014-11-26 13:09:03,055 INFO [Thread-26] datanode.BlockPoolSliceScanner ( - Verification succeeded for BP-1847084755-  
 2014-11-26 13:09:03,057 INFO [Thread-26] datanode.BlockPoolSliceScanner ( - Verification succeeded for BP-1847084755-  
 2014-11-26 13:09:03,059 INFO [Thread-26] datanode.BlockPoolSliceScanner ( - Verification succeeded for BP-1847084755-  
 2014-11-26 13:09:03,060 INFO [Thread-26] datanode.BlockPoolSliceScanner ( - Verification succeeded for BP-1847084755-  
 2014-11-26 13:09:04,479 INFO [DataXceiver for client at / [Receiving block BP-1847084755-]] datanode.DataNode ( - Receiving BP-1847084755- src: / dest: /  
 2014-11-26 13:09:04,479 INFO [DataXceiver for client at / [Receiving block BP-1847084755-]] datanode.DataNode ( - Receiving BP-1847084755- src: / dest: /  
 2014-11-26 13:09:04,482 INFO [DataXceiver for client at / [Receiving block BP-1847084755-]] datanode.DataNode ( - Received BP-1847084755- src: / dest: / of size 776  
 2014-11-26 13:09:04,484 INFO [DataXceiver for client at / [Receiving block BP-1847084755-]] datanode.DataNode ( - Received BP-1847084755- src: / dest: / of size 381  
 2014-11-26 13:09:04,617 INFO [DataXceiver for client at / [Receiving block BP-1847084755-]] datanode.DataNode ( - Receiving BP-1847084755- src: / dest: /  
 2014-11-26 13:09:04,619 INFO [DataXceiver for client at / [Receiving block BP-1847084755-]] datanode.DataNode ( - Receiving BP-1847084755- src: / dest: /  
 2014-11-26 13:09:04,621 INFO [DataXceiver for client at / [Receiving block BP-1847084755-]] datanode.DataNode ( - Received BP-1847084755- src: / dest: / of size 555  
 2014-11-26 13:09:04,624 INFO [DataXceiver for client at / [Receiving block BP-1847084755-]] datanode.DataNode ( - Received BP-1847084755- src: / dest: / of size 7514  
 2014-11-26 13:09:07,480 INFO [DataXceiver for client at / [Receiving block BP-1847084755-]] datanode.DataNode ( - Receiving BP-1847084755- src: / dest: /  
 2014-11-26 13:09:07,486 INFO [DataXceiver for client at / [Receiving block BP-1847084755-]] datanode.DataNode ( - Receiving BP-1847084755- src: / dest: /  
 2014-11-26 13:09:07,488 INFO [DataXceiver for client at / [Receiving block BP-1847084755-]] datanode.DataNode ( - Received BP-1847084755- src: / dest: / of size 6221  
 2014-11-26 13:09:07,496 INFO [DataXceiver for client at / [Receiving block BP-1847084755-]] datanode.DataNode ( - Received BP-1847084755- src: / dest: / of size 7424  
 2014-11-26 13:09:07,621 INFO [DataXceiver for client at / [Receiving block BP-1847084755-]] datanode.DataNode ( - Receiving BP-1847084755- src: / dest: /  
 2014-11-26 13:09:07,621 INFO [DataXceiver for client at / [Receiving block BP-1847084755-]] datanode.DataNode ( - Receiving BP-1847084755- src: / dest: /  
 2014-11-26 13:09:07,632 INFO [DataXceiver for client at / [Receiving block BP-1847084755-]] datanode.DataNode ( - Received BP-1847084755- src: / dest: / of size 4788  
 2014-11-26 13:09:07,633 INFO [DataXceiver for client at / [Receiving block BP-1847084755-]] datanode.DataNode ( - Received BP-1847084755- src: / dest: / of size 9118  
 ... etc ...  

Notice how the automatic re-balancing occurs. Each block that arrives on the new node is logged like this:
 2014-11-26 13:09:07,488 INFO [DataXceiver for client at / [Receiving block BP-1847084755-]] datanode.DataNode ( -
   Received BP-1847084755-
   src: /
   dest: /
   of size 6221

Likewise, the summary in the web interface should now show the extra node:

For more information on how DataNodes join the cluster, read up on the heartbeat mechanism in the Hadoop Architecture. Brad Hedlund has one of the best posts on this topic I've come across.


Node is expected to serve this storage

If you create a clone VM, and don't clean out the $HADOOP_DATA_DIR, you're likely to get this error.
 2014-11-26 12:56:16,660 WARN [DataNode: [[[DISK]file:/home/craigtrim/HADOOP_DATA_DIR/data/]] heartbeating to master/] datanode.DataNode ( - Block pool BP-1847084755- (Datanode Uuid 9b460762-eba0-45f2-b0b4-f00e11572ed6) service to master/ is shutting down  
 org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.hdfs.protocol.UnregisteredNodeException): Data node DatanodeRegistration(, datanodeUuid=9b460762-eba0-45f2-b0b4-f00e11572ed6, infoPort=50075, ipcPort=50020, storageInfo=lv=-55;cid=CID-822421ef-e4d6-49d7-9b25-e0b81ae32b7d;nsid=1137176454;c=0) is attempting to report storage ID 9b460762-eba0-45f2-b0b4-f00e11572ed6. Node is expected to serve this storage.  
      at org.apache.hadoop.hdfs.server.blockmanagement.DatanodeManager.getDatanode(  
      at org.apache.hadoop.hdfs.server.blockmanagement.BlockManager.processReport(  
      at org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.blockReport(  
      at org.apache.hadoop.hdfs.protocolPB.DatanodeProtocolServerSideTranslatorPB.blockReport(  
      at org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos$DatanodeProtocolService$2.callBlockingMethod(  
      at org.apache.hadoop.ipc.ProtobufRpcEngine$Server$  
      at org.apache.hadoop.ipc.RPC$  
      at org.apache.hadoop.ipc.Server$Handler$  
      at org.apache.hadoop.ipc.Server$Handler$  
      at Method)  
      at org.apache.hadoop.ipc.Server$  
      at org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(  
      at com.sun.proxy.$Proxy12.blockReport(Unknown Source)  
      at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)  
      at sun.reflect.NativeMethodAccessorImpl.invoke(  
      at sun.reflect.DelegatingMethodAccessorImpl.invoke(  
      at java.lang.reflect.Method.invoke(  
      at com.sun.proxy.$Proxy12.blockReport(Unknown Source)  
      at org.apache.hadoop.hdfs.protocolPB.DatanodeProtocolClientSideTranslatorPB.blockReport(  
      at org.apache.hadoop.hdfs.server.datanode.BPServiceActor.blockReport(  
      at org.apache.hadoop.hdfs.server.datanode.BPServiceActor.offerService(  
      at org.apache.hadoop.hdfs.server.datanode.BPServiceActor.(  
 2014-11-26 12:56:16,664 WARN [DataNode: [[[DISK]file:/home/craigtrim/HADOOP_DATA_DIR/data/]] heartbeating to master/] datanode.DataNode ( - Ending block pool service for: Block pool BP-1847084755- (Datanode Uuid 9b460762-eba0-45f2-b0b4-f00e11572ed6) service to master/  
 2014-11-26 12:56:16,766 INFO [DataNode: [[[DISK]file:/home/craigtrim/HADOOP_DATA_DIR/data/]] heartbeating to master/] datanode.DataNode ( - Removed Block pool BP-1847084755- (Datanode Uuid 9b460762-eba0-45f2-b0b4-f00e11572ed6)  
 2014-11-26 12:56:16,766 INFO [DataNode: [[[DISK]file:/home/craigtrim/HADOOP_DATA_DIR/data/]] heartbeating to master/] datanode.DataBlockScanner ( - Removed bpid=BP-1847084755- from blockPoolScannerMap  
 2014-11-26 12:56:16,766 INFO [DataNode: [[[DISK]file:/home/craigtrim/HADOOP_DATA_DIR/data/]] heartbeating to master/] impl.FsDatasetImpl ( - Removing block pool BP-1847084755-  
 2014-11-26 12:56:18,768 WARN [main] datanode.DataNode ( - Exiting Datanode  
 2014-11-26 12:56:18,770 INFO [main] util.ExitUtil ( - Exiting with status 0  
 2014-11-26 12:56:18,772 INFO [Thread-1] datanode.DataNode ( - SHUTDOWN_MSG:   
 SHUTDOWN_MSG: Shutting down DataNode at CVB/  

The solution is to clear out the directory as mentioned above, and restart the DataNode.


  1. Inter DataNode Communication
      1. The NameNode orchestrates the replication of data blocks from one DataNode to another. The replication data transfer happens directly between DataNodes and the data never passes through the NameNode.
      2. "the main communications between datanodes is data transfers, replicating blocks from one datanode to one or more other nodes"
      3. DataNodes can talk to each other to rebalance data, to move copies around, and to keep the replication of data high
  2. The Heartbeat Mechanism
      1. DataNodes send heartbeats to the NameNode every 3 seconds via a TCP handshake, using the same port number defined for the NameNode daemon, usually TCP 9000.
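
One practical consequence of the 3-second heartbeat is how long the NameNode waits before it declares a silent DataNode dead. As a rough sketch, assuming the stock HDFS defaults (dfs.heartbeat.interval of 3 seconds and dfs.namenode.heartbeat.recheck-interval of 300000 ms) and the usual rule of two recheck intervals plus ten heartbeats, the arithmetic looks like this. The variable names are illustrative only.

```shell
# Assumed defaults; check hdfs-site.xml if your cluster overrides them.
heartbeat_interval_s=3        # dfs.heartbeat.interval
recheck_interval_ms=300000    # dfs.namenode.heartbeat.recheck-interval

# dead-node timeout = 2 * recheck interval + 10 * heartbeat interval
dead_after_s=$(( 2 * recheck_interval_ms / 1000 + 10 * heartbeat_interval_s ))
echo "DataNode considered dead after ${dead_after_s}s"
```

With those defaults the result is 630 seconds, or ten and a half minutes, so a node that is shut down will not drop off the live list right away.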

Tuesday, November 25, 2014

Running the WordCount Program


  1. Configuring a Hadoop Development Environment

Sample Code

Let's import the sample WordCount program, and make sure we can get this to compile correctly.  We'll export the JAR file containing this code to our HDFS cluster.


 package dev.hadoop.sandbox.counter;  

 import java.io.IOException;  
 import java.util.StringTokenizer;  
 import org.apache.hadoop.io.IntWritable;  
 import org.apache.hadoop.io.LongWritable;  
 import org.apache.hadoop.io.Text;  
 import org.apache.hadoop.mapreduce.Mapper;  

 public class WordCountMapper extends Mapper<LongWritable, Text, Text, IntWritable> {  
      private final IntWritable     one        = new IntWritable(1);  
      private Text                  word       = new Text();  
      public WordCountMapper() {  
           System.out.println("Init WordCount Mapper");  
      }  
      @Override  
      protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {  
           StringTokenizer iter = new StringTokenizer(value.toString());  
           while (iter.hasMoreTokens()) {  
                word.set(iter.nextToken()); // emit (token, 1) for every token on the line  
                context.write(word, one);  
           }  
      }  
 }  


 package dev.hadoop.sandbox.counter;  

 import java.io.IOException;  
 import java.util.Iterator;  
 import org.apache.hadoop.io.IntWritable;  
 import org.apache.hadoop.io.Text;  
 import org.apache.hadoop.mapreduce.Reducer;  

 public class WordCountReducer extends Reducer<Text, IntWritable, Text, IntWritable> {  
      private IntWritable     result     = new IntWritable();  
      public WordCountReducer() {  
           System.out.println("Init WordCountReducer");  
      }  
      @Override  
      protected void reduce(Text word, Iterable<IntWritable> intOne, Context context) throws IOException, InterruptedException {  
           int sum = 0;  
           Iterator<IntWritable> iter = intOne.iterator();  
           while (iter.hasNext())  
                sum += iter.next().get(); // add up the 1s emitted by the mapper  
           result.set(sum);  
           context.write(word, result);  
      }  
 }  


 package dev.hadoop.sandbox.counter;  

 import org.apache.hadoop.conf.Configuration;  
 import org.apache.hadoop.fs.Path;  
 import org.apache.hadoop.io.IntWritable;  
 import org.apache.hadoop.io.Text;  
 import org.apache.hadoop.mapreduce.Job;  
 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;  
 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;  
 import org.apache.hadoop.util.GenericOptionsParser;  

 public class WordCountRunner {  

      public static void main(String... args) throws Throwable {  
           Configuration conf = new Configuration();  
           Job job = Job.getInstance(conf, "word count");  
           // tell Hadoop which JAR to ship and which classes do the work  
           job.setJarByClass(WordCountRunner.class);  
           job.setMapperClass(WordCountMapper.class);  
           job.setReducerClass(WordCountReducer.class);  
           job.setOutputKeyClass(Text.class);  
           job.setOutputValueClass(IntWritable.class);  
           String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();  
           FileInputFormat.addInputPath(job, new Path(otherArgs[0]));  
           FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));  
           System.exit(job.waitForCompletion(true) ? 0 : 1);  
      }  
 }  
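
To get a feel for what the three classes do together, the same map, shuffle and reduce flow can be mimicked on a single machine with standard Unix tools. This is only an approximation for sanity-checking small samples, not how Hadoop runs the job: the function name wordcount is hypothetical, and tr's whitespace class is assumed to be close enough to StringTokenizer's default delimiters.

```shell
# wordcount: read text on stdin, print "word<TAB>count" lines sorted by word.
wordcount() {
  tr -s '[:space:]' '\n' |    # "map": one token per line
    sed '/^$/d' |             # drop the empty line a leading space produces
    sort |                    # "shuffle": bring identical tokens together
    uniq -c |                 # "reduce": count each group
    awk '{print $2 "\t" $1}'  # key, tab, value
}
```

Running it over a small local file and comparing a few counts against the job's output on the same file is a quick way to confirm the job is wired up correctly.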

Import the code, and make sure it can compile successfully.

It should look something like this:

We're going to re-compile this using Maven.  Eclipse is useful as an IDE, but it's better to rely on a tool like Maven for managing the project build cycles.

Building the Sample

I type this command in a terminal window:
mvn clean package

This is essentially the same as calling "mvn package" as far as the build lifecycle goes, with the added advantage of invoking the clean plugin first. As the name implies, this plugin removes the files and directories generated by Maven during its build.

Note that if your POM file contains dependencies on other POM files, you may need to build a fat JAR:
This is the JAR that you'll want to copy to the NameNode and execute.

Running the JAR

I'm going to copy the JAR onto my NameNode:
scp target/*.jar craig@master:~

Then I SSH into my NameNode and execute the JAR:
hadoop jar ~/sandbox-1.0-SNAPSHOT.jar dev.hadoop.sandbox.counter.WordCountRunner /nyt /out

The two trailing arguments are the input parameters:
/nyt is the input directory
/out is the output directory

If the output directory exists from a prior run of this program, you'll have to delete it using this command:
hdfs dfs -rm -r /out

Or simply specify a new output directory (e.g. /out2).
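
When the job is re-run often, the delete can be made conditional so it doesn't complain on a first run. A minimal sketch, assuming the hdfs command is on the PATH; clear_output is a hypothetical helper name.

```shell
# clear_output DIR: remove DIR from HDFS only if it already exists.
clear_output() {
  if hdfs dfs -test -d "$1"; then
    hdfs dfs -rm -r "$1"
  fi
}
```

Calling clear_output /out just before the hadoop jar command keeps the two steps together in one script.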

Operational Output

The (partial) operational output from a successful run of the WordCounter looks like this:
 craigtrim@CVB:/usr/lib/apache/hadoop/2.5.2/bin$ hadoop jar sandbox-1.0-SNAPSHOT.jar dev.hadoop.sandbox.counter.WordCountRunner /nyt /out  
 Input Directory = /nyt  
 Output Directory = /out  
 2014-11-25 11:14:05,828 INFO [main] Configuration.deprecation ( - is deprecated. Instead, use dfs.metrics.session-id  
 2014-11-25 11:14:05,832 INFO [main] jvm.JvmMetrics ( - Initializing JVM Metrics with processName=JobTracker, sessionId=  
 2014-11-25 11:14:06,188 WARN [main] mapreduce.JobSubmitter ( - Hadoop command-line option parsing not performed. Implement the Tool interface and execute your application with ToolRunner to remedy this.  
 2014-11-25 11:14:06,389 INFO [main] input.FileInputFormat ( - Total input paths to process : 180  
 2014-11-25 11:14:06,451 INFO [main] mapreduce.JobSubmitter ( - number of splits:180  
 2014-11-25 11:14:06,542 INFO [main] mapreduce.JobSubmitter ( - Submitting tokens for job: job_local1043625416_0001  
 2014-11-25 11:14:06,569 WARN [main] conf.Configuration ( - file:/tmp/hadoop-craigtrim/mapred/staging/craigtrim1043625416/.staging/job_local1043625416_0001/job.xml:an attempt to override final parameter: mapreduce.job.end-notification.max.retry.interval; Ignoring.  
 2014-11-25 11:14:06,572 WARN [main] conf.Configuration ( - file:/tmp/hadoop-craigtrim/mapred/staging/craigtrim1043625416/.staging/job_local1043625416_0001/job.xml:an attempt to override final parameter: mapreduce.job.end-notification.max.attempts; Ignoring.  
 2014-11-25 11:14:06,662 WARN [main] conf.Configuration ( - file:/home/craigtrim/HADOOP_DATA_DIR/local/localRunner/craigtrim/job_local1043625416_0001/job_local1043625416_0001.xml:an attempt to override final parameter: mapreduce.job.end-notification.max.retry.interval; Ignoring.  
snip ...
 2014-11-25 11:14:15,549 INFO [pool-6-thread-1] mapred.Task ( - Task:attempt_local1043625416_0001_r_000000_0 is done. And is in the process of committing  
 2014-11-25 11:14:15,552 INFO [pool-6-thread-1] mapred.LocalJobRunner ( - 180 / 180 copied.  
 2014-11-25 11:14:15,552 INFO [pool-6-thread-1] mapred.Task ( - Task attempt_local1043625416_0001_r_000000_0 is allowed to commit now  
 2014-11-25 11:14:15,588 INFO [pool-6-thread-1] output.FileOutputCommitter ( - Saved output of task 'attempt_local1043625416_0001_r_000000_0' to hdfs://master:9000/out/_temporary/0/task_local1043625416_0001_r_000000  
 2014-11-25 11:14:15,589 INFO [pool-6-thread-1] mapred.LocalJobRunner ( - reduce > reduce  
 2014-11-25 11:14:15,590 INFO [pool-6-thread-1] mapred.Task ( - Task 'attempt_local1043625416_0001_r_000000_0' done.  
 2014-11-25 11:14:15,590 INFO [pool-6-thread-1] mapred.LocalJobRunner ( - Finishing task: attempt_local1043625416_0001_r_000000_0  
 2014-11-25 11:14:15,590 INFO [Thread-5] mapred.LocalJobRunner ( - reduce task executor complete.  
 2014-11-25 11:14:15,843 INFO [main] mapreduce.Job ( - map 100% reduce 100%  
 2014-11-25 11:14:15,844 INFO [main] mapreduce.Job ( - Job job_local1043625416_0001 completed successfully  
 2014-11-25 11:14:15,932 INFO [main] mapreduce.Job ( - Counters: 38  
      File System Counters  
           FILE: Number of bytes read=71500803  
           FILE: Number of bytes written=129250399  
           FILE: Number of read operations=0  
           FILE: Number of large read operations=0  
           FILE: Number of write operations=0  
           HDFS: Number of bytes read=67981524  
           HDFS: Number of bytes written=199881  
           HDFS: Number of read operations=33667  
           HDFS: Number of large read operations=0  
           HDFS: Number of write operations=183  
      Map-Reduce Framework  
           Map input records=4372  
           Map output records=83529  
           Map output bytes=837472  
           Map output materialized bytes=614722  
           Input split bytes=17820  
           Combine input records=83529  
           Combine output records=46932  
           Reduce input groups=19209  
           Reduce shuffle bytes=614722  
           Reduce input records=46932  
           Reduce output records=19209  
           Spilled Records=93864  
           Shuffled Maps =180  
           Failed Shuffles=0  
           Merged Map outputs=180  
           GC time elapsed (ms)=1299  
           CPU time spent (ms)=0  
           Physical memory (bytes) snapshot=0  
           Virtual memory (bytes) snapshot=0  
           Total committed heap usage (bytes)=134581059584  
      Shuffle Errors  
      File Input Format Counters   
           Bytes Read=503395  
      File Output Format Counters   
           Bytes Written=199881  

Pay particular attention to the counters at the end of the operational output. They summarise how many bytes and records passed through each stage of the job.
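One thing those counters show is how much work the combiner saved. In the run above, "Combine input records" (83529) matches "Map output records", and "Combine output records" (46932) matches "Reduce input records". Here is a minimal sketch that pulls the two figures out of saved job output and reports the reduction. The function name combiner_reduction is hypothetical, and it assumes the job output is fed to it on standard input.

```shell
# Hypothetical helper: read job output on stdin and report how much
# the combiner shrank the map output before it reached the reducer.
combiner_reduction() {
  awk -F= '
    /Combine input records/  { cin  = $2 + 0 }   # records entering the combiner
    /Combine output records/ { cout = $2 + 0 }   # records leaving the combiner
    END {
      if (cin > 0)
        printf "%d -> %d records (%.1f%% fewer)\n", cin, cout, (1 - cout / cin) * 100
    }'
}
```

Fed the counters from the run above, this prints "83529 -> 46932 records (43.8% fewer)".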

Monday, November 24, 2014

Working with the Hadoop Distributed File System

The HDFS File System

HDFS is not fully POSIX-compliant.  The requirements for a POSIX file-system differ from the target goals for a Hadoop application.

HDFS is a distributed filesystem that stores large files across multiple machines.  Just like a Unix filesystem, HDFS allows users to manipulate the filesystem using shell commands.  Most HDFS commands have a one-to-one correspondence with Unix commands.


This section assumes that:
  1. You are already logged onto a Linux NameNode, and are transferring files from the filesystem of that NameNode onto the HDFS filesystem. (For instructions on how to copy files onto the NameNode itself, perhaps from a Windows machine, please read this article.)
  2. Hadoop has been started.

Copying Files into HDFS

In this example, I have some news article data in the nyt directory under my home directory.

I'm going to copy this data into my HDFS filesystem:
hdfs dfs -mkdir /nyt
hdfs dfs -put ~/nyt/* /nyt
hdfs dfs -ls /nyt
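To confirm that everything arrived, the local file count can be compared with what HDFS reports. This is a sketch under my own assumptions: the function name verify_upload is hypothetical, and the hdfs command must be on your PATH. It uses "hdfs dfs -count", which prints DIR_COUNT, FILE_COUNT, CONTENT_SIZE and PATHNAME for the given path, counting recursively.

```shell
# Hypothetical helper: compare the number of local files with the
# FILE_COUNT column that "hdfs dfs -count" reports for an HDFS path.
verify_upload() {
  local local_dir="$1"
  local hdfs_dir="$2"
  local local_count
  local hdfs_count
  # Count regular files under the local directory (recursively)
  local_count=$(find "$local_dir" -type f | wc -l | tr -d ' ')
  # Output columns: DIR_COUNT FILE_COUNT CONTENT_SIZE PATHNAME
  hdfs_count=$(hdfs dfs -count "$hdfs_dir" | awk '{print $2}')
  if [ "$local_count" = "$hdfs_count" ]; then
    echo "OK: $hdfs_count files in $hdfs_dir"
  else
    echo "MISMATCH: $local_count local files, $hdfs_count in $hdfs_dir"
    return 1
  fi
}
```

For the example above, the call would be verify_upload ~/nyt /nyt.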

Removing Files from HDFS

Using this command, I can delete the directory I created above, along with everything in it:
craigtrim@CVB:/usr/lib/apache/hadoop/2.5.2/bin$ hdfs dfs -rm -r /nyt
2014-11-24 14:21:03,517 INFO [main] fs.TrashPolicyDefault ( - Namenode trash configuration: Deletion interval = 0 minutes, Emptier interval = 0 minutes.  
Deleted /nyt  

The Web Interface

It is possible to browse the HDFS filesystem using the NameNode Web Interface.

The URL for the NameNode Web Interface can be found at:

Click Utilities > Browse the File System in the menu header to browse the filesystem visually, in read-only mode.

Installing PuTTY on Windows

What is PuTTY?

PuTTY is (among other things) a free implementation of SSH for Windows. I use PuTTY primarily as a method of communicating between my Windows workstation and Linux installations on VMs in VirtualBox.

This article is part of the Hadoop Masterpage.

The purpose of SSH is to create a secure channel across an insecure network. SSH relies on asymmetric (public-key) cryptography to authenticate the two ends of a connection and to agree on a session key. This means there are two keys. The public key can be disseminated to the world. The private key must be kept hidden at all times.

Where do I download PuTTY?

I start at the official team site here:

And for this article, I’ve selected version 0.63.

PSCP is the PuTTY version of scp, a command that copies files (like cp) over SSH.


Uploading Files

The use case I describe in this article is transferring files between a Windows host and a Linux Virtual Machine (VM), in this case Ubuntu 14.04.

Find the IP address on your Linux installation:

 craigtrim@CVB:/usr/lib/apache/hadoop/2.5.2/bin$ ifconfig  
 eth0   Link encap:Ethernet HWaddr 08:00:27:6f:e0:5d   
      inet addr: Bcast: Mask:  
      RX packets:2899 errors:0 dropped:0 overruns:0 frame:0  
      TX packets:1696 errors:0 dropped:0 overruns:0 carrier:0  
      collisions:0 txqueuelen:1000   
      RX bytes:654302 (654.3 KB) TX bytes:600048 (600.0 KB)  
 lo    Link encap:Local Loopback   
      inet addr: Mask:  
      UP LOOPBACK RUNNING MTU:65536 Metric:1  
      RX packets:815 errors:0 dropped:0 overruns:0 frame:0  
      TX packets:815 errors:0 dropped:0 overruns:0 carrier:0  
      collisions:0 txqueuelen:0   
      RX bytes:100433 (100.4 KB) TX bytes:100433 (100.4 KB)  

In your Windows command line, change to the directory that contains the "pscp.exe" executable.

The data I want to transfer exists in a temp directory on the c:/ drive of my Windows box, and I want to transfer this to a data sub-directory in my home directory in the Linux box.

I use this command to transfer all the files:
 $ pscp c:\temp\* craigtrim@192.168.x.y:/home/craigtrim/data  

The operational output is shown on the command line:
 0106363.txt        | 2 kB |  3.0 kB/s | ETA: 00:00:00 | 100%  
 0106364.txt        | 3 kB |  3.2 kB/s | ETA: 00:00:00 | 100%  
 0106365.txt        | 6 kB |  6.8 kB/s | ETA: 00:00:00 | 100%  
 0106366.txt        | 3 kB |  3.9 kB/s | ETA: 00:00:00 | 100%  
 0106367.txt        | 9 kB |  9.6 kB/s | ETA: 00:00:00 | 100%  
 0106368.txt        | 3 kB |  3.9 kB/s | ETA: 00:00:00 | 100%  

Downloading Files

Let's say I want to download the log files from my HDFS cluster on the NameNode.

This is similar to the command above, with the exception of the remote system being placed first, and my local system being placed last:
 $ pscp craigtrim@192.168.x.y:/usr/lib/apache/hadoop/2.5.2/logs/* c:/temp  
Note that the local directory structure will need to exist first.

It is also possible to pass the username and password into the command:
 $ pscp -l craigtrim -pw password 192.168.x.y:/usr/lib/apache/hadoop/2.5.2/logs/* c:\temp  
 hadoop-craigtrim-secondar | 39 kB | 39.1 kB/s | ETA: 00:00:00 | 100%  
 hadoop-craigtrim-namenode | 122 kB | 122.8 kB/s | ETA: 00:00:00 | 100%  
 yarn-craigtrim-resourcema | 36 kB | 36.1 kB/s | ETA: 00:00:00 | 100%  


I have also found it helpful to create a batch file in Windows with the command above.

The directory containing the batch file can be added to the PATH variable in System > Environment Variables.  The command can also be parameterized.  Once that directory is in the PATH, the batch file can be executed from any directory in a given Windows Command session.

Alternatives to PuTTY

I prefer PuTTY, since the uploading / downloading can be easily automated from the command line. However, WinSCP provides a visual method for the same purpose.

