001/*
002 * Copyright (c) 2007-2016 Concurrent, Inc. All Rights Reserved.
003 *
004 * Project and contact information: http://www.cascading.org/
005 *
006 * This file is part of the Cascading project.
007 *
008 * Licensed under the Apache License, Version 2.0 (the "License");
009 * you may not use this file except in compliance with the License.
010 * You may obtain a copy of the License at
011 *
012 *     http://www.apache.org/licenses/LICENSE-2.0
013 *
014 * Unless required by applicable law or agreed to in writing, software
015 * distributed under the License is distributed on an "AS IS" BASIS,
016 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
017 * See the License for the specific language governing permissions and
018 * limitations under the License.
019 */
020
021package cascading.tuple;
022
023import java.io.Closeable;
024import java.io.IOException;
025import java.util.Collections;
026import java.util.Set;
027
028import cascading.flow.FlowProcess;
029import cascading.scheme.ConcreteCall;
030import cascading.scheme.Scheme;
031import cascading.util.CloseableIterator;
032import cascading.util.SingleCloseableInputIterator;
033import cascading.util.Util;
034import org.slf4j.Logger;
035import org.slf4j.LoggerFactory;
036
037/**
038 * Class TupleEntrySchemeIterator is a helper class for wrapping a {@link Scheme} instance, calling
039 * {@link Scheme#source(cascading.flow.FlowProcess, cascading.scheme.SourceCall)} on every call to
040 * {@link #next()}. The behavior can be controlled via properties defined in {@link TupleEntrySchemeIteratorProps}.
041 * <p/>
042 * Use this class inside a custom {@link cascading.tap.Tap} when overriding the
043 * {@link cascading.tap.Tap#openForRead(cascading.flow.FlowProcess)} method.
044 */
045public class TupleEntrySchemeIterator<Config, Input> extends TupleEntryIterator
046  {
047  /** Field LOG */
048  private static final Logger LOG = LoggerFactory.getLogger( TupleEntrySchemeIterator.class );
049
050  private final FlowProcess<? extends Config> flowProcess;
051  private final Scheme scheme;
052  private final CloseableIterator<Input> inputIterator;
053  private final Set<Class<? extends Exception>> permittedExceptions;
054  private ConcreteCall sourceCall;
055
056  private String identifier;
057  private boolean isComplete = false;
058  private boolean hasWaiting = false;
059  private TupleException currentException;
060
061  public TupleEntrySchemeIterator( FlowProcess<? extends Config> flowProcess, Scheme scheme, Input input )
062    {
063    this( flowProcess, scheme, input, null );
064    }
065
066  public TupleEntrySchemeIterator( FlowProcess<? extends Config> flowProcess, Scheme scheme, Input input, String identifier )
067    {
068    this( flowProcess, scheme, (CloseableIterator<Input>) new SingleCloseableInputIterator( (Closeable) input ), identifier );
069    }
070
071  public TupleEntrySchemeIterator( FlowProcess<? extends Config> flowProcess, Scheme scheme, CloseableIterator<Input> inputIterator )
072    {
073    this( flowProcess, scheme, inputIterator, null );
074    }
075
076  public TupleEntrySchemeIterator( FlowProcess<? extends Config> flowProcess, Scheme scheme, CloseableIterator<Input> inputIterator, String identifier )
077    {
078    super( scheme.getSourceFields() );
079    this.flowProcess = flowProcess;
080    this.scheme = scheme;
081    this.inputIterator = inputIterator;
082    this.identifier = identifier;
083
084    Object permittedExceptions = flowProcess.getProperty( TupleEntrySchemeIteratorProps.PERMITTED_EXCEPTIONS );
085
086    if( permittedExceptions != null )
087      this.permittedExceptions = Util.asClasses( permittedExceptions.toString(), "unable to load permitted exception class" );
088    else
089      this.permittedExceptions = Collections.emptySet();
090
091    if( this.identifier == null || this.identifier.isEmpty() )
092      this.identifier = "'unknown'";
093
094    if( !inputIterator.hasNext() )
095      {
096      isComplete = true;
097      return;
098      }
099
100    sourceCall = new ConcreteCall();
101
102    sourceCall.setIncomingEntry( getTupleEntry() );
103    sourceCall.setInput( wrapInput( inputIterator.next() ) );
104
105    try
106      {
107      this.scheme.sourcePrepare( flowProcess, sourceCall );
108      }
109    catch( IOException exception )
110      {
111      throw new TupleException( "unable to prepare source for input identifier: " + this.identifier, exception );
112      }
113    }
114
115  protected FlowProcess<? extends Config> getFlowProcess()
116    {
117    return flowProcess;
118    }
119
120  protected Input wrapInput( Input input )
121    {
122    return input;
123    }
124
125  @Override
126  public boolean hasNext()
127    {
128    if( currentException != null )
129      return true;
130
131    if( isComplete )
132      return false;
133
134    if( hasWaiting )
135      return true;
136
137    try
138      {
139      getNext();
140      }
141    catch( Exception exception )
142      {
143      if( identifier == null || identifier.isEmpty() )
144        identifier = "'unknown'";
145
146      if( permittedExceptions.contains( exception.getClass() ) )
147        {
148        LOG.warn( "Caught permitted exception while reading {}", identifier, exception );
149        return false;
150        }
151
152      currentException = new TupleException( "unable to read from input identifier: " + identifier, exception );
153
154      return true;
155      }
156
157    if( !hasWaiting )
158      isComplete = true;
159
160    return !isComplete;
161    }
162
163  private TupleEntry getNext() throws IOException
164    {
165    Tuples.asModifiable( sourceCall.getIncomingEntry().getTuple() );
166    hasWaiting = scheme.source( flowProcess, sourceCall );
167
168    while( !hasWaiting && inputIterator.hasNext() )
169      {
170      sourceCall.setInput( wrapInput( inputIterator.next() ) );
171
172      Tuples.asModifiable( sourceCall.getIncomingEntry().getTuple() );
173      hasWaiting = scheme.source( flowProcess, sourceCall );
174      }
175
176    return getTupleEntry();
177    }
178
179  @Override
180  public TupleEntry next()
181    {
182    try
183      {
184      if( currentException != null )
185        throw currentException;
186      }
187    finally
188      {
189      currentException = null; // data may be trapped
190      }
191
192    if( isComplete )
193      throw new IllegalStateException( "no next element" );
194
195    try
196      {
197      if( hasWaiting )
198        return getTupleEntry();
199
200      return getNext();
201      }
202    catch( Exception exception )
203      {
204      throw new TupleException( "unable to source from input identifier: " + identifier, exception );
205      }
206    finally
207      {
208      hasWaiting = false;
209      }
210    }
211
212  @Override
213  public void remove()
214    {
215    throw new UnsupportedOperationException( "may not remove elements from this iterator" );
216    }
217
218  @Override
219  public void close() throws IOException
220    {
221    try
222      {
223      if( sourceCall != null )
224        scheme.sourceCleanup( flowProcess, sourceCall );
225      }
226    finally
227      {
228      inputIterator.close();
229      }
230    }
231  }