001/* 002 * Copyright (c) 2007-2015 Concurrent, Inc. All Rights Reserved. 003 * 004 * Project and contact information: http://www.cascading.org/ 005 * 006 * This file is part of the Cascading project. 007 * 008 * Licensed under the Apache License, Version 2.0 (the "License"); 009 * you may not use this file except in compliance with the License. 010 * You may obtain a copy of the License at 011 * 012 * http://www.apache.org/licenses/LICENSE-2.0 013 * 014 * Unless required by applicable law or agreed to in writing, software 015 * distributed under the License is distributed on an "AS IS" BASIS, 016 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 017 * See the License for the specific language governing permissions and 018 * limitations under the License. 019 */ 020 021package cascading.flow.stream.element; 022 023import cascading.flow.FlowElement; 024import cascading.flow.FlowProcess; 025import cascading.flow.StepCounters; 026import cascading.flow.stream.TrapException; 027import cascading.flow.stream.duct.DuctException; 028import cascading.tap.Tap; 029import cascading.tap.TapException; 030import cascading.tap.TrapProps; 031import cascading.tuple.Fields; 032import cascading.tuple.Tuple; 033import cascading.tuple.TupleEntry; 034import cascading.tuple.TupleEntryCollector; 035import cascading.util.TraceUtil; 036import cascading.util.Traceable; 037import cascading.util.Util; 038import org.slf4j.Logger; 039import org.slf4j.LoggerFactory; 040 041/** 042 * 043 */ 044public class TrapHandler 045 { 046 private static final Logger LOG = LoggerFactory.getLogger( TrapHandler.class ); 047 048 final FlowProcess flowProcess; 049 final FlowElement flowElement; 050 final String elementTrace; 051 final Tap trap; 052 final String trapName; 053 054 boolean recordElementTrace = false; 055 boolean recordThrowableMessage = false; 056 boolean recordThrowableStackTrace = false; 057 boolean logThrowableStackTrace = true; 058 boolean stackTraceTrimLine = true; 059 String stackTraceLineDelimiter = "|"; 060 061 boolean recordAnyDiagnostics; 062 063 Fields diagnosticFields = Fields.UNKNOWN; 064 TupleEntry diagnosticEntry; 065 066 public TrapHandler( FlowProcess flowProcess ) 067 { 068 this.flowProcess = flowProcess; 069 this.flowElement = null; 070 this.trap = null; 071 this.trapName = null; 072 this.elementTrace = null; 073 } 074 075 public TrapHandler( FlowProcess flowProcess, FlowElement flowElement, Tap trap, String trapName ) 076 { 077 this.flowProcess = flowProcess; 078 this.flowElement = flowElement; 079 this.trap = trap; 080 this.trapName = trapName; 081 082 if( flowElement instanceof Traceable ) 083 this.elementTrace = ( (Traceable) flowElement ).getTrace(); 084 else 085 this.elementTrace = null; 086 087 this.recordElementTrace = flowProcess.getBooleanProperty( TrapProps.RECORD_ELEMENT_TRACE, this.recordElementTrace ); 088 this.recordThrowableMessage = flowProcess.getBooleanProperty( TrapProps.RECORD_THROWABLE_MESSAGE, this.recordThrowableMessage ); 089 this.recordThrowableStackTrace = flowProcess.getBooleanProperty( TrapProps.RECORD_THROWABLE_STACK_TRACE, this.recordThrowableStackTrace ); 090 this.logThrowableStackTrace = flowProcess.getBooleanProperty( TrapProps.LOG_THROWABLE_STACK_TRACE, this.logThrowableStackTrace ); 091 this.stackTraceLineDelimiter = flowProcess.getStringProperty( TrapProps.STACK_TRACE_LINE_DELIMITER, this.stackTraceLineDelimiter ); 092 this.stackTraceTrimLine = flowProcess.getBooleanProperty( TrapProps.STACK_TRACE_LINE_TRIM, this.stackTraceTrimLine ); 093 094 this.recordAnyDiagnostics = this.recordElementTrace || this.recordThrowableMessage || this.recordThrowableStackTrace; 095 096 Fields fields = new Fields(); 097 098 if( this.recordElementTrace ) 099 fields = fields.append( new Fields( "element-trace" ) ); 100 101 if( this.recordThrowableMessage ) 102 fields = fields.append( new Fields( "throwable-message" ) ); 103 104 if( this.recordThrowableStackTrace ) 105 fields = fields.append( new Fields( "throwable-stacktrace" ) ); 106 107 if( fields.size() != 0 ) 108 this.diagnosticFields = fields; 109 110 this.diagnosticEntry = new TupleEntry( diagnosticFields ); 111 } 112 113 protected void handleReThrowableException( String message, Throwable throwable ) 114 { 115 LOG.error( message, throwable ); 116 117 if( throwable instanceof Error ) 118 throw (Error) throwable; 119 else if( throwable instanceof RuntimeException ) 120 throw (RuntimeException) throwable; 121 else 122 throw new DuctException( message, throwable ); 123 } 124 125 protected void handleException( Throwable exception, TupleEntry tupleEntry ) 126 { 127 handleException( trapName, trap, exception, tupleEntry ); 128 } 129 130 protected void handleException( String trapName, Tap trap, Throwable throwable, TupleEntry tupleEntry ) 131 { 132 Throwable cause = throwable.getCause(); 133 134 if( cause instanceof OutOfMemoryError ) 135 handleReThrowableException( "caught OutOfMemoryException, will not trap, rethrowing", cause ); 136 137 if( cause instanceof TrapException ) 138 handleReThrowableException( "unable to write trap data, will not trap, rethrowing", cause ); 139 140 if( trap == null ) 141 handleReThrowableException( "caught Throwable, no trap available, rethrowing", throwable ); 142 143 TupleEntryCollector trapCollector = flowProcess.getTrapCollectorFor( trap ); 144 145 TupleEntry payload; 146 147 if( cause instanceof TapException && ( (TapException) cause ).getPayload() != null ) 148 { 149 payload = new TupleEntry( Fields.UNKNOWN, ( (TapException) cause ).getPayload() ); 150 } 151 else if( tupleEntry != null ) 152 { 153 payload = tupleEntry; 154 } 155 else 156 { 157 LOG.error( "failure resolving tuple entry", throwable ); 158 throw new DuctException( "failure resolving tuple entry", throwable ); 159 } 160 161 TupleEntry diagnostics = getDiagnostics( throwable ); 162 163 if( diagnostics != TupleEntry.NULL ) // prepend diagnostics, payload is variable 164 payload = diagnostics.appendNew( payload ); 165 166 try 167 { 168 trapCollector.add( payload ); 169 } 170 catch( Throwable current ) 171 { 172 throw new TrapException( "could not write to trap: " + trap.getIdentifier(), current ); 173 } 174 175 flowProcess.increment( StepCounters.Tuples_Trapped, 1 ); 176 177 if( logThrowableStackTrace ) 178 LOG.warn( "exception trap on branch: '" + trapName + "', for " + Util.truncate( print( tupleEntry ), 75 ), throwable ); 179 } 180 181 private TupleEntry getDiagnostics( Throwable throwable ) 182 { 183 if( !recordAnyDiagnostics ) 184 return TupleEntry.NULL; 185 186 Tuple diagnostics = new Tuple(); 187 188 if( recordElementTrace ) 189 diagnostics.add( elementTrace ); 190 191 if( recordThrowableMessage ) 192 diagnostics.add( throwable.getMessage() ); 193 194 if( recordThrowableStackTrace ) 195 diagnostics.add( TraceUtil.stringifyStackTrace( throwable, stackTraceLineDelimiter, stackTraceTrimLine, -1 ) ); 196 197 diagnosticEntry.setTuple( diagnostics ); 198 199 return diagnosticEntry; 200 } 201 202 private String print( TupleEntry tupleEntry ) 203 { 204 if( tupleEntry == null || tupleEntry.getFields() == null ) 205 return "[uninitialized]"; 206 else if( tupleEntry.getTuple() == null ) 207 return "fields: " + tupleEntry.getFields().printVerbose(); 208 else 209 return "fields: " + tupleEntry.getFields().printVerbose() + " tuple: " + tupleEntry.getTuple().print(); 210 } 211 } 212 213