001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018 019package org.apache.hadoop.tools; 020 021import java.io.IOException; 022import java.net.URL; 023import java.util.Random; 024 025import org.apache.commons.logging.Log; 026import org.apache.commons.logging.LogFactory; 027import org.apache.hadoop.classification.InterfaceAudience; 028import org.apache.hadoop.classification.InterfaceStability; 029import org.apache.hadoop.conf.Configuration; 030import org.apache.hadoop.conf.Configured; 031import org.apache.hadoop.fs.FileSystem; 032import org.apache.hadoop.fs.FileUtil; 033import org.apache.hadoop.fs.Path; 034import org.apache.hadoop.io.Text; 035import org.apache.hadoop.mapreduce.Cluster; 036import org.apache.hadoop.mapreduce.Job; 037import org.apache.hadoop.mapreduce.JobContext; 038import org.apache.hadoop.mapreduce.JobSubmissionFiles; 039import org.apache.hadoop.tools.CopyListing.*; 040import org.apache.hadoop.tools.mapred.CopyMapper; 041import org.apache.hadoop.tools.mapred.CopyOutputFormat; 042import org.apache.hadoop.tools.util.DistCpUtils; 043import org.apache.hadoop.util.ShutdownHookManager; 044import org.apache.hadoop.util.Tool; 045import org.apache.hadoop.util.ToolRunner; 046 047import com.google.common.annotations.VisibleForTesting; 048 049/** 050 * DistCp is the main driver-class for DistCpV2. 051 * For command-line use, DistCp::main() orchestrates the parsing of command-line 052 * parameters and the launch of the DistCp job. 053 * For programmatic use, a DistCp object can be constructed by specifying 054 * options (in a DistCpOptions object), and DistCp::execute() may be used to 055 * launch the copy-job. DistCp may alternatively be sub-classed to fine-tune 056 * behaviour. 057 */ 058@InterfaceAudience.Public 059@InterfaceStability.Evolving 060public class DistCp extends Configured implements Tool { 061 062 /** 063 * Priority of the shutdown hook. 064 */ 065 static final int SHUTDOWN_HOOK_PRIORITY = 30; 066 067 static final Log LOG = LogFactory.getLog(DistCp.class); 068 069 private DistCpOptions inputOptions; 070 private Path metaFolder; 071 072 private static final String PREFIX = "_distcp"; 073 private static final String WIP_PREFIX = "._WIP_"; 074 private static final String DISTCP_DEFAULT_XML = "distcp-default.xml"; 075 static final Random rand = new Random(); 076 077 private boolean submitted; 078 private FileSystem jobFS; 079 080 /** 081 * Public Constructor. Creates DistCp object with specified input-parameters. 082 * (E.g. source-paths, target-location, etc.) 083 * @param inputOptions Options (indicating source-paths, target-location.) 084 * @param configuration The Hadoop configuration against which the Copy-mapper must run. 085 * @throws Exception 086 */ 087 public DistCp(Configuration configuration, DistCpOptions inputOptions) throws Exception { 088 Configuration config = new Configuration(configuration); 089 config.addResource(DISTCP_DEFAULT_XML); 090 setConf(config); 091 this.inputOptions = inputOptions; 092 this.metaFolder = createMetaFolderPath(); 093 } 094 095 /** 096 * To be used with the ToolRunner. Not for public consumption. 097 */ 098 @VisibleForTesting 099 DistCp() {} 100 101 /** 102 * Implementation of Tool::run(). Orchestrates the copy of source file(s) 103 * to target location, by: 104 * 1. Creating a list of files to be copied to target. 105 * 2. Launching a Map-only job to copy the files. (Delegates to execute().) 106 * @param argv List of arguments passed to DistCp, from the ToolRunner. 107 * @return On success, it returns 0. Else, -1. 108 */ 109 @Override 110 public int run(String[] argv) { 111 if (argv.length < 1) { 112 OptionsParser.usage(); 113 return DistCpConstants.INVALID_ARGUMENT; 114 } 115 116 try { 117 inputOptions = (OptionsParser.parse(argv)); 118 setTargetPathExists(); 119 LOG.info("Input Options: " + inputOptions); 120 } catch (Throwable e) { 121 LOG.error("Invalid arguments: ", e); 122 System.err.println("Invalid arguments: " + e.getMessage()); 123 OptionsParser.usage(); 124 return DistCpConstants.INVALID_ARGUMENT; 125 } 126 127 try { 128 execute(); 129 } catch (InvalidInputException e) { 130 LOG.error("Invalid input: ", e); 131 return DistCpConstants.INVALID_ARGUMENT; 132 } catch (DuplicateFileException e) { 133 LOG.error("Duplicate files in input path: ", e); 134 return DistCpConstants.DUPLICATE_INPUT; 135 } catch (AclsNotSupportedException e) { 136 LOG.error("ACLs not supported on at least one file system: ", e); 137 return DistCpConstants.ACLS_NOT_SUPPORTED; 138 } catch (XAttrsNotSupportedException e) { 139 LOG.error("XAttrs not supported on at least one file system: ", e); 140 return DistCpConstants.XATTRS_NOT_SUPPORTED; 141 } catch (Exception e) { 142 LOG.error("Exception encountered ", e); 143 return DistCpConstants.UNKNOWN_ERROR; 144 } 145 return DistCpConstants.SUCCESS; 146 } 147 148 /** 149 * Implements the core-execution. Creates the file-list for copy, 150 * and launches the Hadoop-job, to do the copy. 151 * @return Job handle 152 * @throws Exception 153 */ 154 public Job execute() throws Exception { 155 Job job = createAndSubmitJob(); 156 157 if (inputOptions.shouldBlock()) { 158 waitForJobCompletion(job); 159 } 160 return job; 161 } 162 163 /** 164 * Create and submit the mapreduce job. 165 * @return The mapreduce job object that has been submitted 166 */ 167 public Job createAndSubmitJob() throws Exception { 168 assert inputOptions != null; 169 assert getConf() != null; 170 Job job = null; 171 try { 172 synchronized(this) { 173 //Don't cleanup while we are setting up. 174 metaFolder = createMetaFolderPath(); 175 jobFS = metaFolder.getFileSystem(getConf()); 176 job = createJob(); 177 } 178 if (inputOptions.shouldUseDiff()) { 179 DistCpSync distCpSync = new DistCpSync(inputOptions, getConf()); 180 if (distCpSync.sync()) { 181 createInputFileListingWithDiff(job, distCpSync); 182 } else { 183 inputOptions.disableUsingDiff(); 184 } 185 } 186 187 // Fallback to default DistCp if without "diff" option or sync failed. 188 if (!inputOptions.shouldUseDiff()) { 189 createInputFileListing(job); 190 } 191 192 job.submit(); 193 submitted = true; 194 } finally { 195 if (!submitted) { 196 cleanup(); 197 } 198 } 199 200 String jobID = job.getJobID().toString(); 201 job.getConfiguration().set(DistCpConstants.CONF_LABEL_DISTCP_JOB_ID, jobID); 202 LOG.info("DistCp job-id: " + jobID); 203 204 return job; 205 } 206 207 /** 208 * Wait for the given job to complete. 209 * @param job the given mapreduce job that has already been submitted 210 */ 211 public void waitForJobCompletion(Job job) throws Exception { 212 assert job != null; 213 if (!job.waitForCompletion(true)) { 214 throw new IOException("DistCp failure: Job " + job.getJobID() 215 + " has failed: " + job.getStatus().getFailureInfo()); 216 } 217 } 218 219 /** 220 * Set targetPathExists in both inputOptions and job config, 221 * for the benefit of CopyCommitter 222 */ 223 private void setTargetPathExists() throws IOException { 224 Path target = inputOptions.getTargetPath(); 225 FileSystem targetFS = target.getFileSystem(getConf()); 226 boolean targetExists = targetFS.exists(target); 227 inputOptions.setTargetPathExists(targetExists); 228 getConf().setBoolean(DistCpConstants.CONF_LABEL_TARGET_PATH_EXISTS, 229 targetExists); 230 } 231 /** 232 * Create Job object for submitting it, with all the configuration 233 * 234 * @return Reference to job object. 235 * @throws IOException - Exception if any 236 */ 237 private Job createJob() throws IOException { 238 String jobName = "distcp"; 239 String userChosenName = getConf().get(JobContext.JOB_NAME); 240 if (userChosenName != null) 241 jobName += ": " + userChosenName; 242 Job job = Job.getInstance(getConf()); 243 job.setJobName(jobName); 244 job.setInputFormatClass(DistCpUtils.getStrategy(getConf(), inputOptions)); 245 job.setJarByClass(CopyMapper.class); 246 configureOutputFormat(job); 247 248 job.setMapperClass(CopyMapper.class); 249 job.setNumReduceTasks(0); 250 job.setMapOutputKeyClass(Text.class); 251 job.setMapOutputValueClass(Text.class); 252 job.setOutputFormatClass(CopyOutputFormat.class); 253 job.getConfiguration().set(JobContext.MAP_SPECULATIVE, "false"); 254 job.getConfiguration().set(JobContext.NUM_MAPS, 255 String.valueOf(inputOptions.getMaxMaps())); 256 257 if (inputOptions.getSslConfigurationFile() != null) { 258 setupSSLConfig(job); 259 } 260 261 inputOptions.appendToConf(job.getConfiguration()); 262 return job; 263 } 264 265 /** 266 * Setup ssl configuration on the job configuration to enable hsftp access 267 * from map job. Also copy the ssl configuration file to Distributed cache 268 * 269 * @param job - Reference to job's handle 270 * @throws java.io.IOException - Exception if unable to locate ssl config file 271 */ 272 private void setupSSLConfig(Job job) throws IOException { 273 Configuration configuration = job.getConfiguration(); 274 URL sslFileUrl = configuration.getResource(inputOptions 275 .getSslConfigurationFile()); 276 if (sslFileUrl == null) { 277 throw new IOException( 278 "Given ssl configuration file doesn't exist in class path : " 279 + inputOptions.getSslConfigurationFile()); 280 } 281 Path sslConfigPath = new Path(sslFileUrl.toString()); 282 283 addSSLFilesToDistCache(job, sslConfigPath); 284 configuration.set(DistCpConstants.CONF_LABEL_SSL_CONF, sslConfigPath.getName()); 285 configuration.set(DistCpConstants.CONF_LABEL_SSL_KEYSTORE, sslConfigPath.getName()); 286 } 287 288 /** 289 * Add SSL files to distributed cache. Trust store, key store and ssl config xml 290 * 291 * @param job - Job handle 292 * @param sslConfigPath - ssl Configuration file specified through options 293 * @throws IOException - If any 294 */ 295 private void addSSLFilesToDistCache(Job job, 296 Path sslConfigPath) throws IOException { 297 Configuration configuration = job.getConfiguration(); 298 FileSystem localFS = FileSystem.getLocal(configuration); 299 300 Configuration sslConf = new Configuration(false); 301 sslConf.addResource(sslConfigPath); 302 303 Path localStorePath = getLocalStorePath(sslConf, 304 DistCpConstants.CONF_LABEL_SSL_TRUST_STORE_LOCATION); 305 job.addCacheFile(localStorePath.makeQualified(localFS.getUri(), 306 localFS.getWorkingDirectory()).toUri()); 307 configuration.set(DistCpConstants.CONF_LABEL_SSL_TRUST_STORE_LOCATION, 308 localStorePath.getName()); 309 310 localStorePath = getLocalStorePath(sslConf, 311 DistCpConstants.CONF_LABEL_SSL_KEY_STORE_LOCATION); 312 job.addCacheFile(localStorePath.makeQualified(localFS.getUri(), 313 localFS.getWorkingDirectory()).toUri()); 314 configuration.set(DistCpConstants.CONF_LABEL_SSL_KEY_STORE_LOCATION, 315 localStorePath.getName()); 316 317 job.addCacheFile(sslConfigPath.makeQualified(localFS.getUri(), 318 localFS.getWorkingDirectory()).toUri()); 319 320 } 321 322 /** 323 * Get Local Trust store/key store path 324 * 325 * @param sslConf - Config from SSL Client xml 326 * @param storeKey - Key for either trust store or key store 327 * @return - Path where the store is present 328 * @throws IOException -If any 329 */ 330 private Path getLocalStorePath(Configuration sslConf, String storeKey) throws IOException { 331 if (sslConf.get(storeKey) != null) { 332 return new Path(sslConf.get(storeKey)); 333 } else { 334 throw new IOException("Store for " + storeKey + " is not set in " + 335 inputOptions.getSslConfigurationFile()); 336 } 337 } 338 339 /** 340 * Setup output format appropriately 341 * 342 * @param job - Job handle 343 * @throws IOException - Exception if any 344 */ 345 private void configureOutputFormat(Job job) throws IOException { 346 final Configuration configuration = job.getConfiguration(); 347 Path targetPath = inputOptions.getTargetPath(); 348 FileSystem targetFS = targetPath.getFileSystem(configuration); 349 targetPath = targetPath.makeQualified(targetFS.getUri(), 350 targetFS.getWorkingDirectory()); 351 if (inputOptions.shouldPreserve(DistCpOptions.FileAttribute.ACL)) { 352 DistCpUtils.checkFileSystemAclSupport(targetFS); 353 } 354 if (inputOptions.shouldPreserve(DistCpOptions.FileAttribute.XATTR)) { 355 DistCpUtils.checkFileSystemXAttrSupport(targetFS); 356 } 357 if (inputOptions.shouldAtomicCommit()) { 358 Path workDir = inputOptions.getAtomicWorkPath(); 359 if (workDir == null) { 360 workDir = targetPath.getParent(); 361 } 362 workDir = new Path(workDir, WIP_PREFIX + targetPath.getName() 363 + rand.nextInt()); 364 FileSystem workFS = workDir.getFileSystem(configuration); 365 if (!FileUtil.compareFs(targetFS, workFS)) { 366 throw new IllegalArgumentException("Work path " + workDir + 367 " and target path " + targetPath + " are in different file system"); 368 } 369 CopyOutputFormat.setWorkingDirectory(job, workDir); 370 } else { 371 CopyOutputFormat.setWorkingDirectory(job, targetPath); 372 } 373 CopyOutputFormat.setCommitDirectory(job, targetPath); 374 375 Path logPath = inputOptions.getLogPath(); 376 if (logPath == null) { 377 logPath = new Path(metaFolder, "_logs"); 378 } else { 379 LOG.info("DistCp job log path: " + logPath); 380 } 381 CopyOutputFormat.setOutputPath(job, logPath); 382 } 383 384 /** 385 * Create input listing by invoking an appropriate copy listing 386 * implementation. Also add delegation tokens for each path 387 * to job's credential store 388 * 389 * @param job - Handle to job 390 * @return Returns the path where the copy listing is created 391 * @throws IOException - If any 392 */ 393 protected Path createInputFileListing(Job job) throws IOException { 394 Path fileListingPath = getFileListingPath(); 395 CopyListing copyListing = CopyListing.getCopyListing(job.getConfiguration(), 396 job.getCredentials(), inputOptions); 397 copyListing.buildListing(fileListingPath, inputOptions); 398 return fileListingPath; 399 } 400 401 /** 402 * Create input listing based on snapshot diff report. 403 * @param job - Handle to job 404 * @param distCpSync the class wraps the snapshot diff report 405 * @return Returns the path where the copy listing is created 406 * @throws IOException - If any 407 */ 408 private Path createInputFileListingWithDiff(Job job, DistCpSync distCpSync) 409 throws IOException { 410 Path fileListingPath = getFileListingPath(); 411 CopyListing copyListing = new SimpleCopyListing(job.getConfiguration(), 412 job.getCredentials(), distCpSync); 413 copyListing.buildListing(fileListingPath, inputOptions); 414 return fileListingPath; 415 } 416 417 /** 418 * Get default name of the copy listing file. Use the meta folder 419 * to create the copy listing file 420 * 421 * @return - Path where the copy listing file has to be saved 422 * @throws IOException - Exception if any 423 */ 424 protected Path getFileListingPath() throws IOException { 425 String fileListPathStr = metaFolder + "/fileList.seq"; 426 Path path = new Path(fileListPathStr); 427 return new Path(path.toUri().normalize().toString()); 428 } 429 430 /** 431 * Create a default working folder for the job, under the 432 * job staging directory 433 * 434 * @return Returns the working folder information 435 * @throws Exception - Exception if any 436 */ 437 private Path createMetaFolderPath() throws Exception { 438 Configuration configuration = getConf(); 439 Path stagingDir = JobSubmissionFiles.getStagingDir( 440 new Cluster(configuration), configuration); 441 Path metaFolderPath = new Path(stagingDir, PREFIX + String.valueOf(rand.nextInt())); 442 if (LOG.isDebugEnabled()) 443 LOG.debug("Meta folder location: " + metaFolderPath); 444 configuration.set(DistCpConstants.CONF_LABEL_META_FOLDER, metaFolderPath.toString()); 445 return metaFolderPath; 446 } 447 448 /** 449 * Main function of the DistCp program. Parses the input arguments (via OptionsParser), 450 * and invokes the DistCp::run() method, via the ToolRunner. 451 * @param argv Command-line arguments sent to DistCp. 452 */ 453 public static void main(String argv[]) { 454 int exitCode; 455 try { 456 DistCp distCp = new DistCp(); 457 Cleanup CLEANUP = new Cleanup(distCp); 458 459 ShutdownHookManager.get().addShutdownHook(CLEANUP, 460 SHUTDOWN_HOOK_PRIORITY); 461 exitCode = ToolRunner.run(getDefaultConf(), distCp, argv); 462 } 463 catch (Exception e) { 464 LOG.error("Couldn't complete DistCp operation: ", e); 465 exitCode = DistCpConstants.UNKNOWN_ERROR; 466 } 467 System.exit(exitCode); 468 } 469 470 /** 471 * Loads properties from distcp-default.xml into configuration 472 * object 473 * @return Configuration which includes properties from distcp-default.xml 474 */ 475 private static Configuration getDefaultConf() { 476 Configuration config = new Configuration(); 477 config.addResource(DISTCP_DEFAULT_XML); 478 return config; 479 } 480 481 private synchronized void cleanup() { 482 try { 483 if (metaFolder == null) return; 484 485 jobFS.delete(metaFolder, true); 486 metaFolder = null; 487 } catch (IOException e) { 488 LOG.error("Unable to cleanup meta folder: " + metaFolder, e); 489 } 490 } 491 492 private boolean isSubmitted() { 493 return submitted; 494 } 495 496 private static class Cleanup implements Runnable { 497 private final DistCp distCp; 498 499 Cleanup(DistCp distCp) { 500 this.distCp = distCp; 501 } 502 503 @Override 504 public void run() { 505 if (distCp.isSubmitted()) return; 506 507 distCp.cleanup(); 508 } 509 } 510}