001/*
002 * Copyright (c) 2007-2017 Xplenty, Inc. All Rights Reserved.
003 *
004 * Project and contact information: http://www.cascading.org/
005 *
006 * This file is part of the Cascading project.
007 *
008 * Licensed under the Apache License, Version 2.0 (the "License");
009 * you may not use this file except in compliance with the License.
010 * You may obtain a copy of the License at
011 *
012 *     http://www.apache.org/licenses/LICENSE-2.0
013 *
014 * Unless required by applicable law or agreed to in writing, software
015 * distributed under the License is distributed on an "AS IS" BASIS,
016 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
017 * See the License for the specific language governing permissions and
018 * limitations under the License.
019 */
020
021package cascading.flow.local.planner;
022
023import java.io.IOException;
024import java.util.Properties;
025import java.util.concurrent.ExecutorService;
026import java.util.concurrent.Executors;
027import java.util.concurrent.Future;
028
029import cascading.flow.local.LocalFlowProcess;
030import cascading.flow.local.LocalFlowStep;
031import cascading.flow.planner.FlowStepJob;
032import cascading.management.state.ClientState;
033import cascading.stats.FlowNodeStats;
034import cascading.stats.FlowStepStats;
035import cascading.stats.local.LocalStepStats;
036
037/**
038 *
039 */
040public class LocalFlowStepJob extends FlowStepJob<Properties>
041  {
042  private final LocalStepRunner stackRunner;
043  private Future<Throwable> future;
044
045  public LocalFlowStepJob( ClientState clientState, LocalFlowProcess flowProcess, LocalFlowStep flowStep )
046    {
047    super( clientState, flowStep.getConfig(), flowStep, 200, 1000, 1000 * 60 );
048    flowProcess.setStepStats( (LocalStepStats) this.flowStepStats );
049    this.stackRunner = new LocalStepRunner( flowProcess, flowStep );
050    }
051
052  @Override
053  protected FlowStepStats createStepStats( ClientState clientState )
054    {
055    return new LocalStepStats( flowStep, clientState );
056    }
057
058  @Override
059  protected boolean isRemoteExecution()
060    {
061    return false;
062    }
063
064  @Override
065  protected String internalJobId()
066    {
067    return "flow";
068    }
069
070  @Override
071  protected void internalNonBlockingStart() throws IOException
072    {
073    ExecutorService executors = Executors.newFixedThreadPool( 1 );
074
075    future = executors.submit( stackRunner );
076
077    executors.shutdown();
078    }
079
080  @Override
081  protected void updateNodeStatus( FlowNodeStats flowNodeStats )
082    {
083    }
084
085  @Override
086  protected boolean internalIsStartedRunning()
087    {
088    return future != null;
089    }
090
091  @Override
092  protected boolean internalNonBlockingIsComplete() throws IOException
093    {
094    return stackRunner.isComplete();
095    }
096
097  @Override
098  protected Throwable getThrowable()
099    {
100    return stackRunner.getThrowable();
101    }
102
103  @Override
104  protected boolean internalNonBlockingIsSuccessful() throws IOException
105    {
106    return stackRunner.isSuccessful();
107    }
108
109  @Override
110  protected void internalBlockOnStop() throws IOException
111    {
112    }
113
114  @Override
115  protected void dumpDebugInfo()
116    {
117    }
118  }