001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018
019package org.apache.hadoop.io.compress;
020
021import java.io.*;
022import java.util.zip.GZIPOutputStream;
023import org.apache.hadoop.classification.InterfaceAudience;
024import org.apache.hadoop.classification.InterfaceStability;
025import org.apache.hadoop.conf.Configuration;
026import org.apache.hadoop.io.compress.DefaultCodec;
027import org.apache.hadoop.io.compress.zlib.*;
028import org.apache.hadoop.io.compress.zlib.ZlibDecompressor.ZlibDirectDecompressor;
029
030import static org.apache.hadoop.util.PlatformName.IBM_JAVA;
031
032/**
033 * This class creates gzip compressors/decompressors. 
034 */
035@InterfaceAudience.Public
036@InterfaceStability.Evolving
037public class GzipCodec extends DefaultCodec {
038  /**
039   * A bridge that wraps around a DeflaterOutputStream to make it 
040   * a CompressionOutputStream.
041   */
042  @InterfaceStability.Evolving
043  protected static class GzipOutputStream extends CompressorStream {
044
045    private static class ResetableGZIPOutputStream extends GZIPOutputStream {
046      private static final int TRAILER_SIZE = 8;
047      public static final String JVMVersion= System.getProperty("java.version");
048      private static final boolean HAS_BROKEN_FINISH =
049          (IBM_JAVA && JVMVersion.contains("1.6.0"));
050
051      public ResetableGZIPOutputStream(OutputStream out) throws IOException {
052        super(out);
053      }
054
055      public void resetState() throws IOException {
056        def.reset();
057      }
058
059      /**
060       * Override this method for HADOOP-8419.
061       * Override because IBM implementation calls def.end() which
062       * causes problem when reseting the stream for reuse.
063       *
064       */
065      @Override
066      public void finish() throws IOException {
067        if (HAS_BROKEN_FINISH) {
068          if (!def.finished()) {
069            def.finish();
070            while (!def.finished()) {
071              int i = def.deflate(this.buf, 0, this.buf.length);
072              if ((def.finished()) && (i <= this.buf.length - TRAILER_SIZE)) {
073                writeTrailer(this.buf, i);
074                i += TRAILER_SIZE;
075                out.write(this.buf, 0, i);
076
077                return;
078              }
079              if (i > 0) {
080                out.write(this.buf, 0, i);
081              }
082            }
083
084            byte[] arrayOfByte = new byte[TRAILER_SIZE];
085            writeTrailer(arrayOfByte, 0);
086            out.write(arrayOfByte);
087          }
088        } else {
089          super.finish();
090        }
091      }
092
093      /** re-implement for HADOOP-8419 because the relative method in jdk is invisible */
094      private void writeTrailer(byte[] paramArrayOfByte, int paramInt)
095        throws IOException {
096        writeInt((int)this.crc.getValue(), paramArrayOfByte, paramInt);
097        writeInt(this.def.getTotalIn(), paramArrayOfByte, paramInt + 4);
098      }
099
100      /** re-implement for HADOOP-8419 because the relative method in jdk is invisible */
101      private void writeInt(int paramInt1, byte[] paramArrayOfByte, int paramInt2)
102        throws IOException {
103        writeShort(paramInt1 & 0xFFFF, paramArrayOfByte, paramInt2);
104        writeShort(paramInt1 >> 16 & 0xFFFF, paramArrayOfByte, paramInt2 + 2);
105      }
106
107      /** re-implement for HADOOP-8419 because the relative method in jdk is invisible */
108      private void writeShort(int paramInt1, byte[] paramArrayOfByte, int paramInt2)
109        throws IOException {
110        paramArrayOfByte[paramInt2] = (byte)(paramInt1 & 0xFF);
111        paramArrayOfByte[(paramInt2 + 1)] = (byte)(paramInt1 >> 8 & 0xFF);
112      }
113    }
114
115    public GzipOutputStream(OutputStream out) throws IOException {
116      super(new ResetableGZIPOutputStream(out));
117    }
118    
119    /**
120     * Allow children types to put a different type in here.
121     * @param out the Deflater stream to use
122     */
123    protected GzipOutputStream(CompressorStream out) {
124      super(out);
125    }
126    
127    @Override
128    public void close() throws IOException {
129      out.close();
130    }
131    
132    @Override
133    public void flush() throws IOException {
134      out.flush();
135    }
136    
137    @Override
138    public void write(int b) throws IOException {
139      out.write(b);
140    }
141    
142    @Override
143    public void write(byte[] data, int offset, int length) 
144      throws IOException {
145      out.write(data, offset, length);
146    }
147    
148    @Override
149    public void finish() throws IOException {
150      ((ResetableGZIPOutputStream) out).finish();
151    }
152
153    @Override
154    public void resetState() throws IOException {
155      ((ResetableGZIPOutputStream) out).resetState();
156    }
157  }
158
159  @Override
160  public CompressionOutputStream createOutputStream(OutputStream out) 
161    throws IOException {
162    if (!ZlibFactory.isNativeZlibLoaded(conf)) {
163      return new GzipOutputStream(out);
164    }
165    return CompressionCodec.Util.
166        createOutputStreamWithCodecPool(this, conf, out);
167  }
168  
169  @Override
170  public CompressionOutputStream createOutputStream(OutputStream out, 
171                                                    Compressor compressor) 
172  throws IOException {
173    return (compressor != null) ?
174               new CompressorStream(out, compressor,
175                                    conf.getInt("io.file.buffer.size", 
176                                                4*1024)) :
177               createOutputStream(out);
178  }
179
180  @Override
181  public Compressor createCompressor() {
182    return (ZlibFactory.isNativeZlibLoaded(conf))
183      ? new GzipZlibCompressor(conf)
184      : null;
185  }
186
187  @Override
188  public Class<? extends Compressor> getCompressorType() {
189    return ZlibFactory.isNativeZlibLoaded(conf)
190      ? GzipZlibCompressor.class
191      : null;
192  }
193
194  @Override
195  public CompressionInputStream createInputStream(InputStream in)
196      throws IOException {
197    return CompressionCodec.Util.
198        createInputStreamWithCodecPool(this, conf, in);
199  }
200
201  @Override
202  public CompressionInputStream createInputStream(InputStream in,
203                                                  Decompressor decompressor)
204  throws IOException {
205    if (decompressor == null) {
206      decompressor = createDecompressor();  // always succeeds (or throws)
207    }
208    return new DecompressorStream(in, decompressor,
209                                  conf.getInt("io.file.buffer.size", 4*1024));
210  }
211
212  @Override
213  public Decompressor createDecompressor() {
214    return (ZlibFactory.isNativeZlibLoaded(conf))
215      ? new GzipZlibDecompressor()
216      : new BuiltInGzipDecompressor();
217  }
218
219  @Override
220  public Class<? extends Decompressor> getDecompressorType() {
221    return ZlibFactory.isNativeZlibLoaded(conf)
222      ? GzipZlibDecompressor.class
223      : BuiltInGzipDecompressor.class;
224  }
225    
226  @Override
227  public DirectDecompressor createDirectDecompressor() {
228    return ZlibFactory.isNativeZlibLoaded(conf) 
229        ? new ZlibDecompressor.ZlibDirectDecompressor(
230          ZlibDecompressor.CompressionHeader.AUTODETECT_GZIP_ZLIB, 0) : null;
231  }
232
233  @Override
234  public String getDefaultExtension() {
235    return ".gz";
236  }
237
238  static final class GzipZlibCompressor extends ZlibCompressor {
239    public GzipZlibCompressor() {
240      super(ZlibCompressor.CompressionLevel.DEFAULT_COMPRESSION,
241          ZlibCompressor.CompressionStrategy.DEFAULT_STRATEGY,
242          ZlibCompressor.CompressionHeader.GZIP_FORMAT, 64*1024);
243    }
244    
245    public GzipZlibCompressor(Configuration conf) {
246      super(ZlibFactory.getCompressionLevel(conf),
247           ZlibFactory.getCompressionStrategy(conf),
248           ZlibCompressor.CompressionHeader.GZIP_FORMAT,
249           64 * 1024);
250    }
251  }
252
253  static final class GzipZlibDecompressor extends ZlibDecompressor {
254    public GzipZlibDecompressor() {
255      super(ZlibDecompressor.CompressionHeader.AUTODETECT_GZIP_ZLIB, 64*1024);
256    }
257  }
258
259}