Friday, December 27, 2013

How HDFS protects your data

Often we get the question how HDFS protects data and what the mechanisms are to prevent data corruption. Eric Sammer explains this in detail in Hadoop Operations.

In addition to the points below, you can also keep a second cluster to sync the files to, simply to protect against human error, like deleting a subset of data. If you have enough space in your cluster, enabling the trash in core-site.xml and setting it to a value higher than a day helps too.

<property>
  <name>fs.trash.interval</name>
  <value>1440</value>
  <description>Number of minutes after which the checkpoint
  gets deleted. If zero, the trash feature is disabled. 1440 means 1 day.</description>
</property>

<property>
  <name>fs.trash.checkpoint.interval</name>
  <value>1440</value>
  <description>Number of minutes between trash checkpoints.
  Should be smaller or equal to fs.trash.interval.
  Every time the checkpointer runs it creates a new checkpoint
  out of current and removes checkpoints created more than
  fs.trash.interval minutes ago.</description>
</property>
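With trash enabled, a deleted file is only moved to a per-user trash directory and can be restored until the checkpoint expires. A quick sketch (paths and user name are illustrative):

```shell
# With fs.trash.interval > 0, -rm moves the file into the trash
hdfs dfs -rm /data/reports/2013.csv

# The file now sits under the user's trash directory
hdfs dfs -ls /user/alexander/.Trash/Current/data/reports/

# Restore it by moving it back
hdfs dfs -mv /user/alexander/.Trash/Current/data/reports/2013.csv /data/reports/
```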

HDFS is designed to protect data in different ways to minimize the risk of data loss while keeping a valuable write speed. This makes HDFS in some circumstances a NAS replacement for large files, with the possibility to quickly access the stored data. The illustration below simplifies the data flow:

HDFS implements the following mechanisms by default:
  1. Data written to files in HDFS is split up into chunks (usually 128MB in size). Each chunk (called a block) is replicated three times, by default, to three different machines. Multiple copies of a block are never written to the same machine. Replication level is configurable per file. 
  2. HDFS actively monitors the number of available replicas of each block, compared to the intended replication level. If, for some reason, a disk or node in the cluster should become unavailable, the filesystem will repair the missing block(s) by creating new replicas from the remaining copies of the data.
  3. HDFS can be (and normally is) configured to place block replicas across multiple racks of machines to protect against catastrophic failure of an entire rack or its constituent network infrastructure. This is called RackAwareness and should reflect your topology.
  4. Each block has an associated checksum computed on write, which is verified on all subsequent reads. Additionally, to protect against "bit rot" of files (and their blocks) that are not regularly read, the filesystem automatically verifies all checksums of all blocks on a regular basis. Should any checksum not verify correctly, HDFS will automatically detect this, discard the bad block, and create a new replica of the block.
  5. Filesystem metadata - information about object ownership, permissions, replication level, path, and so on - is served by a highly available pair of machines (i.e. namenodes) in CDH4. Updates to metadata are maintained in a traditional write-ahead transaction log that guarantees durability of changes to metadata information. The transaction log can be written to multiple physical disks and, in a highly available configuration, is written to multiple machines.
  6. HDFS block replicas are written in a synchronous, in-line replication pipeline. That is, when a client application receives a successful response from the cluster that a write was successful, it is true that at least a configurable minimum number of replicas are also complete. This eliminates the potential failure case of asynchronous replication where a client could complete a write to a node, receive a successful response, only for that one node to fail before it's able to replicate to another node.
  7. HDFS is fully instrumented with metric collection and reporting so monitoring systems (such as Cloudera Manager) can generate alerts when faults are detected. Metrics related to data integrity include unresponsive nodes in the cluster, failed disks, missing blocks, corrupt blocks, under-replicated blocks, and so on. Cloudera Manager has extensive HDFS-specific monitoring configured out of the box.
  8. HDFS supports directory-level filesystem quotas to protect against accidental denial of service attacks that could otherwise cause critical applications to fail to write data to the cluster.
  9. All higher level data storage and processing systems in CDH (MapReduce, HBase, Hive, Pig, Impala) use HDFS as their underlying storage substrate and, as a result, have the same data protection guarantees described above.
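Several of the points above can be inspected and adjusted from the command line. A short sketch (the paths and replication factor are illustrative):

```shell
# Point 4: report missing, corrupt, and under-replicated blocks for a path
hdfs fsck /data -files -blocks -locations

# Point 1: the replication level is configurable per file
hdfs dfs -setrep -w 5 /data/important/dataset.csv
```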

Wednesday, November 13, 2013

Export more than 10k records with Sqoop

If you want to export more than 10k records out of an RDBMS via Sqoop, there are some settings and properties you can tweak.

Parameter --num-mappers
The number of simultaneous connections that will be opened against the database. Sqoop will use that many processes to export data (each process exports a slice of the data). Take care about the maximum number of open connections your RDBMS allows, since this can easily overwhelm it.

Parameter --batch
Enables batch mode on the JDBC driver. This uses the JDBC batching mode, which queues the statements and executes them in batches.

Property sqoop.export.records.per.statement
The number of rows that will be packed into a single INSERT statement, e.g. INSERT INTO xxx VALUES (), (), (), ... Note that not every database supports multi-row INSERT statements, so check your RDBMS before using this.

Property sqoop.export.statements.per.transaction
The number of INSERT statements per single transaction, e.g. BEGIN; INSERT; INSERT; ...; COMMIT.

You can specify the properties (even both at the same time) as generic Hadoop arguments on the command line, for example:
sqoop export -Dsqoop.export.records.per.statement=X --connect ...
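Putting it together, an export that uses both properties could look like the following sketch (connection string, table, and paths are illustrative, not from the original post):

```shell
sqoop export \
  -Dsqoop.export.records.per.statement=100 \
  -Dsqoop.export.statements.per.transaction=100 \
  --connect jdbc:mysql://db1.example.com/sales \
  --username sqoop \
  --table transactions \
  --export-dir /user/alexander/transactions \
  --num-mappers 4
```

With these numbers, each of the 4 export processes writes 100 rows per INSERT and commits every 100 statements, i.e. 10,000 rows per transaction.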

Monday, September 2, 2013

Connect to HiveServer2 with a kerberized JDBC client (Squirrel)

Squirrel works with Kerberos; however, if you don't use Kerberos, you don't need the JAVA_OPTS changes at the end. My colleague, Chris Conner, has created a Maven project that pulls down all of the dependencies for a JDBC program:

Note that in a Kerberos environment you need to kinit before using Squirrel. The above program handles kinit for you. If you are not using Kerberos and you want to use the above program, comment out the following lines:


Then make sure to change the JDBC URI so it does not contain the principal. Also, it's worth mentioning that with Kerberos I did have some issues with differing Java versions, so try matching your client's Java version with the HS2 server's.
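Before wiring up Squirrel, it can help to verify the JDBC URI from the command line with beeline, which ships with Hive (host, port, realm, and principal here are illustrative):

```shell
# Get a Kerberos ticket first
kinit alexander@EXAMPLE.COM

# Test the same URI that Squirrel will use
beeline -u "jdbc:hive2://hs2node.example.com:10000/default;principal=hive/hs2node.example.com@EXAMPLE.COM"
```

If beeline connects, any remaining Squirrel problem is in the client setup rather than in HiveServer2 or Kerberos.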

Work with Squirrel

First create a new Driver:
  1. Click on Drivers on the side.
  2. Click the + button.
  3. Enter a Name.
  4. Enter the URL like the example: jdbc:hive2://<host>:<port>/<db>;principal=<princ>
  5. Enter the Driver name: org.apache.hive.jdbc.HiveDriver
Click on the Extra Class Path button and click Add and make sure to add the following Classes:


Note, the classes can change with every release, so please check which ones you have installed.
Click OK to save.

Now you need to edit the Squirrel start script. On OSX, as an example, it is in "/Applications/", Linux-like systems should have it in /etc/squirrel - or elsewhere.

Now add the following line anywhere in the script above the actual JAVA_CMD line. Make sure to enter the correct Kerberos stuff:
export JAVA_OPTS=""

Now edit the last line of that script; it is normally something like:
$JAVACMD -Xmx256m -cp "$CP" $MACOSX_SQUIRREL_PROPS -splash:"$SQUIRREL_SQL_HOME/icons/splash.jpg" net.sourceforge.squirrel_sql.client.Main --log-config-file "$UNIX_STYLE_HOME"/ --squirrel-home "$UNIX_STYLE_HOME" $NATIVE_LAF_PROP $SCRIPT_ARGS

Change it to:

$JAVACMD -Xmx256m $JAVA_OPTS -cp "$CP" $MACOSX_SQUIRREL_PROPS -splash:"$SQUIRREL_SQL_HOME/icons/splash.jpg" net.sourceforge.squirrel_sql.client.Main --log-config-file "$UNIX_STYLE_HOME"/ --squirrel-home "$UNIX_STYLE_HOME" $NATIVE_LAF_PROP $SCRIPT_ARGS

Notice I added the JAVA_OPTS.

Now you can add a new host and it should work correctly with kerberos. 

Tuesday, July 23, 2013

Enable Replication in HBase

HBase does have support for multi-site replication for disaster recovery. It is not an HA solution; the application and solution architecture will need to implement HA. Replication means that data from one cluster is automatically replicated to a backup cluster, either within the same data center or across data centers. There are three ways to configure this: master-slave, master-master, and cyclic replication. Master-slave is the simplest solution for DR, as data is written to the master and replicated to the configured slave(s). Master-master means that the two clusters cross-replicate edits, but have means to prevent replication from going into an infinite loop by tracking mutations using the HBase cluster ID. Cyclic replication is also supported, which means you can have multiple clusters replicating to each other in combinations of master-master and master-slave.
Replication relies on the WAL, the WAL edits are replayed from a source region server to a target region server.

A few important points:
1) Version alignment: in a replicated environment the versions of HBase and Hadoop/HDFS must be aligned; this means you must replicate to the same version of HBase/HDFS (> 0.92).
2) Don't let HBase manage ZooKeeper; deploy separate, user-managed ZooKeeper quorums for each cluster.
3) You need full network connectivity between all nodes in all clusters; that is, node 1 on cluster A must be able to reach node 2 on cluster B, and so on. This applies to the ZooKeeper clusters as well.
4) All tables and their corresponding column families must exist on every cluster within the replication configuration, and they must be named identically.
5) Don't enable HLog compression; it will cause replication to fail.
6) Do not use start_replication or stop_replication; they can cause data loss on the replicated side.

Getting Started

Step 1:
To enable replication all clusters in the replication configuration must add the following to their configuration:
property= hbase.replication
value = true
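In hbase-site.xml form, the property above looks like this:

```xml
<property>
  <name>hbase.replication</name>
  <value>true</value>
</property>
```

Remember that this has to be set on every cluster taking part in the replication, and the region servers need a restart to pick it up.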

Step 2:

Launch the hbase shell and set the replication:
hbase(main):001:0> alter 'your_table', {NAME => 'family_name', REPLICATION_SCOPE => '1'}

Next add a peer:
hbase(main):002:0> add_peer '1', "zk1,zk4:2181:/hbase-backup"

You can list the currently enabled peers with:
hbase(main):003:0> list_peers 

There are some other considerations too. On your slave peers it is worth increasing the TTL on the tables, so that accidental or malicious deletes can be recovered; you can also increase the min versions property so that more history is retained on the slaves to cover more scenarios. As mentioned, this is not a cross-site HA solution and there is no automated cut-over, which means there is work at the application and operational level to facilitate this. To disable the replication, simply use:
hbase(main):004:0> disable_peer("1")

Note, disabling and re-enabling a table at the source cluster will not affect the current replication; it will start from offset 0 and replicate any entry which is scoped for replication, if present. To move to a fresh state, you have to roll the logs on the source cluster. This means that after you have removed the peer, you have to force a manual log roll in the HBase shell:
hbase(main):009:0> hlog_roll 'localhost,60020,1365483729051'

It takes the server name as an argument. You can get the region server's name from the znode (/hbase/rs).
When you now re-enable the peer, the replication starts with a fresh state.


Monday, May 27, 2013

Get all extended Hive tables with location in HDFS

for file in $(hive -e "show table extended like \`*\`" | grep location: | awk 'BEGIN { FS = ":" };{printf("hdfs:%s:%s\n",$3,$4)}'); do hdfs dfs -du -h $file; done;

Time taken: 2.494 seconds
12.6m  hdfs://hadoop1:8020/hive/tpcds/customer/customer.dat
5.2m  hdfs://hadoop1:8020/hive/tpcds/customer_address/customer_address.dat
76.9m  hdfs://hadoop1:8020/hive/tpcds/customer_demographics/customer_demographics.dat
9.8m  hdfs://hadoop1:8020/hive/tpcds/date_dim/date_dim.dat
148.1k  hdfs://hadoop1:8020/hive/tpcds/household_demographics/household_demographics.dat
4.8m  hdfs://hadoop1:8020/hive/tpcds/item/item.dat
36.4k  hdfs://hadoop1:8020/hive/tpcds/promotion/promotion.dat
3.1k  hdfs://hadoop1:8020/hive/tpcds/store/store.dat
370.5m  hdfs://hadoop1:8020/hive/tpcds/store_sales/store_sales.dat
4.9m  hdfs://hadoop1:8020/hive/tpcds/time_dim/time_dim.dat
0      hdfs://hadoop1:8020/user/alexander/transactions/_SUCCESS
95.0k  hdfs://hadoop1:8020/user/alexander/transactions/_logs
3.1m   hdfs://hadoop1:8020/user/alexander/transactions/part-m-00000
3.1m   hdfs://hadoop1:8020/user/alexander/transactions/part-m-00001
3.1m   hdfs://hadoop1:8020/user/alexander/transactions/part-m-00002
3.1m   hdfs://hadoop1:8020/user/alexander/transactions/part-m-00003
1.9m  hdfs://hadoop1:8020/user/hive/warehouse/zipcode_incomes_plain/DEC_00_SF3_P077_with_ann_noheader.csv

Saturday, May 18, 2013

Query HBase tables with Impala

As described in other blog posts, Impala uses the Hive Metastore Service to query the underlying data. In this post I use the Hive-HBase handler to connect Hive and HBase and query the data later with Impala. In the past I've written a tutorial on how to connect HBase and Hive; please follow the instructions there.

This approach offers data scientists a wide field of work with data stored in HDFS and/or HBase. You get the possibility to run queries against your stored data regardless of which technology or database you use, simply by querying the different data sources in a fast and easy way.

I use the officially available census data gathered in 2000 by the US government. The goal is to push this data as CSV into HBase and query the table with Impala. I've made a demonstration script which is available in my git repository.

Demonstration scenario

The dataset looks pretty simple:

cat DEC_00_SF3_P077_with_ann_noheader.csv


Create the HBase table:

create 'zipcode_hive', 'id', 'zip', 'desc', 'income'

and create an external table in Hive which looks as follows:

CREATE EXTERNAL TABLE ZIPCODE_HBASE (key STRING, zip STRING, desc1 STRING, desc2 STRING, income STRING)
STORED BY 'org.apache.hadoop.hive.hbase.HBaseStorageHandler'
WITH SERDEPROPERTIES ("hbase.columns.mapping" = ":key,zip:zip,desc:desc1,desc:desc2,income:income")
TBLPROPERTIES ("hbase.table.name" = "zipcode_hive");

Here we map the Hive table via the HBaseStorageHandler to the HBase schema we created in the step above.

After these steps have finished successfully, we need to copy the CSV data into HBase. I chose Pig for this task, but you could use an intermediate table in Hive, too.

Here's my Pig script:

cat PopulateData.pig

copyFromLocal DEC_00_SF3_P077_with_ann_noheader.csv ziptest.csv
A = LOAD 'ziptest.csv' USING PigStorage(',')
    AS (id:chararray, zip:chararray, desc1:chararray, desc2:chararray, income:chararray);
STORE A INTO 'hbase://zipcode_hive'
    USING org.apache.pig.backend.hadoop.hbase.HBaseStorage(
        'zip:zip,desc:desc1,desc:desc2,income:income');

The job takes a few seconds, after which the data is available in HBase:

scan 'zipcode_hive', LIMIT => 2

ROW                                    COLUMN+CELL                                                                                                    
 8600000US00601                        column=desc:desc1, timestamp=1368880594523, value=006015-DigitZCTA                                             
 8600000US00601                        column=desc:desc2, timestamp=1368880594523, value=0063-DigitZCTA                                               
 8600000US00601                        column=income:income, timestamp=1368880594523, value=11102                                                     
 8600000US00601                        column=zip:zip, timestamp=1368880594523, value=00601                                                           
 8600000US00602                        column=desc:desc1, timestamp=1368880594523, value=006025-DigitZCTA                                             
 8600000US00602                        column=desc:desc2, timestamp=1368880594523, value=0063-DigitZCTA                                               
 8600000US00602                        column=income:income, timestamp=1368880594523, value=12869                                                     
 8600000US00602                        column=zip:zip, timestamp=1368880594523, value=00602 

Now we do the same with Impala:

select * from zipcode_hbase limit 4

Using service name 'impala' for kerberos
Connected to hadoop1:21000
Server version: impalad version 1.0 RELEASE (build d1bf0d1dac339af3692ffa17a5e3fdae0aed751f)
Query: select *
from ZIPCODE_HBASE limit 4
Query finished, fetching results ...
| key            | desc1            | desc2          | income | zip   |
| 8600000US00601 | 006015-DigitZCTA | 0063-DigitZCTA | 11102  | 00601 |
| 8600000US00602 | 006025-DigitZCTA | 0063-DigitZCTA | 12869  | 00602 |
| 8600000US00603 | 006035-DigitZCTA | 0063-DigitZCTA | 12423  | 00603 |
| 8600000US00604 | 006045-DigitZCTA | 0063-DigitZCTA | 33548  | 00604 |
Returned 4 row(s) in 0.42s

Another query to get the incomes between '1000' and '5000', sorted by income. Note that income is stored as a STRING, so this is a lexicographic comparison, which is why values like 49995 show up in the result:

select * from zipcode_hbase where income between '1000' and '5000' order by income DESC limit 20;

| key            | desc1            | desc2          | income | zip   |
| 8600000US64138 | 641385-DigitZCTA | 6413-DigitZCTA | 49995  | 64138 |
| 8600000US12477 | 124775-DigitZCTA | 1243-DigitZCTA | 49993  | 12477 |
| 8600000US33025 | 330255-DigitZCTA | 3303-DigitZCTA | 49991  | 33025 |
| 8600000US44119 | 441195-DigitZCTA | 4413-DigitZCTA | 49988  | 44119 |
| 8600000US34997 | 349975-DigitZCTA | 3493-DigitZCTA | 49982  | 34997 |
| 8600000US70665 | 706655-DigitZCTA | 7063-DigitZCTA | 49981  | 70665 |
| 8600000US28625 | 286255-DigitZCTA | 2863-DigitZCTA | 49981  | 28625 |
| 8600000US76134 | 761345-DigitZCTA | 7613-DigitZCTA | 49979  | 76134 |
| 8600000US44618 | 446185-DigitZCTA | 4463-DigitZCTA | 49978  | 44618 |
| 8600000US65714 | 657145-DigitZCTA | 6573-DigitZCTA | 49978  | 65714 |
| 8600000US77338 | 773385-DigitZCTA | 7733-DigitZCTA | 49976  | 77338 |
| 8600000US14622 | 146225-DigitZCTA | 1463-DigitZCTA | 49972  | 14622 |
| 8600000US84339 | 843395-DigitZCTA | 8433-DigitZCTA | 49972  | 84339 |
| 8600000US85020 | 850205-DigitZCTA | 8503-DigitZCTA | 49967  | 85020 |
| 8600000US64061 | 640615-DigitZCTA | 6403-DigitZCTA | 49964  | 64061 |
| 8600000US97361 | 973615-DigitZCTA | 9733-DigitZCTA | 49961  | 97361 |
| 8600000US30008 | 300085-DigitZCTA | 3003-DigitZCTA | 49960  | 30008 |
| 8600000US48634 | 486345-DigitZCTA | 4863-DigitZCTA | 49958  | 48634 |
| 8600000US47923 | 479235-DigitZCTA | 4793-DigitZCTA | 49946  | 47923 |
| 8600000US46958 | 469585-DigitZCTA | 4693-DigitZCTA | 49946  | 46958 |
Returned 20 row(s) in 1.08s

Thursday, March 28, 2013

HBase: MSLAB and CMS vs. ParallelGC

Tuning the Java options for HBase is a necessary step to get the best performance and stability in large installations. The usual recommendation looks like:
HBASE_OPTS="-XX:+UseParNewGC -XX:+UseConcMarkSweepGC -XX:CMSInitiatingOccupancyFraction=70 -XX:+CMSParallelRemarkEnabled"

But you can also achieve great success with:
HBASE_OPTS="-server -XX:+UseParallelGC XX:+UseParallelOldGC -XX:ParallelGCThreads=8"

What are the differences between ParallelGC and CMS? 

CMS uses more CPU, but runs concurrently. If a concurrent mode failure occurs, CMS falls back to a non-parallel, stop-the-world collection and pauses the VM for the entire time it is collecting. This risk can be minimized by using MSLAB in your HBase configuration.
ParallelGC has better throughput but longer pause times, and stops the VM on every collection. For HBase this means a pause of around 1 second per GB of heap, which on highly loaded clusters can lead to outages of an unacceptable length.

MSLAB (MemStore-Local Allocation Buffers)

Most GC pauses are caused by old-generation fragmentation, and CMS can't defragment without pausing the VM (the so-called "Juliet pause"). MSLAB moves the memstore allocations into fixed-size chunks in the old generation, so freed memstores leave evenly sized holes that can be reused without fragmentation. When you start or upgrade to HBase 0.92.x, MSLAB is enabled by default:

hbase.hregion.memstore.mslab.chunksize=2MB (2MB per default)
hbase.hregion.memstore.mslab.max.allocation=256KB (256KB per default)
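If you need to re-enable or tune MSLAB explicitly, the relevant hbase-site.xml entries look like this (the values shown are the defaults, in bytes):

```xml
<property>
  <name>hbase.hregion.memstore.mslab.enabled</name>
  <value>true</value>
</property>
<property>
  <name>hbase.hregion.memstore.mslab.chunksize</name>
  <value>2097152</value> <!-- 2MB chunks -->
</property>
<property>
  <name>hbase.hregion.memstore.mslab.max.allocation</name>
  <value>262144</value> <!-- allocations above 256KB bypass MSLAB -->
</property>
```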

More about MSLAB can be found in Todd's presentation:

Wednesday, February 20, 2013

Flume 1.3.1 Windows binary release online

Andy Blozhou, a Chinese Flume enthusiast, provides precompiled Windows binaries of Flume 1.3.1, including a startup bat and an Avro bat.
You can grab this build on his website:

======== snip ========

This is the flume-ng 1.3.1 windows version for download.

simple usage:
unzip the
run bin/flume.bat for agent. 
run bin/flume-avroclient.bat for avro-client. 
Need modify for your own env. 
(To compile flume-ng on windows, please reference or my chinese version the windows version of flume 1.3.1 file from
2.unzip the to a directory.
3.install jdk 1.6 from oracle,and set JAVA_HOME of the env.
download from
4.test agent:
4.1 modify settings of conf/console.conf,conf/hdfs.conf for agent test.
4.2 test source syslog, sink: console out agent
4.2.1 check flume.bat,modify the variables to your env.
4.2.2 click flume.bat
4.2.3 on another computer run command:
echo "<13>test msg" >/tmp/msg
nc -v your_flume_sysloghost port < /tmp/msg
4.2.4 check your syslog host flume output
4.2.5 samples see
4.3 test avro-client
4.3.1 run a avro source flume agent on a node.
4.3.2 modify flume-avroclient.bat and head.txt
4.3.3 run flume-avroclient.bat

tested on windows7 32bit version


Wednesday, February 6, 2013

LZO Compression with Oozie

It can happen, when one of the compression codecs is switched to LZO, that Oozie can't start any MR job successfully. Usually this is configured in core-site.xml:


Oozie reports a ClassNotFound error (java.lang.ClassNotFoundException: Class com.hadoop.compression.lzo.LzoCodec not found). To get the jobs running, copy or link hadoop-lzo.jar into /var/lib/oozie/ and restart the Oozie server.

The second, most common issue is that people forget to set up the shared lib directory:

[root@hadoop2 ~]# sudo -u hdfs hadoop fs -mkdir  /user/oozie
[root@hadoop2 ~]# sudo -u hdfs hadoop fs -chown oozie:oozie /user/oozie
[root@hadoop2 ~]# mkdir /tmp/share && cd /tmp/share && tar xvfz /usr/lib/oozie/oozie-sharelib.tar.gz
[root@hadoop2 ~]# sudo -u oozie hadoop fs -put share /user/oozie/share

Starting with CDH 4.1, a JAR package called an uber JAR is supported; it carries its dependent JAR files in a lib/ folder inside of it. After enabling this feature, users can use such a JAR in their MapReduce jobs and notify Oozie about this special JAR file. You can enable this feature in oozie-site.xml:


When this property is set, users can use the oozie.mapreduce.uber.jar configuration property in their MapReduce workflows to notify Oozie that the specified JAR file is an uber JAR.
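A sketch of the two settings involved (property names as documented for CDH 4.1; the JAR path is illustrative):

```xml
<!-- oozie-site.xml: allow uber JARs cluster-wide -->
<property>
  <name>oozie.action.mapreduce.uber.jar.enable</name>
  <value>true</value>
</property>

<!-- workflow: inside the map-reduce action's <configuration> -->
<property>
  <name>oozie.mapreduce.uber.jar</name>
  <value>/user/alexander/lib/my-uber.jar</value>
</property>
```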