/*
 * Copyright (c) 2007-2015 Concurrent, Inc. All Rights Reserved.
 *
 * Project and contact information: http://www.cascading.org/
 *
 * This file is part of the Cascading project.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package cascading.flow.stream;

import java.util.ArrayList;
import java.util.List;
import java.util.Set;

import cascading.flow.FlowElement;
import cascading.flow.FlowProcess;
import cascading.flow.planner.Scope;
import cascading.pipe.Pipe;
import cascading.tuple.Fields;
import cascading.tuple.TupleEntry;

/**
 * Base class for a {@link Stage} that is bound to a single {@link FlowElement}.
 * <p>
 * Besides the element itself, an instance holds the {@link FlowProcess} to use for that element,
 * the incoming and outgoing {@link Scope} lists, an optional set of branch names, and an optional
 * {@link TrapHandler} that the exception handling helpers delegate to.
 * <p>
 * Equality and hashing are final and based solely on the identity of the wrapped
 * {@link FlowElement} instance.
 */
public abstract class ElementStage<Incoming, Outgoing> extends Stage<Incoming, Outgoing> implements ElementDuct
  {
  /** The process to use for this stage; possibly wrapped, see the constructor. */
  protected final FlowProcess flowProcess;
  /** The element this stage is bound to. */
  protected final FlowElement flowElement;
  /** Branch names assigned via {@link #setBranchNames(Set)}; {@code null} until set. */
  protected Set<String> branchNames;
  /** Handler the exception helpers delegate to; {@code null} until set. */
  protected TrapHandler trapHandler;

  protected final List<Scope> incomingScopes = new ArrayList<Scope>();
  protected final List<Scope> outgoingScopes = new ArrayList<Scope>();

  /**
   * Creates a stage for the given element.
   * <p>
   * Starting at {@code flowElement} and following {@link Pipe#getParent()} for as long as the current
   * element is a {@link Pipe}, every element that reports a ConfigDef causes the process to be wrapped
   * in a new {@link ElementFlowProcess} built from that ConfigDef. The wrapper for the given element
   * is created first and wrappers for its ancestors are layered around it.
   *
   * @param flowProcess the process to start from; used as is when no visited element has a ConfigDef
   * @param flowElement the element this stage is bound to, may be {@code null}
   */
  public ElementStage( FlowProcess flowProcess, FlowElement flowElement )
    {
    this.flowElement = flowElement;

    // accumulate into a local rather than reassigning the parameter
    FlowProcess effectiveProcess = flowProcess;
    FlowElement element = flowElement;

    while( element != null )
      {
      if( element.hasConfigDef() )
        effectiveProcess = new ElementFlowProcess( effectiveProcess, element.getConfigDef() );

      // only Pipe instances expose a parent; anything else ends the walk
      element = element instanceof Pipe ? ( (Pipe) element ).getParent() : null;
      }

    this.flowProcess = effectiveProcess;
    }

  /**
   * Returns the element this stage is bound to.
   *
   * @return the element given at construction
   */
  public FlowElement getFlowElement()
    {
    return flowElement;
    }

  /**
   * Returns the live list of incoming scopes, not a copy.
   *
   * @return the incoming scopes, in the order they were added
   */
  @Override
  public List<Scope> getIncomingScopes()
    {
    return incomingScopes;
    }

  /**
   * Returns the branch names previously set on this stage.
   *
   * @return the branch names, or {@code null} if none were set
   */
  public Set<String> getBranchNames()
    {
    return branchNames;
    }

  /**
   * Sets the branch names for this stage.
   *
   * @param branchNames the branch names to store
   */
  public void setBranchNames( Set<String> branchNames )
    {
    this.branchNames = branchNames;
    }

  /**
   * Sets the handler used by {@link #handleException(Throwable, TupleEntry)} and
   * {@link #handleReThrowableException(String, Throwable)}.
   *
   * @param trapHandler the handler to delegate to
   */
  public void setTrapHandler( TrapHandler trapHandler )
    {
    this.trapHandler = trapHandler;
    }

  /**
   * Reports whether a trap handler has been set.
   *
   * @return {@code true} if the trap handler is not {@code null}
   */
  @Override
  public boolean hasTrapHandler()
    {
    return trapHandler != null;
    }

  /**
   * Appends a scope to the incoming scopes.
   *
   * @param incomingScope the scope to add
   */
  public void addIncomingScope( Scope incomingScope )
    {
    incomingScopes.add( incomingScope );
    }

  /**
   * Returns the live list of outgoing scopes, not a copy.
   *
   * @return the outgoing scopes, in the order they were added
   */
  @Override
  public List<Scope> getOutgoingScopes()
    {
    return outgoingScopes;
    }

  /**
   * Appends a scope to the outgoing scopes.
   *
   * @param outgoingScope the scope to add
   */
  public void addOutgoingScope( Scope outgoingScope )
    {
    outgoingScopes.add( outgoingScope );
    }

  /**
   * Resolves the outgoing fields by asking the flow element of the nearest downstream
   * {@link ElementDuct} to resolve its incoming operation pass-through fields against the first
   * outgoing scope of this stage.
   *
   * @return the fields resolved by the downstream element
   * @throws IllegalStateException     if no downstream {@link ElementDuct} can be reached from {@code next}
   * @throws IndexOutOfBoundsException if no outgoing scope has been added
   */
  protected Fields getOutgoingFields()
    {
    ElementDuct downstream = unwind( next );

    return downstream.getFlowElement().resolveIncomingOperationPassThroughFields( outgoingScopes.get( 0 ) );
    }

  /**
   * Follows the chain of ducts, beginning with the given one, until an {@link ElementDuct} is found.
   *
   * @param start the first duct to inspect
   * @return the first duct in the chain that is an {@link ElementDuct}
   * @throws IllegalStateException if the chain ends without reaching an {@link ElementDuct}
   */
  private ElementDuct unwind( Duct start )
    {
    Duct current = start;

    // iterate instead of recursing so the end of the chain is reported explicitly
    while( current != null )
      {
      if( current instanceof ElementDuct )
        return (ElementDuct) current;

      current = current.getNext();
      }

    throw new IllegalStateException( "no downstream ElementDuct reachable from: " + this );
    }

  /**
   * Runs the superclass cleanup and, when this stage has no next duct, closes the traps via
   * {@link TrapHandler#closeTraps()}.
   */
  @Override
  public void cleanup()
    {
    super.cleanup();

    // close if top of stack
    if( next == null )
      TrapHandler.closeTraps();
    }

  /**
   * Delegates to the trap handler. A trap handler must have been set beforehand.
   *
   * @param message   the message passed on to the handler
   * @param throwable the failure passed on to the handler
   */
  protected void handleReThrowableException( String message, Throwable throwable )
    {
    trapHandler.handleReThrowableException( message, throwable );
    }

  /**
   * Delegates to the trap handler. A trap handler must have been set beforehand.
   *
   * @param exception  the failure passed on to the handler
   * @param tupleEntry the tuple entry passed on to the handler
   */
  protected void handleException( Throwable exception, TupleEntry tupleEntry )
    {
    trapHandler.handleException( exception, tupleEntry );
    }

  /**
   * Two stages are equal when they are bound to the very same {@link FlowElement} instance
   * (reference comparison), or when both have no element.
   *
   * @param object the object to compare with
   * @return {@code true} if {@code object} is an {@code ElementStage} bound to the same element instance
   */
  @Override
  public final boolean equals( Object object )
    {
    if( this == object )
      return true;

    if( !( object instanceof ElementStage ) )
      return false;

    // wildcard instead of a raw type; the type arguments play no part in equality
    ElementStage<?, ?> that = (ElementStage<?, ?>) object;

    // identity comparison, also true when both references are null
    return flowElement == that.flowElement;
    }

  /**
   * Consistent with {@link #equals(Object)}: derived from the identity of the wrapped element.
   *
   * @return the identity hash code of the element, or {@code 0} if there is none
   */
  @Override
  public final int hashCode()
    {
    return flowElement != null ? System.identityHashCode( flowElement ) : 0;
    }

  /**
   * Returns the simple class name followed by the wrapped element.
   *
   * @return a string of the form {@code SimpleName{flowElement=...}}
   */
  @Override
  public String toString()
    {
    return getClass().getSimpleName() + "{flowElement=" + flowElement + '}';
    }
  }