001 /* 002 * Copyright (c) 2007-2014 Concurrent, Inc. All Rights Reserved. 003 * 004 * Project and contact information: http://www.cascading.org/ 005 * 006 * This file is part of the Cascading project. 007 * 008 * Licensed under the Apache License, Version 2.0 (the "License"); 009 * you may not use this file except in compliance with the License. 010 * You may obtain a copy of the License at 011 * 012 * http://www.apache.org/licenses/LICENSE-2.0 013 * 014 * Unless required by applicable law or agreed to in writing, software 015 * distributed under the License is distributed on an "AS IS" BASIS, 016 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 017 * See the License for the specific language governing permissions and 018 * limitations under the License. 019 */ 020 021 package cascading.cascade.planner; 022 023 import java.util.Comparator; 024 import java.util.HashSet; 025 import java.util.List; 026 import java.util.PriorityQueue; 027 import java.util.Set; 028 029 import cascading.cascade.CascadeException; 030 import cascading.flow.BaseFlow; 031 import cascading.flow.Flow; 032 import org.jgrapht.Graphs; 033 import org.jgrapht.graph.SimpleDirectedGraph; 034 import org.jgrapht.traverse.TopologicalOrderIterator; 035 import org.slf4j.Logger; 036 import org.slf4j.LoggerFactory; 037 038 /** 039 * 040 */ 041 public class FlowGraph extends SimpleDirectedGraph<Flow, Integer> 042 { 043 private static final Logger LOG = LoggerFactory.getLogger( FlowGraph.class ); 044 045 public FlowGraph( IdentifierGraph identifierGraph ) 046 { 047 super( Integer.class ); 048 049 makeGraph( identifierGraph ); 050 051 verifyNoCycles(); 052 } 053 054 public TopologicalOrderIterator<Flow, Integer> getTopologicalIterator() 055 { 056 return new TopologicalOrderIterator<Flow, Integer>( this, new PriorityQueue<Flow>( 10, new Comparator<Flow>() 057 { 058 @Override 059 public int compare( Flow lhs, Flow rhs ) 060 { 061 return Integer.valueOf( lhs.getSubmitPriority() ).compareTo( rhs.getSubmitPriority() ); 062 } 063 } ) ); 064 } 065 066 private void verifyNoCycles() 067 { 068 Set<Flow> flows = new HashSet<Flow>(); 069 070 TopologicalOrderIterator<Flow, Integer> topoIterator = new TopologicalOrderIterator<Flow, Integer>( this ); 071 072 while( topoIterator.hasNext() ) 073 flows.add( topoIterator.next() ); 074 075 if( flows.size() != vertexSet().size() ) 076 throw new CascadeException( "there are likely cycles in the set of given flows, topological iterator cannot traverse flows with cycles" ); 077 } 078 079 private void makeGraph( IdentifierGraph identifierGraph ) 080 { 081 Set<String> identifiers = identifierGraph.vertexSet(); 082 083 int count = 0; 084 085 for( String source : identifiers ) 086 { 087 if( LOG.isDebugEnabled() ) 088 LOG.debug( "handling flow source: {}", source ); 089 090 List<String> sinks = Graphs.successorListOf( identifierGraph, source ); 091 092 for( String sink : sinks ) 093 { 094 if( LOG.isDebugEnabled() ) 095 LOG.debug( "handling flow path: {} -> {}", source, sink ); 096 097 Flow flow = identifierGraph.getEdge( source, sink ).flow; 098 099 addVertex( flow ); 100 101 Set<BaseFlow.FlowHolder> previous = identifierGraph.incomingEdgesOf( source ); 102 103 for( BaseFlow.FlowHolder previousFlow : previous ) 104 { 105 addVertex( previousFlow.flow ); 106 107 if( getEdge( previousFlow.flow, flow ) != null ) 108 continue; 109 110 if( !addEdge( previousFlow.flow, flow, count++ ) ) 111 throw new CascadeException( "unable to add path between: " + previousFlow.flow.getName() + " and: " + flow.getName() ); 112 } 113 } 114 } 115 } 116 }