001/*
002 * Copyright (c) 2007-2016 Concurrent, Inc. All Rights Reserved.
003 *
004 * Project and contact information: http://www.cascading.org/
005 *
006 * This file is part of the Cascading project.
007 *
008 * Licensed under the Apache License, Version 2.0 (the "License");
009 * you may not use this file except in compliance with the License.
010 * You may obtain a copy of the License at
011 *
012 *     http://www.apache.org/licenses/LICENSE-2.0
013 *
014 * Unless required by applicable law or agreed to in writing, software
015 * distributed under the License is distributed on an "AS IS" BASIS,
016 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
017 * See the License for the specific language governing permissions and
018 * limitations under the License.
019 */
020
021package cascading.flow.stream.element;
022
023import java.io.IOException;
024import java.util.Iterator;
025
026import cascading.CascadingException;
027import cascading.flow.FlowProcess;
028import cascading.flow.stream.duct.Duct;
029import cascading.flow.stream.duct.Grouping;
030import cascading.flow.stream.duct.OpenWindow;
031import cascading.operation.Buffer;
032import cascading.pipe.Every;
033import cascading.pipe.OperatorException;
034import cascading.tuple.Fields;
035import cascading.tuple.Tuple;
036import cascading.tuple.TupleEntry;
037import cascading.tuple.TupleEntryCollector;
038import cascading.tuple.TupleEntryIterator;
039import cascading.tuple.Tuples;
040
041/**
042 *
043 */
044public class BufferEveryWindow extends EveryStage<Grouping<TupleEntry, TupleEntryIterator>> implements OpenWindow
045  {
046  Buffer buffer;
047
048  public BufferEveryWindow( FlowProcess flowProcess, Every every )
049    {
050    super( flowProcess, every );
051    }
052
053  @Override
054  public void initialize()
055    {
056    super.initialize();
057
058    buffer = every.getBuffer();
059
060    outputCollector = new TupleEntryCollector( getOperationDeclaredFields() )
061    {
062    @Override
063    protected void collect( TupleEntry resultEntry ) throws IOException
064      {
065      Tuple outgoing = outgoingBuilder.makeResult( incomingEntry.getTuple(), resultEntry.getTuple() );
066
067      outgoingEntry.setTuple( outgoing );
068
069      try
070        {
071        next.receive( BufferEveryWindow.this, outgoingEntry );
072        }
073      finally
074        {
075        Tuples.asModifiable( outgoing );
076        }
077      }
078    };
079    }
080
081  @Override
082  protected Fields getIncomingPassThroughFields()
083    {
084    return incomingScopes.get( 0 ).getIncomingBufferPassThroughFields();
085    }
086
087  @Override
088  protected Fields getIncomingArgumentsFields()
089    {
090    return incomingScopes.get( 0 ).getIncomingBufferArgumentFields();
091    }
092
093  @Override
094  protected Fields getOutgoingSelector()
095    {
096    return outgoingScopes.get( 0 ).getOutGroupingSelector();
097    }
098
099  @Override
100  public void start( Duct previous )
101    {
102    next.start( this );
103    }
104
105  @Override
106  public void receive( Duct previous, final Grouping<TupleEntry, TupleEntryIterator> grouping )
107    {
108    try
109      {
110      // we want to null out any 'values' before and after the iterator begins/ends
111      // this allows buffers to emit tuples before next() and when hasNext() return false;
112      final TupleEntry tupleEntry = grouping.joinIterator.getTupleEntry();
113      incomingEntry = tupleEntry;
114
115      // if Fields.NONE are declared on the CoGroup, we don't provide arguments, only the joinerClosure
116      if( !tupleEntry.getFields().isNone() )
117        {
118        final Tuple valueNulledTuple = Tuples.setOnEmpty( tupleEntry, grouping.key );
119        tupleEntry.setTuple( valueNulledTuple );
120
121        operationCall.setArgumentsIterator( createArgumentsIterator( grouping, tupleEntry, valueNulledTuple ) );
122        }
123
124      operationCall.setOutputCollector( outputCollector );
125      operationCall.setJoinerClosure( grouping.joinerClosure );
126      operationCall.setGroup( grouping.key );
127
128      buffer.operate( flowProcess, operationCall );
129      }
130    catch( CascadingException exception )
131      {
132      handleException( exception, argumentsEntry );
133      }
134    catch( Throwable throwable )
135      {
136      handleException( new OperatorException( every, "operator Every failed executing operation: " + every.getOperation(), throwable ), argumentsEntry );
137      }
138    }
139
140  private Iterator<TupleEntry> createArgumentsIterator( final Grouping<TupleEntry, TupleEntryIterator> grouping, final TupleEntry tupleEntry, final Tuple valueNulledTuple )
141    {
142    return new Iterator<TupleEntry>()
143    {
144    public boolean hasNext()
145      {
146      boolean hasNext = grouping.joinIterator.hasNext();
147
148      if( !hasNext && !operationCall.isRetainValues() )
149        tupleEntry.setTuple( valueNulledTuple ); // null out footer entries
150
151      return hasNext;
152      }
153
154    public TupleEntry next()
155      {
156      argumentsEntry.setTuple( argumentsBuilder.makeResult( grouping.joinIterator.next().getTuple(), null ) );
157
158      return argumentsEntry;
159      }
160
161    public void remove()
162      {
163      grouping.joinIterator.remove();
164      }
165    };
166    }
167  }