001    /*
002     * Copyright (c) 2007-2014 Concurrent, Inc. All Rights Reserved.
003     *
004     * Project and contact information: http://www.cascading.org/
005     *
006     * This file is part of the Cascading project.
007     *
008     * Licensed under the Apache License, Version 2.0 (the "License");
009     * you may not use this file except in compliance with the License.
010     * You may obtain a copy of the License at
011     *
012     *     http://www.apache.org/licenses/LICENSE-2.0
013     *
014     * Unless required by applicable law or agreed to in writing, software
015     * distributed under the License is distributed on an "AS IS" BASIS,
016     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
017     * See the License for the specific language governing permissions and
018     * limitations under the License.
019     */
020    
021    package cascading.tuple;
022    
023    import java.io.IOException;
024    
025    /**
026     * Class TupleEntryCollector is used to allow {@link cascading.operation.BaseOperation} instances to emit
027     * one or more result {@link Tuple} values.
028     * <p/>
029     * The general rule in Cascading is if you are handed a Tuple, you cannot change or cache it. Attempts at modifying
030     * such a Tuple will result in an Exception. Preventing caching is harder, see below.
031     * <p/>
032     * If you create the Tuple, you can re-use or modify it.
033     * <p/>
034     * When calling {@link #add(Tuple)} or {@link #add(TupleEntry)}, you are passing a Tuple to the down stream pipes and
035     * operations. Since no downstream operation may modify or cache the Tuple instance, it is safe to re-use the Tuple
036     * instance when {@code add()} returns.
037     * <p/>
038     * That said, Tuple copies do get cached in order to perform specific operations in the underlying platforms. Currently
039     * only a shallow copy is made (via the {@link Tuple} copy constructor). Thus, any mutable type or collection
040     * placed inside a Tuple will not be copied, but will likely be cached if the Tuple passed downstream is
041     * copied.
042     * <p/>
043     * So any subsequent changes to that nested type or collection will be reflected in the cached copy, a likely
044     * source of hard to find errors.
045     * <p/>
046     * There is currently no way to specify that a deep copy must be performed when making a Tuple copy.
047     */
048    public abstract class TupleEntryCollector
049      {
050      protected TupleEntry tupleEntry = new TupleEntry( Fields.UNKNOWN, null, true );
051    
052      protected TupleEntryCollector()
053        {
054        }
055    
056      /**
057       * Constructor TupleCollector creates a new TupleCollector instance.
058       *
059       * @param declared of type Fields
060       */
061      public TupleEntryCollector( Fields declared )
062        {
063        setFields( declared );
064        }
065    
066      public void setFields( Fields declared )
067        {
068        if( declared == null )
069          throw new IllegalArgumentException( "declared fields must not be null" );
070    
071        if( declared.isUnknown() || declared.isAll() )
072          return;
073    
074        this.tupleEntry = new TupleEntry( declared, Tuple.size( declared.size() ), true );
075        }
076    
077      /**
078       * Method add inserts the given {@link TupleEntry} into the outgoing stream. Note the method {@link #add(Tuple)} is
079       * more efficient as it simply calls {@link TupleEntry#getTuple()};
080       * <p/>
081       * See {@link cascading.tuple.TupleEntryCollector} on when and how to re-use a Tuple instance.
082       *
083       * @param tupleEntry of type TupleEntry
084       */
085      public void add( TupleEntry tupleEntry )
086        {
087        Fields expectedFields = this.tupleEntry.getFields();
088        TupleEntry outgoingEntry = this.tupleEntry;
089    
090        if( expectedFields.isUnknown() || expectedFields.equals( tupleEntry.getFields() ) )
091          outgoingEntry = tupleEntry;
092        else
093          outgoingEntry.setTuple( selectTupleFrom( tupleEntry, expectedFields ) );
094    
095        safeCollect( outgoingEntry );
096        }
097    
098      private Tuple selectTupleFrom( TupleEntry tupleEntry, Fields expectedFields )
099        {
100        try
101          {
102          return tupleEntry.selectTuple( expectedFields );
103          }
104        catch( TupleException exception )
105          {
106          Fields givenFields = tupleEntry.getFields();
107          String string = "given TupleEntry fields: " + givenFields.printVerbose();
108          string += " do not match the operation declaredFields: " + expectedFields.printVerbose();
109          string += ", operations must emit tuples that match the fields they declare as output";
110    
111          throw new TupleException( string, exception );
112          }
113        }
114    
115      /**
116       * Method add inserts the given {@link Tuple} into the outgoing stream.
117       * <p/>
118       * See {@link cascading.tuple.TupleEntryCollector} on when and how to re-use a Tuple instance.
119       *
120       * @param tuple of type Tuple
121       */
122      public void add( Tuple tuple )
123        {
124        if( !tupleEntry.getFields().isUnknown() && tupleEntry.getFields().size() != tuple.size() )
125          throw new TupleException( "operation added the wrong number of fields, expected: " + tupleEntry.getFields().print() + ", got result size: " + tuple.size() );
126    
127        boolean isUnmodifiable = tuple.isUnmodifiable();
128    
129        tupleEntry.setTuple( tuple );
130    
131        try
132          {
133          safeCollect( tupleEntry );
134          }
135        finally
136          {
137          Tuples.setUnmodifiable( tuple, isUnmodifiable );
138          }
139        }
140    
141      private void safeCollect( TupleEntry tupleEntry )
142        {
143        try
144          {
145          collect( tupleEntry );
146          }
147        catch( IOException exception )
148          {
149          throw new TupleException( "unable to collect tuple", exception );
150          }
151        }
152    
153      protected abstract void collect( TupleEntry tupleEntry ) throws IOException;
154    
155      /**
156       * Method close closes the underlying resource being written to.
157       * <p/>
158       * This method should be called when when an instance is returned via
159       * {@link cascading.tap.Tap#openForWrite(cascading.flow.FlowProcess)}
160       * and no more {@link Tuple} instances will be written out.
161       * <p/>
162       * This method must not be called when an instance is returned from {@code getOutputCollector()} from any of
163       * the relevant {@link cascading.operation.OperationCall} implementations (inside a Function, Aggregator, or Buffer).
164       */
165      public void close()
166        {
167        // do nothing
168        }
169      }