Data processing with Apache HBase via Cascading Lingual
A tutorial for accessing Apache HBase from Cascading Lingual using SQL.
Introduction
HBase is a database system that is gaining more and more popularity among organizations adopting Hadoop. HBase is not a relational database and does not offer a SQL interface out of the box. When adopting HBase, many organizations face the challenge that their existing workflows are SQL based. Lingual is a way to bridge this gap, and this tutorial shows how to approach such a situation.
The tutorial assumes that you already understand the HBase architecture and data model and furthermore that you have read the Lingual user guide.
Batch Processing
Please note that Lingual is a batch-oriented framework and does not offer response times acceptable for live traffic. Typical use cases are long-running analytical processes like the ones shown in this tutorial, not CRUD operations from a live application.
Nuage Cloud files
In this tutorial we are looking at the file hosting service "nuage", which competes in the object storage market. Their service, called "blobster", offers the typical semantics of a cloud object hosting service:
- REST-based service
- objects live in buckets in different regions
- customers pay per API call
Customers are identified within the system by their API key, which is a UUID.
The service is a distributed system operating in multiple regions around the world (EU, US, ASIA).
Nuage has a centralized billing department that bills the customers based on their usage once a month. In order to do that, it has to aggregate the usage of all customers and store the aggregates somewhere. This is where Hadoop, Lingual, and HBase come in.
All operations (API calls) are logged and collected on Nuage's Hadoop cluster, and once a month the data is aggregated and stored in HBase for further processing. In our tutorial we are going to solve this part of the process with Lingual.
Production logs
As explained above, the production systems are logging all interactions with the buckets. A typical log file coming from a production system in Europe might look like this:
2013-10-28 23:47:09\t240750e7-73c3-4d95-96d0-9026cd0dd868\tEU\tfoobucket\tGET\tsomeKey
2013-10-28 23:47:17\t240750e7-73c3-4d95-96d0-9026cd0dd868\tEU\tfoobucket\tGET\tsomeKey
2013-10-28 23:47:18\t240750e7-73c3-4d95-96d0-9026cd0dd868\tEU\tfoobucket\tGET\tsomeKey
2013-10-28 23:47:18\t240750e7-73c3-4d95-96d0-9026cd0dd868\tEU\tfoobucket\tGET\tsomeOtherKey
2013-10-28 23:47:20\t393982a0-3cda-4910-8597-a072d29132f1\tEU\trandom\tPOST\tdoge
2013-10-28 23:51:26\tb04bdbd9-66e1-4604-b214-e77283aae25f\tEU\t\tHEAD\t42.gif
The first column is a timestamp (UTC), followed by the UUID of the customer owning the bucket. The next column identifies the region of the bucket, followed by the name of the bucket on which the operation was done. The last two columns are the HTTP method and the key on which the action was performed.
Environment Setup
In this tutorial we are going to use the Vagrant-based Apache Hadoop cluster provided by the Cascading project. This cluster makes it easy to start up Hadoop and HBase in an isolated environment in which you can experiment. The README explains the setup in detail; these are the steps you can use to bring up a fully working Hadoop and HBase cluster.
> git clone https://github.com/Cascading/vagrant-cascading-hadoop-cluster.git
> cd vagrant-cascading-hadoop-cluster
> vagrant box add cascading-hadoop-base http://files.vagrantup.com/precise64.box
> vagrant up
> vagrant ssh master
(master) sudo hadoop namenode -format -force
(master) sudo start-all.sh
# give it some time here to start all services up
(master) sudo start-hbase.sh
Afterwards you have a full Hadoop and HBase cluster to work with. You can use the different web interfaces to interact with it:
- JobTracker: http://master.local:50030/jobtracker.jsp
- HBase Master: http://master.local:60010/master-status
[NOTE] You don't have to use the Vagrant-based setup to follow this tutorial; it works perfectly fine with any other Hadoop and HBase setup as long as it is compatible with Cascading.
For the rest of the tutorial we are going to work from the master node of the Hadoop/HBase cluster. If you are using a different setup, change the paths accordingly:
> vagrant ssh master
> cd /vagrant
Registering the log files in Lingual
For this tutorial we prepared some randomly generated log files from the three regions in which the cloud service is operating (US, EU, ASIA). You can get the data from here:
> wget http://data.cascading.org/nuage.tgz
Unpack the test data and copy it onto HDFS:
> tar xf nuage.tgz
> hadoop fs -copyFromLocal nuage nuage
We are using the catalog tool to register the log files as a Lingual table. The first step is to set the platform to hadoop and to tell Lingual where the Hadoop namenode, the jobtracker, and the HBase ZooKeeper quorum are. You can also put these values into your Hadoop config files. If you are not using the Vagrant cluster, you must adjust them accordingly.
> export LINGUAL_PLATFORM=hadoop
> export LINGUAL_CONFIG=fs.default.name=hdfs://master.local:9000,mapred.job.tracker=master.local:9001,hbase.zookeeper.quorum=hadoop1.local
Initialize the catalog:
> lingual catalog --init
Next we create a schema called LOGS, which will contain the log files.
> lingual catalog --schema LOGS --add
In this schema we create a stereotype that describes the logfile structure. The first column is a timestamp and all others are strings:
> lingual catalog --schema LOGS --stereotype LOGFILE --add --columns TIMESTAMP,CUSTOMER_ID,REGION,BUCKET,METHOD,KEY --types timestamp,string,string,string,string,string
Next we register a table called LOGS in the LOGS schema using the LOGFILE stereotype. Note that we are registering a directory containing multiple files as one table in Lingual:
> lingual catalog --schema LOGS --table LOGS --stereotype LOGFILE --add /user/vagrant/nuage/logs/ --format tsv
We can verify that everything works as expected by taking a look at the buckets of one customer in the system:
> lingual shell
0: jdbc:lingual:hadoop> select distinct bucket,region from logs.logs where customer_id = 'b7dd113c-99ba-4fe8-b634-0fa26d781bad' order by region;
+----------------+---------+
|     BUCKET     | REGION  |
+----------------+---------+
| Arieses        | ASIA    |
| correspond     | ASIA    |
| relaxes        | ASIA    |
| sulfide        | EU      |
| bachelors      | US      |
| dialectic      | US      |
| diked          | US      |
| distinctively  | US      |
| inalienable    | US      |
| initiatives    | US      |
+----------------+---------+
10 rows selected (40.859 seconds)
This customer has used 10 different buckets across the 3 regions: 3 of those buckets are in ASIA, one is in the EU, and 6 are in the US region.
We now have the ability to run SQL on our log files. This is the first step on our way to rolling up the statistics for customer billing purposes.
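To get a feel for the data you can already run simple aggregations against the registered table. The following query is only a minimal sketch (reusing the customer UUID from above, with CALL_COUNT as an arbitrary alias) that counts the API calls of that customer per HTTP method; the exact numbers will depend on the generated test data:
0: jdbc:lingual:hadoop> select "METHOD", count(*) as CALL_COUNT from logs.logs where customer_id = 'b7dd113c-99ba-4fe8-b634-0fa26d781bad' group by "METHOD";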
HBase and Lingual
Lingual can talk to HBase by using the cascading.hbase provider. The provider is an adapter that abstracts away the access and data model of HBase, so that it can be used from Lingual (and from Cascading).
The first thing we have to do is install the HBase provider in Lingual:
> lingual catalog --provider --add cascading:cascading-hbase:2.2.0:provider
This will download the provider from conjars and install it in your Lingual catalog.
Next we need a new schema in Lingual, which we will call BILLING.
> lingual catalog --schema BILLING --add
Now we can define the stereotype for blobster billing data:
> lingual catalog --schema BILLING --stereotype blobster --add --columns ROWKEY,MONTH,GET_COUNT,PUT_COUNT,HEAD_COUNT,POST_COUNT --types string,string,Integer,Integer,Integer,Integer
We define six columns: the first is going to be used as the row key in HBase and translates to the UUID of the customer. The second column is the billing month, and the last four columns contain the aggregated counts per API call type for the customer.
Afterwards we add the hbase format to Lingual and, while we are adding it, declare the column family to which we are going to write the data. We use B here, since we are writing statistics for the "blobster" service.
> lingual catalog --schema BILLING --format hbase --add --properties=family=B --provider hbase
Now we add the hbase protocol to the schema:
> lingual catalog --schema BILLING --protocol hbase --add --provider hbase
Finally we can register a new Lingual table in our schema which reads from and writes to HBase transparently. We call the Lingual table BLOBSTER, and it is backed by an HBase table called "central_billing". The table uses the protocol "hbase" and the format "hbase", which are both provided by the "hbase" provider we installed above:
> lingual catalog --schema BILLING --table BLOBSTER --stereotype blobster --add "central_billing" --protocol hbase --format hbase --provider hbase
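Depending on your setup, the underlying HBase table may need to exist before the first write. If the provider does not create it for you, a quick way to create it with the column family B (matching the registration above) is from the HBase shell:
> hbase shell
hbase(main):001:0> create 'central_billing', 'B'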
Writing into HBase
We now have two tables in Lingual: one is a collection of TSV files on HDFS, the other is a table in HBase. Creating the statistics for October 2013 and writing them into HBase can be done with one standard SQL statement:
> lingual shell
0: jdbc:lingual:hadoop> INSERT INTO "BILLING"."BLOBSTER"
                        select CUSTOMER_ID as ROWKEY, '2013-10' "MONTH",
                          SUM(CASE WHEN "METHOD" = 'HEAD' THEN 1 ELSE 0 END) AS HEAD_COUNT,
                          SUM(CASE WHEN "METHOD" = 'GET' THEN 1 ELSE 0 END) AS GET_COUNT,
                          SUM(CASE WHEN "METHOD" = 'POST' THEN 1 ELSE 0 END) AS POST_COUNT,
                          SUM(CASE WHEN "METHOD" = 'PUT' THEN 1 ELSE 0 END) AS PUT_COUNT
                        FROM LOGS.LOGS
                        WHERE "TIMESTAMP" BETWEEN TIMESTAMP '2013-10-01 00:00:00' AND TIMESTAMP '2013-10-31 23:59:59'
                        GROUP BY CUSTOMER_ID;
+-----------+
| ROWCOUNT  |
+-----------+
| 1000      |
+-----------+
1 row selected (48.566 seconds)
Lingual will spawn Cascading flows on the Hadoop cluster which filter and aggregate the data. The results are written into the "central_billing" HBase table. The "ROWCOUNT" reported by the query is the number of rows written into HBase. The log files contain data from 1000 customers of the object store service, hence the 1000.
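If you want to cross-check that number, a simple sketch is to select the distinct customer IDs straight from the log table; the shell's "rows selected" footer should then report 1000 rows:
0: jdbc:lingual:hadoop> select distinct CUSTOMER_ID from LOGS.LOGS;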
With the provider mechanism it is not only possible to write into an HBase table, but also to read from it. We can inspect the freshly written data:
> lingual shell
0: jdbc:lingual:hadoop> select * from "BILLING"."BLOBSTER" where ROWKEY = 'b7dd113c-99ba-4fe8-b634-0fa26d781bad';
+---------------------------------------+----------+------------+------------+-------------+-------------+
|                ROWKEY                 |  MONTH   | GET_COUNT  | PUT_COUNT  | HEAD_COUNT  | POST_COUNT  |
+---------------------------------------+----------+------------+------------+-------------+-------------+
| b7dd113c-99ba-4fe8-b634-0fa26d781bad  | 2013-10  | 158        | 171        | 150         | 147         |
+---------------------------------------+----------+------------+------------+-------------+-------------+
You can of course also access the same data from the HBase shell:
> hbase shell
hbase(main):001:0> get 'central_billing', 'b7dd113c-99ba-4fe8-b634-0fa26d781bad'
COLUMN                 CELL
 B:GET_COUNT           timestamp=1383756401644, value=158
 B:HEAD_COUNT          timestamp=1383756401644, value=150
 B:MONTH               timestamp=1383756401644, value=2013-10
 B:POST_COUNT          timestamp=1383756401644, value=147
 B:PUT_COUNT           timestamp=1383756401644, value=171
Wrapping up
We have seen an example of data processing that could come from a real-world service. We saw how easy it is to use Hadoop and HBase without writing any Java code; anyone capable of writing SQL queries can do what we just saw. You can also imagine how the aggregated stats are further used to calculate the actual billing statements. You could introduce another table holding the cost of each operation and write those results into yet another table. This is left as an exercise to the reader.
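As a starting point for that exercise, here is a rough, untested sketch of what such a query could look like. It assumes a hypothetical RATES table registered in the BILLING schema with one row per API call type (columns "METHOD" and PRICE), and simply multiplies the aggregated counts by the per-call price:
0: jdbc:lingual:hadoop> select b.ROWKEY, b."MONTH",
                          b.GET_COUNT * g.PRICE + b.PUT_COUNT * p.PRICE +
                          b.HEAD_COUNT * h.PRICE + b.POST_COUNT * o.PRICE as AMOUNT
                        from "BILLING"."BLOBSTER" b,
                             "BILLING"."RATES" g, "BILLING"."RATES" p,
                             "BILLING"."RATES" h, "BILLING"."RATES" o
                        where g."METHOD" = 'GET' and p."METHOD" = 'PUT'
                          and h."METHOD" = 'HEAD' and o."METHOD" = 'POST';
The result of such a query could then be written into yet another HBase-backed table via INSERT INTO, exactly as shown above.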