001/* 002 * Copyright (c) 2007-2016 Concurrent, Inc. All Rights Reserved. 003 * 004 * Project and contact information: http://www.cascading.org/ 005 * 006 * This file is part of the Cascading project. 007 * 008 * Licensed under the Apache License, Version 2.0 (the "License"); 009 * you may not use this file except in compliance with the License. 010 * You may obtain a copy of the License at 011 * 012 * http://www.apache.org/licenses/LICENSE-2.0 013 * 014 * Unless required by applicable law or agreed to in writing, software 015 * distributed under the License is distributed on an "AS IS" BASIS, 016 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 017 * See the License for the specific language governing permissions and 018 * limitations under the License. 019 */ 020 021package cascading.flow.planner.rule; 022 023import java.util.Iterator; 024import java.util.List; 025import java.util.Set; 026 027import cascading.flow.FlowElement; 028import cascading.flow.planner.Scope; 029import cascading.flow.planner.ScopedElement; 030import cascading.flow.planner.graph.ElementGraphs; 031import cascading.flow.planner.graph.Extent; 032import cascading.flow.planner.graph.FlowElementGraph; 033import org.slf4j.Logger; 034import org.slf4j.LoggerFactory; 035 036/** 037 * 038 */ 039public class ScopeResolver 040 { 041 private static final Logger LOG = LoggerFactory.getLogger( ScopeResolver.class ); 042 043 public static void resolveFields( FlowElementGraph flowElementGraph ) 044 { 045 if( flowElementGraph.isResolved() ) 046 throw new IllegalStateException( "element graph already resolved" ); 047 048 Iterator<FlowElement> iterator = ElementGraphs.getTopologicalIterator( flowElementGraph ); 049 050 while( iterator.hasNext() ) 051 resolveFields( flowElementGraph, iterator.next() ); 052 053 flowElementGraph.setResolved( true ); 054 } 055 056 private static void resolveFields( FlowElementGraph flowElementGraph, FlowElement source ) 057 { 058 if( source instanceof Extent ) 059 return; 060 061 Set<Scope> incomingScopes = flowElementGraph.incomingEdgesOf( source ); 062 Set<Scope> outgoingScopes = flowElementGraph.outgoingEdgesOf( source ); 063 064 List<FlowElement> flowElements = flowElementGraph.successorListOf( source ); 065 066 if( flowElements.size() == 0 ) 067 throw new IllegalStateException( "unable to find next elements in pipeline from: " + source.toString() ); 068 069 if( !( source instanceof ScopedElement ) ) 070 throw new IllegalStateException( "flow element is not a scoped element: " + source.toString() ); 071 072 Scope outgoingScope = ( (ScopedElement) source ).outgoingScopeFor( incomingScopes ); 073 074 if( LOG.isDebugEnabled() && outgoingScope != null ) 075 { 076 LOG.debug( "for modifier: " + source ); 077 if( outgoingScope.getArgumentsSelector() != null ) 078 LOG.debug( "setting outgoing arguments: " + outgoingScope.getArgumentsSelector() ); 079 if( outgoingScope.getOperationDeclaredFields() != null ) 080 LOG.debug( "setting outgoing declared: " + outgoingScope.getOperationDeclaredFields() ); 081 if( outgoingScope.getKeySelectors() != null ) 082 LOG.debug( "setting outgoing group: " + outgoingScope.getKeySelectors() ); 083 if( outgoingScope.getOutValuesSelector() != null ) 084 LOG.debug( "setting outgoing values: " + outgoingScope.getOutValuesSelector() ); 085 } 086 087 for( Scope scope : outgoingScopes ) 088 scope.copyFields( outgoingScope ); 089 } 090 }