/*
 * Copyright (c) 2007-2015 Concurrent, Inc. All Rights Reserved.
 *
 * Project and contact information: http://www.cascading.org/
 *
 * This file is part of the Cascading project.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package cascading.flow.hadoop;

import java.io.IOException;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;

import cascading.CascadingException;
import cascading.flow.FlowException;
import cascading.flow.FlowProcess;
import cascading.flow.hadoop.planner.HadoopFlowStepJob;
import cascading.flow.hadoop.util.HadoopUtil;
import cascading.flow.planner.BaseFlowStep;
import cascading.flow.planner.FlowStepJob;
import cascading.flow.planner.PlatformInfo;
import cascading.flow.planner.Scope;
import cascading.property.ConfigDef;
import cascading.tap.Tap;
import cascading.tap.hadoop.io.MultiInputFormat;
import cascading.tap.hadoop.util.Hadoop18TapUtil;
import cascading.tap.hadoop.util.TempHfs;
import cascading.tuple.Fields;
import cascading.tuple.Tuple;
import cascading.tuple.hadoop.TupleSerialization;
import cascading.tuple.hadoop.util.CoGroupingComparator;
import cascading.tuple.hadoop.util.CoGroupingPartitioner;
import cascading.tuple.hadoop.util.GroupingComparator;
import cascading.tuple.hadoop.util.GroupingPartitioner;
import cascading.tuple.hadoop.util.GroupingSortingComparator;
import cascading.tuple.hadoop.util.GroupingSortingPartitioner;
import cascading.tuple.hadoop.util.IndexTupleCoGroupingComparator;
import cascading.tuple.hadoop.util.ReverseGroupingSortingComparator;
import cascading.tuple.hadoop.util.ReverseTupleComparator;
import cascading.tuple.hadoop.util.TupleComparator;
import cascading.tuple.io.IndexTuple;
import cascading.tuple.io.TuplePair;
import cascading.util.Util;
import cascading.util.Version;
import org.apache.hadoop.filecache.DistributedCache;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.OutputFormat;

import static cascading.flow.hadoop.util.HadoopUtil.serializeBase64;
import static cascading.flow.hadoop.util.HadoopUtil.writeStateToDistCache;

/**
 * Class HadoopFlowStep is the Hadoop MapReduce specific implementation of {@link BaseFlowStep}. It is
 * responsible for initializing the {@link JobConf} for a single MapReduce job and for creating the
 * {@link HadoopFlowStepJob} that submits it.
 */
public class HadoopFlowStep extends BaseFlowStep<JobConf>
  {
  /** Field mapperTraps */
  private final Map<String, Tap> mapperTraps = new HashMap<String, Tap>();
  /** Field reducerTraps */
  private final Map<String, Tap> reducerTraps = new HashMap<String, Tap>();

  public HadoopFlowStep( String name, int stepNum )
    {
    super( name, stepNum );
    }

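  /**
   * Method getInitializedConfig returns a fully initialized child {@link JobConf} instance, copied from the
   * given parent config, with this step's sources, sink, traps, serializations, and shuffle strategy applied.
   *
   * @param flowProcess  of type FlowProcess
   * @param parentConfig of type JobConf, may be null
   * @return JobConf
   */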
  public JobConf getInitializedConfig( FlowProcess<JobConf> flowProcess, JobConf parentConfig )
    {
    JobConf conf = parentConfig == null ? new JobConf() : HadoopUtil.copyJobConf( parentConfig );

    // disable warning
    conf.setBoolean( "mapred.used.genericoptionsparser", true );

    conf.setJobName( getStepDisplayName( conf.getInt( "cascading.step.display.id.truncate", Util.ID_LENGTH ) ) );

    conf.setOutputKeyClass( Tuple.class );
    conf.setOutputValueClass( Tuple.class );

    conf.setMapRunnerClass( FlowMapper.class );
    conf.setReducerClass( FlowReducer.class );

    // set for use by the shuffling phase
    TupleSerialization.setSerializations( conf );

    initFromSources( flowProcess, conf );

    initFromSink( flowProcess, conf );

    initFromTraps( flowProcess, conf );

    initFromProcessConfigDef( conf );

    if( getSink().getScheme().getNumSinkParts() != 0 )
      {
      // if no reducer, set num map tasks to control parts
      if( getGroup() != null )
        conf.setNumReduceTasks( getSink().getScheme().getNumSinkParts() );
      else
        conf.setNumMapTasks( getSink().getScheme().getNumSinkParts() );
      }

    conf.setOutputKeyComparatorClass( TupleComparator.class );

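    // configure the shuffle for this step:
    // - no group: map only job, reducers are disabled
    // - GroupBy: Tuple keys are partitioned and compared on the grouping fields
    // - CoGroup: IndexTuple keys let joined streams sort by group, then by stream index
    // - sorted GroupBy: TuplePair keys enable a secondary sort on the sorting fields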
    if( getGroup() == null )
      {
      conf.setNumReduceTasks( 0 ); // disable reducers
      }
    else
      {
      // must set map output defaults when performing a reduce
      conf.setMapOutputKeyClass( Tuple.class );
      conf.setMapOutputValueClass( Tuple.class );
      conf.setPartitionerClass( GroupingPartitioner.class );

      // handles the case where the groupby sort should be reversed
      if( getGroup().isSortReversed() )
        conf.setOutputKeyComparatorClass( ReverseTupleComparator.class );

      addComparators( conf, "cascading.group.comparator", getGroup().getKeySelectors() );

      if( getGroup().isGroupBy() )
        addComparators( conf, "cascading.sort.comparator", getGroup().getSortingSelectors() );

      if( !getGroup().isGroupBy() )
        {
        conf.setPartitionerClass( CoGroupingPartitioner.class );
        conf.setMapOutputKeyClass( IndexTuple.class ); // allows groups to be sorted by index
        conf.setMapOutputValueClass( IndexTuple.class );
        conf.setOutputKeyComparatorClass( IndexTupleCoGroupingComparator.class ); // sorts by group, then by index
        conf.setOutputValueGroupingComparator( CoGroupingComparator.class );
        }

      if( getGroup().isSorted() )
        {
        conf.setPartitionerClass( GroupingSortingPartitioner.class );
        conf.setMapOutputKeyClass( TuplePair.class );

        if( getGroup().isSortReversed() )
          conf.setOutputKeyComparatorClass( ReverseGroupingSortingComparator.class );
        else
          conf.setOutputKeyComparatorClass( GroupingSortingComparator.class );

        // no need to supply a reverse comparator, only equality is checked
        conf.setOutputValueGroupingComparator( GroupingComparator.class );
        }
      }

    // perform last so init above will pass to tasks
    String versionString = Version.getRelease();

    if( versionString != null )
      conf.set( "cascading.version", versionString );

    conf.set( CASCADING_FLOW_STEP_ID, getID() );
    conf.set( "cascading.flow.step.num", Integer.toString( getStepNum() ) );

    String stepState = pack( this, conf );

    // Hadoop 0.20.2 doesn't like the dist cache when using local mode
    int maxSize = Short.MAX_VALUE;
    if( isHadoopLocalMode( conf ) || stepState.length() < maxSize ) // seems safe
      conf.set( "cascading.flow.step", stepState );
    else
      conf.set( "cascading.flow.step.path", writeStateToDistCache( conf, getID(), stepState ) );

    return conf;
    }

  public boolean isHadoopLocalMode( JobConf conf )
    {
    return HadoopUtil.isLocal( conf );
    }

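  /**
   * Method pack serializes the given object into a Base64 encoded String suitable for storage in the JobConf.
   *
   * @param object of type Object
   * @param conf   of type JobConf
   * @return String
   */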
  private String pack( Object object, JobConf conf )
    {
    try
      {
      return serializeBase64( object, conf, true );
      }
    catch( IOException exception )
      {
      throw new FlowException( "unable to pack object: " + object.getClass().getCanonicalName(), exception );
      }
    }

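  /**
   * Method createFlowStepJob initializes the config for this step and returns the {@link HadoopFlowStepJob}
   * instance that will submit and monitor the underlying MapReduce job.
   *
   * @param flowProcess  of type FlowProcess
   * @param parentConfig of type JobConf
   * @return FlowStepJob
   */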
  protected FlowStepJob<JobConf> createFlowStepJob( FlowProcess<JobConf> flowProcess, JobConf parentConfig )
    {
    try
      {
      JobConf initializedConfig = getInitializedConfig( flowProcess, parentConfig );

      setConf( initializedConfig );

      return new HadoopFlowStepJob( createClientState( flowProcess ), this, initializedConfig );
      }
    catch( NoClassDefFoundError error )
      {
      PlatformInfo platformInfo = HadoopUtil.getPlatformInfo();
      String message = "unable to load platform specific class, please verify Hadoop cluster version: '%s', matches the Hadoop platform build dependency and associated FlowConnector, cascading-hadoop or cascading-hadoop2-mr1";

      logError( String.format( message, platformInfo.toString() ), error );

      throw error;
      }
    }

  /**
   * Method clean removes any temporary files used by this FlowStep instance. It will log any IOExceptions thrown.
   *
   * @param config of type JobConf
   */
  public void clean( JobConf config )
    {
    String stepStatePath = config.get( "cascading.flow.step.path" );

    if( stepStatePath != null )
      {
      try
        {
        HadoopUtil.removeStateFromDistCache( config, stepStatePath );
        }
      catch( IOException exception )
        {
        logWarn( "unable to remove step state file: " + stepStatePath, exception );
        }
      }

    if( tempSink != null )
      {
      try
        {
        tempSink.deleteResource( config );
        }
      catch( Exception exception )
        {
        // sink all exceptions, don't fail app
        logWarn( "unable to remove temporary file: " + tempSink, exception );
        }
      }

    if( getSink().isTemporary() && ( getFlow().getFlowStats().isSuccessful() || getFlow().getRunID() == null ) )
      {
      try
        {
        getSink().deleteResource( config );
        }
      catch( Exception exception )
        {
        // sink all exceptions, don't fail app
        logWarn( "unable to remove temporary file: " + getSink(), exception );
        }
      }
    else
      {
      cleanTapMetaData( config, getSink() );
      }

    for( Tap tap : getMapperTraps().values() )
      cleanTapMetaData( config, tap );

    for( Tap tap : getReducerTraps().values() )
      cleanTapMetaData( config, tap );
    }

  private void cleanTapMetaData( JobConf jobConf, Tap tap )
    {
    try
      {
      Hadoop18TapUtil.cleanupTapMetaData( jobConf, tap );
      }
    catch( IOException exception )
      {
      // ignore exception
      }
    }

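  /**
   * Method addComparators stores the given grouping or sorting Fields in the config when custom comparators
   * are declared, otherwise it records the resolved field size under the property's ".size" key.
   *
   * @param conf     of type JobConf
   * @param property of type String
   * @param map      of type Map of String to Fields
   */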
  private void addComparators( JobConf conf, String property, Map<String, Fields> map )
    {
    Iterator<Fields> fieldsIterator = map.values().iterator();

    if( !fieldsIterator.hasNext() )
      return;

    Fields fields = fieldsIterator.next();

    if( fields.hasComparators() )
      {
      conf.set( property, pack( fields, conf ) );
      return;
      }

    // use resolved fields if there are no comparators.
    Set<Scope> previousScopes = getPreviousScopes( getGroup() );

    fields = previousScopes.iterator().next().getOutValuesFields();

    if( fields.size() != 0 ) // allows Fields.UNKNOWN (size 0) to be used
      conf.setInt( property + ".size", fields.size() );
    }

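  /**
   * Method initFromTraps initializes the given trap Taps against a copy of the current config so trap
   * initialization does not pollute the job config.
   *
   * @param flowProcess of type FlowProcess
   * @param conf        of type JobConf
   * @param traps       of type Map of String to Tap
   */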
  private void initFromTraps( FlowProcess<JobConf> flowProcess, JobConf conf, Map<String, Tap> traps )
    {
    if( !traps.isEmpty() )
      {
      JobConf trapConf = HadoopUtil.copyJobConf( conf );

      for( Tap tap : traps.values() )
        tap.sinkConfInit( flowProcess, trapConf );
      }
    }

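  /**
   * Method initFromSources initializes each unique streamed source against its own copy of the current config
   * for use by {@link MultiInputFormat}, then packs the config diff of every accumulated source into the
   * current config and forwards any distributed cache files the source registered.
   *
   * @param flowProcess of type FlowProcess
   * @param conf        of type JobConf
   */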
  protected void initFromSources( FlowProcess<JobConf> flowProcess, JobConf conf )
    {
    // handles case where same tap is used on multiple branches
    // we do not want to init the same tap multiple times
    Set<Tap> uniqueSources = getUniqueStreamedSources();

    JobConf[] streamedJobs = new JobConf[ uniqueSources.size() ];
    int i = 0;

    for( Tap tap : uniqueSources )
      {
      if( tap.getIdentifier() == null )
        throw new IllegalStateException( "tap may not have null identifier: " + tap.toString() );

      streamedJobs[ i ] = flowProcess.copyConfig( conf );

      streamedJobs[ i ].set( "cascading.step.source", Tap.id( tap ) );

      tap.sourceConfInit( flowProcess, streamedJobs[ i ] );

      i++;
      }

    Set<Tap> accumulatedSources = getAllAccumulatedSources();

    for( Tap tap : accumulatedSources )
      {
      JobConf accumulatedJob = flowProcess.copyConfig( conf );

      tap.sourceConfInit( flowProcess, accumulatedJob );

      Map<String, String> map = flowProcess.diffConfigIntoMap( conf, accumulatedJob );
      conf.set( "cascading.step.accumulated.source.conf." + Tap.id( tap ), pack( map, conf ) );

      try
        {
        if( DistributedCache.getCacheFiles( accumulatedJob ) != null )
          DistributedCache.setCacheFiles( DistributedCache.getCacheFiles( accumulatedJob ), conf );
        }
      catch( IOException exception )
        {
        throw new CascadingException( exception );
        }
      }

    MultiInputFormat.addInputFormat( conf, streamedJobs ); // must come last
    }

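  /**
   * Method getTapForID returns the Tap in the given Set whose id matches the given id, or null if none matches.
   *
   * @param taps of type Set of Tap
   * @param id   of type String
   * @return Tap
   */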
  public Tap getTapForID( Set<Tap> taps, String id )
    {
    for( Tap tap : taps )
      {
      if( Tap.id( tap ).equals( id ) )
        return tap;
      }

    return null;
    }

  private void initFromProcessConfigDef( final JobConf conf )
    {
    initConfFromProcessConfigDef( getSetterFor( conf ) );
    }

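  /**
   * Method getSetterFor returns a ConfigDef.Setter backed by the given JobConf, where update appends new
   * values to an existing key as a comma separated list, and empty values are read as null.
   *
   * @param conf of type JobConf
   * @return ConfigDef.Setter
   */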
  private ConfigDef.Setter getSetterFor( final JobConf conf )
    {
    return new ConfigDef.Setter()
    {
    @Override
    public String set( String key, String value )
      {
      String oldValue = get( key );

      conf.set( key, value );

      return oldValue;
      }

    @Override
    public String update( String key, String value )
      {
      String oldValue = get( key );

      if( oldValue == null )
        conf.set( key, value );
      else if( !oldValue.contains( value ) )
        conf.set( key, oldValue + "," + value );

      return oldValue;
      }

    @Override
    public String get( String key )
      {
      String value = conf.get( key );

      if( value == null || value.isEmpty() )
        return null;

      return value;
      }
    };
    }

  /**
   * Method getUniqueStreamedSources returns the sources specific to this step, removing all known
   * accumulated sources, if any.
   *
   * @return Set of unique streamed source Taps
   */
  private Set<Tap> getUniqueStreamedSources()
    {
    HashSet<Tap> set = new HashSet<Tap>( sources.keySet() );

    set.removeAll( getAllAccumulatedSources() );

    return set;
    }

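  /**
   * Method initFromSink initializes the sink Tap and, when the configured OutputFormat is file based or absent
   * but no output path was set, substitutes a temporary {@link TempHfs} sink in its place.
   *
   * @param flowProcess of type FlowProcess
   * @param conf        of type JobConf
   */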
  protected void initFromSink( FlowProcess<JobConf> flowProcess, JobConf conf )
    {
    // init sink first so tempSink can take precedence
    if( getSink() != null )
      getSink().sinkConfInit( flowProcess, conf );

    Class<? extends OutputFormat> outputFormat = conf.getClass( "mapred.output.format.class", null, OutputFormat.class );
    boolean isFileOutputFormat = false;

    if( outputFormat != null )
      isFileOutputFormat = FileOutputFormat.class.isAssignableFrom( outputFormat );

    Path outputPath = FileOutputFormat.getOutputPath( conf );

    // if no output path is set, we need to substitute an alternative if the OutputFormat is file based
    // PartitionTap won't set the output, but will set an OutputFormat
    // MultiSinkTap won't set the output or set the OutputFormat
    // Non file based OutputFormats don't have an output path, but do have an OutputFormat set (JDBCTap..)
    if( outputPath == null && ( isFileOutputFormat || outputFormat == null ) )
      tempSink = new TempHfs( conf, "tmp:/" + new Path( getSink().getIdentifier() ).toUri().getPath(), true );

    // tempSink exists because sink is writeDirect
    if( tempSink != null )
      tempSink.sinkConfInit( flowProcess, conf );
    }

  protected void initFromTraps( FlowProcess<JobConf> flowProcess, JobConf conf )
    {
    initFromTraps( flowProcess, conf, getMapperTraps() );
    initFromTraps( flowProcess, conf, getReducerTraps() );
    }

  @Override
  public Set<Tap> getTraps()
    {
    Set<Tap> set = new HashSet<Tap>();

    set.addAll( mapperTraps.values() );
    set.addAll( reducerTraps.values() );

    return Collections.unmodifiableSet( set );
    }

  @Override
  public Tap getTrap( String name )
    {
    Tap trap = getMapperTrap( name );

    if( trap == null )
      trap = getReducerTrap( name );

    return trap;
    }

  public Map<String, Tap> getMapperTraps()
    {
    return mapperTraps;
    }

  public Map<String, Tap> getReducerTraps()
    {
    return reducerTraps;
    }

  public Tap getMapperTrap( String name )
    {
    return getMapperTraps().get( name );
    }

  public Tap getReducerTrap( String name )
    {
    return getReducerTraps().get( name );
    }
  }