001/*
002 * Copyright (c) 2007-2015 Concurrent, Inc. All Rights Reserved.
003 *
004 * Project and contact information: http://www.cascading.org/
005 *
006 * This file is part of the Cascading project.
007 *
008 * Licensed under the Apache License, Version 2.0 (the "License");
009 * you may not use this file except in compliance with the License.
010 * You may obtain a copy of the License at
011 *
012 *     http://www.apache.org/licenses/LICENSE-2.0
013 *
014 * Unless required by applicable law or agreed to in writing, software
015 * distributed under the License is distributed on an "AS IS" BASIS,
016 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
017 * See the License for the specific language governing permissions and
018 * limitations under the License.
019 */
020
021package cascading.scheme.hadoop;
022
023import java.beans.ConstructorProperties;
024import java.io.IOException;
025
026import cascading.flow.FlowProcess;
027import cascading.scheme.SinkCall;
028import cascading.scheme.SourceCall;
029import cascading.tap.Tap;
030import cascading.tuple.Fields;
031import cascading.tuple.Tuple;
032import cascading.tuple.TupleEntry;
033import org.apache.hadoop.conf.Configuration;
034import org.apache.hadoop.io.NullWritable;
035import org.apache.hadoop.io.Writable;
036import org.apache.hadoop.mapred.OutputCollector;
037import org.apache.hadoop.mapred.OutputFormat;
038import org.apache.hadoop.mapred.RecordReader;
039import org.apache.hadoop.mapred.SequenceFileOutputFormat;
040
041/**
042 * Class WritableSequenceFile is a sub-class of {@link SequenceFile} that reads and writes values of the given
043 * {@code writableType} {@code Class}, instead of {@link Tuple} instances used by default in SequenceFile.
044 * <p/>
045 * This Class is a convenience for those who need to read/write specific types from existing sequence files without
046 * them being wrapped in a Tuple instance.
047 * <p/>
048 * Note due to the nature of sequence files, only one type can be stored in the key and value positions, they they can be
049 * uniquely different types (LongWritable, Text).
050 * <p/>
051 * If keyType is null, valueType must not be null, and vice versa, assuming you only wish to store a single value.
052 * <p/>
053 * {@link NullWritable} is used as the empty type for either a null keyType or valueType.
054 */
055public class WritableSequenceFile extends SequenceFile
056  {
057  protected final Class<? extends Writable> keyType;
058  protected final Class<? extends Writable> valueType;
059
060  /**
061   * Constructor WritableSequenceFile creates a new WritableSequenceFile instance.
062   *
063   * @param fields    of type Fields
064   * @param valueType of type Class<? extends Writable>, may not be null
065   */
066  @ConstructorProperties({"fields", "valueType"})
067  public WritableSequenceFile( Fields fields, Class<? extends Writable> valueType )
068    {
069    this( fields, null, valueType );
070    }
071
072  /**
073   * Constructor WritableSequenceFile creates a new WritableSequenceFile instance.
074   *
075   * @param fields    of type Fields
076   * @param keyType   of type Class<? extends Writable>
077   * @param valueType of type Class<? extends Writable>
078   */
079  @ConstructorProperties({"fields", "keyType", "valueType"})
080  public WritableSequenceFile( Fields fields, Class<? extends Writable> keyType, Class<? extends Writable> valueType )
081    {
082    super( fields );
083    this.keyType = keyType;
084    this.valueType = valueType;
085
086    if( keyType == null && valueType == null )
087      throw new IllegalArgumentException( "both keyType and valueType may not be null" );
088
089    if( keyType == null && fields.size() != 1 )
090      throw new IllegalArgumentException( "fields must declare exactly one field when only reading/writing 'keys' from a sequence file" );
091    else if( valueType == null && fields.size() != 1 )
092      throw new IllegalArgumentException( "fields must declare exactly one field when only reading/writing 'values' from a sequence file" );
093    else if( keyType != null && valueType != null && fields.size() != 2 )
094      throw new IllegalArgumentException( "fields must declare exactly two fields when only reading/writing 'keys' and 'values' from a sequence file" );
095    }
096
097  @Override
098  public void sinkConfInit( FlowProcess<? extends Configuration> flowProcess, Tap<Configuration, RecordReader, OutputCollector> tap, Configuration conf )
099    {
100    if( keyType != null )
101      conf.setClass( "mapred.output.key.class", keyType, Object.class );
102    else
103      conf.setClass( "mapred.output.key.class", NullWritable.class, Object.class );
104
105    if( valueType != null )
106      conf.setClass( "mapred.output.value.class", valueType, Object.class );
107    else
108      conf.setClass( "mapred.output.value.class", NullWritable.class, Object.class );
109
110    conf.setClass( "mapred.output.format.class", SequenceFileOutputFormat.class, OutputFormat.class );
111    }
112
113  @Override
114  public boolean source( FlowProcess<? extends Configuration> flowProcess, SourceCall<Object[], RecordReader> sourceCall ) throws IOException
115    {
116    Object key = sourceCall.getContext()[ 0 ];
117    Object value = sourceCall.getContext()[ 1 ];
118    boolean result = sourceCall.getInput().next( key, value );
119
120    if( !result )
121      return false;
122
123    int count = 0;
124    TupleEntry entry = sourceCall.getIncomingEntry();
125
126    if( keyType != null )
127      entry.setObject( count++, key );
128
129    if( valueType != null )
130      entry.setObject( count, value );
131
132    return true;
133    }
134
135  @Override
136  public void sink( FlowProcess<? extends Configuration> flowProcess, SinkCall<Void, OutputCollector> sinkCall ) throws IOException
137    {
138    TupleEntry tupleEntry = sinkCall.getOutgoingEntry();
139
140    Writable keyValue = NullWritable.get();
141    Writable valueValue = NullWritable.get();
142
143    if( keyType == null )
144      {
145      valueValue = (Writable) tupleEntry.getObject( 0 );
146      }
147    else if( valueType == null )
148      {
149      keyValue = (Writable) tupleEntry.getObject( 0 );
150      }
151    else
152      {
153      keyValue = (Writable) tupleEntry.getObject( 0 );
154      valueValue = (Writable) tupleEntry.getObject( 1 );
155      }
156
157    sinkCall.getOutput().collect( keyValue, valueValue );
158    }
159
160  @Override
161  public boolean equals( Object object )
162    {
163    if( this == object )
164      return true;
165    if( !( object instanceof WritableSequenceFile ) )
166      return false;
167    if( !super.equals( object ) )
168      return false;
169
170    WritableSequenceFile that = (WritableSequenceFile) object;
171
172    if( keyType != null ? !keyType.equals( that.keyType ) : that.keyType != null )
173      return false;
174    if( valueType != null ? !valueType.equals( that.valueType ) : that.valueType != null )
175      return false;
176
177    return true;
178    }
179
180  @Override
181  public int hashCode()
182    {
183    int result = super.hashCode();
184    result = 31 * result + ( keyType != null ? keyType.hashCode() : 0 );
185    result = 31 * result + ( valueType != null ? valueType.hashCode() : 0 );
186    return result;
187    }
188  }