001/*
002 * Copyright (c) 2007-2017 Xplenty, Inc. All Rights Reserved.
003 *
004 * Project and contact information: http://www.cascading.org/
005 *
006 * This file is part of the Cascading project.
007 *
008 * Licensed under the Apache License, Version 2.0 (the "License");
009 * you may not use this file except in compliance with the License.
010 * You may obtain a copy of the License at
011 *
012 *     http://www.apache.org/licenses/LICENSE-2.0
013 *
014 * Unless required by applicable law or agreed to in writing, software
015 * distributed under the License is distributed on an "AS IS" BASIS,
016 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
017 * See the License for the specific language governing permissions and
018 * limitations under the License.
019 */
020
021package cascading.tap.hadoop;
022
023import java.beans.ConstructorProperties;
024import java.io.IOException;
025import java.net.URI;
026
027import cascading.scheme.Scheme;
028import cascading.tap.SinkMode;
029import cascading.tap.TapException;
030import org.apache.hadoop.conf.Configuration;
031import org.apache.hadoop.fs.FileSystem;
032
033/**
034 * Class Dfs is a {@link cascading.tap.Tap} class that provides access to the Hadoop Distributed File System.
035 * <p/>
036 * Use the {@link URI} constructors to specify a different HDFS cluster than the default.
037 */
038public class Dfs extends Hfs
039  {
040  /**
041   * Constructor Dfs creates a new Dfs instance.
042   *
043   * @param scheme of type Scheme
044   * @param uri    of type URI
045   */
046  @ConstructorProperties({"scheme", "uri"})
047  public Dfs( Scheme scheme, URI uri )
048    {
049    super( scheme, uri.getPath() );
050
051    init( uri );
052    }
053
054  /**
055   * Constructor Dfs creates a new Dfs instance.
056   *
057   * @param scheme   of type Scheme
058   * @param uri      of type URI
059   * @param sinkMode of type SinkMode
060   */
061  @ConstructorProperties({"scheme", "uri", "sinkMode"})
062  public Dfs( Scheme scheme, URI uri, SinkMode sinkMode )
063    {
064    super( scheme, uri.getPath(), sinkMode );
065
066    init( uri );
067    }
068
069  /**
070   * Constructor Dfs creates a new Dfs instance.
071   *
072   * @param scheme     of type Scheme
073   * @param stringPath of type String
074   */
075  @ConstructorProperties({"scheme", "stringPath"})
076  public Dfs( Scheme scheme, String stringPath )
077    {
078    super( scheme, stringPath );
079    }
080
081  /**
082   * Constructor Dfs creates a new Dfs instance.
083   *
084   * @param scheme     of type Scheme
085   * @param stringPath of type String
086   * @param sinkMode   of type SinkMode
087   */
088  @ConstructorProperties({"scheme", "stringPath", "sinkMode"})
089  public Dfs( Scheme scheme, String stringPath, SinkMode sinkMode )
090    {
091    super( scheme, stringPath, sinkMode );
092    }
093
094  private void init( URI uri )
095    {
096    if( !uri.getScheme().equalsIgnoreCase( "hdfs" ) )
097      throw new IllegalArgumentException( "uri must use the hdfs scheme" );
098
099    setUriScheme( URI.create( uri.getScheme() + "://" + uri.getAuthority() ) );
100    }
101
102  protected void setStringPath( String stringPath )
103    {
104    if( stringPath.matches( ".*://.*" ) && !stringPath.startsWith( "hdfs://" ) )
105      throw new IllegalArgumentException( "uri must use the hdfs scheme" );
106
107    super.setStringPath( stringPath );
108    }
109
110  @Override
111  protected FileSystem getDefaultFileSystem( Configuration configuration )
112    {
113    String name = configuration.get( "fs.default.name", "hdfs://localhost:5001/" );
114
115    if( name.equals( "local" ) || name.matches( ".*://.*" ) && !name.startsWith( "hdfs://" ) )
116      name = "hdfs://localhost:5001/";
117    else if( name.indexOf( '/' ) == -1 )
118      name = "hdfs://" + name;
119
120    try
121      {
122      return FileSystem.get( URI.create( name ), configuration );
123      }
124    catch( IOException exception )
125      {
126      throw new TapException( "unable to get handle to get filesystem for: " + name, exception );
127      }
128    }
129  }