001 /* 002 * Copyright (c) 2007-2014 Concurrent, Inc. All Rights Reserved. 003 * 004 * Project and contact information: http://www.cascading.org/ 005 * 006 * This file is part of the Cascading project. 007 * 008 * Licensed under the Apache License, Version 2.0 (the "License"); 009 * you may not use this file except in compliance with the License. 010 * You may obtain a copy of the License at 011 * 012 * http://www.apache.org/licenses/LICENSE-2.0 013 * 014 * Unless required by applicable law or agreed to in writing, software 015 * distributed under the License is distributed on an "AS IS" BASIS, 016 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 017 * See the License for the specific language governing permissions and 018 * limitations under the License. 019 */ 020 021 package cascading.scheme.hadoop; 022 023 import java.beans.ConstructorProperties; 024 import java.io.IOException; 025 026 import cascading.flow.FlowProcess; 027 import cascading.scheme.SinkCall; 028 import cascading.scheme.SourceCall; 029 import cascading.tap.Tap; 030 import cascading.tuple.Fields; 031 import cascading.tuple.Tuple; 032 import cascading.tuple.TupleEntry; 033 import org.apache.hadoop.io.NullWritable; 034 import org.apache.hadoop.io.Writable; 035 import org.apache.hadoop.mapred.JobConf; 036 import org.apache.hadoop.mapred.OutputCollector; 037 import org.apache.hadoop.mapred.RecordReader; 038 import org.apache.hadoop.mapred.SequenceFileOutputFormat; 039 040 /** 041 * Class WritableSequenceFile is a sub-class of {@link SequenceFile} that reads and writes values of the given 042 * {@code writableType} {@code Class}, instead of {@link Tuple} instances used by default in SequenceFile. 043 * <p/> 044 * This Class is a convenience for those who need to read/write specific types from existing sequence files without 045 * them being wrapped in a Tuple instance. 046 * <p/> 047 * Note due to the nature of sequence files, only one type can be stored in the key and value positions, they they can be 048 * uniquely different types (LongWritable, Text). 049 * <p/> 050 * If keyType is null, valueType must not be null, and vice versa, assuming you only wish to store a single value. 051 * <p/> 052 * {@link NullWritable} is used as the empty type for either a null keyType or valueType. 053 */ 054 public class WritableSequenceFile extends SequenceFile 055 { 056 protected final Class<? extends Writable> keyType; 057 protected final Class<? extends Writable> valueType; 058 059 /** 060 * Constructor WritableSequenceFile creates a new WritableSequenceFile instance. 061 * 062 * @param fields of type Fields 063 * @param valueType of type Class<? extends Writable>, may not be null 064 */ 065 @ConstructorProperties({"fields", "valueType"}) 066 public WritableSequenceFile( Fields fields, Class<? extends Writable> valueType ) 067 { 068 this( fields, null, valueType ); 069 } 070 071 /** 072 * Constructor WritableSequenceFile creates a new WritableSequenceFile instance. 073 * 074 * @param fields of type Fields 075 * @param keyType of type Class<? extends Writable> 076 * @param valueType of type Class<? extends Writable> 077 */ 078 @ConstructorProperties({"fields", "keyType", "valueType"}) 079 public WritableSequenceFile( Fields fields, Class<? extends Writable> keyType, Class<? extends Writable> valueType ) 080 { 081 super( fields ); 082 this.keyType = keyType; 083 this.valueType = valueType; 084 085 if( keyType == null && valueType == null ) 086 throw new IllegalArgumentException( "both keyType and valueType may not be null" ); 087 088 if( keyType == null && fields.size() != 1 ) 089 throw new IllegalArgumentException( "fields must declare exactly one field when only reading/writing 'keys' from a sequence file" ); 090 else if( valueType == null && fields.size() != 1 ) 091 throw new IllegalArgumentException( "fields must declare exactly one field when only reading/writing 'values' from a sequence file" ); 092 else if( keyType != null && valueType != null && fields.size() != 2 ) 093 throw new IllegalArgumentException( "fields must declare exactly two fields when only reading/writing 'keys' and 'values' from a sequence file" ); 094 } 095 096 @Override 097 public void sinkConfInit( FlowProcess<JobConf> flowProcess, Tap<JobConf, RecordReader, OutputCollector> tap, JobConf conf ) 098 { 099 if( keyType != null ) 100 conf.setOutputKeyClass( keyType ); 101 else 102 conf.setOutputKeyClass( NullWritable.class ); 103 104 if( valueType != null ) 105 conf.setOutputValueClass( valueType ); 106 else 107 conf.setOutputValueClass( NullWritable.class ); 108 109 conf.setOutputFormat( SequenceFileOutputFormat.class ); 110 } 111 112 @Override 113 public boolean source( FlowProcess<JobConf> flowProcess, SourceCall<Object[], RecordReader> sourceCall ) throws IOException 114 { 115 Object key = sourceCall.getContext()[ 0 ]; 116 Object value = sourceCall.getContext()[ 1 ]; 117 boolean result = sourceCall.getInput().next( key, value ); 118 119 if( !result ) 120 return false; 121 122 int count = 0; 123 TupleEntry entry = sourceCall.getIncomingEntry(); 124 125 if( keyType != null ) 126 entry.setObject( count++, key ); 127 128 if( valueType != null ) 129 entry.setObject( count, value ); 130 131 return true; 132 } 133 134 @Override 135 public void sink( FlowProcess<JobConf> flowProcess, SinkCall<Void, OutputCollector> sinkCall ) throws IOException 136 { 137 TupleEntry tupleEntry = sinkCall.getOutgoingEntry(); 138 139 Writable keyValue = NullWritable.get(); 140 Writable valueValue = NullWritable.get(); 141 142 if( keyType == null ) 143 { 144 valueValue = (Writable) tupleEntry.getObject( 0 ); 145 } 146 else if( valueType == null ) 147 { 148 keyValue = (Writable) tupleEntry.getObject( 0 ); 149 } 150 else 151 { 152 keyValue = (Writable) tupleEntry.getObject( 0 ); 153 valueValue = (Writable) tupleEntry.getObject( 1 ); 154 } 155 156 sinkCall.getOutput().collect( keyValue, valueValue ); 157 } 158 159 @Override 160 public boolean equals( Object object ) 161 { 162 if( this == object ) 163 return true; 164 if( !( object instanceof WritableSequenceFile ) ) 165 return false; 166 if( !super.equals( object ) ) 167 return false; 168 169 WritableSequenceFile that = (WritableSequenceFile) object; 170 171 if( keyType != null ? !keyType.equals( that.keyType ) : that.keyType != null ) 172 return false; 173 if( valueType != null ? !valueType.equals( that.valueType ) : that.valueType != null ) 174 return false; 175 176 return true; 177 } 178 179 @Override 180 public int hashCode() 181 { 182 int result = super.hashCode(); 183 result = 31 * result + ( keyType != null ? keyType.hashCode() : 0 ); 184 result = 31 * result + ( valueType != null ? valueType.hashCode() : 0 ); 185 return result; 186 } 187 }