/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.tools;

import java.io.IOException;
import java.util.Random;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Cluster;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.JobSubmissionFiles;
import org.apache.hadoop.tools.CopyListing.*;
import org.apache.hadoop.tools.mapred.CopyMapper;
import org.apache.hadoop.tools.mapred.CopyOutputFormat;
import org.apache.hadoop.tools.util.DistCpUtils;
import org.apache.hadoop.util.ShutdownHookManager;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

import com.google.common.annotations.VisibleForTesting;

/**
 * DistCp is the main driver-class for DistCpV2.
 * For command-line use, DistCp::main() orchestrates the parsing of command-line
 * parameters and the launch of the DistCp job.
 * For programmatic use, a DistCp object can be constructed by specifying
 * options (in a DistCpOptions object), and DistCp::execute() may be used to
 * launch the copy-job. DistCp may alternatively be sub-classed to fine-tune
 * behaviour.
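 * <p>
 * A minimal programmatic-usage sketch (illustrative only; it assumes a
 * DistCpOptions constructor taking the list of source paths and the target
 * path, which may differ between Hadoop releases, and the paths shown are
 * placeholders):
 * <pre>{@code
 *   Configuration conf = new Configuration();
 *   DistCpOptions options = new DistCpOptions(
 *       Arrays.asList(new Path("hdfs://nn1:8020/source/dir")),
 *       new Path("hdfs://nn2:8020/target/dir"));
 *   // Launches the copy job; waits for completion when blocking is enabled.
 *   Job job = new DistCp(conf, options).execute();
 * }</pre>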
 */
@InterfaceAudience.Public
@InterfaceStability.Evolving
public class DistCp extends Configured implements Tool {

  /**
   * Priority of the shutdown hook.
   */
  static final int SHUTDOWN_HOOK_PRIORITY = 30;

  static final Log LOG = LogFactory.getLog(DistCp.class);

  private DistCpOptions inputOptions;
  private Path metaFolder;

  private static final String PREFIX = "_distcp";
  private static final String WIP_PREFIX = "._WIP_";
  private static final String DISTCP_DEFAULT_XML = "distcp-default.xml";
  static final Random rand = new Random();

  private boolean submitted;
  private FileSystem jobFS;

  /**
   * Public Constructor. Creates DistCp object with specified input-parameters.
   * (E.g. source-paths, target-location, etc.)
   * @param inputOptions Options (indicating source-paths, target-location.)
   * @param configuration The Hadoop configuration against which the Copy-mapper must run.
   * @throws Exception
   */
  public DistCp(Configuration configuration, DistCpOptions inputOptions) throws Exception {
    Configuration config = new Configuration(configuration);
    config.addResource(DISTCP_DEFAULT_XML);
    setConf(config);
    this.inputOptions = inputOptions;
    this.metaFolder = createMetaFolderPath();
  }

  /**
   * To be used with the ToolRunner. Not for public consumption.
   */
  @VisibleForTesting
  DistCp() {}

  /**
   * Implementation of Tool::run(). Orchestrates the copy of source file(s)
   * to target location, by:
   *  1. Creating a list of files to be copied to target.
   *  2. Launching a Map-only job to copy the files. (Delegates to execute().)
   * @param argv List of arguments passed to DistCp, from the ToolRunner.
   * @return On success, it returns 0. Else, -1.
   */
  @Override
  public int run(String[] argv) {
    if (argv.length < 1) {
      OptionsParser.usage();
      return DistCpConstants.INVALID_ARGUMENT;
    }

    try {
      inputOptions = (OptionsParser.parse(argv));
      setTargetPathExists();
      LOG.info("Input Options: " + inputOptions);
    } catch (Throwable e) {
      LOG.error("Invalid arguments: ", e);
      System.err.println("Invalid arguments: " + e.getMessage());
      OptionsParser.usage();
      return DistCpConstants.INVALID_ARGUMENT;
    }

    try {
      execute();
    } catch (InvalidInputException e) {
      LOG.error("Invalid input: ", e);
      return DistCpConstants.INVALID_ARGUMENT;
    } catch (DuplicateFileException e) {
      LOG.error("Duplicate files in input path: ", e);
      return DistCpConstants.DUPLICATE_INPUT;
    } catch (AclsNotSupportedException e) {
      LOG.error("ACLs not supported on at least one file system: ", e);
      return DistCpConstants.ACLS_NOT_SUPPORTED;
    } catch (XAttrsNotSupportedException e) {
      LOG.error("XAttrs not supported on at least one file system: ", e);
      return DistCpConstants.XATTRS_NOT_SUPPORTED;
    } catch (Exception e) {
      LOG.error("Exception encountered ", e);
      return DistCpConstants.UNKNOWN_ERROR;
    }
    return DistCpConstants.SUCCESS;
  }

  /**
   * Implements the core-execution. Creates the file-list for copy,
   * and launches the Hadoop-job, to do the copy.
   * @return Job handle
   * @throws Exception
   */
  public Job execute() throws Exception {
    Job job = createAndSubmitJob();

    if (inputOptions.shouldBlock()) {
      waitForJobCompletion(job);
    }
    return job;
  }

  /**
   * Create and submit the mapreduce job.
   * @return The mapreduce job object that has been submitted
   */
  public Job createAndSubmitJob() throws Exception {
    assert inputOptions != null;
    assert getConf() != null;
    Job job = null;
    try {
      synchronized(this) {
        //Don't cleanup while we are setting up.
        metaFolder = createMetaFolderPath();
        jobFS = metaFolder.getFileSystem(getConf());
        job = createJob();
      }
      if (inputOptions.shouldUseDiff()) {
        DistCpSync distCpSync = new DistCpSync(inputOptions, getConf());
        if (distCpSync.sync()) {
          createInputFileListingWithDiff(job, distCpSync);
        } else {
          throw new Exception("DistCp sync failed, input options: "
              + inputOptions);
        }
      }

      // Fallback to default DistCp if without "diff" option or sync failed.
      if (!inputOptions.shouldUseDiff()) {
        createInputFileListing(job);
      }

      job.submit();
      submitted = true;
    } finally {
      if (!submitted) {
        cleanup();
      }
    }

    String jobID = job.getJobID().toString();
    job.getConfiguration().set(DistCpConstants.CONF_LABEL_DISTCP_JOB_ID, jobID);
    LOG.info("DistCp job-id: " + jobID);

    return job;
  }

  /**
   * Wait for the given job to complete.
   * @param job the given mapreduce job that has already been submitted
   */
  public void waitForJobCompletion(Job job) throws Exception {
    assert job != null;
    if (!job.waitForCompletion(true)) {
      throw new IOException("DistCp failure: Job " + job.getJobID()
          + " has failed: " + job.getStatus().getFailureInfo());
    }
  }

  /**
   * Set targetPathExists in both inputOptions and job config,
   * for the benefit of CopyCommitter
   */
  private void setTargetPathExists() throws IOException {
    Path target = inputOptions.getTargetPath();
    FileSystem targetFS = target.getFileSystem(getConf());
    boolean targetExists = targetFS.exists(target);
    inputOptions.setTargetPathExists(targetExists);
    getConf().setBoolean(DistCpConstants.CONF_LABEL_TARGET_PATH_EXISTS,
        targetExists);
  }

  /**
   * Create Job object for submitting it, with all the configuration
   *
   * @return Reference to job object.
   * @throws IOException - Exception if any
   */
  private Job createJob() throws IOException {
    String jobName = "distcp";
    String userChosenName = getConf().get(JobContext.JOB_NAME);
    if (userChosenName != null)
      jobName += ": " + userChosenName;
    Job job = Job.getInstance(getConf());
    job.setJobName(jobName);
    job.setInputFormatClass(DistCpUtils.getStrategy(getConf(), inputOptions));
    job.setJarByClass(CopyMapper.class);
    configureOutputFormat(job);

    job.setMapperClass(CopyMapper.class);
    job.setNumReduceTasks(0);
    job.setMapOutputKeyClass(Text.class);
    job.setMapOutputValueClass(Text.class);
    job.setOutputFormatClass(CopyOutputFormat.class);
    job.getConfiguration().set(JobContext.MAP_SPECULATIVE, "false");
    job.getConfiguration().set(JobContext.NUM_MAPS,
        String.valueOf(inputOptions.getMaxMaps()));

    inputOptions.appendToConf(job.getConfiguration());
    return job;
  }

  /**
   * Setup output format appropriately
   *
   * @param job - Job handle
   * @throws IOException - Exception if any
   */
  private void configureOutputFormat(Job job) throws IOException {
    final Configuration configuration = job.getConfiguration();
    Path targetPath = inputOptions.getTargetPath();
    FileSystem targetFS = targetPath.getFileSystem(configuration);
    targetPath = targetPath.makeQualified(targetFS.getUri(),
        targetFS.getWorkingDirectory());
    if (inputOptions.shouldPreserve(DistCpOptions.FileAttribute.ACL)) {
      DistCpUtils.checkFileSystemAclSupport(targetFS);
    }
    if (inputOptions.shouldPreserve(DistCpOptions.FileAttribute.XATTR)) {
      DistCpUtils.checkFileSystemXAttrSupport(targetFS);
    }
    if (inputOptions.shouldAtomicCommit()) {
      Path workDir = inputOptions.getAtomicWorkPath();
      if (workDir == null) {
        workDir = targetPath.getParent();
      }
      workDir = new Path(workDir, WIP_PREFIX + targetPath.getName()
          + rand.nextInt());
      FileSystem workFS = workDir.getFileSystem(configuration);
      if (!FileUtil.compareFs(targetFS, workFS)) {
        throw new IllegalArgumentException("Work path " + workDir +
            " and target path " + targetPath + " are in different file system");
      }
      CopyOutputFormat.setWorkingDirectory(job, workDir);
    } else {
      CopyOutputFormat.setWorkingDirectory(job, targetPath);
    }
    CopyOutputFormat.setCommitDirectory(job, targetPath);

    Path logPath = inputOptions.getLogPath();
    if (logPath == null) {
      logPath = new Path(metaFolder, "_logs");
    } else {
      LOG.info("DistCp job log path: " + logPath);
    }
    CopyOutputFormat.setOutputPath(job, logPath);
  }

  /**
   * Create input listing by invoking an appropriate copy listing
   * implementation. Also add delegation tokens for each path
   * to job's credential store
   *
   * @param job - Handle to job
   * @return Returns the path where the copy listing is created
   * @throws IOException - If any
   */
  protected Path createInputFileListing(Job job) throws IOException {
    Path fileListingPath = getFileListingPath();
    CopyListing copyListing = CopyListing.getCopyListing(job.getConfiguration(),
        job.getCredentials(), inputOptions);
    copyListing.buildListing(fileListingPath, inputOptions);
    return fileListingPath;
  }

  /**
   * Create input listing based on snapshot diff report.
   * @param job - Handle to job
   * @param distCpSync the class wraps the snapshot diff report
   * @return Returns the path where the copy listing is created
   * @throws IOException - If any
   */
  private Path createInputFileListingWithDiff(Job job, DistCpSync distCpSync)
      throws IOException {
    Path fileListingPath = getFileListingPath();
    CopyListing copyListing = new SimpleCopyListing(job.getConfiguration(),
        job.getCredentials(), distCpSync);
    copyListing.buildListing(fileListingPath, inputOptions);
    return fileListingPath;
  }

  /**
   * Get default name of the copy listing file. Use the meta folder
   * to create the copy listing file
   *
   * @return - Path where the copy listing file has to be saved
   * @throws IOException - Exception if any
   */
  protected Path getFileListingPath() throws IOException {
    String fileListPathStr = metaFolder + "/fileList.seq";
    Path path = new Path(fileListPathStr);
    return new Path(path.toUri().normalize().toString());
  }

  /**
   * Create a default working folder for the job, under the
   * job staging directory
   *
   * @return Returns the working folder information
   * @throws Exception - Exception if any
   */
  private Path createMetaFolderPath() throws Exception {
    Configuration configuration = getConf();
    Path stagingDir = JobSubmissionFiles.getStagingDir(
        new Cluster(configuration), configuration);
    Path metaFolderPath = new Path(stagingDir, PREFIX + String.valueOf(rand.nextInt()));
    if (LOG.isDebugEnabled())
      LOG.debug("Meta folder location: " + metaFolderPath);
    configuration.set(DistCpConstants.CONF_LABEL_META_FOLDER, metaFolderPath.toString());
    return metaFolderPath;
  }

  /**
   * Main function of the DistCp program. Parses the input arguments (via OptionsParser),
   * and invokes the DistCp::run() method, via the ToolRunner.
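   * <p>
   * A representative command-line invocation (illustrative; the NameNode
   * addresses and paths below are placeholders):
   * <pre>
   *   hadoop distcp hdfs://nn1:8020/source/dir hdfs://nn2:8020/target/dir
   * </pre>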
   * @param argv Command-line arguments sent to DistCp.
   */
  public static void main(String argv[]) {
    int exitCode;
    try {
      DistCp distCp = new DistCp();
      Cleanup CLEANUP = new Cleanup(distCp);

      ShutdownHookManager.get().addShutdownHook(CLEANUP,
          SHUTDOWN_HOOK_PRIORITY);
      exitCode = ToolRunner.run(getDefaultConf(), distCp, argv);
    }
    catch (Exception e) {
      LOG.error("Couldn't complete DistCp operation: ", e);
      exitCode = DistCpConstants.UNKNOWN_ERROR;
    }
    System.exit(exitCode);
  }

  /**
   * Loads properties from distcp-default.xml into configuration
   * object
   * @return Configuration which includes properties from distcp-default.xml
   */
  private static Configuration getDefaultConf() {
    Configuration config = new Configuration();
    config.addResource(DISTCP_DEFAULT_XML);
    return config;
  }

  private synchronized void cleanup() {
    try {
      if (metaFolder == null) return;

      jobFS.delete(metaFolder, true);
      metaFolder = null;
    } catch (IOException e) {
      LOG.error("Unable to cleanup meta folder: " + metaFolder, e);
    }
  }

  private boolean isSubmitted() {
    return submitted;
  }

  private static class Cleanup implements Runnable {
    private final DistCp distCp;

    Cleanup(DistCp distCp) {
      this.distCp = distCp;
    }

    @Override
    public void run() {
      if (distCp.isSubmitted()) return;

      distCp.cleanup();
    }
  }
}