/*
 * Copyright (c) 2016-2018 Chris K Wensel <chris@wensel.net>. All Rights Reserved.
 *
 * Project and contact information: http://www.cascading.org/
 *
 * This file is part of the Cascading project.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package cascading.flow.hadoop;

import java.io.IOException;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;

import cascading.flow.Flow;
import cascading.flow.FlowException;
import cascading.flow.FlowListener;
import cascading.scheme.hadoop.TextLine;
import cascading.tap.MultiSourceTap;
import cascading.tap.Tap;
import cascading.tap.hadoop.Hfs;
import cascading.tap.hadoop.PartitionTap;
import cascading.util.Util;
import org.apache.hadoop.fs.Path;

/**
 * Class FailOnMissingSuccessFlowListener is a {@link FlowListener} that tests that all sources to a {@link Flow}
 * have a {@code _SUCCESS} file before allowing the Flow to execute.
 * <p>
 * If a source Tap is a directory, the existence of a {@code _SUCCESS} file inside it is tested; if the source is
 * a file, the existence test is skipped.
 * <p>
 * This listener will unwind any {@link PartitionTap} or {@link MultiSourceTap} instances looking for {@link Hfs} instances
 * to verify. If any Tap is found that is not a PartitionTap, MultiSourceTap, or Hfs type, an error will be thrown.
 */
public class FailOnMissingSuccessFlowListener implements FlowListener
  {
  /** Name of the marker file looked up inside every source directory. */
  private static final String SUCCESS_FILE_NAME = "_SUCCESS";

  /**
   * Verifies every source of the given Flow, throwing if any underlying {@link Hfs} directory lacks
   * the {@code _SUCCESS} file.
   *
   * @param flow the Flow about to start
   * @throws FlowException            if a source directory has no {@code _SUCCESS} file, or the test itself fails
   * @throws IllegalArgumentException if a source is built from an unsupported Tap type
   */
  @Override
  public void onStarting( Flow flow )
    {
    Map<String, Tap> sources = flow.getSources();

    for( Map.Entry<String, Tap> entry : sources.entrySet() )
      {
      String key = entry.getKey();
      Tap value = entry.getValue();

      // identity semantics: distinct Hfs instances are each tested, even if they compare as equal
      Set<Hfs> taps = Util.createIdentitySet();

      accumulate( taps, value );

      for( Hfs tap : taps )
        {
        // report the Hfs that actually failed, so a child of a composite source can be pinpointed
        if( !testExists( flow, tap ) )
          throw new FlowException( "cannot start flow: " + flow.getName() + ", _SUCCESS file missing in tap: '" + key + "', at: " + tap.getIdentifier() );
        }
      }
    }

  /**
   * Tests whether the given tap satisfies the {@code _SUCCESS} requirement.
   *
   * @param flow the Flow supplying the FlowProcess used to query the tap
   * @param tap  the Hfs resource to test
   * @return {@code true} if the tap is not a directory, or is a directory containing a {@code _SUCCESS} file;
   * {@code false} otherwise
   * @throws FlowException wrapping any {@link IOException} raised while querying the tap
   */
  public boolean testExists( Flow flow, Hfs tap )
    {
    try
      {
      // don't test for _SUCCESS if the tap is a file, only if a directory
      if( !tap.isDirectory( flow.getFlowProcess() ) )
        return true;

      String successPath = new Path( tap.getPath(), SUCCESS_FILE_NAME ).toString();

      // the scheme is never used for reading here, the temporary tap only serves the existence test
      return new Hfs( new TextLine(), successPath ).resourceExists( flow.getFlowProcess() );
      }
    catch( IOException exception )
      {
      throw new FlowException( "unable to test for _SUCCESS file in: " + tap.getIdentifier(), exception );
      }
    }

  /**
   * Collects into {@code taps} every {@link Hfs} instance reachable from the given Tap.
   * <p>
   * An Hfs is added directly, a {@link PartitionTap} contributes whatever its parent resolves to, and a
   * {@link MultiSourceTap} contributes whatever its child taps resolve to. A {@code null} value is ignored.
   *
   * @param taps  the set receiving the discovered Hfs instances
   * @param value the Tap to unwind, may be {@code null}
   * @throws IllegalArgumentException if an unsupported Tap type is encountered
   */
  public void accumulate( Set<Hfs> taps, Tap value )
    {
    if( value == null )
      return;

    if( value instanceof Hfs )
      taps.add( (Hfs) value );
    else if( value instanceof PartitionTap )
      accumulate( taps, ( (PartitionTap) value ).getParent() ); // recurse rather than cast, so the parent type is validated
    else if( value instanceof MultiSourceTap )
      iterate( taps, (MultiSourceTap) value );
    else
      throw new IllegalArgumentException( "unsupported Tap type: " + value.getClass().getName() );
    }

  /**
   * Applies {@link #accumulate(Set, Tap)} to every child of the given {@link MultiSourceTap}.
   *
   * @param taps  the set receiving the discovered Hfs instances
   * @param value the composite Tap whose children are unwound
   */
  public void iterate( Set<Hfs> taps, MultiSourceTap value )
    {
    Iterator<Tap> childTaps = value.getChildTaps();

    while( childTaps.hasNext() )
      accumulate( taps, childTaps.next() );
    }

  /** Does nothing; this listener only acts when a Flow is starting. */
  @Override
  public void onStopping( Flow flow )
    {
    }

  /** Does nothing; this listener only acts when a Flow is starting. */
  @Override
  public void onCompleted( Flow flow )
    {
    }

  /**
   * Does not handle the throwable.
   *
   * @return always {@code false}
   */
  @Override
  public boolean onThrowable( Flow flow, Throwable throwable )
    {
    return false;
    }
  }