001    /*
002     * Copyright (c) 2007-2015 Concurrent, Inc. All Rights Reserved.
003     *
004     * Project and contact information: http://www.cascading.org/
005     *
006     * This file is part of the Cascading project.
007     *
008     * Licensed under the Apache License, Version 2.0 (the "License");
009     * you may not use this file except in compliance with the License.
010     * You may obtain a copy of the License at
011     *
012     *     http://www.apache.org/licenses/LICENSE-2.0
013     *
014     * Unless required by applicable law or agreed to in writing, software
015     * distributed under the License is distributed on an "AS IS" BASIS,
016     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
017     * See the License for the specific language governing permissions and
018     * limitations under the License.
019     */
020    
021    package cascading.tap.hadoop.util;
022    
023    import java.io.Closeable;
024    import java.io.IOException;
025    
026    import cascading.flow.FlowProcess;
027    import org.apache.hadoop.mapred.OutputCollector;
028    
029    /**
030     *
031     */
032    public class MeasuredOutputCollector implements OutputCollector, Closeable
033      {
034      private final FlowProcess flowProcess;
035      private final Enum counter;
036    
037      private OutputCollector outputCollector;
038    
039      public MeasuredOutputCollector( FlowProcess flowProcess, Enum counter )
040        {
041        this.flowProcess = flowProcess;
042        this.counter = counter;
043        }
044    
045      public MeasuredOutputCollector( FlowProcess flowProcess, Enum counter, OutputCollector outputCollector )
046        {
047        this.flowProcess = flowProcess;
048        this.counter = counter;
049        this.outputCollector = outputCollector;
050        }
051    
052      public OutputCollector getOutputCollector()
053        {
054        return outputCollector;
055        }
056    
057      public void setOutputCollector( OutputCollector outputCollector )
058        {
059        this.outputCollector = outputCollector;
060        }
061    
062      @Override
063      public void collect( Object key, Object value ) throws IOException
064        {
065        long start = System.currentTimeMillis();
066    
067        try
068          {
069          outputCollector.collect( key, value );
070          }
071        finally
072          {
073          flowProcess.increment( counter, System.currentTimeMillis() - start );
074          }
075        }
076    
077      @Override
078      public void close() throws IOException
079        {
080        if( outputCollector instanceof Closeable )
081          ( (Closeable) outputCollector ).close();
082        }
083      }