001/* 002 * Copyright (c) 2007-2017 Xplenty, Inc. All Rights Reserved. 003 * 004 * Project and contact information: http://www.cascading.org/ 005 * 006 * This file is part of the Cascading project. 007 * 008 * Licensed under the Apache License, Version 2.0 (the "License"); 009 * you may not use this file except in compliance with the License. 010 * You may obtain a copy of the License at 011 * 012 * http://www.apache.org/licenses/LICENSE-2.0 013 * 014 * Unless required by applicable law or agreed to in writing, software 015 * distributed under the License is distributed on an "AS IS" BASIS, 016 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 017 * See the License for the specific language governing permissions and 018 * limitations under the License. 019 */ 020 021package cascading.tap.hadoop.io; 022 023import java.io.FileNotFoundException; 024import java.io.IOException; 025import java.net.HttpURLConnection; 026import java.net.URI; 027import java.net.URISyntaxException; 028import java.net.URL; 029 030import org.apache.hadoop.conf.Configuration; 031import org.apache.hadoop.fs.FSDataInputStream; 032import org.apache.hadoop.fs.FileStatus; 033import org.apache.hadoop.fs.FileSystem; 034import org.apache.hadoop.fs.Path; 035import org.apache.hadoop.fs.PathFilter; 036import org.slf4j.Logger; 037import org.slf4j.LoggerFactory; 038 039/** 040 * Class HttpFileSystem provides a basic read-only {@link FileSystem} for accessing remote HTTP and HTTPS data. 041 * <p/> 042 * To use this FileSystem, just use regular http:// or https:// URLs. 043 */ 044public class HttpFileSystem extends StreamedFileSystem 045 { 046 /** Field LOG */ 047 private static final Logger LOG = LoggerFactory.getLogger( HttpFileSystem.class ); 048 049 /** Field HTTP_SCHEME */ 050 public static final String HTTP_SCHEME = "http"; 051 /** Field HTTPS_SCHEME */ 052 public static final String HTTPS_SCHEME = "https"; 053 054 static 055 { 056 HttpURLConnection.setFollowRedirects( true ); 057 } 058 059 /** Field scheme */ 060 private String scheme; 061 /** Field authority */ 062 private String authority; 063 064 @Override 065 public void initialize( URI uri, Configuration configuration ) throws IOException 066 { 067 setConf( configuration ); 068 069 scheme = uri.getScheme(); 070 authority = uri.getAuthority(); 071 } 072 073 @Override 074 public URI getUri() 075 { 076 try 077 { 078 return new URI( scheme, authority, null, null, null ); 079 } 080 catch( URISyntaxException exception ) 081 { 082 throw new RuntimeException( "failed parsing uri", exception ); 083 } 084 } 085 086 @Override 087 public FileStatus[] globStatus( Path path, PathFilter pathFilter ) throws IOException 088 { 089 FileStatus fileStatus = getFileStatus( path ); 090 091 if( fileStatus == null ) 092 return null; 093 094 return new FileStatus[]{fileStatus}; 095 } 096 097 @Override 098 public FSDataInputStream open( Path path, int i ) throws IOException 099 { 100 URL url = makeUrl( path ); 101 102 HttpURLConnection connection = (HttpURLConnection) url.openConnection(); 103 connection.setRequestMethod( "GET" ); 104 connection.connect(); 105 106 debugConnection( connection ); 107 108 return new FSDataInputStream( new FSDigestInputStream( connection.getInputStream(), getMD5SumFor( getConf(), path ) ) ); 109 } 110 111 @Override 112 public boolean exists( Path path ) throws IOException 113 { 114 URL url = makeUrl( path ); 115 116 HttpURLConnection connection = (HttpURLConnection) url.openConnection(); 117 connection.setRequestMethod( "HEAD" ); 118 connection.connect(); 119 120 debugConnection( connection ); 121 122 return connection.getResponseCode() == 200; 123 } 124 125 @Override 126 public FileStatus getFileStatus( Path path ) throws IOException 127 { 128 URL url = makeUrl( path ); 129 130 HttpURLConnection connection = (HttpURLConnection) url.openConnection(); 131 connection.setRequestMethod( "HEAD" ); 132 connection.connect(); 133 134 debugConnection( connection ); 135 136 if( connection.getResponseCode() != 200 ) 137 throw new FileNotFoundException( "could not find file: " + path ); 138 139 long length = connection.getHeaderFieldInt( "Content-Length", 0 ); 140 141 length = length < 0 ? 0 : length; // queries may return -1 142 143 long modified = connection.getHeaderFieldDate( "Last-Modified", System.currentTimeMillis() ); 144 145 return new FileStatus( length, false, 1, getDefaultBlockSize(), modified, path ); 146 } 147 148 private void debugConnection( HttpURLConnection connection ) throws IOException 149 { 150 if( LOG.isDebugEnabled() ) 151 { 152 LOG.debug( "connection.getURL() = {}", connection.getURL() ); 153 LOG.debug( "connection.getRequestMethod() = {}", connection.getRequestMethod() ); 154 LOG.debug( "connection.getResponseCode() = {}", connection.getResponseCode() ); 155 LOG.debug( "connection.getResponseMessage() = {}", connection.getResponseMessage() ); 156 LOG.debug( "connection.getContentLength() = {}", connection.getContentLength() ); 157 } 158 } 159 160 private URL makeUrl( Path path ) throws IOException 161 { 162 if( path.toString().startsWith( scheme ) ) 163 return URI.create( path.toString() ).toURL(); 164 165 try 166 { 167 return new URI( scheme, authority, path.toString(), null, null ).toURL(); 168 } 169 catch( URISyntaxException exception ) 170 { 171 throw new IOException( exception.getMessage() ); 172 } 173 } 174 }