001 /* 002 * Copyright (c) 2007-2014 Concurrent, Inc. All Rights Reserved. 003 * 004 * Project and contact information: http://www.cascading.org/ 005 * 006 * This file is part of the Cascading project. 007 * 008 * Licensed under the Apache License, Version 2.0 (the "License"); 009 * you may not use this file except in compliance with the License. 010 * You may obtain a copy of the License at 011 * 012 * http://www.apache.org/licenses/LICENSE-2.0 013 * 014 * Unless required by applicable law or agreed to in writing, software 015 * distributed under the License is distributed on an "AS IS" BASIS, 016 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 017 * See the License for the specific language governing permissions and 018 * limitations under the License. 019 */ 020 021 package cascading.flow.stream; 022 023 import cascading.flow.FlowElement; 024 import cascading.flow.FlowProcess; 025 import cascading.flow.planner.Scope; 026 import cascading.operation.ConcreteCall; 027 import cascading.pipe.Operator; 028 import cascading.tuple.Fields; 029 import cascading.tuple.Tuple; 030 import cascading.tuple.TupleEntry; 031 import cascading.tuple.TupleEntryCollector; 032 import cascading.tuple.util.TupleBuilder; 033 import cascading.tuple.util.TupleViews; 034 035 import static cascading.tuple.util.TupleViews.*; 036 037 /** 038 * 039 */ 040 public abstract class OperatorStage<Incoming> extends ElementStage<Incoming, TupleEntry> 041 { 042 /** 043 * In 2.2 the collector is now nulled before the 044 * {@link cascading.operation.Operation#cleanup(cascading.flow.FlowProcess, cascading.operation.OperationCall)} 045 * is called. This property retains the collector to remain compatible with 2.1. 046 */ 047 public static final String RETAIN_COLLECTOR = "cascading.compatibility.retain.collector"; 048 049 protected ConcreteCall operationCall; 050 protected TupleEntry incomingEntry; 051 protected Fields argumentsSelector; 052 protected TupleEntry argumentsEntry; 053 protected Fields remainderFields; 054 protected Fields outgoingSelector; 055 protected TupleEntry outgoingEntry; 056 057 protected TupleBuilder argumentsBuilder; 058 protected TupleBuilder outgoingBuilder; 059 060 private final boolean retainCollector; 061 062 protected TupleEntryCollector outputCollector; 063 064 public OperatorStage( FlowProcess flowProcess, FlowElement flowElement ) 065 { 066 super( flowProcess, flowElement ); 067 068 this.retainCollector = Boolean.parseBoolean( flowProcess.getStringProperty( RETAIN_COLLECTOR ) ); 069 } 070 071 public abstract Operator getOperator(); 072 073 protected abstract Fields getOutgoingSelector(); 074 075 protected Fields getOperationDeclaredFields() 076 { 077 return outgoingScopes.get( 0 ).getOperationDeclaredFields(); 078 } 079 080 protected abstract Fields getIncomingPassThroughFields(); 081 082 protected abstract Fields getIncomingArgumentsFields(); 083 084 protected TupleBuilder createArgumentsBuilder( final Fields incomingFields, final Fields argumentsSelector ) 085 { 086 if( incomingFields.isUnknown() ) 087 return new TupleBuilder() 088 { 089 @Override 090 public Tuple makeResult( Tuple input, Tuple output ) 091 { 092 return input.get( incomingFields, argumentsSelector ); 093 } 094 }; 095 096 if( argumentsSelector.isAll() ) 097 return new TupleBuilder() 098 { 099 @Override 100 public Tuple makeResult( Tuple input, Tuple output ) 101 { 102 return input; 103 } 104 }; 105 106 if( argumentsSelector.isNone() ) 107 return new TupleBuilder() 108 { 109 @Override 110 public Tuple makeResult( Tuple input, Tuple output ) 111 { 112 return Tuple.NULL; 113 } 114 }; 115 116 final Fields inputDeclarationFields = Fields.asDeclaration( incomingFields ); 117 118 return new TupleBuilder() 119 { 120 Tuple result = createNarrow( inputDeclarationFields.getPos( argumentsSelector ) ); 121 122 @Override 123 public Tuple makeResult( Tuple input, Tuple output ) 124 { 125 return TupleViews.reset( result, input ); 126 } 127 }; 128 } 129 130 protected TupleBuilder createOutgoingBuilder( final Operator operator, final Fields incomingFields, final Fields argumentSelector, final Fields remainderFields, final Fields declaredFields, final Fields outgoingSelector ) 131 { 132 final Fields inputDeclarationFields = Fields.asDeclaration( incomingFields ); 133 134 if( operator.getOutputSelector().isResults() ) 135 return new TupleBuilder() 136 { 137 @Override 138 public Tuple makeResult( Tuple input, Tuple output ) 139 { 140 return output; 141 } 142 }; 143 144 if( operator.getOutputSelector().isAll() && !( incomingFields.isUnknown() || declaredFields.isUnknown() ) ) 145 return new TupleBuilder() 146 { 147 Tuple result = createComposite( inputDeclarationFields, declaredFields ); 148 149 @Override 150 public Tuple makeResult( Tuple input, Tuple output ) 151 { 152 return TupleViews.reset( result, input, output ); 153 } 154 }; 155 156 if( operator.getOutputSelector().isReplace() ) 157 { 158 if( incomingFields.isUnknown() ) 159 return new TupleBuilder() 160 { 161 Fields resultFields = operator.getFieldDeclaration().isArguments() ? argumentSelector : declaredFields; 162 163 @Override 164 public Tuple makeResult( Tuple input, Tuple output ) 165 { 166 Tuple result = new Tuple( input ); 167 168 result.set( Fields.UNKNOWN, resultFields, output ); 169 170 return result; 171 } 172 }; 173 174 return new TupleBuilder() 175 { 176 Fields resultFields = operator.getFieldDeclaration().isArguments() ? argumentSelector : declaredFields; 177 Tuple result = createOverride( inputDeclarationFields, resultFields ); 178 179 @Override 180 public Tuple makeResult( Tuple input, Tuple output ) 181 { 182 return TupleViews.reset( result, input, output ); 183 } 184 }; 185 } 186 187 if( operator.getOutputSelector().isSwap() ) 188 { 189 if( remainderFields.size() == 0 ) // the same as Fields.RESULTS 190 return new TupleBuilder() 191 { 192 @Override 193 public Tuple makeResult( Tuple input, Tuple output ) 194 { 195 return output; 196 } 197 }; 198 else if( declaredFields.isUnknown() ) 199 return new TupleBuilder() 200 { 201 @Override 202 public Tuple makeResult( Tuple input, Tuple output ) 203 { 204 return input.get( incomingFields, remainderFields ).append( output ); 205 } 206 }; 207 else 208 return new TupleBuilder() 209 { 210 Tuple view = createNarrow( inputDeclarationFields.getPos( remainderFields ) ); 211 Tuple result = createComposite( Fields.asDeclaration( remainderFields ), declaredFields ); 212 213 @Override 214 public Tuple makeResult( Tuple input, Tuple output ) 215 { 216 TupleViews.reset( view, input ); 217 218 return TupleViews.reset( result, view, output ); 219 } 220 }; 221 } 222 223 if( incomingFields.isUnknown() || declaredFields.isUnknown() ) 224 return new TupleBuilder() 225 { 226 Fields selector = outgoingSelector.isUnknown() ? Fields.ALL : outgoingSelector; 227 TupleEntry incoming = new TupleEntry( incomingFields, true ); 228 TupleEntry declared = new TupleEntry( declaredFields, true ); 229 230 @Override 231 public Tuple makeResult( Tuple input, Tuple output ) 232 { 233 incoming.setTuple( input ); 234 declared.setTuple( output ); 235 236 return TupleEntry.select( selector, incoming, declared ); 237 } 238 }; 239 240 return new TupleBuilder() 241 { 242 Fields inputFields = operator.getFieldDeclaration().isArguments() ? Fields.mask( inputDeclarationFields, declaredFields ) : inputDeclarationFields; 243 Tuple appended = createComposite( inputFields, declaredFields ); 244 Fields allFields = Fields.resolve( Fields.ALL, inputFields, declaredFields ); 245 Tuple result = createNarrow( allFields.getPos( outgoingSelector ), appended ); 246 247 248 @Override 249 public Tuple makeResult( Tuple input, Tuple output ) 250 { 251 TupleViews.reset( appended, input, output ); 252 253 return result; 254 } 255 }; 256 } 257 258 @Override 259 public void initialize() 260 { 261 Scope outgoingScope = outgoingScopes.get( 0 ); 262 263 operationCall = new ConcreteCall( outgoingScope.getArgumentsDeclarator(), outgoingScope.getOperationDeclaredFields() ); 264 265 argumentsSelector = outgoingScope.getArgumentsSelector(); 266 remainderFields = outgoingScope.getRemainderPassThroughFields(); 267 outgoingSelector = getOutgoingSelector(); 268 269 argumentsEntry = new TupleEntry( outgoingScope.getArgumentsDeclarator(), true ); 270 271 outgoingEntry = new TupleEntry( getOutgoingFields(), true ); 272 273 operationCall.setArguments( argumentsEntry ); 274 275 argumentsBuilder = createArgumentsBuilder( getIncomingArgumentsFields(), argumentsSelector ); 276 outgoingBuilder = createOutgoingBuilder( getOperator(), getIncomingPassThroughFields(), argumentsSelector, remainderFields, getOperationDeclaredFields(), outgoingSelector ); 277 } 278 279 @Override 280 public void prepare() 281 { 282 super.prepare(); // if fails, skip the this prepare 283 284 ( (Operator) getFlowElement() ).getOperation().prepare( flowProcess, operationCall ); 285 } 286 287 @Override 288 public void complete( Duct previous ) 289 { 290 try 291 { 292 ( (Operator) getFlowElement() ).getOperation().flush( flowProcess, operationCall ); 293 } 294 finally 295 { 296 super.complete( previous ); 297 } 298 } 299 300 @Override 301 public void cleanup() 302 { 303 if( !retainCollector ) // see comments for RETAIN_COLLECTOR 304 operationCall.setOutputCollector( null ); 305 306 try 307 { 308 ( (Operator) getFlowElement() ).getOperation().cleanup( flowProcess, operationCall ); 309 } 310 finally 311 { 312 super.cleanup(); // guarantee this happens 313 } 314 } 315 }