/*
 * Copyright (c) 2007-2015 Concurrent, Inc. All Rights Reserved.
 *
 * Project and contact information: http://www.cascading.org/
 *
 * This file is part of the Cascading project.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package cascading.tuple;

import java.io.Closeable;
import java.io.IOException;
import java.util.Collections;
import java.util.Set;

import cascading.flow.FlowProcess;
import cascading.scheme.ConcreteCall;
import cascading.scheme.Scheme;
import cascading.util.CloseableIterator;
import cascading.util.SingleCloseableInputIterator;
import cascading.util.Util;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Class TupleEntrySchemeIterator is a helper class for wrapping a {@link Scheme} instance, calling
 * {@link Scheme#source(cascading.flow.FlowProcess, cascading.scheme.SourceCall)} on every call to
 * {@link #next()}. The behavior can be controlled via properties defined in {@link TupleEntrySchemeIteratorProps}.
 * <p/>
 * Use this class inside a custom {@link cascading.tap.Tap} when overriding the
 * {@link cascading.tap.Tap#openForRead(cascading.flow.FlowProcess)} method.
 */
public class TupleEntrySchemeIterator<Config, Input> extends TupleEntryIterator
  {
  /** Field LOG */
  private static final Logger LOG = LoggerFactory.getLogger( TupleEntrySchemeIterator.class );

  private final FlowProcess<? extends Config> flowProcess;
  private final Scheme scheme;
  private final CloseableIterator<Input> inputIterator;
  private final Set<Class<? extends Exception>> permittedExceptions;
  /** Label used in log and exception messages; never null or empty after construction. */
  private final String identifier;
  /** Remains null when the input iterator was empty at construction time. */
  private ConcreteCall sourceCall;

  /** True once all inputs have been exhausted. */
  private boolean isComplete = false;
  /** True when the incoming entry holds a record that has been read but not yet returned by {@link #next()}. */
  private boolean hasWaiting = false;
  /** Read failure captured by {@link #hasNext()}, to be thrown by the following {@link #next()}. */
  private TupleException currentException;

  /**
   * Creates an iterator over a single input with no identifier.
   *
   * @param flowProcess the current flow process
   * @param scheme      the scheme used to source tuples
   * @param input       the single input to read from
   */
  public TupleEntrySchemeIterator( FlowProcess<? extends Config> flowProcess, Scheme scheme, Input input )
    {
    this( flowProcess, scheme, input, null );
    }

  /**
   * Creates an iterator over a single input. The input is cast to {@link Closeable} and wrapped in a
   * {@link SingleCloseableInputIterator}.
   *
   * @param flowProcess the current flow process
   * @param scheme      the scheme used to source tuples
   * @param input       the single input to read from
   * @param identifier  label used in log and exception messages, may be null
   */
  public TupleEntrySchemeIterator( FlowProcess<? extends Config> flowProcess, Scheme scheme, Input input, String identifier )
    {
    this( flowProcess, scheme, (CloseableIterator<Input>) new SingleCloseableInputIterator( (Closeable) input ), identifier );
    }

  /**
   * Creates an iterator over a sequence of inputs with no identifier.
   *
   * @param flowProcess   the current flow process
   * @param scheme        the scheme used to source tuples
   * @param inputIterator the inputs to read from, in order
   */
  public TupleEntrySchemeIterator( FlowProcess<? extends Config> flowProcess, Scheme scheme, CloseableIterator<Input> inputIterator )
    {
    this( flowProcess, scheme, inputIterator, null );
    }

  /**
   * Creates an iterator over a sequence of inputs. If the given iterator has no elements the new instance is
   * immediately complete; otherwise the first input is taken and {@code sourcePrepare} is called on the scheme.
   *
   * @param flowProcess   the current flow process
   * @param scheme        the scheme used to source tuples
   * @param inputIterator the inputs to read from, in order
   * @param identifier    label used in log and exception messages; null or empty becomes {@code 'unknown'}
   * @throws TupleException if the scheme throws an {@link IOException} from {@code sourcePrepare}
   */
  public TupleEntrySchemeIterator( FlowProcess<? extends Config> flowProcess, Scheme scheme, CloseableIterator<Input> inputIterator, String identifier )
    {
    super( scheme.getSourceFields() );
    this.flowProcess = flowProcess;
    this.scheme = scheme;
    this.inputIterator = inputIterator;
    this.identifier = identifier == null || identifier.isEmpty() ? "'unknown'" : identifier;

    Object permittedExceptions = flowProcess.getProperty( TupleEntrySchemeIteratorProps.PERMITTED_EXCEPTIONS );

    if( permittedExceptions != null )
      this.permittedExceptions = Util.asClasses( permittedExceptions.toString(), "unable to load permitted exception class" );
    else
      this.permittedExceptions = Collections.emptySet();

    if( !inputIterator.hasNext() )
      {
      isComplete = true;
      return;
      }

    sourceCall = new ConcreteCall();

    sourceCall.setIncomingEntry( getTupleEntry() );
    // wrapInput is overridable and is invoked here, before any subclass constructor body has run
    sourceCall.setInput( wrapInput( inputIterator.next() ) );

    try
      {
      this.scheme.sourcePrepare( flowProcess, sourceCall );
      }
    catch( IOException exception )
      {
      throw new TupleException( "unable to prepare source for input identifier: " + this.identifier, exception );
      }
    }

  /**
   * Returns the flow process given at construction.
   *
   * @return the flow process
   */
  protected FlowProcess<? extends Config> getFlowProcess()
    {
    return flowProcess;
    }

  /**
   * Hook applied to every input before it is set on the source call. This implementation returns the
   * input unchanged. It is called from the constructor as well as while advancing between inputs.
   *
   * @param input the next input taken from the input iterator
   * @return the input to hand to the scheme
   */
  protected Input wrapInput( Input input )
    {
    return input;
    }

  /**
   * Reads ahead one record if none is waiting.
   * <p/>
   * A read failure whose exact class is listed in the permitted exceptions is logged and reported as
   * {@code false}. Any other failure is captured, {@code true} is returned, and the failure is thrown
   * by the next call to {@link #next()}.
   *
   * @return true if a record or a captured failure is waiting to be delivered by {@link #next()}
   */
  @Override
  public boolean hasNext()
    {
    if( currentException != null )
      return true;

    if( isComplete )
      return false;

    if( hasWaiting )
      return true;

    try
      {
      getNext();
      }
    catch( Exception exception )
      {
      if( permittedExceptions.contains( exception.getClass() ) )
        {
        // NOTE(review): isComplete is left untouched here, so a later hasNext()/next() will try to
        // read from the same input again -- confirm this is the intended behavior
        LOG.warn( "Caught permitted exception while reading {}", identifier, exception );
        return false;
        }

      currentException = new TupleException( "unable to read from input identifier: " + identifier, exception );

      return true;
      }

    // isComplete is known to be false at this point, so it simply mirrors the read result
    isComplete = !hasWaiting;

    return hasWaiting;
    }

  /**
   * Asks the scheme for the next record, moving on to the following input each time the current one
   * reports no more data. Sets {@code hasWaiting} to the result of the last {@code source} call.
   * <p/>
   * Implemented as a loop so that a long run of empty inputs cannot grow the call stack.
   *
   * @return the shared tuple entry, whether or not a record was read into it
   * @throws IOException if the scheme fails while reading
   */
  private TupleEntry getNext() throws IOException
    {
    while( true )
      {
      Tuples.asModifiable( sourceCall.getIncomingEntry().getTuple() );
      hasWaiting = scheme.source( flowProcess, sourceCall );

      if( hasWaiting || !inputIterator.hasNext() )
        return getTupleEntry();

      sourceCall.setInput( wrapInput( inputIterator.next() ) );
      }
    }

  /**
   * Returns the next record, reading it first if {@link #hasNext()} was not called beforehand.
   *
   * @return the shared tuple entry holding the next record
   * @throws TupleException        if a failure was captured by {@link #hasNext()}, or reading fails here
   * @throws IllegalStateException if there are no more records
   */
  @Override
  public TupleEntry next()
    {
    if( currentException != null )
      {
      TupleException pending = currentException;

      currentException = null; // data may be trapped

      throw pending;
      }

    if( isComplete )
      throw new IllegalStateException( "no next element" );

    if( !hasWaiting )
      {
      try
        {
        getNext();
        }
      catch( Exception exception )
        {
        throw new TupleException( "unable to source from input identifier: " + identifier, exception );
        }

      // nothing was read: report exhaustion instead of handing back the stale incoming entry
      if( !hasWaiting )
        {
        isComplete = true;
        throw new IllegalStateException( "no next element" );
        }
      }

    hasWaiting = false; // the waiting record is consumed by this call

    return getTupleEntry();
    }

  /**
   * Not supported.
   *
   * @throws UnsupportedOperationException always
   */
  @Override
  public void remove()
    {
    throw new UnsupportedOperationException( "may not remove elements from this iterator" );
    }

  /**
   * Calls {@code sourceCleanup} on the scheme if a source call was created, then closes the input
   * iterator. The input iterator is closed even if the cleanup throws.
   *
   * @throws IOException if the cleanup or the close fails
   */
  @Override
  public void close() throws IOException
    {
    try
      {
      if( sourceCall != null )
        scheme.sourceCleanup( flowProcess, sourceCall );
      }
    finally
      {
      inputIterator.close();
      }
    }
  }