001/*
002 * Copyright (c) 2016 Chris K Wensel <chris@wensel.net>. All Rights Reserved.
003 * Copyright (c) 2007-2017 Xplenty, Inc. All Rights Reserved.
004 *
005 * Project and contact information: http://www.cascading.org/
006 *
007 * This file is part of the Cascading project.
008 *
009 * Licensed under the Apache License, Version 2.0 (the "License");
010 * you may not use this file except in compliance with the License.
011 * You may obtain a copy of the License at
012 *
013 *     http://www.apache.org/licenses/LICENSE-2.0
014 *
015 * Unless required by applicable law or agreed to in writing, software
016 * distributed under the License is distributed on an "AS IS" BASIS,
017 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
018 * See the License for the specific language governing permissions and
019 * limitations under the License.
020 */
021
022package cascading.flow.planner.iso.subgraph.iterator;
023
024import java.util.ArrayList;
025import java.util.Collection;
026import java.util.Collections;
027import java.util.HashSet;
028import java.util.List;
029import java.util.Set;
030
031import cascading.flow.FlowElement;
032import cascading.flow.planner.Scope;
033import cascading.flow.planner.graph.ElementGraph;
034import cascading.flow.planner.graph.ElementGraphs;
035import cascading.flow.planner.graph.ElementMaskSubGraph;
036import cascading.flow.planner.graph.Extent;
037import cascading.flow.planner.iso.ElementAnnotation;
038import cascading.flow.planner.iso.subgraph.SubGraphIterator;
039import cascading.util.EnumMultiMap;
040import cascading.util.Pair;
041import org.jgrapht.GraphPath;
042import org.jgrapht.Graphs;
043
044import static cascading.flow.planner.graph.ElementGraphs.*;
045import static cascading.util.Util.createIdentitySet;
046
047/**
048 *
049 */
050public class IncludeRemainderSubGraphIterator implements SubGraphIterator
051  {
052  SubGraphIterator parentIterator;
053  boolean multiEdge;
054
055  Set<FlowElement> maskedElements = createIdentitySet();
056  Set<Scope> maskedScopes = new HashSet<>();
057
058  {
059  // creates consistent results across SubGraphIterators
060  maskedElements.add( Extent.head );
061  maskedElements.add( Extent.tail );
062  }
063
064  public IncludeRemainderSubGraphIterator( SubGraphIterator parentIterator, boolean multiEdge )
065    {
066    this.parentIterator = parentIterator;
067    this.multiEdge = multiEdge;
068    }
069
070  @Override
071  public ElementGraph getElementGraph()
072    {
073    return parentIterator.getElementGraph();
074    }
075
076  @Override
077  public EnumMultiMap getAnnotationMap( ElementAnnotation[] annotations )
078    {
079    return parentIterator.getAnnotationMap( annotations );
080    }
081
082  @Override
083  public boolean hasNext()
084    {
085    return parentIterator.hasNext();
086    }
087
088  @Override
089  public ElementGraph next()
090    {
091    ElementGraph next = parentIterator.next();
092
093    if( parentIterator.hasNext() )
094      {
095      // hide these elements from the remainder graph
096      maskedElements.addAll( next.vertexSet() );
097      maskedScopes.addAll( next.edgeSet() ); // catches case with no elements on path
098
099      return next;
100      }
101
102    ElementGraph elementGraph = parentIterator.getElementGraph();
103
104    if( !multiEdge ) // no effect on mergepipes/tez
105      {
106      maskedElements.removeAll( next.vertexSet() );
107      maskedScopes.removeAll( next.edgeSet() );
108      }
109    else
110      {
111      // this is experimental, but was intended to allow capture of multiple edges between two
112      // nodes
113      maskedElements.addAll( next.vertexSet() );
114      maskedScopes.addAll( next.edgeSet() );
115
116      // if there is branching in the root graph, common ancestors could be masked out
117      // here we iterate all paths for all remaining paths
118
119      for( FlowElement maskedElement : new ArrayList<>( maskedElements ) )
120        {
121        if( !maskedScopes.containsAll( elementGraph.edgesOf( maskedElement ) ) )
122          maskedElements.remove( maskedElement );
123        }
124      }
125
126    // previously source/sink pairs captured in prior partitions
127    Set<Pair<FlowElement, FlowElement>> pairs = getPairs();
128
129    ElementMaskSubGraph maskSubGraph = new ElementMaskSubGraph( elementGraph, maskedElements, maskedScopes );
130
131    // remaining source/sink pairs we need to traverse
132    Set<FlowElement> sources = findSources( maskSubGraph, FlowElement.class );
133    Set<FlowElement> sinks = findSinks( maskSubGraph, FlowElement.class );
134
135    for( FlowElement source : sources )
136      {
137      for( FlowElement sink : sinks )
138        {
139        if( pairs.contains( new Pair<>( source, sink ) ) )
140          continue;
141
142        List<GraphPath<FlowElement, Scope>> paths = getAllShortestPathsBetween( elementGraph, source, sink );
143
144        for( GraphPath<FlowElement, Scope> path : paths )
145          {
146          maskedElements.removeAll( Graphs.getPathVertexList( path ) );
147
148          Collection<Scope> edgeList = path.getEdgeList();
149
150          if( multiEdge )
151            edgeList = ElementGraphs.getAllMultiEdgesBetween( edgeList, elementGraph );
152
153          maskedScopes.removeAll( edgeList );
154          }
155        }
156      }
157
158    // new graph since the prior made a copy of the masked vertices/edges
159    return new ElementMaskSubGraph( elementGraph, maskedElements, maskedScopes );
160    }
161
162  protected Set<Pair<FlowElement, FlowElement>> getPairs()
163    {
164    Set<Pair<FlowElement, FlowElement>> pairs = Collections.emptySet();
165
166    if( parentIterator instanceof UniquePathSubGraphIterator )
167      pairs = ( (UniquePathSubGraphIterator) parentIterator ).getPairs();
168
169    return pairs;
170    }
171
172  @Override
173  public void remove()
174    {
175    parentIterator.remove();
176    }
177  }