Lingual executes ANSI SQL queries as Cascading applications on Apache Hadoop clusters.

Introduction

Lingual was designed to support the following goals:

Immediate Data Access

Using the Lingual Shell or JDBC Driver with existing Desktop and BI tools, unlock data stored on an Apache Hadoop cluster.

Simplify SQL Migration

Using the Lingual JDBC Driver or native Cascading APIs, simply migrate existing SQL code to Apache Hadoop.

Simplify System and Data Integration

Using the Lingual Data Provider functionality, data can be moved between any two systems over Apache Hadoop using SQL.

To satisfy these goals, Lingual supports as much of the ANSI SQL spec as makes sense, which is more than any other tool today providing "SQL" on Apache Hadoop MapReduce. Lingual is also fully compliant with the JDBC API.

Because it is built on Cascading, Lingual works seamlessly with other Cascading based workloads via the native Cascading API.

Overview

Lingual provides JDBC Drivers, a SQL command shell, and a catalog manager for publishing files (or any resource) as schemas and tables.

If you are already familiar with Cascading, you can use the native Cascading API to run SQL based Flows in tandem with other custom Flows in your application.

Using Lingual requires no installation other than the optional command line utilities.

Lingual supports SELECT and INSERT INTO statements, for example:

SELECT COUNT( DISTINCT CITY ), SUM( DISTINCT AGE ) FROM SALES.EMPS
INSERT INTO TEST.RESULTS SELECT EMPNO, NAME FROM SALES.EMPS

DDL statements like CREATE TABLE are supported only through the Lingual Catalog tool for defining Schemas and Tables from the command line.

See ANSI SQL Support for the full list of supported operations and functions.

Command Line Tools

Use the command line tools to publish files or resources as tables, or run queries from the console, against a local cluster or remotely on Amazon ElasticMapReduce.

To install the command line client, see Lingual Client Installation.

The Lingual command line client consists of two utilities:

  • Catalog - for publishing files as tables

  • Shell - for executing SQL statements

See the Getting Started page for an overview of how the utilities work together.

When using Apache Hadoop, refer to the notes on how to set up your Hadoop environment.

Java JDBC

Lingual provides a JDBC Driver that is fully compliant with Java 1.6 JDBC.

Java Cascading APIs

Lingual provides a very simple API for executing a SQL statement as a Cascading Flow.

Data Providers

Data Providers are jar files with custom Cascading Taps, Schemes, a factory class, and a provider definition file (provider.properties).

When a provider is registered with Lingual using the Catalog tool, the Lingual JDBC Driver and/or Lingual Shell can run queries that read data from, or write data to, the systems the provider integrates with.

A provider can be specified by either referencing the file location, or using a Maven style spec (group:name:revision).

Learn more about how to create a Provider:

Lingual Client Installation

With an existing Hadoop cluster

If you already have an installed and running version of a supported Apache Hadoop distribution, you can simply install the Lingual tools from the command line locally.

To install the Lingual command line tools, call:

> curl http://files.concurrentinc.com/lingual/1.0/lingual-client/install-lingual-client.sh | bash

This script downloads and installs the latest lingual shell script into ~/.lingual-client/ and updates any local .bashrc file.

Typically, the HADOOP_CONF_DIR environment variable should point to your Hadoop conf directory so that Lingual can find your cluster setup. For more details, see:

With a local demo Hadoop cluster

If you want your own local four-node Apache Hadoop cluster to test out Lingual’s features, clone the Vagrant based cluster hosted on the Cascading GitHub site.

See the README at the link above for details on how to initialize and run the cluster with the Cascading SDK pre-installed.

Updating your Lingual install

To get the latest release, call:

> lingual selfupdate

You can optionally bypass the installation and just download the latest version of the Lingual client by calling:

> wget -i http://files.concurrentinc.com/lingual/1.0/lingual-client/latest.txt

Amazon ElasticMapReduce

The install-lingual-client.sh file is also a valid Amazon EMR bootstrap action.

elastic-mapreduce \
  --create \
  --instance-group master --instance-count 1 --instance-type $MASTER_INSTANCE_TYPE \
  --instance-group core --instance-count $1 --instance-type $SLAVE_INSTANCE_TYPE \
  --bootstrap-action s3://files.concurrentinc.com/lingual/1.0/lingual-client/install-lingual-client.sh \
  --name "Cascading Cluster - $USER" \
  --key-pair $EMR_SSH_KEY_NAME \
  --alive

Alternatively, you can install the full Cascading SDK, which includes a number of additional tools, with the following bootstrap action:

--bootstrap-action s3://files.concurrentinc.com/sdk/2.2/install-cascading-sdk.sh

The assumption here is that you will be shelling into your remote Hadoop cluster to use Lingual or other SDK tools. See Using Hadoop for tips on connecting remotely.

Getting Started

The best way to learn Lingual is to download sample data and run a few queries.

Getting Data and Setting up Lingual

After installing Lingual, in a new working directory download sample data:

> wget http://data.cascading.org/employees.tgz
> tar -xzf employees.tgz

Next, choose the platform Lingual will execute against, either local or hadoop, and download a simple shell script that registers the employee data with Lingual for that platform. With local, data is read from the local filesystem and queries are run in local memory. With hadoop, data is read from HDFS and MapReduce jobs are run on the cluster to execute the queries.

If local, call:

> export LINGUAL_PLATFORM=local

Or if hadoop, call:

> export LINGUAL_PLATFORM=hadoop
> export HADOOP_HOME=/path/to/hadoop
> hadoop fs -copyFromLocal employees employees

Then call:

> wget http://data.cascading.org/create-employees.sh
> chmod a+x create-employees.sh
> ./create-employees.sh

The create-employees.sh script

The create-employees.sh script simply calls lingual catalog to register each file as a table, along with the columns and types in each file, on the platform set by LINGUAL_PLATFORM.

For example, to register the employees/employees.csv file as the EMPLOYEES.EMPLOYEES table, first the schema EMPLOYEES must be created:

lingual catalog --schema EMPLOYEES --add

Next, a stereotype must be created. It is also named employees, to keep things simple:

lingual catalog --schema EMPLOYEES --stereotype employees --add \
  --columns EMP_NO,BIRTH_DATE,FIRST_NAME,LAST_NAME,GENDER,HIRE_DATE \
  --types int,date,string,string,string,string

Then the EMPLOYEES table must be created:

lingual catalog --schema EMPLOYEES --table EMPLOYEES --stereotype employees \
  --add ${BASEDIR}/employees/employees.csv

Separating stereotype from table allows the columns and type definitions to be shared across tables without having to re-register the redundant data.

Note that .csv file support is built in to Lingual, so there is no need to register or create that data format.

Running queries

The Lingual Shell is simply a command shell that uses the Lingual JDBC Driver to execute SQL statements against the configured platform.

To start the shell, run:

> lingual shell

From within the shell, execute:

> select * from employees.titles;

The Lingual query planner detects that we are effectively only reading the file with this query, so the results begin to display immediately.

Alternately, run:

> select * from employees.titles where title = 'Engineer';

This will result in an actual MapReduce job being submitted, if using the Hadoop platform. You can verify this on the JobTracker web interface.

Under the hood, the JDBC Driver created and ran a new Cascading Flow to select all the employees records with the given title. By default, the results were placed into a file in the ./results/ directory, either on the local disk or in your user directory on HDFS.

A JDBC ResultSet was then created to read the results file, with "max rows" set to 10,000 (the default). Since files on Hadoop are generally very large, this is a reasonable limit. Use the --maxRows command line argument to change it.

The file in the ./results/ directory is a valid data file, but should be deleted if you want to reclaim the space it is taking up.

To verify on Hadoop, run:

> hadoop fs -lsr results

Resulting in something like this:

-rw-r--r--   3 vagrant supergroup    2165628 2013-07-24 21:01 /user/vagrant/results/20130724-210146-65127B6700/part-00000
-rw-r--r--   3 vagrant supergroup    2169890 2013-07-24 21:01 /user/vagrant/results/20130724-210146-65127B6700/part-00001

To see the contents, run:

hadoop fs -cat results/20130724-210146-65127B6700/part-00000

A table must exist in Lingual before an insert into select ... statement can be called, so Catalog must be used to create a location to insert the results into.

> lingual catalog --schema working --add
> lingual catalog --schema working --stereotype titles --add --columns title,cnt --types string,int
> lingual catalog --schema working --table unique_titles --stereotype titles --add working/unique-titles.csv

Now execute the query:

> lingual shell
> insert into "working"."unique_titles" select title, count( title ) as cnt from employees.titles group by title;

The results will be located in working/unique-titles.csv.

Please note that the column names in your insert statement have to match the column names of the declared stereotype in the catalog. In the example above we call the function count() and add as cnt so that Lingual is able to write the data to the correct table. Omitting the as cnt will result in an error.

Using different file formats

Lingual supports a Data Provider mechanism that allows for new formats and protocols to be added on demand.

For example, to add support for a fabricated pipe delimited format or .psv file, the built in providers can be used to create a new file format.

By running:

> lingual catalog --provider

you can see the currently registered providers. text is the built in provider for handling delimited files.

To see the properties associated with the text provider, run:

> lingual catalog --provider text --show

To create a .psv file, execute:

> lingual catalog --schema working --format psv --add --provider text --extensions '.psv' --properties "delimiter=|"
> lingual catalog --schema working --table unique_titles --update working/unique-titles.psv --format psv

The results will be located in working/unique-titles.psv and use a | instead of , as a field delimiter.

Adding and using a new Data Provider

Instead of using the built in Data Provider, new ones can be added that provide access to data systems not currently supported by Lingual.

For example, to copy data from a csv file and store it in a memcached server, the cascading-memcached provider can be registered.

To register the memcached provider, run:

> lingual catalog --provider --add cascading:cascading-memcached:0.3.0:provider

This will retrieve the cascading-memcached-0.3.0-provider.jar from Conjars (if not on Conjars, then from Maven Central).
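The relationship between the spec and the retrieved jar name can be sketched in a few lines of Java. This is only an illustration: the ProviderSpec class and jarName method are hypothetical, not part of Lingual, and the sketch assumes the optional fourth element of the spec acts as a classifier appended to the file name, as the jar name above suggests.

```java
// Hypothetical helper, not part of Lingual: derives the jar file name that a
// Maven style spec (group:name:revision, optionally followed by a fourth
// element) appears to refer to.
class ProviderSpec
  {
  static String jarName( String spec )
    {
    String[] parts = spec.split( ":" );

    if( parts.length < 3 || parts.length > 4 )
      throw new IllegalArgumentException( "expected group:name:revision[:classifier], got: " + spec );

    String name = parts[ 1 ];
    String revision = parts[ 2 ];

    // assumption: the optional fourth element is a classifier such as "provider"
    String classifier = parts.length == 4 ? "-" + parts[ 3 ] : "";

    return name + "-" + revision + classifier + ".jar";
    }

  public static void main( String[] args )
    {
    // prints cascading-memcached-0.3.0-provider.jar
    System.out.println( jarName( "cascading:cascading-memcached:0.3.0:provider" ) );
    }
  }
```

The group element (cascading) does not appear in the file name; in a Maven style repository it would normally select the directory the jar is stored under.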

To see what the provider provides, call:

> lingual catalog --provider memcached --show

The memcached provider can store data as text delimited values, or as binary. To store values as comma separated text values, we can use the builtin format called csv. But we need to tell it which columns are keys, and which columns are values.

> lingual catalog --schema working --format csv --update --properties keyFields=title,valueFields=cnt

Note we are "updating" the csv format as seen by the "working" schema even though the provider was added to the default schema.

Compare these three calls:

> lingual catalog --format csv --show
> lingual catalog --format csv --provider memcached --show
> lingual catalog --schema working --format csv --show

The first fails, naming the two providers that support the csv format. The second shows the default values of csv for the "memcached" provider. The third shows the properties as configured in the "working" schema along with the defaults from the provider.

Schemas are used to customize and/or override default protocol and format properties as seen by the tables in the given schema.

Next we need to create a table that is backed by our memcached server on the given IP and port:

> lingual catalog --schema working --table title_counts --stereotype titles --add localhost:11211 \
  --protocol memcached-text --format csv

Note that we re-used the stereotype "titles" created in the above example.

When using Hadoop, make sure you use the actual IP address of the memcached server host, not localhost.

Now execute the query, assuming an actual memcached server is running:

> lingual shell
> insert into "working"."title_counts" select title, count( title ) as cnt from employees.titles group by title;

If run on Hadoop, a MapReduce job will be spawned, and the "sink" Tap in the Flow will be the memcached Tap. That is, the results are not written to disk, but streamed directly into the memcached server from each reducer task.

To verify values are stored in the memcached server, run:

> telnet localhost 11211
> get Staff

Lingual Catalog

The Lingual Catalog command line tool allows users to curate a catalog of database schemas and tables, where a table is a Tap accessible data-set and a schema is a collection of tables.

The Lingual Catalog is used in tandem with the Lingual Shell and the Lingual JDBC driver. By default the Shell will use the current (in the current directory) catalog of schemas and tables to satisfy the SQL planner.

These concepts are inherited from the database world and are compatible with standard SQL tools.

Detail

A Schema is a collection of Tables. A Schema has a name like employees.

|-- schemas --|
root           <- default schema
    \
     employees <- user schema

A Table consists of a URI, a Stereotype, a Format, and a Protocol. A Table also has a name like titles. The full name, if the Table belonged to the Schema employees, would be employees.titles.

|-- schemas --|-- tables -- |
root
    \
     employees
              \
               titles  <- user table

Format is the file format or encoding of a Table URI. Tab delimited (TSV) and comma delimited (CSV) are common text formats that can be identified by the file extension on the URI (employees/titles.csv). Format maps to a Cascading Scheme instance internally (like TextDelimited for CSV).

Protocol is how a Table URI is accessed. If on Hadoop, the default is HDFS. If in Local mode, the default is through the local filesystem. Protocols can be identified by the URI scheme: hdfs:/... for HDFS and file:/... for the local filesystem. Protocol maps to a Cascading Tap type internally (like Hfs for HDFS).
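As a rough illustration of the two lookups described above (format from the file extension, protocol from the URI scheme), consider the following Java sketch. It is not Lingual's implementation: the TableUriHints class and its methods are hypothetical, and the platformDefault argument simply stands in for the platform's default protocol.

```java
import java.net.URI;

// Hypothetical helper, not part of Lingual: extracts the two hints a Table URI
// can carry, the file extension (for the Format) and the URI scheme (for the Protocol).
class TableUriHints
  {
  // returns the extension including the dot, or "" if the file name has none
  static String extension( String uri )
    {
    String fileName = uri.substring( uri.lastIndexOf( '/' ) + 1 );
    int dot = fileName.lastIndexOf( '.' );

    return dot < 0 ? "" : fileName.substring( dot );
    }

  // returns the URI scheme, falling back to the given default when the URI is relative
  static String scheme( String uri, String platformDefault )
    {
    String scheme = URI.create( uri ).getScheme();

    return scheme == null ? platformDefault : scheme;
    }

  public static void main( String[] args )
    {
    System.out.println( extension( "employees/titles.csv" ) ); // prints .csv
    System.out.println( scheme( "hdfs:/user/vagrant/employees/titles.csv", "file" ) ); // prints hdfs
    System.out.println( scheme( "employees/titles.csv", "file" ) ); // prints file
    }
  }
```

A relative URI such as employees/titles.csv carries no scheme, which is why a platform default is needed at all.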

A Stereotype represents the meta-data associated with a Table, the table definition, which includes column names and column types. Stereotypes have a name, may be nested in a Schema, and may be shared between Tables.

Use the command line to create and update Schemas, Tables, Stereotypes, Formats, and Protocols.

CLI Usage

Catalog is invoked from the command line via:

lingual catalog [switches]*

To create a new catalog in the user home directory on HDFS:

lingual catalog --platform hadoop --init

To add a new table to an existing schema:

lingual catalog --platform hadoop --schema company --table employees --add ./data/employees

CLI Options Reference

context               action                        description
--uri [uri]                                         optional path to the catalog meta-data, defaults to current directory relative to the current platform
--platform                                          lists all known platforms (currently local and hadoop)
--platform [name]                                   use the named platform (relative uri will be resolved for given platform)
--default*                                          make the current relevant options the default environment
--init                                              initializes a new catalog in the current directory if --uri is not given
--ddl [file]                                        use DDL file to define tables in an existing schema
                      --schema [name]
                      --format [name]
                      --protocol [name]
--repo                                              list all maven repos
--repo [name]
                      --add [url]                   add maven repo
                      --remove                      remove maven repo
                      --validate                    optional arg when using --add to test the repo is valid without adding it
                      --show                        shows detailed information about a repo. requires the [name] param for repo
--schema                                            lists all current schemas
--schema [name]
                      --add [uri]                   uri optional, add path as a new schema root
                      --remove
                      --rename [new name]
                      --show                        shows detailed information about a schema. requires the [name] param for schema
--table                                             lists all tables for the current schema
--table [name]
                      --add [uri]                   add path as a new table root, will attempt to resolve stereotype
                      --update                      updates the table with new properties
                      --stereotype [name]           use existing stereotype for table definition
                      --format [name]               use format for uri identifier
                      --protocol [name]             optional, use protocol for uri identifier
                      --remove                      logically removes table, does not delete files
                      --rename [new name]           logically renames table, does not alter files
                      --show                        shows detailed information about a table. requires the [name] param for table
--stereotype                                        list all registered stereotype names
--stereotype [name]
                      --add
                      --update                      update with given values (replaces values)
                      --provider [name]*            use the given provider (optional)
                      --columns [names,.]           required
                      --types [types,.]             required
                      --remove
                      --rename [new name]
                      --show                        shows detailed information about a stereotype. requires the [name] param for stereotype
--provider                                          list all registered providers
--provider [name]                                   register a new provider
                      --add [uri|spec]              register a provider located by the uri or maven spec (group:name:revision)
                      --validate                    optional arg when using --add to test the provider’s uri or spec is valid without adding it
                      --remove
                      --rename [new name]
                      --show                        shows detailed information about a provider. requires the [name] param for provider
--protocol                                          list all registered protocol names
--protocol [name]
                      --add                         register a new protocol
                      --provider [name]             use the given provider
                      --update                      update with given values (replaces values)
                      --schemes [uri,.]             uri scheme to identify protocol (jdbc:, hdfs:, etc)
                      --properties [name=value,.]   update/add properties for the protocol (user=jsmith, etc)**
                      --remove
                      --rename [new name]
                      --show                        shows detailed information about a protocol. requires the [name] param for protocol
--format                                            list all registered format names
--format [name]
                      --add                         register a new format, like CSV, TSV, Avro, or Thrift
                      --provider [name]             use the given provider
                      --update                      update with given values (replaces values)
                      --extensions [.ext,.]         file extension used to identify format (.csv, .tsv, etc)
                      --properties [name=value,.]   update/add properties for the format (hasHeaders=true, etc)**
                      --remove
                      --rename [new name]
                      --show                        shows detailed information about a format. requires the [name] param for format

* currently unsupported

** If a key has a list of values, name1=value1,value2, you can only set a single property from that invocation. Otherwise name1=value1,name2=value2 works.

Catalog Structure

Any directory can be the root namespace for a catalog.

path            description
.               current directory
./.lingual/     all meta-data (hidden directory)
  defaults      default environment values *
  catalog       catalog data file in JSON
  providers     provider jar files
  config        config files dir, "default.properties" file from it is picked by default
./results       local storage for all SELECT query results sets

* currently unsupported

Configuration

See Configuring Apache Hadoop for use with an Apache Hadoop cluster.

Lingual Shell

Lingual Shell is an interactive SQL command shell.

Lingual Shell expects to find a catalog configuration that defines available Schemas and Tables in the current directory for the specified platform.

The catalog information is created and maintained by the Catalog utility. By default, the catalog utility will create a directory named .lingual in the current directory with additional directories and files.

Reusing results

By default, the results of SELECT queries will be stored in the results directory in the current working path on the current platform. On screen, at most "max rows" rows will be displayed.

If the --resultSchema argument is used, the named schema will be used to store results, where each table name will be a unique timestamp. Additionally, the last result will also be referenced by the table alias LAST.

That is, after running any complex query, select * from results.last will re-print the results of the last query, assuming the shell was started with the argument:

> lingual shell --resultSchema RESULTS

This remains true even if the RESULTS schema was not previously created with the catalog tool.

Note that no tables are added to the persistent version of the catalog meta-data (saved to disk). They are only available at runtime. Consequently, if any files are deleted from the result path, they will not be visible as tables in the result schema, or to queries, on subsequent shell runs.

Setting Job Properties

After using the Catalog to initialize a new catalog (in the .lingual directory), a .lingual/config/default.properties file will be created. Edit this file to add new properties to be used by the underlying platform. In the case of Hadoop, the number of reducers to use for all jobs could be set here.

The LINGUAL_CONFIG environment variable may also be used to provide bootstrap properties to Shell.

See the notes on Hadoop for more information on setting properties.

CLI Usage

Shell is invoked from the command line via:

lingual shell [switches]*

To start the shell for running queries on Apache Hadoop:

lingual shell --platform hadoop

CLI Options Reference

switch                   description
--platform [name]        use the named platform
--username [name]        name of remote user to use
--password [password]    password of remote user
--schema [name]          name of the default schema (same as set schema [name])
--schemas [uri,…]        root path for each schema to use, will use base directory as schema name. sub-directories will be treated as tables
--sql [file]             file with SQL commands to execute
--maxRows [num]          the maximum number of rows to print to the console
--resultSchema [name]    schema name to store temporary result sets within, uses --resultPath if no schema exists
--resultPath [dir]       platform path to store temporary result sets in, results is default
--flowPlanPath [dir]     where to write out the Cascading planner DOT file for debugging
--sqlPlanPath [dir]      where to write out the Optiq planner plan file for debugging

Configuration

See Configuring Apache Hadoop for use with an Apache Hadoop cluster.

Lingual JDBC

Using the JDBC Drivers

Lingual provides two JDBC Driver jars with self contained dependencies in the Conjars Maven repository.

The JDBC connection string is of the form jdbc:lingual:[platform], where [platform] is either local or hadoop.

Additional JDBC properties may be set using the form:

jdbc:lingual:[platform];[property]=[value];[property]=[value];...

Where the property keys are, in part:

  • catalog=[path] - the working directory where your .lingual workspace is kept, default is ./.

  • schema=[name] - set the default schema name to use

  • schemas=[path,path] - URI paths to the set of schema/tables to install in the catalog on startup

  • resultPath=[path] - temporary root path for result sets to be stored, defaults to ./results

  • flowPlanPath=[path] - for debugging, print the corresponding Flow dot file here

  • sqlPlanPath=[path] - for debugging, print the corresponding SQL plan file here

platform is required to help the driver distinguish between the backends when more than one JDBC Driver is in the CLASSPATH.

Any other properties will be passed down to the underlying platform. In the case of Apache Hadoop, specific connection properties can be passed. See the Notes on Hadoop documentation for more details.
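To make the connection string format concrete, the following Java sketch assembles a URL of the form jdbc:lingual:[platform];[property]=[value];... from a map of properties. The LingualUrl class is a hypothetical helper, not part of the Lingual API, the build/results path is an invented example value, and no escaping of ; or = inside values is attempted.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper, not part of Lingual: joins a platform name and a set of
// properties into a connection string using the ';' and '=' separators shown above.
class LingualUrl
  {
  static String build( String platform, Map<String, String> properties )
    {
    StringBuilder url = new StringBuilder( "jdbc:lingual:" ).append( platform );

    // each property is appended as ;key=value, in the map's iteration order
    for( Map.Entry<String, String> entry : properties.entrySet() )
      url.append( ';' ).append( entry.getKey() ).append( '=' ).append( entry.getValue() );

    return url.toString();
    }

  public static void main( String[] args )
    {
    // LinkedHashMap keeps the properties in insertion order
    Map<String, String> properties = new LinkedHashMap<String, String>();
    properties.put( "schemas", "src/main/resources/data/example" );
    properties.put( "resultPath", "build/results" ); // example path, an assumption

    // prints jdbc:lingual:local;schemas=src/main/resources/data/example;resultPath=build/results
    System.out.println( build( "local", properties ) );
    }
  }
```

The resulting string can be passed to DriverManager.getConnection() once the Lingual JDBC Driver for the matching platform is on the CLASSPATH.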

Using the JDBC Driver In Your Project

To pull either of these jars, the jdbc Maven classifier must be used if you want the self contained jar file with all its shaded dependencies.

For the local mode platform:

<dependency>
  <groupId>cascading</groupId>
  <artifactId>lingual-local</artifactId>
  <version>x.y.z</version>
  <classifier>jdbc</classifier>
</dependency>

For the hadoop mode platform:

<dependency>
  <groupId>cascading</groupId>
  <artifactId>lingual-hadoop</artifactId>
  <version>x.y.z</version>
  <classifier>jdbc</classifier>
</dependency>

Alternatively, pulling the default artifacts (without the classifier) will also pull any relevant dependencies as would be expected.

Using JDBC in Java

The Java source code used to execute a query via a JDBC connection is much the same as with any other JDBC driver:

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;

public class JdbcExample
  {
  public static void main( String[] args ) throws Exception
    {
    new JdbcExample().run();
    }

  public void run() throws ClassNotFoundException, SQLException
    {
    Class.forName( "cascading.lingual.jdbc.Driver" );
    Connection connection = DriverManager.getConnection(
       "jdbc:lingual:local;schemas=src/main/resources/data/example" );
    Statement statement = connection.createStatement();

    ResultSet resultSet =
      statement.executeQuery(
        "select *\n"
          + "from \"example\".\"sales_fact_1997\" as s\n"
          + "join \"example\".\"employee\" as e\n"
          + "on e.\"EMPID\" = s.\"CUST_ID\"" );

    while( resultSet.next() )
      {
      int n = resultSet.getMetaData().getColumnCount();
      StringBuilder builder = new StringBuilder();

      for( int i = 1; i <= n; i++ )
        {
        builder.append(
          ( i > 1 ? "; " : "" )
            + resultSet.getMetaData().getColumnLabel( i )
            + "="
            + resultSet.getObject( i ) );
        }
      System.out.println( builder );
      }

    resultSet.close();
    statement.close();
    connection.close();
    }
  }

Note that in this example the schema for the DDL has been derived directly from the CSV files. In other words, point the JDBC connection at a directory of flat files and query as if they were already loaded into SQL.
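The naming convention at work here can be sketched as follows: the last element of the schemas path becomes the schema name, and each file in it becomes a table. The SchemaPaths class is a hypothetical illustration, not Lingual code, and stripping the file extension to form the table name is an assumption inferred from the example query rather than documented behavior.

```java
import java.nio.file.Paths;

// Hypothetical helper, not part of Lingual: shows how a schema root path and a
// file name could map to the quoted schema.table name used in the query above.
class SchemaPaths
  {
  static String qualifiedName( String schemaRoot, String fileName )
    {
    // the base directory of the root path is used as the schema name
    String schema = Paths.get( schemaRoot ).getFileName().toString();

    // assumption: the table name is the file name without its extension
    int dot = fileName.lastIndexOf( '.' );
    String table = dot < 0 ? fileName : fileName.substring( 0, dot );

    return "\"" + schema + "\".\"" + table + "\"";
    }

  public static void main( String[] args )
    {
    // prints "example"."sales_fact_1997"
    System.out.println( qualifiedName( "src/main/resources/data/example", "sales_fact_1997.tcsv" ) );
    }
  }
```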

The results look like:

CUST_ID=100; PROD_ID=10; EMPID=100; NAME=Bill
CUST_ID=150; PROD_ID=20; EMPID=150; NAME=Sebastian

An equivalent Cascading app would be:

// source taps: comma delimited, double-quote quoted files with a header line
Tap empTap = new FileTap(new TextDelimited(true, ",", "\""), "src/test/data/employee.txt");
Tap salesTap = new FileTap(new TextDelimited(true, ",", "\""), "src/test/data/salesfact.txt");

// sink tap: any previous results file is replaced
Tap resultsTap = new FileTap(new TextDelimited(true, ",", "\""), "build/test/output/results.txt", SinkMode.REPLACE);

Pipe empPipe = new Pipe("emp");
Pipe salesPipe = new Pipe("sales");

// join employees to sales on empid = cust_id
Pipe join = new CoGroup(empPipe, new Fields("empid"), salesPipe, new Fields("cust_id"));

FlowDef flowDef = FlowDef.flowDef()
  .setName("flow")
  .addSource(empPipe, empTap)
  .addSource(salesPipe, salesTap)
  .addTailSink(join, resultsTap);

Flow flow = new LocalFlowConnector().connect(flowDef);
flow.complete(); // run the flow and block until it finishes, so the results can be read

TupleEntryIterator iterator = resultsTap.openForRead(flow.getFlowProcess());

Alternately you can access this data from R via the JDBC Driver as described on the Accessing Data from R page.

Using JDBC from R

The following example is based on the RJDBC package for R, assuming that the MySQL Sample Employee Database has been used as described on the Java Example page.

# JDBC support in R is provided by the RJDBC package http://www.rforge.net/RJDBC/

# install the RJDBC package; only needed once -- uncomment next line the first time
#install.packages("RJDBC", dep=TRUE)

# load the library
library(RJDBC)

# set up the driver
drv <- JDBC("cascading.lingual.jdbc.Driver", "~/src/concur/lingual/lingual-local/build/libs/lingual-local-1.0.0-wip-dev-jdbc.jar")

# set up a database connection to a local repository
connection <- dbConnect(drv, "jdbc:lingual:local;catalog=~/src/concur/lingual/lingual-examples/tables;schema=EMPLOYEES")

# query the repository
df <- dbGetQuery(connection, "SELECT * FROM EMPLOYEES.EMPLOYEES WHERE FIRST_NAME = 'Gina'")
head(df)

# use R functions to summarize and visualize part of the data
df$hire_age <- as.integer(as.Date(df$HIRE_DATE) - as.Date(df$BIRTH_DATE)) / 365.25
summary(df$hire_age)

# uncomment next line the first time
#install.packages("ggplot2")
library(ggplot2)

m <- ggplot(df, aes(x=hire_age))
m <- m + ggtitle("Age at hire, people named Gina")
m + geom_histogram(binwidth=1, aes(y=..density.., fill=..count..)) + geom_density()

The last part of that R script calculates the age (in years) at time of hire for all people named ‘Gina’, summarized as:

> summary(df$hire_age)
   Min. 1st Qu.  Median    Mean 3rd Qu.    Max.
  20.86   27.89   31.70   31.61   35.01   43.92

and visualized in the plot:

[Figure: gina_hire_age, histogram and density of hire_age, titled "Age at hire, people named Gina"]

Using JDBC with Squirrel SQL with Apache Hadoop

The Lingual Drivers work great with Squirrel SQL as a desktop client.

Here we will describe installing the Lingual Driver for the Apache Hadoop platform. The local mode platform should be straightforward as it has less configuration.

Before starting, make sure:

  • the Lingual Client has been downloaded and installed per the Getting Started section,

  • a local copy of the Apache Hadoop libraries is unpacked, and

  • a copy of the remote Hadoop cluster configuration is copied locally (found in the remote HADOOP_CONF_DIR)

After downloading and installing the Squirrel application, simply add a new "driver" from the drivers tab.

The "name" is arbitrary; "Lingual Hadoop" is a good place to start.

The "example url" should be jdbc:lingual:hadoop.

After clicking "extra classpath", navigate and add:

  • All the jar files found under the Apache Hadoop unpack directory, including the lib folder

  • The directory containing all the Hadoop configuration files (HADOOP_CONF_DIR)

  • The Lingual Hadoop JDBC jar file, found under lingual-client/platform/hadoop/lingual-hadoop-x.y.z-jdbc.jar

Finally, if not already filled out, set "class name" to cascading.lingual.jdbc.Driver then click "Ok".

Next, click the "Aliases" tab and create a new alias using the driver name just created. The name is arbitrary here as well.

"username" should be the name of the user whose default directory on HDFS should be accessed. If using the vagrant test cluster, set the username to "vagrant". This will pull all catalog and meta-data from the /user/vagrant directory on HDFS. No password is necessary.

Save any changes and connect to the cluster.

If the Getting Started Guide was followed, you can now query any data that was previously registered by the lingual catalog tool.

Lingual in Java with Cascading

The Java source code used to execute a query as a Cascading Flow is much the same as assembling a Flow from a pipe assembly. Cascading 2.2 has introduced the SQLPlanner interface allowing the underlying pipe assembly to be created on the fly after the source and sink Tap instances are provided.

import java.io.IOException;
import cascading.flow.Flow;
import cascading.flow.FlowDef;
import cascading.flow.local.LocalFlowConnector;
import cascading.lingual.flow.SQLPlanner;
import cascading.lingual.tap.local.SQLTypedTextDelimited;
import cascading.tap.SinkMode;
import cascading.tap.Tap;
import cascading.tap.local.FileTap;
import cascading.tuple.TupleEntryIterator;

public class FlowExample
  {
  public static void main( String[] args ) throws Exception
    {
    new FlowExample().run();
    }

  public void run() throws IOException
    {
    // identifiers are quoted to preserve their case
    String statement = "select *\n"
      + "from \"example\".\"sales_fact_1997\" as s\n"
      + "join \"example\".\"employee\" as e\n"
      + "on e.\"EMPID\" = s.\"CUST_ID\"";

    // source taps: field names and types are read from the TCSV headers
    Tap empTap = new FileTap( new SQLTypedTextDelimited( ",", "\"" ),
        "src/main/resources/data/example/employee.tcsv", SinkMode.KEEP );
    Tap salesTap = new FileTap( new SQLTypedTextDelimited( ",", "\"" ),
        "src/main/resources/data/example/sales_fact_1997.tcsv", SinkMode.KEEP );

    // sink tap: any previous output is replaced on each run
    Tap resultsTap = new FileTap( new SQLTypedTextDelimited( ",", "\"" ),
        "build/test/output/flow/results.tcsv", SinkMode.REPLACE );

    // register the sources under the schema.table names used by the query
    FlowDef flowDef = FlowDef.flowDef()
      .setName( "sql flow" )
      .addSource( "example.employee", empTap )
      .addSource( "example.sales_fact_1997", salesTap )
      .addSink( "results", resultsTap );

    // the planner creates the pipe assembly from the SQL statement
    SQLPlanner sqlPlanner = new SQLPlanner()
      .setSql( statement );

    flowDef.addAssemblyPlanner( sqlPlanner );

    Flow flow = new LocalFlowConnector().connect( flowDef );

    flow.complete(); // blocks until the flow has finished

    // read the results back and print each tuple
    TupleEntryIterator iterator = resultsTap.openForRead( flow.getFlowProcess() );

    while( iterator.hasNext() )
      System.out.println( iterator.next() );

    iterator.close();
    }
  }

Note that in this example the table definitions expected by the SQL statement have been derived directly from the TCSV files. TCSV files are standard CSV (comma separated values) files with a header containing the field name and "type" of each column, like string or int. This feature is provided by the SQLTypedTextDelimited Scheme class.
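To make the header convention more concrete, the following sketch parses a TCSV style header line into field name and type pairs using only the JDK. It assumes each header cell has the form name:type (for example EMPID:int) and that a cell without a type is treated as string. The class and method names are hypothetical and are not part of Lingual, where this work is done by SQLTypedTextDelimited.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.regex.Pattern;

class TcsvHeaderSketch
  {
  // Parses a header such as "EMPID:int,NAME:string" into an ordered name -> type map.
  // Assumption: cells look like name:type, and a cell with no type defaults to "string".
  static Map<String, String> parseHeader( String headerLine, String delimiter, String quote )
    {
    Map<String, String> fields = new LinkedHashMap<String, String>();

    // naive split: delimiters embedded inside quoted cells are not handled
    for( String cell : headerLine.split( Pattern.quote( delimiter ) ) )
      {
      String value = cell.trim().replace( quote, "" ); // drop optional quote characters
      int index = value.lastIndexOf( ':' );
      String name = index == -1 ? value : value.substring( 0, index );
      String type = index == -1 ? "string" : value.substring( index + 1 );

      fields.put( name, type );
      }

    return fields;
    }

  public static void main( String[] args )
    {
    // prints {EMPID=int, NAME=string, CITY=string}
    System.out.println( parseHeader( "\"EMPID:int\",\"NAME:string\",\"CITY\"", ",", "\"" ) );
    }
  }
```

The delimiter and quote arguments mirror the two constructor arguments passed to SQLTypedTextDelimited in the example above.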

Data Providers

The Data Provider mechanism allows Lingual to integrate multiple systems, including Hadoop, into a single JDBC based application. This tremendously simplifies building integration and ETL (extract, transform, and load) types of applications.

It also allows JDBC Client applications (Web based BI tools, GUI SQL Clients, etc) to execute queries on Hadoop with any data format or back end system. For example, a web based reporting tool can join relational data (from Oracle) with data on HDFS in the Apache Avro format to generate a report without the use of any additional tools to migrate or modify data from the database or read the Avro files.

This is accomplished by adding new protocols and formats dynamically to a Lingual query through a packaged jar file hosted locally or from Maven. This jar is automatically added, on demand, to any Lingual queries executed from the Lingual JDBC Driver.

Here, a protocol is a label specifying a Cascading Tap, and a format is a label for a Cascading Scheme, along with any relevant meta-data (text delimiters, username and password, etc) needed by the Tap or Scheme classes.

By default Lingual supports multiple text formats like csv and tsv (comma separated values, and tab separated values, respectively). Files ending with .csv or .tsv are automatically mapped to the proper format. Lingual assumes that the first line of these files is a header line declaring the field name and type. If the first line is data, the Catalog Configuration for the provider's format should set the parameter header=false.

Cascading local mode supports the file protocol via the FileTap, and Hadoop mode supports the hdfs protocol via the Hfs Tap. URLs (identifiers) starting with file: or hdfs: are automatically mapped to the proper protocol.
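The automatic mapping described above can be pictured with a small sketch that derives a protocol label from the identifier's URL scheme and a format label from its file extension. This is only an illustration of the idea, not Lingual's resolution logic. The class and method names are hypothetical, and the fallback to a default protocol for identifiers without a scheme is an assumption.

```java
class ResourceMappingSketch
  {
  // Returns the text before the first ':' as the protocol label, e.g. "hdfs" or "file".
  // Assumption: identifiers without a scheme fall back to the supplied default.
  static String protocolFor( String identifier, String defaultProtocol )
    {
    int colon = identifier.indexOf( ':' );

    return colon <= 0 ? defaultProtocol : identifier.substring( 0, colon );
    }

  // Returns the extension of the last path element as the format label, or null if there is none.
  static String formatFor( String identifier )
    {
    String name = identifier.substring( identifier.lastIndexOf( '/' ) + 1 );
    int dot = name.lastIndexOf( '.' );

    return dot == -1 ? null : name.substring( dot + 1 );
    }

  public static void main( String[] args )
    {
    String identifier = "hdfs://master.local:9000/user/vagrant/employee.csv";

    System.out.println( protocolFor( identifier, "file" ) ); // prints hdfs
    System.out.println( formatFor( identifier ) ); // prints csv
    }
  }
```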

Creating a Data Provider

To add new protocol or format providers, a new Jar library must be created with an optional factory class and a provider.properties file defining how to find and use the factory.

If a factory class is not provided, the extends property must be used to specify an existing provider that these properties will amend. Extending an existing provider is a simple way to add new formats or protocols by creating a new named set of properties.

Note

Any dependent Java class files (or libraries) must be expanded into the final jar file. Typically this is called a fatjar. A Hadoop style lib folder inside the jar file is not honored.

The Factory Class

Below are the optional method signatures expected in a custom factory. Custom factories are not required to implement any interface. Lingual finds the methods by searching for their signatures via reflection.

If only creating a Protocol factory, only one of the createTap methods must be implemented. Similarly, if only creating a Format factory, only one of the createScheme methods must be implemented. If the Factory provides both a Protocol and a Format, it must have one createTap and one createScheme method.

The identifier parameter in the createTap signatures represents the native address of the resource that your provider is talking to. This could be a JDBC URL, a zookeeper quorum for HBase, or just a path in the filesystem.

Note that the Properties object is a java.util.Properties instance containing the key/value pairs either defined in the provider.properties file (described below) or set from the Catalog tool command line. These properties are typically used to pass usernames, passwords, or similar information to your Format and Protocol.

public interface ProviderFactory
  {
  String getDescription(); // optional

  Tap createTap( String protocol, Scheme scheme, String identifier, SinkMode mode, Properties properties );

  Tap createTap( Scheme scheme, String identifier, SinkMode mode, Properties properties );

  Tap createTap( Scheme scheme, String identifier, SinkMode mode );

  Scheme createScheme( String protocol, String format, Fields fields, Properties properties );

  Scheme createScheme( String format, Fields fields, Properties properties );

  Scheme createScheme( Fields fields, Properties properties );

  Scheme createScheme( Fields fields );
  }

Implementations of custom factories that can guide you towards your own can be found in cascading.memcached and cascading-jdbc.
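Because factory methods are located by signature rather than through an interface, a provider is in effect probed for the methods it chooses to offer. The sketch below shows that general technique with plain JDK reflection. ExampleFactory and invokeIfPresent are hypothetical names, String stands in for the Cascading Scheme and Fields types so the sketch runs without Cascading on the classpath, and none of this is Lingual's actual lookup code.

```java
import java.util.Properties;

class FactoryReflectionSketch
  {
  // Hypothetical factory that implements no interface; String stands in for Scheme and Fields.
  public static class ExampleFactory
    {
    public String getDescription()
      {
      return "an example provider";
      }

    public String createScheme( String format, String fields, Properties properties )
      {
      return format + "(" + fields + ")";
      }
    }

  // Invokes the named public method if the factory declares it; returns null when it does not.
  static Object invokeIfPresent( Object factory, String name, Class<?>[] parameterTypes, Object... arguments )
    {
    try
      {
      return factory.getClass().getMethod( name, parameterTypes ).invoke( factory, arguments );
      }
    catch( NoSuchMethodException exception )
      {
      return null; // the factory does not offer this capability
      }
    catch( ReflectiveOperationException exception )
      {
      throw new IllegalStateException( "factory method failed: " + name, exception );
      }
    }

  public static void main( String[] args )
    {
    Object factory = new ExampleFactory();
    Class<?>[] schemeTypes = { String.class, String.class, Properties.class };

    // prints csv(EMPID,NAME)
    System.out.println( invokeIfPresent( factory, "createScheme", schemeTypes, "csv", "EMPID,NAME", new Properties() ) );

    // prints null, since ExampleFactory declares no createTap method
    System.out.println( invokeIfPresent( factory, "createTap", new Class<?>[]{ String.class }, "some-identifier" ) );
    }
  }
```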

The provider.properties File

The contents of the provider.properties file should follow this template, and be located in the cascading.bind package.

cascading.bind.provider.names=
cascading.bind.provider.[provider_name].platforms=

# one or the other
cascading.bind.provider.[provider_name].factory.classname=[classname]
cascading.bind.provider.[provider_name].extends=[provider_name]

# define protocols differentiated by properties
cascading.bind.provider.[provider_name].protocol.names=
cascading.bind.provider.[provider_name].protocol.[protocol_name].schemes=
cascading.bind.provider.[provider_name].protocol.[protocol_name].[property]=[value]

# define formats differentiated by properties
cascading.bind.provider.[provider_name].format.names=
cascading.bind.provider.[provider_name].format.[format_name].extensions=
cascading.bind.provider.[provider_name].format.[format_name].[property]=[value]

# optional format property specifying the protocols this format is compatible with
# otherwise all defined protocols in this definition will be used
cascading.bind.provider.[provider_name].format.[format_name].protocols=
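As an illustration of how the template might be filled in, the sketch below embeds a small provider definition and reads it back with java.util.Properties. Everything in it is hypothetical: the provider name example, the factory class com.example.ExampleFactory, the psv format, its delimiter property, and the platform values are placeholders chosen for the sketch, not values taken from Lingual or from an existing provider.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.Properties;

class ProviderPropertiesSketch
  {
  // Hypothetical provider definition following the template; all names and values are illustrative.
  static final String EXAMPLE =
    "cascading.bind.provider.names=example\n"
      + "cascading.bind.provider.example.platforms=local,hadoop\n"
      + "cascading.bind.provider.example.factory.classname=com.example.ExampleFactory\n"
      + "cascading.bind.provider.example.format.names=psv\n"
      + "cascading.bind.provider.example.format.psv.extensions=psv\n"
      + "cascading.bind.provider.example.format.psv.delimiter=|\n";

  static Properties load( String text )
    {
    Properties properties = new Properties();

    try
      {
      properties.load( new StringReader( text ) );
      }
    catch( IOException exception )
      {
      throw new UncheckedIOException( exception );
      }

    return properties;
    }

  // Looks up cascading.bind.provider.[provider].[suffix]
  static String get( Properties properties, String provider, String suffix )
    {
    return properties.getProperty( "cascading.bind.provider." + provider + "." + suffix );
    }

  public static void main( String[] args )
    {
    Properties properties = load( EXAMPLE );

    for( String provider : properties.getProperty( "cascading.bind.provider.names" ).split( "," ) )
      {
      System.out.println( provider + " factory: " + get( properties, provider, "factory.classname" ) );
      System.out.println( provider + " formats: " + get( properties, provider, "format.names" ) );
      }
    }
  }
```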

Notes on using with Apache Hadoop

Using with Lingual Shell

When using with Apache Hadoop, the Lingual Shell expects the following environment variables so that the correct Hadoop version and configuration may be included in the CLASSPATH.

  • HADOOP_HOME - path to local Hadoop installation, or

  • HADOOP_CONF_DIR - defaults to $HADOOP_HOME/conf

  • HADOOP_USER_NAME - the username to use when submitting Hadoop jobs

To pass custom Hadoop properties to the Hadoop platform in Lingual, use:

> export LINGUAL_CONFIG=property=value,property=value

Setting default properties

By default, Lingual creates a base JobConf instance, which in turn is populated (per the Hadoop API) by all configuration information found in the CLASSPATH.

Lingual Shell constructs the CLASSPATH in part with the value of HADOOP_CONF_DIR.

Outside of Lingual Shell, the CLASSPATH is a function of the tools using the Lingual JDBC drivers. For example, with Squirrel, the conf directory must be added explicitly.

After the JobConf is constructed, any values found in the Lingual catalog directory will be applied. By default, these values are located in .lingual/config/default.properties on HDFS.

This leaves the tricky problem of letting Lingual know how to reach HDFS.

Hadoop relies on a single property, fs.default.name, to reach HDFS. If it is not already set in default.properties or on the CLASSPATH, Lingual also needs to be told how to reach the Hadoop "job tracker" through the mapred.job.tracker property.

For example, to connect to a vagrant based Hadoop cluster as user vagrant:

> export HADOOP_USER_NAME=vagrant
> export LINGUAL_CONFIG=fs.default.name=hdfs://master.local:9000,mapred.job.tracker=master.local:9001

Any additional use-specific properties can be added to the default.properties file.
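The LINGUAL_CONFIG value shown above is a comma separated list of property=value pairs. As a rough sketch of how such a string breaks down into individual Hadoop properties, the hypothetical helper below splits it with plain JDK code. It is not Lingual's own parser, and it assumes that values never contain commas.

```java
import java.util.Properties;

class LingualConfigSketch
  {
  // Splits "property=value,property=value" into Properties.
  // Only the first '=' in each pair separates the key from the value.
  static Properties parse( String config )
    {
    Properties properties = new Properties();

    if( config == null || config.trim().isEmpty() )
      return properties;

    for( String pair : config.split( "," ) )
      {
      int index = pair.indexOf( '=' );

      if( index <= 0 )
        throw new IllegalArgumentException( "expected property=value, got: " + pair );

      properties.setProperty( pair.substring( 0, index ).trim(), pair.substring( index + 1 ).trim() );
      }

    return properties;
    }

  public static void main( String[] args )
    {
    String config = System.getenv( "LINGUAL_CONFIG" );

    if( config == null ) // fall back to the values from the vagrant example
      config = "fs.default.name=hdfs://master.local:9000,mapred.job.tracker=master.local:9001";

    Properties properties = parse( config );

    for( String name : properties.stringPropertyNames() )
      System.out.println( name + " -> " + properties.getProperty( name ) );
    }
  }
```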

AWS EMR

If working with a remote Amazon Elastic MapReduce cluster from a local terminal/shell, see the Bash EMR utilities, specifically the emrconf command, which fetches the remote configuration files to a local directory that HADOOP_CONF_DIR can point to.

If errors are encountered executing SQL queries remotely, setting the Hadoop user name should alleviate any security issues causing failures on the remote EMR cluster:

> export HADOOP_USER_NAME=hadoop

ANSI SQL Support

Lingual supports the following ANSI SQL features.

Datatypes

The following data types are supported by Lingual.

Note that the "Java Type" column indicates what the database type will get translated to as final output.

For temporal types, internal representation will differ from final output. For example the DATE type is treated as an integer when doing internal serialization for Cascading processing and then converted to java.sql.Date. The usual cautions and best practices about SQL temporal types and timezones apply.

For data types such as DATE that carry no time-of-day component, Lingual assumes a time value of 0 in the default timezone of the JVM. For example, the DATE of "2013-04-02" is considered "2013-04-02 00:00:00" if converting to DATETIME.
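The same convention can be observed with the standard java.sql classes. The sketch below is independent of Lingual and only shows how a date without a time component widens to midnight in the JVM default timezone. The class and method names are hypothetical.

```java
import java.sql.Date;
import java.sql.Timestamp;

class DateWideningSketch
  {
  // Widens a java.sql.Date to a timestamp carrying the same instant.
  static Timestamp toTimestamp( Date date )
    {
    return new Timestamp( date.getTime() );
    }

  public static void main( String[] args )
    {
    // Date.valueOf interprets the value as midnight in the JVM default timezone
    Date date = Date.valueOf( "2013-04-02" );

    System.out.println( toTimestamp( date ) ); // prints 2013-04-02 00:00:00.0
    }
  }
```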

Character Types

SQL Type      Java Type
BOOLEAN       boolean, Boolean
CHAR          String
VARCHAR       String
CLOB, NCLOB   String (support may be vendor specific)

Numeric Types

SQL Type      Java Type
INT           int, Integer
SMALLINT      int, Integer
BIGINT        long, Long
NUMERIC       float, Float
DECIMAL       float, Float
FLOAT         float, Float
REAL          double, Double
DOUBLE        double, Double

Temporal Types

SQL Type      Java Type
DATE          java.sql.Date
TIME          java.sql.Time
TIMESTAMP     java.sql.Timestamp

Functions

The following SQL functions are supported by Lingual. Internally, each function is implemented by a combination of Optiq and Cascading features.

Which internal implementation is used should not affect end users or developers, but users running with the log level set to DEBUG may want to be aware of it to help understand log output.

Operators and Logic Functions

Function   Notes
+
-
*
/
%          Implemented as MOD
=
>, >=
<, <=
<>
!=         Implemented as <>
AND
IN         Implemented as "values ({fn power([number], [exponent])});"
OR
NOT
CASE

Aggregate Functions

Function   Notes
AVG
COUNT
COUNT(*)
MIN
MAX
SUM

String Functions

Function   Notes
CASE
POSITION
POWER
LOWER      Implemented as "as values ({fn lcase([string])});"
OVERLAY
TRIM
UPPER      Implemented as "as values ({fn ucase([string])});"

Numeric and Mathematical Functions

Function   Notes
ABS
MOD
EXP        Implemented as "values ({fn power([number], [exponent])});"
LN
POWER

Conversion and User Functions

Function            Notes
CAST
CONVERT
CURRENT_DATE
CURRENT_TIME
CURRENT_TIMESTAMP
CURRENT_USER
SESSION_USER
SYSTEM_USER