001/*
002 * Copyright (c) 2007-2015 Concurrent, Inc. All Rights Reserved.
003 *
004 * Project and contact information: http://www.cascading.org/
005 *
006 * This file is part of the Cascading project.
007 *
008 * Licensed under the Apache License, Version 2.0 (the "License");
009 * you may not use this file except in compliance with the License.
010 * You may obtain a copy of the License at
011 *
012 *     http://www.apache.org/licenses/LICENSE-2.0
013 *
014 * Unless required by applicable law or agreed to in writing, software
015 * distributed under the License is distributed on an "AS IS" BASIS,
016 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
017 * See the License for the specific language governing permissions and
018 * limitations under the License.
019 */
020
021package cascading.flow.hadoop.stream;
022
023import java.util.Collection;
024import java.util.HashSet;
025import java.util.Map;
026import java.util.Set;
027
028import cascading.flow.FlowProcess;
029import cascading.flow.stream.element.MemoryHashJoinGate;
030import cascading.pipe.HashJoin;
031import cascading.provider.FactoryLoader;
032import cascading.tuple.Tuple;
033import cascading.tuple.collect.Spillable;
034import cascading.tuple.collect.SpillableTupleList;
035import cascading.tuple.collect.TupleMapFactory;
036import cascading.tuple.hadoop.collect.HadoopTupleMapFactory;
037import org.apache.hadoop.conf.Configuration;
038import org.slf4j.Logger;
039import org.slf4j.LoggerFactory;
040
041import static cascading.tuple.collect.TupleMapFactory.TUPLE_MAP_FACTORY;
042
043/**
044 *
045 */
046public class HadoopMemoryJoinGate extends MemoryHashJoinGate
047  {
048  private static final Logger LOG = LoggerFactory.getLogger( HadoopMemoryJoinGate.class );
049
050  public enum Spill
051    {
052      Num_Spills_Written, Num_Spills_Read, Num_Tuples_Spilled, Duration_Millis_Written
053    }
054
055  private class SpillListener implements Spillable.SpillListener
056    {
057    private final FlowProcess<? extends Configuration> flowProcess;
058
059    public SpillListener( FlowProcess<? extends Configuration> flowProcess )
060      {
061      this.flowProcess = flowProcess;
062      }
063
064    @Override
065    public void notifyWriteSpillBegin( Spillable spillable, int spillSize, String spillReason )
066      {
067      int numFiles = spillable.spillCount();
068
069      if( numFiles % 10 == 0 )
070        {
071        LOG.info( "spilling grouping: {}, num times: {}, with reason: {}",
072          new Object[]{spillable.getGrouping().print(), numFiles + 1, spillReason} );
073
074        Runtime runtime = Runtime.getRuntime();
075        long freeMem = runtime.freeMemory() / 1024 / 1024;
076        long maxMem = runtime.maxMemory() / 1024 / 1024;
077        long totalMem = runtime.totalMemory() / 1024 / 1024;
078
079        LOG.info( "mem on spill (mb), free: " + freeMem + ", total: " + totalMem + ", max: " + maxMem );
080        }
081
082      LOG.info( "spilling {} tuples in list to file number {}", spillSize, numFiles + 1 );
083
084      flowProcess.increment( Spill.Num_Spills_Written, 1 );
085      flowProcess.increment( Spill.Num_Tuples_Spilled, spillSize );
086      }
087
088    @Override
089    public void notifyWriteSpillEnd( SpillableTupleList spillableTupleList, long duration )
090      {
091      flowProcess.increment( Spill.Duration_Millis_Written, duration );
092      }
093
094    @Override
095    public void notifyReadSpillBegin( Spillable spillable )
096      {
097      flowProcess.increment( Spill.Num_Spills_Read, 1 );
098      }
099    }
100
101  private final SpillListener spillListener;
102  private TupleMapFactory<Configuration> tupleMapFactory;
103
104  public HadoopMemoryJoinGate( FlowProcess<? extends Configuration> flowProcess, HashJoin join )
105    {
106    super( flowProcess, join );
107
108    this.spillListener = new SpillListener( flowProcess );
109
110    FactoryLoader loader = FactoryLoader.getInstance();
111
112    this.tupleMapFactory = loader.loadFactoryFrom( flowProcess, TUPLE_MAP_FACTORY, HadoopTupleMapFactory.class );
113    }
114
115  @Override
116  protected Set<Tuple> createKeySet()
117    {
118    return new HashSet<Tuple>(); // does not need to be synchronized, or ordered
119    }
120
121  @Override
122  protected Map<Tuple, Collection<Tuple>> createTupleMap()
123    {
124    Map<Tuple, Collection<Tuple>> map = tupleMapFactory.create( flowProcess );
125
126    if( map instanceof Spillable )
127      ( (Spillable) map ).setSpillListener( spillListener );
128
129    return map;
130    }
131
132  @Override
133  protected void waitOnLatch()
134    {
135    // do nothing
136    }
137
138  @Override
139  protected void countDownLatch()
140    {
141    // do nothing
142    }
143  }