001/* 002 * Copyright (c) 2007-2016 Concurrent, Inc. All Rights Reserved. 003 * 004 * Project and contact information: http://www.cascading.org/ 005 * 006 * This file is part of the Cascading project. 007 * 008 * Licensed under the Apache License, Version 2.0 (the "License"); 009 * you may not use this file except in compliance with the License. 010 * You may obtain a copy of the License at 011 * 012 * http://www.apache.org/licenses/LICENSE-2.0 013 * 014 * Unless required by applicable law or agreed to in writing, software 015 * distributed under the License is distributed on an "AS IS" BASIS, 016 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 017 * See the License for the specific language governing permissions and 018 * limitations under the License. 019 */ 020 021package cascading.tap; 022 023import java.io.IOException; 024import java.io.Serializable; 025import java.util.Set; 026 027import cascading.flow.Flow; 028import cascading.flow.FlowElement; 029import cascading.flow.FlowException; 030import cascading.flow.FlowProcess; 031import cascading.flow.planner.Scope; 032import cascading.flow.planner.ScopedElement; 033import cascading.management.annotation.Property; 034import cascading.management.annotation.PropertyDescription; 035import cascading.management.annotation.PropertySanitizer; 036import cascading.management.annotation.Visibility; 037import cascading.pipe.Pipe; 038import cascading.property.ConfigDef; 039import cascading.scheme.Scheme; 040import cascading.tuple.Fields; 041import cascading.tuple.FieldsResolverException; 042import cascading.tuple.Tuple; 043import cascading.tuple.TupleEntryCollector; 044import cascading.tuple.TupleEntryIterator; 045import cascading.util.TraceUtil; 046import cascading.util.Traceable; 047import cascading.util.Util; 048 049/** 050 * A Tap represents the physical data source or sink in a connected {@link cascading.flow.Flow}. 051 * </p> 052 * That is, a source Tap is the head end of a connected {@link Pipe} and {@link Tuple} stream, and 053 * a sink Tap is the tail end. Kinds of Tap types are used to manage files from a local disk, 054 * distributed disk, remote storage like Amazon S3, or via FTP. It simply abstracts 055 * out the complexity of connecting to these types of data sources. 056 * <p/> 057 * A Tap takes a {@link Scheme} instance, which is used to identify the type of resource (text file, binary file, etc). 058 * A Tap is responsible for how the resource is reached. 059 * <p/> 060 * By default when planning a Flow, Tap equality is a function of the {@link #getIdentifier()} and {@link #getScheme()} 061 * values. That is, two Tap instances are the same Tap instance if they sink/source the same resource and sink/source 062 * the same fields. 063 * <p/> 064 * Some more advanced taps, like a database tap, may need to extend equality to include any filtering, like the 065 * {@code where} clause in a SQL statement so two taps reading from the same SQL table aren't considered equal. 066 * <p/> 067 * Taps are also used to determine dependencies between two or more {@link Flow} instances when used with a 068 * {@link cascading.cascade.Cascade}. In that case the {@link #getFullIdentifier(Object)} value is used and the Scheme 069 * is ignored. 070 */ 071public abstract class Tap<Config, Input, Output> implements ScopedElement, FlowElement, Serializable, Traceable 072 { 073 /** Field scheme */ 074 private Scheme<Config, Input, Output, ?, ?> scheme; 075 076 /** Field mode */ 077 SinkMode sinkMode = SinkMode.KEEP; 078 079 private ConfigDef configDef; 080 private ConfigDef nodeConfigDef; 081 private ConfigDef stepConfigDef; 082 083 /** Field id */ 084 private final String id = Util.createUniqueID(); // 3.0 planner relies on this being consistent 085 /** Field trace */ 086 private String trace = TraceUtil.captureDebugTrace( this ); // see TraceUtil.setTrace() to override 087 088 /** 089 * Convenience function to make an array of Tap instances. 090 * 091 * @param taps of type Tap 092 * @return Tap array 093 */ 094 public static Tap[] taps( Tap... taps ) 095 { 096 return taps; 097 } 098 099 /** 100 * Creates and returns a unique ID for the given Tap, this value is cached and may be used to uniquely identify 101 * the Tap instance in properties files etc. 102 * <p/> 103 * This value is generally reproducible assuming the Tap identifier and the Scheme source and sink Fields remain consistent. 104 * 105 * @param tap of type Tap 106 * @return of type String 107 */ 108 public static synchronized String id( Tap tap ) 109 { 110 if( tap instanceof DecoratorTap ) 111 return id( ( (DecoratorTap) tap ).getOriginal() ); 112 113 return tap.id; 114 } 115 116 protected Tap() 117 { 118 } 119 120 protected Tap( Scheme<Config, Input, Output, ?, ?> scheme ) 121 { 122 this.setScheme( scheme ); 123 } 124 125 protected Tap( Scheme<Config, Input, Output, ?, ?> scheme, SinkMode sinkMode ) 126 { 127 this.setScheme( scheme ); 128 this.sinkMode = sinkMode; 129 } 130 131 protected void setScheme( Scheme<Config, Input, Output, ?, ?> scheme ) 132 { 133 this.scheme = scheme; 134 } 135 136 /** 137 * Method getScheme returns the scheme of this Tap object. 138 * 139 * @return the scheme (type Scheme) of this Tap object. 140 */ 141 public Scheme<Config, Input, Output, ?, ?> getScheme() 142 { 143 return scheme; 144 } 145 146 @Override 147 public String getTrace() 148 { 149 return trace; 150 } 151 152 /** 153 * Method flowInit allows this Tap instance to initialize itself in context of the given {@link cascading.flow.Flow} instance. 154 * This method is guaranteed to be called before the Flow is started and the 155 * {@link cascading.flow.FlowListener#onStarting(cascading.flow.Flow)} event is fired. 156 * <p/> 157 * This method will be called once per Flow, and before {@link #sourceConfInit(cascading.flow.FlowProcess, Object)} and 158 * {@link #sinkConfInit(cascading.flow.FlowProcess, Object)} methods. 159 * 160 * @param flow of type Flow 161 */ 162 public void flowConfInit( Flow<Config> flow ) 163 { 164 165 } 166 167 /** 168 * Method sourceConfInit initializes this instance as a source. 169 * <p/> 170 * This method maybe called more than once if this Tap instance is used outside the scope of a {@link cascading.flow.Flow} 171 * instance or if it participates in multiple times in a given Flow or across different Flows in 172 * a {@link cascading.cascade.Cascade}. 173 * <p/> 174 * In the context of a Flow, it will be called after 175 * {@link cascading.flow.FlowListener#onStarting(cascading.flow.Flow)} 176 * <p/> 177 * Note that no resources or services should be modified by this method. 178 * 179 * @param flowProcess of type FlowProcess 180 * @param conf of type Config 181 */ 182 public void sourceConfInit( FlowProcess<? extends Config> flowProcess, Config conf ) 183 { 184 getScheme().sourceConfInit( flowProcess, this, conf ); 185 } 186 187 /** 188 * Method sinkConfInit initializes this instance as a sink. 189 * <p/> 190 * This method maybe called more than once if this Tap instance is used outside the scope of a {@link cascading.flow.Flow} 191 * instance or if it participates in multiple times in a given Flow or across different Flows in 192 * a {@link cascading.cascade.Cascade}. 193 * <p/> 194 * Note this method will be called in context of this Tap being used as a traditional 'sink' and as a 'trap'. 195 * <p/> 196 * In the context of a Flow, it will be called after 197 * {@link cascading.flow.FlowListener#onStarting(cascading.flow.Flow)} 198 * <p/> 199 * Note that no resources or services should be modified by this method. If this Tap instance returns true for 200 * {@link #isReplace()}, then {@link #deleteResource(Object)} will be called by the parent Flow. 201 * 202 * @param flowProcess of type FlowProcess 203 * @param conf of type Config 204 */ 205 public void sinkConfInit( FlowProcess<? extends Config> flowProcess, Config conf ) 206 { 207 getScheme().sinkConfInit( flowProcess, this, conf ); 208 } 209 210 /** 211 * Method getIdentifier returns a String representing the resource this Tap instance represents. 212 * <p/> 213 * Often, if the tap accesses a filesystem, the identifier is nothing more than the path to the file or directory. 214 * In other cases it may be a an URL or URI representing a connection string or remote resource. 215 * <p/> 216 * Any two Tap instances having the same value for the identifier are considered equal. 217 * 218 * @return String 219 */ 220 @Property(name = "identifier", visibility = Visibility.PUBLIC) 221 @PropertyDescription("The resource this instance represents") 222 @PropertySanitizer("cascading.management.annotation.URISanitizer") 223 public abstract String getIdentifier(); 224 225 /** 226 * Method getSourceFields returns the sourceFields of this Tap object. 227 * 228 * @return the sourceFields (type Fields) of this Tap object. 229 */ 230 public Fields getSourceFields() 231 { 232 return getScheme().getSourceFields(); 233 } 234 235 /** 236 * Method getSinkFields returns the sinkFields of this Tap object. 237 * 238 * @return the sinkFields (type Fields) of this Tap object. 239 */ 240 public Fields getSinkFields() 241 { 242 return getScheme().getSinkFields(); 243 } 244 245 /** 246 * Method openForRead opens the resource represented by this Tap instance for reading. 247 * <p/> 248 * {@code input} value may be null, if so, sub-classes must inquire with the underlying {@link Scheme} 249 * via {@link Scheme#sourceConfInit(cascading.flow.FlowProcess, Tap, Object)} to get the proper 250 * input type and instantiate it before calling {@code super.openForRead()}. 251 * <p/> 252 * Note the returned iterator will return the same instance of {@link cascading.tuple.TupleEntry} on every call, 253 * thus a copy must be made of either the TupleEntry or the underlying {@code Tuple} instance if they are to be 254 * stored in a Collection. 255 * 256 * @param flowProcess of type FlowProcess 257 * @param input of type Input 258 * @return TupleEntryIterator 259 * @throws java.io.IOException when the resource cannot be opened 260 */ 261 public abstract TupleEntryIterator openForRead( FlowProcess<? extends Config> flowProcess, Input input ) throws IOException; 262 263 /** 264 * Method openForRead opens the resource represented by this Tap instance for reading. 265 * <p/> 266 * Note the returned iterator will return the same instance of {@link cascading.tuple.TupleEntry} on every call, 267 * thus a copy must be made of either the TupleEntry or the underlying {@code Tuple} instance if they are to be 268 * stored in a Collection. 269 * 270 * @param flowProcess of type FlowProcess 271 * @return TupleEntryIterator 272 * @throws java.io.IOException when the resource cannot be opened 273 */ 274 public TupleEntryIterator openForRead( FlowProcess<? extends Config> flowProcess ) throws IOException 275 { 276 return openForRead( flowProcess, null ); 277 } 278 279 /** 280 * Method openForWrite opens the resource represented by this Tap instance for writing. 281 * <p/> 282 * This method is used internally and does not honor the {@link SinkMode} setting. If SinkMode is 283 * {@link SinkMode#REPLACE}, this call may fail. See {@link #openForWrite(cascading.flow.FlowProcess)}. 284 * <p/> 285 * {@code output} value may be null, if so, sub-classes must inquire with the underlying {@link Scheme} 286 * via {@link Scheme#sinkConfInit(cascading.flow.FlowProcess, Tap, Object)} to get the proper 287 * output type and instantiate it before calling {@code super.openForWrite()}. 288 * 289 * @param flowProcess of type FlowProcess 290 * @param output of type Output 291 * @return TupleEntryCollector 292 * @throws java.io.IOException when the resource cannot be opened 293 */ 294 public abstract TupleEntryCollector openForWrite( FlowProcess<? extends Config> flowProcess, Output output ) throws IOException; 295 296 /** 297 * Method openForWrite opens the resource represented by this Tap instance for writing. 298 * <p/> 299 * This method is for user application use and does honor the {@link SinkMode#REPLACE} settings. That is, if 300 * SinkMode is set to {@link SinkMode#REPLACE} the underlying resource will be deleted. 301 * <p/> 302 * Note if {@link SinkMode#UPDATE} is set, the resource will not be deleted. 303 * 304 * @param flowProcess of type FlowProcess 305 * @return TupleEntryCollector 306 * @throws java.io.IOException when the resource cannot be opened 307 */ 308 public TupleEntryCollector openForWrite( FlowProcess<? extends Config> flowProcess ) throws IOException 309 { 310 if( isReplace() ) 311 deleteResource( flowProcess ); 312 313 return openForWrite( flowProcess, null ); 314 } 315 316 @Override 317 public Scope outgoingScopeFor( Set<Scope> incomingScopes ) 318 { 319 // as a source Tap, we emit the scheme defined Fields 320 // as a sink Tap, we declare we emit the incoming Fields 321 // as a temp Tap, this method never gets called, but we emit what we consume 322 int count = 0; 323 for( Scope incomingScope : incomingScopes ) 324 { 325 Fields incomingFields = incomingScope.getIncomingTapFields(); 326 327 if( incomingFields != null ) 328 { 329 try 330 { 331 incomingFields.select( getSinkFields() ); 332 } 333 catch( FieldsResolverException exception ) 334 { 335 throw new TapException( this, exception.getSourceFields(), exception.getSelectorFields(), exception ); 336 } 337 338 count++; 339 } 340 } 341 342 if( count > 1 ) 343 throw new FlowException( "Tap may not have more than one incoming Scope" ); 344 345 // this allows the incoming to be passed through to the outgoing 346 Fields incomingFields = incomingScopes.size() == 0 ? null : incomingScopes.iterator().next().getIncomingTapFields(); 347 348 if( incomingFields != null && 349 ( isSource() && getSourceFields().equals( Fields.UNKNOWN ) || 350 isSink() && getSinkFields().equals( Fields.ALL ) ) ) 351 return new Scope( incomingFields ); 352 353 if( count == 1 ) 354 return new Scope( getSinkFields() ); 355 356 return new Scope( getSourceFields() ); 357 } 358 359 /** 360 * A hook for allowing a Scheme to lazily retrieve its source fields. 361 * 362 * @param flowProcess of type FlowProcess 363 * @return the found Fields 364 */ 365 public Fields retrieveSourceFields( FlowProcess<? extends Config> flowProcess ) 366 { 367 return getScheme().retrieveSourceFields( flowProcess, this ); 368 } 369 370 public void presentSourceFields( FlowProcess<? extends Config> flowProcess, Fields fields ) 371 { 372 getScheme().presentSourceFields( flowProcess, this, fields ); 373 } 374 375 /** 376 * A hook for allowing a Scheme to lazily retrieve its sink fields. 377 * 378 * @param flowProcess of type FlowProcess 379 * @return the found Fields 380 */ 381 public Fields retrieveSinkFields( FlowProcess<? extends Config> flowProcess ) 382 { 383 return getScheme().retrieveSinkFields( flowProcess, this ); 384 } 385 386 public void presentSinkFields( FlowProcess<? extends Config> flowProcess, Fields fields ) 387 { 388 getScheme().presentSinkFields( flowProcess, this, fields ); 389 } 390 391 @Override 392 public Fields resolveIncomingOperationArgumentFields( Scope incomingScope ) 393 { 394 return incomingScope.getIncomingTapFields(); 395 } 396 397 @Override 398 public Fields resolveIncomingOperationPassThroughFields( Scope incomingScope ) 399 { 400 return incomingScope.getIncomingTapFields(); 401 } 402 403 /** 404 * Method getFullIdentifier returns a fully qualified resource identifier. 405 * 406 * @param flowProcess of type FlowProcess 407 * @return String 408 */ 409 public String getFullIdentifier( FlowProcess<? extends Config> flowProcess ) 410 { 411 return getFullIdentifier( flowProcess.getConfig() ); 412 } 413 414 /** 415 * Method getFullIdentifier returns a fully qualified resource identifier. 416 * 417 * @param conf of type Config 418 * @return String 419 */ 420 public String getFullIdentifier( Config conf ) 421 { 422 return getIdentifier(); 423 } 424 425 /** 426 * Method createResource creates the underlying resource. 427 * 428 * @param flowProcess of type FlowProcess 429 * @return boolean 430 * @throws IOException when there is an error making directories 431 */ 432 public boolean createResource( FlowProcess<? extends Config> flowProcess ) throws IOException 433 { 434 return createResource( flowProcess.getConfig() ); 435 } 436 437 /** 438 * Method createResource creates the underlying resource. 439 * 440 * @param conf of type Config 441 * @return boolean 442 * @throws IOException when there is an error making directories 443 */ 444 public abstract boolean createResource( Config conf ) throws IOException; 445 446 /** 447 * Method deleteResource deletes the resource represented by this instance. 448 * 449 * @param flowProcess of type FlowProcess 450 * @return boolean 451 * @throws IOException when the resource cannot be deleted 452 */ 453 public boolean deleteResource( FlowProcess<? extends Config> flowProcess ) throws IOException 454 { 455 return deleteResource( flowProcess.getConfig() ); 456 } 457 458 /** 459 * Method deleteResource deletes the resource represented by this instance. 460 * 461 * @param conf of type Config 462 * @return boolean 463 * @throws IOException when the resource cannot be deleted 464 */ 465 public abstract boolean deleteResource( Config conf ) throws IOException; 466 467 /** 468 * Method prepareResourceForRead allows the underlying resource to be notified when reading will begin. 469 * <p/> 470 * This method will be called client side so that any remote or external resources can be initialized. 471 * <p/> 472 * If this method returns {@code false}, an exception will be thrown halting the current Flow. 473 * <p/> 474 * In most cases, resource initialization should happen in the {@link #openForRead(FlowProcess, Object)} method. 475 * <p/> 476 * This allows for initialization of cluster side resources, like a JDBC driver used to read data from a database, 477 * that cannot be passed client to cluster. 478 * 479 * @param conf of type Config 480 * @return returns true if successful 481 * @throws IOException 482 */ 483 public boolean prepareResourceForRead( Config conf ) throws IOException 484 { 485 return true; 486 } 487 488 /** 489 * Method prepareResourceForWrite allows the underlying resource to be notified when writing will begin. 490 * <p/> 491 * This method will be called once client side so that any remote or external resources can be initialized. 492 * <p/> 493 * If this method returns {@code false}, an exception will be thrown halting the current Flow. 494 * <p/> 495 * In most cases, resource initialization should happen in the {@link #openForWrite(FlowProcess, Object)} method. 496 * <p/> 497 * This allows for initialization of cluster side resources, like a JDBC driver used to write data to a database, 498 * that cannot be passed client to cluster. 499 * <p/> 500 * In the above JDBC example, overriding this method will allow for testing for the existence of and/or creating 501 * a remote table used by all individual cluster side tasks. 502 * 503 * @param conf of type Config 504 * @return returns true if successful 505 * @throws IOException 506 */ 507 public boolean prepareResourceForWrite( Config conf ) throws IOException 508 { 509 return true; 510 } 511 512 /** 513 * Method commitResource allows the underlying resource to be notified when all write processing is 514 * successful so that any additional cleanup or processing may be completed. 515 * <p/> 516 * See {@link #rollbackResource(Object)} to handle cleanup in the face of failures. 517 * <p/> 518 * This method is invoked once client side and not in the cluster, if any. 519 * <p/> 520 * If other sink Tap instance in a given Flow fail on commitResource after called on this instance, 521 * rollbackResource will not be called. 522 * 523 * @param conf of type Config 524 * @return returns true if successful 525 * @throws IOException 526 */ 527 public boolean commitResource( Config conf ) throws IOException 528 { 529 return true; 530 } 531 532 /** 533 * Method rollbackResource allows the underlying resource to be notified when any write processing has failed or 534 * was stopped so that any cleanup may be started. 535 * <p/> 536 * See {@link #commitResource(Object)} to handle cleanup when the write has successfully completed. 537 * <p/> 538 * This method is invoked once client side and not in the cluster, if any. 539 * 540 * @param conf of type Config 541 * @return returns true if successful 542 * @throws IOException 543 */ 544 public boolean rollbackResource( Config conf ) throws IOException 545 { 546 return true; 547 } 548 549 /** 550 * Method resourceExists returns true if the path represented by this instance exists. 551 * 552 * @param flowProcess of type FlowProcess 553 * @return true if the underlying resource already exists 554 * @throws IOException when the status cannot be determined 555 */ 556 public boolean resourceExists( FlowProcess<? extends Config> flowProcess ) throws IOException 557 { 558 return resourceExists( flowProcess.getConfig() ); 559 } 560 561 /** 562 * Method resourceExists returns true if the path represented by this instance exists. 563 * 564 * @param conf of type Config 565 * @return true if the underlying resource already exists 566 * @throws IOException when the status cannot be determined 567 */ 568 public abstract boolean resourceExists( Config conf ) throws IOException; 569 570 /** 571 * Method getModifiedTime returns the date this resource was last modified. 572 * 573 * @param flowProcess of type FlowProcess 574 * @return The date this resource was last modified. 575 * @throws IOException 576 */ 577 public long getModifiedTime( FlowProcess<? extends Config> flowProcess ) throws IOException 578 { 579 return getModifiedTime( flowProcess.getConfig() ); 580 } 581 582 /** 583 * Method getModifiedTime returns the date this resource was last modified. 584 * 585 * @param conf of type Config 586 * @return The date this resource was last modified. 587 * @throws IOException 588 */ 589 public abstract long getModifiedTime( Config conf ) throws IOException; 590 591 /** 592 * Method getSinkMode returns the {@link SinkMode} }of this Tap object. 593 * 594 * @return the sinkMode (type SinkMode) of this Tap object. 595 */ 596 public SinkMode getSinkMode() 597 { 598 return sinkMode; 599 } 600 601 /** 602 * Method isKeep indicates whether the resource represented by this instance should be kept if it 603 * already exists when the Flow is started. 604 * 605 * @return boolean 606 */ 607 public boolean isKeep() 608 { 609 return sinkMode == SinkMode.KEEP; 610 } 611 612 /** 613 * Method isReplace indicates whether the resource represented by this instance should be deleted if it 614 * already exists when the Flow is started. 615 * 616 * @return boolean 617 */ 618 public boolean isReplace() 619 { 620 return sinkMode == SinkMode.REPLACE; 621 } 622 623 /** 624 * Method isUpdate indicates whether the resource represented by this instance should be updated if it already 625 * exists. Otherwise a new resource will be created, via {@link #createResource(Object)}, when the Flow is started. 626 * 627 * @return boolean 628 */ 629 public boolean isUpdate() 630 { 631 return sinkMode == SinkMode.UPDATE; 632 } 633 634 /** 635 * Method isSink returns true if this Tap instance can be used as a sink. 636 * 637 * @return boolean 638 */ 639 public boolean isSink() 640 { 641 return getScheme().isSink(); 642 } 643 644 /** 645 * Method isSource returns true if this Tap instance can be used as a source. 646 * 647 * @return boolean 648 */ 649 public boolean isSource() 650 { 651 return getScheme().isSource(); 652 } 653 654 /** 655 * Method isTemporary returns true if this Tap is temporary (used for intermediate results). 656 * 657 * @return the temporary (type boolean) of this Tap object. 658 */ 659 public boolean isTemporary() 660 { 661 return false; 662 } 663 664 /** 665 * Returns a {@link cascading.property.ConfigDef} instance that allows for local properties to be set and made available via 666 * a resulting {@link cascading.flow.FlowProcess} instance when the tap is invoked. 667 * <p/> 668 * Any properties set on the configDef will not show up in any {@link Flow} or {@link cascading.flow.FlowStep} process 669 * level configuration, but will override any of those values as seen by the current Tap instance method call where a 670 * FlowProcess is provided except for the {@link #sourceConfInit(cascading.flow.FlowProcess, Object)} and 671 * {@link #sinkConfInit(cascading.flow.FlowProcess, Object)} methods. 672 * <p/> 673 * That is, the {@code *confInit} methods are called before any ConfigDef is applied, so any values placed into 674 * a ConfigDef instance will not be visible to them. 675 * 676 * @return an instance of ConfigDef 677 */ 678 public ConfigDef getConfigDef() 679 { 680 if( configDef == null ) 681 configDef = new ConfigDef(); 682 683 return configDef; 684 } 685 686 /** 687 * Returns {@code true} if there are properties in the configDef instance. 688 * 689 * @return true if there are configDef properties 690 */ 691 public boolean hasConfigDef() 692 { 693 return configDef != null && !configDef.isEmpty(); 694 } 695 696 /** 697 * Returns a {@link ConfigDef} instance that allows for process level properties to be set and made available via 698 * a resulting {@link cascading.flow.FlowProcess} instance when the tap is invoked. 699 * <p/> 700 * Any properties set on the nodeConfigDef will not show up in any Flow configuration, but will show up in 701 * the current process {@link cascading.flow.FlowNode} (in Apache Tez the Vertex configuration). Any value set in the 702 * nodeConfigDef will be overridden by the pipe local {@code #getConfigDef} instance. 703 * </p> 704 * Use this method to tweak properties in the process node this tap instance is planned into. 705 * 706 * @return an instance of ConfigDef 707 */ 708 @Override 709 public ConfigDef getNodeConfigDef() 710 { 711 if( nodeConfigDef == null ) 712 nodeConfigDef = new ConfigDef(); 713 714 return nodeConfigDef; 715 } 716 717 /** 718 * Returns {@code true} if there are properties in the nodeConfigDef instance. 719 * 720 * @return true if there are nodeConfigDef properties 721 */ 722 @Override 723 public boolean hasNodeConfigDef() 724 { 725 return nodeConfigDef != null && !nodeConfigDef.isEmpty(); 726 } 727 728 /** 729 * Returns a {@link ConfigDef} instance that allows for process level properties to be set and made available via 730 * a resulting {@link cascading.flow.FlowProcess} instance when the tap is invoked. 731 * <p/> 732 * Any properties set on the stepConfigDef will not show up in any Flow configuration, but will show up in 733 * the current process {@link cascading.flow.FlowStep} (in Hadoop the MapReduce jobconf). Any value set in the 734 * stepConfigDef will be overridden by the tap local {@code #getConfigDef} instance. 735 * </p> 736 * Use this method to tweak properties in the process step this tap instance is planned into. 737 * <p/> 738 * Note the {@code *confInit} methods are called before any ConfigDef is applied, so any values placed into 739 * a ConfigDef instance will not be visible to them. 740 * 741 * @return an instance of ConfigDef 742 */ 743 @Override 744 public ConfigDef getStepConfigDef() 745 { 746 if( stepConfigDef == null ) 747 stepConfigDef = new ConfigDef(); 748 749 return stepConfigDef; 750 } 751 752 /** 753 * Returns {@code true} if there are properties in the stepConfigDef instance. 754 * 755 * @return true if there are stepConfigDef properties 756 */ 757 @Override 758 public boolean hasStepConfigDef() 759 { 760 return stepConfigDef != null && !stepConfigDef.isEmpty(); 761 } 762 763 @Override 764 public boolean equals( Object object ) 765 { 766 if( this == object ) 767 return true; 768 if( object == null || getClass() != object.getClass() ) 769 return false; 770 771 Tap tap = (Tap) object; 772 773 if( getIdentifier() != null ? !getIdentifier().equals( tap.getIdentifier() ) : tap.getIdentifier() != null ) 774 return false; 775 776 if( getScheme() != null ? !getScheme().equals( tap.getScheme() ) : tap.getScheme() != null ) 777 return false; 778 779 return true; 780 } 781 782 @Override 783 public int hashCode() 784 { 785 int result = getIdentifier() != null ? getIdentifier().hashCode() : 0; 786 787 result = 31 * result + ( getScheme() != null ? getScheme().hashCode() : 0 ); 788 789 return result; 790 } 791 792 @Override 793 public String toString() 794 { 795 if( getIdentifier() != null ) 796 return getClass().getSimpleName() + "[\"" + getScheme() + "\"]" + "[\"" + Util.sanitizeUrl( getIdentifier() ) + "\"]"; // sanitize 797 else 798 return getClass().getSimpleName() + "[\"" + getScheme() + "\"]" + "[not initialized]"; 799 } 800 }