001/* 002 * Copyright (c) 2007-2017 Xplenty, Inc. All Rights Reserved. 003 * 004 * Project and contact information: http://www.cascading.org/ 005 * 006 * This file is part of the Cascading project. 007 * 008 * Licensed under the Apache License, Version 2.0 (the "License"); 009 * you may not use this file except in compliance with the License. 010 * You may obtain a copy of the License at 011 * 012 * http://www.apache.org/licenses/LICENSE-2.0 013 * 014 * Unless required by applicable law or agreed to in writing, software 015 * distributed under the License is distributed on an "AS IS" BASIS, 016 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 017 * See the License for the specific language governing permissions and 018 * limitations under the License. 019 */ 020 021package cascading.flow.local.planner; 022 023import java.io.IOException; 024import java.util.Properties; 025import java.util.concurrent.ExecutorService; 026import java.util.concurrent.Executors; 027import java.util.concurrent.Future; 028 029import cascading.flow.local.LocalFlowProcess; 030import cascading.flow.local.LocalFlowStep; 031import cascading.flow.planner.FlowStepJob; 032import cascading.management.state.ClientState; 033import cascading.stats.FlowNodeStats; 034import cascading.stats.FlowStepStats; 035import cascading.stats.local.LocalStepStats; 036 037/** 038 * 039 */ 040public class LocalFlowStepJob extends FlowStepJob<Properties> 041 { 042 private final LocalStepRunner stackRunner; 043 private Future<Throwable> future; 044 045 public LocalFlowStepJob( ClientState clientState, LocalFlowProcess flowProcess, LocalFlowStep flowStep ) 046 { 047 super( clientState, flowStep.getConfig(), flowStep, 200, 1000, 1000 * 60 ); 048 flowProcess.setStepStats( (LocalStepStats) this.flowStepStats ); 049 this.stackRunner = new LocalStepRunner( flowProcess, flowStep ); 050 } 051 052 @Override 053 protected FlowStepStats createStepStats( ClientState clientState ) 054 { 055 return new LocalStepStats( flowStep, clientState ); 056 } 057 058 @Override 059 protected boolean isRemoteExecution() 060 { 061 return false; 062 } 063 064 @Override 065 protected String internalJobId() 066 { 067 return "flow"; 068 } 069 070 @Override 071 protected void internalNonBlockingStart() throws IOException 072 { 073 ExecutorService executors = Executors.newFixedThreadPool( 1 ); 074 075 future = executors.submit( stackRunner ); 076 077 executors.shutdown(); 078 } 079 080 @Override 081 protected void updateNodeStatus( FlowNodeStats flowNodeStats ) 082 { 083 } 084 085 @Override 086 protected boolean internalIsStartedRunning() 087 { 088 return future != null; 089 } 090 091 @Override 092 protected boolean internalNonBlockingIsComplete() throws IOException 093 { 094 return stackRunner.isComplete(); 095 } 096 097 @Override 098 protected Throwable getThrowable() 099 { 100 return stackRunner.getThrowable(); 101 } 102 103 @Override 104 protected boolean internalNonBlockingIsSuccessful() throws IOException 105 { 106 return stackRunner.isSuccessful(); 107 } 108 109 @Override 110 protected void internalBlockOnStop() throws IOException 111 { 112 } 113 114 @Override 115 protected void dumpDebugInfo() 116 { 117 } 118 }