001/*
002 * Copyright (c) 2007-2016 Concurrent, Inc. All Rights Reserved.
003 *
004 * Project and contact information: http://www.cascading.org/
005 *
006 * This file is part of the Cascading project.
007 *
008 * Licensed under the Apache License, Version 2.0 (the "License");
009 * you may not use this file except in compliance with the License.
010 * You may obtain a copy of the License at
011 *
012 *     http://www.apache.org/licenses/LICENSE-2.0
013 *
014 * Unless required by applicable law or agreed to in writing, software
015 * distributed under the License is distributed on an "AS IS" BASIS,
016 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
017 * See the License for the specific language governing permissions and
018 * limitations under the License.
019 */
020
021package cascading.flow.stream.element;
022
023import java.util.ArrayList;
024import java.util.List;
025import java.util.Set;
026
027import cascading.flow.FlowElement;
028import cascading.flow.FlowProcess;
029import cascading.flow.planner.Scope;
030import cascading.flow.stream.duct.Stage;
031import cascading.flow.stream.graph.IORole;
032import cascading.pipe.Boundary;
033import cascading.pipe.Pipe;
034import cascading.tuple.TupleEntry;
035
036/**
037 *
038 */
039public abstract class BoundaryStage<Incoming, Outgoing> extends Stage<Incoming, Outgoing> implements ElementDuct
040  {
041  protected Boundary boundary;
042  protected final FlowProcess flowProcess;
043  protected IORole role = IORole.both;
044
045  protected final List<Scope> incomingScopes = new ArrayList<>();
046  protected final List<Scope> outgoingScopes = new ArrayList<>();
047
048  private TrapHandler trapHandler;
049  private Set<String> branchNames;
050
051  public BoundaryStage( FlowProcess flowProcess, Boundary boundary )
052    {
053    this.boundary = boundary;
054
055    Pipe element = boundary;
056
057    while( element != null )
058      {
059      if( element.hasConfigDef() )
060        flowProcess = new ElementFlowProcess( flowProcess, element.getConfigDef() );
061
062      element = element.getParent();
063      }
064
065    this.flowProcess = flowProcess;
066    }
067
068  public BoundaryStage( FlowProcess flowProcess, Boundary boundary, IORole role )
069    {
070    this.boundary = boundary;
071    this.flowProcess = flowProcess;
072    this.role = role;
073    }
074
075  public Boundary getBoundary()
076    {
077    return boundary;
078    }
079
080  protected void handleReThrowableException( String message, Throwable throwable )
081    {
082    trapHandler.handleReThrowableException( message, throwable );
083    }
084
085  protected void handleException( Throwable exception, TupleEntry tupleEntry )
086    {
087    trapHandler.handleException( exception, tupleEntry );
088    }
089
090  @Override
091  public void initialize()
092    {
093    super.initialize();
094
095    if( incomingScopes.size() == 0 )
096      throw new IllegalStateException( "incoming scopes may not be empty" );
097
098    if( outgoingScopes.size() == 0 )
099      throw new IllegalStateException( "outgoing scope may not be empty" );
100    }
101
102  public void setBranchNames( Set<String> branchNames )
103    {
104    this.branchNames = branchNames;
105    }
106
107  public Set<String> getBranchNames()
108    {
109    return branchNames;
110    }
111
112  @Override
113  public void setTrapHandler( TrapHandler trapHandler )
114    {
115    this.trapHandler = trapHandler;
116    }
117
118  @Override
119  public boolean hasTrapHandler()
120    {
121    return trapHandler != null;
122    }
123
124  @Override
125  public FlowElement getFlowElement()
126    {
127    return boundary;
128    }
129
130  @Override
131  public List<Scope> getOutgoingScopes()
132    {
133    return outgoingScopes;
134    }
135
136  @Override
137  public List<Scope> getIncomingScopes()
138    {
139    return incomingScopes;
140    }
141  }