001/*
002 * Copyright (c) 2007-2015 Concurrent, Inc. All Rights Reserved.
003 *
004 * Project and contact information: http://www.cascading.org/
005 *
006 * This file is part of the Cascading project.
007 *
008 * Licensed under the Apache License, Version 2.0 (the "License");
009 * you may not use this file except in compliance with the License.
010 * You may obtain a copy of the License at
011 *
012 *     http://www.apache.org/licenses/LICENSE-2.0
013 *
014 * Unless required by applicable law or agreed to in writing, software
015 * distributed under the License is distributed on an "AS IS" BASIS,
016 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
017 * See the License for the specific language governing permissions and
018 * limitations under the License.
019 */
020
021package cascading.flow.stream.element;
022
023import java.util.ArrayList;
024import java.util.List;
025import java.util.Set;
026
027import cascading.flow.FlowElement;
028import cascading.flow.FlowProcess;
029import cascading.flow.planner.Scope;
030import cascading.flow.stream.duct.Duct;
031import cascading.flow.stream.duct.Stage;
032import cascading.pipe.Pipe;
033import cascading.tuple.Fields;
034import cascading.tuple.TupleEntry;
035
036/**
037 *
038 */
039public abstract class ElementStage<Incoming, Outgoing> extends Stage<Incoming, Outgoing> implements ElementDuct
040  {
041  protected final FlowProcess flowProcess;
042  protected final FlowElement flowElement;
043  protected Set<String> branchNames;
044  protected TrapHandler trapHandler;
045
046  protected final List<Scope> incomingScopes = new ArrayList<Scope>();
047  protected final List<Scope> outgoingScopes = new ArrayList<Scope>();
048
049  public ElementStage( FlowProcess flowProcess, FlowElement flowElement )
050    {
051    this.flowElement = flowElement;
052
053    FlowElement element = flowElement;
054
055    while( element != null )
056      {
057      if( element.hasConfigDef() )
058        flowProcess = new ElementFlowProcess( flowProcess, element.getConfigDef() );
059
060      if( element instanceof Pipe )
061        element = ( (Pipe) element ).getParent();
062      else
063        element = null;
064      }
065
066    this.flowProcess = flowProcess;
067    }
068
069  public FlowElement getFlowElement()
070    {
071    return flowElement;
072    }
073
074  @Override
075  public List<Scope> getIncomingScopes()
076    {
077    return incomingScopes;
078    }
079
080  public Set<String> getBranchNames()
081    {
082    return branchNames;
083    }
084
085  public void setBranchNames( Set<String> branchNames )
086    {
087    this.branchNames = branchNames;
088    }
089
090  public void setTrapHandler( TrapHandler trapHandler )
091    {
092    this.trapHandler = trapHandler;
093    }
094
095  @Override
096  public boolean hasTrapHandler()
097    {
098    return trapHandler != null;
099    }
100
101  @Override
102  public List<Scope> getOutgoingScopes()
103    {
104    return outgoingScopes;
105    }
106
107  protected Fields getOutgoingFields()
108    {
109    return unwind( next ).getFlowElement().resolveIncomingOperationPassThroughFields( outgoingScopes.get( 0 ) );
110    }
111
112  private ElementDuct unwind( Duct next )
113    {
114    if( next instanceof ElementDuct )
115      return (ElementDuct) next;
116
117    return unwind( next.getNext() );
118    }
119
120  @Override
121  public void cleanup()
122    {
123    super.cleanup();
124
125    // close if top of stack
126    if( next == null )
127      flowProcess.closeTrapCollectors();
128    }
129
130  protected void handleReThrowableException( String message, Throwable throwable )
131    {
132    trapHandler.handleReThrowableException( message, throwable );
133    }
134
135  protected void handleException( Throwable exception, TupleEntry tupleEntry )
136    {
137    trapHandler.handleException( exception, tupleEntry );
138    }
139
140  @Override
141  public final boolean equals( Object object )
142    {
143    if( this == object )
144      return true;
145    if( !( object instanceof ElementStage ) )
146      return false;
147
148    ElementStage that = (ElementStage) object;
149
150    if( flowElement != null ? flowElement != that.flowElement : that.flowElement != null )
151      return false;
152
153    return true;
154    }
155
156  @Override
157  public final int hashCode()
158    {
159    return flowElement != null ? System.identityHashCode( flowElement ) : 0;
160    }
161
162  @Override
163  public String toString()
164    {
165    final StringBuilder sb = new StringBuilder();
166    sb.append( getClass().getSimpleName() );
167    sb.append( "{flowElement=" ).append( flowElement );
168    sb.append( '}' );
169    return sb.toString();
170    }
171  }