/*
 * Copyright (c) 2016 Chris K Wensel <chris@wensel.net>. All Rights Reserved.
 * Copyright (c) 2007-2017 Xplenty, Inc. All Rights Reserved.
 *
 * Project and contact information: http://www.cascading.org/
 *
 * This file is part of the Cascading project.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package cascading.flow.stream.element;

import java.io.IOException;
import java.util.Iterator;

import cascading.CascadingException;
import cascading.flow.FlowProcess;
import cascading.flow.stream.duct.Duct;
import cascading.flow.stream.duct.Grouping;
import cascading.flow.stream.duct.OpenWindow;
import cascading.operation.Buffer;
import cascading.pipe.Every;
import cascading.pipe.OperatorException;
import cascading.tuple.Fields;
import cascading.tuple.Tuple;
import cascading.tuple.TupleEntry;
import cascading.tuple.TupleEntryCollector;
import cascading.tuple.TupleEntryIterator;
import cascading.tuple.Tuples;

/**
 * Stream stage that hands each received {@link Grouping} to the {@link Buffer} obtained from the
 * wrapped {@link Every}, and forwards every result the buffer collects to the {@code next} duct.
 * <p>
 * The buffer is given the grouping key, the joiner closure, an output collector and, unless the
 * incoming entry declares no fields, an iterator over the argument values of the grouping.
 */
public class BufferEveryWindow extends EveryStage<Grouping<TupleEntry, TupleEntryIterator>> implements OpenWindow
  {
  /** The buffer operation to invoke per grouping; assigned in {@link #initialize()}. */
  Buffer buffer;

  /**
   * Creates a new stage for the given {@link Every}.
   *
   * @param flowProcess the flow process passed through to the parent stage and to the buffer
   * @param every       the pipe element whose buffer this stage executes
   */
  public BufferEveryWindow( FlowProcess flowProcess, Every every )
    {
    super( flowProcess, every );
    }

  /**
   * Initializes the parent stage, looks up the buffer from the {@link Every} and installs the
   * output collector that turns each collected result into an outgoing tuple for the next duct.
   */
  @Override
  public void initialize()
    {
    super.initialize();

    buffer = every.getBuffer();

    outputCollector = new TupleEntryCollector( getOperationDeclaredFields() )
      {
      @Override
      protected void collect( TupleEntry resultEntry ) throws IOException
        {
        // combine the current incoming tuple with the result emitted by the buffer
        Tuple outgoing = outgoingBuilder.makeResult( incomingEntry.getTuple(), resultEntry.getTuple() );

        outgoingEntry.setTuple( outgoing );

        try
          {
          next.receive( BufferEveryWindow.this, 0, outgoingEntry );
          }
        finally
          {
          // always runs, even if the downstream duct throws
          // NOTE(review): presumably restores the outgoing tuple to a modifiable state - confirm against Tuples
          Tuples.asModifiable( outgoing );
          }
        }
      };
    }

  /** Returns the buffer pass-through fields of the first incoming scope. */
  @Override
  protected Fields getIncomingPassThroughFields()
    {
    return incomingScopes.get( 0 ).getIncomingBufferPassThroughFields();
    }

  /** Returns the buffer argument fields of the first incoming scope. */
  @Override
  protected Fields getIncomingArgumentsFields()
    {
    return incomingScopes.get( 0 ).getIncomingBufferArgumentFields();
    }

  /** Returns the out-grouping selector of the first outgoing scope. */
  @Override
  protected Fields getOutgoingSelector()
    {
    return outgoingScopes.get( 0 ).getOutGroupingSelector();
    }

  /**
   * Propagates the start signal to the next duct, identifying this stage as the sender.
   *
   * @param previous the duct that started this stage; not used here
   */
  @Override
  public void start( Duct previous )
    {
    next.start( this );
    }

  /**
   * Invokes the buffer once for the given grouping.
   * <p>
   * Any {@link CascadingException} is passed to {@code handleException} as is; any other
   * {@link Throwable} is first wrapped in an {@link OperatorException}.
   *
   * @param previous the duct that sent the grouping; not used here
   * @param ordinal  the ordinal of the sending branch; not used here
   * @param grouping the grouping key, its join iterator and the joiner closure
   */
  @Override
  public void receive( Duct previous, int ordinal, final Grouping<TupleEntry, TupleEntryIterator> grouping )
    {
    try
      {
      // we want to null out any 'values' before and after the iterator begins/ends
      // this allows buffers to emit tuples before next() and when hasNext() return false;
      final TupleEntry tupleEntry = grouping.joinIterator.getTupleEntry();
      incomingEntry = tupleEntry;

      // if Fields.NONE are declared on the CoGroup, we don't provide arguments, only the joinerClosure
      if( !tupleEntry.getFields().isNone() )
        {
        final Tuple valueNulledTuple = Tuples.setOnEmpty( tupleEntry, grouping.key );
        tupleEntry.setTuple( valueNulledTuple );

        operationCall.setArgumentsIterator( createArgumentsIterator( grouping, tupleEntry, valueNulledTuple ) );
        }

      operationCall.setOutputCollector( outputCollector );
      operationCall.setJoinerClosure( grouping.joinerClosure );
      operationCall.setGroup( grouping.key );

      buffer.operate( flowProcess, operationCall );
      }
    catch( CascadingException exception )
      {
      handleException( exception, argumentsEntry );
      }
    catch( Throwable throwable )
      {
      handleException( new OperatorException( every, "operator Every failed executing operation: " + every.getOperation(), throwable ), argumentsEntry );
      }
    }

  /**
   * Creates the arguments iterator handed to the buffer for one grouping.
   * <p>
   * The iterator delegates to {@code grouping.joinIterator} and returns the same shared
   * {@code argumentsEntry} instance from every call to {@code next()}, with its tuple replaced.
   *
   * @param grouping         the grouping whose join iterator is wrapped
   * @param tupleEntry       the entry reset to {@code valueNulledTuple} once the iterator is exhausted
   * @param valueNulledTuple the tuple restored on {@code tupleEntry} when no values remain
   * @return an iterator over the argument entries of the grouping
   */
  private Iterator<TupleEntry> createArgumentsIterator( final Grouping<TupleEntry, TupleEntryIterator> grouping, final TupleEntry tupleEntry, final Tuple valueNulledTuple )
    {
    return new Iterator<TupleEntry>()
      {
      @Override
      public boolean hasNext()
        {
        boolean hasNext = grouping.joinIterator.hasNext();

        // skipped when the operation call asks to retain values
        if( !hasNext && !operationCall.isRetainValues() )
          tupleEntry.setTuple( valueNulledTuple ); // null out footer entries

        return hasNext;
        }

      @Override
      public TupleEntry next()
        {
        argumentsEntry.setTuple( argumentsBuilder.makeResult( grouping.joinIterator.next().getTuple(), null ) );

        return argumentsEntry;
        }

      @Override
      public void remove()
        {
        grouping.joinIterator.remove();
        }
      };
    }
  }