001    /*
002     * Copyright (c) 2007-2014 Concurrent, Inc. All Rights Reserved.
003     *
004     * Project and contact information: http://www.cascading.org/
005     *
006     * This file is part of the Cascading project.
007     *
008     * Licensed under the Apache License, Version 2.0 (the "License");
009     * you may not use this file except in compliance with the License.
010     * You may obtain a copy of the License at
011     *
012     *     http://www.apache.org/licenses/LICENSE-2.0
013     *
014     * Unless required by applicable law or agreed to in writing, software
015     * distributed under the License is distributed on an "AS IS" BASIS,
016     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
017     * See the License for the specific language governing permissions and
018     * limitations under the License.
019     */
020    
021    package cascading.operation.aggregator;
022    
023    import java.beans.ConstructorProperties;
024    import java.util.Collection;
025    import java.util.Collections;
026    import java.util.HashSet;
027    
028    import cascading.flow.FlowProcess;
029    import cascading.operation.Aggregator;
030    import cascading.operation.AggregatorCall;
031    import cascading.operation.BaseOperation;
032    import cascading.operation.OperationCall;
033    import cascading.tuple.Fields;
034    import cascading.tuple.Tuple;
035    import cascading.tuple.TupleEntry;
036    
037    /** Class ExtentBase is the base class for First and Last. */
038    public abstract class ExtentBase extends BaseOperation<Tuple[]> implements Aggregator<Tuple[]>
039      {
040      /** Field ignoreTuples */
041      private final Collection<Tuple> ignoreTuples;
042    
043      @ConstructorProperties({"fieldDeclaration"})
044      protected ExtentBase( Fields fieldDeclaration )
045        {
046        super( fieldDeclaration );
047        this.ignoreTuples = null;
048        }
049    
050      @ConstructorProperties({"numArgs", "fieldDeclaration"})
051      protected ExtentBase( int numArgs, Fields fieldDeclaration )
052        {
053        super( numArgs, fieldDeclaration );
054        ignoreTuples = null;
055        }
056    
057      @ConstructorProperties({"fieldDeclaration", "ignoreTuples"})
058      protected ExtentBase( Fields fieldDeclaration, Tuple... ignoreTuples )
059        {
060        super( fieldDeclaration );
061        this.ignoreTuples = new HashSet<Tuple>();
062        Collections.addAll( this.ignoreTuples, ignoreTuples );
063        }
064    
065      public Collection<Tuple> getIgnoreTuples()
066        {
067        return Collections.unmodifiableCollection( ignoreTuples );
068        }
069    
070      @Override
071      public void prepare( FlowProcess flowProcess, OperationCall<Tuple[]> operationCall )
072        {
073        operationCall.setContext( new Tuple[ 1 ] );
074        }
075    
076      @Override
077      public void start( FlowProcess flowProcess, AggregatorCall<Tuple[]> aggregatorCall )
078        {
079        aggregatorCall.getContext()[ 0 ] = null;
080        }
081    
082      @Override
083      public void aggregate( FlowProcess flowProcess, AggregatorCall<Tuple[]> aggregatorCall )
084        {
085        if( ignoreTuples != null && ignoreTuples.contains( aggregatorCall.getArguments().getTuple() ) )
086          return;
087    
088        performOperation( aggregatorCall.getContext(), aggregatorCall.getArguments() );
089        }
090    
091      protected abstract void performOperation( Tuple[] context, TupleEntry entry );
092    
093      @Override
094      public void complete( FlowProcess flowProcess, AggregatorCall<Tuple[]> aggregatorCall )
095        {
096        if( aggregatorCall.getContext()[ 0 ] != null )
097          aggregatorCall.getOutputCollector().add( getResult( aggregatorCall ) );
098        }
099    
100      protected Tuple getResult( AggregatorCall<Tuple[]> aggregatorCall )
101        {
102        return aggregatorCall.getContext()[ 0 ];
103        }
104    
105      @Override
106      public boolean equals( Object object )
107        {
108        if( this == object )
109          return true;
110        if( !( object instanceof ExtentBase ) )
111          return false;
112        if( !super.equals( object ) )
113          return false;
114    
115        ExtentBase that = (ExtentBase) object;
116    
117        if( ignoreTuples != null ? !ignoreTuples.equals( that.ignoreTuples ) : that.ignoreTuples != null )
118          return false;
119    
120        return true;
121        }
122    
123      @Override
124      public int hashCode()
125        {
126        int result = super.hashCode();
127        result = 31 * result + ( ignoreTuples != null ? ignoreTuples.hashCode() : 0 );
128        return result;
129        }
130      }