001/*
002 * Copyright (c) 2007-2017 Xplenty, Inc. All Rights Reserved.
003 *
004 * Project and contact information: http://www.cascading.org/
005 *
006 * This file is part of the Cascading project.
007 *
008 * Licensed under the Apache License, Version 2.0 (the "License");
009 * you may not use this file except in compliance with the License.
010 * You may obtain a copy of the License at
011 *
012 *     http://www.apache.org/licenses/LICENSE-2.0
013 *
014 * Unless required by applicable law or agreed to in writing, software
015 * distributed under the License is distributed on an "AS IS" BASIS,
016 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
017 * See the License for the specific language governing permissions and
018 * limitations under the License.
019 */
020
021package cascading.flow.stream.element;
022
023import java.util.ArrayList;
024import java.util.Collection;
025import java.util.Collections;
026import java.util.HashMap;
027import java.util.Map;
028import java.util.Set;
029import java.util.TreeSet;
030import java.util.concurrent.atomic.AtomicInteger;
031
032import cascading.flow.FlowProcess;
033import cascading.flow.stream.graph.StreamGraph;
034import cascading.pipe.Splice;
035import cascading.tuple.Fields;
036import cascading.tuple.Tuple;
037import cascading.tuple.util.TupleBuilder;
038
039import static cascading.tuple.util.TupleViews.createNarrow;
040
041/**
042 *
043 */
044public abstract class MemorySpliceGate extends GroupingSpliceGate
045  {
046  protected Set<Tuple> keys;
047  protected Map<Tuple, Collection<Tuple>>[] keyValues;
048
049  protected MemoryCoGroupClosure closure;
050
051  protected int numIncomingEventingPaths;
052
053  protected final AtomicInteger count = new AtomicInteger( 0 );
054
055  public MemorySpliceGate( FlowProcess flowProcess, Splice splice )
056    {
057    super( flowProcess, splice );
058    }
059
060  @Override
061  public void bind( StreamGraph streamGraph )
062    {
063    super.bind( streamGraph );
064
065    numIncomingEventingPaths = streamGraph.findAllPreviousFor( this ).length;
066    }
067
068  // we must make a new Tuple instance to wrap the incoming copy
069  protected TupleBuilder createDefaultNarrowBuilder( final Fields incomingFields, final Fields narrowFields )
070    {
071    return new TupleBuilder()
072    {
073    int[] pos = incomingFields.getPos( narrowFields );
074
075    @Override
076    public Tuple makeResult( Tuple input, Tuple output )
077      {
078      return createNarrow( pos, input );
079      }
080    };
081    }
082
083  @Override
084  public void initialize()
085    {
086    super.initialize();
087
088    initComparators();
089
090    keys = createKeySet();
091
092    count.set( numIncomingEventingPaths ); // the number of paths incoming
093    }
094
095  @Override
096  public void prepare()
097    {
098    super.prepare();
099
100    keyValues = createKeyValuesArray();
101
102    closure = new MemoryCoGroupClosure( flowProcess, splice.getNumSelfJoins(), keyFields, valuesFields );
103
104    if( grouping != null && splice.getJoinDeclaredFields() != null && splice.getJoinDeclaredFields().isNone() )
105      grouping.joinerClosure = closure;
106    }
107
108  protected Set<Tuple> createKeySet()
109    {
110    return Collections.synchronizedSet( new TreeSet<Tuple>( getKeyComparator() ) );
111    }
112
113  /**
114   * This lets us just replace an old map and let the gc cleanup, vs clearing each map
115   *
116   * @return of type Map
117   */
118  protected Map<Tuple, Collection<Tuple>>[] createKeyValuesArray()
119    {
120    // Ducts use identity for equality
121    Map<Tuple, Collection<Tuple>>[] valueMap = new Map[ getNumDeclaredIncomingBranches() ];
122
123    int start = isBlockingStreamed() ? 0 : 1;
124    for( int i = start; i < getNumDeclaredIncomingBranches(); i++ )
125      valueMap[ i ] = createTupleMap();
126
127    return valueMap;
128    }
129
130  protected Map<Tuple, Collection<Tuple>> createTupleMap()
131    {
132    return new HashMap<Tuple, Collection<Tuple>>()
133    {
134    @Override
135    public Collection<Tuple> get( Object object )
136      {
137      Collection<Tuple> value = super.get( object );
138
139      if( value == null )
140        {
141        value = new ArrayList<Tuple>();
142
143        super.put( (Tuple) object, value );
144        }
145
146      return value;
147      }
148    };
149    }
150
151  protected abstract boolean isBlockingStreamed();
152  }