/*
 * Copyright (c) 2007-2014 Concurrent, Inc. All Rights Reserved.
 *
 * Project and contact information: http://www.cascading.org/
 *
 * This file is part of the Cascading project.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package cascading.tuple;

import java.io.Closeable;
import java.io.IOException;

import cascading.flow.FlowProcess;
import cascading.scheme.ConcreteCall;
import cascading.scheme.Scheme;
import cascading.util.CloseableIterator;
import cascading.util.SingleCloseableInputIterator;

/**
 * Class TupleEntrySchemeIterator is a helper class for wrapping a {@link Scheme} instance, calling
 * {@link Scheme#source(cascading.flow.FlowProcess, cascading.scheme.SourceCall)} whenever a new entry
 * must be fetched on behalf of {@link #hasNext()} or {@link #next()}.
 * <p/>
 * Use this class inside a custom {@link cascading.tap.Tap} when overriding the
 * {@link cascading.tap.Tap#openForRead(cascading.flow.FlowProcess)} method.
 * <p/>
 * One or more inputs are consumed in order from a {@link CloseableIterator}; when the scheme reports
 * that the current input produced nothing, the iterator advances to the next input, if any.
 */
public class TupleEntrySchemeIterator<Config, Input> extends TupleEntryIterator
  {
  private final FlowProcess<Config> flowProcess;
  private final Scheme scheme;
  private final CloseableIterator<Input> inputIterator;
  /** Remains {@code null} when the input iterator was empty at construction time. */
  private ConcreteCall sourceCall;

  /** Label used in exception messages; never null or empty once constructed. */
  private final String identifier;
  /** True once no further entries can be produced. */
  private boolean isComplete = false;
  /** True while an entry has been sourced by {@link #hasNext()} but not yet handed out by {@link #next()}. */
  private boolean hasWaiting = false;
  /** Failure captured by {@link #hasNext()}, rethrown by the following {@link #next()}. */
  private TupleException currentException;

  /**
   * Creates an iterator over a single input with no identifier.
   *
   * @param flowProcess the current flow process, handed to every scheme call
   * @param scheme      the scheme used to source tuples
   * @param input       the single input to read; it is cast to {@link Closeable}
   */
  public TupleEntrySchemeIterator( FlowProcess<Config> flowProcess, Scheme scheme, Input input )
    {
    this( flowProcess, scheme, input, null );
    }

  /**
   * Creates an iterator over a single input.
   *
   * @param flowProcess the current flow process, handed to every scheme call
   * @param scheme      the scheme used to source tuples
   * @param input       the single input to read; it is cast to {@link Closeable}, so a
   *                    non-Closeable value fails with a {@link ClassCastException}
   * @param identifier  label used in exception messages, may be null or empty
   */
  public TupleEntrySchemeIterator( FlowProcess<Config> flowProcess, Scheme scheme, Input input, String identifier )
    {
    this( flowProcess, scheme, (CloseableIterator<Input>) new SingleCloseableInputIterator( (Closeable) input ), identifier );
    }

  /**
   * Creates an iterator over a sequence of inputs with no identifier.
   *
   * @param flowProcess   the current flow process, handed to every scheme call
   * @param scheme        the scheme used to source tuples
   * @param inputIterator the inputs to read, in order
   */
  public TupleEntrySchemeIterator( FlowProcess<Config> flowProcess, Scheme scheme, CloseableIterator<Input> inputIterator )
    {
    this( flowProcess, scheme, inputIterator, null );
    }

  /**
   * Creates an iterator over a sequence of inputs.
   * <p/>
   * If {@code inputIterator} has no elements the iterator is immediately complete and the scheme is
   * never prepared. Otherwise the first input is taken, passed through {@link #wrapInput(Object)} and
   * {@code sourcePrepare} is invoked on the scheme.
   *
   * @param flowProcess   the current flow process, handed to every scheme call
   * @param scheme        the scheme used to source tuples
   * @param inputIterator the inputs to read, in order
   * @param identifier    label used in exception messages; null or empty becomes {@code 'unknown'}
   * @throws TupleException if the scheme fails to prepare with an {@link IOException}
   */
  public TupleEntrySchemeIterator( FlowProcess<Config> flowProcess, Scheme scheme, CloseableIterator<Input> inputIterator, String identifier )
    {
    super( scheme.getSourceFields() );
    this.flowProcess = flowProcess;
    this.scheme = scheme;
    this.inputIterator = inputIterator;
    this.identifier = identifier == null || identifier.isEmpty() ? "'unknown'" : identifier;

    if( !inputIterator.hasNext() )
      {
      isComplete = true;
      return;
      }

    sourceCall = new ConcreteCall();

    sourceCall.setIncomingEntry( getTupleEntry() );
    sourceCall.setInput( wrapInput( inputIterator.next() ) );

    try
      {
      this.scheme.sourcePrepare( flowProcess, sourceCall );
      }
    catch( IOException exception )
      {
      throw new TupleException( "unable to prepare source for input identifier: " + this.identifier, exception );
      }
    }

  /**
   * Returns the flow process this iterator was created with.
   *
   * @return the flow process
   */
  protected FlowProcess<Config> getFlowProcess()
    {
    return flowProcess;
    }

  /**
   * Hook applied to every input before it is handed to the source call. This implementation returns
   * the input unchanged.
   * <p/>
   * Note this method is invoked from the constructor, so an override must not rely on subclass state
   * initialized after {@code super(...)} returns.
   *
   * @param input the raw input taken from the input iterator
   * @return the input to set on the source call
   */
  protected Input wrapInput( Input input )
    {
    return input;
    }

  /**
   * Reports whether another entry is available, sourcing it from the scheme if none is waiting.
   * <p/>
   * A failure while sourcing is not thrown here: it is wrapped in a {@link TupleException}, retained,
   * and {@code true} is returned so that the following {@link #next()} call throws it.
   *
   * @return true if {@link #next()} will return an entry or throw a retained failure
   */
  @Override
  public boolean hasNext()
    {
    if( isComplete )
      return false;

    if( hasWaiting )
      return true;

    try
      {
      getNext();
      }
    catch( Exception exception )
      {
      // defer the failure to next(); identifier was already normalized by the constructor
      currentException = new TupleException( "unable to read from input identifier: " + identifier, exception );
      return true;
      }

    if( !hasWaiting )
      isComplete = true;

    return !isComplete;
    }

  /**
   * Asks the scheme for the next tuple, advancing through the remaining inputs until one yields a
   * tuple or the inputs are exhausted. Updates {@link #hasWaiting} with the outcome.
   * <p/>
   * Implemented as a loop rather than recursion so that a long run of inputs yielding nothing cannot
   * exhaust the call stack.
   *
   * @return the shared tuple entry, whether or not a tuple was sourced into it
   * @throws IOException if the scheme fails while sourcing
   */
  private TupleEntry getNext() throws IOException
    {
    while( true )
      {
      // presumably re-enables writes on the incoming tuple before the scheme fills it -- confirm against Tuples
      Tuples.asModifiable( sourceCall.getIncomingEntry().getTuple() );
      hasWaiting = scheme.source( flowProcess, sourceCall );

      if( hasWaiting || !inputIterator.hasNext() )
        return getTupleEntry();

      // current input produced nothing; move on to the next one
      sourceCall.setInput( wrapInput( inputIterator.next() ) );
      }
    }

  /**
   * Returns the next entry, sourcing it first if {@link #hasNext()} has not already done so.
   * <p/>
   * NOTE(review): when called without a preceding {@link #hasNext()} and the inputs turn out to be
   * exhausted, the shared tuple entry is still returned -- confirm callers always gate on hasNext().
   *
   * @return the next tuple entry
   * @throws TupleException        if a failure was retained by {@link #hasNext()}, or sourcing fails here
   * @throws IllegalStateException if the iterator is already complete
   */
  @Override
  public TupleEntry next()
    {
    if( currentException != null )
      {
      TupleException pending = currentException;
      currentException = null; // data may be trapped
      throw pending;
      }

    if( isComplete )
      throw new IllegalStateException( "no next element" );

    try
      {
      if( hasWaiting )
        return getTupleEntry();

      return getNext();
      }
    catch( Exception exception )
      {
      throw new TupleException( "unable to source from input identifier: " + identifier, exception );
      }
    finally
      {
      // the waiting entry, if any, has now been consumed
      hasWaiting = false;
      }
    }

  /**
   * Always fails; removal is not supported.
   *
   * @throws UnsupportedOperationException always
   */
  @Override
  public void remove()
    {
    throw new UnsupportedOperationException( "may not remove elements from this iterator" );
    }

  /**
   * Invokes {@code sourceCleanup} on the scheme, if the scheme was prepared, and then closes the input
   * iterator. The input iterator is closed even when the cleanup throws.
   *
   * @throws IOException if the cleanup or closing the input iterator fails
   */
  @Override
  public void close() throws IOException
    {
    try
      {
      if( sourceCall != null )
        scheme.sourceCleanup( flowProcess, sourceCall );
      }
    finally
      {
      inputIterator.close();
      }
    }
  }