001/*
002 * Copyright (c) 2007-2015 Concurrent, Inc. All Rights Reserved.
003 *
004 * Project and contact information: http://www.cascading.org/
005 *
006 * This file is part of the Cascading project.
007 *
008 * Licensed under the Apache License, Version 2.0 (the "License");
009 * you may not use this file except in compliance with the License.
010 * You may obtain a copy of the License at
011 *
012 *     http://www.apache.org/licenses/LICENSE-2.0
013 *
014 * Unless required by applicable law or agreed to in writing, software
015 * distributed under the License is distributed on an "AS IS" BASIS,
016 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
017 * See the License for the specific language governing permissions and
018 * limitations under the License.
019 */
020
021package cascading.flow.planner.iso.subgraph.iterator;
022
023import java.util.Collections;
024import java.util.HashSet;
025import java.util.List;
026import java.util.Set;
027
028import cascading.flow.FlowElement;
029import cascading.flow.planner.Scope;
030import cascading.flow.planner.graph.ElementGraph;
031import cascading.flow.planner.graph.ElementMaskSubGraph;
032import cascading.flow.planner.graph.Extent;
033import cascading.flow.planner.iso.ElementAnnotation;
034import cascading.flow.planner.iso.subgraph.SubGraphIterator;
035import cascading.util.EnumMultiMap;
036import cascading.util.Pair;
037import org.jgrapht.GraphPath;
038import org.jgrapht.Graphs;
039
040import static cascading.flow.planner.graph.ElementGraphs.*;
041import static cascading.util.Util.createIdentitySet;
042
043/**
044 *
045 */
046public class IncludeRemainderSubGraphIterator implements SubGraphIterator
047  {
048  SubGraphIterator parentIterator;
049
050  Set<FlowElement> maskedElements = createIdentitySet();
051  Set<Scope> maskedScopes = new HashSet<>();
052
053  {
054  // creates consistent results across SubGraphIterators
055  maskedElements.add( Extent.head );
056  maskedElements.add( Extent.tail );
057  }
058
059  public IncludeRemainderSubGraphIterator( SubGraphIterator parentIterator )
060    {
061    this.parentIterator = parentIterator;
062    }
063
064  @Override
065  public ElementGraph getElementGraph()
066    {
067    return parentIterator.getElementGraph();
068    }
069
070  @Override
071  public EnumMultiMap getAnnotationMap( ElementAnnotation[] annotations )
072    {
073    return parentIterator.getAnnotationMap( annotations );
074    }
075
076  @Override
077  public boolean hasNext()
078    {
079    return parentIterator.hasNext();
080    }
081
082  @Override
083  public ElementGraph next()
084    {
085    ElementGraph next = parentIterator.next();
086
087    if( parentIterator.hasNext() )
088      {
089      maskedElements.addAll( next.vertexSet() );
090      maskedScopes.addAll( next.edgeSet() ); // catches case with no elements on path
091
092      return next;
093      }
094
095    maskedElements.removeAll( next.vertexSet() );
096    maskedScopes.removeAll( next.edgeSet() );
097
098    // if there is branching in the root graph, common ancestors could be masked out
099    // here we iterate all paths for all remaining paths
100
101    // previously source/sink pairs captured in prior partitions
102    Set<Pair<FlowElement, FlowElement>> pairs = getPairs();
103
104    ElementGraph elementGraph = parentIterator.getElementGraph();
105    ElementMaskSubGraph maskSubGraph = new ElementMaskSubGraph( elementGraph, maskedElements, maskedScopes );
106
107    // remaining source/sink pairs we need to traverse
108    Set<FlowElement> sources = findSources( maskSubGraph, FlowElement.class );
109    Set<FlowElement> sinks = findSinks( maskSubGraph, FlowElement.class );
110
111    for( FlowElement source : sources )
112      {
113      for( FlowElement sink : sinks )
114        {
115        if( pairs.contains( new Pair<>( source, sink ) ) )
116          continue;
117
118        List<GraphPath<FlowElement, Scope>> paths = getAllShortestPathsBetween( elementGraph, source, sink );
119
120        for( GraphPath<FlowElement, Scope> path : paths )
121          {
122          maskedElements.removeAll( Graphs.getPathVertexList( path ) );
123          maskedScopes.removeAll( path.getEdgeList() );
124          }
125        }
126      }
127
128    // new graph since the prior made a copy of the masked vertices/edges
129    return new ElementMaskSubGraph( elementGraph, maskedElements, maskedScopes );
130    }
131
132  protected Set<Pair<FlowElement, FlowElement>> getPairs()
133    {
134    Set<Pair<FlowElement, FlowElement>> pairs = Collections.emptySet();
135
136    if( parentIterator instanceof UniquePathSubGraphIterator )
137      pairs = ( (UniquePathSubGraphIterator) parentIterator ).getPairs();
138
139    return pairs;
140    }
141
142  @Override
143  public void remove()
144    {
145    parentIterator.remove();
146    }
147  }