001 /* 002 * Copyright (c) 2007-2014 Concurrent, Inc. All Rights Reserved. 003 * 004 * Project and contact information: http://www.cascading.org/ 005 * 006 * This file is part of the Cascading project. 007 * 008 * Licensed under the Apache License, Version 2.0 (the "License"); 009 * you may not use this file except in compliance with the License. 010 * You may obtain a copy of the License at 011 * 012 * http://www.apache.org/licenses/LICENSE-2.0 013 * 014 * Unless required by applicable law or agreed to in writing, software 015 * distributed under the License is distributed on an "AS IS" BASIS, 016 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 017 * See the License for the specific language governing permissions and 018 * limitations under the License. 019 */ 020 021 package cascading.tap; 022 023 import java.io.IOException; 024 import java.util.HashSet; 025 import java.util.LinkedHashMap; 026 import java.util.Map; 027 import java.util.Set; 028 029 import cascading.flow.FlowProcess; 030 import cascading.scheme.Scheme; 031 import cascading.scheme.SinkCall; 032 import cascading.scheme.SourceCall; 033 import cascading.tuple.Fields; 034 import cascading.tuple.Tuple; 035 import cascading.tuple.TupleEntry; 036 import cascading.tuple.TupleEntryCollector; 037 import cascading.tuple.TupleEntrySchemeCollector; 038 import org.slf4j.Logger; 039 import org.slf4j.LoggerFactory; 040 041 /** 042 * 043 */ 044 public abstract class BaseTemplateTap<Config, Output> extends SinkTap<Config, Output> 045 { 046 /** Field LOG */ 047 private static final Logger LOG = LoggerFactory.getLogger( BaseTemplateTap.class ); 048 /** Field OPEN_FILES_THRESHOLD_DEFAULT */ 049 protected static final int OPEN_TAPS_THRESHOLD_DEFAULT = 300; 050 051 private class TemplateCollector extends TupleEntryCollector 052 { 053 private final FlowProcess<Config> flowProcess; 054 private final Config conf; 055 private final Fields parentFields; 056 private final Fields pathFields; 057 058 public TemplateCollector( FlowProcess<Config> flowProcess ) 059 { 060 super( Fields.asDeclaration( getSinkFields() ) ); 061 this.flowProcess = flowProcess; 062 this.conf = flowProcess.getConfigCopy(); 063 this.parentFields = parent.getSinkFields(); 064 this.pathFields = ( (TemplateScheme) getScheme() ).pathFields; 065 } 066 067 private TupleEntryCollector getCollector( String path ) 068 { 069 TupleEntryCollector collector = collectors.get( path ); 070 071 if( collector != null ) 072 return collector; 073 074 try 075 { 076 LOG.debug( "creating collector for parent: {}, path: {}", parent.getFullIdentifier( conf ), path ); 077 078 collector = createTupleEntrySchemeCollector( flowProcess, parent, path ); 079 080 flowProcess.increment( Counters.Paths_Opened, 1 ); 081 } 082 catch( IOException exception ) 083 { 084 throw new TapException( "unable to open template path: " + path, exception ); 085 } 086 087 if( collectors.size() > openTapsThreshold ) 088 purgeCollectors(); 089 090 collectors.put( path, collector ); 091 092 if( LOG.isInfoEnabled() && collectors.size() % 100 == 0 ) 093 LOG.info( "caching {} open Taps", collectors.size() ); 094 095 return collector; 096 } 097 098 private void purgeCollectors() 099 { 100 int numToClose = Math.max( 1, (int) ( openTapsThreshold * .10 ) ); 101 102 if( LOG.isInfoEnabled() ) 103 LOG.info( "removing {} open Taps from cache of size {}", numToClose, collectors.size() ); 104 105 Set<String> removeKeys = new HashSet<String>(); 106 Set<String> keys = collectors.keySet(); 107 108 for( String key : keys ) 109 { 110 if( numToClose-- == 0 ) 111 break; 112 113 removeKeys.add( key ); 114 } 115 116 for( String removeKey : removeKeys ) 117 closeCollector( collectors.remove( removeKey ) ); 118 119 flowProcess.increment( Counters.Path_Purges, 1 ); 120 } 121 122 @Override 123 public void close() 124 { 125 super.close(); 126 127 try 128 { 129 for( TupleEntryCollector collector : collectors.values() ) 130 closeCollector( collector ); 131 } 132 finally 133 { 134 collectors.clear(); 135 } 136 } 137 138 private void closeCollector( TupleEntryCollector collector ) 139 { 140 if( collector == null ) 141 return; 142 143 try 144 { 145 collector.close(); 146 147 flowProcess.increment( Counters.Paths_Closed, 1 ); 148 } 149 catch( Exception exception ) 150 { 151 // do nothing 152 } 153 } 154 155 protected void collect( TupleEntry tupleEntry ) throws IOException 156 { 157 if( pathFields != null ) 158 { 159 Tuple pathValues = tupleEntry.selectTuple( pathFields ); 160 String path = pathValues.format( pathTemplate ); 161 162 getCollector( path ).add( tupleEntry.selectTuple( parentFields ) ); 163 } 164 else 165 { 166 String path = tupleEntry.getTuple().format( pathTemplate ); 167 168 getCollector( path ).add( tupleEntry ); 169 } 170 } 171 } 172 173 /** Field parent */ 174 protected Tap parent; 175 /** Field pathTemplate */ 176 protected String pathTemplate; 177 /** Field keepParentOnDelete */ 178 protected boolean keepParentOnDelete = false; 179 /** Field openTapsThreshold */ 180 protected int openTapsThreshold = OPEN_TAPS_THRESHOLD_DEFAULT; 181 /** Field collectors */ 182 private final Map<String, TupleEntryCollector> collectors = new LinkedHashMap<String, TupleEntryCollector>( 1000, .75f, true ); 183 184 protected abstract TupleEntrySchemeCollector createTupleEntrySchemeCollector( FlowProcess<Config> flowProcess, Tap parent, String path ) throws IOException; 185 186 /** 187 * Method getParent returns the parent Tap of this TemplateTap object. 188 * 189 * @return the parent (type Tap) of this TemplateTap object. 190 */ 191 public Tap getParent() 192 { 193 return parent; 194 } 195 196 /** 197 * Method getPathTemplate returns the pathTemplate {@link java.util.Formatter} format String of this TemplateTap object. 198 * 199 * @return the pathTemplate (type String) of this TemplateTap object. 200 */ 201 public String getPathTemplate() 202 { 203 return pathTemplate; 204 } 205 206 @Override 207 public String getIdentifier() 208 { 209 return parent.getIdentifier(); 210 } 211 212 /** 213 * Method getOpenTapsThreshold returns the openTapsThreshold of this TemplateTap object. 214 * 215 * @return the openTapsThreshold (type int) of this TemplateTap object. 216 */ 217 public int getOpenTapsThreshold() 218 { 219 return openTapsThreshold; 220 } 221 222 @Override 223 public TupleEntryCollector openForWrite( FlowProcess<Config> flowProcess, Output output ) throws IOException 224 { 225 return new TemplateCollector( flowProcess ); 226 } 227 228 /** @see cascading.tap.Tap#createResource(Object) */ 229 public boolean createResource( Config conf ) throws IOException 230 { 231 return parent.createResource( conf ); 232 } 233 234 /** @see cascading.tap.Tap#deleteResource(Object) */ 235 public boolean deleteResource( Config conf ) throws IOException 236 { 237 return keepParentOnDelete || parent.deleteResource( conf ); 238 } 239 240 @Override 241 public boolean commitResource( Config conf ) throws IOException 242 { 243 return parent.commitResource( conf ); 244 } 245 246 @Override 247 public boolean rollbackResource( Config conf ) throws IOException 248 { 249 return parent.rollbackResource( conf ); 250 } 251 252 /** @see cascading.tap.Tap#resourceExists(Object) */ 253 public boolean resourceExists( Config conf ) throws IOException 254 { 255 return parent.resourceExists( conf ); 256 } 257 258 /** @see cascading.tap.Tap#getModifiedTime(Object) */ 259 @Override 260 public long getModifiedTime( Config conf ) throws IOException 261 { 262 return parent.getModifiedTime( conf ); 263 } 264 265 @Override 266 public boolean equals( Object object ) 267 { 268 if( this == object ) 269 return true; 270 if( object == null || getClass() != object.getClass() ) 271 return false; 272 if( !super.equals( object ) ) 273 return false; 274 275 BaseTemplateTap that = (BaseTemplateTap) object; 276 277 if( parent != null ? !parent.equals( that.parent ) : that.parent != null ) 278 return false; 279 if( pathTemplate != null ? !pathTemplate.equals( that.pathTemplate ) : that.pathTemplate != null ) 280 return false; 281 282 return true; 283 } 284 285 @Override 286 public int hashCode() 287 { 288 int result = super.hashCode(); 289 result = 31 * result + ( parent != null ? parent.hashCode() : 0 ); 290 result = 31 * result + ( pathTemplate != null ? pathTemplate.hashCode() : 0 ); 291 return result; 292 } 293 294 @Override 295 public String toString() 296 { 297 return getClass().getSimpleName() + "[\"" + parent + "\"]" + "[\"" + pathTemplate + "\"]"; 298 } 299 300 public enum Counters 301 { 302 Paths_Opened, Paths_Closed, Path_Purges 303 } 304 305 protected BaseTemplateTap( Tap parent, String pathTemplate, int openTapsThreshold ) 306 { 307 this( new TemplateScheme( parent.getScheme() ) ); 308 this.parent = parent; 309 this.pathTemplate = pathTemplate; 310 this.openTapsThreshold = openTapsThreshold; 311 } 312 313 protected BaseTemplateTap( Tap parent, String pathTemplate, SinkMode sinkMode ) 314 { 315 super( new TemplateScheme( parent.getScheme() ), sinkMode ); 316 this.parent = parent; 317 this.pathTemplate = pathTemplate; 318 } 319 320 protected BaseTemplateTap( Tap parent, String pathTemplate, SinkMode sinkMode, boolean keepParentOnDelete, int openTapsThreshold ) 321 { 322 super( new TemplateScheme( parent.getScheme() ), sinkMode ); 323 this.parent = parent; 324 this.pathTemplate = pathTemplate; 325 this.keepParentOnDelete = keepParentOnDelete; 326 this.openTapsThreshold = openTapsThreshold; 327 } 328 329 protected BaseTemplateTap( Tap parent, String pathTemplate, Fields pathFields, int openTapsThreshold ) 330 { 331 super( new TemplateScheme( parent.getScheme(), pathFields ) ); 332 this.parent = parent; 333 this.pathTemplate = pathTemplate; 334 this.openTapsThreshold = openTapsThreshold; 335 } 336 337 protected BaseTemplateTap( Tap parent, String pathTemplate, Fields pathFields, SinkMode sinkMode ) 338 { 339 super( new TemplateScheme( parent.getScheme(), pathFields ), sinkMode ); 340 this.parent = parent; 341 this.pathTemplate = pathTemplate; 342 } 343 344 protected BaseTemplateTap( Tap parent, String pathTemplate, Fields pathFields, SinkMode sinkMode, boolean keepParentOnDelete, int openTapsThreshold ) 345 { 346 super( new TemplateScheme( parent.getScheme(), pathFields ), sinkMode ); 347 this.parent = parent; 348 this.pathTemplate = pathTemplate; 349 this.keepParentOnDelete = keepParentOnDelete; 350 this.openTapsThreshold = openTapsThreshold; 351 } 352 353 protected BaseTemplateTap( Scheme<Config, ?, Output, ?, ?> scheme, SinkMode sinkMode ) 354 { 355 super( scheme, sinkMode ); 356 } 357 358 protected BaseTemplateTap( Scheme<Config, ?, Output, ?, ?> scheme ) 359 { 360 super( scheme ); 361 } 362 363 public static class TemplateScheme<Config, Output> extends Scheme<Config, Void, Output, Void, Void> 364 { 365 private final Scheme scheme; 366 private final Fields pathFields; 367 368 public TemplateScheme( Scheme scheme ) 369 { 370 this.scheme = scheme; 371 this.pathFields = null; 372 } 373 374 public TemplateScheme( Scheme scheme, Fields pathFields ) 375 { 376 this.scheme = scheme; 377 378 if( pathFields == null || pathFields.isAll() ) 379 this.pathFields = null; 380 else if( pathFields.isDefined() ) 381 this.pathFields = pathFields; 382 else 383 throw new IllegalArgumentException( "pathFields must be defined or the ALL substitution, got: " + pathFields.printVerbose() ); 384 } 385 386 public Fields getSinkFields() 387 { 388 if( pathFields == null || scheme.getSinkFields().isAll() ) 389 return scheme.getSinkFields(); 390 391 return Fields.merge( scheme.getSinkFields(), pathFields ); 392 } 393 394 public void setSinkFields( Fields sinkFields ) 395 { 396 scheme.setSinkFields( sinkFields ); 397 } 398 399 public Fields getSourceFields() 400 { 401 return scheme.getSourceFields(); 402 } 403 404 public void setSourceFields( Fields sourceFields ) 405 { 406 scheme.setSourceFields( sourceFields ); 407 } 408 409 public int getNumSinkParts() 410 { 411 return scheme.getNumSinkParts(); 412 } 413 414 public void setNumSinkParts( int numSinkParts ) 415 { 416 scheme.setNumSinkParts( numSinkParts ); 417 } 418 419 @Override 420 public void sourceConfInit( FlowProcess<Config> flowProcess, Tap<Config, Void, Output> tap, Config conf ) 421 { 422 scheme.sourceConfInit( flowProcess, tap, conf ); 423 } 424 425 @Override 426 public void sourcePrepare( FlowProcess<Config> flowProcess, SourceCall<Void, Void> sourceCall ) throws IOException 427 { 428 scheme.sourcePrepare( flowProcess, sourceCall ); 429 } 430 431 @Override 432 public boolean source( FlowProcess<Config> flowProcess, SourceCall<Void, Void> sourceCall ) throws IOException 433 { 434 throw new UnsupportedOperationException( "not supported" ); 435 } 436 437 @Override 438 public void sourceCleanup( FlowProcess<Config> flowProcess, SourceCall<Void, Void> sourceCall ) throws IOException 439 { 440 scheme.sourceCleanup( flowProcess, sourceCall ); 441 } 442 443 @Override 444 public void sinkConfInit( FlowProcess<Config> flowProcess, Tap<Config, Void, Output> tap, Config conf ) 445 { 446 scheme.sinkConfInit( flowProcess, tap, conf ); 447 } 448 449 @Override 450 public void sinkPrepare( FlowProcess<Config> flowProcess, SinkCall<Void, Output> sinkCall ) throws IOException 451 { 452 scheme.sinkPrepare( flowProcess, sinkCall ); 453 } 454 455 @Override 456 public void sink( FlowProcess<Config> flowProcess, SinkCall<Void, Output> sinkCall ) throws IOException 457 { 458 throw new UnsupportedOperationException( "should never be called" ); 459 } 460 461 @Override 462 public void sinkCleanup( FlowProcess<Config> flowProcess, SinkCall<Void, Output> sinkCall ) throws IOException 463 { 464 scheme.sinkCleanup( flowProcess, sinkCall ); 465 } 466 } 467 }