Friday, December 14, 2012

Impala and Kerberos

First, Impala is beta software and has some limitations. Stay tuned and test it; you'll see it can change your BI world dramatically.

What is Impala? 
Impala provides fast, interactive SQL queries directly on your Apache Hadoop data stored in HDFS or HBase. In addition to using the same unified storage platform, Impala also uses the same metadata, SQL syntax (Hive SQL), ODBC driver and user interface (Hue Beeswax) as Apache Hive. This provides a familiar and unified platform for batch-oriented or real-time queries.
You can build Impala from source, or you can grab it via yum on a RHEL / CentOS 6.x server. Impala doesn't support RHEL / CentOS releases prior to 6, since most of Impala is written in C++.

I chose the RPM version for this article, but a compiled version will work in the same manner. To grab Impala directly via yum, set up a new repository:

#> cat /etc/yum.repos.d/impala.repo
gpgkey =
gpgcheck = 1

and install Impala and all needed libraries via yum:

yum install impala impala-shell cyrus-sasl-devel cyrus-sasl-gssapi gcc-c++ gcc c++ python-setuptools -y && easy_install sasl
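For reference, a complete impala.repo could look like the following sketch; the baseurl and gpgkey URLs are placeholders for whichever Impala repository you actually use:

```
[cloudera-impala]
name=Impala
baseurl=<url-of-the-impala-repository>
enabled=1
gpgcheck=1
gpgkey=<url-of-the-repository-gpg-key>
```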

You should use the newest JDK from Oracle, and you have to install it across your cluster; at the time of writing, jdk-6u37-linux-x64-rpm.bin was the current release. Note that you have to install the JDK after you have installed Impala via yum, since the dependencies pull in OpenJDK as well. To avoid using OpenJDK, point your system(s) via alternatives to the release you want to use:

alternatives --install /usr/bin/javaws javaws /usr/java/latest/jre/bin/javaws 20000
alternatives --install /usr/bin/java java /usr/java/latest/jre/bin/java 20000

To be sure you're running the JDK you just installed, check it:

java -version
java version "1.6.0_37"
Java(TM) SE Runtime Environment (build 1.6.0_37-b06)
Java HotSpot(TM) 64-Bit Server VM (build 20.12-b01, mixed mode)

One of the things that can go wrong are missing libraries which are dynamically linked from impalad but not present in the default library paths. Check with ldd and link the missing libraries into /usr/lib64/; in my case I did:

ln -s /usr/java/jdk1.6.0_37/jre/lib/amd64/server/ /usr/lib64/
ln -s /usr/java/jdk1.6.0_37/jre/lib/amd64/ /usr/lib64/
ln -s /usr/lib/impala/lib/ /usr/lib64/

Check again with ldd (ldd /usr/lib/impala/sbin/impalad) that no libraries are still missing.

Copy your Hive, HDFS and HBase config files into Impala's config directory and create a log4j configuration file within $IMPALA_HOME/conf/:

log4j.appender.FA.layout.ConversionPattern=%p%d{MMdd HH:mm:ss.SSS'000'} %t %c] %m%n
log4j.appender.console.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p %c{2}: %m%n

The config directory within Impala's home should have the following files present; to determine which home directory the user impala uses, check with "echo ~impala".

ll /usr/lib/impala/conf/
total 24
-rw-r--r-- 1 root root 1243 Dec 10 14:59 core-site.xml
-rw-r--r-- 1 root root 4596 Sep 2 09:35 hdfs-site.xml
-rw-r--r-- 1 root root 1157 Dec 10 10:36 hive-site.xml
-rw------- 1 impala impala 594 Dec 11 12:29 impala.keytab
-rw-r--r-- 1 root root 647 Dec 11 12:31

Sync the content of the directory to all other nodes in your cluster.

Kerberos integration
If you use RHEL's Kerberos KDC packages, you have to tweak your principals. From RHEL 4 on, principals get a default renew_lifetime of zero. That means you can get a renewable ticket, but you can't actually renew it.

To solve this, modify the krbtgt principal as well as all other principals that should be able to renew their tickets:
kadmin.local: modprinc -maxrenewlife 1day krbtgt/ALO.ALT@ALO.ALT
kadmin.local: addprinc -randkey -maxrenewlife 1day +allow_renewable impala/hadoop1.alo.alt@ALO.ALT

Export the keytab (xst -norandkey -k impala.keytab impala/hadoop1.alo.alt@ALO.ALT HTTP/hadoop1.alo.alt@ALO.ALT), place it in $IMPALA_HOME/conf and obtain a renewable ticket with
sudo -u impala kinit -r 1day -k -t /usr/lib/impala/conf/impala.keytab impala/hadoop1.alo.alt@ALO.ALT

I created a simple start script to check that everything works as expected; it starts the statestore as well as impalad on your server:

# adjust these values to your environment; user, host and realm are examples
USER=impala
HOST=hadoop1.alo.alt
REALM=ALO.ALT
CONF=`echo ~$USER`/conf
export GLOG_minloglevel=0
export GLOG_logbuflevel=-1
export GLOG_log_dir=/var/log/impala
export GLOG_max_log_size=200

mkdir -p /var/log/impala
chown -R impala: /var/log/impala
# obtain a new ticket
sudo -u impala kinit -r 1day -k -t $CONF/$USER.keytab $USER/$HOST@$REALM
#start it up
statestored -state_store_port=24000 -enable_webserver=true -webserver_port=25010 -log_filename=impala-state-store -principal=$USER/$HOST@$REALM -keytab_file=$CONF/impala.keytab &

impalad -state_store_host=hadoop1.alo.alt -nn=hadoop1.alo.alt -nn_port=9000 -hostname=hadoop1.alo.alt -ipaddress= -enable_webserver=true -webserver_port=25000 -principal=$USER/$HOST@$REALM -keytab_file=$CONF/impala.keytab -kerberos_ticket_life=36000 -log_filename=impala &

To control if all is running well you can now point your browser to the configured webservices:
statestore: http://<statestore-server>:25010
impalad: http://<impala-server>:25000

Both services deliver a bunch of monitoring features, as example you can grab metrics from the /metrics endpoint.

Using impala-shell with kerberos
To use the Impala shell with Kerberos, first get a valid ticket for your user and then invoke the shell with impala-shell -k. Note that you have to install Python sasl on all clients (easiest via easy_install sasl).

[~]$ impala-shell -k
Using service name 'impala' for kerberos
Welcome to the Impala shell. Press TAB twice to see a list of available commands.

Copyright (c) 2012 Cloudera, Inc. All rights reserved.

(Build version: Impala v0.3 (3cb725b) built on Fri Nov 23 13:51:59 PST 2012)
[Not connected] > connect hadoop1:21000
[hadoop1:21000] > show tables
[hadoop1:21000] >

Thursday, December 13, 2012

Using Hive's HBase handler

Per HIVE-705, Hive supports HBase integration for both SELECT and INSERT, which is well described in Hive's wiki. Note that as of Hive 0.9x the integration requires HBase 0.92x. In this article I'll show how to use existing HBase tables with Hive.

To use Hive in conjunction with HBase, a storage handler is needed. By default, the storage handler comes along with your Hive installation and should be available in Hive's lib directory ($HIVE_HOME/lib/hive-hbase-handler*). The handler requires hadoop-0.20x or later as well as ZooKeeper 3.3.4 and up.

To get Hive and HBase working together, add HBase's config directory into hive-site.xml:
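A minimal sketch of such a hive-site.xml entry, assuming HBase's ZooKeeper quorum runs on hadoop1.alo.alt (the host name is a placeholder for your cluster):

```
<property>
  <name>hbase.zookeeper.quorum</name>
  <value>hadoop1.alo.alt</value>
</property>
```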


and sync the configs (hbase-site.xml as well as hive-site.xml) to your clients. Add a table in Hive using the HBase handler:

CREATE TABLE hbase_test (
key1 string,
col1 string
)
STORED BY 'org.apache.hadoop.hive.hbase.HBaseStorageHandler'
WITH SERDEPROPERTIES ("hbase.columns.mapping" = ":key,cf1:c1")
TBLPROPERTIES("" = "hive_test");

This statement creates the table in HBase as well:

hbase(main):001:0> describe 'hive_test'
{NAME => 'hive_test', FAMILIES => [{NAME => 'cf1', BLOOMFILTER => 'NONE', REPLICATION_SCOPE => '0',
VERSIONS => '3', COMPRESSION => 'NONE', MIN_VERSIONS => '0', TTL => '2147483647', BLOCKSIZE
=> '65536', IN_MEMORY => 'false', BLOCKCACHE => 'true'}]}
1 row(s) in 0.1190 seconds

Existing HBase tables can be connected by using a CREATE EXTERNAL TABLE statement. Using the correct column family is the key to success; describe shows them:

hbase(main):003:0> describe 't1'
{NAME => 't1', FAMILIES => [{NAME => 'f1', BLOOMFILTER => 'NONE', COMPRESSION => 'NONE',
VERSIONS => '1', TTL => '2147483647', MIN_VERSIONS => '0', BLOCKSIZE => '65536',
IN_MEMORY => 'false', BLOCKCACHE => 'true'}]}
1 row(s) in 0.0700 seconds

The new Hive table has to be created with CREATE EXTERNAL TABLE; Hive doesn't support ALTER statements for non-native tables.

CREATE EXTERNAL TABLE hbase_test2 (
key1 string,
col1 string
)
STORED BY 'org.apache.hadoop.hive.hbase.HBaseStorageHandler'
WITH SERDEPROPERTIES ("hbase.columns.mapping" = ":key,f1:c1")
TBLPROPERTIES("" = "t1");

hive> describe hbase_test2;
key1 string from deserializer
col1 string from deserializer
Time taken: 0.106 seconds

Thursday, December 6, 2012

Hive "drop table" hangs (Postgres Metastore)

When using Postgres as the metastore database, it can happen that "drop table xyz" hangs and Postgres shows locks in UPDATE state. This happens because some indexes and tables are missing from the metastore schema, and can be fixed by creating them:

create index "IDXS_FK1" on "IDXS" using btree ("SD_ID");
create index "IDXS_FK2" on "IDXS" using btree ("INDEX_TBL_ID");
create index "IDXS_FK3" on "IDXS" using btree ("ORIG_TBL_ID");

"ROLE_ID" bigint NOT NULL,
"ROLE_NAME" varchar(128) DEFAULT NULL,
) ;

"GRANTOR" varchar(128) DEFAULT NULL,
) ;

"GRANTOR" varchar(128) DEFAULT NULL,
"USER_PRIV" varchar(128) DEFAULT NULL,
) ;

"GRANTOR" varchar(128) DEFAULT NULL,
"DB_PRIV" varchar(128) DEFAULT NULL,
) ;

"GRANTOR" varchar(128) DEFAULT NULL,
"PART_PRIV" varchar(128) DEFAULT NULL,
) ;


"GRANTOR" varchar(128) DEFAULT NULL,


Paste this into the psql CLI on your metastore database (psql -h <HOSTNAME> -d metastore -U hiveuser -W) and you should be able to drop tables again.

Saturday, December 1, 2012

Hive query shows ERROR "too many counters"

A Hive job fails with the odd "Too many counters" error like:

Ended Job = job_xxxxxx with exception 'org.apache.hadoop.mapreduce.counters.LimitExceededException(Too many counters: 201 max=200)'
FAILED: Execution Error, return code 1 from org.apache.hadoop.hive.ql.exec.MapRedTask
Intercepting System.exit(1)

This happens when many operators are used in queries (see Hive Operators). Hive creates 4 counters per operator, up to a maximum of 1000, plus a few additional counters for file read/write, partitions and tables. Hence the number of counters required depends on the query.

To avoid such exceptions, set "mapreduce.job.counters.max" in mapred-site.xml to a value above 1000. Hive would fail when hitting the 1k count, but other MR jobs rarely need that many. A number around 1120 should be a good choice.
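Wrapped in XML notation, the entry could look like this sketch (the exact file name depends on your Hadoop version):

```
<property>
  <name>mapreduce.job.counters.max</name>
  <value>1120</value>
</property>
```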

Running "EXPLAIN EXTENDED" on the query and piping the output through "grep -ri operators | wc -l" prints the number of operators used. Use this value to tweak the MR settings carefully.

Thursday, November 29, 2012

Memory consumption in Flume

Memory required by each Source or Sink

The heap memory used by a single event is dominated by the data in the event body, with some incremental usage by any headers added. So in general, a source or a sink will allocate roughly the size of the event body plus maybe 100 bytes of headers (this is affected by headers added by the txn agent). To get the total memory used by a single batch, multiply your average (or 90th percentile) event size (plus some additional buffer) by the maximum batch size. This will give you the memory needed by a single batch.

The memory required for each Sink is the memory needed for a single batch and the memory required for each Source is the memory needed for a single batch, multiplied by the number of clients simultaneously connected to the Source. Keep this in mind, and plan your event delivery according to your expected throughput.
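As a back-of-the-envelope sketch of the calculation above, with assumed numbers (1 KB average event body, 100 bytes of headers, batch size 1000, 10 connected clients; all values are examples, not measurements):

```shell
EVENT_BODY=1024    # average event body size in bytes (assumption)
HEADERS=100        # rough per-event header overhead (assumption)
BATCH_SIZE=1000    # maximum batch size
CLIENTS=10         # clients simultaneously connected to the source

# memory needed by a single batch
BATCH_MEM=$(( (EVENT_BODY + HEADERS) * BATCH_SIZE ))
# memory needed by the source: one batch per connected client
SOURCE_MEM=$(( BATCH_MEM * CLIENTS ))

echo "per-batch memory: $BATCH_MEM bytes"
echo "source memory:    $SOURCE_MEM bytes"
```

Swap in your own event sizes and batch settings; the point is only that source memory scales linearly with the number of connected clients.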

Memory required by each File Channel

Under normal operation, each File Channel uses some heap memory and some direct memory. Give each File channel roughly 30MB of heap memory for basic operational overhead. Each File channel also needs an amount of direct memory roughly equal to 1MB + (capacity of channel * 8) bytes because Flume is storing the updates in a hashmap.

For fast replay without a checkpoint, the file channel can use up to (channel capacity * 32) bytes of heap memory. The amount of memory actually used depends on the number of events present in the log files being replayed. So, if the file channel holds 100 million events, the replay will require about 3.2 GB of heap memory. In order to enable fast checkpoint-less replay, you must set the channel's configuration option use-fast-replay to true.

If that option is not explicitly enabled, then replay without a checkpoint will be slower, but it will use significantly less memory: on the order of normal operation of the file channel as specified above.
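Plugging the formulas above into numbers, for an assumed channel capacity of 100 million events:

```shell
CAPACITY=100000000    # channel capacity in events (assumption: 100 million)

# direct memory under normal operation: 1MB + capacity * 8 bytes
DIRECT_BYTES=$(( 1048576 + CAPACITY * 8 ))
# heap needed for fast checkpoint-less replay: capacity * 32 bytes
REPLAY_BYTES=$(( CAPACITY * 32 ))

echo "direct memory: $DIRECT_BYTES bytes"   # roughly 800 MB
echo "replay heap:   $REPLAY_BYTES bytes"   # 3.2 GB, matching the text above
```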
Memory required by the Flume core itself

Add to the total heap size roughly 50MB for the Flume core itself. Finally, adding a healthy buffer to the calculated estimates is recommended. JVM memory usage in production can be monitored and graphed using JMX, to get a better understanding of real-world memory allocation behavior given a particular workload. I wrote an article about JMX monitoring in the past.

Example memory settings

As you know, Flume reads its environment variables from $FLUME/conf/, which is disabled by default (shipped as Simply rename it to and tweak the settings according to the requirements you have calculated. It is also a good idea to allocate the needed memory at startup instead of growing the heap later, to avoid stop-the-world GC pauses when fresh memory is allocated.

An example for a larger memory configuration with GC tuning could look like:

# sets minimum memory to 2GB, max to 16GB, max direct memory to 256MB
# also uses the parallel new and concurrent garbage collectors to reduce the likelihood of long stop-the-world GC pauses
JAVA_OPTS="-Xms2000m -Xmx16000m -Xss128k -XX:MaxDirectMemorySize=256m
-XX:+UseParNewGC -XX:+UseConcMarkSweepGC"

Posted in Flume Wiki too

Thursday, November 15, 2012

HBase major compaction per cronjob

Sometimes I get asked how an admin can run a major compaction on a particular table at a time when the cluster isn't heavily used.

This can be done via cron or at. The HBase shell needs a ruby script, which is very simple:

# cat m_compact.rb
major_compact 't1'

A working shell script for cron, as an example:

# cat daily_compact
# adjust these values to your environment; user, keytab, host and realm are examples
USER=hbase
HOST=hadoop1.alo.alt
REALM=ALO.ALT
KEYTAB=/etc/hbase/conf/hbase.keytab
LOG=/var/log/daily_compact
PWD=`echo ~$USER`
# kerberos enabled

# get a new ticket
sudo -u $USER kinit -k -t $KEYTAB $USER/$HOST@$REALM
# start compaction
sudo -u $USER hbase shell $PWD/m_compact.rb 2>&1 |tee -a $LOG

All messages will be redirected to /var/log/daily_compact:
11/15/13 06:49:26 WARN conf.Configuration: hadoop.native.lib is deprecated. Instead, use io.native.lib.available
12 row(s) in 0.7800 seconds
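To schedule the script, a hypothetical crontab entry running it every night at 3am could look like this (the script path is an assumption):

```
0 3 * * * /usr/local/bin/daily_compact
```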

Thursday, October 18, 2012

Secure your cluster with kerberos

Many times I get questions about a safe and fast way to secure a cluster, without big steps like integrating AD structures, simply to prevent unauthorized access. I created this writeup to show you the steps you need. I used CentOS 6.3 and CDH 4.0.1, but you can use other distributions as well.

Setup KDC on a Linux Box

Install the kerberos5-related packages as well as kadmin. The first thing you have to do is replace EXAMPLE.COM, which is delivered by default, with your own realm. I used ALO.ALT here.

Example config:

# hadoop1> cat /etc/krb5.conf
 default_realm = ALO.ALT
 dns_lookup_realm = false
 dns_lookup_kdc = false

 ALO.ALT = {
  kdc = HADOOP1.ALO.ALT:88
  admin_server = HADOOP1.ALO.ALT:749
  default_domain = HADOOP1.ALO.ALT

 .alo.alt = ALO.ALT
 alo.alt = ALO.ALT

 kdc = FILE:/var/log/krb5kdc.log
 admin_server = FILE:/var/log/kadmin.log
 default = FILE:/var/log/krb5lib.log

Now tweak your DNS or /etc/hosts to reflect the settings. If you use /etc/hosts, be sure you've deployed both files across your nodes (hosts as well as krb5.conf).

# hadoop1> cat /etc/hosts
<ip-of-hadoop1>   hadoop1.alo.alt   hadoop1
<ip-of-hadoop2>   hadoop2.alo.alt   hadoop2   localhost localhost.localdomain localhost4 localhost4.localdomain4
::1         localhost localhost.localdomain localhost6 localhost6.localdomain6

Enable ACLs by writing rules into the KDC's config space:
# hadoop1> cat /var/kerberos/krb5kdc/kadm5.acl
*/admin@ALO.ALT *

Now create your kerberos random key with "kdb5_util create -s" and start kerberos as well as kadmin. If you face some errors, revisit your config.

Adding Principals

To use Kerberos, you have to add some principals; since we're talking about Hadoop, I set up all needed principals. We use kadmin.local on the server which hosts the KDC.

addprinc -randkey hdfs/hadoop1.alo.alt@ALO.ALT
addprinc -randkey mapred/hadoop1.alo.alt@ALO.ALT
addprinc -randkey yarn/hadoop1.alo.alt@ALO.ALT
addprinc -randkey hbase/hadoop1.alo.alt@ALO.ALT

addprinc -randkey HTTP/hadoop1.alo.alt@ALO.ALT

Create a principal with a password for <YOUR_USERNAME> as well, that's all. Now you should be able to sudo into your user account and kinit yourself (su - <YOUR_USERNAME> && kinit).

You need to export the keytabs for all services you want to secure (in fact, all services you want to start in your cluster). We need keytabs for HDFS, MapReduce, YARN and HBase in this case. We start with HDFS (always from kadmin's CLI)

xst -norandkey -k hdfs.keytab hdfs/hadoop1.alo.alt@ALO.ALT HTTP/hadoop1.alo.alt@ALO.ALT
xst -norandkey -k mapred.keytab mapred/hadoop1.alo.alt@ALO.ALT HTTP/hadoop1.alo.alt@ALO.ALT
xst -norandkey -k yarn.keytab yarn/hadoop1.alo.alt@ALO.ALT HTTP/hadoop1.alo.alt@ALO.ALT

Now you have all keytabs exported into the directory where you are:

# hadoop1> ls
hdfs.keytab mapred.keytab yarn.keytab

Set the correct ownership and permissions on the files (chown hdfs:hadoop hdfs.keytab && chmod 400 hdfs.keytab, chown mapred:hadoop mapred.keytab && chmod 400 mapred.keytab, and so on) and copy them into the right place ($HADOOP_HOME/conf/). From now on, all steps are pretty easy and well documented, so I post only the necessary config changes.

Enable Kerberos in a Cluster

I write up the changes only; if you don't have them in your *.xml yet, wrap the XML notation around them (<property><name>NAME</name><value>VALUE</value></property>).


dfs.block.access.token.enable = true
dfs.namenode.keytab.file = <PATH/TO/hdfs.keytab>
dfs.namenode.kerberos.principal = hdfs/_HOST@ALO.ALT
dfs.namenode.kerberos.internal.spnego.principal = HTTP/_HOST@ALO.ALT
dfs.secondary.namenode.keytab.file = <PATH/TO/hdfs.keytab>
dfs.secondary.namenode.kerberos.principal = hdfs/_HOST@ALO.ALT
dfs.secondary.namenode.kerberos.internal.spnego.principal = HTTP/_HOST@ALO.ALT
dfs.datanode.data.dir.perm = 700
dfs.datanode.address =
dfs.datanode.http.address =
dfs.datanode.keytab.file = <PATH/TO/hdfs.keytab>
dfs.datanode.kerberos.principal = hdfs/_HOST@ALO.ALT
dfs.webhdfs.enabled = true
dfs.web.authentication.kerberos.principal = HTTP/_HOST@ALO.ALT
dfs.web.authentication.kerberos.keytab = <PATH/TO/hdfs.keytab>

Startup the Namenode, watch the logs for issues and try out if you can connect:
"hadoop dfs -ls /"

It's important that we set a sticky bit on /tmp:
# hadoop1> sudo -u hdfs kinit -k -t hdfs.keytab hdfs/hadoop1.alo.alt@ALO.ALT
# hadoop1> sudo -u hdfs hadoop fs -chmod 1777 /tmp


mapreduce.jobtracker.kerberos.principal = mapred/_HOST@ALO.ALT
mapreduce.jobtracker.keytab.file = <PATH/TO/mapred.keytab>
mapreduce.tasktracker.kerberos.principal = mapred/_HOST@ALO.ALT
mapreduce.tasktracker.keytab.file = <PATH/TO/mapred.keytab>
mapred.task.tracker.task-controller = org.apache.hadoop.mapred.LinuxTaskController
mapreduce.tasktracker.group = mapred

Create a file called taskcontroller.cfg in $HADOOP_HOME/conf/

# hadoop1> cat /etc/hadoop/conf/taskcontroller.cfg 

Now start up the Tasktracker as well as the Jobtracker. Watch the logs for issues, and if everything is going well, start a sample MR job (like pi or something else) as a "normal" user for testing purposes. Please note you have to kinit first; to check that you have gotten a valid ticket, use klist.

Monday, September 17, 2012

Enable JMX Metrics in Flume 1.3x

As you know, Flume supports Ganglia (version 3 and above) to collect and report metrics. The details are described in our documentation (1).

Now I'll describe how to use JMX reporting (to integrate metrics into other monitoring systems, or to monitor Flume directly via Java's built-in JMX console (2)). That's pretty easy: choose a port which is free on your server and not blocked by a firewall.

First, enable JMX via Flume's ($FLUME_HOME/conf/ and uncomment / add / edit the line starting with JAVA_OPTS:

JAVA_OPTS="-Xms100m -Xmx200m

Of course, restart all running agents. 

Now point jconsole to Flume:

jconsole YOUR_HOSTNAME:54321

Wednesday, August 8, 2012

BigData - an overview

(This article is also available as a slideshow:

More and more, BigData is pushing its way into the trade press as a nebulous term. One thing is clear: whoever wants to keep up in business and successfully bring innovative future projects to completion cannot get around the topic. But why not? What is the motivation for collecting huge amounts of data?

The path there is quite simple and has been taken by many companies for years already, only with a disproportionately higher expense in manpower and financial investment.

An example:
Log files are consolidated through huge data farms; jobs run for weeks over terabytes of collected and prepared data. If an error occurs in the chain, the run starts again at the point of interruption, in the ideal case, or from the beginning. Until then, a long process chain has to be followed to obtain usable data for this one job. And exactly here Apache Hadoop comes in. Strictly speaking, when we talk about BigData we are talking about Apache Hadoop.

What is BigData?
At the latest when structured data is loaded into a database, one asks what should happen with the rest of the data, the part that accrues while generating the data to be processed. In short: where to put the accumulating data garbage, or better, the accumulating unstructured data sets? Data which the buzzword BigData is actually about.

For every accumulated data set, raw data accrues, statistically in a ratio of 1:9 (1 byte of structured data to 9 bytes of unstructured data). Since these amounts of data are not only of different types but also unformatted and, in the way they arise, equally unqualified, one speaks of BigData. And exactly this data provides the advantage a company needs to survive in a globalized market environment.

Apache Hadoop - the framework for data

Apache Hadoop looks back on roughly ten years of history. Its cradle lies in the search engine Nutch, which put the first 100-million-page demo online in 2003. The project's initiator, Doug Cutting, took up the whitepapers on the Google File System and MapReduce published by Google and presented the first runnable Hadoop version at the end of 2006; it quickly became an Apache top-level project and can draw on a lively developer community. A major part of the development was and is contributed by Yahoo!, later Hortonworks, and Cloudera. Apache Hadoop is under the Apache license and is offered as open-source software. Apache Hadoop is a Java-based application which allows the user to join commodity hardware into clusters and to use the resulting computing power linearly. This means that for every attached node, the cluster gains the entire shared computing power and storage of that node without losses. Take a server with 16GB RAM, four 1TB IDE disks and a quad-core CPU: with 10 such servers this already yields a considerable amount of computing power at a very moderate price. Since Apache Hadoop runs on Linux servers, administration costs also remain manageable through standardized management (Puppet, for example).
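Taking the example cluster described above (10 nodes with four 1 TB disks each) and assuming the typical HDFS replication factor of 3, a rough capacity sketch:

```shell
NODES=10           # servers in the example cluster
DISKS_PER_NODE=4   # 1 TB disks per server
DISK_TB=1
REPLICATION=3      # typical HDFS replication factor

RAW_TB=$(( NODES * DISKS_PER_NODE * DISK_TB ))
USABLE_TB=$(( RAW_TB / REPLICATION ))
echo "raw capacity:    $RAW_TB TB"
echo "usable capacity: about $USABLE_TB TB"
```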

The Apache Hadoop ecosystem

The main functionality lies in the MapReduce processing of a cluster. To exploit this power as well as possible, knowledge of the Java API and of the nature of distributed systems is almost indispensable. But only almost.

Meanwhile, the ecosystem around Apache Hadoop has established itself very well, and new tools and programs are released almost monthly. We present the most important applications in a short overview:


HDFS

The Hadoop Distributed File System (HDFS) forms the foundation of the whole ecosystem. Put simply, data is split into blocks and stored on nodes with freely configurable redundancy. Normally a replica set with 3 copies is used; consequently, very high, ITIL-compliant fault tolerance is achieved with simple means (66% of the cluster can fail without suffering data loss). HDFS takes care of appropriate latency and balancing of the cluster.


MapReduce

The MapReduce algorithm allows tasks to be distributed in a distributed environment. A task is split into n subtasks and sent to the individual nodes for processing. Once the subtasks are done, the data set is merged and returned. Here too, extremely high fault tolerance is provided, since every node runs a tasktracker which constantly syncs its status with the cluster's central master node.


Hive

Hive is a SQL abstraction language with its own DDL (Data Definition Language) modeled on SQL. Hive is intended for analyzing data, which can be present in different formats (plain text, compressed, binary). Hive is remotely comparable to a database: it needs meta information about a table and its columns, which map onto the individual fields of the data to be analyzed. These fields have to be assigned to specific data types. Hive is interesting above all for business analysts, statisticians and SQL professionals, since very little training time is required. Hive works batch-oriented and, depending on the SLA definition, can also be used for NRT (near real time) processes.


HBase

HBase is a key-value store for realtime processing of data. The big difference lies above all in the possibility of data manipulation, which is not directly available in Hive. HBase has no SQL syntax; instead it relies on defined schemas with regions in which the changes are stored, and these regions can be linked with each other arbitrarily. HBase is interesting above all for volatile data with high update rates and realtime queries with lowest latency. Somewhat more training time is required, especially since one has to let go of familiar, rigid database schemas entirely.


Sqoop

Sqoop is a connector between Hadoop and RDBMSs and is supported by a number of well-known database vendors. With Sqoop, Apache Hadoop can be integrated with simple means and without much effort as a middleware application into existing BI solutions. The charm of Sqoop is the ability to embed select statements directly into database queries or when writing data back. Besides the connectors to the well-known RDBMSs, connectors to systems such as Teradata, Netezza, Microstrategy, Quest and Tableau are available. A final example of Sqoop's importance is the release of the Microsoft SQL Server driver, which Microsoft announced at PASS 2011 and released as open source.


Flume

With Flume, it is straightforward to transport data from the most diverse sources into HDFS or files (sinks). A multitude of sources is available, and new ones are released monthly. In short, Flume is a log collector which, in its newest version (1.2.0), allows correlation already in the transport channel. It does not matter what kind of data is transported or what the source and destination are (as long as they are supported). The consistent use of the API makes it possible for developers to write and plug in their own sources and sinks.


Avro

One of the biggest challenges in a project is the serialization of data into a binary-compatible format. This is where Apache Avro comes in, offering a wide range of schemas for all conceivable data types. What is special here is how data files are extended when stored locally: the converted data is not replaced, but extended by the schema, so that the data can be processed further within a process chain or later by other, non-binary-compatible programs. RPC support has been added as an extension; with it, data can be converted between the individual projects during the RPC connection without major detours. A possible application example are telecommunications providers who have to centralize the most diverse systems.


Mahout

SML (scalable machine learning) must not be missing from an ensemble for mass data processing, and Mahout provides it, with nearly unlimited uses ranging from product recommendations based on areas of interest and statistical algorithms, to fraud detection, probability analyses or trend analyses in social networks. Mahout offers a central algorithm for clustering information far beyond HDFS. Thanks to its well-thought-out and strictly optimized MR integration, even extensive data sets are processed within a very short time and thus made usable for the end user.


Oozie

As in every process, more than 90% of all tasks run again and again at the same time with the same steps. In a system as complex as Apache Hadoop, a lot of time, and with it innovation and ultimately money, would be lost managing these tasks. Avoiding this is the job of the workflow orchestrator Oozie. Oozie manages Apache Hadoop workflows and controls, to a limited extent, error handling such as restart, reload and stop of a process. Besides time-triggered actions, Oozie can also execute data-triggered actions, for instance when data arrives irregularly and cannot be processed via time-triggered batch processing.

Friday, July 27, 2012

Flume 1.2.0 released

Yesterday the Apache Flume team released the next large release, version 1.2.0. Here's an overview of the fixes and additions (thanks Mike, I copied your overview):

Apache Flume 1.2.0 is the third release under the auspices of Apache of the so-called "NG" codeline, and our first release as a top-level Apache project! Flume 1.2.0 has been put through many stress and regression tests, is stable, production-ready software, and is backwards-compatible with Flume 1.1.0. Four months of very active development went into this release: a whopping 192 patches were committed since 1.1.0, representing many features, enhancements, and bug fixes. While the full change log can be found in the link below, here are a few new feature highlights:

* New durable file channel 
* New client API 
* New HBase sinks (two different implementations) 
* New Interceptor interface (a plugin processing API) 
* New JMX-based monitoring support

With this release, the first after evolving into a top-level Apache project, we've updated the website and finally have a well-written User Guide (again, thanks to Mike and Ralph for their great effort).


What's next?
We now see increasing interest in a Windows version. I don't know why, but that's what is happening. I'll try to port some of the sinks to the Windows platform; if you're a Windows developer and have time to spend on the project, all hands are welcome.

Monday, July 2, 2012

Get Apache Flume 1.3.x running on Windows

Since we found an increasing interest in the Flume community in getting Apache Flume running on Windows systems again, I spent some time figuring out how we can achieve that. Finally, the good news: Apache Flume runs on Windows. You need some tweaks to get it running.

Build system:
maven 3x, git, jdk1.6.x, WinRAR (or similar program)

Apache Flume agent:
jdk1.6.x, WinRAR (or a similar program), UltraEdit or a similar text editor

Tweak the Windows build box
1. Download and install JDK 1.6x from Oracle
2. Set the environment variables
   => Start - type "env" into the search box, select "Edit system environment variables", click Environment Variables, Select "New" from the "Systems variables" box, type "JAVA_HOME" into "variable name" and the path to your JDK installation into "Variable value" (Example: C:\Program Files (x86)\Java\jdk1.6.0_33)
3. Download maven from Apache
4. Set the environment variables
   => from the field "System variables" select New, Variable name "M2_HOME", value Path to your maven installation (Example: D:\Maven\apache-maven-3.0.4)
   => from the field "User variables for (your username)" select New, Variable name "M2", value "%M2_HOME%\bin"
   => from the field "User variables for (your username)" select New, Variable name "MAVEN_OPTS", value "-XX:MaxPermSize=1024M"
5. Close the CMD window if you have one running
6. Download and install msysgit or use a similar program
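
If you prefer the command line over the dialogs, the same variables can be set from an elevated command prompt with setx (the paths are examples and have to match your installation; /M writes to the system environment instead of the user environment; open a new CMD window before the last two lines so %M2_HOME% resolves):

```
setx JAVA_HOME "C:\Program Files (x86)\Java\jdk1.6.0_33" /M
setx M2_HOME "D:\Maven\apache-maven-3.0.4" /M
setx M2 "%M2_HOME%\bin"
setx MAVEN_OPTS "-XX:MaxPermSize=1024M"
```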

Start git and clone Flume's git repo to your local hard disk. I created a Source directory on my secondary HDD (D:) and cloned the repo into it. Change into the previously checked-out directory and start building your Apache Flume installation with maven.
D:\Source\flume>mvn clean
D:\Source\flume>mvn package -DskipTests

You'll see a lot of noise, but after some minutes you get the success:
[INFO] Apache Flume ...................................... SUCCESS [39.780s]
[INFO] Flume NG SDK ...................................... SUCCESS [3.510s]
[INFO] Flume NG Configuration ............................ SUCCESS [1.825s]
[INFO] Flume NG Core ..................................... SUCCESS [3.853s]
[INFO] Flume NG Sinks .................................... SUCCESS [0.874s]
[INFO] Flume NG HDFS Sink ................................ SUCCESS [2.652s]
[INFO] Flume NG IRC Sink ................................. SUCCESS [1.310s]
[INFO] Flume NG HBase Sink ............................... SUCCESS [2.527s]
[INFO] Flume NG Channels ................................. SUCCESS [0.796s]
[INFO] Flume NG JDBC channel ............................. SUCCESS [2.168s]
[INFO] Flume NG Node ..................................... SUCCESS [2.481s]
[INFO] Flume NG file-based channel ....................... SUCCESS [2.371s]
[INFO] Flume NG file backed Memory channel ............... SUCCESS [2.075s]
[INFO] Flume legacy Sources .............................. SUCCESS [0.733s]
[INFO] Flume legacy Avro source .......................... SUCCESS [1.778s]
[INFO] Flume legacy Thrift Source ........................ SUCCESS [1.903s]
[INFO] Flume NG Clients .................................. SUCCESS [0.702s]
[INFO] Flume NG Log4j Appender ........................... SUCCESS [1.498s]
[INFO] Flume NG distribution ............................. SUCCESS [14.477s]
[INFO] Flume NG Integration Tests ........................ SUCCESS [1.529s]
[INFO] ------------------------------------------------------------------------
[INFO] ------------------------------------------------------------------------
[INFO] Total time: 1:29.310s
[INFO] Finished at: Mon Jul 02 09:19:11 CEST 2012
[INFO] Final Memory: 195M/844M
[INFO] ------------------------------------------------------------------------

As usual, you'll find the build in flume-ng-dist\target\:
02.07.2012  09:19        14.161.677 flume-ng-dist-1.3.0-SNAPSHOT-dist.tar.gz
02.07.2012  09:19           467.362 flume-ng-dist-1.3.0-SNAPSHOT-src.tar.gz
Now grab the *dist.tar.gz and extract it on a Windows server of your choice. I used WinRAR for that.

Running Flume
Create a config file (I used the one I posted in my Linux-related build post) and save it with Ultraedit++. Why? The included Windows editor saves files with Windows line endings (CRLF), and that will hurt Flume.

You have to edit the file, too - simply change the logger facility.

Start Flume:
D:\flume\flume-1.3.0-SNAPSHOT>"c:\Program Files (x86)\Java\jdk1.6.0_33\bin\java.exe" -Xmx20m -Dlog4j.configuration=file:///%CD%\conf\ -cp "d:\flume\flume-1.3.0-SNAPSHOT\lib\*" org.apache.flume.node.Application -f d:\flume\flume-1.3.0-SNAPSHOT\conf\test1.conf -n syslog-agent
Path to java.exe + Java related options + log4j config in Windows Uri style + Flume libs + application + config

Tip: tweak the path to match your installation

You'll see:
.. a lot of DEBUG noise ..
2012-07-02 11:30:54,823 (conf-file-poller-0) [INFO - org.apache.flume.node.nodemanager.DefaultLogicalNodeManager.startAllComponents(] Starting Sink Console
2012-07-02 11:30:54,825 (conf-file-poller-0) [INFO - org.apache.flume.node.nodemanager.DefaultLogicalNodeManager.startAllComponents(] Starting Source Syslog
2012-07-02 11:30:54,828 (SinkRunner-PollingRunner-DefaultSinkProcessor) [DEBUG - org.apache.flume.SinkRunner$] Polling sink runner starting
2012-07-02 11:31:24,829 (conf-file-poller-0) [DEBUG - org.apache.flume.conf.file.AbstractFileConfigurationProvider$] Checking file:d:\flume\flume-1.3.0-SNAPSHOT\conf\test1.conf for changes
Now fire up a syslog message from a compatible system:
echo "<13>Jun 20 12:12:12 host foo[345]: a syslog message" > /tmp/foo
nc -v WINDOWS SERVER 5140 < /tmp/foo

Tip: Replace WINDOWS SERVER with the server IP of your Windows box

On the Windows Box you'll see:
2012-07-02 11:36:38,114 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process (] Event: { headers:{timestamp=1340187132000, Severity=5, host=host, Facility=8} body: 66 6F 6F 5B 33 34 35 5D 3A 20 61 20 73 79 73 6C foo[345]: a sysl }

I've only tested a syslog flow. Special flows like Apache HBase won't run, since Apache HBase doesn't run on Windows (yet). I think Avro will run too, as well as exec sources. That opens a really wide field for Apache Flume's integration into BI solutions or mass event-log debugging.
For your tracking, here's the link to the corresponding Jira (FLUME-1335). I think I'll write a better Windows service integration in a short time period. Add yourself as a watcher, so we can create good karma for the improvement.

Wednesday, June 13, 2012

Apache Flume 1.2.x and HBase

The newest (and first) HBase sink was committed into trunk one week ago and was my topic at the HBase workshop @ Berlin Buzzwords. The slides are available in my slideshare channel.

Let me explain how it works and how you get an Apache Flume - HBase flow running. First, you've got to check out trunk and build the project (you need git and maven installed on your system):

git clone git:// && cd flume && git checkout trunk && mvn package -DskipTests && cd flume-ng-dist/target

Within trunk, the HBase sink is available in the sinks directory (ls -la flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/)

Please note a few specialties:
At the moment the sink only controls HBase flush(), transactions and rollback. Apache Flume reads the $CLASSPATH variable and uses the first available hbase-site.xml. If you use different versions of HBase on your system, please keep that in mind. The HBase table, columns and column family have to be created beforehand. That's all.
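
Since the sink doesn't create anything on its own, the table and column family have to exist before the agent starts. With the HBase shell that could look like the following sketch (the table and family names are just the ones used in the example configuration below):

```
hbase shell
create 'test3', 'testing'
```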

Using an HBase sink is pretty simple; a valid configuration could look like this:

host1.sources = src1
host1.sinks = sink1 
host1.channels = ch1 
host1.sources.src1.type = seq 
host1.sources.src1.port = 25001
host1.sources.src1.bind = localhost
host1.sources.src1.channels = ch1
host1.sinks.sink1.type = org.apache.flume.sink.hbase.HBaseSink = ch1
host1.sinks.sink1.table = test3
host1.sinks.sink1.columnFamily = testing
host1.sinks.sink1.column = foo
host1.sinks.sink1.serializer = org.apache.flume.sink.hbase.SimpleHbaseEventSerializer
host1.sinks.sink1.serializer.payloadColumn = pcol
host1.sinks.sink1.serializer.incrementColumn = icol 

In this example we start a seq source on localhost with a listening port, point the sink to the HBase sink class and define the event serializer. Why? HBase needs the data in an HBase format; to achieve that we need to transform the input into an HBase-compliant format. Apache Flume's HBase sink uses a synchronous / blocking client; asynchronous support will follow (FLUME-1252).


Tuesday, May 29, 2012

Using filters in HBase to match certain columns

HBase is a column-oriented database which stores content by column rather than by row. To limit the output of a scan you can use filters - so far, so good.

But how does it work when you want to filter on more than one matching column, let's say two or more certain columns?
The trick here is to use a SingleColumnValueFilter (SCVF) in conjunction with a boolean arithmetic operation. The idea behind it is to include all columns which have "X" and NOT the value DOESNOTEXIST; the filter would look like this:

List<Filter> list = new ArrayList<Filter>(2);
Filter filter1 = new SingleColumnValueFilter(Bytes.toBytes("fam1"),
 Bytes.toBytes("VALUE1"), CompareOp.NOT_EQUAL, Bytes.toBytes("DOESNOTEXIST"));
list.add(filter1);
Filter filter2 = new SingleColumnValueFilter(Bytes.toBytes("fam2"),
 Bytes.toBytes("VALUE2"), CompareOp.NOT_EQUAL, Bytes.toBytes("DOESNOTEXIST"));
list.add(filter2);
// the default FilterList operator (MUST_PASS_ALL) ANDs the filters together
FilterList filterList = new FilterList(list);
Scan scan = new Scan();
scan.setFilter(filterList);

Define a new filter list, add a family (fam1) and configure the filter to match VALUE1, comparing it with NOT_EQUAL => DOESNOTEXIST. That means the filter matches all columns which have VALUE1 and returns only the rows that do NOT include DOESNOTEXIST. Now you can add more and more values to the filter list, start the scan, and you should only get data back which matches your conditions exactly.

Tuesday, May 15, 2012

Stop accepting new jobs in a hadoop cluster (ACL)

To stop a Hadoop cluster from accepting new MR jobs you have to enable ACLs first. Once you've done that, you can specify a single-character queue ACL (' ' = a space!). Since mapred-queue-acls.xml is polled regularly, you can dynamically change the queue in a running system. This is useful for ops-related work (setting into maintenance, extending / decommissioning nodes and similar things).

Enable ACLs

Edit the config file ($HADOOP/conf/mapred-queue-acls.xml) to fit your needs:

<property>
  <name>mapred.queue.default.acl-submit-job</name>
  <value>user1,user2,group1,group2</value>
</property>

Enable an ACL-driven cluster by editing the value of mapred.acls.enabled in conf/mapred-site.xml and setting it to true.

Now simply edit the value of mapred.queue.default.acl-submit-job and replace user1,user2,group1,group2 with ' ':

<property>
  <name>mapred.queue.default.acl-submit-job</name>
  <value> </value>
</property>


This stops all users from submitting new jobs, but lets already running jobs finish.

Thursday, April 12, 2012

Access Oozie WebUI after securing your hadoop cluster

To access a Kerberized Hadoop environment you need a SPNEGO-supporting browser (Firefox, IE, Chrome), and the client that runs the browser needs network connectivity to the KDC.

IBM has written a good tutorial about it.

Here some tips:

For Firefox, access the low-level configuration page by loading the about:config page. Then go to the network.negotiate-auth.trusted-uris preference and add the hostname or the domain of the web server that is protected by HTTP Kerberos SPNEGO (if using multiple domains and hostnames, use a comma to separate them).

For Chrome on Windows try: C:\Users\username\AppData\Local\Google\Chrome\Application\chrome.exe --auth-server-whitelist="*" --auto-ssl-client-auth

For IE:
Simply add the Oozie URL to the Intranet sites. It appears you not only have to make sure 'Windows Integrated Authentication' is enabled, but you also have to add the site to the 'Local Intranet' sites in IE.

Monday, March 5, 2012

FlumeNG - the evolution

Flume, the decentralized log collector, is making great progress. Since the project reached the Apache incubator, development on the next generation (NG) has reached a significant level.

Now, what's the advantage of a new Flume? Simply the architecture. FlumeNG doesn't need ZooKeeper anymore and has no master / client concept and no nodes / collectors. Simply run a bunch of agents, connect them together and create your own flow. Flume can now run with a 20MB heap size, uses an in-memory channel for flows, can have multiple flows and different sinks on one channel, and will support a few more sinks than Flume <= 1.0.0. But FlumeNG will no longer support tail and tailDir; instead a general exec source is available, which gives the user the freedom to use anything.
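
To get tail-like behavior back with the new exec source, a config fragment could look like the following sketch (the agent, source and channel names are hypothetical; exec simply runs the given command and reads events from its stdout):

```
agent1.sources = tailSrc
agent1.sources.tailSrc.type = exec
agent1.sources.tailSrc.command = tail -F /var/log/messages
agent1.sources.tailSrc.channels = memCh-1
```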

On the build host we need jre 1.6.x, maven 3.x and git or svn. 

To check out the code we use git and maven in a simple one-line command:
git clone git://; cd flume; git checkout trunk && mvn clean && mvn package -DskipTests

After a few seconds the build should be done:

[INFO] Apache Flume ...................................... SUCCESS [7.276s]
[INFO] Flume NG Core ..................................... SUCCESS [3.043s]
[INFO] Flume NG Sinks .................................... SUCCESS [0.275s]
[INFO] Flume NG HDFS Sink ................................ SUCCESS [0.892s]
[INFO] Flume NG IRC Sink ................................. SUCCESS [0.515s]
[INFO] Flume NG Channels ................................. SUCCESS [0.214s]
[INFO] Flume NG JDBC channel ............................. SUCCESS [0.802s]
[INFO] Flume NG Agent .................................... SUCCESS [0.893s]
[INFO] Flume NG file-based channel ....................... SUCCESS [0.516s]
[INFO] Flume NG distribution ............................. SUCCESS [16.602s]
[INFO] Flume legacy Sources .............................. SUCCESS [0.143s]
[INFO] Flume legacy Thrift Source ........................ SUCCESS [0.599s]
[INFO] Flume legacy Avro source .......................... SUCCESS [0.458s]
[INFO] Flume NG Clients .................................. SUCCESS [0.133s]
[INFO] Flume NG Log4j Appender ........................... SUCCESS [0.385s]
[INFO] ------------------------------------------------------------------------

Now we find the build and sources in flume-ng-dist/target. Copy the desired distribution to the host you want to play with, unpack it and start using Flume.

What is a flow?
A flow in FlumeNG describes the whole transport from a source to a sink. The sink could also be a new source to collect different streams into one sink. The process Flume starts is called an agent. A setup could look like this example:

source -             -> source => channel => sink
        \           /       
source - => channel => sink 
        /           \
source -             -> channel => source => channel => sink                  

Before we can start to play with FlumeNG, we need to configure it. The config is logical, but difficult to understand at first. The schema is always <identifier>.type.subtype.parameter.config, where <identifier> is the name of the agent we later call at startup. As the picture above shows, we can have all the complexity we want. For that we also need a config which reflects our complexity, so we have to define the source, channel and sink for all entry and end points.
Let me explain an example (syslog-agent.cnf):

syslog-agent.sources = Syslog
syslog-agent.channels = MemoryChannel-1
syslog-agent.sinks = Console

syslog-agent.sources.Syslog.type = syslogTcp
syslog-agent.sources.Syslog.port = 5140

syslog-agent.sources.Syslog.channels = MemoryChannel-1 = MemoryChannel-1

syslog-agent.sinks.Console.type = logger
syslog-agent.channels.MemoryChannel-1.type = memory

In the configuration example above we define a simple syslog flow: start point Syslog, endpoint Console and transport channel MemoryChannel-1. We can name the segments as we wish; these are the main identifiers for setting up a valid flow.

The example flow will listen on the configured port and send all events to the logger (logger is an internal sink for debugging that sends captured events to stdout). The same config with HDFS as a sink would look like this:

syslog-agent.sources = Syslog
syslog-agent.channels = MemoryChannel-1
syslog-agent.sinks = HDFS-LAB

syslog-agent.sources.Syslog.type = syslogTcp
syslog-agent.sources.Syslog.port = 5140

syslog-agent.sources.Syslog.channels = MemoryChannel-1 = MemoryChannel-1

syslog-agent.sinks.HDFS-LAB.type = hdfs

syslog-agent.sinks.HDFS-LAB.hdfs.path = hdfs://NN.URI:PORT/flumetest/%{host}
syslog-agent.sinks.HDFS-LAB.hdfs.file.Prefix = syslogfiles
syslog-agent.sinks.HDFS-LAB.hdfs.file.rollInterval = 60
syslog-agent.sinks.HDFS-LAB.hdfs.file.Type = SequenceFile
syslog-agent.channels.MemoryChannel-1.type = memory

At the moment, Flume supports avro, syslog and exec sources, and hdfs and logger sinks.

Start the flow
flume-ng starts a single flow per process. That's done with:
bin/flume-ng agent -n YOUR_IDENTIFIER -f YOUR_CONFIGFILE 
bin/flume-ng agent -n syslog-agent -f conf/syslog-agent.cnf

Links: flumeNG Wiki