001    /*
002     * Copyright (c) 2007-2015 Concurrent, Inc. All Rights Reserved.
003     *
004     * Project and contact information: http://www.cascading.org/
005     *
006     * This file is part of the Cascading project.
007     *
008     * Licensed under the Apache License, Version 2.0 (the "License");
009     * you may not use this file except in compliance with the License.
010     * You may obtain a copy of the License at
011     *
012     *     http://www.apache.org/licenses/LICENSE-2.0
013     *
014     * Unless required by applicable law or agreed to in writing, software
015     * distributed under the License is distributed on an "AS IS" BASIS,
016     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
017     * See the License for the specific language governing permissions and
018     * limitations under the License.
019     */
020    
021    package cascading.flow.hadoop.stream;
022    
023    import java.util.Collection;
024    import java.util.HashSet;
025    import java.util.Map;
026    import java.util.Set;
027    
028    import cascading.flow.FlowProcess;
029    import cascading.flow.stream.MemoryHashJoinGate;
030    import cascading.pipe.HashJoin;
031    import cascading.provider.FactoryLoader;
032    import cascading.tuple.Tuple;
033    import cascading.tuple.collect.Spillable;
034    import cascading.tuple.collect.SpillableTupleList;
035    import cascading.tuple.collect.TupleMapFactory;
036    import cascading.tuple.hadoop.collect.HadoopTupleMapFactory;
037    import org.apache.hadoop.mapred.JobConf;
038    import org.slf4j.Logger;
039    import org.slf4j.LoggerFactory;
040    
041    import static cascading.tuple.collect.TupleMapFactory.TUPLE_MAP_FACTORY;
042    
043    /**
044     *
045     */
046    public class HadoopMemoryJoinGate extends MemoryHashJoinGate
047      {
048      private static final Logger LOG = LoggerFactory.getLogger( HadoopMemoryJoinGate.class );
049    
050      public enum Spill
051        {
052          Num_Spills_Written, Num_Spills_Read, Num_Tuples_Spilled, Duration_Millis_Written
053        }
054    
055      private class SpillListener implements Spillable.SpillListener
056        {
057        private final FlowProcess<JobConf> flowProcess;
058    
059        public SpillListener( FlowProcess<JobConf> flowProcess )
060          {
061          this.flowProcess = flowProcess;
062          }
063    
064        @Override
065        public void notifyWriteSpillBegin( Spillable spillable, int spillSize, String spillReason )
066          {
067          int numFiles = spillable.spillCount();
068    
069          if( numFiles % 10 == 0 )
070            {
071            LOG.info( "spilling grouping: {}, num times: {}, with reason: {}",
072              new Object[]{spillable.getGrouping().print(), numFiles + 1, spillReason} );
073    
074            Runtime runtime = Runtime.getRuntime();
075            long freeMem = runtime.freeMemory() / 1024 / 1024;
076            long maxMem = runtime.maxMemory() / 1024 / 1024;
077            long totalMem = runtime.totalMemory() / 1024 / 1024;
078    
079            LOG.info( "mem on spill (mb), free: " + freeMem + ", total: " + totalMem + ", max: " + maxMem );
080            }
081    
082          LOG.info( "spilling {} tuples in list to file number {}", spillSize, numFiles + 1 );
083    
084          flowProcess.increment( Spill.Num_Spills_Written, 1 );
085          flowProcess.increment( Spill.Num_Tuples_Spilled, spillSize );
086          }
087    
088        @Override
089        public void notifyWriteSpillEnd( SpillableTupleList spillableTupleList, long duration )
090          {
091          flowProcess.increment( Spill.Duration_Millis_Written, duration );
092          }
093    
094        @Override
095        public void notifyReadSpillBegin( Spillable spillable )
096          {
097          flowProcess.increment( Spill.Num_Spills_Read, 1 );
098          }
099        }
100    
101      private final SpillListener spillListener;
102      private TupleMapFactory<JobConf> tupleMapFactory;
103    
104      public HadoopMemoryJoinGate( FlowProcess<JobConf> flowProcess, HashJoin join )
105        {
106        super( flowProcess, join );
107    
108        this.spillListener = new SpillListener( flowProcess );
109    
110        FactoryLoader loader = FactoryLoader.getInstance();
111    
112        this.tupleMapFactory = loader.loadFactoryFrom( flowProcess, TUPLE_MAP_FACTORY, HadoopTupleMapFactory.class );
113        }
114    
115      @Override
116      protected Set<Tuple> createKeySet()
117        {
118        return new HashSet<Tuple>(); // does not need to be synchronized, or ordered
119        }
120    
121      @Override
122      protected Map<Tuple, Collection<Tuple>> createTupleMap()
123        {
124        Map<Tuple, Collection<Tuple>> map = tupleMapFactory.create( flowProcess );
125    
126        if( map instanceof Spillable )
127          ( (Spillable) map ).setSpillListener( spillListener );
128    
129        return map;
130        }
131    
132      @Override
133      protected void waitOnLatch()
134        {
135        // do nothing
136        }
137    
138      @Override
139      protected void countDownLatch()
140        {
141        // do nothing
142        }
143      }