001/*
002 * Copyright (c) 2007-2015 Concurrent, Inc. All Rights Reserved.
003 *
004 * Project and contact information: http://www.cascading.org/
005 *
006 * This file is part of the Cascading project.
007 *
008 * Licensed under the Apache License, Version 2.0 (the "License");
009 * you may not use this file except in compliance with the License.
010 * You may obtain a copy of the License at
011 *
012 *     http://www.apache.org/licenses/LICENSE-2.0
013 *
014 * Unless required by applicable law or agreed to in writing, software
015 * distributed under the License is distributed on an "AS IS" BASIS,
016 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
017 * See the License for the specific language governing permissions and
018 * limitations under the License.
019 */
020
021package cascading.flow.stream.element;
022
023import java.util.concurrent.Callable;
024
025import cascading.CascadingException;
026import cascading.flow.FlowProcess;
027import cascading.flow.SliceCounters;
028import cascading.flow.StepCounters;
029import cascading.flow.stream.duct.Duct;
030import cascading.flow.stream.duct.DuctException;
031import cascading.tap.Tap;
032import cascading.tuple.TupleEntry;
033import cascading.tuple.TupleEntryIterator;
034import org.slf4j.Logger;
035import org.slf4j.LoggerFactory;
036
037/**
038 *
039 */
040public class SourceStage extends ElementStage<Void, TupleEntry> implements Callable<Throwable>, InputSource
041  {
042  private static final Logger LOG = LoggerFactory.getLogger( SourceStage.class );
043
044  private final Tap source;
045
046  public SourceStage( FlowProcess flowProcess, Tap source )
047    {
048    super( flowProcess, source );
049    this.source = source;
050    }
051
052  public Tap getSource()
053    {
054    return source;
055    }
056
057  @Override
058  public Throwable call() throws Exception
059    {
060    return map( null );
061    }
062
063  @Override
064  public void run( Object input ) throws Throwable
065    {
066    Throwable throwable = map( input );
067
068    if( throwable != null )
069      throw throwable;
070    }
071
072  private Throwable map( Object input )
073    {
074    Throwable localThrowable = null;
075    TupleEntryIterator iterator = null;
076
077    try
078      {
079      next.start( this );
080
081      // input may be null
082      iterator = source.openForRead( flowProcess, input );
083
084      while( iterator.hasNext() )
085        {
086        TupleEntry tupleEntry;
087
088        try
089          {
090          tupleEntry = iterator.next();
091          flowProcess.increment( StepCounters.Tuples_Read, 1 );
092          flowProcess.increment( SliceCounters.Tuples_Read, 1 );
093          }
094        catch( OutOfMemoryError error )
095          {
096          handleReThrowableException( "out of memory, try increasing task memory allocation", error );
097          continue;
098          }
099        catch( CascadingException exception )
100          {
101          handleException( exception, null );
102          continue;
103          }
104        catch( Throwable throwable )
105          {
106          handleException( new DuctException( "internal error", throwable ), null );
107          continue;
108          }
109
110        next.receive( this, tupleEntry );
111        }
112
113      next.complete( this );
114      }
115    catch( Throwable throwable )
116      {
117      if( !( throwable instanceof OutOfMemoryError ) )
118        LOG.error( "caught throwable", throwable );
119
120      return throwable;
121      }
122    finally
123      {
124      try
125        {
126        if( iterator != null )
127          iterator.close();
128        }
129      catch( Throwable currentThrowable )
130        {
131        if( !( currentThrowable instanceof OutOfMemoryError ) )
132          LOG.warn( "failed closing iterator", currentThrowable );
133
134        localThrowable = currentThrowable;
135        }
136      }
137
138    return localThrowable;
139    }
140
141  @Override
142  public void initialize()
143    {
144    }
145
146  @Override
147  public void receive( Duct previous, Void nada )
148    {
149    throw new UnsupportedOperationException( "use call() instead" );
150    }
151  }