/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.mapred.lib;

import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.mapred.MapRunnable;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.RecordReader;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.SkipBadRecords;
import org.apache.hadoop.mapreduce.lib.map.MultithreadedMapper;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.util.concurrent.HadoopThreadPoolExecutor;

import java.io.IOException;
import java.util.concurrent.*;

/**
 * Multithreaded implementation for {@link MapRunnable}.
 * <p>
 * It can be used instead of the default implementation,
 * of {@link org.apache.hadoop.mapred.MapRunner}, when the Map
 * operation is not CPU bound in order to improve throughput.
 * <p>
 * Map implementations using this MapRunnable must be thread-safe.
 * <p>
 * The Map-Reduce job has to be configured to use this MapRunnable class (using
 * the JobConf.setMapRunnerClass method) and
 * the number of threads the thread-pool can use with the
 * <code>mapred.map.multithreadedrunner.threads</code> property (read through
 * {@link MultithreadedMapper#NUM_THREADS}), its default
 * value is 10 threads.
 * <p>
 */
@InterfaceAudience.Public
@InterfaceStability.Stable
public class MultithreadedMapRunner<K1, V1, K2, V2>
    implements MapRunnable<K1, V1, K2, V2> {

  private static final Log LOG =
    LogFactory.getLog(MultithreadedMapRunner.class.getName());

  /** Job configuration captured by {@link #configure(JobConf)}. */
  private JobConf job;
  /** Single mapper instance shared by every pool thread. */
  private Mapper<K1, V1, K2, V2> mapper;
  /** Pool that executes the individual {@code Mapper.map} calls. */
  private ExecutorService executorService;
  /** First IOException recorded by a map call, or null if none so far. */
  private volatile IOException ioException;
  /** First RuntimeException recorded by a map call, or null if none so far. */
  private volatile RuntimeException runtimeException;
  /** Whether each successful map call increments the processed counter. */
  private boolean incrProcCount;

  /**
   * Instantiates the mapper and creates the fixed-size thread pool that
   * {@link #run} dispatches map calls to.
   *
   * @param jobConf job configuration supplying the mapper class, the thread
   *                count and the skip-bad-records settings
   */
  @Override
  @SuppressWarnings("unchecked")
  public void configure(JobConf jobConf) {
    int numberOfThreads =
      jobConf.getInt(MultithreadedMapper.NUM_THREADS, 10);
    if (LOG.isDebugEnabled()) {
      LOG.debug("Configuring jobConf " + jobConf.getJobName() +
                " to use " + numberOfThreads + " threads");
    }

    this.job = jobConf;
    //increment processed counter only if skipping feature is enabled
    this.incrProcCount = SkipBadRecords.getMapperMaxSkipRecords(job)>0 &&
      SkipBadRecords.getAutoIncrMapperProcCount(job);
    this.mapper = ReflectionUtils.newInstance(jobConf.getMapperClass(),
        jobConf);

    // Creating a threadpool of the configured size to execute the Mapper
    // map method in parallel. The bounded, blocking work queue throttles the
    // dispatching thread once numberOfThreads tasks are already waiting.
    executorService = new HadoopThreadPoolExecutor(numberOfThreads,
                                                   numberOfThreads,
                                                   0L, TimeUnit.MILLISECONDS,
                                                   new BlockingArrayQueue
                                                     (numberOfThreads));
  }

  /**
   * A blocking array queue that replaces offer and add, which fail
   * immediately on a full queue, with a put, which waits on a full queue.
   * <p>
   * If the waiting thread is interrupted the element is NOT enqueued; this
   * is reported to the caller (offer returns false, add throws) instead of
   * pretending that the element was accepted.
   */
  private static class BlockingArrayQueue extends ArrayBlockingQueue<Runnable> {

    private static final long serialVersionUID = 1L;

    public BlockingArrayQueue(int capacity) {
      super(capacity);
    }

    /**
     * Waits for space and enqueues the task.
     *
     * @param r task to enqueue
     * @return true once the task is enqueued; false if the calling thread was
     *         interrupted before the task could be enqueued (the interrupt
     *         status is preserved)
     */
    @Override
    public boolean offer(Runnable r) {
      try {
        put(r);
        return true;
      } catch (InterruptedException ie) {
        Thread.currentThread().interrupt();
        // The task was not queued. Reporting success here would silently
        // drop the record; returning false lets the executor reject it.
        return false;
      }
    }

    /**
     * Waits for space and enqueues the task.
     *
     * @param r task to enqueue
     * @return true once the task is enqueued
     * @throws IllegalStateException if the calling thread was interrupted
     *         before the task could be enqueued
     */
    @Override
    public boolean add(Runnable r) {
      if (!offer(r)) {
        throw new IllegalStateException(
            "Interrupted while waiting to enqueue task");
      }
      return true;
    }
  }

  /**
   * Rethrows, on the calling thread, the first exception recorded by a map
   * call running on a pool thread. Does nothing if none was recorded.
   *
   * @throws IOException the recorded IOException, if any
   * @throws RuntimeException the recorded RuntimeException, if any
   */
  private void checkForExceptionsFromProcessingThreads()
      throws IOException, RuntimeException {
    // Checking if a Mapper.map within a Runnable has generated an
    // IOException. If so we rethrow it to force an abort of the Map
    // operation thus keeping the semantics of the default
    // implementation.
    if (ioException != null) {
      throw ioException;
    }

    // Checking if a Mapper.map within a Runnable has generated a
    // RuntimeException. If so we rethrow it to force an abort of the Map
    // operation thus keeping the semantics of the default
    // implementation.
    if (runtimeException != null) {
      throw runtimeException;
    }
  }

  /**
   * Reads every record from the input, dispatches one map call per record to
   * the thread pool and waits for all of them to finish.
   * <p>
   * On any failure (an exception recorded by a map call, a rejected task, an
   * interrupt, or an exception from the input) the pool is forcibly shut
   * down before the exception propagates. The mapper is always closed.
   *
   * @param input    source of key/value records
   * @param output   collector handed to every map call
   * @param reporter reporter handed to every map call
   * @throws IOException if one is raised while processing, including the
   *         first IOException recorded by a map call on a pool thread
   * @throws RuntimeException if a map call recorded one, if a task could not
   *         be handed to the pool, or (wrapping the InterruptedException) if
   *         this thread is interrupted while waiting for the pool to finish
   */
  @Override
  public void run(RecordReader<K1, V1> input, OutputCollector<K2, V2> output,
                  Reporter reporter)
    throws IOException {
    boolean completed = false;
    boolean interrupted = false;
    try {
      // allocate key & value instances these objects will not be reused
      // because execution of Mapper.map is not serialized.
      K1 key = input.createKey();
      V1 value = input.createValue();

      while (input.next(key, value)) {

        executorService.execute(new MapperInvokeRunable(key, value, output,
                                                        reporter));

        checkForExceptionsFromProcessingThreads();

        // Allocate new key & value instances as mapper is running in parallel
        key = input.createKey();
        value = input.createValue();
      }

      if (LOG.isDebugEnabled()) {
        LOG.debug("Finished dispatching all Mappper.map calls, job "
                  + job.getJobName());
      }

      // Graceful shutdown of the Threadpool, it will let all scheduled
      // Runnables to end.
      executorService.shutdown();

      try {

        // Now waiting for all Runnables to end.
        while (!executorService.awaitTermination(100, TimeUnit.MILLISECONDS)) {
          if (LOG.isDebugEnabled()) {
            LOG.debug("Awaiting all running Mappper.map calls to finish, job "
                      + job.getJobName());
          }

          // NOTE: while Mapper.map dispatching has concluded there are still
          // map calls in progress and exceptions would be thrown.
          checkForExceptionsFromProcessingThreads();

        }

        // NOTE: it could be that a map call has had an exception after the
        // call for awaitTermination() returning true. An edge case but it
        // could happen.
        checkForExceptionsFromProcessingThreads();

      } catch (InterruptedException iEx) {
        // Remember the interrupt so it can be restored once cleanup is done.
        interrupted = true;
        throw new RuntimeException(iEx);
      }

      completed = true;
    } finally {
      try {
        if (!completed) {
          // Any failure path: force the pool down so its threads do not
          // outlive this runner. Safe to call after shutdown().
          executorService.shutdownNow();
        }
      } finally {
        try {
          mapper.close();
        } finally {
          if (interrupted) {
            // Restored after close() so that close() itself is not run with
            // the interrupt flag set.
            Thread.currentThread().interrupt();
          }
        }
      }
    }
  }


  /**
   * Runnable to execute a single Mapper.map call from a forked thread.
   */
  private class MapperInvokeRunable implements Runnable {
    private final K1 key;
    private final V1 value;
    private final OutputCollector<K2, V2> output;
    private final Reporter reporter;

    /**
     * Collecting all required parameters to execute a Mapper.map call.
     *
     * @param key      key of the record to map
     * @param value    value of the record to map
     * @param output   collector passed to the map call
     * @param reporter reporter passed to the map call
     */
    public MapperInvokeRunable(K1 key, V1 value,
                               OutputCollector<K2, V2> output,
                               Reporter reporter) {
      this.key = key;
      this.value = value;
      this.output = output;
      this.reporter = reporter;
    }

    /**
     * Executes a Mapper.map call with the given Mapper and parameters.
     * <p>
     * This method is called from the thread-pool thread. An IOException or
     * RuntimeException raised by the map call is not propagated; the first
     * one of each kind is stored on the enclosing runner, from where it is
     * rethrown.
     */
    @Override
    public void run() {
      try {
        // map pair to output
        MultithreadedMapRunner.this.mapper.map(key, value, output, reporter);
        if(incrProcCount) {
          reporter.incrCounter(SkipBadRecords.COUNTER_GROUP,
              SkipBadRecords.COUNTER_MAP_PROCESSED_RECORDS, 1);
        }
      } catch (IOException ex) {
        // If there is an IOException during the call it is set in an instance
        // variable of the MultithreadedMapRunner from where it will be
        // rethrown. Only the first one is kept.
        synchronized (MultithreadedMapRunner.this) {
          if (MultithreadedMapRunner.this.ioException == null) {
            MultithreadedMapRunner.this.ioException = ex;
          }
        }
      } catch (RuntimeException ex) {
        // If there is a RuntimeException during the call it is set in an
        // instance variable of the MultithreadedMapRunner from where it will be
        // rethrown. Only the first one is kept.
        synchronized (MultithreadedMapRunner.this) {
          if (MultithreadedMapRunner.this.runtimeException == null) {
            MultithreadedMapRunner.this.runtimeException = ex;
          }
        }
      }
    }
  }

}