001/*
002 * Copyright (c) 2016-2017 Chris K Wensel <chris@wensel.net>. All Rights Reserved.
003 * Copyright (c) 2007-2017 Xplenty, Inc. All Rights Reserved.
004 *
005 * Project and contact information: http://www.cascading.org/
006 *
007 * This file is part of the Cascading project.
008 *
009 * Licensed under the Apache License, Version 2.0 (the "License");
010 * you may not use this file except in compliance with the License.
011 * You may obtain a copy of the License at
012 *
013 *     http://www.apache.org/licenses/LICENSE-2.0
014 *
015 * Unless required by applicable law or agreed to in writing, software
016 * distributed under the License is distributed on an "AS IS" BASIS,
017 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
018 * See the License for the specific language governing permissions and
019 * limitations under the License.
020 */
021
022package cascading.tap.local;
023
024import java.io.File;
025import java.io.FileInputStream;
026import java.io.IOException;
027import java.io.InputStream;
028import java.io.OutputStream;
029import java.nio.file.Files;
030import java.nio.file.Paths;
031import java.util.LinkedHashSet;
032import java.util.Properties;
033import java.util.Set;
034import java.util.concurrent.TimeUnit;
035
036import cascading.flow.FlowProcess;
037import cascading.scheme.Scheme;
038import cascading.tap.SinkMode;
039import cascading.tap.Tap;
040import cascading.tap.local.io.TapFileOutputStream;
041import cascading.tap.type.FileType;
042import cascading.tuple.TupleEntryCollector;
043import cascading.tuple.TupleEntryIterator;
044import cascading.tuple.TupleEntrySchemeCollector;
045import cascading.tuple.TupleEntrySchemeIterator;
046
047/**
048 * Class FileTap is a {@link Tap} sub-class that allows for direct local file access.
049 * <p/>
050 * FileTap must be used with the {@link cascading.flow.local.LocalFlowConnector} to create
051 * {@link cascading.flow.Flow} instances that run in "local" mode.
052 */
053public class FileTap extends Tap<Properties, InputStream, OutputStream> implements FileType<Properties>
054  {
055  private final String path;
056
057  /**
058   * Constructor FileTap creates a new FileTap instance using the given {@link cascading.scheme.Scheme} and file {@code path}.
059   *
060   * @param scheme of type LocalScheme
061   * @param path   of type String
062   */
063  public FileTap( Scheme<Properties, InputStream, OutputStream, ?, ?> scheme, String path )
064    {
065    this( scheme, path, SinkMode.KEEP );
066    }
067
068  /**
069   * Constructor FileTap creates a new FileTap instance using the given {@link cascading.scheme.Scheme},
070   * file {@code path}, and {@code SinkMode}.
071   *
072   * @param scheme   of type LocalScheme
073   * @param path     of type String
074   * @param sinkMode of type SinkMode
075   */
076  public FileTap( Scheme<Properties, InputStream, OutputStream, ?, ?> scheme, String path, SinkMode sinkMode )
077    {
078    super( scheme, sinkMode );
079    this.path = new File( path ).getPath(); // cleans path information
080    }
081
082  @Override
083  public String getIdentifier()
084    {
085    return path;
086    }
087
088  @Override
089  public String getFullIdentifier( Properties conf )
090    {
091    return fullyQualifyIdentifier( getIdentifier() );
092    }
093
094  private String fullyQualifyIdentifier( String identifier )
095    {
096    return new File( identifier ).getAbsoluteFile().toURI().toString();
097    }
098
099  @Override
100  public TupleEntryIterator openForRead( FlowProcess<? extends Properties> flowProcess, InputStream input ) throws IOException
101    {
102    if( input == null )
103      input = new FileInputStream( getIdentifier() );
104
105    return new TupleEntrySchemeIterator<Properties, InputStream>( flowProcess, getScheme(), input, getIdentifier() );
106    }
107
108  @Override
109  public TupleEntryCollector openForWrite( FlowProcess<? extends Properties> flowProcess, OutputStream output ) throws IOException
110    {
111    if( output == null )
112      output = new TapFileOutputStream( getIdentifier(), isUpdate() ); // append if we are in update mode
113
114    return new TupleEntrySchemeCollector<Properties, OutputStream>( flowProcess, getScheme(), output, getIdentifier() );
115    }
116
117  @Override
118  public boolean createResource( Properties conf ) throws IOException
119    {
120    File parentFile = new File( getIdentifier() ).getParentFile();
121
122    return parentFile.exists() || parentFile.mkdirs();
123    }
124
125  @Override
126  public boolean deleteResource( Properties conf ) throws IOException
127    {
128    return Files.deleteIfExists( Paths.get( getIdentifier() ) );
129    }
130
131  @Override
132  public boolean commitResource( Properties conf ) throws IOException
133    {
134    return true;
135    }
136
137  @Override
138  public boolean resourceExists( Properties conf ) throws IOException
139    {
140    return Files.exists( Paths.get( getIdentifier() ) );
141
142    }
143
144  @Override
145  public long getModifiedTime( Properties conf ) throws IOException
146    {
147    return Files.getLastModifiedTime( Paths.get( getIdentifier() ) ).to( TimeUnit.MILLISECONDS );
148    }
149
150  @Override
151  public boolean isDirectory( FlowProcess<? extends Properties> flowProcess ) throws IOException
152    {
153    return isDirectory( flowProcess.getConfig() );
154    }
155
156  @Override
157  public boolean isDirectory( Properties conf ) throws IOException
158    {
159    return Files.isDirectory( Paths.get( getIdentifier() ) );
160    }
161
162  @Override
163  public String[] getChildIdentifiers( FlowProcess<? extends Properties> flowProcess ) throws IOException
164    {
165    return getChildIdentifiers( flowProcess.getConfig() );
166    }
167
168  @Override
169  public String[] getChildIdentifiers( Properties conf ) throws IOException
170    {
171    return getChildIdentifiers( conf, 1, false );
172    }
173
174  @Override
175  public String[] getChildIdentifiers( FlowProcess<? extends Properties> flowProcess, int depth, boolean fullyQualified ) throws IOException
176    {
177    return getChildIdentifiers( flowProcess.getConfig(), depth, fullyQualified );
178    }
179
180  @Override
181  public String[] getChildIdentifiers( Properties conf, int depth, boolean fullyQualified ) throws IOException
182    {
183    if( !resourceExists( conf ) )
184      return new String[ 0 ];
185
186    Set<String> results = new LinkedHashSet<String>();
187
188    getChildPaths( results, getIdentifier(), depth );
189
190    String[] allPaths = results.toArray( new String[ results.size() ] );
191
192    if( !fullyQualified )
193      return allPaths;
194
195    for( int i = 0; i < allPaths.length; i++ )
196      allPaths[ i ] = fullyQualifyIdentifier( allPaths[ i ] );
197
198    return allPaths;
199    }
200
201  @Override
202  public long getSize( FlowProcess<? extends Properties> flowProcess ) throws IOException
203    {
204    return getSize( flowProcess.getConfig() );
205    }
206
207  @Override
208  public long getSize( Properties conf ) throws IOException
209    {
210    File file = new File( getIdentifier() );
211
212    if( file.isDirectory() )
213      return 0;
214
215    return file.length();
216    }
217
218  private boolean getChildPaths( Set<String> results, String identifier, int depth )
219    {
220    File file = new File( identifier );
221
222    if( depth == 0 || file.isFile() )
223      {
224      results.add( identifier );
225      return true;
226      }
227
228    String[] paths = file.list();
229
230    if( paths == null )
231      return false;
232
233    boolean result = false;
234
235    for( String path : paths )
236      result |= getChildPaths( results, new File( file, path ).getPath(), depth - 1 );
237
238    return result;
239    }
240  }