/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.io.compress;

import java.io.*;
import java.util.zip.GZIPOutputStream;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.compress.DefaultCodec;
import org.apache.hadoop.io.compress.zlib.*;
import org.apache.hadoop.io.compress.zlib.ZlibDecompressor.ZlibDirectDecompressor;

import static org.apache.hadoop.util.PlatformName.IBM_JAVA;

/**
 * This class creates gzip compressors/decompressors.
 *
 * <p>When native zlib is reported as loaded by {@link ZlibFactory}, the
 * zlib-backed compressor/decompressor classes declared at the bottom of this
 * file are used. Otherwise compression falls back to
 * {@link java.util.zip.GZIPOutputStream} and decompression to
 * {@link BuiltInGzipDecompressor}.
 */
@InterfaceAudience.Public
@InterfaceStability.Evolving
public class GzipCodec extends DefaultCodec {
  /**
   * A bridge that wraps around a DeflaterOutputStream to make it
   * a CompressionOutputStream.
   */
  @InterfaceStability.Evolving
  protected static class GzipOutputStream extends CompressorStream {

    /**
     * A {@link GZIPOutputStream} that can be reused for several gzip members
     * on the same underlying stream: {@link #finish()} ends the current
     * member and {@link #resetState()} prepares the next one.
     */
    private static class ResetableGZIPOutputStream extends GZIPOutputStream {
      /** Size in bytes of the gzip member trailer (CRC-32 + input size). */
      private static final int TRAILER_SIZE = 8;

      /**
       * Fixed ten-byte gzip member header: magic number, deflate compression
       * method, no flags, zero modification time, no extra flags, OS byte 0.
       */
      private static final byte[] GZIP_HEADER = new byte[] {
          0x1f, (byte) 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00 };

      public static final String JVMVersion= System.getProperty("java.version");
      private static final boolean HAS_BROKEN_FINISH =
          (IBM_JAVA && JVMVersion.contains("1.6.0"));

      /**
       * True when the previous member has been finished and the stream has
       * been reset, so a new member header must be emitted before any
       * further output.
       */
      private boolean headerPending = false;

      public ResetableGZIPOutputStream(OutputStream out) throws IOException {
        super(out);
      }

      /**
       * Resets the deflater and the running CRC-32 so a new gzip member can
       * be produced.
       *
       * <p>If the current member has already been finished, the header of the
       * next member is scheduled and written lazily by the next
       * {@link #write(byte[], int, int)} or {@link #finish()}. If the member
       * is still open (for example on a stream that has only had its initial
       * header written), no additional header is scheduled, because one has
       * already been emitted for it.
       *
       * @throws IOException declared for interface compatibility
       */
      public void resetState() throws IOException {
        // Must be sampled before def.reset(), which clears the finished flag.
        boolean memberWasFinished = def.finished();
        def.reset();
        // Without this the next trailer would carry a CRC spanning the
        // previous member(s) as well.
        crc.reset();
        if (memberWasFinished) {
          headerPending = true;
        }
      }

      /**
       * Writes data to the current member, first emitting a member header if
       * one is pending after a reset. The single-byte write inherited from
       * the JDK is expected to funnel through this method.
       */
      @Override
      public synchronized void write(byte[] data, int offset, int length)
          throws IOException {
        startMemberIfPending();
        super.write(data, offset, length);
      }

      /**
       * Override this method for HADOOP-8419.
       * Override because IBM implementation calls def.end() which
       * causes problem when resetting the stream for reuse.
       *
       * <p>If a member header is pending after a reset it is written first,
       * so that the bytes produced here always form a complete gzip member.
       */
      @Override
      public void finish() throws IOException {
        startMemberIfPending();
        if (HAS_BROKEN_FINISH) {
          if (!def.finished()) {
            def.finish();
            while (!def.finished()) {
              int i = def.deflate(this.buf, 0, this.buf.length);
              // Last chunk and the trailer still fits in the buffer:
              // append it and emit both with a single write.
              if ((def.finished()) && (i <= this.buf.length - TRAILER_SIZE)) {
                writeTrailer(this.buf, i);
                i += TRAILER_SIZE;
                out.write(this.buf, 0, i);

                return;
              }
              if (i > 0) {
                out.write(this.buf, 0, i);
              }
            }

            // The trailer did not fit next to the final chunk; write it
            // separately.
            byte[] arrayOfByte = new byte[TRAILER_SIZE];
            writeTrailer(arrayOfByte, 0);
            out.write(arrayOfByte);
          }
        } else {
          super.finish();
        }
      }

      /** Emits the header of a new gzip member if a reset scheduled one. */
      private void startMemberIfPending() throws IOException {
        if (headerPending) {
          out.write(GZIP_HEADER);
          headerPending = false;
        }
      }

      /**
       * re-implement for HADOOP-8419 because the relative method in jdk is
       * invisible.
       *
       * <p>Stores the CRC-32 value followed by the deflater's total input
       * count, each as a little-endian 32-bit integer, starting at
       * {@code paramInt}.
       */
      private void writeTrailer(byte[] paramArrayOfByte, int paramInt)
          throws IOException {
        writeInt((int)this.crc.getValue(), paramArrayOfByte, paramInt);
        writeInt(this.def.getTotalIn(), paramArrayOfByte, paramInt + 4);
      }

      /**
       * re-implement for HADOOP-8419 because the relative method in jdk is
       * invisible.
       *
       * <p>Stores {@code paramInt1} as four little-endian bytes at offset
       * {@code paramInt2}.
       */
      private void writeInt(int paramInt1, byte[] paramArrayOfByte, int paramInt2)
          throws IOException {
        writeShort(paramInt1 & 0xFFFF, paramArrayOfByte, paramInt2);
        writeShort(paramInt1 >> 16 & 0xFFFF, paramArrayOfByte, paramInt2 + 2);
      }

      /**
       * re-implement for HADOOP-8419 because the relative method in jdk is
       * invisible.
       *
       * <p>Stores the low 16 bits of {@code paramInt1} as two little-endian
       * bytes at offset {@code paramInt2}.
       */
      private void writeShort(int paramInt1, byte[] paramArrayOfByte, int paramInt2)
          throws IOException {
        paramArrayOfByte[paramInt2] = (byte)(paramInt1 & 0xFF);
        paramArrayOfByte[(paramInt2 + 1)] = (byte)(paramInt1 >> 8 & 0xFF);
      }
    }

    /**
     * Creates a gzip stream that compresses with the JDK's gzip
     * implementation and writes to {@code out}.
     *
     * @param out the stream receiving the gzip data
     * @throws IOException if the wrapped gzip stream cannot be created
     */
    public GzipOutputStream(OutputStream out) throws IOException {
      super(new ResetableGZIPOutputStream(out));
    }

    /**
     * Allow children types to put a different type in here.
     * @param out the Deflater stream to use
     */
    protected GzipOutputStream(CompressorStream out) {
      super(out);
    }

    /** Closes the wrapped stream. */
    @Override
    public void close() throws IOException {
      out.close();
    }

    /** Flushes the wrapped stream. */
    @Override
    public void flush() throws IOException {
      out.flush();
    }

    /** Writes a single byte to the wrapped stream. */
    @Override
    public void write(int b) throws IOException {
      out.write(b);
    }

    /** Writes {@code length} bytes of {@code data} to the wrapped stream. */
    @Override
    public void write(byte[] data, int offset, int length)
        throws IOException {
      out.write(data, offset, length);
    }

    /**
     * Finishes the current gzip member on the wrapped stream.
     *
     * <p>The wrapped stream is a {@code ResetableGZIPOutputStream} when this
     * object was built from a plain {@link OutputStream}, and a
     * {@link CompressorStream} when a subclass used the protected
     * constructor; both are supported.
     */
    @Override
    public void finish() throws IOException {
      if (out instanceof ResetableGZIPOutputStream) {
        ((ResetableGZIPOutputStream) out).finish();
      } else {
        // Stream supplied by a subclass through the protected constructor.
        ((CompressorStream) out).finish();
      }
    }

    /**
     * Resets the wrapped stream so that it can produce another gzip member.
     * See {@link #finish()} for the supported kinds of wrapped stream.
     */
    @Override
    public void resetState() throws IOException {
      if (out instanceof ResetableGZIPOutputStream) {
        ((ResetableGZIPOutputStream) out).resetState();
      } else {
        // Stream supplied by a subclass through the protected constructor.
        ((CompressorStream) out).resetState();
      }
    }
  }

  /**
   * Creates a gzip-compressing stream on top of {@code out}.
   *
   * <p>Without native zlib a {@link GzipOutputStream} is returned; otherwise
   * creation is delegated to
   * {@code CompressionCodec.Util.createOutputStreamWithCodecPool}.
   */
  @Override
  public CompressionOutputStream createOutputStream(OutputStream out)
      throws IOException {
    if (!ZlibFactory.isNativeZlibLoaded(conf)) {
      return new GzipOutputStream(out);
    }
    return CompressionCodec.Util.
        createOutputStreamWithCodecPool(this, conf, out);
  }

  /**
   * Creates a gzip-compressing stream on top of {@code out} using the given
   * compressor.
   *
   * <p>A {@code null} compressor (which is what {@link #createCompressor()}
   * returns without native zlib) falls back to
   * {@link #createOutputStream(OutputStream)}. Otherwise the stream buffer
   * size is read from {@code io.file.buffer.size}, defaulting to 4 KiB.
   */
  @Override
  public CompressionOutputStream createOutputStream(OutputStream out,
                                                    Compressor compressor)
      throws IOException {
    return (compressor != null) ?
               new CompressorStream(out, compressor,
                                    conf.getInt("io.file.buffer.size",
                                                4*1024)) :
               createOutputStream(out);
  }

  /**
   * @return a zlib-backed gzip compressor when native zlib is loaded,
   *         {@code null} otherwise
   */
  @Override
  public Compressor createCompressor() {
    return (ZlibFactory.isNativeZlibLoaded(conf))
        ? new GzipZlibCompressor(conf)
        : null;
  }

  /**
   * @return the compressor class produced by {@link #createCompressor()},
   *         or {@code null} when native zlib is not loaded
   */
  @Override
  public Class<? extends Compressor> getCompressorType() {
    return ZlibFactory.isNativeZlibLoaded(conf)
        ? GzipZlibCompressor.class
        : null;
  }

  /**
   * Creates a decompressing stream on top of {@code in} by delegating to
   * {@code CompressionCodec.Util.createInputStreamWithCodecPool}.
   */
  @Override
  public CompressionInputStream createInputStream(InputStream in)
      throws IOException {
    return CompressionCodec.Util.
        createInputStreamWithCodecPool(this, conf, in);
  }

  /**
   * Creates a decompressing stream on top of {@code in} using the given
   * decompressor, creating one via {@link #createDecompressor()} when
   * {@code decompressor} is {@code null}. The stream buffer size is read
   * from {@code io.file.buffer.size}, defaulting to 4 KiB.
   */
  @Override
  public CompressionInputStream createInputStream(InputStream in,
                                                  Decompressor decompressor)
      throws IOException {
    if (decompressor == null) {
      decompressor = createDecompressor();  // always succeeds (or throws)
    }
    return new DecompressorStream(in, decompressor,
                                  conf.getInt("io.file.buffer.size", 4*1024));
  }

  /**
   * @return a zlib-backed decompressor when native zlib is loaded, otherwise
   *         a {@link BuiltInGzipDecompressor}
   */
  @Override
  public Decompressor createDecompressor() {
    return (ZlibFactory.isNativeZlibLoaded(conf))
        ? new GzipZlibDecompressor()
        : new BuiltInGzipDecompressor();
  }

  /**
   * @return the decompressor class produced by {@link #createDecompressor()}
   */
  @Override
  public Class<? extends Decompressor> getDecompressorType() {
    return ZlibFactory.isNativeZlibLoaded(conf)
        ? GzipZlibDecompressor.class
        : BuiltInGzipDecompressor.class;
  }

  /**
   * @return a direct decompressor that auto-detects gzip/zlib headers when
   *         native zlib is loaded, {@code null} otherwise
   */
  @Override
  public DirectDecompressor createDirectDecompressor() {
    return ZlibFactory.isNativeZlibLoaded(conf)
        ? new ZlibDecompressor.ZlibDirectDecompressor(
            ZlibDecompressor.CompressionHeader.AUTODETECT_GZIP_ZLIB, 0) : null;
  }

  /** @return the file extension used for gzip files, {@code ".gz"} */
  @Override
  public String getDefaultExtension() {
    return ".gz";
  }

  /**
   * A {@link ZlibCompressor} configured to emit the gzip format with a
   * 64 KiB buffer-size argument.
   */
  static final class GzipZlibCompressor extends ZlibCompressor {
    /** Uses the default compression level and strategy. */
    public GzipZlibCompressor() {
      super(ZlibCompressor.CompressionLevel.DEFAULT_COMPRESSION,
          ZlibCompressor.CompressionStrategy.DEFAULT_STRATEGY,
          ZlibCompressor.CompressionHeader.GZIP_FORMAT, 64*1024);
    }

    /**
     * Takes the compression level and strategy from {@code conf} via
     * {@link ZlibFactory}.
     */
    public GzipZlibCompressor(Configuration conf) {
      super(ZlibFactory.getCompressionLevel(conf),
           ZlibFactory.getCompressionStrategy(conf),
           ZlibCompressor.CompressionHeader.GZIP_FORMAT,
           64 * 1024);
    }
  }

  /**
   * A {@link ZlibDecompressor} configured to auto-detect gzip/zlib headers,
   * with a 64 KiB buffer-size argument.
   */
  static final class GzipZlibDecompressor extends ZlibDecompressor {
    public GzipZlibDecompressor() {
      super(ZlibDecompressor.CompressionHeader.AUTODETECT_GZIP_ZLIB, 64*1024);
    }
  }

}