001    /*
002     * Copyright (c) 2007-2014 Concurrent, Inc. All Rights Reserved.
003     *
004     * Project and contact information: http://www.cascading.org/
005     *
006     * This file is part of the Cascading project.
007     *
008     * Licensed under the Apache License, Version 2.0 (the "License");
009     * you may not use this file except in compliance with the License.
010     * You may obtain a copy of the License at
011     *
012     *     http://www.apache.org/licenses/LICENSE-2.0
013     *
014     * Unless required by applicable law or agreed to in writing, software
015     * distributed under the License is distributed on an "AS IS" BASIS,
016     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
017     * See the License for the specific language governing permissions and
018     * limitations under the License.
019     */
020    
021    package cascading.flow.stream;
022    
023    import cascading.flow.FlowElement;
024    import cascading.flow.FlowProcess;
025    import cascading.flow.planner.Scope;
026    import cascading.operation.ConcreteCall;
027    import cascading.pipe.Operator;
028    import cascading.tuple.Fields;
029    import cascading.tuple.Tuple;
030    import cascading.tuple.TupleEntry;
031    import cascading.tuple.TupleEntryCollector;
032    import cascading.tuple.util.TupleBuilder;
033    import cascading.tuple.util.TupleViews;
034    
035    import static cascading.tuple.util.TupleViews.*;
036    
037    /**
038     *
039     */
040    public abstract class OperatorStage<Incoming> extends ElementStage<Incoming, TupleEntry>
041      {
042      /**
043       * In 2.2 the collector is now nulled before the
044       * {@link cascading.operation.Operation#cleanup(cascading.flow.FlowProcess, cascading.operation.OperationCall)}
045       * is called. This property retains the collector to remain compatible with 2.1.
046       */
047      public static final String RETAIN_COLLECTOR = "cascading.compatibility.retain.collector";
048    
049      protected ConcreteCall operationCall;
050      protected TupleEntry incomingEntry;
051      protected Fields argumentsSelector;
052      protected TupleEntry argumentsEntry;
053      protected Fields remainderFields;
054      protected Fields outgoingSelector;
055      protected TupleEntry outgoingEntry;
056    
057      protected TupleBuilder argumentsBuilder;
058      protected TupleBuilder outgoingBuilder;
059    
060      private final boolean retainCollector;
061    
062      protected TupleEntryCollector outputCollector;
063    
064      public OperatorStage( FlowProcess flowProcess, FlowElement flowElement )
065        {
066        super( flowProcess, flowElement );
067    
068        this.retainCollector = Boolean.parseBoolean( flowProcess.getStringProperty( RETAIN_COLLECTOR ) );
069        }
070    
071      public abstract Operator getOperator();
072    
073      protected abstract Fields getOutgoingSelector();
074    
075      protected Fields getOperationDeclaredFields()
076        {
077        return outgoingScopes.get( 0 ).getOperationDeclaredFields();
078        }
079    
080      protected abstract Fields getIncomingPassThroughFields();
081    
082      protected abstract Fields getIncomingArgumentsFields();
083    
084      protected TupleBuilder createArgumentsBuilder( final Fields incomingFields, final Fields argumentsSelector )
085        {
086        if( incomingFields.isUnknown() )
087          return new TupleBuilder()
088          {
089          @Override
090          public Tuple makeResult( Tuple input, Tuple output )
091            {
092            return input.get( incomingFields, argumentsSelector );
093            }
094          };
095    
096        if( argumentsSelector.isAll() )
097          return new TupleBuilder()
098          {
099          @Override
100          public Tuple makeResult( Tuple input, Tuple output )
101            {
102            return input;
103            }
104          };
105    
106        if( argumentsSelector.isNone() )
107          return new TupleBuilder()
108          {
109          @Override
110          public Tuple makeResult( Tuple input, Tuple output )
111            {
112            return Tuple.NULL;
113            }
114          };
115    
116        final Fields inputDeclarationFields = Fields.asDeclaration( incomingFields );
117    
118        return new TupleBuilder()
119        {
120        Tuple result = createNarrow( inputDeclarationFields.getPos( argumentsSelector ) );
121    
122        @Override
123        public Tuple makeResult( Tuple input, Tuple output )
124          {
125          return TupleViews.reset( result, input );
126          }
127        };
128        }
129    
130      protected TupleBuilder createOutgoingBuilder( final Operator operator, final Fields incomingFields, final Fields argumentSelector, final Fields remainderFields, final Fields declaredFields, final Fields outgoingSelector )
131        {
132        final Fields inputDeclarationFields = Fields.asDeclaration( incomingFields );
133    
134        if( operator.getOutputSelector().isResults() )
135          return new TupleBuilder()
136          {
137          @Override
138          public Tuple makeResult( Tuple input, Tuple output )
139            {
140            return output;
141            }
142          };
143    
144        if( operator.getOutputSelector().isAll() && !( incomingFields.isUnknown() || declaredFields.isUnknown() ) )
145          return new TupleBuilder()
146          {
147          Tuple result = createComposite( inputDeclarationFields, declaredFields );
148    
149          @Override
150          public Tuple makeResult( Tuple input, Tuple output )
151            {
152            return TupleViews.reset( result, input, output );
153            }
154          };
155    
156        if( operator.getOutputSelector().isReplace() )
157          {
158          if( incomingFields.isUnknown() )
159            return new TupleBuilder()
160            {
161            Fields resultFields = operator.getFieldDeclaration().isArguments() ? argumentSelector : declaredFields;
162    
163            @Override
164            public Tuple makeResult( Tuple input, Tuple output )
165              {
166              Tuple result = new Tuple( input );
167    
168              result.set( Fields.UNKNOWN, resultFields, output );
169    
170              return result;
171              }
172            };
173    
174          return new TupleBuilder()
175          {
176          Fields resultFields = operator.getFieldDeclaration().isArguments() ? argumentSelector : declaredFields;
177          Tuple result = createOverride( inputDeclarationFields, resultFields );
178    
179          @Override
180          public Tuple makeResult( Tuple input, Tuple output )
181            {
182            return TupleViews.reset( result, input, output );
183            }
184          };
185          }
186    
187        if( operator.getOutputSelector().isSwap() )
188          {
189          if( remainderFields.size() == 0 ) // the same as Fields.RESULTS
190            return new TupleBuilder()
191            {
192            @Override
193            public Tuple makeResult( Tuple input, Tuple output )
194              {
195              return output;
196              }
197            };
198          else if( declaredFields.isUnknown() )
199            return new TupleBuilder()
200            {
201            @Override
202            public Tuple makeResult( Tuple input, Tuple output )
203              {
204              return input.get( incomingFields, remainderFields ).append( output );
205              }
206            };
207          else
208            return new TupleBuilder()
209            {
210            Tuple view = createNarrow( inputDeclarationFields.getPos( remainderFields ) );
211            Tuple result = createComposite( Fields.asDeclaration( remainderFields ), declaredFields );
212    
213            @Override
214            public Tuple makeResult( Tuple input, Tuple output )
215              {
216              TupleViews.reset( view, input );
217    
218              return TupleViews.reset( result, view, output );
219              }
220            };
221          }
222    
223        if( incomingFields.isUnknown() || declaredFields.isUnknown() )
224          return new TupleBuilder()
225          {
226          Fields selector = outgoingSelector.isUnknown() ? Fields.ALL : outgoingSelector;
227          TupleEntry incoming = new TupleEntry( incomingFields, true );
228          TupleEntry declared = new TupleEntry( declaredFields, true );
229    
230          @Override
231          public Tuple makeResult( Tuple input, Tuple output )
232            {
233            incoming.setTuple( input );
234            declared.setTuple( output );
235    
236            return TupleEntry.select( selector, incoming, declared );
237            }
238          };
239    
240        return new TupleBuilder()
241        {
242        Fields inputFields = operator.getFieldDeclaration().isArguments() ? Fields.mask( inputDeclarationFields, declaredFields ) : inputDeclarationFields;
243        Tuple appended = createComposite( inputFields, declaredFields );
244        Fields allFields = Fields.resolve( Fields.ALL, inputFields, declaredFields );
245        Tuple result = createNarrow( allFields.getPos( outgoingSelector ), appended );
246    
247    
248        @Override
249        public Tuple makeResult( Tuple input, Tuple output )
250          {
251          TupleViews.reset( appended, input, output );
252    
253          return result;
254          }
255        };
256        }
257    
258      @Override
259      public void initialize()
260        {
261        Scope outgoingScope = outgoingScopes.get( 0 );
262    
263        operationCall = new ConcreteCall( outgoingScope.getArgumentsDeclarator(), outgoingScope.getOperationDeclaredFields() );
264    
265        argumentsSelector = outgoingScope.getArgumentsSelector();
266        remainderFields = outgoingScope.getRemainderPassThroughFields();
267        outgoingSelector = getOutgoingSelector();
268    
269        argumentsEntry = new TupleEntry( outgoingScope.getArgumentsDeclarator(), true );
270    
271        outgoingEntry = new TupleEntry( getOutgoingFields(), true );
272    
273        operationCall.setArguments( argumentsEntry );
274    
275        argumentsBuilder = createArgumentsBuilder( getIncomingArgumentsFields(), argumentsSelector );
276        outgoingBuilder = createOutgoingBuilder( getOperator(), getIncomingPassThroughFields(), argumentsSelector, remainderFields, getOperationDeclaredFields(), outgoingSelector );
277        }
278    
279      @Override
280      public void prepare()
281        {
282        super.prepare(); // if fails, skip the this prepare
283    
284        ( (Operator) getFlowElement() ).getOperation().prepare( flowProcess, operationCall );
285        }
286    
287      @Override
288      public void complete( Duct previous )
289        {
290        try
291          {
292          ( (Operator) getFlowElement() ).getOperation().flush( flowProcess, operationCall );
293          }
294        finally
295          {
296          super.complete( previous );
297          }
298        }
299    
300      @Override
301      public void cleanup()
302        {
303        if( !retainCollector ) // see comments for RETAIN_COLLECTOR
304          operationCall.setOutputCollector( null );
305    
306        try
307          {
308          ( (Operator) getFlowElement() ).getOperation().cleanup( flowProcess, operationCall );
309          }
310        finally
311          {
312          super.cleanup(); // guarantee this happens
313          }
314        }
315      }