001/* 002 * Copyright (c) 2007-2016 Concurrent, Inc. All Rights Reserved. 003 * 004 * Project and contact information: http://www.cascading.org/ 005 * 006 * This file is part of the Cascading project. 007 * 008 * Licensed under the Apache License, Version 2.0 (the "License"); 009 * you may not use this file except in compliance with the License. 010 * You may obtain a copy of the License at 011 * 012 * http://www.apache.org/licenses/LICENSE-2.0 013 * 014 * Unless required by applicable law or agreed to in writing, software 015 * distributed under the License is distributed on an "AS IS" BASIS, 016 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 017 * See the License for the specific language governing permissions and 018 * limitations under the License. 019 */ 020 021package cascading.flow.planner.iso.subgraph.iterator; 022 023import java.util.Collections; 024import java.util.HashSet; 025import java.util.Iterator; 026import java.util.List; 027import java.util.Set; 028 029import cascading.flow.FlowElement; 030import cascading.flow.planner.Scope; 031import cascading.flow.planner.graph.ElementGraph; 032import cascading.flow.planner.graph.ElementSubGraph; 033import cascading.flow.planner.iso.ElementAnnotation; 034import cascading.flow.planner.iso.subgraph.SubGraphIterator; 035import cascading.util.EnumMultiMap; 036import cascading.util.Pair; 037import org.jgrapht.GraphPath; 038 039import static cascading.flow.planner.graph.ElementGraphs.*; 040import static cascading.util.Util.getFirst; 041import static org.jgrapht.Graphs.getPathVertexList; 042 043/** 044 * 045 */ 046public class UniquePathSubGraphIterator implements SubGraphIterator 047 { 048 SubGraphIterator subGraphIterator; 049 boolean longestFirst; 050 ElementGraph current = null; 051 Iterator<GraphPath<FlowElement, Scope>> pathsIterator; 052 Set<Pair<FlowElement, FlowElement>> pairs = new HashSet<>(); 053 054 public UniquePathSubGraphIterator( SubGraphIterator subGraphIterator, boolean longestFirst ) 055 { 056 this.subGraphIterator = subGraphIterator; 057 this.longestFirst = longestFirst; 058 } 059 060 public Set<Pair<FlowElement, FlowElement>> getPairs() 061 { 062 return pairs; 063 } 064 065 @Override 066 public ElementGraph getElementGraph() 067 { 068 return subGraphIterator.getElementGraph(); 069 } 070 071 @Override 072 public EnumMultiMap getAnnotationMap( ElementAnnotation[] annotations ) 073 { 074 return subGraphIterator.getAnnotationMap( annotations ); // unsure we need to narrow results 075 } 076 077 @Override 078 public boolean hasNext() 079 { 080 if( pathsIterator == null ) 081 advance(); 082 083 if( current == null || pathsIterator == null ) 084 return false; 085 086 boolean hasNextPath = pathsIterator.hasNext(); 087 088 if( hasNextPath ) 089 return true; 090 091 return subGraphIterator.hasNext(); 092 } 093 094 private void advance() 095 { 096 if( current == null ) 097 { 098 if( !subGraphIterator.hasNext() ) 099 return; 100 101 current = subGraphIterator.next(); 102 pathsIterator = null; 103 } 104 105 if( pathsIterator == null ) 106 { 107 Set<FlowElement> sources = findSources( current, FlowElement.class ); 108 Set<FlowElement> sinks = findSinks( current, FlowElement.class ); 109 110 if( sources.size() > 1 || sinks.size() > 1 ) 111 throw new IllegalArgumentException( "only supports single source and single sink graphs" ); 112 113 FlowElement source = getFirst( sources ); 114 FlowElement sink = getFirst( sinks ); 115 116 pairs.add( new Pair<>( source, sink ) ); 117 118 List<GraphPath<FlowElement, Scope>> paths = getAllShortestPathsBetween( current, source, sink ); 119 120 if( longestFirst ) 121 Collections.reverse( paths ); // break off longest paths into own partitions 122 123 pathsIterator = paths.iterator(); 124 } 125 } 126 127 @Override 128 public ElementGraph next() 129 { 130 if( !pathsIterator.hasNext() ) 131 { 132 current = null; 133 pathsIterator = null; 134 135 advance(); 136 137 return next(); 138 } 139 140 GraphPath<FlowElement, Scope> path = pathsIterator.next(); 141 List<FlowElement> vertexList = getPathVertexList( path ); 142 List<Scope> edgeList = path.getEdgeList(); 143 144 return new ElementSubGraph( current, vertexList, edgeList ); 145 } 146 147 @Override 148 public void remove() 149 { 150 subGraphIterator.remove(); 151 } 152 }