001/*
002 * Copyright (c) 2007-2015 Concurrent, Inc. All Rights Reserved.
003 *
004 * Project and contact information: http://www.cascading.org/
005 *
006 * This file is part of the Cascading project.
007 *
008 * Licensed under the Apache License, Version 2.0 (the "License");
009 * you may not use this file except in compliance with the License.
010 * You may obtain a copy of the License at
011 *
012 *     http://www.apache.org/licenses/LICENSE-2.0
013 *
014 * Unless required by applicable law or agreed to in writing, software
015 * distributed under the License is distributed on an "AS IS" BASIS,
016 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
017 * See the License for the specific language governing permissions and
018 * limitations under the License.
019 */
020
021package cascading.tap;
022
023import java.io.IOException;
024import java.io.Serializable;
025import java.util.Set;
026
027import cascading.flow.Flow;
028import cascading.flow.FlowElement;
029import cascading.flow.FlowException;
030import cascading.flow.FlowProcess;
031import cascading.flow.planner.Scope;
032import cascading.management.annotation.Property;
033import cascading.management.annotation.PropertyDescription;
034import cascading.management.annotation.PropertySanitizer;
035import cascading.management.annotation.Visibility;
036import cascading.pipe.Pipe;
037import cascading.property.ConfigDef;
038import cascading.scheme.Scheme;
039import cascading.tuple.Fields;
040import cascading.tuple.FieldsResolverException;
041import cascading.tuple.Tuple;
042import cascading.tuple.TupleEntryCollector;
043import cascading.tuple.TupleEntryIterator;
044import cascading.util.TraceUtil;
045import cascading.util.Traceable;
046import cascading.util.Util;
047
048/**
049 * A Tap represents the physical data source or sink in a connected {@link cascading.flow.Flow}.
 * <p/>
051 * That is, a source Tap is the head end of a connected {@link Pipe} and {@link Tuple} stream, and
052 * a sink Tap is the tail end. Kinds of Tap types are used to manage files from a local disk,
053 * distributed disk, remote storage like Amazon S3, or via FTP. It simply abstracts
054 * out the complexity of connecting to these types of data sources.
055 * <p/>
056 * A Tap takes a {@link Scheme} instance, which is used to identify the type of resource (text file, binary file, etc).
057 * A Tap is responsible for how the resource is reached.
058 * <p/>
 * By default when planning a Flow, Tap equality is a function of the {@link #getIdentifier()} and {@link #getScheme()}
 * values. That is, two Tap instances are considered equal if they sink/source the same resource and sink/source
 * the same fields.
062 * <p/>
063 * Some more advanced taps, like a database tap, may need to extend equality to include any filtering, like the
064 * {@code where} clause in a SQL statement so two taps reading from the same SQL table aren't considered equal.
065 * <p/>
066 * Taps are also used to determine dependencies between two or more {@link Flow} instances when used with a
067 * {@link cascading.cascade.Cascade}. In that case the {@link #getFullIdentifier(Object)} value is used and the Scheme
068 * is ignored.
069 */
070public abstract class Tap<Config, Input, Output> implements FlowElement, Serializable, Traceable
071  {
072  /** Field scheme */
073  private Scheme<Config, Input, Output, ?, ?> scheme;
074
075  /** Field mode */
076  SinkMode sinkMode = SinkMode.KEEP;
077
078  private ConfigDef configDef;
079  private ConfigDef nodeConfigDef;
080  private ConfigDef stepConfigDef;
081
082  /** Field id */
083  private final String id = Util.createUniqueID(); // 3.0 planner relies on this being consistent
084  /** Field trace */
085  private String trace = TraceUtil.captureDebugTrace( this ); // see TraceUtil.setTrace() to override
086
087  /**
088   * Convenience function to make an array of Tap instances.
089   *
090   * @param taps of type Tap
091   * @return Tap array
092   */
093  public static Tap[] taps( Tap... taps )
094    {
095    return taps;
096    }
097
098  /**
099   * Creates and returns a unique ID for the given Tap, this value is cached and may be used to uniquely identify
100   * the Tap instance in properties files etc.
101   * <p/>
102   * This value is generally reproducible assuming the Tap identifier and the Scheme source and sink Fields remain consistent.
103   *
104   * @param tap of type Tap
105   * @return of type String
106   */
107  public static synchronized String id( Tap tap )
108    {
109    if( tap instanceof DecoratorTap )
110      return id( ( (DecoratorTap) tap ).getOriginal() );
111
112    return tap.id;
113    }
114
115  protected Tap()
116    {
117    }
118
119  protected Tap( Scheme<Config, Input, Output, ?, ?> scheme )
120    {
121    this.setScheme( scheme );
122    }
123
124  protected Tap( Scheme<Config, Input, Output, ?, ?> scheme, SinkMode sinkMode )
125    {
126    this.setScheme( scheme );
127    this.sinkMode = sinkMode;
128    }
129
130  protected void setScheme( Scheme<Config, Input, Output, ?, ?> scheme )
131    {
132    this.scheme = scheme;
133    }
134
135  /**
136   * Method getScheme returns the scheme of this Tap object.
137   *
138   * @return the scheme (type Scheme) of this Tap object.
139   */
140  public Scheme<Config, Input, Output, ?, ?> getScheme()
141    {
142    return scheme;
143    }
144
145  @Override
146  public String getTrace()
147    {
148    return trace;
149    }
150
151  /**
152   * Method flowInit allows this Tap instance to initialize itself in context of the given {@link cascading.flow.Flow} instance.
153   * This method is guaranteed to be called before the Flow is started and the
154   * {@link cascading.flow.FlowListener#onStarting(cascading.flow.Flow)} event is fired.
155   * <p/>
156   * This method will be called once per Flow, and before {@link #sourceConfInit(cascading.flow.FlowProcess, Object)} and
157   * {@link #sinkConfInit(cascading.flow.FlowProcess, Object)} methods.
158   *
159   * @param flow of type Flow
160   */
161  public void flowConfInit( Flow<Config> flow )
162    {
163
164    }
165
166  /**
167   * Method sourceConfInit initializes this instance as a source.
168   * <p/>
169   * This method maybe called more than once if this Tap instance is used outside the scope of a {@link cascading.flow.Flow}
170   * instance or if it participates in multiple times in a given Flow or across different Flows in
171   * a {@link cascading.cascade.Cascade}.
172   * <p/>
173   * In the context of a Flow, it will be called after
174   * {@link cascading.flow.FlowListener#onStarting(cascading.flow.Flow)}
175   * <p/>
176   * Note that no resources or services should be modified by this method.
177   *
178   * @param flowProcess of type FlowProcess
179   * @param conf        of type Config
180   */
181  public void sourceConfInit( FlowProcess<? extends Config> flowProcess, Config conf )
182    {
183    getScheme().sourceConfInit( flowProcess, this, conf );
184    }
185
186  /**
187   * Method sinkConfInit initializes this instance as a sink.
188   * <p/>
189   * This method maybe called more than once if this Tap instance is used outside the scope of a {@link cascading.flow.Flow}
190   * instance or if it participates in multiple times in a given Flow or across different Flows in
191   * a {@link cascading.cascade.Cascade}.
192   * <p/>
193   * Note this method will be called in context of this Tap being used as a traditional 'sink' and as a 'trap'.
194   * <p/>
195   * In the context of a Flow, it will be called after
196   * {@link cascading.flow.FlowListener#onStarting(cascading.flow.Flow)}
197   * <p/>
198   * Note that no resources or services should be modified by this method. If this Tap instance returns true for
199   * {@link #isReplace()}, then {@link #deleteResource(Object)} will be called by the parent Flow.
200   *
201   * @param flowProcess of type FlowProcess
202   * @param conf        of type Config
203   */
204  public void sinkConfInit( FlowProcess<? extends Config> flowProcess, Config conf )
205    {
206    getScheme().sinkConfInit( flowProcess, this, conf );
207    }
208
209  /**
210   * Method getIdentifier returns a String representing the resource this Tap instance represents.
211   * <p/>
212   * Often, if the tap accesses a filesystem, the identifier is nothing more than the path to the file or directory.
213   * In other cases it may be a an URL or URI representing a connection string or remote resource.
214   * <p/>
215   * Any two Tap instances having the same value for the identifier are considered equal.
216   *
217   * @return String
218   */
219  @Property(name = "identifier", visibility = Visibility.PUBLIC)
220  @PropertyDescription("The resource this Tap instance represents")
221  @PropertySanitizer("cascading.management.annotation.URISanitizer")
222  public abstract String getIdentifier();
223
224  /**
225   * Method getSourceFields returns the sourceFields of this Tap object.
226   *
227   * @return the sourceFields (type Fields) of this Tap object.
228   */
229  public Fields getSourceFields()
230    {
231    return getScheme().getSourceFields();
232    }
233
234  /**
235   * Method getSinkFields returns the sinkFields of this Tap object.
236   *
237   * @return the sinkFields (type Fields) of this Tap object.
238   */
239  public Fields getSinkFields()
240    {
241    return getScheme().getSinkFields();
242    }
243
244  /**
245   * Method openForRead opens the resource represented by this Tap instance for reading.
246   * <p/>
247   * {@code input} value may be null, if so, sub-classes must inquire with the underlying {@link Scheme}
248   * via {@link Scheme#sourceConfInit(cascading.flow.FlowProcess, Tap, Object)} to get the proper
249   * input type and instantiate it before calling {@code super.openForRead()}.
250   * <p/>
251   * Note the returned iterator will return the same instance of {@link cascading.tuple.TupleEntry} on every call,
252   * thus a copy must be made of either the TupleEntry or the underlying {@code Tuple} instance if they are to be
253   * stored in a Collection.
254   *
255   * @param flowProcess of type FlowProcess
256   * @param input       of type Input
257   * @return TupleEntryIterator
258   * @throws java.io.IOException when the resource cannot be opened
259   */
260  public abstract TupleEntryIterator openForRead( FlowProcess<? extends Config> flowProcess, Input input ) throws IOException;
261
262  /**
263   * Method openForRead opens the resource represented by this Tap instance for reading.
264   * <p/>
265   * Note the returned iterator will return the same instance of {@link cascading.tuple.TupleEntry} on every call,
266   * thus a copy must be made of either the TupleEntry or the underlying {@code Tuple} instance if they are to be
267   * stored in a Collection.
268   *
269   * @param flowProcess of type FlowProcess
270   * @return TupleEntryIterator
271   * @throws java.io.IOException when the resource cannot be opened
272   */
273  public TupleEntryIterator openForRead( FlowProcess<? extends Config> flowProcess ) throws IOException
274    {
275    return openForRead( flowProcess, null );
276    }
277
278  /**
279   * Method openForWrite opens the resource represented by this Tap instance for writing.
280   * <p/>
281   * This method is used internally and does not honor the {@link SinkMode} setting. If SinkMode is
282   * {@link SinkMode#REPLACE}, this call may fail. See {@link #openForWrite(cascading.flow.FlowProcess)}.
283   * <p/>
284   * {@code output} value may be null, if so, sub-classes must inquire with the underlying {@link Scheme}
285   * via {@link Scheme#sinkConfInit(cascading.flow.FlowProcess, Tap, Object)} to get the proper
286   * output type and instantiate it before calling {@code super.openForWrite()}.
287   *
288   * @param flowProcess of type FlowProcess
289   * @param output      of type Output
290   * @return TupleEntryCollector
291   * @throws java.io.IOException when the resource cannot be opened
292   */
293  public abstract TupleEntryCollector openForWrite( FlowProcess<? extends Config> flowProcess, Output output ) throws IOException;
294
295  /**
296   * Method openForWrite opens the resource represented by this Tap instance for writing.
297   * <p/>
298   * This method is for user application use and does honor the {@link SinkMode#REPLACE} settings. That is, if
299   * SinkMode is set to {@link SinkMode#REPLACE} the underlying resource will be deleted.
300   * <p/>
301   * Note if {@link SinkMode#UPDATE} is set, the resource will not be deleted.
302   *
303   * @param flowProcess of type FlowProcess
304   * @return TupleEntryCollector
305   * @throws java.io.IOException when the resource cannot be opened
306   */
307  public TupleEntryCollector openForWrite( FlowProcess<? extends Config> flowProcess ) throws IOException
308    {
309    if( isReplace() )
310      deleteResource( flowProcess );
311
312    return openForWrite( flowProcess, null );
313    }
314
315  @Override
316  public Scope outgoingScopeFor( Set<Scope> incomingScopes )
317    {
318    // as a source Tap, we emit the scheme defined Fields
319    // as a sink Tap, we declare we emit the incoming Fields
320    // as a temp Tap, this method never gets called, but we emit what we consume
321    int count = 0;
322    for( Scope incomingScope : incomingScopes )
323      {
324      Fields incomingFields = incomingScope.getIncomingTapFields();
325
326      if( incomingFields != null )
327        {
328        try
329          {
330          incomingFields.select( getSinkFields() );
331          }
332        catch( FieldsResolverException exception )
333          {
334          throw new TapException( this, exception.getSourceFields(), exception.getSelectorFields(), exception );
335          }
336
337        count++;
338        }
339      }
340
341    if( count > 1 )
342      throw new FlowException( "Tap may not have more than one incoming Scope" );
343
344    // this allows the incoming to be passed through to the outgoing
345    Fields incomingFields = incomingScopes.size() == 0 ? null : incomingScopes.iterator().next().getIncomingTapFields();
346
347    if( incomingFields != null &&
348      ( isSource() && getSourceFields().equals( Fields.UNKNOWN ) ||
349        isSink() && getSinkFields().equals( Fields.ALL ) ) )
350      return new Scope( incomingFields );
351
352    if( count == 1 )
353      return new Scope( getSinkFields() );
354
355    return new Scope( getSourceFields() );
356    }
357
358  /**
359   * A hook for allowing a Scheme to lazily retrieve its source fields.
360   *
361   * @param flowProcess of type FlowProcess
362   * @return the found Fields
363   */
364  public Fields retrieveSourceFields( FlowProcess<? extends Config> flowProcess )
365    {
366    return getScheme().retrieveSourceFields( flowProcess, this );
367    }
368
369  public void presentSourceFields( FlowProcess<? extends Config> flowProcess, Fields fields )
370    {
371    getScheme().presentSourceFields( flowProcess, this, fields );
372    }
373
374  /**
375   * A hook for allowing a Scheme to lazily retrieve its sink fields.
376   *
377   * @param flowProcess of type FlowProcess
378   * @return the found Fields
379   */
380  public Fields retrieveSinkFields( FlowProcess<? extends Config> flowProcess )
381    {
382    return getScheme().retrieveSinkFields( flowProcess, this );
383    }
384
385  public void presentSinkFields( FlowProcess<? extends Config> flowProcess, Fields fields )
386    {
387    getScheme().presentSinkFields( flowProcess, this, fields );
388    }
389
390  @Override
391  public Fields resolveIncomingOperationArgumentFields( Scope incomingScope )
392    {
393    return incomingScope.getIncomingTapFields();
394    }
395
396  @Override
397  public Fields resolveIncomingOperationPassThroughFields( Scope incomingScope )
398    {
399    return incomingScope.getIncomingTapFields();
400    }
401
402  /**
403   * Method getFullIdentifier returns a fully qualified resource identifier.
404   *
405   * @param flowProcess of type FlowProcess
406   * @return String
407   */
408  public String getFullIdentifier( FlowProcess<? extends Config> flowProcess )
409    {
410    return getFullIdentifier( flowProcess.getConfig() );
411    }
412
413  /**
414   * Method getFullIdentifier returns a fully qualified resource identifier.
415   *
416   * @param conf of type Config
417   * @return String
418   */
419  public String getFullIdentifier( Config conf )
420    {
421    return getIdentifier();
422    }
423
424  /**
425   * Method createResource creates the underlying resource.
426   *
427   * @param flowProcess of type FlowProcess
428   * @return boolean
429   * @throws IOException when there is an error making directories
430   */
431  public boolean createResource( FlowProcess<? extends Config> flowProcess ) throws IOException
432    {
433    return createResource( flowProcess.getConfig() );
434    }
435
436  /**
437   * Method createResource creates the underlying resource.
438   *
439   * @param conf of type Config
440   * @return boolean
441   * @throws IOException when there is an error making directories
442   */
443  public abstract boolean createResource( Config conf ) throws IOException;
444
445  /**
446   * Method deleteResource deletes the resource represented by this instance.
447   *
448   * @param flowProcess of type FlowProcess
449   * @return boolean
450   * @throws IOException when the resource cannot be deleted
451   */
452  public boolean deleteResource( FlowProcess<? extends Config> flowProcess ) throws IOException
453    {
454    return deleteResource( flowProcess.getConfig() );
455    }
456
457  /**
458   * Method deleteResource deletes the resource represented by this instance.
459   *
460   * @param conf of type Config
461   * @return boolean
462   * @throws IOException when the resource cannot be deleted
463   */
464  public abstract boolean deleteResource( Config conf ) throws IOException;
465
466  /**
467   * Method prepareResourceForRead allows the underlying resource to be notified when reading will begin.
468   * <p/>
469   * This method will be called client side so that any remote or external resources can be initialized.
470   * <p/>
471   * If this method returns {@code false}, an exception will be thrown halting the current Flow.
472   * <p/>
473   * In most cases, resource initialization should happen in the {@link #openForRead(FlowProcess, Object)}  method.
474   * <p/>
475   * This allows for initialization of cluster side resources, like a JDBC driver used to read data from a database,
476   * that cannot be passed client to cluster.
477   *
478   * @param conf of type Config
479   * @return returns true if successful
480   * @throws IOException
481   */
482  public boolean prepareResourceForRead( Config conf ) throws IOException
483    {
484    return true;
485    }
486
487  /**
488   * Method prepareResourceForWrite allows the underlying resource to be notified when writing will begin.
489   * <p/>
490   * This method will be called once client side so that any remote or external resources can be initialized.
491   * <p/>
492   * If this method returns {@code false}, an exception will be thrown halting the current Flow.
493   * <p/>
494   * In most cases, resource initialization should happen in the {@link #openForWrite(FlowProcess, Object)} method.
495   * <p/>
496   * This allows for initialization of cluster side resources, like a JDBC driver used to write data to a database,
497   * that cannot be passed client to cluster.
498   * <p/>
499   * In the above JDBC example, overriding this method will allow for testing for the existence of and/or creating
500   * a remote table used by all individual cluster side tasks.
501   *
502   * @param conf of type Config
503   * @return returns true if successful
504   * @throws IOException
505   */
506  public boolean prepareResourceForWrite( Config conf ) throws IOException
507    {
508    return true;
509    }
510
511  /**
512   * Method commitResource allows the underlying resource to be notified when all write processing is
513   * successful so that any additional cleanup or processing may be completed.
514   * <p/>
515   * See {@link #rollbackResource(Object)} to handle cleanup in the face of failures.
516   * <p/>
517   * This method is invoked once client side and not in the cluster, if any.
518   * <p/>
519   * If other sink Tap instance in a given Flow fail on commitResource after called on this instance,
520   * rollbackResource will not be called.
521   *
522   * @param conf of type Config
523   * @return returns true if successful
524   * @throws IOException
525   */
526  public boolean commitResource( Config conf ) throws IOException
527    {
528    return true;
529    }
530
531  /**
532   * Method rollbackResource allows the underlying resource to be notified when any write processing has failed or
533   * was stopped so that any cleanup may be started.
534   * <p/>
535   * See {@link #commitResource(Object)} to handle cleanup when the write has successfully completed.
536   * <p/>
537   * This method is invoked once client side and not in the cluster, if any.
538   *
539   * @param conf of type Config
540   * @return returns true if successful
541   * @throws IOException
542   */
543  public boolean rollbackResource( Config conf ) throws IOException
544    {
545    return true;
546    }
547
548  /**
549   * Method resourceExists returns true if the path represented by this instance exists.
550   *
551   * @param flowProcess of type FlowProcess
552   * @return true if the underlying resource already exists
553   * @throws IOException when the status cannot be determined
554   */
555  public boolean resourceExists( FlowProcess<? extends Config> flowProcess ) throws IOException
556    {
557    return resourceExists( flowProcess.getConfig() );
558    }
559
560  /**
561   * Method resourceExists returns true if the path represented by this instance exists.
562   *
563   * @param conf of type Config
564   * @return true if the underlying resource already exists
565   * @throws IOException when the status cannot be determined
566   */
567  public abstract boolean resourceExists( Config conf ) throws IOException;
568
569  /**
570   * Method getModifiedTime returns the date this resource was last modified.
571   *
572   * @param flowProcess of type FlowProcess
573   * @return The date this resource was last modified.
574   * @throws IOException
575   */
576  public long getModifiedTime( FlowProcess<? extends Config> flowProcess ) throws IOException
577    {
578    return getModifiedTime( flowProcess.getConfig() );
579    }
580
581  /**
582   * Method getModifiedTime returns the date this resource was last modified.
583   *
584   * @param conf of type Config
585   * @return The date this resource was last modified.
586   * @throws IOException
587   */
588  public abstract long getModifiedTime( Config conf ) throws IOException;
589
590  /**
591   * Method getSinkMode returns the {@link SinkMode} }of this Tap object.
592   *
593   * @return the sinkMode (type SinkMode) of this Tap object.
594   */
595  public SinkMode getSinkMode()
596    {
597    return sinkMode;
598    }
599
600  /**
601   * Method isKeep indicates whether the resource represented by this instance should be kept if it
602   * already exists when the Flow is started.
603   *
604   * @return boolean
605   */
606  public boolean isKeep()
607    {
608    return sinkMode == SinkMode.KEEP;
609    }
610
611  /**
612   * Method isReplace indicates whether the resource represented by this instance should be deleted if it
613   * already exists when the Flow is started.
614   *
615   * @return boolean
616   */
617  public boolean isReplace()
618    {
619    return sinkMode == SinkMode.REPLACE;
620    }
621
622  /**
623   * Method isUpdate indicates whether the resource represented by this instance should be updated if it already
624   * exists. Otherwise a new resource will be created, via {@link #createResource(Object)}, when the Flow is started.
625   *
626   * @return boolean
627   */
628  public boolean isUpdate()
629    {
630    return sinkMode == SinkMode.UPDATE;
631    }
632
633  /**
634   * Method isSink returns true if this Tap instance can be used as a sink.
635   *
636   * @return boolean
637   */
638  public boolean isSink()
639    {
640    return getScheme().isSink();
641    }
642
643  /**
644   * Method isSource returns true if this Tap instance can be used as a source.
645   *
646   * @return boolean
647   */
648  public boolean isSource()
649    {
650    return getScheme().isSource();
651    }
652
653  /**
654   * Method isTemporary returns true if this Tap is temporary (used for intermediate results).
655   *
656   * @return the temporary (type boolean) of this Tap object.
657   */
658  public boolean isTemporary()
659    {
660    return false;
661    }
662
663  /**
664   * Returns a {@link cascading.property.ConfigDef} instance that allows for local properties to be set and made available via
665   * a resulting {@link cascading.flow.FlowProcess} instance when the tap is invoked.
666   * <p/>
667   * Any properties set on the configDef will not show up in any {@link Flow} or {@link cascading.flow.FlowStep} process
668   * level configuration, but will override any of those values as seen by the current Tap instance method call where a
669   * FlowProcess is provided except for the {@link #sourceConfInit(cascading.flow.FlowProcess, Object)} and
670   * {@link #sinkConfInit(cascading.flow.FlowProcess, Object)} methods.
671   * <p/>
672   * That is, the {@code *confInit} methods are called before any ConfigDef is applied, so any values placed into
673   * a ConfigDef instance will not be visible to them.
674   *
675   * @return an instance of ConfigDef
676   */
677  public ConfigDef getConfigDef()
678    {
679    if( configDef == null )
680      configDef = new ConfigDef();
681
682    return configDef;
683    }
684
685  /**
686   * Returns {@code true} if there are properties in the configDef instance.
687   *
688   * @return true if there are configDef properties
689   */
690  public boolean hasConfigDef()
691    {
692    return configDef != null && !configDef.isEmpty();
693    }
694
695  /**
696   * Returns a {@link ConfigDef} instance that allows for process level properties to be set and made available via
697   * a resulting {@link cascading.flow.FlowProcess} instance when the tap is invoked.
698   * <p/>
699   * Any properties set on the nodeConfigDef will not show up in any Flow configuration, but will show up in
700   * the current process {@link cascading.flow.FlowNode} (in Apache Tez the Vertex configuration). Any value set in the
701   * nodeConfigDef will be overridden by the pipe local {@code #getConfigDef} instance.
702   * </p>
703   * Use this method to tweak properties in the process node this tap instance is planned into.
704   *
705   * @return an instance of ConfigDef
706   */
707  @Override
708  public ConfigDef getNodeConfigDef()
709    {
710    if( nodeConfigDef == null )
711      nodeConfigDef = new ConfigDef();
712
713    return nodeConfigDef;
714    }
715
716  /**
717   * Returns {@code true} if there are properties in the nodeConfigDef instance.
718   *
719   * @return true if there are nodeConfigDef properties
720   */
721  @Override
722  public boolean hasNodeConfigDef()
723    {
724    return nodeConfigDef != null && !nodeConfigDef.isEmpty();
725    }
726
727  /**
728   * Returns a {@link ConfigDef} instance that allows for process level properties to be set and made available via
729   * a resulting {@link cascading.flow.FlowProcess} instance when the tap is invoked.
730   * <p/>
731   * Any properties set on the stepConfigDef will not show up in any Flow configuration, but will show up in
732   * the current process {@link cascading.flow.FlowStep} (in Hadoop the MapReduce jobconf). Any value set in the
733   * stepConfigDef will be overridden by the tap local {@code #getConfigDef} instance.
734   * </p>
735   * Use this method to tweak properties in the process step this tap instance is planned into.
736   * <p/>
737   * Note the {@code *confInit} methods are called before any ConfigDef is applied, so any values placed into
738   * a ConfigDef instance will not be visible to them.
739   *
740   * @return an instance of ConfigDef
741   */
742  @Override
743  public ConfigDef getStepConfigDef()
744    {
745    if( stepConfigDef == null )
746      stepConfigDef = new ConfigDef();
747
748    return stepConfigDef;
749    }
750
751  /**
752   * Returns {@code true} if there are properties in the stepConfigDef instance.
753   *
754   * @return true if there are stepConfigDef properties
755   */
756  @Override
757  public boolean hasStepConfigDef()
758    {
759    return stepConfigDef != null && !stepConfigDef.isEmpty();
760    }
761
762  @Override
763  public boolean isEquivalentTo( FlowElement element )
764    {
765    if( element == null )
766      return false;
767
768    if( this == element )
769      return true;
770
771    boolean compare = getClass() == element.getClass();
772
773    if( !compare )
774      return false;
775
776    return equals( element );
777    }
778
779  @Override
780  public boolean equals( Object object )
781    {
782    if( this == object )
783      return true;
784    if( object == null || getClass() != object.getClass() )
785      return false;
786
787    Tap tap = (Tap) object;
788
789    if( getIdentifier() != null ? !getIdentifier().equals( tap.getIdentifier() ) : tap.getIdentifier() != null )
790      return false;
791
792    if( getScheme() != null ? !getScheme().equals( tap.getScheme() ) : tap.getScheme() != null )
793      return false;
794
795    return true;
796    }
797
798  @Override
799  public int hashCode()
800    {
801    int result = getIdentifier() != null ? getIdentifier().hashCode() : 0;
802
803    result = 31 * result + ( getScheme() != null ? getScheme().hashCode() : 0 );
804
805    return result;
806    }
807
808  @Override
809  public String toString()
810    {
811    if( getIdentifier() != null )
812      return getClass().getSimpleName() + "[\"" + getScheme() + "\"]" + "[\"" + Util.sanitizeUrl( getIdentifier() ) + "\"]"; // sanitize
813    else
814      return getClass().getSimpleName() + "[\"" + getScheme() + "\"]" + "[not initialized]";
815    }
816  }