001    /*
002     * Copyright (c) 2007-2014 Concurrent, Inc. All Rights Reserved.
003     *
004     * Project and contact information: http://www.cascading.org/
005     *
006     * This file is part of the Cascading project.
007     *
008     * Licensed under the Apache License, Version 2.0 (the "License");
009     * you may not use this file except in compliance with the License.
010     * You may obtain a copy of the License at
011     *
012     *     http://www.apache.org/licenses/LICENSE-2.0
013     *
014     * Unless required by applicable law or agreed to in writing, software
015     * distributed under the License is distributed on an "AS IS" BASIS,
016     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
017     * See the License for the specific language governing permissions and
018     * limitations under the License.
019     */
020    
021    package cascading.tap.hadoop.util;
022    
023    import java.io.IOException;
024    import java.util.Set;
025    
026    import cascading.CascadingException;
027    import cascading.flow.FlowProcess;
028    import cascading.flow.planner.Scope;
029    import cascading.scheme.Scheme;
030    import cascading.scheme.SinkCall;
031    import cascading.scheme.SourceCall;
032    import cascading.scheme.hadoop.SequenceFile;
033    import cascading.tap.Tap;
034    import cascading.tap.hadoop.Hfs;
035    import cascading.tuple.Fields;
036    import cascading.tuple.Tuple;
037    import org.apache.hadoop.fs.Path;
038    import org.apache.hadoop.mapred.JobConf;
039    import org.apache.hadoop.mapred.OutputCollector;
040    import org.apache.hadoop.mapred.RecordReader;
041    import org.apache.hadoop.mapred.lib.NullOutputFormat;
042    
043    /** Class TempHfs creates a temporary {@link cascading.tap.Tap} instance for use internally. */
044    public class TempHfs extends Hfs
045      {
046      /** Field name */
047      final String name;
048      /** Field schemeClass */
049      private Class<? extends Scheme> schemeClass;
050      /** Field temporaryPath */
051    
052      /** Class NullScheme is a noop scheme used as a placeholder */
053      private static class NullScheme extends Scheme<JobConf, RecordReader, OutputCollector, Object, Object>
054        {
055        @Override
056        public void sourceConfInit( FlowProcess<JobConf> flowProcess, Tap<JobConf, RecordReader, OutputCollector> tap, JobConf conf )
057          {
058          // do nothing
059          }
060    
061        @Override
062        public void sinkConfInit( FlowProcess<JobConf> flowProcess, Tap<JobConf, RecordReader, OutputCollector> tap, JobConf conf )
063          {
064          conf.setOutputKeyClass( Tuple.class );
065          conf.setOutputValueClass( Tuple.class );
066          conf.setOutputFormat( NullOutputFormat.class );
067          }
068    
069        @Override
070        public boolean source( FlowProcess<JobConf> flowProcess, SourceCall<Object, RecordReader> sourceCall ) throws IOException
071          {
072          return false;
073          }
074    
075        @Override
076        public void sink( FlowProcess<JobConf> flowProcess, SinkCall<Object, OutputCollector> sinkCall ) throws IOException
077          {
078          }
079        }
080    
081      /**
082       * Constructor TempHfs creates a new TempHfs instance.
083       *
084       * @param name   of type String
085       * @param isNull of type boolean
086       */
087      public TempHfs( JobConf conf, String name, boolean isNull )
088        {
089        super( isNull ? new NullScheme() : new SequenceFile()
090        {
091        } );
092        this.name = name;
093        this.stringPath = initTemporaryPath( conf, true );
094        }
095    
096      /**
097       * Constructor TempDfs creates a new TempDfs instance.
098       *
099       * @param name of type String
100       */
101      public TempHfs( JobConf conf, String name, Class<? extends Scheme> schemeClass )
102        {
103        this( conf, name, schemeClass, true );
104        }
105    
106      public TempHfs( JobConf conf, String name, Class<? extends Scheme> schemeClass, boolean unique )
107        {
108        this.name = name;
109    
110        if( schemeClass == null )
111          this.schemeClass = SequenceFile.class;
112        else
113          this.schemeClass = schemeClass;
114    
115        this.stringPath = initTemporaryPath( conf, unique );
116        }
117    
118      public Class<? extends Scheme> getSchemeClass()
119        {
120        return schemeClass;
121        }
122    
123      private String initTemporaryPath( JobConf conf, boolean unique )
124        {
125        String child = unique ? makeTemporaryPathDirString( name ) : name;
126    
127        return new Path( getTempPath( conf ), child ).toString();
128        }
129    
130      @Override
131      public Scope outgoingScopeFor( Set<Scope> incomingScopes )
132        {
133        Fields fields = incomingScopes.iterator().next().getIncomingTapFields();
134    
135        setSchemeUsing( fields );
136    
137        return new Scope( fields );
138        }
139    
140      private void setSchemeUsing( Fields fields )
141        {
142        try
143          {
144          setScheme( schemeClass.getConstructor( Fields.class ).newInstance( fields ) );
145          }
146        catch( Exception exception )
147          {
148          throw new CascadingException( "unable to create specified scheme: " + schemeClass.getName(), exception );
149          }
150        }
151    
152      @Override
153      public boolean isTemporary()
154        {
155        return true;
156        }
157    
158      @Override
159      public String toString()
160        {
161        return getClass().getSimpleName() + "[\"" + getScheme() + "\"]" + "[" + name + "]";
162        }
163    
164      @Override
165      public boolean equals( Object object )
166        {
167        // we are only overriding since the path is lazily initialized.
168    
169        if( this == object )
170          return true;
171        if( object == null || getClass() != object.getClass() )
172          return false;
173        if( !super.equals( object ) )
174          return false;
175    
176        TempHfs tempHfs = (TempHfs) object;
177    
178        if( name != null ? !name.equals( tempHfs.name ) : tempHfs.name != null )
179          return false;
180    
181        return true;
182        }
183    
184      @Override
185      public int hashCode()
186        {
187        // don't use super hashCode() as path changes during runtime
188        return 31 * ( System.identityHashCode( this ) + name != null ? name.hashCode() : 0 );
189        }
190      }