001    /*
002     * Copyright (c) 2007-2014 Concurrent, Inc. All Rights Reserved.
003     *
004     * Project and contact information: http://www.cascading.org/
005     *
006     * This file is part of the Cascading project.
007     *
008     * Licensed under the Apache License, Version 2.0 (the "License");
009     * you may not use this file except in compliance with the License.
010     * You may obtain a copy of the License at
011     *
012     *     http://www.apache.org/licenses/LICENSE-2.0
013     *
014     * Unless required by applicable law or agreed to in writing, software
015     * distributed under the License is distributed on an "AS IS" BASIS,
016     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
017     * See the License for the specific language governing permissions and
018     * limitations under the License.
019     */
020    
021    package cascading;
022    
import java.io.IOException;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Locale;
import java.util.Map;
import java.util.Set;

import cascading.flow.FlowConnectorProps;
import cascading.operation.DebugLevel;
import cascading.platform.PlatformRunner;
import cascading.platform.TestPlatform;
import cascading.util.Util;
import org.junit.After;
import org.junit.Before;
import org.junit.Rule;
import org.junit.rules.TestName;
import org.junit.runner.RunWith;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
041    
042    /**
043     * PlatformTestCase is the base class for JUnit tests that are platform agnostic. That is using the {@link TestPlatform}
044     * interface each test can be run against all supported platform like Hadoop or Cascading local mode.
045     * <p/>
046     * It is strongly recommended users look at the source of {@link FieldedPipesPlatformTest} or related tests to see how
047     * this class is used.
048     * <p/>
049     * This test case uses the {@link PlatformRunner} to inject the available platform providers which implement the
050     * TestPlatform base class.
051     * <p/>
052     * By default the PlatformRunner looks for "cascading/platform/platform.properties" file in the classpath, and
053     * instantiates the class specified by the "platform.classname" property. If more than one "platform.properties"
054     * resource is found, each class is instantiated and the whole suite of tests will be run against each instance.
055     * <p/>
056     * To limit this, setting the system property "platform.includes" to list the platform names that should be run will
057     * cause the PlatformRunner to ignore any unlisted platforms. Thus setting {@code platform.includes=local}, only
058     * local mode will run even if the "hadoop" platform was found in the classpath.
059     * <p/>
060     * To pass custom properties to each test to be used by the {@link cascading.flow.FlowConnector}, create
061     * system properties prefixed by "platform.". These properties, minus the "platform." prefix in the property name,
062     * will override any defaults.
063     * <p/>
064     * Subclasses of PlatformTestCase can set "{@code useCluster} to {@code true} on the constructor if the underlying
065     * platform can boot a cluster for testing. By setting the system property "test.cluster.enabled" to false, this
066     * can be deactivated in order to temporarily speed test execution. By default {@code useCluster} is {@code false},
067     * typically user tests don't need to have a cluster running to test their functionality so leaving the default is
068     * reasonable.
069     */
070    @RunWith(PlatformRunner.class)
071    public class PlatformTestCase extends CascadingTestCase
072      {
073      private static final Logger LOG = LoggerFactory.getLogger( PlatformTestCase.class );
074    
075      public static final String ROOT_OUTPUT_PATH = "test.output.root";
076      static Set<String> allPaths = new HashSet<String>();
077    
078      private String rootPath;
079      Set<String> currentPaths = new HashSet<String>();
080    
081      @Rule
082      public transient TestName name = new TestName();
083    
084      private transient TestPlatform platform = null;
085    
086      private transient boolean useCluster;
087      private transient int numMapTasks;
088      private transient int numReduceTasks;
089    
090      public PlatformTestCase( boolean useCluster )
091        {
092        this.useCluster = useCluster;
093        }
094    
095      public PlatformTestCase( boolean useCluster, int numMapTasks, int numReduceTasks )
096        {
097        this( useCluster );
098        this.numMapTasks = numMapTasks;
099        this.numReduceTasks = numReduceTasks;
100        }
101    
102      public PlatformTestCase()
103        {
104        this( false );
105        }
106    
107      public void installPlatform( TestPlatform platform )
108        {
109        this.platform = platform;
110        this.platform.setUseCluster( useCluster );
111    
112        if( this.platform.isMapReduce() )
113          {
114          platform.setNumMappers( numMapTasks );
115          platform.setNumReducers( numReduceTasks );
116          }
117        }
118    
119      public TestPlatform getPlatform()
120        {
121        return platform;
122        }
123    
124      public String getTestName()
125        {
126        return name.getMethodName();
127        }
128    
129      protected String getRootPath()
130        {
131        if( rootPath == null )
132          rootPath = Util.join( getPathElements(), "/" );
133    
134        return rootPath;
135        }
136    
137      protected String[] getPathElements()
138        {
139        return new String[]{getTestRoot(), getPlatformName(), getTestCaseName()};
140        }
141    
142      public String getOutputPath( String path )
143        {
144        String result = makeOutputPath( path );
145    
146        if( allPaths.contains( result ) )
147          throw new IllegalStateException( "path already has been used:" + result );
148    
149        allPaths.add( result );
150        currentPaths.add( result );
151    
152        return result;
153        }
154    
155      private String makeOutputPath( String path )
156        {
157        if( path.startsWith( "/" ) )
158          return getRootPath() + path;
159    
160        return getRootPath() + "/" + path;
161        }
162    
163      public String getTestCaseName()
164        {
165        return getClass().getSimpleName().replaceAll( "^(.*)Test.*$", "$1" ).toLowerCase();
166        }
167    
168      public String getPlatformName()
169        {
170        return platform.getName();
171        }
172    
173      public static String getTestRoot()
174        {
175        return System.getProperty( ROOT_OUTPUT_PATH, "build/test/output" ).replace( ":", "_" );
176        }
177    
178      @Before
179      public void setUp() throws Exception
180        {
181        getPlatform().setUp();
182        }
183    
184      public Map<Object, Object> getProperties()
185        {
186        return new HashMap<Object, Object>( getPlatform().getProperties() );
187        }
188    
189      protected void copyFromLocal( String inputFile ) throws IOException
190        {
191        getPlatform().copyFromLocal( inputFile );
192        }
193    
194      protected Map<Object, Object> disableDebug()
195        {
196        Map<Object, Object> properties = getProperties();
197        FlowConnectorProps.setDebugLevel( properties, DebugLevel.NONE );
198    
199        return properties;
200        }
201    
202      @After
203      public void tearDown() throws Exception
204        {
205        try
206          {
207          for( String path : currentPaths )
208            {
209            LOG.info( "copying to local {}", path );
210    
211            if( getPlatform().isUseCluster() && getPlatform().remoteExists( path ) )
212              getPlatform().copyToLocal( path );
213            }
214    
215          currentPaths.clear();
216          }
217        finally
218          {
219          getPlatform().tearDown();
220          }
221        }
222      }