001/*
002 * Copyright (c) 2007-2015 Concurrent, Inc. All Rights Reserved.
003 *
004 * Project and contact information: http://www.cascading.org/
005 *
006 * This file is part of the Cascading project.
007 *
008 * Licensed under the Apache License, Version 2.0 (the "License");
009 * you may not use this file except in compliance with the License.
010 * You may obtain a copy of the License at
011 *
012 *     http://www.apache.org/licenses/LICENSE-2.0
013 *
014 * Unless required by applicable law or agreed to in writing, software
015 * distributed under the License is distributed on an "AS IS" BASIS,
016 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
017 * See the License for the specific language governing permissions and
018 * limitations under the License.
019 */
020
021package cascading.tuple;
022
023import java.io.Closeable;
024import java.io.IOException;
025import java.util.Collections;
026import java.util.Set;
027
028import cascading.flow.FlowProcess;
029import cascading.scheme.ConcreteCall;
030import cascading.scheme.Scheme;
031import cascading.util.CloseableIterator;
032import cascading.util.SingleCloseableInputIterator;
033import cascading.util.Util;
034import org.slf4j.Logger;
035import org.slf4j.LoggerFactory;
036
037/**
038 * Class TupleEntrySchemeIterator is a helper class for wrapping a {@link Scheme} instance, calling
039 * {@link Scheme#source(cascading.flow.FlowProcess, cascading.scheme.SourceCall)} on every call to
040 * {@link #next()}. The behavior can be controlled via properties defined in {@link TupleEntrySchemeIteratorProps}.
041 * <p/>
042 * Use this class inside a custom {@link cascading.tap.Tap} when overriding the
043 * {@link cascading.tap.Tap#openForRead(cascading.flow.FlowProcess)} method.
044 */
045public class TupleEntrySchemeIterator<Config, Input> extends TupleEntryIterator
046  {
047  /** Field LOG */
048  private static final Logger LOG = LoggerFactory.getLogger( TupleEntrySchemeIterator.class );
049
050  private final FlowProcess<? extends Config> flowProcess;
051  private final Scheme scheme;
052  private final CloseableIterator<Input> inputIterator;
053  private final Set<Class<? extends Exception>> permittedExceptions;
054  private ConcreteCall sourceCall;
055
056  private String identifier;
057  private boolean isComplete = false;
058  private boolean hasWaiting = false;
059  private TupleException currentException;
060
061  public TupleEntrySchemeIterator( FlowProcess<? extends Config> flowProcess, Scheme scheme, Input input )
062    {
063    this( flowProcess, scheme, input, null );
064    }
065
066  public TupleEntrySchemeIterator( FlowProcess<? extends Config> flowProcess, Scheme scheme, Input input, String identifier )
067    {
068    this( flowProcess, scheme, (CloseableIterator<Input>) new SingleCloseableInputIterator( (Closeable) input ), identifier );
069    }
070
071  public TupleEntrySchemeIterator( FlowProcess<? extends Config> flowProcess, Scheme scheme, CloseableIterator<Input> inputIterator )
072    {
073    this( flowProcess, scheme, inputIterator, null );
074    }
075
076  public TupleEntrySchemeIterator( FlowProcess<? extends Config> flowProcess, Scheme scheme, CloseableIterator<Input> inputIterator, String identifier )
077    {
078    super( scheme.getSourceFields() );
079    this.flowProcess = flowProcess;
080    this.scheme = scheme;
081    this.inputIterator = inputIterator;
082    this.identifier = identifier;
083
084    Object permittedExceptions = flowProcess.getProperty( TupleEntrySchemeIteratorProps.PERMITTED_EXCEPTIONS );
085
086    if( permittedExceptions != null )
087      this.permittedExceptions = Util.asClasses( permittedExceptions.toString(), "unable to load permitted exception class" );
088    else
089      this.permittedExceptions = Collections.emptySet();
090
091    if( this.identifier == null || this.identifier.isEmpty() )
092      this.identifier = "'unknown'";
093
094    if( !inputIterator.hasNext() )
095      {
096      isComplete = true;
097      return;
098      }
099
100    sourceCall = new ConcreteCall();
101
102    sourceCall.setIncomingEntry( getTupleEntry() );
103    sourceCall.setInput( wrapInput( inputIterator.next() ) );
104
105    try
106      {
107      this.scheme.sourcePrepare( flowProcess, sourceCall );
108      }
109    catch( IOException exception )
110      {
111      throw new TupleException( "unable to prepare source for input identifier: " + this.identifier, exception );
112      }
113    }
114
115  protected FlowProcess<? extends Config> getFlowProcess()
116    {
117    return flowProcess;
118    }
119
120  protected Input wrapInput( Input input )
121    {
122    return input;
123    }
124
125  @Override
126  public boolean hasNext()
127    {
128    if( currentException != null )
129      return true;
130
131    if( isComplete )
132      return false;
133
134    if( hasWaiting )
135      return true;
136
137    try
138      {
139      getNext();
140      }
141    catch( Exception exception )
142      {
143      if( identifier == null || identifier.isEmpty() )
144        identifier = "'unknown'";
145
146      if( permittedExceptions.contains( exception.getClass() ) )
147        {
148        LOG.warn( "Caught permitted exception while reading {}", identifier, exception );
149        return false;
150        }
151
152      currentException = new TupleException( "unable to read from input identifier: " + identifier, exception );
153
154      return true;
155      }
156
157    if( !hasWaiting )
158      isComplete = true;
159
160    return !isComplete;
161    }
162
163  private TupleEntry getNext() throws IOException
164    {
165    Tuples.asModifiable( sourceCall.getIncomingEntry().getTuple() );
166    hasWaiting = scheme.source( flowProcess, sourceCall );
167
168    if( !hasWaiting && inputIterator.hasNext() )
169      {
170      sourceCall.setInput( wrapInput( inputIterator.next() ) );
171
172      return getNext();
173      }
174
175    return getTupleEntry();
176    }
177
178  @Override
179  public TupleEntry next()
180    {
181    try
182      {
183      if( currentException != null )
184        throw currentException;
185      }
186    finally
187      {
188      currentException = null; // data may be trapped
189      }
190
191    if( isComplete )
192      throw new IllegalStateException( "no next element" );
193
194    try
195      {
196      if( hasWaiting )
197        return getTupleEntry();
198
199      return getNext();
200      }
201    catch( Exception exception )
202      {
203      throw new TupleException( "unable to source from input identifier: " + identifier, exception );
204      }
205    finally
206      {
207      hasWaiting = false;
208      }
209    }
210
211  @Override
212  public void remove()
213    {
214    throw new UnsupportedOperationException( "may not remove elements from this iterator" );
215    }
216
217  @Override
218  public void close() throws IOException
219    {
220    try
221      {
222      if( sourceCall != null )
223        scheme.sourceCleanup( flowProcess, sourceCall );
224      }
225    finally
226      {
227      inputIterator.close();
228      }
229    }
230  }