001/* 002 * Copyright (c) 2007-2015 Concurrent, Inc. All Rights Reserved. 003 * 004 * Project and contact information: http://www.cascading.org/ 005 * 006 * This file is part of the Cascading project. 007 * 008 * Licensed under the Apache License, Version 2.0 (the "License"); 009 * you may not use this file except in compliance with the License. 010 * You may obtain a copy of the License at 011 * 012 * http://www.apache.org/licenses/LICENSE-2.0 013 * 014 * Unless required by applicable law or agreed to in writing, software 015 * distributed under the License is distributed on an "AS IS" BASIS, 016 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 017 * See the License for the specific language governing permissions and 018 * limitations under the License. 019 */ 020 021package cascading.flow.stream.element; 022 023import java.util.ArrayList; 024import java.util.List; 025import java.util.Set; 026 027import cascading.flow.FlowElement; 028import cascading.flow.FlowProcess; 029import cascading.flow.planner.Scope; 030import cascading.flow.stream.duct.Collapsing; 031import cascading.flow.stream.duct.Gate; 032import cascading.flow.stream.graph.IORole; 033import cascading.pipe.Pipe; 034import cascading.pipe.Splice; 035import cascading.tuple.TupleEntry; 036 037/** 038 * 039 */ 040public abstract class SpliceGate<Incoming, Outgoing> extends Gate<Incoming, Outgoing> implements ElementDuct, Collapsing 041 { 042 protected Splice splice; 043 protected final FlowProcess flowProcess; 044 protected IORole role = IORole.both; 045 046 protected final List<Scope> incomingScopes = new ArrayList<>(); 047 protected final List<Scope> outgoingScopes = new ArrayList<>(); 048 049 private TrapHandler trapHandler; 050 private Set<String> branchNames; 051 052 public SpliceGate( FlowProcess flowProcess, Splice splice ) 053 { 054 this.splice = splice; 055 056 FlowElement element = splice; 057 058 while( element != null ) 059 { 060 if( element.hasConfigDef() ) 061 flowProcess = new ElementFlowProcess( flowProcess, element.getConfigDef() ); 062 063 element = ( (Pipe) element ).getParent(); 064 } 065 066 this.flowProcess = flowProcess; 067 } 068 069 public SpliceGate( FlowProcess flowProcess, Splice splice, IORole role ) 070 { 071 this.splice = splice; 072 this.flowProcess = flowProcess; 073 this.role = role; 074 } 075 076 public Splice getSplice() 077 { 078 return splice; 079 } 080 081 protected void handleReThrowableException( String message, Throwable throwable ) 082 { 083 trapHandler.handleReThrowableException( message, throwable ); 084 } 085 086 protected void handleException( Throwable exception, TupleEntry tupleEntry ) 087 { 088 trapHandler.handleException( exception, tupleEntry ); 089 } 090 091 @Override 092 public void initialize() 093 { 094 super.initialize(); 095 096 if( incomingScopes.size() == 0 ) 097 throw new IllegalStateException( "incoming scopes may not be empty" ); 098 099 if( outgoingScopes.size() == 0 ) 100 throw new IllegalStateException( "outgoing scope may not be empty" ); 101 } 102 103 public void setBranchNames( Set<String> branchNames ) 104 { 105 this.branchNames = branchNames; 106 } 107 108 public Set<String> getBranchNames() 109 { 110 return branchNames; 111 } 112 113 @Override 114 public void setTrapHandler( TrapHandler trapHandler ) 115 { 116 this.trapHandler = trapHandler; 117 } 118 119 @Override 120 public boolean hasTrapHandler() 121 { 122 return trapHandler != null; 123 } 124 125 @Override 126 public FlowElement getFlowElement() 127 { 128 return splice; 129 } 130 131 @Override 132 public List<Scope> getOutgoingScopes() 133 { 134 return outgoingScopes; 135 } 136 137 @Override 138 public List<Scope> getIncomingScopes() 139 { 140 return incomingScopes; 141 } 142 }