001/*
002 * Copyright (c) 2007-2015 Concurrent, Inc. All Rights Reserved.
003 *
004 * Project and contact information: http://www.cascading.org/
005 *
006 * This file is part of the Cascading project.
007 *
008 * Licensed under the Apache License, Version 2.0 (the "License");
009 * you may not use this file except in compliance with the License.
010 * You may obtain a copy of the License at
011 *
012 *     http://www.apache.org/licenses/LICENSE-2.0
013 *
014 * Unless required by applicable law or agreed to in writing, software
015 * distributed under the License is distributed on an "AS IS" BASIS,
016 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
017 * See the License for the specific language governing permissions and
018 * limitations under the License.
019 */
020
021package cascading.tap.hadoop.io;
022
import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;
import java.security.DigestInputStream;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;

import org.apache.commons.codec.binary.Hex;
import org.apache.hadoop.fs.FSInputStream;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
033
034/**
035 * Class FSDigestInputStream is an {@link FSInputStream} implementation that can verify a
036 * {@link MessageDigest} and will count the number of bytes read for use in progress status.
037 */
038public class FSDigestInputStream extends FSInputStream
039  {
040  /** Field LOG */
041  private static final Logger LOG = LoggerFactory.getLogger( FSDigestInputStream.class );
042
043  /** Field count */
044  int count = 0;
045  /** Field inputStream */
046  final InputStream inputStream;
047  /** Field digestHex */
048  final String digestHex;
049
050  /**
051   * Constructor FSDigestInputStream creates a new FSDigestInputStream instance.
052   *
053   * @param inputStream of type InputStream
054   * @param digestHex   of type String
055   * @throws IOException if unable to get md5 digest
056   */
057  public FSDigestInputStream( InputStream inputStream, String digestHex ) throws IOException
058    {
059    this( inputStream, getMD5Digest(), digestHex );
060    }
061
062  /**
063   * Constructor FSDigestInputStream creates a new FSDigestInputStream instance.
064   *
065   * @param inputStream   of type InputStream
066   * @param messageDigest of type MessageDigest
067   * @param digestHex     of type String
068   */
069  public FSDigestInputStream( InputStream inputStream, MessageDigest messageDigest, String digestHex )
070    {
071    this.inputStream = digestHex == null ? inputStream : new DigestInputStream( inputStream, messageDigest );
072    this.digestHex = digestHex;
073    }
074
075  /**
076   * Method getMD5Digest returns the MD5Digest of this FSDigestInputStream object.
077   *
078   * @return the MD5Digest (type MessageDigest) of this FSDigestInputStream object.
079   * @throws IOException when
080   */
081  private static MessageDigest getMD5Digest() throws IOException
082    {
083    try
084      {
085      return MessageDigest.getInstance( "MD5" );
086      }
087    catch( NoSuchAlgorithmException exception )
088      {
089      throw new IOException( "digest not found: " + exception.getMessage() );
090      }
091    }
092
093  @Override
094  public int read() throws IOException
095    {
096    count++;
097    return inputStream.read();
098    }
099
100  @Override
101  public int read( byte[] b, int off, int len ) throws IOException
102    {
103    int result = inputStream.read( b, off, len );
104    count += result;
105    return result;
106    }
107
108  @Override
109  public void close() throws IOException
110    {
111    inputStream.close();
112
113    LOG.info( "closing stream, testing digest: [{}]", digestHex == null ? "none" : digestHex );
114
115    if( digestHex == null )
116      return;
117
118    String digestHex = new String( Hex.encodeHex( ( (DigestInputStream) inputStream ).getMessageDigest().digest() ) );
119
120    if( !digestHex.equals( this.digestHex ) )
121      {
122      String message = "given digest: [" + this.digestHex + "], does not match input stream digest: [" + digestHex + "]";
123      LOG.error( message );
124      throw new IOException( message );
125      }
126    }
127
128  @Override
129  public void seek( long pos ) throws IOException
130    {
131    if( getPos() == pos )
132      return;
133
134    if( getPos() > pos )
135      throw new IOException( "cannot seek to " + pos + ", currently at" + getPos() );
136
137    int len = (int) ( pos - getPos() );
138    byte[] bytes = new byte[ 50 * 1024 ];
139
140    while( len > 0 )
141      len -= read( bytes, 0, Math.min( len, bytes.length ) );
142    }
143
144  @Override
145  public long getPos() throws IOException
146    {
147    return count;
148    }
149
150  @Override
151  public boolean seekToNewSource( long targetPos ) throws IOException
152    {
153    return false;
154    }
155  }