/*
 * Copyright (c) 2007-2014 Concurrent, Inc. All Rights Reserved.
 *
 * Project and contact information: http://www.cascading.org/
 *
 * This file is part of the Cascading project.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package cascading.flow.stream;

import java.io.IOException;
import java.util.HashMap;
import java.util.Map;

import cascading.flow.FlowProcess;
import cascading.flow.StepCounters;
import cascading.tap.Tap;
import cascading.tap.TapException;
import cascading.tuple.TupleEntry;
import cascading.tuple.TupleEntryCollector;
import cascading.util.Util;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Class TrapHandler diverts failed tuples into a trap {@link Tap} when one is available, and rethrows
 * the failure when it is not.
 * <p/>
 * Opened trap collectors are cached in a static map keyed by the trap {@link Tap}, so the cache is shared
 * by every TrapHandler instance loaded by the same class loader. Access to that cache goes through the
 * static synchronized methods {@link #getTrapCollector(Tap, FlowProcess)} and {@link #closeTraps()}.
 */
public class TrapHandler
  {
  private static final Logger LOG = LoggerFactory.getLogger( TrapHandler.class );

  /** Cache of opened trap collectors; only touch while holding the TrapHandler class monitor. */
  static final Map<Tap, TupleEntryCollector> trapCollectors = new HashMap<Tap, TupleEntryCollector>();

  final FlowProcess flowProcess;
  final Tap trap;
  final String trapName;

  /**
   * Returns the cached collector for the given trap, opening and caching a new one via
   * {@link FlowProcess#openTrapForWrite(Tap)} on first use.
   * <p/>
   * Synchronized on the class, like {@link #closeTraps()}, so the shared map is never mutated
   * concurrently and a trap is never opened twice by racing callers.
   *
   * @param trap        the trap to write to, used as the cache key
   * @param flowProcess the process used to open the trap when it is not yet cached
   * @return the collector associated with the trap
   * @throws DuctException wrapping the IOException raised while opening the trap
   */
  static synchronized TupleEntryCollector getTrapCollector( Tap trap, FlowProcess flowProcess )
    {
    TupleEntryCollector trapCollector = trapCollectors.get( trap );

    if( trapCollector != null )
      return trapCollector;

    try
      {
      trapCollector = flowProcess.openTrapForWrite( trap );
      trapCollectors.put( trap, trapCollector );
      }
    catch( IOException exception )
      {
      throw new DuctException( exception );
      }

    return trapCollector;
    }

  /**
   * Closes every cached trap collector and empties the cache.
   * <p/>
   * Closing is best effort: a failure on one collector is logged and does not prevent the remaining
   * collectors from being closed.
   */
  static synchronized void closeTraps()
    {
    for( TupleEntryCollector trapCollector : trapCollectors.values() )
      {
      try
        {
        trapCollector.close();
        }
      catch( Exception exception )
        {
        // keep going so the other collectors still get closed, but leave a trace of the failure
        LOG.warn( "failed closing trap collector, continuing", exception );
        }
      }

    trapCollectors.clear();
    }

  /**
   * Creates a handler with no trap; every call to {@link #handleException(Throwable, TupleEntry)}
   * will rethrow.
   *
   * @param flowProcess the current flow process
   */
  public TrapHandler( FlowProcess flowProcess )
    {
    this.flowProcess = flowProcess;
    this.trap = null;
    this.trapName = null;
    }

  /**
   * Creates a handler that writes failed tuples to the given trap.
   *
   * @param flowProcess the current flow process
   * @param trap        the trap to write failed tuples to, may be null
   * @param trapName    the branch name reported in the log when a tuple is trapped
   */
  public TrapHandler( FlowProcess flowProcess, Tap trap, String trapName )
    {
    this.flowProcess = flowProcess;
    this.trap = trap;
    this.trapName = trapName;
    }

  /**
   * Logs the failure at error level and then always throws.
   * <p/>
   * An {@link Error} or {@link RuntimeException} is rethrown unchanged, anything else is wrapped
   * in a {@link DuctException} carrying the given message.
   *
   * @param message   the message to log, also used for the wrapping exception
   * @param throwable the failure to rethrow
   */
  protected void handleReThrowableException( String message, Throwable throwable )
    {
    LOG.error( message, throwable );

    if( throwable instanceof Error )
      throw (Error) throwable;

    if( throwable instanceof RuntimeException )
      throw (RuntimeException) throwable;

    throw new DuctException( message, throwable );
    }

  /**
   * Handles the failure using the trap and trap name this handler was constructed with.
   *
   * @param exception  the failure raised while processing the tuple
   * @param tupleEntry the tuple being processed, may be null
   * @see #handleException(String, Tap, Throwable, TupleEntry)
   */
  protected void handleException( Throwable exception, TupleEntry tupleEntry )
    {
    handleException( trapName, trap, exception, tupleEntry );
    }

  /**
   * Writes the failed tuple to the given trap, or rethrows when trapping is not possible.
   * <p/>
   * The failure is rethrown when it is, or is caused by, an {@link OutOfMemoryError}, or when no trap
   * is given. Otherwise the payload of a causing {@link TapException} is trapped if present, else the
   * given tuple entry is trapped. On success the {@link StepCounters#Tuples_Trapped} counter is
   * incremented and a warning is logged.
   *
   * @param trapName   the branch name reported in the log
   * @param trap       the trap to write to, may be null
   * @param throwable  the failure raised while processing the tuple
   * @param tupleEntry the tuple being processed, may be null
   * @throws DuctException if there is neither a TapException payload nor a tuple entry to trap
   */
  protected void handleException( String trapName, Tap trap, Throwable throwable, TupleEntry tupleEntry )
    {
    // an OutOfMemoryError handed in directly must not be trapped either, writing to the trap needs memory
    if( throwable instanceof OutOfMemoryError )
      handleReThrowableException( "caught OutOfMemoryException, will not trap, rethrowing", throwable );

    Throwable cause = throwable.getCause();

    if( cause instanceof OutOfMemoryError )
      handleReThrowableException( "caught OutOfMemoryException, will not trap, rethrowing", cause );

    // handleReThrowableException always throws, so trap is non-null past this point
    if( trap == null )
      handleReThrowableException( "caught Throwable, no trap available, rethrowing", throwable );

    if( cause instanceof TapException && ( (TapException) cause ).getPayload() != null )
      {
      getTrapCollector( trap, flowProcess ).add( ( (TapException) cause ).getPayload() );
      }
    else if( tupleEntry != null )
      {
      getTrapCollector( trap, flowProcess ).add( tupleEntry );
      }
    else
      {
      LOG.error( "failure resolving tuple entry", throwable );
      throw new DuctException( "failure resolving tuple entry", throwable );
      }

    flowProcess.increment( StepCounters.Tuples_Trapped, 1 );

    LOG.warn( "exception trap on branch: '" + trapName + "', for " + Util.truncate( print( tupleEntry ), 75 ), throwable );
    }

  /**
   * Renders the fields, and the tuple when present, of the given entry for logging.
   *
   * @param tupleEntry the entry to render, may be null
   * @return "[uninitialized]" when the entry or its fields are null, otherwise the verbose fields
   *         followed by the tuple if the entry has one
   */
  private String print( TupleEntry tupleEntry )
    {
    if( tupleEntry == null || tupleEntry.getFields() == null )
      return "[uninitialized]";

    String fields = "fields: " + tupleEntry.getFields().printVerbose();

    if( tupleEntry.getTuple() == null )
      return fields;

    return fields + " tuple: " + tupleEntry.getTuple().print();
    }
  }