001 /* 002 * Copyright (c) 2007-2014 Concurrent, Inc. All Rights Reserved. 003 * 004 * Project and contact information: http://www.cascading.org/ 005 * 006 * This file is part of the Cascading project. 007 * 008 * Licensed under the Apache License, Version 2.0 (the "License"); 009 * you may not use this file except in compliance with the License. 010 * You may obtain a copy of the License at 011 * 012 * http://www.apache.org/licenses/LICENSE-2.0 013 * 014 * Unless required by applicable law or agreed to in writing, software 015 * distributed under the License is distributed on an "AS IS" BASIS, 016 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 017 * See the License for the specific language governing permissions and 018 * limitations under the License. 019 */ 020 021 package cascading.tuple; 022 023 import java.io.Closeable; 024 import java.io.IOException; 025 import java.util.Collections; 026 import java.util.Set; 027 028 import cascading.flow.FlowProcess; 029 import cascading.scheme.ConcreteCall; 030 import cascading.scheme.Scheme; 031 import cascading.util.CloseableIterator; 032 import cascading.util.SingleCloseableInputIterator; 033 import cascading.util.Util; 034 import org.slf4j.Logger; 035 import org.slf4j.LoggerFactory; 036 037 /** 038 * Class TupleEntrySchemeIterator is a helper class for wrapping a {@link Scheme} instance, calling 039 * {@link Scheme#source(cascading.flow.FlowProcess, cascading.scheme.SourceCall)} on every call to 040 * {@link #next()}. The behavior can be controlled via properties defined in {@link TupleEntrySchemeIteratorProps}. 041 * <p/> 042 * Use this class inside a custom {@link cascading.tap.Tap} when overriding the 043 * {@link cascading.tap.Tap#openForRead(cascading.flow.FlowProcess)} method. 044 */ 045 public class TupleEntrySchemeIterator<Config, Input> extends TupleEntryIterator 046 { 047 /** Field LOG */ 048 private static final Logger LOG = LoggerFactory.getLogger( TupleEntrySchemeIterator.class ); 049 050 private final FlowProcess<Config> flowProcess; 051 private final Scheme scheme; 052 private final CloseableIterator<Input> inputIterator; 053 private final Set<Class<? extends Exception>> permittedExceptions; 054 private ConcreteCall sourceCall; 055 056 private String identifier; 057 private boolean isComplete = false; 058 private boolean hasWaiting = false; 059 private TupleException currentException; 060 061 public TupleEntrySchemeIterator( FlowProcess<Config> flowProcess, Scheme scheme, Input input ) 062 { 063 this( flowProcess, scheme, input, null ); 064 } 065 066 public TupleEntrySchemeIterator( FlowProcess<Config> flowProcess, Scheme scheme, Input input, String identifier ) 067 { 068 this( flowProcess, scheme, (CloseableIterator<Input>) new SingleCloseableInputIterator( (Closeable) input ), identifier ); 069 } 070 071 public TupleEntrySchemeIterator( FlowProcess<Config> flowProcess, Scheme scheme, CloseableIterator<Input> inputIterator ) 072 { 073 this( flowProcess, scheme, inputIterator, null ); 074 } 075 076 public TupleEntrySchemeIterator( FlowProcess<Config> flowProcess, Scheme scheme, CloseableIterator<Input> inputIterator, String identifier ) 077 { 078 super( scheme.getSourceFields() ); 079 this.flowProcess = flowProcess; 080 this.scheme = scheme; 081 this.inputIterator = inputIterator; 082 this.identifier = identifier; 083 084 Object permittedExceptions = flowProcess.getProperty( TupleEntrySchemeIteratorProps.PERMITTED_EXCEPTIONS ); 085 086 if( permittedExceptions != null ) 087 this.permittedExceptions = Util.asClasses( permittedExceptions.toString(), "unable to load permitted exception class" ); 088 else 089 this.permittedExceptions = Collections.emptySet(); 090 091 if( this.identifier == null || this.identifier.isEmpty() ) 092 this.identifier = "'unknown'"; 093 094 if( !inputIterator.hasNext() ) 095 { 096 isComplete = true; 097 return; 098 } 099 100 sourceCall = new ConcreteCall(); 101 102 sourceCall.setIncomingEntry( getTupleEntry() ); 103 sourceCall.setInput( wrapInput( inputIterator.next() ) ); 104 105 try 106 { 107 this.scheme.sourcePrepare( flowProcess, sourceCall ); 108 } 109 catch( IOException exception ) 110 { 111 throw new TupleException( "unable to prepare source for input identifier: " + this.identifier, exception ); 112 } 113 } 114 115 protected FlowProcess<Config> getFlowProcess() 116 { 117 return flowProcess; 118 } 119 120 protected Input wrapInput( Input input ) 121 { 122 return input; 123 } 124 125 @Override 126 public boolean hasNext() 127 { 128 if( isComplete ) 129 return false; 130 131 if( hasWaiting ) 132 return true; 133 134 try 135 { 136 getNext(); 137 } 138 catch( Exception exception ) 139 { 140 if( identifier == null || identifier.isEmpty() ) 141 identifier = "'unknown'"; 142 143 if( permittedExceptions.contains( exception.getClass() ) ) 144 { 145 LOG.warn( "Caught permitted exception while reading {}", identifier, exception ); 146 return false; 147 } 148 149 currentException = new TupleException( "unable to read from input identifier: " + identifier, exception ); 150 151 return true; 152 } 153 154 if( !hasWaiting ) 155 isComplete = true; 156 157 return !isComplete; 158 } 159 160 private TupleEntry getNext() throws IOException 161 { 162 Tuples.asModifiable( sourceCall.getIncomingEntry().getTuple() ); 163 hasWaiting = scheme.source( flowProcess, sourceCall ); 164 165 if( !hasWaiting && inputIterator.hasNext() ) 166 { 167 sourceCall.setInput( wrapInput( inputIterator.next() ) ); 168 169 return getNext(); 170 } 171 172 return getTupleEntry(); 173 } 174 175 @Override 176 public TupleEntry next() 177 { 178 try 179 { 180 if( currentException != null ) 181 throw currentException; 182 } 183 finally 184 { 185 currentException = null; // data may be trapped 186 } 187 188 if( isComplete ) 189 throw new IllegalStateException( "no next element" ); 190 191 try 192 { 193 if( hasWaiting ) 194 return getTupleEntry(); 195 196 return getNext(); 197 } 198 catch( Exception exception ) 199 { 200 throw new TupleException( "unable to source from input identifier: " + identifier, exception ); 201 } 202 finally 203 { 204 hasWaiting = false; 205 } 206 } 207 208 @Override 209 public void remove() 210 { 211 throw new UnsupportedOperationException( "may not remove elements from this iterator" ); 212 } 213 214 @Override 215 public void close() throws IOException 216 { 217 try 218 { 219 if( sourceCall != null ) 220 scheme.sourceCleanup( flowProcess, sourceCall ); 221 } 222 finally 223 { 224 inputIterator.close(); 225 } 226 } 227 }