001/*
002 * Copyright (c) 2007-2017 Xplenty, Inc. All Rights Reserved.
003 *
004 * Project and contact information: http://www.cascading.org/
005 *
006 * This file is part of the Cascading project.
007 *
008 * Licensed under the Apache License, Version 2.0 (the "License");
009 * you may not use this file except in compliance with the License.
010 * You may obtain a copy of the License at
011 *
012 *     http://www.apache.org/licenses/LICENSE-2.0
013 *
014 * Unless required by applicable law or agreed to in writing, software
015 * distributed under the License is distributed on an "AS IS" BASIS,
016 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
017 * See the License for the specific language governing permissions and
018 * limitations under the License.
019 */
020
021package cascading.tuple.collect;
022
023import java.io.Closeable;
024import java.io.File;
025import java.io.Flushable;
026import java.io.IOException;
027import java.util.Collection;
028import java.util.Collections;
029import java.util.Iterator;
030import java.util.LinkedList;
031import java.util.List;
032
033import cascading.flow.FlowProcess;
034import cascading.tuple.Tuple;
035import cascading.tuple.TupleException;
036import cascading.tuple.io.TupleInputStream;
037import cascading.tuple.io.TupleOutputStream;
038import cascading.tuple.util.TupleViews;
039import cascading.util.CloseableIterator;
040import org.slf4j.Logger;
041import org.slf4j.LoggerFactory;
042
/**
 * Class SpillableTupleList is a simple durable Collection that can spill its contents to disk when the
 * {@code threshold} is met.
 * <p/>
 * Using a {@code threshold} of -1 will disable the spill; all values will remain in memory.
 * <p/>
 * This class is used by the {@link cascading.pipe.CoGroup} pipe. To set properties specific to a given
 * CoGroup instance, see the {@link cascading.pipe.CoGroup#getConfigDef()} method.
 * <p/>
 * Use the {@link SpillableProps} fluent helper class to set properties.
 *
 * @see cascading.tuple.hadoop.collect.HadoopSpillableTupleList
 */
056public abstract class SpillableTupleList implements Collection<Tuple>, Spillable
057  {
058  /** Field LOG */
059  private static final Logger LOG = LoggerFactory.getLogger( SpillableTupleList.class );
060
061  public static int getThreshold( FlowProcess flowProcess, int defaultValue )
062    {
063    String value = (String) flowProcess.getProperty( SpillableProps.LIST_THRESHOLD );
064
065    if( value == null || value.length() == 0 )
066      return defaultValue;
067
068    return Integer.parseInt( value );
069    }
070
071  protected static Class getCodecClass( FlowProcess flowProcess, String defaultCodecs, Class subClass )
072    {
073    String compress = (String) flowProcess.getProperty( SpillableProps.SPILL_COMPRESS );
074
075    if( compress != null && !Boolean.parseBoolean( compress ) )
076      return null;
077
078    String codecs = (String) flowProcess.getProperty( SpillableProps.SPILL_CODECS );
079
080    if( codecs == null || codecs.length() == 0 )
081      codecs = defaultCodecs;
082
083    Class codecClass = null;
084
085    for( String codec : codecs.split( "[,\\s]+" ) )
086      {
087      try
088        {
089        LOG.info( "attempting to load codec: {}", codec );
090        codecClass = Thread.currentThread().getContextClassLoader().loadClass( codec ).asSubclass( subClass );
091
092        if( codecClass != null )
093          {
094          LOG.info( "found codec: {}", codec );
095          break;
096          }
097        }
098      catch( ClassNotFoundException exception )
099        {
100        // do nothing
101        }
102      }
103
104    if( codecClass == null )
105      {
106      LOG.warn( "codecs set, but unable to load any: {}", codecs );
107      return null;
108      }
109
110    return codecClass;
111    }
112
113  private SpillStrategy spillStrategy;
114
115  /** Field files */
116  private List<File> files = Collections.EMPTY_LIST; // lazy init if we do a spill
117  /** Field current */
118  private final List<Object[]> current = new LinkedList<Object[]>();
119  /** Field size */
120  private int size = 0;
121  /** Fields listener */
122  private SpillListener spillListener = SpillListener.NULL;
123
124  private Tuple group;
125
126  protected SpillableTupleList( final int threshold )
127    {
128    this( new SpillStrategy()
129    {
130
131    @Override
132    public boolean doSpill( Spillable spillable, int size )
133      {
134      return size >= threshold;
135      }
136
137    @Override
138    public String getSpillReason( Spillable spillable )
139      {
140      return "met threshold: " + threshold;
141      }
142    } );
143    }
144
145  protected SpillableTupleList( SpillStrategy spillStrategy )
146    {
147    this.spillStrategy = spillStrategy;
148    }
149
150  @Override
151  public void setGrouping( Tuple group )
152    {
153    this.group = group;
154    }
155
156  @Override
157  public Tuple getGrouping()
158    {
159    return group;
160    }
161
162  @Override
163  public void setSpillStrategy( SpillStrategy spillStrategy )
164    {
165    this.spillStrategy = spillStrategy;
166    }
167
168  @Override
169  public void setSpillListener( SpillListener spillListener )
170    {
171    this.spillListener = spillListener;
172    }
173
174  @Override
175  public int spillCount()
176    {
177    return files.size();
178    }
179
180  private class SpilledListIterator implements Iterator<Tuple>
181    {
182    int fileIndex = 0;
183    private Iterator<Tuple> lastIterator;
184    private Iterator<Tuple> iterator;
185
186    private SpilledListIterator()
187      {
188      lastIterator = asTupleIterator();
189      getNextIterator();
190      }
191
192    private void getNextIterator()
193      {
194      if( iterator instanceof Closeable )
195        closeSilent( (Closeable) iterator );
196
197      if( fileIndex < files.size() )
198        iterator = getIteratorFor( files.get( fileIndex++ ) );
199      else
200        iterator = lastIterator;
201      }
202
203    private Iterator<Tuple> getIteratorFor( File file )
204      {
205      spillListener.notifyReadSpillBegin( SpillableTupleList.this );
206
207      return createIterator( createTupleInputStream( file ) );
208      }
209
210    public boolean hasNext()
211      {
212      if( isLastCollection() )
213        return iterator.hasNext();
214
215      if( iterator.hasNext() )
216        return true;
217
218      getNextIterator();
219
220      return hasNext();
221      }
222
223    public Tuple next()
224      {
225      if( isLastCollection() || iterator.hasNext() )
226        return iterator.next();
227
228      getNextIterator();
229
230      return next();
231      }
232
233    private boolean isLastCollection()
234      {
235      return iterator == lastIterator;
236      }
237
238    public void remove()
239      {
240      throw new UnsupportedOperationException( "remove is not supported" );
241      }
242    }
243
244  /**
245   * Method add will add the given {@link cascading.tuple.Tuple} to this list.
246   *
247   * @param tuple of type Tuple
248   */
249  @Override
250  public boolean add( Tuple tuple )
251    {
252    doSpill(); // spill if we break over the threshold
253
254    current.add( Tuple.elements( tuple ).toArray( new Object[ tuple.size() ] ) );
255    size++;
256
257    return true;
258    }
259
260  @Override
261  public int size()
262    {
263    return size;
264    }
265
266  @Override
267  public boolean isEmpty()
268    {
269    return files.isEmpty() && current.size() == 0;
270    }
271
272  private final boolean doSpill()
273    {
274    if( !spillStrategy.doSpill( this, current.size() ) )
275      return false;
276
277    long start = System.currentTimeMillis();
278    spillListener.notifyWriteSpillBegin( this, current.size(), spillStrategy.getSpillReason( this ) );
279
280    File file = createTempFile();
281    TupleOutputStream dataOutputStream = createTupleOutputStream( file );
282
283    try
284      {
285      writeList( dataOutputStream, current );
286      }
287    finally
288      {
289      flushSilent( dataOutputStream );
290      closeSilent( dataOutputStream );
291      }
292
293    spillListener.notifyWriteSpillEnd( this, System.currentTimeMillis() - start );
294
295    if( files == Collections.EMPTY_LIST )
296      files = new LinkedList<File>();
297
298    files.add( file );
299    current.clear();
300
301    return true;
302    }
303
304  private void flushSilent( Flushable flushable )
305    {
306    try
307      {
308      flushable.flush();
309      }
310    catch( IOException exception )
311      {
312      // ignore
313      }
314    }
315
316  private void closeSilent( Closeable closeable )
317    {
318    try
319      {
320      closeable.close();
321      }
322    catch( IOException exception )
323      {
324      // ignore
325      }
326    }
327
328  private void writeList( TupleOutputStream dataOutputStream, List<Object[]> list )
329    {
330    try
331      {
332      dataOutputStream.writeLong( list.size() );
333
334      for( Object[] elements : list )
335        dataOutputStream.writeElementArray( elements );
336      }
337    catch( IOException exception )
338      {
339      throw new TupleException( "unable to write tuple collection to file output stream", exception );
340      }
341    }
342
343  protected abstract TupleOutputStream createTupleOutputStream( File file );
344
345  private Iterator<Tuple> createIterator( final TupleInputStream tupleInputStream )
346    {
347    final long size;
348
349    try
350      {
351      size = tupleInputStream.readLong();
352      }
353    catch( IOException exception )
354      {
355      throw new TupleException( "unable to read 'size' of collection from file input stream", exception );
356      }
357
358    return new CloseableIterator<Tuple>()
359    {
360    Tuple tuple = new Tuple();
361    long count = 0;
362
363    @Override
364    public boolean hasNext()
365      {
366      return count < size;
367      }
368
369    @Override
370    public Tuple next()
371      {
372      try
373        {
374        return tupleInputStream.readTuple( tuple );
375        }
376      catch( IOException exception )
377        {
378        throw new TupleException( "unable to read next tuple from file input stream containing: " + size + " tuples, successfully read tuples: " + count, exception );
379        }
380      finally
381        {
382        count++;
383        }
384      }
385
386    @Override
387    public void remove()
388      {
389      throw new UnsupportedOperationException( "remove is not supported" );
390      }
391
392    @Override
393    public void close() throws IOException
394      {
395      tupleInputStream.close();
396      }
397    };
398    }
399
400  protected abstract TupleInputStream createTupleInputStream( File file );
401
402  private File createTempFile()
403    {
404    try
405      {
406      File file = File.createTempFile( "cascading-spillover", null );
407      file.deleteOnExit();
408
409      return file;
410      }
411    catch( IOException exception )
412      {
413      throw new TupleException( "unable to create temporary file", exception );
414      }
415    }
416
417  @Override
418  public void clear()
419    {
420    files.clear();
421    current.clear();
422    size = 0;
423    }
424
425  @Override
426  public Iterator<Tuple> iterator()
427    {
428    if( files.isEmpty() )
429      return asTupleIterator();
430
431    return new SpilledListIterator();
432    }
433
434  private Iterator<Tuple> asTupleIterator()
435    {
436    final Tuple tuple = TupleViews.createObjectArray();
437    final Iterator<Object[]> iterator = current.iterator();
438
439    return new Iterator<Tuple>()
440    {
441    @Override
442    public boolean hasNext()
443      {
444      return iterator.hasNext();
445      }
446
447    @Override
448    public Tuple next()
449      {
450      return TupleViews.reset( tuple, iterator.next() );
451      }
452
453    @Override
454    public void remove()
455      {
456      }
457    };
458    }
459
460  // collection methods, this class cannot only be added to, so they aren't implemented
461  @Override
462  public boolean contains( Object object )
463    {
464    return false;
465    }
466
467  @Override
468  public Object[] toArray()
469    {
470    return new Object[ 0 ];
471    }
472
473  @Override
474  public <T> T[] toArray( T[] ts )
475    {
476    return null;
477    }
478
479  @Override
480  public boolean remove( Object object )
481    {
482    return false;
483    }
484
485  @Override
486  public boolean containsAll( Collection<?> objects )
487    {
488    return false;
489    }
490
491  @Override
492  public boolean addAll( Collection<? extends Tuple> tuples )
493    {
494    return false;
495    }
496
497  @Override
498  public boolean removeAll( Collection<?> objects )
499    {
500    return false;
501    }
502
503  @Override
504  public boolean retainAll( Collection<?> objects )
505    {
506    return false;
507    }
508  }