001/* 002 * Copyright (c) 2007-2016 Concurrent, Inc. All Rights Reserved. 003 * 004 * Project and contact information: http://www.cascading.org/ 005 * 006 * This file is part of the Cascading project. 007 * 008 * Licensed under the Apache License, Version 2.0 (the "License"); 009 * you may not use this file except in compliance with the License. 010 * You may obtain a copy of the License at 011 * 012 * http://www.apache.org/licenses/LICENSE-2.0 013 * 014 * Unless required by applicable law or agreed to in writing, software 015 * distributed under the License is distributed on an "AS IS" BASIS, 016 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 017 * See the License for the specific language governing permissions and 018 * limitations under the License. 019 */ 020 021package cascading.flow.stream.element; 022 023import java.io.IOException; 024import java.util.Iterator; 025 026import cascading.CascadingException; 027import cascading.flow.FlowProcess; 028import cascading.flow.stream.duct.Duct; 029import cascading.flow.stream.duct.Grouping; 030import cascading.flow.stream.duct.OpenWindow; 031import cascading.operation.Buffer; 032import cascading.pipe.Every; 033import cascading.pipe.OperatorException; 034import cascading.tuple.Fields; 035import cascading.tuple.Tuple; 036import cascading.tuple.TupleEntry; 037import cascading.tuple.TupleEntryCollector; 038import cascading.tuple.TupleEntryIterator; 039import cascading.tuple.Tuples; 040 041/** 042 * 043 */ 044public class BufferEveryWindow extends EveryStage<Grouping<TupleEntry, TupleEntryIterator>> implements OpenWindow 045 { 046 Buffer buffer; 047 048 public BufferEveryWindow( FlowProcess flowProcess, Every every ) 049 { 050 super( flowProcess, every ); 051 } 052 053 @Override 054 public void initialize() 055 { 056 super.initialize(); 057 058 buffer = every.getBuffer(); 059 060 outputCollector = new TupleEntryCollector( getOperationDeclaredFields() ) 061 { 062 @Override 063 protected void collect( TupleEntry resultEntry ) throws IOException 064 { 065 Tuple outgoing = outgoingBuilder.makeResult( incomingEntry.getTuple(), resultEntry.getTuple() ); 066 067 outgoingEntry.setTuple( outgoing ); 068 069 try 070 { 071 next.receive( BufferEveryWindow.this, outgoingEntry ); 072 } 073 finally 074 { 075 Tuples.asModifiable( outgoing ); 076 } 077 } 078 }; 079 } 080 081 @Override 082 protected Fields getIncomingPassThroughFields() 083 { 084 return incomingScopes.get( 0 ).getIncomingBufferPassThroughFields(); 085 } 086 087 @Override 088 protected Fields getIncomingArgumentsFields() 089 { 090 return incomingScopes.get( 0 ).getIncomingBufferArgumentFields(); 091 } 092 093 @Override 094 protected Fields getOutgoingSelector() 095 { 096 return outgoingScopes.get( 0 ).getOutGroupingSelector(); 097 } 098 099 @Override 100 public void start( Duct previous ) 101 { 102 next.start( this ); 103 } 104 105 @Override 106 public void receive( Duct previous, final Grouping<TupleEntry, TupleEntryIterator> grouping ) 107 { 108 try 109 { 110 // we want to null out any 'values' before and after the iterator begins/ends 111 // this allows buffers to emit tuples before next() and when hasNext() return false; 112 final TupleEntry tupleEntry = grouping.joinIterator.getTupleEntry(); 113 incomingEntry = tupleEntry; 114 115 // if Fields.NONE are declared on the CoGroup, we don't provide arguments, only the joinerClosure 116 if( !tupleEntry.getFields().isNone() ) 117 { 118 final Tuple valueNulledTuple = Tuples.setOnEmpty( tupleEntry, grouping.key ); 119 tupleEntry.setTuple( valueNulledTuple ); 120 121 operationCall.setArgumentsIterator( createArgumentsIterator( grouping, tupleEntry, valueNulledTuple ) ); 122 } 123 124 operationCall.setOutputCollector( outputCollector ); 125 operationCall.setJoinerClosure( grouping.joinerClosure ); 126 operationCall.setGroup( grouping.key ); 127 128 buffer.operate( flowProcess, operationCall ); 129 } 130 catch( CascadingException exception ) 131 { 132 handleException( exception, argumentsEntry ); 133 } 134 catch( Throwable throwable ) 135 { 136 handleException( new OperatorException( every, "operator Every failed executing operation: " + every.getOperation(), throwable ), argumentsEntry ); 137 } 138 } 139 140 private Iterator<TupleEntry> createArgumentsIterator( final Grouping<TupleEntry, TupleEntryIterator> grouping, final TupleEntry tupleEntry, final Tuple valueNulledTuple ) 141 { 142 return new Iterator<TupleEntry>() 143 { 144 public boolean hasNext() 145 { 146 boolean hasNext = grouping.joinIterator.hasNext(); 147 148 if( !hasNext && !operationCall.isRetainValues() ) 149 tupleEntry.setTuple( valueNulledTuple ); // null out footer entries 150 151 return hasNext; 152 } 153 154 public TupleEntry next() 155 { 156 argumentsEntry.setTuple( argumentsBuilder.makeResult( grouping.joinIterator.next().getTuple(), null ) ); 157 158 return argumentsEntry; 159 } 160 161 public void remove() 162 { 163 grouping.joinIterator.remove(); 164 } 165 }; 166 } 167 }