001/*
002 * Copyright (c) 2007-2015 Concurrent, Inc. All Rights Reserved.
003 *
004 * Project and contact information: http://www.cascading.org/
005 *
006 * This file is part of the Cascading project.
007 *
008 * Licensed under the Apache License, Version 2.0 (the "License");
009 * you may not use this file except in compliance with the License.
010 * You may obtain a copy of the License at
011 *
012 *     http://www.apache.org/licenses/LICENSE-2.0
013 *
014 * Unless required by applicable law or agreed to in writing, software
015 * distributed under the License is distributed on an "AS IS" BASIS,
016 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
017 * See the License for the specific language governing permissions and
018 * limitations under the License.
019 */
020
021package cascading.flow.stream.element;
022
023import java.io.IOException;
024
025import cascading.CascadingException;
026import cascading.flow.FlowProcess;
027import cascading.flow.stream.duct.Duct;
028import cascading.flow.stream.duct.Reducing;
029import cascading.operation.Aggregator;
030import cascading.pipe.Every;
031import cascading.pipe.OperatorException;
032import cascading.tuple.Fields;
033import cascading.tuple.Tuple;
034import cascading.tuple.TupleEntry;
035import cascading.tuple.TupleEntryCollector;
036import cascading.tuple.Tuples;
037
038/**
039 *
040 */
041public class AggregatorEveryStage extends EveryStage<TupleEntry> implements Reducing<TupleEntry, TupleEntry>
042  {
043  private Aggregator aggregator;
044  private Reducing reducing;
045
046  public AggregatorEveryStage( FlowProcess flowProcess, Every every )
047    {
048    super( flowProcess, every );
049    }
050
051  @Override
052  public void initialize()
053    {
054    super.initialize();
055
056    aggregator = every.getAggregator();
057
058    outputCollector = new TupleEntryCollector( getOperationDeclaredFields() )
059    {
060    @Override
061    protected void collect( TupleEntry resultEntry ) throws IOException
062      {
063      Tuple outgoing = outgoingBuilder.makeResult( incomingEntry.getTuple(), resultEntry.getTuple() );
064
065      outgoingEntry.setTuple( outgoing );
066
067      try
068        {
069        reducing.completeGroup( AggregatorEveryStage.this, outgoingEntry );
070        }
071      finally
072        {
073        Tuples.asModifiable( outgoing );
074        }
075      }
076    };
077
078    reducing = (Reducing) getNext();
079    }
080
081  @Override
082  protected Fields getIncomingPassThroughFields()
083    {
084    return incomingScopes.get( 0 ).getIncomingAggregatorPassThroughFields();
085    }
086
087  @Override
088  protected Fields getIncomingArgumentsFields()
089    {
090    return incomingScopes.get( 0 ).getIncomingAggregatorArgumentFields();
091    }
092
093  @Override
094  protected Fields getOutgoingSelector()
095    {
096    return outgoingScopes.get( 0 ).getOutGroupingSelector();
097    }
098
099  @Override
100  public void startGroup( Duct previous, TupleEntry groupEntry )
101    {
102    operationCall.setGroup( groupEntry );
103    operationCall.setArguments( null );  // zero it out
104    operationCall.setOutputCollector( null ); // zero it out
105
106    try
107      {
108      aggregator.start( flowProcess, operationCall );
109      }
110    catch( CascadingException exception )
111      {
112      handleException( exception, groupEntry );
113      }
114    catch( Throwable throwable )
115      {
116      handleException( new OperatorException( every, "operator Every failed starting operation: " + every.getOperation(), throwable ), groupEntry );
117      }
118
119    reducing.startGroup( this, groupEntry );
120    }
121
122  @Override
123  public void receive( Duct previous, TupleEntry tupleEntry )
124    {
125    try
126      {
127      argumentsEntry.setTuple( argumentsBuilder.makeResult( tupleEntry.getTuple(), null ) );
128      operationCall.setArguments( argumentsEntry );
129
130      aggregator.aggregate( flowProcess, operationCall );
131      }
132    catch( CascadingException exception )
133      {
134      handleException( exception, argumentsEntry );
135      }
136    catch( Throwable throwable )
137      {
138      handleException( new OperatorException( every, "operator Every failed executing operation: " + every.getOperation(), throwable ), argumentsEntry );
139      }
140
141    next.receive( this, tupleEntry );
142    }
143
144  @Override
145  public void completeGroup( Duct previous, TupleEntry incomingEntry )
146    {
147    this.incomingEntry = incomingEntry;
148    operationCall.setArguments( null );
149    operationCall.setOutputCollector( outputCollector );
150
151    try
152      {
153      aggregator.complete( flowProcess, operationCall ); // collector calls next
154      }
155    catch( CascadingException exception )
156      {
157      handleException( exception, incomingEntry );
158      }
159    catch( Throwable throwable )
160      {
161      handleException( new OperatorException( every, "operator Every failed completing operation: " + every.getOperation(), throwable ), incomingEntry );
162      }
163    }
164  }