001/* 002 * Copyright (c) 2007-2017 Xplenty, Inc. All Rights Reserved. 003 * 004 * Project and contact information: http://www.cascading.org/ 005 * 006 * This file is part of the Cascading project. 007 * 008 * Licensed under the Apache License, Version 2.0 (the "License"); 009 * you may not use this file except in compliance with the License. 010 * You may obtain a copy of the License at 011 * 012 * http://www.apache.org/licenses/LICENSE-2.0 013 * 014 * Unless required by applicable law or agreed to in writing, software 015 * distributed under the License is distributed on an "AS IS" BASIS, 016 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 017 * See the License for the specific language governing permissions and 018 * limitations under the License. 019 */ 020 021package cascading.flow.local.planner; 022 023import java.util.ArrayList; 024import java.util.Collection; 025import java.util.List; 026import java.util.Properties; 027import java.util.concurrent.Callable; 028import java.util.concurrent.ExecutorService; 029import java.util.concurrent.Executors; 030import java.util.concurrent.Future; 031 032import cascading.flow.FlowNode; 033import cascading.flow.FlowProcess; 034import cascading.flow.local.LocalFlowStep; 035import cascading.flow.local.stream.graph.LocalStepStreamGraph; 036import cascading.flow.stream.duct.Duct; 037import cascading.flow.stream.graph.StreamGraph; 038import cascading.util.Util; 039import org.slf4j.Logger; 040import org.slf4j.LoggerFactory; 041 042import static cascading.util.LogUtil.logCounters; 043import static cascading.util.LogUtil.logMemory; 044 045/** 046 * 047 */ 048public class LocalStepRunner implements Callable<Throwable> 049 { 050 private static final Logger LOG = LoggerFactory.getLogger( LocalStepRunner.class ); 051 052 private final FlowProcess<Properties> currentProcess; 053 054 private boolean complete = false; 055 private boolean successful = false; 056 057 private final FlowNode flowNode; 058 private final StreamGraph streamGraph; 059 private final Collection<Duct> heads; 060 private Throwable throwable = null; 061 062 public LocalStepRunner( FlowProcess<Properties> flowProcess, LocalFlowStep step ) 063 { 064 this.currentProcess = flowProcess; 065 this.flowNode = Util.getFirst( step.getFlowNodeGraph().vertexSet() ); 066 this.streamGraph = new LocalStepStreamGraph( this.currentProcess, step, flowNode ); 067 this.heads = streamGraph.getHeads(); 068 } 069 070 public FlowProcess<Properties> getFlowProcess() 071 { 072 return currentProcess; 073 } 074 075 public boolean isComplete() 076 { 077 return complete; 078 } 079 080 public boolean isSuccessful() 081 { 082 return successful; 083 } 084 085 public Throwable getThrowable() 086 { 087 return throwable; 088 } 089 090 @Override 091 public Throwable call() throws Exception 092 { 093 boolean attemptedCleanup = false; 094 095 try 096 { 097 try 098 { 099 streamGraph.prepare(); 100 101 logMemory( LOG, "flow node id: " + flowNode.getID() + ", mem on start" ); 102 } 103 catch( Throwable currentThrowable ) 104 { 105 if( !( currentThrowable instanceof OutOfMemoryError ) ) 106 LOG.error( "unable to prepare operation graph", currentThrowable ); 107 108 complete = true; 109 successful = false; 110 throwable = currentThrowable; 111 112 return throwable; 113 } 114 115 try 116 { 117 List<Future<Throwable>> futures = spawnHeads(); 118 119 for( Future<Throwable> future : futures ) 120 { 121 throwable = future.get(); 122 123 if( throwable != null ) 124 break; 125 } 126 } 127 catch( Throwable currentThrowable ) 128 { 129 if( !( currentThrowable instanceof OutOfMemoryError ) ) 130 LOG.error( "unable to complete step", currentThrowable ); 131 132 throwable = currentThrowable; 133 } 134 135 try 136 { 137 attemptedCleanup = true; // set so we don't try again regardless 138 139 if( !( throwable instanceof OutOfMemoryError ) ) 140 streamGraph.cleanup(); 141 } 142 catch( Throwable currentThrowable ) 143 { 144 if( !( currentThrowable instanceof OutOfMemoryError ) ) 145 LOG.error( "unable to cleanup operation graph", currentThrowable ); 146 147 if( throwable == null ) 148 throwable = currentThrowable; 149 } 150 151 complete = true; 152 successful = throwable == null; 153 154 return throwable; 155 } 156 finally 157 { 158 try 159 { 160 if( !attemptedCleanup ) 161 streamGraph.cleanup(); 162 } 163 catch( Throwable currentThrowable ) 164 { 165 if( !( currentThrowable instanceof OutOfMemoryError ) ) 166 LOG.error( "unable to cleanup operation graph", currentThrowable ); 167 168 if( throwable == null ) 169 throwable = currentThrowable; 170 171 successful = false; 172 } 173 174 String message = "flow node id: " + flowNode.getID(); 175 logMemory( LOG, message + ", mem on close" ); 176 logCounters( LOG, message + ", counter:", currentProcess ); 177 } 178 } 179 180 private List<Future<Throwable>> spawnHeads() 181 { 182 // todo: consider a CyclicBarrier to syn all threads after the openForRead 183 // todo: should find all Callable Ducts and spawn them, group ducts may run on a timer etc 184 ExecutorService executors = Executors.newFixedThreadPool( heads.size() ); 185 List<Future<Throwable>> futures = new ArrayList<Future<Throwable>>(); 186 187 for( Duct head : heads ) 188 futures.add( executors.submit( (Callable) head ) ); 189 190 executors.shutdown(); 191 192 return futures; 193 } 194 }