001/*
002 * Copyright (c) 2007-2017 Xplenty, Inc. All Rights Reserved.
003 *
004 * Project and contact information: http://www.cascading.org/
005 *
006 * This file is part of the Cascading project.
007 *
008 * Licensed under the Apache License, Version 2.0 (the "License");
009 * you may not use this file except in compliance with the License.
010 * You may obtain a copy of the License at
011 *
012 *     http://www.apache.org/licenses/LICENSE-2.0
013 *
014 * Unless required by applicable law or agreed to in writing, software
015 * distributed under the License is distributed on an "AS IS" BASIS,
016 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
017 * See the License for the specific language governing permissions and
018 * limitations under the License.
019 */
020
021package cascading.flow.planner.rule;
022
023import java.util.Iterator;
024import java.util.List;
025import java.util.Set;
026
027import cascading.flow.FlowElement;
028import cascading.flow.planner.Scope;
029import cascading.flow.planner.ScopedElement;
030import cascading.flow.planner.graph.ElementGraphs;
031import cascading.flow.planner.graph.Extent;
032import cascading.flow.planner.graph.FlowElementGraph;
033import org.slf4j.Logger;
034import org.slf4j.LoggerFactory;
035
036/**
037 *
038 */
039public class ScopeResolver
040  {
041  private static final Logger LOG = LoggerFactory.getLogger( ScopeResolver.class );
042
043  public static void resolveFields( FlowElementGraph flowElementGraph )
044    {
045    if( flowElementGraph.isResolved() )
046      throw new IllegalStateException( "element graph already resolved" );
047
048    Iterator<FlowElement> iterator = ElementGraphs.getTopologicalIterator( flowElementGraph );
049
050    while( iterator.hasNext() )
051      resolveFields( flowElementGraph, iterator.next() );
052
053    flowElementGraph.setResolved( true );
054    }
055
056  private static void resolveFields( FlowElementGraph flowElementGraph, FlowElement source )
057    {
058    if( source instanceof Extent )
059      return;
060
061    Set<Scope> incomingScopes = flowElementGraph.incomingEdgesOf( source );
062    Set<Scope> outgoingScopes = flowElementGraph.outgoingEdgesOf( source );
063
064    List<FlowElement> flowElements = flowElementGraph.successorListOf( source );
065
066    if( flowElements.size() == 0 )
067      throw new IllegalStateException( "unable to find next elements in pipeline from: " + source.toString() );
068
069    if( !( source instanceof ScopedElement ) )
070      throw new IllegalStateException( "flow element is not a scoped element: " + source.toString() );
071
072    Scope outgoingScope = ( (ScopedElement) source ).outgoingScopeFor( incomingScopes );
073
074    if( LOG.isDebugEnabled() && outgoingScope != null )
075      {
076      LOG.debug( "for modifier: " + source );
077      if( outgoingScope.getArgumentsSelector() != null )
078        LOG.debug( "setting outgoing arguments: " + outgoingScope.getArgumentsSelector() );
079      if( outgoingScope.getOperationDeclaredFields() != null )
080        LOG.debug( "setting outgoing declared: " + outgoingScope.getOperationDeclaredFields() );
081      if( outgoingScope.getKeySelectors() != null )
082        LOG.debug( "setting outgoing group: " + outgoingScope.getKeySelectors() );
083      if( outgoingScope.getOutValuesSelector() != null )
084        LOG.debug( "setting outgoing values: " + outgoingScope.getOutValuesSelector() );
085      }
086
087    for( Scope scope : outgoingScopes )
088      scope.copyFields( outgoingScope );
089    }
090  }