001/* 002 * Copyright (c) 2007-2015 Concurrent, Inc. All Rights Reserved. 003 * 004 * Project and contact information: http://www.cascading.org/ 005 * 006 * This file is part of the Cascading project. 007 * 008 * Licensed under the Apache License, Version 2.0 (the "License"); 009 * you may not use this file except in compliance with the License. 010 * You may obtain a copy of the License at 011 * 012 * http://www.apache.org/licenses/LICENSE-2.0 013 * 014 * Unless required by applicable law or agreed to in writing, software 015 * distributed under the License is distributed on an "AS IS" BASIS, 016 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 017 * See the License for the specific language governing permissions and 018 * limitations under the License. 019 */ 020 021package cascading.flow.stream.element; 022 023import cascading.flow.FlowElement; 024import cascading.flow.FlowProcess; 025import cascading.flow.planner.Scope; 026import cascading.flow.stream.duct.Duct; 027import cascading.operation.ConcreteCall; 028import cascading.pipe.Operator; 029import cascading.tuple.Fields; 030import cascading.tuple.Tuple; 031import cascading.tuple.TupleEntry; 032import cascading.tuple.TupleEntryCollector; 033import cascading.tuple.util.TupleBuilder; 034import cascading.tuple.util.TupleViews; 035 036import static cascading.tuple.util.TupleViews.*; 037 038/** 039 * 040 */ 041public abstract class OperatorStage<Incoming> extends ElementStage<Incoming, TupleEntry> 042 { 043 /** 044 * In 2.2 the collector is now nulled before the 045 * {@link cascading.operation.Operation#cleanup(cascading.flow.FlowProcess, cascading.operation.OperationCall)} 046 * is called. This property retains the collector to remain compatible with 2.1. 047 */ 048 public static final String RETAIN_COLLECTOR = "cascading.compatibility.retain.collector"; 049 050 protected ConcreteCall operationCall; 051 protected TupleEntry incomingEntry; 052 protected Fields argumentsSelector; 053 protected TupleEntry argumentsEntry; 054 protected Fields remainderFields; 055 protected Fields outgoingSelector; 056 protected TupleEntry outgoingEntry; 057 058 protected TupleBuilder argumentsBuilder; 059 protected TupleBuilder outgoingBuilder; 060 061 private final boolean retainCollector; 062 063 protected TupleEntryCollector outputCollector; 064 065 public OperatorStage( FlowProcess flowProcess, FlowElement flowElement ) 066 { 067 super( flowProcess, flowElement ); 068 069 this.retainCollector = Boolean.parseBoolean( flowProcess.getStringProperty( RETAIN_COLLECTOR ) ); 070 } 071 072 public abstract Operator getOperator(); 073 074 protected abstract Fields getOutgoingSelector(); 075 076 protected Fields getOperationDeclaredFields() 077 { 078 return outgoingScopes.get( 0 ).getOperationDeclaredFields(); 079 } 080 081 protected abstract Fields getIncomingPassThroughFields(); 082 083 protected abstract Fields getIncomingArgumentsFields(); 084 085 protected TupleBuilder createArgumentsBuilder( final Fields incomingFields, final Fields argumentsSelector ) 086 { 087 if( incomingFields.isUnknown() ) 088 return new TupleBuilder() 089 { 090 @Override 091 public Tuple makeResult( Tuple input, Tuple output ) 092 { 093 return input.get( incomingFields, argumentsSelector ); 094 } 095 }; 096 097 if( argumentsSelector.isAll() ) 098 return new TupleBuilder() 099 { 100 @Override 101 public Tuple makeResult( Tuple input, Tuple output ) 102 { 103 return input; 104 } 105 }; 106 107 if( argumentsSelector.isNone() ) 108 return new TupleBuilder() 109 { 110 @Override 111 public Tuple makeResult( Tuple input, Tuple output ) 112 { 113 return Tuple.NULL; 114 } 115 }; 116 117 final Fields inputDeclarationFields = Fields.asDeclaration( incomingFields ); 118 119 return new TupleBuilder() 120 { 121 Tuple result = createNarrow( inputDeclarationFields.getPos( argumentsSelector ) ); 122 123 @Override 124 public Tuple makeResult( Tuple input, Tuple output ) 125 { 126 return TupleViews.reset( result, input ); 127 } 128 }; 129 } 130 131 protected TupleBuilder createOutgoingBuilder( final Operator operator, final Fields incomingFields, final Fields argumentSelector, final Fields remainderFields, final Fields declaredFields, final Fields outgoingSelector ) 132 { 133 final Fields inputDeclarationFields = Fields.asDeclaration( incomingFields ); 134 135 if( operator.getOutputSelector().isResults() ) 136 return new TupleBuilder() 137 { 138 @Override 139 public Tuple makeResult( Tuple input, Tuple output ) 140 { 141 return output; 142 } 143 }; 144 145 if( operator.getOutputSelector().isAll() && !( incomingFields.isUnknown() || declaredFields.isUnknown() ) ) 146 return new TupleBuilder() 147 { 148 Tuple result = createComposite( inputDeclarationFields, declaredFields ); 149 150 @Override 151 public Tuple makeResult( Tuple input, Tuple output ) 152 { 153 return TupleViews.reset( result, input, output ); 154 } 155 }; 156 157 if( operator.getOutputSelector().isReplace() ) 158 { 159 if( incomingFields.isUnknown() ) 160 return new TupleBuilder() 161 { 162 Fields resultFields = operator.getFieldDeclaration().isArguments() ? argumentSelector : declaredFields; 163 164 @Override 165 public Tuple makeResult( Tuple input, Tuple output ) 166 { 167 Tuple result = new Tuple( input ); 168 169 result.set( Fields.UNKNOWN, resultFields, output ); 170 171 return result; 172 } 173 }; 174 175 return new TupleBuilder() 176 { 177 Fields resultFields = operator.getFieldDeclaration().isArguments() ? argumentSelector : declaredFields; 178 Tuple result = createOverride( inputDeclarationFields, resultFields ); 179 180 @Override 181 public Tuple makeResult( Tuple input, Tuple output ) 182 { 183 return TupleViews.reset( result, input, output ); 184 } 185 }; 186 } 187 188 if( operator.getOutputSelector().isSwap() ) 189 { 190 if( remainderFields.size() == 0 ) // the same as Fields.RESULTS 191 return new TupleBuilder() 192 { 193 @Override 194 public Tuple makeResult( Tuple input, Tuple output ) 195 { 196 return output; 197 } 198 }; 199 else if( declaredFields.isUnknown() ) 200 return new TupleBuilder() 201 { 202 @Override 203 public Tuple makeResult( Tuple input, Tuple output ) 204 { 205 return input.get( incomingFields, remainderFields ).append( output ); 206 } 207 }; 208 else 209 return new TupleBuilder() 210 { 211 Tuple view = createNarrow( inputDeclarationFields.getPos( remainderFields ) ); 212 Tuple result = createComposite( Fields.asDeclaration( remainderFields ), declaredFields ); 213 214 @Override 215 public Tuple makeResult( Tuple input, Tuple output ) 216 { 217 TupleViews.reset( view, input ); 218 219 return TupleViews.reset( result, view, output ); 220 } 221 }; 222 } 223 224 if( incomingFields.isUnknown() || declaredFields.isUnknown() ) 225 return new TupleBuilder() 226 { 227 Fields selector = outgoingSelector.isUnknown() ? Fields.ALL : outgoingSelector; 228 TupleEntry incoming = new TupleEntry( incomingFields, true ); 229 TupleEntry declared = new TupleEntry( declaredFields, true ); 230 231 @Override 232 public Tuple makeResult( Tuple input, Tuple output ) 233 { 234 incoming.setTuple( input ); 235 declared.setTuple( output ); 236 237 return TupleEntry.select( selector, incoming, declared ); 238 } 239 }; 240 241 return new TupleBuilder() 242 { 243 Fields inputFields = operator.getFieldDeclaration().isArguments() ? Fields.mask( inputDeclarationFields, declaredFields ) : inputDeclarationFields; 244 Tuple appended = createComposite( inputFields, declaredFields ); 245 Fields allFields = Fields.resolve( Fields.ALL, inputFields, declaredFields ); 246 Tuple result = createNarrow( allFields.getPos( outgoingSelector ), appended ); 247 248 @Override 249 public Tuple makeResult( Tuple input, Tuple output ) 250 { 251 TupleViews.reset( appended, input, output ); 252 253 return result; 254 } 255 }; 256 } 257 258 @Override 259 public void initialize() 260 { 261 Scope outgoingScope = outgoingScopes.get( 0 ); 262 263 operationCall = new ConcreteCall( outgoingScope.getArgumentsDeclarator(), outgoingScope.getOperationDeclaredFields() ); 264 265 argumentsSelector = outgoingScope.getArgumentsSelector(); 266 remainderFields = outgoingScope.getRemainderPassThroughFields(); 267 outgoingSelector = getOutgoingSelector(); 268 269 argumentsEntry = new TupleEntry( outgoingScope.getArgumentsDeclarator(), true ); 270 271 outgoingEntry = new TupleEntry( getOutgoingFields(), true ); 272 273 operationCall.setArguments( argumentsEntry ); 274 275 argumentsBuilder = createArgumentsBuilder( getIncomingArgumentsFields(), argumentsSelector ); 276 outgoingBuilder = createOutgoingBuilder( getOperator(), getIncomingPassThroughFields(), argumentsSelector, remainderFields, getOperationDeclaredFields(), outgoingSelector ); 277 } 278 279 @Override 280 public void prepare() 281 { 282 super.prepare(); // if fails, skip the this prepare 283 284 ( (Operator) getFlowElement() ).getOperation().prepare( flowProcess, operationCall ); 285 } 286 287 @Override 288 public void complete( Duct previous ) 289 { 290 try 291 { 292 ( (Operator) getFlowElement() ).getOperation().flush( flowProcess, operationCall ); 293 } 294 finally 295 { 296 super.complete( previous ); 297 } 298 } 299 300 @Override 301 public void cleanup() 302 { 303 if( !retainCollector ) // see comments for RETAIN_COLLECTOR 304 operationCall.setOutputCollector( null ); 305 306 try 307 { 308 ( (Operator) getFlowElement() ).getOperation().cleanup( flowProcess, operationCall ); 309 } 310 finally 311 { 312 super.cleanup(); // guarantee this happens 313 } 314 } 315 }