001/*
002 * Copyright (c) 2007-2015 Concurrent, Inc. All Rights Reserved.
003 *
004 * Project and contact information: http://www.cascading.org/
005 *
006 * This file is part of the Cascading project.
007 *
008 * Licensed under the Apache License, Version 2.0 (the "License");
009 * you may not use this file except in compliance with the License.
010 * You may obtain a copy of the License at
011 *
012 *     http://www.apache.org/licenses/LICENSE-2.0
013 *
014 * Unless required by applicable law or agreed to in writing, software
015 * distributed under the License is distributed on an "AS IS" BASIS,
016 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
017 * See the License for the specific language governing permissions and
018 * limitations under the License.
019 */
020
021package cascading.flow.stream.element;
022
023import cascading.flow.FlowElement;
024import cascading.flow.FlowProcess;
025import cascading.flow.StepCounters;
026import cascading.flow.stream.TrapException;
027import cascading.flow.stream.duct.DuctException;
028import cascading.tap.Tap;
029import cascading.tap.TapException;
030import cascading.tap.TrapProps;
031import cascading.tuple.Fields;
032import cascading.tuple.Tuple;
033import cascading.tuple.TupleEntry;
034import cascading.tuple.TupleEntryCollector;
035import cascading.util.TraceUtil;
036import cascading.util.Traceable;
037import cascading.util.Util;
038import org.slf4j.Logger;
039import org.slf4j.LoggerFactory;
040
041/**
042 *
043 */
044public class TrapHandler
045  {
046  private static final Logger LOG = LoggerFactory.getLogger( TrapHandler.class );
047
048  final FlowProcess flowProcess;
049  final FlowElement flowElement;
050  final String elementTrace;
051  final Tap trap;
052  final String trapName;
053
054  boolean recordElementTrace = false;
055  boolean recordThrowableMessage = false;
056  boolean recordThrowableStackTrace = false;
057  boolean logThrowableStackTrace = true;
058  boolean stackTraceTrimLine = true;
059  String stackTraceLineDelimiter = "|";
060
061  boolean recordAnyDiagnostics;
062
063  Fields diagnosticFields = Fields.UNKNOWN;
064  TupleEntry diagnosticEntry;
065
066  public TrapHandler( FlowProcess flowProcess )
067    {
068    this.flowProcess = flowProcess;
069    this.flowElement = null;
070    this.trap = null;
071    this.trapName = null;
072    this.elementTrace = null;
073    }
074
075  public TrapHandler( FlowProcess flowProcess, FlowElement flowElement, Tap trap, String trapName )
076    {
077    this.flowProcess = flowProcess;
078    this.flowElement = flowElement;
079    this.trap = trap;
080    this.trapName = trapName;
081
082    if( flowElement instanceof Traceable )
083      this.elementTrace = ( (Traceable) flowElement ).getTrace();
084    else
085      this.elementTrace = null;
086
087    this.recordElementTrace = flowProcess.getBooleanProperty( TrapProps.RECORD_ELEMENT_TRACE, this.recordElementTrace );
088    this.recordThrowableMessage = flowProcess.getBooleanProperty( TrapProps.RECORD_THROWABLE_MESSAGE, this.recordThrowableMessage );
089    this.recordThrowableStackTrace = flowProcess.getBooleanProperty( TrapProps.RECORD_THROWABLE_STACK_TRACE, this.recordThrowableStackTrace );
090    this.logThrowableStackTrace = flowProcess.getBooleanProperty( TrapProps.LOG_THROWABLE_STACK_TRACE, this.logThrowableStackTrace );
091    this.stackTraceLineDelimiter = flowProcess.getStringProperty( TrapProps.STACK_TRACE_LINE_DELIMITER, this.stackTraceLineDelimiter );
092    this.stackTraceTrimLine = flowProcess.getBooleanProperty( TrapProps.STACK_TRACE_LINE_TRIM, this.stackTraceTrimLine );
093
094    this.recordAnyDiagnostics = this.recordElementTrace || this.recordThrowableMessage || this.recordThrowableStackTrace;
095
096    Fields fields = new Fields();
097
098    if( this.recordElementTrace )
099      fields = fields.append( new Fields( "element-trace" ) );
100
101    if( this.recordThrowableMessage )
102      fields = fields.append( new Fields( "throwable-message" ) );
103
104    if( this.recordThrowableStackTrace )
105      fields = fields.append( new Fields( "throwable-stacktrace" ) );
106
107    if( fields.size() != 0 )
108      this.diagnosticFields = fields;
109
110    this.diagnosticEntry = new TupleEntry( diagnosticFields );
111    }
112
113  protected void handleReThrowableException( String message, Throwable throwable )
114    {
115    LOG.error( message, throwable );
116
117    if( throwable instanceof Error )
118      throw (Error) throwable;
119    else if( throwable instanceof RuntimeException )
120      throw (RuntimeException) throwable;
121    else
122      throw new DuctException( message, throwable );
123    }
124
125  protected void handleException( Throwable exception, TupleEntry tupleEntry )
126    {
127    handleException( trapName, trap, exception, tupleEntry );
128    }
129
130  protected void handleException( String trapName, Tap trap, Throwable throwable, TupleEntry tupleEntry )
131    {
132    Throwable cause = throwable.getCause();
133
134    if( cause instanceof OutOfMemoryError )
135      handleReThrowableException( "caught OutOfMemoryException, will not trap, rethrowing", cause );
136
137    if( cause instanceof TrapException )
138      handleReThrowableException( "unable to write trap data, will not trap, rethrowing", cause );
139
140    if( trap == null )
141      handleReThrowableException( "caught Throwable, no trap available, rethrowing", throwable );
142
143    TupleEntryCollector trapCollector = flowProcess.getTrapCollectorFor( trap );
144
145    TupleEntry payload;
146
147    if( cause instanceof TapException && ( (TapException) cause ).getPayload() != null )
148      {
149      payload = new TupleEntry( Fields.UNKNOWN, ( (TapException) cause ).getPayload() );
150      }
151    else if( tupleEntry != null )
152      {
153      payload = tupleEntry;
154      }
155    else
156      {
157      LOG.error( "failure resolving tuple entry", throwable );
158      throw new DuctException( "failure resolving tuple entry", throwable );
159      }
160
161    TupleEntry diagnostics = getDiagnostics( throwable );
162
163    if( diagnostics != TupleEntry.NULL ) // prepend diagnostics, payload is variable
164      payload = diagnostics.appendNew( payload );
165
166    try
167      {
168      trapCollector.add( payload );
169      }
170    catch( Throwable current )
171      {
172      throw new TrapException( "could not write to trap: " + trap.getIdentifier(), current );
173      }
174
175    flowProcess.increment( StepCounters.Tuples_Trapped, 1 );
176
177    if( logThrowableStackTrace )
178      LOG.warn( "exception trap on branch: '" + trapName + "', for " + Util.truncate( print( tupleEntry ), 75 ), throwable );
179    }
180
181  private TupleEntry getDiagnostics( Throwable throwable )
182    {
183    if( !recordAnyDiagnostics )
184      return TupleEntry.NULL;
185
186    Tuple diagnostics = new Tuple();
187
188    if( recordElementTrace )
189      diagnostics.add( elementTrace );
190
191    if( recordThrowableMessage )
192      diagnostics.add( throwable.getMessage() );
193
194    if( recordThrowableStackTrace )
195      diagnostics.add( TraceUtil.stringifyStackTrace( throwable, stackTraceLineDelimiter, stackTraceTrimLine, -1 ) );
196
197    diagnosticEntry.setTuple( diagnostics );
198
199    return diagnosticEntry;
200    }
201
202  private String print( TupleEntry tupleEntry )
203    {
204    if( tupleEntry == null || tupleEntry.getFields() == null )
205      return "[uninitialized]";
206    else if( tupleEntry.getTuple() == null )
207      return "fields: " + tupleEntry.getFields().printVerbose();
208    else
209      return "fields: " + tupleEntry.getFields().printVerbose() + " tuple: " + tupleEntry.getTuple().print();
210    }
211  }
212
213