/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.mapreduce.lib.output;

import java.io.FileNotFoundException;
import java.io.IOException;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.JobStatus;
import org.apache.hadoop.mapreduce.MRJobConfig;
import org.apache.hadoop.mapreduce.OutputCommitter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.TaskAttemptID;

/**
 * An {@link OutputCommitter} that commits files specified
 * in job output directory i.e. ${mapreduce.output.fileoutputformat.outputdir}.
 *
 * <p>Directory layout used below the output directory {@code out}:
 * <ul>
 *   <li>{@code out/_temporary/appAttemptId} - the job attempt path;</li>
 *   <li>{@code out/_temporary/appAttemptId/_temporary/taskAttemptId} - the
 *       task attempt (work) path;</li>
 *   <li>{@code out/_temporary/appAttemptId/taskId} - the committed task
 *       path.</li>
 * </ul>
 *
 * <p>Two commit algorithms are supported, selected through
 * {@link #FILEOUTPUTCOMMITTER_ALGORITHM_VERSION}:
 * <ul>
 *   <li>version 1: {@code commitTask} renames the task attempt path to the
 *       committed task path and {@code commitJob} merges every committed task
 *       path into the output directory;</li>
 *   <li>version 2: {@code commitTask} merges the task attempt path directly
 *       into the output directory and {@code commitJob} performs no merge.</li>
 * </ul>
 **/
@InterfaceAudience.Public
@InterfaceStability.Stable
public class FileOutputCommitter extends OutputCommitter {
  private static final Log LOG = LogFactory.getLog(FileOutputCommitter.class);

  /**
   * Name of directory where pending data is placed.  Data that has not been
   * committed yet.
   */
  public static final String PENDING_DIR_NAME = "_temporary";
  /**
   * Temporary directory name
   *
   * The static variable to be compatible with M/R 1.x
   */
  @Deprecated
  protected static final String TEMP_DIR_NAME = PENDING_DIR_NAME;
  /** Name of the empty marker file created in the output directory by commitJob. */
  public static final String SUCCEEDED_FILE_NAME = "_SUCCESS";
  /** Boolean configuration key: whether commitJob creates the marker file. */
  public static final String SUCCESSFUL_JOB_OUTPUT_DIR_MARKER =
      "mapreduce.fileoutputcommitter.marksuccessfuljobs";
  /** Integer configuration key selecting the commit algorithm (1 or 2). */
  public static final String FILEOUTPUTCOMMITTER_ALGORITHM_VERSION =
      "mapreduce.fileoutputcommitter.algorithm.version";
  /** Algorithm version used when the configuration does not specify one. */
  public static final int FILEOUTPUTCOMMITTER_ALGORITHM_VERSION_DEFAULT = 2;

  /** Qualified job output path, or null when the committer acts as a noop. */
  private Path outputPath = null;
  /** Task attempt work directory; only set by the task-level constructor. */
  private Path workPath = null;
  /** Commit algorithm in use; validated to be 1 or 2 by the constructor. */
  private final int algorithmVersion;

  /**
   * Create a file output committer
   * @param outputPath the job's output path, or null if you want the output
   * committer to act as a noop.
   * @param context the task's context
   * @throws IOException if the algorithm version is unsupported or the
   * output path cannot be resolved to a file system.
   */
  public FileOutputCommitter(Path outputPath,
                             TaskAttemptContext context) throws IOException {
    this(outputPath, (JobContext)context);
    if (outputPath != null) {
      // Derive the work path from the qualified output path stored by the
      // delegated constructor, so that getWorkPath() agrees with
      // getTaskAttemptPath(TaskAttemptContext) for the same attempt.
      workPath = getTaskAttemptPath(context, getOutputPath());
    }
  }

  /**
   * Create a file output committer
   * @param outputPath the job's output path, or null if you want the output
   * committer to act as a noop.
   * @param context the job's context; its configuration supplies the
   * algorithm version and the file system used to qualify the output path.
   * @throws IOException if the configured algorithm version is neither 1
   * nor 2, or the output path cannot be resolved to a file system.
   */
  @Private
  public FileOutputCommitter(Path outputPath,
                             JobContext context) throws IOException {
    Configuration conf = context.getConfiguration();
    algorithmVersion =
        conf.getInt(FILEOUTPUTCOMMITTER_ALGORITHM_VERSION,
                    FILEOUTPUTCOMMITTER_ALGORITHM_VERSION_DEFAULT);
    LOG.info("File Output Committer Algorithm version is " + algorithmVersion);
    if (algorithmVersion != 1 && algorithmVersion != 2) {
      throw new IOException("Only 1 or 2 algorithm version is supported");
    }
    if (outputPath != null) {
      FileSystem fs = outputPath.getFileSystem(conf);
      this.outputPath = fs.makeQualified(outputPath);
    }
  }

  /**
   * @return the path where final output of the job should be placed.  This
   * could also be considered the committed application attempt path.
   */
  private Path getOutputPath() {
    return this.outputPath;
  }

  /**
   * @return true if we have an output path set, else false.
   */
  private boolean hasOutputPath() {
    return this.outputPath != null;
  }

  /**
   * @return the path where the output of pending job attempts are
   * stored.
   */
  private Path getPendingJobAttemptsPath() {
    return getPendingJobAttemptsPath(getOutputPath());
  }

  /**
   * Get the location of pending job attempts.
   * @param out the base output directory.
   * @return the location of pending job attempts.
   */
  private static Path getPendingJobAttemptsPath(Path out) {
    return new Path(out, PENDING_DIR_NAME);
  }

  /**
   * Get the Application Attempt Id for this job
   * @param context the context to look in
   * @return the Application Attempt Id for a given job, or 0 when the
   * configuration does not define one.
   */
  private static int getAppAttemptId(JobContext context) {
    return context.getConfiguration().getInt(
        MRJobConfig.APPLICATION_ATTEMPT_ID, 0);
  }

  /**
   * Compute the path where the output of a given job attempt will be placed.
   * @param context the context of the job.  This is used to get the
   * application attempt id.
   * @return the path to store job attempt data.
   */
  public Path getJobAttemptPath(JobContext context) {
    return getJobAttemptPath(context, getOutputPath());
  }

  /**
   * Compute the path where the output of a given job attempt will be placed.
   * @param context the context of the job.  This is used to get the
   * application attempt id.
   * @param out the output path to place these in.
   * @return the path to store job attempt data.
   */
  public static Path getJobAttemptPath(JobContext context, Path out) {
    return getJobAttemptPath(getAppAttemptId(context), out);
  }

  /**
   * Compute the path where the output of a given job attempt will be placed.
   * @param appAttemptId the ID of the application attempt for this job.
   * @return the path to store job attempt data.
   */
  protected Path getJobAttemptPath(int appAttemptId) {
    return getJobAttemptPath(appAttemptId, getOutputPath());
  }

  /**
   * Compute the path where the output of a given job attempt will be placed.
   * @param appAttemptId the ID of the application attempt for this job.
   * @param out the base output directory.
   * @return the path to store job attempt data.
   */
  private static Path getJobAttemptPath(int appAttemptId, Path out) {
    return new Path(getPendingJobAttemptsPath(out), String.valueOf(appAttemptId));
  }

  /**
   * Compute the path where the output of pending task attempts are stored.
   * @param context the context of the job with pending tasks.
   * @return the path where the output of pending task attempts are stored.
   */
  private Path getPendingTaskAttemptsPath(JobContext context) {
    return getPendingTaskAttemptsPath(context, getOutputPath());
  }

  /**
   * Compute the path where the output of pending task attempts are stored.
   * @param context the context of the job with pending tasks.
   * @param out the base output directory.
   * @return the path where the output of pending task attempts are stored.
   */
  private static Path getPendingTaskAttemptsPath(JobContext context, Path out) {
    return new Path(getJobAttemptPath(context, out), PENDING_DIR_NAME);
  }

  /**
   * Compute the path where the output of a task attempt is stored until
   * that task is committed.
   *
   * @param context the context of the task attempt.
   * @return the path where a task attempt should be stored.
   */
  public Path getTaskAttemptPath(TaskAttemptContext context) {
    return new Path(getPendingTaskAttemptsPath(context),
        String.valueOf(context.getTaskAttemptID()));
  }

  /**
   * Compute the path where the output of a task attempt is stored until
   * that task is committed.
   *
   * @param context the context of the task attempt.
   * @param out The output path to put things in.
   * @return the path where a task attempt should be stored.
   */
  public static Path getTaskAttemptPath(TaskAttemptContext context, Path out) {
    return new Path(getPendingTaskAttemptsPath(context, out),
        String.valueOf(context.getTaskAttemptID()));
  }

  /**
   * Compute the path where the output of a committed task is stored until
   * the entire job is committed.
   * @param context the context of the task attempt
   * @return the path where the output of a committed task is stored until
   * the entire job is committed.
   */
  public Path getCommittedTaskPath(TaskAttemptContext context) {
    return getCommittedTaskPath(getAppAttemptId(context), context);
  }

  /**
   * Compute the path where the output of a committed task is stored until
   * the entire job is committed, relative to an explicit output directory.
   * @param context the context of the task attempt
   * @param out the base output directory.
   * @return the path where the output of a committed task is stored.
   */
  public static Path getCommittedTaskPath(TaskAttemptContext context, Path out) {
    return getCommittedTaskPath(getAppAttemptId(context), context, out);
  }

  /**
   * Compute the path where the output of a committed task is stored until the
   * entire job is committed for a specific application attempt.
   * @param appAttemptId the id of the application attempt to use
   * @param context the context of any task.
   * @return the path where the output of a committed task is stored.
   */
  protected Path getCommittedTaskPath(int appAttemptId, TaskAttemptContext context) {
    return new Path(getJobAttemptPath(appAttemptId),
        String.valueOf(context.getTaskAttemptID().getTaskID()));
  }

  /**
   * Compute the path where the output of a committed task is stored for a
   * specific application attempt, relative to an explicit output directory.
   * @param appAttemptId the id of the application attempt to use
   * @param context the context of any task.
   * @param out the base output directory.
   * @return the path where the output of a committed task is stored.
   */
  private static Path getCommittedTaskPath(int appAttemptId, TaskAttemptContext context, Path out) {
    return new Path(getJobAttemptPath(appAttemptId, out),
        String.valueOf(context.getTaskAttemptID().getTaskID()));
  }

  /**
   * Accepts every path whose last component is not {@link #PENDING_DIR_NAME},
   * i.e. filters the pending task attempts directory out of a listing of the
   * job attempt path.
   */
  private static class CommittedTaskFilter implements PathFilter {
    @Override
    public boolean accept(Path path) {
      return !PENDING_DIR_NAME.equals(path.getName());
    }
  }

  /**
   * Get a list of all paths where output from committed tasks are stored.
   * @param context the context of the current job
   * @return the list of these Paths/FileStatuses.
   * @throws IOException if listing the job attempt path fails.
   */
  private FileStatus[] getAllCommittedTaskPaths(JobContext context)
      throws IOException {
    Path jobAttemptPath = getJobAttemptPath(context);
    FileSystem fs = jobAttemptPath.getFileSystem(context.getConfiguration());
    return fs.listStatus(jobAttemptPath, new CommittedTaskFilter());
  }

  /**
   * Get the directory that the task should write results into.
   * @return the work directory; null when this committer was created without
   * an output path or through the job-level constructor.
   * @throws IOException declared for callers; not thrown by this
   * implementation.
   */
  public Path getWorkPath() throws IOException {
    return workPath;
  }

  /**
   * Create the temporary directory that is the root of all of the task
   * work directories.  A failure reported by {@code mkdirs} is logged, not
   * thrown.
   * @param context the job's context
   */
  @Override
  public void setupJob(JobContext context) throws IOException {
    if (hasOutputPath()) {
      Path jobAttemptPath = getJobAttemptPath(context);
      FileSystem fs = jobAttemptPath.getFileSystem(
          context.getConfiguration());
      if (!fs.mkdirs(jobAttemptPath)) {
        LOG.error("Mkdirs failed to create " + jobAttemptPath);
      }
    } else {
      LOG.warn("Output Path is null in setupJob()");
    }
  }

  /**
   * The job has completed so move all committed tasks to the final output dir.
   * Delete the temporary directory, including all of the work directories.
   * Create a _SUCCESS file to mark it as successful.
   *
   * <p>The move only happens for algorithm version 1; with version 2 the
   * task output has already been merged by {@code commitTask}.
   * @param context the job's context
   */
  @Override
  public void commitJob(JobContext context) throws IOException {
    if (hasOutputPath()) {
      Path finalOutput = getOutputPath();
      FileSystem fs = finalOutput.getFileSystem(context.getConfiguration());

      if (algorithmVersion == 1) {
        for (FileStatus stat: getAllCommittedTaskPaths(context)) {
          mergePaths(fs, stat, finalOutput);
        }
      }

      // delete the _temporary folder
      cleanupJob(context);
      // True if the job requires output.dir marked on successful job.
      // Note that by default it is set to true.
      if (context.getConfiguration().getBoolean(SUCCESSFUL_JOB_OUTPUT_DIR_MARKER, true)) {
        // create the empty _SUCCESS marker file in the output folder
        Path markerPath = new Path(outputPath, SUCCEEDED_FILE_NAME);
        fs.create(markerPath).close();
      }
    } else {
      LOG.warn("Output Path is null in commitJob()");
    }
  }

  /**
   * Merge two paths together.  Anything in from will be moved into to, if there
   * are any name conflicts while merging the files or directories in from win.
   * @param fs the File System to use
   * @param from the path data is coming from.
   * @param to the path data is going to.
   * @throws IOException on any error
   */
  private void mergePaths(FileSystem fs, final FileStatus from,
      final Path to) throws IOException {
    if (LOG.isDebugEnabled()) {
      LOG.debug("Merging data from " + from + " to " + to);
    }
    // A missing destination is expected; represent it as a null status.
    FileStatus toStat;
    try {
      toStat = fs.getFileStatus(to);
    } catch (FileNotFoundException fnfe) {
      toStat = null;
    }

    if (from.isFile()) {
      // A source file replaces whatever currently exists at the destination.
      if (toStat != null) {
        if (!fs.delete(to, true)) {
          throw new IOException("Failed to delete " + to);
        }
      }

      if (!fs.rename(from.getPath(), to)) {
        throw new IOException("Failed to rename " + from + " to " + to);
      }
    } else if (from.isDirectory()) {
      if (toStat != null) {
        if (!toStat.isDirectory()) {
          // Destination is a file: the source directory wins.
          if (!fs.delete(to, true)) {
            throw new IOException("Failed to delete " + to);
          }
          renameOrMerge(fs, from, to);
        } else {
          //It is a directory so merge everything in the directories
          for (FileStatus subFrom : fs.listStatus(from.getPath())) {
            Path subTo = new Path(to, subFrom.getPath().getName());
            mergePaths(fs, subFrom, subTo);
          }
        }
      } else {
        renameOrMerge(fs, from, to);
      }
    }
  }

  /**
   * Move a source directory to a destination that does not currently exist.
   * With algorithm version 1 the directory is renamed as a whole; with
   * version 2 the destination directory is created and every child of the
   * source is merged into it individually.
   * @param fs the File System to use
   * @param from the directory data is coming from.
   * @param to the path data is going to.
   * @throws IOException if the rename fails or any child merge fails.
   */
  private void renameOrMerge(FileSystem fs, FileStatus from, Path to)
      throws IOException {
    if (algorithmVersion == 1) {
      if (!fs.rename(from.getPath(), to)) {
        throw new IOException("Failed to rename " + from + " to " + to);
      }
    } else {
      fs.mkdirs(to);
      for (FileStatus subFrom : fs.listStatus(from.getPath())) {
        Path subTo = new Path(to, subFrom.getPath().getName());
        mergePaths(fs, subFrom, subTo);
      }
    }
  }

  /**
   * Recursively delete the pending job attempts directory below the output
   * path.  Both {@code commitJob} and {@code abortJob} of this class
   * delegate to this method.
   * @param context the job's context
   */
  @Override
  @Deprecated
  public void cleanupJob(JobContext context) throws IOException {
    if (hasOutputPath()) {
      Path pendingJobAttemptsPath = getPendingJobAttemptsPath();
      FileSystem fs = pendingJobAttemptsPath
          .getFileSystem(context.getConfiguration());
      fs.delete(pendingJobAttemptsPath, true);
    } else {
      LOG.warn("Output Path is null in cleanupJob()");
    }
  }

  /**
   * Delete the temporary directory, including all of the work directories.
   * @param context the job's context
   * @param state final run state of the job; not inspected by this
   * implementation.
   */
  @Override
  public void abortJob(JobContext context, JobStatus.State state)
      throws IOException {
    // delete the _temporary folder
    cleanupJob(context);
  }

  /**
   * No task setup required.
   */
  @Override
  public void setupTask(TaskAttemptContext context) throws IOException {
    // FileOutputCommitter's setupTask doesn't do anything. Because the
    // temporary task directory is created on demand when the
    // task is writing.
  }

  /**
   * Move the files from the work directory to the job output directory
   * @param context the task context
   */
  @Override
  public void commitTask(TaskAttemptContext context)
      throws IOException {
    commitTask(context, null);
  }

  /**
   * Commit the output of a task attempt.  With algorithm version 1 the task
   * attempt directory is renamed to the committed task path, replacing any
   * previous content there; with version 2 it is merged directly into the
   * output directory.  A missing task attempt directory is only logged.
   * @param context the task context
   * @param taskAttemptPath the directory holding the attempt's output, or
   * null to use {@link #getTaskAttemptPath(TaskAttemptContext)}.
   * @throws IOException if a delete, rename or merge fails.
   */
  @Private
  public void commitTask(TaskAttemptContext context, Path taskAttemptPath)
      throws IOException {

    TaskAttemptID attemptId = context.getTaskAttemptID();
    if (hasOutputPath()) {
      context.progress();
      if(taskAttemptPath == null) {
        taskAttemptPath = getTaskAttemptPath(context);
      }
      FileSystem fs = taskAttemptPath.getFileSystem(context.getConfiguration());
      FileStatus taskAttemptDirStatus;
      try {
        taskAttemptDirStatus = fs.getFileStatus(taskAttemptPath);
      } catch (FileNotFoundException e) {
        taskAttemptDirStatus = null;
      }

      if (taskAttemptDirStatus != null) {
        if (algorithmVersion == 1) {
          Path committedTaskPath = getCommittedTaskPath(context);
          if (fs.exists(committedTaskPath)) {
            if (!fs.delete(committedTaskPath, true)) {
              throw new IOException("Could not delete " + committedTaskPath);
            }
          }
          if (!fs.rename(taskAttemptPath, committedTaskPath)) {
            throw new IOException("Could not rename " + taskAttemptPath + " to "
                + committedTaskPath);
          }
          LOG.info("Saved output of task '" + attemptId + "' to " +
              committedTaskPath);
        } else {
          // directly merge everything from taskAttemptPath to output directory
          mergePaths(fs, taskAttemptDirStatus, outputPath);
          LOG.info("Saved output of task '" + attemptId + "' to " +
              outputPath);
        }
      } else {
        LOG.warn("No Output found for " + attemptId);
      }
    } else {
      LOG.warn("Output Path is null in commitTask()");
    }
  }

  /**
   * Delete the work directory
   * @throws IOException if the delete call itself fails.
   */
  @Override
  public void abortTask(TaskAttemptContext context) throws IOException {
    abortTask(context, null);
  }

  /**
   * Recursively delete the directory of a task attempt.  A delete that
   * returns false is logged, not thrown.
   * @param context the task context
   * @param taskAttemptPath the directory to delete, or null to use
   * {@link #getTaskAttemptPath(TaskAttemptContext)}.
   * @throws IOException if the delete call itself fails.
   */
  @Private
  public void abortTask(TaskAttemptContext context, Path taskAttemptPath) throws IOException {
    if (hasOutputPath()) {
      context.progress();
      if(taskAttemptPath == null) {
        taskAttemptPath = getTaskAttemptPath(context);
      }
      FileSystem fs = taskAttemptPath.getFileSystem(context.getConfiguration());
      if(!fs.delete(taskAttemptPath, true)) {
        LOG.warn("Could not delete "+taskAttemptPath);
      }
    } else {
      LOG.warn("Output Path is null in abortTask()");
    }
  }

  /**
   * Did this task write any files in the work directory?
   * @param context the task's context
   */
  @Override
  public boolean needsTaskCommit(TaskAttemptContext context
                                 ) throws IOException {
    return needsTaskCommit(context, null);
  }

  /**
   * Check whether the task attempt directory exists.
   * @param context the task's context
   * @param taskAttemptPath the directory to check, or null to use
   * {@link #getTaskAttemptPath(TaskAttemptContext)}.
   * @return true if an output path is set and the directory exists,
   * otherwise false.
   * @throws IOException if the existence check fails.
   */
  @Private
  public boolean needsTaskCommit(TaskAttemptContext context, Path taskAttemptPath
    ) throws IOException {
    if(hasOutputPath()) {
      if(taskAttemptPath == null) {
        taskAttemptPath = getTaskAttemptPath(context);
      }
      FileSystem fs = taskAttemptPath.getFileSystem(context.getConfiguration());
      return fs.exists(taskAttemptPath);
    }
    return false;
  }

  /**
   * @return always true.
   */
  @Override
  @Deprecated
  public boolean isRecoverySupported() {
    return true;
  }

  /**
   * Recover the output a task committed during the previous application
   * attempt.  With algorithm version 1 the previous committed task path is
   * renamed to the committed task path of the current attempt; with
   * version 2 any content still found in the previous committed task path
   * is merged into the output directory.
   * @param context the task's context
   * @throws IOException if this is the first application attempt, or a
   * delete, rename or merge fails.
   */
  @Override
  public void recoverTask(TaskAttemptContext context)
      throws IOException {
    if(hasOutputPath()) {
      context.progress();
      TaskAttemptID attemptId = context.getTaskAttemptID();
      int previousAttempt = getAppAttemptId(context) - 1;
      if (previousAttempt < 0) {
        throw new IOException ("Cannot recover task output for first attempt...");
      }

      Path previousCommittedTaskPath = getCommittedTaskPath(
          previousAttempt, context);
      FileSystem fs = previousCommittedTaskPath.getFileSystem(context.getConfiguration());
      if (LOG.isDebugEnabled()) {
        LOG.debug("Trying to recover task from " + previousCommittedTaskPath);
      }
      if (algorithmVersion == 1) {
        if (fs.exists(previousCommittedTaskPath)) {
          Path committedTaskPath = getCommittedTaskPath(context);
          if (fs.exists(committedTaskPath)) {
            if (!fs.delete(committedTaskPath, true)) {
              throw new IOException("Could not delete "+committedTaskPath);
            }
          }
          //Rename can fail if the parent directory does not yet exist.
          Path committedParent = committedTaskPath.getParent();
          fs.mkdirs(committedParent);
          if (!fs.rename(previousCommittedTaskPath, committedTaskPath)) {
            throw new IOException("Could not rename " + previousCommittedTaskPath +
                " to " + committedTaskPath);
          }
        } else {
          LOG.warn(attemptId+" had no output to recover.");
        }
      } else {
        // essentially a no-op, but for backwards compatibility
        // after upgrade to the new fileOutputCommitter,
        // check if there are any output left in committedTaskPath
        if (fs.exists(previousCommittedTaskPath)) {
          LOG.info("Recovering task for upgrading scenario, moving files from "
              + previousCommittedTaskPath + " to " + outputPath);
          FileStatus from = fs.getFileStatus(previousCommittedTaskPath);
          mergePaths(fs, from, outputPath);
        }
        LOG.info("Done recovering task " + attemptId);
      }
    } else {
      LOG.warn("Output Path is null in recoverTask()");
    }
  }
}