001    /*
002     * Copyright (c) 2007-2015 Concurrent, Inc. All Rights Reserved.
003     *
004     * Project and contact information: http://www.cascading.org/
005     *
006     * This file is part of the Cascading project.
007     *
008     * Licensed under the Apache License, Version 2.0 (the "License");
009     * you may not use this file except in compliance with the License.
010     * You may obtain a copy of the License at
011     *
012     *     http://www.apache.org/licenses/LICENSE-2.0
013     *
014     * Unless required by applicable law or agreed to in writing, software
015     * distributed under the License is distributed on an "AS IS" BASIS,
016     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
017     * See the License for the specific language governing permissions and
018     * limitations under the License.
019     */
020    
021    package cascading.tap.hadoop;
022    
023    import java.beans.ConstructorProperties;
024    import java.io.IOException;
025    import java.net.URI;
026    
027    import cascading.scheme.Scheme;
028    import cascading.tap.SinkMode;
029    import cascading.tap.TapException;
030    import cascading.tuple.Fields;
031    import org.apache.hadoop.fs.FileSystem;
032    import org.apache.hadoop.mapred.JobConf;
033    
034    /**
035     * Class Dfs is a {@link cascading.tap.Tap} class that provides access to the Hadoop Distributed File System.
036     * <p/>
037     * Use the {@link URI} constructors to specify a different HDFS cluster than the default.
038     */
039    public class Dfs extends Hfs
040      {
041    
042      /**
043       * Constructor Dfs creates a new Dfs instance.
044       *
045       * @param fields of type Fields
046       * @param uri    of type URI
047       */
048      @ConstructorProperties({"fields", "uri"})
049      @Deprecated
050      public Dfs( Fields fields, URI uri )
051        {
052        super( fields, uri.getPath() );
053    
054        init( uri );
055        }
056    
057      /**
058       * Constructor Dfs creates a new Dfs instance.
059       *
060       * @param fields  of type Fields
061       * @param uri     of type URI
062       * @param replace of type boolean
063       */
064      @ConstructorProperties({"fields", "uri", "replace"})
065      @Deprecated
066      public Dfs( Fields fields, URI uri, boolean replace )
067        {
068        super( fields, uri.getPath(), replace );
069    
070        init( uri );
071        }
072    
073      /**
074       * Constructor Dfs creates a new Dfs instance.
075       *
076       * @param fields   of type Fields
077       * @param uri      of type URI
078       * @param sinkMode of type SinkMode
079       */
080      @ConstructorProperties({"fields", "uri", "sinkMode"})
081      @Deprecated
082      public Dfs( Fields fields, URI uri, SinkMode sinkMode )
083        {
084        super( fields, uri.getPath(), sinkMode );
085    
086        init( uri );
087        }
088    
089      /**
090       * Constructor Dfs creates a new Dfs instance.
091       *
092       * @param fields     of type Fields
093       * @param stringPath of type String
094       */
095      @ConstructorProperties({"fields", "stringPath"})
096      @Deprecated
097      public Dfs( Fields fields, String stringPath )
098        {
099        super( fields, stringPath );
100        }
101    
102      /**
103       * Constructor Dfs creates a new Dfs instance.
104       *
105       * @param fields     of type Fields
106       * @param stringPath of type String
107       * @param replace    of type boolean
108       */
109      @ConstructorProperties({"fields", "stringPath", "replace"})
110      @Deprecated
111      public Dfs( Fields fields, String stringPath, boolean replace )
112        {
113        super( fields, stringPath, replace );
114        }
115    
116      /**
117       * Constructor Dfs creates a new Dfs instance.
118       *
119       * @param fields     of type Fields
120       * @param stringPath of type String
121       * @param sinkMode   of type SinkMode
122       */
123      @ConstructorProperties({"fields", "stringPath", "sinkMode"})
124      @Deprecated
125      public Dfs( Fields fields, String stringPath, SinkMode sinkMode )
126        {
127        super( fields, stringPath, sinkMode );
128        }
129    
130      @ConstructorProperties({"scheme"})
131      Dfs( Scheme scheme )
132        {
133        super( scheme );
134        }
135    
136      /**
137       * Constructor Dfs creates a new Dfs instance.
138       *
139       * @param scheme of type Scheme
140       * @param uri    of type URI
141       */
142      @ConstructorProperties({"scheme", "uri"})
143      public Dfs( Scheme scheme, URI uri )
144        {
145        super( scheme, uri.getPath() );
146    
147        init( uri );
148        }
149    
150      /**
151       * Constructor Dfs creates a new Dfs instance.
152       *
153       * @param scheme  of type Scheme
154       * @param uri     of type URI
155       * @param replace of type boolean
156       */
157      @ConstructorProperties({"scheme", "uri", "replace"})
158      @Deprecated
159      public Dfs( Scheme scheme, URI uri, boolean replace )
160        {
161        super( scheme, uri.getPath(), replace );
162    
163        init( uri );
164        }
165    
166      /**
167       * Constructor Dfs creates a new Dfs instance.
168       *
169       * @param scheme   of type Scheme
170       * @param uri      of type URI
171       * @param sinkMode of type SinkMode
172       */
173      @ConstructorProperties({"scheme", "uri", "sinkMode"})
174      public Dfs( Scheme scheme, URI uri, SinkMode sinkMode )
175        {
176        super( scheme, uri.getPath(), sinkMode );
177    
178        init( uri );
179        }
180    
181      /**
182       * Constructor Dfs creates a new Dfs instance.
183       *
184       * @param scheme     of type Scheme
185       * @param stringPath of type String
186       */
187      @ConstructorProperties({"scheme", "stringPath"})
188      public Dfs( Scheme scheme, String stringPath )
189        {
190        super( scheme, stringPath );
191        }
192    
193      /**
194       * Constructor Dfs creates a new Dfs instance.
195       *
196       * @param scheme     of type Scheme
197       * @param stringPath of type String
198       * @param replace    of type boolean
199       */
200      @ConstructorProperties({"scheme", "stringPath", "replace"})
201      @Deprecated
202      public Dfs( Scheme scheme, String stringPath, boolean replace )
203        {
204        super( scheme, stringPath, replace );
205        }
206    
207      /**
208       * Constructor Dfs creates a new Dfs instance.
209       *
210       * @param scheme     of type Scheme
211       * @param stringPath of type String
212       * @param sinkMode   of type SinkMode
213       */
214      @ConstructorProperties({"scheme", "stringPath", "sinkMode"})
215      public Dfs( Scheme scheme, String stringPath, SinkMode sinkMode )
216        {
217        super( scheme, stringPath, sinkMode );
218        }
219    
220      private void init( URI uri )
221        {
222        if( !uri.getScheme().equalsIgnoreCase( "hdfs" ) )
223          throw new IllegalArgumentException( "uri must use the hdfs scheme" );
224    
225        setUriScheme( URI.create( uri.getScheme() + "://" + uri.getAuthority() ) );
226        }
227    
228      protected void setStringPath( String stringPath )
229        {
230        if( stringPath.matches( ".*://.*" ) && !stringPath.startsWith( "hdfs://" ) )
231          throw new IllegalArgumentException( "uri must use the hdfs scheme" );
232    
233        super.setStringPath( stringPath );
234        }
235    
236      @Override
237      protected FileSystem getDefaultFileSystem( JobConf jobConf )
238        {
239        String name = jobConf.get( "fs.default.name", "hdfs://localhost:5001/" );
240    
241        if( name.equals( "local" ) || name.matches( ".*://.*" ) && !name.startsWith( "hdfs://" ) )
242          name = "hdfs://localhost:5001/";
243        else if( name.indexOf( '/' ) == -1 )
244          name = "hdfs://" + name;
245    
246        try
247          {
248          return FileSystem.get( URI.create( name ), jobConf );
249          }
250        catch( IOException exception )
251          {
252          throw new TapException( "unable to get handle to get filesystem for: " + name, exception );
253          }
254        }
255      }