Tuesday, October 18, 2011

Syncing hdfs-clusters


It is usually a good idea to test new code on a reference cluster with a nearly live dataset. To sync files from one cluster to another, use hadoop's built-in tool distcp [1]. With a small script I "rebase" a development cluster with the logfiles we collected over the past day.

# day to copy (yesterday) and day to delete (three days ago)
COPYDATE=$(date -d '-1 day' +"%Y-%m-%d")
DELDATE=$(date -d '-3 day' +"%Y-%m-%d")
SNAMENODE=namenode1
TNAMENODE=namenode2
# do not name this variable PATH: that would override the shell's search path
# and commands like sleep would no longer be found
HDFS_PATH="/user/flume/logs"
LOG="/var/log/jobs/sync.log"

# logging: append all output to the logfile
exec >> "$LOG" 2>&1

echo -e "\n ------- sync $COPYDATE ------- \n"
/usr/bin/hadoop distcp -i -m 100 "hdfs://$SNAMENODE:9000$HDFS_PATH/$COPYDATE" "hdfs://$TNAMENODE:9000$HDFS_PATH/$COPYDATE/"
sleep 60
echo -e "\n ------- delete $DELDATE ------- \n"
/usr/bin/hadoop dfs -rmr "$HDFS_PATH/$DELDATE"
/usr/bin/hadoop dfs -rmr "$HDFS_PATH/_distcp_logs*"
sleep 60
/usr/bin/hadoop dfs -chmod -R 777 "$HDFS_PATH/"

The script copies the logfiles of the past day from the given path to the target's hdfs and deletes datasets that are older than 3 days. I didn't want the distcp logs in that directory (and I didn't need them), so I delete them too. We didn't have the user flume in our development cluster, so I set permissions to 777 for the whole directory.
To debug a failure, the script writes all output into the given logfile. If you want to rotate the file, add a logrotate definition to /etc/logrotate.d/. To decrease the load and network impact on our live cluster I use only 100 maps. The script runs every day via cron at 02:00 pm and takes around 1 hour for 1 TB. Here is a ganglia chart for a 300 GB sync.
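
Before pointing such a script at a live cluster it can help to print the commands first. The following is a minimal sketch and not part of the original script: the function name sync_commands and its arguments are made up for illustration, the port 9000 and the hadoop options are simply taken from the script above, and GNU date is assumed.

```shell
# Hypothetical dry-run helper: prints the hadoop commands for a given
# reference day instead of executing them.
# $1: source namenode, $2: target namenode, $3: HDFS directory,
# $4: reference day (YYYY-MM-DD), normally today.
sync_commands() {
    src=$1
    dst=$2
    dir=$3
    today=$4
    # GNU date: day to copy (one day back) and day to delete (three days back)
    copydate=$(date -u -d "$today -1 day" +%Y-%m-%d)
    deldate=$(date -u -d "$today -3 day" +%Y-%m-%d)
    echo "hadoop distcp -i -m 100 hdfs://$src:9000$dir/$copydate hdfs://$dst:9000$dir/$copydate/"
    echo "hadoop dfs -rmr $dir/$deldate"
}
```

Calling sync_commands namenode1 namenode2 /user/flume/logs "$(date +%Y-%m-%d)" shows which directories the next run would copy and delete.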



[1] http://hadoop.apache.org/common/docs/current/distcp.html


Saturday, October 15, 2011

Secure your hadoop cluster, Part II

To get absolutely safe you need a bit more time, coffee and Aspirin. You will get headaches, for sure. First the good news: hadoop and the ecosystem run out of the box on a SELinux-enabled system in targeted mode. You have to reckon with a performance loss of 5 - 10%.

To switch SELinux into enforcing mode on a running box use setenforce 1; to check the system use sestatus.
# sestatus 
SELinux status:                 enabled
SELinuxfs mount:                /selinux
Current mode:                   enforcing
Mode from config file:          enforcing
Policy version:                 21
Policy from config file:        targeted

Fine. That's all. Now we enable SELinux at boot time:
# cat /etc/selinux/config
SELINUX=enforcing
SELINUXTYPE=targeted
SETLOCALDEFS=0

If you use fuse-hdfs check [1] for a valid rule.

The best way to get a system running is always to use SELINUXTYPE=targeted. But in some environments it is necessary to protect the systems much more (healthcare, banks, military etc.); here we use strict mode. The difference between targeted and strict is small, but really important: strict protects the whole system and uses the given ruleset, targeted protects only the services in the given policy.

To get strict mode running you have to install the policies first; I did yum install selinux-policy*. Now you have to set your system into permissive mode:

# cat /etc/selinux/config
SELINUX=permissive
SELINUXTYPE=strict
SETLOCALDEFS=0

Then touch /.autorelabel and reboot. If you run into a kernel panic at boot time, cancel the boot (by hitting Space), append selinux=0 to the grub boot line and boot.

Cross your fingers, get a large cup of coffee and watch the boot screen. Permissive mode lets the system boot and only reports violations, so we can still access the system. After a successful reboot check dmesg for errors. Now we get a lot of AVC messages, including ones from the hadoop subsystem. Use the system as you usually do. With audit2allow you are able to create your own policies; the simplest way is audit2allow -a (an example follows below).

Activate the strict-mode and set SELinux into enforcing:

# setenforce 1
# sestatus 
SELinux status:                 enabled
SELinuxfs mount:                /selinux
Current mode:                   enforcing
Mode from config file:          permissive
Policy version:                 21
Policy from config file:        strict

Now you have no access to the system, except your home and /tmp. To set the system back into permissive mode you first have to switch to the sysadm_r role:
# newrole -r sysadm_r
Password:

Now you are able to administer the system, including setting it back into permissive mode with setenforce 0.
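
Keep in mind that setenforce only changes the running mode. In the sestatus output above the current mode is enforcing while the config file still says permissive, so the box comes back in permissive mode after a reboot. A small sketch to spot that mismatch; the function name selinux_mode_matches_config is made up, and it only parses the two sestatus lines shown above:

```shell
# Hypothetical helper: reads `sestatus` output on stdin and compares the
# running mode with the mode from /etc/selinux/config.
# Exit status 0 if both agree, 1 otherwise.
selinux_mode_matches_config() {
    awk -F': *' '
        /^Current mode:/          { current = $2 }
        /^Mode from config file:/ { config = $2 }
        END {
            printf "current=%s config=%s\n", current, config
            exit (current == config ? 0 : 1)
        }'
}
```

Used as sestatus | selinux_mode_matches_config || echo "mode will change on reboot".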

Now it's time to get hdfs running. It is much easier than you might think, because hadoop is developed in java. So we have to allow java first:

# grep java_exec_t /var/log/audit/audit.log | audit2allow -m hdfs > hdfs.te
# cat hdfs.te
module hdfs 1.0;


require {
type sysadm_su_t;
type java_exec_t;
class file execute;
}


#============= sysadm_su_t ==============
allow sysadm_su_t java_exec_t:file execute;

Now we build a custom module:
# grep java_exec_t /var/log/audit/audit.log | audit2allow -M hdfs
******************** IMPORTANT ***********************
To make this policy package active, execute:
semodule -i hdfs.pp


# semodule -i hdfs.pp

Check if the module was loaded correctly:
# semodule -l | grep hdfs
hdfs 1.0

Enable the ports (9000 as an example, check [2] for the default ports):
# semanage port -a -t java_port_t -p tcp 9000

You see, hardening a hadoop cluster takes a bit more time, but it is possible. You get a robust and security-enhanced system, but you pay for it with a small performance loss.


[1] http://mail-archives.apache.org/mod_mbox/hadoop-general/201001.mbox/%3C4B436D53.2030406@hep.caltech.edu%3E
[2] http://www.cloudera.com/blog/2009/08/hadoop-default-ports-quick-reference/

Thursday, October 13, 2011

Sqoop and Microsoft SQL Server

From Microsoft's technet:
With SQL Server-Hadoop Connector [1], you import data from:
Tables in SQL Server to delimited text files on HDFS
Tables in SQL Server to SequenceFiles files on HDFS
Tables in SQL Server to tables in Hive*
Queries executed in SQL Server to delimited text files on HDFS
Queries executed in SQL Server to SequenceFiles files on HDFS
Queries executed in SQL Server to tables in Hive*
 
With SQL Server-Hadoop Connector, you can export data from:
Delimited text files on HDFS to SQL Server
SequenceFiles on HDFS to SQL Server
Hive Tables* to tables in SQL Server
But before it works you have to set up the connector. First get the MS JDBC driver [2]:
You just have to download the driver, unpack it and copy the driver file (sqljdbc4.jar) to the $SQOOP_HOME/lib/ directory. Now download the connector (.tar.gz) from [1], unpack it and set MSSQL_CONNECTOR_HOME to that directory. Let's assume you unpacked it into /usr/sqoop/connector/mssql, then do:
# export MSSQL_CONNECTOR_HOME=/usr/sqoop/connector/mssql 

Check the export:
# echo $MSSQL_CONNECTOR_HOME
/usr/sqoop/connector/mssql


and run install.sh in the unpacked directory:
# sh ./install.sh

Tip: create a profile.d file:
# cat /etc/profile.d/mssql.sh
export MSSQL_CONNECTOR_HOME=/usr/sqoop/connector/mssql
and set its mode to 755.

An example:
Sqoop between MS SQL Server and hadoop works well. I just set up a larger PoC that splits the data across 3 maps:
# sqoop import --connect 'jdbc:sqlserver://<IP>;username=dbuser;password=dbpasswd;database=<DB>' --table <table> --target-dir /path/to/hdfs/dir --split-by <KEY> -m 3

=> Pulling 1.3 GB of data out of SQL Server took around one minute. After processing, just send the results back:
# sqoop export --connect 'jdbc:sqlserver://<IP>;username=dbuser;password=dbpasswd;database=<DB>' --table=<table> --direct --export-dir /path/from/hdfs/dir

You can use the same operations you know from oracle or mysql sqoop scripts.
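
Both commands above carry the password inside the JDBC URL, where it ends up in the shell history and the process list. A possible variation, only as a sketch: build the URL without credentials and hand them over with sqoop's --username and -P options (-P prompts for the password on the console). The helper name mssql_url is made up, and whether the SQL Server connector accepts credentials this way is an assumption to verify in your own setup.

```shell
# Hypothetical helper: prints a SQL Server JDBC URL without credentials.
# $1: SQL Server host or IP, $2: database name
mssql_url() {
    printf 'jdbc:sqlserver://%s;database=%s\n' "$1" "$2"
}

# Usage (not executed here, placeholders as in the examples above):
#   sqoop import --connect "$(mssql_url <IP> <DB>)" --username dbuser -P \
#       --table <table> --target-dir /path/to/hdfs/dir --split-by <KEY> -m 3
```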

[1] http://www.microsoft.com/download/en/details.aspx?id=27584
[2] http://www.microsoft.com/download/en/details.aspx?displaylang=en&id=21599

Tuesday, October 11, 2011

Centralized logfile management across networks with flume


Facebook's scribe was the first available service for managing a huge amount of logfiles. We are not talking about 2 GB per day or so; I mean more than 1 TB per day. Compressed.
Now there is a new apache incubator project, flume [1]. It is a pretty nice piece of software, so I love it. It is reliable, fast, safe and has no proprietary stack inside. And you can create really cool logging tasks.

If you use Cloudera's Distribution you get flume easily with a "yum install flume-master" on the master and "yum install flume-node" on a node. Check [2] for more information.

Flume has a lot of sources to get logfiles:
- from a text-file
- as a tail (one or more files)
- syslog UDP or TCP
- synthetic sources

Flume is designed for large logfile distribution processes. Let's assume we have a 100-node web cluster with incoming traffic of around 3 GB/s. The farm produces 700 MB of raw weblogs per minute.
By processing through flume we can compress the files, sort them into the buckets we need and deliver them quickly into our hdfs. Here is a working example:

# cat /flume-zk/running.cfg
collector1.local : autoCollectorSource | collectorSink( "hdfs://namenode.local:9000/user/flume/weblogs/%Y-%m-%d/%H00/%M/", "%{host}-" );
collector2.local : autoCollectorSource | collectorSink( "hdfs://namenode.local:9000/user/flume/weblogs/%Y-%m-%d/%H00/%M/", "%{host}-" );
collector3.local : autoCollectorSource | collectorSink( "hdfs://namenode.local:9000/user/flume/weblogs/%Y-%m-%d/%H00/%M/", "%{host}-" );
collector4.local : autoCollectorSource | collectorSink( "hdfs://namenode.local:9000/user/flume/weblogs/%Y-%m-%d/%H00/%M/", "%{host}-" );
agent1.local : syslogTcp( "19800" ) | autoE2EChain;
agent2.local : syslogTcp( "19800" ) | autoE2EChain;
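
The four collector lines differ only in the host name, so they can be generated instead of copied. A small sketch: the function name collector_lines is hypothetical, and the sink definition is taken verbatim from the config above.

```shell
# Hypothetical generator for the collector part of the flume config.
# $1: number of collectors (collector1.local .. collectorN.local)
collector_lines() {
    sink='collectorSink( "hdfs://namenode.local:9000/user/flume/weblogs/%Y-%m-%d/%H00/%M/", "%{host}-" );'
    i=1
    while [ "$i" -le "$1" ]; do
        # pass the line as an argument so the % escapes are not touched by printf
        printf '%s\n' "collector$i.local : autoCollectorSource | $sink"
        i=$((i + 1))
    done
}
```

collector_lines 4 prints the four collector lines shown above.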

The chain autoE2EChain describes a failover process: if one of the nodes doesn't respond, it is moved to the end of the chain. You can see the logical mapping in the web interface (http://flume-master:35871/flumemaster.jsp).
We split the data here into minutes and set the host from which we got the logs as an identifier at the end. That makes it easier to debug. The web farm logs via a load balancer to the agents (input syslog, output 19800). 19800 is a free unprivileged port, as an example.
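
For the fields used here, the escapes in the sink path look like the ones of date(1), so the target directory for a given timestamp can be previewed on the shell. A sketch assuming GNU date; the function name flume_bucket is made up:

```shell
# Hypothetical helper: prints the HDFS directory the sink path above
# resolves to for a given timestamp.
# $1: timestamp in a form GNU date understands, e.g. "2011-09-28 16:25:03"
flume_bucket() {
    # -u and the UTC suffix keep the result independent of the local timezone
    date -u -d "$1 UTC" +'/user/flume/weblogs/%Y-%m-%d/%H00/%M/'
}
```

flume_bucket "2011-09-28 16:25:03" prints /user/flume/weblogs/2011-09-28/1600/25/, the same directory that shows up in the collector log further down.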

Let us check one of the agents:
# cd /tmp/flume/agent/agent1.local/
# ls
done  logged  sending  sent  writing

# ls -lah writing/
total 418M
drwxr-xr-x 2 flume flume 4.0K Sep 28 16:25 .
drwxr-xr-x 7 flume flume 4.0K Sep 28 10:21 ..
-rw-r--r-- 1 flume flume 418M Sep 28 16:25 log.00000019.20111011-162503316+0200.10204828793997118.seq

That is the logfile we are currently receiving from our nodes. After writing (you can define the split in flume.conf) the log will be sent to the collectors, so we connect to collector1:

# tail -f -n 10 /var/log/flume/*.log
<del>: Creating org.apache.hadoop.io.compress.BZip2Codec@3c9d9efb compressed HDFS file: hdfs://namenode.local:9000/user/flume/weblogs/2011-09-28/1600/25/agent1.local-log.00000019.20111011-162503316+0200.10204828793997118.seq.bzip2
<del>: Finishing checksum group called 'log.00000019.20111011-162503316+0200.10204828793997118.seq'
<del>: Checksum succeeded 1325c55440e
<del>: moved from partial to complete log.00000019.20111011-162503316+0200.10204828793997118.seq
<del>: Closing hdfs://namenode.local:9000/user/flume/weblogs/2011-09-28/1600/25/agent1.local-log.00000019.20111011-162503316+0200.10204828793997118.seq
<del>: Closing HDFS file: hdfs://namenode.local:9000/user/flume/weblogs/2011-09-28/1600/25/agent1.local-log.log.00000019.20111011-162503316+0200.10204828793997118.seq.bzip2

You see, the collector receives the action from the agent, opens a sink into hdfs, writes the file via a stream and closes the sink after the time we configured. Pretty nice! The logging mechanism works perfectly; the files are split and compressed with bzip2 into 1-minute pieces in our hdfs. Remember to always use bzip2 as the compression codec, because the codec understands markers, which keeps the compressed files splittable (blocksize, reducing etc).

[1] https://cwiki.apache.org/FLUME/
[2] https://ccp.cloudera.com/display/CDHDOC/Flume+Installation

Monday, October 10, 2011

Secure your hadoop cluster, Part I


Use mapred with Active Directory (basic auth)

Most of the cases I handled in the past weeks concerned hadoop security. That means firewalling, SELinux, authentication and user management. It is usually a difficult process, tied to the company's security profile and processes.

So I start with the most interesting part: authentication. In all cases I worked on, the main authentication system was a Windows Active Directory forest (AD). Since hadoop ships with several task controller classes, we can use LinuxTaskController. I use RHEL5 servers, but the setup can be adapted to other installations.

To enable the UNIX services in Windows Server > 2003 you have to extend the existing schema with the UNIX templates provided by Microsoft. After that you have to install "Identity Management for UNIX", in 2008 located in Server Manager => Roles => AD DS => common tasks => Add Role => Select Role Services. Install the software, restart your server and it should be done. Now create a default bind account, configure the AD server and create a test user with the group hdfs.

For testing we use these settings:
DN:hadoop.company.local
Binding-Acc: main
Group: hdfs

Now we take a closer look at the RedHat box. Since kerberos5 is fully supported, the task is really simple. Three problems could occur: time, DNS and a wrong schema on the AD server(s).
Set up the ldap authentication with:
# authconfig-tui

Authentication => Use LDAP (use MD5 Password + Use shadow Password + Use Kerberos) => Next.

LDAP Settings => Server (FQDN or IP) + Base DN (dc=hadoop,dc=company,dc=local) => Next.

Kerberos Settings => REALM in uppercase + KDC and admin server (FQDN or IP of AD Server) + Use DNS to resolve hosts to realms + Use DNS to locate KDCs for realms => OK

AD does not allow anonymous connections, so you have to use the bind account in /etc/ldap.conf (see below).

In /etc/nsswitch.conf add the ldap service after files:
passwd:     files ldap
shadow:     files ldap
group:      files ldap
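
If lookups later fail, a missing ldap entry in one of these three lines is an easy thing to rule out. A minimal sketch; the function name check_nsswitch is made up, and it only accepts the exact order "files ldap" used above:

```shell
# Hypothetical check: verifies that passwd, shadow and group list
# ldap directly after files.
# $1: path to nsswitch.conf (defaults to /etc/nsswitch.conf)
check_nsswitch() {
    conf=${1:-/etc/nsswitch.conf}
    rc=0
    for db in passwd shadow group; do
        if grep -Eq "^${db}:[[:space:]]*files[[:space:]]+ldap([[:space:]]|\$)" "$conf"; then
            echo "$db: ok"
        else
            echo "$db: ldap not found after files"
            rc=1
        fi
    done
    return "$rc"
}
```

The exit status is 0 only if all three databases are configured, so the function can be used in a setup script.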

Now edit the /etc/ldap.conf:

base dc=hadoop,dc=company,dc=local
uri ldap://hadoop.company.local/
binddn main@hadoop.company.local
bindpw <PASSWORD>
scope sub
ssl no
nss_base_passwd dc=hadoop,dc=company,dc=local?sub
nss_base_shadow dc=hadoop,dc=company,dc=local?sub
nss_base_group dc=hadoop,dc=company,dc=local?sub?&(objectCategory=group)(gidnumber=*)
nss_map_objectclass posixAccount user
nss_map_objectclass shadowAccount user
nss_map_objectclass posixGroup group
nss_map_attribute gecos cn
nss_map_attribute homeDirectory unixHomeDirectory
nss_map_attribute uniqueMember member
nss_map_objectclass posixGroup Group
tls_cacertdir /etc/openldap/cacerts
pam_password md5
pam_login_attribute sAMAccountName
pam_filter objectclass=User

After you have written your file you should test your config:
# getent passwd
and
# kinit <YOUR USERNAME>@HADOOP.COMPANY.LOCAL
If you get no errors, everything works as expected. Note that the Kerberos realm is written in uppercase.

Add to /etc/pam.d/system-auth:
session      required      pam_mkhomedir.so skel=/etc/skel umask=0022

Now it is time to use mapred with your AD. For that we use the shipped class org.apache.hadoop.mapred.LinuxTaskController; the configuration is done in mapred-site.xml:

<property>
<name>mapred.task.tracker.task-controller</name>
<value>org.apache.hadoop.mapred.LinuxTaskController</value>
</property>
<property>
<name>mapreduce.tasktracker.group</name>
<value>hdfs</value>
</property>

Now jobs will be submitted in the given user context via pam. Keep in mind that the group must be set to the group you set up in your AD.

Known issues mostly depend on your setup. Make sure you have synchronized time in your network (usually done with ntpd), a working DNS infrastructure, and that the users and groups are known in AD.