Monday, December 12, 2011

Export HDFS over CIFS (Samba3)


Three weeks ago I played with libhdfs and NFS, but I did not get the results I expected. So my next idea was: why not use Samba? Samba 3.x is stable and most operating systems can mount an exported share.
The main task was to research the setup and performance of this scenario, because Samba has a lot of tuning options. Let's go!

I used RHEL 5.7 and the RPMs delivered with it:
 #> rpm -qa|grep samba
 samba-3.0.33-3.29.el5_7.4.x86_64
 samba-common-3.0.33-3.29.el5_7.4.x86_64

As described in "NFS exported HDFS", I mounted HDFS over fuse into the directory /123 via /etc/fstab:

 #> cat /etc/fstab
 [..]
 hadoop-fuse-dfs#dfs://NAMENODE:9000 /123/hdfs fuse usetrash,rw 0 0

and checked it:
 #> mount
 [..]
 fuse on /123/hdfs type fuse (rw,nosuid,nodev,allow_other,default_permissions)

 #> ls -la /123
 total 16
 drwxr-xr-x  3 root root   4096 Dec  9 16:36 .
 drwxr-xr-x 27 root root   4096 Dec  9 12:11 ..
 drwxr-xr-x  5 hdfs nobody 4096 Dec  9 02:14 hdfs

The next step is to configure Samba. This is the config I ended up with:

#> cat /etc/samba/smb.conf
[global]
        bind interfaces only = yes
        deadtime = 15
        default case = lower
        disable netbios = yes
        interfaces = eth0
        dns proxy = no
        workgroup = HDFS
        server string = Samba Server Version %v
        socket options = TCP_NODELAY IPTOS_LOWDELAY SO_RCVBUF=65536 SO_SNDBUF=65536
        load printers = no
        max connections = 30
        strict sync = no
        sync always = no
        syslog = 1
        syslog only = yes
        security = user
        smb passwd file = /etc/samba/smbpasswd
        
[hdfs]
        comment = HDFS
        path = /123/hdfs
        public = yes
        writable = yes
        printable = no
        create mask = 0744
        force user = hdfs
        force group = nobody

Then I created the Samba user and password; here I used the hdfs system user (id=hdfs, group=nobody):

smbpasswd -a username

Finally I started the server:
 #> service smb restart
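
Before mounting from a client it can be worth validating the config and listing the shares locally; a quick check (both tools ship with the samba packages, the share name follows the smb.conf above):
 #> testparm -s /etc/samba/smb.conf
 #> smbclient -L localhost -U hdfs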

Test cases
For testing I used another RHEL5.7 server and mounted the exported share into /test:
 #> mount -t cifs -o username=hdfs,rw //SAMBASERVER/hdfs /test
 Password: HERE_THE_PASSWORD

check:
 #> ls -la /test/
 total 8
 drwxr-xr-x  5 hdfs nobody    0 Dec  9 02:14 .
 drwxr-xr-x 25 root root   4096 Dec  9 15:03 ..
 drwxr-xr-x  3 hdfs nobody    0 Dec  9 02:12 mapred
 drwxr-xr-x  3 hdfs nobody    0 Dec  9 02:13 opt
 drwxr-xr-x  6 hdfs nobody    0 Dec  9 15:56 user

Now the HDFS of my test cluster is exported via Samba. So far, so good.

My first test concerned read performance; here I chose an rsync of a smaller logfile collection:
 #> cd /tmp/rsync-test
 #> rsync -av /test/hdfs/user/flume/weblogs/2011-12-07/ .
 sent 20478888644 bytes  received 92606 bytes  17377158.46 bytes/sec
 total size is 20475835998
 (19GB, 16 MB/s) 

How many files did I sync?
 #> find . -type f |wc -l
 4665

Okay, that worked. Then I tested write speed; for that I used a plain file created with

 #> dd if=/dev/zero of=/tmp/13GB bs=128M count=100

and copied it into the CIFS mount, timing the copy with "time":
 #> time cp /tmp/13GB /test/hdfs/user/
 real 7m57.864s
 user 0m0.328s
 sys 0m20.602s

= around 27 MB/s
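
A quick back-of-the-envelope check of that number, dividing the file size (see the dfs -ls output below) by the elapsed time:
 #> echo "scale=1; 13421772800 / 478 / 1024 / 1024" | bc
 26.7
so roughly 27 MB/s.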

Then I checked for correct permissions and groups on HDFS:

 hdfs#> hadoop dfs -ls /user
 Found 1 item
 -rw-r--r--   3 hdfs supergroup 13421772800 2011-12-09 15:56 /user/13GB

For comparison I ran an scp write test:
 #> scp /tmp/13GB hdfs@SAMBASERVER:/123/hdfs/user

and got
13GB  100%   13GB  47.8MB/s   04:28

which is much faster. The Samba overhead clearly costs some performance.

Conclusion
It is possible to export an HDFS filesystem to clients over libhdfs and Samba and get acceptable results. That makes some tasks easier, including the use of HDFS as a (limited) cluster storage.

Links:
Samba-Tuning: https://calomel.org/samba_optimize.html

Friday, December 2, 2011

OSX improved shell environment


This is my favorite environment on my MacBook: an improved zsh and an extended .vimrc with syntax highlighting, syntax checking, TextMate-style snippets and Solarized colors. A few tools have to be installed first, so here is a recipe for it.
Features include navigating through directories and files with the arrow keys, dev-friendly colors, command highlighting, improved history, option autocompletion, ssh autocompletion (if the keys are known) and a lot more useful things.

Get Xcode:
 AppStore => Xcode => Install Xcode

From now on we work in a terminal window.

Install Brew
 /usr/bin/ruby -e "$(curl -fsSL https://raw.github.com/gist/323731)"

Install git and wget:
 brew install git
 brew install wget

Install oh-my-zsh:
 wget --no-check-certificate https://github.com/robbyrussell/oh-my-zsh/raw/master/tools/install.sh -O - | sh

The script wants to change your shell from /bin/bash to /bin/zsh; here you have to provide your password.

To change the theme, edit ~/.zshrc at line 8:
 ZSH_THEME="YOUR_FAV_THEME"

Install and enable zsh-syntax-highlighting:
 git clone http://github.com/nicoulaj/zsh-syntax-highlighting.git ~/.oh-my-zsh/plugins/zsh-syntax-highlighting
 Then edit ~/.zshrc and add the plugin to the plugins list around line 27 (space separated):
  zsh-syntax-highlighting
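
The plugins line in ~/.zshrc would then look something like this (the exact line number and the other entries depend on your template):
 plugins=(git zsh-syntax-highlighting)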

Solarized and Pathogen
(http://ethanschoonover.com/solarized)
(https://github.com/tpope/vim-pathogen)

mkdir -p ~/.vim/autoload ~/.vim/bundle; \
curl -so ~/.vim/autoload/pathogen.vim \
    https://raw.github.com/tpope/vim-pathogen/HEAD/autoload/pathogen.vim
 
cd ~/.vim/bundle
 git clone https://github.com/altercation/vim-colors-solarized.git

Extended .vimrc:
cd ~/ && git clone https://github.com/ikaros/vim-configuration.git && rm -rf .vim && mv vim-configuration .vim && rake -T && rake place_vim_config

I remove some bundles for now (run this inside ~/.vim):
 git rm --cached bundle/cucumber
 git rm --cached bundle/rubytest
 git rm --cached bundle/snipmate

and add snipmate again as a submodule, because the bundled version above does not work:
 git submodule add https://github.com/msanders/snipmate.vim.git bundle/snipmate

update all bundles:
 git submodule update --init

To switch the Solarized variant, edit the background variable in your .vimrc (set background=light or set background=dark).
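
With pathogen and the solarized bundle in place, the relevant part of a ~/.vimrc looks roughly like this (a sketch; the bundled config from above should already contain the pathogen call):
 call pathogen#infect()
 syntax enable
 set background=dark
 colorscheme solarized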

Finished. Enjoy your new shell!

Wednesday, November 30, 2011

NFS exported HDFS (CDH3)


For some use cases it can be a good idea to make an HDFS filesystem available across the network as an exported share. Here I describe a working scenario with Linux and Hadoop, using only tools both have on board.
I used fuse and libhdfs to mount an HDFS filesystem. Change namenode.local and <PORT> to fit your environment.

Install:
 yum install hadoop-0.20-fuse.x86_64 hadoop-0.20-libhdfs.x86_64

Create a mountpoint:
 mkdir /hdfs-mount

Mount your hdfs (testing):
 hadoop-fuse-dfs dfs://namenode.local:<PORT> /hdfs-mount -d

You should see output like this:
 INFO fuse_options.c:162 Adding FUSE arg /hdfs-mount
 INFO fuse_options.c:110 Ignoring option -d
 unique: 1, opcode: INIT (26), nodeid: 0, insize: 56
 INIT: 7.10
 flags=0x0000000b
 max_readahead=0x00020000
 INFO fuse_init.c:101 Mounting namenode.local:<PORT>
 INIT: 7.8
 flags=0x00000001
 max_readahead=0x00020000
 max_write=0x00020000
 unique: 1, error: 0 (Success), outsize: 40

Hit Ctrl-C after you see "Success".

Make the mount available at boot time:
 echo "hadoop-fuse-dfs#dfs://namenode.local:<PORT> /hdfs-mount fuse usetrash,rw 0 0" >> /etc/fstab

Test:
#> mount -a
#> mount
 [..]
 sunrpc on /var/lib/nfs/rpc_pipefs type rpc_pipefs (rw)
 fuse on /hdfs-mount type fuse (rw,nosuid,nodev,allow_other,default_permissions)

To tune the memory for each JVM process take a look into /etc/default/hadoop-0.20-fuse and adjust the settings there.

Export via NFS (unsecure):
First we have to decide which user to use; I suggest the user hdfs. Check it with "id hdfs":
 uid=104(hdfs) gid=105(hdfs) groups=105(hdfs),104(hadoop) context=root:staff_r:staff_t:SystemLow-SystemHigh

Create an exports-file:
 cat /etc/exports
 /hdfs-mount/user    (fsid=111,rw,wdelay,anonuid=104,anongid=105,sync,insecure,no_subtree_check,no_root_squash)

Explanation: read-write, fsid = an unused ID (see man 5 exports), write delay, anonuid/anongid mapped to the hdfs user and group, sync.

Exporting only the user directory from HDFS protects you from unwanted changes in system relevant directories (mapred, for example).
Restart your NFS server (service nfs restart).

Now you can use your HDFS as a "local" filesystem, which makes some tasks easier. Note that the accessing users are mapped to the local hdfs user, so using root is a bad idea.
Mount the exported NFS share on your machine and simply create or copy your job definitions or files, as sketched below.
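
On a client that could look like this (a sketch; NFSSERVER is a placeholder for the host that exports /hdfs-mount):
 mkdir /mnt/hdfs
 mount -t nfs NFSSERVER:/hdfs-mount/user /mnt/hdfs
 cp <your file> /mnt/hdfs/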

PS: works only from kernel 2.6.27 upwards

Saturday, November 19, 2011

All in one HDFS Cluster for your pocket

Update 1 (Nov 21, 2011):
- added 3rd interface as host-only-adapter (hadoop1)
- enabled trusted device eth2

About one year ago I created a small XEN environment for my engineering purposes. When I was traveling for hours it was very helpful for tracking down issues or testing new features. The problem was that I had to carry 2 notebooks with me. That was the reason I switched to VirtualBox [1], which runs on OSX, Linux and Windows as well. I could play with my servers, and when I had configured them to death I simply reimported them into a clean setup. I think it is also a good start for people who are new to the hadoop ecosystem and want to see its power without the pain of configuring a multi-node environment.
The appliance is created with VirtualBox because it runs very easily on OSX and Windows. The idea behind it is to check new settings in a small environment rather easily; the appliance is designed for research, not for development and definitely not for production. The appliance has 4 nodes, one master and 3 slaves. The setup is not perfect, but it matches the environment I created it for. There is no separate secondary namenode, for example. I set up HDFS, Hive with a MySQL metastore, and HBase in distributed mode with ZooKeeper and Stargate.

Before we can play with our own lab we need a few prerequisites. Please read the page [2] I created for it.

[1] https://www.virtualbox.org/wiki/Downloads
[2] http://mapredit.blogspot.com/p/all-in-one-hadoop-multi-node-appliance.html

Thursday, November 3, 2011

HDFS debugging scenario


The first step in debugging issues in a running hadoop environment is to take a look at the stacktraces, easily accessible at jobtracker/stacks, which shows all running threads in a jstack view. As an example I discuss a lab testing scenario, see below.

http://jobtracker:50030/stacks

Process Thread Dump: 
43 active threads
Thread 3203101 (IPC Client (47) connection to NAMENODE/IP:9000 from hdfs):
  State: TIMED_WAITING
  Blocked count: 6
  Waited count: 7
  Stack:
    java.lang.Object.wait(Native Method)
    org.apache.hadoop.ipc.Client$Connection.waitForWork(Client.java:676)
    org.apache.hadoop.ipc.Client$Connection.run(Client.java:719)

In this case the RPC connection is in state TIMED_WAITING with non-zero blocked and waited counts. That means the namenode could not answer the RPC requests fast enough. The problem lies in the setup, as I often see in production environments.
For the demonstration I used an ESX cluster with a VM as the namenode. The ESX network abstraction layer isn't performant enough and blocks the requests. It is always a good idea to use physical servers for infrastructure services.
Another problem I came across involves Nortel switches in HP BladeCenters: a newer firmware update enables a hidden "dos-filter" setting, so disable it. The switch blocks all traffic that looks like a DoS attack. That's a serious issue, and I wonder why such parameters are shipped enabled by default.

Now we take a closer look at the namenode:

With "jps" you can list all running java-processes:
jps
24158 SecondaryNameNode
31684 FlumeMaster
7898 JobTracker
18613 NameNode
16631 Jps
31653 FlumeWatchdog

We check the logs with "tail -f /var/log/hadoop-0.20/*.log | grep -i error". If you are sure that everything is fine there, you should look at the Java threads on the jobtracker. With "top -Hc" you'll see the threads and the running commands:

Mem:   4043792k total,  3916788k used,   127004k free,   352684k buffers
Swap:  5242864k total,  1448628k used,  3794236k free,   653296k cached


  PID USER      PR  NI  VIRT  RES  SHR S %CPU %MEM    TIME+  COMMAND
18448 hdfs      17   0 2345m 102m  12m S 55.1  1.3   0:01.66 /usr/java/jdk1.6.0_23/bin/java -Dproc_jar -Xmx2000m -Dhadoo
18458 hdfs      16   0 2345m 102m  12m S 31.5  1.3   0:00.95 /usr/java/jdk1.6.0_23/bin/java -Dproc_jar -Xmx2000m -Dhadoo
18457 hdfs      15   0 2345m 102m  12m R 30.9  1.3   0:00.93 /usr/java/jdk1.6.0_23/bin/java -Dproc_jar -Xmx2000m -Dhadoo
18356 hdfs      15   0 2510m 544m  11m S  1.3  6.8   0:00.06 jsvc.exec -Dproc_datanode -pidfile /usr/lib/hadoop-0.20/pid
18366 hdfs      18   0 2345m 102m  12m S  0.7  1.3   0:00.02 /usr/java/jdk1.6.0_23/bin/java -Dproc_jar -Xmx2000m -Dhadoo

To check the IO load during a running job you can use vmstat (part of the procps package):

vmstat -n 2 
procs -----------memory---------- ---swap-- -----io---- --system-- -----cpu------
 r  b   swpd   free   buff  cache   si   so    bi    bo   in   cs us sy id wa st
 0  0 1707244 368124 296528 482624    0    0     0   276 1626 1336 14 17 68  1  0
 0  0 1707244 367752 296544 482724    0    0     0   144 1362  716  2  1 97  0  0
 0  0 1707244 366884 296564 482728    0    0     8    66 1268  750  2  3 94  1  0
 1  0 1707244 366216 296588 482736    0    0     8   260 1269  672  2  4 94  1  0

Here we see a snippet during a running hive job. The system has no problems with IO, but the interrupt and context switch rates look a bit high. That comes from the setup; it is always a bad idea to run all services on a single server instance.

Example solution:
The namenode is going deep into swap, so the first step should be to add more RAM (this was a test cluster with 5 nodes and a VM as namenode, set up to reproduce some timing errors).
Remember, each HDFS block takes roughly 4 KB of RAM on the namenode. The best option here is to split the services onto separate instances (jobtracker, namenode and secondary namenode should each get their own machine).

Another effective way is to write a debug script [1] and use the setStatus() and incrCounter() methods on Reporter. To run your debug script in streaming mode, use the command line options "-mapdebug" and "-reducedebug"; a small sketch follows.
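
A minimal debug script could just dump the interesting parts of the task logs; a sketch based on the tutorial [1], which says the script gets the task's stdout, stderr, syslog and jobconf files as arguments:

#!/bin/bash
# debug.sh - the framework calls it as: debug.sh <stdout> <stderr> <syslog> <jobconf>
echo "===== task stderr ====="
cat "$2"
echo "===== exceptions in syslog ====="
grep -i -A 5 "exception" "$3"

Ship it with the job (e.g. via -file debug.sh) and pass it with -mapdebug ./debug.sh and -reducedebug ./debug.sh on the streaming command line.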

[1] http://hadoop.apache.org/common/docs/current/mapred_tutorial.html#Debugging

Tuesday, October 18, 2011

Syncing hdfs-clusters


Often it is a good idea to test new code on a reference cluster with a nearly-live dataset. To sync files from one cluster to another, use the Hadoop built-in tool distcp [1]. With a small script I "rebase" a development cluster with the logfiles we collected over the past day.

#!/bin/bash
COPYDATE=`date -d '-1 Day' +"%Y-%m-%d"`
DELDATE=`date -d '-3 Day' +"%Y-%m-%d"`
SNAMENODE=namenode1
TNAMENODE=namenode2
# do not name this variable PATH - that would clobber the shell's search path
HDFSPATH="/user/flume/logs"
LOG="/var/log/jobs/sync.log"


#logging
exec >> $LOG 2>&1


echo -e "\n ------- sync $COPYDATE ------- \n"
/usr/bin/hadoop distcp -i -m 100 hdfs://$SNAMENODE:9000$HDFSPATH/$COPYDATE hdfs://$TNAMENODE:9000$HDFSPATH/$COPYDATE/
sleep 60
echo -e "\n ------- delete $DELDATE ------- \n"
/usr/bin/hadoop dfs -rmr $HDFSPATH/$DELDATE
/usr/bin/hadoop dfs -rmr $HDFSPATH/_distcp_logs*
sleep 60
/usr/bin/hadoop dfs -chmod -R 777 $HDFSPATH/

The script copies the logfiles of the past day from the given path to the target's HDFS and deletes datasets older than 3 days. I didn't want the distcp logs in that directory (and didn't need them), so I delete them too. We don't have the user flume in our development cluster, so I set permissions to 777 for the whole directory.
To debug a failure the script writes all output into the given logfile. If you want to rotate that file, add a logrotate definition in /etc/logrotate.d/. To decrease the load and network impact on our live cluster I use only 100 maps; syncing 1 TB takes around 1 hour (below, a ganglia chart for a 300 GB sync). The script runs every day at 02:00 pm via cron.
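
The cron entry for that daily run could look like this (a sketch; the script path and the hdfs user are assumptions, adapt them to your setup):
 # /etc/cron.d/hdfs-sync
 0 14 * * * hdfs /usr/local/bin/sync-hdfs.sh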



[1] http://hadoop.apache.org/common/docs/current/distcp.html


Saturday, October 15, 2011

Secure your hadoop cluster, Part II

To get really safe you need a bit more time, coffee and aspirin. You will get headaches, for sure. First the good news: hadoop and its ecosystem run out of the box on a system with SELinux enabled in targeted mode. You have to expect a performance loss of 5 - 10%.

To enable SELinux on a box use setenforce 1; to check the state use sestatus.
# sestatus 
SELinux status:                 enabled
SELinuxfs mount:                /selinux
Current mode:                   enforcing
Mode from config file:          enforcing
Policy version:                 21
Policy from config file:        targeted

Fine, that's all. Now we enable SELinux at boot time:
# cat /etc/selinux/config
SELINUX=enforcing
SELINUXTYPE=targeted
SETLOCALDEFS=0

If you use fuse-hdfs check [1] for a valid rule.

The easiest way to get a system running is always SELINUXTYPE=targeted. But in some environments (healthcare, banking, military etc.) it is necessary to protect the systems much more; there we use strict mode. The difference between targeted and strict is small but really important: strict protects the whole system with the given ruleset, while targeted protects only the services covered by the policy.

To get strict mode running you have to install the policies first; I did yum install selinux-policy*. Now you have to set your system into permissive mode:

# cat /etc/selinux/config
SELINUX=permissive
SELINUXTYPE=strict
SETLOCALDEFS=0

Then touch /.autorelabel and reboot. If you run into a kernel panic at boot time, interrupt the boot (hit Space in grub), append selinux=0 to the kernel line and boot.

Cross your fingers, get a large cup of coffee and watch the boot screen. Permissive mode boots the system, only reports all violations and still lets us access it. After a successful reboot check dmesg for errors. Now we get a lot of AVC messages, including ones for the hadoop subsystem. Use the system as you usually do. With audit2allow you can create your own policies; the simplest way is audit2allow -a (see below).

Activate strict mode and set SELinux to enforcing:

# setenforce 1
# sestatus 
SELinux status:                 enabled
SELinuxfs mount:                /selinux
Current mode:                   enforcing
Mode from config file:          permissive
Policy version:                 21
Policy from config file:        strict

Now you have no access to the system except your home directory and /tmp. To set the system back into permissive mode you first have to switch to the right role:
# newrole -r sysadm_r
Password:

Now you are able to administer the system, including setting it back into permissive mode with setenforce 0.

Now it's time to get HDFS running. It is easier than you might think, because hadoop is written in Java. So we have to allow Java execution first:

# grep java_exec_t /var/log/audit/audit.log | audit2allow -m hdfs > hdfs.te
# cat hdfs.te
module hdfs 1.0;


require {
type sysadm_su_t;
type java_exec_t;
class file execute;
}


#============= sysadm_su_t ==============
allow sysadm_su_t java_exec_t:file execute;

Now we build a custom module:
# grep java_exec_t /var/log/audit/audit.log | audit2allow -M hdfs
******************** IMPORTANT ***********************
To make this policy package active, execute:
semodule -i hdfs.pp


# semodule -i hdfs.pp

Check if the module was loaded correctly:
# semodule -l | grep hdfs
hdfs 1.0

enable the ports:
# semanage port -a -t java_port_t -p tcp 9000 (9000 as an example, check [2] for default ports)
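
If more of the default Hadoop ports are needed, the same command can be put into a small loop; a sketch (the port list is an assumption based on common 0.20 defaults, check [2] and adapt it to your setup):
 for port in 9000 50010 50020 50030 50060 50070 50075 50090; do
   semanage port -a -t java_port_t -p tcp $port
 done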

You see, hardening a hadoop cluster takes a bit more time, but it is possible. You get a robust and security-enhanced system, but pay for it with a small performance loss.


[1] http://mail-archives.apache.org/mod_mbox/hadoop-general/201001.mbox/%3C4B436D53.2030406@hep.caltech.edu%3E
[2] http://www.cloudera.com/blog/2009/08/hadoop-default-ports-quick-reference/

Thursday, October 13, 2011

Sqoop and Microsoft SQL Server

From Microsoft's technet:
With SQL Server-Hadoop Connector [1], you import data from:
Tables in SQL Server to delimited text files on HDFS
Tables in SQL Server to SequenceFiles files on HDFS
Tables in SQL Server to tables in Hive*
Queries executed in SQL Server to delimited text files on HDFS
Queries executed in SQL Server to SequenceFiles files on HDFS
Queries executed in SQL Server to tables in Hive*
 
With SQL Server-Hadoop Connector, you can export data from:
Delimited text files on HDFS to SQL Server
SequenceFiles on HDFS to SQL Server
Hive Tables* to tables in SQL Server
But before it works you have to set up the connector. First get the MS JDBC driver [2]: just download it, unpack it and copy the driver file (sqljdbc4.jar) to the $SQOOP_HOME/lib/ directory. Now download the connector (.tar.gz) from [1], unpack it and point MSSQL_CONNECTOR_HOME to that directory. Let's assume you unpacked it into /usr/sqoop/connector/mssql; do:
# export MSSQL_CONNECTOR_HOME=/usr/sqoop/connector/mssql 

Check the export:
# echo $MSSQL_CONNECTOR_HOME
/usr/sqoop/connector/mssql


and run the install.sh in the unpacked directory.
sh ./install.sh

Tip: create a profile.d file:
# cat /etc/profile.d/mssql.sh
export MSSQL_CONNECTOR_HOME=/usr/sqoop/connector/mssql
and chmod it to 755.

An example:
Sqoop <=> MS SQL Server with hadoop processing works well. I set up a larger PoC, splitting the data into 3 maps:
# sqoop import --connect 'jdbc:sqlserver://<IP>;username=dbuser;password=dbpasswd;database=<DB>' --table <table> --target-dir /path/to/hdfs/dir --split-by <KEY> -m 3

=> the transfer of 1.3 GB of data took around one minute. After processing, just send the results back:
# sqoop export --connect 'jdbc:sqlserver://<IP>;username=dbuser;password=dbpasswd;database=<DB>' --table=<table> --direct --export-dir /path/from/hdfs/dir

You can do the same operations you know from Oracle or MySQL sqoop scripts.

[1] http://www.microsoft.com/download/en/details.aspx?id=27584
[2] http://www.microsoft.com/download/en/details.aspx?displaylang=en&id=21599

Tuesday, October 11, 2011

Centralized logfile management across networks with flume


Facebook's scribe was the first available service for managing a huge amount of logfiles. And we are not talking about 2 GB per day or so, I mean more than 1 TB per day. Compressed.
Now there is a new Apache incubator project: flume [1]. It is a pretty nice piece of software and I love it. It is reliable, fast, safe and has no proprietary stack inside. And you can build really cool logging setups with it.

If you use Cloudera's Distribution you get flume easily with a "yum install flume-master" on the master and "yum install flume-node" on a node. Check [2] for more information.

Flume has a lot of sources to get logfiles:
- from a text-file
- as a tail (one or more files)
- syslog UDP or TCP
- synthetic sources

Flume is designed for large-scale logfile distribution. Let's assume we have a 100-node webcluster with incoming traffic of around 3 GB/s. The farm produces 700 MB of raw weblogs per minute.
By processing them with flume we can compress the files, sort them into the buckets we need and deliver them quickly into our HDFS. Here is a working example:

cat /flume-zk/running.cfg
collector1.local : autoCollectorSource | collectorSink( "hdfs://namenode.local:9000/user/flume/weblogs/%Y-%m-%d/%H00/%M/", "%{host}-" );
collector2.local : autoCollectorSource | collectorSink( "hdfs://namenode.local:9000/user/flume/weblogs/%Y-%m-%d/%H00/%M/", "%{host}-" );
collector3.local : autoCollectorSource | collectorSink( "hdfs://namenode.local:9000/user/flume/weblogs/%Y-%m-%d/%H00/%M/", "%{host}-" );
collector4.local : autoCollectorSource | collectorSink( "hdfs://namenode.local:9000/user/flume/weblogs/%Y-%m-%d/%H00/%M/", "%{host}-" );
agent1.local : syslogTcp( "19800" ) | autoE2EChain;
agent2.local : syslogTcp( "19800" ) | autoE2EChain;

The chain autoE2EChain describes a failover process: if one of the nodes doesn't respond, it is moved to the end of the chain. You can see the logical mapping in the web interface (http://flume-master:35871/flumemaster.jsp).
We split the data into minute buckets and append the host we got the logs from as an identifier, which makes debugging easier. The webfarm logs via a loadbalancer to the agents (syslog input, port 19800). 19800 is just an example of a free unprivileged port.
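
On the webservers the forwarding itself can be as simple as one rsyslog rule pointing at the agents (or at the loadbalancer VIP in front of them), where @@ means TCP; a sketch, assuming rsyslog is used:
 # cat /etc/rsyslog.d/flume.conf
 *.* @@agent1.local:19800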

Let us check one of the agents:
# cd /tmp/flume/agent/agent1.local/
# ls
done  logged  sending  sent  writing

# ls -lah writing/
total 418M
drwxr-xr-x 2 flume flume 4.0K Sep 28 16:25 .
drwxr-xr-x 7 flume flume 4.0K Sep 28 10:21 ..
-rw-r--r-- 1 flume flume 418M Sep 28 16:25 log.00000019.20111011-162503316+0200.10204828793997118.seq

That is the logfile we are currently receiving from our nodes. After writing (you can define the split in flume.conf) the log is sent to the collectors, so we connect to collector1:

# tail -f -n 10 /var/log/flume/*.log
<del>: Creating org.apache.hadoop.io.compress.BZip2Codec@3c9d9efb compressed HDFS file: hdfs://namenode.local:9000/user/flume/weblogs/2011-09-28/1600/25/agent1.local-log.00000019.20111011-162503316+0200.10204828793997118.seq.bzip2
<del>: Finishing checksum group called 'log.00000019.20111011-162503316+0200.10204828793997118.seq'
<del>: Checksum succeeded 1325c55440e
<del>: moved from partial to complete log.00000019.20111011-162503316+0200.10204828793997118.seq
<del>: Closing hdfs://namenode.local:9000/user/flume/weblogs/2011-09-28/1600/25/agent1.local-log.00000019.20111011-162503316+0200.10204828793997118.seq
<del>: Closing HDFS file: hdfs://namenode.local:9000/user/flume/weblogs/2011-09-28/1600/25/agent1.local-log.log.00000019.20111011-162503316+0200.10204828793997118.seq.bzip2

You see, the collector receives the events from the agent, opens a sink into HDFS, streams the file and closes the sink after the configured time. Pretty nice! The logging mechanism works perfectly; the files are split into 1-minute pieces and compressed as bzip2 in our HDFS. Remember, always use bzip2 as the compression codec, because it is block based and splittable, which matters for later MapReduce processing.

[1] https://cwiki.apache.org/FLUME/
[2] https://ccp.cloudera.com/display/CDHDOC/Flume+Installation

Monday, October 10, 2011

Secure your hadoop cluster, Part I


Use mapred with Active Directory (basic auth)

Most of the cases I handled in the past weeks concerned hadoop security. That means firewalling, SELinux, authentication and user management. It is usually a difficult process, driven by the company's security profile and processes.

So I start with the most interesting part: authentication. In all cases I worked on, the main authentication system was a Windows Active Directory forest (AD). Since hadoop ships with several taskcontroller classes, we can use the LinuxTaskController. I use a RHEL 5 server, but the setup can be adapted to other installations.

To enable the UNIX services in Windows Server 2003 and newer you have to extend the existing schema with the UNIX templates delivered by Microsoft. After that you have to install "Identity Management for UNIX", in Server 2008 located under Server Manager => Roles => AD DS => common tasks => Add Role => Select Role Services. Install the software, restart your server and it should be done. Now create a default bind account, configure the AD server and create a test user with the group hdfs.

For testing we use these settings:
DN:hadoop.company.local
Binding-Acc: main
Group: hdfs

Now we take a closer look at the RedHat box. Since Kerberos 5 is fully supported, the task is really simple. Three problems could occur: time, DNS and a wrong schema on the AD server(s).
Set up the LDAP authentication with:
# authconfig-tui

Authentication => Use LDAP (use MD5 Password + Use shadow Password + Use Kerberos) => Next.






Server => Base DN (FQDN or IP) + DN (dc=hadoop,dc=company,dc=local) => Next.






Kerberos Settings => REALM in uppercase + KDC and admin server (FQDN or IP of AD Server) + Use DNS to resolve hosts to realms + Use DNS to locate KDCs for realms => OK





AD does not allow anonymous binds, so you have to use the bind account in /etc/ldap.conf (see above).

Add the ldap service after files in /etc/nsswitch.conf:
passwd:     files ldap
shadow:     files ldap
group:      files ldap

Now edit the /etc/ldap.conf:

base dc=hadoop,dc=company,dc=local
uri ldap://hadoop.company.local/
binddn main@hadoop.company.local
bindpw <PASSWORD>
scope sub
ssl no
nss_base_passwd dc=hadoop,dc=company,dc=local?sub
nss_base_shadow dc=hadoop,dc=company,dc=local?sub
nss_base_group dc=hadoop,dc=company,dc=local?sub? \
&(objectCategory=group)(gidnumber=*)
nss_map_objectclass posixAccount user
nss_map_objectclass shadowAccount user
nss_map_objectclass posixGroup group
nss_map_attribute gecos cn
nss_map_attribute homeDirectory unixHomeDirectory
nss_map_attribute uniqueMember member
nss_map_objectclass posixGroup Group
tls_cacertdir /etc/openldap/cacerts
pam_password md5
pam_login_attribute sAMAccountName
pam_filter objectclass=User

After you have written the file you should test your config:
# getent passwd
and
# kinit <YOUR USERNAME>@HADOOP.COMPANY.LOCAL
If you get no errors, everything works as expected.
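
An additional quick check that the group mapping works (the LinuxTaskController setup below relies on it):
# id <YOUR USERNAME>
# getent group hdfs
Both should show the AD user resolved with the hdfs group.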

Add to /etc/pam.d/system-auth:
session      required      pam_mkhomedir.so skel=/etc/skel umask=0022

Now it is time to use mapred with your AD. For that we use the shipped class org.apache.hadoop.mapred.LinuxTaskController; the configuration is done in mapred-site.xml:

<property>
<name>mapred.task.tracker.task-controller</name>
<value>org.apache.hadoop.mapred.LinuxTaskController</value>
</property>
<property>
<name>mapreduce.tasktracker.group</name>
<value>hdfs</value>
</property>

Now jobs will be submitted in the submitting user's context via PAM. Keep in mind that mapreduce.tasktracker.group has to be set to the group you set up in your AD.

Known issues mostly depend on your setup. Be sure you have synchronized time in your network (usually done with ntpd), a working DNS infrastructure and that the users and groups are known in AD.

Wednesday, September 28, 2011

hadoop log retention

Some people asked me about an "issue" with the mapreduce job history (/jobhistory.jsp): on high-traffic clusters the page takes a while to load. So let me explain the mechanism:

The history files are kept for 30 days (hardcoded in pre-0.21 releases). That produces a lot of logs and also wastes space on the jobtracker. I have installations which hold 20 GB of history logs, and as a consequence an audit of long running jobs isn't really usable.

Beginning with 0.21 the cleanup is configurable:

Key: mapreduce.jobtracker.jobhistory.maxage
Default: 7 * 24 * 60 * 60 * 1000L (one week)

To set the retention to a 3-day period use:

mapreduce.jobtracker.jobhistory.maxage
3 * 24 * 60 * 60 * 1000L

That is simply 3 days expressed in milliseconds (3 days × 24 hours × 60 minutes × 60 seconds × 1000 ms).
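
In mapred-site.xml this is set as a plain number of milliseconds, since the config file does not evaluate the multiplication; a sketch:

<property>
  <name>mapreduce.jobtracker.jobhistory.maxage</name>
  <value>259200000</value> <!-- 3 * 24 * 60 * 60 * 1000 ms = 3 days -->
</property>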

Another way, more of a hack, via cron.d:
find /var/log/hadoop-0.20/history/done/ -type f -mtime +1 | xargs rm -f

Friday, September 23, 2011

Analyze your IIS Logs with hive

As you know, it's really easy to collect logs from an Apache-driven webfarm into a hive cluster and analyze them. But how does that work for IIS?


Okay, let's take a look. IIS lets us collect logs in W3C format; enable it in the administration console under the "website" tab, "Active log format". There you can set up the path where the logs are stored, the fields you want to log and much more. After a restart you should see the logs in the desired path. It is a good idea to split them into hours, so you can run the jobs every hour on a fresh dataset.


For a small farm a really easy way is to export the log path as a Windows share and connect your hive server to it with the samba utils:
mount -t cifs -o user=name,password=passwd //Windows-Server/share /mountpoint

Copy the file into hdfs:
hadoop dfs -copyFromLocal /mountpoint/filename <hdfs-dir> (we assume iislog)

Now you can proceed with the analysis; we use hive here. Let's assume you want to know which IP causes the most traffic.


First you have to describe your tables in hive:
hive> create TABLE iislog (sdate STRING, stime STRING, ssitename STRING,scomputername STRING,sip STRING,csmethod STRING,csuristem STRING,csuriquery STRING,sport INT,csusername STRING,cip STRING,csversion STRING,csuseragent STRING,csCookie STRING,csReferer STRING,scstatus INT,scsubstatus INT,scwin32status INT,scbyte INT,csbytes INT,timetaken INT) partitioned by (time STRING) ROW FORMAT DELIMITED FIELDS TERMINATED BY '32' STORED AS TEXTFILE;

hive> CREATE TABLE iptraffic (sdate STRING, cip STRING, traffic INT,hits INT,appid STRING,scsuseragent STRING) partitioned by (time STRING) ROW FORMAT DELIMITED FIELDS TERMINATED BY '124' STORED AS TEXTFILE;

hive> describe iptraffic;
OK
sdate string
cip string
traffic int
hits int
appid string
scsuseragent string
time string
Time taken: 0.07 seconds


The first statement creates a table whose columns describe our logfiles, split into fields. The second one we need to get the traffic per minute.
Remember, we partition the data to get at the latest results much faster. For this test we need the date minus 2 minutes to create the partition holding the data we are looking for. To do that, write a small script like this:


#!/bin/bash
TABLE=IPTRAFFIC
DATEPAR=`date -d '-2 Min' +"%Y%m%d%H%M"`
DATEPATH=`date -d '-2 Min' +"%Y-%m-%d/%H00/%M"`
SDATE=`date -d '-2 Min' +"%Y-%m-%d"`
STIME=`date -d '-2 Min' +"%H:%M"`
hive -e "ALTER TABLE iptraffic ADD IF NOT EXISTS PARTITION (time='$DATEPAR')"
if [ $? -ne 0 ] ; 
 then echo "Couldn't create partition" 
 exit 1
 else echo -e "\n ==> PARTITION (time='$DATEPAR') created" 
fi

hive -e "INSERT OVERWRITE TABLE iptraffic partition (time=$DATEPAR) \ select concat('$SDATE ','$STIME:00'), cip, sum(csbytes)/1024 counter, count(1) hits,ssitename,csuseragent \ from iislog where iislog.time=$DATEPAR and NOT(iislog.cip LIKE '192\.%')\ group by cip,concat('$SDATE ','$STIME:00'), csuseragent, ssitename"

if [ $? -ne 0 ] ;
 then echo -e "\n ==> an error occurred in the analysis \n"
 exit 1
 else echo -e "\n ==> Insert analysis successful"
fi


What does that do?
First we take the date and format it into a standard form we can use. Then we create the partition and use it in our hive statement (in the where clause on the time partition), group by our main key (cip = source IP) together with date and time via concat, and write the result into our hive warehouse dir.

Let us take a look into the table:
hive> select * from iislog limit 10; 
OK
20110928130000 2011-09-28 10:59:06 W3SVC2 IISTEST xxx.xxx.xxx.xxx GET /images/bluebox.gif -80 - xxx.xxx.xxx.xxx HTTP/1.1 Mozilla/5.0+(compatible;+MSIE+9.0;+Windows+NT+6.1;+Trident/5.0) GUID=<deleted> 6313985 NULL 200 0 0 551 1689 201109281300

And the analyzed data ends up in the new partition:
hadoop dfs -cat /user/hive/warehouse/iptraffic/time=201110071059/* |less

2011-10-07 10:59:00|xxx.xxx.xxx.xxx|18|2|W3SVC5|Mozilla/5.0+(compatible;+MSIE+9.0;+Windows+NT+6.1;+Win64;+x64;+Trident/5.0;+MALC)
2011-10-07 10:59:00|xxx.xxx.xxx.xxx|1|2|W3SVC7|Mozilla/5.0+(Windows+NT+6.1;+WOW64)+AppleWebKit/535.1+(KHTML,+like+Gecko)+Chrome/14.0.835.202+Safari/535.1
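
Since the original question was which IP causes the most traffic, one more query over the aggregated table answers it; a sketch against the iptraffic table defined above (the partition value is just the example from this post):
hive> SELECT cip, SUM(traffic) AS total_kb, SUM(hits) AS total_hits FROM iptraffic WHERE time='201110071059' GROUP BY cip ORDER BY total_kb DESC LIMIT 10;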

you're done.

Thursday, September 15, 2011

Speedup Sqoop


Sqoop [1] (SQL to Hadoop) makes it easy to connect an RDBMS to a hadoop infrastructure. The newest plugin comes from Microsoft and lets us connect MS SQL Server and hadoop to each other. As a cool feature you can create a jar file from your job; it's pretty easy, here is the line:

sqoop export --connect jdbc:<RDBMS>:thin:@<HOSTNAME>:<PORT>:<DB-NAME> --table <TABLENAME> --username <USERNAME> --password <PASSWORD> --export-dir <HDFS DIR WHICH CONTAINS DATA> --direct --fields-terminated-by '<TERMINATOR (Java)>' --package-name <JOBNAME>.<IDENTIFIER> --outdir <WHERE THE JAR SHOULD BE WRITTEN> --bindir <BIN_DIR>

After it has run you'll find a jar package in --outdir; unzip it and you'll find your Java code and the precompiled class, so you can start to tune them.

Now let's start the job again, but using the precompiled class:

sqoop export --connect jdbc:<RDBMS>:thin:@<HOSTNAME>:<PORT>:<DB-NAME> --table <TABLENAME> --username <USERNAME> --password <PASSWORD> --export-dir <HDFS DIR WHICH CONTAINS DATA> --direct --fields-terminated-by '<TERMINATOR (Java)>' --jar-file <PATH/TO/JAR> --class-name <JOBNAME>.<IDENTIFIER>.<CLASSNAME>

The step above lets you speed up the export of large datasets dramatically. For example, I cut an export of 100k records from HDFS into an Oracle DB from 16 seconds down to 8 seconds.


Tuesday, August 16, 2011

Using MySQL as a Hive backend database


Hive lets us use an SQL-like language (HiveQL) to analyse large datasets with ad-hoc queries, and comes as a service on top of HDFS. It is easy to use and most SQL programmers can write queries instantly.
The weak point of the default installation is the included Derby DB, which runs locally on the node. Because of that, Hive is not really multiuser-capable.

To use Hive with more than one user you have to set up a backend database. The database holds all metadata regarding your tables, partitions, splits and rows. Therefore the database should be safe (maybe replicated) or an HA installation. I use 2 MySQL servers in an ESX cluster environment with binary logging enabled (active/standby).

Set up a server and install mysql-server version 5.1 or higher. To be absolutely safe you can set up a MySQL cluster ;) Let us configure the MySQL database:

# cat /etc/my.cnf
[mysqld_safe]
socket      = /var/lib/mysql/mysql.sock
[mysqld]
user        = mysql
pid-file    = /var/run/mysqld/mysqld.pid
socket      = /var/lib/mysql/mysql.sock
log-error   = /var/log/mysqld.log
datadir     = /opt/hadoop/mysql
default-storage_engine  = InnoDB
skip-bdb                = 1
old_passwords           = 0
skip_name_resolve
connect_timeout     = 30
wait_timeout        = 30
interactive_timeout = 100
key_buffer          = 128M
thread_concurrency  = 4
thread_cache        = 16
thread_stack        = 256K
table_cache         = 512
tmp_table_size      = 64M
max_heap_table_size = 64M
server-id           = 1001
log_bin             = /var/log/mysql/mysqlserver-bin.log
expire_logs_days    = 3
max_binlog_size     = 256M
innodb_file_per_table           = 1
innodb_data_file_path           = ibdata1:10M:autoextend
innodb_buffer_pool_size         = 512M
innodb_log_file_size            = 16M
innodb_flush_log_at_trx_commit  = 1
long_query_time     = 2
log_slow_queries    = /var/log/mysql/mysql-slow.log
query_cache_size    = 64M
query_cache_type    = 1
query_cache_limit   = 4M

You see, we use binary logging, InnoDB as the engine (no table locking) and some RAM-related parameters. Consult the MySQL manual for a complete description (that would be a larger post).

Restart your MySQL server (service mysqld restart). Now create the user and the database:
mysql> CREATE USER 'USER'@'%' IDENTIFIED BY 'PASSWORD';
mysql> GRANT ALL PRIVILEGES ON *.* TO 'USER'@'%' WITH GRANT OPTION;
mysql> CREATE DATABASE hive_live;

That's all. I know, very low security, but the server should only hold this one database.
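
Before wiring Hive to it, it's worth checking that the metastore DB is reachable from a cluster node; a quick check (USER/PASSWORD and the hostname mysqlserver are the placeholders used in this post):
# mysql -h mysqlserver -u USER -p -e "show databases;"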

Set up hive to use the metastore by adding to hive-default.xml:
# vi /etc/hive/conf/hive-default.xml
<!-- add mysql metastore -->
<property>
  <name>javax.jdo.option.ConnectionURL</name>
  <value>jdbc:mysql://mysqlserver:3306/hive_live?createDatabaseIfNotExist=true</value>
</property>
<property>
  <name>javax.jdo.option.ConnectionDriverName</name>
  <value>com.mysql.jdbc.Driver</value>
</property>
<property>
  <name>javax.jdo.option.ConnectionUserName</name>
  <value>USER</value>
</property>
<property>
  <name>javax.jdo.option.ConnectionPassword</name>
  <value>PASSWORD</value>
</property>
<!-- end -->

To get Hive running with the MySQL driver, you have to download it [1] and copy it across your cluster:
for i in $(cat /etc/hadoop-0.20/conf/slaves); do scp -r /usr/lib/hive/lib/mysql-connector-java-5.1.11-bin.jar $i:/usr/lib/hive/lib/; done

Now copy the hive-config too:
for i in $(cat /etc/hadoop-0.20/conf/slaves); do scp -r /etc/hive/conf/hive-* $i:/etc/hive/conf.dist/; done

That's all. If you use hive now, the first statement will take a while because hive creates the schema in the backend. Let us check:

mysql> use hive_live;
mysql> show tables;
+-------------------------+
| Tables_in_hive_live     |
+-------------------------+
| BUCKETING_COLS          |
| COLUMNS                 |
| DATABASE_PARAMS         |
| DBS                     |
| DB_PRIVS                |
| GLOBAL_PRIVS            |
| IDXS                    |
| INDEX_PARAMS            |
| PARTITIONS              |
| PARTITION_KEYS          |
| PARTITION_KEY_VALS      |
| PARTITION_PARAMS        |
| PART_COL_PRIVS          |
| PART_PRIVS              |
| ROLES                   |
| ROLE_MAP                |
| SDS                     |
| SD_PARAMS               |
| SEQUENCE_TABLE          |
| SERDES                  |
| SERDE_PARAMS            |
| SORT_COLS               |
| TABLE_PARAMS            |
| TBLS                    |
| TBL_COL_PRIVS           |
| TBL_PRIVS               |
| TYPES                   |
| TYPE_FIELDS             |
+-------------------------+
28 rows in set (0.01 sec)

If you have to upgrade the schema (after a hive update maybe), don't worry. Hive ships the SQL statements; take a look at /var/lib/hive/metastore/scripts/upgrade/mysql/. To apply a schema, cd into the directory and open the mysql CLI:
mysql> source /var/lib/hive/metastore/scripts/upgrade/mysql/hive-schema-0.7.0.mysql.sql <enter>

The script loads all SQL statements needed to upgrade the database. A full DB backup beforehand is a good idea.


[1] http://dev.mysql.com/downloads/connector/j/