/*
 * Copyright (c) 2007-2014 Concurrent, Inc. All Rights Reserved.
 *
 * Project and contact information: http://www.cascading.org/
 *
 * This file is part of the Cascading project.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package cascading.scheme.hadoop;

import java.beans.ConstructorProperties;
import java.io.IOException;

import cascading.flow.FlowProcess;
import cascading.scheme.SinkCall;
import cascading.scheme.SourceCall;
import cascading.tap.Tap;
import cascading.tuple.Fields;
import cascading.tuple.Tuple;
import cascading.tuple.TupleEntry;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.RecordReader;
import org.apache.hadoop.mapred.SequenceFileOutputFormat;

/**
 * Class WritableSequenceFile is a sub-class of {@link SequenceFile} that reads and writes values of the given
 * {@code keyType} and {@code valueType} {@code Class}es, instead of the {@link Tuple} instances used by default in SequenceFile.
 * <p/>
 * This class is a convenience for those who need to read/write specific types from existing sequence files without
 * them being wrapped in a Tuple instance.
 * <p/>
 * Note that due to the nature of sequence files, only one type can be stored in each of the key and value positions,
 * though the two positions may hold different types (for example, LongWritable and Text).
 * <p/>
 * If keyType is null, valueType must not be null, and vice versa; pass a single non-null type when you only wish to store one value per record.
 * <p/>
 * {@link NullWritable} is used as the stand-in type for whichever of keyType or valueType is null.
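 * <p/>
 * As a rough usage sketch (the field names, Writable types, and {@code Hfs} path below are illustrative, not prescribed):
 * <pre>{@code
 * // map the sequence file key and value positions to two tuple fields
 * Fields fields = new Fields( "offset", "line" );
 * WritableSequenceFile scheme = new WritableSequenceFile( fields, LongWritable.class, Text.class );
 * Tap tap = new Hfs( scheme, "some/path" );
 * }</pre>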
 */
public class WritableSequenceFile extends SequenceFile
  {
  protected final Class<? extends Writable> keyType;
  protected final Class<? extends Writable> valueType;

  /**
   * Constructor WritableSequenceFile creates a new WritableSequenceFile instance.
   *
   * @param fields    of type Fields
   * @param valueType of type Class<? extends Writable>, may not be null
   */
  @ConstructorProperties({"fields", "valueType"})
  public WritableSequenceFile( Fields fields, Class<? extends Writable> valueType )
    {
    this( fields, null, valueType );
    }

  /**
   * Constructor WritableSequenceFile creates a new WritableSequenceFile instance.
   *
   * @param fields    of type Fields
   * @param keyType   of type Class<? extends Writable>, may be null when only values are read/written
   * @param valueType of type Class<? extends Writable>, may be null when only keys are read/written
   */
  @ConstructorProperties({"fields", "keyType", "valueType"})
  public WritableSequenceFile( Fields fields, Class<? extends Writable> keyType, Class<? extends Writable> valueType )
    {
    super( fields );
    this.keyType = keyType;
    this.valueType = valueType;

    if( keyType == null && valueType == null )
      throw new IllegalArgumentException( "keyType and valueType may not both be null" );

    if( keyType == null && fields.size() != 1 )
      throw new IllegalArgumentException( "fields must declare exactly one field when only reading/writing 'values' from a sequence file" );
    else if( valueType == null && fields.size() != 1 )
      throw new IllegalArgumentException( "fields must declare exactly one field when only reading/writing 'keys' from a sequence file" );
    else if( keyType != null && valueType != null && fields.size() != 2 )
      throw new IllegalArgumentException( "fields must declare exactly two fields when reading/writing both 'keys' and 'values' from a sequence file" );
    }

  @Override
  public void sinkConfInit( FlowProcess<JobConf> flowProcess, Tap<JobConf, RecordReader, OutputCollector> tap, JobConf conf )
    {
    if( keyType != null )
      conf.setOutputKeyClass( keyType );
    else
      conf.setOutputKeyClass( NullWritable.class );

    if( valueType != null )
      conf.setOutputValueClass( valueType );
    else
      conf.setOutputValueClass( NullWritable.class );

    conf.setOutputFormat( SequenceFileOutputFormat.class );
    }

  @Override
  public boolean source( FlowProcess<JobConf> flowProcess, SourceCall<Object[], RecordReader> sourceCall ) throws IOException
    {
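    // key and value are reusable Writable instances held in the source call context and refilled by next()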
    Object key = sourceCall.getContext()[ 0 ];
    Object value = sourceCall.getContext()[ 1 ];
    boolean result = sourceCall.getInput().next( key, value );

    if( !result )
      return false;

    int count = 0;
    TupleEntry entry = sourceCall.getIncomingEntry();

    if( keyType != null )
      entry.setObject( count++, key );

    if( valueType != null )
      entry.setObject( count, value );

    return true;
    }

  @Override
  public void sink( FlowProcess<JobConf> flowProcess, SinkCall<Void, OutputCollector> sinkCall ) throws IOException
    {
    TupleEntry tupleEntry = sinkCall.getOutgoingEntry();

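    // default both positions to NullWritable; only positions mapped to a declared field are overwritten below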
    Writable keyValue = NullWritable.get();
    Writable valueValue = NullWritable.get();

    if( keyType == null )
      {
      valueValue = (Writable) tupleEntry.getObject( 0 );
      }
    else if( valueType == null )
      {
      keyValue = (Writable) tupleEntry.getObject( 0 );
      }
    else
      {
      keyValue = (Writable) tupleEntry.getObject( 0 );
      valueValue = (Writable) tupleEntry.getObject( 1 );
      }

    sinkCall.getOutput().collect( keyValue, valueValue );
    }

  @Override
  public boolean equals( Object object )
    {
    if( this == object )
      return true;
    if( !( object instanceof WritableSequenceFile ) )
      return false;
    if( !super.equals( object ) )
      return false;

    WritableSequenceFile that = (WritableSequenceFile) object;

    if( keyType != null ? !keyType.equals( that.keyType ) : that.keyType != null )
      return false;
    if( valueType != null ? !valueType.equals( that.valueType ) : that.valueType != null )
      return false;

    return true;
    }

  @Override
  public int hashCode()
    {
    int result = super.hashCode();
    result = 31 * result + ( keyType != null ? keyType.hashCode() : 0 );
    result = 31 * result + ( valueType != null ? valueType.hashCode() : 0 );
    return result;
    }
  }