001/* 002 * Copyright (c) 2007-2015 Concurrent, Inc. All Rights Reserved. 003 * 004 * Project and contact information: http://www.cascading.org/ 005 * 006 * This file is part of the Cascading project. 007 * 008 * Licensed under the Apache License, Version 2.0 (the "License"); 009 * you may not use this file except in compliance with the License. 010 * You may obtain a copy of the License at 011 * 012 * http://www.apache.org/licenses/LICENSE-2.0 013 * 014 * Unless required by applicable law or agreed to in writing, software 015 * distributed under the License is distributed on an "AS IS" BASIS, 016 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 017 * See the License for the specific language governing permissions and 018 * limitations under the License. 019 */ 020 021package cascading.platform; 022 023import java.io.IOException; 024import java.util.Comparator; 025import java.util.HashMap; 026import java.util.Map; 027 028import cascading.flow.FlowConnector; 029import cascading.flow.FlowProcess; 030import cascading.scheme.Scheme; 031import cascading.scheme.util.FieldTypeResolver; 032import cascading.tap.SinkMode; 033import cascading.tap.Tap; 034import cascading.tap.partition.Partition; 035import cascading.tuple.Fields; 036import org.slf4j.Logger; 037import org.slf4j.LoggerFactory; 038 039/** 040 * 041 */ 042public abstract class TestPlatform 043 { 044 private static final Logger LOG = LoggerFactory.getLogger( TestPlatform.class ); 045 046 public static final String CLUSTER_TESTING_PROPERTY = "test.cluster.enabled"; 047 048 private boolean useCluster = false; 049 private boolean enableCluster = true; 050 protected int numMappers = 0; 051 protected int numReducers = 0; 052 protected int numGatherPartitions = 0; 053 054 /** 055 * Method getGlobalProperties fetches all "platform." prefixed system properties. 056 * <p/> 057 * Sub-classes of TestPlatform should use these values as overrides before returning from 058 * {@link #getProperties()}. 059 * 060 * @return a Map of properties 061 */ 062 public static Map<Object, Object> getGlobalProperties() 063 { 064 HashMap<Object, Object> properties = new HashMap<Object, Object>(); 065 066 for( String propertyName : System.getProperties().stringPropertyNames() ) 067 { 068 if( propertyName.startsWith( "platform." ) ) 069 properties.put( propertyName.substring( "platform.".length() ), System.getProperty( propertyName ) ); 070 } 071 072 if( !properties.isEmpty() ) 073 LOG.info( "platform property overrides: ", properties ); 074 075 return properties; 076 } 077 078 protected TestPlatform() 079 { 080 enableCluster = Boolean.parseBoolean( System.getProperty( CLUSTER_TESTING_PROPERTY, Boolean.toString( enableCluster ) ) ); 081 } 082 083 public String getName() 084 { 085 return getClass().getSimpleName().replaceAll( "^(.*)Platform$", "$1" ).toLowerCase(); 086 } 087 088 public boolean isMapReduce() 089 { 090 return false; 091 } 092 093 public boolean isDAG() 094 { 095 return false; 096 } 097 098 public int getNumMappers() 099 { 100 return numMappers; 101 } 102 103 public void setNumMappers( int numMappers ) 104 { 105 this.numMappers = numMappers; 106 } 107 108 public int getNumReducers() 109 { 110 return numReducers; 111 } 112 113 public void setNumReducers( int numReducers ) 114 { 115 this.numReducers = numReducers; 116 } 117 118 public int getNumGatherPartitions() 119 { 120 return numGatherPartitions; 121 } 122 123 public void setNumGatherPartitions( int numGatherPartitions ) 124 { 125 this.numGatherPartitions = numGatherPartitions; 126 } 127 128 public void setNumMapTasks( Map<Object, Object> properties, int numMapTasks ) 129 { 130 // do nothing 131 } 132 133 public void setNumReduceTasks( Map<Object, Object> properties, int numReduceTasks ) 134 { 135 // do nothing 136 } 137 138 public void setNumGatherPartitionTasks( Map<Object, Object> properties, int numReduceTasks ) 139 { 140 // do nothing 141 } 142 143 public Integer getNumMapTasks( Map<Object, Object> properties ) 144 { 145 return null; 146 } 147 148 public Integer getNumReduceTasks( Map<Object, Object> properties ) 149 { 150 return null; 151 } 152 153 public Integer getNumGatherPartitionTasks( Map<Object, Object> properties ) 154 { 155 return null; 156 } 157 158 public abstract void setUp() throws IOException; 159 160 public abstract Map<Object, Object> getProperties(); 161 162 public abstract void tearDown(); 163 164 public void setUseCluster( boolean useCluster ) 165 { 166 this.useCluster = useCluster; 167 } 168 169 public boolean isUseCluster() 170 { 171 return enableCluster && useCluster; 172 } 173 174 public abstract void copyFromLocal( String inputFile ) throws IOException; 175 176 public abstract void copyToLocal( String outputFile ) throws IOException; 177 178 public abstract boolean remoteExists( String outputFile ) throws IOException; 179 180 public abstract boolean remoteRemove( String outputFile, boolean recursive ) throws IOException; 181 182 public abstract FlowProcess getFlowProcess(); 183 184 public abstract FlowConnector getFlowConnector( Map<Object, Object> properties ); 185 186 public FlowConnector getFlowConnector() 187 { 188 return getFlowConnector( getProperties() ); 189 } 190 191 public abstract Tap getTap( Scheme scheme, String filename, SinkMode mode ); 192 193 public Tap getTextFile( Fields sourceFields, String filename ) 194 { 195 return getTextFile( sourceFields, filename, SinkMode.KEEP ); 196 } 197 198 public Tap getTextFile( String filename ) 199 { 200 return getTextFile( filename, SinkMode.KEEP ); 201 } 202 203 public Tap getTextFile( String filename, SinkMode mode ) 204 { 205 return getTextFile( null, filename, mode ); 206 } 207 208 public Tap getTextFile( Fields sourceFields, String filename, SinkMode mode ) 209 { 210 return getTextFile( sourceFields, Fields.ALL, filename, mode ); 211 } 212 213 public abstract Tap getTextFile( Fields sourceFields, Fields sinkFields, String filename, SinkMode mode ); 214 215 public Tap getDelimitedFile( Fields fields, String delimiter, String filename ) 216 { 217 return getDelimitedFile( fields, false, delimiter, "\"", null, filename, SinkMode.KEEP ); 218 } 219 220 public Tap getDelimitedFile( Fields fields, String delimiter, String filename, SinkMode mode ) 221 { 222 return getDelimitedFile( fields, false, delimiter, "\"", null, filename, mode ); 223 } 224 225 public Tap getTabDelimitedFile( Fields fields, String filename, SinkMode mode ) 226 { 227 return getDelimitedFile( fields, false, "\t", "\"", null, filename, mode ); 228 } 229 230 public Tap getTabDelimitedFile( Fields fields, boolean hasHeader, String filename, SinkMode mode ) 231 { 232 return getDelimitedFile( fields, hasHeader, "\t", "\"", null, filename, mode ); 233 } 234 235 public Tap getDelimitedFile( Fields fields, boolean hasHeader, String delimiter, String quote, String filename, SinkMode mode ) 236 { 237 return getDelimitedFile( fields, hasHeader, delimiter, quote, null, filename, mode ); 238 } 239 240 public Tap getDelimitedFile( Fields fields, String delimiter, String quote, String filename, SinkMode mode ) 241 { 242 return getDelimitedFile( fields, false, delimiter, quote, null, filename, mode ); 243 } 244 245 public Tap getDelimitedFile( Fields fields, String delimiter, Class[] types, String filename, SinkMode mode ) 246 { 247 return getDelimitedFile( fields, false, delimiter, "\"", types, filename, mode ); 248 } 249 250 public abstract Tap getDelimitedFile( Fields fields, boolean hasHeader, String delimiter, String quote, Class[] types, String filename, SinkMode mode ); 251 252 public abstract Tap getDelimitedFile( Fields fields, boolean skipHeader, boolean writeHeader, String delimiter, String quote, Class[] types, String filename, SinkMode mode ); 253 254 public abstract Tap getDelimitedFile( String delimiter, String quote, FieldTypeResolver fieldTypeResolver, String filename, SinkMode mode ); 255 256 public abstract Tap getPartitionTap( Tap sink, Partition partition, int openThreshold ); 257 258 public abstract Scheme getTestConfigDefScheme(); 259 260 public abstract Scheme getTestFailScheme(); 261 262 public abstract Comparator getLongComparator( boolean reverseSort ); 263 264 public abstract Comparator getStringComparator( boolean reverseSort ); 265 266 public abstract String getHiddenTemporaryPath(); 267 }