001/*
002 * Copyright (c) 2007-2017 Xplenty, Inc. All Rights Reserved.
003 *
004 * Project and contact information: http://www.cascading.org/
005 *
006 * This file is part of the Cascading project.
007 *
008 * Licensed under the Apache License, Version 2.0 (the "License");
009 * you may not use this file except in compliance with the License.
010 * You may obtain a copy of the License at
011 *
012 *     http://www.apache.org/licenses/LICENSE-2.0
013 *
014 * Unless required by applicable law or agreed to in writing, software
015 * distributed under the License is distributed on an "AS IS" BASIS,
016 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
017 * See the License for the specific language governing permissions and
018 * limitations under the License.
019 */
020
021package cascading.tap.hadoop;
022
023import java.io.IOException;
024import java.net.URI;
025import java.util.ArrayList;
026import java.util.List;
027
028import cascading.flow.FlowProcess;
029import cascading.flow.hadoop.util.HadoopUtil;
030import cascading.tap.DecoratorTap;
031import cascading.tap.MultiSourceTap;
032import cascading.tap.Tap;
033import cascading.tap.TapException;
034import cascading.tuple.TupleEntryIterator;
035import cascading.util.Util;
036import org.apache.hadoop.conf.Configuration;
037import org.apache.hadoop.fs.FileStatus;
038import org.apache.hadoop.fs.FileSystem;
039import org.apache.hadoop.fs.Path;
040import org.apache.hadoop.mapred.OutputCollector;
041import org.apache.hadoop.mapred.RecordReader;
042import org.slf4j.Logger;
043import org.slf4j.LoggerFactory;
044
045/**
046 *
047 */
048public abstract class BaseDistCacheTap extends DecoratorTap<Void, Configuration, RecordReader, OutputCollector>
049  {
050  /** logger. */
051  private static final Logger LOG = LoggerFactory.getLogger( BaseDistCacheTap.class );
052
053  public BaseDistCacheTap( Tap<Configuration, RecordReader, OutputCollector> original )
054    {
055    super( original );
056    }
057
058  @Override
059  public void sourceConfInit( FlowProcess<? extends Configuration> process, Configuration conf )
060    {
061    if( HadoopUtil.isLocal( conf ) ||
062      Tap.id( this ).equals( conf.get( "cascading.node.source" ) ) ||
063      Tap.id( this ).equals( conf.get( "cascading.step.source" ) ) )
064      {
065      LOG.info( "can't use distributed cache. reading '{}' from hdfs", super.getIdentifier() );
066      super.sourceConfInit( process, conf );
067      return;
068      }
069    try
070      {
071      registerHfs( process, conf, getHfs() );
072      }
073    catch( IOException exception )
074      {
075      throw new TapException( exception );
076      }
077    }
078
079  @Override
080  public TupleEntryIterator openForRead( FlowProcess<? extends Configuration> flowProcess, RecordReader input ) throws IOException
081    {
082    // always read via Hadoop FileSystem if in standalone/local mode, or if an RecordReader is provided
083    if( HadoopUtil.isLocal( flowProcess.getConfig() ) || input != null )
084      {
085      LOG.info( "delegating to parent" );
086      return super.openForRead( flowProcess, input );
087      }
088
089    Path[] cachedFiles = getLocalCacheFiles( flowProcess );
090
091    if( cachedFiles == null || cachedFiles.length == 0 )
092      return super.openForRead( flowProcess, null );
093
094    List<Path> paths = new ArrayList<>();
095    List<Tap> taps = new ArrayList<>();
096
097    if( isSimpleGlob() )
098      {
099      FileSystem fs = FileSystem.get( flowProcess.getConfig() );
100      FileStatus[] statuses = fs.globStatus( getHfs().getPath() );
101
102      for( FileStatus status : statuses )
103        paths.add( status.getPath() );
104      }
105    else
106      {
107      paths.add( getHfs().getPath() );
108      }
109
110    for( Path pathToFind : paths )
111      {
112      for( Path path : cachedFiles )
113        {
114        if( path.toString().endsWith( pathToFind.getName() ) )
115          {
116          LOG.info( "found {} in distributed cache", path );
117          taps.add( new Lfs( getScheme(), path.toString() ) );
118          }
119        }
120      }
121
122    if( paths.isEmpty() ) // not in cache, read from HDFS
123      {
124      LOG.info( "could not find files in local resource path. delegating to parent: {}", super.getIdentifier() );
125      return super.openForRead( flowProcess, input );
126      }
127
128    return new MultiSourceTap( taps.toArray( new Tap[ taps.size() ] ) ).openForRead( flowProcess, input );
129    }
130
131  private void registerHfs( FlowProcess<? extends Configuration> process, Configuration conf, Hfs hfs ) throws IOException
132    {
133    if( isSimpleGlob() )
134      {
135      FileSystem fs = FileSystem.get( conf );
136      FileStatus[] statuses = fs.globStatus( getHfs().getPath() );
137
138      if( statuses == null || statuses.length == 0 )
139        throw new TapException( String.format( "glob expression %s does not match any files on the filesystem", getHfs().getPath() ) );
140
141      for( FileStatus fileStatus : statuses )
142        registerURI( conf, fileStatus.getPath() );
143      }
144    else
145      {
146      registerURI( conf, hfs.getPath() );
147      }
148
149    hfs.sourceConfInitComplete( process, conf );
150    }
151
152  private void registerURI( Configuration conf, Path path )
153    {
154    URI uri = path.toUri();
155    LOG.info( "adding {} to local resource configuration ", uri );
156    addLocalCacheFiles( conf, uri );
157    }
158
159  private Hfs getHfs()
160    {
161    return (Hfs) getOriginal();
162    }
163
164  private boolean isSimpleGlob()
165    {
166    if( Util.isEmpty( getHfs().getIdentifier() ) )
167      return false;
168
169    return getHfs().getIdentifier().contains( "*" );
170    }
171
172  protected abstract Path[] getLocalCacheFiles( FlowProcess<? extends Configuration> flowProcess ) throws IOException;
173
174  protected abstract void addLocalCacheFiles( Configuration conf, URI uri );
175  }