001/*
002 * Copyright (c) 2007-2017 Xplenty, Inc. All Rights Reserved.
003 *
004 * Project and contact information: http://www.cascading.org/
005 *
006 * This file is part of the Cascading project.
007 *
008 * Licensed under the Apache License, Version 2.0 (the "License");
009 * you may not use this file except in compliance with the License.
010 * You may obtain a copy of the License at
011 *
012 *     http://www.apache.org/licenses/LICENSE-2.0
013 *
014 * Unless required by applicable law or agreed to in writing, software
015 * distributed under the License is distributed on an "AS IS" BASIS,
016 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
017 * See the License for the specific language governing permissions and
018 * limitations under the License.
019 */
020
021package cascading.flow.hadoop;
022
023import java.io.IOException;
024import java.util.Iterator;
025
026import cascading.CascadingException;
027import cascading.flow.FlowException;
028import cascading.flow.FlowNode;
029import cascading.flow.FlowSession;
030import cascading.flow.FlowStep;
031import cascading.flow.Flows;
032import cascading.flow.SliceCounters;
033import cascading.flow.hadoop.planner.HadoopFlowStepJob;
034import cascading.flow.hadoop.stream.graph.HadoopMapStreamGraph;
035import cascading.flow.hadoop.util.HadoopUtil;
036import cascading.flow.planner.BaseFlowNode;
037import cascading.flow.stream.duct.Duct;
038import cascading.flow.stream.element.ElementDuct;
039import cascading.flow.stream.element.SourceStage;
040import cascading.tap.Tap;
041import org.apache.hadoop.mapred.JobConf;
042import org.apache.hadoop.mapred.MapRunnable;
043import org.apache.hadoop.mapred.OutputCollector;
044import org.apache.hadoop.mapred.RecordReader;
045import org.apache.hadoop.mapred.Reporter;
046import org.slf4j.Logger;
047import org.slf4j.LoggerFactory;
048
049import static cascading.flow.hadoop.util.HadoopMRUtil.readStateFromDistCache;
050import static cascading.flow.hadoop.util.HadoopUtil.deserializeBase64;
051import static cascading.util.LogUtil.logCounters;
052import static cascading.util.LogUtil.logMemory;
053
054/** Class FlowMapper is the Hadoop Mapper implementation. */
055public class FlowMapper implements MapRunnable
056  {
057  private static final Logger LOG = LoggerFactory.getLogger( FlowMapper.class );
058
059  private FlowNode flowNode;
060  private HadoopMapStreamGraph streamGraph;
061  private HadoopFlowProcess currentProcess;
062
063  /** Constructor FlowMapper creates a new FlowMapper instance. */
064  public FlowMapper()
065    {
066    }
067
068  @Override
069  public void configure( JobConf jobConf )
070    {
071    try
072      {
073      HadoopUtil.initLog4j( jobConf );
074
075      LOG.info( "cascading version: {}", jobConf.get( "cascading.version", "" ) );
076      LOG.info( "child jvm opts: {}", jobConf.get( "mapred.child.java.opts", "" ) );
077
078      currentProcess = new HadoopFlowProcess( new FlowSession(), jobConf, true );
079
080      String mapNodeState = jobConf.getRaw( "cascading.flow.step.node.map" );
081
082      if( mapNodeState == null )
083        mapNodeState = readStateFromDistCache( jobConf, jobConf.get( FlowStep.CASCADING_FLOW_STEP_ID ), "map" );
084
085      flowNode = deserializeBase64( mapNodeState, jobConf, BaseFlowNode.class );
086
087      LOG.info( "flow node id: {}, ordinal: {}", flowNode.getID(), flowNode.getOrdinal() );
088
089      Tap source = Flows.getTapForID( flowNode.getSourceTaps(), jobConf.get( "cascading.step.source" ) );
090
091      streamGraph = new HadoopMapStreamGraph( currentProcess, flowNode, source );
092
093      for( Duct head : streamGraph.getHeads() )
094        LOG.info( "sourcing from: " + ( (ElementDuct) head ).getFlowElement() );
095
096      for( Duct tail : streamGraph.getTails() )
097        LOG.info( "sinking to: " + ( (ElementDuct) tail ).getFlowElement() );
098
099      for( Tap trap : flowNode.getTraps() )
100        LOG.info( "trapping to: " + trap );
101
102      logMemory( LOG, "flow node id: " + flowNode.getID() + ", mem on start" );
103      }
104    catch( Throwable throwable )
105      {
106      reportIfLocal( throwable );
107
108      if( throwable instanceof CascadingException )
109        throw (CascadingException) throwable;
110
111      throw new FlowException( "internal error during mapper configuration", throwable );
112      }
113    }
114
115  @Override
116  public void run( RecordReader input, OutputCollector output, Reporter reporter ) throws IOException
117    {
118    currentProcess.setReporter( reporter );
119    currentProcess.setOutputCollector( output );
120
121    streamGraph.prepare();
122
123    long processBeginTime = System.currentTimeMillis();
124
125    currentProcess.increment( SliceCounters.Process_Begin_Time, processBeginTime );
126
127    SourceStage streamedHead = streamGraph.getStreamedHead();
128    Iterator<Duct> iterator = streamGraph.getHeads().iterator();
129
130    try
131      {
132      try
133        {
134        while( iterator.hasNext() )
135          {
136          Duct next = iterator.next();
137
138          if( next != streamedHead )
139            ( (SourceStage) next ).run( null );
140          }
141
142        streamedHead.run( input );
143        }
144      catch( OutOfMemoryError error )
145        {
146        throw error;
147        }
148      catch( IOException exception )
149        {
150        reportIfLocal( exception );
151        throw exception;
152        }
153      catch( Throwable throwable )
154        {
155        reportIfLocal( throwable );
156
157        if( throwable instanceof CascadingException )
158          throw (CascadingException) throwable;
159
160        throw new FlowException( "internal error during mapper execution", throwable );
161        }
162      }
163    finally
164      {
165      try
166        {
167        streamGraph.cleanup();
168        }
169      finally
170        {
171        long processEndTime = System.currentTimeMillis();
172
173        currentProcess.increment( SliceCounters.Process_End_Time, processEndTime );
174        currentProcess.increment( SliceCounters.Process_Duration, processEndTime - processBeginTime );
175
176        String message = "flow node id: " + flowNode.getID();
177        logMemory( LOG, message + ", mem on close" );
178        logCounters( LOG, message + ", counter:", currentProcess );
179        }
180      }
181    }
182
183  /**
184   * Report the error to HadoopFlowStepJob if we are running in Hadoops local mode.
185   *
186   * @param throwable The throwable that was thrown.
187   */
188  private void reportIfLocal( Throwable throwable )
189    {
190    if( HadoopUtil.isLocal( currentProcess.getJobConf() ) )
191      HadoopFlowStepJob.reportLocalError( throwable );
192    }
193  }