/*
 * Copyright (c) 2007-2014 Concurrent, Inc. All Rights Reserved.
 *
 * Project and contact information: http://www.cascading.org/
 *
 * This file is part of the Cascading project.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package cascading.tap.hadoop.io;

import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;
import java.security.DigestInputStream;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;

import org.apache.commons.codec.binary.Hex;
import org.apache.hadoop.fs.FSInputStream;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Class FSDigestInputStream is an {@link FSInputStream} implementation that can verify a
 * {@link MessageDigest} and will count the number of bytes read for use in progress status.
 * <p>
 * The stream is forward-only: {@link #seek(long)} is implemented by reading and discarding bytes,
 * so seeking backwards is rejected. When a non-null expected digest is supplied, the digest of all
 * bytes read through this stream is compared against it in {@link #close()}.
 */
public class FSDigestInputStream extends FSInputStream
  {
  /** Field LOG */
  private static final Logger LOG = LoggerFactory.getLogger( FSDigestInputStream.class );

  /** Size in bytes of the scratch buffer used to discard data while seeking forward. */
  private static final int SEEK_BUFFER_SIZE = 50 * 1024;

  /** Field count, the number of bytes successfully read so far (a long so it cannot wrap past 2 GiB) */
  long count = 0;
  /** Field inputStream, the wrapped stream; a {@link DigestInputStream} whenever digestHex is not null */
  final InputStream inputStream;
  /** Field digestHex, the expected digest as a hex string, or null to skip verification */
  final String digestHex;
  /** Set once close() has run so repeated close() calls have no effect. */
  private boolean closed = false;

  /**
   * Constructor FSDigestInputStream creates a new FSDigestInputStream instance that verifies
   * against an MD5 digest.
   *
   * @param inputStream of type InputStream
   * @param digestHex   of type String, may be null to disable digest verification
   * @throws IOException if unable to get md5 digest
   */
  public FSDigestInputStream( InputStream inputStream, String digestHex ) throws IOException
    {
    this( inputStream, getMD5Digest(), digestHex );
    }

  /**
   * Constructor FSDigestInputStream creates a new FSDigestInputStream instance.
   * <p>
   * When {@code digestHex} is null the given stream is used as-is and {@code messageDigest} is ignored.
   *
   * @param inputStream   of type InputStream
   * @param messageDigest of type MessageDigest
   * @param digestHex     of type String, may be null to disable digest verification
   */
  public FSDigestInputStream( InputStream inputStream, MessageDigest messageDigest, String digestHex )
    {
    this.inputStream = digestHex == null ? inputStream : new DigestInputStream( inputStream, messageDigest );
    this.digestHex = digestHex;
    }

  /**
   * Method getMD5Digest returns a new MD5 {@link MessageDigest} instance.
   *
   * @return the MD5Digest (type MessageDigest)
   * @throws IOException when the MD5 algorithm is not available from the installed security providers
   */
  private static MessageDigest getMD5Digest() throws IOException
    {
    try
      {
      return MessageDigest.getInstance( "MD5" );
      }
    catch( NoSuchAlgorithmException exception )
      {
      // keep the original exception as the cause so the stack trace is not lost
      throw new IOException( "digest not found: " + exception.getMessage(), exception );
      }
    }

  /**
   * Reads a single byte, advancing the position only when a byte was actually returned.
   *
   * @return the byte read, or -1 at end of stream
   * @throws IOException if the underlying stream fails
   */
  @Override
  public int read() throws IOException
    {
    int result = inputStream.read();

    // -1 signals end of stream, no byte was consumed
    if( result != -1 )
      count++;

    return result;
    }

  /**
   * Reads up to {@code len} bytes into {@code b}, advancing the position by the number of bytes
   * actually read.
   *
   * @param b   the destination buffer
   * @param off the offset in {@code b} at which to start storing bytes
   * @param len the maximum number of bytes to read
   * @return the number of bytes read, or -1 at end of stream
   * @throws IOException if the underlying stream fails
   */
  @Override
  public int read( byte[] b, int off, int len ) throws IOException
    {
    int result = inputStream.read( b, off, len );

    // a result of -1 (end of stream) must not move the position backwards
    if( result > 0 )
      count += result;

    return result;
    }

  /**
   * Closes the underlying stream and, if an expected digest was given, verifies it against the
   * digest of the bytes read through this stream. Calling this method more than once has no effect.
   *
   * @throws IOException if the underlying stream fails to close, or the digests do not match
   */
  @Override
  public void close() throws IOException
    {
    // MessageDigest#digest() resets the digest, so a second pass would report a bogus mismatch
    if( closed )
      return;

    closed = true;

    inputStream.close();

    LOG.info( "closing stream, testing digest: [{}]", digestHex == null ? "none" : digestHex );

    if( digestHex == null )
      return;

    String actualDigestHex = new String( Hex.encodeHex( ( (DigestInputStream) inputStream ).getMessageDigest().digest() ) );

    if( !actualDigestHex.equals( digestHex ) )
      {
      String message = "given digest: [" + digestHex + "], does not match input stream digest: [" + actualDigestHex + "]";
      LOG.error( message );
      throw new IOException( message );
      }
    }

  /**
   * Moves forward to {@code pos} by reading and discarding bytes, so skipped bytes still
   * contribute to the digest and to the byte count.
   *
   * @param pos the absolute position to seek to
   * @throws EOFException if the stream ends before {@code pos} is reached
   * @throws IOException  if {@code pos} is before the current position, or the underlying stream fails
   */
  @Override
  public void seek( long pos ) throws IOException
    {
    long current = getPos();

    if( current == pos )
      return;

    if( current > pos )
      throw new IOException( "cannot seek to " + pos + ", currently at " + current );

    // tracked as a long so seeks of more than Integer.MAX_VALUE bytes are not truncated
    long remaining = pos - current;
    byte[] bytes = new byte[ SEEK_BUFFER_SIZE ];

    while( remaining > 0 )
      {
      int read = read( bytes, 0, (int) Math.min( remaining, bytes.length ) );

      // without this check an early end of stream would keep the loop spinning
      if( read == -1 )
        throw new EOFException( "cannot seek to " + pos + ", reached end of stream at " + getPos() );

      remaining -= read;
      }
    }

  /**
   * Returns the number of bytes read so far, which is the current position in the stream.
   *
   * @return the current position
   * @throws IOException never thrown by this implementation
   */
  @Override
  public long getPos() throws IOException
    {
    return count;
    }

  /**
   * Always returns false; this implementation never switches to another source.
   *
   * @param targetPos the position that would be sought on a new source, ignored
   * @return false always
   * @throws IOException never thrown by this implementation
   */
  @Override
  public boolean seekToNewSource( long targetPos ) throws IOException
    {
    return false;
    }
  }