001    /*
002     * Copyright (c) 2007-2014 Concurrent, Inc. All Rights Reserved.
003     *
004     * Project and contact information: http://www.cascading.org/
005     *
006     * This file is part of the Cascading project.
007     *
008     * Licensed under the Apache License, Version 2.0 (the "License");
009     * you may not use this file except in compliance with the License.
010     * You may obtain a copy of the License at
011     *
012     *     http://www.apache.org/licenses/LICENSE-2.0
013     *
014     * Unless required by applicable law or agreed to in writing, software
015     * distributed under the License is distributed on an "AS IS" BASIS,
016     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
017     * See the License for the specific language governing permissions and
018     * limitations under the License.
019     */
020    
021    package cascading.tuple;
022    
023    import java.io.Closeable;
024    import java.io.IOException;
025    import java.util.Collections;
026    import java.util.Set;
027    
028    import cascading.flow.FlowProcess;
029    import cascading.scheme.ConcreteCall;
030    import cascading.scheme.Scheme;
031    import cascading.util.CloseableIterator;
032    import cascading.util.SingleCloseableInputIterator;
033    import cascading.util.Util;
034    import org.slf4j.Logger;
035    import org.slf4j.LoggerFactory;
036    
037    /**
038     * Class TupleEntrySchemeIterator is a helper class for wrapping a {@link Scheme} instance, calling
039     * {@link Scheme#source(cascading.flow.FlowProcess, cascading.scheme.SourceCall)} on every call to
040     * {@link #next()}. The behavior can be controlled via properties defined in {@link TupleEntrySchemeIteratorProps}.
041     * <p/>
042     * Use this class inside a custom {@link cascading.tap.Tap} when overriding the
043     * {@link cascading.tap.Tap#openForRead(cascading.flow.FlowProcess)} method.
044     */
045    public class TupleEntrySchemeIterator<Config, Input> extends TupleEntryIterator
046      {
047      /** Field LOG */
048      private static final Logger LOG = LoggerFactory.getLogger( TupleEntrySchemeIterator.class );
049    
050      private final FlowProcess<Config> flowProcess;
051      private final Scheme scheme;
052      private final CloseableIterator<Input> inputIterator;
053      private final Set<Class<? extends Exception>> permittedExceptions;
054      private ConcreteCall sourceCall;
055    
056      private String identifier;
057      private boolean isComplete = false;
058      private boolean hasWaiting = false;
059      private TupleException currentException;
060    
061      public TupleEntrySchemeIterator( FlowProcess<Config> flowProcess, Scheme scheme, Input input )
062        {
063        this( flowProcess, scheme, input, null );
064        }
065    
066      public TupleEntrySchemeIterator( FlowProcess<Config> flowProcess, Scheme scheme, Input input, String identifier )
067        {
068        this( flowProcess, scheme, (CloseableIterator<Input>) new SingleCloseableInputIterator( (Closeable) input ), identifier );
069        }
070    
071      public TupleEntrySchemeIterator( FlowProcess<Config> flowProcess, Scheme scheme, CloseableIterator<Input> inputIterator )
072        {
073        this( flowProcess, scheme, inputIterator, null );
074        }
075    
076      public TupleEntrySchemeIterator( FlowProcess<Config> flowProcess, Scheme scheme, CloseableIterator<Input> inputIterator, String identifier )
077        {
078        super( scheme.getSourceFields() );
079        this.flowProcess = flowProcess;
080        this.scheme = scheme;
081        this.inputIterator = inputIterator;
082        this.identifier = identifier;
083    
084        Object permittedExceptions = flowProcess.getProperty( TupleEntrySchemeIteratorProps.PERMITTED_EXCEPTIONS );
085    
086        if( permittedExceptions != null )
087          this.permittedExceptions = Util.asClasses( permittedExceptions.toString(), "unable to load permitted exception class" );
088        else
089          this.permittedExceptions = Collections.emptySet();
090    
091        if( this.identifier == null || this.identifier.isEmpty() )
092          this.identifier = "'unknown'";
093    
094        if( !inputIterator.hasNext() )
095          {
096          isComplete = true;
097          return;
098          }
099    
100        sourceCall = new ConcreteCall();
101    
102        sourceCall.setIncomingEntry( getTupleEntry() );
103        sourceCall.setInput( wrapInput( inputIterator.next() ) );
104    
105        try
106          {
107          this.scheme.sourcePrepare( flowProcess, sourceCall );
108          }
109        catch( IOException exception )
110          {
111          throw new TupleException( "unable to prepare source for input identifier: " + this.identifier, exception );
112          }
113        }
114    
115      protected FlowProcess<Config> getFlowProcess()
116        {
117        return flowProcess;
118        }
119    
120      protected Input wrapInput( Input input )
121        {
122        return input;
123        }
124    
125      @Override
126      public boolean hasNext()
127        {
128        if( isComplete )
129          return false;
130    
131        if( hasWaiting )
132          return true;
133    
134        try
135          {
136          getNext();
137          }
138        catch( Exception exception )
139          {
140          if( identifier == null || identifier.isEmpty() )
141            identifier = "'unknown'";
142    
143          if( permittedExceptions.contains( exception.getClass() ) )
144            {
145            LOG.warn( "Caught permitted exception while reading {}", identifier, exception );
146            return false;
147            }
148    
149          currentException = new TupleException( "unable to read from input identifier: " + identifier, exception );
150    
151          return true;
152          }
153    
154        if( !hasWaiting )
155          isComplete = true;
156    
157        return !isComplete;
158        }
159    
160      private TupleEntry getNext() throws IOException
161        {
162        Tuples.asModifiable( sourceCall.getIncomingEntry().getTuple() );
163        hasWaiting = scheme.source( flowProcess, sourceCall );
164    
165        if( !hasWaiting && inputIterator.hasNext() )
166          {
167          sourceCall.setInput( wrapInput( inputIterator.next() ) );
168    
169          return getNext();
170          }
171    
172        return getTupleEntry();
173        }
174    
175      @Override
176      public TupleEntry next()
177        {
178        try
179          {
180          if( currentException != null )
181            throw currentException;
182          }
183        finally
184          {
185          currentException = null; // data may be trapped
186          }
187    
188        if( isComplete )
189          throw new IllegalStateException( "no next element" );
190    
191        try
192          {
193          if( hasWaiting )
194            return getTupleEntry();
195    
196          return getNext();
197          }
198        catch( Exception exception )
199          {
200          throw new TupleException( "unable to source from input identifier: " + identifier, exception );
201          }
202        finally
203          {
204          hasWaiting = false;
205          }
206        }
207    
208      @Override
209      public void remove()
210        {
211        throw new UnsupportedOperationException( "may not remove elements from this iterator" );
212        }
213    
214      @Override
215      public void close() throws IOException
216        {
217        try
218          {
219          if( sourceCall != null )
220            scheme.sourceCleanup( flowProcess, sourceCall );
221          }
222        finally
223          {
224          inputIterator.close();
225          }
226        }
227      }