001/*
002 * Copyright (c) 2007-2015 Concurrent, Inc. All Rights Reserved.
003 *
004 * Project and contact information: http://www.cascading.org/
005 *
006 * This file is part of the Cascading project.
007 *
008 * Licensed under the Apache License, Version 2.0 (the "License");
009 * you may not use this file except in compliance with the License.
010 * You may obtain a copy of the License at
011 *
012 *     http://www.apache.org/licenses/LICENSE-2.0
013 *
014 * Unless required by applicable law or agreed to in writing, software
015 * distributed under the License is distributed on an "AS IS" BASIS,
016 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
017 * See the License for the specific language governing permissions and
018 * limitations under the License.
019 */
020
021package cascading.operation.aggregator;
022
023import java.beans.ConstructorProperties;
024import java.util.Collection;
025import java.util.Collections;
026import java.util.HashSet;
027
028import cascading.flow.FlowProcess;
029import cascading.operation.Aggregator;
030import cascading.operation.AggregatorCall;
031import cascading.operation.BaseOperation;
032import cascading.operation.OperationCall;
033import cascading.tuple.Fields;
034import cascading.tuple.Tuple;
035import cascading.tuple.TupleEntry;
036
037/**
038 * Class ExtremaValueBase is the base class for MaxValue and MinValue where the values are expected to by
039 * {@link Comparable} types and the {@link Comparable#compareTo(Object)} result is use for max/min
040 * comparison.
041 */
042public abstract class ExtremaValueBase extends BaseOperation<ExtremaValueBase.Context> implements Aggregator<ExtremaValueBase.Context>
043  {
044  /** Field ignoreValues */
045  protected final Collection ignoreValues;
046
047  protected static class Context
048    {
049    Tuple value = Tuple.size( 1 );
050
051    public Context()
052      {
053      }
054
055    public Context reset()
056      {
057      this.value.set( 0, null );
058
059      return this;
060      }
061    }
062
063  @ConstructorProperties({"fieldDeclaration"})
064  public ExtremaValueBase( Fields fieldDeclaration )
065    {
066    super( fieldDeclaration );
067    ignoreValues = null;
068    }
069
070  @ConstructorProperties({"numArgs", "fieldDeclaration"})
071  public ExtremaValueBase( int numArgs, Fields fieldDeclaration )
072    {
073    super( numArgs, fieldDeclaration );
074    ignoreValues = null;
075
076    if( !fieldDeclaration.isSubstitution() && fieldDeclaration.size() != 1 )
077      throw new IllegalArgumentException( "fieldDeclaration may only declare 1 field, got: " + fieldDeclaration.size() );
078    }
079
080  @ConstructorProperties({"fieldDeclaration", "ignoreValues"})
081  protected ExtremaValueBase( Fields fieldDeclaration, Object... ignoreValues )
082    {
083    super( fieldDeclaration );
084    this.ignoreValues = new HashSet();
085    Collections.addAll( this.ignoreValues, ignoreValues );
086    }
087
088  public Collection getIgnoreValues()
089    {
090    return Collections.unmodifiableCollection( ignoreValues );
091    }
092
093  @Override
094  public void prepare( FlowProcess flowProcess, OperationCall<Context> operationCall )
095    {
096    operationCall.setContext( new Context() );
097    }
098
099  @Override
100  public void start( FlowProcess flowProcess, AggregatorCall<Context> aggregatorCall )
101    {
102    aggregatorCall.getContext().reset();
103    }
104
105  @Override
106  public void aggregate( FlowProcess flowProcess, AggregatorCall<Context> aggregatorCall )
107    {
108    TupleEntry entry = aggregatorCall.getArguments();
109    Context context = aggregatorCall.getContext();
110
111    Object arg = entry.getObject( 0 ); // returns canonical type
112
113    if( ignoreValues != null && ignoreValues.contains( arg ) )
114      return;
115
116    Comparable lhs = (Comparable) context.value.getObject( 0 );
117    Comparable rhs = (Comparable) arg;
118
119    if( lhs == null || ( rhs != null && compare( lhs, rhs ) ) )
120      context.value.set( 0, rhs );
121    }
122
123  /**
124   * Allows subclasses to provide own comparison method.
125   *
126   * @param lhs Comparable type
127   * @param rhs Comparable type
128   * @return true if the rhs should be retained as the result value
129   */
130  protected abstract boolean compare( Comparable lhs, Comparable rhs );
131
132  @Override
133  public void complete( FlowProcess flowProcess, AggregatorCall<Context> aggregatorCall )
134    {
135    aggregatorCall.getOutputCollector().add( getResult( aggregatorCall ) );
136    }
137
138  protected Tuple getResult( AggregatorCall<Context> aggregatorCall )
139    {
140    return aggregatorCall.getContext().value;
141    }
142
143  @Override
144  public boolean equals( Object object )
145    {
146    if( this == object )
147      return true;
148    if( !( object instanceof ExtremaValueBase ) )
149      return false;
150    if( !super.equals( object ) )
151      return false;
152
153    ExtremaValueBase that = (ExtremaValueBase) object;
154
155    if( ignoreValues != null ? !ignoreValues.equals( that.ignoreValues ) : that.ignoreValues != null )
156      return false;
157
158    return true;
159    }
160
161  @Override
162  public int hashCode()
163    {
164    int result = super.hashCode();
165    result = 31 * result + ( ignoreValues != null ? ignoreValues.hashCode() : 0 );
166    return result;
167    }
168  }