001    /*
002     * Copyright (c) 2007-2014 Concurrent, Inc. All Rights Reserved.
003     *
004     * Project and contact information: http://www.cascading.org/
005     *
006     * This file is part of the Cascading project.
007     *
008     * Licensed under the Apache License, Version 2.0 (the "License");
009     * you may not use this file except in compliance with the License.
010     * You may obtain a copy of the License at
011     *
012     *     http://www.apache.org/licenses/LICENSE-2.0
013     *
014     * Unless required by applicable law or agreed to in writing, software
015     * distributed under the License is distributed on an "AS IS" BASIS,
016     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
017     * See the License for the specific language governing permissions and
018     * limitations under the License.
019     */
020    
021    package cascading.flow.planner;
022    
023    import java.io.FileWriter;
024    import java.io.IOException;
025    import java.io.Writer;
026    import java.util.Comparator;
027    import java.util.List;
028    import java.util.Map;
029    import java.util.PriorityQueue;
030    import java.util.Set;
031    
032    import cascading.flow.FlowElement;
033    import cascading.flow.FlowStep;
034    import cascading.pipe.Group;
035    import cascading.tap.Tap;
036    import cascading.util.Util;
037    import org.jgrapht.GraphPath;
038    import org.jgrapht.Graphs;
039    import org.jgrapht.ext.IntegerNameProvider;
040    import org.jgrapht.ext.VertexNameProvider;
041    import org.jgrapht.graph.SimpleDirectedGraph;
042    import org.jgrapht.traverse.TopologicalOrderIterator;
043    import org.slf4j.Logger;
044    import org.slf4j.LoggerFactory;
045    
046    /** Class StepGraph is an internal representation of {@link FlowStep} instances. */
047    public abstract class FlowStepGraph<Config> extends SimpleDirectedGraph<FlowStep<Config>, Integer>
048      {
049      /** Field LOG */
050      private static final Logger LOG = LoggerFactory.getLogger( FlowStepGraph.class );
051    
052      /** Constructor StepGraph creates a new StepGraph instance. */
053      public FlowStepGraph()
054        {
055        super( Integer.class );
056        }
057    
058      /**
059       * Constructor StepGraph creates a new StepGraph instance.
060       *
061       * @param elementGraph of type ElementGraph
062       */
063      public FlowStepGraph( String flowName, ElementGraph elementGraph )
064        {
065        this();
066    
067        makeStepGraph( flowName, elementGraph );
068        }
069    
070      /**
071       * Method getCreateFlowStep ...
072       *
073       * @param steps   of type Map<String, FlowStep>
074       * @param sink    of type String
075       * @param numJobs of type int
076       * @return FlowStep
077       */
078      protected FlowStep<Config> getCreateFlowStep( Map<Tap, FlowStep<Config>> steps, Tap sink, int numJobs )
079        {
080        if( steps.containsKey( sink ) )
081          return steps.get( sink );
082    
083        LOG.debug( "creating step: {}", sink );
084    
085        int stepNum = steps.size() + 1;
086        String stepName = makeStepName( sink, numJobs, stepNum );
087        FlowStep<Config> step = createFlowStep( stepName, stepNum );
088    
089        steps.put( sink, step );
090    
091        return step;
092        }
093    
094      protected abstract FlowStep<Config> createFlowStep( String stepName, int stepNum );
095    
096      private String makeStepName( Tap sink, int numJobs, int stepNum )
097        {
098        if( sink.isTemporary() )
099          return String.format( "(%d/%d)", stepNum, numJobs );
100    
101        String identifier = sink.getIdentifier();
102    
103        if( identifier.length() > 25 )
104          identifier = String.format( "...%25s", identifier.substring( identifier.length() - 25 ) );
105    
106        return String.format( "(%d/%d) %s", stepNum, numJobs, identifier );
107        }
108    
109      protected abstract void makeStepGraph( String flowName, ElementGraph elementGraph );
110    
111      protected boolean pathContainsTap( GraphPath<FlowElement, Scope> path )
112        {
113        List<FlowElement> flowElements = Graphs.getPathVertexList( path );
114    
115        // first and last are taps, if we find more than 2, return false
116        int count = 0;
117    
118        for( FlowElement flowElement : flowElements )
119          {
120          if( flowElement instanceof Tap )
121            count++;
122          }
123    
124        return count > 2;
125        }
126    
127      public TopologicalOrderIterator<FlowStep<Config>, Integer> getTopologicalIterator()
128        {
129        return new TopologicalOrderIterator<FlowStep<Config>, Integer>( this, new PriorityQueue<FlowStep<Config>>( 10, new Comparator<FlowStep<Config>>()
130        {
131        @Override
132        public int compare( FlowStep<Config> lhs, FlowStep<Config> rhs )
133          {
134          return Integer.valueOf( lhs.getSubmitPriority() ).compareTo( rhs.getSubmitPriority() );
135          }
136        } ) );
137        }
138    
139      /**
140       * Method writeDOT writes this element graph to a DOT file for easy visualization and debugging.
141       *
142       * @param filename of type String
143       */
144      public void writeDOT( String filename )
145        {
146        printElementGraph( filename );
147        }
148    
149      protected void printElementGraph( String filename )
150        {
151        try
152          {
153          Writer writer = new FileWriter( filename );
154    
155          Util.writeDOT( writer, this, new IntegerNameProvider<BaseFlowStep>(), new VertexNameProvider<FlowStep>()
156          {
157          public String getVertexName( FlowStep flowStep )
158            {
159            String sourceName = "";
160    
161            for( Object object : flowStep.getSources() )
162              {
163              Tap source = (Tap) object;
164    
165              if( source.isTemporary() )
166                continue;
167    
168              sourceName += "[" + source.getIdentifier() + "]";
169              }
170    
171            String name = "[" + flowStep.getName() + "]";
172    
173            if( sourceName.length() != 0 )
174              name += "\\nsrc:" + sourceName;
175    
176    
177            List<Group> groups = flowStep.getGroups();
178    
179            for( Group group : groups )
180              {
181              String groupName = group.getName();
182    
183              if( groupName.length() != 0 )
184                name += "\\ngrp:" + groupName;
185              }
186    
187            Set<Tap> sinks = flowStep.getSinks();
188    
189            for( Tap sink : sinks )
190              {
191              String sinkName = sink.isTemporary() ? "" : "[" + sink.getIdentifier() + "]";
192              if( sinkName.length() != 0 )
193                name += "\\nsnk:" + sinkName;
194              }
195    
196            return name.replaceAll( "\"", "\'" );
197            }
198          }, null );
199    
200          writer.close();
201          }
202        catch( IOException exception )
203          {
204          LOG.error( "failed printing graph to: {}, with exception: {}", filename, exception );
205          }
206        }
207    
208      }