001/* 002 * Copyright (c) 2007-2015 Concurrent, Inc. All Rights Reserved. 003 * 004 * Project and contact information: http://www.cascading.org/ 005 * 006 * This file is part of the Cascading project. 007 * 008 * Licensed under the Apache License, Version 2.0 (the "License"); 009 * you may not use this file except in compliance with the License. 010 * You may obtain a copy of the License at 011 * 012 * http://www.apache.org/licenses/LICENSE-2.0 013 * 014 * Unless required by applicable law or agreed to in writing, software 015 * distributed under the License is distributed on an "AS IS" BASIS, 016 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 017 * See the License for the specific language governing permissions and 018 * limitations under the License. 019 */ 020 021package cascading.flow.stream.element; 022 023import java.util.ArrayList; 024import java.util.List; 025import java.util.Set; 026 027import cascading.flow.FlowElement; 028import cascading.flow.FlowProcess; 029import cascading.flow.planner.Scope; 030import cascading.flow.stream.duct.Duct; 031import cascading.flow.stream.duct.Stage; 032import cascading.pipe.Pipe; 033import cascading.tuple.Fields; 034import cascading.tuple.TupleEntry; 035 036/** 037 * 038 */ 039public abstract class ElementStage<Incoming, Outgoing> extends Stage<Incoming, Outgoing> implements ElementDuct 040 { 041 protected final FlowProcess flowProcess; 042 protected final FlowElement flowElement; 043 protected Set<String> branchNames; 044 protected TrapHandler trapHandler; 045 046 protected final List<Scope> incomingScopes = new ArrayList<Scope>(); 047 protected final List<Scope> outgoingScopes = new ArrayList<Scope>(); 048 049 public ElementStage( FlowProcess flowProcess, FlowElement flowElement ) 050 { 051 this.flowElement = flowElement; 052 053 FlowElement element = flowElement; 054 055 while( element != null ) 056 { 057 if( element.hasConfigDef() ) 058 flowProcess = new ElementFlowProcess( flowProcess, element.getConfigDef() ); 059 060 if( element instanceof Pipe ) 061 element = ( (Pipe) element ).getParent(); 062 else 063 element = null; 064 } 065 066 this.flowProcess = flowProcess; 067 } 068 069 public FlowElement getFlowElement() 070 { 071 return flowElement; 072 } 073 074 @Override 075 public List<Scope> getIncomingScopes() 076 { 077 return incomingScopes; 078 } 079 080 public Set<String> getBranchNames() 081 { 082 return branchNames; 083 } 084 085 public void setBranchNames( Set<String> branchNames ) 086 { 087 this.branchNames = branchNames; 088 } 089 090 public void setTrapHandler( TrapHandler trapHandler ) 091 { 092 this.trapHandler = trapHandler; 093 } 094 095 @Override 096 public boolean hasTrapHandler() 097 { 098 return trapHandler != null; 099 } 100 101 @Override 102 public List<Scope> getOutgoingScopes() 103 { 104 return outgoingScopes; 105 } 106 107 protected Fields getOutgoingFields() 108 { 109 return unwind( next ).getFlowElement().resolveIncomingOperationPassThroughFields( outgoingScopes.get( 0 ) ); 110 } 111 112 private ElementDuct unwind( Duct next ) 113 { 114 if( next instanceof ElementDuct ) 115 return (ElementDuct) next; 116 117 return unwind( next.getNext() ); 118 } 119 120 @Override 121 public void cleanup() 122 { 123 super.cleanup(); 124 125 // close if top of stack 126 if( next == null ) 127 flowProcess.closeTrapCollectors(); 128 } 129 130 protected void handleReThrowableException( String message, Throwable throwable ) 131 { 132 trapHandler.handleReThrowableException( message, throwable ); 133 } 134 135 protected void handleException( Throwable exception, TupleEntry tupleEntry ) 136 { 137 trapHandler.handleException( exception, tupleEntry ); 138 } 139 140 @Override 141 public final boolean equals( Object object ) 142 { 143 if( this == object ) 144 return true; 145 if( !( object instanceof ElementStage ) ) 146 return false; 147 148 ElementStage that = (ElementStage) object; 149 150 if( flowElement != null ? flowElement != that.flowElement : that.flowElement != null ) 151 return false; 152 153 return true; 154 } 155 156 @Override 157 public final int hashCode() 158 { 159 return flowElement != null ? System.identityHashCode( flowElement ) : 0; 160 } 161 162 @Override 163 public String toString() 164 { 165 final StringBuilder sb = new StringBuilder(); 166 sb.append( getClass().getSimpleName() ); 167 sb.append( "{flowElement=" ).append( flowElement ); 168 sb.append( '}' ); 169 return sb.toString(); 170 } 171 }