001    /*
002     * Copyright (c) 2007-2014 Concurrent, Inc. All Rights Reserved.
003     *
004     * Project and contact information: http://www.cascading.org/
005     *
006     * This file is part of the Cascading project.
007     *
008     * Licensed under the Apache License, Version 2.0 (the "License");
009     * you may not use this file except in compliance with the License.
010     * You may obtain a copy of the License at
011     *
012     *     http://www.apache.org/licenses/LICENSE-2.0
013     *
014     * Unless required by applicable law or agreed to in writing, software
015     * distributed under the License is distributed on an "AS IS" BASIS,
016     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
017     * See the License for the specific language governing permissions and
018     * limitations under the License.
019     */
020    
021    package cascading.tuple;
022    
023    import java.io.Closeable;
024    import java.io.IOException;
025    
026    import cascading.flow.FlowProcess;
027    import cascading.scheme.ConcreteCall;
028    import cascading.scheme.Scheme;
029    import cascading.util.CloseableIterator;
030    import cascading.util.SingleCloseableInputIterator;
031    
032    /**
033     * Class TupleEntrySchemeIterator is a helper class for wrapping a {@link Scheme} instance, calling
034     * {@link Scheme#source(cascading.flow.FlowProcess, cascading.scheme.SourceCall)} on every call to
035     * {@link #next()}.
036     * <p/>
037     * Use this class inside a custom {@link cascading.tap.Tap} when overriding the
038     * {@link cascading.tap.Tap#openForRead(cascading.flow.FlowProcess)} method.
039     */
040    public class TupleEntrySchemeIterator<Config, Input> extends TupleEntryIterator
041      {
042      private final FlowProcess<Config> flowProcess;
043      private final Scheme scheme;
044      private final CloseableIterator<Input> inputIterator;
045      private ConcreteCall sourceCall;
046    
047      private String identifier;
048      private boolean isComplete = false;
049      private boolean hasWaiting = false;
050      private TupleException currentException;
051    
052      public TupleEntrySchemeIterator( FlowProcess<Config> flowProcess, Scheme scheme, Input input )
053        {
054        this( flowProcess, scheme, input, null );
055        }
056    
057      public TupleEntrySchemeIterator( FlowProcess<Config> flowProcess, Scheme scheme, Input input, String identifier )
058        {
059        this( flowProcess, scheme, (CloseableIterator<Input>) new SingleCloseableInputIterator( (Closeable) input ), identifier );
060        }
061    
062      public TupleEntrySchemeIterator( FlowProcess<Config> flowProcess, Scheme scheme, CloseableIterator<Input> inputIterator )
063        {
064        this( flowProcess, scheme, inputIterator, null );
065        }
066    
067      public TupleEntrySchemeIterator( FlowProcess<Config> flowProcess, Scheme scheme, CloseableIterator<Input> inputIterator, String identifier )
068        {
069        super( scheme.getSourceFields() );
070        this.flowProcess = flowProcess;
071        this.scheme = scheme;
072        this.inputIterator = inputIterator;
073        this.identifier = identifier;
074    
075        if( this.identifier == null || this.identifier.isEmpty() )
076          this.identifier = "'unknown'";
077    
078        if( !inputIterator.hasNext() )
079          {
080          isComplete = true;
081          return;
082          }
083    
084        sourceCall = new ConcreteCall();
085    
086        sourceCall.setIncomingEntry( getTupleEntry() );
087        sourceCall.setInput( wrapInput( inputIterator.next() ) );
088    
089        try
090          {
091          this.scheme.sourcePrepare( flowProcess, sourceCall );
092          }
093        catch( IOException exception )
094          {
095          throw new TupleException( "unable to prepare source for input identifier: " + this.identifier, exception );
096          }
097        }
098    
099      protected FlowProcess<Config> getFlowProcess()
100        {
101        return flowProcess;
102        }
103    
104      protected Input wrapInput( Input input )
105        {
106        return input;
107        }
108    
109      @Override
110      public boolean hasNext()
111        {
112        if( isComplete )
113          return false;
114    
115        if( hasWaiting )
116          return true;
117    
118        try
119          {
120          getNext();
121          }
122        catch( Exception exception )
123          {
124          if( identifier == null || identifier.isEmpty() )
125            identifier = "'unknown'";
126    
127          currentException = new TupleException( "unable to read from input identifier: " + identifier, exception );
128          return true;
129          }
130    
131        if( !hasWaiting )
132          isComplete = true;
133    
134        return !isComplete;
135        }
136    
137      private TupleEntry getNext() throws IOException
138        {
139        Tuples.asModifiable( sourceCall.getIncomingEntry().getTuple() );
140        hasWaiting = scheme.source( flowProcess, sourceCall );
141    
142        if( !hasWaiting && inputIterator.hasNext() )
143          {
144          sourceCall.setInput( wrapInput( inputIterator.next() ) );
145    
146          return getNext();
147          }
148    
149        return getTupleEntry();
150        }
151    
152      @Override
153      public TupleEntry next()
154        {
155        try
156          {
157          if( currentException != null )
158            throw currentException;
159          }
160        finally
161          {
162          currentException = null; // data may be trapped
163          }
164    
165        if( isComplete )
166          throw new IllegalStateException( "no next element" );
167    
168        try
169          {
170          if( hasWaiting )
171            return getTupleEntry();
172    
173          return getNext();
174          }
175        catch( Exception exception )
176          {
177          throw new TupleException( "unable to source from input identifier: " + identifier, exception );
178          }
179        finally
180          {
181          hasWaiting = false;
182          }
183        }
184    
185      @Override
186      public void remove()
187        {
188        throw new UnsupportedOperationException( "may not remove elements from this iterator" );
189        }
190    
191      @Override
192      public void close() throws IOException
193        {
194        try
195          {
196          if( sourceCall != null )
197            scheme.sourceCleanup( flowProcess, sourceCall );
198          }
199        finally
200          {
201          inputIterator.close();
202          }
203        }
204      }