001/*
002 * Copyright (c) 2007-2016 Concurrent, Inc. All Rights Reserved.
003 *
004 * Project and contact information: http://www.cascading.org/
005 *
006 * This file is part of the Cascading project.
007 *
008 * Licensed under the Apache License, Version 2.0 (the "License");
009 * you may not use this file except in compliance with the License.
010 * You may obtain a copy of the License at
011 *
012 *     http://www.apache.org/licenses/LICENSE-2.0
013 *
014 * Unless required by applicable law or agreed to in writing, software
015 * distributed under the License is distributed on an "AS IS" BASIS,
016 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
017 * See the License for the specific language governing permissions and
018 * limitations under the License.
019 */
020
021package cascading.flow.planner.iso.subgraph.iterator;
022
023import java.util.Collections;
024import java.util.HashSet;
025import java.util.Iterator;
026import java.util.List;
027import java.util.Set;
028
029import cascading.flow.FlowElement;
030import cascading.flow.planner.Scope;
031import cascading.flow.planner.graph.ElementGraph;
032import cascading.flow.planner.graph.ElementSubGraph;
033import cascading.flow.planner.iso.ElementAnnotation;
034import cascading.flow.planner.iso.subgraph.SubGraphIterator;
035import cascading.util.EnumMultiMap;
036import cascading.util.Pair;
037import org.jgrapht.GraphPath;
038
039import static cascading.flow.planner.graph.ElementGraphs.*;
040import static cascading.util.Util.getFirst;
041import static org.jgrapht.Graphs.getPathVertexList;
042
043/**
044 *
045 */
046public class UniquePathSubGraphIterator implements SubGraphIterator
047  {
048  SubGraphIterator subGraphIterator;
049  boolean longestFirst;
050  ElementGraph current = null;
051  Iterator<GraphPath<FlowElement, Scope>> pathsIterator;
052  Set<Pair<FlowElement, FlowElement>> pairs = new HashSet<>();
053
054  public UniquePathSubGraphIterator( SubGraphIterator subGraphIterator, boolean longestFirst )
055    {
056    this.subGraphIterator = subGraphIterator;
057    this.longestFirst = longestFirst;
058    }
059
060  public Set<Pair<FlowElement, FlowElement>> getPairs()
061    {
062    return pairs;
063    }
064
065  @Override
066  public ElementGraph getElementGraph()
067    {
068    return subGraphIterator.getElementGraph();
069    }
070
071  @Override
072  public EnumMultiMap getAnnotationMap( ElementAnnotation[] annotations )
073    {
074    return subGraphIterator.getAnnotationMap( annotations ); // unsure we need to narrow results
075    }
076
077  @Override
078  public boolean hasNext()
079    {
080    if( pathsIterator == null )
081      advance();
082
083    if( current == null || pathsIterator == null )
084      return false;
085
086    boolean hasNextPath = pathsIterator.hasNext();
087
088    if( hasNextPath )
089      return true;
090
091    return subGraphIterator.hasNext();
092    }
093
094  private void advance()
095    {
096    if( current == null )
097      {
098      if( !subGraphIterator.hasNext() )
099        return;
100
101      current = subGraphIterator.next();
102      pathsIterator = null;
103      }
104
105    if( pathsIterator == null )
106      {
107      Set<FlowElement> sources = findSources( current, FlowElement.class );
108      Set<FlowElement> sinks = findSinks( current, FlowElement.class );
109
110      if( sources.size() > 1 || sinks.size() > 1 )
111        throw new IllegalArgumentException( "only supports single source and single sink graphs" );
112
113      FlowElement source = getFirst( sources );
114      FlowElement sink = getFirst( sinks );
115
116      pairs.add( new Pair<>( source, sink ) );
117
118      List<GraphPath<FlowElement, Scope>> paths = getAllShortestPathsBetween( current, source, sink );
119
120      if( longestFirst )
121        Collections.reverse( paths ); // break off longest paths into own partitions
122
123      pathsIterator = paths.iterator();
124      }
125    }
126
127  @Override
128  public ElementGraph next()
129    {
130    if( !pathsIterator.hasNext() )
131      {
132      current = null;
133      pathsIterator = null;
134
135      advance();
136
137      return next();
138      }
139
140    GraphPath<FlowElement, Scope> path = pathsIterator.next();
141    List<FlowElement> vertexList = getPathVertexList( path );
142    List<Scope> edgeList = path.getEdgeList();
143
144    return new ElementSubGraph( current, vertexList, edgeList );
145    }
146
147  @Override
148  public void remove()
149    {
150    subGraphIterator.remove();
151    }
152  }