001/*
002 * Copyright (c) 2007-2015 Concurrent, Inc. All Rights Reserved.
003 *
004 * Project and contact information: http://www.cascading.org/
005 *
006 * This file is part of the Cascading project.
007 *
008 * Licensed under the Apache License, Version 2.0 (the "License");
009 * you may not use this file except in compliance with the License.
010 * You may obtain a copy of the License at
011 *
012 *     http://www.apache.org/licenses/LICENSE-2.0
013 *
014 * Unless required by applicable law or agreed to in writing, software
015 * distributed under the License is distributed on an "AS IS" BASIS,
016 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
017 * See the License for the specific language governing permissions and
018 * limitations under the License.
019 */
020
021package cascading.flow.planner.process;
022
023import java.io.Serializable;
024import java.util.Collections;
025import java.util.HashMap;
026import java.util.HashSet;
027import java.util.Map;
028import java.util.Set;
029
030import cascading.flow.FlowElement;
031import cascading.flow.FlowElements;
032import cascading.flow.planner.Scope;
033import cascading.flow.planner.graph.AnnotatedGraph;
034import cascading.flow.planner.graph.ElementGraph;
035import cascading.util.Util;
036
037/**
038 *
039 */
040public class ProcessEdge<Process extends ProcessModel> implements Serializable
041  {
042  String id;
043  String sourceProcessID;
044  String sinkProcessID;
045  FlowElement flowElement;
046  Set<Integer> sourceProvidedOrdinals;
047  Set<Integer> sinkExpectedOrdinals;
048  Set<Enum> sinkAnnotations = Collections.emptySet();
049  Set<Enum> sourceAnnotations = Collections.emptySet();
050  Map<String, String> edgeAnnotations;
051
052  public ProcessEdge( Process sourceProcess, FlowElement flowElement, Process sinkProcess )
053    {
054    this.flowElement = flowElement;
055    this.sourceProcessID = sourceProcess.getID();
056    this.sinkProcessID = sinkProcess.getID();
057
058    ElementGraph sourceElementGraph = sourceProcess.getElementGraph();
059    ElementGraph sinkElementGraph = sinkProcess.getElementGraph();
060
061    // for both set of ordinals, we only care about the edges entering the flowElement
062    // the source may only provide a subset of the paths expected by the sink
063
064    // all ordinals the source process provides
065    this.sourceProvidedOrdinals = createOrdinals( sourceElementGraph.incomingEdgesOf( flowElement ) );
066
067    // all ordinals the sink process expects
068    this.sinkExpectedOrdinals = createOrdinals( sinkElementGraph.incomingEdgesOf( flowElement ) );
069
070    if( sourceElementGraph instanceof AnnotatedGraph && ( (AnnotatedGraph) sourceElementGraph ).hasAnnotations() )
071      this.sourceAnnotations = ( (AnnotatedGraph) sourceElementGraph ).getAnnotations().getKeysFor( flowElement );
072
073    if( sinkElementGraph instanceof AnnotatedGraph && ( (AnnotatedGraph) sinkElementGraph ).hasAnnotations() )
074      this.sinkAnnotations = ( (AnnotatedGraph) sinkElementGraph ).getAnnotations().getKeysFor( flowElement );
075    }
076
077  public ProcessEdge( Process sourceProcess, Process sinkProcess )
078    {
079    this.sourceProcessID = sourceProcess.getID();
080    this.sinkProcessID = sinkProcess.getID();
081    }
082
083  public String getID()
084    {
085    if( id == null ) // make it lazy
086      id = Util.createUniqueID();
087
088    return id;
089    }
090
091  public String getSourceProcessID()
092    {
093    return sourceProcessID;
094    }
095
096  public String getSinkProcessID()
097    {
098    return sinkProcessID;
099    }
100
101  /**
102   * Returns any edge annotations, or an empty immutable Map.
103   * <p/>
104   * Use {@link #addEdgeAnnotation(String, String)} to add edge annotations.
105   *
106   * @return
107   */
108  public Map<String, String> getEdgeAnnotations()
109    {
110    if( edgeAnnotations == null )
111      return Collections.emptyMap();
112
113    return Collections.unmodifiableMap( edgeAnnotations );
114    }
115
116  public void addEdgeAnnotation( Enum annotation )
117    {
118    if( annotation == null )
119      return;
120
121    addEdgeAnnotation( annotation.getDeclaringClass().getName(), annotation.name() );
122    }
123
124  public void addEdgeAnnotation( String key, String value )
125    {
126    if( edgeAnnotations == null )
127      edgeAnnotations = new HashMap<>();
128
129    edgeAnnotations.put( key, value );
130    }
131
132  private Set<Integer> createOrdinals( Set<Scope> scopes )
133    {
134    Set<Integer> ordinals = new HashSet<>();
135
136    for( Scope scope : scopes )
137      ordinals.add( scope.getOrdinal() );
138
139    return ordinals;
140    }
141
142  public FlowElement getFlowElement()
143    {
144    return flowElement;
145    }
146
147  public String getFlowElementID()
148    {
149    return FlowElements.id( flowElement );
150    }
151
152  public Set<Integer> getSinkExpectedOrdinals()
153    {
154    return sinkExpectedOrdinals;
155    }
156
157  public Set<Integer> getSourceProvidedOrdinals()
158    {
159    return sourceProvidedOrdinals;
160    }
161
162  public Set<Enum> getSinkAnnotations()
163    {
164    return sinkAnnotations;
165    }
166
167  public Set<Enum> getSourceAnnotations()
168    {
169    return sourceAnnotations;
170    }
171  }