Data processing with Apache HBase via Cascading Lingual

A tutorial for accessing Apache HBase from Cascading Lingual using SQL.

Introduction

HBase is a database system gaining more and more popularity among people adopting Hadoop. HBase is not a relational database and does not offer a SQL interface out of the box. When adopting HBase, many organizations are faced with the challenge that their current workflows are SQL based. Lingual is a way to bridge this gap and this tutorial shows how to approach such a situation.

The tutorial assumes that you already understand the HBase architecture and data model and furthermore that you have read the Lingual user guide.

Batch Processing

Please note that Lingual is a batch-oriented framework and does not offer response times acceptable for live traffic. Typical use cases are long-running analytical processes like the ones shown in this tutorial, not CRUD operations from a live application.

Nuage Cloud files

In this tutorial we are looking at the file hosting service "nuage", which competes in the object storage market. Their service, called "blobster", offers the typical semantics of a cloud object hosting service:

  • REST based service

  • objects are in buckets in different regions

  • customers pay per API call

Customers are identified by their API-key within the system, which is a UUID.

The service is a distributed system operating in multiple regions around the world (EU, US, ASIA).

Nuage has a centralized billing department which bills the customers based on their usage once a month. In order to do that, they have to aggregate the usage of all customers and store the aggregates somewhere. This is where Hadoop, Lingual, and HBase come in.

All operations (API calls) are logged and collected on the Hadoop cluster of Nuage, and once a month the data is aggregated and stored in HBase for further processing. In our tutorial we are going to solve this part of the process with Lingual.

Production logs

As explained above, the production systems are logging all interactions with the buckets. A typical log file coming from a production system in Europe might look like this:

2013-10-28 23:47:09\t240750e7-73c3-4d95-96d0-9026cd0dd868\tEU\tfoobucket\tGET\tsomeKey
2013-10-28 23:47:17\t240750e7-73c3-4d95-96d0-9026cd0dd868\tEU\tfoobucket\tGET\tsomeKey
2013-10-28 23:47:18\t240750e7-73c3-4d95-96d0-9026cd0dd868\tEU\tfoobucket\tGET\tsomeKey
2013-10-28 23:47:18\t240750e7-73c3-4d95-96d0-9026cd0dd868\tEU\tfoobucket\tGET\tsomeOtherKey
2013-10-28 23:47:20\t393982a0-3cda-4910-8597-a072d29132f1\tEU\trandom\tPOST\tdoge
2013-10-28 23:51:26\tb04bdbd9-66e1-4604-b214-e77283aae25f\tEU\t\tHEAD\t42.gif

The first column is a timestamp (UTC), followed by the UUID of the customer owning the bucket. The next column identifies the region of the bucket, followed by the bucket name, on which the operation was done. The last two columns are the HTTP method and the key on which an action was performed.
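To make the column layout concrete, here is a minimal shell sketch that splits such a log on tabs and counts requests per HTTP method. It assumes tab-separated input with exactly six fields per line; the function name `count_methods` is made up for illustration and is not part of Lingual or of the tutorial data.

```shell
#!/bin/sh
# Count requests per HTTP method in a tab-separated blobster log.
# Columns: timestamp, customer UUID, region, bucket, method, key.
count_methods() {
  # $5 is the HTTP method; lines without six tab-separated fields are skipped.
  # An empty bucket name still counts as a field.
  awk -F '\t' 'NF == 6 { n[$5]++ } END { for (m in n) print m "\t" n[m] }' "$@" |
    LC_ALL=C sort
}

# Three of the sample lines shown above, fed through the function.
printf '%s\n' \
  "$(printf '2013-10-28 23:47:09\t240750e7-73c3-4d95-96d0-9026cd0dd868\tEU\tfoobucket\tGET\tsomeKey')" \
  "$(printf '2013-10-28 23:47:20\t393982a0-3cda-4910-8597-a072d29132f1\tEU\trandom\tPOST\tdoge')" \
  "$(printf '2013-10-28 23:51:26\tb04bdbd9-66e1-4604-b214-e77283aae25f\tEU\t\tHEAD\t42.gif')" |
  count_methods
```

The function reads either the files given as arguments or standard input, so it can also be pointed at a local copy of a log file.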

Environment Setup

In this tutorial we are going to use the Vagrant-based Apache Hadoop cluster from the Cascading project. This cluster makes it easy to start up Hadoop and HBase in an isolated environment in which you can run your experiments. The README explains the setup in detail; the following steps bring up a fully working Hadoop and HBase cluster.

    > git clone https://github.com/Cascading/vagrant-cascading-hadoop-cluster.git
    > cd vagrant-cascading-hadoop-cluster
    > vagrant box add cascading-hadoop-base http://files.vagrantup.com/precise64.box
    > vagrant up
    > vagrant ssh master
    (master) sudo hadoop namenode -format -force
    (master) sudo start-all.sh
    # give it some time here to start all services up
    (master) sudo start-hbase.sh

Afterwards you have a full Hadoop and HBase cluster to work with. You can use the web interfaces of Hadoop and HBase to interact with it.

[NOTE] You don’t have to use the Vagrant-based setup to follow this tutorial. It works perfectly fine with any other Hadoop and HBase setup, as long as it is compatible with Cascading.

For the rest of the tutorial we are going to work from the master of the Hadoop/HBase cluster. If you are using something else, change the paths accordingly:

> vagrant ssh master
> cd /vagrant

Registering the log files in Lingual

For this tutorial we prepared some randomly generated log files from the three regions in which the cloud service is operating (US, EU, ASIA). You can get the data from here:

> wget http://data.cascading.org/nuage.tgz

Unpack the test data and copy it onto HDFS:

> tar xf nuage.tgz
> hadoop fs -copyFromLocal nuage nuage

We are using the catalog tool to register the log files as a Lingual table.

The first step is to set the platform to hadoop and tell Lingual where the Hadoop namenode, the jobtracker, and the HBase quorum are. You can also put these values into your Hadoop config files. If you are not using the Vagrant cluster, you must adjust them accordingly.

> export LINGUAL_PLATFORM=hadoop
> export LINGUAL_CONFIG=fs.default.name=hdfs://master.local:9000,mapred.job.tracker=master.local:9001,hbase.zookeeper.quorum=hadoop1.local
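Because all properties are packed into one comma-separated string, a typo is easy to miss. The following is a small sketch, not a Lingual feature, that prints each key=value pair on its own line for a quick visual check; `show_lingual_config` is a hypothetical helper name.

```shell
#!/bin/sh
# Print each key=value pair of a LINGUAL_CONFIG-style string on its own line.
show_lingual_config() {
  printf '%s\n' "$1" | tr ',' '\n'
}

# The value used in this tutorial for the Vagrant cluster.
show_lingual_config "fs.default.name=hdfs://master.local:9000,mapred.job.tracker=master.local:9001,hbase.zookeeper.quorum=hadoop1.local"
```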

Initialize the catalog:

> lingual catalog --init

Next we create a schema called LOGS, which will contain the log files.

> lingual catalog --schema LOGS --add

In this schema we create a stereotype that describes the logfile structure. The first column is a timestamp and all others are strings:

> lingual catalog --schema LOGS --stereotype LOGFILE --add --columns TIMESTAMP,CUSTOMER_ID,REGION,BUCKET,METHOD,KEY --types timestamp,string,string,string,string,string

Next we register a table called LOGS in the schema called LOGS using the LOGFILE stereotype. Note that we are registering a directory containing multiple files as one table in Lingual:

> lingual catalog --schema LOGS --table LOGS --stereotype LOGFILE --add /user/vagrant/nuage/logs/ --format tsv

We can verify that everything works as expected by taking a look at the buckets for a customer in the system:

> lingual shell
0: jdbc:lingual:hadoop> select distinct bucket,region from logs.logs where customer_id = 'b7dd113c-99ba-4fe8-b634-0fa26d781bad' order by region;
+----------------+---------+
|     BUCKET     | REGION  |
+----------------+---------+
| Arieses        | ASIA    |
| correspond     | ASIA    |
| relaxes        | ASIA    |
| sulfide        | EU      |
| bachelors      | US      |
| dialectic      | US      |
| diked          | US      |
| distinctively  | US      |
| inalienable    | US      |
| initiatives    | US      |
+----------------+---------+
10 rows selected (40.859 seconds)

This customer has used 10 different buckets in the 3 regions. 3 of those buckets are in ASIA, one in EU and 6 in the US region.
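As a rough cross-check outside of Lingual, the same distinct (bucket, region) listing can be approximated with standard shell tools on a local copy of the logs. This is only a sketch: `buckets_for_customer` is a hypothetical helper, it assumes the six-column tab-separated layout described earlier, and it prints the region first so that a plain sort orders by region.

```shell
#!/bin/sh
# List distinct (region, bucket) pairs for one customer, ordered by region.
buckets_for_customer() {
  customer=$1
  shift
  # $2 = customer UUID, $3 = region, $4 = bucket.
  awk -F '\t' -v c="$customer" '$2 == c { print $3 "\t" $4 }' "$@" |
    LC_ALL=C sort -u
}

# Tiny made-up input: two requests to the same bucket collapse into one row.
printf 't1\tcust-a\tUS\tdiked\tGET\tk1\nt2\tcust-a\tUS\tdiked\tPUT\tk2\nt3\tcust-a\tEU\tsulfide\tGET\tk3\n' |
  buckets_for_customer cust-a
```

On the real data you would pass the customer UUID and the local log files as arguments instead of piping in sample lines.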

We now have the ability to run SQL on our log files. This is the first step on our way to rolling up the statistics for customer billing purposes.

HBase and Lingual

Lingual can talk to HBase by using the cascading.hbase provider. The provider is an adapter that abstracts away the access to and the data model of HBase, so that it can be used from Lingual (and from Cascading).

HBase data model and Lingual

The HBase data model is different from relational models, yet we want to be able to talk to HBase via SQL in a meaningful way. This raises the question of how to map an HBase row to a row in SQL. The provider does this by limiting the mapping of an HBase table to a single column family. With this limitation, the rowkey in HBase maps to one column in SQL, and the qualifiers within the column family map to the remaining columns.

The first thing we have to do is install the HBase provider in Lingual:
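The mapping can be pictured with a short shell sketch. It is purely illustrative and does not use the provider: `row_to_cells` is a hypothetical helper that takes a header line plus tab-separated rows, treats the first column as the rowkey, and prints one `family:qualifier` cell per remaining column.

```shell
#!/bin/sh
# Emit one "rowkey family:qualifier value" line per non-key column.
# First input line: column names. Following lines: values. Column 1 is the rowkey.
row_to_cells() {
  family=$1
  awk -F '\t' -v fam="$family" '
    NR == 1 { for (i = 1; i <= NF; i++) name[i] = $i; next }
    { for (i = 2; i <= NF; i++) print $1, fam ":" name[i], $i }
  '
}

# One SQL-style row with a rowkey and two further columns, mapped into family B.
printf 'ROWKEY\tMONTH\tGET_COUNT\nb7dd113c-99ba-4fe8-b634-0fa26d781bad\t2013-10\t158\n' |
  row_to_cells B
```

Each SQL row therefore fans out into several HBase cells that share the same rowkey and column family.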

> lingual catalog --provider --add cascading:cascading-hbase:2.2.0:provider

This will download the provider from Conjars and install it in your Lingual catalog.

Next we need a new schema in Lingual, which we will call BILLING.

> lingual catalog --schema BILLING --add

Now we can define the stereotype for blobster billing data:

> lingual catalog --schema BILLING --stereotype blobster --add --columns ROWKEY,MONTH,GET_COUNT,PUT_COUNT,HEAD_COUNT,POST_COUNT --types string,string,Integer,Integer,Integer,Integer

We define 6 columns: the first is used as the rowkey in HBase and holds the UUID of the customer. The second column is the billing month, and the last 4 columns contain the aggregated counts of the customer's API calls per HTTP method.

Afterwards we add the hbase format to Lingual and, while adding it, declare the column family to which we are going to write the data. We use B here, since we are writing statistics for the "blobster" service.

> lingual catalog --schema BILLING --format hbase --add  --properties=family=B --provider hbase

Now we add the hbase protocol to the schema:

> lingual catalog --schema BILLING --protocol hbase --add --provider hbase

Finally we can register a new Lingual table in our schema which reads from and writes to HBase transparently. We call the Lingual table BLOBSTER, and it is backed by an HBase table called "central_billing". The table uses the protocol "hbase" and the format "hbase", both of which are provided by the "hbase" provider that we installed above:

> lingual catalog --schema BILLING --table BLOBSTER --stereotype blobster --add "central_billing" --protocol hbase --format hbase --provider hbase

Writing into HBase

We now have two tables in Lingual: one is a collection of TSV files on HDFS, the other is a table in HBase. Creating and writing the statistics for October 2013 into HBase can be done with one standard SQL statement:

> lingual shell
0: jdbc:lingual:hadoop> INSERT INTO "BILLING"."BLOBSTER"
                          select CUSTOMER_ID as ROWKEY,
                            '2013-10' "MONTH",
                            SUM(CASE WHEN "METHOD" = 'GET' THEN 1 ELSE 0 END) AS GET_COUNT,
                            SUM(CASE WHEN "METHOD" = 'PUT' THEN 1 ELSE 0 END) AS PUT_COUNT,
                            SUM(CASE WHEN "METHOD" = 'HEAD' THEN 1 ELSE 0 END) AS HEAD_COUNT,
                            SUM(CASE WHEN "METHOD" = 'POST' THEN 1 ELSE 0 END) AS POST_COUNT
                          FROM LOGS.LOGS
                          WHERE
                            "TIMESTAMP" BETWEEN TIMESTAMP '2013-10-01 00:00:00' AND TIMESTAMP '2013-10-31 23:59:59'
                          GROUP BY CUSTOMER_ID ;
                        +-----------+
                        | ROWCOUNT  |
                        +-----------+
                        | 1000      |
                        +-----------+
                        1 row selected (48.566 seconds)

Lingual will spawn Cascading flows on the Hadoop cluster which filter and aggregate the data. The results will be written into the "central_billing" HBase table. The "ROWCOUNT" reported by the query is the number of rows written into HBase. The log files contain data from 1000 customers of the object store service, hence the 1000.

With the provider mechanism it is possible not only to write into an HBase table, but also to read from it. We can inspect the freshly written data:
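To see what the aggregation computes, the following shell sketch reproduces the per-customer method counts for one month on a local, tab-separated log. It is an illustration only and says nothing about how Lingual plans the query: `monthly_counts` is a hypothetical helper, it selects the month by the first seven characters of the timestamp, and it prints the columns in the order of the blobster stereotype (ROWKEY, MONTH, GET_COUNT, PUT_COUNT, HEAD_COUNT, POST_COUNT).

```shell
#!/bin/sh
# Per-customer method counts for one month, similar in spirit to
# SUM(CASE WHEN ...) ... GROUP BY CUSTOMER_ID.
monthly_counts() {
  month=$1   # e.g. 2013-10
  shift
  awk -F '\t' -v m="$month" '
    # Keep only lines whose timestamp starts with the requested month.
    substr($1, 1, 7) == m { seen[$2] = 1; n[$2, $5]++ }
    END {
      # Missing counters are empty and print as 0 with %d.
      for (c in seen)
        printf "%s\t%s\t%d\t%d\t%d\t%d\n", c, m,
          n[c, "GET"], n[c, "PUT"], n[c, "HEAD"], n[c, "POST"]
    }
  ' "$@" | LC_ALL=C sort
}

# Made-up input: the November request is filtered out.
printf '2013-10-28 23:47:09\tcust-a\tEU\tb\tGET\tk\n2013-10-28 23:47:17\tcust-a\tEU\tb\tPOST\tk\n2013-11-01 00:00:00\tcust-a\tEU\tb\tPUT\tk\n' |
  monthly_counts 2013-10
```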

> lingual shell
0: jdbc:lingual:hadoop> select * from  "BILLING"."BLOBSTER" where ROWKEY = 'b7dd113c-99ba-4fe8-b634-0fa26d781bad';
+---------------------------------------+----------+------------+------------+-------------+-------------+
|                ROWKEY                 |  MONTH   | GET_COUNT  | PUT_COUNT  | HEAD_COUNT  | POST_COUNT  |
+---------------------------------------+----------+------------+------------+-------------+-------------+
| b7dd113c-99ba-4fe8-b634-0fa26d781bad  | 2013-10  | 158        | 171        | 150         | 147         |
+---------------------------------------+----------+------------+------------+-------------+-------------+

You can of course also access the same data from the HBase shell:

> hbase shell
hbase(main):001:0> get 'central_billing', 'b7dd113c-99ba-4fe8-b634-0fa26d781bad'
COLUMN                                           CELL
 B:GET_COUNT                                     timestamp=1383756401644, value=158
 B:HEAD_COUNT                                    timestamp=1383756401644, value=150
 B:MONTH                                         timestamp=1383756401644, value=2013-10
 B:POST_COUNT                                    timestamp=1383756401644, value=147
 B:PUT_COUNT                                     timestamp=1383756401644, value=171

Wrapping up

We have seen an example of data processing that could come from a real-world service. We saw how easy it is to use Hadoop and HBase without writing any Java code. Anyone capable of writing SQL queries can do what we just saw. You can also imagine how the aggregated stats are used further to calculate the actual billing statements. You could introduce another table holding the cost of each operation, combine it with the aggregates, and write the resulting charges into yet another table. This is left as an exercise to the reader.