001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018 019package org.apache.hadoop.mapred; 020 021 022import java.io.IOException; 023import java.util.regex.Matcher; 024import java.util.regex.Pattern; 025 026import com.google.common.annotations.VisibleForTesting; 027import org.apache.commons.logging.Log; 028import org.apache.commons.logging.LogFactory; 029import org.apache.hadoop.classification.InterfaceAudience; 030import org.apache.hadoop.classification.InterfaceAudience.Private; 031import org.apache.hadoop.classification.InterfaceStability; 032import org.apache.hadoop.conf.Configuration; 033import org.apache.hadoop.fs.FileStatus; 034import org.apache.hadoop.fs.FileSystem; 035import org.apache.hadoop.fs.Path; 036import org.apache.hadoop.io.LongWritable; 037import org.apache.hadoop.io.RawComparator; 038import org.apache.hadoop.io.Text; 039import org.apache.hadoop.io.WritableComparable; 040import org.apache.hadoop.io.WritableComparator; 041import org.apache.hadoop.io.compress.CompressionCodec; 042import org.apache.hadoop.mapred.lib.HashPartitioner; 043import org.apache.hadoop.mapred.lib.IdentityMapper; 044import org.apache.hadoop.mapred.lib.IdentityReducer; 045import 
org.apache.hadoop.mapred.lib.KeyFieldBasedComparator; 046import org.apache.hadoop.mapred.lib.KeyFieldBasedPartitioner; 047import org.apache.hadoop.mapreduce.MRConfig; 048import org.apache.hadoop.mapreduce.MRJobConfig; 049import org.apache.hadoop.mapreduce.TaskType; 050import org.apache.hadoop.mapreduce.filecache.DistributedCache; 051import org.apache.hadoop.mapreduce.util.ConfigUtil; 052import org.apache.hadoop.security.Credentials; 053import org.apache.hadoop.util.ClassUtil; 054import org.apache.hadoop.util.ReflectionUtils; 055import org.apache.hadoop.util.Tool; 056import org.apache.log4j.Level; 057 058/** 059 * A map/reduce job configuration. 060 * 061 * <p><code>JobConf</code> is the primary interface for a user to describe a 062 * map-reduce job to the Hadoop framework for execution. The framework tries to 063 * faithfully execute the job as-is described by <code>JobConf</code>, however: 064 * <ol> 065 * <li> 066 * Some configuration parameters might have been marked as 067 * <a href="{@docRoot}/org/apache/hadoop/conf/Configuration.html#FinalParams"> 068 * final</a> by administrators and hence cannot be altered. 069 * </li> 070 * <li> 071 * While some job parameters are straight-forward to set 072 * (e.g. {@link #setNumReduceTasks(int)}), some parameters interact subtly 073 * with the rest of the framework and/or job-configuration and is relatively 074 * more complex for the user to control finely 075 * (e.g. {@link #setNumMapTasks(int)}). 076 * </li> 077 * </ol> 078 * 079 * <p><code>JobConf</code> typically specifies the {@link Mapper}, combiner 080 * (if any), {@link Partitioner}, {@link Reducer}, {@link InputFormat} and 081 * {@link OutputFormat} implementations to be used etc. 
*
 * <p>Optionally <code>JobConf</code> is used to specify other advanced facets
 * of the job such as <code>Comparator</code>s to be used, files to be put in
 * the {@link DistributedCache}, whether or not intermediate and/or job outputs
 * are to be compressed (and how), and debuggability via user-provided scripts
 * ( {@link #setMapDebugScript(String)}/{@link #setReduceDebugScript(String)})
 * for doing post-processing on task logs, the task's stdout, stderr, syslog,
 * etc.</p>
 *
 * <p>Here is an example of how to configure a job via <code>JobConf</code>:</p>
 * <p><blockquote><pre>
 *     // Create a new JobConf
 *     JobConf job = new JobConf(new Configuration(), MyJob.class);
 *
 *     // Specify various job-specific parameters
 *     job.setJobName("myjob");
 *
 *     FileInputFormat.setInputPaths(job, new Path("in"));
 *     FileOutputFormat.setOutputPath(job, new Path("out"));
 *
 *     job.setMapperClass(MyJob.MyMapper.class);
 *     job.setCombinerClass(MyJob.MyReducer.class);
 *     job.setReducerClass(MyJob.MyReducer.class);
 *
 *     job.setInputFormat(SequenceFileInputFormat.class);
 *     job.setOutputFormat(SequenceFileOutputFormat.class);
 * </pre></blockquote>
 *
 * @see JobClient
 * @see ClusterStatus
 * @see Tool
 * @see DistributedCache
 */
@InterfaceAudience.Public
@InterfaceStability.Stable
public class JobConf extends Configuration {

  private static final Log LOG = LogFactory.getLog(JobConf.class);

  /**
   * Matches a java-opts string that contains a whitespace-delimited
   * <code>-Xmx</code> flag. Group 1 captures the numeric heap size; group 2
   * captures the optional unit suffix (g, m or k in either case, or empty
   * when no suffix is given).
   */
  private static final Pattern JAVA_OPTS_XMX_PATTERN =
          Pattern.compile(".*(?:^|\\s)-Xmx(\\d+)([gGmMkK]?)(?:$|\\s).*");

  // Runs once, when the JobConf class is initialized.
  static{
    ConfigUtil.loadResources();
  }

  /**
   * Legacy property key <code>mapred.task.maxvmem</code>.
   *
   * @deprecated Use {@link #MAPREDUCE_JOB_MAP_MEMORY_MB_PROPERTY} and
   * {@link #MAPREDUCE_JOB_REDUCE_MEMORY_MB_PROPERTY}
   */
  @Deprecated
  public static final String MAPRED_TASK_MAXVMEM_PROPERTY =
    "mapred.task.maxvmem";

  /**
   * Legacy property key <code>mapred.task.limit.maxvmem</code>.
   *
   * @deprecated No replacement property is documented.
   */
@Deprecated
  public static final String UPPER_LIMIT_ON_TASK_VMEM_PROPERTY =
    "mapred.task.limit.maxvmem";

  /**
   * Legacy property key <code>mapred.task.default.maxvmem</code>.
   *
   * @deprecated No replacement property is documented.
   */
  @Deprecated
  public static final String MAPRED_TASK_DEFAULT_MAXVMEM_PROPERTY =
    "mapred.task.default.maxvmem";

  /**
   * Legacy property key <code>mapred.task.maxpmem</code>.
   *
   * @deprecated No replacement property is documented.
   */
  @Deprecated
  public static final String MAPRED_TASK_MAXPMEM_PROPERTY =
    "mapred.task.maxpmem";

  /**
   * A value which, if set for memory-related configuration options,
   * indicates that the options are turned off.
   *
   * @deprecated it makes no sense in the context of MR2.
   */
  @Deprecated
  public static final long DISABLED_MEMORY_LIMIT = -1L;

  /**
   * Property name for the configuration property
   * <code>mapreduce.cluster.local.dir</code>.
   */
  public static final String MAPRED_LOCAL_DIR_PROPERTY = MRConfig.LOCAL_DIR;

  /**
   * Name of the queue to which jobs will be submitted if no queue
   * name is specified.
   */
  public static final String DEFAULT_QUEUE_NAME = "default";

  /**
   * M/R 2.x property key for map-task memory; supersedes
   * {@link #MAPRED_JOB_MAP_MEMORY_MB_PROPERTY}.
   */
  static final String MAPREDUCE_JOB_MAP_MEMORY_MB_PROPERTY =
      JobContext.MAP_MEMORY_MB;

  /**
   * M/R 2.x property key for reduce-task memory; supersedes
   * {@link #MAPRED_JOB_REDUCE_MEMORY_MB_PROPERTY}.
   */
  static final String MAPREDUCE_JOB_REDUCE_MEMORY_MB_PROPERTY =
      JobContext.REDUCE_MEMORY_MB;

  /**
   * Legacy property key <code>mapred.job.map.memory.mb</code>.
   *
   * @deprecated Kept for M/R 1.x applications; M/R 2.x applications
   * should use {@link #MAPREDUCE_JOB_MAP_MEMORY_MB_PROPERTY} instead.
   */
  @Deprecated
  public static final String MAPRED_JOB_MAP_MEMORY_MB_PROPERTY =
      "mapred.job.map.memory.mb";

  /**
   * Legacy property key <code>mapred.job.reduce.memory.mb</code>.
   *
   * @deprecated Kept for M/R 1.x applications; M/R 2.x applications
   * should use {@link #MAPREDUCE_JOB_REDUCE_MEMORY_MB_PROPERTY} instead.
   */
  @Deprecated
  public static final String MAPRED_JOB_REDUCE_MEMORY_MB_PROPERTY =
      "mapred.job.reduce.memory.mb";

  /**
   * Pattern for the default unpacking behavior for job jars: selects the
   * <code>classes/</code> and <code>lib/</code> entries.
   */
  public static final Pattern UNPACK_JAR_PATTERN_DEFAULT =
    Pattern.compile("(?:classes/|lib/).*");

  /**
   * Configuration key to set the java command line options for the child
   * map and reduce tasks.
   *
   * Java opts for the task tracker child processes.
   * The following symbol, if present, will be interpolated: @taskid@.
   * It is replaced by the current TaskID. Any other occurrences of '@' are
   * left unchanged.
   * For example, to enable verbose gc logging to a file named for the taskid in
   * /tmp and to set the heap maximum to be a gigabyte, pass a 'value' of:
   *          -Xmx1024m -verbose:gc -Xloggc:/tmp/@taskid@.gc
   *
   * The configuration variable {@link #MAPRED_TASK_ENV} can be used to pass
   * other environment variables to the child processes.
   *
   * @deprecated Use {@link #MAPRED_MAP_TASK_JAVA_OPTS} or
   *                 {@link #MAPRED_REDUCE_TASK_JAVA_OPTS}
   */
  @Deprecated
  public static final String MAPRED_TASK_JAVA_OPTS = "mapred.child.java.opts";

  /**
   * Configuration key to set the java command line options for the map tasks.
   *
   * Java opts for the task tracker child map processes.
   * The following symbol, if present, will be interpolated: @taskid@.
   * It is replaced by the current TaskID. Any other occurrences of '@' are
   * left unchanged.
   * For example, to enable verbose gc logging to a file named for the taskid in
   * /tmp and to set the heap maximum to be a gigabyte, pass a 'value' of:
   *          -Xmx1024m -verbose:gc -Xloggc:/tmp/@taskid@.gc
   *
   * The configuration variable {@link #MAPRED_MAP_TASK_ENV} can be used to pass
   * other environment variables to the map processes.
   */
  public static final String MAPRED_MAP_TASK_JAVA_OPTS =
    JobContext.MAP_JAVA_OPTS;

  /**
   * Configuration key to set the java command line options for the reduce tasks.
   *
   * Java opts for the task tracker child reduce processes.
   * The following symbol, if present, will be interpolated: @taskid@.
   * It is replaced by the current TaskID. Any other occurrences of '@' are
   * left unchanged.
* For example, to enable verbose gc logging to a file named for the taskid in
   * /tmp and to set the heap maximum to be a gigabyte, pass a 'value' of:
   *          -Xmx1024m -verbose:gc -Xloggc:/tmp/@taskid@.gc
   *
   * The configuration variable {@link #MAPRED_REDUCE_TASK_ENV} can be used to
   * pass process environment variables to the reduce processes.
   */
  public static final String MAPRED_REDUCE_TASK_JAVA_OPTS =
    JobContext.REDUCE_JAVA_OPTS;

  /** Default java command line options for tasks: the empty string. */
  public static final String DEFAULT_MAPRED_TASK_JAVA_OPTS = "";

  /**
   * Configuration key to set the maximum virtual memory available to the child
   * map and reduce tasks (in kilo-bytes).
   *
   * @deprecated This has been deprecated and will no longer have any effect.
   */
  @Deprecated
  public static final String MAPRED_TASK_ULIMIT = "mapred.child.ulimit";

  /**
   * Configuration key to set the maximum virtual memory available to the
   * map tasks (in kilo-bytes).
   *
   * @deprecated This has been deprecated and will no longer have any effect.
   */
  @Deprecated
  public static final String MAPRED_MAP_TASK_ULIMIT = "mapreduce.map.ulimit";

  /**
   * Configuration key to set the maximum virtual memory available to the
   * reduce tasks (in kilo-bytes).
   *
   * @deprecated This has been deprecated and will no longer have any effect.
   */
  @Deprecated
  public static final String MAPRED_REDUCE_TASK_ULIMIT =
    "mapreduce.reduce.ulimit";


  /**
   * Configuration key to set the environment of the child map/reduce tasks.
   *
   * The format of the value is <code>k1=v1,k2=v2</code>. Further it can
   * reference existing environment variables via <code>$key</code> on
   * Linux or <code>%key%</code> on Windows.
   *
   * Example:
   * <ul>
   *   <li> A=foo - This will set the env variable A to foo. </li>
   *   <li> B=$X:c - This will inherit the tasktracker's X env variable on Linux. </li>
   *   <li> B=%X%;c - This will inherit the tasktracker's X env variable on Windows. </li>
   * </ul>
   *
   * @deprecated Use {@link #MAPRED_MAP_TASK_ENV} or
   *                 {@link #MAPRED_REDUCE_TASK_ENV}
   */
  @Deprecated
  public static final String MAPRED_TASK_ENV = "mapred.child.env";

  /**
   * Configuration key to set the environment of the child map tasks.
   *
   * The format of the value is <code>k1=v1,k2=v2</code>. Further it can
   * reference existing environment variables via <code>$key</code> on
   * Linux or <code>%key%</code> on Windows.
   *
   * Example:
   * <ul>
   *   <li> A=foo - This will set the env variable A to foo. </li>
   *   <li> B=$X:c - This will inherit the tasktracker's X env variable on Linux. </li>
   *   <li> B=%X%;c - This will inherit the tasktracker's X env variable on Windows. </li>
   * </ul>
   */
  public static final String MAPRED_MAP_TASK_ENV = JobContext.MAP_ENV;

  /**
   * Configuration key to set the environment of the child reduce tasks.
   *
   * The format of the value is <code>k1=v1,k2=v2</code>. Further it can
   * reference existing environment variables via <code>$key</code> on
   * Linux or <code>%key%</code> on Windows.
   *
   * Example:
   * <ul>
   *   <li> A=foo - This will set the env variable A to foo. </li>
   *   <li> B=$X:c - This will inherit the tasktracker's X env variable on Linux. </li>
   *   <li> B=%X%;c - This will inherit the tasktracker's X env variable on Windows. </li>
   * </ul>
   */
  public static final String MAPRED_REDUCE_TASK_ENV = JobContext.REDUCE_ENV;

  // Credentials for this job. Starts as a fresh, empty instance. The
  // JobConf(Configuration) constructor replaces it with the source JobConf's
  // instance (shared by reference, not copied) when copying from a JobConf.
  private Credentials credentials = new Credentials();

  /**
   * Configuration key to set the logging {@link Level} for the map task.
   *
   * The allowed logging levels are:
   * OFF, FATAL, ERROR, WARN, INFO, DEBUG, TRACE and ALL.
   */
  public static final String MAPRED_MAP_TASK_LOG_LEVEL =
    JobContext.MAP_LOG_LEVEL;

  /**
   * Configuration key to set the logging {@link Level} for the reduce task.
352 * 353 * The allowed logging levels are: 354 * OFF, FATAL, ERROR, WARN, INFO, DEBUG, TRACE and ALL. 355 */ 356 public static final String MAPRED_REDUCE_TASK_LOG_LEVEL = 357 JobContext.REDUCE_LOG_LEVEL; 358 359 /** 360 * Default logging level for map/reduce tasks. 361 */ 362 public static final Level DEFAULT_LOG_LEVEL = Level.INFO; 363 364 /** 365 * The variable is kept for M/R 1.x applications, M/R 2.x applications should 366 * use {@link MRJobConfig#WORKFLOW_ID} instead 367 */ 368 @Deprecated 369 public static final String WORKFLOW_ID = MRJobConfig.WORKFLOW_ID; 370 371 /** 372 * The variable is kept for M/R 1.x applications, M/R 2.x applications should 373 * use {@link MRJobConfig#WORKFLOW_NAME} instead 374 */ 375 @Deprecated 376 public static final String WORKFLOW_NAME = MRJobConfig.WORKFLOW_NAME; 377 378 /** 379 * The variable is kept for M/R 1.x applications, M/R 2.x applications should 380 * use {@link MRJobConfig#WORKFLOW_NODE_NAME} instead 381 */ 382 @Deprecated 383 public static final String WORKFLOW_NODE_NAME = 384 MRJobConfig.WORKFLOW_NODE_NAME; 385 386 /** 387 * The variable is kept for M/R 1.x applications, M/R 2.x applications should 388 * use {@link MRJobConfig#WORKFLOW_ADJACENCY_PREFIX_STRING} instead 389 */ 390 @Deprecated 391 public static final String WORKFLOW_ADJACENCY_PREFIX_STRING = 392 MRJobConfig.WORKFLOW_ADJACENCY_PREFIX_STRING; 393 394 /** 395 * The variable is kept for M/R 1.x applications, M/R 2.x applications should 396 * use {@link MRJobConfig#WORKFLOW_ADJACENCY_PREFIX_PATTERN} instead 397 */ 398 @Deprecated 399 public static final String WORKFLOW_ADJACENCY_PREFIX_PATTERN = 400 MRJobConfig.WORKFLOW_ADJACENCY_PREFIX_PATTERN; 401 402 /** 403 * The variable is kept for M/R 1.x applications, M/R 2.x applications should 404 * use {@link MRJobConfig#WORKFLOW_TAGS} instead 405 */ 406 @Deprecated 407 public static final String WORKFLOW_TAGS = MRJobConfig.WORKFLOW_TAGS; 408 409 /** 410 * The variable is kept for M/R 1.x applications, M/R 2.x 
applications should 411 * not use it 412 */ 413 @Deprecated 414 public static final String MAPREDUCE_RECOVER_JOB = 415 "mapreduce.job.restart.recover"; 416 417 /** 418 * The variable is kept for M/R 1.x applications, M/R 2.x applications should 419 * not use it 420 */ 421 @Deprecated 422 public static final boolean DEFAULT_MAPREDUCE_RECOVER_JOB = true; 423 424 /** 425 * Construct a map/reduce job configuration. 426 */ 427 public JobConf() { 428 checkAndWarnDeprecation(); 429 } 430 431 /** 432 * Construct a map/reduce job configuration. 433 * 434 * @param exampleClass a class whose containing jar is used as the job's jar. 435 */ 436 public JobConf(Class exampleClass) { 437 setJarByClass(exampleClass); 438 checkAndWarnDeprecation(); 439 } 440 441 /** 442 * Construct a map/reduce job configuration. 443 * 444 * @param conf a Configuration whose settings will be inherited. 445 */ 446 public JobConf(Configuration conf) { 447 super(conf); 448 449 if (conf instanceof JobConf) { 450 JobConf that = (JobConf)conf; 451 credentials = that.credentials; 452 } 453 454 checkAndWarnDeprecation(); 455 } 456 457 458 /** Construct a map/reduce job configuration. 459 * 460 * @param conf a Configuration whose settings will be inherited. 461 * @param exampleClass a class whose containing jar is used as the job's jar. 462 */ 463 public JobConf(Configuration conf, Class exampleClass) { 464 this(conf); 465 setJarByClass(exampleClass); 466 } 467 468 469 /** Construct a map/reduce configuration. 470 * 471 * @param config a Configuration-format XML job description file. 472 */ 473 public JobConf(String config) { 474 this(new Path(config)); 475 } 476 477 /** Construct a map/reduce configuration. 478 * 479 * @param config a Configuration-format XML job description file. 
480 */ 481 public JobConf(Path config) { 482 super(); 483 addResource(config); 484 checkAndWarnDeprecation(); 485 } 486 487 /** A new map/reduce configuration where the behavior of reading from the 488 * default resources can be turned off. 489 * <p> 490 * If the parameter {@code loadDefaults} is false, the new instance 491 * will not load resources from the default files. 492 * 493 * @param loadDefaults specifies whether to load from the default files 494 */ 495 public JobConf(boolean loadDefaults) { 496 super(loadDefaults); 497 checkAndWarnDeprecation(); 498 } 499 500 /** 501 * Get credentials for the job. 502 * @return credentials for the job 503 */ 504 public Credentials getCredentials() { 505 return credentials; 506 } 507 508 @Private 509 public void setCredentials(Credentials credentials) { 510 this.credentials = credentials; 511 } 512 513 /** 514 * Get the user jar for the map-reduce job. 515 * 516 * @return the user jar for the map-reduce job. 517 */ 518 public String getJar() { return get(JobContext.JAR); } 519 520 /** 521 * Set the user jar for the map-reduce job. 522 * 523 * @param jar the user jar for the map-reduce job. 524 */ 525 public void setJar(String jar) { set(JobContext.JAR, jar); } 526 527 /** 528 * Get the pattern for jar contents to unpack on the tasktracker 529 */ 530 public Pattern getJarUnpackPattern() { 531 return getPattern(JobContext.JAR_UNPACK_PATTERN, UNPACK_JAR_PATTERN_DEFAULT); 532 } 533 534 535 /** 536 * Set the job's jar file by finding an example class location. 537 * 538 * @param cls the example class. 539 */ 540 public void setJarByClass(Class cls) { 541 String jar = ClassUtil.findContainingJar(cls); 542 if (jar != null) { 543 setJar(jar); 544 } 545 } 546 547 public String[] getLocalDirs() throws IOException { 548 return getTrimmedStrings(MRConfig.LOCAL_DIR); 549 } 550 551 /** 552 * Use MRAsyncDiskService.moveAndDeleteAllVolumes instead. 
553 */ 554 @Deprecated 555 public void deleteLocalFiles() throws IOException { 556 String[] localDirs = getLocalDirs(); 557 for (int i = 0; i < localDirs.length; i++) { 558 FileSystem.getLocal(this).delete(new Path(localDirs[i]), true); 559 } 560 } 561 562 public void deleteLocalFiles(String subdir) throws IOException { 563 String[] localDirs = getLocalDirs(); 564 for (int i = 0; i < localDirs.length; i++) { 565 FileSystem.getLocal(this).delete(new Path(localDirs[i], subdir), true); 566 } 567 } 568 569 /** 570 * Constructs a local file name. Files are distributed among configured 571 * local directories. 572 */ 573 public Path getLocalPath(String pathString) throws IOException { 574 return getLocalPath(MRConfig.LOCAL_DIR, pathString); 575 } 576 577 /** 578 * Get the reported username for this job. 579 * 580 * @return the username 581 */ 582 public String getUser() { 583 return get(JobContext.USER_NAME); 584 } 585 586 /** 587 * Set the reported username for this job. 588 * 589 * @param user the username for this job. 590 */ 591 public void setUser(String user) { 592 set(JobContext.USER_NAME, user); 593 } 594 595 596 597 /** 598 * Set whether the framework should keep the intermediate files for 599 * failed tasks. 600 * 601 * @param keep <code>true</code> if framework should keep the intermediate files 602 * for failed tasks, <code>false</code> otherwise. 603 * 604 */ 605 public void setKeepFailedTaskFiles(boolean keep) { 606 setBoolean(JobContext.PRESERVE_FAILED_TASK_FILES, keep); 607 } 608 609 /** 610 * Should the temporary files for failed tasks be kept? 611 * 612 * @return should the files be kept? 613 */ 614 public boolean getKeepFailedTaskFiles() { 615 return getBoolean(JobContext.PRESERVE_FAILED_TASK_FILES, false); 616 } 617 618 /** 619 * Set a regular expression for task names that should be kept. 620 * The regular expression ".*_m_000123_0" would keep the files 621 * for the first instance of map 123 that ran. 
622 * 623 * @param pattern the java.util.regex.Pattern to match against the 624 * task names. 625 */ 626 public void setKeepTaskFilesPattern(String pattern) { 627 set(JobContext.PRESERVE_FILES_PATTERN, pattern); 628 } 629 630 /** 631 * Get the regular expression that is matched against the task names 632 * to see if we need to keep the files. 633 * 634 * @return the pattern as a string, if it was set, othewise null. 635 */ 636 public String getKeepTaskFilesPattern() { 637 return get(JobContext.PRESERVE_FILES_PATTERN); 638 } 639 640 /** 641 * Set the current working directory for the default file system. 642 * 643 * @param dir the new current working directory. 644 */ 645 public void setWorkingDirectory(Path dir) { 646 dir = new Path(getWorkingDirectory(), dir); 647 set(JobContext.WORKING_DIR, dir.toString()); 648 } 649 650 /** 651 * Get the current working directory for the default file system. 652 * 653 * @return the directory name. 654 */ 655 public Path getWorkingDirectory() { 656 String name = get(JobContext.WORKING_DIR); 657 if (name != null) { 658 return new Path(name); 659 } else { 660 try { 661 Path dir = FileSystem.get(this).getWorkingDirectory(); 662 set(JobContext.WORKING_DIR, dir.toString()); 663 return dir; 664 } catch (IOException e) { 665 throw new RuntimeException(e); 666 } 667 } 668 } 669 670 /** 671 * Sets the number of tasks that a spawned task JVM should run 672 * before it exits 673 * @param numTasks the number of tasks to execute; defaults to 1; 674 * -1 signifies no limit 675 */ 676 public void setNumTasksToExecutePerJvm(int numTasks) { 677 setInt(JobContext.JVM_NUMTASKS_TORUN, numTasks); 678 } 679 680 /** 681 * Get the number of tasks that a spawned JVM should execute 682 */ 683 public int getNumTasksToExecutePerJvm() { 684 return getInt(JobContext.JVM_NUMTASKS_TORUN, 1); 685 } 686 687 /** 688 * Get the {@link InputFormat} implementation for the map-reduce job, 689 * defaults to {@link TextInputFormat} if not specified explicity. 
690 * 691 * @return the {@link InputFormat} implementation for the map-reduce job. 692 */ 693 public InputFormat getInputFormat() { 694 return ReflectionUtils.newInstance(getClass("mapred.input.format.class", 695 TextInputFormat.class, 696 InputFormat.class), 697 this); 698 } 699 700 /** 701 * Set the {@link InputFormat} implementation for the map-reduce job. 702 * 703 * @param theClass the {@link InputFormat} implementation for the map-reduce 704 * job. 705 */ 706 public void setInputFormat(Class<? extends InputFormat> theClass) { 707 setClass("mapred.input.format.class", theClass, InputFormat.class); 708 } 709 710 /** 711 * Get the {@link OutputFormat} implementation for the map-reduce job, 712 * defaults to {@link TextOutputFormat} if not specified explicity. 713 * 714 * @return the {@link OutputFormat} implementation for the map-reduce job. 715 */ 716 public OutputFormat getOutputFormat() { 717 return ReflectionUtils.newInstance(getClass("mapred.output.format.class", 718 TextOutputFormat.class, 719 OutputFormat.class), 720 this); 721 } 722 723 /** 724 * Get the {@link OutputCommitter} implementation for the map-reduce job, 725 * defaults to {@link FileOutputCommitter} if not specified explicitly. 726 * 727 * @return the {@link OutputCommitter} implementation for the map-reduce job. 728 */ 729 public OutputCommitter getOutputCommitter() { 730 return (OutputCommitter)ReflectionUtils.newInstance( 731 getClass("mapred.output.committer.class", FileOutputCommitter.class, 732 OutputCommitter.class), this); 733 } 734 735 /** 736 * Set the {@link OutputCommitter} implementation for the map-reduce job. 737 * 738 * @param theClass the {@link OutputCommitter} implementation for the map-reduce 739 * job. 740 */ 741 public void setOutputCommitter(Class<? extends OutputCommitter> theClass) { 742 setClass("mapred.output.committer.class", theClass, OutputCommitter.class); 743 } 744 745 /** 746 * Set the {@link OutputFormat} implementation for the map-reduce job. 
747 * 748 * @param theClass the {@link OutputFormat} implementation for the map-reduce 749 * job. 750 */ 751 public void setOutputFormat(Class<? extends OutputFormat> theClass) { 752 setClass("mapred.output.format.class", theClass, OutputFormat.class); 753 } 754 755 /** 756 * Should the map outputs be compressed before transfer? 757 * 758 * @param compress should the map outputs be compressed? 759 */ 760 public void setCompressMapOutput(boolean compress) { 761 setBoolean(JobContext.MAP_OUTPUT_COMPRESS, compress); 762 } 763 764 /** 765 * Are the outputs of the maps be compressed? 766 * 767 * @return <code>true</code> if the outputs of the maps are to be compressed, 768 * <code>false</code> otherwise. 769 */ 770 public boolean getCompressMapOutput() { 771 return getBoolean(JobContext.MAP_OUTPUT_COMPRESS, false); 772 } 773 774 /** 775 * Set the given class as the {@link CompressionCodec} for the map outputs. 776 * 777 * @param codecClass the {@link CompressionCodec} class that will compress 778 * the map outputs. 779 */ 780 public void 781 setMapOutputCompressorClass(Class<? extends CompressionCodec> codecClass) { 782 setCompressMapOutput(true); 783 setClass(JobContext.MAP_OUTPUT_COMPRESS_CODEC, codecClass, 784 CompressionCodec.class); 785 } 786 787 /** 788 * Get the {@link CompressionCodec} for compressing the map outputs. 789 * 790 * @param defaultValue the {@link CompressionCodec} to return if not set 791 * @return the {@link CompressionCodec} class that should be used to compress the 792 * map outputs. 793 * @throws IllegalArgumentException if the class was specified, but not found 794 */ 795 public Class<? extends CompressionCodec> 796 getMapOutputCompressorClass(Class<? extends CompressionCodec> defaultValue) { 797 Class<? 
extends CompressionCodec> codecClass = defaultValue; 798 String name = get(JobContext.MAP_OUTPUT_COMPRESS_CODEC); 799 if (name != null) { 800 try { 801 codecClass = getClassByName(name).asSubclass(CompressionCodec.class); 802 } catch (ClassNotFoundException e) { 803 throw new IllegalArgumentException("Compression codec " + name + 804 " was not found.", e); 805 } 806 } 807 return codecClass; 808 } 809 810 /** 811 * Get the key class for the map output data. If it is not set, use the 812 * (final) output key class. This allows the map output key class to be 813 * different than the final output key class. 814 * 815 * @return the map output key class. 816 */ 817 public Class<?> getMapOutputKeyClass() { 818 Class<?> retv = getClass(JobContext.MAP_OUTPUT_KEY_CLASS, null, Object.class); 819 if (retv == null) { 820 retv = getOutputKeyClass(); 821 } 822 return retv; 823 } 824 825 /** 826 * Set the key class for the map output data. This allows the user to 827 * specify the map output key class to be different than the final output 828 * value class. 829 * 830 * @param theClass the map output key class. 831 */ 832 public void setMapOutputKeyClass(Class<?> theClass) { 833 setClass(JobContext.MAP_OUTPUT_KEY_CLASS, theClass, Object.class); 834 } 835 836 /** 837 * Get the value class for the map output data. If it is not set, use the 838 * (final) output value class This allows the map output value class to be 839 * different than the final output value class. 840 * 841 * @return the map output value class. 842 */ 843 public Class<?> getMapOutputValueClass() { 844 Class<?> retv = getClass(JobContext.MAP_OUTPUT_VALUE_CLASS, null, 845 Object.class); 846 if (retv == null) { 847 retv = getOutputValueClass(); 848 } 849 return retv; 850 } 851 852 /** 853 * Set the value class for the map output data. This allows the user to 854 * specify the map output value class to be different than the final output 855 * value class. 856 * 857 * @param theClass the map output value class. 
858 */ 859 public void setMapOutputValueClass(Class<?> theClass) { 860 setClass(JobContext.MAP_OUTPUT_VALUE_CLASS, theClass, Object.class); 861 } 862 863 /** 864 * Get the key class for the job output data. 865 * 866 * @return the key class for the job output data. 867 */ 868 public Class<?> getOutputKeyClass() { 869 return getClass(JobContext.OUTPUT_KEY_CLASS, 870 LongWritable.class, Object.class); 871 } 872 873 /** 874 * Set the key class for the job output data. 875 * 876 * @param theClass the key class for the job output data. 877 */ 878 public void setOutputKeyClass(Class<?> theClass) { 879 setClass(JobContext.OUTPUT_KEY_CLASS, theClass, Object.class); 880 } 881 882 /** 883 * Get the {@link RawComparator} comparator used to compare keys. 884 * 885 * @return the {@link RawComparator} comparator used to compare keys. 886 */ 887 public RawComparator getOutputKeyComparator() { 888 Class<? extends RawComparator> theClass = getClass( 889 JobContext.KEY_COMPARATOR, null, RawComparator.class); 890 if (theClass != null) 891 return ReflectionUtils.newInstance(theClass, this); 892 return WritableComparator.get(getMapOutputKeyClass().asSubclass(WritableComparable.class), this); 893 } 894 895 /** 896 * Set the {@link RawComparator} comparator used to compare keys. 897 * 898 * @param theClass the {@link RawComparator} comparator used to 899 * compare keys. 900 * @see #setOutputValueGroupingComparator(Class) 901 */ 902 public void setOutputKeyComparatorClass(Class<? extends RawComparator> theClass) { 903 setClass(JobContext.KEY_COMPARATOR, 904 theClass, RawComparator.class); 905 } 906 907 /** 908 * Set the {@link KeyFieldBasedComparator} options used to compare keys. 909 * 910 * @param keySpec the key specification of the form -k pos1[,pos2], where, 911 * pos is of the form f[.c][opts], where f is the number 912 * of the key field to use, and c is the number of the first character from 913 * the beginning of the field. 
Fields and character posns are numbered 914 * starting with 1; a character position of zero in pos2 indicates the 915 * field's last character. If '.c' is omitted from pos1, it defaults to 1 916 * (the beginning of the field); if omitted from pos2, it defaults to 0 917 * (the end of the field). opts are ordering options. The supported options 918 * are: 919 * -n, (Sort numerically) 920 * -r, (Reverse the result of comparison) 921 */ 922 public void setKeyFieldComparatorOptions(String keySpec) { 923 setOutputKeyComparatorClass(KeyFieldBasedComparator.class); 924 set(KeyFieldBasedComparator.COMPARATOR_OPTIONS, keySpec); 925 } 926 927 /** 928 * Get the {@link KeyFieldBasedComparator} options 929 */ 930 public String getKeyFieldComparatorOption() { 931 return get(KeyFieldBasedComparator.COMPARATOR_OPTIONS); 932 } 933 934 /** 935 * Set the {@link KeyFieldBasedPartitioner} options used for 936 * {@link Partitioner} 937 * 938 * @param keySpec the key specification of the form -k pos1[,pos2], where, 939 * pos is of the form f[.c][opts], where f is the number 940 * of the key field to use, and c is the number of the first character from 941 * the beginning of the field. Fields and character posns are numbered 942 * starting with 1; a character position of zero in pos2 indicates the 943 * field's last character. If '.c' is omitted from pos1, it defaults to 1 944 * (the beginning of the field); if omitted from pos2, it defaults to 0 945 * (the end of the field). 946 */ 947 public void setKeyFieldPartitionerOptions(String keySpec) { 948 setPartitionerClass(KeyFieldBasedPartitioner.class); 949 set(KeyFieldBasedPartitioner.PARTITIONER_OPTIONS, keySpec); 950 } 951 952 /** 953 * Get the {@link KeyFieldBasedPartitioner} options 954 */ 955 public String getKeyFieldPartitionerOption() { 956 return get(KeyFieldBasedPartitioner.PARTITIONER_OPTIONS); 957 } 958 959 /** 960 * Get the user defined {@link WritableComparable} comparator for 961 * grouping keys of inputs to the combiner. 
962 * 963 * @return comparator set by the user for grouping values. 964 * @see #setCombinerKeyGroupingComparator(Class) for details. 965 */ 966 public RawComparator getCombinerKeyGroupingComparator() { 967 Class<? extends RawComparator> theClass = getClass( 968 JobContext.COMBINER_GROUP_COMPARATOR_CLASS, null, RawComparator.class); 969 if (theClass == null) { 970 return getOutputKeyComparator(); 971 } 972 973 return ReflectionUtils.newInstance(theClass, this); 974 } 975 976 /** 977 * Get the user defined {@link WritableComparable} comparator for 978 * grouping keys of inputs to the reduce. 979 * 980 * @return comparator set by the user for grouping values. 981 * @see #setOutputValueGroupingComparator(Class) for details. 982 */ 983 public RawComparator getOutputValueGroupingComparator() { 984 Class<? extends RawComparator> theClass = getClass( 985 JobContext.GROUP_COMPARATOR_CLASS, null, RawComparator.class); 986 if (theClass == null) { 987 return getOutputKeyComparator(); 988 } 989 990 return ReflectionUtils.newInstance(theClass, this); 991 } 992 993 /** 994 * Set the user defined {@link RawComparator} comparator for 995 * grouping keys in the input to the combiner. 996 * 997 * <p>This comparator should be provided if the equivalence rules for keys 998 * for sorting the intermediates are different from those for grouping keys 999 * before each call to 1000 * {@link Reducer#reduce(Object, java.util.Iterator, OutputCollector, Reporter)}.</p> 1001 * 1002 * <p>For key-value pairs (K1,V1) and (K2,V2), the values (V1, V2) are passed 1003 * in a single call to the reduce function if K1 and K2 compare as equal.</p> 1004 * 1005 * <p>Since {@link #setOutputKeyComparatorClass(Class)} can be used to control 1006 * how keys are sorted, this can be used in conjunction to simulate 1007 * <i>secondary sort on values</i>.</p> 1008 * 1009 * <p><i>Note</i>: This is not a guarantee of the combiner sort being 1010 * <i>stable</i> in any sense. 
(In any case, with the order of available 1011 * map-outputs to the combiner being non-deterministic, it wouldn't make 1012 * that much sense.)</p> 1013 * 1014 * @param theClass the comparator class to be used for grouping keys for the 1015 * combiner. It should implement <code>RawComparator</code>. 1016 * @see #setOutputKeyComparatorClass(Class) 1017 */ 1018 public void setCombinerKeyGroupingComparator( 1019 Class<? extends RawComparator> theClass) { 1020 setClass(JobContext.COMBINER_GROUP_COMPARATOR_CLASS, 1021 theClass, RawComparator.class); 1022 } 1023 1024 /** 1025 * Set the user defined {@link RawComparator} comparator for 1026 * grouping keys in the input to the reduce. 1027 * 1028 * <p>This comparator should be provided if the equivalence rules for keys 1029 * for sorting the intermediates are different from those for grouping keys 1030 * before each call to 1031 * {@link Reducer#reduce(Object, java.util.Iterator, OutputCollector, Reporter)}.</p> 1032 * 1033 * <p>For key-value pairs (K1,V1) and (K2,V2), the values (V1, V2) are passed 1034 * in a single call to the reduce function if K1 and K2 compare as equal.</p> 1035 * 1036 * <p>Since {@link #setOutputKeyComparatorClass(Class)} can be used to control 1037 * how keys are sorted, this can be used in conjunction to simulate 1038 * <i>secondary sort on values</i>.</p> 1039 * 1040 * <p><i>Note</i>: This is not a guarantee of the reduce sort being 1041 * <i>stable</i> in any sense. (In any case, with the order of available 1042 * map-outputs to the reduce being non-deterministic, it wouldn't make 1043 * that much sense.)</p> 1044 * 1045 * @param theClass the comparator class to be used for grouping keys. 1046 * It should implement <code>RawComparator</code>. 1047 * @see #setOutputKeyComparatorClass(Class) 1048 * @see #setCombinerKeyGroupingComparator(Class) 1049 */ 1050 public void setOutputValueGroupingComparator( 1051 Class<? 
extends RawComparator> theClass) { 1052 setClass(JobContext.GROUP_COMPARATOR_CLASS, 1053 theClass, RawComparator.class); 1054 } 1055 1056 /** 1057 * Should the framework use the new context-object code for running 1058 * the mapper? 1059 * @return true, if the new api should be used 1060 */ 1061 public boolean getUseNewMapper() { 1062 return getBoolean("mapred.mapper.new-api", false); 1063 } 1064 /** 1065 * Set whether the framework should use the new api for the mapper. 1066 * This is the default for jobs submitted with the new Job api. 1067 * @param flag true, if the new api should be used 1068 */ 1069 public void setUseNewMapper(boolean flag) { 1070 setBoolean("mapred.mapper.new-api", flag); 1071 } 1072 1073 /** 1074 * Should the framework use the new context-object code for running 1075 * the reducer? 1076 * @return true, if the new api should be used 1077 */ 1078 public boolean getUseNewReducer() { 1079 return getBoolean("mapred.reducer.new-api", false); 1080 } 1081 /** 1082 * Set whether the framework should use the new api for the reducer. 1083 * This is the default for jobs submitted with the new Job api. 1084 * @param flag true, if the new api should be used 1085 */ 1086 public void setUseNewReducer(boolean flag) { 1087 setBoolean("mapred.reducer.new-api", flag); 1088 } 1089 1090 /** 1091 * Get the value class for job outputs. 1092 * 1093 * @return the value class for job outputs. 1094 */ 1095 public Class<?> getOutputValueClass() { 1096 return getClass(JobContext.OUTPUT_VALUE_CLASS, Text.class, Object.class); 1097 } 1098 1099 /** 1100 * Set the value class for job outputs. 1101 * 1102 * @param theClass the value class for job outputs. 1103 */ 1104 public void setOutputValueClass(Class<?> theClass) { 1105 setClass(JobContext.OUTPUT_VALUE_CLASS, theClass, Object.class); 1106 } 1107 1108 /** 1109 * Get the {@link Mapper} class for the job. 1110 * 1111 * @return the {@link Mapper} class for the job. 1112 */ 1113 public Class<? 
extends Mapper> getMapperClass() { 1114 return getClass("mapred.mapper.class", IdentityMapper.class, Mapper.class); 1115 } 1116 1117 /** 1118 * Set the {@link Mapper} class for the job. 1119 * 1120 * @param theClass the {@link Mapper} class for the job. 1121 */ 1122 public void setMapperClass(Class<? extends Mapper> theClass) { 1123 setClass("mapred.mapper.class", theClass, Mapper.class); 1124 } 1125 1126 /** 1127 * Get the {@link MapRunnable} class for the job. 1128 * 1129 * @return the {@link MapRunnable} class for the job. 1130 */ 1131 public Class<? extends MapRunnable> getMapRunnerClass() { 1132 return getClass("mapred.map.runner.class", 1133 MapRunner.class, MapRunnable.class); 1134 } 1135 1136 /** 1137 * Expert: Set the {@link MapRunnable} class for the job. 1138 * 1139 * Typically used to exert greater control on {@link Mapper}s. 1140 * 1141 * @param theClass the {@link MapRunnable} class for the job. 1142 */ 1143 public void setMapRunnerClass(Class<? extends MapRunnable> theClass) { 1144 setClass("mapred.map.runner.class", theClass, MapRunnable.class); 1145 } 1146 1147 /** 1148 * Get the {@link Partitioner} used to partition {@link Mapper}-outputs 1149 * to be sent to the {@link Reducer}s. 1150 * 1151 * @return the {@link Partitioner} used to partition map-outputs. 1152 */ 1153 public Class<? extends Partitioner> getPartitionerClass() { 1154 return getClass("mapred.partitioner.class", 1155 HashPartitioner.class, Partitioner.class); 1156 } 1157 1158 /** 1159 * Set the {@link Partitioner} class used to partition 1160 * {@link Mapper}-outputs to be sent to the {@link Reducer}s. 1161 * 1162 * @param theClass the {@link Partitioner} used to partition map-outputs. 1163 */ 1164 public void setPartitionerClass(Class<? extends Partitioner> theClass) { 1165 setClass("mapred.partitioner.class", theClass, Partitioner.class); 1166 } 1167 1168 /** 1169 * Get the {@link Reducer} class for the job. 1170 * 1171 * @return the {@link Reducer} class for the job. 
1172 */ 1173 public Class<? extends Reducer> getReducerClass() { 1174 return getClass("mapred.reducer.class", 1175 IdentityReducer.class, Reducer.class); 1176 } 1177 1178 /** 1179 * Set the {@link Reducer} class for the job. 1180 * 1181 * @param theClass the {@link Reducer} class for the job. 1182 */ 1183 public void setReducerClass(Class<? extends Reducer> theClass) { 1184 setClass("mapred.reducer.class", theClass, Reducer.class); 1185 } 1186 1187 /** 1188 * Get the user-defined <i>combiner</i> class used to combine map-outputs 1189 * before being sent to the reducers. Typically the combiner is same as the 1190 * the {@link Reducer} for the job i.e. {@link #getReducerClass()}. 1191 * 1192 * @return the user-defined combiner class used to combine map-outputs. 1193 */ 1194 public Class<? extends Reducer> getCombinerClass() { 1195 return getClass("mapred.combiner.class", null, Reducer.class); 1196 } 1197 1198 /** 1199 * Set the user-defined <i>combiner</i> class used to combine map-outputs 1200 * before being sent to the reducers. 1201 * 1202 * <p>The combiner is an application-specified aggregation operation, which 1203 * can help cut down the amount of data transferred between the 1204 * {@link Mapper} and the {@link Reducer}, leading to better performance.</p> 1205 * 1206 * <p>The framework may invoke the combiner 0, 1, or multiple times, in both 1207 * the mapper and reducer tasks. In general, the combiner is called as the 1208 * sort/merge result is written to disk. The combiner must: 1209 * <ul> 1210 * <li> be side-effect free</li> 1211 * <li> have the same input and output key types and the same input and 1212 * output value types</li> 1213 * </ul> 1214 * 1215 * <p>Typically the combiner is same as the <code>Reducer</code> for the 1216 * job i.e. {@link #setReducerClass(Class)}.</p> 1217 * 1218 * @param theClass the user-defined combiner class used to combine 1219 * map-outputs. 1220 */ 1221 public void setCombinerClass(Class<? 
extends Reducer> theClass) { 1222 setClass("mapred.combiner.class", theClass, Reducer.class); 1223 } 1224 1225 /** 1226 * Should speculative execution be used for this job? 1227 * Defaults to <code>true</code>. 1228 * 1229 * @return <code>true</code> if speculative execution be used for this job, 1230 * <code>false</code> otherwise. 1231 */ 1232 public boolean getSpeculativeExecution() { 1233 return (getMapSpeculativeExecution() || getReduceSpeculativeExecution()); 1234 } 1235 1236 /** 1237 * Turn speculative execution on or off for this job. 1238 * 1239 * @param speculativeExecution <code>true</code> if speculative execution 1240 * should be turned on, else <code>false</code>. 1241 */ 1242 public void setSpeculativeExecution(boolean speculativeExecution) { 1243 setMapSpeculativeExecution(speculativeExecution); 1244 setReduceSpeculativeExecution(speculativeExecution); 1245 } 1246 1247 /** 1248 * Should speculative execution be used for this job for map tasks? 1249 * Defaults to <code>true</code>. 1250 * 1251 * @return <code>true</code> if speculative execution be 1252 * used for this job for map tasks, 1253 * <code>false</code> otherwise. 1254 */ 1255 public boolean getMapSpeculativeExecution() { 1256 return getBoolean(JobContext.MAP_SPECULATIVE, true); 1257 } 1258 1259 /** 1260 * Turn speculative execution on or off for this job for map tasks. 1261 * 1262 * @param speculativeExecution <code>true</code> if speculative execution 1263 * should be turned on for map tasks, 1264 * else <code>false</code>. 1265 */ 1266 public void setMapSpeculativeExecution(boolean speculativeExecution) { 1267 setBoolean(JobContext.MAP_SPECULATIVE, speculativeExecution); 1268 } 1269 1270 /** 1271 * Should speculative execution be used for this job for reduce tasks? 1272 * Defaults to <code>true</code>. 1273 * 1274 * @return <code>true</code> if speculative execution be used 1275 * for reduce tasks for this job, 1276 * <code>false</code> otherwise. 
1277 */ 1278 public boolean getReduceSpeculativeExecution() { 1279 return getBoolean(JobContext.REDUCE_SPECULATIVE, true); 1280 } 1281 1282 /** 1283 * Turn speculative execution on or off for this job for reduce tasks. 1284 * 1285 * @param speculativeExecution <code>true</code> if speculative execution 1286 * should be turned on for reduce tasks, 1287 * else <code>false</code>. 1288 */ 1289 public void setReduceSpeculativeExecution(boolean speculativeExecution) { 1290 setBoolean(JobContext.REDUCE_SPECULATIVE, 1291 speculativeExecution); 1292 } 1293 1294 /** 1295 * Get configured the number of reduce tasks for this job. 1296 * Defaults to <code>1</code>. 1297 * 1298 * @return the number of reduce tasks for this job. 1299 */ 1300 public int getNumMapTasks() { return getInt(JobContext.NUM_MAPS, 1); } 1301 1302 /** 1303 * Set the number of map tasks for this job. 1304 * 1305 * <p><i>Note</i>: This is only a <i>hint</i> to the framework. The actual 1306 * number of spawned map tasks depends on the number of {@link InputSplit}s 1307 * generated by the job's {@link InputFormat#getSplits(JobConf, int)}. 1308 * 1309 * A custom {@link InputFormat} is typically used to accurately control 1310 * the number of map tasks for the job.</p> 1311 * 1312 * <b id="NoOfMaps">How many maps?</b> 1313 * 1314 * <p>The number of maps is usually driven by the total size of the inputs 1315 * i.e. total number of blocks of the input files.</p> 1316 * 1317 * <p>The right level of parallelism for maps seems to be around 10-100 maps 1318 * per-node, although it has been set up to 300 or so for very cpu-light map 1319 * tasks. Task setup takes awhile, so it is best if the maps take at least a 1320 * minute to execute.</p> 1321 * 1322 * <p>The default behavior of file-based {@link InputFormat}s is to split the 1323 * input into <i>logical</i> {@link InputSplit}s based on the total size, in 1324 * bytes, of input files. 
However, the {@link FileSystem} blocksize of the 1325 * input files is treated as an upper bound for input splits. A lower bound 1326 * on the split size can be set via 1327 * <a href="{@docRoot}/../mapred-default.html#mapreduce.input.fileinputformat.split.minsize"> 1328 * mapreduce.input.fileinputformat.split.minsize</a>.</p> 1329 * 1330 * <p>Thus, if you expect 10TB of input data and have a blocksize of 128MB, 1331 * you'll end up with 82,000 maps, unless {@link #setNumMapTasks(int)} is 1332 * used to set it even higher.</p> 1333 * 1334 * @param n the number of map tasks for this job. 1335 * @see InputFormat#getSplits(JobConf, int) 1336 * @see FileInputFormat 1337 * @see FileSystem#getDefaultBlockSize() 1338 * @see FileStatus#getBlockSize() 1339 */ 1340 public void setNumMapTasks(int n) { setInt(JobContext.NUM_MAPS, n); } 1341 1342 /** 1343 * Get configured the number of reduce tasks for this job. Defaults to 1344 * <code>1</code>. 1345 * 1346 * @return the number of reduce tasks for this job. 1347 */ 1348 public int getNumReduceTasks() { return getInt(JobContext.NUM_REDUCES, 1); } 1349 1350 /** 1351 * Set the requisite number of reduce tasks for this job. 1352 * 1353 * <b id="NoOfReduces">How many reduces?</b> 1354 * 1355 * <p>The right number of reduces seems to be <code>0.95</code> or 1356 * <code>1.75</code> multiplied by (<<i>no. of nodes</i>> * 1357 * <a href="{@docRoot}/../mapred-default.html#mapreduce.tasktracker.reduce.tasks.maximum"> 1358 * mapreduce.tasktracker.reduce.tasks.maximum</a>). 1359 * </p> 1360 * 1361 * <p>With <code>0.95</code> all of the reduces can launch immediately and 1362 * start transfering map outputs as the maps finish. 
With <code>1.75</code> 1363 * the faster nodes will finish their first round of reduces and launch a 1364 * second wave of reduces doing a much better job of load balancing.</p> 1365 * 1366 * <p>Increasing the number of reduces increases the framework overhead, but 1367 * increases load balancing and lowers the cost of failures.</p> 1368 * 1369 * <p>The scaling factors above are slightly less than whole numbers to 1370 * reserve a few reduce slots in the framework for speculative-tasks, failures 1371 * etc.</p> 1372 * 1373 * <b id="ReducerNone">Reducer NONE</b> 1374 * 1375 * <p>It is legal to set the number of reduce-tasks to <code>zero</code>.</p> 1376 * 1377 * <p>In this case the output of the map-tasks directly go to distributed 1378 * file-system, to the path set by 1379 * {@link FileOutputFormat#setOutputPath(JobConf, Path)}. Also, the 1380 * framework doesn't sort the map-outputs before writing it out to HDFS.</p> 1381 * 1382 * @param n the number of reduce tasks for this job. 1383 */ 1384 public void setNumReduceTasks(int n) { setInt(JobContext.NUM_REDUCES, n); } 1385 1386 /** 1387 * Get the configured number of maximum attempts that will be made to run a 1388 * map task, as specified by the <code>mapreduce.map.maxattempts</code> 1389 * property. If this property is not already set, the default is 4 attempts. 1390 * 1391 * @return the max number of attempts per map task. 1392 */ 1393 public int getMaxMapAttempts() { 1394 return getInt(JobContext.MAP_MAX_ATTEMPTS, 4); 1395 } 1396 1397 /** 1398 * Expert: Set the number of maximum attempts that will be made to run a 1399 * map task. 1400 * 1401 * @param n the number of attempts per map task. 1402 */ 1403 public void setMaxMapAttempts(int n) { 1404 setInt(JobContext.MAP_MAX_ATTEMPTS, n); 1405 } 1406 1407 /** 1408 * Get the configured number of maximum attempts that will be made to run a 1409 * reduce task, as specified by the <code>mapreduce.reduce.maxattempts</code> 1410 * property. 
If this property is not already set, the default is 4 attempts. 1411 * 1412 * @return the max number of attempts per reduce task. 1413 */ 1414 public int getMaxReduceAttempts() { 1415 return getInt(JobContext.REDUCE_MAX_ATTEMPTS, 4); 1416 } 1417 /** 1418 * Expert: Set the number of maximum attempts that will be made to run a 1419 * reduce task. 1420 * 1421 * @param n the number of attempts per reduce task. 1422 */ 1423 public void setMaxReduceAttempts(int n) { 1424 setInt(JobContext.REDUCE_MAX_ATTEMPTS, n); 1425 } 1426 1427 /** 1428 * Get the user-specified job name. This is only used to identify the 1429 * job to the user. 1430 * 1431 * @return the job's name, defaulting to "". 1432 */ 1433 public String getJobName() { 1434 return get(JobContext.JOB_NAME, ""); 1435 } 1436 1437 /** 1438 * Set the user-specified job name. 1439 * 1440 * @param name the job's new name. 1441 */ 1442 public void setJobName(String name) { 1443 set(JobContext.JOB_NAME, name); 1444 } 1445 1446 /** 1447 * Get the user-specified session identifier. The default is the empty string. 1448 * 1449 * The session identifier is used to tag metric data that is reported to some 1450 * performance metrics system via the org.apache.hadoop.metrics API. The 1451 * session identifier is intended, in particular, for use by Hadoop-On-Demand 1452 * (HOD) which allocates a virtual Hadoop cluster dynamically and transiently. 1453 * HOD will set the session identifier by modifying the mapred-site.xml file 1454 * before starting the cluster. 1455 * 1456 * When not running under HOD, this identifer is expected to remain set to 1457 * the empty string. 1458 * 1459 * @return the session identifier, defaulting to "". 1460 */ 1461 @Deprecated 1462 public String getSessionId() { 1463 return get("session.id", ""); 1464 } 1465 1466 /** 1467 * Set the user-specified session identifier. 1468 * 1469 * @param sessionId the new session id. 
1470 */ 1471 @Deprecated 1472 public void setSessionId(String sessionId) { 1473 set("session.id", sessionId); 1474 } 1475 1476 /** 1477 * Set the maximum no. of failures of a given job per tasktracker. 1478 * If the no. of task failures exceeds <code>noFailures</code>, the 1479 * tasktracker is <i>blacklisted</i> for this job. 1480 * 1481 * @param noFailures maximum no. of failures of a given job per tasktracker. 1482 */ 1483 public void setMaxTaskFailuresPerTracker(int noFailures) { 1484 setInt(JobContext.MAX_TASK_FAILURES_PER_TRACKER, noFailures); 1485 } 1486 1487 /** 1488 * Expert: Get the maximum no. of failures of a given job per tasktracker. 1489 * If the no. of task failures exceeds this, the tasktracker is 1490 * <i>blacklisted</i> for this job. 1491 * 1492 * @return the maximum no. of failures of a given job per tasktracker. 1493 */ 1494 public int getMaxTaskFailuresPerTracker() { 1495 return getInt(JobContext.MAX_TASK_FAILURES_PER_TRACKER, 3); 1496 } 1497 1498 /** 1499 * Get the maximum percentage of map tasks that can fail without 1500 * the job being aborted. 1501 * 1502 * Each map task is executed a minimum of {@link #getMaxMapAttempts()} 1503 * attempts before being declared as <i>failed</i>. 1504 * 1505 * Defaults to <code>zero</code>, i.e. <i>any</i> failed map-task results in 1506 * the job being declared as {@link JobStatus#FAILED}. 1507 * 1508 * @return the maximum percentage of map tasks that can fail without 1509 * the job being aborted. 1510 */ 1511 public int getMaxMapTaskFailuresPercent() { 1512 return getInt(JobContext.MAP_FAILURES_MAX_PERCENT, 0); 1513 } 1514 1515 /** 1516 * Expert: Set the maximum percentage of map tasks that can fail without the 1517 * job being aborted. 1518 * 1519 * Each map task is executed a minimum of {@link #getMaxMapAttempts} attempts 1520 * before being declared as <i>failed</i>. 1521 * 1522 * @param percent the maximum percentage of map tasks that can fail without 1523 * the job being aborted. 
1524 */ 1525 public void setMaxMapTaskFailuresPercent(int percent) { 1526 setInt(JobContext.MAP_FAILURES_MAX_PERCENT, percent); 1527 } 1528 1529 /** 1530 * Get the maximum percentage of reduce tasks that can fail without 1531 * the job being aborted. 1532 * 1533 * Each reduce task is executed a minimum of {@link #getMaxReduceAttempts()} 1534 * attempts before being declared as <i>failed</i>. 1535 * 1536 * Defaults to <code>zero</code>, i.e. <i>any</i> failed reduce-task results 1537 * in the job being declared as {@link JobStatus#FAILED}. 1538 * 1539 * @return the maximum percentage of reduce tasks that can fail without 1540 * the job being aborted. 1541 */ 1542 public int getMaxReduceTaskFailuresPercent() { 1543 return getInt(JobContext.REDUCE_FAILURES_MAXPERCENT, 0); 1544 } 1545 1546 /** 1547 * Set the maximum percentage of reduce tasks that can fail without the job 1548 * being aborted. 1549 * 1550 * Each reduce task is executed a minimum of {@link #getMaxReduceAttempts()} 1551 * attempts before being declared as <i>failed</i>. 1552 * 1553 * @param percent the maximum percentage of reduce tasks that can fail without 1554 * the job being aborted. 1555 */ 1556 public void setMaxReduceTaskFailuresPercent(int percent) { 1557 setInt(JobContext.REDUCE_FAILURES_MAXPERCENT, percent); 1558 } 1559 1560 /** 1561 * Set {@link JobPriority} for this job. 1562 * 1563 * @param prio the {@link JobPriority} for this job. 1564 */ 1565 public void setJobPriority(JobPriority prio) { 1566 set(JobContext.PRIORITY, prio.toString()); 1567 } 1568 1569 /** 1570 * Get the {@link JobPriority} for this job. 1571 * 1572 * @return the {@link JobPriority} for this job. 1573 */ 1574 public JobPriority getJobPriority() { 1575 String prio = get(JobContext.PRIORITY); 1576 if(prio == null) { 1577 return JobPriority.NORMAL; 1578 } 1579 1580 return JobPriority.valueOf(prio); 1581 } 1582 1583 /** 1584 * Set JobSubmitHostName for this job. 
1585 * 1586 * @param hostname the JobSubmitHostName for this job. 1587 */ 1588 void setJobSubmitHostName(String hostname) { 1589 set(MRJobConfig.JOB_SUBMITHOST, hostname); 1590 } 1591 1592 /** 1593 * Get the JobSubmitHostName for this job. 1594 * 1595 * @return the JobSubmitHostName for this job. 1596 */ 1597 String getJobSubmitHostName() { 1598 String hostname = get(MRJobConfig.JOB_SUBMITHOST); 1599 1600 return hostname; 1601 } 1602 1603 /** 1604 * Set JobSubmitHostAddress for this job. 1605 * 1606 * @param hostadd the JobSubmitHostAddress for this job. 1607 */ 1608 void setJobSubmitHostAddress(String hostadd) { 1609 set(MRJobConfig.JOB_SUBMITHOSTADDR, hostadd); 1610 } 1611 1612 /** 1613 * Get JobSubmitHostAddress for this job. 1614 * 1615 * @return JobSubmitHostAddress for this job. 1616 */ 1617 String getJobSubmitHostAddress() { 1618 String hostadd = get(MRJobConfig.JOB_SUBMITHOSTADDR); 1619 1620 return hostadd; 1621 } 1622 1623 /** 1624 * Get whether the task profiling is enabled. 1625 * @return true if some tasks will be profiled 1626 */ 1627 public boolean getProfileEnabled() { 1628 return getBoolean(JobContext.TASK_PROFILE, false); 1629 } 1630 1631 /** 1632 * Set whether the system should collect profiler information for some of 1633 * the tasks in this job? The information is stored in the user log 1634 * directory. 1635 * @param newValue true means it should be gathered 1636 */ 1637 public void setProfileEnabled(boolean newValue) { 1638 setBoolean(JobContext.TASK_PROFILE, newValue); 1639 } 1640 1641 /** 1642 * Get the profiler configuration arguments. 
1643 * 1644 * The default value for this property is 1645 * "-agentlib:hprof=cpu=samples,heap=sites,force=n,thread=y,verbose=n,file=%s" 1646 * 1647 * @return the parameters to pass to the task child to configure profiling 1648 */ 1649 public String getProfileParams() { 1650 return get(JobContext.TASK_PROFILE_PARAMS, 1651 MRJobConfig.DEFAULT_TASK_PROFILE_PARAMS); 1652 } 1653 1654 /** 1655 * Set the profiler configuration arguments. If the string contains a '%s' it 1656 * will be replaced with the name of the profiling output file when the task 1657 * runs. 1658 * 1659 * This value is passed to the task child JVM on the command line. 1660 * 1661 * @param value the configuration string 1662 */ 1663 public void setProfileParams(String value) { 1664 set(JobContext.TASK_PROFILE_PARAMS, value); 1665 } 1666 1667 /** 1668 * Get the range of maps or reduces to profile. 1669 * @param isMap is the task a map? 1670 * @return the task ranges 1671 */ 1672 public IntegerRanges getProfileTaskRange(boolean isMap) { 1673 return getRange((isMap ? JobContext.NUM_MAP_PROFILES : 1674 JobContext.NUM_REDUCE_PROFILES), "0-2"); 1675 } 1676 1677 /** 1678 * Set the ranges of maps or reduces to profile. setProfileEnabled(true) 1679 * must also be called. 1680 * @param newValue a set of integer ranges of the map ids 1681 */ 1682 public void setProfileTaskRange(boolean isMap, String newValue) { 1683 // parse the value to make sure it is legal 1684 new Configuration.IntegerRanges(newValue); 1685 set((isMap ? JobContext.NUM_MAP_PROFILES : JobContext.NUM_REDUCE_PROFILES), 1686 newValue); 1687 } 1688 1689 /** 1690 * Set the debug script to run when the map tasks fail. 1691 * 1692 * <p>The debug script can aid debugging of failed map tasks. The script is 1693 * given task's stdout, stderr, syslog, jobconf files as arguments.</p> 1694 * 1695 * <p>The debug command, run on the node where the map failed, is:</p> 1696 * <p><blockquote><pre> 1697 * $script $stdout $stderr $syslog $jobconf. 
1698 * </pre></blockquote> 1699 * 1700 * <p> The script file is distributed through {@link DistributedCache} 1701 * APIs. The script needs to be symlinked. </p> 1702 * 1703 * <p>Here is an example on how to submit a script 1704 * <p><blockquote><pre> 1705 * job.setMapDebugScript("./myscript"); 1706 * DistributedCache.createSymlink(job); 1707 * DistributedCache.addCacheFile("/debug/scripts/myscript#myscript"); 1708 * </pre></blockquote> 1709 * 1710 * @param mDbgScript the script name 1711 */ 1712 public void setMapDebugScript(String mDbgScript) { 1713 set(JobContext.MAP_DEBUG_SCRIPT, mDbgScript); 1714 } 1715 1716 /** 1717 * Get the map task's debug script. 1718 * 1719 * @return the debug Script for the mapred job for failed map tasks. 1720 * @see #setMapDebugScript(String) 1721 */ 1722 public String getMapDebugScript() { 1723 return get(JobContext.MAP_DEBUG_SCRIPT); 1724 } 1725 1726 /** 1727 * Set the debug script to run when the reduce tasks fail. 1728 * 1729 * <p>The debug script can aid debugging of failed reduce tasks. The script 1730 * is given task's stdout, stderr, syslog, jobconf files as arguments.</p> 1731 * 1732 * <p>The debug command, run on the node where the map failed, is:</p> 1733 * <p><blockquote><pre> 1734 * $script $stdout $stderr $syslog $jobconf. 1735 * </pre></blockquote> 1736 * 1737 * <p> The script file is distributed through {@link DistributedCache} 1738 * APIs. 
The script file needs to be symlinked </p> 1739 * 1740 * <p>Here is an example on how to submit a script 1741 * <p><blockquote><pre> 1742 * job.setReduceDebugScript("./myscript"); 1743 * DistributedCache.createSymlink(job); 1744 * DistributedCache.addCacheFile("/debug/scripts/myscript#myscript"); 1745 * </pre></blockquote> 1746 * 1747 * @param rDbgScript the script name 1748 */ 1749 public void setReduceDebugScript(String rDbgScript) { 1750 set(JobContext.REDUCE_DEBUG_SCRIPT, rDbgScript); 1751 } 1752 1753 /** 1754 * Get the reduce task's debug Script 1755 * 1756 * @return the debug script for the mapred job for failed reduce tasks. 1757 * @see #setReduceDebugScript(String) 1758 */ 1759 public String getReduceDebugScript() { 1760 return get(JobContext.REDUCE_DEBUG_SCRIPT); 1761 } 1762 1763 /** 1764 * Get the uri to be invoked in-order to send a notification after the job 1765 * has completed (success/failure). 1766 * 1767 * @return the job end notification uri, <code>null</code> if it hasn't 1768 * been set. 1769 * @see #setJobEndNotificationURI(String) 1770 */ 1771 public String getJobEndNotificationURI() { 1772 return get(JobContext.MR_JOB_END_NOTIFICATION_URL); 1773 } 1774 1775 /** 1776 * Set the uri to be invoked in-order to send a notification after the job 1777 * has completed (success/failure). 1778 * 1779 * <p>The uri can contain 2 special parameters: <tt>$jobId</tt> and 1780 * <tt>$jobStatus</tt>. 
Those, if present, are replaced by the job's 1781 * identifier and completion-status respectively.</p> 1782 * 1783 * <p>This is typically used by application-writers to implement chaining of 1784 * Map-Reduce jobs in an <i>asynchronous manner</i>.</p> 1785 * 1786 * @param uri the job end notification uri 1787 * @see JobStatus 1788 */ 1789 public void setJobEndNotificationURI(String uri) { 1790 set(JobContext.MR_JOB_END_NOTIFICATION_URL, uri); 1791 } 1792 1793 /** 1794 * Get job-specific shared directory for use as scratch space 1795 * 1796 * <p> 1797 * When a job starts, a shared directory is created at location 1798 * <code> 1799 * ${mapreduce.cluster.local.dir}/taskTracker/$user/jobcache/$jobid/work/ </code>. 1800 * This directory is exposed to the users through 1801 * <code>mapreduce.job.local.dir </code>. 1802 * So, the tasks can use this space 1803 * as scratch space and share files among them. </p> 1804 * This value is available as System property also. 1805 * 1806 * @return The localized job specific shared directory 1807 */ 1808 public String getJobLocalDir() { 1809 return get(JobContext.JOB_LOCAL_DIR); 1810 } 1811 1812 /** 1813 * Get memory required to run a map task of the job, in MB. 1814 * 1815 * If a value is specified in the configuration, it is returned. 1816 * Else, it returns {@link JobContext#DEFAULT_MAP_MEMORY_MB}. 1817 * <p> 1818 * For backward compatibility, if the job configuration sets the 1819 * key {@link #MAPRED_TASK_MAXVMEM_PROPERTY} to a value different 1820 * from {@link #DISABLED_MEMORY_LIMIT}, that value will be used 1821 * after converting it from bytes to MB. 
1822 * @return memory required to run a map task of the job, in MB, 1823 */ 1824 public long getMemoryForMapTask() { 1825 long value = getDeprecatedMemoryValue(); 1826 if (value < 0) { 1827 return getMemoryRequired(TaskType.MAP); 1828 } 1829 return value; 1830 } 1831 1832 public void setMemoryForMapTask(long mem) { 1833 setLong(JobConf.MAPREDUCE_JOB_MAP_MEMORY_MB_PROPERTY, mem); 1834 // In case that M/R 1.x applications use the old property name 1835 setLong(JobConf.MAPRED_JOB_MAP_MEMORY_MB_PROPERTY, mem); 1836 } 1837 1838 /** 1839 * Get memory required to run a reduce task of the job, in MB. 1840 * 1841 * If a value is specified in the configuration, it is returned. 1842 * Else, it returns {@link JobContext#DEFAULT_REDUCE_MEMORY_MB}. 1843 * <p> 1844 * For backward compatibility, if the job configuration sets the 1845 * key {@link #MAPRED_TASK_MAXVMEM_PROPERTY} to a value different 1846 * from {@link #DISABLED_MEMORY_LIMIT}, that value will be used 1847 * after converting it from bytes to MB. 1848 * @return memory required to run a reduce task of the job, in MB. 1849 */ 1850 public long getMemoryForReduceTask() { 1851 long value = getDeprecatedMemoryValue(); 1852 if (value < 0) { 1853 return getMemoryRequired(TaskType.REDUCE); 1854 } 1855 return value; 1856 } 1857 1858 // Return the value set to the key MAPRED_TASK_MAXVMEM_PROPERTY, 1859 // converted into MBs. 1860 // Returns DISABLED_MEMORY_LIMIT if unset, or set to a negative 1861 // value. 
1862 private long getDeprecatedMemoryValue() { 1863 long oldValue = getLong(MAPRED_TASK_MAXVMEM_PROPERTY, 1864 DISABLED_MEMORY_LIMIT); 1865 if (oldValue > 0) { 1866 oldValue /= (1024*1024); 1867 } 1868 return oldValue; 1869 } 1870 1871 public void setMemoryForReduceTask(long mem) { 1872 setLong(JobConf.MAPREDUCE_JOB_REDUCE_MEMORY_MB_PROPERTY, mem); 1873 // In case that M/R 1.x applications use the old property name 1874 setLong(JobConf.MAPRED_JOB_REDUCE_MEMORY_MB_PROPERTY, mem); 1875 } 1876 1877 /** 1878 * Return the name of the queue to which this job is submitted. 1879 * Defaults to 'default'. 1880 * 1881 * @return name of the queue 1882 */ 1883 public String getQueueName() { 1884 return get(JobContext.QUEUE_NAME, DEFAULT_QUEUE_NAME); 1885 } 1886 1887 /** 1888 * Set the name of the queue to which this job should be submitted. 1889 * 1890 * @param queueName Name of the queue 1891 */ 1892 public void setQueueName(String queueName) { 1893 set(JobContext.QUEUE_NAME, queueName); 1894 } 1895 1896 /** 1897 * Normalize the negative values in configuration 1898 * 1899 * @param val 1900 * @return normalized value 1901 */ 1902 public static long normalizeMemoryConfigValue(long val) { 1903 if (val < 0) { 1904 val = DISABLED_MEMORY_LIMIT; 1905 } 1906 return val; 1907 } 1908 1909 /** 1910 * Find a jar that contains a class of the same name, if any. 1911 * It will return a jar file, even if that is not the first thing 1912 * on the class path that has a class with the same name. 1913 * 1914 * @param my_class the class to find. 1915 * @return a jar file that contains the class, or null. 1916 */ 1917 public static String findContainingJar(Class my_class) { 1918 return ClassUtil.findContainingJar(my_class); 1919 } 1920 1921 /** 1922 * Get the memory required to run a task of this job, in bytes. See 1923 * {@link #MAPRED_TASK_MAXVMEM_PROPERTY} 1924 * <p> 1925 * This method is deprecated. Now, different memory limits can be 1926 * set for map and reduce tasks of a job, in MB. 
1927 * <p> 1928 * For backward compatibility, if the job configuration sets the 1929 * key {@link #MAPRED_TASK_MAXVMEM_PROPERTY}, that value is returned. 1930 * Otherwise, this method will return the larger of the values returned by 1931 * {@link #getMemoryForMapTask()} and {@link #getMemoryForReduceTask()} 1932 * after converting them into bytes. 1933 * 1934 * @return Memory required to run a task of this job, in bytes. 1935 * @see #setMaxVirtualMemoryForTask(long) 1936 * @deprecated Use {@link #getMemoryForMapTask()} and 1937 * {@link #getMemoryForReduceTask()} 1938 */ 1939 @Deprecated 1940 public long getMaxVirtualMemoryForTask() { 1941 LOG.warn( 1942 "getMaxVirtualMemoryForTask() is deprecated. " + 1943 "Instead use getMemoryForMapTask() and getMemoryForReduceTask()"); 1944 1945 long value = getLong(MAPRED_TASK_MAXVMEM_PROPERTY, 1946 Math.max(getMemoryForMapTask(), getMemoryForReduceTask()) * 1024 * 1024); 1947 return value; 1948 } 1949 1950 /** 1951 * Set the maximum amount of memory any task of this job can use. See 1952 * {@link #MAPRED_TASK_MAXVMEM_PROPERTY} 1953 * <p> 1954 * mapred.task.maxvmem is split into 1955 * mapreduce.map.memory.mb 1956 * and mapreduce.map.memory.mb,mapred 1957 * each of the new key are set 1958 * as mapred.task.maxvmem / 1024 1959 * as new values are in MB 1960 * 1961 * @param vmem Maximum amount of virtual memory in bytes any task of this job 1962 * can use. 
1963 * @see #getMaxVirtualMemoryForTask() 1964 * @deprecated 1965 * Use {@link #setMemoryForMapTask(long mem)} and 1966 * Use {@link #setMemoryForReduceTask(long mem)} 1967 */ 1968 @Deprecated 1969 public void setMaxVirtualMemoryForTask(long vmem) { 1970 LOG.warn("setMaxVirtualMemoryForTask() is deprecated."+ 1971 "Instead use setMemoryForMapTask() and setMemoryForReduceTask()"); 1972 if (vmem < 0) { 1973 throw new IllegalArgumentException("Task memory allocation may not be < 0"); 1974 } 1975 1976 if(get(JobConf.MAPRED_TASK_MAXVMEM_PROPERTY) == null) { 1977 setMemoryForMapTask(vmem / (1024 * 1024)); //Changing bytes to mb 1978 setMemoryForReduceTask(vmem / (1024 * 1024));//Changing bytes to mb 1979 }else{ 1980 this.setLong(JobConf.MAPRED_TASK_MAXVMEM_PROPERTY,vmem); 1981 } 1982 } 1983 1984 /** 1985 * @deprecated this variable is deprecated and nolonger in use. 1986 */ 1987 @Deprecated 1988 public long getMaxPhysicalMemoryForTask() { 1989 LOG.warn("The API getMaxPhysicalMemoryForTask() is deprecated." 1990 + " Refer to the APIs getMemoryForMapTask() and" 1991 + " getMemoryForReduceTask() for details."); 1992 return -1; 1993 } 1994 1995 /* 1996 * @deprecated this 1997 */ 1998 @Deprecated 1999 public void setMaxPhysicalMemoryForTask(long mem) { 2000 LOG.warn("The API setMaxPhysicalMemoryForTask() is deprecated." 2001 + " The value set is ignored. 
Refer to " 2002 + " setMemoryForMapTask() and setMemoryForReduceTask() for details."); 2003 } 2004 2005 static String deprecatedString(String key) { 2006 return "The variable " + key + " is no longer used."; 2007 } 2008 2009 private void checkAndWarnDeprecation() { 2010 if(get(JobConf.MAPRED_TASK_MAXVMEM_PROPERTY) != null) { 2011 LOG.warn(JobConf.deprecatedString(JobConf.MAPRED_TASK_MAXVMEM_PROPERTY) 2012 + " Instead use " + JobConf.MAPREDUCE_JOB_MAP_MEMORY_MB_PROPERTY 2013 + " and " + JobConf.MAPREDUCE_JOB_REDUCE_MEMORY_MB_PROPERTY); 2014 } 2015 if(get(JobConf.MAPRED_TASK_ULIMIT) != null ) { 2016 LOG.warn(JobConf.deprecatedString(JobConf.MAPRED_TASK_ULIMIT)); 2017 } 2018 if(get(JobConf.MAPRED_MAP_TASK_ULIMIT) != null ) { 2019 LOG.warn(JobConf.deprecatedString(JobConf.MAPRED_MAP_TASK_ULIMIT)); 2020 } 2021 if(get(JobConf.MAPRED_REDUCE_TASK_ULIMIT) != null ) { 2022 LOG.warn(JobConf.deprecatedString(JobConf.MAPRED_REDUCE_TASK_ULIMIT)); 2023 } 2024 } 2025 2026 private String getConfiguredTaskJavaOpts(TaskType taskType) { 2027 String userClasspath = ""; 2028 String adminClasspath = ""; 2029 if (taskType == TaskType.MAP) { 2030 userClasspath = get(MAPRED_MAP_TASK_JAVA_OPTS, 2031 get(MAPRED_TASK_JAVA_OPTS, DEFAULT_MAPRED_TASK_JAVA_OPTS)); 2032 adminClasspath = get(MRJobConfig.MAPRED_MAP_ADMIN_JAVA_OPTS, 2033 MRJobConfig.DEFAULT_MAPRED_ADMIN_JAVA_OPTS); 2034 } else { 2035 userClasspath = get(MAPRED_REDUCE_TASK_JAVA_OPTS, 2036 get(MAPRED_TASK_JAVA_OPTS, DEFAULT_MAPRED_TASK_JAVA_OPTS)); 2037 adminClasspath = get(MRJobConfig.MAPRED_REDUCE_ADMIN_JAVA_OPTS, 2038 MRJobConfig.DEFAULT_MAPRED_ADMIN_JAVA_OPTS); 2039 } 2040 2041 return adminClasspath + " " + userClasspath; 2042 } 2043 2044 @Private 2045 public String getTaskJavaOpts(TaskType taskType) { 2046 String javaOpts = getConfiguredTaskJavaOpts(taskType); 2047 2048 if (!javaOpts.contains("-Xmx")) { 2049 float heapRatio = getFloat(MRJobConfig.HEAP_MEMORY_MB_RATIO, 2050 MRJobConfig.DEFAULT_HEAP_MEMORY_MB_RATIO); 2051 2052 if 
(heapRatio > 1.0f || heapRatio < 0) { 2053 LOG.warn("Invalid value for " + MRJobConfig.HEAP_MEMORY_MB_RATIO 2054 + ", using the default."); 2055 heapRatio = MRJobConfig.DEFAULT_HEAP_MEMORY_MB_RATIO; 2056 } 2057 2058 int taskContainerMb = getMemoryRequired(taskType); 2059 int taskHeapSize = (int)Math.ceil(taskContainerMb * heapRatio); 2060 2061 String xmxArg = String.format("-Xmx%dm", taskHeapSize); 2062 LOG.info("Task java-opts do not specify heap size. Setting task attempt" + 2063 " jvm max heap size to " + xmxArg); 2064 2065 javaOpts += " " + xmxArg; 2066 } 2067 2068 return javaOpts; 2069 } 2070 2071 /** 2072 * Parse the Maximum heap size from the java opts as specified by the -Xmx option 2073 * Format: -Xmx<size>[g|G|m|M|k|K] 2074 * @param javaOpts String to parse to read maximum heap size 2075 * @return Maximum heap size in MB or -1 if not specified 2076 */ 2077 @Private 2078 @VisibleForTesting 2079 public static int parseMaximumHeapSizeMB(String javaOpts) { 2080 // Find the last matching -Xmx following word boundaries 2081 Matcher m = JAVA_OPTS_XMX_PATTERN.matcher(javaOpts); 2082 if (m.matches()) { 2083 long size = Long.parseLong(m.group(1)); 2084 if (size <= 0) { 2085 return -1; 2086 } 2087 if (m.group(2).isEmpty()) { 2088 // -Xmx specified in bytes 2089 return (int) (size / (1024 * 1024)); 2090 } 2091 char unit = m.group(2).charAt(0); 2092 switch (unit) { 2093 case 'g': 2094 case 'G': 2095 // -Xmx specified in GB 2096 return (int) (size * 1024); 2097 case 'm': 2098 case 'M': 2099 // -Xmx specified in MB 2100 return (int) size; 2101 case 'k': 2102 case 'K': 2103 // -Xmx specified in KB 2104 return (int) (size / 1024); 2105 } 2106 } 2107 // -Xmx not specified 2108 return -1; 2109 } 2110 2111 private int getMemoryRequiredHelper( 2112 String configName, int defaultValue, int heapSize, float heapRatio) { 2113 int memory = getInt(configName, -1); 2114 if (memory <= 0) { 2115 if (heapSize > 0) { 2116 memory = (int) Math.ceil(heapSize / heapRatio); 2117 
LOG.info("Figured value for " + configName + " from javaOpts"); 2118 } else { 2119 memory = defaultValue; 2120 } 2121 } 2122 2123 return memory; 2124 } 2125 2126 @Private 2127 public int getMemoryRequired(TaskType taskType) { 2128 int memory = 1024; 2129 int heapSize = parseMaximumHeapSizeMB(getConfiguredTaskJavaOpts(taskType)); 2130 float heapRatio = getFloat(MRJobConfig.HEAP_MEMORY_MB_RATIO, 2131 MRJobConfig.DEFAULT_HEAP_MEMORY_MB_RATIO); 2132 if (taskType == TaskType.MAP) { 2133 return getMemoryRequiredHelper(MRJobConfig.MAP_MEMORY_MB, 2134 MRJobConfig.DEFAULT_MAP_MEMORY_MB, heapSize, heapRatio); 2135 } else if (taskType == TaskType.REDUCE) { 2136 return getMemoryRequiredHelper(MRJobConfig.REDUCE_MEMORY_MB, 2137 MRJobConfig.DEFAULT_REDUCE_MEMORY_MB, heapSize, heapRatio); 2138 } else { 2139 return memory; 2140 } 2141 } 2142 2143 /* For debugging. Dump configurations to system output as XML format. */ 2144 public static void main(String[] args) throws Exception { 2145 new JobConf(new Configuration()).writeXml(System.out); 2146 } 2147 2148} 2149