/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.mapreduce.lib.input;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.BlockLocation;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocatedFileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;
import org.apache.hadoop.fs.RemoteIterator;
import org.apache.hadoop.mapred.LocatedFileStatusFetcher;
import org.apache.hadoop.mapreduce.InputFormat;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.security.TokenCache;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hadoop.util.StopWatch;
import org.apache.hadoop.util.StringUtils;

import com.google.common.collect.Lists;

/**
 * A base class for file-based {@link InputFormat}s.
 *
 * <p><code>FileInputFormat</code> is the base class for all file-based
 * <code>InputFormat</code>s. This provides a generic implementation of
 * {@link #getSplits(JobContext)}.
 *
 * <p>Implementations of <code>FileInputFormat</code> can also override the
 * {@link #isSplitable(JobContext, Path)} method to prevent input files
 * from being split up in certain situations. Implementations that may
 * deal with non-splittable files <i>must</i> override this method, since
 * the default implementation assumes splitting is always possible.
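 *
 * <p>For illustration only (this example is not part of the original
 * class documentation): a minimal subclass that treats every file as
 * non-splittable might look like the sketch below. The class name and
 * the use of {@link LineRecordReader} as the reader are assumptions for
 * the sketch; imports are omitted.
 *
 * <pre>{@code
 * public class WholeFileInputFormat
 *     extends FileInputFormat<LongWritable, Text> {
 *
 *   @Override
 *   protected boolean isSplitable(JobContext context, Path filename) {
 *     return false; // one mapper per file
 *   }
 *
 *   @Override
 *   public RecordReader<LongWritable, Text> createRecordReader(
 *       InputSplit split, TaskAttemptContext context) {
 *     return new LineRecordReader(); // placeholder reader for the sketch
 *   }
 * }
 * }</pre>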
 */
@InterfaceAudience.Public
@InterfaceStability.Stable
public abstract class FileInputFormat<K, V> extends InputFormat<K, V> {
  public static final String INPUT_DIR =
      "mapreduce.input.fileinputformat.inputdir";
  public static final String SPLIT_MAXSIZE =
      "mapreduce.input.fileinputformat.split.maxsize";
  public static final String SPLIT_MINSIZE =
      "mapreduce.input.fileinputformat.split.minsize";
  public static final String PATHFILTER_CLASS =
      "mapreduce.input.pathFilter.class";
  public static final String NUM_INPUT_FILES =
      "mapreduce.input.fileinputformat.numinputfiles";
  public static final String INPUT_DIR_RECURSIVE =
      "mapreduce.input.fileinputformat.input.dir.recursive";
  public static final String LIST_STATUS_NUM_THREADS =
      "mapreduce.input.fileinputformat.list-status.num-threads";
  public static final int DEFAULT_LIST_STATUS_NUM_THREADS = 1;

  private static final Log LOG = LogFactory.getLog(FileInputFormat.class);

  private static final double SPLIT_SLOP = 1.1;   // 10% slop

  @Deprecated
  public static enum Counter {
    BYTES_READ
  }

  private static final PathFilter hiddenFileFilter = new PathFilter() {
    public boolean accept(Path p) {
      String name = p.getName();
      return !name.startsWith("_") && !name.startsWith(".");
    }
  };

  /**
   * Proxy PathFilter that accepts a path only if all filters given in the
   * constructor do. Used by {@link #listStatus(JobContext)} to apply the
   * built-in hiddenFileFilter together with a user-provided one (if any).
   */
  private static class MultiPathFilter implements PathFilter {
    private List<PathFilter> filters;

    public MultiPathFilter(List<PathFilter> filters) {
      this.filters = filters;
    }

    public boolean accept(Path path) {
      for (PathFilter filter : filters) {
        if (!filter.accept(path)) {
          return false;
        }
      }
      return true;
    }
  }

  /**
   * Set whether input directories should be traversed recursively.
   * @param job the job to modify
   * @param inputDirRecursive true if input directories should be
   *          read recursively
   */
  public static void setInputDirRecursive(Job job,
      boolean inputDirRecursive) {
    job.getConfiguration().setBoolean(INPUT_DIR_RECURSIVE,
        inputDirRecursive);
  }

  /**
   * @param job the job to look at.
   * @return true if the input directories should be read recursively.
   */
  public static boolean getInputDirRecursive(JobContext job) {
    return job.getConfiguration().getBoolean(INPUT_DIR_RECURSIVE,
        false);
  }

  /**
   * Get the lower bound on split size imposed by the format.
   * @return the number of bytes of the minimal split for this format
   */
  protected long getFormatMinSplitSize() {
    return 1;
  }
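
  // Illustrative sketch (not part of the original class): a user-supplied
  // PathFilter is combined with the built-in hidden-file filter through
  // MultiPathFilter above. The class name and the ".log" suffix are
  // arbitrary examples.
  //
  //   public static class LogFileFilter implements PathFilter {
  //     @Override
  //     public boolean accept(Path path) {
  //       return path.getName().endsWith(".log");
  //     }
  //   }
  //
  //   // registered via setInputPathFilter(), defined below:
  //   // FileInputFormat.setInputPathFilter(job, LogFileFilter.class);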

  /**
   * Is the given filename splittable? Usually true, but if the file is
   * stream compressed, it will not be.
   *
   * The default implementation in <code>FileInputFormat</code> always returns
   * true. Implementations that may deal with non-splittable files <i>must</i>
   * override this method.
   *
   * <code>FileInputFormat</code> implementations can override this and return
   * <code>false</code> to ensure that individual input files are never split
   * up, so that {@link Mapper}s process entire files.
   *
   * @param context the job context
   * @param filename the file name to check
   * @return whether this file is splittable
   */
  protected boolean isSplitable(JobContext context, Path filename) {
    return true;
  }

  /**
   * Set a PathFilter to be applied to the input paths for the map-reduce job.
   * @param job the job to modify
   * @param filter the PathFilter class to use for filtering the input paths.
   */
  public static void setInputPathFilter(Job job,
      Class<? extends PathFilter> filter) {
    job.getConfiguration().setClass(PATHFILTER_CLASS, filter,
        PathFilter.class);
  }

  /**
   * Set the minimum input split size.
   * @param job the job to modify
   * @param size the minimum size in bytes
   */
  public static void setMinInputSplitSize(Job job,
      long size) {
    job.getConfiguration().setLong(SPLIT_MINSIZE, size);
  }

  /**
   * Get the minimum split size.
   * @param job the job
   * @return the minimum number of bytes that can be in a split
   */
  public static long getMinSplitSize(JobContext job) {
    return job.getConfiguration().getLong(SPLIT_MINSIZE, 1L);
  }

  /**
   * Set the maximum split size.
   * @param job the job to modify
   * @param size the maximum split size in bytes
   */
  public static void setMaxInputSplitSize(Job job,
      long size) {
    job.getConfiguration().setLong(SPLIT_MAXSIZE, size);
  }

  /**
   * Get the maximum split size.
   * @param context the job to look at.
   * @return the maximum number of bytes a split can include
   */
  public static long getMaxSplitSize(JobContext context) {
    return context.getConfiguration().getLong(SPLIT_MAXSIZE,
        Long.MAX_VALUE);
  }

  /**
   * Get a PathFilter instance of the filter set for the input paths.
   *
   * @param context the job context
   * @return the PathFilter instance set for the job, null if none has been set.
   */
  public static PathFilter getInputPathFilter(JobContext context) {
    Configuration conf = context.getConfiguration();
    Class<?> filterClass = conf.getClass(PATHFILTER_CLASS, null,
        PathFilter.class);
    return (filterClass != null) ?
        (PathFilter) ReflectionUtils.newInstance(filterClass, conf) : null;
  }
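
  // Illustrative driver-side configuration (not part of the original
  // class); the sizes and thread count are arbitrary example values:
  //
  //   Job job = Job.getInstance(new Configuration(), "example");
  //   FileInputFormat.setMinInputSplitSize(job, 32 * 1024 * 1024L);   // 32 MB
  //   FileInputFormat.setMaxInputSplitSize(job, 256 * 1024 * 1024L);  // 256 MB
  //   // list input directories with 4 threads instead of the default 1
  //   job.getConfiguration().setInt(
  //       FileInputFormat.LIST_STATUS_NUM_THREADS, 4);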

  /**
   * List input directories.
   * Subclasses may override to, e.g., select only files matching a regular
   * expression; an illustrative override is sketched in a comment further
   * below.
   *
   * @param job the job to list input paths for
   * @return array of FileStatus objects
   * @throws IOException if no input paths are specified, or if an input
   *           path does not exist or matches no files.
   */
  protected List<FileStatus> listStatus(JobContext job
      ) throws IOException {
    Path[] dirs = getInputPaths(job);
    if (dirs.length == 0) {
      throw new IOException("No input paths specified in job");
    }

    // get tokens for all the required FileSystems.
    TokenCache.obtainTokensForNamenodes(job.getCredentials(), dirs,
        job.getConfiguration());

    // Whether we need to look recursively into the directory structure
    boolean recursive = getInputDirRecursive(job);

    // creates a MultiPathFilter with the hiddenFileFilter and the
    // user-provided one (if any).
    List<PathFilter> filters = new ArrayList<PathFilter>();
    filters.add(hiddenFileFilter);
    PathFilter jobFilter = getInputPathFilter(job);
    if (jobFilter != null) {
      filters.add(jobFilter);
    }
    PathFilter inputFilter = new MultiPathFilter(filters);

    List<FileStatus> result = null;

    int numThreads = job.getConfiguration().getInt(LIST_STATUS_NUM_THREADS,
        DEFAULT_LIST_STATUS_NUM_THREADS);
    StopWatch sw = new StopWatch().start();
    if (numThreads == 1) {
      result = singleThreadedListStatus(job, dirs, inputFilter, recursive);
    } else {
      Iterable<FileStatus> locatedFiles = null;
      try {
        LocatedFileStatusFetcher locatedFileStatusFetcher = new LocatedFileStatusFetcher(
            job.getConfiguration(), dirs, recursive, inputFilter, true);
        locatedFiles = locatedFileStatusFetcher.getFileStatuses();
      } catch (InterruptedException e) {
        // preserve the cause when rethrowing
        throw new IOException("Interrupted while getting file statuses", e);
      }
      result = Lists.newArrayList(locatedFiles);
    }

    sw.stop();
    if (LOG.isDebugEnabled()) {
      LOG.debug("Time taken to get FileStatuses: "
          + sw.now(TimeUnit.MILLISECONDS));
    }
    LOG.info("Total input files to process : " + result.size());
    return result;
  }

  private List<FileStatus> singleThreadedListStatus(JobContext job, Path[] dirs,
      PathFilter inputFilter, boolean recursive) throws IOException {
    List<FileStatus> result = new ArrayList<FileStatus>();
    List<IOException> errors = new ArrayList<IOException>();
    for (int i = 0; i < dirs.length; ++i) {
      Path p = dirs[i];
      FileSystem fs = p.getFileSystem(job.getConfiguration());
      FileStatus[] matches = fs.globStatus(p, inputFilter);
      if (matches == null) {
        errors.add(new IOException("Input path does not exist: " + p));
      } else if (matches.length == 0) {
        errors.add(new IOException("Input Pattern " + p + " matches 0 files"));
      } else {
        for (FileStatus globStat : matches) {
          if (globStat.isDirectory()) {
            RemoteIterator<LocatedFileStatus> iter =
                fs.listLocatedStatus(globStat.getPath());
            while (iter.hasNext()) {
              LocatedFileStatus stat = iter.next();
              if (inputFilter.accept(stat.getPath())) {
                if (recursive && stat.isDirectory()) {
                  addInputPathRecursively(result, fs, stat.getPath(),
                      inputFilter);
                } else {
                  result.add(stat);
                }
              }
            }
          } else {
            result.add(globStat);
          }
        }
      }
    }

    if (!errors.isEmpty()) {
      throw new InvalidInputException(errors);
    }
    return result;
  }

  /**
   * Add files in the input path recursively into the results.
   * @param result
   *          The List to store all files.
   * @param fs
   *          The FileSystem.
   * @param path
   *          The input path.
   * @param inputFilter
   *          The input filter that can be used to filter files/dirs.
   * @throws IOException
   */
  protected void addInputPathRecursively(List<FileStatus> result,
      FileSystem fs, Path path, PathFilter inputFilter)
      throws IOException {
    RemoteIterator<LocatedFileStatus> iter = fs.listLocatedStatus(path);
    while (iter.hasNext()) {
      LocatedFileStatus stat = iter.next();
      if (inputFilter.accept(stat.getPath())) {
        if (stat.isDirectory()) {
          addInputPathRecursively(result, fs, stat.getPath(), inputFilter);
        } else {
          result.add(stat);
        }
      }
    }
  }
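
  // Illustrative sketch of the listStatus() override mentioned in its
  // javadoc above (not part of the original class); the ".*part-.*"
  // pattern is an arbitrary example:
  //
  //   @Override
  //   protected List<FileStatus> listStatus(JobContext job) throws IOException {
  //     List<FileStatus> files = super.listStatus(job);
  //     List<FileStatus> selected = new ArrayList<FileStatus>();
  //     for (FileStatus stat : files) {
  //       if (stat.getPath().getName().matches(".*part-.*")) {
  //         selected.add(stat);
  //       }
  //     }
  //     return selected;
  //   }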

  /**
   * A factory that makes the split for this class. It can be overridden
   * by sub-classes to make sub-types.
   */
  protected FileSplit makeSplit(Path file, long start, long length,
      String[] hosts) {
    return new FileSplit(file, start, length, hosts);
  }

  /**
   * A factory that makes the split for this class. It can be overridden
   * by sub-classes to make sub-types.
   */
  protected FileSplit makeSplit(Path file, long start, long length,
      String[] hosts, String[] inMemoryHosts) {
    return new FileSplit(file, start, length, hosts, inMemoryHosts);
  }

  /**
   * Generate the list of files and make them into FileSplits.
   * @param job the job context
   * @return the list of input splits for the job
   * @throws IOException
   */
  public List<InputSplit> getSplits(JobContext job) throws IOException {
    StopWatch sw = new StopWatch().start();
    long minSize = Math.max(getFormatMinSplitSize(), getMinSplitSize(job));
    long maxSize = getMaxSplitSize(job);

    // generate splits
    List<InputSplit> splits = new ArrayList<InputSplit>();
    List<FileStatus> files = listStatus(job);
    for (FileStatus file : files) {
      Path path = file.getPath();
      long length = file.getLen();
      if (length != 0) {
        BlockLocation[] blkLocations;
        if (file instanceof LocatedFileStatus) {
          blkLocations = ((LocatedFileStatus) file).getBlockLocations();
        } else {
          FileSystem fs = path.getFileSystem(job.getConfiguration());
          blkLocations = fs.getFileBlockLocations(file, 0, length);
        }
        if (isSplitable(job, path)) {
          long blockSize = file.getBlockSize();
          long splitSize = computeSplitSize(blockSize, minSize, maxSize);

          long bytesRemaining = length;
          while (((double) bytesRemaining) / splitSize > SPLIT_SLOP) {
            int blkIndex = getBlockIndex(blkLocations, length - bytesRemaining);
            splits.add(makeSplit(path, length - bytesRemaining, splitSize,
                blkLocations[blkIndex].getHosts(),
                blkLocations[blkIndex].getCachedHosts()));
            bytesRemaining -= splitSize;
          }

          if (bytesRemaining != 0) {
            int blkIndex = getBlockIndex(blkLocations, length - bytesRemaining);
            splits.add(makeSplit(path, length - bytesRemaining, bytesRemaining,
                blkLocations[blkIndex].getHosts(),
                blkLocations[blkIndex].getCachedHosts()));
          }
        } else { // not splittable
          splits.add(makeSplit(path, 0, length, blkLocations[0].getHosts(),
              blkLocations[0].getCachedHosts()));
        }
      } else {
        // Create empty hosts array for zero-length files
        splits.add(makeSplit(path, 0, length, new String[0]));
      }
    }
    // Save the number of input files for metrics/loadgen
    job.getConfiguration().setLong(NUM_INPUT_FILES, files.size());
    sw.stop();
    if (LOG.isDebugEnabled()) {
      LOG.debug("Total # of splits generated by getSplits: " + splits.size()
          + ", TimeTaken: " + sw.now(TimeUnit.MILLISECONDS));
    }
    return splits;
  }

  protected long computeSplitSize(long blockSize, long minSize,
                                  long maxSize) {
    return Math.max(minSize, Math.min(maxSize, blockSize));
  }
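
  // Worked example (illustrative): with a 128 MB block size and the
  // defaults minSize = 1 and maxSize = Long.MAX_VALUE, computeSplitSize
  // returns max(1, min(Long.MAX_VALUE, 128 MB)) = 128 MB, so splits align
  // with blocks. Setting SPLIT_MAXSIZE to 64 MB instead yields 64 MB
  // splits. The SPLIT_SLOP factor in getSplits() lets the last split grow
  // to at most 1.1 * splitSize rather than producing a tiny trailing
  // split: with 128 MB splits, a 141 MB file becomes one 128 MB split
  // plus one 13 MB split, while a 140 MB file (140/128 <= 1.1) is kept
  // as a single split.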

  protected int getBlockIndex(BlockLocation[] blkLocations,
                              long offset) {
    for (int i = 0; i < blkLocations.length; i++) {
      // is the offset inside this block?
      if ((blkLocations[i].getOffset() <= offset) &&
          (offset < blkLocations[i].getOffset() + blkLocations[i].getLength())) {
        return i;
      }
    }
    BlockLocation last = blkLocations[blkLocations.length - 1];
    long fileLength = last.getOffset() + last.getLength() - 1;
    throw new IllegalArgumentException("Offset " + offset +
                                       " is outside of file (0.." +
                                       fileLength + ")");
  }

  /**
   * Sets the given comma separated paths as the list of inputs
   * for the map-reduce job.
   *
   * @param job the job
   * @param commaSeparatedPaths Comma separated paths to be set as
   *        the list of inputs for the map-reduce job.
   */
  public static void setInputPaths(Job job,
                                   String commaSeparatedPaths
                                   ) throws IOException {
    setInputPaths(job, StringUtils.stringToPath(
        getPathStrings(commaSeparatedPaths)));
  }

  /**
   * Add the given comma separated paths to the list of inputs for
   * the map-reduce job.
   *
   * @param job The job to modify
   * @param commaSeparatedPaths Comma separated paths to be added to
   *        the list of inputs for the map-reduce job.
   */
  public static void addInputPaths(Job job,
                                   String commaSeparatedPaths
                                   ) throws IOException {
    for (String str : getPathStrings(commaSeparatedPaths)) {
      addInputPath(job, new Path(str));
    }
  }

  /**
   * Set the array of {@link Path}s as the list of inputs
   * for the map-reduce job.
   *
   * @param job The job to modify
   * @param inputPaths the {@link Path}s of the input directories/files
   *        for the map-reduce job.
   */
  public static void setInputPaths(Job job,
                                   Path... inputPaths) throws IOException {
    Configuration conf = job.getConfiguration();
    Path path = inputPaths[0].getFileSystem(conf).makeQualified(inputPaths[0]);
    StringBuffer str = new StringBuffer(StringUtils.escapeString(path.toString()));
    for (int i = 1; i < inputPaths.length; i++) {
      str.append(StringUtils.COMMA_STR);
      path = inputPaths[i].getFileSystem(conf).makeQualified(inputPaths[i]);
      str.append(StringUtils.escapeString(path.toString()));
    }
    conf.set(INPUT_DIR, str.toString());
  }

  /**
   * Add a {@link Path} to the list of inputs for the map-reduce job.
   *
   * @param job The {@link Job} to modify
   * @param path {@link Path} to be added to the list of inputs for
   *        the map-reduce job.
   */
  public static void addInputPath(Job job,
                                  Path path) throws IOException {
    Configuration conf = job.getConfiguration();
    path = path.getFileSystem(conf).makeQualified(path);
    String dirStr = StringUtils.escapeString(path.toString());
    String dirs = conf.get(INPUT_DIR);
    conf.set(INPUT_DIR, dirs == null ? dirStr : dirs + "," + dirStr);
  }
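
  // Illustrative behavior of the comma handling below (the paths are
  // made-up examples):
  //
  //   FileInputFormat.setInputPaths(job,
  //       "/data/2019/{01,02},/data/other");
  //
  //   // getPathStrings() keeps the comma inside the {01,02} glob and
  //   // splits only on the top-level comma, yielding two input paths:
  //   //   /data/2019/{01,02}
  //   //   /data/other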

  // Splits the comma-separated paths, ignoring commas that appear inside
  // curly-brace glob patterns such as {a,b}.
  private static String[] getPathStrings(String commaSeparatedPaths) {
    int length = commaSeparatedPaths.length();
    int curlyOpen = 0;
    int pathStart = 0;
    boolean globPattern = false;
    List<String> pathStrings = new ArrayList<String>();

    for (int i = 0; i < length; i++) {
      char ch = commaSeparatedPaths.charAt(i);
      switch (ch) {
        case '{': {
          curlyOpen++;
          if (!globPattern) {
            globPattern = true;
          }
          break;
        }
        case '}': {
          curlyOpen--;
          if (curlyOpen == 0 && globPattern) {
            globPattern = false;
          }
          break;
        }
        case ',': {
          if (!globPattern) {
            pathStrings.add(commaSeparatedPaths.substring(pathStart, i));
            pathStart = i + 1;
          }
          break;
        }
        default:
          continue; // nothing special to do for this character
      }
    }
    pathStrings.add(commaSeparatedPaths.substring(pathStart, length));

    return pathStrings.toArray(new String[0]);
  }

  /**
   * Get the list of input {@link Path}s for the map-reduce job.
   *
   * @param context The job
   * @return the list of input {@link Path}s for the map-reduce job.
   */
  public static Path[] getInputPaths(JobContext context) {
    String dirs = context.getConfiguration().get(INPUT_DIR, "");
    String[] list = StringUtils.split(dirs);
    Path[] result = new Path[list.length];
    for (int i = 0; i < list.length; i++) {
      result[i] = new Path(StringUtils.unEscapeString(list[i]));
    }
    return result;
  }

}