001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.io.compress; 019 020import java.util.HashSet; 021import java.util.HashMap; 022import java.util.Set; 023import java.util.Map; 024import java.util.concurrent.atomic.AtomicInteger; 025 026import org.apache.commons.logging.Log; 027import org.apache.commons.logging.LogFactory; 028import org.apache.hadoop.classification.InterfaceAudience; 029import org.apache.hadoop.classification.InterfaceStability; 030import org.apache.hadoop.conf.Configuration; 031import org.apache.hadoop.util.ReflectionUtils; 032 033import com.google.common.cache.CacheBuilder; 034import com.google.common.cache.CacheLoader; 035import com.google.common.cache.LoadingCache; 036 037/** 038 * A global compressor/decompressor pool used to save and reuse 039 * (possibly native) compression/decompression codecs. 040 */ 041@InterfaceAudience.Public 042@InterfaceStability.Evolving 043public class CodecPool { 044 private static final Log LOG = LogFactory.getLog(CodecPool.class); 045 046 /** 047 * A global compressor pool used to save the expensive 048 * construction/destruction of (possibly native) decompression codecs. 049 */ 050 private static final Map<Class<Compressor>, Set<Compressor>> compressorPool = 051 new HashMap<Class<Compressor>, Set<Compressor>>(); 052 053 /** 054 * A global decompressor pool used to save the expensive 055 * construction/destruction of (possibly native) decompression codecs. 056 */ 057 private static final Map<Class<Decompressor>, Set<Decompressor>> decompressorPool = 058 new HashMap<Class<Decompressor>, Set<Decompressor>>(); 059 060 private static <T> LoadingCache<Class<T>, AtomicInteger> createCache( 061 Class<T> klass) { 062 return CacheBuilder.newBuilder().build( 063 new CacheLoader<Class<T>, AtomicInteger>() { 064 @Override 065 public AtomicInteger load(Class<T> key) throws Exception { 066 return new AtomicInteger(); 067 } 068 }); 069 } 070 071 /** 072 * Map to track the number of leased compressors 073 */ 074 private static final LoadingCache<Class<Compressor>, AtomicInteger> compressorCounts = 075 createCache(Compressor.class); 076 077 /** 078 * Map to tracks the number of leased decompressors 079 */ 080 private static final LoadingCache<Class<Decompressor>, AtomicInteger> decompressorCounts = 081 createCache(Decompressor.class); 082 083 private static <T> T borrow(Map<Class<T>, Set<T>> pool, 084 Class<? extends T> codecClass) { 085 T codec = null; 086 087 // Check if an appropriate codec is available 088 Set<T> codecSet; 089 synchronized (pool) { 090 codecSet = pool.get(codecClass); 091 } 092 093 if (codecSet != null) { 094 synchronized (codecSet) { 095 if (!codecSet.isEmpty()) { 096 codec = codecSet.iterator().next(); 097 codecSet.remove(codec); 098 } 099 } 100 } 101 102 return codec; 103 } 104 105 private static <T> boolean payback(Map<Class<T>, Set<T>> pool, T codec) { 106 if (codec != null) { 107 Class<T> codecClass = ReflectionUtils.getClass(codec); 108 Set<T> codecSet; 109 synchronized (pool) { 110 codecSet = pool.get(codecClass); 111 if (codecSet == null) { 112 codecSet = new HashSet<T>(); 113 pool.put(codecClass, codecSet); 114 } 115 } 116 117 synchronized (codecSet) { 118 return codecSet.add(codec); 119 } 120 } 121 return false; 122 } 123 124 @SuppressWarnings("unchecked") 125 private static <T> int getLeaseCount( 126 LoadingCache<Class<T>, AtomicInteger> usageCounts, 127 Class<? extends T> codecClass) { 128 return usageCounts.getUnchecked((Class<T>) codecClass).get(); 129 } 130 131 private static <T> void updateLeaseCount( 132 LoadingCache<Class<T>, AtomicInteger> usageCounts, T codec, int delta) { 133 if (codec != null) { 134 Class<T> codecClass = ReflectionUtils.getClass(codec); 135 usageCounts.getUnchecked(codecClass).addAndGet(delta); 136 } 137 } 138 139 /** 140 * Get a {@link Compressor} for the given {@link CompressionCodec} from the 141 * pool or a new one. 142 * 143 * @param codec the <code>CompressionCodec</code> for which to get the 144 * <code>Compressor</code> 145 * @param conf the <code>Configuration</code> object which contains confs for creating or reinit the compressor 146 * @return <code>Compressor</code> for the given 147 * <code>CompressionCodec</code> from the pool or a new one 148 */ 149 public static Compressor getCompressor(CompressionCodec codec, Configuration conf) { 150 Compressor compressor = borrow(compressorPool, codec.getCompressorType()); 151 if (compressor == null) { 152 compressor = codec.createCompressor(); 153 LOG.info("Got brand-new compressor ["+codec.getDefaultExtension()+"]"); 154 } else { 155 compressor.reinit(conf); 156 if(LOG.isDebugEnabled()) { 157 LOG.debug("Got recycled compressor"); 158 } 159 } 160 updateLeaseCount(compressorCounts, compressor, 1); 161 return compressor; 162 } 163 164 public static Compressor getCompressor(CompressionCodec codec) { 165 return getCompressor(codec, null); 166 } 167 168 /** 169 * Get a {@link Decompressor} for the given {@link CompressionCodec} from the 170 * pool or a new one. 171 * 172 * @param codec the <code>CompressionCodec</code> for which to get the 173 * <code>Decompressor</code> 174 * @return <code>Decompressor</code> for the given 175 * <code>CompressionCodec</code> the pool or a new one 176 */ 177 public static Decompressor getDecompressor(CompressionCodec codec) { 178 Decompressor decompressor = borrow(decompressorPool, codec.getDecompressorType()); 179 if (decompressor == null) { 180 decompressor = codec.createDecompressor(); 181 LOG.info("Got brand-new decompressor ["+codec.getDefaultExtension()+"]"); 182 } else { 183 if(LOG.isDebugEnabled()) { 184 LOG.debug("Got recycled decompressor"); 185 } 186 } 187 updateLeaseCount(decompressorCounts, decompressor, 1); 188 return decompressor; 189 } 190 191 /** 192 * Return the {@link Compressor} to the pool. 193 * 194 * @param compressor the <code>Compressor</code> to be returned to the pool 195 */ 196 public static void returnCompressor(Compressor compressor) { 197 if (compressor == null) { 198 return; 199 } 200 // if the compressor can't be reused, don't pool it. 201 if (compressor.getClass().isAnnotationPresent(DoNotPool.class)) { 202 return; 203 } 204 compressor.reset(); 205 if (payback(compressorPool, compressor)) { 206 updateLeaseCount(compressorCounts, compressor, -1); 207 } 208 } 209 210 /** 211 * Return the {@link Decompressor} to the pool. 212 * 213 * @param decompressor the <code>Decompressor</code> to be returned to the 214 * pool 215 */ 216 public static void returnDecompressor(Decompressor decompressor) { 217 if (decompressor == null) { 218 return; 219 } 220 // if the decompressor can't be reused, don't pool it. 221 if (decompressor.getClass().isAnnotationPresent(DoNotPool.class)) { 222 return; 223 } 224 decompressor.reset(); 225 if (payback(decompressorPool, decompressor)) { 226 updateLeaseCount(decompressorCounts, decompressor, -1); 227 } 228 } 229 230 /** 231 * Return the number of leased {@link Compressor}s for this 232 * {@link CompressionCodec} 233 */ 234 public static int getLeasedCompressorsCount(CompressionCodec codec) { 235 return (codec == null) ? 0 : getLeaseCount(compressorCounts, 236 codec.getCompressorType()); 237 } 238 239 /** 240 * Return the number of leased {@link Decompressor}s for this 241 * {@link CompressionCodec} 242 */ 243 public static int getLeasedDecompressorsCount(CompressionCodec codec) { 244 return (codec == null) ? 0 : getLeaseCount(decompressorCounts, 245 codec.getDecompressorType()); 246 } 247}