001/*
002 * Copyright (c) 2007-2016 Concurrent, Inc. All Rights Reserved.
003 *
004 * Project and contact information: http://www.cascading.org/
005 *
006 * This file is part of the Cascading project.
007 *
008 * Licensed under the Apache License, Version 2.0 (the "License");
009 * you may not use this file except in compliance with the License.
010 * You may obtain a copy of the License at
011 *
012 *     http://www.apache.org/licenses/LICENSE-2.0
013 *
014 * Unless required by applicable law or agreed to in writing, software
015 * distributed under the License is distributed on an "AS IS" BASIS,
016 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
017 * See the License for the specific language governing permissions and
018 * limitations under the License.
019 */
020
021package cascading.flow.tez;
022
023import java.io.IOException;
024import java.util.Collection;
025import java.util.HashSet;
026import java.util.Iterator;
027import java.util.List;
028import java.util.Map;
029
030import cascading.CascadingException;
031import cascading.flow.FlowElements;
032import cascading.flow.FlowException;
033import cascading.flow.FlowNode;
034import cascading.flow.FlowSession;
035import cascading.flow.SliceCounters;
036import cascading.flow.hadoop.util.HadoopUtil;
037import cascading.flow.planner.BaseFlowNode;
038import cascading.flow.stream.duct.Duct;
039import cascading.flow.stream.element.ElementDuct;
040import cascading.flow.stream.element.InputSource;
041import cascading.flow.tez.stream.graph.Hadoop2TezStreamGraph;
042import cascading.flow.tez.util.TezUtil;
043import cascading.tap.Tap;
044import cascading.util.Util;
045import org.apache.tez.common.TezUtils;
046import org.apache.tez.dag.api.TezConfiguration;
047import org.apache.tez.runtime.api.AbstractLogicalIOProcessor;
048import org.apache.tez.runtime.api.Event;
049import org.apache.tez.runtime.api.Input;
050import org.apache.tez.runtime.api.LogicalInput;
051import org.apache.tez.runtime.api.LogicalOutput;
052import org.apache.tez.runtime.api.ProcessorContext;
053import org.slf4j.Logger;
054import org.slf4j.LoggerFactory;
055
056import static cascading.flow.hadoop.util.HadoopUtil.deserializeBase64;
057import static cascading.util.LogUtil.logCounters;
058import static cascading.util.LogUtil.logMemory;
059
060/**
061 *
062 */
063public class FlowProcessor extends AbstractLogicalIOProcessor
064  {
065  private static final Logger LOG = LoggerFactory.getLogger( FlowProcessor.class );
066
067  private TezConfiguration configuration;
068  private Hadoop2TezFlowProcess currentProcess;
069  private FlowNode flowNode;
070  private Hadoop2TezStreamGraph streamGraph;
071
072  public FlowProcessor( ProcessorContext context )
073    {
074    super( context );
075    }
076
077  @Override
078  public void initialize() throws Exception
079    {
080    configuration = new TezConfiguration( TezUtils.createConfFromUserPayload( getContext().getUserPayload() ) );
081
082    TezUtil.setMRProperties( getContext(), configuration, true );
083
084    try
085      {
086      HadoopUtil.initLog4j( configuration );
087
088      LOG.info( "cascading version: {}", configuration.get( "cascading.version", "" ) );
089
090      currentProcess = new Hadoop2TezFlowProcess( new FlowSession(), getContext(), configuration );
091
092      flowNode = deserializeBase64( configuration.getRaw( FlowNode.CASCADING_FLOW_NODE ), configuration, BaseFlowNode.class );
093
094      LOG.info( "flow node id: {}, ordinal: {}", flowNode.getID(), flowNode.getOrdinal() );
095
096      logMemory( LOG, "flow node id: " + flowNode.getID() + ", mem on start" );
097      }
098    catch( Throwable throwable )
099      {
100      if( throwable instanceof CascadingException )
101        throw (CascadingException) throwable;
102
103      throw new FlowException( "internal error during processor configuration", throwable );
104      }
105    }
106
107  @Override
108  public void run( Map<String, LogicalInput> inputMap, Map<String, LogicalOutput> outputMap ) throws Exception
109    {
110    Collection<Duct> allHeads;
111    InputSource streamedHead;
112
113    try
114      {
115      streamGraph = new Hadoop2TezStreamGraph( currentProcess, flowNode, inputMap, outputMap );
116
117      allHeads = streamGraph.getHeads();
118      streamedHead = streamGraph.getStreamedHead();
119
120      for( Duct head : allHeads )
121        LOG.info( "sourcing from: {} streamed: {}, id: {}", ( (ElementDuct) head ).getFlowElement(), head == streamedHead, FlowElements.id( ( (ElementDuct) head ).getFlowElement() ) );
122
123      for( Duct tail : streamGraph.getTails() )
124        LOG.info( "sinking to: {}, id: {}", ( (ElementDuct) tail ).getFlowElement(), FlowElements.id( ( (ElementDuct) tail ).getFlowElement() ) );
125
126      for( Tap trap : flowNode.getTraps() )
127        LOG.info( "trapping to: {}, id: {}", trap, FlowElements.id( trap ) );
128      }
129    catch( Throwable throwable )
130      {
131      if( throwable instanceof CascadingException )
132        throw (CascadingException) throwable;
133
134      throw new FlowException( "internal error during processor configuration", throwable );
135      }
136
137    streamGraph.prepare(); // starts inputs
138
139    // wait for shuffle
140    waitForInputsReady( inputMap );
141
142    // user code begins executing from here
143    long processBeginTime = System.currentTimeMillis();
144
145    currentProcess.increment( SliceCounters.Process_Begin_Time, processBeginTime );
146
147    Iterator<Duct> iterator = allHeads.iterator();
148
149    try
150      {
151      try
152        {
153        while( iterator.hasNext() )
154          {
155          Duct next = iterator.next();
156
157          if( next != streamedHead )
158            {
159            ( (InputSource) next ).run( null );
160
161            logMemory( LOG, "mem after accumulating source: " + ( (ElementDuct) next ).getFlowElement() + ", " );
162            }
163          }
164
165        streamedHead.run( null );
166        }
167      catch( OutOfMemoryError | IOException error )
168        {
169        throw error;
170        }
171      catch( Throwable throwable )
172        {
173        if( throwable instanceof CascadingException )
174          throw (CascadingException) throwable;
175
176        throw new FlowException( "internal error during processor execution on node: " + flowNode.getOrdinal(), throwable );
177        }
178      }
179    finally
180      {
181      try
182        {
183        streamGraph.cleanup();
184        }
185      finally
186        {
187        long processEndTime = System.currentTimeMillis();
188        currentProcess.increment( SliceCounters.Process_End_Time, processEndTime );
189        currentProcess.increment( SliceCounters.Process_Duration, processEndTime - processBeginTime );
190        }
191      }
192    }
193
194  protected void waitForInputsReady( Map<String, LogicalInput> inputMap ) throws InterruptedException
195    {
196    long beginInputReady = System.currentTimeMillis();
197
198    HashSet<Input> inputs = new HashSet<Input>( inputMap.values() );
199
200    getContext().waitForAllInputsReady( inputs );
201
202    LOG.info( "flow node id: {}, all {} inputs ready in: {}", flowNode.getID(), inputs.size(), Util.formatDurationHMSms( System.currentTimeMillis() - beginInputReady ) );
203    }
204
205  @Override
206  public void handleEvents( List<Event> events )
207    {
208    LOG.debug( "in events" );
209    }
210
211  @Override
212  public void close() throws Exception
213    {
214    String message = "flow node id: " + flowNode.getID();
215    logMemory( LOG, message + ", mem on close" );
216    logCounters( LOG, message + ", counter:", currentProcess );
217    }
218  }