001/* 002 * Copyright (c) 2007-2015 Concurrent, Inc. All Rights Reserved. 003 * 004 * Project and contact information: http://www.cascading.org/ 005 * 006 * This file is part of the Cascading project. 007 * 008 * Licensed under the Apache License, Version 2.0 (the "License"); 009 * you may not use this file except in compliance with the License. 010 * You may obtain a copy of the License at 011 * 012 * http://www.apache.org/licenses/LICENSE-2.0 013 * 014 * Unless required by applicable law or agreed to in writing, software 015 * distributed under the License is distributed on an "AS IS" BASIS, 016 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 017 * See the License for the specific language governing permissions and 018 * limitations under the License. 019 */ 020 021package cascading.scheme.hadoop; 022 023import java.beans.ConstructorProperties; 024import java.io.IOException; 025 026import cascading.flow.FlowProcess; 027import cascading.scheme.SinkCall; 028import cascading.scheme.SourceCall; 029import cascading.tap.Tap; 030import cascading.tuple.Fields; 031import cascading.tuple.Tuple; 032import cascading.tuple.TupleEntry; 033import org.apache.hadoop.conf.Configuration; 034import org.apache.hadoop.io.NullWritable; 035import org.apache.hadoop.io.Writable; 036import org.apache.hadoop.mapred.OutputCollector; 037import org.apache.hadoop.mapred.OutputFormat; 038import org.apache.hadoop.mapred.RecordReader; 039import org.apache.hadoop.mapred.SequenceFileOutputFormat; 040 041/** 042 * Class WritableSequenceFile is a sub-class of {@link SequenceFile} that reads and writes values of the given 043 * {@code writableType} {@code Class}, instead of {@link Tuple} instances used by default in SequenceFile. 044 * <p/> 045 * This Class is a convenience for those who need to read/write specific types from existing sequence files without 046 * them being wrapped in a Tuple instance. 047 * <p/> 048 * Note due to the nature of sequence files, only one type can be stored in the key and value positions, they they can be 049 * uniquely different types (LongWritable, Text). 050 * <p/> 051 * If keyType is null, valueType must not be null, and vice versa, assuming you only wish to store a single value. 052 * <p/> 053 * {@link NullWritable} is used as the empty type for either a null keyType or valueType. 054 */ 055public class WritableSequenceFile extends SequenceFile 056 { 057 protected final Class<? extends Writable> keyType; 058 protected final Class<? extends Writable> valueType; 059 060 /** 061 * Constructor WritableSequenceFile creates a new WritableSequenceFile instance. 062 * 063 * @param fields of type Fields 064 * @param valueType of type Class<? extends Writable>, may not be null 065 */ 066 @ConstructorProperties({"fields", "valueType"}) 067 public WritableSequenceFile( Fields fields, Class<? extends Writable> valueType ) 068 { 069 this( fields, null, valueType ); 070 } 071 072 /** 073 * Constructor WritableSequenceFile creates a new WritableSequenceFile instance. 074 * 075 * @param fields of type Fields 076 * @param keyType of type Class<? extends Writable> 077 * @param valueType of type Class<? extends Writable> 078 */ 079 @ConstructorProperties({"fields", "keyType", "valueType"}) 080 public WritableSequenceFile( Fields fields, Class<? extends Writable> keyType, Class<? extends Writable> valueType ) 081 { 082 super( fields ); 083 this.keyType = keyType; 084 this.valueType = valueType; 085 086 if( keyType == null && valueType == null ) 087 throw new IllegalArgumentException( "both keyType and valueType may not be null" ); 088 089 if( keyType == null && fields.size() != 1 ) 090 throw new IllegalArgumentException( "fields must declare exactly one field when only reading/writing 'keys' from a sequence file" ); 091 else if( valueType == null && fields.size() != 1 ) 092 throw new IllegalArgumentException( "fields must declare exactly one field when only reading/writing 'values' from a sequence file" ); 093 else if( keyType != null && valueType != null && fields.size() != 2 ) 094 throw new IllegalArgumentException( "fields must declare exactly two fields when only reading/writing 'keys' and 'values' from a sequence file" ); 095 } 096 097 @Override 098 public void sinkConfInit( FlowProcess<? extends Configuration> flowProcess, Tap<Configuration, RecordReader, OutputCollector> tap, Configuration conf ) 099 { 100 if( keyType != null ) 101 conf.setClass( "mapred.output.key.class", keyType, Object.class ); 102 else 103 conf.setClass( "mapred.output.key.class", NullWritable.class, Object.class ); 104 105 if( valueType != null ) 106 conf.setClass( "mapred.output.value.class", valueType, Object.class ); 107 else 108 conf.setClass( "mapred.output.value.class", NullWritable.class, Object.class ); 109 110 conf.setClass( "mapred.output.format.class", SequenceFileOutputFormat.class, OutputFormat.class ); 111 } 112 113 @Override 114 public boolean source( FlowProcess<? extends Configuration> flowProcess, SourceCall<Object[], RecordReader> sourceCall ) throws IOException 115 { 116 Object key = sourceCall.getContext()[ 0 ]; 117 Object value = sourceCall.getContext()[ 1 ]; 118 boolean result = sourceCall.getInput().next( key, value ); 119 120 if( !result ) 121 return false; 122 123 int count = 0; 124 TupleEntry entry = sourceCall.getIncomingEntry(); 125 126 if( keyType != null ) 127 entry.setObject( count++, key ); 128 129 if( valueType != null ) 130 entry.setObject( count, value ); 131 132 return true; 133 } 134 135 @Override 136 public void sink( FlowProcess<? extends Configuration> flowProcess, SinkCall<Void, OutputCollector> sinkCall ) throws IOException 137 { 138 TupleEntry tupleEntry = sinkCall.getOutgoingEntry(); 139 140 Writable keyValue = NullWritable.get(); 141 Writable valueValue = NullWritable.get(); 142 143 if( keyType == null ) 144 { 145 valueValue = (Writable) tupleEntry.getObject( 0 ); 146 } 147 else if( valueType == null ) 148 { 149 keyValue = (Writable) tupleEntry.getObject( 0 ); 150 } 151 else 152 { 153 keyValue = (Writable) tupleEntry.getObject( 0 ); 154 valueValue = (Writable) tupleEntry.getObject( 1 ); 155 } 156 157 sinkCall.getOutput().collect( keyValue, valueValue ); 158 } 159 160 @Override 161 public boolean equals( Object object ) 162 { 163 if( this == object ) 164 return true; 165 if( !( object instanceof WritableSequenceFile ) ) 166 return false; 167 if( !super.equals( object ) ) 168 return false; 169 170 WritableSequenceFile that = (WritableSequenceFile) object; 171 172 if( keyType != null ? !keyType.equals( that.keyType ) : that.keyType != null ) 173 return false; 174 if( valueType != null ? !valueType.equals( that.valueType ) : that.valueType != null ) 175 return false; 176 177 return true; 178 } 179 180 @Override 181 public int hashCode() 182 { 183 int result = super.hashCode(); 184 result = 31 * result + ( keyType != null ? keyType.hashCode() : 0 ); 185 result = 31 * result + ( valueType != null ? valueType.hashCode() : 0 ); 186 return result; 187 } 188 }