/*
 * Copyright (c) 2007-2014 Concurrent, Inc. All Rights Reserved.
 *
 * Project and contact information: http://www.cascading.org/
 *
 * This file is part of the Cascading project.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package cascading.scheme.hadoop;

import java.beans.ConstructorProperties;
import java.io.IOException;

import cascading.flow.FlowProcess;
import cascading.scheme.Scheme;
import cascading.scheme.SinkCall;
import cascading.scheme.SourceCall;
import cascading.tap.Tap;
import cascading.tuple.Fields;
import cascading.tuple.Tuple;
import cascading.tuple.TupleEntry;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.RecordReader;
import org.apache.hadoop.mapred.SequenceFileInputFormat;
import org.apache.hadoop.mapred.SequenceFileOutputFormat;

/**
 * A SequenceFile is a type of {@link cascading.scheme.Scheme}, which is a flat file consisting of
 * binary key/value pairs. This is a space and time efficient means to store data.
 */
public class SequenceFile extends Scheme<JobConf, RecordReader, OutputCollector, Object[], Void>
  {
  /** Protected for use by TempDfs and other subclasses. Not for general consumption. */
  protected SequenceFile()
    {
    super( Fields.UNKNOWN, Fields.ALL );
    }

  /**
   * Creates a new SequenceFile instance that stores the given field names.
   *
   * @param fields the fields to both source and sink
   */
  @ConstructorProperties({"fields"})
  public SequenceFile( Fields fields )
    {
    super( fields, fields );
    }

  @Override
  public void sourceConfInit( FlowProcess<JobConf> flowProcess, Tap<JobConf, RecordReader, OutputCollector> tap, JobConf conf )
    {
    // read via Hadoop's binary key/value input format
    conf.setInputFormat( SequenceFileInputFormat.class );
    }

  @Override
  public void sinkConfInit( FlowProcess<JobConf> flowProcess, Tap<JobConf, RecordReader, OutputCollector> tap, JobConf conf )
    {
    // both key and value are serialized Tuples; declaring them here supports TapCollector
    conf.setOutputKeyClass( Tuple.class );
    conf.setOutputValueClass( Tuple.class );
    conf.setOutputFormat( SequenceFileOutputFormat.class );
    }

  @Override
  public void sourcePrepare( FlowProcess<JobConf> flowProcess, SourceCall<Object[], RecordReader> sourceCall )
    {
    // allocate one reusable key/value pair up front so source() can avoid per-record allocation
    RecordReader input = sourceCall.getInput();

    sourceCall.setContext( new Object[]{ input.createKey(), input.createValue() } );
    }

  @Override
  public boolean source( FlowProcess<JobConf> flowProcess, SourceCall<Object[], RecordReader> sourceCall ) throws IOException
    {
    // re-use the key/value pair created in sourcePrepare()
    Object[] context = sourceCall.getContext();
    Tuple key = (Tuple) context[ 0 ];
    Tuple value = (Tuple) context[ 1 ];

    if( !sourceCall.getInput().next( key, value ) )
      return false; // end of input

    TupleEntry incomingEntry = sourceCall.getIncomingEntry();

    // when field types are declared, install the tuple canonically; otherwise set it as-is
    if( incomingEntry.hasTypes() )
      incomingEntry.setCanonicalTuple( value );
    else
      incomingEntry.setTuple( value );

    return true;
    }

  @Override
  public void sourceCleanup( FlowProcess<JobConf> flowProcess, SourceCall<Object[], RecordReader> sourceCall )
    {
    // release the cached key/value pair
    sourceCall.setContext( null );
    }

  @Override
  public void sink( FlowProcess<JobConf> flowProcess, SinkCall<Void, OutputCollector> sinkCall ) throws IOException
    {
    Tuple outgoing = sinkCall.getOutgoingEntry().getTuple();

    // the key carries no data on write; the whole tuple is stored as the value
    sinkCall.getOutput().collect( Tuple.NULL, outgoing );
    }
  }