001    /*
002     * Copyright (c) 2007-2014 Concurrent, Inc. All Rights Reserved.
003     *
004     * Project and contact information: http://www.cascading.org/
005     *
006     * This file is part of the Cascading project.
007     *
008     * Licensed under the Apache License, Version 2.0 (the "License");
009     * you may not use this file except in compliance with the License.
010     * You may obtain a copy of the License at
011     *
012     *     http://www.apache.org/licenses/LICENSE-2.0
013     *
014     * Unless required by applicable law or agreed to in writing, software
015     * distributed under the License is distributed on an "AS IS" BASIS,
016     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
017     * See the License for the specific language governing permissions and
018     * limitations under the License.
019     */
020    
021    package cascading.flow.stream;
022    
023    import java.io.IOException;
024    
025    import cascading.CascadingException;
026    import cascading.flow.FlowProcess;
027    import cascading.operation.Aggregator;
028    import cascading.pipe.Every;
029    import cascading.pipe.OperatorException;
030    import cascading.tuple.Fields;
031    import cascading.tuple.Tuple;
032    import cascading.tuple.TupleEntry;
033    import cascading.tuple.TupleEntryCollector;
034    import cascading.tuple.Tuples;
035    
036    /**
037     *
038     */
039    public class AggregatorEveryStage extends EveryStage<TupleEntry> implements Reducing<TupleEntry, TupleEntry>
040      {
041      private Aggregator aggregator;
042      private Reducing reducing;
043    
044      public AggregatorEveryStage( FlowProcess flowProcess, Every every )
045        {
046        super( flowProcess, every );
047        }
048    
049      @Override
050      public void initialize()
051        {
052        super.initialize();
053    
054        aggregator = every.getAggregator();
055    
056        outputCollector = new TupleEntryCollector( getOperationDeclaredFields() )
057        {
058        @Override
059        protected void collect( TupleEntry resultEntry ) throws IOException
060          {
061          Tuple outgoing = outgoingBuilder.makeResult( incomingEntry.getTuple(), resultEntry.getTuple() );
062    
063          outgoingEntry.setTuple( outgoing );
064    
065          try
066            {
067            reducing.completeGroup( AggregatorEveryStage.this, outgoingEntry );
068            }
069          finally
070            {
071            Tuples.asModifiable( outgoing );
072            }
073          }
074        };
075    
076        reducing = (Reducing) getNext();
077        }
078    
079      @Override
080      protected Fields getIncomingPassThroughFields()
081        {
082        return incomingScopes.get( 0 ).getIncomingAggregatorPassThroughFields();
083        }
084    
085      @Override
086      protected Fields getIncomingArgumentsFields()
087        {
088        return incomingScopes.get( 0 ).getIncomingAggregatorArgumentFields();
089        }
090    
091      @Override
092      protected Fields getOutgoingSelector()
093        {
094        return outgoingScopes.get( 0 ).getOutGroupingSelector();
095        }
096    
097      @Override
098      public void startGroup( Duct previous, TupleEntry groupEntry )
099        {
100        operationCall.setGroup( groupEntry );
101        operationCall.setArguments( null );  // zero it out
102        operationCall.setOutputCollector( null ); // zero it out
103    
104        try
105          {
106          aggregator.start( flowProcess, operationCall );
107          }
108        catch( CascadingException exception )
109          {
110          handleException( exception, groupEntry );
111          }
112        catch( Throwable throwable )
113          {
114          handleException( new OperatorException( every, "operator Every failed starting operation: " + every.getOperation(), throwable ), groupEntry );
115          }
116    
117        reducing.startGroup( this, groupEntry );
118        }
119    
120      @Override
121      public void receive( Duct previous, TupleEntry tupleEntry )
122        {
123        try
124          {
125          argumentsEntry.setTuple( argumentsBuilder.makeResult( tupleEntry.getTuple(), null ) );
126          operationCall.setArguments( argumentsEntry );
127    
128          aggregator.aggregate( flowProcess, operationCall );
129          }
130        catch( CascadingException exception )
131          {
132          handleException( exception, argumentsEntry );
133          }
134        catch( Throwable throwable )
135          {
136          handleException( new OperatorException( every, "operator Every failed executing operation: " + every.getOperation(), throwable ), argumentsEntry );
137          }
138    
139        next.receive( this, tupleEntry );
140        }
141    
142      @Override
143      public void completeGroup( Duct previous, TupleEntry incomingEntry )
144        {
145        this.incomingEntry = incomingEntry;
146        operationCall.setArguments( null );
147        operationCall.setOutputCollector( outputCollector );
148    
149        try
150          {
151          aggregator.complete( flowProcess, operationCall ); // collector calls next
152          }
153        catch( CascadingException exception )
154          {
155          handleException( exception, incomingEntry );
156          }
157        catch( Throwable throwable )
158          {
159          handleException( new OperatorException( every, "operator Every failed completing operation: " + every.getOperation(), throwable ), incomingEntry );
160          }
161        }
162      }