001/*
002 * Copyright (c) 2007-2017 Xplenty, Inc. All Rights Reserved.
003 *
004 * Project and contact information: http://www.cascading.org/
005 *
006 * This file is part of the Cascading project.
007 *
008 * Licensed under the Apache License, Version 2.0 (the "License");
009 * you may not use this file except in compliance with the License.
010 * You may obtain a copy of the License at
011 *
012 *     http://www.apache.org/licenses/LICENSE-2.0
013 *
014 * Unless required by applicable law or agreed to in writing, software
015 * distributed under the License is distributed on an "AS IS" BASIS,
016 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
017 * See the License for the specific language governing permissions and
018 * limitations under the License.
019 */
020
021package cascading.flow.hadoop.util;
022
023import java.io.Closeable;
024import java.io.IOException;
025import java.util.Iterator;
026
027import cascading.flow.FlowProcess;
028import cascading.util.CloseableIterator;
029
030/**
031 *
032 */
033public class TimedIterator<V> implements CloseableIterator<V>
034  {
035  private final FlowProcess flowProcess;
036  private final Enum durationCounter;
037  private final Enum countCounter;
038  private final int ordinal;
039
040  public static <V> TimedIterator<V>[] iterators( TimedIterator<V>... iterators )
041    {
042    return iterators;
043    }
044
045  Iterator<V> iterator;
046
047  public TimedIterator( FlowProcess flowProcess, Enum durationCounter, Enum countCounter )
048    {
049    this( flowProcess, durationCounter, countCounter, 0 );
050    }
051
052  public TimedIterator( FlowProcess flowProcess, Enum durationCounter, Enum countCounter, int ordinal )
053    {
054    this.flowProcess = flowProcess;
055    this.durationCounter = durationCounter;
056    this.countCounter = countCounter;
057    this.ordinal = ordinal;
058    }
059
060  public void reset( Iterable<V> iterable )
061    {
062    if( iterable == null )
063      this.iterator = null;
064    else
065      this.iterator = iterable.iterator();
066    }
067
068  public void reset( Iterator<V> iterator )
069    {
070    this.iterator = iterator;
071    }
072
073  @Override
074  public boolean hasNext()
075    {
076    if( iterator == null )
077      return false;
078
079    long start = System.currentTimeMillis();
080
081    try
082      {
083      return iterator.hasNext();
084      }
085    finally
086      {
087      flowProcess.increment( durationCounter, System.currentTimeMillis() - start );
088      }
089    }
090
091  @Override
092  public V next()
093    {
094    long start = System.currentTimeMillis();
095
096    try
097      {
098      flowProcess.increment( countCounter, 1 );
099
100      return iterator.next();
101      }
102    finally
103      {
104      flowProcess.increment( durationCounter, System.currentTimeMillis() - start );
105      }
106    }
107
108  @Override
109  public void remove()
110    {
111    iterator.remove();
112    }
113
114  @Override
115  public void close() throws IOException
116    {
117    if( iterator instanceof Closeable )
118      ( (Closeable) iterator ).close();
119    }
120  }