/*
 * Copyright (c) 2007-2015 Concurrent, Inc. All Rights Reserved.
 *
 * Project and contact information: http://www.cascading.org/
 *
 * This file is part of the Cascading project.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package cascading.flow;

import java.util.Collections;
import java.util.LinkedHashSet;
import java.util.Objects;
import java.util.Properties;
import java.util.Set;

import cascading.property.Props;
import cascading.util.Util;

/**
 * Class FlowRuntimeProps is a fluent helper class for setting {@link Flow} specific runtime properties through
 * a {@link FlowConnector}.
 * <p>
 * These properties apply to the cluster or remote side of the Flow execution. For client (or local) side properties
 * see {@link cascading.flow.FlowProps}.
 * <p>
 * Available properties are:
 * <ul>
 * <li>gather partitions - number of slices (partitions) to gather keys within each {@link cascading.flow.FlowNode}.
 * In MapReduce this is the number of reducers. In a Tez DAG this is the scatter-gather parallelization.</li>
 * <li>log counters - counter names to log to INFO when a cluster side slice completes.</li>
 * </ul>
 * <p>
 * Note that if the number of gather partitions is not set, the Flow may fail during planning or setup, depending on
 * the platform.
 */
public class FlowRuntimeProps extends Props
  {
  /** Property key under which the number of gather partitions per node is stored. */
  public static final String GATHER_PARTITIONS = "cascading.flow.runtime.gather.partitions.num";
  /** Property key under which the comma separated list of {@code group:counter} entries to log is stored. */
  public static final String LOG_COUNTERS = "cascading.flow.runtime.log.counters";

  /** Number of gather partitions; 0 means "unset", in which case no property is written. */
  int gatherPartitions = 0;
  /** Counters to log, each encoded as {@code group:counter}; insertion ordered, duplicates ignored. */
  Set<String> logCounters = new LinkedHashSet<>();

  /**
   * Method flowRuntimeProps creates a new, empty FlowRuntimeProps instance, for use as the start of a fluent chain.
   *
   * @return a new FlowRuntimeProps instance
   */
  public static FlowRuntimeProps flowRuntimeProps()
    {
    return new FlowRuntimeProps();
    }

  /** Constructs a FlowRuntimeProps instance with no properties set. */
  public FlowRuntimeProps()
    {
    }

  /**
   * Method getGatherPartitions returns the number of gather partitions.
   *
   * @return number of gather partitions, or 0 if it was never set
   */
  public int getGatherPartitions()
    {
    return gatherPartitions;
    }

  /**
   * Method setGatherPartitions sets the default number of gather partitions each {@link cascading.flow.FlowNode}
   * should use.
   *
   * @param gatherPartitions number of gather partitions to use per node
   * @return this
   * @throws IllegalArgumentException if gatherPartitions is less than 1
   */
  public FlowRuntimeProps setGatherPartitions( int gatherPartitions )
    {
    if( gatherPartitions < 1 )
      throw new IllegalArgumentException( "gatherPartitions value must be greater than zero" );

    this.gatherPartitions = gatherPartitions;

    return this;
    }

  /**
   * Method getLogCounters returns the counters registered for logging, each encoded as {@code group:counter}.
   *
   * @return an unmodifiable, insertion-ordered view of the registered counters
   */
  public Set<String> getLogCounters()
    {
    return Collections.unmodifiableSet( logCounters );
    }

  /**
   * Method addLogCounter adds a new counter to be logged when a cluster side slice completes.
   * <p>
   * The given counters will be logged using the default cluster side logging mechanism. The counter group is the
   * fully qualified name of the enum type, and the counter name is the enum constant name.
   *
   * @param counter the Enum counter to log
   * @return this
   * @throws NullPointerException if counter is null
   */
  public FlowRuntimeProps addLogCounter( Enum<?> counter )
    {
    Objects.requireNonNull( counter, "counter may not be null" );

    // getDeclaringClass(), not getClass(): constants with their own class bodies are anonymous subclasses,
    // and getDeclaringClass() still resolves to the enum type itself
    return addLogCounter( counter.getDeclaringClass().getName(), counter.name() );
    }

  /**
   * Method addLogCounter adds a new counter to be logged when a cluster side slice completes.
   * <p>
   * The given counters will be logged using the default cluster side logging mechanism. Adding the same
   * group/counter pair more than once has no additional effect.
   *
   * @param group   the String counter group to log
   * @param counter the String counter name to log
   * @return this
   * @throws NullPointerException     if group or counter is null
   * @throws IllegalArgumentException if group or counter is empty
   */
  public FlowRuntimeProps addLogCounter( String group, String counter )
    {
    Objects.requireNonNull( group, "group may not be null" );
    Objects.requireNonNull( counter, "counter may not be null" );

    if( group.isEmpty() )
      throw new IllegalArgumentException( "group may not be empty" );

    if( counter.isEmpty() )
      throw new IllegalArgumentException( "counter may not be empty" );

    // NOTE(review): entries are encoded with ':' and joined with ',' (see addPropertiesTo); a group or counter
    // name containing either character is presumably ambiguous to whatever parses LOG_COUNTERS -- confirm.
    logCounters.add( group + ":" + counter );

    return this;
    }

  /**
   * Copies the values set on this instance into the given properties.
   * <p>
   * Values that were never set (gather partitions of 0, no log counters) are not written, so any value already
   * present in properties for those keys is left untouched.
   *
   * @param properties the properties to populate
   */
  @Override
  protected void addPropertiesTo( Properties properties )
    {
    if( gatherPartitions > 0 )
      properties.setProperty( GATHER_PARTITIONS, Integer.toString( gatherPartitions ) );

    if( !logCounters.isEmpty() )
      properties.setProperty( LOG_COUNTERS, Util.join( logCounters, "," ) );
    }
  }