001/*
002 * Copyright (c) 2007-2015 Concurrent, Inc. All Rights Reserved.
003 *
004 * Project and contact information: http://www.cascading.org/
005 *
006 * This file is part of the Cascading project.
007 *
008 * Licensed under the Apache License, Version 2.0 (the "License");
009 * you may not use this file except in compliance with the License.
010 * You may obtain a copy of the License at
011 *
012 *     http://www.apache.org/licenses/LICENSE-2.0
013 *
014 * Unless required by applicable law or agreed to in writing, software
015 * distributed under the License is distributed on an "AS IS" BASIS,
016 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
017 * See the License for the specific language governing permissions and
018 * limitations under the License.
019 */
020
021package cascading.tap.hadoop.io;
022
023import java.io.FileNotFoundException;
024import java.io.IOException;
025import java.net.HttpURLConnection;
026import java.net.URI;
027import java.net.URISyntaxException;
028import java.net.URL;
029
030import org.apache.hadoop.conf.Configuration;
031import org.apache.hadoop.fs.FSDataInputStream;
032import org.apache.hadoop.fs.FileStatus;
033import org.apache.hadoop.fs.FileSystem;
034import org.apache.hadoop.fs.Path;
035import org.apache.hadoop.fs.PathFilter;
036import org.slf4j.Logger;
037import org.slf4j.LoggerFactory;
038
039/**
040 * Class HttpFileSystem provides a basic read-only {@link FileSystem} for accessing remote HTTP and HTTPS data.
041 * <p/>
042 * To use this FileSystem, just use regular http:// or https:// URLs.
043 */
044public class HttpFileSystem extends StreamedFileSystem
045  {
046  /** Field LOG */
047  private static final Logger LOG = LoggerFactory.getLogger( HttpFileSystem.class );
048
049  /** Field HTTP_SCHEME */
050  public static final String HTTP_SCHEME = "http";
051  /** Field HTTPS_SCHEME */
052  public static final String HTTPS_SCHEME = "https";
053
054  static
055    {
056    HttpURLConnection.setFollowRedirects( true );
057    }
058
059  /** Field scheme */
060  private String scheme;
061  /** Field authority */
062  private String authority;
063
064  @Override
065  public void initialize( URI uri, Configuration configuration ) throws IOException
066    {
067    setConf( configuration );
068
069    scheme = uri.getScheme();
070    authority = uri.getAuthority();
071    }
072
073  @Override
074  public URI getUri()
075    {
076    try
077      {
078      return new URI( scheme, authority, null, null, null );
079      }
080    catch( URISyntaxException exception )
081      {
082      throw new RuntimeException( "failed parsing uri", exception );
083      }
084    }
085
086  @Override
087  public FileStatus[] globStatus( Path path, PathFilter pathFilter ) throws IOException
088    {
089    FileStatus fileStatus = getFileStatus( path );
090
091    if( fileStatus == null )
092      return null;
093
094    return new FileStatus[]{fileStatus};
095    }
096
097  @Override
098  public FSDataInputStream open( Path path, int i ) throws IOException
099    {
100    URL url = makeUrl( path );
101
102    HttpURLConnection connection = (HttpURLConnection) url.openConnection();
103    connection.setRequestMethod( "GET" );
104    connection.connect();
105
106    debugConnection( connection );
107
108    return new FSDataInputStream( new FSDigestInputStream( connection.getInputStream(), getMD5SumFor( getConf(), path ) ) );
109    }
110
111  @Override
112  public boolean exists( Path path ) throws IOException
113    {
114    URL url = makeUrl( path );
115
116    HttpURLConnection connection = (HttpURLConnection) url.openConnection();
117    connection.setRequestMethod( "HEAD" );
118    connection.connect();
119
120    debugConnection( connection );
121
122    return connection.getResponseCode() == 200;
123    }
124
125  @Override
126  public FileStatus getFileStatus( Path path ) throws IOException
127    {
128    URL url = makeUrl( path );
129
130    HttpURLConnection connection = (HttpURLConnection) url.openConnection();
131    connection.setRequestMethod( "HEAD" );
132    connection.connect();
133
134    debugConnection( connection );
135
136    if( connection.getResponseCode() != 200 )
137      throw new FileNotFoundException( "could not find file: " + path );
138
139    long length = connection.getHeaderFieldInt( "Content-Length", 0 );
140
141    length = length < 0 ? 0 : length; // queries may return -1
142
143    long modified = connection.getHeaderFieldDate( "Last-Modified", System.currentTimeMillis() );
144
145    return new FileStatus( length, false, 1, getDefaultBlockSize(), modified, path );
146    }
147
148  private void debugConnection( HttpURLConnection connection ) throws IOException
149    {
150    if( LOG.isDebugEnabled() )
151      {
152      LOG.debug( "connection.getURL() = {}", connection.getURL() );
153      LOG.debug( "connection.getRequestMethod() = {}", connection.getRequestMethod() );
154      LOG.debug( "connection.getResponseCode() = {}", connection.getResponseCode() );
155      LOG.debug( "connection.getResponseMessage() = {}", connection.getResponseMessage() );
156      LOG.debug( "connection.getContentLength() = {}", connection.getContentLength() );
157      }
158    }
159
160  private URL makeUrl( Path path ) throws IOException
161    {
162    if( path.toString().startsWith( scheme ) )
163      return URI.create( path.toString() ).toURL();
164
165    try
166      {
167      return new URI( scheme, authority, path.toString(), null, null ).toURL();
168      }
169    catch( URISyntaxException exception )
170      {
171      throw new IOException( exception.getMessage() );
172      }
173    }
174  }