001    /*
002     * Copyright (c) 2007-2015 Concurrent, Inc. All Rights Reserved.
003     *
004     * Project and contact information: http://www.cascading.org/
005     *
006     * This file is part of the Cascading project.
007     *
008     * Licensed under the Apache License, Version 2.0 (the "License");
009     * you may not use this file except in compliance with the License.
010     * You may obtain a copy of the License at
011     *
012     *     http://www.apache.org/licenses/LICENSE-2.0
013     *
014     * Unless required by applicable law or agreed to in writing, software
015     * distributed under the License is distributed on an "AS IS" BASIS,
016     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
017     * See the License for the specific language governing permissions and
018     * limitations under the License.
019     */
020    
021    package cascading.tap.hadoop;
022    
023    import java.util.Map;
024    import java.util.Properties;
025    
026    import cascading.property.Props;
027    
028    /**
029     * Class HfsProps is a fluent helper for setting various Hadoop FS level properties that some
030     * {@link cascading.flow.Flow} may or may not be required to have set. These properties are typically passed to a Flow
031     * via a {@link cascading.flow.FlowConnector}.
032     */
033    public class HfsProps extends Props
034      {
035      /** Field TEMPORARY_DIRECTORY */
036      public static final String TEMPORARY_DIRECTORY = "cascading.tmp.dir";
037      /** Fields LOCAL_MODE_SCHEME * */
038      public static final String LOCAL_MODE_SCHEME = "cascading.hadoop.localmode.scheme";
039      /** Field COMBINE_INPUT_FILES */
040      public static final String COMBINE_INPUT_FILES = "cascading.hadoop.hfs.combine.files";
041      /** Field COMBINE_INPUT_FILES_SAFEMODE */
042      public static final String COMBINE_INPUT_FILES_SAFE_MODE = "cascading.hadoop.hfs.combine.safemode";
043      /** Field COMBINE_INPUT_FILES_SIZE_MAX */
044      public static final String COMBINE_INPUT_FILES_SIZE_MAX = "cascading.hadoop.hfs.combine.max.size";
045    
046      protected String temporaryDirectory;
047      protected String localModeScheme;
048      protected Boolean useCombinedInput;
049      protected Long combinedInputMaxSize;
050      protected Boolean combinedInputSafeMode;
051    
052      /**
053       * Method setTemporaryDirectory sets the temporary directory on the given properties object.
054       *
055       * @param properties         of type Map<Object,Object>
056       * @param temporaryDirectory of type String
057       */
058      public static void setTemporaryDirectory( Map<Object, Object> properties, String temporaryDirectory )
059        {
060        properties.put( TEMPORARY_DIRECTORY, temporaryDirectory );
061        }
062    
063      /**
064       * Method setLocalModeScheme provides a means to change the scheme value used to detect when a
065       * MapReduce job should be run in Hadoop local mode. By default the value is {@code "file"}, set to
066       * {@code "none"} to disable entirely.
067       *
068       * @param properties of type Map<Object,Object>
069       * @param scheme     a String
070       */
071      public static void setLocalModeScheme( Map<Object, Object> properties, String scheme )
072        {
073        properties.put( LOCAL_MODE_SCHEME, scheme );
074        }
075    
076      /**
077       * Method setUseCombinedInput provides a means to indicate whether to leverage
078       * {@link org.apache.hadoop.mapred.lib.CombineFileInputFormat} for the input format. By default it is false.
079       * <p/>
080       * Use {@link #setCombinedInputMaxSize(long)} to set the max split/combined input size. Other specific
081       * properties must be specified directly if needed. Specifically "mapred.min.split.size.per.node" and
082       * "mapred.min.split.size.per.rack", which are 0 by default.
083       *
084       * @param properties of type Map<Object,Object>
085       * @param combine    a boolean
086       */
087      public static void setUseCombinedInput( Map<Object, Object> properties, Boolean combine )
088        {
089        if( combine != null )
090          properties.put( COMBINE_INPUT_FILES, Boolean.toString( combine ) );
091        }
092    
093      /**
094       * Method setUseCombinedInputSafeMode toggles safe mode when using
095       * {@link org.apache.hadoop.mapred.lib.CombineFileInputFormat}. Safe mode will throw an exception if the underlying
096       * InputFormat is not of type {@link org.apache.hadoop.mapred.FileInputFormat}. If safeMode is off a warning will
097       * be logged instead. safeMode is on by default.
098       * <p/>
099       * Setting this property when not setting {@link #setUseCombinedInput(boolean)} to true has no effect.
100       *
101       * @param properties of type Map<Object,Object>
102       * @param safeMode   a boolean
103       */
104      public static void setUseCombinedInputSafeMode( Map<Object, Object> properties, Boolean safeMode )
105        {
106        if( safeMode != null )
107          properties.put( COMBINE_INPUT_FILES_SAFE_MODE, Boolean.toString( safeMode ) );
108        }
109    
110      /**
111       * Method setCombinedInputMaxSize sets the maximum input split size to be used.
112       * <p/>
113       * This property is an alias for the Hadoop property "mapred.max.split.size".
114       *
115       * @param properties of type Map<Object,Object>
116       * @param size       of type long
117       */
118      public static void setCombinedInputMaxSize( Map<Object, Object> properties, Long size )
119        {
120        if( size != null )
121          properties.put( COMBINE_INPUT_FILES_SIZE_MAX, Long.toString( size ) );
122        }
123    
124      /**
125       * Creates a new HfsProps instance.
126       *
127       * @return HfsProps instance
128       */
129      public static HfsProps hfsProps()
130        {
131        return new HfsProps();
132        }
133    
134      public HfsProps()
135        {
136        }
137    
138      public String getTemporaryDirectory()
139        {
140        return temporaryDirectory;
141        }
142    
143      /**
144       * Method setTemporaryDirectory sets the temporary directory for use on the underlying filesystem.
145       *
146       * @param temporaryDirectory of type String
147       * @return returns this instance
148       */
149      public HfsProps setTemporaryDirectory( String temporaryDirectory )
150        {
151        this.temporaryDirectory = temporaryDirectory;
152    
153        return this;
154        }
155    
156      public String getLocalModeScheme()
157        {
158        return localModeScheme;
159        }
160    
161      /**
162       * Method setLocalModeScheme provides a means to change the scheme value used to detect when a
163       * MapReduce job should be run in Hadoop local mode. By default the value is {@code "file"}, set to
164       * {@code "none"} to disable entirely.
165       *
166       * @param localModeScheme of type String
167       * @return returns this instance
168       */
169      public HfsProps setLocalModeScheme( String localModeScheme )
170        {
171        this.localModeScheme = localModeScheme;
172    
173        return this;
174        }
175    
176      public boolean isUseCombinedInput()
177        {
178        return useCombinedInput;
179        }
180    
181      /**
182       * Method setUseCombinedInput provides a means to indicate whether to leverage
183       * {@link org.apache.hadoop.mapred.lib.CombineFileInputFormat} for the input format. By default it is false.
184       *
185       * @param useCombinedInput boolean
186       * @return returns this instance
187       */
188      public HfsProps setUseCombinedInput( boolean useCombinedInput )
189        {
190        this.useCombinedInput = useCombinedInput;
191    
192        return this;
193        }
194    
195      public Long getCombinedInputMaxSize()
196        {
197        return combinedInputMaxSize;
198        }
199    
200      /**
201       * Method setCombinedInputMaxSize sets the maximum input split size to be used.
202       * <p/>
203       * This value is not honored unless {@link #setUseCombinedInput(boolean)} is {@code true}.
204       *
205       * @param combinedInputMaxSize of type long
206       * @return returns this instance
207       */
208      public HfsProps setCombinedInputMaxSize( long combinedInputMaxSize )
209        {
210        this.combinedInputMaxSize = combinedInputMaxSize;
211    
212        return this;
213        }
214    
215      public boolean isUseCombinedInputSafeMode()
216        {
217        return combinedInputSafeMode;
218        }
219    
220      /**
221       * Method setUseCombinedInputSafeMode toggles safe mode when using
222       * {@link org.apache.hadoop.mapred.lib.CombineFileInputFormat}. Safe mode will throw an exception if the underlying
223       * InputFormat is not of type {@link org.apache.hadoop.mapred.FileInputFormat}. If safeMode is off a warning will
224       * be logged instead. safeMode is on by default.
225       * <p/>
226       * Setting this property when not setting {@link #setUseCombinedInput(boolean)} to true has no effect.
227       *
228       * @param combinedInputSafeMode boolean
229       * @return returns this instance
230       */
231      public HfsProps setUseCombinedInputSafeMode( boolean combinedInputSafeMode )
232        {
233        this.combinedInputSafeMode = combinedInputSafeMode;
234    
235        return this;
236        }
237    
238      @Override
239      protected void addPropertiesTo( Properties properties )
240        {
241        setTemporaryDirectory( properties, temporaryDirectory );
242        setLocalModeScheme( properties, localModeScheme );
243        setUseCombinedInput( properties, useCombinedInput );
244        setCombinedInputMaxSize( properties, combinedInputMaxSize );
245        setUseCombinedInputSafeMode( properties, combinedInputSafeMode );
246        }
247      }