001/*
002 * Copyright (c) 2007-2016 Concurrent, Inc. All Rights Reserved.
003 *
004 * Project and contact information: http://www.cascading.org/
005 *
006 * This file is part of the Cascading project.
007 *
008 * Licensed under the Apache License, Version 2.0 (the "License");
009 * you may not use this file except in compliance with the License.
010 * You may obtain a copy of the License at
011 *
012 *     http://www.apache.org/licenses/LICENSE-2.0
013 *
014 * Unless required by applicable law or agreed to in writing, software
015 * distributed under the License is distributed on an "AS IS" BASIS,
016 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
017 * See the License for the specific language governing permissions and
018 * limitations under the License.
019 */
020
021package cascading.flow.tez.planner;
022
023import cascading.flow.planner.rule.RuleRegistry;
024import cascading.flow.planner.rule.assertion.BufferAfterEveryAssert;
025import cascading.flow.planner.rule.assertion.EveryAfterBufferAssert;
026import cascading.flow.planner.rule.assertion.LoneGroupAssert;
027import cascading.flow.planner.rule.assertion.MissingGroupAssert;
028import cascading.flow.planner.rule.assertion.SplitBeforeEveryAssert;
029import cascading.flow.planner.rule.partitioner.WholeGraphStepPartitioner;
030import cascading.flow.planner.rule.transformer.ApplyAssertionLevelTransformer;
031import cascading.flow.planner.rule.transformer.ApplyDebugLevelTransformer;
032import cascading.flow.planner.rule.transformer.RemoveNoOpPipeTransformer;
033import cascading.flow.tez.planner.rule.annotator.AccumulatedPostNodeAnnotator;
034import cascading.flow.tez.planner.rule.assertion.DualStreamedAccumulatedMergeNodeAssert;
035import cascading.flow.tez.planner.rule.partitioner.BottomUpBoundariesNodePartitioner;
036import cascading.flow.tez.planner.rule.partitioner.BottomUpJoinedBoundariesNodePartitioner;
037import cascading.flow.tez.planner.rule.partitioner.ConsecutiveGroupOrMergesNodePartitioner;
038import cascading.flow.tez.planner.rule.partitioner.SplitJoinBoundariesNodeRePartitioner;
039import cascading.flow.tez.planner.rule.partitioner.StreamedAccumulatedBoundariesNodeRePartitioner;
040import cascading.flow.tez.planner.rule.partitioner.StreamedOnlySourcesNodeRePartitioner;
041import cascading.flow.tez.planner.rule.partitioner.TopDownSplitBoundariesNodePartitioner;
042import cascading.flow.tez.planner.rule.transformer.BoundaryBalanceCheckpointTransformer;
043import cascading.flow.tez.planner.rule.transformer.BoundaryBalanceGroupBlockingHashJoinTransformer;
044import cascading.flow.tez.planner.rule.transformer.BoundaryBalanceGroupSplitHashJoinTransformer;
045import cascading.flow.tez.planner.rule.transformer.BoundaryBalanceGroupSplitSpliceTransformer;
046import cascading.flow.tez.planner.rule.transformer.BoundaryBalanceGroupSplitTransformer;
047import cascading.flow.tez.planner.rule.transformer.BoundaryBalanceHashJoinSameSourceTransformer;
048import cascading.flow.tez.planner.rule.transformer.BoundaryBalanceHashJoinToHashJoinTransformer;
049import cascading.flow.tez.planner.rule.transformer.BoundaryBalanceJoinSplitTransformer;
050import cascading.flow.tez.planner.rule.transformer.BoundaryBalanceSplitSplitToStreamedHashJoinTransformer;
051import cascading.flow.tez.planner.rule.transformer.BoundaryBalanceSplitToStreamedHashJoinTransformer;
052import cascading.flow.tez.planner.rule.transformer.RemoveMalformedHashJoinNodeTransformer;
053
054/**
055 * The HashJoinHadoop2TezRuleRegistry provides support for assemblies using {@link cascading.pipe.HashJoin} pipes.
056 * <p/>
057 * Detecting and optimizing for HashJoin pipes adds further complexity and time to converge on a valid physical plan.
058 * <p/>
059 * If facing slowdowns, and no HashJoins are used, switch to the
060 * {@link cascading.flow.tez.planner.NoHashJoinHadoop2TezRuleRegistry} via the appropriate
061 * {@link cascading.flow.FlowConnector} constructor.
062 */
063public class HashJoinHadoop2TezRuleRegistry extends RuleRegistry
064  {
065  public HashJoinHadoop2TezRuleRegistry()
066    {
067//    enableDebugLogging();
068
069    // PreBalance
070    addRule( new LoneGroupAssert() );
071    addRule( new MissingGroupAssert() );
072    addRule( new BufferAfterEveryAssert() );
073    addRule( new EveryAfterBufferAssert() );
074    addRule( new SplitBeforeEveryAssert() );
075
076    addRule( new BoundaryBalanceGroupSplitTransformer() );
077    addRule( new BoundaryBalanceGroupSplitSpliceTransformer() ); // prevents AssemblyHelpersPlatformTest#testSameSourceMerge deadlock
078    addRule( new BoundaryBalanceCheckpointTransformer() );
079
080    // hash join
081    addRule( new BoundaryBalanceHashJoinSameSourceTransformer() );
082    addRule( new BoundaryBalanceSplitToStreamedHashJoinTransformer() ); // testGroupBySplitGroupByJoin
083    addRule( new BoundaryBalanceSplitSplitToStreamedHashJoinTransformer() ); // testGroupBySplitSplitGroupByJoin
084    addRule( new BoundaryBalanceHashJoinToHashJoinTransformer() ); // force HJ into unique nodes
085    addRule( new BoundaryBalanceGroupBlockingHashJoinTransformer() ); // joinAfterEvery
086    addRule( new BoundaryBalanceGroupSplitHashJoinTransformer() ); // groupBySplitJoins
087    addRule( new BoundaryBalanceJoinSplitTransformer() ); // prevents duplication of HashJoin, testJoinSplit
088
089    // PreResolve
090    addRule( new RemoveNoOpPipeTransformer() );
091    addRule( new ApplyAssertionLevelTransformer() );
092    addRule( new ApplyDebugLevelTransformer() );
093
094    // PostResolve
095
096    // PartitionSteps
097    addRule( new WholeGraphStepPartitioner() );
098
099    // PostSteps
100
101    // PartitionNodes
102
103    // no match with HashJoin inclusion
104    addRule( new TopDownSplitBoundariesNodePartitioner() ); // split from source to multiple sinks
105    addRule( new ConsecutiveGroupOrMergesNodePartitioner() );
106    addRule( new BottomUpBoundariesNodePartitioner() ); // streamed paths re-partitioned w/ StreamedOnly
107    addRule( new SplitJoinBoundariesNodeRePartitioner() ); // testCoGroupSelf - compensates for tez-1190
108
109    // hash join inclusion
110    addRule( new BottomUpJoinedBoundariesNodePartitioner() ); // will capture multiple inputs into sink for use with HashJoins
111    addRule( new StreamedAccumulatedBoundariesNodeRePartitioner() ); // joinsIntoCoGroupLhs & groupBySplitJoins
112    addRule( new StreamedOnlySourcesNodeRePartitioner() );
113
114    // PostNodes
115    addRule( new RemoveMalformedHashJoinNodeTransformer() ); // joinsIntoCoGroupLhs
116    addRule( new AccumulatedPostNodeAnnotator() ); // allows accumulated boundaries to be identified
117
118    addRule( new DualStreamedAccumulatedMergeNodeAssert() );
119    }
120  }