Apr 24, 2015

Fault Tolerant Twitter firehose Ingestion on YARN

YARN, aka NextGen MapReduce, is awesome for building fault-tolerant distributed applications. But writing a plain YARN application is far from trivial and can even be a show-stopper for many engineers.
The good news is that a framework to simplify interaction with YARN has emerged and joined the Apache Software Foundation: Apache Twill. While still in incubation, the project looks really promising and lets you write Runnable applications (which are easier to test) and run them on YARN.
As part of the DAPLAB Hacky Thursday, we jumped head first into Twill, RxJava and Twitter4j, bundling them together to build a fault-tolerant Twitter firehose ingestion application that stores the tweets in HDFS.

We used Twill version 0.5.0-incubating. Read more on Twill here, here and here.

Twitter4j has been wrapped as an RxJava Observable, which is attached to an HDFS sink that partitions the data by year/month/day/hour/minute. This will make it easy to create Hive tables with proper partitions later on.
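To illustrate the partitioning scheme, here is a minimal sketch of how a timestamp can be mapped to a year/month/day/hour/minute file path. The class MinutePartitioner and its methods are hypothetical, not taken from the yarn-starter sources. We assume the partition is computed in UTC; the listing in the Run it section (07/58.json written at 09:59 local time) looks consistent with that, but whether the real sink uses tweet creation time or ingestion time is not stated here.

```java
import java.time.Instant;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;

/**
 * Hypothetical helper: maps a timestamp to an HDFS path partitioned by
 * year/month/day/hour/minute, one JSON file per minute.
 */
final class MinutePartitioner {
    // '/' is a literal in DateTimeFormatter patterns; the UTC zone is an assumption
    private static final DateTimeFormatter PARTITION_FORMAT =
            DateTimeFormatter.ofPattern("yyyy/MM/dd/HH/mm").withZone(ZoneOffset.UTC);

    private final String baseDir;

    MinutePartitioner(String baseDir) {
        // Drop a trailing slash so we never build paths containing "//"
        this.baseDir = baseDir.endsWith("/")
                ? baseDir.substring(0, baseDir.length() - 1)
                : baseDir;
    }

    /** For 2015-04-24T07:58:30Z this yields <baseDir>/2015/04/24/07/58.json. */
    String pathFor(Instant timestamp) {
        return baseDir + "/" + PARTITION_FORMAT.format(timestamp) + ".json";
    }
}
```

Keeping one file per minute means each directory level maps directly onto a candidate Hive partition column (year, month, day, hour, minute).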

Check it out

The sources of the project are available on GitHub: https://github.com/daplab/yarn-starter
git clone https://github.com/daplab/yarn-starter.git

Configure it

The Twitter keys and secrets are currently hardcoded in TwitterObservable.java (yeah, it's on the TODO list :)). Please set them there before building.

Build it

mvn clean install

Run it

Run it on the DAPLAB infrastructure like this (the argument is the ZooKeeper connection string):
./src/main/scripts/start-twitter-ingestion-app.sh daplab-wn-22.fri.lan:2181
By default, data is stored under /tmp/twitter/firehose. To monitor the ingestion process:
hdfs dfs -ls -R /tmp/twitter/firehose
-rw-r--r--   3 yarn hdfs    7469136 2015-04-24 09:59 /tmp/twitter/firehose/2015/04/24/07/58.json
-rw-r--r--   3 yarn hdfs    6958213 2015-04-24 10:00 /tmp/twitter/firehose/2015/04/24/07/59.json
drwxrwxrwx   - yarn hdfs          0 2015-04-24 10:01 /tmp/twitter/firehose/2015/04/24/08
-rw-r--r--   3 yarn hdfs    9444337 2015-04-24 10:01 /tmp/twitter/firehose/2015/04/24/08/00.json
That's it! Now you can kill the application and watch YARN restart it!

Apr 23, 2015

Data Ingestion - Homogeneous meteorological data

Homogeneous monthly values of temperature and precipitation for 14 stations, from 1864 until today. Yearly values are averaged over the whole of Switzerland since 1864.

Data set

The file is a .txt file with a four-line header, which begins with:
MeteoSchweiz / MeteoSuisse / MeteoSvizzera / MeteoSwiss

Fields are separated by "|" and contain no whitespace.

#  Name      Example  Description
1  stn       BAS      Station code (see the list below)
2  time      201503   Year and month of the measurement. Format: yyyyMM
3  rhs150m0  36.9     Sum of precipitation in mm at 1.5 meters
4  ths200m0  4.5      Mean temperature in degrees Celsius at 2 meters

Station codes:
BAS: Basel / Binningen
BER: Bern / Zollikofen
CHM: Chaumont
CHD: Château-d'Oex
GSB: Col du Grand St-Bernard
DAV: Davos
ENG: Engelberg
GVE: Genève-Cointrin
LUG: Lugano
PAY: Payerne
SIA: Segl-Maria
SIO: Sion
SAE: Säntis
SMA: Zürich / Fluntern
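Given the four fields above, a data line such as BAS|201503|36.9|4.5 can be parsed as sketched below. The class MeteoRecord is hypothetical (not part of the original tutorial), and it assumes the header has already been removed and that every line has exactly the four fields in the order listed.

```java
/**
 * Hypothetical value class for one row of VQAA60.txt, e.g. "BAS|201503|36.9|4.5".
 */
final class MeteoRecord {
    final String station;        // stn, e.g. "BAS"
    final int year;              // first four digits of time (yyyyMM)
    final int month;             // last two digits of time
    final double precipitation;  // rhs150m0, in mm
    final double temperature;    // ths200m0, in degrees Celsius

    MeteoRecord(String station, int year, int month, double precipitation, double temperature) {
        this.station = station;
        this.year = year;
        this.month = month;
        this.precipitation = precipitation;
        this.temperature = temperature;
    }

    /** Parses one data line; throws IllegalArgumentException/NumberFormatException on bad input. */
    static MeteoRecord parse(String line) {
        // String.split takes a regex, so the pipe must be escaped; -1 keeps trailing empty fields
        String[] fields = line.split("\\|", -1);
        if (fields.length != 4 || fields[1].length() != 6) {
            throw new IllegalArgumentException("Unexpected line format: " + line);
        }
        String time = fields[1];
        return new MeteoRecord(
                fields[0],
                Integer.parseInt(time.substring(0, 4)),
                Integer.parseInt(time.substring(4, 6)),
                Double.parseDouble(fields[2]),
                Double.parseDouble(fields[3]));
    }
}
```

For example, MeteoRecord.parse("BAS|201503|36.9|4.5") yields station BAS, year 2015, month 3, 36.9 mm and 4.5 degrees Celsius.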

Data update

On OpenData, the file is regenerated every day; however, the data set itself only changes monthly.

Data Access


Some stations, such as Payerne, have missing data, and others did not exist yet in 1864. Consequently, the data must be filtered before being used for statistics.
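The post does not say how missing values are encoded in VQAA60.txt. As a minimal sketch, assuming they appear as non-numeric or empty tokens (for instance a hypothetical "-" placeholder), lines can be filtered before computing statistics like this. The class MeteoFilter is hypothetical.

```java
import java.util.List;
import java.util.stream.Collectors;

/** Hypothetical filter that keeps only complete "stn|time|precipitation|temperature" lines. */
final class MeteoFilter {
    /** True if the token parses as a finite number (empty, "-" or "NaN" are rejected). */
    static boolean isNumeric(String token) {
        try {
            double value = Double.parseDouble(token);
            return !Double.isNaN(value) && !Double.isInfinite(value);
        } catch (NumberFormatException e) {
            return false;
        }
    }

    /** Keeps lines with exactly four fields whose precipitation and temperature are numeric. */
    static List<String> completeLines(List<String> lines) {
        return lines.stream()
                .filter(line -> {
                    // -1 keeps trailing empty fields, so "SIO|186401||" still has four fields
                    String[] fields = line.split("\\|", -1);
                    return fields.length == 4 && isNumeric(fields[2]) && isNumeric(fields[3]);
                })
                .collect(Collectors.toList());
    }
}
```

In Hive itself, casting a non-numeric string to FLOAT yields NULL, so an equivalent filter on the meteo table would be a WHERE precipitation IS NOT NULL AND temperature IS NOT NULL clause. Stations that did not exist yet simply have no rows for those years, so restricting to a common period is a separate choice.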


Creating a database, a table and loading data

Note: To make this tutorial work for everybody, it creates a database prefixed by your username (${env:USER} inside Hive).
1. Download the data and remove the four-line header
$ wget http://data.geo.admin.ch.s3.amazonaws.com/ch.meteoschweiz.homogenereihen/VQAA60.txt
$ tail -n +5 VQAA60.txt > VQAA60.txt.new && mv VQAA60.txt.new VQAA60.txt
2. Copy the data into your home folder in HDFS (the trailing "." points to /user/$(whoami)). See HDFSHelloWorld if you're not familiar with HDFS.
$ hdfs dfs -copyFromLocal VQAA60.txt .
3. Create a database in Hive
$ hive

create database ${env:USER}_meteo;
4. Create a temp table to load the file into
$ hive

use ${env:USER}_meteo;
create table temp_meteo (col_value STRING);
LOAD DATA INPATH '/user/${env:USER}/VQAA60.txt' OVERWRITE INTO TABLE temp_meteo; 
5. Create the final table and insert the data into it. Dates are in yyyyMM format; they are stored as STRING rather than INT because it is easier to extract the year from a string in HiveQL.
$ hive

use ${env:USER}_meteo;
create table meteo (station STRING, date STRING, precipitation FLOAT, temperature FLOAT);
-- Field n of the '|'-delimited line is captured by the n-th repetition of the group
insert overwrite table meteo
  select
    regexp_extract(col_value, '^(?:([^|]*)[|]?){1}', 1) station,
    regexp_extract(col_value, '^(?:([^|]*)[|]?){2}', 1) date,
    cast(regexp_extract(col_value, '^(?:([^|]*)[|]?){3}', 1) as FLOAT) precipitation,
    cast(regexp_extract(col_value, '^(?:([^|]*)[|]?){4}', 1) as FLOAT) temperature
  from temp_meteo;
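Because Hive's regexp_extract is backed by java.util.regex, a field-extraction pattern can be checked outside Hive before running the insert. The sketch below assumes a pattern of the form ^(?:([^|]*)[|]?){n}, where [|] matches a literal pipe without any escaping and group 1 holds the last repetition, i.e. field n. The class FieldRegex is hypothetical, and returning "" on no match is this helper's own choice.

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/** Hypothetical helper to test a '|'-delimited field-extraction regex outside Hive. */
final class FieldRegex {
    /** Returns the n-th (1-based) '|'-separated field of line, or "" if nothing matches. */
    static String field(String line, int n) {
        // Each repetition consumes one field plus an optional trailing '|';
        // group(1) keeps the text captured by the last (n-th) repetition.
        Pattern pattern = Pattern.compile("^(?:([^|]*)[|]?){" + n + "}");
        Matcher matcher = pattern.matcher(line);
        if (matcher.find() && matcher.group(1) != null) {
            return matcher.group(1);
        }
        return "";
    }
}
```

On BAS|201503|36.9|4.5, fields 1 to 4 come back as BAS, 201503, 36.9 and 4.5. Note that a pattern ending in a "match any character" separator instead of the pipe would truncate the last field (4.5 would become 4.), which is exactly the kind of mistake this check catches.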

6. Run your first query
$ hive --database ${USER}_meteo
SELECT station, avg(precipitation) AS mean_precipitation, avg(temperature) AS mean_temperature FROM meteo GROUP BY station;
7. Woot!
8. Run a more complex query: sum the precipitation by station and year
$ hive --database ${USER}_meteo
SELECT station, dateYear, sum(precipitation) as sumPre 
FROM (SELECT substring(date,1,4) as dateYear, precipitation, station FROM meteo) as T1 
GROUP BY dateYear, station 
ORDER BY sumPre desc;
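To make the semantics of the last query explicit, here is an in-memory sketch of the same aggregation: group by station and year, sum the precipitation, and order by the sum descending. It assumes already-filtered, well-formed lines; unlike Hive's sum, which ignores NULLs, it throws on non-numeric values. The class and method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical in-memory equivalent of the yearly precipitation query. */
final class YearlyPrecipitation {
    /** Maps "station|year" to the summed precipitation, like GROUP BY dateYear, station. */
    static Map<String, Double> sumByStationAndYear(List<String> lines) {
        Map<String, Double> sums = new HashMap<>();
        for (String line : lines) {
            String[] fields = line.split("\\|", -1);
            // Hive's substring(date, 1, 4) is 1-based; Java's substring(0, 4) is 0-based, end-exclusive
            String key = fields[0] + "|" + fields[1].substring(0, 4);
            sums.merge(key, Double.parseDouble(fields[2]), Double::sum);
        }
        return sums;
    }

    /** Keys ordered by descending sum, like ORDER BY sumPre desc. */
    static List<String> keysBySumDesc(Map<String, Double> sums) {
        List<String> keys = new ArrayList<>(sums.keySet());
        keys.sort(Comparator.comparing((String key) -> sums.get(key)).reversed());
        return keys;
    }
}
```

The off-by-one between Hive's 1-based substring and Java's 0-based substring is the main thing to watch when porting such queries.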