001/*
002 * Copyright (c) 2007-2015 Concurrent, Inc. All Rights Reserved.
003 *
004 * Project and contact information: http://www.cascading.org/
005 *
006 * This file is part of the Cascading project.
007 *
008 * Licensed under the Apache License, Version 2.0 (the "License");
009 * you may not use this file except in compliance with the License.
010 * You may obtain a copy of the License at
011 *
012 *     http://www.apache.org/licenses/LICENSE-2.0
013 *
014 * Unless required by applicable law or agreed to in writing, software
015 * distributed under the License is distributed on an "AS IS" BASIS,
016 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
017 * See the License for the specific language governing permissions and
018 * limitations under the License.
019 */
020
021package cascading.flow;
022
023import java.util.LinkedHashSet;
024import java.util.Properties;
025import java.util.Set;
026
027import cascading.property.Props;
028import cascading.util.Util;
029
/**
 * Class FlowRuntimeProps is a fluent helper class for setting {@link Flow} specific runtime properties through
 * a {@link FlowConnector}.
 * <p/>
 * These properties apply to the cluster or remote side of the Flow execution. For client (or local) side properties
 * see {@link cascading.flow.FlowProps}.
 * <p/>
 * Available properties are:
 * <p/>
 * <ul>
 * <li>gather partitions - number of slices (partitions) to gather keys within each {@link cascading.flow.FlowNode}.
 * In MapReduce this is the number of reducers. In Tez DAG this is the scatter gather parallelization.</li>
 * <li>log counters - counter names to log to INFO when a cluster side slice completes.</li>
 * </ul>
 * <p/>
 * Note, if the number of gather partitions is not set, the Flow may fail during planning or setup, depending on the
 * platform.
 */
048public class FlowRuntimeProps extends Props
049  {
050  public static final String GATHER_PARTITIONS = "cascading.flow.runtime.gather.partitions.num";
051  public static final String LOG_COUNTERS = "cascading.flow.runtime.log.counters";
052
053  int gatherPartitions = 0;
054  Set<String> logCounters = new LinkedHashSet<>();
055
056  public static FlowRuntimeProps flowRuntimeProps()
057    {
058    return new FlowRuntimeProps();
059    }
060
061  public FlowRuntimeProps()
062    {
063    }
064
065  /**
066   * Method getGatherPartitions returns the number of gather partitions
067   *
068   * @return number of gather partitions
069   */
070  public int getGatherPartitions()
071    {
072    return gatherPartitions;
073    }
074
075  /**
076   * Method setGatherPartitions sets the default number of gather partitions each {@link cascading.flow.FlowNode}
077   * should use.
078   *
079   * @param gatherPartitions number of gather partitions to use per node
080   * @return this
081   */
082  public FlowRuntimeProps setGatherPartitions( int gatherPartitions )
083    {
084    if( gatherPartitions < 1 )
085      throw new IllegalArgumentException( "gatherPartitions value must be greater than zero" );
086
087    this.gatherPartitions = gatherPartitions;
088
089    return this;
090    }
091
092  /**
093   * Method addLogCounter adds a new counter to be logged when a cluster side slice completes.
094   * <p/>
095   * The given counters will be logged using the default cluster side logging mechanism.
096   *
097   * @param counter the Enum counter to log
098   * @return this
099   */
100  public FlowRuntimeProps addLogCounter( Enum counter )
101    {
102    addLogCounter( counter.getDeclaringClass().getName(), counter.name() );
103
104    return this;
105    }
106
107  /**
108   * Method addLogCounter adds a new counter to be logged when a cluster side slice completes.
109   * <p/>
110   * The given counters will be logged using the default cluster side logging mechanism.
111   *
112   * @param group   the String counter group to log
113   * @param counter the String counter name to log
114   * @return this
115   */
116  public FlowRuntimeProps addLogCounter( String group, String counter )
117    {
118    logCounters.add( group + ":" + counter );
119
120    return this;
121    }
122
123  @Override
124  protected void addPropertiesTo( Properties properties )
125    {
126    if( gatherPartitions > 0 )
127      properties.setProperty( GATHER_PARTITIONS, Integer.toString( gatherPartitions ) );
128
129    if( !logCounters.isEmpty() )
130      properties.setProperty( LOG_COUNTERS, Util.join( logCounters, "," ) );
131    }
132  }