001/*
002 * Copyright (c) 2007-2016 Concurrent, Inc. All Rights Reserved.
003 *
004 * Project and contact information: http://www.cascading.org/
005 *
006 * This file is part of the Cascading project.
007 *
008 * Licensed under the Apache License, Version 2.0 (the "License");
009 * you may not use this file except in compliance with the License.
010 * You may obtain a copy of the License at
011 *
012 *     http://www.apache.org/licenses/LICENSE-2.0
013 *
014 * Unless required by applicable law or agreed to in writing, software
015 * distributed under the License is distributed on an "AS IS" BASIS,
016 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
017 * See the License for the specific language governing permissions and
018 * limitations under the License.
019 */
020
021package cascading.cascade.planner;
022
023import java.util.Iterator;
024import java.util.LinkedList;
025import java.util.ListIterator;
026
027import cascading.cascade.CascadeException;
028import cascading.flow.BaseFlow;
029import cascading.flow.Flow;
030import cascading.tap.CompositeTap;
031import cascading.tap.Tap;
032import org.jgrapht.graph.SimpleDirectedGraph;
033
034/**
035 *
036 */
037public abstract class TopologyGraph<Vertex> extends SimpleDirectedGraph<Vertex, BaseFlow.FlowHolder>
038  {
039  public TopologyGraph( Flow... flows )
040    {
041    super( BaseFlow.FlowHolder.class );
042
043    makeGraph( flows );
044    }
045
046  private void makeGraph( Flow[] flows )
047    {
048    for( Flow flow : flows )
049      {
050      LinkedList<Tap> sources = new LinkedList<Tap>( flow.getSourcesCollection() );
051      LinkedList<Tap> sinks = new LinkedList<Tap>( flow.getSinksCollection() );
052
053      sinks.addAll( flow.getCheckpointsCollection() );
054
055      unwrapCompositeTaps( sources );
056      unwrapCompositeTaps( sinks );
057
058      for( Tap source : sources )
059        addVertex( getVertex( flow, source ) );
060
061      for( Tap sink : sinks )
062        addVertex( getVertex( flow, sink ) );
063
064      for( Tap source : sources )
065        {
066        for( Tap sink : sinks )
067          addEdgeFor( flow, source, sink );
068        }
069      }
070    }
071
072  private void addEdgeFor( Flow flow, Tap source, Tap sink )
073    {
074    try
075      {
076      addEdge( getVertex( flow, source ), getVertex( flow, sink ), ( (BaseFlow) flow ).getHolder() );
077      }
078    catch( IllegalArgumentException exception )
079      {
080      throw new CascadeException( "no loops allowed in cascade, flow: " + flow.getName() + ", source: " + source + ", sink: " + sink );
081      }
082    }
083
084  abstract protected Vertex getVertex( Flow flow, Tap tap );
085
086  private void unwrapCompositeTaps( LinkedList<Tap> taps )
087    {
088    ListIterator<Tap> iterator = taps.listIterator();
089
090    while( iterator.hasNext() )
091      {
092      Tap tap = iterator.next();
093
094      if( tap instanceof CompositeTap )
095        {
096        iterator.remove();
097
098        Iterator<Tap> childTaps = ( (CompositeTap) tap ).getChildTaps();
099
100        while( childTaps.hasNext() )
101          {
102          iterator.add( childTaps.next() );
103          iterator.previous(); // force cursor backward
104          }
105        }
106      }
107    }
108  }