001/*
002 * Copyright (c) 2007-2015 Concurrent, Inc. All Rights Reserved.
003 *
004 * Project and contact information: http://www.cascading.org/
005 *
006 * This file is part of the Cascading project.
007 *
008 * Licensed under the Apache License, Version 2.0 (the "License");
009 * you may not use this file except in compliance with the License.
010 * You may obtain a copy of the License at
011 *
012 *     http://www.apache.org/licenses/LICENSE-2.0
013 *
014 * Unless required by applicable law or agreed to in writing, software
015 * distributed under the License is distributed on an "AS IS" BASIS,
016 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
017 * See the License for the specific language governing permissions and
018 * limitations under the License.
019 */
020
021package cascading.flow.planner.rule;
022
023import java.util.ArrayList;
024import java.util.Collections;
025import java.util.Comparator;
026import java.util.HashSet;
027import java.util.List;
028import java.util.Set;
029import java.util.concurrent.Callable;
030import java.util.concurrent.ExecutionException;
031import java.util.concurrent.ExecutorCompletionService;
032import java.util.concurrent.ExecutorService;
033import java.util.concurrent.Executors;
034import java.util.concurrent.Future;
035import java.util.concurrent.TimeUnit;
036
037import cascading.flow.Flow;
038import cascading.flow.FlowDef;
039import cascading.flow.planner.FlowPlanner;
040import cascading.flow.planner.PlannerContext;
041import cascading.flow.planner.PlannerException;
042import cascading.flow.planner.graph.FlowElementGraph;
043import cascading.flow.planner.rule.util.TraceWriter;
044import cascading.util.ProcessLogger;
045import cascading.util.Util;
046
047import static cascading.util.Util.formatDurationFromMillis;
048import static java.util.Collections.synchronizedList;
049import static java.util.Collections.synchronizedSet;
050
051public class RuleSetExec
052  {
  /** Upper bound on the size of the thread pool created in {@link #submitCallables(List)}. */
  public static final int MAX_CONCURRENT_PLANNERS = 5;
  /** Default timeout value (600); not referenced within this class -- presumably seconds, TODO confirm against callers. */
  public static final int DEFAULT_TIMEOUT = 10 * 60;
055  public static final Comparator<RuleResult> DEFAULT_PLAN_COMPARATOR = new Comparator<RuleResult>()
056  {
057  @Override
058  public int compare( RuleResult lhs, RuleResult rhs )
059    {
060    int c = lhs.getNumSteps() - rhs.getNumSteps();
061
062    if( c != 0 )
063      return c;
064
065    return lhs.getNumNodes() - rhs.getNumNodes();
066    }
067
068  @Override
069  public String toString()
070    {
071    return "default comparator: selects plan with fewest steps and fewest nodes";
072    }
073  };
074
  // collaborators supplied through the constructor
  private TraceWriter traceWriter; // receives trace plans and stats for each executed registry
  private FlowPlanner flowPlanner; // configures registry defaults and verifies successful results
  private Flow flow; // also cast to ProcessLogger for logging, see getFlowLogger()
  private RuleRegistrySet registrySet; // the registries to execute plus selection/timeout settings
  private FlowDef flowDef; // handed to each PlannerContext
  private FlowElementGraph flowElementGraph; // the graph every registry is executed against

  // per-execution state, (re)created as synchronized collections at the start of exec()
  Set<Callable> running; // callables add themselves on entry to call() and remove themselves on exit
  List<RuleResult> success; // results binned with status SUCCESS
  List<RuleResult> unsupported; // results binned with status UNSUPPORTED
  List<RuleResult> illegal; // results binned with status ILLEGAL
  List<RuleResult> interrupted; // results binned with status INTERRUPTED
087
088  public RuleSetExec( TraceWriter traceWriter, FlowPlanner flowPlanner, Flow flow, RuleRegistrySet registrySet, FlowDef flowDef, FlowElementGraph flowElementGraph )
089    {
090    this.traceWriter = traceWriter;
091    this.flowPlanner = flowPlanner;
092    this.flow = flow;
093    this.registrySet = registrySet;
094    this.flowDef = flowDef;
095    this.flowElementGraph = flowElementGraph;
096    }
097
098  protected ProcessLogger getFlowLogger()
099    {
100    return (ProcessLogger) flow;
101    }
102
103  protected Comparator<RuleResult> getPlanComparator()
104    {
105    if( registrySet.getPlanComparator() != null )
106      return registrySet.getPlanComparator();
107
108    return DEFAULT_PLAN_COMPARATOR;
109    }
110
111  protected Comparator<RuleResult> getOrderComparator()
112    {
113    return new Comparator<RuleResult>()
114    {
115    @Override
116    public int compare( RuleResult lhs, RuleResult rhs )
117      {
118      // preserver order of preference from rule registry if all things are equal
119      return registrySet.indexOf( lhs.getRegistry() ) - registrySet.indexOf( rhs.getRegistry() );
120      }
121    };
122    }
123
124  public RuleResult exec()
125    {
126    running = synchronizedSet( new HashSet<Callable>() );
127    success = synchronizedList( new ArrayList<RuleResult>() );
128    unsupported = synchronizedList( new ArrayList<RuleResult>() );
129    illegal = synchronizedList( new ArrayList<RuleResult>() );
130    interrupted = synchronizedList( new ArrayList<RuleResult>() );
131
132    List<Callable<RuleResult>> callables = createCallables();
133
134    submitCallables( callables );
135
136    notifyUnsupported();
137    notifyIllegal();
138    notifyInterrupted();
139
140    return selectSuccess();
141    }
142
143  protected RuleResult execPlannerFor( RuleRegistry ruleRegistry )
144    {
145    flowPlanner.configRuleRegistryDefaults( ruleRegistry );
146
147    String registryName = ruleRegistry.getName();
148
149    RuleExec ruleExec = new RuleExec( traceWriter, ruleRegistry );
150
151    PlannerContext plannerContext = new PlannerContext( ruleRegistry, flowPlanner, flowDef, flow, traceWriter.isTransformTraceEnabled() );
152
153    RuleResult ruleResult = ruleExec.exec( plannerContext, flowElementGraph );
154
155    getFlowLogger().logInfo( "executed rule registry: {}, completed as: {}, in: {}", registryName, ruleResult.getResultStatus(), formatDurationFromMillis( ruleResult.getDuration() ) );
156
157    traceWriter.writeTracePlan( registryName, "completed-flow-element-graph", ruleResult.getAssemblyGraph() );
158    traceWriter.writeStats( plannerContext, ruleResult );
159
160    Exception plannerException;
161
162    if( ruleResult.isSuccess() )
163      plannerException = flowPlanner.verifyResult( ruleResult );
164    else
165      plannerException = ruleResult.getPlannerException(); // will be re-thrown below
166
167    if( plannerException != null && plannerException instanceof PlannerException && ( (PlannerException) plannerException ).getElementGraph() != null )
168      traceWriter.writeTracePlan( registryName, "failed-source-element-graph", ( (PlannerException) plannerException ).getElementGraph() );
169
170    if( ruleResult.isSuccess() && plannerException != null )
171      rethrow( plannerException );
172
173    return ruleResult;
174    }
175
176  protected Set<Future<RuleResult>> submitCallables( List<Callable<RuleResult>> callables )
177    {
178    int size = Math.min( MAX_CONCURRENT_PLANNERS, callables.size() );
179
180    ExecutorService executor = Executors.newFixedThreadPool( size );
181    ExecutorCompletionService<RuleResult> completionService = new ExecutorCompletionService<>( executor );
182    Set<Future<RuleResult>> futures = new HashSet<>();
183
184    RuleRegistrySet.Select select = registrySet.getSelect();
185    long totalDuration = registrySet.getPlannerTimeoutSec();
186    long startAll = TimeUnit.MILLISECONDS.toSeconds( System.currentTimeMillis() );
187
188    for( Callable<RuleResult> callable : callables )
189      futures.add( completionService.submit( callable ) );
190
191    executor.shutdown();
192
193    try
194      {
195      boolean timedOut = false;
196
197      while( !futures.isEmpty() )
198        {
199        Future<RuleResult> future = completionService.poll( totalDuration, TimeUnit.SECONDS );
200
201        long currentDuration = TimeUnit.MILLISECONDS.toSeconds( System.currentTimeMillis() ) - startAll;
202
203        totalDuration -= currentDuration;
204
205        timedOut = future == null;
206
207        if( timedOut )
208          break;
209
210        futures.remove( future );
211
212        boolean success = binResult( future.get() );
213
214        if( success && select == RuleRegistrySet.Select.FIRST )
215          break;
216        }
217
218      if( !futures.isEmpty() )
219        {
220        if( timedOut )
221          getFlowLogger().logWarn( "planner cancelling long running registries past timeout period: {}, see RuleRegistrySet#setPlannerTimeoutSec() to change timeout", formatDurationFromMillis( registrySet.getPlannerTimeoutSec() * 1000 ) );
222        else
223          getFlowLogger().logInfo( "first registry completed, planner cancelling remaining running registries: {}, successful: {}", futures.size(), success.size() );
224
225        for( Future<RuleResult> current : futures )
226          current.cancel( true );
227
228        int timeout = 0;
229
230        while( !running.isEmpty() && timeout < 60 )
231          {
232          Util.safeSleep( 500 );
233          timeout++;
234          }
235        }
236      }
237    catch( InterruptedException exception )
238      {
239      getFlowLogger().logError( "planner thread interrupted", exception );
240
241      rethrow( exception );
242      }
243    catch( ExecutionException exception )
244      {
245      rethrow( exception.getCause() );
246      }
247
248    return futures;
249    }
250
251  protected List<Callable<RuleResult>> createCallables()
252    {
253    List<Callable<RuleResult>> callables = new ArrayList<>();
254
255    for( RuleRegistry ruleRegistry : registrySet.ruleRegistries )
256      callables.add( createCallable( ruleRegistry ) );
257
258    return callables;
259    }
260
261  private RuleResult selectSuccess()
262    {
263    if( success.isEmpty() )
264      throw new IllegalStateException( "no planner results from registry set" );
265
266    for( RuleResult ruleResult : success )
267      getFlowLogger().logInfo( "rule registry: {}, supports assembly with steps: {}, nodes: {}", ruleResult.getRegistry().getName(), ruleResult.getNumSteps(), ruleResult.getNumNodes() );
268
269    if( success.size() != 1 )
270      {
271      // sort is stable
272      Collections.sort( success, getOrderComparator() );
273      Collections.sort( success, getPlanComparator() );
274      }
275
276    RuleResult ruleResult = success.get( 0 );
277
278    if( registrySet.getSelect() == RuleRegistrySet.Select.FIRST )
279      getFlowLogger().logInfo( "rule registry: {}, result was selected as first successful", ruleResult.getRegistry().getName() );
280    else if( registrySet.getSelect() == RuleRegistrySet.Select.COMPARED )
281      getFlowLogger().logInfo( "rule registry: {}, result was selected using: \'{}\'", ruleResult.getRegistry().getName(), getPlanComparator().toString() );
282
283    return ruleResult;
284    }
285
286  private void notifyUnsupported()
287    {
288    if( unsupported.isEmpty() )
289      return;
290
291    for( RuleResult ruleResult : unsupported )
292      getFlowLogger().logInfo( "rule registry: {}, does not support assembly", ruleResult.getRegistry().getName() );
293
294    if( !registrySet.isIgnoreFailed() || success.isEmpty() && illegal.isEmpty() && interrupted.isEmpty() )
295      rethrow( unsupported.get( 0 ).getPlannerException() );
296    }
297
298  private void notifyIllegal()
299    {
300    if( illegal.isEmpty() )
301      return;
302
303    for( RuleResult ruleResult : illegal )
304      getFlowLogger().logInfo( "rule registry: {}, found assembly to be malformed", ruleResult.getRegistry().getName() );
305
306    if( !registrySet.isIgnoreFailed() || success.isEmpty() )
307      rethrow( illegal.get( 0 ).getPlannerException() );
308    }
309
310  private void notifyInterrupted()
311    {
312    if( interrupted.isEmpty() )
313      return;
314
315    for( RuleResult ruleResult : interrupted )
316      getFlowLogger().logInfo( "rule registry: {}, planned longer than default duration, was cancelled", ruleResult.getRegistry().getName() );
317
318    if( interrupted.size() == registrySet.size() )
319      throw new PlannerException( "planner registry timeout exceeded for all registries: " + formatDurationFromMillis( registrySet.getPlannerTimeoutSec() * 1000 ) );
320
321    if( !registrySet.isIgnoreFailed() || success.isEmpty() )
322      rethrow( interrupted.get( 0 ).getPlannerException() );
323    }
324
325  protected Callable<RuleResult> createCallable( final RuleRegistry ruleRegistry )
326    {
327    return new Callable<RuleResult>()
328    {
329    @Override
330    public RuleResult call() throws Exception
331      {
332      running.add( this );
333
334      try
335        {
336        return execPlannerFor( ruleRegistry );
337        }
338      finally
339        {
340        running.remove( this );
341        }
342      }
343    };
344    }
345
346  protected boolean binResult( RuleResult ruleResult )
347    {
348    switch( ruleResult.getResultStatus() )
349      {
350      case SUCCESS:
351        success.add( ruleResult );
352        return true;
353
354      case UNSUPPORTED:
355        unsupported.add( ruleResult );
356        break;
357
358      case ILLEGAL:
359        illegal.add( ruleResult );
360        break;
361
362      case INTERRUPTED:
363        interrupted.add( ruleResult );
364        break;
365      }
366
367    return false;
368    }
369
370  private void rethrow( Throwable throwable )
371    {
372    if( throwable instanceof Error )
373      throw (Error) throwable;
374
375    if( throwable instanceof RuntimeException )
376      throw (RuntimeException) throwable;
377
378    throw new PlannerException( throwable );
379    }
380  }