001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018
019package org.apache.hadoop.tools;
020
021import java.io.IOException;
022import java.util.Random;
023
024import org.apache.commons.logging.Log;
025import org.apache.commons.logging.LogFactory;
026import org.apache.hadoop.classification.InterfaceAudience;
027import org.apache.hadoop.classification.InterfaceStability;
028import org.apache.hadoop.conf.Configuration;
029import org.apache.hadoop.conf.Configured;
030import org.apache.hadoop.fs.FileSystem;
031import org.apache.hadoop.fs.FileUtil;
032import org.apache.hadoop.fs.Path;
033import org.apache.hadoop.io.Text;
034import org.apache.hadoop.mapreduce.Cluster;
035import org.apache.hadoop.mapreduce.Job;
036import org.apache.hadoop.mapreduce.JobContext;
037import org.apache.hadoop.mapreduce.JobSubmissionFiles;
038import org.apache.hadoop.tools.CopyListing.*;
039import org.apache.hadoop.tools.mapred.CopyMapper;
040import org.apache.hadoop.tools.mapred.CopyOutputFormat;
041import org.apache.hadoop.tools.util.DistCpUtils;
042import org.apache.hadoop.util.ShutdownHookManager;
043import org.apache.hadoop.util.Tool;
044import org.apache.hadoop.util.ToolRunner;
045
046import com.google.common.annotations.VisibleForTesting;
047
048/**
049 * DistCp is the main driver-class for DistCpV2.
050 * For command-line use, DistCp::main() orchestrates the parsing of command-line
051 * parameters and the launch of the DistCp job.
052 * For programmatic use, a DistCp object can be constructed by specifying
053 * options (in a DistCpOptions object), and DistCp::execute() may be used to
054 * launch the copy-job. DistCp may alternatively be sub-classed to fine-tune
055 * behaviour.
056 */
057@InterfaceAudience.Public
058@InterfaceStability.Evolving
059public class DistCp extends Configured implements Tool {
060
061  /**
062   * Priority of the shutdown hook.
063   */
064  static final int SHUTDOWN_HOOK_PRIORITY = 30;
065
066  static final Log LOG = LogFactory.getLog(DistCp.class);
067
068  private DistCpOptions inputOptions;
069  private Path metaFolder;
070
071  private static final String PREFIX = "_distcp";
072  private static final String WIP_PREFIX = "._WIP_";
073  private static final String DISTCP_DEFAULT_XML = "distcp-default.xml";
074  static final Random rand = new Random();
075
076  private boolean submitted;
077  private FileSystem jobFS;
078
079  /**
080   * Public Constructor. Creates DistCp object with specified input-parameters.
081   * (E.g. source-paths, target-location, etc.)
082   * @param inputOptions Options (indicating source-paths, target-location.)
083   * @param configuration The Hadoop configuration against which the Copy-mapper must run.
084   * @throws Exception
085   */
086  public DistCp(Configuration configuration, DistCpOptions inputOptions) throws Exception {
087    Configuration config = new Configuration(configuration);
088    config.addResource(DISTCP_DEFAULT_XML);
089    setConf(config);
090    this.inputOptions = inputOptions;
091    this.metaFolder   = createMetaFolderPath();
092  }
093
094  /**
095   * To be used with the ToolRunner. Not for public consumption.
096   */
097  @VisibleForTesting
098  DistCp() {}
099
100  /**
101   * Implementation of Tool::run(). Orchestrates the copy of source file(s)
102   * to target location, by:
103   *  1. Creating a list of files to be copied to target.
104   *  2. Launching a Map-only job to copy the files. (Delegates to execute().)
105   * @param argv List of arguments passed to DistCp, from the ToolRunner.
106   * @return On success, it returns 0. Else, -1.
107   */
108  @Override
109  public int run(String[] argv) {
110    if (argv.length < 1) {
111      OptionsParser.usage();
112      return DistCpConstants.INVALID_ARGUMENT;
113    }
114    
115    try {
116      inputOptions = (OptionsParser.parse(argv));
117      setTargetPathExists();
118      LOG.info("Input Options: " + inputOptions);
119    } catch (Throwable e) {
120      LOG.error("Invalid arguments: ", e);
121      System.err.println("Invalid arguments: " + e.getMessage());
122      OptionsParser.usage();      
123      return DistCpConstants.INVALID_ARGUMENT;
124    }
125    
126    try {
127      execute();
128    } catch (InvalidInputException e) {
129      LOG.error("Invalid input: ", e);
130      return DistCpConstants.INVALID_ARGUMENT;
131    } catch (DuplicateFileException e) {
132      LOG.error("Duplicate files in input path: ", e);
133      return DistCpConstants.DUPLICATE_INPUT;
134    } catch (AclsNotSupportedException e) {
135      LOG.error("ACLs not supported on at least one file system: ", e);
136      return DistCpConstants.ACLS_NOT_SUPPORTED;
137    } catch (XAttrsNotSupportedException e) {
138      LOG.error("XAttrs not supported on at least one file system: ", e);
139      return DistCpConstants.XATTRS_NOT_SUPPORTED;
140    } catch (Exception e) {
141      LOG.error("Exception encountered ", e);
142      return DistCpConstants.UNKNOWN_ERROR;
143    }
144    return DistCpConstants.SUCCESS;
145  }
146
147  /**
148   * Implements the core-execution. Creates the file-list for copy,
149   * and launches the Hadoop-job, to do the copy.
150   * @return Job handle
151   * @throws Exception
152   */
153  public Job execute() throws Exception {
154    Job job = createAndSubmitJob();
155
156    if (inputOptions.shouldBlock()) {
157      waitForJobCompletion(job);
158    }
159    return job;
160  }
161
162  /**
163   * Create and submit the mapreduce job.
164   * @return The mapreduce job object that has been submitted
165   */
166  public Job createAndSubmitJob() throws Exception {
167    assert inputOptions != null;
168    assert getConf() != null;
169    Job job = null;
170    try {
171      synchronized(this) {
172        //Don't cleanup while we are setting up.
173        metaFolder = createMetaFolderPath();
174        jobFS = metaFolder.getFileSystem(getConf());
175        job = createJob();
176      }
177      if (inputOptions.shouldUseDiff()) {
178        DistCpSync distCpSync = new DistCpSync(inputOptions, getConf());
179        if (distCpSync.sync()) {
180          createInputFileListingWithDiff(job, distCpSync);
181        } else {
182          throw new Exception("DistCp sync failed, input options: "
183              + inputOptions);
184        }
185      }
186
187      // Fallback to default DistCp if without "diff" option or sync failed.
188      if (!inputOptions.shouldUseDiff()) {
189        createInputFileListing(job);
190      }
191
192      job.submit();
193      submitted = true;
194    } finally {
195      if (!submitted) {
196        cleanup();
197      }
198    }
199
200    String jobID = job.getJobID().toString();
201    job.getConfiguration().set(DistCpConstants.CONF_LABEL_DISTCP_JOB_ID, jobID);
202    LOG.info("DistCp job-id: " + jobID);
203
204    return job;
205  }
206
207  /**
208   * Wait for the given job to complete.
209   * @param job the given mapreduce job that has already been submitted
210   */
211  public void waitForJobCompletion(Job job) throws Exception {
212    assert job != null;
213    if (!job.waitForCompletion(true)) {
214      throw new IOException("DistCp failure: Job " + job.getJobID()
215          + " has failed: " + job.getStatus().getFailureInfo());
216    }
217  }
218
219  /**
220   * Set targetPathExists in both inputOptions and job config,
221   * for the benefit of CopyCommitter
222   */
223  private void setTargetPathExists() throws IOException {
224    Path target = inputOptions.getTargetPath();
225    FileSystem targetFS = target.getFileSystem(getConf());
226    boolean targetExists = targetFS.exists(target);
227    inputOptions.setTargetPathExists(targetExists);
228    getConf().setBoolean(DistCpConstants.CONF_LABEL_TARGET_PATH_EXISTS, 
229        targetExists);
230  }
231  /**
232   * Create Job object for submitting it, with all the configuration
233   *
234   * @return Reference to job object.
235   * @throws IOException - Exception if any
236   */
237  private Job createJob() throws IOException {
238    String jobName = "distcp";
239    String userChosenName = getConf().get(JobContext.JOB_NAME);
240    if (userChosenName != null)
241      jobName += ": " + userChosenName;
242    Job job = Job.getInstance(getConf());
243    job.setJobName(jobName);
244    job.setInputFormatClass(DistCpUtils.getStrategy(getConf(), inputOptions));
245    job.setJarByClass(CopyMapper.class);
246    configureOutputFormat(job);
247
248    job.setMapperClass(CopyMapper.class);
249    job.setNumReduceTasks(0);
250    job.setMapOutputKeyClass(Text.class);
251    job.setMapOutputValueClass(Text.class);
252    job.setOutputFormatClass(CopyOutputFormat.class);
253    job.getConfiguration().set(JobContext.MAP_SPECULATIVE, "false");
254    job.getConfiguration().set(JobContext.NUM_MAPS,
255                  String.valueOf(inputOptions.getMaxMaps()));
256
257    inputOptions.appendToConf(job.getConfiguration());
258    return job;
259  }
260
261  /**
262   * Setup output format appropriately
263   *
264   * @param job - Job handle
265   * @throws IOException - Exception if any
266   */
267  private void configureOutputFormat(Job job) throws IOException {
268    final Configuration configuration = job.getConfiguration();
269    Path targetPath = inputOptions.getTargetPath();
270    FileSystem targetFS = targetPath.getFileSystem(configuration);
271    targetPath = targetPath.makeQualified(targetFS.getUri(),
272                                          targetFS.getWorkingDirectory());
273    if (inputOptions.shouldPreserve(DistCpOptions.FileAttribute.ACL)) {
274      DistCpUtils.checkFileSystemAclSupport(targetFS);
275    }
276    if (inputOptions.shouldPreserve(DistCpOptions.FileAttribute.XATTR)) {
277      DistCpUtils.checkFileSystemXAttrSupport(targetFS);
278    }
279    if (inputOptions.shouldAtomicCommit()) {
280      Path workDir = inputOptions.getAtomicWorkPath();
281      if (workDir == null) {
282        workDir = targetPath.getParent();
283      }
284      workDir = new Path(workDir, WIP_PREFIX + targetPath.getName()
285                                + rand.nextInt());
286      FileSystem workFS = workDir.getFileSystem(configuration);
287      if (!FileUtil.compareFs(targetFS, workFS)) {
288        throw new IllegalArgumentException("Work path " + workDir +
289            " and target path " + targetPath + " are in different file system");
290      }
291      CopyOutputFormat.setWorkingDirectory(job, workDir);
292    } else {
293      CopyOutputFormat.setWorkingDirectory(job, targetPath);
294    }
295    CopyOutputFormat.setCommitDirectory(job, targetPath);
296
297    Path logPath = inputOptions.getLogPath();
298    if (logPath == null) {
299      logPath = new Path(metaFolder, "_logs");
300    } else {
301      LOG.info("DistCp job log path: " + logPath);
302    }
303    CopyOutputFormat.setOutputPath(job, logPath);
304  }
305
306  /**
307   * Create input listing by invoking an appropriate copy listing
308   * implementation. Also add delegation tokens for each path
309   * to job's credential store
310   *
311   * @param job - Handle to job
312   * @return Returns the path where the copy listing is created
313   * @throws IOException - If any
314   */
315  protected Path createInputFileListing(Job job) throws IOException {
316    Path fileListingPath = getFileListingPath();
317    CopyListing copyListing = CopyListing.getCopyListing(job.getConfiguration(),
318        job.getCredentials(), inputOptions);
319    copyListing.buildListing(fileListingPath, inputOptions);
320    return fileListingPath;
321  }
322
323  /**
324   * Create input listing based on snapshot diff report.
325   * @param job - Handle to job
326   * @param distCpSync the class wraps the snapshot diff report
327   * @return Returns the path where the copy listing is created
328   * @throws IOException - If any
329   */
330  private Path createInputFileListingWithDiff(Job job, DistCpSync distCpSync)
331      throws IOException {
332    Path fileListingPath = getFileListingPath();
333    CopyListing copyListing = new SimpleCopyListing(job.getConfiguration(),
334        job.getCredentials(), distCpSync);
335    copyListing.buildListing(fileListingPath, inputOptions);
336    return fileListingPath;
337  }
338
339  /**
340   * Get default name of the copy listing file. Use the meta folder
341   * to create the copy listing file
342   *
343   * @return - Path where the copy listing file has to be saved
344   * @throws IOException - Exception if any
345   */
346  protected Path getFileListingPath() throws IOException {
347    String fileListPathStr = metaFolder + "/fileList.seq";
348    Path path = new Path(fileListPathStr);
349    return new Path(path.toUri().normalize().toString());
350  }
351
352  /**
353   * Create a default working folder for the job, under the
354   * job staging directory
355   *
356   * @return Returns the working folder information
357   * @throws Exception - Exception if any
358   */
359  private Path createMetaFolderPath() throws Exception {
360    Configuration configuration = getConf();
361    Path stagingDir = JobSubmissionFiles.getStagingDir(
362            new Cluster(configuration), configuration);
363    Path metaFolderPath = new Path(stagingDir, PREFIX + String.valueOf(rand.nextInt()));
364    if (LOG.isDebugEnabled())
365      LOG.debug("Meta folder location: " + metaFolderPath);
366    configuration.set(DistCpConstants.CONF_LABEL_META_FOLDER, metaFolderPath.toString());    
367    return metaFolderPath;
368  }
369
370  /**
371   * Main function of the DistCp program. Parses the input arguments (via OptionsParser),
372   * and invokes the DistCp::run() method, via the ToolRunner.
373   * @param argv Command-line arguments sent to DistCp.
374   */
375  public static void main(String argv[]) {
376    int exitCode;
377    try {
378      DistCp distCp = new DistCp();
379      Cleanup CLEANUP = new Cleanup(distCp);
380
381      ShutdownHookManager.get().addShutdownHook(CLEANUP,
382        SHUTDOWN_HOOK_PRIORITY);
383      exitCode = ToolRunner.run(getDefaultConf(), distCp, argv);
384    }
385    catch (Exception e) {
386      LOG.error("Couldn't complete DistCp operation: ", e);
387      exitCode = DistCpConstants.UNKNOWN_ERROR;
388    }
389    System.exit(exitCode);
390  }
391
392  /**
393   * Loads properties from distcp-default.xml into configuration
394   * object
395   * @return Configuration which includes properties from distcp-default.xml
396   */
397  private static Configuration getDefaultConf() {
398    Configuration config = new Configuration();
399    config.addResource(DISTCP_DEFAULT_XML);
400    return config;
401  }
402
403  private synchronized void cleanup() {
404    try {
405      if (metaFolder == null) return;
406
407      jobFS.delete(metaFolder, true);
408      metaFolder = null;
409    } catch (IOException e) {
410      LOG.error("Unable to cleanup meta folder: " + metaFolder, e);
411    }
412  }
413
414  private boolean isSubmitted() {
415    return submitted;
416  }
417
418  private static class Cleanup implements Runnable {
419    private final DistCp distCp;
420
421    Cleanup(DistCp distCp) {
422      this.distCp = distCp;
423    }
424
425    @Override
426    public void run() {
427      if (distCp.isSubmitted()) return;
428
429      distCp.cleanup();
430    }
431  }
432}