001/*
002 * Copyright (c) 2016-2018 Chris K Wensel <chris@wensel.net>. All Rights Reserved.
003 *
004 * Project and contact information: http://www.cascading.org/
005 *
006 * This file is part of the Cascading project.
007 *
008 * Licensed under the Apache License, Version 2.0 (the "License");
009 * you may not use this file except in compliance with the License.
010 * You may obtain a copy of the License at
011 *
012 *     http://www.apache.org/licenses/LICENSE-2.0
013 *
014 * Unless required by applicable law or agreed to in writing, software
015 * distributed under the License is distributed on an "AS IS" BASIS,
016 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
017 * See the License for the specific language governing permissions and
018 * limitations under the License.
019 */
020
021package cascading.flow.hadoop;
022
023import java.io.IOException;
024import java.util.Iterator;
025import java.util.Map;
026import java.util.Set;
027
028import cascading.flow.Flow;
029import cascading.flow.FlowException;
030import cascading.flow.FlowListener;
031import cascading.scheme.hadoop.TextLine;
032import cascading.tap.MultiSourceTap;
033import cascading.tap.Tap;
034import cascading.tap.hadoop.Hfs;
035import cascading.tap.hadoop.PartitionTap;
036import cascading.util.Util;
037import org.apache.hadoop.fs.Path;
038
039/**
040 * Class FailOnMissingSuccessFlowListener is a {@link FlowListener} that tests that all sources to a {@link Flow}
041 * have a {@code _SUCCESS} file before allowing the Flow to execute.
042 * <p>
043 * If any source Tap is a directory, existence of {@code _SUCCESS} is made, if the source is a file, the existence test
044 * is skipped.
045 * <p>
046 * This listener will unwind any {@link PartitionTap} or {@link MultiSourceTap} instances looking for {@link Hfs} instances
047 * to verify. If any Tap is found that is not a PartitionTap, MultiSourceTap, or Hfs type, an error will be thrown.
048 */
049public class FailOnMissingSuccessFlowListener implements FlowListener
050  {
051  @Override
052  public void onStarting( Flow flow )
053    {
054    Map<String, Tap> sources = flow.getSources();
055
056    for( Map.Entry<String, Tap> entry : sources.entrySet() )
057      {
058      String key = entry.getKey();
059      Tap value = entry.getValue();
060      Set<Hfs> taps = Util.createIdentitySet();
061
062      accumulate( taps, value );
063
064      for( Hfs tap : taps )
065        {
066        if( !testExists( flow, tap ) )
067          throw new FlowException( "cannot start flow: " + flow.getName() + ", _SUCCESS file missing in tap: '" + key + "', at: " + value.getIdentifier() );
068        }
069      }
070    }
071
072  public boolean testExists( Flow flow, Hfs tap )
073    {
074    try
075      {
076      // don't test for _SUCCESS if the tap is a file, only if a directory
077      if( !tap.isDirectory( flow.getFlowProcess() ) )
078        return true;
079
080      return new Hfs( new TextLine(), new Path( tap.getPath(), "_SUCCESS" ).toString() ).resourceExists( flow.getFlowProcess() );
081      }
082    catch( IOException exception )
083      {
084      throw new FlowException( exception );
085      }
086    }
087
088  public void accumulate( Set<Hfs> taps, Tap value )
089    {
090    if( value == null )
091      return;
092
093    if( value instanceof Hfs )
094      taps.add( (Hfs) value );
095    else if( value instanceof PartitionTap )
096      taps.add( (Hfs) ( (PartitionTap) value ).getParent() );
097    else if( value instanceof MultiSourceTap )
098      iterate( taps, (MultiSourceTap) value );
099    else
100      throw new IllegalArgumentException( "unsupprted Tap type: " + value.getClass().getName() );
101    }
102
103  public void iterate( Set<Hfs> taps, MultiSourceTap value )
104    {
105    Iterator<Tap> childTaps = value.getChildTaps();
106
107    while( childTaps.hasNext() )
108      accumulate( taps, childTaps.next() );
109    }
110
111  @Override
112  public void onStopping( Flow flow )
113    {
114
115    }
116
117  @Override
118  public void onCompleted( Flow flow )
119    {
120
121    }
122
123  @Override
124  public boolean onThrowable( Flow flow, Throwable throwable )
125    {
126    return false;
127    }
128  }