Lingual executes ANSI SQL queries as Cascading applications on Apache Hadoop clusters.
Introduction
Lingual was designed to support the following goals:

- Immediate Data Access: Using the Lingual Shell or JDBC Driver with existing Desktop and BI tools, unlock data stored on an Apache Hadoop cluster.
- Simplify SQL Migration: Using the Lingual JDBC Driver or native Cascading APIs, simply migrate existing SQL code to Apache Hadoop.
- Simplify System and Data Integration: Using the Lingual Data Provider functionality, data can be moved between any two systems over Apache Hadoop using SQL.
To satisfy these goals, Lingual supports as much of the ANSI SQL spec as makes sense, which is more than any other tool providing "SQL" on Apache Hadoop MapReduce today. Lingual is also fully compliant with the JDBC API.
Because it is built on Cascading, Lingual works seamlessly with other Cascading based workloads via the native Cascading API.
Overview
Lingual provides JDBC Drivers, a SQL command shell, and a catalog manager for publishing files (or any resource) as schemas and tables.
If you are already familiar with Cascading, you can use the native Cascading API to run SQL based Flows in tandem with other custom Flows in your application.
To use Lingual, there is no installation required other than the optional command line utilities.
Lingual supports SELECT and INSERT INTO statements, for example:

SELECT COUNT( DISTINCT CITY ), SUM( DISTINCT AGE ) FROM SALES.EMPS

INSERT INTO TEST.RESULTS SELECT EMPNO, NAME FROM SALES.EMPS
DDL statements like CREATE TABLE are supported only through the Lingual Catalog tool for defining Schemas and Tables from the command line.
See ANSI SQL Support for the full list of supported operations and functions.
Command Line Tools
Use the command line tools to publish files or resources as tables, or run queries from the console, against a local cluster or remotely on Amazon ElasticMapReduce.
To install the command line client, see the Lingual Client Installation section below.

The Lingual command line client consists of two utilities:

- Lingual Shell, an interactive SQL command shell
- Lingual Catalog, for curating the schemas, tables, and providers visible to the Shell and the JDBC Driver

See the Getting Started page for an overview of how the utilities work together.

When using Apache Hadoop, refer to the Notes on using with Apache Hadoop section for how to set up your Hadoop environment.
Java JDBC
Lingual provides a JDBC Driver that is fully compliant with Java 1.6 JDBC.
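A minimal sketch of connecting through the driver, assuming the local platform and a directory of data files registered via the schemas property (paths and names are placeholders; a complete walkthrough appears in the Lingual JDBC section):

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.Statement;

public class QuickJdbcSketch
  {
  public static void main( String[] args ) throws Exception
    {
    Class.forName( "cascading.lingual.jdbc.Driver" );

    // "schemas" points the catalog at a directory of data files; the path is a placeholder
    Connection connection = DriverManager.getConnection(
      "jdbc:lingual:local;schemas=src/main/resources/data/example" );

    Statement statement = connection.createStatement();
    ResultSet resultSet = statement.executeQuery( "select * from \"example\".\"employee\"" );

    while( resultSet.next() )
      System.out.println( resultSet.getObject( 1 ) );

    connection.close();
    }
  }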
Java Cascading APIs
Lingual provides a very simple API for executing a SQL statement as a Cascading Flow.
Data Providers
Data Providers are jar files with custom Cascading Taps, Schemes, a factory class, and a provider definition file (provider.properties).

When registered with Lingual using the Catalog tool, the Lingual JDBC Driver and/or Lingual Shell can run queries that read and write data to the systems the provider integrates with.

A provider can be specified either by referencing the file location or by using a Maven style spec (group:name:revision).

Learn more about how to create a provider in the Data Providers section below.
Lingual Client Installation
With an existing Hadoop cluster
If you already have an installed and running version of a supported Apache Hadoop distribution, you can simply install the Lingual tools from the command line locally.
To install the Lingual command line tools, call:
> curl http://files.concurrentinc.com/lingual/1.1/lingual-client/install-lingual-client.sh | bash
This script downloads and installs the latest lingual shell script into ~/.lingual-client/ and updates any local .bashrc file.

You should typically have the HADOOP_CONF_DIR environment variable set to point to your Hadoop conf directory so Lingual can find your cluster setup. For more details, see the Notes on using with Apache Hadoop section.
With a local demo Hadoop cluster
If you want your own local 4-node Apache Hadoop cluster to test out Lingual’s features, clone the Vagrant based cluster hosted on the Cascading GitHub site.
See the README in that repository for details on how to initialize and run the cluster with the Cascading SDK pre-installed.
Updating your Lingual install
To get the latest release, call:
> lingual selfupdate
You can optionally bypass the installation and just download the latest version of the Lingual client by calling:
> wget -i http://files.concurrentinc.com/lingual/1.1/lingual-client/latest.txt
Amazon ElasticMapReduce
The install-lingual-client.sh file is also a valid Amazon EMR bootstrap action.
elastic-mapreduce \
  --create \
  --instance-group master --instance-count 1 --instance-type $MASTER_INSTANCE_TYPE \
  --instance-group core --instance-count $1 --instance-type $SLAVE_INSTANCE_TYPE \
  --bootstrap-action s3://files.concurrentinc.com/lingual/1.1/lingual-client/install-lingual-client.sh \
  --name "Cascading Cluster - $USER" \
  --key-pair $EMR_SSH_KEY_NAME \
  --alive
Alternately, you can install the full Cascading SDK which includes a number of additional tools with the following bootstrap action:
--bootstrap-action s3://files.concurrentinc.com/sdk/2.5/install-cascading-sdk.sh
The assumption here is that you will be shelling into your remote Hadoop cluster to use Lingual or other SDK tools. See Using Hadoop for tips on connecting remotely.
Getting Started
The best way to learn Lingual is to download sample data and run a few queries.
Getting Data and Setting up Lingual
After installing Lingual, in a new working directory download sample data:
> wget http://data.cascading.org/employees.tgz
> tar -xzf employees.tgz
Next, download a simple shell script that registers the employee data with Lingual for the platform Lingual will be executing against: local, hadoop, or hadoop2-mr1. local means data is read from the local filesystem and the queries are run in local memory. hadoop means data is read from HDFS and MapReduce jobs are run on the cluster to execute the queries. hadoop2-mr1 also means HDFS and MapReduce, but for Hadoop distributions based on YARN, like Apache Hadoop 2.x.

If local, call:
> export LINGUAL_PLATFORM=local
Or, if hadoop, call:

> export LINGUAL_PLATFORM=hadoop
> export HADOOP_HOME=/path/to/hadoop
> hadoop fs -copyFromLocal employees employees
Or, if hadoop2-mr1, call:

> export LINGUAL_PLATFORM=hadoop2-mr1
> export HADOOP_HOME=/path/to/hadoop
> hadoop fs -copyFromLocal employees employees
Then call:
> wget http://data.cascading.org/create-employees.sh
> chmod a+x create-employees.sh
> ./create-employees.sh
The create-employees.sh script
The create-employees.sh script simply calls lingual catalog to register each file as a table, along with the columns and types in each file, on the platform set by LINGUAL_PLATFORM.

For example, to register the employees/employees.csv file as the EMPLOYEES.EMPLOYEES table, first the schema EMPLOYEES must be created:

lingual catalog --schema EMPLOYEES --add

Then the stereotype, named employees (to keep things simple), must be created:

lingual catalog --schema EMPLOYEES --stereotype employees --add \
  --columns EMP_NO,BIRTH_DATE,FIRST_NAME,LAST_NAME,GENDER,HIRE_DATE \
  --types int,date,string,string,string,string

Then the EMPLOYEES table must be created:

lingual catalog --schema EMPLOYEES --table EMPLOYEES --stereotype employees \
  --add ${BASEDIR}/employees/employees.csv
Separating stereotype from table allows the columns and type definitions to be shared across tables without having to re-register the redundant data.
Note that .csv file support is built into Lingual, so there is no need to register or create that data format.
Running queries
The Lingual Shell is simply a command shell that uses the Lingual JDBC Driver to execute SQL statements against the configured platform.
To start the shell, run:
> lingual shell
From within the shell, execute:
> select * from employees.titles;
The Lingual query planner detects that we are effectively only reading the file with this query, so the results begin to display immediately.
Alternately, run:
> select * from employees.titles where title = 'Engineer';
This will result in an actual MapReduce job being submitted, if using the Hadoop platform. You can verify this on the JobTracker web interface.
What actually happened under the hood is that the JDBC Driver created and ran a new Cascading Flow to select all the employees records with the given title. The results were placed, by default, into a file in the ./results/ directory, either on the local disk or in your user directory on HDFS.
A JDBC ResultSet was then created to read the results file, with "max rows" set to 10,000 (the default). Since Hadoop generally works with very large files, this seems like a reasonable limit. See the command line arguments to change it.
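When going through the JDBC Driver directly rather than the Shell, the standard JDBC setting on the statement should do the same job; a hedged sketch, assuming the driver honors Statement.setMaxRows (the connection string is a placeholder):

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.Statement;

public class MaxRowsSketch
  {
  public static void main( String[] args ) throws Exception
    {
    Class.forName( "cascading.lingual.jdbc.Driver" );

    Connection connection = DriverManager.getConnection( "jdbc:lingual:hadoop" );

    Statement statement = connection.createStatement();
    statement.setMaxRows( 50000 ); // raise the 10,000 row default for this statement only

    ResultSet resultSet = statement.executeQuery( "select * from \"employees\".\"titles\"" );

    while( resultSet.next() )
      System.out.println( resultSet.getObject( 1 ) );

    connection.close();
    }
  }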
The file in the ./results/ directory is a valid data file, but should be deleted if you want to reclaim the space it is taking up.
To verify on Hadoop, run:
> hadoop fs -lsr results
Resulting in something like this:
-rw-r--r--   3 vagrant supergroup   2165628 2013-07-24 21:01 /user/vagrant/results/20130724-210146-65127B6700/part-00000
-rw-r--r--   3 vagrant supergroup   2169890 2013-07-24 21:01 /user/vagrant/results/20130724-210146-65127B6700/part-00001
To see the contents, run:
hadoop fs -cat results/20130724-210146-65127B6700/part-00000
A table must exist in Lingual before an insert into ... select ... statement can be called, so the Catalog must be used to create a location to insert the results into.

> lingual catalog --schema working --add
> lingual catalog --schema working --stereotype titles --add --columns title,cnt --types string,int
> lingual catalog --schema working --table unique_titles --stereotype titles --add working/unique-titles.csv
Now execute the query:
> lingual shell
> insert into "working"."unique_titles" select title, count( title ) as cnt from employees.titles group by title;
The results will be located in working/unique-titles.csv.
Please note that the column names in your insert statement have to match the column names of the declared stereotype in the catalog. In the example above we call the function count( title ) and add as cnt so that Lingual is able to write the data to the correct table. Omitting the as cnt will result in an error.
Using different file formats
Lingual supports a Data Provider mechanism that allows for new formats and protocols to be added on demand.
For example, to add support for a fabricated pipe-delimited format, or .psv file, the built in providers can be used to create a new file format.
By running:

> lingual catalog --provider

you can see the currently registered providers. text is the built in provider for handling delimited files.
To see the properties associated with the text provider, run:

> lingual catalog --provider text --show
To create a .psv file, execute:

> lingual catalog --schema working --format psv --add --provider text --extensions '.psv' --properties "delimiter=|"
> lingual catalog --schema working --table unique_titles --update working/unique-titles.psv --format psv
The results will be located in working/unique-titles.psv and will use a | instead of a , as the field delimiter.
Adding and using a new Data Provider
Instead of using the built in Data Providers, new ones can be added that provide access to data systems not currently supported by Lingual.

For example, to copy data from a csv file and store it in a memcached server, the cascading-memcached provider can be registered.
To register the memcached provider, run:

> lingual catalog --provider --add cascading:cascading-memcached:0.3.0:provider

This will retrieve the cascading-memcached-0.3.0-provider.jar from Conjars (or, if not on Conjars, from Maven Central).
To see what the provider provides, call:
> lingual catalog --provider memcached --show
The memcached provider can store data as text delimited values, or as binary. To store values as comma separated text values, we can use the builtin format called csv. But we need to tell it which columns are keys, and which columns are values.
> lingual catalog --schema working --format csv --update --properties keyFields=title,valueFields=cnt
Note we are "updating" the csv format as seen by the "working" schema, even though the provider was added to the default schema.
Compare these three calls:
> lingual catalog --format csv --show
> lingual catalog --format csv --provider memcached --show
> lingual catalog --schema working --format csv --show
The first fails, naming the two providers that provide support for the csv format. The second shows the default values of csv for the "memcached" provider. The third shows the properties as configured in the "working" schema, along with the defaults from the provider.
Schemas are used to customize and/or override default protocol and format properties as seen by the tables in the given schema.
Next we need to create a table that is backed by our memcached server on the given IP and port:

> lingual catalog --schema working --table title_counts --stereotype titles --add localhost:11211 \
    --protocol memcached-text --format csv
Note that we re-used the stereotype "titles" created in the example above. When using Hadoop, make sure you use the actual IP address of the memcached server host, not localhost.
Now execute the query, assuming an actual memcached server is running:
> lingual shell
> insert into "working"."title_counts" select title, count( title ) as cnt from employees.titles group by title;
If run on Hadoop, a MapReduce job will be spawned, and the "sink" Tap in the Flow will be the memcached Tap. That is, the results are not written to disk, but streamed directly into the memcached server from each reducer task.
To verify values are stored in the memcached server, run:
> telnet localhost 11211
> get Staff
Lingual Catalog
The Lingual Catalog command line tool allows users to curate a catalog of database schemas and tables, where a table is a Tap accessible data-set and a schema is a collection of tables.
The Lingual Catalog is used in tandem with the Lingual Shell and the Lingual JDBC driver. By default the Shell will use the current (in the current directory) catalog of schemas and tables to satisfy the SQL planner.
These concepts are inherited from the database world and are compatible with standard SQL tools.
Detail
A Schema is a collection of Tables. A Schema has a name like employees.
|-- schemas --|
root                   <- default schema
    \ employees        <- user schema
A Table consists of a URI, Stereotype, a Format, and a Protocol. A Table also has a name like titles. The full name, if the Table belonged to the Schema employees would be employees.titles.
|-- schemas --|-- tables --|
root
    \ employees
              \ titles      <- user table
Format is the file format or encoding of a Table URI. Tab delimited (TSV) and comma delimited (CSV) are common text formats that can be identified by the file extension on the URI (employees/titles.csv). A Format maps to a Cascading Scheme instance internally (like TextDelimited for CSV).

Protocol is how a Table URI is accessed. If on Hadoop, the default is HDFS. If in Local mode, the default is the local filesystem. Protocols can be identified by the URI scheme: hdfs:/... for HDFS and file:/... for the local filesystem. A Protocol maps to a Cascading Tap type internally (like Hfs for HDFS).
A Stereotype represents the meta-data associated with a Table, the table definition, which includes column names and column types. Stereotypes have a name, may be nested in a Schema, and may be shared between Tables.
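To make the mapping concrete, here is a hedged sketch (for the Hadoop platform) of the Cascading objects these concepts resolve to; the path and column names are placeholders. A Stereotype contributes the Fields, a Format resolves to a Scheme such as TextDelimited, and a Protocol resolves to a Tap type such as Hfs.

import cascading.scheme.hadoop.TextDelimited;
import cascading.tap.Tap;
import cascading.tap.hadoop.Hfs;
import cascading.tuple.Fields;

public class CatalogMappingSketch
  {
  public static void main( String[] args )
    {
    // a Stereotype contributes the column names (and types) as Cascading Fields
    Fields columns = new Fields( "EMP_NO", "TITLE", "FROM_DATE", "TO_DATE" );

    // a Format such as "csv" maps to a Scheme, here TextDelimited with a header row
    TextDelimited csv = new TextDelimited( columns, true, "," );

    // a Protocol such as "hdfs" maps to a Tap type, here Hfs addressing the table URI
    Tap titles = new Hfs( csv, "hdfs://namenode:8020/user/someone/employees/titles.csv" );

    System.out.println( titles );
    }
  }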
Use the command line to create and update Schemas, Tables, Stereotypes, Formats, and Protocols.
CLI Usage
Catalog is invoked from the command line via:
lingual catalog [switches]*
To create a new catalog in the user home directory on HDFS:
lingual catalog --platform hadoop --init
To add new table to an existing schema:
lingual catalog --platform hadoop --schema company --table employees --add ./data/employees
CLI Options Reference
| context | action | description |
|---|---|---|
| | --uri [path] | optional path to the catalog meta-data, defaults to current directory relative to the current platform |
| | --platform | lists all known platforms (currently local, hadoop and hadoop2-mr1) |
| | --platform [name] | use the named platform (relative uri will be resolved for given platform) |
| | --default* | make the current relevant options the default environment |
| | --init | initializes a new catalog in the current directory if --uri is not given |
| | --ddl [file] | use DDL file to define tables in an existing schema |
| --repo | | list all maven repos |
| --repo [name] | | |
| | --add [uri] | add maven repo |
| | --remove | remove maven repo |
| | --validate | optional arg when using --add to test the repo is valid without adding it |
| | --show | shows detailed information about a repo. requires the [name] param for repo |
| --schema | | lists all current schemas |
| --schema [name] | | |
| | --add [uri] | uri optional, add path as a new schema root |
| | --remove | |
| | --rename [new name] | |
| | --show | shows detailed information about a schema. requires the [name] param for schema |
| --table | | lists all tables for the current schema |
| --table [name] | | |
| | --add [uri] | add path as a new table root, will attempt to resolve stereotype |
| | --update | updates the table with new properties |
| | --stereotype [name] | use existing stereotype for table definition |
| | --format [name] | use format for uri identifier |
| | --protocol [name] | optional, use protocol for uri identifier |
| | --remove | logically removes table, does not delete files |
| | --rename [new name] | logically renames table, does not alter files |
| | --show | shows detailed information about a table. requires the [name] param for table |
| --stereotype | | list all registered stereotype names |
| --stereotype [name] | | |
| | --add | |
| | --update | update with given values (replaces values) |
| | --provider [name] | use the given provider (optional) |
| | --columns [name,...] | required |
| | --types [type,...] | required |
| | --remove | |
| | --rename [new name] | |
| | --show | shows detailed information about a stereotype. requires the [name] param for stereotype |
| --provider | | list all registered providers |
| --provider [name] | | register a new provider |
| | --add [uri or spec] | register a provider located by the uri or maven spec (group:name:revision) |
| | --validate | optional arg when using --add to test the provider’s uri or spec is valid without adding it |
| | --remove | |
| | --rename [new name] | |
| | --show | shows detailed information about a provider. requires the [name] param for provider |
| --protocol | | list all registered protocol names |
| --protocol [name] | | |
| | --add | register a new protocol |
| | --provider [name] | use the given provider |
| | --update | update with given values (replaces values) |
| | --schemes [uri,...] | uri scheme to identify protocol (jdbc:, hdfs:, etc) |
| | --properties [name=value,...] | update/add properties for the protocol (user=jsmith, etc)** |
| | --remove | |
| | --rename [new name] | |
| | --show | shows detailed information about a protocol. requires the [name] param for protocol |
| --format | | list all registered format names |
| --format [name] | | |
| | --add | register a new format, like CSV, TSV, Avro, or Thrift |
| | --provider [name] | use the given provider |
| | --update | update with given values (replaces values) |
| | --extensions [.ext,...] | file extension used to identify format (.csv, .tsv, etc) |
| | --properties [name=value,...] | update/add properties for the format (hasHeaders=true, etc)** |
| | --remove | |
| | --rename [new name] | |
| | --show | shows detailed information about a format. requires the [name] param for format |
* currently unsupported

** If a key has a list of values, like name1=value1,value2, you can only set that single property in the invocation. Otherwise name1=value1,name2=value2 works.
Catalog Structure
Any directory can be the root namespace for a catalog.

| path | description |
|---|---|
| . | current directory |
| ./.lingual/ | all meta-data (hidden directory) |
| ./.lingual/defaults.properties | default environment values * |
| ./.lingual/catalog | catalog data file in JSON |
| ./.lingual/providers | provider jar files |
| ./.lingual/config | config files dir, the "default.properties" file from it is picked up by default |
| ./results | local storage for all SELECT query result sets |
* currently unsupported
Lingual Shell
Lingual Shell is an interactive SQL command shell.
Lingual Shell expects to find a catalog configuration that defines available Schemas and Tables in the current directory for the specified platform.
The catalog information is created and maintained by the Catalog utility. By default, the Catalog utility will create a directory named .lingual in the current directory with additional directories and files.
Reusing results
By default, the results of SELECT queries will be stored in the results directory in the current working path on the current platform. On screen, at most "max rows" rows will be displayed.

If the --resultSchema argument is used, the named schema will be used to store results, where each table name will be a unique timestamp. Additionally, the last result will also be referenced as the table alias LAST.
That is, after running any complex query, select * from results.last will re-print the results of the last query, assuming the shell was started with the argument:

> lingual shell --resultSchema RESULTS
This remains true even if the RESULTS schema was not previously created with the catalog tool.
Note that no tables are added to the persistent version of the catalog meta-data (saved to disk); they are only available at runtime. Consequently, if any files are deleted from the result path, they will not be visible as tables in the result schema in subsequent shell runs.
Setting Job Properties
After using the Catalog to initialize a new catalog (in the .lingual directory), a .lingual/config/default.properties file will be created. Edit this file to add new properties to be used by the underlying platform. In the case of Hadoop, the number of reducers to use for all jobs could be set here.
The LINGUAL_CONFIG environment variable may also be used to provide bootstrap properties to the Shell.
See the notes on Hadoop for more information on setting properties.
CLI Usage
The Shell is invoked from the command line via:
lingual shell [switches]*
To start the shell for running queries on Apache Hadoop:
lingual shell --platform hadoop
CLI Options Reference
| switch | description |
|---|---|
| --platform [name] | use the named platform |
| --username [name] | name of remote user to use |
| --password [password] | password of remote user |
| --schema [name] | name of the default schema |
| --schemas [uri,...] | root path for each schema to use, will use base directory as schema name; sub-directories will be treated as tables |
| --sql [file] | file with SQL commands to execute |
| --maxRows [num] | the maximum number of rows to print to the console |
| --resultSchema [name] | schema name to store temporary result sets within |
| --resultPath [path] | platform path to store temporary result sets in |
| --flowPlanPath [path] | where to write out the Cascading planner DOT file for debugging |
| --sqlPlanPath [path] | where to write out the Optiq planner plan file for debugging |
Lingual JDBC
Using the JDBC Drivers
Lingual provides two JDBC Driver jars with self contained dependencies in the Conjars Maven repository.
The JDBC connection string is of the form jdbc:lingual:[platform], where [platform] is either local or hadoop.
Additional JDBC properties may be set using the form:
jdbc:lingual:[platform];[property]=[value];[property]=[value];...
Where the property keys are, in part:
- catalog=[path] - the working directory where your .lingual workspace is kept, defaults to ./
- schema=[name] - set the default schema name to use
- schemas=[path,path] - URI paths to the set of schema/tables to install in the catalog on startup
- resultPath=[path] - temporary root path for result sets to be stored, defaults to ./results
- flowPlanPath=[path] - for debugging, print the corresponding Flow dot file here
- sqlPlanPath=[path] - for debugging, print the corresponding SQL plan file here
platform is required to help the driver distinguish between backends when more than one JDBC Driver is in the CLASSPATH.
Any other properties will be passed down to the underlying platform. In the case of Apache Hadoop, specific connection properties can be passed. See the Notes on Hadoop documentation for more details.
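For example, a hedged sketch of a connection string that sets the catalog path, default schema, and result path through URL properties (all paths and names are placeholders):

import java.sql.Connection;
import java.sql.DriverManager;

public class ConnectionPropertiesSketch
  {
  public static void main( String[] args ) throws Exception
    {
    Class.forName( "cascading.lingual.jdbc.Driver" );

    // properties are appended after the platform, each separated by ';'
    Connection connection = DriverManager.getConnection(
      "jdbc:lingual:hadoop"
        + ";catalog=/user/someuser/working"
        + ";schema=EMPLOYEES"
        + ";resultPath=/user/someuser/results" );

    connection.close();
    }
  }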
Using the JDBC Driver In Your Project
To pull either of these jars, the jdbc Maven classifier must be used if you want the self contained jar file with all of its shaded dependencies.

For the local mode platform:

<dependency>
  <groupId>cascading</groupId>
  <artifactId>lingual-local</artifactId>
  <version>x.y.z</version>
  <classifier>jdbc</classifier>
</dependency>

For the hadoop mode platform:

<dependency>
  <groupId>cascading</groupId>
  <artifactId>lingual-hadoop</artifactId>
  <version>x.y.z</version>
  <classifier>jdbc</classifier>
</dependency>
Alternatively, pulling the default artifacts (without the classifier) will also pull any relevant dependencies as would be expected.
Using JDBC in Java
The Java source code used to execute a query via a JDBC connection is much the same as with any other JDBC driver:
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;

public class JdbcExample
  {
  public static void main( String[] args ) throws Exception
    {
    new JdbcExample().run();
    }

  public void run() throws ClassNotFoundException, SQLException
    {
    Class.forName( "cascading.lingual.jdbc.Driver" );

    Connection connection = DriverManager.getConnection(
      "jdbc:lingual:local;schemas=src/main/resources/data/example" );

    Statement statement = connection.createStatement();

    ResultSet resultSet =
      statement.executeQuery(
        "select *\n"
          + "from \"example\".\"sales_fact_1997\" as s\n"
          + "join \"example\".\"employee\" as e\n"
          + "on e.\"EMPID\" = s.\"CUST_ID\"" );

    while( resultSet.next() )
      {
      int n = resultSet.getMetaData().getColumnCount();
      StringBuilder builder = new StringBuilder();

      for( int i = 1; i <= n; i++ )
        {
        builder.append(
          ( i > 1 ? "; " : "" )
            + resultSet.getMetaData().getColumnLabel( i )
            + "="
            + resultSet.getObject( i ) );
        }

      System.out.println( builder );
      }

    resultSet.close();
    statement.close();
    connection.close();
    }
  }
Note that in this example the schema for the DDL has been derived directly from the CSV files. In other words, point the JDBC connection at a directory of flat files and query as if they were already loaded into SQL.
The results look like:
CUST_ID=100; PROD_ID=10; EMPID=100; NAME=Bill
CUST_ID=150; PROD_ID=20; EMPID=150; NAME=Sebastian
An equivalent Cascading app would be:
Tap empTap = new FileTap( new TextDelimited( true, ",", "\"" ), "src/test/data/employee.txt" );
Tap salesTap = new FileTap( new TextDelimited( true, ",", "\"" ), "src/test/data/salesfact.txt" );

Tap resultsTap = new FileTap( new TextDelimited( true, ",", "\"" ), "build/test/output/results.txt", SinkMode.REPLACE );

Pipe empPipe = new Pipe( "emp" );
Pipe salesPipe = new Pipe( "sales" );

Pipe join = new CoGroup( empPipe, new Fields( "empid" ), salesPipe, new Fields( "cust_id" ) );

FlowDef flowDef = FlowDef.flowDef()
  .setName( "flow" )
  .addSource( empPipe, empTap )
  .addSource( salesPipe, salesTap )
  .addTailSink( join, resultsTap );

Flow flow = new LocalFlowConnector().connect( flowDef );
flow.complete(); // block until the Flow finishes so the results file exists

TupleEntryIterator iterator = resultsTap.openForRead( flow.getFlowProcess() );
Alternately you can access this data from R via the JDBC Driver as described on the Accessing Data from R page.
Using JDBC from R
The following example is based on the RJDBC package for R, assuming that the MySQL Sample Employee Database has been used as described on the Java Example page.
# JDBC support in R is provided by the RJDBC package http://www.rforge.net/RJDBC/
# install the RJDBC package; only needed once -- uncomment next line the first time
#install.packages("RJDBC", dep=TRUE)
# load the library
library(RJDBC)
# set up the driver
drv <- JDBC("cascading.lingual.jdbc.Driver", "~/src/concur/lingual/lingual-local/build/libs/lingual-local-1.0.0-wip-dev-jdbc.jar")
# set up a database connection to a local repository
connection <- dbConnect(drv, "jdbc:lingual:local;catalog=~/src/concur/lingual/lingual-examples/tables;schema=EMPLOYEES")
# query the repository
df <- dbGetQuery(connection, "SELECT * FROM EMPLOYEES.EMPLOYEES WHERE FIRST_NAME = 'Gina'")
head(df)
# use R functions to summarize and visualize part of the data
df$hire_age <- as.integer(as.Date(df$HIRE_DATE) - as.Date(df$BIRTH_DATE)) / 365.25
summary(df$hire_age)
# uncomment next line the first time
#install.packages("ggplot2")
library(ggplot2)
m <- ggplot(df, aes(x=hire_age))
m <- m + ggtitle("Age at hire, people named Gina")
m + geom_histogram(binwidth=1, aes(y=..density.., fill=..count..)) + geom_density()
The last part of that R script calculates the age (in years) at time of hire for all people named ‘Gina’, summarized as:
> summary(df$hire_age)
Min. 1st Qu. Median Mean 3rd Qu. Max.
20.86 27.89 31.70 31.61 35.01 43.92
and visualized in the histogram/density plot produced by the final ggplot2 call.
Using JDBC with Squirrel SQL with Apache Hadoop
The Lingual Drivers work great with Squirrel SQL as a desktop client.
Here we will describe installing the Lingual Driver for the Apache Hadoop platform. The local mode platform should be straightforward as it has less configuration.
Before starting, make sure:

- the Lingual Client has been downloaded and installed per the Getting Started section,
- a local copy of the Apache Hadoop libraries is unpacked, and
- a copy of the remote Hadoop cluster configuration is copied locally (found in the remote HADOOP_CONF_DIR)
After downloading the Squirrel application and installing, simply add a new "driver" from the drivers tab.
The "name" is arbitrary, "Lingual Hadoop" is a good place to start.
The "example url" should be jdbc:lingual:hadoop
.
After clicking "extra classpath", navigate and add:
-
All the
jar
files found under the Apache Hadoop unpack directory, including thelib
folder -
The directory containing all the Hadoop configuration files (
HADOOP_CONF_DIR
) -
The Lingual Hadoop JDBC jar file, found under
lingual-client/platform/hadoop/lingual-hadoop-x.y.z-jdbc.jar
Finally, if not already filled out, set "class name" to cascading.lingual.jdbc.Driver, then click "Ok".
Next click the "Aliases" tab and create a new alias using the driver name just created. The name is arbitrary here as well.
"username" should be the name of the user in which the default directory on HDFS should be accessed. If using
the vagrant test cluster, set the username to "vagrant". This will pull all catalog and meta-data from the
/user/vagrant
director on HDFS. No password is necessary.
Save any changes and connect to the cluster.
If the Getting Started Guide was followed, you can now query any data that was previously registered by the lingual catalog tool.
Lingual in Java with Cascading
The Java source code used to execute a query as a Cascading Flow is much the same as assembling a Flow from a pipe assembly. Cascading 2.2 introduced the SQLPlanner interface, which allows the underlying pipe assembly to be created on the fly once the source and sink Tap instances are provided.
import java.io.IOException;

import cascading.flow.Flow;
import cascading.flow.FlowDef;
import cascading.flow.local.LocalFlowConnector;
import cascading.lingual.flow.SQLPlanner;
import cascading.lingual.tap.local.SQLTypedTextDelimited;
import cascading.tap.SinkMode;
import cascading.tap.Tap;
import cascading.tap.local.FileTap;
import cascading.tuple.TupleEntryIterator;

public class FlowExample
  {
  public static void main( String[] args ) throws Exception
    {
    new FlowExample().run();
    }

  public void run() throws IOException
    {
    String statement = "select *\n"
      + "from \"example\".\"sales_fact_1997\" as s\n"
      + "join \"example\".\"employee\" as e\n"
      + "on e.\"EMPID\" = s.\"CUST_ID\"";

    Tap empTap = new FileTap( new SQLTypedTextDelimited( ",", "\"" ),
      "src/main/resources/data/example/employee.tcsv", SinkMode.KEEP );
    Tap salesTap = new FileTap( new SQLTypedTextDelimited( ",", "\"" ),
      "src/main/resources/data/example/sales_fact_1997.tcsv", SinkMode.KEEP );

    Tap resultsTap = new FileTap( new SQLTypedTextDelimited( ",", "\"" ),
      "build/test/output/flow/results.tcsv", SinkMode.REPLACE );

    FlowDef flowDef = FlowDef.flowDef()
      .setName( "sql flow" )
      .addSource( "example.employee", empTap )
      .addSource( "example.sales_fact_1997", salesTap )
      .addSink( "results", resultsTap );

    SQLPlanner sqlPlanner = new SQLPlanner()
      .setSql( statement );

    flowDef.addAssemblyPlanner( sqlPlanner );

    Flow flow = new LocalFlowConnector().connect( flowDef );

    flow.complete();

    TupleEntryIterator iterator = resultsTap.openForRead( flow.getFlowProcess() );

    while( iterator.hasNext() )
      System.out.println( iterator.next() );

    iterator.close();
    }
  }
Note that in this example the table definitions expected by the SQL statement have been derived directly from the TCSV files, where TCSV files are standard CSV (comma separated values) files with a header containing the field name and "type" of each column, like string or int. This feature is provided by the SQLTypedTextDelimited Scheme class.
Data Providers
The Data Provider mechanism allows Lingual to integrate multiple systems, including Hadoop, into a single JDBC based application. This tremendously simplifies building integration and ETL (extract, transform, and load) types of applications.
It also allows JDBC Client applications (Web based BI tools, GUI SQL Clients, etc) to execute queries on Hadoop with any data format or back end system. For example, a web based reporting tool can join relational data (from Oracle) with data on HDFS in the Apache Avro format to generate a report without the use of any additional tools to migrate or modify data from the database or read the Avro files.
This is accomplished by adding new protocols and formats dynamically to a Lingual query through a packaged jar file hosted locally or from Maven. This jar is automatically added, on demand, to any Lingual queries executed from the Lingual JDBC Driver.
Where a protocol is a label specifying a Cascading Tap, and a format is a label for a Cascading Scheme, along with any relevant meta-data (text delimiters, username and password, etc) needed by the Tap or Scheme classes.
By default Lingual supports multiple text formats like csv and tsv (comma separated values and tab separated values, respectively). Files ending with .csv or .tsv are automatically mapped to the proper format. Lingual assumes that the first line of these files is a header line declaring the field name and type. If the first line is data, then the catalog configuration for the format's provider should be set with the property header=false.
Cascading local mode supports the file protocol via the FileTap, and Hadoop mode supports the hdfs protocol via the Hfs Tap. URLs (identifiers) starting with file: or hdfs: are automatically mapped to the proper protocol.
Creating a Data Provider
To add new protocol or format providers, a new jar library must be created with an optional factory class and a provider.properties file defining how to find and use the factory.
If a factory class is not provided, the extends property must be used to specify an existing provider that these properties will amend. Extending an existing provider is a simple way to add new formats or protocols by creating a new named set of properties.
Note: Any dependent Java class files (or libraries) must be expanded into the final jar file.
The Factory Class
Below are the optional method signatures expected to be seen in a custom factory. Custom factories are not required to implement any interface; Lingual searches for the method signatures via reflection.

If only creating a Protocol factory, only one of the createTap methods must be implemented. Similarly, if only creating a Format factory, only one of the createScheme methods must be implemented. Obviously, if the factory provides both a Protocol and a Format, it must have one createTap and one createScheme method.
The identifier parameter in the createTap signatures represents the native address of the resource your provider is talking to. This could be a JDBC URL, a zookeeper quorum for HBase, or just a path in the filesystem.
Note that the Properties object is a java.util.Properties instance containing the key to value sets either defined in the provider.properties file (described below) or set from the Catalog tool command line. These properties are typically used to give usernames, passwords, or similar information to your Format and Protocol.
public interface ProviderFactory
  {
  String getDescription(); // optional

  Tap createTap( String protocol, Scheme scheme, String identifier, SinkMode mode, Properties properties );
  Tap createTap( Scheme scheme, String identifier, SinkMode mode, Properties properties );
  Tap createTap( Scheme scheme, String identifier, SinkMode mode );

  Scheme createScheme( String protocol, String format, Fields fields, Properties properties );
  Scheme createScheme( String format, Fields fields, Properties properties );
  Scheme createScheme( Fields fields, Properties properties );
  Scheme createScheme( Fields fields );
  }
Implementations of custom factories that can guide you towards your own can be found in cascading-memcached and cascading-jdbc.
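As a rough guide only, below is a minimal hypothetical factory for a pipe-delimited text format on the local platform. The class name and the property names (delimiter, hasHeaders) are illustrative assumptions; only the method shapes follow the signatures above.

import java.util.Properties;

import cascading.scheme.Scheme;
import cascading.scheme.local.TextDelimited;
import cascading.tap.SinkMode;
import cascading.tap.Tap;
import cascading.tap.local.FileTap;
import cascading.tuple.Fields;

public class PipeDelimitedProviderFactory
  {
  public String getDescription()
    {
    return "example pipe-delimited text provider (local mode)";
    }

  // Protocol side: hand back a Tap for the given identifier
  public Tap createTap( Scheme scheme, String identifier, SinkMode mode, Properties properties )
    {
    return new FileTap( scheme, identifier, mode );
    }

  // Format side: hand back a Scheme configured from the provider properties
  public Scheme createScheme( Fields fields, Properties properties )
    {
    String delimiter = properties.getProperty( "delimiter", "|" ); // assumed property name
    boolean hasHeaders = Boolean.parseBoolean( properties.getProperty( "hasHeaders", "true" ) ); // assumed property name

    return new TextDelimited( fields, hasHeaders, delimiter );
    }
  }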
The provider.properties File
The contents of the provider.properties file should follow this template, and be located in the cascading.bind package.

cascading.bind.provider.names=
cascading.bind.provider.[provider_name].platforms=

# one or the other
cascading.bind.provider.[provider_name].factory.classname=[classname]
cascading.bind.provider.[provider_name].extends=[provider_name]

# define protocols differentiated by properties
cascading.bind.provider.[provider_name].protocol.names=
cascading.bind.provider.[provider_name].protocol.[protocol_name].schemes=
cascading.bind.provider.[provider_name].protocol.[protocol_name].[property]=[value]

# define formats differentiated by properties
cascading.bind.provider.[provider_name].format.names=
cascading.bind.provider.[provider_name].format.[format_name].extensions=
cascading.bind.provider.[provider_name].format.[format_name].[property]=[value]

# optional format property specifying the protocols this format is compatible with
# otherwise all defined protocols in this definition will be used
cascading.bind.provider.[provider_name].format.[format_name].protocols=
Notes on using with Apache Hadoop
Using with Lingual Shell
When using with Apache Hadoop, the Lingual Shell expects the following environment variables so that the correct Hadoop version and configuration may be included in the CLASSPATH.

- HADOOP_HOME - path to the local Hadoop installation, or
- HADOOP_CONF_DIR - defaults to $HADOOP_HOME/conf
- HADOOP_USER_NAME - the username to use when submitting Hadoop jobs
To pass custom Hadoop properties to the Hadoop platform in Lingual, use:
> export LINGUAL_CONFIG=property=value,property=value
When using a YARN based Hadoop distribution like Apache Hadoop 2.x, you can also set the following environment variable.

- HADOOP_YARN_HOME - path to the local YARN-based Hadoop installation
Setting default properties
By default, Lingual creates a base JobConf instance, which in turn is populated (per the Hadoop API) by all configuration information found in the CLASSPATH.

Lingual Shell constructs the CLASSPATH in part with the value of HADOOP_CONF_DIR.
Outside of Lingual Shell, the CLASSPATH is a function of the tools using the Lingual JDBC drivers. For example, with Squirrel, the conf directory must be added explicitly.
After the JobConf is constructed, any values found in the Lingual catalog directory will be applied. By default, these values are located in .lingual/config/default.properties on HDFS.

This leaves the tricky problem of letting Lingual know how to reach HDFS in the first place.
Hadoop relies on a single property to reach HDFS, fs.default.name. If it is not already available from default.properties or the CLASSPATH, it must be provided; Lingual also needs to know how to reach the Hadoop "job tracker", via the mapred.job.tracker property.
For example, to connect to a vagrant based Hadoop cluster as the user vagrant:

> export HADOOP_USER_NAME=vagrant
> export LINGUAL_CONFIG=fs.default.name=hdfs://master.local:9000,mapred.job.tracker=master.local:9001
Any additional properties specific to your use can be stuffed into the default.properties file.
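When using the Lingual JDBC Driver outside of the Shell, the same two properties can be appended to the connection string, since unrecognized properties are passed down to the underlying platform; a hedged sketch with placeholder host names:

import java.sql.Connection;
import java.sql.DriverManager;

public class HadoopConnectionSketch
  {
  public static void main( String[] args ) throws Exception
    {
    Class.forName( "cascading.lingual.jdbc.Driver" );

    // host names and ports are placeholders for your own cluster
    Connection connection = DriverManager.getConnection(
      "jdbc:lingual:hadoop"
        + ";fs.default.name=hdfs://master.local:9000"
        + ";mapred.job.tracker=master.local:9001" );

    connection.close();
    }
  }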
AWS EMR
If working with a remote Amazon Elastic MapReduce cluster from a local terminal/shell, see the Bash EMR utilities, specifically the emrconf command, which fetches remote configuration files locally so they can be pointed to with HADOOP_CONF_DIR.
If errors are encountered executing SQL queries remotely, calling:

> export HADOOP_USER_NAME=hadoop

should alleviate any security issues causing failures on the remote EMR cluster.
ANSI SQL Support
Lingual supports the following ANSI SQL features.
Datatypes
The following data types are supported by Lingual.
Note that the "Java Type" column indicates what the database type will get translated to as final output.
For temporal types, internal representation will differ from final output. For example the DATE type is treated as an integer when doing internal serialization for Cascading processing and then converted to java.sql.Date. The usual cautions and best practices about SQL temporal types and timezones apply.
For data types such as DATE that are defined as not having limited precision, Lingual assumes a value of 0 in the default timezone of the JVM. For example, the DATE of "2013-04-02" is considered "2013-04-02 00:00:00" if converting to DATETIME.
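A small illustration of that last point, using only the standard java.sql classes Lingual emits:

import java.sql.Date;
import java.sql.Timestamp;

public class DateConversionSketch
  {
  public static void main( String[] args )
    {
    // a DATE value as it appears in Lingual output; java.sql.Date carries no time component
    Date date = Date.valueOf( "2013-04-02" );

    // converting to a timestamp yields midnight in the JVM default timezone
    Timestamp timestamp = new Timestamp( date.getTime() );

    System.out.println( date );      // 2013-04-02
    System.out.println( timestamp ); // 2013-04-02 00:00:00.0
    }
  }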
Character Types
| SQL Type | Java Type |
|---|---|
| BOOLEAN | boolean, Boolean |
| CHAR | String |
| VARCHAR | String |
| CLOB, NCLOB | String (support may be vendor specific) |
Numeric Types
| SQL Type | Java Type |
|---|---|
| INT | int, Integer |
| SMALLINT | int, Integer |
| BIGINT | long, Long |
| NUMERIC | float, Float |
| DECIMAL | float, Float |
| FLOAT | float, Float |
| REAL | double, Double |
| DOUBLE | double, Double |
Temporal Types
| SQL Type | Java Type |
|---|---|
| DATE | java.sql.Date |
| TIME | java.sql.Time |
| TIMESTAMP | java.sql.Timestamp |
Functions
The following SQL functions are supported by Lingual. Internally the implementation of the function is a combination of Optiq and Cascading features.
The particular internal function should not affect end users or developers but users running with log level set at DEBUG may want to be aware of this to help understand log output.
Operators and Logic Function
| Function | Notes |
|---|---|
| + | |
| - | |
| * | |
| / | |
| % | Implemented as MOD |
| = | |
| >, >= | |
| <, <= | |
| <> | |
| != | Implemented as <> |
| AND | |
| IN | |
| OR | |
| NOT | |
| CASE | |
Aggregate Functions
| Function | Notes |
|---|---|
| AVG | |
| COUNT | |
| COUNT(*) | |
| MIN | |
| MAX | |
| SUM | |
String Functions
| Function | Notes |
|---|---|
| CASE | |
| POSITION | |
| POWER | |
| LOWER | Implemented as "values ({fn lcase([string])});" |
| OVERLAY | |
| TRIM | |
| UPPER | Implemented as "values ({fn ucase([string])});" |
Numeric and Mathematical Functions
| Function | Notes |
|---|---|
| ABS | |
| MOD | |
| EXP | Implemented as "values ({fn power([number], [exponent])});" |
| LN | |
| POWER | |
Conversion and User Functions
| Function | Notes |
|---|---|
| CAST | |
| CONVERT | |
| CURRENT_DATE | |
| CURRENT_TIME | |
| CURRENT_TIMESTAMP | |
| CURRENT_USER | |
| SESSION_USER | |
| SYSTEM_USER | |