001/*
002 * Copyright (c) 2007-2015 Concurrent, Inc. All Rights Reserved.
003 *
004 * Project and contact information: http://www.cascading.org/
005 *
006 * This file is part of the Cascading project.
007 *
008 * Licensed under the Apache License, Version 2.0 (the "License");
009 * you may not use this file except in compliance with the License.
010 * You may obtain a copy of the License at
011 *
012 *     http://www.apache.org/licenses/LICENSE-2.0
013 *
014 * Unless required by applicable law or agreed to in writing, software
015 * distributed under the License is distributed on an "AS IS" BASIS,
016 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
017 * See the License for the specific language governing permissions and
018 * limitations under the License.
019 */
020
021package cascading.flow.stream.element;
022
023import cascading.flow.FlowElement;
024import cascading.flow.FlowProcess;
025import cascading.flow.planner.Scope;
026import cascading.flow.stream.duct.Duct;
027import cascading.operation.ConcreteCall;
028import cascading.pipe.Operator;
029import cascading.tuple.Fields;
030import cascading.tuple.Tuple;
031import cascading.tuple.TupleEntry;
032import cascading.tuple.TupleEntryCollector;
033import cascading.tuple.util.TupleBuilder;
034import cascading.tuple.util.TupleViews;
035
036import static cascading.tuple.util.TupleViews.*;
037
038/**
039 *
040 */
041public abstract class OperatorStage<Incoming> extends ElementStage<Incoming, TupleEntry>
042  {
043  /**
044   * In 2.2 the collector is now nulled before the
045   * {@link cascading.operation.Operation#cleanup(cascading.flow.FlowProcess, cascading.operation.OperationCall)}
046   * is called. This property retains the collector to remain compatible with 2.1.
047   */
048  public static final String RETAIN_COLLECTOR = "cascading.compatibility.retain.collector";
049
050  protected ConcreteCall operationCall;
051  protected TupleEntry incomingEntry;
052  protected Fields argumentsSelector;
053  protected TupleEntry argumentsEntry;
054  protected Fields remainderFields;
055  protected Fields outgoingSelector;
056  protected TupleEntry outgoingEntry;
057
058  protected TupleBuilder argumentsBuilder;
059  protected TupleBuilder outgoingBuilder;
060
061  private final boolean retainCollector;
062
063  protected TupleEntryCollector outputCollector;
064
065  public OperatorStage( FlowProcess flowProcess, FlowElement flowElement )
066    {
067    super( flowProcess, flowElement );
068
069    this.retainCollector = Boolean.parseBoolean( flowProcess.getStringProperty( RETAIN_COLLECTOR ) );
070    }
071
072  public abstract Operator getOperator();
073
074  protected abstract Fields getOutgoingSelector();
075
076  protected Fields getOperationDeclaredFields()
077    {
078    return outgoingScopes.get( 0 ).getOperationDeclaredFields();
079    }
080
081  protected abstract Fields getIncomingPassThroughFields();
082
083  protected abstract Fields getIncomingArgumentsFields();
084
085  protected TupleBuilder createArgumentsBuilder( final Fields incomingFields, final Fields argumentsSelector )
086    {
087    if( incomingFields.isUnknown() )
088      return new TupleBuilder()
089      {
090      @Override
091      public Tuple makeResult( Tuple input, Tuple output )
092        {
093        return input.get( incomingFields, argumentsSelector );
094        }
095      };
096
097    if( argumentsSelector.isAll() )
098      return new TupleBuilder()
099      {
100      @Override
101      public Tuple makeResult( Tuple input, Tuple output )
102        {
103        return input;
104        }
105      };
106
107    if( argumentsSelector.isNone() )
108      return new TupleBuilder()
109      {
110      @Override
111      public Tuple makeResult( Tuple input, Tuple output )
112        {
113        return Tuple.NULL;
114        }
115      };
116
117    final Fields inputDeclarationFields = Fields.asDeclaration( incomingFields );
118
119    return new TupleBuilder()
120    {
121    Tuple result = createNarrow( inputDeclarationFields.getPos( argumentsSelector ) );
122
123    @Override
124    public Tuple makeResult( Tuple input, Tuple output )
125      {
126      return TupleViews.reset( result, input );
127      }
128    };
129    }
130
131  protected TupleBuilder createOutgoingBuilder( final Operator operator, final Fields incomingFields, final Fields argumentSelector, final Fields remainderFields, final Fields declaredFields, final Fields outgoingSelector )
132    {
133    final Fields inputDeclarationFields = Fields.asDeclaration( incomingFields );
134
135    if( operator.getOutputSelector().isResults() )
136      return new TupleBuilder()
137      {
138      @Override
139      public Tuple makeResult( Tuple input, Tuple output )
140        {
141        return output;
142        }
143      };
144
145    if( operator.getOutputSelector().isAll() && !( incomingFields.isUnknown() || declaredFields.isUnknown() ) )
146      return new TupleBuilder()
147      {
148      Tuple result = createComposite( inputDeclarationFields, declaredFields );
149
150      @Override
151      public Tuple makeResult( Tuple input, Tuple output )
152        {
153        return TupleViews.reset( result, input, output );
154        }
155      };
156
157    if( operator.getOutputSelector().isReplace() )
158      {
159      if( incomingFields.isUnknown() )
160        return new TupleBuilder()
161        {
162        Fields resultFields = operator.getFieldDeclaration().isArguments() ? argumentSelector : declaredFields;
163
164        @Override
165        public Tuple makeResult( Tuple input, Tuple output )
166          {
167          Tuple result = new Tuple( input );
168
169          result.set( Fields.UNKNOWN, resultFields, output );
170
171          return result;
172          }
173        };
174
175      return new TupleBuilder()
176      {
177      Fields resultFields = operator.getFieldDeclaration().isArguments() ? argumentSelector : declaredFields;
178      Tuple result = createOverride( inputDeclarationFields, resultFields );
179
180      @Override
181      public Tuple makeResult( Tuple input, Tuple output )
182        {
183        return TupleViews.reset( result, input, output );
184        }
185      };
186      }
187
188    if( operator.getOutputSelector().isSwap() )
189      {
190      if( remainderFields.size() == 0 ) // the same as Fields.RESULTS
191        return new TupleBuilder()
192        {
193        @Override
194        public Tuple makeResult( Tuple input, Tuple output )
195          {
196          return output;
197          }
198        };
199      else if( declaredFields.isUnknown() )
200        return new TupleBuilder()
201        {
202        @Override
203        public Tuple makeResult( Tuple input, Tuple output )
204          {
205          return input.get( incomingFields, remainderFields ).append( output );
206          }
207        };
208      else
209        return new TupleBuilder()
210        {
211        Tuple view = createNarrow( inputDeclarationFields.getPos( remainderFields ) );
212        Tuple result = createComposite( Fields.asDeclaration( remainderFields ), declaredFields );
213
214        @Override
215        public Tuple makeResult( Tuple input, Tuple output )
216          {
217          TupleViews.reset( view, input );
218
219          return TupleViews.reset( result, view, output );
220          }
221        };
222      }
223
224    if( incomingFields.isUnknown() || declaredFields.isUnknown() )
225      return new TupleBuilder()
226      {
227      Fields selector = outgoingSelector.isUnknown() ? Fields.ALL : outgoingSelector;
228      TupleEntry incoming = new TupleEntry( incomingFields, true );
229      TupleEntry declared = new TupleEntry( declaredFields, true );
230
231      @Override
232      public Tuple makeResult( Tuple input, Tuple output )
233        {
234        incoming.setTuple( input );
235        declared.setTuple( output );
236
237        return TupleEntry.select( selector, incoming, declared );
238        }
239      };
240
241    return new TupleBuilder()
242    {
243    Fields inputFields = operator.getFieldDeclaration().isArguments() ? Fields.mask( inputDeclarationFields, declaredFields ) : inputDeclarationFields;
244    Tuple appended = createComposite( inputFields, declaredFields );
245    Fields allFields = Fields.resolve( Fields.ALL, inputFields, declaredFields );
246    Tuple result = createNarrow( allFields.getPos( outgoingSelector ), appended );
247
248    @Override
249    public Tuple makeResult( Tuple input, Tuple output )
250      {
251      TupleViews.reset( appended, input, output );
252
253      return result;
254      }
255    };
256    }
257
258  @Override
259  public void initialize()
260    {
261    Scope outgoingScope = outgoingScopes.get( 0 );
262
263    operationCall = new ConcreteCall( outgoingScope.getArgumentsDeclarator(), outgoingScope.getOperationDeclaredFields() );
264
265    argumentsSelector = outgoingScope.getArgumentsSelector();
266    remainderFields = outgoingScope.getRemainderPassThroughFields();
267    outgoingSelector = getOutgoingSelector();
268
269    argumentsEntry = new TupleEntry( outgoingScope.getArgumentsDeclarator(), true );
270
271    outgoingEntry = new TupleEntry( getOutgoingFields(), true );
272
273    operationCall.setArguments( argumentsEntry );
274
275    argumentsBuilder = createArgumentsBuilder( getIncomingArgumentsFields(), argumentsSelector );
276    outgoingBuilder = createOutgoingBuilder( getOperator(), getIncomingPassThroughFields(), argumentsSelector, remainderFields, getOperationDeclaredFields(), outgoingSelector );
277    }
278
279  @Override
280  public void prepare()
281    {
282    super.prepare(); // if fails, skip the this prepare
283
284    ( (Operator) getFlowElement() ).getOperation().prepare( flowProcess, operationCall );
285    }
286
287  @Override
288  public void complete( Duct previous )
289    {
290    try
291      {
292      ( (Operator) getFlowElement() ).getOperation().flush( flowProcess, operationCall );
293      }
294    finally
295      {
296      super.complete( previous );
297      }
298    }
299
300  @Override
301  public void cleanup()
302    {
303    if( !retainCollector ) // see comments for RETAIN_COLLECTOR
304      operationCall.setOutputCollector( null );
305
306    try
307      {
308      ( (Operator) getFlowElement() ).getOperation().cleanup( flowProcess, operationCall );
309      }
310    finally
311      {
312      super.cleanup(); // guarantee this happens
313      }
314    }
315  }