001/*
002 * Copyright (c) 2007-2015 Concurrent, Inc. All Rights Reserved.
003 *
004 * Project and contact information: http://www.cascading.org/
005 *
006 * This file is part of the Cascading project.
007 *
008 * Licensed under the Apache License, Version 2.0 (the "License");
009 * you may not use this file except in compliance with the License.
010 * You may obtain a copy of the License at
011 *
012 *     http://www.apache.org/licenses/LICENSE-2.0
013 *
014 * Unless required by applicable law or agreed to in writing, software
015 * distributed under the License is distributed on an "AS IS" BASIS,
016 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
017 * See the License for the specific language governing permissions and
018 * limitations under the License.
019 */
020
021package cascading.platform;
022
023import java.io.IOException;
024import java.util.Comparator;
025import java.util.HashMap;
026import java.util.Map;
027
028import cascading.flow.FlowConnector;
029import cascading.flow.FlowProcess;
030import cascading.scheme.Scheme;
031import cascading.scheme.util.FieldTypeResolver;
032import cascading.tap.SinkMode;
033import cascading.tap.Tap;
034import cascading.tap.partition.Partition;
035import cascading.tuple.Fields;
036import org.slf4j.Logger;
037import org.slf4j.LoggerFactory;
038
039/**
040 *
041 */
042public abstract class TestPlatform
043  {
044  private static final Logger LOG = LoggerFactory.getLogger( TestPlatform.class );
045
046  public static final String CLUSTER_TESTING_PROPERTY = "test.cluster.enabled";
047
048  private boolean useCluster = false;
049  private boolean enableCluster = true;
050  protected int numMappers = 0;
051  protected int numReducers = 0;
052  protected int numGatherPartitions = 0;
053
054  /**
055   * Method getGlobalProperties fetches all "platform." prefixed system properties.
056   * <p/>
057   * Sub-classes of TestPlatform should use these values as overrides before returning from
058   * {@link #getProperties()}.
059   *
060   * @return a Map of properties
061   */
062  public static Map<Object, Object> getGlobalProperties()
063    {
064    HashMap<Object, Object> properties = new HashMap<Object, Object>();
065
066    for( String propertyName : System.getProperties().stringPropertyNames() )
067      {
068      if( propertyName.startsWith( "platform." ) )
069        properties.put( propertyName.substring( "platform.".length() ), System.getProperty( propertyName ) );
070      }
071
072    if( !properties.isEmpty() )
073      LOG.info( "platform property overrides: ", properties );
074
075    return properties;
076    }
077
078  protected TestPlatform()
079    {
080    enableCluster = Boolean.parseBoolean( System.getProperty( CLUSTER_TESTING_PROPERTY, Boolean.toString( enableCluster ) ) );
081    }
082
083  public String getName()
084    {
085    return getClass().getSimpleName().replaceAll( "^(.*)Platform$", "$1" ).toLowerCase();
086    }
087
088  public boolean isMapReduce()
089    {
090    return false;
091    }
092
093  public boolean isDAG()
094    {
095    return false;
096    }
097
098  public int getNumMappers()
099    {
100    return numMappers;
101    }
102
103  public void setNumMappers( int numMappers )
104    {
105    this.numMappers = numMappers;
106    }
107
108  public int getNumReducers()
109    {
110    return numReducers;
111    }
112
113  public void setNumReducers( int numReducers )
114    {
115    this.numReducers = numReducers;
116    }
117
118  public int getNumGatherPartitions()
119    {
120    return numGatherPartitions;
121    }
122
123  public void setNumGatherPartitions( int numGatherPartitions )
124    {
125    this.numGatherPartitions = numGatherPartitions;
126    }
127
128  public void setNumMapTasks( Map<Object, Object> properties, int numMapTasks )
129    {
130    // do nothing
131    }
132
133  public void setNumReduceTasks( Map<Object, Object> properties, int numReduceTasks )
134    {
135    // do nothing
136    }
137
138  public void setNumGatherPartitionTasks( Map<Object, Object> properties, int numReduceTasks )
139    {
140    // do nothing
141    }
142
143  public Integer getNumMapTasks( Map<Object, Object> properties )
144    {
145    return null;
146    }
147
148  public Integer getNumReduceTasks( Map<Object, Object> properties )
149    {
150    return null;
151    }
152
153  public Integer getNumGatherPartitionTasks( Map<Object, Object> properties )
154    {
155    return null;
156    }
157
158  public abstract void setUp() throws IOException;
159
160  public abstract Map<Object, Object> getProperties();
161
162  public abstract void tearDown();
163
164  public void setUseCluster( boolean useCluster )
165    {
166    this.useCluster = useCluster;
167    }
168
169  public boolean isUseCluster()
170    {
171    return enableCluster && useCluster;
172    }
173
174  public abstract void copyFromLocal( String inputFile ) throws IOException;
175
176  public abstract void copyToLocal( String outputFile ) throws IOException;
177
178  public abstract boolean remoteExists( String outputFile ) throws IOException;
179
180  public abstract boolean remoteRemove( String outputFile, boolean recursive ) throws IOException;
181
182  public abstract FlowProcess getFlowProcess();
183
184  public abstract FlowConnector getFlowConnector( Map<Object, Object> properties );
185
186  public FlowConnector getFlowConnector()
187    {
188    return getFlowConnector( getProperties() );
189    }
190
191  public abstract Tap getTap( Scheme scheme, String filename, SinkMode mode );
192
193  public Tap getTextFile( Fields sourceFields, String filename )
194    {
195    return getTextFile( sourceFields, filename, SinkMode.KEEP );
196    }
197
198  public Tap getTextFile( String filename )
199    {
200    return getTextFile( filename, SinkMode.KEEP );
201    }
202
203  public Tap getTextFile( String filename, SinkMode mode )
204    {
205    return getTextFile( null, filename, mode );
206    }
207
208  public Tap getTextFile( Fields sourceFields, String filename, SinkMode mode )
209    {
210    return getTextFile( sourceFields, Fields.ALL, filename, mode );
211    }
212
213  public abstract Tap getTextFile( Fields sourceFields, Fields sinkFields, String filename, SinkMode mode );
214
215  public Tap getDelimitedFile( Fields fields, String delimiter, String filename )
216    {
217    return getDelimitedFile( fields, false, delimiter, "\"", null, filename, SinkMode.KEEP );
218    }
219
220  public Tap getDelimitedFile( Fields fields, String delimiter, String filename, SinkMode mode )
221    {
222    return getDelimitedFile( fields, false, delimiter, "\"", null, filename, mode );
223    }
224
225  public Tap getTabDelimitedFile( Fields fields, String filename, SinkMode mode )
226    {
227    return getDelimitedFile( fields, false, "\t", "\"", null, filename, mode );
228    }
229
230  public Tap getTabDelimitedFile( Fields fields, boolean hasHeader, String filename, SinkMode mode )
231    {
232    return getDelimitedFile( fields, hasHeader, "\t", "\"", null, filename, mode );
233    }
234
235  public Tap getDelimitedFile( Fields fields, boolean hasHeader, String delimiter, String quote, String filename, SinkMode mode )
236    {
237    return getDelimitedFile( fields, hasHeader, delimiter, quote, null, filename, mode );
238    }
239
240  public Tap getDelimitedFile( Fields fields, String delimiter, String quote, String filename, SinkMode mode )
241    {
242    return getDelimitedFile( fields, false, delimiter, quote, null, filename, mode );
243    }
244
245  public Tap getDelimitedFile( Fields fields, String delimiter, Class[] types, String filename, SinkMode mode )
246    {
247    return getDelimitedFile( fields, false, delimiter, "\"", types, filename, mode );
248    }
249
250  public abstract Tap getDelimitedFile( Fields fields, boolean hasHeader, String delimiter, String quote, Class[] types, String filename, SinkMode mode );
251
252  public abstract Tap getDelimitedFile( Fields fields, boolean skipHeader, boolean writeHeader, String delimiter, String quote, Class[] types, String filename, SinkMode mode );
253
254  public abstract Tap getDelimitedFile( String delimiter, String quote, FieldTypeResolver fieldTypeResolver, String filename, SinkMode mode );
255
256  public abstract Tap getPartitionTap( Tap sink, Partition partition, int openThreshold );
257
258  public abstract Scheme getTestConfigDefScheme();
259
260  public abstract Scheme getTestFailScheme();
261
262  public abstract Comparator getLongComparator( boolean reverseSort );
263
264  public abstract Comparator getStringComparator( boolean reverseSort );
265
266  public abstract String getHiddenTemporaryPath();
267  }