001/*
002 * Copyright (c) 2016 Chris K Wensel <chris@wensel.net>. All Rights Reserved.
003 * Copyright (c) 2007-2017 Xplenty, Inc. All Rights Reserved.
004 *
005 * Project and contact information: http://www.cascading.org/
006 *
007 * This file is part of the Cascading project.
008 *
009 * Licensed under the Apache License, Version 2.0 (the "License");
010 * you may not use this file except in compliance with the License.
011 * You may obtain a copy of the License at
012 *
013 *     http://www.apache.org/licenses/LICENSE-2.0
014 *
015 * Unless required by applicable law or agreed to in writing, software
016 * distributed under the License is distributed on an "AS IS" BASIS,
017 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
018 * See the License for the specific language governing permissions and
019 * limitations under the License.
020 */
021
022package cascading.flow.planner.iso.subgraph.iterator;
023
024import java.util.Collection;
025import java.util.Collections;
026import java.util.HashSet;
027import java.util.Iterator;
028import java.util.List;
029import java.util.Set;
030
031import cascading.flow.FlowElement;
032import cascading.flow.planner.Scope;
033import cascading.flow.planner.graph.ElementGraph;
034import cascading.flow.planner.graph.ElementSubGraph;
035import cascading.flow.planner.iso.ElementAnnotation;
036import cascading.flow.planner.iso.subgraph.SubGraphIterator;
037import cascading.util.EnumMultiMap;
038import cascading.util.Pair;
039import org.jgrapht.GraphPath;
040
041import static cascading.flow.planner.graph.ElementGraphs.*;
042import static cascading.util.Util.getFirst;
043import static org.jgrapht.Graphs.getPathVertexList;
044
045/**
046 *
047 */
048public class UniquePathSubGraphIterator implements SubGraphIterator
049  {
050  SubGraphIterator subGraphIterator;
051  boolean longestFirst;
052  boolean multiEdge;
053
054  ElementGraph current = null;
055  Iterator<GraphPath<FlowElement, Scope>> pathsIterator;
056  Set<Pair<FlowElement, FlowElement>> pairs = new HashSet<>();
057
058  public UniquePathSubGraphIterator( SubGraphIterator subGraphIterator, boolean longestFirst, boolean multiEdge )
059    {
060    this.subGraphIterator = subGraphIterator;
061    this.longestFirst = longestFirst;
062    this.multiEdge = multiEdge;
063    }
064
065  public Set<Pair<FlowElement, FlowElement>> getPairs()
066    {
067    return pairs;
068    }
069
070  @Override
071  public ElementGraph getElementGraph()
072    {
073    return subGraphIterator.getElementGraph();
074    }
075
076  @Override
077  public EnumMultiMap getAnnotationMap( ElementAnnotation[] annotations )
078    {
079    return subGraphIterator.getAnnotationMap( annotations ); // unsure we need to narrow results
080    }
081
082  @Override
083  public boolean hasNext()
084    {
085    if( pathsIterator == null )
086      advance();
087
088    if( current == null || pathsIterator == null )
089      return false;
090
091    boolean hasNextPath = pathsIterator.hasNext();
092
093    if( hasNextPath )
094      return true;
095
096    return subGraphIterator.hasNext();
097    }
098
099  private void advance()
100    {
101    if( current == null )
102      {
103      if( !subGraphIterator.hasNext() )
104        return;
105
106      current = subGraphIterator.next();
107      pathsIterator = null;
108      }
109
110    if( pathsIterator == null )
111      {
112      Set<FlowElement> sources = findSources( current, FlowElement.class );
113      Set<FlowElement> sinks = findSinks( current, FlowElement.class );
114
115      if( sources.size() > 1 || sinks.size() > 1 )
116        throw new IllegalArgumentException( "only supports single source and single sink graphs" );
117
118      FlowElement source = getFirst( sources );
119      FlowElement sink = getFirst( sinks );
120
121      pairs.add( new Pair<>( source, sink ) );
122
123      List<GraphPath<FlowElement, Scope>> paths = getAllShortestPathsBetween( current, source, sink );
124
125      if( longestFirst )
126        Collections.reverse( paths ); // break off longest paths into own partitions
127
128      pathsIterator = paths.iterator();
129      }
130    }
131
132  @Override
133  public ElementGraph next()
134    {
135    if( !pathsIterator.hasNext() )
136      {
137      current = null;
138      pathsIterator = null;
139
140      advance();
141
142      return next();
143      }
144
145    GraphPath<FlowElement, Scope> path = pathsIterator.next();
146    List<FlowElement> vertexList = getPathVertexList( path );
147    Collection<Scope> edgeList = path.getEdgeList();
148
149    if( multiEdge )
150      edgeList = getAllMultiEdgesBetween( edgeList, current );
151
152    return new ElementSubGraph( current, vertexList, edgeList );
153    }
154
155  @Override
156  public void remove()
157    {
158    subGraphIterator.remove();
159    }
160  }