/*
 * Copyright (c) 2007-2014 Concurrent, Inc. All Rights Reserved.
 *
 * Project and contact information: http://www.cascading.org/
 *
 * This file is part of the Cascading project.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package cascading.flow.hadoop;

import java.io.IOException;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;

import cascading.CascadingException;
import cascading.flow.FlowException;
import cascading.flow.FlowProcess;
import cascading.flow.hadoop.planner.HadoopFlowStepJob;
import cascading.flow.hadoop.util.HadoopUtil;
import cascading.flow.planner.BaseFlowStep;
import cascading.flow.planner.FlowStepJob;
import cascading.flow.planner.PlatformInfo;
import cascading.flow.planner.Scope;
import cascading.property.ConfigDef;
import cascading.tap.Tap;
import cascading.tap.hadoop.io.MultiInputFormat;
import cascading.tap.hadoop.util.Hadoop18TapUtil;
import cascading.tap.hadoop.util.TempHfs;
import cascading.tuple.Fields;
import cascading.tuple.Tuple;
import cascading.tuple.hadoop.TupleSerialization;
import cascading.tuple.hadoop.util.CoGroupingComparator;
import cascading.tuple.hadoop.util.CoGroupingPartitioner;
import cascading.tuple.hadoop.util.GroupingComparator;
import cascading.tuple.hadoop.util.GroupingPartitioner;
import cascading.tuple.hadoop.util.GroupingSortingComparator;
import cascading.tuple.hadoop.util.GroupingSortingPartitioner;
import cascading.tuple.hadoop.util.IndexTupleCoGroupingComparator;
import cascading.tuple.hadoop.util.ReverseGroupingSortingComparator;
import cascading.tuple.hadoop.util.ReverseTupleComparator;
import cascading.tuple.hadoop.util.TupleComparator;
import cascading.tuple.io.IndexTuple;
import cascading.tuple.io.TuplePair;
import cascading.util.Util;
import cascading.util.Version;
import org.apache.hadoop.filecache.DistributedCache;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobConf;

import static cascading.flow.hadoop.util.HadoopUtil.serializeBase64;
import static cascading.flow.hadoop.util.HadoopUtil.writeStateToDistCache;

/**
 * Class HadoopFlowStep is the Hadoop MapReduce specific implementation of {@link BaseFlowStep}. It
 * initializes a {@link JobConf} from this step's sources, sink, traps, and grouping requirements,
 * and creates the {@link FlowStepJob} used to submit and monitor the underlying MapReduce job.
 */
public class HadoopFlowStep extends BaseFlowStep<JobConf>
  {
  /** Field mapperTraps */
  private final Map<String, Tap> mapperTraps = new HashMap<String, Tap>();
  /** Field reducerTraps */
  private final Map<String, Tap> reducerTraps = new HashMap<String, Tap>();

  public HadoopFlowStep( String name, int stepNum )
    {
    super( name, stepNum );
    }

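  /**
   * Method getInitializedConfig returns a JobConf for this step, copied from the given parent
   * config, or created fresh when the parent is null, with the mapper, reducer, serializations,
   * partitioners, comparators, sources, sink, and traps applied, and with the serialized step
   * state attached, either inline or via the distributed cache when too large.
   * <p/>
   * A minimal sketch of direct use, assuming a flowStep, flowProcess, and parentConf are in scope;
   * normally the owning Flow drives this through createFlowStepJob:
   * <pre>
   * JobConf stepConf = flowStep.getInitializedConfig( flowProcess, parentConf );
   * </pre>
   *
   * @param flowProcess of type FlowProcess<JobConf>
   * @param parentConfig of type JobConf, may be null
   * @return JobConf
   */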
  public JobConf getInitializedConfig( FlowProcess<JobConf> flowProcess, JobConf parentConfig )
    {
    JobConf conf = parentConfig == null ? new JobConf() : HadoopUtil.copyJobConf( parentConfig );

    // disable the GenericOptionsParser warning
    conf.setBoolean( "mapred.used.genericoptionsparser", true );

    conf.setJobName( getStepDisplayName( conf.getInt( "cascading.step.display.id.truncate", Util.ID_LENGTH ) ) );

    conf.setOutputKeyClass( Tuple.class );
    conf.setOutputValueClass( Tuple.class );

    conf.setMapRunnerClass( FlowMapper.class );
    conf.setReducerClass( FlowReducer.class );

    // set for use by the shuffling phase
    TupleSerialization.setSerializations( conf );

    initFromSources( flowProcess, conf );

    initFromSink( flowProcess, conf );

    initFromTraps( flowProcess, conf );

    initFromProcessConfigDef( conf );

    if( getSink().getScheme().getNumSinkParts() != 0 )
      {
      // with a reducer, the number of reduce tasks controls the number of sink parts,
      // otherwise the number of map tasks does
      if( getGroup() != null )
        conf.setNumReduceTasks( getSink().getScheme().getNumSinkParts() );
      else
        conf.setNumMapTasks( getSink().getScheme().getNumSinkParts() );
      }

    conf.setOutputKeyComparatorClass( TupleComparator.class );

    if( getGroup() == null )
      {
      conf.setNumReduceTasks( 0 ); // disable reducers
      }
    else
      {
      // must set map output defaults when performing a reduce
      conf.setMapOutputKeyClass( Tuple.class );
      conf.setMapOutputValueClass( Tuple.class );
      conf.setPartitionerClass( GroupingPartitioner.class );

      // handles the case where the groupby sort should be reversed
      if( getGroup().isSortReversed() )
        conf.setOutputKeyComparatorClass( ReverseTupleComparator.class );

      addComparators( conf, "cascading.group.comparator", getGroup().getKeySelectors() );

      if( getGroup().isGroupBy() )
        addComparators( conf, "cascading.sort.comparator", getGroup().getSortingSelectors() );

      if( !getGroup().isGroupBy() )
        {
        conf.setPartitionerClass( CoGroupingPartitioner.class );
        conf.setMapOutputKeyClass( IndexTuple.class ); // allows groups to be sorted by index
        conf.setMapOutputValueClass( IndexTuple.class );
        conf.setOutputKeyComparatorClass( IndexTupleCoGroupingComparator.class ); // sorts by group, then by index
        conf.setOutputValueGroupingComparator( CoGroupingComparator.class );
        }

      if( getGroup().isSorted() )
        {
        conf.setPartitionerClass( GroupingSortingPartitioner.class );
        conf.setMapOutputKeyClass( TuplePair.class );

        if( getGroup().isSortReversed() )
          conf.setOutputKeyComparatorClass( ReverseGroupingSortingComparator.class );
        else
          conf.setOutputKeyComparatorClass( GroupingSortingComparator.class );

        // no need to supply a reverse comparator, only equality is checked
        conf.setOutputValueGroupingComparator( GroupingComparator.class );
        }
      }

    // perform these last so the initialization above is passed along to the tasks
    String versionString = Version.getRelease();

    if( versionString != null )
      conf.set( "cascading.version", versionString );

    conf.set( CASCADING_FLOW_STEP_ID, getID() );
    conf.set( "cascading.flow.step.num", Integer.toString( getStepNum() ) );

    String stepState = pack( this, conf );

    // Hadoop 0.20.2 doesn't like the distributed cache when using local mode
    int maxSize = Short.MAX_VALUE;
    if( isHadoopLocalMode( conf ) || stepState.length() < maxSize ) // seems safe
      conf.set( "cascading.flow.step", stepState );
    else
      conf.set( "cascading.flow.step.path", writeStateToDistCache( conf, getID(), stepState ) );

    return conf;
    }

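  /**
   * Method isHadoopLocalMode returns true if the given config will execute the step in Hadoop
   * local mode.
   *
   * @param conf of type JobConf
   * @return boolean
   */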
  public boolean isHadoopLocalMode( JobConf conf )
    {
    return HadoopUtil.isLocal( conf );
    }

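  /**
   * Method pack returns the given object serialized as a base64 encoded String, using the given
   * config; any IOException is rethrown as a FlowException.
   */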
  private String pack( Object object, JobConf conf )
    {
    try
      {
      return serializeBase64( object, conf, true );
      }
    catch( IOException exception )
      {
      throw new FlowException( "unable to pack object: " + object.getClass().getCanonicalName(), exception );
      }
    }

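  /**
   * Method createFlowStepJob initializes the step config and returns a {@link HadoopFlowStepJob}
   * ready for submission. A NoClassDefFoundError is logged with the current platform info and
   * rethrown, since it usually means the Hadoop platform build dependency does not match the
   * cluster version.
   */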
  protected FlowStepJob<JobConf> createFlowStepJob( FlowProcess<JobConf> flowProcess, JobConf parentConfig )
    {
    try
      {
      JobConf initializedConfig = getInitializedConfig( flowProcess, parentConfig );

      setConf( initializedConfig );

      return new HadoopFlowStepJob( createClientState( flowProcess ), this, initializedConfig );
      }
    catch( NoClassDefFoundError error )
      {
      PlatformInfo platformInfo = HadoopUtil.getPlatformInfo();
      String message = "unable to load platform specific class, please verify Hadoop cluster version: '%s', matches the Hadoop platform build dependency and associated FlowConnector, cascading-hadoop or cascading-hadoop2-mr1";

      logError( String.format( message, platformInfo.toString() ), error );

      throw error;
      }
    }

  /**
   * Method clean removes any temporary files used by this FlowStep instance. It will log any IOExceptions thrown.
   *
   * @param config of type JobConf
   */
  public void clean( JobConf config )
    {
    String stepStatePath = config.get( "cascading.flow.step.path" );

    if( stepStatePath != null )
      {
      try
        {
        HadoopUtil.removeStateFromDistCache( config, stepStatePath );
        }
      catch( IOException exception )
        {
        logWarn( "unable to remove step state file: " + stepStatePath, exception );
        }
      }

    if( tempSink != null )
      {
      try
        {
        tempSink.deleteResource( config );
        }
      catch( Exception exception )
        {
        // sink all exceptions, don't fail app
        logWarn( "unable to remove temporary file: " + tempSink, exception );
        }
      }

    // a temporary sink is removed on success; if the flow has a run id, it is preserved on failure
    if( getSink().isTemporary() && ( getFlow().getFlowStats().isSuccessful() || getFlow().getRunID() == null ) )
      {
      try
        {
        getSink().deleteResource( config );
        }
      catch( Exception exception )
        {
        // sink all exceptions, don't fail app
        logWarn( "unable to remove temporary file: " + getSink(), exception );
        }
      }
    else
      {
      cleanTapMetaData( config, getSink() );
      }

    for( Tap tap : getMapperTraps().values() )
      cleanTapMetaData( config, tap );

    for( Tap tap : getReducerTraps().values() )
      cleanTapMetaData( config, tap );
    }

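  /**
   * Method cleanTapMetaData removes any intermediate job metadata created for the given tap,
   * ignoring any IOException thrown.
   */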
  private void cleanTapMetaData( JobConf jobConf, Tap tap )
    {
    try
      {
      Hadoop18TapUtil.cleanupTapMetaData( jobConf, tap );
      }
    catch( IOException exception )
      {
      // ignore exception
      }
    }

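  /**
   * Method addComparators packs the first grouping/sorting Fields instance declaring custom
   * comparators into the config under the given property. When no custom comparators are
   * declared, only the size of the resolved outgoing value fields is recorded under
   * property + ".size".
   */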
  private void addComparators( JobConf conf, String property, Map<String, Fields> map )
    {
    Iterator<Fields> fieldsIterator = map.values().iterator();

    if( !fieldsIterator.hasNext() )
      return;

    Fields fields = fieldsIterator.next();

    if( fields.hasComparators() )
      {
      conf.set( property, pack( fields, conf ) );
      return;
      }

    // use resolved fields if there are no comparators
    Set<Scope> previousScopes = getPreviousScopes( getGroup() );

    fields = previousScopes.iterator().next().getOutValuesFields();

    if( fields.size() != 0 ) // skipped when zero, allowing Fields.UNKNOWN to be used
      conf.setInt( property + ".size", fields.size() );
    }

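  /**
   * Method initFromTraps lets every trap Tap initialize against a copy of the given config, so
   * trap settings do not leak into the step config.
   */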
  private void initFromTraps( FlowProcess<JobConf> flowProcess, JobConf conf, Map<String, Tap> traps )
    {
    if( !traps.isEmpty() )
      {
      JobConf trapConf = HadoopUtil.copyJobConf( conf );

      for( Tap tap : traps.values() )
        tap.sinkConfInit( flowProcess, trapConf );
      }
    }

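  /**
   * Method initFromSources initializes each unique streamed source Tap against its own copy of
   * the config and registers the copies with {@link MultiInputFormat}. Accumulated sources are
   * initialized separately; their config diffs are packed into the step config, and any
   * distributed cache files they declare are carried over.
   */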
  protected void initFromSources( FlowProcess<JobConf> flowProcess, JobConf conf )
    {
    // handles case where same tap is used on multiple branches
    // we do not want to init the same tap multiple times
    Set<Tap> uniqueSources = getUniqueStreamedSources();

    JobConf[] streamedJobs = new JobConf[ uniqueSources.size() ];
    int i = 0;

    for( Tap tap : uniqueSources )
      {
      if( tap.getIdentifier() == null )
        throw new IllegalStateException( "tap may not have null identifier: " + tap.toString() );

      streamedJobs[ i ] = flowProcess.copyConfig( conf );

      streamedJobs[ i ].set( "cascading.step.source", Tap.id( tap ) );

      tap.sourceConfInit( flowProcess, streamedJobs[ i ] );

      i++;
      }

    Set<Tap> accumulatedSources = getAllAccumulatedSources();

    for( Tap tap : accumulatedSources )
      {
      JobConf accumulatedJob = flowProcess.copyConfig( conf );

      tap.sourceConfInit( flowProcess, accumulatedJob );

      Map<String, String> map = flowProcess.diffConfigIntoMap( conf, accumulatedJob );
      conf.set( "cascading.step.accumulated.source.conf." + Tap.id( tap ), pack( map, conf ) );

      try
        {
        if( DistributedCache.getCacheFiles( accumulatedJob ) != null )
          DistributedCache.setCacheFiles( DistributedCache.getCacheFiles( accumulatedJob ), conf );
        }
      catch( IOException exception )
        {
        throw new CascadingException( exception );
        }
      }

    MultiInputFormat.addInputFormat( conf, streamedJobs ); // must come last
    }

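  /**
   * Method getTapForID returns the Tap in the given set whose id matches the given id, or null
   * if no tap matches.
   */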
  public Tap getTapForID( Set<Tap> taps, String id )
    {
    for( Tap tap : taps )
      {
      if( Tap.id( tap ).equals( id ) )
        return tap;
      }

    return null;
    }

  private void initFromProcessConfigDef( final JobConf conf )
    {
    initConfFromProcessConfigDef( getSetterFor( conf ) );
    }

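  /**
   * Method getSetterFor adapts the given JobConf to the {@link ConfigDef.Setter} interface. Note
   * update appends the value to an existing comma separated list only when not already present.
   * A sketch, using a hypothetical serialization class name:
   * <pre>
   * ConfigDef.Setter setter = getSetterFor( conf );
   * setter.update( "io.serializations", "com.example.FooSerialization" ); // appended
   * setter.update( "io.serializations", "com.example.FooSerialization" ); // no change, already present
   * </pre>
   */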
  private ConfigDef.Setter getSetterFor( final JobConf conf )
    {
    return new ConfigDef.Setter()
    {
    @Override
    public String set( String key, String value )
      {
      String oldValue = get( key );

      conf.set( key, value );

      return oldValue;
      }

    @Override
    public String update( String key, String value )
      {
      String oldValue = get( key );

      if( oldValue == null )
        conf.set( key, value );
      else if( !oldValue.contains( value ) )
        conf.set( key, oldValue + "," + value );

      return oldValue;
      }

    @Override
    public String get( String key )
      {
      String value = conf.get( key );

      if( value == null || value.isEmpty() )
        return null;

      return value;
      }
    };
    }

  /**
   * Method getUniqueStreamedSources returns the streamed sources for this step, that is, the
   * source taps with all known accumulated sources removed, since sources are specific to a step.
   *
   * @return Set<Tap> of unique streamed source taps
   */
  private Set<Tap> getUniqueStreamedSources()
    {
    HashSet<Tap> set = new HashSet<Tap>( sources.keySet() );

    set.removeAll( getAllAccumulatedSources() );

    return set;
    }

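  /**
   * Method initFromSink initializes the sink Tap, then creates a TempHfs stand-in when no Hadoop
   * output path was declared (a writeDirect sink), so the job always has a valid output path.
   */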
  protected void initFromSink( FlowProcess<JobConf> flowProcess, JobConf conf )
    {
    // init sink first so tempSink can take precedence
    if( getSink() != null )
      getSink().sinkConfInit( flowProcess, conf );

    if( FileOutputFormat.getOutputPath( conf ) == null )
      tempSink = new TempHfs( conf, "tmp:/" + new Path( getSink().getIdentifier() ).toUri().getPath(), true );

    // tempSink exists because sink is writeDirect
    if( tempSink != null )
      tempSink.sinkConfInit( flowProcess, conf );
    }

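  /**
   * Method initFromTraps initializes both the mapper side and reducer side trap taps.
   */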
  protected void initFromTraps( FlowProcess<JobConf> flowProcess, JobConf conf )
    {
    initFromTraps( flowProcess, conf, getMapperTraps() );
    initFromTraps( flowProcess, conf, getReducerTraps() );
    }

  @Override
  public Set<Tap> getTraps()
    {
    Set<Tap> set = new HashSet<Tap>();

    set.addAll( mapperTraps.values() );
    set.addAll( reducerTraps.values() );

    return Collections.unmodifiableSet( set );
    }

  @Override
  public Tap getTrap( String name )
    {
    Tap trap = getMapperTrap( name );

    if( trap == null )
      trap = getReducerTrap( name );

    return trap;
    }

  public Map<String, Tap> getMapperTraps()
    {
    return mapperTraps;
    }

  public Map<String, Tap> getReducerTraps()
    {
    return reducerTraps;
    }

  public Tap getMapperTrap( String name )
    {
    return getMapperTraps().get( name );
    }

  public Tap getReducerTrap( String name )
    {
    return getReducerTraps().get( name );
    }
  }