001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018
019package org.apache.hadoop.mapred;
020
021
022import java.io.IOException;
023import java.util.regex.Matcher;
024import java.util.regex.Pattern;
025
026import com.google.common.annotations.VisibleForTesting;
027import org.apache.commons.logging.Log;
028import org.apache.commons.logging.LogFactory;
029import org.apache.hadoop.classification.InterfaceAudience;
030import org.apache.hadoop.classification.InterfaceAudience.Private;
031import org.apache.hadoop.classification.InterfaceStability;
032import org.apache.hadoop.conf.Configuration;
033import org.apache.hadoop.fs.FileStatus;
034import org.apache.hadoop.fs.FileSystem;
035import org.apache.hadoop.fs.Path;
036import org.apache.hadoop.io.LongWritable;
037import org.apache.hadoop.io.RawComparator;
038import org.apache.hadoop.io.Text;
039import org.apache.hadoop.io.WritableComparable;
040import org.apache.hadoop.io.WritableComparator;
041import org.apache.hadoop.io.compress.CompressionCodec;
042import org.apache.hadoop.mapred.lib.HashPartitioner;
043import org.apache.hadoop.mapred.lib.IdentityMapper;
044import org.apache.hadoop.mapred.lib.IdentityReducer;
045import org.apache.hadoop.mapred.lib.KeyFieldBasedComparator;
046import org.apache.hadoop.mapred.lib.KeyFieldBasedPartitioner;
047import org.apache.hadoop.mapreduce.MRConfig;
048import org.apache.hadoop.mapreduce.MRJobConfig;
049import org.apache.hadoop.mapreduce.TaskType;
050import org.apache.hadoop.mapreduce.filecache.DistributedCache;
051import org.apache.hadoop.mapreduce.util.ConfigUtil;
052import org.apache.hadoop.security.Credentials;
053import org.apache.hadoop.util.ClassUtil;
054import org.apache.hadoop.util.ReflectionUtils;
055import org.apache.hadoop.util.Tool;
056import org.apache.log4j.Level;
057
058/** 
059 * A map/reduce job configuration.
060 * 
061 * <p><code>JobConf</code> is the primary interface for a user to describe a 
062 * map-reduce job to the Hadoop framework for execution. The framework tries to
063 * faithfully execute the job as described by <code>JobConf</code>; however:
064 * <ol>
065 *   <li>
066 *   Some configuration parameters might have been marked as 
067 *   <a href="{@docRoot}/org/apache/hadoop/conf/Configuration.html#FinalParams">
068 *   final</a> by administrators and hence cannot be altered.
069 *   </li>
070 *   <li>
071 *   While some job parameters are straightforward to set 
072 *   (e.g. {@link #setNumReduceTasks(int)}), other parameters interact subtly
073 *   with the rest of the framework and/or job-configuration and are relatively
074 *   more complex for the user to control finely
075 *   (e.g. {@link #setNumMapTasks(int)}).
076 *   </li>
077 * </ol>
078 * 
079 * <p><code>JobConf</code> typically specifies the {@link Mapper}, combiner 
080 * (if any), {@link Partitioner}, {@link Reducer}, {@link InputFormat} and 
081 * {@link OutputFormat} implementations to be used etc.
082 *
083 * <p>Optionally <code>JobConf</code> is used to specify other advanced facets 
084 * of the job such as <code>Comparator</code>s to be used, files to be put in  
085 * the {@link DistributedCache}, whether or not intermediate and/or job outputs 
086 * are to be compressed (and how), debuggability via user-provided scripts
087 * ({@link #setMapDebugScript(String)}/{@link #setReduceDebugScript(String)})
088 * for post-processing task logs, a task's stdout, stderr, and syslog,
089 * and so on.</p>
090 * 
091 * <p>Here is an example on how to configure a job via <code>JobConf</code>:</p>
092 * <p><blockquote><pre>
093 *     // Create a new JobConf
094 *     JobConf job = new JobConf(new Configuration(), MyJob.class);
095 *     
096 *     // Specify various job-specific parameters     
097 *     job.setJobName("myjob");
098 *     
099 *     FileInputFormat.setInputPaths(job, new Path("in"));
100 *     FileOutputFormat.setOutputPath(job, new Path("out"));
101 *     
102 *     job.setMapperClass(MyJob.MyMapper.class);
103 *     job.setCombinerClass(MyJob.MyReducer.class);
104 *     job.setReducerClass(MyJob.MyReducer.class);
105 *     
106 *     job.setInputFormat(SequenceFileInputFormat.class);
107 *     job.setOutputFormat(SequenceFileOutputFormat.class);
108 * </pre></blockquote>
109 * 
110 * @see JobClient
111 * @see ClusterStatus
112 * @see Tool
113 * @see DistributedCache
114 */
115@InterfaceAudience.Public
116@InterfaceStability.Stable
117public class JobConf extends Configuration {
118
119  private static final Log LOG = LogFactory.getLog(JobConf.class);
120  private static final Pattern JAVA_OPTS_XMX_PATTERN =
121          Pattern.compile(".*(?:^|\\s)-Xmx(\\d+)([gGmMkK]?)(?:$|\\s).*");
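  // A sketch of the intended use (illustrative; the framework's actual heap
  // parsing happens elsewhere): for opts such as "-server -Xmx1024m" the
  // pattern captures "1024" and "m", e.g.
  //   Matcher m = JAVA_OPTS_XMX_PATTERN.matcher("-server -Xmx1024m");
  //   if (m.matches()) { long xmx = Long.parseLong(m.group(1)); }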
122
123  static{
124    ConfigUtil.loadResources();
125  }
126
127  /**
128   * @deprecated Use {@link #MAPREDUCE_JOB_MAP_MEMORY_MB_PROPERTY} and
129   * {@link #MAPREDUCE_JOB_REDUCE_MEMORY_MB_PROPERTY}
130   */
131  @Deprecated
132  public static final String MAPRED_TASK_MAXVMEM_PROPERTY =
133    "mapred.task.maxvmem";
134
135  /**
136   * @deprecated 
137   */
138  @Deprecated
139  public static final String UPPER_LIMIT_ON_TASK_VMEM_PROPERTY =
140    "mapred.task.limit.maxvmem";
141
142  /**
143   * @deprecated
144   */
145  @Deprecated
146  public static final String MAPRED_TASK_DEFAULT_MAXVMEM_PROPERTY =
147    "mapred.task.default.maxvmem";
148
149  /**
150   * @deprecated
151   */
152  @Deprecated
153  public static final String MAPRED_TASK_MAXPMEM_PROPERTY =
154    "mapred.task.maxpmem";
155
156  /**
157   * A value which if set for memory related configuration options,
158   * indicates that the options are turned off.
159   * Deprecated because it makes no sense in the context of MR2.
160   */
161  @Deprecated
162  public static final long DISABLED_MEMORY_LIMIT = -1L;
163
164  /**
165   * Property name for the configuration property mapreduce.cluster.local.dir
166   */
167  public static final String MAPRED_LOCAL_DIR_PROPERTY = MRConfig.LOCAL_DIR;
168
169  /**
170   * Name of the queue to which jobs will be submitted, if no queue
171   * name is mentioned.
172   */
173  public static final String DEFAULT_QUEUE_NAME = "default";
174
175  static final String MAPREDUCE_JOB_MAP_MEMORY_MB_PROPERTY =
176      JobContext.MAP_MEMORY_MB;
177
178  static final String MAPREDUCE_JOB_REDUCE_MEMORY_MB_PROPERTY =
179    JobContext.REDUCE_MEMORY_MB;
180
181  /**
182   * The variable is kept for M/R 1.x applications, while M/R 2.x applications
183   * should use {@link #MAPREDUCE_JOB_MAP_MEMORY_MB_PROPERTY}
184   */
185  @Deprecated
186  public static final String MAPRED_JOB_MAP_MEMORY_MB_PROPERTY =
187      "mapred.job.map.memory.mb";
188
189  /**
190   * The variable is kept for M/R 1.x applications, while M/R 2.x applications
191   * should use {@link #MAPREDUCE_JOB_REDUCE_MEMORY_MB_PROPERTY}
192   */
193  @Deprecated
194  public static final String MAPRED_JOB_REDUCE_MEMORY_MB_PROPERTY =
195      "mapred.job.reduce.memory.mb";
196
197  /** Pattern for the default unpacking behavior for job jars */
198  public static final Pattern UNPACK_JAR_PATTERN_DEFAULT =
199    Pattern.compile("(?:classes/|lib/).*");
200
201  /**
202   * Configuration key to set the java command line options for the child
203   * map and reduce tasks.
204   * 
205   * Java opts for the task tracker child processes.
206   * The following symbol, if present, will be interpolated: @taskid@. 
207   * It is replaced by the current TaskID. Any other occurrences of '@' will go 
208   * unchanged.
209   * For example, to enable verbose gc logging to a file named for the taskid in
210   * /tmp and to set the heap maximum to be a gigabyte, pass a 'value' of:
211   *          -Xmx1024m -verbose:gc -Xloggc:/tmp/@taskid@.gc
212   * 
213   * The configuration variable {@link #MAPRED_TASK_ENV} can be used to pass 
214   * other environment variables to the child processes.
215   * 
216   * @deprecated Use {@link #MAPRED_MAP_TASK_JAVA_OPTS} or 
217   *                 {@link #MAPRED_REDUCE_TASK_JAVA_OPTS}
218   */
219  @Deprecated
220  public static final String MAPRED_TASK_JAVA_OPTS = "mapred.child.java.opts";
221  
222  /**
223   * Configuration key to set the java command line options for the map tasks.
224   * 
225   * Java opts for the task tracker child map processes.
226   * The following symbol, if present, will be interpolated: @taskid@. 
227   * It is replaced by the current TaskID. Any other occurrences of '@' will go 
228   * unchanged.
229   * For example, to enable verbose gc logging to a file named for the taskid in
230   * /tmp and to set the heap maximum to be a gigabyte, pass a 'value' of:
231   *          -Xmx1024m -verbose:gc -Xloggc:/tmp/@taskid@.gc
232   * 
233   * The configuration variable {@link #MAPRED_MAP_TASK_ENV} can be used to pass 
234   * other environment variables to the map processes.
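   * 
   * <p>A sketch of setting this from client code (the opts value is
   * illustrative):</p>
   * <pre>
   *   conf.set(JobConf.MAPRED_MAP_TASK_JAVA_OPTS,
   *            "-Xmx512m -verbose:gc -Xloggc:/tmp/@taskid@.gc");
   * </pre>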
235   */
236  public static final String MAPRED_MAP_TASK_JAVA_OPTS = 
237    JobContext.MAP_JAVA_OPTS;
238  
239  /**
240   * Configuration key to set the java command line options for the reduce tasks.
241   * 
242   * Java opts for the task tracker child reduce processes.
243   * The following symbol, if present, will be interpolated: @taskid@. 
244   * It is replaced by the current TaskID. Any other occurrences of '@' will go 
245   * unchanged.
246   * For example, to enable verbose gc logging to a file named for the taskid in
247   * /tmp and to set the heap maximum to be a gigabyte, pass a 'value' of:
248   *          -Xmx1024m -verbose:gc -Xloggc:/tmp/@taskid@.gc
249   * 
250   * The configuration variable {@link #MAPRED_REDUCE_TASK_ENV} can be used to 
251   * pass other environment variables to the reduce processes.
252   */
253  public static final String MAPRED_REDUCE_TASK_JAVA_OPTS = 
254    JobContext.REDUCE_JAVA_OPTS;
255
256  public static final String DEFAULT_MAPRED_TASK_JAVA_OPTS = "";
257
258  /**
259   * @deprecated
260   * Configuration key to set the maximum virtual memory available to the child
261   * map and reduce tasks (in kilo-bytes). This has been deprecated and will no
262   * longer have any effect.
263   */
264  @Deprecated
265  public static final String MAPRED_TASK_ULIMIT = "mapred.child.ulimit";
266
267  /**
268   * @deprecated
269   * Configuration key to set the maximum virtual memory available to the
270   * map tasks (in kilo-bytes). This has been deprecated and will no
271   * longer have any effect.
272   */
273  @Deprecated
274  public static final String MAPRED_MAP_TASK_ULIMIT = "mapreduce.map.ulimit";
275  
276  /**
277   * @deprecated
278   * Configuration key to set the maximum virtual memory available to the
279   * reduce tasks (in kilo-bytes). This has been deprecated and will no
280   * longer have any effect.
281   */
282  @Deprecated
283  public static final String MAPRED_REDUCE_TASK_ULIMIT =
284    "mapreduce.reduce.ulimit";
285
286
287  /**
288   * Configuration key to set the environment of the child map/reduce tasks.
289   * 
290   * The format of the value is <code>k1=v1,k2=v2</code>. Further it can 
291   * reference existing environment variables via <code>$key</code> on
292   * Linux or <code>%key%</code> on Windows.
293   * 
294   * Example:
295   * <ul>
296   *   <li> A=foo - This will set the env variable A to foo. </li>
297   *   <li> B=$X:c - This inherits the tasktracker's X env variable on Linux. </li>
298   *   <li> B=%X%;c - This inherits the tasktracker's X env variable on Windows. </li>
299   * </ul>
300   * 
301   * @deprecated Use {@link #MAPRED_MAP_TASK_ENV} or 
302   *                 {@link #MAPRED_REDUCE_TASK_ENV}
303   */
304  @Deprecated
305  public static final String MAPRED_TASK_ENV = "mapred.child.env";
306
307  /**
308   * Configuration key to set the environment of the child map tasks.
309   * 
310   * The format of the value is <code>k1=v1,k2=v2</code>. Further it can
311   * reference existing environment variables via <code>$key</code> on
312   * Linux or <code>%key%</code> on Windows.
313   * 
314   * Example:
315   * <ul>
316   *   <li> A=foo - This will set the env variable A to foo. </li>
317   *   <li> B=$X:c - This inherits the tasktracker's X env variable on Linux. </li>
318   *   <li> B=%X%;c - This inherits the tasktracker's X env variable on Windows. </li>
319   * </ul>
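   * 
   * <p>For example (a sketch; the values are illustrative):</p>
   * <pre>
   *   conf.set(JobConf.MAPRED_MAP_TASK_ENV, "A=foo,B=$X:c");
   * </pre>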
320   */
321  public static final String MAPRED_MAP_TASK_ENV = JobContext.MAP_ENV;
322  
323  /**
324   * Configuration key to set the environment of the child reduce tasks.
325   * 
326   * The format of the value is <code>k1=v1,k2=v2</code>. Further it can 
327   * reference existing environment variables via <code>$key</code> on
328   * Linux or <code>%key%</code> on Windows.
329   * 
330   * Example:
331   * <ul>
332   *   <li> A=foo - This will set the env variable A to foo. </li>
333   *   <li> B=$X:c - This inherits the tasktracker's X env variable on Linux. </li>
334   *   <li> B=%X%;c - This inherits the tasktracker's X env variable on Windows. </li>
335   * </ul>
336   */
337  public static final String MAPRED_REDUCE_TASK_ENV = JobContext.REDUCE_ENV;
338
339  private Credentials credentials = new Credentials();
340  
341  /**
342   * Configuration key to set the logging {@link Level} for the map task.
343   *
344   * The allowed logging levels are:
345   * OFF, FATAL, ERROR, WARN, INFO, DEBUG, TRACE and ALL.
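   *
   * For example, <code>conf.set(JobConf.MAPRED_MAP_TASK_LOG_LEVEL, "DEBUG")</code>
   * (an illustrative sketch).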
346   */
347  public static final String MAPRED_MAP_TASK_LOG_LEVEL = 
348    JobContext.MAP_LOG_LEVEL;
349  
350  /**
351   * Configuration key to set the logging {@link Level} for the reduce task.
352   *
353   * The allowed logging levels are:
354   * OFF, FATAL, ERROR, WARN, INFO, DEBUG, TRACE and ALL.
355   */
356  public static final String MAPRED_REDUCE_TASK_LOG_LEVEL = 
357    JobContext.REDUCE_LOG_LEVEL;
358  
359  /**
360   * Default logging level for map/reduce tasks.
361   */
362  public static final Level DEFAULT_LOG_LEVEL = Level.INFO;
363
364  /**
365   * The variable is kept for M/R 1.x applications; M/R 2.x applications should
366   * use {@link MRJobConfig#WORKFLOW_ID} instead.
367   */
368  @Deprecated
369  public static final String WORKFLOW_ID = MRJobConfig.WORKFLOW_ID;
370
371  /**
372   * The variable is kept for M/R 1.x applications; M/R 2.x applications should
373   * use {@link MRJobConfig#WORKFLOW_NAME} instead.
374   */
375  @Deprecated
376  public static final String WORKFLOW_NAME = MRJobConfig.WORKFLOW_NAME;
377
378  /**
379   * The variable is kept for M/R 1.x applications; M/R 2.x applications should
380   * use {@link MRJobConfig#WORKFLOW_NODE_NAME} instead.
381   */
382  @Deprecated
383  public static final String WORKFLOW_NODE_NAME =
384      MRJobConfig.WORKFLOW_NODE_NAME;
385
386  /**
387   * The variable is kept for M/R 1.x applications; M/R 2.x applications should
388   * use {@link MRJobConfig#WORKFLOW_ADJACENCY_PREFIX_STRING} instead.
389   */
390  @Deprecated
391  public static final String WORKFLOW_ADJACENCY_PREFIX_STRING =
392      MRJobConfig.WORKFLOW_ADJACENCY_PREFIX_STRING;
393
394  /**
395   * The variable is kept for M/R 1.x applications; M/R 2.x applications should
396   * use {@link MRJobConfig#WORKFLOW_ADJACENCY_PREFIX_PATTERN} instead.
397   */
398  @Deprecated
399  public static final String WORKFLOW_ADJACENCY_PREFIX_PATTERN =
400      MRJobConfig.WORKFLOW_ADJACENCY_PREFIX_PATTERN;
401
402  /**
403   * The variable is kept for M/R 1.x applications; M/R 2.x applications should
404   * use {@link MRJobConfig#WORKFLOW_TAGS} instead.
405   */
406  @Deprecated
407  public static final String WORKFLOW_TAGS = MRJobConfig.WORKFLOW_TAGS;
408
409  /**
410   * The variable is kept for M/R 1.x applications; M/R 2.x applications should
411   * not use it.
412   */
413  @Deprecated
414  public static final String MAPREDUCE_RECOVER_JOB =
415      "mapreduce.job.restart.recover";
416
417  /**
418   * The variable is kept for M/R 1.x applications; M/R 2.x applications should
419   * not use it.
420   */
421  @Deprecated
422  public static final boolean DEFAULT_MAPREDUCE_RECOVER_JOB = true;
423
424  /**
425   * Construct a map/reduce job configuration.
426   */
427  public JobConf() {
428    checkAndWarnDeprecation();
429  }
430
431  /** 
432   * Construct a map/reduce job configuration.
433   * 
434   * @param exampleClass a class whose containing jar is used as the job's jar.
435   */
436  public JobConf(Class exampleClass) {
437    setJarByClass(exampleClass);
438    checkAndWarnDeprecation();
439  }
440  
441  /**
442   * Construct a map/reduce job configuration.
443   * 
444   * @param conf a Configuration whose settings will be inherited.
445   */
446  public JobConf(Configuration conf) {
447    super(conf);
448    
449    if (conf instanceof JobConf) {
450      JobConf that = (JobConf)conf;
451      credentials = that.credentials;
452    }
453    
454    checkAndWarnDeprecation();
455  }
456
457
458  /** Construct a map/reduce job configuration.
459   * 
460   * @param conf a Configuration whose settings will be inherited.
461   * @param exampleClass a class whose containing jar is used as the job's jar.
462   */
463  public JobConf(Configuration conf, Class exampleClass) {
464    this(conf);
465    setJarByClass(exampleClass);
466  }
467
468
469  /** Construct a map/reduce configuration.
470   *
471   * @param config a Configuration-format XML job description file.
472   */
473  public JobConf(String config) {
474    this(new Path(config));
475  }
476
477  /** Construct a map/reduce configuration.
478   *
479   * @param config a Configuration-format XML job description file.
480   */
481  public JobConf(Path config) {
482    super();
483    addResource(config);
484    checkAndWarnDeprecation();
485  }
486
487  /** A new map/reduce configuration where the behavior of reading from the
488   * default resources can be turned off.
489   * <p>
490   * If the parameter {@code loadDefaults} is false, the new instance
491   * will not load resources from the default files.
492   *
493   * @param loadDefaults specifies whether to load from the default files
494   */
495  public JobConf(boolean loadDefaults) {
496    super(loadDefaults);
497    checkAndWarnDeprecation();
498  }
499
500  /**
501   * Get credentials for the job.
502   * @return credentials for the job
503   */
504  public Credentials getCredentials() {
505    return credentials;
506  }
507  
508  @Private
509  public void setCredentials(Credentials credentials) {
510    this.credentials = credentials;
511  }
512  
513  /**
514   * Get the user jar for the map-reduce job.
515   * 
516   * @return the user jar for the map-reduce job.
517   */
518  public String getJar() { return get(JobContext.JAR); }
519  
520  /**
521   * Set the user jar for the map-reduce job.
522   * 
523   * @param jar the user jar for the map-reduce job.
524   */
525  public void setJar(String jar) { set(JobContext.JAR, jar); }
526
527  /**
528   * Get the pattern for jar contents to unpack on the tasktracker
529   */
530  public Pattern getJarUnpackPattern() {
531    return getPattern(JobContext.JAR_UNPACK_PATTERN, UNPACK_JAR_PATTERN_DEFAULT);
532  }
533
534  
535  /**
536   * Set the job's jar file by finding an example class location.
537   * 
538   * @param cls the example class.
539   */
540  public void setJarByClass(Class cls) {
541    String jar = ClassUtil.findContainingJar(cls);
542    if (jar != null) {
543      setJar(jar);
544    }   
545  }
546
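  /** Get the configured local directories (<code>mapreduce.cluster.local.dir</code>). */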
547  public String[] getLocalDirs() throws IOException {
548    return getTrimmedStrings(MRConfig.LOCAL_DIR);
549  }
550
551  /**
552   * @deprecated Use MRAsyncDiskService.moveAndDeleteAllVolumes instead.
553   */
554  @Deprecated
555  public void deleteLocalFiles() throws IOException {
556    String[] localDirs = getLocalDirs();
557    for (int i = 0; i < localDirs.length; i++) {
558      FileSystem.getLocal(this).delete(new Path(localDirs[i]), true);
559    }
560  }
561
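  /** Delete the given subdirectory under each of the configured local directories. */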
562  public void deleteLocalFiles(String subdir) throws IOException {
563    String[] localDirs = getLocalDirs();
564    for (int i = 0; i < localDirs.length; i++) {
565      FileSystem.getLocal(this).delete(new Path(localDirs[i], subdir), true);
566    }
567  }
568
569  /** 
570   * Constructs a local file name. Files are distributed among configured
571   * local directories.
572   */
573  public Path getLocalPath(String pathString) throws IOException {
574    return getLocalPath(MRConfig.LOCAL_DIR, pathString);
575  }
576
577  /**
578   * Get the reported username for this job.
579   * 
580   * @return the username
581   */
582  public String getUser() {
583    return get(JobContext.USER_NAME);
584  }
585  
586  /**
587   * Set the reported username for this job.
588   * 
589   * @param user the username for this job.
590   */
591  public void setUser(String user) {
592    set(JobContext.USER_NAME, user);
593  }
594
595
596  
597  /**
598   * Set whether the framework should keep the intermediate files for 
599   * failed tasks.
600   * 
601   * @param keep <code>true</code> if framework should keep the intermediate files 
602   *             for failed tasks, <code>false</code> otherwise.
603   * 
604   */
605  public void setKeepFailedTaskFiles(boolean keep) {
606    setBoolean(JobContext.PRESERVE_FAILED_TASK_FILES, keep);
607  }
608  
609  /**
610   * Should the temporary files for failed tasks be kept?
611   * 
612   * @return should the files be kept?
613   */
614  public boolean getKeepFailedTaskFiles() {
615    return getBoolean(JobContext.PRESERVE_FAILED_TASK_FILES, false);
616  }
617  
618  /**
619   * Set a regular expression for task names that should be kept. 
620   * The regular expression ".*_m_000123_0" would keep the files
621   * for the first instance of map 123 that ran.
622   * 
623   * @param pattern the java.util.regex.Pattern to match against the 
624   *        task names.
625   */
626  public void setKeepTaskFilesPattern(String pattern) {
627    set(JobContext.PRESERVE_FILES_PATTERN, pattern);
628  }
629  
630  /**
631   * Get the regular expression that is matched against the task names
632   * to see if we need to keep the files.
633   * 
634   * @return the pattern as a string, if it was set, otherwise null.
635   */
636  public String getKeepTaskFilesPattern() {
637    return get(JobContext.PRESERVE_FILES_PATTERN);
638  }
639  
640  /**
641   * Set the current working directory for the default file system.
642   * 
643   * @param dir the new current working directory.
644   */
645  public void setWorkingDirectory(Path dir) {
646    dir = new Path(getWorkingDirectory(), dir);
647    set(JobContext.WORKING_DIR, dir.toString());
648  }
649  
650  /**
651   * Get the current working directory for the default file system.
652   * 
653   * @return the directory name.
654   */
655  public Path getWorkingDirectory() {
656    String name = get(JobContext.WORKING_DIR);
657    if (name != null) {
658      return new Path(name);
659    } else {
660      try {
661        Path dir = FileSystem.get(this).getWorkingDirectory();
662        set(JobContext.WORKING_DIR, dir.toString());
663        return dir;
664      } catch (IOException e) {
665        throw new RuntimeException(e);
666      }
667    }
668  }
669  
670  /**
671   * Sets the number of tasks that a spawned task JVM should run
672   * before it exits
673   * @param numTasks the number of tasks to execute; defaults to 1;
674   * -1 signifies no limit
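   *
   * For example, <code>conf.setNumTasksToExecutePerJvm(-1)</code> enables
   * unlimited JVM reuse (an illustrative sketch).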
675   */
676  public void setNumTasksToExecutePerJvm(int numTasks) {
677    setInt(JobContext.JVM_NUMTASKS_TORUN, numTasks);
678  }
679  
680  /**
681   * Get the number of tasks that a spawned JVM should execute; defaults to 1.
682   */
683  public int getNumTasksToExecutePerJvm() {
684    return getInt(JobContext.JVM_NUMTASKS_TORUN, 1);
685  }
686  
687  /**
688   * Get the {@link InputFormat} implementation for the map-reduce job,
689   * defaults to {@link TextInputFormat} if not specified explicitly.
690   * 
691   * @return the {@link InputFormat} implementation for the map-reduce job.
692   */
693  public InputFormat getInputFormat() {
694    return ReflectionUtils.newInstance(getClass("mapred.input.format.class",
695                                                             TextInputFormat.class,
696                                                             InputFormat.class),
697                                                    this);
698  }
699  
700  /**
701   * Set the {@link InputFormat} implementation for the map-reduce job.
702   * 
703   * @param theClass the {@link InputFormat} implementation for the map-reduce 
704   *                 job.
705   */
706  public void setInputFormat(Class<? extends InputFormat> theClass) {
707    setClass("mapred.input.format.class", theClass, InputFormat.class);
708  }
709  
710  /**
711   * Get the {@link OutputFormat} implementation for the map-reduce job,
712   * defaults to {@link TextOutputFormat} if not specified explicitly.
713   * 
714   * @return the {@link OutputFormat} implementation for the map-reduce job.
715   */
716  public OutputFormat getOutputFormat() {
717    return ReflectionUtils.newInstance(getClass("mapred.output.format.class",
718                                                              TextOutputFormat.class,
719                                                              OutputFormat.class),
720                                                     this);
721  }
722
723  /**
724   * Get the {@link OutputCommitter} implementation for the map-reduce job,
725   * defaults to {@link FileOutputCommitter} if not specified explicitly.
726   * 
727   * @return the {@link OutputCommitter} implementation for the map-reduce job.
728   */
729  public OutputCommitter getOutputCommitter() {
730    return (OutputCommitter)ReflectionUtils.newInstance(
731      getClass("mapred.output.committer.class", FileOutputCommitter.class,
732               OutputCommitter.class), this);
733  }
734
735  /**
736   * Set the {@link OutputCommitter} implementation for the map-reduce job.
737   * 
738   * @param theClass the {@link OutputCommitter} implementation for the map-reduce 
739   *                 job.
740   */
741  public void setOutputCommitter(Class<? extends OutputCommitter> theClass) {
742    setClass("mapred.output.committer.class", theClass, OutputCommitter.class);
743  }
744  
745  /**
746   * Set the {@link OutputFormat} implementation for the map-reduce job.
747   * 
748   * @param theClass the {@link OutputFormat} implementation for the map-reduce 
749   *                 job.
750   */
751  public void setOutputFormat(Class<? extends OutputFormat> theClass) {
752    setClass("mapred.output.format.class", theClass, OutputFormat.class);
753  }
754
755  /**
756   * Should the map outputs be compressed before transfer?
757   * 
758   * @param compress should the map outputs be compressed?
759   */
760  public void setCompressMapOutput(boolean compress) {
761    setBoolean(JobContext.MAP_OUTPUT_COMPRESS, compress);
762  }
763  
764  /**
765   * Are the outputs of the maps compressed?
766   * 
767   * @return <code>true</code> if the outputs of the maps are to be compressed,
768   *         <code>false</code> otherwise.
769   */
770  public boolean getCompressMapOutput() {
771    return getBoolean(JobContext.MAP_OUTPUT_COMPRESS, false);
772  }
773
774  /**
775   * Set the given class as the  {@link CompressionCodec} for the map outputs.
776   * 
777   * @param codecClass the {@link CompressionCodec} class that will compress  
778   *                   the map outputs.
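   * 
   * <p>For example (a sketch; <code>GzipCodec</code> is one of the built-in
   * codecs):</p>
   * <pre>
   *   conf.setMapOutputCompressorClass(GzipCodec.class);
   * </pre>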
779   */
780  public void 
781  setMapOutputCompressorClass(Class<? extends CompressionCodec> codecClass) {
782    setCompressMapOutput(true);
783    setClass(JobContext.MAP_OUTPUT_COMPRESS_CODEC, codecClass, 
784             CompressionCodec.class);
785  }
786  
787  /**
788   * Get the {@link CompressionCodec} for compressing the map outputs.
789   * 
790   * @param defaultValue the {@link CompressionCodec} to return if not set
791   * @return the {@link CompressionCodec} class that should be used to compress the 
792   *         map outputs.
793   * @throws IllegalArgumentException if the class was specified, but not found
794   */
795  public Class<? extends CompressionCodec> 
796  getMapOutputCompressorClass(Class<? extends CompressionCodec> defaultValue) {
797    Class<? extends CompressionCodec> codecClass = defaultValue;
798    String name = get(JobContext.MAP_OUTPUT_COMPRESS_CODEC);
799    if (name != null) {
800      try {
801        codecClass = getClassByName(name).asSubclass(CompressionCodec.class);
802      } catch (ClassNotFoundException e) {
803        throw new IllegalArgumentException("Compression codec " + name + 
804                                           " was not found.", e);
805      }
806    }
807    return codecClass;
808  }
809  
810  /**
811   * Get the key class for the map output data. If it is not set, use the
812   * (final) output key class. This allows the map output key class to be
813   * different than the final output key class.
814   *  
815   * @return the map output key class.
816   */
817  public Class<?> getMapOutputKeyClass() {
818    Class<?> retv = getClass(JobContext.MAP_OUTPUT_KEY_CLASS, null, Object.class);
819    if (retv == null) {
820      retv = getOutputKeyClass();
821    }
822    return retv;
823  }
824  
825  /**
826   * Set the key class for the map output data. This allows the user to
827   * specify the map output key class to be different than the final output
828   * key class.
829   * 
830   * @param theClass the map output key class.
831   */
832  public void setMapOutputKeyClass(Class<?> theClass) {
833    setClass(JobContext.MAP_OUTPUT_KEY_CLASS, theClass, Object.class);
834  }
835  
836  /**
837   * Get the value class for the map output data. If it is not set, use the
838   * (final) output value class. This allows the map output value class to be
839   * different than the final output value class.
840   *  
841   * @return the map output value class.
842   */
843  public Class<?> getMapOutputValueClass() {
844    Class<?> retv = getClass(JobContext.MAP_OUTPUT_VALUE_CLASS, null,
845        Object.class);
846    if (retv == null) {
847      retv = getOutputValueClass();
848    }
849    return retv;
850  }
851  
852  /**
853   * Set the value class for the map output data. This allows the user to
854   * specify the map output value class to be different than the final output
855   * value class.
856   * 
857   * @param theClass the map output value class.
858   */
859  public void setMapOutputValueClass(Class<?> theClass) {
860    setClass(JobContext.MAP_OUTPUT_VALUE_CLASS, theClass, Object.class);
861  }
862  
863  /**
864   * Get the key class for the job output data.
865   * 
866   * @return the key class for the job output data.
867   */
868  public Class<?> getOutputKeyClass() {
869    return getClass(JobContext.OUTPUT_KEY_CLASS,
870                    LongWritable.class, Object.class);
871  }
872  
873  /**
874   * Set the key class for the job output data.
875   * 
876   * @param theClass the key class for the job output data.
877   */
878  public void setOutputKeyClass(Class<?> theClass) {
879    setClass(JobContext.OUTPUT_KEY_CLASS, theClass, Object.class);
880  }
881
882  /**
883   * Get the {@link RawComparator} comparator used to compare keys.
884   * 
885   * @return the {@link RawComparator} comparator used to compare keys.
886   */
887  public RawComparator getOutputKeyComparator() {
888    Class<? extends RawComparator> theClass = getClass(
889      JobContext.KEY_COMPARATOR, null, RawComparator.class);
890    if (theClass != null)
891      return ReflectionUtils.newInstance(theClass, this);
892    return WritableComparator.get(getMapOutputKeyClass().asSubclass(WritableComparable.class), this);
893  }
894
895  /**
896   * Set the {@link RawComparator} comparator used to compare keys.
897   * 
898   * @param theClass the {@link RawComparator} comparator used to 
899   *                 compare keys.
900   * @see #setOutputValueGroupingComparator(Class)                 
901   */
902  public void setOutputKeyComparatorClass(Class<? extends RawComparator> theClass) {
903    setClass(JobContext.KEY_COMPARATOR,
904             theClass, RawComparator.class);
905  }
906
907  /**
908   * Set the {@link KeyFieldBasedComparator} options used to compare keys.
909   * 
910   * @param keySpec the key specification of the form -k pos1[,pos2], where
911   *  pos is of the form f[.c][opts], where f is the number
912   *  of the key field to use, and c is the number of the first character from
913   *  the beginning of the field. Fields and character positions are numbered 
914   *  starting with 1; a character position of zero in pos2 indicates the
915   *  field's last character. If '.c' is omitted from pos1, it defaults to 1
916   *  (the beginning of the field); if omitted from pos2, it defaults to 0 
917   *  (the end of the field). opts are ordering options. The supported options
918   *  are:
919   *    -n, (Sort numerically)
920   *    -r, (Reverse the result of comparison)                 
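   *
   *  For example, <code>job.setKeyFieldComparatorOptions("-k2,2nr")</code>
   *  sorts on the second key field, numerically and in reverse order (an
   *  illustrative sketch).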
921   */
922  public void setKeyFieldComparatorOptions(String keySpec) {
923    setOutputKeyComparatorClass(KeyFieldBasedComparator.class);
924    set(KeyFieldBasedComparator.COMPARATOR_OPTIONS, keySpec);
925  }
926  
927  /**
928   * Get the {@link KeyFieldBasedComparator} options
929   */
930  public String getKeyFieldComparatorOption() {
931    return get(KeyFieldBasedComparator.COMPARATOR_OPTIONS);
932  }
933
934  /**
935   * Set the {@link KeyFieldBasedPartitioner} options used for 
936   * {@link Partitioner}
937   * 
938   * @param keySpec the key specification of the form -k pos1[,pos2], where
939   *  pos is of the form f[.c][opts], where f is the number
940   *  of the key field to use, and c is the number of the first character from
941   *  the beginning of the field. Fields and character positions are numbered 
942   *  starting with 1; a character position of zero in pos2 indicates the
943   *  field's last character. If '.c' is omitted from pos1, it defaults to 1
944   *  (the beginning of the field); if omitted from pos2, it defaults to 0 
945   *  (the end of the field).
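   *
   *  For example, <code>job.setKeyFieldPartitionerOptions("-k1,1")</code>
   *  partitions on the first key field (an illustrative sketch).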
946   */
947  public void setKeyFieldPartitionerOptions(String keySpec) {
948    setPartitionerClass(KeyFieldBasedPartitioner.class);
949    set(KeyFieldBasedPartitioner.PARTITIONER_OPTIONS, keySpec);
950  }
951  
952  /**
953   * Get the {@link KeyFieldBasedPartitioner} options
954   */
955  public String getKeyFieldPartitionerOption() {
956    return get(KeyFieldBasedPartitioner.PARTITIONER_OPTIONS);
957  }
958
959  /**
960   * Get the user defined {@link WritableComparable} comparator for
961   * grouping keys of inputs to the combiner.
962   *
963   * @return comparator set by the user for grouping values.
964   * @see #setCombinerKeyGroupingComparator(Class) for details.
965   */
966  public RawComparator getCombinerKeyGroupingComparator() {
967    Class<? extends RawComparator> theClass = getClass(
968        JobContext.COMBINER_GROUP_COMPARATOR_CLASS, null, RawComparator.class);
969    if (theClass == null) {
970      return getOutputKeyComparator();
971    }
972
973    return ReflectionUtils.newInstance(theClass, this);
974  }
975
976  /** 
977   * Get the user defined {@link WritableComparable} comparator for 
978   * grouping keys of inputs to the reduce.
979   * 
980   * @return comparator set by the user for grouping values.
981   * @see #setOutputValueGroupingComparator(Class) for details.
982   */
983  public RawComparator getOutputValueGroupingComparator() {
984    Class<? extends RawComparator> theClass = getClass(
985      JobContext.GROUP_COMPARATOR_CLASS, null, RawComparator.class);
986    if (theClass == null) {
987      return getOutputKeyComparator();
988    }
989    
990    return ReflectionUtils.newInstance(theClass, this);
991  }
992
993  /**
994   * Set the user defined {@link RawComparator} comparator for
995   * grouping keys in the input to the combiner.
996   *
997   * <p>This comparator should be provided if the equivalence rules for keys
998   * for sorting the intermediates are different from those for grouping keys
999   * before each call to
1000   * {@link Reducer#reduce(Object, java.util.Iterator, OutputCollector, Reporter)}.</p>
1001   *
1002   * <p>For key-value pairs (K1,V1) and (K2,V2), the values (V1, V2) are passed
1003   * in a single call to the reduce function if K1 and K2 compare as equal.</p>
1004   *
1005   * <p>Since {@link #setOutputKeyComparatorClass(Class)} can be used to control
1006   * how keys are sorted, this can be used in conjunction with it to simulate
1007   * <i>secondary sort on values</i>.</p>
1008   *
1009   * <p><i>Note</i>: This is not a guarantee of the combiner sort being
1010   * <i>stable</i> in any sense. (In any case, with the order of available
1011   * map-outputs to the combiner being non-deterministic, it wouldn't make
1012   * that much sense.)</p>
1013   *
1014   * @param theClass the comparator class to be used for grouping keys for the
1015   * combiner. It should implement <code>RawComparator</code>.
1016   * @see #setOutputKeyComparatorClass(Class)
1017   */
1018  public void setCombinerKeyGroupingComparator(
1019      Class<? extends RawComparator> theClass) {
1020    setClass(JobContext.COMBINER_GROUP_COMPARATOR_CLASS,
1021        theClass, RawComparator.class);
1022  }
1023
1024  /** 
1025   * Set the user defined {@link RawComparator} comparator for 
1026   * grouping keys in the input to the reduce.
1027   * 
1028   * <p>This comparator should be provided if the equivalence rules for keys
1029   * for sorting the intermediates are different from those for grouping keys
1030   * before each call to 
1031   * {@link Reducer#reduce(Object, java.util.Iterator, OutputCollector, Reporter)}.</p>
1032   *  
1033   * <p>For key-value pairs (K1,V1) and (K2,V2), the values (V1, V2) are passed
1034   * in a single call to the reduce function if K1 and K2 compare as equal.</p>
1035   * 
1036   * <p>Since {@link #setOutputKeyComparatorClass(Class)} can be used to control 
1037   * how keys are sorted, this can be used in conjunction with it to simulate 
1038   * <i>secondary sort on values</i>.</p>
1039   *  
1040   * <p><i>Note</i>: This is not a guarantee of the reduce sort being 
1041   * <i>stable</i> in any sense. (In any case, with the order of available 
1042   * map-outputs to the reduce being non-deterministic, it wouldn't make 
1043   * that much sense.)</p>
1044   * 
1045   * @param theClass the comparator class to be used for grouping keys. 
1046   *                 It should implement <code>RawComparator</code>.
1047   * @see #setOutputKeyComparatorClass(Class)
1048   * @see #setCombinerKeyGroupingComparator(Class)
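   *
   * <p>A typical secondary-sort wiring (a sketch; the comparator classes here
   * are hypothetical user-supplied implementations):</p>
   * <pre>
   *   job.setOutputKeyComparatorClass(FullKeyComparator.class);
   *   job.setOutputValueGroupingComparator(FirstFieldComparator.class);
   * </pre>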
1049   */
1050  public void setOutputValueGroupingComparator(
1051      Class<? extends RawComparator> theClass) {
1052    setClass(JobContext.GROUP_COMPARATOR_CLASS,
1053             theClass, RawComparator.class);
1054  }
1055
1056  /**
1057   * Should the framework use the new context-object code for running
1058   * the mapper?
1059   * @return true, if the new api should be used
1060   */
1061  public boolean getUseNewMapper() {
1062    return getBoolean("mapred.mapper.new-api", false);
1063  }
1064  /**
1065   * Set whether the framework should use the new api for the mapper.
1066   * This is the default for jobs submitted with the new Job api.
1067   * @param flag true, if the new api should be used
1068   */
1069  public void setUseNewMapper(boolean flag) {
1070    setBoolean("mapred.mapper.new-api", flag);
1071  }
1072
1073  /**
1074   * Should the framework use the new context-object code for running
1075   * the reducer?
1076   * @return true, if the new api should be used
1077   */
1078  public boolean getUseNewReducer() {
1079    return getBoolean("mapred.reducer.new-api", false);
1080  }
1081  /**
1082   * Set whether the framework should use the new api for the reducer. 
1083   * This is the default for jobs submitted with the new Job api.
1084   * @param flag true, if the new api should be used
1085   */
1086  public void setUseNewReducer(boolean flag) {
1087    setBoolean("mapred.reducer.new-api", flag);
1088  }
1089
1090  /**
1091   * Get the value class for job outputs.
1092   * 
1093   * @return the value class for job outputs.
1094   */
1095  public Class<?> getOutputValueClass() {
1096    return getClass(JobContext.OUTPUT_VALUE_CLASS, Text.class, Object.class);
1097  }
1098  
1099  /**
1100   * Set the value class for job outputs.
1101   * 
1102   * @param theClass the value class for job outputs.
1103   */
1104  public void setOutputValueClass(Class<?> theClass) {
1105    setClass(JobContext.OUTPUT_VALUE_CLASS, theClass, Object.class);
1106  }
1107
1108  /**
1109   * Get the {@link Mapper} class for the job.
1110   * 
1111   * @return the {@link Mapper} class for the job.
1112   */
1113  public Class<? extends Mapper> getMapperClass() {
1114    return getClass("mapred.mapper.class", IdentityMapper.class, Mapper.class);
1115  }
1116  
1117  /**
1118   * Set the {@link Mapper} class for the job.
1119   * 
1120   * @param theClass the {@link Mapper} class for the job.
1121   */
1122  public void setMapperClass(Class<? extends Mapper> theClass) {
1123    setClass("mapred.mapper.class", theClass, Mapper.class);
1124  }
1125
1126  /**
1127   * Get the {@link MapRunnable} class for the job.
1128   * 
1129   * @return the {@link MapRunnable} class for the job.
1130   */
1131  public Class<? extends MapRunnable> getMapRunnerClass() {
1132    return getClass("mapred.map.runner.class",
1133                    MapRunner.class, MapRunnable.class);
1134  }
1135  
1136  /**
1137   * Expert: Set the {@link MapRunnable} class for the job.
1138   * 
1139   * Typically used to exert greater control on {@link Mapper}s.
1140   * 
1141   * @param theClass the {@link MapRunnable} class for the job.
1142   */
1143  public void setMapRunnerClass(Class<? extends MapRunnable> theClass) {
1144    setClass("mapred.map.runner.class", theClass, MapRunnable.class);
1145  }
1146
1147  /**
1148   * Get the {@link Partitioner} used to partition {@link Mapper}-outputs 
1149   * to be sent to the {@link Reducer}s.
1150   * 
1151   * @return the {@link Partitioner} used to partition map-outputs.
1152   */
1153  public Class<? extends Partitioner> getPartitionerClass() {
1154    return getClass("mapred.partitioner.class",
1155                    HashPartitioner.class, Partitioner.class);
1156  }
1157  
1158  /**
1159   * Set the {@link Partitioner} class used to partition 
1160   * {@link Mapper}-outputs to be sent to the {@link Reducer}s.
1161   * 
1162   * @param theClass the {@link Partitioner} used to partition map-outputs.
1163   */
1164  public void setPartitionerClass(Class<? extends Partitioner> theClass) {
1165    setClass("mapred.partitioner.class", theClass, Partitioner.class);
1166  }
1167
1168  /**
1169   * Get the {@link Reducer} class for the job.
1170   * 
1171   * @return the {@link Reducer} class for the job.
1172   */
1173  public Class<? extends Reducer> getReducerClass() {
1174    return getClass("mapred.reducer.class",
1175                    IdentityReducer.class, Reducer.class);
1176  }
1177  
1178  /**
1179   * Set the {@link Reducer} class for the job.
1180   * 
1181   * @param theClass the {@link Reducer} class for the job.
1182   */
1183  public void setReducerClass(Class<? extends Reducer> theClass) {
1184    setClass("mapred.reducer.class", theClass, Reducer.class);
1185  }
1186
1187  /**
1188   * Get the user-defined <i>combiner</i> class used to combine map-outputs 
1189   * before being sent to the reducers. Typically the combiner is the same as
1190   * the {@link Reducer} for the job i.e. {@link #getReducerClass()}.
1191   * 
1192   * @return the user-defined combiner class used to combine map-outputs.
1193   */
1194  public Class<? extends Reducer> getCombinerClass() {
1195    return getClass("mapred.combiner.class", null, Reducer.class);
1196  }
1197
1198  /**
1199   * Set the user-defined <i>combiner</i> class used to combine map-outputs 
1200   * before being sent to the reducers. 
1201   * 
1202   * <p>The combiner is an application-specified aggregation operation, which
1203   * can help cut down the amount of data transferred between the 
1204   * {@link Mapper} and the {@link Reducer}, leading to better performance.</p>
1205   * 
1206   * <p>The framework may invoke the combiner 0, 1, or multiple times, in both
1207   * the mapper and reducer tasks. In general, the combiner is called as the
1208   * sort/merge result is written to disk. The combiner must:
1209   * <ul>
1210   *   <li> be side-effect free</li>
1211   *   <li> have the same input and output key types and the same input and 
1212   *        output value types</li>
1213   * </ul>
1214   * 
1215   * <p>Typically the combiner is the same as the <code>Reducer</code> for the  
1216   * job i.e. {@link #setReducerClass(Class)}.</p>
1217   * 
1218   * @param theClass the user-defined combiner class used to combine 
1219   *                 map-outputs.
1220   */
1221  public void setCombinerClass(Class<? extends Reducer> theClass) {
1222    setClass("mapred.combiner.class", theClass, Reducer.class);
1223  }
1224  
1225  /**
1226   * Should speculative execution be used for this job? 
1227   * Defaults to <code>true</code>.
1228   * 
1229   * @return <code>true</code> if speculative execution be used for this job,
1230   *         <code>false</code> otherwise.
1231   */
1232  public boolean getSpeculativeExecution() { 
1233    return (getMapSpeculativeExecution() || getReduceSpeculativeExecution());
1234  }
1235  
1236  /**
1237   * Turn speculative execution on or off for this job. 
1238   * 
1239   * @param speculativeExecution <code>true</code> if speculative execution 
1240   *                             should be turned on, else <code>false</code>.
1241   */
1242  public void setSpeculativeExecution(boolean speculativeExecution) {
1243    setMapSpeculativeExecution(speculativeExecution);
1244    setReduceSpeculativeExecution(speculativeExecution);
1245  }
1246
1247  /**
1248   * Should speculative execution be used for this job for map tasks? 
1249   * Defaults to <code>true</code>.
1250   * 
1251   * @return <code>true</code> if speculative execution be 
1252   *                           used for this job for map tasks,
1253   *         <code>false</code> otherwise.
1254   */
1255  public boolean getMapSpeculativeExecution() { 
1256    return getBoolean(JobContext.MAP_SPECULATIVE, true);
1257  }
1258  
1259  /**
1260   * Turn speculative execution on or off for this job for map tasks. 
1261   * 
1262   * @param speculativeExecution <code>true</code> if speculative execution 
1263   *                             should be turned on for map tasks,
1264   *                             else <code>false</code>.
1265   */
1266  public void setMapSpeculativeExecution(boolean speculativeExecution) {
1267    setBoolean(JobContext.MAP_SPECULATIVE, speculativeExecution);
1268  }
1269
1270  /**
1271   * Should speculative execution be used for this job for reduce tasks? 
1272   * Defaults to <code>true</code>.
1273   * 
1274   * @return <code>true</code> if speculative execution be used 
1275   *                           for reduce tasks for this job,
1276   *         <code>false</code> otherwise.
1277   */
1278  public boolean getReduceSpeculativeExecution() { 
1279    return getBoolean(JobContext.REDUCE_SPECULATIVE, true);
1280  }
1281  
1282  /**
1283   * Turn speculative execution on or off for this job for reduce tasks. 
1284   * 
1285   * @param speculativeExecution <code>true</code> if speculative execution 
1286   *                             should be turned on for reduce tasks,
1287   *                             else <code>false</code>.
1288   */
1289  public void setReduceSpeculativeExecution(boolean speculativeExecution) {
1290    setBoolean(JobContext.REDUCE_SPECULATIVE, 
1291               speculativeExecution);
1292  }
1293
1294  /**
1295   * Get the configured number of map tasks for this job.
1296   * Defaults to <code>1</code>.
1297   * 
1298   * @return the number of map tasks for this job.
1299   */
1300  public int getNumMapTasks() { return getInt(JobContext.NUM_MAPS, 1); }
1301  
1302  /**
1303   * Set the number of map tasks for this job.
1304   * 
1305   * <p><i>Note</i>: This is only a <i>hint</i> to the framework. The actual 
1306   * number of spawned map tasks depends on the number of {@link InputSplit}s 
1307   * generated by the job's {@link InputFormat#getSplits(JobConf, int)}.
1308   *  
1309   * A custom {@link InputFormat} is typically used to accurately control 
1310   * the number of map tasks for the job.</p>
1311   * 
1312   * <b id="NoOfMaps">How many maps?</b>
1313   * 
1314   * <p>The number of maps is usually driven by the total size of the inputs 
1315   * i.e. total number of blocks of the input files.</p>
1316   *  
1317   * <p>The right level of parallelism for maps seems to be around 10-100 maps 
1318   * per-node, although it has been set up to 300 or so for very cpu-light map 
1319   * tasks. Task setup takes a while, so it is best if the maps take at least a 
1320   * minute to execute.</p>
1321   * 
1322   * <p>The default behavior of file-based {@link InputFormat}s is to split the 
1323   * input into <i>logical</i> {@link InputSplit}s based on the total size, in 
1324   * bytes, of input files. However, the {@link FileSystem} blocksize of the 
1325   * input files is treated as an upper bound for input splits. A lower bound 
1326   * on the split size can be set via 
1327   * <a href="{@docRoot}/../mapred-default.html#mapreduce.input.fileinputformat.split.minsize">
1328   * mapreduce.input.fileinputformat.split.minsize</a>.</p>
1329   *  
1330   * <p>Thus, if you expect 10TB of input data and have a blocksize of 128MB, 
1331   * you'll end up with 82,000 maps, unless {@link #setNumMapTasks(int)} is 
1332   * used to set it even higher.</p>
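   * 
   * <p>For example, <code>conf.setLong(
   * "mapreduce.input.fileinputformat.split.minsize", 256 * 1024 * 1024L)</code>
   * raises that lower bound to 256MB (an illustrative sketch).</p>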
1333   * 
1334   * @param n the number of map tasks for this job.
1335   * @see InputFormat#getSplits(JobConf, int)
1336   * @see FileInputFormat
1337   * @see FileSystem#getDefaultBlockSize()
1338   * @see FileStatus#getBlockSize()
1339   */
1340  public void setNumMapTasks(int n) { setInt(JobContext.NUM_MAPS, n); }
1341
1342  /**
1343   * Get the configured number of reduce tasks for this job. Defaults to 
1344   * <code>1</code>.
1345   * 
1346   * @return the number of reduce tasks for this job.
1347   */
1348  public int getNumReduceTasks() { return getInt(JobContext.NUM_REDUCES, 1); }
1349  
1350  /**
1351   * Set the requisite number of reduce tasks for this job.
1352   * 
1353   * <b id="NoOfReduces">How many reduces?</b>
1354   * 
1355   * <p>The right number of reduces seems to be <code>0.95</code> or 
1356   * <code>1.75</code> multiplied by (&lt;<i>no. of nodes</i>&gt; * 
1357   * <a href="{@docRoot}/../mapred-default.html#mapreduce.tasktracker.reduce.tasks.maximum">
1358   * mapreduce.tasktracker.reduce.tasks.maximum</a>).
1359   * </p>
1360   * 
1361   * <p>With <code>0.95</code> all of the reduces can launch immediately and 
1362   * start transferring map outputs as the maps finish. With <code>1.75</code> 
1363   * the faster nodes will finish their first round of reduces and launch a 
1364   * second wave of reduces doing a much better job of load balancing.</p>
1365   * 
1366   * <p>Increasing the number of reduces increases the framework overhead, but 
1367   * improves load balancing and lowers the cost of failures.</p>
1368   * 
1369   * <p>The scaling factors above are slightly less than whole numbers to 
1370   * reserve a few reduce slots in the framework for speculative-tasks, failures
1371   * etc.</p> 
1372   *
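   * <p>For example, on a 100-node cluster with a per-node maximum of 2 reduce
   * slots, the <code>0.95</code> factor suggests <code>0.95 * 100 * 2 = 190</code>
   * reduces (an illustrative calculation).</p>
   * 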
1373   * <b id="ReducerNone">Reducer NONE</b>
1374   * 
1375   * <p>It is legal to set the number of reduce-tasks to <code>zero</code>.</p>
1376   * 
1377   * <p>In this case the output of the map-tasks goes directly to the
1378   * distributed file-system, to the path set by 
1379   * {@link FileOutputFormat#setOutputPath(JobConf, Path)}. Also, the 
1380   * framework doesn't sort the map-outputs before writing them out to HDFS.</p>
1381   * 
1382   * @param n the number of reduce tasks for this job.
1383   */
1384  public void setNumReduceTasks(int n) { setInt(JobContext.NUM_REDUCES, n); }
1385  
1386  /** 
1387   * Get the configured number of maximum attempts that will be made to run a
1388   * map task, as specified by the <code>mapreduce.map.maxattempts</code>
1389   * property. If this property is not already set, the default is 4 attempts.
1390   *  
1391   * @return the max number of attempts per map task.
1392   */
1393  public int getMaxMapAttempts() {
1394    return getInt(JobContext.MAP_MAX_ATTEMPTS, 4);
1395  }
1396  
1397  /** 
1398   * Expert: Set the number of maximum attempts that will be made to run a
1399   * map task.
1400   * 
1401   * @param n the number of attempts per map task.
1402   */
1403  public void setMaxMapAttempts(int n) {
1404    setInt(JobContext.MAP_MAX_ATTEMPTS, n);
1405  }
1406
1407  /** 
1408   * Get the configured number of maximum attempts  that will be made to run a
1409   * reduce task, as specified by the <code>mapreduce.reduce.maxattempts</code>
1410   * property. If this property is not already set, the default is 4 attempts.
1411   * 
1412   * @return the max number of attempts per reduce task.
1413   */
1414  public int getMaxReduceAttempts() {
1415    return getInt(JobContext.REDUCE_MAX_ATTEMPTS, 4);
1416  }
1417  /** 
1418   * Expert: Set the number of maximum attempts that will be made to run a
1419   * reduce task.
1420   * 
1421   * @param n the number of attempts per reduce task.
1422   */
1423  public void setMaxReduceAttempts(int n) {
1424    setInt(JobContext.REDUCE_MAX_ATTEMPTS, n);
1425  }
1426  
1427  /**
1428   * Get the user-specified job name. This is only used to identify the 
1429   * job to the user.
1430   * 
1431   * @return the job's name, defaulting to "".
1432   */
1433  public String getJobName() {
1434    return get(JobContext.JOB_NAME, "");
1435  }
1436  
1437  /**
1438   * Set the user-specified job name.
1439   * 
1440   * @param name the job's new name.
1441   */
1442  public void setJobName(String name) {
1443    set(JobContext.JOB_NAME, name);
1444  }
1445  
1446  /**
1447   * Get the user-specified session identifier. The default is the empty string.
1448   *
1449   * The session identifier is used to tag metric data that is reported to some
1450   * performance metrics system via the org.apache.hadoop.metrics API.  The 
1451   * session identifier is intended, in particular, for use by Hadoop-On-Demand 
1452   * (HOD) which allocates a virtual Hadoop cluster dynamically and transiently. 
1453   * HOD will set the session identifier by modifying the mapred-site.xml file 
1454   * before starting the cluster.
1455   *
1456   * When not running under HOD, this identifier is expected to remain set to 
1457   * the empty string.
1458   *
1459   * @return the session identifier, defaulting to "".
1460   */
1461  @Deprecated
1462  public String getSessionId() {
1463      return get("session.id", "");
1464  }
1465  
1466  /**
1467   * Set the user-specified session identifier.  
1468   *
1469   * @param sessionId the new session id.
1470   */
1471  @Deprecated
1472  public void setSessionId(String sessionId) {
1473      set("session.id", sessionId);
1474  }
1475    
1476  /**
1477   * Set the maximum no. of failures of a given job per tasktracker.
1478   * If the no. of task failures exceeds <code>noFailures</code>, the 
1479   * tasktracker is <i>blacklisted</i> for this job. 
1480   * 
1481   * @param noFailures maximum no. of failures of a given job per tasktracker.
1482   */
1483  public void setMaxTaskFailuresPerTracker(int noFailures) {
1484    setInt(JobContext.MAX_TASK_FAILURES_PER_TRACKER, noFailures);
1485  }
1486  
1487  /**
1488   * Expert: Get the maximum no. of failures of a given job per tasktracker.
1489   * If the no. of task failures exceeds this, the tasktracker is
1490   * <i>blacklisted</i> for this job. 
1491   * 
1492   * @return the maximum no. of failures of a given job per tasktracker.
1493   */
1494  public int getMaxTaskFailuresPerTracker() {
1495    return getInt(JobContext.MAX_TASK_FAILURES_PER_TRACKER, 3);
1496  }
1497
1498  /**
1499   * Get the maximum percentage of map tasks that can fail without 
1500   * the job being aborted. 
1501   * 
1502   * Each map task is executed a minimum of {@link #getMaxMapAttempts()} 
1503   * attempts before being declared as <i>failed</i>.
1504   *  
1505   * Defaults to <code>zero</code>, i.e. <i>any</i> failed map-task results in
1506   * the job being declared as {@link JobStatus#FAILED}.
1507   * 
1508   * @return the maximum percentage of map tasks that can fail without
1509   *         the job being aborted.
1510   */
1511  public int getMaxMapTaskFailuresPercent() {
1512    return getInt(JobContext.MAP_FAILURES_MAX_PERCENT, 0);
1513  }
1514
1515  /**
1516   * Expert: Set the maximum percentage of map tasks that can fail without the
1517   * job being aborted. 
1518   * 
1519   * Each map task is executed a minimum of {@link #getMaxMapAttempts} attempts 
1520   * before being declared as <i>failed</i>.
1521   * 
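   * <p>For example, to let a job succeed even if up to 5% of its map tasks
   * fail (an illustrative sketch):</p>
   * <p><blockquote><pre>
   * conf.setMaxMapTaskFailuresPercent(5);
   * </pre></blockquote>
   * 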
1522   * @param percent the maximum percentage of map tasks that can fail without 
1523   *                the job being aborted.
1524   */
1525  public void setMaxMapTaskFailuresPercent(int percent) {
1526    setInt(JobContext.MAP_FAILURES_MAX_PERCENT, percent);
1527  }
1528  
1529  /**
1530   * Get the maximum percentage of reduce tasks that can fail without 
1531   * the job being aborted. 
1532   * 
1533   * Each reduce task is executed a minimum of {@link #getMaxReduceAttempts()} 
1534   * attempts before being declared as <i>failed</i>.
1535   * 
1536   * Defaults to <code>zero</code>, i.e. <i>any</i> failed reduce-task results 
1537   * in the job being declared as {@link JobStatus#FAILED}.
1538   * 
1539   * @return the maximum percentage of reduce tasks that can fail without
1540   *         the job being aborted.
1541   */
1542  public int getMaxReduceTaskFailuresPercent() {
1543    return getInt(JobContext.REDUCE_FAILURES_MAXPERCENT, 0);
1544  }
1545  
1546  /**
1547   * Set the maximum percentage of reduce tasks that can fail without the job
1548   * being aborted.
1549   * 
1550   * Each reduce task is executed a minimum of {@link #getMaxReduceAttempts()} 
1551   * attempts before being declared as <i>failed</i>.
1552   * 
1553   * @param percent the maximum percentage of reduce tasks that can fail without 
1554   *                the job being aborted.
1555   */
1556  public void setMaxReduceTaskFailuresPercent(int percent) {
1557    setInt(JobContext.REDUCE_FAILURES_MAXPERCENT, percent);
1558  }
1559  
1560  /**
1561   * Set {@link JobPriority} for this job.
1562   * 
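   * <p>For example (illustrative):
   * <code>conf.setJobPriority(JobPriority.HIGH);</code></p>
   * 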
1563   * @param prio the {@link JobPriority} for this job.
1564   */
1565  public void setJobPriority(JobPriority prio) {
1566    set(JobContext.PRIORITY, prio.toString());
1567  }
1568  
1569  /**
1570   * Get the {@link JobPriority} for this job.
1571   * 
1572   * @return the {@link JobPriority} for this job.
1573   */
1574  public JobPriority getJobPriority() {
1575    String prio = get(JobContext.PRIORITY);
1576    if(prio == null) {
1577      return JobPriority.NORMAL;
1578    }
1579    
1580    return JobPriority.valueOf(prio);
1581  }
1582
1583  /**
1584   * Set JobSubmitHostName for this job.
1585   * 
1586   * @param hostname the JobSubmitHostName for this job.
1587   */
1588  void setJobSubmitHostName(String hostname) {
1589    set(MRJobConfig.JOB_SUBMITHOST, hostname);
1590  }
1591  
1592  /**
   * Get the JobSubmitHostName for this job.
1594   * 
1595   * @return the JobSubmitHostName for this job.
1596   */
  String getJobSubmitHostName() {
    return get(MRJobConfig.JOB_SUBMITHOST);
  }
1602
1603  /**
1604   * Set JobSubmitHostAddress for this job.
1605   * 
1606   * @param hostadd the JobSubmitHostAddress for this job.
1607   */
1608  void setJobSubmitHostAddress(String hostadd) {
1609    set(MRJobConfig.JOB_SUBMITHOSTADDR, hostadd);
1610  }
1611  
1612  /**
   * Get the JobSubmitHostAddress for this job.
   * 
   * @return the JobSubmitHostAddress for this job.
   */
  String getJobSubmitHostAddress() {
    return get(MRJobConfig.JOB_SUBMITHOSTADDR);
  }
1622
1623  /**
1624   * Get whether the task profiling is enabled.
1625   * @return true if some tasks will be profiled
1626   */
1627  public boolean getProfileEnabled() {
1628    return getBoolean(JobContext.TASK_PROFILE, false);
1629  }
1630
1631  /**
   * Set whether the system should collect profiler information for some of
   * the tasks in this job. The information is stored in the user log
   * directory.
   * @param newValue true means profiler information should be gathered
1636   */
1637  public void setProfileEnabled(boolean newValue) {
1638    setBoolean(JobContext.TASK_PROFILE, newValue);
1639  }
1640
1641  /**
1642   * Get the profiler configuration arguments.
1643   *
1644   * The default value for this property is
1645   * "-agentlib:hprof=cpu=samples,heap=sites,force=n,thread=y,verbose=n,file=%s"
1646   * 
1647   * @return the parameters to pass to the task child to configure profiling
1648   */
1649  public String getProfileParams() {
1650    return get(JobContext.TASK_PROFILE_PARAMS,
1651        MRJobConfig.DEFAULT_TASK_PROFILE_PARAMS);
1652  }
1653
1654  /**
1655   * Set the profiler configuration arguments. If the string contains a '%s' it
1656   * will be replaced with the name of the profiling output file when the task
1657   * runs.
1658   *
1659   * This value is passed to the task child JVM on the command line.
1660   *
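   * <p>For example, a variant of the default hprof agent options that drops
   * heap profiling (an illustrative sketch):</p>
   * <p><blockquote><pre>
   * conf.setProfileParams(
   *     "-agentlib:hprof=cpu=samples,force=n,thread=y,verbose=n,file=%s");
   * </pre></blockquote>
   *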
1661   * @param value the configuration string
1662   */
1663  public void setProfileParams(String value) {
1664    set(JobContext.TASK_PROFILE_PARAMS, value);
1665  }
1666
1667  /**
1668   * Get the range of maps or reduces to profile.
1669   * @param isMap is the task a map?
1670   * @return the task ranges
1671   */
1672  public IntegerRanges getProfileTaskRange(boolean isMap) {
1673    return getRange((isMap ? JobContext.NUM_MAP_PROFILES : 
1674                       JobContext.NUM_REDUCE_PROFILES), "0-2");
1675  }
1676
1677  /**
1678   * Set the ranges of maps or reduces to profile. setProfileEnabled(true) 
1679   * must also be called.
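   * <p>A typical sequence (an illustrative sketch):</p>
   * <p><blockquote><pre>
   * conf.setProfileEnabled(true);
   * conf.setProfileTaskRange(true, "0-2");   // profile map tasks 0, 1 and 2
   * conf.setProfileTaskRange(false, "0,5");  // profile reduce tasks 0 and 5
   * </pre></blockquote>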
   * @param isMap whether the ranges apply to map tasks (true) or reduce tasks
   * @param newValue a set of integer ranges of the task ids
1681   */
1682  public void setProfileTaskRange(boolean isMap, String newValue) {
    // parse the value to make sure it is legal
    new Configuration.IntegerRanges(newValue);
    set((isMap ? JobContext.NUM_MAP_PROFILES : JobContext.NUM_REDUCE_PROFILES),
        newValue);
1687  }
1688
1689  /**
1690   * Set the debug script to run when the map tasks fail.
1691   * 
1692   * <p>The debug script can aid debugging of failed map tasks. The script is 
1693   * given task's stdout, stderr, syslog, jobconf files as arguments.</p>
1694   * 
1695   * <p>The debug command, run on the node where the map failed, is:</p>
1696   * <p><blockquote><pre>
   * $script $stdout $stderr $syslog $jobconf
1698   * </pre></blockquote>
1699   * 
1700   * <p> The script file is distributed through {@link DistributedCache} 
1701   * APIs. The script needs to be symlinked. </p>
1702   * 
   * <p>Here is an example of how to submit a script:
1704   * <p><blockquote><pre>
1705   * job.setMapDebugScript("./myscript");
1706   * DistributedCache.createSymlink(job);
1707   * DistributedCache.addCacheFile("/debug/scripts/myscript#myscript");
1708   * </pre></blockquote>
1709   * 
1710   * @param mDbgScript the script name
1711   */
1712  public void  setMapDebugScript(String mDbgScript) {
1713    set(JobContext.MAP_DEBUG_SCRIPT, mDbgScript);
1714  }
1715  
1716  /**
1717   * Get the map task's debug script.
1718   * 
1719   * @return the debug Script for the mapred job for failed map tasks.
1720   * @see #setMapDebugScript(String)
1721   */
1722  public String getMapDebugScript() {
1723    return get(JobContext.MAP_DEBUG_SCRIPT);
1724  }
1725  
1726  /**
1727   * Set the debug script to run when the reduce tasks fail.
1728   * 
1729   * <p>The debug script can aid debugging of failed reduce tasks. The script
1730   * is given task's stdout, stderr, syslog, jobconf files as arguments.</p>
1731   * 
   * <p>The debug command, run on the node where the reduce failed, is:</p>
1733   * <p><blockquote><pre>
   * $script $stdout $stderr $syslog $jobconf
1735   * </pre></blockquote>
1736   * 
1737   * <p> The script file is distributed through {@link DistributedCache} 
   * APIs. The script file needs to be symlinked. </p>
1739   * 
   * <p>Here is an example of how to submit a script:
1741   * <p><blockquote><pre>
1742   * job.setReduceDebugScript("./myscript");
1743   * DistributedCache.createSymlink(job);
1744   * DistributedCache.addCacheFile("/debug/scripts/myscript#myscript");
1745   * </pre></blockquote>
1746   * 
1747   * @param rDbgScript the script name
1748   */
1749  public void  setReduceDebugScript(String rDbgScript) {
1750    set(JobContext.REDUCE_DEBUG_SCRIPT, rDbgScript);
1751  }
1752  
1753  /**
   * Get the reduce task's debug script.
1755   * 
1756   * @return the debug script for the mapred job for failed reduce tasks.
1757   * @see #setReduceDebugScript(String)
1758   */
1759  public String getReduceDebugScript() {
1760    return get(JobContext.REDUCE_DEBUG_SCRIPT);
1761  }
1762
1763  /**
   * Get the URI to be invoked in order to send a notification after the job
   * has completed (success/failure).
   * 
   * @return the job end notification URI, <code>null</code> if it hasn't
   *         been set.
1769   * @see #setJobEndNotificationURI(String)
1770   */
1771  public String getJobEndNotificationURI() {
1772    return get(JobContext.MR_JOB_END_NOTIFICATION_URL);
1773  }
1774
1775  /**
   * Set the URI to be invoked in order to send a notification after the job
   * has completed (success/failure).
   * 
   * <p>The URI can contain 2 special parameters: <tt>$jobId</tt> and 
   * <tt>$jobStatus</tt>. Those, if present, are replaced by the job's 
   * identifier and completion-status respectively.</p>
1782   * 
1783   * <p>This is typically used by application-writers to implement chaining of 
1784   * Map-Reduce jobs in an <i>asynchronous manner</i>.</p>
1785   * 
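   * <p>For example (a sketch; the host and path are hypothetical):</p>
   * <p><blockquote><pre>
   * conf.setJobEndNotificationURI(
   *     "http://notifier.example.com/mapred?jobid=$jobId&amp;status=$jobStatus");
   * </pre></blockquote>
   * 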
   * @param uri the job end notification URI
1787   * @see JobStatus
1788   */
1789  public void setJobEndNotificationURI(String uri) {
1790    set(JobContext.MR_JOB_END_NOTIFICATION_URL, uri);
1791  }
1792
1793  /**
   * Get the job-specific shared directory for use as scratch space.
1795   * 
1796   * <p>
1797   * When a job starts, a shared directory is created at location
1798   * <code>
1799   * ${mapreduce.cluster.local.dir}/taskTracker/$user/jobcache/$jobid/work/ </code>.
   * This directory is exposed to the user through the
   * <code>mapreduce.job.local.dir</code> configuration property, so tasks can
   * use it as scratch space and share files among themselves. </p>
   * The value is also available as a system property.
1805   * 
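   * <p>For example, a task might place a shared temporary file there (an
   * illustrative sketch):</p>
   * <p><blockquote><pre>
   * java.io.File scratch = new java.io.File(conf.getJobLocalDir(), "shared.tmp");
   * </pre></blockquote>
   *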
1806   * @return The localized job specific shared directory
1807   */
1808  public String getJobLocalDir() {
1809    return get(JobContext.JOB_LOCAL_DIR);
1810  }
1811
1812  /**
1813   * Get memory required to run a map task of the job, in MB.
1814   * 
1815   * If a value is specified in the configuration, it is returned.
1816   * Else, it returns {@link JobContext#DEFAULT_MAP_MEMORY_MB}.
1817   * <p>
1818   * For backward compatibility, if the job configuration sets the
1819   * key {@link #MAPRED_TASK_MAXVMEM_PROPERTY} to a value different
1820   * from {@link #DISABLED_MEMORY_LIMIT}, that value will be used
1821   * after converting it from bytes to MB.
   * @return memory required to run a map task of the job, in MB.
1823   */
1824  public long getMemoryForMapTask() {
1825    long value = getDeprecatedMemoryValue();
1826    if (value < 0) {
1827      return getMemoryRequired(TaskType.MAP);
1828    }
1829    return value;
1830  }
1831
  /**
   * Set memory required to run a map task of the job, in MB. For
   * compatibility with M/R 1.x applications, the value is also written to the
   * old deprecated property name.
   *
   * @param mem memory required to run a map task of the job, in MB.
   */
  public void setMemoryForMapTask(long mem) {
1833    setLong(JobConf.MAPREDUCE_JOB_MAP_MEMORY_MB_PROPERTY, mem);
1834    // In case that M/R 1.x applications use the old property name
1835    setLong(JobConf.MAPRED_JOB_MAP_MEMORY_MB_PROPERTY, mem);
1836  }
1837
1838  /**
1839   * Get memory required to run a reduce task of the job, in MB.
1840   * 
1841   * If a value is specified in the configuration, it is returned.
1842   * Else, it returns {@link JobContext#DEFAULT_REDUCE_MEMORY_MB}.
1843   * <p>
1844   * For backward compatibility, if the job configuration sets the
1845   * key {@link #MAPRED_TASK_MAXVMEM_PROPERTY} to a value different
1846   * from {@link #DISABLED_MEMORY_LIMIT}, that value will be used
1847   * after converting it from bytes to MB.
1848   * @return memory required to run a reduce task of the job, in MB.
1849   */
1850  public long getMemoryForReduceTask() {
1851    long value = getDeprecatedMemoryValue();
1852    if (value < 0) {
1853      return getMemoryRequired(TaskType.REDUCE);
1854    }
1855    return value;
1856  }
1857  
1858  // Return the value set to the key MAPRED_TASK_MAXVMEM_PROPERTY,
1859  // converted into MBs.
1860  // Returns DISABLED_MEMORY_LIMIT if unset, or set to a negative
1861  // value.
1862  private long getDeprecatedMemoryValue() {
1863    long oldValue = getLong(MAPRED_TASK_MAXVMEM_PROPERTY, 
1864        DISABLED_MEMORY_LIMIT);
1865    if (oldValue > 0) {
1866      oldValue /= (1024*1024);
1867    }
1868    return oldValue;
1869  }
1870
  /**
   * Set memory required to run a reduce task of the job, in MB. For
   * compatibility with M/R 1.x applications, the value is also written to the
   * old deprecated property name.
   *
   * @param mem memory required to run a reduce task of the job, in MB.
   */
  public void setMemoryForReduceTask(long mem) {
1872    setLong(JobConf.MAPREDUCE_JOB_REDUCE_MEMORY_MB_PROPERTY, mem);
1873    // In case that M/R 1.x applications use the old property name
1874    setLong(JobConf.MAPRED_JOB_REDUCE_MEMORY_MB_PROPERTY, mem);
1875  }
1876
1877  /**
1878   * Return the name of the queue to which this job is submitted.
1879   * Defaults to 'default'.
1880   * 
1881   * @return name of the queue
1882   */
1883  public String getQueueName() {
1884    return get(JobContext.QUEUE_NAME, DEFAULT_QUEUE_NAME);
1885  }
1886  
1887  /**
1888   * Set the name of the queue to which this job should be submitted.
1889   * 
1890   * @param queueName Name of the queue
1891   */
1892  public void setQueueName(String queueName) {
1893    set(JobContext.QUEUE_NAME, queueName);
1894  }
1895  
1896  /**
   * Normalize a negative memory configuration value to
   * {@link #DISABLED_MEMORY_LIMIT}.
   * 
   * @param val the configured memory value
   * @return the normalized value
1901   */
1902  public static long normalizeMemoryConfigValue(long val) {
1903    if (val < 0) {
1904      val = DISABLED_MEMORY_LIMIT;
1905    }
1906    return val;
1907  }
1908
1909  /** 
1910   * Find a jar that contains a class of the same name, if any.
1911   * It will return a jar file, even if that is not the first thing
1912   * on the class path that has a class with the same name.
1913   * 
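   * For example, <code>JobConf.findContainingJar(MyMapper.class)</code> (where
   * <code>MyMapper</code> is any user class loaded from a jar) returns the
   * path of that jar, or <code>null</code> if the class did not come from one.
   * 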
1914   * @param my_class the class to find.
1915   * @return a jar file that contains the class, or null.
1916   */
1917  public static String findContainingJar(Class my_class) {
1918    return ClassUtil.findContainingJar(my_class);
1919  }
1920
1921  /**
1922   * Get the memory required to run a task of this job, in bytes. See
1923   * {@link #MAPRED_TASK_MAXVMEM_PROPERTY}
1924   * <p>
1925   * This method is deprecated. Now, different memory limits can be
1926   * set for map and reduce tasks of a job, in MB. 
1927   * <p>
1928   * For backward compatibility, if the job configuration sets the
1929   * key {@link #MAPRED_TASK_MAXVMEM_PROPERTY}, that value is returned. 
1930   * Otherwise, this method will return the larger of the values returned by 
1931   * {@link #getMemoryForMapTask()} and {@link #getMemoryForReduceTask()}
1932   * after converting them into bytes.
1933   *
1934   * @return Memory required to run a task of this job, in bytes.
1935   * @see #setMaxVirtualMemoryForTask(long)
1936   * @deprecated Use {@link #getMemoryForMapTask()} and
1937   *             {@link #getMemoryForReduceTask()}
1938   */
1939  @Deprecated
1940  public long getMaxVirtualMemoryForTask() {
1941    LOG.warn(
1942      "getMaxVirtualMemoryForTask() is deprecated. " +
1943      "Instead use getMemoryForMapTask() and getMemoryForReduceTask()");
1944
1945    long value = getLong(MAPRED_TASK_MAXVMEM_PROPERTY,
1946        Math.max(getMemoryForMapTask(), getMemoryForReduceTask()) * 1024 * 1024);
1947    return value;
1948  }
1949
1950  /**
1951   * Set the maximum amount of memory any task of this job can use. See
1952   * {@link #MAPRED_TASK_MAXVMEM_PROPERTY}
1953   * <p>
   * <code>mapred.task.maxvmem</code> is split into
   * <code>mapreduce.map.memory.mb</code> and
   * <code>mapreduce.reduce.memory.mb</code>; each of the new keys is set to
   * <code>mapred.task.maxvmem / (1024 * 1024)</code>, since the new values
   * are in MB while the old one was in bytes.
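   *
   * For example, <code>setMaxVirtualMemoryForTask(2L * 1024 * 1024 * 1024)</code>
   * (2 GB expressed in bytes) sets both the map and the reduce task memory to
   * 2048 MB, provided <code>mapred.task.maxvmem</code> itself is unset.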
1960   *
1961   * @param vmem Maximum amount of virtual memory in bytes any task of this job
1962   *             can use.
1963   * @see #getMaxVirtualMemoryForTask()
1964   * @deprecated
   *  Use {@link #setMemoryForMapTask(long mem)} and
   *  {@link #setMemoryForReduceTask(long mem)}
1967   */
1968  @Deprecated
1969  public void setMaxVirtualMemoryForTask(long vmem) {
    LOG.warn("setMaxVirtualMemoryForTask() is deprecated. " +
        "Instead use setMemoryForMapTask() and setMemoryForReduceTask()");
1972    if (vmem < 0) {
1973      throw new IllegalArgumentException("Task memory allocation may not be < 0");
1974    }
1975
    if (get(JobConf.MAPRED_TASK_MAXVMEM_PROPERTY) == null) {
      setMemoryForMapTask(vmem / (1024 * 1024));    // converting bytes to MB
      setMemoryForReduceTask(vmem / (1024 * 1024)); // converting bytes to MB
    } else {
      this.setLong(JobConf.MAPRED_TASK_MAXVMEM_PROPERTY, vmem);
    }
1982  }
1983
1984  /**
   * @deprecated this method is deprecated and no longer in use; it always
   *             returns -1.
1986   */
1987  @Deprecated
1988  public long getMaxPhysicalMemoryForTask() {
1989    LOG.warn("The API getMaxPhysicalMemoryForTask() is deprecated."
1990              + " Refer to the APIs getMemoryForMapTask() and"
1991              + " getMemoryForReduceTask() for details.");
1992    return -1;
1993  }
1994
  /**
   * @deprecated this method is deprecated and no longer in use; the value
   *             passed is ignored.
   */
1998  @Deprecated
1999  public void setMaxPhysicalMemoryForTask(long mem) {
    LOG.warn("The API setMaxPhysicalMemoryForTask() is deprecated."
        + " The value set is ignored. Refer to"
        + " setMemoryForMapTask() and setMemoryForReduceTask() for details.");
2003  }
2004
2005  static String deprecatedString(String key) {
2006    return "The variable " + key + " is no longer used.";
2007  }
2008
2009  private void checkAndWarnDeprecation() {
    if (get(JobConf.MAPRED_TASK_MAXVMEM_PROPERTY) != null) {
      LOG.warn(JobConf.deprecatedString(JobConf.MAPRED_TASK_MAXVMEM_PROPERTY)
                + " Instead use " + JobConf.MAPREDUCE_JOB_MAP_MEMORY_MB_PROPERTY
                + " and " + JobConf.MAPREDUCE_JOB_REDUCE_MEMORY_MB_PROPERTY);
    }
    if (get(JobConf.MAPRED_TASK_ULIMIT) != null) {
      LOG.warn(JobConf.deprecatedString(JobConf.MAPRED_TASK_ULIMIT));
    }
    if (get(JobConf.MAPRED_MAP_TASK_ULIMIT) != null) {
      LOG.warn(JobConf.deprecatedString(JobConf.MAPRED_MAP_TASK_ULIMIT));
    }
    if (get(JobConf.MAPRED_REDUCE_TASK_ULIMIT) != null) {
      LOG.warn(JobConf.deprecatedString(JobConf.MAPRED_REDUCE_TASK_ULIMIT));
    }
2024  }
2025
  // Concatenate the admin and user JVM options configured for the given task
  // type, admin options first. (The values are java opts, not classpaths.)
  private String getConfiguredTaskJavaOpts(TaskType taskType) {
    String userOpts = "";
    String adminOpts = "";
    if (taskType == TaskType.MAP) {
      userOpts = get(MAPRED_MAP_TASK_JAVA_OPTS,
          get(MAPRED_TASK_JAVA_OPTS, DEFAULT_MAPRED_TASK_JAVA_OPTS));
      adminOpts = get(MRJobConfig.MAPRED_MAP_ADMIN_JAVA_OPTS,
          MRJobConfig.DEFAULT_MAPRED_ADMIN_JAVA_OPTS);
    } else {
      userOpts = get(MAPRED_REDUCE_TASK_JAVA_OPTS,
          get(MAPRED_TASK_JAVA_OPTS, DEFAULT_MAPRED_TASK_JAVA_OPTS));
      adminOpts = get(MRJobConfig.MAPRED_REDUCE_ADMIN_JAVA_OPTS,
          MRJobConfig.DEFAULT_MAPRED_ADMIN_JAVA_OPTS);
    }

    return adminOpts + " " + userOpts;
  }
2043
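  // Worked example for getTaskJavaOpts below (an illustrative sketch,
  // assuming the default heap ratio of 0.8): if neither the user nor the
  // admin opts contain -Xmx, a 2048 MB map container yields an appended
  // "-Xmx1639m", since ceil(2048 * 0.8) = 1639.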
2044  @Private
2045  public String getTaskJavaOpts(TaskType taskType) {
2046    String javaOpts = getConfiguredTaskJavaOpts(taskType);
2047
2048    if (!javaOpts.contains("-Xmx")) {
2049      float heapRatio = getFloat(MRJobConfig.HEAP_MEMORY_MB_RATIO,
2050          MRJobConfig.DEFAULT_HEAP_MEMORY_MB_RATIO);
2051
2052      if (heapRatio > 1.0f || heapRatio < 0) {
2053        LOG.warn("Invalid value for " + MRJobConfig.HEAP_MEMORY_MB_RATIO
2054            + ", using the default.");
2055        heapRatio = MRJobConfig.DEFAULT_HEAP_MEMORY_MB_RATIO;
2056      }
2057
2058      int taskContainerMb = getMemoryRequired(taskType);
2059      int taskHeapSize = (int)Math.ceil(taskContainerMb * heapRatio);
2060
2061      String xmxArg = String.format("-Xmx%dm", taskHeapSize);
2062      LOG.info("Task java-opts do not specify heap size. Setting task attempt" +
2063          " jvm max heap size to " + xmxArg);
2064
2065      javaOpts += " " + xmxArg;
2066    }
2067
2068    return javaOpts;
2069  }
2070
2071  /**
   * Parse the maximum heap size from the java opts, as specified by the -Xmx option.
2073   * Format: -Xmx&lt;size&gt;[g|G|m|M|k|K]
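   * For example, "-Xmx2g" parses to 2048, "-Xmx512m" to 512, and a plain
   * byte count such as "-Xmx1073741824" to 1024; a string without -Xmx
   * yields -1.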
2074   * @param javaOpts String to parse to read maximum heap size
2075   * @return Maximum heap size in MB or -1 if not specified
2076   */
2077  @Private
2078  @VisibleForTesting
2079  public static int parseMaximumHeapSizeMB(String javaOpts) {
2080    // Find the last matching -Xmx following word boundaries
2081    Matcher m = JAVA_OPTS_XMX_PATTERN.matcher(javaOpts);
2082    if (m.matches()) {
2083      long size = Long.parseLong(m.group(1));
2084      if (size <= 0) {
2085        return -1;
2086      }
2087      if (m.group(2).isEmpty()) {
2088        // -Xmx specified in bytes
2089        return (int) (size / (1024 * 1024));
2090      }
2091      char unit = m.group(2).charAt(0);
2092      switch (unit) {
2093        case 'g':
2094        case 'G':
2095          // -Xmx specified in GB
2096          return (int) (size * 1024);
2097        case 'm':
2098        case 'M':
2099          // -Xmx specified in MB
2100          return (int) size;
2101        case 'k':
2102        case 'K':
2103          // -Xmx specified in KB
2104          return (int) (size / 1024);
2105      }
2106    }
2107    // -Xmx not specified
2108    return -1;
2109  }
2110
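  // Illustrative arithmetic for the helper below: with the memory key unset
  // and a parsed heap size of 1638 MB at a heap ratio of 0.8, the container
  // memory is derived as ceil(1638 / 0.8) = 2048 MB.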
2111  private int getMemoryRequiredHelper(
2112      String configName, int defaultValue, int heapSize, float heapRatio) {
2113    int memory = getInt(configName, -1);
2114    if (memory <= 0) {
2115      if (heapSize > 0) {
2116        memory = (int) Math.ceil(heapSize / heapRatio);
        LOG.info("Derived value for " + configName + " from javaOpts");
2118      } else {
2119        memory = defaultValue;
2120      }
2121    }
2122
2123    return memory;
2124  }
2125
2126  @Private
2127  public int getMemoryRequired(TaskType taskType) {
2128    int memory = 1024;
2129    int heapSize = parseMaximumHeapSizeMB(getConfiguredTaskJavaOpts(taskType));
2130    float heapRatio = getFloat(MRJobConfig.HEAP_MEMORY_MB_RATIO,
2131        MRJobConfig.DEFAULT_HEAP_MEMORY_MB_RATIO);
2132    if (taskType == TaskType.MAP) {
2133      return getMemoryRequiredHelper(MRJobConfig.MAP_MEMORY_MB,
2134          MRJobConfig.DEFAULT_MAP_MEMORY_MB, heapSize, heapRatio);
2135    } else if (taskType == TaskType.REDUCE) {
2136      return getMemoryRequiredHelper(MRJobConfig.REDUCE_MEMORY_MB,
2137          MRJobConfig.DEFAULT_REDUCE_MEMORY_MB, heapSize, heapRatio);
2138    } else {
2139      return memory;
2140    }
2141  }
2142
  /** For debugging: dump this configuration to standard output in XML format. */
2144  public static void main(String[] args) throws Exception {
2145    new JobConf(new Configuration()).writeXml(System.out);
2146  }
2147
2148}
2149