/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.mapred;


import java.io.IOException;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

import com.google.common.annotations.VisibleForTesting;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.RawComparator;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.mapred.lib.HashPartitioner;
import org.apache.hadoop.mapred.lib.IdentityMapper;
import org.apache.hadoop.mapred.lib.IdentityReducer;
import org.apache.hadoop.mapred.lib.KeyFieldBasedComparator;
import org.apache.hadoop.mapred.lib.KeyFieldBasedPartitioner;
import org.apache.hadoop.mapreduce.MRConfig;
import org.apache.hadoop.mapreduce.MRJobConfig;
import org.apache.hadoop.mapreduce.TaskType;
import org.apache.hadoop.mapreduce.filecache.DistributedCache;
import org.apache.hadoop.mapreduce.util.ConfigUtil;
import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.util.ClassUtil;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hadoop.util.Tool;
import org.apache.log4j.Level;

/** 
 * A map/reduce job configuration.
 * 
 * <p><code>JobConf</code> is the primary interface for a user to describe a 
 * map-reduce job to the Hadoop framework for execution. The framework tries to
 * faithfully execute the job as described by <code>JobConf</code>, however:
 * <ol>
 *   <li>
 *   Some configuration parameters might have been marked as 
 *   <a href="{@docRoot}/org/apache/hadoop/conf/Configuration.html#FinalParams">
 *   final</a> by administrators and hence cannot be altered.
 *   </li>
 *   <li>
 *   While some job parameters are straight-forward to set 
 *   (e.g. {@link #setNumReduceTasks(int)}), other parameters interact subtly
 *   with the rest of the framework and/or the job configuration and are
 *   relatively more complex for the user to control finely
 *   (e.g. {@link #setNumMapTasks(int)}).
 *   </li>
 * </ol>
 * 
 * <p><code>JobConf</code> typically specifies the {@link Mapper}, combiner 
 * (if any), {@link Partitioner}, {@link Reducer}, {@link InputFormat} and 
 * {@link OutputFormat} implementations to be used etc.
 *
 * <p>Optionally <code>JobConf</code> is used to specify other advanced facets 
 * of the job such as the <code>Comparator</code>s to be used, files to be put in
 * the {@link DistributedCache}, whether intermediate and/or job outputs
 * are to be compressed (and how), and debuggability via user-provided scripts
 * ({@link #setMapDebugScript(String)}/{@link #setReduceDebugScript(String)})
 * for post-processing task logs and the task's stdout, stderr, and syslog.</p>
 * 
 * <p>Here is an example on how to configure a job via <code>JobConf</code>:</p>
 * <p><blockquote><pre>
 *     // Create a new JobConf
 *     JobConf job = new JobConf(new Configuration(), MyJob.class);
 *     
 *     // Specify various job-specific parameters     
 *     job.setJobName("myjob");
 *     
 *     FileInputFormat.setInputPaths(job, new Path("in"));
 *     FileOutputFormat.setOutputPath(job, new Path("out"));
 *     
 *     job.setMapperClass(MyJob.MyMapper.class);
 *     job.setCombinerClass(MyJob.MyReducer.class);
 *     job.setReducerClass(MyJob.MyReducer.class);
 *     
 *     job.setInputFormat(SequenceFileInputFormat.class);
 *     job.setOutputFormat(SequenceFileOutputFormat.class);
 * </pre></blockquote>
 * 
 * @see JobClient
 * @see ClusterStatus
 * @see Tool
 * @see DistributedCache
 */
@InterfaceAudience.Public
@InterfaceStability.Stable
public class JobConf extends Configuration {

  private static final Log LOG = LogFactory.getLog(JobConf.class);
  private static final Pattern JAVA_OPTS_XMX_PATTERN =
          Pattern.compile(".*(?:^|\\s)-Xmx(\\d+)([gGmMkK]?)(?:$|\\s).*");

  static{
    ConfigUtil.loadResources();
  }

  /**
   * @deprecated Use {@link #MAPREDUCE_JOB_MAP_MEMORY_MB_PROPERTY} and
   * {@link #MAPREDUCE_JOB_REDUCE_MEMORY_MB_PROPERTY}
   */
  @Deprecated
  public static final String MAPRED_TASK_MAXVMEM_PROPERTY =
    "mapred.task.maxvmem";

  /**
   * @deprecated 
   */
  @Deprecated
  public static final String UPPER_LIMIT_ON_TASK_VMEM_PROPERTY =
    "mapred.task.limit.maxvmem";

  /**
   * @deprecated
   */
  @Deprecated
  public static final String MAPRED_TASK_DEFAULT_MAXVMEM_PROPERTY =
    "mapred.task.default.maxvmem";

  /**
   * @deprecated
   */
  @Deprecated
  public static final String MAPRED_TASK_MAXPMEM_PROPERTY =
    "mapred.task.maxpmem";

  /**
   * A value which if set for memory related configuration options,
   * indicates that the options are turned off.
   * Deprecated because it makes no sense in the context of MR2.
   */
  @Deprecated
  public static final long DISABLED_MEMORY_LIMIT = -1L;

  /**
   * Property name for the configuration property mapreduce.cluster.local.dir
   */
  public static final String MAPRED_LOCAL_DIR_PROPERTY = MRConfig.LOCAL_DIR;

  /**
   * Name of the queue to which jobs will be submitted, if no queue
   * name is mentioned.
   */
  public static final String DEFAULT_QUEUE_NAME = "default";

  static final String MAPREDUCE_JOB_MAP_MEMORY_MB_PROPERTY =
      JobContext.MAP_MEMORY_MB;

  static final String MAPREDUCE_JOB_REDUCE_MEMORY_MB_PROPERTY =
    JobContext.REDUCE_MEMORY_MB;

  /**
   * The variable is kept for M/R 1.x applications, while M/R 2.x applications
   * should use {@link #MAPREDUCE_JOB_MAP_MEMORY_MB_PROPERTY}
   */
  @Deprecated
  public static final String MAPRED_JOB_MAP_MEMORY_MB_PROPERTY =
      "mapred.job.map.memory.mb";

  /**
   * The variable is kept for M/R 1.x applications, while M/R 2.x applications
   * should use {@link #MAPREDUCE_JOB_REDUCE_MEMORY_MB_PROPERTY}
   */
  @Deprecated
  public static final String MAPRED_JOB_REDUCE_MEMORY_MB_PROPERTY =
      "mapred.job.reduce.memory.mb";

  /** Pattern for the default unpacking behavior for job jars */
  public static final Pattern UNPACK_JAR_PATTERN_DEFAULT =
    Pattern.compile("(?:classes/|lib/).*");

  /**
   * Configuration key to set the java command line options for the child
   * map and reduce tasks.
   * 
   * Java opts for the task tracker child processes.
   * The following symbol, if present, will be interpolated: @taskid@. 
   * It is replaced by current TaskID. Any other occurrences of '@' will go 
   * unchanged.
   * For example, to enable verbose gc logging to a file named for the taskid in
   * /tmp and to set the heap maximum to be a gigabyte, pass a 'value' of:
   *          -Xmx1024m -verbose:gc -Xloggc:/tmp/@taskid@.gc
   * 
   * The configuration variable {@link #MAPRED_TASK_ENV} can be used to pass 
   * other environment variables to the child processes.
   * 
   * @deprecated Use {@link #MAPRED_MAP_TASK_JAVA_OPTS} or 
   *                 {@link #MAPRED_REDUCE_TASK_JAVA_OPTS}
   */
  @Deprecated
  public static final String MAPRED_TASK_JAVA_OPTS = "mapred.child.java.opts";
  
  /**
   * Configuration key to set the java command line options for the map tasks.
   * 
   * Java opts for the task tracker child map processes.
   * The following symbol, if present, will be interpolated: @taskid@. 
   * It is replaced by current TaskID. Any other occurrences of '@' will go 
   * unchanged.
   * For example, to enable verbose gc logging to a file named for the taskid in
   * /tmp and to set the heap maximum to be a gigabyte, pass a 'value' of:
   *          -Xmx1024m -verbose:gc -Xloggc:/tmp/@taskid@.gc
   * 
   * The configuration variable {@link #MAPRED_MAP_TASK_ENV} can be used to pass 
   * other environment variables to the map processes.
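   * 
   * <p>For example, a minimal sketch (reusing the <code>MyJob</code> class
   * from the class example above; the opts value is illustrative, not a
   * recommended setting):</p>
   * <p><blockquote><pre>
   *     JobConf conf = new JobConf(MyJob.class);
   *     conf.set(JobConf.MAPRED_MAP_TASK_JAVA_OPTS,
   *              "-Xmx1024m -verbose:gc -Xloggc:/tmp/@taskid@.gc");
   * </pre></blockquote>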
   */
  public static final String MAPRED_MAP_TASK_JAVA_OPTS = 
    JobContext.MAP_JAVA_OPTS;
  
  /**
   * Configuration key to set the java command line options for the reduce tasks.
   * 
   * Java opts for the task tracker child reduce processes.
   * The following symbol, if present, will be interpolated: @taskid@. 
   * It is replaced by current TaskID. Any other occurrences of '@' will go 
   * unchanged.
   * For example, to enable verbose gc logging to a file named for the taskid in
   * /tmp and to set the heap maximum to be a gigabyte, pass a 'value' of:
   *          -Xmx1024m -verbose:gc -Xloggc:/tmp/@taskid@.gc
   * 
   * The configuration variable {@link #MAPRED_REDUCE_TASK_ENV} can be used to 
   * pass other environment variables to the reduce processes.
   */
  public static final String MAPRED_REDUCE_TASK_JAVA_OPTS = 
    JobContext.REDUCE_JAVA_OPTS;

  public static final String DEFAULT_MAPRED_TASK_JAVA_OPTS = "";

  /**
   * @deprecated
   * Configuration key to set the maximum virtual memory available to the child
   * map and reduce tasks (in kilo-bytes). This has been deprecated and will no
   * longer have any effect.
   */
  @Deprecated
  public static final String MAPRED_TASK_ULIMIT = "mapred.child.ulimit";

  /**
   * @deprecated
   * Configuration key to set the maximum virtual memory available to the
   * map tasks (in kilo-bytes). This has been deprecated and will no
   * longer have any effect.
   */
  @Deprecated
  public static final String MAPRED_MAP_TASK_ULIMIT = "mapreduce.map.ulimit";
  
  /**
   * @deprecated
   * Configuration key to set the maximum virtual memory available to the
   * reduce tasks (in kilo-bytes). This has been deprecated and will no
   * longer have any effect.
   */
  @Deprecated
  public static final String MAPRED_REDUCE_TASK_ULIMIT =
    "mapreduce.reduce.ulimit";


  /**
   * Configuration key to set the environment of the child map/reduce tasks.
   * 
   * The format of the value is <code>k1=v1,k2=v2</code>. Further it can 
   * reference existing environment variables via <code>$key</code> on
   * Linux or <code>%key%</code> on Windows.
   * 
   * Example:
   * <ul>
   *   <li> A=foo - This will set the env variable A to foo. </li>
   * </ul>
   * 
   * @deprecated Use {@link #MAPRED_MAP_TASK_ENV} or 
   *                 {@link #MAPRED_REDUCE_TASK_ENV}
   */
  @Deprecated
  public static final String MAPRED_TASK_ENV = "mapred.child.env";

  /**
   * Configuration key to set the environment of the child map tasks.
   * 
   * The format of the value is <code>k1=v1,k2=v2</code>. Further it can
   * reference existing environment variables via <code>$key</code> on
   * Linux or <code>%key%</code> on Windows.
   * 
   * Example:
   * <ul>
   *   <li> A=foo - This will set the env variable A to foo. </li>
   * </ul>
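   * 
   * <p>For example, a sketch with illustrative values:</p>
   * <p><blockquote><pre>
   *     conf.set(JobConf.MAPRED_MAP_TASK_ENV, "A=foo,LD_LIBRARY_PATH=/opt/native");
   * </pre></blockquote>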
   */
  public static final String MAPRED_MAP_TASK_ENV = JobContext.MAP_ENV;
  
  /**
   * Configuration key to set the environment of the child reduce tasks.
   * 
   * The format of the value is <code>k1=v1,k2=v2</code>. Further it can 
   * reference existing environment variables via <code>$key</code> on
   * Linux or <code>%key%</code> on Windows.
   * 
   * Example:
   * <ul>
   *   <li> A=foo - This will set the env variable A to foo. </li>
   * </ul>
   */
  public static final String MAPRED_REDUCE_TASK_ENV = JobContext.REDUCE_ENV;

  private Credentials credentials = new Credentials();
  
  /**
   * Configuration key to set the logging {@link Level} for the map task.
   *
   * The allowed logging levels are:
   * OFF, FATAL, ERROR, WARN, INFO, DEBUG, TRACE and ALL.
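   *
   * <p>For example, a sketch (DEBUG chosen for illustration):</p>
   * <p><blockquote><pre>
   *     conf.set(JobConf.MAPRED_MAP_TASK_LOG_LEVEL, Level.DEBUG.toString());
   * </pre></blockquote>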
   */
  public static final String MAPRED_MAP_TASK_LOG_LEVEL = 
    JobContext.MAP_LOG_LEVEL;
  
  /**
   * Configuration key to set the logging {@link Level} for the reduce task.
   *
   * The allowed logging levels are:
   * OFF, FATAL, ERROR, WARN, INFO, DEBUG, TRACE and ALL.
   */
  public static final String MAPRED_REDUCE_TASK_LOG_LEVEL = 
    JobContext.REDUCE_LOG_LEVEL;
  
  /**
   * Default logging level for map/reduce tasks.
   */
  public static final Level DEFAULT_LOG_LEVEL = Level.INFO;

  /**
   * The variable is kept for M/R 1.x applications, M/R 2.x applications should
   * use {@link MRJobConfig#WORKFLOW_ID} instead
   */
  @Deprecated
  public static final String WORKFLOW_ID = MRJobConfig.WORKFLOW_ID;

  /**
   * The variable is kept for M/R 1.x applications, M/R 2.x applications should
   * use {@link MRJobConfig#WORKFLOW_NAME} instead
   */
  @Deprecated
  public static final String WORKFLOW_NAME = MRJobConfig.WORKFLOW_NAME;

  /**
   * The variable is kept for M/R 1.x applications, M/R 2.x applications should
   * use {@link MRJobConfig#WORKFLOW_NODE_NAME} instead
   */
  @Deprecated
  public static final String WORKFLOW_NODE_NAME =
      MRJobConfig.WORKFLOW_NODE_NAME;

  /**
   * The variable is kept for M/R 1.x applications, M/R 2.x applications should
   * use {@link MRJobConfig#WORKFLOW_ADJACENCY_PREFIX_STRING} instead
   */
  @Deprecated
  public static final String WORKFLOW_ADJACENCY_PREFIX_STRING =
      MRJobConfig.WORKFLOW_ADJACENCY_PREFIX_STRING;

  /**
   * The variable is kept for M/R 1.x applications, M/R 2.x applications should
   * use {@link MRJobConfig#WORKFLOW_ADJACENCY_PREFIX_PATTERN} instead
   */
  @Deprecated
  public static final String WORKFLOW_ADJACENCY_PREFIX_PATTERN =
      MRJobConfig.WORKFLOW_ADJACENCY_PREFIX_PATTERN;

  /**
   * The variable is kept for M/R 1.x applications, M/R 2.x applications should
   * use {@link MRJobConfig#WORKFLOW_TAGS} instead
   */
  @Deprecated
  public static final String WORKFLOW_TAGS = MRJobConfig.WORKFLOW_TAGS;

  /**
   * The variable is kept for M/R 1.x applications, M/R 2.x applications should
   * not use it
   */
  @Deprecated
  public static final String MAPREDUCE_RECOVER_JOB =
      "mapreduce.job.restart.recover";

  /**
   * The variable is kept for M/R 1.x applications, M/R 2.x applications should
   * not use it
   */
  @Deprecated
  public static final boolean DEFAULT_MAPREDUCE_RECOVER_JOB = true;

  /**
   * Construct a map/reduce job configuration.
   */
  public JobConf() {
    checkAndWarnDeprecation();
  }

  /** 
   * Construct a map/reduce job configuration.
   * 
   * @param exampleClass a class whose containing jar is used as the job's jar.
   */
  public JobConf(Class exampleClass) {
    setJarByClass(exampleClass);
    checkAndWarnDeprecation();
  }
  
  /**
   * Construct a map/reduce job configuration.
   * 
   * @param conf a Configuration whose settings will be inherited.
   */
  public JobConf(Configuration conf) {
    super(conf);
    
    if (conf instanceof JobConf) {
      JobConf that = (JobConf)conf;
      credentials = that.credentials;
    }
    
    checkAndWarnDeprecation();
  }


  /** Construct a map/reduce job configuration.
   * 
   * @param conf a Configuration whose settings will be inherited.
   * @param exampleClass a class whose containing jar is used as the job's jar.
   */
  public JobConf(Configuration conf, Class exampleClass) {
    this(conf);
    setJarByClass(exampleClass);
  }


  /** Construct a map/reduce configuration.
   *
   * @param config a Configuration-format XML job description file.
   */
  public JobConf(String config) {
    this(new Path(config));
  }

  /** Construct a map/reduce configuration.
   *
   * @param config a Configuration-format XML job description file.
   */
  public JobConf(Path config) {
    super();
    addResource(config);
    checkAndWarnDeprecation();
  }

  /** A new map/reduce configuration where the behavior of reading from the
   * default resources can be turned off.
   * <p>
   * If the parameter {@code loadDefaults} is false, the new instance
   * will not load resources from the default files.
   *
   * @param loadDefaults specifies whether to load from the default files
   */
  public JobConf(boolean loadDefaults) {
    super(loadDefaults);
    checkAndWarnDeprecation();
  }

  /**
   * Get credentials for the job.
   * @return credentials for the job
   */
  public Credentials getCredentials() {
    return credentials;
  }
  
  @Private
  public void setCredentials(Credentials credentials) {
    this.credentials = credentials;
  }
  
  /**
   * Get the user jar for the map-reduce job.
   * 
   * @return the user jar for the map-reduce job.
   */
  public String getJar() { return get(JobContext.JAR); }
  
  /**
   * Set the user jar for the map-reduce job.
   * 
   * @param jar the user jar for the map-reduce job.
   */
  public void setJar(String jar) { set(JobContext.JAR, jar); }

  /**
   * Get the pattern for jar contents to unpack on the tasktracker
   */
  public Pattern getJarUnpackPattern() {
    return getPattern(JobContext.JAR_UNPACK_PATTERN, UNPACK_JAR_PATTERN_DEFAULT);
  }

  
  /**
   * Set the job's jar file by finding an example class location.
   * 
   * @param cls the example class.
   */
  public void setJarByClass(Class cls) {
    String jar = ClassUtil.findContainingJar(cls);
    if (jar != null) {
      setJar(jar);
    }   
  }

  public String[] getLocalDirs() throws IOException {
    return getTrimmedStrings(MRConfig.LOCAL_DIR);
  }

  /**
   * @deprecated Use MRAsyncDiskService.moveAndDeleteAllVolumes instead.
   */
  @Deprecated
  public void deleteLocalFiles() throws IOException {
    String[] localDirs = getLocalDirs();
    for (int i = 0; i < localDirs.length; i++) {
      FileSystem.getLocal(this).delete(new Path(localDirs[i]), true);
    }
  }

  public void deleteLocalFiles(String subdir) throws IOException {
    String[] localDirs = getLocalDirs();
    for (int i = 0; i < localDirs.length; i++) {
      FileSystem.getLocal(this).delete(new Path(localDirs[i], subdir), true);
    }
  }

  /** 
   * Constructs a local file name. Files are distributed among configured
   * local directories.
   */
  public Path getLocalPath(String pathString) throws IOException {
    return getLocalPath(MRConfig.LOCAL_DIR, pathString);
  }

  /**
   * Get the reported username for this job.
   * 
   * @return the username
   */
  public String getUser() {
    return get(JobContext.USER_NAME);
  }
  
  /**
   * Set the reported username for this job.
   * 
   * @param user the username for this job.
   */
  public void setUser(String user) {
    set(JobContext.USER_NAME, user);
  }


  
  /**
   * Set whether the framework should keep the intermediate files for 
   * failed tasks.
   * 
   * @param keep <code>true</code> if framework should keep the intermediate files 
   *             for failed tasks, <code>false</code> otherwise.
   * 
   */
  public void setKeepFailedTaskFiles(boolean keep) {
    setBoolean(JobContext.PRESERVE_FAILED_TASK_FILES, keep);
  }
  
  /**
   * Should the temporary files for failed tasks be kept?
   * 
   * @return should the files be kept?
   */
  public boolean getKeepFailedTaskFiles() {
    return getBoolean(JobContext.PRESERVE_FAILED_TASK_FILES, false);
  }
  
  /**
   * Set a regular expression for task names that should be kept. 
   * The regular expression ".*_m_000123_0" would keep the files
   * for the first instance of map 123 that ran.
   * 
   * @param pattern the java.util.regex.Pattern to match against the 
   *        task names.
   */
  public void setKeepTaskFilesPattern(String pattern) {
    set(JobContext.PRESERVE_FILES_PATTERN, pattern);
  }
  
  /**
   * Get the regular expression that is matched against the task names
   * to see if we need to keep the files.
   * 
   * @return the pattern as a string, if it was set, otherwise null.
   */
  public String getKeepTaskFilesPattern() {
    return get(JobContext.PRESERVE_FILES_PATTERN);
  }
  
  /**
   * Set the current working directory for the default file system.
   * 
   * @param dir the new current working directory.
   */
  public void setWorkingDirectory(Path dir) {
    dir = new Path(getWorkingDirectory(), dir);
    set(JobContext.WORKING_DIR, dir.toString());
  }
  
  /**
   * Get the current working directory for the default file system.
   * 
   * @return the directory name.
   */
  public Path getWorkingDirectory() {
    String name = get(JobContext.WORKING_DIR);
    if (name != null) {
      return new Path(name);
    } else {
      try {
        Path dir = FileSystem.get(this).getWorkingDirectory();
        set(JobContext.WORKING_DIR, dir.toString());
        return dir;
      } catch (IOException e) {
        throw new RuntimeException(e);
      }
    }
  }
  
  /**
   * Sets the number of tasks that a spawned task JVM should run
   * before it exits
   * @param numTasks the number of tasks to execute; defaults to 1;
   * -1 signifies no limit
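   * 
   * <p>For example, to let a JVM keep running tasks with no limit
   * (illustrative):</p>
   * <p><blockquote><pre>
   *     conf.setNumTasksToExecutePerJvm(-1);
   * </pre></blockquote>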
   */
  public void setNumTasksToExecutePerJvm(int numTasks) {
    setInt(JobContext.JVM_NUMTASKS_TORUN, numTasks);
  }
  
  /**
   * Get the number of tasks that a spawned JVM should execute
   */
  public int getNumTasksToExecutePerJvm() {
    return getInt(JobContext.JVM_NUMTASKS_TORUN, 1);
  }
  
  /**
   * Get the {@link InputFormat} implementation for the map-reduce job,
   * defaults to {@link TextInputFormat} if not specified explicitly.
   * 
   * @return the {@link InputFormat} implementation for the map-reduce job.
   */
  public InputFormat getInputFormat() {
    return ReflectionUtils.newInstance(getClass("mapred.input.format.class",
                                                TextInputFormat.class,
                                                InputFormat.class),
                                       this);
  }
  
  /**
   * Set the {@link InputFormat} implementation for the map-reduce job.
   * 
   * @param theClass the {@link InputFormat} implementation for the map-reduce 
   *                 job.
   */
  public void setInputFormat(Class<? extends InputFormat> theClass) {
    setClass("mapred.input.format.class", theClass, InputFormat.class);
  }
  
  /**
   * Get the {@link OutputFormat} implementation for the map-reduce job,
   * defaults to {@link TextOutputFormat} if not specified explicitly.
   * 
   * @return the {@link OutputFormat} implementation for the map-reduce job.
   */
  public OutputFormat getOutputFormat() {
    return ReflectionUtils.newInstance(getClass("mapred.output.format.class",
                                                TextOutputFormat.class,
                                                OutputFormat.class),
                                       this);
  }

  /**
   * Get the {@link OutputCommitter} implementation for the map-reduce job,
   * defaults to {@link FileOutputCommitter} if not specified explicitly.
   * 
   * @return the {@link OutputCommitter} implementation for the map-reduce job.
   */
  public OutputCommitter getOutputCommitter() {
    return (OutputCommitter)ReflectionUtils.newInstance(
      getClass("mapred.output.committer.class", FileOutputCommitter.class,
               OutputCommitter.class), this);
  }

  /**
   * Set the {@link OutputCommitter} implementation for the map-reduce job.
   * 
   * @param theClass the {@link OutputCommitter} implementation for the map-reduce 
   *                 job.
   */
  public void setOutputCommitter(Class<? extends OutputCommitter> theClass) {
    setClass("mapred.output.committer.class", theClass, OutputCommitter.class);
  }
  
  /**
   * Set the {@link OutputFormat} implementation for the map-reduce job.
   * 
   * @param theClass the {@link OutputFormat} implementation for the map-reduce 
   *                 job.
   */
  public void setOutputFormat(Class<? extends OutputFormat> theClass) {
    setClass("mapred.output.format.class", theClass, OutputFormat.class);
  }

  /**
   * Should the map outputs be compressed before transfer?
   * 
   * @param compress should the map outputs be compressed?
   */
  public void setCompressMapOutput(boolean compress) {
    setBoolean(JobContext.MAP_OUTPUT_COMPRESS, compress);
  }
  
  /**
   * Are the outputs of the maps compressed?
   * 
   * @return <code>true</code> if the outputs of the maps are to be compressed,
   *         <code>false</code> otherwise.
   */
  public boolean getCompressMapOutput() {
    return getBoolean(JobContext.MAP_OUTPUT_COMPRESS, false);
  }

  /**
   * Set the given class as the {@link CompressionCodec} for the map outputs.
   * 
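   * <p>For example, a sketch using
   * <code>org.apache.hadoop.io.compress.GzipCodec</code> (an illustrative
   * choice, not a recommendation):</p>
   * <p><blockquote><pre>
   *     conf.setMapOutputCompressorClass(GzipCodec.class);
   * </pre></blockquote>
   * 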
   * @param codecClass the {@link CompressionCodec} class that will compress  
   *                   the map outputs.
   */
  public void 
  setMapOutputCompressorClass(Class<? extends CompressionCodec> codecClass) {
    setCompressMapOutput(true);
    setClass(JobContext.MAP_OUTPUT_COMPRESS_CODEC, codecClass, 
             CompressionCodec.class);
  }
  
  /**
   * Get the {@link CompressionCodec} for compressing the map outputs.
   * 
   * @param defaultValue the {@link CompressionCodec} to return if not set
   * @return the {@link CompressionCodec} class that should be used to compress the 
   *         map outputs.
   * @throws IllegalArgumentException if the class was specified, but not found
   */
  public Class<? extends CompressionCodec> 
  getMapOutputCompressorClass(Class<? extends CompressionCodec> defaultValue) {
    Class<? extends CompressionCodec> codecClass = defaultValue;
    String name = get(JobContext.MAP_OUTPUT_COMPRESS_CODEC);
    if (name != null) {
      try {
        codecClass = getClassByName(name).asSubclass(CompressionCodec.class);
      } catch (ClassNotFoundException e) {
        throw new IllegalArgumentException("Compression codec " + name + 
                                           " was not found.", e);
      }
    }
    return codecClass;
  }
  
  /**
   * Get the key class for the map output data. If it is not set, use the
   * (final) output key class. This allows the map output key class to be
   * different than the final output key class.
   *  
   * @return the map output key class.
   */
  public Class<?> getMapOutputKeyClass() {
    Class<?> retv = getClass(JobContext.MAP_OUTPUT_KEY_CLASS, null, Object.class);
    if (retv == null) {
      retv = getOutputKeyClass();
    }
    return retv;
  }
  
  /**
   * Set the key class for the map output data. This allows the user to
   * specify the map output key class to be different than the final output
   * key class.
   * 
   * @param theClass the map output key class.
   */
  public void setMapOutputKeyClass(Class<?> theClass) {
    setClass(JobContext.MAP_OUTPUT_KEY_CLASS, theClass, Object.class);
  }
  
  /**
   * Get the value class for the map output data. If it is not set, use the
   * (final) output value class. This allows the map output value class to be
   * different than the final output value class.
   *  
   * @return the map output value class.
   */
  public Class<?> getMapOutputValueClass() {
    Class<?> retv = getClass(JobContext.MAP_OUTPUT_VALUE_CLASS, null,
        Object.class);
    if (retv == null) {
      retv = getOutputValueClass();
    }
    return retv;
  }
  
  /**
   * Set the value class for the map output data. This allows the user to
   * specify the map output value class to be different than the final output
   * value class.
   * 
   * @param theClass the map output value class.
   */
  public void setMapOutputValueClass(Class<?> theClass) {
    setClass(JobContext.MAP_OUTPUT_VALUE_CLASS, theClass, Object.class);
  }
  
  /**
   * Get the key class for the job output data.
   * 
   * @return the key class for the job output data.
   */
  public Class<?> getOutputKeyClass() {
    return getClass(JobContext.OUTPUT_KEY_CLASS,
                    LongWritable.class, Object.class);
  }
  
  /**
   * Set the key class for the job output data.
   * 
   * @param theClass the key class for the job output data.
   */
  public void setOutputKeyClass(Class<?> theClass) {
    setClass(JobContext.OUTPUT_KEY_CLASS, theClass, Object.class);
  }

  /**
   * Get the {@link RawComparator} comparator used to compare keys.
   * 
   * @return the {@link RawComparator} comparator used to compare keys.
   */
  public RawComparator getOutputKeyComparator() {
    Class<? extends RawComparator> theClass = getClass(
      JobContext.KEY_COMPARATOR, null, RawComparator.class);
    if (theClass != null)
      return ReflectionUtils.newInstance(theClass, this);
    return WritableComparator.get(getMapOutputKeyClass().asSubclass(WritableComparable.class), this);
  }

  /**
   * Set the {@link RawComparator} comparator used to compare keys.
   * 
   * @param theClass the {@link RawComparator} comparator used to 
   *                 compare keys.
   * @see #setOutputValueGroupingComparator(Class)                 
   */
  public void setOutputKeyComparatorClass(Class<? extends RawComparator> theClass) {
    setClass(JobContext.KEY_COMPARATOR,
             theClass, RawComparator.class);
  }

  /**
   * Set the {@link KeyFieldBasedComparator} options used to compare keys.
   * 
   * @param keySpec the key specification of the form -k pos1[,pos2], where,
   *  pos is of the form f[.c][opts], where f is the number
   *  of the key field to use, and c is the number of the first character from
   *  the beginning of the field. Fields and character posns are numbered 
   *  starting with 1; a character position of zero in pos2 indicates the
   *  field's last character. If '.c' is omitted from pos1, it defaults to 1
   *  (the beginning of the field); if omitted from pos2, it defaults to 0 
   *  (the end of the field). opts are ordering options. The supported options
   *  are:
   *    -n, (Sort numerically)
   *    -r, (Reverse the result of comparison)                 
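   * 
   * <p>For example, to sort on the second field, numerically and in reverse
   * (an illustrative spec):</p>
   * <p><blockquote><pre>
   *     conf.setKeyFieldComparatorOptions("-k2,2nr");
   * </pre></blockquote>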
   */
  public void setKeyFieldComparatorOptions(String keySpec) {
    setOutputKeyComparatorClass(KeyFieldBasedComparator.class);
    set(KeyFieldBasedComparator.COMPARATOR_OPTIONS, keySpec);
  }
  
  /**
   * Get the {@link KeyFieldBasedComparator} options
   */
  public String getKeyFieldComparatorOption() {
    return get(KeyFieldBasedComparator.COMPARATOR_OPTIONS);
  }

  /**
   * Set the {@link KeyFieldBasedPartitioner} options used for 
   * {@link Partitioner}
   * 
   * @param keySpec the key specification of the form -k pos1[,pos2], where,
   *  pos is of the form f[.c][opts], where f is the number
   *  of the key field to use, and c is the number of the first character from
   *  the beginning of the field. Fields and character posns are numbered 
   *  starting with 1; a character position of zero in pos2 indicates the
   *  field's last character. If '.c' is omitted from pos1, it defaults to 1
   *  (the beginning of the field); if omitted from pos2, it defaults to 0 
   *  (the end of the field).
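   * 
   * <p>For example, to partition on the first two fields of the key
   * (an illustrative spec):</p>
   * <p><blockquote><pre>
   *     conf.setKeyFieldPartitionerOptions("-k1,2");
   * </pre></blockquote>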
   */
  public void setKeyFieldPartitionerOptions(String keySpec) {
    setPartitionerClass(KeyFieldBasedPartitioner.class);
    set(KeyFieldBasedPartitioner.PARTITIONER_OPTIONS, keySpec);
  }
  
  /**
   * Get the {@link KeyFieldBasedPartitioner} options
   */
  public String getKeyFieldPartitionerOption() {
    return get(KeyFieldBasedPartitioner.PARTITIONER_OPTIONS);
  }

  /**
   * Get the user defined {@link WritableComparable} comparator for
   * grouping keys of inputs to the combiner.
   *
   * @return comparator set by the user for grouping values.
   * @see #setCombinerKeyGroupingComparator(Class) for details.
   */
  public RawComparator getCombinerKeyGroupingComparator() {
    Class<? extends RawComparator> theClass = getClass(
        JobContext.COMBINER_GROUP_COMPARATOR_CLASS, null, RawComparator.class);
    if (theClass == null) {
      return getOutputKeyComparator();
    }

    return ReflectionUtils.newInstance(theClass, this);
  }

  /** 
   * Get the user defined {@link WritableComparable} comparator for 
   * grouping keys of inputs to the reduce.
   * 
   * @return comparator set by the user for grouping values.
   * @see #setOutputValueGroupingComparator(Class) for details.
   */
  public RawComparator getOutputValueGroupingComparator() {
    Class<? extends RawComparator> theClass = getClass(
      JobContext.GROUP_COMPARATOR_CLASS, null, RawComparator.class);
    if (theClass == null) {
      return getOutputKeyComparator();
    }
    
    return ReflectionUtils.newInstance(theClass, this);
  }

  /**
   * Set the user defined {@link RawComparator} comparator for
   * grouping keys in the input to the combiner.
   *
   * <p>This comparator should be provided if the equivalence rules for keys
   * for sorting the intermediates are different from those for grouping keys
   * before each call to
   * {@link Reducer#reduce(Object, java.util.Iterator, OutputCollector, Reporter)}.</p>
   *
   * <p>For key-value pairs (K1,V1) and (K2,V2), the values (V1, V2) are passed
   * in a single call to the reduce function if K1 and K2 compare as equal.</p>
   *
   * <p>Since {@link #setOutputKeyComparatorClass(Class)} can be used to control
   * how keys are sorted, this can be used in conjunction to simulate
   * <i>secondary sort on values</i>.</p>
   *
   * <p><i>Note</i>: This is not a guarantee of the combiner sort being
   * <i>stable</i> in any sense. (In any case, with the order of available
   * map-outputs to the combiner being non-deterministic, it wouldn't make
   * that much sense.)</p>
   *
   * @param theClass the comparator class to be used for grouping keys for the
   * combiner. It should implement <code>RawComparator</code>.
   * @see #setOutputKeyComparatorClass(Class)
   */
  public void setCombinerKeyGroupingComparator(
      Class<? extends RawComparator> theClass) {
    setClass(JobContext.COMBINER_GROUP_COMPARATOR_CLASS,
        theClass, RawComparator.class);
  }

  /** 
   * Set the user defined {@link RawComparator} comparator for 
   * grouping keys in the input to the reduce.
   * 
   * <p>This comparator should be provided if the equivalence rules for keys
   * for sorting the intermediates are different from those for grouping keys
   * before each call to 
   * {@link Reducer#reduce(Object, java.util.Iterator, OutputCollector, Reporter)}.</p>
   *  
   * <p>For key-value pairs (K1,V1) and (K2,V2), the values (V1, V2) are passed
   * in a single call to the reduce function if K1 and K2 compare as equal.</p>
   * 
   * <p>Since {@link #setOutputKeyComparatorClass(Class)} can be used to control 
   * how keys are sorted, this can be used in conjunction to simulate 
   * <i>secondary sort on values</i>.</p>
   *  
   * <p><i>Note</i>: This is not a guarantee of the reduce sort being 
   * <i>stable</i> in any sense. (In any case, with the order of available 
   * map-outputs to the reduce being non-deterministic, it wouldn't make 
   * that much sense.)</p>
   * 
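   * <p>A typical secondary-sort sketch; the two comparator classes are
   * illustrative application-defined classes, not part of the framework:</p>
   * <p><blockquote><pre>
   *     conf.setOutputKeyComparatorClass(CompositeKeyComparator.class);
   *     conf.setOutputValueGroupingComparator(NaturalKeyGroupingComparator.class);
   * </pre></blockquote>
   * 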
   * @param theClass the comparator class to be used for grouping keys. 
   *                 It should implement <code>RawComparator</code>.
   * @see #setOutputKeyComparatorClass(Class)
   * @see #setCombinerKeyGroupingComparator(Class)
   */
  public void setOutputValueGroupingComparator(
      Class<? extends RawComparator> theClass) {
    setClass(JobContext.GROUP_COMPARATOR_CLASS,
             theClass, RawComparator.class);
  }

  /**
   * Should the framework use the new context-object code for running
   * the mapper?
   * @return true, if the new api should be used
   */
  public boolean getUseNewMapper() {
    return getBoolean("mapred.mapper.new-api", false);
  }
  /**
   * Set whether the framework should use the new api for the mapper.
   * This is the default for jobs submitted with the new Job api.
   * @param flag true, if the new api should be used
   */
  public void setUseNewMapper(boolean flag) {
    setBoolean("mapred.mapper.new-api", flag);
  }

  /**
   * Should the framework use the new context-object code for running
   * the reducer?
   * @return true, if the new api should be used
   */
  public boolean getUseNewReducer() {
    return getBoolean("mapred.reducer.new-api", false);
  }
  /**
   * Set whether the framework should use the new api for the reducer. 
   * This is the default for jobs submitted with the new Job api.
   * @param flag true, if the new api should be used
   */
  public void setUseNewReducer(boolean flag) {
    setBoolean("mapred.reducer.new-api", flag);
  }

  /**
   * Get the value class for job outputs.
   * 
   * @return the value class for job outputs.
   */
  public Class<?> getOutputValueClass() {
    return getClass(JobContext.OUTPUT_VALUE_CLASS, Text.class, Object.class);
  }
  
  /**
   * Set the value class for job outputs.
   * 
   * @param theClass the value class for job outputs.
   */
  public void setOutputValueClass(Class<?> theClass) {
    setClass(JobContext.OUTPUT_VALUE_CLASS, theClass, Object.class);
  }

  /**
   * Get the {@link Mapper} class for the job.
   * 
   * @return the {@link Mapper} class for the job.
   */
  public Class<? extends Mapper> getMapperClass() {
    return getClass("mapred.mapper.class", IdentityMapper.class, Mapper.class);
  }
  
  /**
   * Set the {@link Mapper} class for the job.
   * 
   * @param theClass the {@link Mapper} class for the job.
   */
  public void setMapperClass(Class<? extends Mapper> theClass) {
    setClass("mapred.mapper.class", theClass, Mapper.class);
  }

  /**
   * Get the {@link MapRunnable} class for the job.
   * 
   * @return the {@link MapRunnable} class for the job.
   */
  public Class<? extends MapRunnable> getMapRunnerClass() {
    return getClass("mapred.map.runner.class",
                    MapRunner.class, MapRunnable.class);
  }
  
  /**
   * Expert: Set the {@link MapRunnable} class for the job.
   * 
   * Typically used to exert greater control on {@link Mapper}s.
   * 
   * @param theClass the {@link MapRunnable} class for the job.
   */
  public void setMapRunnerClass(Class<? extends MapRunnable> theClass) {
    setClass("mapred.map.runner.class", theClass, MapRunnable.class);
  }

  /**
   * Get the {@link Partitioner} used to partition {@link Mapper}-outputs 
   * to be sent to the {@link Reducer}s.
   * 
   * @return the {@link Partitioner} used to partition map-outputs.
   */
  public Class<? extends Partitioner> getPartitionerClass() {
    return getClass("mapred.partitioner.class",
                    HashPartitioner.class, Partitioner.class);
  }
  
  /**
   * Set the {@link Partitioner} class used to partition 
   * {@link Mapper}-outputs to be sent to the {@link Reducer}s.
   * 
   * @param theClass the {@link Partitioner} used to partition map-outputs.
   */
  public void setPartitionerClass(Class<? extends Partitioner> theClass) {
    setClass("mapred.partitioner.class", theClass, Partitioner.class);
  }

  /**
   * Get the {@link Reducer} class for the job.
   * 
   * @return the {@link Reducer} class for the job.
   */
  public Class<? extends Reducer> getReducerClass() {
    return getClass("mapred.reducer.class",
                    IdentityReducer.class, Reducer.class);
  }
  
  /**
   * Set the {@link Reducer} class for the job.
   * 
   * @param theClass the {@link Reducer} class for the job.
   */
  public void setReducerClass(Class<? extends Reducer> theClass) {
    setClass("mapred.reducer.class", theClass, Reducer.class);
  }

  /**
   * Get the user-defined <i>combiner</i> class used to combine map-outputs 
   * before being sent to the reducers. Typically the combiner is the same as
   * the {@link Reducer} for the job i.e. {@link #getReducerClass()}.
   * 
   * @return the user-defined combiner class used to combine map-outputs.
   */
  public Class<? extends Reducer> getCombinerClass() {
    return getClass("mapred.combiner.class", null, Reducer.class);
  }

  /**
   * Set the user-defined <i>combiner</i> class used to combine map-outputs 
   * before being sent to the reducers. 
   * 
   * <p>The combiner is an application-specified aggregation operation, which
   * can help cut down the amount of data transferred between the 
   * {@link Mapper} and the {@link Reducer}, leading to better performance.</p>
   * 
   * <p>The framework may invoke the combiner 0, 1, or multiple times, in both
   * the mapper and reducer tasks. In general, the combiner is called as the
   * sort/merge result is written to disk. The combiner must:
   * <ul>
   *   <li> be side-effect free</li>
   *   <li> have the same input and output key types and the same input and 
   *        output value types</li>
   * </ul>
   * 
   * <p>Typically the combiner is the same as the <code>Reducer</code> for the
   * job i.e. {@link #setReducerClass(Class)}.</p>
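   * 
   * <p>For example, reusing the reducer as the combiner, as in the class
   * example above:</p>
   * <p><blockquote><pre>
   *     job.setCombinerClass(MyJob.MyReducer.class);
   * </pre></blockquote>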
   * 
   * @param theClass the user-defined combiner class used to combine 
   *                 map-outputs.
   */
  public void setCombinerClass(Class<? extends Reducer> theClass) {
    setClass("mapred.combiner.class", theClass, Reducer.class);
  }
  
  /**
   * Should speculative execution be used for this job? 
   * Defaults to <code>true</code>.
   * 
   * @return <code>true</code> if speculative execution be used for this job,
   *         <code>false</code> otherwise.
   */
  public boolean getSpeculativeExecution() { 
    return (getMapSpeculativeExecution() || getReduceSpeculativeExecution());
  }
  
  /**
   * Turn speculative execution on or off for this job. 
   * 
   * @param speculativeExecution <code>true</code> if speculative execution 
   *                             should be turned on, else <code>false</code>.
   */
  public void setSpeculativeExecution(boolean speculativeExecution) {
    setMapSpeculativeExecution(speculativeExecution);
    setReduceSpeculativeExecution(speculativeExecution);
  }

  /**
   * Should speculative execution be used for this job for map tasks? 
   * Defaults to <code>true</code>.
   * 
   * @return <code>true</code> if speculative execution be 
   *                           used for this job for map tasks,
   *         <code>false</code> otherwise.
   */
  public boolean getMapSpeculativeExecution() { 
    return getBoolean(JobContext.MAP_SPECULATIVE, true);
  }
  
  /**
   * Turn speculative execution on or off for this job for map tasks. 
   * 
   * @param speculativeExecution <code>true</code> if speculative execution 
   *                             should be turned on for map tasks,
   *                             else <code>false</code>.
   */
  public void setMapSpeculativeExecution(boolean speculativeExecution) {
    setBoolean(JobContext.MAP_SPECULATIVE, speculativeExecution);
  }

  /**
   * Should speculative execution be used for this job for reduce tasks? 
   * Defaults to <code>true</code>.
   * 
   * @return <code>true</code> if speculative execution be used 
   *                           for reduce tasks for this job,
   *         <code>false</code> otherwise.
   */
  public boolean getReduceSpeculativeExecution() { 
    return getBoolean(JobContext.REDUCE_SPECULATIVE, true);
  }
  
  /**
   * Turn speculative execution on or off for this job for reduce tasks. 
   * 
   * @param speculativeExecution <code>true</code> if speculative execution 
   *                             should be turned on for reduce tasks,
   *                             else <code>false</code>.
   */
  public void setReduceSpeculativeExecution(boolean speculativeExecution) {
    setBoolean(JobContext.REDUCE_SPECULATIVE, 
               speculativeExecution);
  }

  /**
   * Get the configured number of map tasks for this job.
   * Defaults to <code>1</code>.
   * 
   * @return the number of map tasks for this job.
   */
  public int getNumMapTasks() { return getInt(JobContext.NUM_MAPS, 1); }
  
  /**
   * Set the number of map tasks for this job.
   * 
   * <p><i>Note</i>: This is only a <i>hint</i> to the framework. The actual 
   * number of spawned map tasks depends on the number of {@link InputSplit}s 
   * generated by the job's {@link InputFormat#getSplits(JobConf, int)}.
   *  
   * A custom {@link InputFormat} is typically used to accurately control 
   * the number of map tasks for the job.</p>
   * 
   * <b id="NoOfMaps">How many maps?</b>
   * 
   * <p>The number of maps is usually driven by the total size of the inputs 
   * i.e. total number of blocks of the input files.</p>
   *  
   * <p>The right level of parallelism for maps seems to be around 10-100 maps 
   * per-node, although it has been set up to 300 or so for very cpu-light map 
   * tasks. Task setup takes awhile, so it is best if the maps take at least a 
   * minute to execute.</p>
   * 
   * <p>The default behavior of file-based {@link InputFormat}s is to split the 
   * input into <i>logical</i> {@link InputSplit}s based on the total size, in 
   * bytes, of input files. However, the {@link FileSystem} blocksize of the 
   * input files is treated as an upper bound for input splits. A lower bound 
   * on the split size can be set via 
   * <a href="{@docRoot}/../mapred-default.html#mapreduce.input.fileinputformat.split.minsize">
   * mapreduce.input.fileinputformat.split.minsize</a>.</p>
   *  
   * <p>Thus, if you expect 10TB of input data and have a blocksize of 128MB, 
   * you'll end up with 82,000 maps, unless {@link #setNumMapTasks(int)} is 
   * used to set it even higher.</p>
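   * 
   * <p>(That is, 10TB / 128MB = 10 &times; 2<sup>40</sup> / 2<sup>27</sup>
   * = 81,920 splits, i.e. roughly 82,000 maps.)</p>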
   * 
   * @param n the number of map tasks for this job.
   * @see InputFormat#getSplits(JobConf, int)
   * @see FileInputFormat
   * @see FileSystem#getDefaultBlockSize()
   * @see FileStatus#getBlockSize()
   */
  public void setNumMapTasks(int n) { setInt(JobContext.NUM_MAPS, n); }

  /**
   * Get the configured number of reduce tasks for this job. Defaults to 
   * <code>1</code>.
   * 
   * @return the number of reduce tasks for this job.
   */
  public int getNumReduceTasks() { return getInt(JobContext.NUM_REDUCES, 1); }
  
  /**
   * Set the requisite number of reduce tasks for this job.
   * 
   * <b id="NoOfReduces">How many reduces?</b>
   * 
   * <p>The right number of reduces seems to be <code>0.95</code> or 
   * <code>1.75</code> multiplied by (&lt;<i>no. of nodes</i>&gt; * 
   * <a href="{@docRoot}/../mapred-default.html#mapreduce.tasktracker.reduce.tasks.maximum">
   * mapreduce.tasktracker.reduce.tasks.maximum</a>).
   * </p>
   * 
   * <p>With <code>0.95</code> all of the reduces can launch immediately and 
   * start transferring map outputs as the maps finish. With <code>1.75</code> 
   * the faster nodes will finish their first round of reduces and launch a 
   * second wave of reduces, doing a much better job of load balancing.</p>
   * 
   * <p>Increasing the number of reduces increases the framework overhead, but 
   * improves load balancing and lowers the cost of failures.</p>
   * 
   * <p>The scaling factors above are slightly less than whole numbers to 
   * reserve a few reduce slots in the framework for speculative-tasks, failures
   * etc.</p> 
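   *
   * <p>For example, with 10 nodes and a maximum of 4 reduce slots per node
   * (illustrative values): 0.95 * 10 * 4 = 38 reduces in one wave, or
   * 1.75 * 10 * 4 = 70 reduces in two waves.</p>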
   *
   * <b id="ReducerNone">Reducer NONE</b>
   * 
   * <p>It is legal to set the number of reduce-tasks to <code>zero</code>.</p>
   * 
   * <p>In this case the outputs of the map-tasks go directly to the distributed 
   * file-system, to the path set by 
   * {@link FileOutputFormat#setOutputPath(JobConf, Path)}. Also, the 
   * framework doesn't sort the map-outputs before writing them out to HDFS.</p>
   * 
   * @param n the number of reduce tasks for this job.
   */
  public void setNumReduceTasks(int n) { setInt(JobContext.NUM_REDUCES, n); }
  
  /** 
   * Get the configured number of maximum attempts that will be made to run a
   * map task, as specified by the <code>mapreduce.map.maxattempts</code>
   * property. If this property is not already set, the default is 4 attempts.
   *  
   * @return the max number of attempts per map task.
   */
  public int getMaxMapAttempts() {
    return getInt(JobContext.MAP_MAX_ATTEMPTS, 4);
  }
  
  /** 
   * Expert: Set the number of maximum attempts that will be made to run a
   * map task.
   * 
   * @param n the number of attempts per map task.
   */
  public void setMaxMapAttempts(int n) {
    setInt(JobContext.MAP_MAX_ATTEMPTS, n);
  }

  /** 
   * Get the configured number of maximum attempts that will be made to run a
   * reduce task, as specified by the <code>mapreduce.reduce.maxattempts</code>
   * property. If this property is not already set, the default is 4 attempts.
   * 
   * @return the max number of attempts per reduce task.
   */
  public int getMaxReduceAttempts() {
    return getInt(JobContext.REDUCE_MAX_ATTEMPTS, 4);
  }
  /** 
   * Expert: Set the number of maximum attempts that will be made to run a
   * reduce task.
   * 
   * @param n the number of attempts per reduce task.
   */
  public void setMaxReduceAttempts(int n) {
    setInt(JobContext.REDUCE_MAX_ATTEMPTS, n);
  }
  
  /**
   * Get the user-specified job name. This is only used to identify the 
   * job to the user.
   * 
   * @return the job's name, defaulting to "".
   */
  public String getJobName() {
    return get(JobContext.JOB_NAME, "");
  }
  
  /**
   * Set the user-specified job name.
   * 
   * @param name the job's new name.
   */
  public void setJobName(String name) {
    set(JobContext.JOB_NAME, name);
  }
  
  /**
   * Get the user-specified session identifier. The default is the empty string.
   *
   * The session identifier is used to tag metric data that is reported to some
   * performance metrics system via the org.apache.hadoop.metrics API.  The 
   * session identifier is intended, in particular, for use by Hadoop-On-Demand 
   * (HOD) which allocates a virtual Hadoop cluster dynamically and transiently. 
   * HOD will set the session identifier by modifying the mapred-site.xml file 
   * before starting the cluster.
   *
   * When not running under HOD, this identifier is expected to remain set to 
   * the empty string.
   *
   * @return the session identifier, defaulting to "".
   */
  @Deprecated
  public String getSessionId() {
      return get("session.id", "");
  }
  
  /**
   * Set the user-specified session identifier.  
   *
   * @param sessionId the new session id.
   */
  @Deprecated
  public void setSessionId(String sessionId) {
      set("session.id", sessionId);
  }
    
  /**
   * Set the maximum no. of failures of a given job per tasktracker.
   * If the no. of task failures exceeds <code>noFailures</code>, the 
   * tasktracker is <i>blacklisted</i> for this job. 
   * 
   * @param noFailures maximum no. of failures of a given job per tasktracker.
   */
  public void setMaxTaskFailuresPerTracker(int noFailures) {
    setInt(JobContext.MAX_TASK_FAILURES_PER_TRACKER, noFailures);
  }
  
  /**
   * Expert: Get the maximum no. of failures of a given job per tasktracker.
   * If the no. of task failures exceeds this, the tasktracker is
   * <i>blacklisted</i> for this job. 
   * 
   * @return the maximum no. of failures of a given job per tasktracker.
   */
  public int getMaxTaskFailuresPerTracker() {
    return getInt(JobContext.MAX_TASK_FAILURES_PER_TRACKER, 3);
  }

  /**
   * Get the maximum percentage of map tasks that can fail without 
   * the job being aborted. 
   * 
   * Each map task is executed a minimum of {@link #getMaxMapAttempts()} 
   * attempts before being declared as <i>failed</i>.
   *  
   * Defaults to <code>zero</code>, i.e. <i>any</i> failed map-task results in
   * the job being declared as {@link JobStatus#FAILED}.
   * 
   * @return the maximum percentage of map tasks that can fail without
   *         the job being aborted.
   */
  public int getMaxMapTaskFailuresPercent() {
    return getInt(JobContext.MAP_FAILURES_MAX_PERCENT, 0);
  }

  /**
   * Expert: Set the maximum percentage of map tasks that can fail without the
   * job being aborted. 
   * 
   * Each map task is executed a minimum of {@link #getMaxMapAttempts} attempts 
   * before being declared as <i>failed</i>.
   * 
1516   * @param percent the maximum percentage of map tasks that can fail without 
1517   *                the job being aborted.
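   *                For example, a value of 5 lets the job succeed as long as
   *                no more than 5% of its map tasks ultimately fail.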
1518   */
1519  public void setMaxMapTaskFailuresPercent(int percent) {
1520    setInt(JobContext.MAP_FAILURES_MAX_PERCENT, percent);
1521  }
1522  
1523  /**
1524   * Get the maximum percentage of reduce tasks that can fail without 
1525   * the job being aborted. 
1526   * 
1527   * Each reduce task is executed a minimum of {@link #getMaxReduceAttempts()} 
1528   * attempts before being declared as <i>failed</i>.
1529   * 
1530   * Defaults to <code>zero</code>, i.e. <i>any</i> failed reduce-task results 
1531   * in the job being declared as {@link JobStatus#FAILED}.
1532   * 
1533   * @return the maximum percentage of reduce tasks that can fail without
1534   *         the job being aborted.
1535   */
1536  public int getMaxReduceTaskFailuresPercent() {
1537    return getInt(JobContext.REDUCE_FAILURES_MAXPERCENT, 0);
1538  }
1539  
1540  /**
1541   * Set the maximum percentage of reduce tasks that can fail without the job
1542   * being aborted.
1543   * 
1544   * Each reduce task is executed a minimum of {@link #getMaxReduceAttempts()} 
1545   * attempts before being declared as <i>failed</i>.
1546   * 
1547   * @param percent the maximum percentage of reduce tasks that can fail without 
1548   *                the job being aborted.
1549   */
1550  public void setMaxReduceTaskFailuresPercent(int percent) {
1551    setInt(JobContext.REDUCE_FAILURES_MAXPERCENT, percent);
1552  }
1553  
1554  /**
1555   * Set {@link JobPriority} for this job.
1556   *
1557   * @param prio the {@link JobPriority} for this job.
1558   */
1559  public void setJobPriority(JobPriority prio) {
1560    set(JobContext.PRIORITY, prio.toString());
1561  }
1562
  /**
   * Set the priority for this job as an integer, where 5 maps to VERY_HIGH,
   * 4 to HIGH, 3 to NORMAL, 2 to LOW, 1 to VERY_LOW and 0 to DEFAULT.
   *
   * @param prio the priority for this job, as an integer.
   */
1568  public void setJobPriorityAsInteger(int prio) {
1569    set(JobContext.PRIORITY, Integer.toString(prio));
1570  }
1571
1572  /**
1573   * Get the {@link JobPriority} for this job.
1574   *
1575   * @return the {@link JobPriority} for this job.
1576   */
1577  public JobPriority getJobPriority() {
1578    String prio = get(JobContext.PRIORITY);
1579    if (prio == null) {
1580      return JobPriority.DEFAULT;
1581    }
1582
1583    JobPriority priority = JobPriority.DEFAULT;
1584    try {
1585      priority = JobPriority.valueOf(prio);
1586    } catch (IllegalArgumentException e) {
1587      return convertToJobPriority(Integer.parseInt(prio));
1588    }
1589    return priority;
1590  }
1591
1592  /**
1593   * Get the priority for this job.
1594   *
1595   * @return the priority for this job.
1596   */
1597  public int getJobPriorityAsInteger() {
1598    String priority = get(JobContext.PRIORITY);
1599    if (priority == null) {
1600      return 0;
1601    }
1602
1603    int jobPriority = 0;
1604    try {
1605      jobPriority = convertPriorityToInteger(priority);
1606    } catch (IllegalArgumentException e) {
1607      return Integer.parseInt(priority);
1608    }
1609    return jobPriority;
1610  }
1611
1612  private int convertPriorityToInteger(String priority) {
1613    JobPriority jobPriority = JobPriority.valueOf(priority);
1614    switch (jobPriority) {
1615    case VERY_HIGH :
1616      return 5;
1617    case HIGH :
1618      return 4;
1619    case NORMAL :
1620      return 3;
1621    case LOW :
1622      return 2;
1623    case VERY_LOW :
1624      return 1;
1625    case DEFAULT :
1626      return 0;
1627    default:
1628      break;
1629    }
1630
    // If a user sets the priority to "UNDEFINED_PRIORITY", return 0,
    // which is also the default value.
1633    return 0;
1634  }
1635
1636  private JobPriority convertToJobPriority(int priority) {
1637    switch (priority) {
1638    case 5 :
1639      return JobPriority.VERY_HIGH;
1640    case 4 :
1641      return JobPriority.HIGH;
1642    case 3 :
1643      return JobPriority.NORMAL;
1644    case 2 :
1645      return JobPriority.LOW;
1646    case 1 :
1647      return JobPriority.VERY_LOW;
1648    case 0 :
1649      return JobPriority.DEFAULT;
1650    default:
1651      break;
1652    }
1653
1654    return JobPriority.UNDEFINED_PRIORITY;
1655  }
1656
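  // Round-trip note: setJobPriorityAsInteger(4) stores "4", which
  // getJobPriority() maps back to JobPriority.HIGH via convertToJobPriority;
  // integers outside the 0-5 range surface as JobPriority.UNDEFINED_PRIORITY.
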
1657  /**
1658   * Set JobSubmitHostName for this job.
1659   * 
1660   * @param hostname the JobSubmitHostName for this job.
1661   */
1662  void setJobSubmitHostName(String hostname) {
1663    set(MRJobConfig.JOB_SUBMITHOST, hostname);
1664  }
1665  
1666  /**
1667   * Get the  JobSubmitHostName for this job.
1668   * 
1669   * @return the JobSubmitHostName for this job.
1670   */
1671  String getJobSubmitHostName() {
    return get(MRJobConfig.JOB_SUBMITHOST);
1675  }
1676
1677  /**
1678   * Set JobSubmitHostAddress for this job.
1679   * 
1680   * @param hostadd the JobSubmitHostAddress for this job.
1681   */
1682  void setJobSubmitHostAddress(String hostadd) {
1683    set(MRJobConfig.JOB_SUBMITHOSTADDR, hostadd);
1684  }
1685  
1686  /**
1687   * Get JobSubmitHostAddress for this job.
1688   * 
1689   * @return  JobSubmitHostAddress for this job.
1690   */
1691  String getJobSubmitHostAddress() {
    return get(MRJobConfig.JOB_SUBMITHOSTADDR);
1695  }
1696
1697  /**
1698   * Get whether the task profiling is enabled.
1699   * @return true if some tasks will be profiled
1700   */
1701  public boolean getProfileEnabled() {
1702    return getBoolean(JobContext.TASK_PROFILE, false);
1703  }
1704
1705  /**
   * Set whether the system should collect profiler information for some of
   * the tasks in this job. The information is stored in the user log
1708   * directory.
1709   * @param newValue true means it should be gathered
1710   */
1711  public void setProfileEnabled(boolean newValue) {
1712    setBoolean(JobContext.TASK_PROFILE, newValue);
1713  }
1714
1715  /**
1716   * Get the profiler configuration arguments.
1717   *
1718   * The default value for this property is
1719   * "-agentlib:hprof=cpu=samples,heap=sites,force=n,thread=y,verbose=n,file=%s"
1720   * 
1721   * @return the parameters to pass to the task child to configure profiling
1722   */
1723  public String getProfileParams() {
1724    return get(JobContext.TASK_PROFILE_PARAMS,
1725        MRJobConfig.DEFAULT_TASK_PROFILE_PARAMS);
1726  }
1727
1728  /**
1729   * Set the profiler configuration arguments. If the string contains a '%s' it
1730   * will be replaced with the name of the profiling output file when the task
1731   * runs.
1732   *
1733   * This value is passed to the task child JVM on the command line.
1734   *
1735   * @param value the configuration string
1736   */
1737  public void setProfileParams(String value) {
1738    set(JobContext.TASK_PROFILE_PARAMS, value);
1739  }
1740
1741  /**
1742   * Get the range of maps or reduces to profile.
1743   * @param isMap is the task a map?
1744   * @return the task ranges
1745   */
1746  public IntegerRanges getProfileTaskRange(boolean isMap) {
1747    return getRange((isMap ? JobContext.NUM_MAP_PROFILES : 
1748                       JobContext.NUM_REDUCE_PROFILES), "0-2");
1749  }
1750
1751  /**
1752   * Set the ranges of maps or reduces to profile. setProfileEnabled(true) 
1753   * must also be called.
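   *
   * <p>A minimal usage sketch (the task ranges are illustrative):</p>
   * <p><blockquote><pre>
   * conf.setProfileEnabled(true);
   * conf.setProfileTaskRange(true, "0-2");   // profile map tasks 0-2
   * conf.setProfileTaskRange(false, "0");    // profile reduce task 0
   * </pre></blockquote>
   *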
   * @param isMap whether the ranges apply to map tasks (true) or reduce tasks
   * @param newValue a set of integer ranges of the task ids to profile
1755   */
1756  public void setProfileTaskRange(boolean isMap, String newValue) {
1757    // parse the value to make sure it is legal
    new Configuration.IntegerRanges(newValue);
1759    set((isMap ? JobContext.NUM_MAP_PROFILES : JobContext.NUM_REDUCE_PROFILES), 
1760          newValue);
1761  }
1762
1763  /**
1764   * Set the debug script to run when the map tasks fail.
1765   * 
   * <p>The debug script can aid debugging of failed map tasks. The script is
   * given the task's stdout, stderr, syslog, and jobconf files as arguments.</p>
1768   * 
1769   * <p>The debug command, run on the node where the map failed, is:</p>
1770   * <p><blockquote><pre>
   * $script $stdout $stderr $syslog $jobconf
1772   * </pre></blockquote>
1773   * 
1774   * <p> The script file is distributed through {@link DistributedCache} 
1775   * APIs. The script needs to be symlinked. </p>
1776   * 
   * <p>Here is an example of how to submit a script:</p>
   * <p><blockquote><pre>
   * job.setMapDebugScript("./myscript");
   * DistributedCache.createSymlink(job);
   * DistributedCache.addCacheFile(new URI("/debug/scripts/myscript#myscript"), job);
1782   * </pre></blockquote>
1783   * 
1784   * @param mDbgScript the script name
1785   */
1786  public void  setMapDebugScript(String mDbgScript) {
1787    set(JobContext.MAP_DEBUG_SCRIPT, mDbgScript);
1788  }
1789  
1790  /**
1791   * Get the map task's debug script.
1792   * 
1793   * @return the debug Script for the mapred job for failed map tasks.
1794   * @see #setMapDebugScript(String)
1795   */
1796  public String getMapDebugScript() {
1797    return get(JobContext.MAP_DEBUG_SCRIPT);
1798  }
1799  
1800  /**
1801   * Set the debug script to run when the reduce tasks fail.
1802   * 
   * <p>The debug script can aid debugging of failed reduce tasks. The script
   * is given the task's stdout, stderr, syslog, and jobconf files as arguments.</p>
1805   * 
   * <p>The debug command, run on the node where the reduce failed, is:</p>
1807   * <p><blockquote><pre>
   * $script $stdout $stderr $syslog $jobconf
1809   * </pre></blockquote>
1810   * 
1811   * <p> The script file is distributed through {@link DistributedCache} 
   * APIs. The script file needs to be symlinked.</p>
1813   * 
   * <p>Here is an example of how to submit a script:</p>
   * <p><blockquote><pre>
   * job.setReduceDebugScript("./myscript");
   * DistributedCache.createSymlink(job);
   * DistributedCache.addCacheFile(new URI("/debug/scripts/myscript#myscript"), job);
1819   * </pre></blockquote>
1820   * 
1821   * @param rDbgScript the script name
1822   */
1823  public void  setReduceDebugScript(String rDbgScript) {
1824    set(JobContext.REDUCE_DEBUG_SCRIPT, rDbgScript);
1825  }
1826  
1827  /**
1828   * Get the reduce task's debug Script
1829   * 
1830   * @return the debug script for the mapred job for failed reduce tasks.
1831   * @see #setReduceDebugScript(String)
1832   */
1833  public String getReduceDebugScript() {
1834    return get(JobContext.REDUCE_DEBUG_SCRIPT);
1835  }
1836
1837  /**
   * Get the uri to be invoked in order to send a notification after the job
1839   * has completed (success/failure). 
1840   * 
1841   * @return the job end notification uri, <code>null</code> if it hasn't
1842   *         been set.
1843   * @see #setJobEndNotificationURI(String)
1844   */
1845  public String getJobEndNotificationURI() {
1846    return get(JobContext.MR_JOB_END_NOTIFICATION_URL);
1847  }
1848
1849  /**
   * Set the uri to be invoked in order to send a notification after the job
1851   * has completed (success/failure).
1852   * 
1853   * <p>The uri can contain 2 special parameters: <tt>$jobId</tt> and 
1854   * <tt>$jobStatus</tt>. Those, if present, are replaced by the job's 
1855   * identifier and completion-status respectively.</p>
1856   * 
1857   * <p>This is typically used by application-writers to implement chaining of 
1858   * Map-Reduce jobs in an <i>asynchronous manner</i>.</p>
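   *
   * <p>For example, with a purely illustrative endpoint:</p>
   * <p><blockquote><pre>
   * conf.setJobEndNotificationURI(
   *     "http://myhost:8080/notify?jobId=$jobId&amp;status=$jobStatus");
   * </pre></blockquote>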
1859   * 
1860   * @param uri the job end notification uri
1861   * @see JobStatus
1862   */
1863  public void setJobEndNotificationURI(String uri) {
1864    set(JobContext.MR_JOB_END_NOTIFICATION_URL, uri);
1865  }
1866
1867  /**
1868   * Get job-specific shared directory for use as scratch space
1869   * 
1870   * <p>
1871   * When a job starts, a shared directory is created at location
1872   * <code>
1873   * ${mapreduce.cluster.local.dir}/taskTracker/$user/jobcache/$jobid/work/ </code>.
1874   * This directory is exposed to the users through 
1875   * <code>mapreduce.job.local.dir </code>.
   * So, the tasks can use this space
   * as scratch space and share files among them.</p>
   * This value is also available as a system property.
1879   * 
1880   * @return The localized job specific shared directory
1881   */
1882  public String getJobLocalDir() {
1883    return get(JobContext.JOB_LOCAL_DIR);
1884  }
1885
1886  /**
1887   * Get memory required to run a map task of the job, in MB.
1888   * 
1889   * If a value is specified in the configuration, it is returned.
1890   * Else, it returns {@link JobContext#DEFAULT_MAP_MEMORY_MB}.
1891   * <p>
1892   * For backward compatibility, if the job configuration sets the
1893   * key {@link #MAPRED_TASK_MAXVMEM_PROPERTY} to a value different
1894   * from {@link #DISABLED_MEMORY_LIMIT}, that value will be used
1895   * after converting it from bytes to MB.
   * @return memory required to run a map task of the job, in MB.
1897   */
1898  public long getMemoryForMapTask() {
1899    long value = getDeprecatedMemoryValue();
1900    if (value < 0) {
1901      return getMemoryRequired(TaskType.MAP);
1902    }
1903    return value;
1904  }
1905
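  /**
   * Set the memory required to run a map task of the job, in MB. The value is
   * also written under the old M/R 1.x property name for backward
   * compatibility.
   *
   * @param mem memory required for a map task, in MB.
   */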
1906  public void setMemoryForMapTask(long mem) {
1907    setLong(JobConf.MAPREDUCE_JOB_MAP_MEMORY_MB_PROPERTY, mem);
1908    // In case that M/R 1.x applications use the old property name
1909    setLong(JobConf.MAPRED_JOB_MAP_MEMORY_MB_PROPERTY, mem);
1910  }
1911
1912  /**
1913   * Get memory required to run a reduce task of the job, in MB.
1914   * 
1915   * If a value is specified in the configuration, it is returned.
1916   * Else, it returns {@link JobContext#DEFAULT_REDUCE_MEMORY_MB}.
1917   * <p>
1918   * For backward compatibility, if the job configuration sets the
1919   * key {@link #MAPRED_TASK_MAXVMEM_PROPERTY} to a value different
1920   * from {@link #DISABLED_MEMORY_LIMIT}, that value will be used
1921   * after converting it from bytes to MB.
1922   * @return memory required to run a reduce task of the job, in MB.
1923   */
1924  public long getMemoryForReduceTask() {
1925    long value = getDeprecatedMemoryValue();
1926    if (value < 0) {
1927      return getMemoryRequired(TaskType.REDUCE);
1928    }
1929    return value;
1930  }
1931  
  // Return the value set to the key MAPRED_TASK_MAXVMEM_PROPERTY,
  // converted into MBs.
  // Returns DISABLED_MEMORY_LIMIT if the key is unset; negative
  // configured values are returned as-is.
1936  private long getDeprecatedMemoryValue() {
1937    long oldValue = getLong(MAPRED_TASK_MAXVMEM_PROPERTY, 
1938        DISABLED_MEMORY_LIMIT);
1939    if (oldValue > 0) {
1940      oldValue /= (1024*1024);
1941    }
1942    return oldValue;
1943  }
1944
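  /**
   * Set the memory required to run a reduce task of the job, in MB. The value
   * is also written under the old M/R 1.x property name for backward
   * compatibility.
   *
   * @param mem memory required for a reduce task, in MB.
   */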
1945  public void setMemoryForReduceTask(long mem) {
1946    setLong(JobConf.MAPREDUCE_JOB_REDUCE_MEMORY_MB_PROPERTY, mem);
1947    // In case that M/R 1.x applications use the old property name
1948    setLong(JobConf.MAPRED_JOB_REDUCE_MEMORY_MB_PROPERTY, mem);
1949  }
1950
1951  /**
1952   * Return the name of the queue to which this job is submitted.
1953   * Defaults to 'default'.
1954   * 
1955   * @return name of the queue
1956   */
1957  public String getQueueName() {
1958    return get(JobContext.QUEUE_NAME, DEFAULT_QUEUE_NAME);
1959  }
1960  
1961  /**
1962   * Set the name of the queue to which this job should be submitted.
1963   * 
1964   * @param queueName Name of the queue
1965   */
1966  public void setQueueName(String queueName) {
1967    set(JobContext.QUEUE_NAME, queueName);
1968  }
1969  
1970  /**
   * Normalize a negative memory configuration value to
   * {@link #DISABLED_MEMORY_LIMIT}.
   *
   * @param val the configured memory value
   * @return the normalized value
1975   */
1976  public static long normalizeMemoryConfigValue(long val) {
1977    if (val < 0) {
1978      val = DISABLED_MEMORY_LIMIT;
1979    }
1980    return val;
1981  }
1982
1983  /** 
1984   * Find a jar that contains a class of the same name, if any.
1985   * It will return a jar file, even if that is not the first thing
1986   * on the class path that has a class with the same name.
1987   * 
1988   * @param my_class the class to find.
1989   * @return a jar file that contains the class, or null.
1990   */
1991  public static String findContainingJar(Class my_class) {
1992    return ClassUtil.findContainingJar(my_class);
1993  }
1994
1995  /**
1996   * Get the memory required to run a task of this job, in bytes. See
1997   * {@link #MAPRED_TASK_MAXVMEM_PROPERTY}
1998   * <p>
1999   * This method is deprecated. Now, different memory limits can be
2000   * set for map and reduce tasks of a job, in MB. 
2001   * <p>
2002   * For backward compatibility, if the job configuration sets the
2003   * key {@link #MAPRED_TASK_MAXVMEM_PROPERTY}, that value is returned. 
2004   * Otherwise, this method will return the larger of the values returned by 
2005   * {@link #getMemoryForMapTask()} and {@link #getMemoryForReduceTask()}
2006   * after converting them into bytes.
2007   *
2008   * @return Memory required to run a task of this job, in bytes.
2009   * @see #setMaxVirtualMemoryForTask(long)
2010   * @deprecated Use {@link #getMemoryForMapTask()} and
2011   *             {@link #getMemoryForReduceTask()}
2012   */
2013  @Deprecated
2014  public long getMaxVirtualMemoryForTask() {
2015    LOG.warn(
2016      "getMaxVirtualMemoryForTask() is deprecated. " +
2017      "Instead use getMemoryForMapTask() and getMemoryForReduceTask()");
2018
2019    long value = getLong(MAPRED_TASK_MAXVMEM_PROPERTY,
2020        Math.max(getMemoryForMapTask(), getMemoryForReduceTask()) * 1024 * 1024);
2021    return value;
2022  }
2023
2024  /**
2025   * Set the maximum amount of memory any task of this job can use. See
2026   * {@link #MAPRED_TASK_MAXVMEM_PROPERTY}
2027   * <p>
   * <code>mapred.task.maxvmem</code> is split into
   * <code>mapreduce.map.memory.mb</code> and
   * <code>mapreduce.reduce.memory.mb</code>; each of the new keys is set to
   * <code>mapred.task.maxvmem / (1024 * 1024)</code>, since the new values
   * are in MB.
2034   *
2035   * @param vmem Maximum amount of virtual memory in bytes any task of this job
2036   *             can use.
2037   * @see #getMaxVirtualMemoryForTask()
2038   * @deprecated
   *  Use {@link #setMemoryForMapTask(long mem)} and
   *  {@link #setMemoryForReduceTask(long mem)}
2041   */
2042  @Deprecated
2043  public void setMaxVirtualMemoryForTask(long vmem) {
    LOG.warn("setMaxVirtualMemoryForTask() is deprecated. " +
        "Instead use setMemoryForMapTask() and setMemoryForReduceTask()");
2046    if (vmem < 0) {
2047      throw new IllegalArgumentException("Task memory allocation may not be < 0");
2048    }
2049
    if (get(JobConf.MAPRED_TASK_MAXVMEM_PROPERTY) == null) {
      setMemoryForMapTask(vmem / (1024 * 1024)); // converting bytes to MB
      setMemoryForReduceTask(vmem / (1024 * 1024)); // converting bytes to MB
    } else {
      this.setLong(JobConf.MAPRED_TASK_MAXVMEM_PROPERTY, vmem);
    }
2056  }
2057
2058  /**
   * @deprecated this method is deprecated and no longer in use.
2060   */
2061  @Deprecated
2062  public long getMaxPhysicalMemoryForTask() {
2063    LOG.warn("The API getMaxPhysicalMemoryForTask() is deprecated."
2064              + " Refer to the APIs getMemoryForMapTask() and"
2065              + " getMemoryForReduceTask() for details.");
2066    return -1;
2067  }
2068
  /**
   * @deprecated this method is deprecated and no longer in use; the value
   * set is ignored.
   */
2072  @Deprecated
2073  public void setMaxPhysicalMemoryForTask(long mem) {
2074    LOG.warn("The API setMaxPhysicalMemoryForTask() is deprecated."
2075        + " The value set is ignored. Refer to "
2076        + " setMemoryForMapTask() and setMemoryForReduceTask() for details.");
2077  }
2078
2079  static String deprecatedString(String key) {
2080    return "The variable " + key + " is no longer used.";
2081  }
2082
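  // Log a warning for each deprecated memory / ulimit key that is still
  // present in this configuration.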
2083  private void checkAndWarnDeprecation() {
    if (get(JobConf.MAPRED_TASK_MAXVMEM_PROPERTY) != null) {
2085      LOG.warn(JobConf.deprecatedString(JobConf.MAPRED_TASK_MAXVMEM_PROPERTY)
2086                + " Instead use " + JobConf.MAPREDUCE_JOB_MAP_MEMORY_MB_PROPERTY
2087                + " and " + JobConf.MAPREDUCE_JOB_REDUCE_MEMORY_MB_PROPERTY);
2088    }
    if (get(JobConf.MAPRED_TASK_ULIMIT) != null) {
      LOG.warn(JobConf.deprecatedString(JobConf.MAPRED_TASK_ULIMIT));
    }
    if (get(JobConf.MAPRED_MAP_TASK_ULIMIT) != null) {
      LOG.warn(JobConf.deprecatedString(JobConf.MAPRED_MAP_TASK_ULIMIT));
    }
    if (get(JobConf.MAPRED_REDUCE_TASK_ULIMIT) != null) {
      LOG.warn(JobConf.deprecatedString(JobConf.MAPRED_REDUCE_TASK_ULIMIT));
    }
2098  }
2099
  private String getConfiguredTaskJavaOpts(TaskType taskType) {
    // These hold JVM options (not classpaths); admin opts come first so that
    // user-specified opts can override them.
    String userJavaOpts = "";
    String adminJavaOpts = "";
    if (taskType == TaskType.MAP) {
      userJavaOpts = get(MAPRED_MAP_TASK_JAVA_OPTS,
          get(MAPRED_TASK_JAVA_OPTS, DEFAULT_MAPRED_TASK_JAVA_OPTS));
      adminJavaOpts = get(MRJobConfig.MAPRED_MAP_ADMIN_JAVA_OPTS,
          MRJobConfig.DEFAULT_MAPRED_ADMIN_JAVA_OPTS);
    } else {
      userJavaOpts = get(MAPRED_REDUCE_TASK_JAVA_OPTS,
          get(MAPRED_TASK_JAVA_OPTS, DEFAULT_MAPRED_TASK_JAVA_OPTS));
      adminJavaOpts = get(MRJobConfig.MAPRED_REDUCE_ADMIN_JAVA_OPTS,
          MRJobConfig.DEFAULT_MAPRED_ADMIN_JAVA_OPTS);
    }

    return adminJavaOpts + " " + userJavaOpts;
  }
2117
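  /**
   * Get the effective java opts for a task of the given type, appending a
   * derived -Xmx option when the configured opts do not specify a heap size.
   *
   * @param taskType the type of task (map or reduce)
   * @return the java opts to pass to the task JVM
   */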
2118  @Private
2119  public String getTaskJavaOpts(TaskType taskType) {
2120    String javaOpts = getConfiguredTaskJavaOpts(taskType);
2121
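    // If the configured opts do not pin a heap size, derive an -Xmx value
    // from the container size and the heap-to-container ratio.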
2122    if (!javaOpts.contains("-Xmx")) {
2123      float heapRatio = getFloat(MRJobConfig.HEAP_MEMORY_MB_RATIO,
2124          MRJobConfig.DEFAULT_HEAP_MEMORY_MB_RATIO);
2125
2126      if (heapRatio > 1.0f || heapRatio < 0) {
2127        LOG.warn("Invalid value for " + MRJobConfig.HEAP_MEMORY_MB_RATIO
2128            + ", using the default.");
2129        heapRatio = MRJobConfig.DEFAULT_HEAP_MEMORY_MB_RATIO;
2130      }
2131
2132      int taskContainerMb = getMemoryRequired(taskType);
2133      int taskHeapSize = (int)Math.ceil(taskContainerMb * heapRatio);
2134
2135      String xmxArg = String.format("-Xmx%dm", taskHeapSize);
2136      LOG.info("Task java-opts do not specify heap size. Setting task attempt" +
2137          " jvm max heap size to " + xmxArg);
2138
2139      javaOpts += " " + xmxArg;
2140    }
2141
2142    return javaOpts;
2143  }
2144
2145  /**
   * Parse the maximum heap size from the java opts, as specified by the -Xmx option.
2147   * Format: -Xmx&lt;size&gt;[g|G|m|M|k|K]
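   * For example, <code>"-Xmx2g"</code> yields 2048 and <code>"-Xmx512m"</code>
   * yields 512; opts with no -Xmx setting yield -1.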
2148   * @param javaOpts String to parse to read maximum heap size
2149   * @return Maximum heap size in MB or -1 if not specified
2150   */
2151  @Private
2152  @VisibleForTesting
2153  public static int parseMaximumHeapSizeMB(String javaOpts) {
2154    // Find the last matching -Xmx following word boundaries
2155    Matcher m = JAVA_OPTS_XMX_PATTERN.matcher(javaOpts);
2156    if (m.matches()) {
2157      long size = Long.parseLong(m.group(1));
2158      if (size <= 0) {
2159        return -1;
2160      }
2161      if (m.group(2).isEmpty()) {
2162        // -Xmx specified in bytes
2163        return (int) (size / (1024 * 1024));
2164      }
2165      char unit = m.group(2).charAt(0);
2166      switch (unit) {
2167        case 'g':
2168        case 'G':
2169          // -Xmx specified in GB
2170          return (int) (size * 1024);
2171        case 'm':
2172        case 'M':
2173          // -Xmx specified in MB
2174          return (int) size;
2175        case 'k':
2176        case 'K':
2177          // -Xmx specified in KB
2178          return (int) (size / 1024);
2179      }
2180    }
2181    // -Xmx not specified
2182    return -1;
2183  }
2184
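  // When the memory for a task is not configured explicitly, derive it from
  // the parsed -Xmx heap size and the heap ratio; for example (hypothetical
  // values), -Xmx900m with a ratio of 0.8 yields ceil(900 / 0.8) = 1125 MB.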
2185  private int getMemoryRequiredHelper(
2186      String configName, int defaultValue, int heapSize, float heapRatio) {
2187    int memory = getInt(configName, -1);
2188    if (memory <= 0) {
2189      if (heapSize > 0) {
2190        memory = (int) Math.ceil(heapSize / heapRatio);
        LOG.info("Derived value for " + configName + " from javaOpts");
2192      } else {
2193        memory = defaultValue;
2194      }
2195    }
2196
2197    return memory;
2198  }
2199
2200  @Private
2201  public int getMemoryRequired(TaskType taskType) {
2202    int memory = 1024;
2203    int heapSize = parseMaximumHeapSizeMB(getConfiguredTaskJavaOpts(taskType));
2204    float heapRatio = getFloat(MRJobConfig.HEAP_MEMORY_MB_RATIO,
2205        MRJobConfig.DEFAULT_HEAP_MEMORY_MB_RATIO);
2206    if (taskType == TaskType.MAP) {
2207      return getMemoryRequiredHelper(MRJobConfig.MAP_MEMORY_MB,
2208          MRJobConfig.DEFAULT_MAP_MEMORY_MB, heapSize, heapRatio);
2209    } else if (taskType == TaskType.REDUCE) {
2210      return getMemoryRequiredHelper(MRJobConfig.REDUCE_MEMORY_MB,
2211          MRJobConfig.DEFAULT_REDUCE_MEMORY_MB, heapSize, heapRatio);
2212    } else {
2213      return memory;
2214    }
2215  }
2216
  /** For debugging: dump the configuration to standard output as XML. */
2218  public static void main(String[] args) throws Exception {
2219    new JobConf(new Configuration()).writeXml(System.out);
2220  }
2221
2222}
2223