001/*
002 * Copyright (c) 2007-2016 Concurrent, Inc. All Rights Reserved.
003 *
004 * Project and contact information: http://www.cascading.org/
005 *
006 * This file is part of the Cascading project.
007 *
008 * Licensed under the Apache License, Version 2.0 (the "License");
009 * you may not use this file except in compliance with the License.
010 * You may obtain a copy of the License at
011 *
012 *     http://www.apache.org/licenses/LICENSE-2.0
013 *
014 * Unless required by applicable law or agreed to in writing, software
015 * distributed under the License is distributed on an "AS IS" BASIS,
016 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
017 * See the License for the specific language governing permissions and
018 * limitations under the License.
019 */
020
021package cascading.flow;
022
023import java.util.TreeMap;
024
025import cascading.pipe.Pipe;
026import cascading.pipe.SubAssembly;
027import cascading.tap.Tap;
028import cascading.util.Util;
029
030/**
031 *
032 */
033public class FlowElements
034  {
035  public static String id( FlowElement flowElement )
036    {
037    if( flowElement instanceof Pipe )
038      return Pipe.id( (Pipe) flowElement );
039
040    if( flowElement instanceof Tap )
041      return Tap.id( (Tap) flowElement );
042
043    String id = Util.returnInstanceFieldIfExistsSafe( flowElement, "id" );
044
045    if( id != null )
046      return id;
047
048    throw new IllegalArgumentException( "id not supported for: " + flowElement.getClass().getCanonicalName() );
049    }
050
051  public static int isPrevious( Pipe pipe, Pipe previous )
052    {
053    if( pipe == previous )
054      return 0;
055
056    if( pipe instanceof SubAssembly )
057      {
058      Pipe[] unwind = SubAssembly.unwind( pipe );
059
060      for( Pipe unwound : unwind )
061        {
062        int result = collectPipes( unwound, 0, previous );
063
064        if( result != -1 )
065          return result;
066        }
067
068      return -1;
069      }
070
071    return collectPipes( pipe, 0, previous );
072    }
073
074  private static int collectPipes( Pipe pipe, int depth, Pipe... allPrevious )
075    {
076    depth++;
077
078    for( Pipe previous : allPrevious )
079      {
080      if( pipe == previous )
081        return depth;
082
083      int result;
084      if( previous instanceof SubAssembly )
085        result = collectPipes( pipe, depth, SubAssembly.unwind( previous ) );
086      else
087        result = collectPipes( pipe, depth, previous.getPrevious() );
088
089      if( result != -1 )
090        return result;
091      }
092
093    return -1;
094    }
095
096  public static Integer findOrdinal( Pipe pipe, Pipe previous )
097    {
098    Pipe[] previousPipes = pipe.getPrevious();
099
100    TreeMap<Integer, Integer> sorted = new TreeMap<>();
101
102    for( int i = 0; i < previousPipes.length; i++ )
103      {
104      int result = isPrevious( previousPipes[ i ], (Pipe) previous );
105
106      if( result == -1 )
107        continue;
108
109      sorted.put( result, i );
110      }
111
112    return sorted.firstEntry().getValue();
113    }
114  }