001/*
002 * Copyright (c) 2007-2017 Xplenty, Inc. All Rights Reserved.
003 *
004 * Project and contact information: http://www.cascading.org/
005 *
006 * This file is part of the Cascading project.
007 *
008 * Licensed under the Apache License, Version 2.0 (the "License");
009 * you may not use this file except in compliance with the License.
010 * You may obtain a copy of the License at
011 *
012 *     http://www.apache.org/licenses/LICENSE-2.0
013 *
014 * Unless required by applicable law or agreed to in writing, software
015 * distributed under the License is distributed on an "AS IS" BASIS,
016 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
017 * See the License for the specific language governing permissions and
018 * limitations under the License.
019 */
020
021package cascading.flow.stream.element;
022
023import java.util.ArrayList;
024import java.util.List;
025import java.util.Set;
026
027import cascading.flow.FlowElement;
028import cascading.flow.FlowProcess;
029import cascading.flow.planner.Scope;
030import cascading.flow.stream.duct.Collapsing;
031import cascading.flow.stream.duct.Gate;
032import cascading.flow.stream.graph.IORole;
033import cascading.pipe.Pipe;
034import cascading.pipe.Splice;
035import cascading.tuple.TupleEntry;
036
037/**
038 *
039 */
040public abstract class SpliceGate<Incoming, Outgoing> extends Gate<Incoming, Outgoing> implements ElementDuct, Collapsing
041  {
042  protected Splice splice;
043  protected final FlowProcess flowProcess;
044  protected IORole role = IORole.both;
045
046  protected final List<Scope> incomingScopes = new ArrayList<>();
047  protected final List<Scope> outgoingScopes = new ArrayList<>();
048
049  private TrapHandler trapHandler;
050  private Set<String> branchNames;
051
052  public SpliceGate( FlowProcess flowProcess, Splice splice )
053    {
054    this.splice = splice;
055
056    Pipe element = splice;
057
058    while( element != null )
059      {
060      if( element.hasConfigDef() )
061        flowProcess = new ElementFlowProcess( flowProcess, element.getConfigDef() );
062
063      element = element.getParent();
064      }
065
066    this.flowProcess = flowProcess;
067    }
068
069  public SpliceGate( FlowProcess flowProcess, Splice splice, IORole role )
070    {
071    this.splice = splice;
072    this.flowProcess = flowProcess;
073    this.role = role;
074    }
075
076  public Splice getSplice()
077    {
078    return splice;
079    }
080
081  protected void handleReThrowableException( String message, Throwable throwable )
082    {
083    trapHandler.handleReThrowableException( message, throwable );
084    }
085
086  protected void handleException( Throwable exception, TupleEntry tupleEntry )
087    {
088    trapHandler.handleException( exception, tupleEntry );
089    }
090
091  @Override
092  public void initialize()
093    {
094    super.initialize();
095
096    if( incomingScopes.size() == 0 )
097      throw new IllegalStateException( "incoming scopes may not be empty" );
098
099    if( outgoingScopes.size() == 0 )
100      throw new IllegalStateException( "outgoing scope may not be empty" );
101    }
102
103  public void setBranchNames( Set<String> branchNames )
104    {
105    this.branchNames = branchNames;
106    }
107
108  public Set<String> getBranchNames()
109    {
110    return branchNames;
111    }
112
113  @Override
114  public void setTrapHandler( TrapHandler trapHandler )
115    {
116    this.trapHandler = trapHandler;
117    }
118
119  @Override
120  public boolean hasTrapHandler()
121    {
122    return trapHandler != null;
123    }
124
125  @Override
126  public FlowElement getFlowElement()
127    {
128    return splice;
129    }
130
131  @Override
132  public List<Scope> getOutgoingScopes()
133    {
134    return outgoingScopes;
135    }
136
137  @Override
138  public List<Scope> getIncomingScopes()
139    {
140    return incomingScopes;
141    }
142
143  @Override
144  public final boolean equals( Object object )
145    {
146    if( this == object )
147      return true;
148    if( !( object instanceof SpliceGate ) )
149      return false;
150
151    SpliceGate that = (SpliceGate) object;
152
153    if( splice != null ? splice != that.splice : that.splice != null )
154      return false;
155
156    return true;
157    }
158
159  @Override
160  public final int hashCode()
161    {
162    return splice != null ? System.identityHashCode( splice ) : 0;
163    }
164
165  @Override
166  public String toString()
167    {
168    final StringBuilder sb = new StringBuilder();
169    sb.append( getClass().getSimpleName() ).append( "{" );
170    sb.append( "splice=" ).append( splice );
171    sb.append( ", role=" ).append( role );
172    sb.append( '}' );
173    return sb.toString();
174    }
175  }