001/* 002 * Copyright (c) 2007-2016 Concurrent, Inc. All Rights Reserved. 003 * 004 * Project and contact information: http://www.cascading.org/ 005 * 006 * This file is part of the Cascading project. 007 * 008 * Licensed under the Apache License, Version 2.0 (the "License"); 009 * you may not use this file except in compliance with the License. 010 * You may obtain a copy of the License at 011 * 012 * http://www.apache.org/licenses/LICENSE-2.0 013 * 014 * Unless required by applicable law or agreed to in writing, software 015 * distributed under the License is distributed on an "AS IS" BASIS, 016 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 017 * See the License for the specific language governing permissions and 018 * limitations under the License. 019 */ 020 021package cascading.flow.tez.planner; 022 023import cascading.flow.planner.rule.RuleRegistry; 024import cascading.flow.planner.rule.assertion.BufferAfterEveryAssert; 025import cascading.flow.planner.rule.assertion.EveryAfterBufferAssert; 026import cascading.flow.planner.rule.assertion.LoneGroupAssert; 027import cascading.flow.planner.rule.assertion.MissingGroupAssert; 028import cascading.flow.planner.rule.assertion.SplitBeforeEveryAssert; 029import cascading.flow.planner.rule.partitioner.WholeGraphStepPartitioner; 030import cascading.flow.planner.rule.transformer.ApplyAssertionLevelTransformer; 031import cascading.flow.planner.rule.transformer.ApplyDebugLevelTransformer; 032import cascading.flow.planner.rule.transformer.RemoveNoOpPipeTransformer; 033import cascading.flow.tez.planner.rule.annotator.AccumulatedPostNodeAnnotator; 034import cascading.flow.tez.planner.rule.assertion.DualStreamedAccumulatedMergeNodeAssert; 035import cascading.flow.tez.planner.rule.partitioner.BottomUpBoundariesNodePartitioner; 036import cascading.flow.tez.planner.rule.partitioner.BottomUpJoinedBoundariesNodePartitioner; 037import cascading.flow.tez.planner.rule.partitioner.ConsecutiveGroupOrMergesNodePartitioner; 038import cascading.flow.tez.planner.rule.partitioner.SplitJoinBoundariesNodeRePartitioner; 039import cascading.flow.tez.planner.rule.partitioner.StreamedAccumulatedBoundariesNodeRePartitioner; 040import cascading.flow.tez.planner.rule.partitioner.StreamedOnlySourcesNodeRePartitioner; 041import cascading.flow.tez.planner.rule.partitioner.TopDownSplitBoundariesNodePartitioner; 042import cascading.flow.tez.planner.rule.transformer.BoundaryBalanceCheckpointTransformer; 043import cascading.flow.tez.planner.rule.transformer.BoundaryBalanceGroupBlockingHashJoinTransformer; 044import cascading.flow.tez.planner.rule.transformer.BoundaryBalanceGroupSplitHashJoinTransformer; 045import cascading.flow.tez.planner.rule.transformer.BoundaryBalanceGroupSplitSpliceTransformer; 046import cascading.flow.tez.planner.rule.transformer.BoundaryBalanceGroupSplitTransformer; 047import cascading.flow.tez.planner.rule.transformer.BoundaryBalanceHashJoinSameSourceTransformer; 048import cascading.flow.tez.planner.rule.transformer.BoundaryBalanceHashJoinToHashJoinTransformer; 049import cascading.flow.tez.planner.rule.transformer.BoundaryBalanceJoinSplitTransformer; 050import cascading.flow.tez.planner.rule.transformer.BoundaryBalanceSplitSplitToStreamedHashJoinTransformer; 051import cascading.flow.tez.planner.rule.transformer.BoundaryBalanceSplitToStreamedHashJoinTransformer; 052import cascading.flow.tez.planner.rule.transformer.RemoveMalformedHashJoinNodeTransformer; 053 054/** 055 * The HashJoinHadoop2TezRuleRegistry provides support for assemblies using {@link cascading.pipe.HashJoin} pipes. 056 * <p/> 057 * Detecting and optimizing for HashJoin pipes adds further complexity and time to converge on a valid physical plan. 058 * <p/> 059 * If facing slowdowns, and no HashJoins are used, switch to the 060 * {@link cascading.flow.tez.planner.NoHashJoinHadoop2TezRuleRegistry} via the appropriate 061 * {@link cascading.flow.FlowConnector} constructor. 062 */ 063public class HashJoinHadoop2TezRuleRegistry extends RuleRegistry 064 { 065 public HashJoinHadoop2TezRuleRegistry() 066 { 067// enableDebugLogging(); 068 069 // PreBalance 070 addRule( new LoneGroupAssert() ); 071 addRule( new MissingGroupAssert() ); 072 addRule( new BufferAfterEveryAssert() ); 073 addRule( new EveryAfterBufferAssert() ); 074 addRule( new SplitBeforeEveryAssert() ); 075 076 addRule( new BoundaryBalanceGroupSplitTransformer() ); 077 addRule( new BoundaryBalanceGroupSplitSpliceTransformer() ); // prevents AssemblyHelpersPlatformTest#testSameSourceMerge deadlock 078 addRule( new BoundaryBalanceCheckpointTransformer() ); 079 080 // hash join 081 addRule( new BoundaryBalanceHashJoinSameSourceTransformer() ); 082 addRule( new BoundaryBalanceSplitToStreamedHashJoinTransformer() ); // testGroupBySplitGroupByJoin 083 addRule( new BoundaryBalanceSplitSplitToStreamedHashJoinTransformer() ); // testGroupBySplitSplitGroupByJoin 084 addRule( new BoundaryBalanceHashJoinToHashJoinTransformer() ); // force HJ into unique nodes 085 addRule( new BoundaryBalanceGroupBlockingHashJoinTransformer() ); // joinAfterEvery 086 addRule( new BoundaryBalanceGroupSplitHashJoinTransformer() ); // groupBySplitJoins 087 addRule( new BoundaryBalanceJoinSplitTransformer() ); // prevents duplication of HashJoin, testJoinSplit 088 089 // PreResolve 090 addRule( new RemoveNoOpPipeTransformer() ); 091 addRule( new ApplyAssertionLevelTransformer() ); 092 addRule( new ApplyDebugLevelTransformer() ); 093 094 // PostResolve 095 096 // PartitionSteps 097 addRule( new WholeGraphStepPartitioner() ); 098 099 // PostSteps 100 101 // PartitionNodes 102 103 // no match with HashJoin inclusion 104 addRule( new TopDownSplitBoundariesNodePartitioner() ); // split from source to multiple sinks 105 addRule( new ConsecutiveGroupOrMergesNodePartitioner() ); 106 addRule( new BottomUpBoundariesNodePartitioner() ); // streamed paths re-partitioned w/ StreamedOnly 107 addRule( new SplitJoinBoundariesNodeRePartitioner() ); // testCoGroupSelf - compensates for tez-1190 108 109 // hash join inclusion 110 addRule( new BottomUpJoinedBoundariesNodePartitioner() ); // will capture multiple inputs into sink for use with HashJoins 111 addRule( new StreamedAccumulatedBoundariesNodeRePartitioner() ); // joinsIntoCoGroupLhs & groupBySplitJoins 112 addRule( new StreamedOnlySourcesNodeRePartitioner() ); 113 114 // PostNodes 115 addRule( new RemoveMalformedHashJoinNodeTransformer() ); // joinsIntoCoGroupLhs 116 addRule( new AccumulatedPostNodeAnnotator() ); // allows accumulated boundaries to be identified 117 118 addRule( new DualStreamedAccumulatedMergeNodeAssert() ); 119 } 120 }