Apr 24, 2015

Fault-Tolerant Twitter Firehose Ingestion on YARN

YARN, aka NextGen MapReduce, is awesome for building fault-tolerant distributed applications. But writing a plain YARN application is far from trivial and might even be a show-stopper for lots of engineers.
The good news is that a framework simplifying interaction with YARN has emerged and joined the Apache foundation: Apache Twill. While still in the incubation phase, the project looks really promising and lets you write (easier-to-test) Runnable applications and run them on YARN.
As part of the DAPLAB Hacky Thursday, we jumped head first into Twill, RxJava and Twitter4j, all bundled together to build a fault-tolerant Twitter firehose ingestion application storing the tweets into HDFS.

We used Twill version 0.5.0-incubating. Read more on Twill here, here and here.
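
To give an idea of the programming model, here is a minimal, illustrative sketch of a Twill runnable and of the runner submitting it to YARN, based on the Twill 0.5.0 API (the class names below are ours and not the actual yarn-starter code):

import java.io.PrintWriter;

import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.twill.api.AbstractTwillRunnable;
import org.apache.twill.api.TwillController;
import org.apache.twill.api.TwillRunnerService;
import org.apache.twill.api.logging.PrinterLogHandler;
import org.apache.twill.yarn.YarnTwillRunnerService;

public class EchoRunnable extends AbstractTwillRunnable {

    @Override
    public void run() {
        // The business logic runs here, inside a YARN container managed by Twill.
        System.out.println("Hello from a YARN container");
    }

    public static void main(String[] args) {
        // args[0] is the ZooKeeper connect string Twill uses for coordination,
        // e.g. daplab-wn-22.fri.lan:2181
        TwillRunnerService runner =
                new YarnTwillRunnerService(new YarnConfiguration(), args[0]);
        runner.start();

        TwillController controller = runner.prepare(new EchoRunnable())
                .addLogHandler(new PrinterLogHandler(new PrintWriter(System.out, true)))
                .start();
        // The controller can later be used to inspect or stop the application.
    }
}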

Twitter4j has been wrapped as an RxJava Observable object and attached to an HDFS sink, partitioning the data by year/month/day/hour/minute. This will be useful later on to create Hive tables with proper partitions.
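
Conceptually the wrapping looks like the following sketch (illustrative only, using RxJava 1.x and the Twitter4j sampled stream; the actual TwitterObservable in the repository may differ). Subscribers downstream can then batch the statuses and flush them to the partitioned HDFS layout described above:

import rx.Observable;
import rx.Subscriber;
import twitter4j.Status;
import twitter4j.StatusAdapter;
import twitter4j.TwitterStream;
import twitter4j.TwitterStreamFactory;

public class TwitterStatuses {

    // Bridges Twitter4j's push-style callbacks into an RxJava Observable.
    public static Observable<Status> observe() {
        return Observable.create(new Observable.OnSubscribe<Status>() {
            @Override
            public void call(final Subscriber<? super Status> subscriber) {
                TwitterStream stream = new TwitterStreamFactory().getInstance();
                stream.addListener(new StatusAdapter() {
                    @Override
                    public void onStatus(Status status) {
                        if (!subscriber.isUnsubscribed()) {
                            subscriber.onNext(status);
                        }
                    }

                    @Override
                    public void onException(Exception ex) {
                        subscriber.onError(ex);
                    }
                });
                // Start consuming the (sampled) firehose; a real implementation
                // should also shut the stream down when the subscriber unsubscribes.
                stream.sample();
            }
        });
    }
}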

Check it out

The sources of the project are available on github: https://github.com/daplab/yarn-starter
git clone https://github.com/daplab/yarn-starter.git

Configure it

The Twitter keys and secrets are currently hardcoded in TwitterObservable.java (yeah, it's in the TODO list :)). Please set them there before building.
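
For reference, the usual Twitter4j way of providing these credentials is a ConfigurationBuilder (how TwitterObservable.java wires them in may differ, and the values below are placeholders):

import twitter4j.TwitterStream;
import twitter4j.TwitterStreamFactory;
import twitter4j.conf.ConfigurationBuilder;

public class TwitterCredentials {

    // Placeholders only: replace with your own application keys and tokens.
    public static TwitterStream newStream() {
        ConfigurationBuilder cb = new ConfigurationBuilder();
        cb.setOAuthConsumerKey("YOUR_CONSUMER_KEY")
          .setOAuthConsumerSecret("YOUR_CONSUMER_SECRET")
          .setOAuthAccessToken("YOUR_ACCESS_TOKEN")
          .setOAuthAccessTokenSecret("YOUR_ACCESS_TOKEN_SECRET");
        return new TwitterStreamFactory(cb.build()).getInstance();
    }
}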

Build it

mvn clean install

Run it

Run it in the DAPLAB infrastructure like this:
./src/main/scripts/start-twitter-ingestion-app.sh daplab-wn-22.fri.lan:2181
By default, data is stored under /tmp/twitter/firehose. Monitor the ingestion process with:
hdfs dfs -ls -R /tmp/twitter/firehose
...
-rw-r--r--   3 yarn hdfs    7469136 2015-04-24 09:59 /tmp/twitter/firehose/2015/04/24/07/58.json
-rw-r--r--   3 yarn hdfs    6958213 2015-04-24 10:00 /tmp/twitter/firehose/2015/04/24/07/59.json
drwxrwxrwx   - yarn hdfs          0 2015-04-24 10:01 /tmp/twitter/firehose/2015/04/24/08
-rw-r--r--   3 yarn hdfs    9444337 2015-04-24 10:01 /tmp/twitter/firehose/2015/04/24/08/00.json
...
That's it! Now you can kill the application and watch how YARN restarts it.

Apr 23, 2015

Data Ingestion - Homogeneous meteorological data

Homogeneous monthly values of temperature and precipitation for 14 stations from 1864 until today. Yearly values are averaged over the whole of Switzerland since 1864.

Data set

Explanation


The file is a .txt and contains a four-row header:
MeteoSchweiz / MeteoSuisse / MeteoSvizzera / MeteoSwiss

stn|time|rhs150m0|ths200m0
||[mm]|[°C]

Data fields are separated by a "|" and do not contain whitespace.


#  Name      Example  Description
1  stn       BAS      Station abbreviation (see the list below)
2  time      201503   Year and month of the measurement. Format: yyyyMM
3  rhs150m0  36.9     Sum of precipitation in mm at 1.5 meters
4  ths200m0  4.5      Mean temperature in degrees Celsius at 2 meters

Station abbreviations:
BAS: Basel / Binningen
BER: Bern / Zollikofen
CHM: Chaumont
CHD: Château-d'Oex
GSB: Col du Grand St-Bernard
DAV: Davos
ENG: Engelberg
GVE: Genève-Cointrin
LUG: Lugano
PAY: Payerne
SIA: Segl-Maria
SIO: Sion
SAE: Säntis
SMA: Zürich / Fluntern

Data update

On OpenData, the files are regenerated every day; however, the data set only changes monthly.

Data Access


Warnings

Some stations, such as Payerne, are missing data for certain periods, and other stations did not yet exist in 1864. Consequently, the data must be filtered before being used for statistics.

Hive

Creating a database, a table and loading data

Note: In order to make this tutorial work for everybody, it creates a database prefixed by your username (${env:USER} inside Hive).
1. Download the data and remove the header
$ wget http://data.geo.admin.ch.s3.amazonaws.com/ch.meteoschweiz.homogenereihen/VQAA60.txt
$ tail -n +5 VQAA60.txt > VQAA60.txt.new && mv VQAA60.txt.new VQAA60.txt
2. Copy the local file into your home folder in HDFS (without an explicit destination, the file lands in /user/$(whoami)). See HDFSHelloWorld if you're not familiar with HDFS.
$ hdfs dfs -copyFromLocal VQAA60.txt
3. Create a database in Hive
$ hive

create database ${env:USER}_meteo;
4. Create a temp table to load the file into
$ hive

use ${env:USER}_meteo;
create table temp_meteo (col_value STRING);
LOAD DATA INPATH '/user/${env:USER}/VQAA60.txt' OVERWRITE INTO TABLE temp_meteo; 
5. Create the final table and insert the data into it. The dates are in the format yyyyMM; they are stored as strings because it is easier to extract the year from a string than from an int in the query language.
$ hive

use ${env:USER}_meteo;
create table meteo (station STRING, date STRING, precipitation FLOAT, temperature FLOAT);
insert overwrite table meteo
  -- With a repeated group, group 1 holds the value captured by the last repetition,
  -- so {N} extracts the Nth pipe-delimited field; the last field has no trailing "|".
  SELECT
    regexp_extract(col_value, '^(?:([^|]*)\\|){1}', 1) station,
    regexp_extract(col_value, '^(?:([^|]*)\\|){2}', 1) date,
    regexp_extract(col_value, '^(?:([^|]*)\\|){3}', 1) precipitation,
    regexp_extract(col_value, '^(?:[^|]*\\|){3}(.*)$', 1) temperature
  from temp_meteo;

6. Run your first query
$ hive --database ${USER}_meteo
SELECT station, avg(precipitation) as mean_precipitation, avg(temperature) as mean_temperature FROM meteo GROUP BY station;
7. Woot!
8. Run a more complex query: summarize the precipitation by station and year
$ hive --database ${USER}_meteo
SELECT station, dateYear, sum(precipitation) as sumPre 
FROM (SELECT substring(date,1,4) as dateYear, precipitation, station FROM meteo) as T1 
GROUP BY dateYear, station 
ORDER BY sumPre desc;


Mar 31, 2015

HDFSHelloWorld


This page aims at being a "copy-paste"-like tutorial to get familiar with HDFS commands. It mainly focuses on user commands (uploading and downloading data into HDFS).

Requirements

  • SSH (for Windows, use PuTTY and see how to create a key with PuTTY)
  • An account in the DAPLAB; to get one, send your ssh public key to Benoit.
  • A browser -- well, if you can access this page, you should have met this requirement :)

Resources


While the source of truth for HDFS commands is the source code, the documentation page describing the hdfs dfs commands is really useful.

Basic Manipulations

Listing a folder

Your home folder

$ hdfs dfs -ls
Found 28 items
...
-rw-r--r--   3 bperroud daplab_user    6398990 2015-03-13 11:01 data.csv
...
^^^^^^^^^^   ^ ^^^^^^^^ ^^^^^^^^^^^    ^^^^^^^ ^^^^^^^^^^ ^^^^^ ^^^^^^^^
         1   2        3           4          5          6     7        8
Columns, as numbered above, represent:
  1. Permissions, in unix-style syntax
  2. Replication factor (RF for short), the default being 3 for a file. Directories have no replication factor of their own and are shown with a -.
  3. Owner
  4. Group owning the file
  5. Size of the file, in bytes. Note that to compute the physical space used, this number should be multiplied by the RF (e.g. the 6398990-byte file above actually occupies about 19 MB across the cluster).
  6. Modification date. As HDFS is mostly a write-once-read-many filesystem, this is often the creation date
  7. Modification time. Same remark as for the date.
  8. Filename, within the listed folder

Listing the /tmp folder

$ hdfs dfs -ls /tmp

Uploading a file

In /tmp

$ hdfs dfs -copyFromLocal localfile.txt /tmp/
The first arguments after -copyFromLocal point to local files or folders, while the last argument is a file (if only one source file is listed) or a directory in HDFS.
Note: hdfs dfs -put does about the same thing, but -copyFromLocal is more explicit when you're uploading a local file and is thus preferred.

Downloading a file

From /tmp

$ hdfs dfs -copyToLocal /tmp/remotefile.txt .

The first arguments after -copyToLocal point to files or folders in HDFS, while the last argument is a local file (if only one source file is listed) or a directory.
hdfs dfs -get does about the same thing, but -copyToLocal is more explicit when you're downloading a file and is thus preferred.
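
If you need to do the same from Java rather than from the command line, the Hadoop FileSystem API offers direct equivalents. A minimal sketch, assuming the cluster configuration is available on the classpath:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class HdfsCopy {

    public static void main(String[] args) throws Exception {
        // Picks up fs.defaultFS from the Hadoop configuration on the classpath.
        FileSystem fs = FileSystem.get(new Configuration());

        // Equivalent of: hdfs dfs -copyFromLocal localfile.txt /tmp/
        fs.copyFromLocalFile(new Path("localfile.txt"), new Path("/tmp/"));

        // Equivalent of: hdfs dfs -copyToLocal /tmp/remotefile.txt .
        fs.copyToLocalFile(new Path("/tmp/remotefile.txt"), new Path("."));

        fs.close();
    }
}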

Creating a folder

In your home folder

$ hdfs dfs -mkdir dummy-folder

In /tmp

$ hdfs dfs -mkdir /tmp/dummy-folder
Note that relative paths point to your home folder, /user/bperroud for instance.