001/* 002 * Copyright (c) 2016 Chris K Wensel <chris@wensel.net>. All Rights Reserved. 003 * Copyright (c) 2007-2017 Xplenty, Inc. All Rights Reserved. 004 * 005 * Project and contact information: http://www.cascading.org/ 006 * 007 * This file is part of the Cascading project. 008 * 009 * Licensed under the Apache License, Version 2.0 (the "License"); 010 * you may not use this file except in compliance with the License. 011 * You may obtain a copy of the License at 012 * 013 * http://www.apache.org/licenses/LICENSE-2.0 014 * 015 * Unless required by applicable law or agreed to in writing, software 016 * distributed under the License is distributed on an "AS IS" BASIS, 017 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 018 * See the License for the specific language governing permissions and 019 * limitations under the License. 020 */ 021 022package cascading.flow.planner.iso.subgraph.iterator; 023 024import java.util.ArrayList; 025import java.util.Collection; 026import java.util.Collections; 027import java.util.HashSet; 028import java.util.List; 029import java.util.Set; 030 031import cascading.flow.FlowElement; 032import cascading.flow.planner.Scope; 033import cascading.flow.planner.graph.ElementGraph; 034import cascading.flow.planner.graph.ElementGraphs; 035import cascading.flow.planner.graph.ElementMaskSubGraph; 036import cascading.flow.planner.graph.Extent; 037import cascading.flow.planner.iso.ElementAnnotation; 038import cascading.flow.planner.iso.subgraph.SubGraphIterator; 039import cascading.util.EnumMultiMap; 040import cascading.util.Pair; 041import org.jgrapht.GraphPath; 042import org.jgrapht.Graphs; 043 044import static cascading.flow.planner.graph.ElementGraphs.*; 045import static cascading.util.Util.createIdentitySet; 046 047/** 048 * 049 */ 050public class IncludeRemainderSubGraphIterator implements SubGraphIterator 051 { 052 SubGraphIterator parentIterator; 053 boolean multiEdge; 054 055 Set<FlowElement> maskedElements = createIdentitySet(); 056 Set<Scope> maskedScopes = new HashSet<>(); 057 058 { 059 // creates consistent results across SubGraphIterators 060 maskedElements.add( Extent.head ); 061 maskedElements.add( Extent.tail ); 062 } 063 064 public IncludeRemainderSubGraphIterator( SubGraphIterator parentIterator, boolean multiEdge ) 065 { 066 this.parentIterator = parentIterator; 067 this.multiEdge = multiEdge; 068 } 069 070 @Override 071 public ElementGraph getElementGraph() 072 { 073 return parentIterator.getElementGraph(); 074 } 075 076 @Override 077 public EnumMultiMap getAnnotationMap( ElementAnnotation[] annotations ) 078 { 079 return parentIterator.getAnnotationMap( annotations ); 080 } 081 082 @Override 083 public boolean hasNext() 084 { 085 return parentIterator.hasNext(); 086 } 087 088 @Override 089 public ElementGraph next() 090 { 091 ElementGraph next = parentIterator.next(); 092 093 if( parentIterator.hasNext() ) 094 { 095 // hide these elements from the remainder graph 096 maskedElements.addAll( next.vertexSet() ); 097 maskedScopes.addAll( next.edgeSet() ); // catches case with no elements on path 098 099 return next; 100 } 101 102 ElementGraph elementGraph = parentIterator.getElementGraph(); 103 104 if( !multiEdge ) // no effect on mergepipes/tez 105 { 106 maskedElements.removeAll( next.vertexSet() ); 107 maskedScopes.removeAll( next.edgeSet() ); 108 } 109 else 110 { 111 // this is experimental, but was intended to allow capture of multiple edges between two 112 // nodes 113 maskedElements.addAll( next.vertexSet() ); 114 maskedScopes.addAll( next.edgeSet() ); 115 116 // if there is branching in the root graph, common ancestors could be masked out 117 // here we iterate all paths for all remaining paths 118 119 for( FlowElement maskedElement : new ArrayList<>( maskedElements ) ) 120 { 121 if( !maskedScopes.containsAll( elementGraph.edgesOf( maskedElement ) ) ) 122 maskedElements.remove( maskedElement ); 123 } 124 } 125 126 // previously source/sink pairs captured in prior partitions 127 Set<Pair<FlowElement, FlowElement>> pairs = getPairs(); 128 129 ElementMaskSubGraph maskSubGraph = new ElementMaskSubGraph( elementGraph, maskedElements, maskedScopes ); 130 131 // remaining source/sink pairs we need to traverse 132 Set<FlowElement> sources = findSources( maskSubGraph, FlowElement.class ); 133 Set<FlowElement> sinks = findSinks( maskSubGraph, FlowElement.class ); 134 135 for( FlowElement source : sources ) 136 { 137 for( FlowElement sink : sinks ) 138 { 139 if( pairs.contains( new Pair<>( source, sink ) ) ) 140 continue; 141 142 List<GraphPath<FlowElement, Scope>> paths = getAllShortestPathsBetween( elementGraph, source, sink ); 143 144 for( GraphPath<FlowElement, Scope> path : paths ) 145 { 146 maskedElements.removeAll( Graphs.getPathVertexList( path ) ); 147 148 Collection<Scope> edgeList = path.getEdgeList(); 149 150 if( multiEdge ) 151 edgeList = ElementGraphs.getAllMultiEdgesBetween( edgeList, elementGraph ); 152 153 maskedScopes.removeAll( edgeList ); 154 } 155 } 156 } 157 158 // new graph since the prior made a copy of the masked vertices/edges 159 return new ElementMaskSubGraph( elementGraph, maskedElements, maskedScopes ); 160 } 161 162 protected Set<Pair<FlowElement, FlowElement>> getPairs() 163 { 164 Set<Pair<FlowElement, FlowElement>> pairs = Collections.emptySet(); 165 166 if( parentIterator instanceof UniquePathSubGraphIterator ) 167 pairs = ( (UniquePathSubGraphIterator) parentIterator ).getPairs(); 168 169 return pairs; 170 } 171 172 @Override 173 public void remove() 174 { 175 parentIterator.remove(); 176 } 177 }