001/* 002 * Copyright (c) 2016 Chris K Wensel <chris@wensel.net>. All Rights Reserved. 003 * Copyright (c) 2007-2017 Xplenty, Inc. All Rights Reserved. 004 * 005 * Project and contact information: http://www.cascading.org/ 006 * 007 * This file is part of the Cascading project. 008 * 009 * Licensed under the Apache License, Version 2.0 (the "License"); 010 * you may not use this file except in compliance with the License. 011 * You may obtain a copy of the License at 012 * 013 * http://www.apache.org/licenses/LICENSE-2.0 014 * 015 * Unless required by applicable law or agreed to in writing, software 016 * distributed under the License is distributed on an "AS IS" BASIS, 017 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 018 * See the License for the specific language governing permissions and 019 * limitations under the License. 020 */ 021 022package cascading.flow.tez.planner; 023 024import cascading.flow.hadoop.planner.rule.transformer.ReplaceAccumulateTapWithDistCacheTransformer; 025import cascading.flow.planner.rule.RuleRegistry; 026import cascading.flow.planner.rule.annotator.LogicalMergeAnnotator; 027import cascading.flow.planner.rule.assertion.BufferAfterEveryAssert; 028import cascading.flow.planner.rule.assertion.EveryAfterBufferAssert; 029import cascading.flow.planner.rule.assertion.LoneGroupAssert; 030import cascading.flow.planner.rule.assertion.MissingGroupAssert; 031import cascading.flow.planner.rule.assertion.SplitBeforeEveryAssert; 032import cascading.flow.planner.rule.partitioner.WholeGraphStepPartitioner; 033import cascading.flow.planner.rule.transformer.ApplyAssertionLevelTransformer; 034import cascading.flow.planner.rule.transformer.ApplyDebugLevelTransformer; 035import cascading.flow.planner.rule.transformer.RemoveNoOpPipeTransformer; 036import cascading.flow.tez.planner.rule.annotator.AccumulatedPostNodeAnnotator; 037import cascading.flow.tez.planner.rule.assertion.DualStreamedAccumulatedMergeNodeAssert; 038import cascading.flow.tez.planner.rule.partitioner.BottomUpBoundariesNodePartitioner; 039import cascading.flow.tez.planner.rule.partitioner.BottomUpJoinedBoundariesNodePartitioner; 040import cascading.flow.tez.planner.rule.partitioner.BottomUpJoinedBoundariesTriangleNodePartitioner; 041import cascading.flow.tez.planner.rule.partitioner.ConsecutiveGroupOrMergesNodePartitioner; 042import cascading.flow.tez.planner.rule.partitioner.SplitJoinBoundariesNodeRePartitioner; 043import cascading.flow.tez.planner.rule.partitioner.StreamedAccumulatedBoundariesNodeRePartitioner; 044import cascading.flow.tez.planner.rule.partitioner.StreamedOnlySourcesNodeRePartitioner; 045import cascading.flow.tez.planner.rule.partitioner.TopDownSplitBoundariesNodePartitioner; 046import cascading.flow.tez.planner.rule.transformer.BoundaryBalanceCheckpointTransformer; 047import cascading.flow.tez.planner.rule.transformer.BoundaryBalanceGroupBlockingHashJoinTransformer; 048import cascading.flow.tez.planner.rule.transformer.BoundaryBalanceGroupSplitHashJoinTransformer; 049import cascading.flow.tez.planner.rule.transformer.BoundaryBalanceGroupSplitSpliceTransformer; 050import cascading.flow.tez.planner.rule.transformer.BoundaryBalanceGroupSplitTransformer; 051import cascading.flow.tez.planner.rule.transformer.BoundaryBalanceHashJoinSameSourceTransformer; 052import cascading.flow.tez.planner.rule.transformer.BoundaryBalanceHashJoinToHashJoinTransformer; 053import cascading.flow.tez.planner.rule.transformer.BoundaryBalanceJoinSplitTransformer; 054import cascading.flow.tez.planner.rule.transformer.BoundaryBalanceSplitSplitToStreamedHashJoinTransformer; 055import cascading.flow.tez.planner.rule.transformer.BoundaryBalanceSplitToStreamedHashJoinTransformer; 056import cascading.flow.tez.planner.rule.transformer.RemoveMalformedHashJoinNodeTransformer; 057 058/** 059 * The HashJoinHadoop2TezRuleRegistry provides support for assemblies using {@link cascading.pipe.HashJoin} pipes. 060 * <p/> 061 * Detecting and optimizing for HashJoin pipes adds further complexity and time to converge on a valid physical plan. 062 * <p/> 063 * If facing slowdowns, and no HashJoins are used, switch to the 064 * {@link cascading.flow.tez.planner.NoHashJoinHadoop2TezRuleRegistry} via the appropriate 065 * {@link cascading.flow.FlowConnector} constructor. 066 */ 067public class HashJoinHadoop2TezRuleRegistry extends RuleRegistry 068 { 069 public HashJoinHadoop2TezRuleRegistry() 070 { 071// enableDebugLogging(); 072 073 // PreBalance 074 addRule( new LoneGroupAssert() ); 075 addRule( new MissingGroupAssert() ); 076 addRule( new BufferAfterEveryAssert() ); 077 addRule( new EveryAfterBufferAssert() ); 078 addRule( new SplitBeforeEveryAssert() ); 079 080 addRule( new BoundaryBalanceGroupSplitTransformer() ); 081 addRule( new BoundaryBalanceGroupSplitSpliceTransformer() ); // prevents AssemblyHelpersPlatformTest#testSameSourceMerge deadlock 082 addRule( new BoundaryBalanceCheckpointTransformer() ); 083 084 // hash join 085 addRule( new BoundaryBalanceHashJoinSameSourceTransformer() ); 086 addRule( new BoundaryBalanceSplitToStreamedHashJoinTransformer() ); // testGroupBySplitGroupByJoin 087 addRule( new BoundaryBalanceSplitSplitToStreamedHashJoinTransformer() ); // testGroupBySplitSplitGroupByJoin 088 addRule( new BoundaryBalanceHashJoinToHashJoinTransformer() ); // force HJ into unique nodes 089 addRule( new BoundaryBalanceGroupBlockingHashJoinTransformer() ); // joinAfterEvery 090 addRule( new BoundaryBalanceGroupSplitHashJoinTransformer() ); // groupBySplitJoins 091 addRule( new BoundaryBalanceJoinSplitTransformer() ); // prevents duplication of HashJoin, testJoinSplit 092 093 // PreResolve 094 addRule( new RemoveNoOpPipeTransformer() ); 095 addRule( new ApplyAssertionLevelTransformer() ); 096 addRule( new ApplyDebugLevelTransformer() ); 097 addRule( new LogicalMergeAnnotator() ); // MergePipesPlatformTest#testSameSourceMergeHashJoin 098 addRule( new ReplaceAccumulateTapWithDistCacheTransformer() ); 099 100 // PostResolve 101 102 // PartitionSteps 103 addRule( new WholeGraphStepPartitioner() ); 104 105 // PostSteps 106 107 // PartitionNodes 108 109 // no match with HashJoin inclusion 110 addRule( new TopDownSplitBoundariesNodePartitioner() ); // split from source to multiple sinks 111 addRule( new ConsecutiveGroupOrMergesNodePartitioner() ); 112 addRule( new BottomUpBoundariesNodePartitioner() ); // streamed paths re-partitioned w/ StreamedOnly 113 addRule( new SplitJoinBoundariesNodeRePartitioner() ); // testCoGroupSelf - compensates for tez-1190 114 115 // hash join inclusion 116 addRule( new BottomUpJoinedBoundariesNodePartitioner() ); // will capture multiple inputs into sink for use with HashJoins 117 addRule( new BottomUpJoinedBoundariesTriangleNodePartitioner() ); // will capture multiple inputs into sink for use with HashJoins 118 addRule( new StreamedAccumulatedBoundariesNodeRePartitioner() ); // joinsIntoCoGroupLhs & groupBySplitJoins 119 addRule( new StreamedOnlySourcesNodeRePartitioner() ); 120 121 // PostNodes 122 addRule( new RemoveMalformedHashJoinNodeTransformer() ); // joinsIntoCoGroupLhs 123 addRule( new AccumulatedPostNodeAnnotator() ); // allows accumulated boundaries to be identified 124 125 addRule( new DualStreamedAccumulatedMergeNodeAssert() ); 126 } 127 }