001/*
002 * Copyright (c) 2007-2017 Xplenty, Inc. All Rights Reserved.
003 *
004 * Project and contact information: http://www.cascading.org/
005 *
006 * This file is part of the Cascading project.
007 *
008 * Licensed under the Apache License, Version 2.0 (the "License");
009 * you may not use this file except in compliance with the License.
010 * You may obtain a copy of the License at
011 *
012 *     http://www.apache.org/licenses/LICENSE-2.0
013 *
014 * Unless required by applicable law or agreed to in writing, software
015 * distributed under the License is distributed on an "AS IS" BASIS,
016 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
017 * See the License for the specific language governing permissions and
018 * limitations under the License.
019 */
020
021package cascading.flow.local.planner;
022
023import java.util.ArrayList;
024import java.util.Collection;
025import java.util.List;
026import java.util.Properties;
027import java.util.concurrent.Callable;
028import java.util.concurrent.ExecutorService;
029import java.util.concurrent.Executors;
030import java.util.concurrent.Future;
031
032import cascading.flow.FlowNode;
033import cascading.flow.FlowProcess;
034import cascading.flow.local.LocalFlowStep;
035import cascading.flow.local.stream.graph.LocalStepStreamGraph;
036import cascading.flow.stream.duct.Duct;
037import cascading.flow.stream.graph.StreamGraph;
038import cascading.util.Util;
039import org.slf4j.Logger;
040import org.slf4j.LoggerFactory;
041
042import static cascading.util.LogUtil.logCounters;
043import static cascading.util.LogUtil.logMemory;
044
045/**
046 *
047 */
048public class LocalStepRunner implements Callable<Throwable>
049  {
050  private static final Logger LOG = LoggerFactory.getLogger( LocalStepRunner.class );
051
052  private final FlowProcess<Properties> currentProcess;
053
054  private boolean complete = false;
055  private boolean successful = false;
056
057  private final FlowNode flowNode;
058  private final StreamGraph streamGraph;
059  private final Collection<Duct> heads;
060  private Throwable throwable = null;
061
062  public LocalStepRunner( FlowProcess<Properties> flowProcess, LocalFlowStep step )
063    {
064    this.currentProcess = flowProcess;
065    this.flowNode = Util.getFirst( step.getFlowNodeGraph().vertexSet() );
066    this.streamGraph = new LocalStepStreamGraph( this.currentProcess, step, flowNode );
067    this.heads = streamGraph.getHeads();
068    }
069
070  public FlowProcess<Properties> getFlowProcess()
071    {
072    return currentProcess;
073    }
074
075  public boolean isComplete()
076    {
077    return complete;
078    }
079
080  public boolean isSuccessful()
081    {
082    return successful;
083    }
084
085  public Throwable getThrowable()
086    {
087    return throwable;
088    }
089
090  @Override
091  public Throwable call() throws Exception
092    {
093    boolean attemptedCleanup = false;
094
095    try
096      {
097      try
098        {
099        streamGraph.prepare();
100
101        logMemory( LOG, "flow node id: " + flowNode.getID() + ", mem on start" );
102        }
103      catch( Throwable currentThrowable )
104        {
105        if( !( currentThrowable instanceof OutOfMemoryError ) )
106          LOG.error( "unable to prepare operation graph", currentThrowable );
107
108        complete = true;
109        successful = false;
110        throwable = currentThrowable;
111
112        return throwable;
113        }
114
115      try
116        {
117        List<Future<Throwable>> futures = spawnHeads();
118
119        for( Future<Throwable> future : futures )
120          {
121          throwable = future.get();
122
123          if( throwable != null )
124            break;
125          }
126        }
127      catch( Throwable currentThrowable )
128        {
129        if( !( currentThrowable instanceof OutOfMemoryError ) )
130          LOG.error( "unable to complete step", currentThrowable );
131
132        throwable = currentThrowable;
133        }
134
135      try
136        {
137        attemptedCleanup = true; // set so we don't try again regardless
138
139        if( !( throwable instanceof OutOfMemoryError ) )
140          streamGraph.cleanup();
141        }
142      catch( Throwable currentThrowable )
143        {
144        if( !( currentThrowable instanceof OutOfMemoryError ) )
145          LOG.error( "unable to cleanup operation graph", currentThrowable );
146
147        if( throwable == null )
148          throwable = currentThrowable;
149        }
150
151      complete = true;
152      successful = throwable == null;
153
154      return throwable;
155      }
156    finally
157      {
158      try
159        {
160        if( !attemptedCleanup )
161          streamGraph.cleanup();
162        }
163      catch( Throwable currentThrowable )
164        {
165        if( !( currentThrowable instanceof OutOfMemoryError ) )
166          LOG.error( "unable to cleanup operation graph", currentThrowable );
167
168        if( throwable == null )
169          throwable = currentThrowable;
170
171        successful = false;
172        }
173
174      String message = "flow node id: " + flowNode.getID();
175      logMemory( LOG, message + ", mem on close" );
176      logCounters( LOG, message + ", counter:", currentProcess );
177      }
178    }
179
180  private List<Future<Throwable>> spawnHeads()
181    {
182    // todo: consider a CyclicBarrier to syn all threads after the openForRead
183    // todo: should find all Callable Ducts and spawn them, group ducts may run on a timer etc
184    ExecutorService executors = Executors.newFixedThreadPool( heads.size() );
185    List<Future<Throwable>> futures = new ArrayList<Future<Throwable>>();
186
187    for( Duct head : heads )
188      futures.add( executors.submit( (Callable) head ) );
189
190    executors.shutdown();
191
192    return futures;
193    }
194  }