001/* 002 * Copyright (c) 2016 Chris K Wensel <chris@wensel.net>. All Rights Reserved. 003 * Copyright (c) 2007-2017 Xplenty, Inc. All Rights Reserved. 004 * 005 * Project and contact information: http://www.cascading.org/ 006 * 007 * This file is part of the Cascading project. 008 * 009 * Licensed under the Apache License, Version 2.0 (the "License"); 010 * you may not use this file except in compliance with the License. 011 * You may obtain a copy of the License at 012 * 013 * http://www.apache.org/licenses/LICENSE-2.0 014 * 015 * Unless required by applicable law or agreed to in writing, software 016 * distributed under the License is distributed on an "AS IS" BASIS, 017 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 018 * See the License for the specific language governing permissions and 019 * limitations under the License. 020 */ 021 022package cascading.flow.stream.element; 023 024import cascading.CascadingException; 025import cascading.flow.FlowProcess; 026import cascading.flow.stream.duct.Duct; 027import cascading.flow.stream.duct.Reducing; 028import cascading.operation.GroupAssertion; 029import cascading.pipe.Every; 030import cascading.pipe.OperatorException; 031import cascading.tuple.Fields; 032import cascading.tuple.TupleEntry; 033 034/** 035 * 036 */ 037public class GroupAssertionEveryStage extends EveryStage<TupleEntry> implements Reducing<TupleEntry, TupleEntry> 038 { 039 private GroupAssertion groupAssertion; 040 private Reducing reducing; 041 042 public GroupAssertionEveryStage( FlowProcess flowProcess, Every every ) 043 { 044 super( flowProcess, every ); 045 } 046 047 @Override 048 protected Fields getIncomingPassThroughFields() 049 { 050 return incomingScopes.get( 0 ).getIncomingAggregatorPassThroughFields(); 051 } 052 053 @Override 054 protected Fields getIncomingArgumentsFields() 055 { 056 return incomingScopes.get( 0 ).getIncomingAggregatorArgumentFields(); 057 } 058 059 @Override 060 protected Fields getOutgoingSelector() 061 { 062 return outgoingScopes.get( 0 ).getOutGroupingSelector(); 063 } 064 065 @Override 066 public void initialize() 067 { 068 super.initialize(); 069 070 groupAssertion = every.getGroupAssertion(); 071 072 reducing = (Reducing) getNext(); 073 } 074 075 @Override 076 public void startGroup( Duct previous, TupleEntry groupEntry ) 077 { 078 operationCall.setGroup( groupEntry ); 079 operationCall.setArguments( null ); // zero it out 080 operationCall.setOutputCollector( null ); // zero it out 081 082 try 083 { 084 groupAssertion.start( flowProcess, operationCall ); 085 } 086 catch( CascadingException exception ) 087 { 088 handleException( exception, groupEntry ); 089 } 090 catch( Throwable throwable ) 091 { 092 handleException( new OperatorException( every, "operator Every failed starting operation: " + every.getOperation(), throwable ), groupEntry ); 093 } 094 095 reducing.startGroup( this, groupEntry ); 096 } 097 098 @Override 099 public void receive( Duct previous, int ordinal, TupleEntry tupleEntry ) 100 { 101 try 102 { 103 argumentsEntry.setTuple( argumentsBuilder.makeResult( tupleEntry.getTuple(), null ) ); 104 operationCall.setArguments( argumentsEntry ); 105 106 groupAssertion.aggregate( flowProcess, operationCall ); 107 } 108 catch( CascadingException exception ) 109 { 110 handleException( exception, argumentsEntry ); 111 } 112 catch( Throwable throwable ) 113 { 114 handleException( new OperatorException( every, "operator Every failed executing operation: " + every.getOperation(), throwable ), argumentsEntry ); 115 } 116 117 next.receive( this, 0, tupleEntry ); 118 } 119 120 @Override 121 public void completeGroup( Duct previous, TupleEntry incomingEntry ) 122 { 123 this.incomingEntry = incomingEntry; 124 operationCall.setArguments( null ); 125 126 try 127 { 128 groupAssertion.doAssert( flowProcess, operationCall ); // collector calls next 129 130 reducing.completeGroup( this, incomingEntry ); 131 } 132 catch( CascadingException exception ) 133 { 134 handleException( exception, incomingEntry ); 135 } 136 catch( Throwable throwable ) 137 { 138 handleException( new OperatorException( every, "operator Every failed completing operation: " + every.getOperation(), throwable ), incomingEntry ); 139 } 140 } 141 }