/*
 * Copyright (c) 2016 Chris K Wensel <chris@wensel.net>. All Rights Reserved.
 * Copyright (c) 2007-2017 Xplenty, Inc. All Rights Reserved.
 *
 * Project and contact information: http://www.cascading.org/
 *
 * This file is part of the Cascading project.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package cascading.flow.planner.iso.subgraph.iterator;

import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.Set;

import cascading.flow.FlowElement;
import cascading.flow.planner.Scope;
import cascading.flow.planner.graph.ElementGraph;
import cascading.flow.planner.graph.ElementSubGraph;
import cascading.flow.planner.iso.ElementAnnotation;
import cascading.flow.planner.iso.subgraph.SubGraphIterator;
import cascading.util.EnumMultiMap;
import cascading.util.Pair;
import org.jgrapht.GraphPath;

import static cascading.flow.planner.graph.ElementGraphs.*;
import static cascading.util.Util.getFirst;
import static org.jgrapht.Graphs.getPathVertexList;

/**
 * A {@link SubGraphIterator} that wraps another SubGraphIterator and, for every sub-graph the wrapped
 * iterator returns, emits one {@link ElementSubGraph} per path found between that sub-graph's single
 * source and single sink.
 * <p>
 * Each sub-graph handed back by the wrapped iterator must have at most one source and at most one sink,
 * otherwise an {@link IllegalArgumentException} is thrown while preparing its paths.
 * <p>
 * The source/sink pair of every sub-graph visited so far is recorded and available via {@link #getPairs()}.
 */
public class UniquePathSubGraphIterator implements SubGraphIterator
  {
  /** The wrapped iterator supplying the sub-graphs to split into paths. */
  SubGraphIterator subGraphIterator;
  /** When true, the list of paths is reversed before being iterated. */
  boolean longestFirst;
  /** When true, each path's edge list is expanded via {@code getAllMultiEdgesBetween}. */
  boolean multiEdge;

  /** The sub-graph whose paths are currently being iterated, or null if none is loaded. */
  ElementGraph current = null;
  /** Iterator over the paths of {@link #current}, or null if they have not been computed yet. */
  Iterator<GraphPath<FlowElement, Scope>> pathsIterator;
  /** The (source, sink) pairs of every sub-graph for which paths were computed. */
  Set<Pair<FlowElement, FlowElement>> pairs = new HashSet<>();

  /**
   * @param subGraphIterator the iterator providing single source, single sink sub-graphs
   * @param longestFirst     if true, reverse the path list before iterating it (intended to visit the
   *                         longest paths first; this presumes the path helper returns shortest first)
   * @param multiEdge        if true, replace each path's edges with the result of
   *                         {@code getAllMultiEdgesBetween} against the current sub-graph
   */
  public UniquePathSubGraphIterator( SubGraphIterator subGraphIterator, boolean longestFirst, boolean multiEdge )
    {
    this.subGraphIterator = subGraphIterator;
    this.longestFirst = longestFirst;
    this.multiEdge = multiEdge;
    }

  /**
   * Returns the live, mutable set of (source, sink) pairs recorded so far; it grows as iteration proceeds.
   *
   * @return the recorded source/sink pairs
   */
  public Set<Pair<FlowElement, FlowElement>> getPairs()
    {
    return pairs;
    }

  /**
   * Delegates to the wrapped iterator.
   */
  @Override
  public ElementGraph getElementGraph()
    {
    return subGraphIterator.getElementGraph();
    }

  /**
   * Delegates to the wrapped iterator; the result is not narrowed to the current path.
   */
  @Override
  public EnumMultiMap getAnnotationMap( ElementAnnotation[] annotations )
    {
    return subGraphIterator.getAnnotationMap( annotations ); // unsure we need to narrow results
    }

  /**
   * Reports whether another path, or another sub-graph to take paths from, is available.
   * <p>
   * Note: once the current paths are exhausted this answers with the wrapped iterator's
   * {@code hasNext()}, without checking that the remaining sub-graphs actually yield a path. If none
   * do, a following {@link #next()} throws {@link NoSuchElementException}.
   */
  @Override
  public boolean hasNext()
    {
    // lazily load the first sub-graph and its paths
    if( pathsIterator == null )
      advance();

    if( current == null || pathsIterator == null )
      return false;

    boolean hasNextPath = pathsIterator.hasNext();

    if( hasNextPath )
      return true;

    return subGraphIterator.hasNext();
    }

  /**
   * Loads the next sub-graph from the wrapped iterator if none is current, then computes the paths
   * between its source and sink if they have not been computed. Leaves {@link #pathsIterator} null when
   * the wrapped iterator is exhausted.
   *
   * @throws IllegalArgumentException if the sub-graph has more than one source or more than one sink
   */
  private void advance()
    {
    if( current == null )
      {
      if( !subGraphIterator.hasNext() )
        return;

      current = subGraphIterator.next();
      pathsIterator = null;
      }

    if( pathsIterator == null )
      {
      Set<FlowElement> sources = findSources( current, FlowElement.class );
      Set<FlowElement> sinks = findSinks( current, FlowElement.class );

      if( sources.size() > 1 || sinks.size() > 1 )
        throw new IllegalArgumentException( "only supports single source and single sink graphs" );

      FlowElement source = getFirst( sources );
      FlowElement sink = getFirst( sinks );

      pairs.add( new Pair<>( source, sink ) );

      List<GraphPath<FlowElement, Scope>> paths = getAllShortestPathsBetween( current, source, sink );

      if( longestFirst )
        Collections.reverse( paths ); // break off longest paths into own partitions

      pathsIterator = paths.iterator();
      }
    }

  /**
   * Returns the next path as an {@link ElementSubGraph} of the sub-graph it was found in, moving on to
   * the next sub-graph from the wrapped iterator whenever the current one has no paths left.
   *
   * @return a sub-graph containing the vertices and edges of the next path
   * @throws NoSuchElementException   if there are no more paths in any remaining sub-graph
   * @throws IllegalArgumentException if a newly loaded sub-graph has multiple sources or sinks
   */
  @Override
  public ElementGraph next()
    {
    // permit next() without a preceding hasNext(); previously this dereferenced a null iterator
    if( pathsIterator == null )
      advance();

    // skip sub-graphs whose paths are exhausted (or that produced none), iteratively rather than recursively
    while( pathsIterator != null && !pathsIterator.hasNext() )
      {
      current = null;
      pathsIterator = null;

      advance();
      }

    // advance() leaves pathsIterator null only when the wrapped iterator has nothing more to give
    if( pathsIterator == null )
      throw new NoSuchElementException( "no more paths available" );

    GraphPath<FlowElement, Scope> path = pathsIterator.next();
    List<FlowElement> vertexList = getPathVertexList( path );
    Collection<Scope> edgeList = path.getEdgeList();

    if( multiEdge )
      edgeList = getAllMultiEdgesBetween( edgeList, current );

    return new ElementSubGraph( current, vertexList, edgeList );
    }

  /**
   * Delegates to the wrapped iterator's {@code remove()}; it acts on the wrapped iterator's last
   * returned sub-graph, not on an individual path.
   */
  @Override
  public void remove()
    {
    subGraphIterator.remove();
    }
  }