001/*
002 * Copyright (c) 2016 Chris K Wensel <chris@wensel.net>. All Rights Reserved.
003 * Copyright (c) 2007-2017 Xplenty, Inc. All Rights Reserved.
004 *
005 * Project and contact information: http://www.cascading.org/
006 *
007 * This file is part of the Cascading project.
008 *
009 * Licensed under the Apache License, Version 2.0 (the "License");
010 * you may not use this file except in compliance with the License.
011 * You may obtain a copy of the License at
012 *
013 *     http://www.apache.org/licenses/LICENSE-2.0
014 *
015 * Unless required by applicable law or agreed to in writing, software
016 * distributed under the License is distributed on an "AS IS" BASIS,
017 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
018 * See the License for the specific language governing permissions and
019 * limitations under the License.
020 */
021
022package cascading.flow.stream.element;
023
024import cascading.CascadingException;
025import cascading.flow.FlowProcess;
026import cascading.flow.stream.duct.Duct;
027import cascading.operation.Filter;
028import cascading.pipe.Each;
029import cascading.pipe.OperatorException;
030import cascading.tuple.Fields;
031import cascading.tuple.TupleEntry;
032
033/**
034 *
035 */
036public class FilterEachStage extends EachStage
037  {
038  private Filter filter;
039
040  public FilterEachStage( FlowProcess flowProcess, Each each )
041    {
042    super( flowProcess, each );
043    }
044
045  @Override
046  protected Fields getIncomingPassThroughFields()
047    {
048    return incomingScopes.get( 0 ).getIncomingFunctionPassThroughFields();
049    }
050
051  @Override
052  protected Fields getIncomingArgumentsFields()
053    {
054    return incomingScopes.get( 0 ).getIncomingFunctionArgumentFields();
055    }
056
057  @Override
058  public void initialize()
059    {
060    super.initialize();
061
062    filter = each.getFilter();
063    }
064
065  @Override
066  public void receive( Duct previous, int ordinal, TupleEntry incomingEntry )
067    {
068    argumentsEntry.setTuple( argumentsBuilder.makeResult( incomingEntry.getTuple(), null ) );
069
070    try
071      {
072      if( filter.isRemove( flowProcess, operationCall ) )
073        return;
074
075      next.receive( this, 0, incomingEntry );
076      }
077    catch( CascadingException exception )
078      {
079      handleException( exception, argumentsEntry );
080      }
081    catch( Throwable throwable )
082      {
083      handleException( new OperatorException( each, "operator Each failed executing operation", throwable ), argumentsEntry );
084      }
085    }
086  }