001/* 002 * Copyright (c) 2007-2016 Concurrent, Inc. All Rights Reserved. 003 * 004 * Project and contact information: http://www.cascading.org/ 005 * 006 * This file is part of the Cascading project. 007 * 008 * Licensed under the Apache License, Version 2.0 (the "License"); 009 * you may not use this file except in compliance with the License. 010 * You may obtain a copy of the License at 011 * 012 * http://www.apache.org/licenses/LICENSE-2.0 013 * 014 * Unless required by applicable law or agreed to in writing, software 015 * distributed under the License is distributed on an "AS IS" BASIS, 016 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 017 * See the License for the specific language governing permissions and 018 * limitations under the License. 019 */ 020 021package cascading.flow; 022 023import java.util.LinkedHashSet; 024import java.util.Properties; 025import java.util.Set; 026 027import cascading.property.Props; 028import cascading.util.Util; 029 030/** 031 * Class FlowRuntimeProps is a fluent helper class for setting {@link Flow} specific runtime properties through 032 * a {@link FlowConnector}. 033 * <p/> 034 * These properties apply to the cluster or remote side of the Flow execution. For client (or local) side properties 035 * see {@link cascading.flow.FlowProps}. 036 * <p/> 037 * Available properties are: 038 * <p/> 039 * <ul> 040 * <li>gather partitions - number of slices (partitions) to gather keys within each {@link cascading.flow.FlowNode}. 041 * In MapReduce this is the number of reducers. In Tez DAG this is the scatter gather parallelization.</li> 042 * <li>log counters - counter names to log to INFO when a cluster side slice completes.</li> 043 * </ul> 044 * <p/> 045 * Note, if the num of gather partitions is not set, the Flow may fail during planning or setup, depending on the 046 * platform. 047 */ 048public class FlowRuntimeProps extends Props 049 { 050 public static final String GATHER_PARTITIONS = "cascading.flow.runtime.gather.partitions.num"; 051 public static final String LOG_COUNTERS = "cascading.flow.runtime.log.counters"; 052 public static final String COMBINE_SPLITS = "cascading.flow.runtime.splits.combine"; 053 054 int gatherPartitions = 0; 055 Set<String> logCounters = new LinkedHashSet<>(); 056 Boolean combineSplits; 057 058 public static FlowRuntimeProps flowRuntimeProps() 059 { 060 return new FlowRuntimeProps(); 061 } 062 063 public FlowRuntimeProps() 064 { 065 } 066 067 /** 068 * Method getGatherPartitions returns the number of gather partitions 069 * 070 * @return number of gather partitions 071 */ 072 public int getGatherPartitions() 073 { 074 return gatherPartitions; 075 } 076 077 /** 078 * Method setGatherPartitions sets the default number of gather partitions each {@link cascading.flow.FlowNode} 079 * should use. 080 * 081 * @param gatherPartitions number of gather partitions to use per node 082 * @return this 083 */ 084 public FlowRuntimeProps setGatherPartitions( int gatherPartitions ) 085 { 086 if( gatherPartitions < 1 ) 087 throw new IllegalArgumentException( "gatherPartitions value must be greater than zero" ); 088 089 this.gatherPartitions = gatherPartitions; 090 091 return this; 092 } 093 094 /** 095 * Method addLogCounter adds a new counter to be logged when a cluster side slice completes. 096 * <p/> 097 * The given counters will be logged using the default cluster side logging mechanism. 098 * 099 * @param counter the Enum counter to log 100 * @return this 101 */ 102 public FlowRuntimeProps addLogCounter( Enum counter ) 103 { 104 addLogCounter( counter.getDeclaringClass().getName(), counter.name() ); 105 106 return this; 107 } 108 109 /** 110 * Method addLogCounter adds a new counter to be logged when a cluster side slice completes. 111 * <p/> 112 * The given counters will be logged using the default cluster side logging mechanism. 113 * 114 * @param group the String counter group to log 115 * @param counter the String counter name to log 116 * @return this 117 */ 118 public FlowRuntimeProps addLogCounter( String group, String counter ) 119 { 120 logCounters.add( group + ":" + counter ); 121 122 return this; 123 } 124 125 public Boolean getCombineSplits() 126 { 127 return combineSplits; 128 } 129 130 /** 131 * Method setCombineSplits will enable or disable combining of 'splits' on sources. 132 * <p/> 133 * A split is a sub-set of data from a {@link cascading.tap.Tap} source resource. Combining 134 * small splits into larger ones both reduce parallelism, but also reduce overhead of starting 135 * work on a very small data set. 136 * <p/> 137 * This is commonly done when sourcing large numbers of very small files. 138 * <p/> 139 * Setting this value will change the default, which is a platform dependent value. 140 * 141 * @param combineSplits 142 * @return 143 */ 144 public FlowRuntimeProps setCombineSplits( Boolean combineSplits ) 145 { 146 this.combineSplits = combineSplits; 147 148 return this; 149 } 150 151 @Override 152 protected void addPropertiesTo( Properties properties ) 153 { 154 if( gatherPartitions > 0 ) 155 properties.setProperty( GATHER_PARTITIONS, Integer.toString( gatherPartitions ) ); 156 157 if( !logCounters.isEmpty() ) 158 properties.setProperty( LOG_COUNTERS, Util.join( logCounters, "," ) ); 159 160 if( combineSplits != null ) 161 properties.setProperty( COMBINE_SPLITS, Boolean.toString( combineSplits ) ); 162 } 163 }