001/*
002 * Copyright (c) 2007-2017 Xplenty, Inc. All Rights Reserved.
003 *
004 * Project and contact information: http://www.cascading.org/
005 *
006 * This file is part of the Cascading project.
007 *
008 * Licensed under the Apache License, Version 2.0 (the "License");
009 * you may not use this file except in compliance with the License.
010 * You may obtain a copy of the License at
011 *
012 *     http://www.apache.org/licenses/LICENSE-2.0
013 *
014 * Unless required by applicable law or agreed to in writing, software
015 * distributed under the License is distributed on an "AS IS" BASIS,
016 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
017 * See the License for the specific language governing permissions and
018 * limitations under the License.
019 */
020
021package cascading.flow.tez;
022
023import java.io.IOException;
024import java.util.Collection;
025import java.util.Collections;
026import java.util.HashSet;
027import java.util.Map;
028import java.util.Set;
029
030import cascading.CascadingException;
031import cascading.flow.FlowProcess;
032import cascading.flow.FlowSession;
033import cascading.flow.hadoop.MapRed;
034import cascading.flow.hadoop.util.HadoopUtil;
035import cascading.tap.Tap;
036import cascading.tuple.TupleEntryCollector;
037import cascading.tuple.TupleEntryIterator;
038import org.apache.hadoop.conf.Configuration;
039import org.apache.hadoop.mapred.Reporter;
040import org.apache.hadoop.util.ReflectionUtils;
041import org.apache.tez.dag.api.TezConfiguration;
042import org.apache.tez.mapreduce.processor.MRTaskReporter;
043import org.apache.tez.runtime.api.ProcessorContext;
044import org.apache.tez.runtime.api.Writer;
045
046/**
047 * Class HadoopFlowProcess is an implementation of {@link cascading.flow.FlowProcess} for Hadoop. Use this interface to get direct
048 * access to the Hadoop JobConf and Reporter interfaces.
049 * <p/>
050 * Be warned that coupling to this implementation will cause custom {@link cascading.operation.Operation}s to
051 * fail if they are executed on a system other than Hadoop.
052 *
053 * @see cascading.flow.FlowSession
054 */
055public class Hadoop2TezFlowProcess extends FlowProcess<TezConfiguration> implements MapRed
056  {
057  /** Field jobConf */
058  final TezConfiguration configuration;
059  private ProcessorContext context;
060  private Writer writer;
061
062  public Hadoop2TezFlowProcess()
063    {
064    this.configuration = new TezConfiguration();
065    }
066
067  public Hadoop2TezFlowProcess( TezConfiguration configuration )
068    {
069    this.configuration = configuration;
070    }
071
072  public Hadoop2TezFlowProcess( FlowSession flowSession, ProcessorContext context, TezConfiguration configuration )
073    {
074    super( flowSession );
075    this.context = context;
076    this.configuration = configuration;
077    }
078
079  public Hadoop2TezFlowProcess( Hadoop2TezFlowProcess flowProcess, TezConfiguration configuration )
080    {
081    super( flowProcess );
082    this.context = flowProcess.context;
083    this.configuration = configuration;
084    }
085
086  public ProcessorContext getContext()
087    {
088    return context;
089    }
090
091  public void setWriter( Writer writer )
092    {
093    this.writer = writer;
094    }
095
096  @Override
097  public FlowProcess copyWith( TezConfiguration configuration )
098    {
099    return new Hadoop2TezFlowProcess( this, configuration );
100    }
101
102  /**
103   * Method getJobConf returns the jobConf of this HadoopFlowProcess object.
104   *
105   * @return the jobConf (type JobConf) of this HadoopFlowProcess object.
106   */
107  public TezConfiguration getConfiguration()
108    {
109    return configuration;
110    }
111
112  @Override
113  public TezConfiguration getConfig()
114    {
115    return configuration;
116    }
117
118  @Override
119  public TezConfiguration getConfigCopy()
120    {
121    return new TezConfiguration( configuration );
122    }
123
124  /**
125   * Method getCurrentTaskNum returns the task number of this task. Task 0 is the first task.
126   *
127   * @return int
128   */
129  @Override
130  public int getCurrentSliceNum()
131    {
132    return getConfiguration().getInt( "mapred.task.partition", 0 ); // TODO: likely incorrect
133    }
134
135  @Override
136  public int getNumProcessSlices()
137    {
138    return 0;
139    }
140
141  /**
142   * Method getReporter returns the reporter of this HadoopFlowProcess object.
143   *
144   * @return the reporter (type Reporter) of this HadoopFlowProcess object.
145   */
146  @Override
147  public Reporter getReporter()
148    {
149    if( context == null )
150      return Reporter.NULL;
151
152    return new MRTaskReporter( context );
153    }
154
155  @Override
156  public Object getProperty( String key )
157    {
158    return configuration.get( key );
159    }
160
161  @Override
162  public Collection<String> getPropertyKeys()
163    {
164    Set<String> keys = new HashSet<String>();
165
166    for( Map.Entry<String, String> entry : configuration )
167      keys.add( entry.getKey() );
168
169    return Collections.unmodifiableSet( keys );
170    }
171
172  @Override
173  public Object newInstance( String className )
174    {
175    if( className == null || className.isEmpty() )
176      return null;
177
178    try
179      {
180      Class type = (Class) Hadoop2TezFlowProcess.class.getClassLoader().loadClass( className.toString() );
181
182      return ReflectionUtils.newInstance( type, configuration );
183      }
184    catch( ClassNotFoundException exception )
185      {
186      throw new CascadingException( "unable to load class: " + className.toString(), exception );
187      }
188    }
189
190  @Override
191  public void keepAlive()
192    {
193    // unsupported
194    }
195
196  @Override
197  public void increment( Enum counter, long amount )
198    {
199    if( context != null )
200      context.getCounters().findCounter( counter ).increment( amount );
201    }
202
203  @Override
204  public void increment( String group, String counter, long amount )
205    {
206    if( context != null )
207      context.getCounters().findCounter( group, counter ).increment( amount );
208    }
209
210  @Override
211  public long getCounterValue( Enum counter )
212    {
213    if( context == null )
214      return 0;
215
216    return context.getCounters().findCounter( counter ).getValue();
217    }
218
219  @Override
220  public long getCounterValue( String group, String counter )
221    {
222    if( context == null )
223      return 0;
224
225    return context.getCounters().findCounter( group, counter ).getValue();
226    }
227
228  @Override
229  public void setStatus( String status )
230    {
231    // unsupported
232    }
233
234  @Override
235  public boolean isCounterStatusInitialized()
236    {
237    if( context == null )
238      return false;
239
240    return context.getCounters() != null;
241    }
242
243  @Override
244  public TupleEntryIterator openTapForRead( Tap tap ) throws IOException
245    {
246    return tap.openForRead( this );
247    }
248
249  @Override
250  public TupleEntryCollector openTapForWrite( Tap tap ) throws IOException
251    {
252    return tap.openForWrite( this, null ); // do not honor sinkmode as this may be opened across tasks
253    }
254
255  @Override
256  public TupleEntryCollector openTrapForWrite( Tap trap ) throws IOException
257    {
258    TezConfiguration jobConf = new TezConfiguration( getConfiguration() );
259
260    int stepNum = jobConf.getInt( "cascading.flow.step.num", 0 );
261    int nodeNum = jobConf.getInt( "cascading.flow.node.num", 0 );
262
263    String partname = String.format( "-%05d-%05d-", stepNum, nodeNum );
264
265    jobConf.set( "cascading.tapcollector.partname", "%s%spart" + partname + "%05d" );
266
267    return trap.openForWrite( new Hadoop2TezFlowProcess( this, jobConf ), null ); // do not honor sinkmode as this may be opened across tasks
268    }
269
270  @Override
271  public TupleEntryCollector openSystemIntermediateForWrite() throws IOException
272    {
273    return null;
274/*
275    return new TupleEntryCollector( Fields.size( 2 ) )
276    {
277    @Override
278    protected void collect( TupleEntry tupleEntry )
279      {
280      try
281        {
282        getOutputCollector().collect( tupleEntry.getObject( 0 ), tupleEntry.getObject( 1 ) );
283        }
284      catch( IOException exception )
285        {
286        throw new CascadingException( "failed collecting key and value", exception );
287        }
288      }
289    };
290*/
291    }
292
293  @Override
294  public <C> C copyConfig( C config )
295    {
296    return HadoopUtil.copyJobConf( config );
297    }
298
299  @Override
300  public <C> Map<String, String> diffConfigIntoMap( C defaultConfig, C updatedConfig )
301    {
302    return HadoopUtil.getConfig( (Configuration) defaultConfig, (Configuration) updatedConfig );
303    }
304
305  @Override
306  public TezConfiguration mergeMapIntoConfig( TezConfiguration defaultConfig, Map<String, String> map )
307    {
308    return HadoopUtil.mergeConf( new TezConfiguration( defaultConfig ), map, true );
309    }
310  }