001/*
002 * Copyright (c) 2007-2015 Concurrent, Inc. All Rights Reserved.
003 *
004 * Project and contact information: http://www.cascading.org/
005 *
006 * This file is part of the Cascading project.
007 *
008 * Licensed under the Apache License, Version 2.0 (the "License");
009 * you may not use this file except in compliance with the License.
010 * You may obtain a copy of the License at
011 *
012 *     http://www.apache.org/licenses/LICENSE-2.0
013 *
014 * Unless required by applicable law or agreed to in writing, software
015 * distributed under the License is distributed on an "AS IS" BASIS,
016 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
017 * See the License for the specific language governing permissions and
018 * limitations under the License.
019 */
020
021package cascading.flow.hadoop.planner.rule.scopeexpression;
022
023import java.net.URI;
024
025import cascading.flow.FlowElement;
026import cascading.flow.hadoop.planner.HadoopPlanner;
027import cascading.flow.planner.PlannerContext;
028import cascading.flow.planner.Scope;
029import cascading.flow.planner.graph.ElementGraph;
030import cascading.flow.planner.iso.expression.ScopeExpression;
031import cascading.tap.hadoop.Hfs;
032
033/**
034 *
035 */
036public class EquivalentTapsScopeExpression extends ScopeExpression
037  {
038  @Override
039  public boolean applies( PlannerContext plannerContext, ElementGraph elementGraph, Scope scope )
040    {
041    FlowElement edgeSource = elementGraph.getEdgeSource( scope );
042    FlowElement edgeTarget = elementGraph.getEdgeTarget( scope );
043
044    if( !( edgeSource instanceof Hfs ) || !( edgeTarget instanceof Hfs ) )
045      throw new IllegalStateException( "non Hfs Taps matched" );
046
047    Hfs predecessor = (Hfs) edgeSource;
048    Hfs successor = (Hfs) edgeTarget;
049
050    // does this scheme source what it sinks
051    if( !successor.getScheme().isSymmetrical() )
052      return false;
053
054    HadoopPlanner flowPlanner = (HadoopPlanner) plannerContext.getFlowPlanner();
055
056    URI tempURIScheme = flowPlanner.getDefaultURIScheme( predecessor ); // temp uses default fs
057    URI successorURIScheme = flowPlanner.getURIScheme( successor );
058
059    if( !tempURIScheme.equals( successorURIScheme ) )
060      return false;
061
062    // safe, both are symmetrical
063    // should be called after fields are resolved
064    if( !predecessor.getSourceFields().equals( successor.getSourceFields() ) )
065      return true;
066
067    return true;
068    }
069  }