/*
 * Copyright (c) 2007-2015 Concurrent, Inc. All Rights Reserved.
 *
 * Project and contact information: http://www.cascading.org/
 *
 * This file is part of the Cascading project.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package cascading.tap.hadoop.util;

import java.io.IOException;
import java.util.Set;

import cascading.CascadingException;
import cascading.flow.FlowProcess;
import cascading.flow.planner.Scope;
import cascading.scheme.Scheme;
import cascading.scheme.SinkCall;
import cascading.scheme.SourceCall;
import cascading.scheme.hadoop.SequenceFile;
import cascading.tap.Tap;
import cascading.tap.hadoop.Hfs;
import cascading.tuple.Fields;
import cascading.tuple.Tuple;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.OutputFormat;
import org.apache.hadoop.mapred.RecordReader;
import org.apache.hadoop.mapred.lib.NullOutputFormat;

/**
 * Class TempHfs creates a temporary {@link cascading.tap.Tap} instance for use internally.
 * <p>
 * The tap's path is built from the configured temp path and the given name, and
 * {@link #isTemporary()} always returns {@code true}.
 */
public class TempHfs extends Hfs
  {
  /** Field name, used to derive the child path and reported by {@link #toString()} */
  final String name;
  /**
   * Field schemeClass, the scheme type instantiated in {@link #outgoingScopeFor(Set)}.
   * It is only assigned by the constructors that accept a scheme class; it remains
   * {@code null} for instances created via {@link #TempHfs(Configuration, String, boolean)}.
   */
  private Class<? extends Scheme> schemeClass;

  /** Class NullScheme is a noop scheme used as a placeholder */
  private static class NullScheme extends Scheme<Configuration, RecordReader, OutputCollector, Object, Object>
    {
    @Override
    public void sourceConfInit( FlowProcess<? extends Configuration> flowProcess, Tap<Configuration, RecordReader, OutputCollector> tap, Configuration conf )
      {
      // do nothing
      }

    /** Registers {@link Tuple} as the output key/value class and {@link NullOutputFormat} as the output format. */
    @Override
    public void sinkConfInit( FlowProcess<? extends Configuration> flowProcess, Tap<Configuration, RecordReader, OutputCollector> tap, Configuration conf )
      {
      conf.setClass( "mapred.output.key.class", Tuple.class, Object.class );
      conf.setClass( "mapred.output.value.class", Tuple.class, Object.class );
      conf.setClass( "mapred.output.format.class", NullOutputFormat.class, OutputFormat.class );
      }

    /** Always reports that there is nothing to read. */
    @Override
    public boolean source( FlowProcess<? extends Configuration> flowProcess, SourceCall<Object, RecordReader> sourceCall ) throws IOException
      {
      return false;
      }

    @Override
    public void sink( FlowProcess<? extends Configuration> flowProcess, SinkCall<Object, OutputCollector> sinkCall ) throws IOException
      {
      // intentionally empty, nothing is written
      }
    }

  /**
   * Constructor TempHfs creates a new TempHfs instance.
   * <p>
   * The scheme is fixed at construction time: a {@link NullScheme} when {@code isNull} is
   * {@code true}, otherwise an anonymous {@link SequenceFile} subclass. No scheme class is
   * recorded, so {@link #getSchemeClass()} returns {@code null} for such instances.
   *
   * @param conf   of type Configuration, used to resolve the temp path
   * @param name   of type String
   * @param isNull of type boolean
   */
  public TempHfs( Configuration conf, String name, boolean isNull )
    {
    super( isNull ? new NullScheme() : new SequenceFile()
      {
      } );
    this.name = name;
    this.stringPath = initTemporaryPath( conf, true );
    }

  /**
   * Constructor TempHfs creates a new TempHfs instance with a unique temporary path.
   *
   * @param conf        of type Configuration, used to resolve the temp path
   * @param name        of type String
   * @param schemeClass of type Class, {@link SequenceFile} is used when {@code null}
   */
  public TempHfs( Configuration conf, String name, Class<? extends Scheme> schemeClass )
    {
    this( conf, name, schemeClass, true );
    }

  /**
   * Constructor TempHfs creates a new TempHfs instance.
   *
   * @param conf        of type Configuration, used to resolve the temp path
   * @param name        of type String
   * @param schemeClass of type Class, {@link SequenceFile} is used when {@code null}
   * @param unique      of type boolean, when {@code false} the name is used as-is as the child path,
   *                    otherwise it is first passed through {@code makeTemporaryPathDirString}
   */
  public TempHfs( Configuration conf, String name, Class<? extends Scheme> schemeClass, boolean unique )
    {
    this.name = name;

    if( schemeClass == null )
      this.schemeClass = SequenceFile.class;
    else
      this.schemeClass = schemeClass;

    this.stringPath = initTemporaryPath( conf, unique );
    }

  /**
   * Method getSchemeClass returns the scheme class this instance was created with.
   *
   * @return the scheme class, or {@code null} if the instance was created via
   *         {@link #TempHfs(Configuration, String, boolean)}
   */
  public Class<? extends Scheme> getSchemeClass()
    {
    return schemeClass;
    }

  /**
   * Builds the path string for this tap as a child of the inherited {@code getTempPath( conf )}.
   *
   * @param conf   configuration handed to {@code getTempPath}
   * @param unique when {@code true} the child is {@code makeTemporaryPathDirString( name )},
   *               otherwise the plain name
   * @return the string form of the resulting {@link Path}
   */
  private String initTemporaryPath( Configuration conf, boolean unique )
    {
    String child = unique ? makeTemporaryPathDirString( name ) : name;

    return new Path( getTempPath( conf ), child ).toString();
    }

  /**
   * Binds a new scheme instance to the incoming tap fields of the first incoming scope and
   * returns a scope declaring those fields.
   *
   * @param incomingScopes the scopes feeding this tap; only the first one returned by the iterator is consulted
   * @return a new {@link Scope} over the incoming tap fields
   * @throws CascadingException if the scheme cannot be instantiated or no scheme class was specified
   */
  @Override
  public Scope outgoingScopeFor( Set<Scope> incomingScopes )
    {
    Fields fields = incomingScopes.iterator().next().getIncomingTapFields();

    setSchemeUsing( fields );

    return new Scope( fields );
    }

  /**
   * Reflectively instantiates {@link #schemeClass} through its {@code (Fields)} constructor and
   * installs the result as this tap's scheme.
   *
   * @param fields the fields handed to the scheme constructor
   * @throws CascadingException if no scheme class was specified or instantiation fails
   */
  private void setSchemeUsing( Fields fields )
    {
    // Instances created through the boolean constructor never assign schemeClass. Without this
    // guard the handler below would dereference the same null while building its message, so a
    // bare NullPointerException escaped instead of the intended CascadingException.
    if( schemeClass == null )
      throw new CascadingException( "unable to create scheme, no scheme class was specified for: " + name );

    try
      {
      setScheme( schemeClass.getConstructor( Fields.class ).newInstance( fields ) );
      }
    catch( Exception exception )
      {
      throw new CascadingException( "unable to create specified scheme: " + schemeClass.getName(), exception );
      }
    }

  /** Always {@code true}, this tap only ever represents temporary data. */
  @Override
  public boolean isTemporary()
    {
    return true;
    }

  @Override
  public String toString()
    {
    return getClass().getSimpleName() + "[\"" + getScheme() + "\"]" + "[" + name + "]";
    }
  }