/*
 * Copyright (c) 2016-2017 Chris K Wensel <chris@wensel.net>. All Rights Reserved.
 * Copyright (c) 2007-2017 Xplenty, Inc. All Rights Reserved.
 *
 * Project and contact information: http://www.cascading.org/
 *
 * This file is part of the Cascading project.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package cascading.tap.local;

import java.io.Closeable;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.LinkedHashSet;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.TimeUnit;

import cascading.flow.FlowProcess;
import cascading.scheme.Scheme;
import cascading.tap.SinkMode;
import cascading.tap.Tap;
import cascading.tap.local.io.TapFileOutputStream;
import cascading.tap.type.FileType;
import cascading.tuple.TupleEntryCollector;
import cascading.tuple.TupleEntryIterator;
import cascading.tuple.TupleEntrySchemeCollector;
import cascading.tuple.TupleEntrySchemeIterator;

/**
 * Class FileTap is a {@link Tap} sub-class that allows for direct local file access.
 * <p>
 * FileTap must be used with the {@link cascading.flow.local.LocalFlowConnector} to create
 * {@link cascading.flow.Flow} instances that run in "local" mode.
 */
public class FileTap extends Tap<Properties, InputStream, OutputStream> implements FileType<Properties>
  {
  /** The file path this tap reads from or writes to, as normalized by {@link File#getPath()}. */
  private final String path;

  /**
   * Constructor FileTap creates a new FileTap instance using the given {@link cascading.scheme.Scheme} and file {@code path}.
   * <p>
   * The sink mode is {@link SinkMode#KEEP}.
   *
   * @param scheme of type Scheme
   * @param path   of type String
   */
  public FileTap( Scheme<Properties, InputStream, OutputStream, ?, ?> scheme, String path )
    {
    this( scheme, path, SinkMode.KEEP );
    }

  /**
   * Constructor FileTap creates a new FileTap instance using the given {@link cascading.scheme.Scheme},
   * file {@code path}, and {@code SinkMode}.
   *
   * @param scheme   of type Scheme
   * @param path     of type String, must not be {@code null}
   * @param sinkMode of type SinkMode
   */
  public FileTap( Scheme<Properties, InputStream, OutputStream, ?, ?> scheme, String path, SinkMode sinkMode )
    {
    super( scheme, sinkMode );
    this.path = new File( path ).getPath(); // cleans path information
    }

  /**
   * Returns the path given at construction, after normalization by {@link File#getPath()}.
   *
   * @return the path of this tap
   */
  @Override
  public String getIdentifier()
    {
    return path;
    }

  /**
   * Returns the identifier of this tap as an absolute {@code file:} URI string.
   *
   * @param conf not used by this implementation
   * @return the absolute URI form of {@link #getIdentifier()}
   */
  @Override
  public String getFullIdentifier( Properties conf )
    {
    return fullyQualifyIdentifier( getIdentifier() );
    }

  /** Converts the given path into the string form of its absolute file URI. */
  private String fullyQualifyIdentifier( String identifier )
    {
    return new File( identifier ).getAbsoluteFile().toURI().toString();
    }

  /**
   * Opens this tap for reading.
   * <p>
   * If {@code input} is {@code null}, a {@link FileInputStream} is opened on {@link #getIdentifier()}. Should
   * creating the iterator then fail, that stream is closed before the failure is rethrown. A stream supplied
   * by the caller is never closed here.
   *
   * @param flowProcess of type FlowProcess
   * @param input       stream to read from, or {@code null} to open the underlying file
   * @return an iterator over the tuples read via the current scheme
   * @throws IOException if the underlying file cannot be opened
   */
  @Override
  public TupleEntryIterator openForRead( FlowProcess<? extends Properties> flowProcess, InputStream input ) throws IOException
    {
    boolean openedHere = input == null;

    if( openedHere )
      input = new FileInputStream( getIdentifier() );

    try
      {
      return new TupleEntrySchemeIterator<Properties, InputStream>( flowProcess, getScheme(), input, getIdentifier() );
      }
    catch( RuntimeException | Error failure )
      {
      // only release what this method opened, the caller owns any stream it handed in
      if( openedHere )
        closeSuppressed( input, failure );

      throw failure;
      }
    }

  /**
   * Opens this tap for writing.
   * <p>
   * If {@code output} is {@code null}, a {@link TapFileOutputStream} is opened on {@link #getIdentifier()},
   * passing {@code isUpdate()} as its second argument. Should creating the collector then fail, that stream is
   * closed before the failure is rethrown. A stream supplied by the caller is never closed here.
   *
   * @param flowProcess of type FlowProcess
   * @param output      stream to write to, or {@code null} to open the underlying file
   * @return a collector writing tuples via the current scheme
   * @throws IOException if the underlying file cannot be opened
   */
  @Override
  public TupleEntryCollector openForWrite( FlowProcess<? extends Properties> flowProcess, OutputStream output ) throws IOException
    {
    boolean openedHere = output == null;

    if( openedHere )
      output = new TapFileOutputStream( getIdentifier(), isUpdate() ); // append if we are in update mode

    try
      {
      return new TupleEntrySchemeCollector<Properties, OutputStream>( flowProcess, getScheme(), output, getIdentifier() );
      }
    catch( RuntimeException | Error failure )
      {
      // only release what this method opened, the caller owns any stream it handed in
      if( openedHere )
        closeSuppressed( output, failure );

      throw failure;
      }
    }

  /** Closes the given resource, attaching any close failure to {@code failure} as a suppressed exception. */
  private static void closeSuppressed( Closeable resource, Throwable failure )
    {
    try
      {
      resource.close();
      }
    catch( IOException exception )
      {
      failure.addSuppressed( exception );
      }
    }

  /**
   * Makes sure the parent directory of this tap's file exists, creating it and any missing ancestors.
   * <p>
   * The file itself is not created by this method.
   *
   * @param conf not used by this implementation
   * @return {@code true} if the parent directory exists when this method returns, or if the path has no parent
   * @throws IOException declared by the overridden method, not thrown by this implementation
   */
  @Override
  public boolean createResource( Properties conf ) throws IOException
    {
    // resolve against the working directory first, a bare name like "out.txt" has no parent otherwise
    File parentFile = new File( getIdentifier() ).getAbsoluteFile().getParentFile();

    // only a filesystem root has no parent, there is nothing to create in that case
    if( parentFile == null )
      return true;

    return parentFile.exists() || parentFile.mkdirs();
    }

  /**
   * Deletes the file or directory at {@link #getIdentifier()} if it exists.
   *
   * @param conf not used by this implementation
   * @return {@code true} if something was deleted, {@code false} if nothing existed at the path
   * @throws IOException if the delete fails, for example on a non-empty directory
   */
  @Override
  public boolean deleteResource( Properties conf ) throws IOException
    {
    return Files.deleteIfExists( Paths.get( getIdentifier() ) );
    }

  /**
   * Does nothing, there is no commit step in this implementation.
   *
   * @param conf not used by this implementation
   * @return always {@code true}
   */
  @Override
  public boolean commitResource( Properties conf ) throws IOException
    {
    return true;
    }

  /**
   * Tests whether a file or directory exists at {@link #getIdentifier()}.
   *
   * @param conf not used by this implementation
   * @return {@code true} if the path exists
   */
  @Override
  public boolean resourceExists( Properties conf ) throws IOException
    {
    return Files.exists( Paths.get( getIdentifier() ) );
    }

  /**
   * Returns the last modified time of the path at {@link #getIdentifier()}.
   *
   * @param conf not used by this implementation
   * @return the last modified time in milliseconds
   * @throws IOException if the time cannot be read, for example because the path does not exist
   */
  @Override
  public long getModifiedTime( Properties conf ) throws IOException
    {
    return Files.getLastModifiedTime( Paths.get( getIdentifier() ) ).to( TimeUnit.MILLISECONDS );
    }

  /**
   * Delegates to {@link #isDirectory(Properties)} using the configuration of the given flow process.
   */
  @Override
  public boolean isDirectory( FlowProcess<? extends Properties> flowProcess ) throws IOException
    {
    return isDirectory( flowProcess.getConfig() );
    }

  /**
   * Tests whether the path at {@link #getIdentifier()} is a directory.
   *
   * @param conf not used by this implementation
   * @return {@code true} if the path exists and is a directory
   */
  @Override
  public boolean isDirectory( Properties conf ) throws IOException
    {
    return Files.isDirectory( Paths.get( getIdentifier() ) );
    }

  /**
   * Delegates to {@link #getChildIdentifiers(Properties)} using the configuration of the given flow process.
   */
  @Override
  public String[] getChildIdentifiers( FlowProcess<? extends Properties> flowProcess ) throws IOException
    {
    return getChildIdentifiers( flowProcess.getConfig() );
    }

  /**
   * Returns the child identifiers one level below this tap's path, not fully qualified.
   * <p>
   * Equivalent to {@code getChildIdentifiers( conf, 1, false )}.
   */
  @Override
  public String[] getChildIdentifiers( Properties conf ) throws IOException
    {
    return getChildIdentifiers( conf, 1, false );
    }

  /**
   * Delegates to {@link #getChildIdentifiers(Properties, int, boolean)} using the configuration of the given
   * flow process.
   */
  @Override
  public String[] getChildIdentifiers( FlowProcess<? extends Properties> flowProcess, int depth, boolean fullyQualified ) throws IOException
    {
    return getChildIdentifiers( flowProcess.getConfig(), depth, fullyQualified );
    }

  /**
   * Collects the paths found by descending at most {@code depth} directory levels below this tap's path.
   * <p>
   * A regular file met before the depth is exhausted is reported as is. Whatever is found once the depth
   * reaches zero is reported without descending further, so a {@code depth} of zero yields this tap's own
   * identifier. If the resource does not exist an empty array is returned.
   *
   * @param conf           passed to {@link #resourceExists(Properties)}
   * @param depth          number of directory levels to descend
   * @param fullyQualified if {@code true} each result is converted to an absolute file URI string
   * @return the collected paths in the order they were encountered, never {@code null}
   */
  @Override
  public String[] getChildIdentifiers( Properties conf, int depth, boolean fullyQualified ) throws IOException
    {
    if( !resourceExists( conf ) )
      return new String[ 0 ];

    Set<String> results = new LinkedHashSet<String>();

    getChildPaths( results, getIdentifier(), depth );

    String[] allPaths = results.toArray( new String[ results.size() ] );

    if( !fullyQualified )
      return allPaths;

    for( int i = 0; i < allPaths.length; i++ )
      allPaths[ i ] = fullyQualifyIdentifier( allPaths[ i ] );

    return allPaths;
    }

  /**
   * Delegates to {@link #getSize(Properties)} using the configuration of the given flow process.
   */
  @Override
  public long getSize( FlowProcess<? extends Properties> flowProcess ) throws IOException
    {
    return getSize( flowProcess.getConfig() );
    }

  /**
   * Returns the length of the file at {@link #getIdentifier()}.
   *
   * @param conf not used by this implementation
   * @return {@code 0} if the path is a directory, otherwise the value of {@link File#length()}
   */
  @Override
  public long getSize( Properties conf ) throws IOException
    {
    File file = new File( getIdentifier() );

    if( file.isDirectory() )
      return 0;

    return file.length();
    }

  /**
   * Recursively adds paths at or below {@code identifier} to {@code results}.
   *
   * @param results    receives the discovered paths
   * @param identifier the path to inspect
   * @param depth      remaining number of directory levels to descend
   * @return {@code true} if at least one path was added for {@code identifier} or its descendants
   */
  private boolean getChildPaths( Set<String> results, String identifier, int depth )
    {
    File file = new File( identifier );

    if( depth == 0 || file.isFile() )
      {
      results.add( identifier );
      return true;
      }

    String[] paths = file.list();

    // list() yields null when the path is not a directory or cannot be read
    if( paths == null )
      return false;

    boolean result = false;

    for( String path : paths )
      result |= getChildPaths( results, new File( file, path ).getPath(), depth - 1 );

    return result;
    }
  }