001/* 002 * Copyright (c) 2016-2017 Chris K Wensel <chris@wensel.net>. All Rights Reserved. 003 * Copyright (c) 2007-2017 Xplenty, Inc. All Rights Reserved. 004 * 005 * Project and contact information: http://www.cascading.org/ 006 * 007 * This file is part of the Cascading project. 008 * 009 * Licensed under the Apache License, Version 2.0 (the "License"); 010 * you may not use this file except in compliance with the License. 011 * You may obtain a copy of the License at 012 * 013 * http://www.apache.org/licenses/LICENSE-2.0 014 * 015 * Unless required by applicable law or agreed to in writing, software 016 * distributed under the License is distributed on an "AS IS" BASIS, 017 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 018 * See the License for the specific language governing permissions and 019 * limitations under the License. 020 */ 021 022package cascading.scheme.local; 023 024import java.beans.ConstructorProperties; 025import java.io.IOException; 026import java.io.InputStream; 027import java.io.InputStreamReader; 028import java.io.LineNumberReader; 029import java.io.OutputStream; 030import java.io.OutputStreamWriter; 031import java.io.PrintWriter; 032import java.io.UnsupportedEncodingException; 033import java.nio.charset.Charset; 034import java.util.Properties; 035 036import cascading.flow.FlowProcess; 037import cascading.management.annotation.Property; 038import cascading.management.annotation.PropertyDescription; 039import cascading.management.annotation.Visibility; 040import cascading.scheme.Scheme; 041import cascading.scheme.SinkCall; 042import cascading.scheme.SourceCall; 043import cascading.tap.Tap; 044import cascading.tap.TapException; 045import cascading.tuple.Fields; 046import cascading.tuple.TupleEntry; 047 048/** 049 * A TextLine is a type of {@link cascading.scheme.Scheme} for plain text files. Files are broken into 050 * lines. Either line-feed or carriage-return are used to signal end of line. 051 * <p/> 052 * By default, this scheme returns a {@link cascading.tuple.Tuple} with two fields, "num" and "line". Where "num" 053 * is the line number for "line". 054 * <p/> 055 * Many of the constructors take both "sourceFields" and "sinkFields". sourceFields denote the field names 056 * to be used instead of the names "num" and "line". sinkFields is a selector and is by default {@link Fields#ALL}. 057 * Any available field names can be given if only a subset of the incoming fields should be used. 058 * <p/> 059 * If a {@link Fields} instance is passed on the constructor as sourceFields having only one field, the return tuples 060 * will simply be the "line" value using the given field name. 061 * <p/> 062 * Note that TextLine will concatenate all the Tuple values for the selected fields with a TAB delimiter before 063 * writing out the line. 064 * <p/> 065 * By default, all text is encoded/decoded as UTF-8. This can be changed via the {@code charsetName} constructor 066 * argument. 067 */ 068public class TextLine extends Scheme<Properties, InputStream, OutputStream, LineNumberReader, PrintWriter> 069 { 070 public static final String DEFAULT_CHARSET = "UTF-8"; 071 public static final Fields DEFAULT_SOURCE_FIELDS = new Fields( "num", "line" ).applyTypes( Integer.TYPE, String.class ); 072 073 private String charsetName = DEFAULT_CHARSET; 074 075 /** 076 * Creates a new TextLine instance that sources "num" and "line" fields, and sinks all incoming fields, where 077 * "num" is the line number of the line in the input file. 078 */ 079 public TextLine() 080 { 081 super( DEFAULT_SOURCE_FIELDS, Fields.ALL ); 082 } 083 084 /** 085 * Creates a new TextLine instance. If sourceFields has one field, only the text line will be returned in the 086 * subsequent tuples. 087 * 088 * @param sourceFields of Fields 089 */ 090 @ConstructorProperties({"sourceFields"}) 091 public TextLine( Fields sourceFields ) 092 { 093 super( sourceFields ); 094 095 verify( sourceFields ); 096 } 097 098 /** 099 * Creates a new TextLine instance. If sourceFields has one field, only the text line will be returned in the 100 * subsequent tuples. 101 * 102 * @param sourceFields of Fields 103 * @param charsetName of type String 104 */ 105 @ConstructorProperties({"sourceFields", "charsetName"}) 106 public TextLine( Fields sourceFields, String charsetName ) 107 { 108 super( sourceFields ); 109 110 // throws an exception if not found 111 setCharsetName( charsetName ); 112 113 verify( sourceFields ); 114 } 115 116 /** 117 * Creates a new TextLine instance. If sourceFields has one field, only the text line will be returned in the 118 * subsequent tuples. 119 * 120 * @param sourceFields of Fields 121 * @param sinkFields of Fields 122 */ 123 @ConstructorProperties({"sourceFields", "sinkFields"}) 124 public TextLine( Fields sourceFields, Fields sinkFields ) 125 { 126 super( sourceFields, sinkFields ); 127 128 verify( sourceFields ); 129 } 130 131 /** 132 * Creates a new TextLine instance. If sourceFields has one field, only the text line will be returned in the 133 * subsequent tuples. 134 * 135 * @param sourceFields of Fields 136 * @param sinkFields of Fields 137 * @param charsetName of type String 138 */ 139 @ConstructorProperties({"sourceFields", "sinkFields", "charsetName"}) 140 public TextLine( Fields sourceFields, Fields sinkFields, String charsetName ) 141 { 142 super( sourceFields, sinkFields ); 143 144 // throws an exception if not found 145 setCharsetName( charsetName ); 146 147 verify( sourceFields ); 148 } 149 150 private void setCharsetName( String charsetName ) 151 { 152 if( charsetName != null ) 153 this.charsetName = charsetName; 154 155 Charset.forName( this.charsetName ); 156 } 157 158 @Property(name = "charset", visibility = Visibility.PUBLIC) 159 @PropertyDescription("character set used.") 160 public String getCharsetName() 161 { 162 return charsetName; 163 } 164 165 protected void verify( Fields sourceFields ) 166 { 167 if( sourceFields.size() < 1 || sourceFields.size() > 2 ) 168 throw new IllegalArgumentException( "this scheme requires either one or two source fields, given [" + sourceFields + "]" ); 169 } 170 171 public LineNumberReader createInput( InputStream inputStream ) 172 { 173 try 174 { 175 return new LineNumberReader( new InputStreamReader( inputStream, charsetName ) ); 176 } 177 catch( UnsupportedEncodingException exception ) 178 { 179 throw new TapException( exception ); 180 } 181 } 182 183 public PrintWriter createOutput( OutputStream outputStream ) 184 { 185 try 186 { 187 return new PrintWriter( new OutputStreamWriter( outputStream, charsetName ) ); 188 } 189 catch( UnsupportedEncodingException exception ) 190 { 191 throw new TapException( exception ); 192 } 193 } 194 195 @Override 196 public void presentSourceFields( FlowProcess<? extends Properties> process, Tap tap, Fields fields ) 197 { 198 // do nothing 199 } 200 201 @Override 202 public void presentSinkFields( FlowProcess<? extends Properties> process, Tap tap, Fields fields ) 203 { 204 // do nothing 205 } 206 207 @Override 208 public void sourceConfInit( FlowProcess<? extends Properties> flowProcess, Tap<Properties, InputStream, OutputStream> tap, Properties conf ) 209 { 210 } 211 212 @Override 213 public void sinkConfInit( FlowProcess<? extends Properties> flowProcess, Tap<Properties, InputStream, OutputStream> tap, Properties conf ) 214 { 215 } 216 217 @Override 218 public void sourcePrepare( FlowProcess<? extends Properties> flowProcess, SourceCall<LineNumberReader, InputStream> sourceCall ) throws IOException 219 { 220 sourceCall.setContext( createInput( sourceCall.getInput() ) ); 221 } 222 223 @Override 224 public void sourceRePrepare( FlowProcess<? extends Properties> flowProcess, SourceCall<LineNumberReader, InputStream> sourceCall ) throws IOException 225 { 226 sourceCall.setContext( createInput( sourceCall.getInput() ) ); 227 } 228 229 @Override 230 public boolean source( FlowProcess<? extends Properties> flowProcess, SourceCall<LineNumberReader, InputStream> sourceCall ) throws IOException 231 { 232 // first line is 0, this matches offset being zero, so when throwing out the first line for comments 233 int lineNumber = sourceCall.getContext().getLineNumber(); 234 String line = sourceCall.getContext().readLine(); 235 236 if( line == null ) 237 return false; 238 239 TupleEntry incomingEntry = sourceCall.getIncomingEntry(); 240 241 if( getSourceFields().size() == 1 ) 242 { 243 incomingEntry.setObject( 0, line ); 244 } 245 else 246 { 247 incomingEntry.setInteger( 0, lineNumber ); 248 incomingEntry.setString( 1, line ); 249 } 250 251 return true; 252 } 253 254 @Override 255 public void sourceCleanup( FlowProcess<? extends Properties> flowProcess, SourceCall<LineNumberReader, InputStream> sourceCall ) throws IOException 256 { 257 sourceCall.setContext( null ); 258 } 259 260 @Override 261 public void sinkPrepare( FlowProcess<? extends Properties> flowProcess, SinkCall<PrintWriter, OutputStream> sinkCall ) throws IOException 262 { 263 sinkCall.setContext( createOutput( sinkCall.getOutput() ) ); 264 } 265 266 @Override 267 public void sink( FlowProcess<? extends Properties> flowProcess, SinkCall<PrintWriter, OutputStream> sinkCall ) throws IOException 268 { 269 sinkCall.getContext().println( sinkCall.getOutgoingEntry().getTuple().toString() ); 270 } 271 272 @Override 273 public void sinkCleanup( FlowProcess<? extends Properties> flowProcess, SinkCall<PrintWriter, OutputStream> sinkCall ) throws IOException 274 { 275 sinkCall.getContext().flush(); 276 sinkCall.setContext( null ); 277 } 278 }