/*
 * Copyright (c) 2007-2015 Concurrent, Inc. All Rights Reserved.
 *
 * Project and contact information: http://www.cascading.org/
 *
 * This file is part of the Cascading project.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package cascading.tap.hadoop;

import java.beans.ConstructorProperties;
import java.io.IOException;
import java.net.URI;
import java.util.regex.Pattern;

import cascading.scheme.Scheme;
import cascading.tap.SinkMode;
import cascading.tap.TapException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;

/**
 * Class Dfs is a {@link cascading.tap.Tap} class that provides access to the Hadoop Distributed File System.
 * <p/>
 * Use the {@link URI} constructors to specify a different HDFS cluster than the default.
 * <p/>
 * Only the {@code hdfs} scheme is accepted. The {@link URI} constructors compare the scheme ignoring case,
 * while string paths containing {@code ://} must start with the exact lowercase prefix {@code hdfs://}.
 */
public class Dfs extends Hfs
  {
  /** Canonical, lowercase URI scheme accepted by this tap. */
  private static final String HDFS_SCHEME = "hdfs";

  /** Prefix every fully qualified string path and filesystem name is required to start with. */
  private static final String HDFS_PREFIX = HDFS_SCHEME + "://";

  /** Filesystem name used when the configuration provides none, or one that is not an hdfs name. */
  private static final String DEFAULT_FILESYSTEM_NAME = "hdfs://localhost:5001/";

  /** Recognizes strings that carry a {@code scheme://} style qualifier; compiled once instead of on every call. */
  private static final Pattern QUALIFIED_NAME = Pattern.compile( ".*://.*" );

  /**
   * Constructor Dfs creates a new Dfs instance.
   *
   * @param scheme of type Scheme
   * @param uri    of type URI
   * @throws IllegalArgumentException if the given uri has no scheme, or a scheme other than hdfs
   */
  @ConstructorProperties({"scheme", "uri"})
  public Dfs( Scheme scheme, URI uri )
    {
    super( scheme, uri.getPath() );

    init( uri );
    }

  /**
   * Constructor Dfs creates a new Dfs instance.
   *
   * @param scheme   of type Scheme
   * @param uri      of type URI
   * @param sinkMode of type SinkMode
   * @throws IllegalArgumentException if the given uri has no scheme, or a scheme other than hdfs
   */
  @ConstructorProperties({"scheme", "uri", "sinkMode"})
  public Dfs( Scheme scheme, URI uri, SinkMode sinkMode )
    {
    super( scheme, uri.getPath(), sinkMode );

    init( uri );
    }

  /**
   * Constructor Dfs creates a new Dfs instance.
   *
   * @param scheme     of type Scheme
   * @param stringPath of type String
   */
  @ConstructorProperties({"scheme", "stringPath"})
  public Dfs( Scheme scheme, String stringPath )
    {
    super( scheme, stringPath );
    }

  /**
   * Constructor Dfs creates a new Dfs instance.
   *
   * @param scheme     of type Scheme
   * @param stringPath of type String
   * @param sinkMode   of type SinkMode
   */
  @ConstructorProperties({"scheme", "stringPath", "sinkMode"})
  public Dfs( Scheme scheme, String stringPath, SinkMode sinkMode )
    {
    super( scheme, stringPath, sinkMode );
    }

  /**
   * Validates that the given uri uses the hdfs scheme and registers the scheme and authority
   * portion of it through {@code setUriScheme}.
   *
   * @param uri the uri handed to one of the {@link URI} constructors
   * @throws IllegalArgumentException if the uri has no scheme, or a scheme other than hdfs
   */
  private void init( URI uri )
    {
    String scheme = uri.getScheme();

    // a relative uri has no scheme at all; report it the same way as a wrong scheme rather than failing with a NullPointerException
    if( scheme == null || !scheme.equalsIgnoreCase( HDFS_SCHEME ) )
      throw new IllegalArgumentException( "uri must use the hdfs scheme" );

    String authority = uri.getAuthority();

    // the scheme was matched ignoring case, so emit the canonical lowercase form instead of echoing the caller's spelling.
    // an authority-less uri such as hdfs:///path must not be concatenated into the literal "hdfs://null"; "hdfs:///"
    // keeps the authority empty instead.
    // NOTE(review): assumes the superclass resolves an authority-less hdfs uri against the configured default - confirm against Hfs
    if( authority == null )
      setUriScheme( URI.create( HDFS_SCHEME + ":///" ) );
    else
      setUriScheme( URI.create( HDFS_PREFIX + authority ) );
    }

  /**
   * Rejects string paths that are qualified with a scheme other than hdfs before handing the path to the superclass.
   * <p/>
   * Paths without a {@code ://} qualifier are passed through unchecked. The prefix comparison is case sensitive.
   *
   * @param stringPath of type String
   * @throws IllegalArgumentException if the path contains {@code ://} but does not start with {@code hdfs://}
   */
  protected void setStringPath( String stringPath )
    {
    if( isQualified( stringPath ) && !stringPath.startsWith( HDFS_PREFIX ) )
      throw new IllegalArgumentException( "uri must use the hdfs scheme" );

    super.setStringPath( stringPath );
    }

  /**
   * Returns the {@link FileSystem} named by the {@code fs.default.name} property of the given configuration.
   * <p/>
   * The name falls back to {@code hdfs://localhost:5001/} when the property is unset, equals {@code local},
   * or is qualified with a scheme other than hdfs. A name without any {@code /} is prefixed with {@code hdfs://}.
   *
   * @param configuration of type Configuration
   * @return the FileSystem obtained for the resolved name
   * @throws TapException if obtaining the filesystem fails with an {@link IOException}
   */
  @Override
  protected FileSystem getDefaultFileSystem( Configuration configuration )
    {
    String name = configuration.get( "fs.default.name", DEFAULT_FILESYSTEM_NAME );

    // spelled out separately so the original "a || b && c" precedence is explicit
    boolean isForeignScheme = isQualified( name ) && !name.startsWith( HDFS_PREFIX );

    if( name.equals( "local" ) || isForeignScheme )
      name = DEFAULT_FILESYSTEM_NAME;
    else if( name.indexOf( '/' ) == -1 )
      name = HDFS_PREFIX + name; // bare name without any slash, qualify it

    try
      {
      return FileSystem.get( URI.create( name ), configuration );
      }
    catch( IOException exception )
      {
      throw new TapException( "unable to get handle to get filesystem for: " + name, exception );
      }
    }

  /** Returns true if the given value contains a {@code ://} qualifier, with the same semantics as {@code value.matches( ".*://.*" )}. */
  private static boolean isQualified( String value )
    {
    return QUALIFIED_NAME.matcher( value ).matches();
    }
  }