001    /*
002     * Copyright (c) 2007-2014 Concurrent, Inc. All Rights Reserved.
003     *
004     * Project and contact information: http://www.cascading.org/
005     *
006     * This file is part of the Cascading project.
007     *
008     * Licensed under the Apache License, Version 2.0 (the "License");
009     * you may not use this file except in compliance with the License.
010     * You may obtain a copy of the License at
011     *
012     *     http://www.apache.org/licenses/LICENSE-2.0
013     *
014     * Unless required by applicable law or agreed to in writing, software
015     * distributed under the License is distributed on an "AS IS" BASIS,
016     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
017     * See the License for the specific language governing permissions and
018     * limitations under the License.
019     */
020    
021    package cascading.tuple.collect;
022    
023    import java.io.Closeable;
024    import java.io.File;
025    import java.io.Flushable;
026    import java.io.IOException;
027    import java.util.Collection;
028    import java.util.Collections;
029    import java.util.Iterator;
030    import java.util.LinkedList;
031    import java.util.List;
032    
033    import cascading.flow.FlowProcess;
034    import cascading.tuple.Tuple;
035    import cascading.tuple.TupleException;
036    import cascading.tuple.io.TupleInputStream;
037    import cascading.tuple.io.TupleOutputStream;
038    import cascading.tuple.util.TupleViews;
039    import cascading.util.CloseableIterator;
040    import org.slf4j.Logger;
041    import org.slf4j.LoggerFactory;
042    
043    /**
044     * Class SpillableTupleList is a simple durable Collection that can spill its contents to disk when the
045     * {@code threshold} is met.
046     * <p/>
 * Using a {@code threshold} of -1 will disable the spill; all values will remain in memory.
 * <p/>
 * This class is used by the {@link cascading.pipe.CoGroup} pipe. To set properties specific to a given
 * CoGroup instance, see the {@link cascading.pipe.CoGroup#getConfigDef()} method.
051     * <p/>
052     * Use the {@link SpillableProps} fluent helper class to set properties.
053     *
054     * @see cascading.tuple.hadoop.collect.HadoopSpillableTupleList
055     */
056    public abstract class SpillableTupleList implements Collection<Tuple>, Spillable
057      {
058      /** Field LOG */
059      private static final Logger LOG = LoggerFactory.getLogger( SpillableTupleList.class );
060    
061      /** Number of tuples to hold in memory before spilling them to disk. */
062      @Deprecated
063      public static final String SPILL_THRESHOLD = SpillableProps.LIST_THRESHOLD;
064    
065      /**
066       * Whether to enable compress of the spills or not, on by default.
067       *
068       * @see Boolean#parseBoolean(String)
069       */
070      @Deprecated
071      public static final String SPILL_COMPRESS = SpillableProps.SPILL_COMPRESS;
072    
073      /** A comma delimited list of possible codecs to try. This is platform dependent. */
074      @Deprecated
075      public static final String SPILL_CODECS = SpillableProps.SPILL_CODECS;
076    
077      public static int getThreshold( FlowProcess flowProcess, int defaultValue )
078        {
079        String value = (String) flowProcess.getProperty( SpillableProps.LIST_THRESHOLD );
080    
081        if( value == null || value.length() == 0 )
082          return defaultValue;
083    
084        return Integer.parseInt( value );
085        }
086    
087      protected static Class getCodecClass( FlowProcess flowProcess, String defaultCodecs, Class subClass )
088        {
089        String compress = (String) flowProcess.getProperty( SpillableProps.SPILL_COMPRESS );
090    
091        if( compress != null && !Boolean.parseBoolean( compress ) )
092          return null;
093    
094        String codecs = (String) flowProcess.getProperty( SpillableProps.SPILL_CODECS );
095    
096        if( codecs == null || codecs.length() == 0 )
097          codecs = defaultCodecs;
098    
099        Class codecClass = null;
100    
101        for( String codec : codecs.split( "[,\\s]+" ) )
102          {
103          try
104            {
105            LOG.info( "attempting to load codec: {}", codec );
106            codecClass = Thread.currentThread().getContextClassLoader().loadClass( codec ).asSubclass( subClass );
107    
108            if( codecClass != null )
109              {
110              LOG.info( "found codec: {}", codec );
111              break;
112              }
113            }
114          catch( ClassNotFoundException exception )
115            {
116            // do nothing
117            }
118          }
119    
120        if( codecClass == null )
121          {
122          LOG.warn( "codecs set, but unable to load any: {}", codecs );
123          return null;
124          }
125    
126        return codecClass;
127        }
128    
129    
130      private SpillStrategy spillStrategy;
131    
132      /** Field files */
133      private List<File> files = Collections.EMPTY_LIST; // lazy init if we do a spill
134      /** Field current */
135      private final List<Object[]> current = new LinkedList<Object[]>();
136      /** Field size */
137      private int size = 0;
138      /** Fields listener */
139      private SpillListener spillListener = SpillListener.NULL;
140    
141      private Tuple group;
142    
143      protected SpillableTupleList( final int threshold )
144        {
145        this( new SpillStrategy()
146        {
147    
148        @Override
149        public boolean doSpill( Spillable spillable, int size )
150          {
151          return size >= threshold;
152          }
153    
154        @Override
155        public String getSpillReason( Spillable spillable )
156          {
157          return "met threshold: " + threshold;
158          }
159        } );
160        }
161    
162      protected SpillableTupleList( SpillStrategy spillStrategy )
163        {
164        this.spillStrategy = spillStrategy;
165        }
166    
167      @Override
168      public void setGrouping( Tuple group )
169        {
170        this.group = group;
171        }
172    
173      @Override
174      public Tuple getGrouping()
175        {
176        return group;
177        }
178    
179      @Override
180      public void setSpillStrategy( SpillStrategy spillStrategy )
181        {
182        this.spillStrategy = spillStrategy;
183        }
184    
185      @Override
186      public void setSpillListener( SpillListener spillListener )
187        {
188        this.spillListener = spillListener;
189        }
190    
191      @Override
192      public int spillCount()
193        {
194        return files.size();
195        }
196    
197      private class SpilledListIterator implements Iterator<Tuple>
198        {
199        int fileIndex = 0;
200        private Iterator<Tuple> lastIterator;
201        private Iterator<Tuple> iterator;
202    
203        private SpilledListIterator()
204          {
205          lastIterator = asTupleIterator();
206          getNextIterator();
207          }
208    
209        private void getNextIterator()
210          {
211          if( iterator instanceof Closeable )
212            closeSilent( (Closeable) iterator );
213    
214          if( fileIndex < files.size() )
215            iterator = getIteratorFor( files.get( fileIndex++ ) );
216          else
217            iterator = lastIterator;
218          }
219    
220        private Iterator<Tuple> getIteratorFor( File file )
221          {
222          spillListener.notifyReadSpillBegin( SpillableTupleList.this );
223    
224          return createIterator( createTupleInputStream( file ) );
225          }
226    
227        public boolean hasNext()
228          {
229          if( isLastCollection() )
230            return iterator.hasNext();
231    
232          if( iterator.hasNext() )
233            return true;
234    
235          getNextIterator();
236    
237          return hasNext();
238          }
239    
240        public Tuple next()
241          {
242          if( isLastCollection() || iterator.hasNext() )
243            return iterator.next();
244    
245          getNextIterator();
246    
247          return next();
248          }
249    
250        private boolean isLastCollection()
251          {
252          return iterator == lastIterator;
253          }
254    
255        public void remove()
256          {
257          throw new UnsupportedOperationException( "remove is not supported" );
258          }
259        }
260    
261      /**
262       * Method add will add the given {@link cascading.tuple.Tuple} to this list.
263       *
264       * @param tuple of type Tuple
265       */
266      @Override
267      public boolean add( Tuple tuple )
268        {
269        doSpill(); // spill if we break over the threshold
270    
271        current.add( Tuple.elements( tuple ).toArray( new Object[ tuple.size() ] ) );
272        size++;
273    
274        return true;
275        }
276    
277      @Override
278      public int size()
279        {
280        return size;
281        }
282    
283      @Override
284      public boolean isEmpty()
285        {
286        return files.isEmpty() && current.size() == 0;
287        }
288    
289      private final boolean doSpill()
290        {
291        if( !spillStrategy.doSpill( this, current.size() ) )
292          return false;
293    
294        long start = System.currentTimeMillis();
295        spillListener.notifyWriteSpillBegin( this, current.size(), spillStrategy.getSpillReason( this ) );
296    
297        File file = createTempFile();
298        TupleOutputStream dataOutputStream = createTupleOutputStream( file );
299    
300        try
301          {
302          writeList( dataOutputStream, current );
303          }
304        finally
305          {
306          flushSilent( dataOutputStream );
307          closeSilent( dataOutputStream );
308          }
309    
310        spillListener.notifyWriteSpillEnd( this, System.currentTimeMillis() - start );
311    
312        if( files == Collections.EMPTY_LIST )
313          files = new LinkedList<File>();
314    
315        files.add( file );
316        current.clear();
317    
318        return true;
319        }
320    
321      private void flushSilent( Flushable flushable )
322        {
323        try
324          {
325          flushable.flush();
326          }
327        catch( IOException exception )
328          {
329          // ignore
330          }
331        }
332    
333      private void closeSilent( Closeable closeable )
334        {
335        try
336          {
337          closeable.close();
338          }
339        catch( IOException exception )
340          {
341          // ignore
342          }
343        }
344    
345      private void writeList( TupleOutputStream dataOutputStream, List<Object[]> list )
346        {
347        try
348          {
349          dataOutputStream.writeLong( list.size() );
350    
351          for( Object[] elements : list )
352            dataOutputStream.writeElementArray( elements );
353          }
354        catch( IOException exception )
355          {
356          throw new TupleException( "unable to write tuple collection to file output stream", exception );
357          }
358        }
359    
360      protected abstract TupleOutputStream createTupleOutputStream( File file );
361    
362      private Iterator<Tuple> createIterator( final TupleInputStream tupleInputStream )
363        {
364        final long size;
365    
366        try
367          {
368          size = tupleInputStream.readLong();
369          }
370        catch( IOException exception )
371          {
372          throw new TupleException( "unable to read 'size' of collection from file input stream", exception );
373          }
374    
375        return new CloseableIterator<Tuple>()
376        {
377        Tuple tuple = new Tuple();
378        long count = 0;
379    
380        @Override
381        public boolean hasNext()
382          {
383          return count < size;
384          }
385    
386        @Override
387        public Tuple next()
388          {
389          try
390            {
391            return tupleInputStream.readTuple( tuple );
392            }
393          catch( IOException exception )
394            {
395            throw new TupleException( "unable to read next tuple from file input stream containing: " + size + " tuples, successfully read tuples: " + count, exception );
396            }
397          finally
398            {
399            count++;
400            }
401          }
402    
403        @Override
404        public void remove()
405          {
406          throw new UnsupportedOperationException( "remove is not supported" );
407          }
408    
409        @Override
410        public void close() throws IOException
411          {
412          tupleInputStream.close();
413          }
414        };
415        }
416    
417      protected abstract TupleInputStream createTupleInputStream( File file );
418    
419      private File createTempFile()
420        {
421        try
422          {
423          File file = File.createTempFile( "cascading-spillover", null );
424          file.deleteOnExit();
425    
426          return file;
427          }
428        catch( IOException exception )
429          {
430          throw new TupleException( "unable to create temporary file", exception );
431          }
432        }
433    
434      @Override
435      public void clear()
436        {
437        files.clear();
438        current.clear();
439        size = 0;
440        }
441    
442      @Override
443      public Iterator<Tuple> iterator()
444        {
445        if( files.isEmpty() )
446          return asTupleIterator();
447    
448        return new SpilledListIterator();
449        }
450    
451      private Iterator<Tuple> asTupleIterator()
452        {
453        final Tuple tuple = TupleViews.createObjectArray();
454        final Iterator<Object[]> iterator = current.iterator();
455    
456        return new Iterator<Tuple>()
457        {
458        @Override
459        public boolean hasNext()
460          {
461          return iterator.hasNext();
462          }
463    
464        @Override
465        public Tuple next()
466          {
467          return TupleViews.reset( tuple, iterator.next() );
468          }
469    
470        @Override
471        public void remove()
472          {
473          }
474        };
475        }
476    
477      // collection methods, this class cannot only be added to, so they aren't implemented
478      @Override
479      public boolean contains( Object object )
480        {
481        return false;
482        }
483    
484      @Override
485      public Object[] toArray()
486        {
487        return new Object[ 0 ];
488        }
489    
490      @Override
491      public <T> T[] toArray( T[] ts )
492        {
493        return null;
494        }
495    
496      @Override
497      public boolean remove( Object object )
498        {
499        return false;
500        }
501    
502      @Override
503      public boolean containsAll( Collection<?> objects )
504        {
505        return false;
506        }
507    
508      @Override
509      public boolean addAll( Collection<? extends Tuple> tuples )
510        {
511        return false;
512        }
513    
514      @Override
515      public boolean removeAll( Collection<?> objects )
516        {
517        return false;
518        }
519    
520      @Override
521      public boolean retainAll( Collection<?> objects )
522        {
523        return false;
524        }
525      }