001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018
019package org.apache.hadoop.mapred.lib;
020
021import org.apache.hadoop.util.ReflectionUtils;
022import org.apache.hadoop.classification.InterfaceAudience;
023import org.apache.hadoop.classification.InterfaceStability;
024import org.apache.hadoop.mapred.MapRunnable;
025import org.apache.hadoop.mapred.JobConf;
026import org.apache.hadoop.mapred.Mapper;
027import org.apache.hadoop.mapred.RecordReader;
028import org.apache.hadoop.mapred.OutputCollector;
029import org.apache.hadoop.mapred.Reporter;
030import org.apache.hadoop.mapred.SkipBadRecords;
031import org.apache.hadoop.mapreduce.lib.map.MultithreadedMapper;
032import org.apache.commons.logging.Log;
033import org.apache.commons.logging.LogFactory;
034import org.apache.hadoop.util.concurrent.HadoopThreadPoolExecutor;
035
036import java.io.IOException;
037import java.util.concurrent.*;
038
039/**
040 * Multithreaded implementation for {@link MapRunnable}.
041 * <p>
042 * It can be used instead of the default implementation,
043 * of {@link org.apache.hadoop.mapred.MapRunner}, when the Map
044 * operation is not CPU bound in order to improve throughput.
045 * <p>
046 * Map implementations using this MapRunnable must be thread-safe.
047 * <p>
048 * The Map-Reduce job has to be configured to use this MapRunnable class (using
049 * the JobConf.setMapRunnerClass method) and
050 * the number of threads the thread-pool can use with the
051 * <code>mapred.map.multithreadedrunner.threads</code> property, its default
052 * value is 10 threads.
053 * <p>
054 */
055@InterfaceAudience.Public
056@InterfaceStability.Stable
057public class MultithreadedMapRunner<K1, V1, K2, V2>
058    implements MapRunnable<K1, V1, K2, V2> {
059
060  private static final Log LOG =
061    LogFactory.getLog(MultithreadedMapRunner.class.getName());
062
063  private JobConf job;
064  private Mapper<K1, V1, K2, V2> mapper;
065  private ExecutorService executorService;
066  private volatile IOException ioException;
067  private volatile RuntimeException runtimeException;
068  private boolean incrProcCount;
069
070  @SuppressWarnings("unchecked")
071  public void configure(JobConf jobConf) {
072    int numberOfThreads =
073      jobConf.getInt(MultithreadedMapper.NUM_THREADS, 10);
074    if (LOG.isDebugEnabled()) {
075      LOG.debug("Configuring jobConf " + jobConf.getJobName() +
076                " to use " + numberOfThreads + " threads");
077    }
078
079    this.job = jobConf;
080    //increment processed counter only if skipping feature is enabled
081    this.incrProcCount = SkipBadRecords.getMapperMaxSkipRecords(job)>0 && 
082      SkipBadRecords.getAutoIncrMapperProcCount(job);
083    this.mapper = ReflectionUtils.newInstance(jobConf.getMapperClass(),
084        jobConf);
085
086    // Creating a threadpool of the configured size to execute the Mapper
087    // map method in parallel.
088    executorService = new HadoopThreadPoolExecutor(numberOfThreads,
089        numberOfThreads,
090                                             0L, TimeUnit.MILLISECONDS,
091                                             new BlockingArrayQueue
092                                               (numberOfThreads));
093  }
094
095  /**
096   * A blocking array queue that replaces offer and add, which throws on a full
097   * queue, to a put, which waits on a full queue.
098   */
099  private static class BlockingArrayQueue extends ArrayBlockingQueue<Runnable> {
100 
101    private static final long serialVersionUID = 1L;
102    public BlockingArrayQueue(int capacity) {
103      super(capacity);
104    }
105    public boolean offer(Runnable r) {
106      return add(r);
107    }
108    public boolean add(Runnable r) {
109      try {
110        put(r);
111      } catch (InterruptedException ie) {
112        Thread.currentThread().interrupt();
113      }
114      return true;
115    }
116  }
117
118  private void checkForExceptionsFromProcessingThreads()
119      throws IOException, RuntimeException {
120    // Checking if a Mapper.map within a Runnable has generated an
121    // IOException. If so we rethrow it to force an abort of the Map
122    // operation thus keeping the semantics of the default
123    // implementation.
124    if (ioException != null) {
125      throw ioException;
126    }
127
128    // Checking if a Mapper.map within a Runnable has generated a
129    // RuntimeException. If so we rethrow it to force an abort of the Map
130    // operation thus keeping the semantics of the default
131    // implementation.
132    if (runtimeException != null) {
133      throw runtimeException;
134    }
135  }
136
137  public void run(RecordReader<K1, V1> input, OutputCollector<K2, V2> output,
138                  Reporter reporter)
139    throws IOException {
140    try {
141      // allocate key & value instances these objects will not be reused
142      // because execution of Mapper.map is not serialized.
143      K1 key = input.createKey();
144      V1 value = input.createValue();
145
146      while (input.next(key, value)) {
147
148        executorService.execute(new MapperInvokeRunable(key, value, output,
149                                reporter));
150
151        checkForExceptionsFromProcessingThreads();
152
153        // Allocate new key & value instances as mapper is running in parallel
154        key = input.createKey();
155        value = input.createValue();
156      }
157
158      if (LOG.isDebugEnabled()) {
159        LOG.debug("Finished dispatching all Mappper.map calls, job "
160                  + job.getJobName());
161      }
162
163      // Graceful shutdown of the Threadpool, it will let all scheduled
164      // Runnables to end.
165      executorService.shutdown();
166
167      try {
168
169        // Now waiting for all Runnables to end.
170        while (!executorService.awaitTermination(100, TimeUnit.MILLISECONDS)) {
171          if (LOG.isDebugEnabled()) {
172            LOG.debug("Awaiting all running Mappper.map calls to finish, job "
173                      + job.getJobName());
174          }
175
176          // NOTE: while Mapper.map dispatching has concluded there are still
177          // map calls in progress and exceptions would be thrown.
178          checkForExceptionsFromProcessingThreads();
179
180        }
181
182        // NOTE: it could be that a map call has had an exception after the
183        // call for awaitTermination() returing true. And edge case but it
184        // could happen.
185        checkForExceptionsFromProcessingThreads();
186
187      } catch (IOException ioEx) {
188        // Forcing a shutdown of all thread of the threadpool and rethrowing
189        // the IOException
190        executorService.shutdownNow();
191        throw ioEx;
192      } catch (InterruptedException iEx) {
193        throw new RuntimeException(iEx);
194      }
195
196    } finally {
197      mapper.close();
198    }
199  }
200
201
202  /**
203   * Runnable to execute a single Mapper.map call from a forked thread.
204   */
205  private class MapperInvokeRunable implements Runnable {
206    private K1 key;
207    private V1 value;
208    private OutputCollector<K2, V2> output;
209    private Reporter reporter;
210
211    /**
212     * Collecting all required parameters to execute a Mapper.map call.
213     * <p>
214     *
215     * @param key
216     * @param value
217     * @param output
218     * @param reporter
219     */
220    public MapperInvokeRunable(K1 key, V1 value,
221                               OutputCollector<K2, V2> output,
222                               Reporter reporter) {
223      this.key = key;
224      this.value = value;
225      this.output = output;
226      this.reporter = reporter;
227    }
228
229    /**
230     * Executes a Mapper.map call with the given Mapper and parameters.
231     * <p>
232     * This method is called from the thread-pool thread.
233     *
234     */
235    public void run() {
236      try {
237        // map pair to output
238        MultithreadedMapRunner.this.mapper.map(key, value, output, reporter);
239        if(incrProcCount) {
240          reporter.incrCounter(SkipBadRecords.COUNTER_GROUP, 
241              SkipBadRecords.COUNTER_MAP_PROCESSED_RECORDS, 1);
242        }
243      } catch (IOException ex) {
244        // If there is an IOException during the call it is set in an instance
245        // variable of the MultithreadedMapRunner from where it will be
246        // rethrown.
247        synchronized (MultithreadedMapRunner.this) {
248          if (MultithreadedMapRunner.this.ioException == null) {
249            MultithreadedMapRunner.this.ioException = ex;
250          }
251        }
252      } catch (RuntimeException ex) {
253        // If there is a RuntimeException during the call it is set in an
254        // instance variable of the MultithreadedMapRunner from where it will be
255        // rethrown.
256        synchronized (MultithreadedMapRunner.this) {
257          if (MultithreadedMapRunner.this.runtimeException == null) {
258            MultithreadedMapRunner.this.runtimeException = ex;
259          }
260        }
261      }
262    }
263  }
264
265}