001    /*
002     * Copyright (c) 2007-2014 Concurrent, Inc. All Rights Reserved.
003     *
004     * Project and contact information: http://www.cascading.org/
005     *
006     * This file is part of the Cascading project.
007     *
008     * Licensed under the Apache License, Version 2.0 (the "License");
009     * you may not use this file except in compliance with the License.
010     * You may obtain a copy of the License at
011     *
012     *     http://www.apache.org/licenses/LICENSE-2.0
013     *
014     * Unless required by applicable law or agreed to in writing, software
015     * distributed under the License is distributed on an "AS IS" BASIS,
016     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
017     * See the License for the specific language governing permissions and
018     * limitations under the License.
019     */
020    
021    package cascading.flow.stream;
022    
023    import java.io.IOException;
024    
025    import cascading.CascadingException;
026    import cascading.flow.FlowProcess;
027    import cascading.operation.Function;
028    import cascading.pipe.Each;
029    import cascading.pipe.OperatorException;
030    import cascading.tuple.Fields;
031    import cascading.tuple.Tuple;
032    import cascading.tuple.TupleEntry;
033    import cascading.tuple.TupleEntryCollector;
034    import cascading.tuple.Tuples;
035    
036    /**
037     *
038     */
039    public class FunctionEachStage extends EachStage
040      {
041      private Function function;
042    
043      public FunctionEachStage( FlowProcess flowProcess, Each each )
044        {
045        super( flowProcess, each );
046        }
047    
048      @Override
049      protected Fields getIncomingPassThroughFields()
050        {
051        return incomingScopes.get( 0 ).getIncomingFunctionPassThroughFields();
052        }
053    
054      @Override
055      protected Fields getIncomingArgumentsFields()
056        {
057        return incomingScopes.get( 0 ).getIncomingFunctionArgumentFields();
058        }
059    
060      @Override
061      public void initialize()
062        {
063        super.initialize();
064    
065        function = each.getFunction();
066    
067        operationCall.setArguments( argumentsEntry );
068    
069        operationCall.setOutputCollector( new TupleEntryCollector( getOperationDeclaredFields() )
070        {
071        @Override
072        protected void collect( TupleEntry input ) throws IOException
073          {
074          Tuple outgoing = outgoingBuilder.makeResult( incomingEntry.getTuple(), input.getTuple() );
075    
076          outgoingEntry.setTuple( outgoing );
077    
078          try
079            {
080            next.receive( FunctionEachStage.this, outgoingEntry );
081            }
082          finally
083            {
084            Tuples.asModifiable( outgoing );
085            }
086          }
087        } );
088        }
089    
090      @Override
091      public void receive( Duct previous, TupleEntry incomingEntry )
092        {
093        this.incomingEntry = incomingEntry;
094    
095        argumentsEntry.setTuple( argumentsBuilder.makeResult( incomingEntry.getTuple(), null ) );
096    
097        try
098          {
099          function.operate( flowProcess, operationCall ); // adds results to collector
100          }
101        catch( CascadingException exception )
102          {
103          handleException( exception, argumentsEntry );
104          }
105        catch( Throwable throwable )
106          {
107          handleException( new OperatorException( each, "operator Each failed executing operation", throwable ), argumentsEntry );
108          }
109        }
110      }