001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018
019package org.apache.hadoop.mapreduce.lib.input;
020
021import java.io.IOException;
022import java.util.ArrayList;
023import java.util.List;
024
025import java.util.concurrent.TimeUnit;
026import org.apache.commons.logging.Log;
027import org.apache.commons.logging.LogFactory;
028import org.apache.hadoop.classification.InterfaceAudience;
029import org.apache.hadoop.classification.InterfaceStability;
030import org.apache.hadoop.conf.Configuration;
031import org.apache.hadoop.fs.FileStatus;
032import org.apache.hadoop.fs.FileSystem;
033import org.apache.hadoop.fs.LocatedFileStatus;
034import org.apache.hadoop.fs.Path;
035import org.apache.hadoop.fs.PathFilter;
036import org.apache.hadoop.fs.BlockLocation;
037import org.apache.hadoop.fs.RemoteIterator;
038import org.apache.hadoop.mapred.LocatedFileStatusFetcher;
039import org.apache.hadoop.mapreduce.InputFormat;
040import org.apache.hadoop.mapreduce.InputSplit;
041import org.apache.hadoop.mapreduce.Job;
042import org.apache.hadoop.mapreduce.JobContext;
043import org.apache.hadoop.mapreduce.Mapper;
044import org.apache.hadoop.mapreduce.security.TokenCache;
045import org.apache.hadoop.util.ReflectionUtils;
046import org.apache.hadoop.util.StopWatch;
047import org.apache.hadoop.util.StringUtils;
048
049import com.google.common.collect.Lists;
050
051/** 
052 * A base class for file-based {@link InputFormat}s.
053 *
054 * <p><code>FileInputFormat</code> is the base class for all file-based 
055 * <code>InputFormat</code>s. This provides a generic implementation of
056 * {@link #getSplits(JobContext)}.
057 *
058 * Implementations of <code>FileInputFormat</code> can also override the
059 * {@link #isSplitable(JobContext, Path)} method to prevent input files
060 * from being split-up in certain situations. Implementations that may
061 * deal with non-splittable files <i>must</i> override this method, since
062 * the default implementation assumes splitting is always possible.
063 */
064@InterfaceAudience.Public
065@InterfaceStability.Stable
066public abstract class FileInputFormat<K, V> extends InputFormat<K, V> {
  /**
   * Configuration key holding the escaped, comma-separated list of input
   * paths; written by setInputPaths/addInputPath and read by getInputPaths.
   */
  public static final String INPUT_DIR = 
    "mapreduce.input.fileinputformat.inputdir";
  /** Configuration key for the maximum split size (see getMaxSplitSize). */
  public static final String SPLIT_MAXSIZE = 
    "mapreduce.input.fileinputformat.split.maxsize";
  /** Configuration key for the minimum split size (see getMinSplitSize). */
  public static final String SPLIT_MINSIZE = 
    "mapreduce.input.fileinputformat.split.minsize";
  /** Configuration key naming the user-supplied PathFilter class. */
  public static final String PATHFILTER_CLASS = 
    "mapreduce.input.pathFilter.class";
  /** Configuration key under which getSplits stores the number of input files. */
  public static final String NUM_INPUT_FILES =
    "mapreduce.input.fileinputformat.numinputfiles";
  /** Configuration key for the recursive-listing flag (defaults to false when read). */
  public static final String INPUT_DIR_RECURSIVE =
    "mapreduce.input.fileinputformat.input.dir.recursive";
  /**
   * Configuration key read by listStatus; a value of 1 selects the
   * single-threaded listing path, any other value the LocatedFileStatusFetcher.
   */
  public static final String LIST_STATUS_NUM_THREADS =
      "mapreduce.input.fileinputformat.list-status.num-threads";
  /** Value used for LIST_STATUS_NUM_THREADS when the key is not set. */
  public static final int DEFAULT_LIST_STATUS_NUM_THREADS = 1;

  private static final Log LOG = LogFactory.getLog(FileInputFormat.class);

  // getSplits only cuts another full-size split while the remaining bytes
  // exceed SPLIT_SLOP times the split size, so the last split of a file may
  // be up to 10% larger than the computed split size.
  private static final double SPLIT_SLOP = 1.1;   // 10% slop
086  
  /**
   * Deprecated counter enum with the single constant BYTES_READ. Nothing in
   * this file references it; presumably retained for compatibility with
   * older callers -- TODO confirm before removing.
   */
  @Deprecated
  public static enum Counter { 
    BYTES_READ
  }
091
092  private static final PathFilter hiddenFileFilter = new PathFilter(){
093      public boolean accept(Path p){
094        String name = p.getName(); 
095        return !name.startsWith("_") && !name.startsWith("."); 
096      }
097    }; 
098
099  /**
100   * Proxy PathFilter that accepts a path only if all filters given in the
101   * constructor do. Used by the listPaths() to apply the built-in
102   * hiddenFileFilter together with a user provided one (if any).
103   */
104  private static class MultiPathFilter implements PathFilter {
105    private List<PathFilter> filters;
106
107    public MultiPathFilter(List<PathFilter> filters) {
108      this.filters = filters;
109    }
110
111    public boolean accept(Path path) {
112      for (PathFilter filter : filters) {
113        if (!filter.accept(path)) {
114          return false;
115        }
116      }
117      return true;
118    }
119  }
120  
121  /**
122   * @param job
123   *          the job to modify
124   * @param inputDirRecursive
125   */
126  public static void setInputDirRecursive(Job job,
127      boolean inputDirRecursive) {
128    job.getConfiguration().setBoolean(INPUT_DIR_RECURSIVE,
129        inputDirRecursive);
130  }
131 
132  /**
133   * @param job
134   *          the job to look at.
135   * @return should the files to be read recursively?
136   */
137  public static boolean getInputDirRecursive(JobContext job) {
138    return job.getConfiguration().getBoolean(INPUT_DIR_RECURSIVE,
139        false);
140  }
141
142  /**
143   * Get the lower bound on split size imposed by the format.
144   * @return the number of bytes of the minimal split for this format
145   */
146  protected long getFormatMinSplitSize() {
147    return 1;
148  }
149
  /**
   * Tell getSplits whether the given file may be cut into several splits.
   * Usually true, but a stream-compressed file, for example, cannot be split.
   *
   * The default implementation in <code>FileInputFormat</code> ignores both
   * arguments and always returns true. Implementations that may deal with
   * non-splittable files <i>must</i> override this method.
   *
   * <code>FileInputFormat</code> implementations can override this and return
   * <code>false</code> to ensure that individual input files are never split-up
   * so that {@link Mapper}s process entire files.
   * 
   * @param context the job context
   * @param filename the file name to check
   * @return <code>true</code> if the file may be split
   */
  protected boolean isSplitable(JobContext context, Path filename) {
    return true;
  }
169
170  /**
171   * Set a PathFilter to be applied to the input paths for the map-reduce job.
172   * @param job the job to modify
173   * @param filter the PathFilter class use for filtering the input paths.
174   */
175  public static void setInputPathFilter(Job job,
176                                        Class<? extends PathFilter> filter) {
177    job.getConfiguration().setClass(PATHFILTER_CLASS, filter, 
178                                    PathFilter.class);
179  }
180
181  /**
182   * Set the minimum input split size
183   * @param job the job to modify
184   * @param size the minimum size
185   */
186  public static void setMinInputSplitSize(Job job,
187                                          long size) {
188    job.getConfiguration().setLong(SPLIT_MINSIZE, size);
189  }
190
191  /**
192   * Get the minimum split size
193   * @param job the job
194   * @return the minimum number of bytes that can be in a split
195   */
196  public static long getMinSplitSize(JobContext job) {
197    return job.getConfiguration().getLong(SPLIT_MINSIZE, 1L);
198  }
199
200  /**
201   * Set the maximum split size
202   * @param job the job to modify
203   * @param size the maximum split size
204   */
205  public static void setMaxInputSplitSize(Job job,
206                                          long size) {
207    job.getConfiguration().setLong(SPLIT_MAXSIZE, size);
208  }
209
210  /**
211   * Get the maximum split size.
212   * @param context the job to look at.
213   * @return the maximum number of bytes a split can include
214   */
215  public static long getMaxSplitSize(JobContext context) {
216    return context.getConfiguration().getLong(SPLIT_MAXSIZE, 
217                                              Long.MAX_VALUE);
218  }
219
220  /**
221   * Get a PathFilter instance of the filter set for the input paths.
222   *
223   * @return the PathFilter instance set for the job, NULL if none has been set.
224   */
225  public static PathFilter getInputPathFilter(JobContext context) {
226    Configuration conf = context.getConfiguration();
227    Class<?> filterClass = conf.getClass(PATHFILTER_CLASS, null,
228        PathFilter.class);
229    return (filterClass != null) ?
230        (PathFilter) ReflectionUtils.newInstance(filterClass, conf) : null;
231  }
232
233  /** List input directories.
234   * Subclasses may override to, e.g., select only files matching a regular
235   * expression. 
236   * 
237   * @param job the job to list input paths for
238   * @return array of FileStatus objects
239   * @throws IOException if zero items.
240   */
241  protected List<FileStatus> listStatus(JobContext job
242                                        ) throws IOException {
243    Path[] dirs = getInputPaths(job);
244    if (dirs.length == 0) {
245      throw new IOException("No input paths specified in job");
246    }
247    
248    // get tokens for all the required FileSystems..
249    TokenCache.obtainTokensForNamenodes(job.getCredentials(), dirs, 
250                                        job.getConfiguration());
251
252    // Whether we need to recursive look into the directory structure
253    boolean recursive = getInputDirRecursive(job);
254
255    // creates a MultiPathFilter with the hiddenFileFilter and the
256    // user provided one (if any).
257    List<PathFilter> filters = new ArrayList<PathFilter>();
258    filters.add(hiddenFileFilter);
259    PathFilter jobFilter = getInputPathFilter(job);
260    if (jobFilter != null) {
261      filters.add(jobFilter);
262    }
263    PathFilter inputFilter = new MultiPathFilter(filters);
264    
265    List<FileStatus> result = null;
266
267    int numThreads = job.getConfiguration().getInt(LIST_STATUS_NUM_THREADS,
268        DEFAULT_LIST_STATUS_NUM_THREADS);
269    StopWatch sw = new StopWatch().start();
270    if (numThreads == 1) {
271      result = singleThreadedListStatus(job, dirs, inputFilter, recursive);
272    } else {
273      Iterable<FileStatus> locatedFiles = null;
274      try {
275        LocatedFileStatusFetcher locatedFileStatusFetcher = new LocatedFileStatusFetcher(
276            job.getConfiguration(), dirs, recursive, inputFilter, true);
277        locatedFiles = locatedFileStatusFetcher.getFileStatuses();
278      } catch (InterruptedException e) {
279        throw new IOException("Interrupted while getting file statuses");
280      }
281      result = Lists.newArrayList(locatedFiles);
282    }
283    
284    sw.stop();
285    if (LOG.isDebugEnabled()) {
286      LOG.debug("Time taken to get FileStatuses: "
287          + sw.now(TimeUnit.MILLISECONDS));
288    }
289    LOG.info("Total input files to process : " + result.size());
290    return result;
291  }
292
293  private List<FileStatus> singleThreadedListStatus(JobContext job, Path[] dirs,
294      PathFilter inputFilter, boolean recursive) throws IOException {
295    List<FileStatus> result = new ArrayList<FileStatus>();
296    List<IOException> errors = new ArrayList<IOException>();
297    for (int i=0; i < dirs.length; ++i) {
298      Path p = dirs[i];
299      FileSystem fs = p.getFileSystem(job.getConfiguration()); 
300      FileStatus[] matches = fs.globStatus(p, inputFilter);
301      if (matches == null) {
302        errors.add(new IOException("Input path does not exist: " + p));
303      } else if (matches.length == 0) {
304        errors.add(new IOException("Input Pattern " + p + " matches 0 files"));
305      } else {
306        for (FileStatus globStat: matches) {
307          if (globStat.isDirectory()) {
308            RemoteIterator<LocatedFileStatus> iter =
309                fs.listLocatedStatus(globStat.getPath());
310            while (iter.hasNext()) {
311              LocatedFileStatus stat = iter.next();
312              if (inputFilter.accept(stat.getPath())) {
313                if (recursive && stat.isDirectory()) {
314                  addInputPathRecursively(result, fs, stat.getPath(),
315                      inputFilter);
316                } else {
317                  result.add(stat);
318                }
319              }
320            }
321          } else {
322            result.add(globStat);
323          }
324        }
325      }
326    }
327
328    if (!errors.isEmpty()) {
329      throw new InvalidInputException(errors);
330    }
331    return result;
332  }
333  
334  /**
335   * Add files in the input path recursively into the results.
336   * @param result
337   *          The List to store all files.
338   * @param fs
339   *          The FileSystem.
340   * @param path
341   *          The input path.
342   * @param inputFilter
343   *          The input filter that can be used to filter files/dirs. 
344   * @throws IOException
345   */
346  protected void addInputPathRecursively(List<FileStatus> result,
347      FileSystem fs, Path path, PathFilter inputFilter) 
348      throws IOException {
349    RemoteIterator<LocatedFileStatus> iter = fs.listLocatedStatus(path);
350    while (iter.hasNext()) {
351      LocatedFileStatus stat = iter.next();
352      if (inputFilter.accept(stat.getPath())) {
353        if (stat.isDirectory()) {
354          addInputPathRecursively(result, fs, stat.getPath(), inputFilter);
355        } else {
356          result.add(stat);
357        }
358      }
359    }
360  }
361  
362  
363  /**
364   * A factory that makes the split for this class. It can be overridden
365   * by sub-classes to make sub-types
366   */
367  protected FileSplit makeSplit(Path file, long start, long length, 
368                                String[] hosts) {
369    return new FileSplit(file, start, length, hosts);
370  }
371  
372  /**
373   * A factory that makes the split for this class. It can be overridden
374   * by sub-classes to make sub-types
375   */
376  protected FileSplit makeSplit(Path file, long start, long length, 
377                                String[] hosts, String[] inMemoryHosts) {
378    return new FileSplit(file, start, length, hosts, inMemoryHosts);
379  }
380
381  /** 
382   * Generate the list of files and make them into FileSplits.
383   * @param job the job context
384   * @throws IOException
385   */
386  public List<InputSplit> getSplits(JobContext job) throws IOException {
387    StopWatch sw = new StopWatch().start();
388    long minSize = Math.max(getFormatMinSplitSize(), getMinSplitSize(job));
389    long maxSize = getMaxSplitSize(job);
390
391    // generate splits
392    List<InputSplit> splits = new ArrayList<InputSplit>();
393    List<FileStatus> files = listStatus(job);
394    for (FileStatus file: files) {
395      Path path = file.getPath();
396      long length = file.getLen();
397      if (length != 0) {
398        BlockLocation[] blkLocations;
399        if (file instanceof LocatedFileStatus) {
400          blkLocations = ((LocatedFileStatus) file).getBlockLocations();
401        } else {
402          FileSystem fs = path.getFileSystem(job.getConfiguration());
403          blkLocations = fs.getFileBlockLocations(file, 0, length);
404        }
405        if (isSplitable(job, path)) {
406          long blockSize = file.getBlockSize();
407          long splitSize = computeSplitSize(blockSize, minSize, maxSize);
408
409          long bytesRemaining = length;
410          while (((double) bytesRemaining)/splitSize > SPLIT_SLOP) {
411            int blkIndex = getBlockIndex(blkLocations, length-bytesRemaining);
412            splits.add(makeSplit(path, length-bytesRemaining, splitSize,
413                        blkLocations[blkIndex].getHosts(),
414                        blkLocations[blkIndex].getCachedHosts()));
415            bytesRemaining -= splitSize;
416          }
417
418          if (bytesRemaining != 0) {
419            int blkIndex = getBlockIndex(blkLocations, length-bytesRemaining);
420            splits.add(makeSplit(path, length-bytesRemaining, bytesRemaining,
421                       blkLocations[blkIndex].getHosts(),
422                       blkLocations[blkIndex].getCachedHosts()));
423          }
424        } else { // not splitable
425          splits.add(makeSplit(path, 0, length, blkLocations[0].getHosts(),
426                      blkLocations[0].getCachedHosts()));
427        }
428      } else { 
429        //Create empty hosts array for zero length files
430        splits.add(makeSplit(path, 0, length, new String[0]));
431      }
432    }
433    // Save the number of input files for metrics/loadgen
434    job.getConfiguration().setLong(NUM_INPUT_FILES, files.size());
435    sw.stop();
436    if (LOG.isDebugEnabled()) {
437      LOG.debug("Total # of splits generated by getSplits: " + splits.size()
438          + ", TimeTaken: " + sw.now(TimeUnit.MILLISECONDS));
439    }
440    return splits;
441  }
442
443  protected long computeSplitSize(long blockSize, long minSize,
444                                  long maxSize) {
445    return Math.max(minSize, Math.min(maxSize, blockSize));
446  }
447
448  protected int getBlockIndex(BlockLocation[] blkLocations, 
449                              long offset) {
450    for (int i = 0 ; i < blkLocations.length; i++) {
451      // is the offset inside this block?
452      if ((blkLocations[i].getOffset() <= offset) &&
453          (offset < blkLocations[i].getOffset() + blkLocations[i].getLength())){
454        return i;
455      }
456    }
457    BlockLocation last = blkLocations[blkLocations.length -1];
458    long fileLength = last.getOffset() + last.getLength() -1;
459    throw new IllegalArgumentException("Offset " + offset + 
460                                       " is outside of file (0.." +
461                                       fileLength + ")");
462  }
463
464  /**
465   * Sets the given comma separated paths as the list of inputs 
466   * for the map-reduce job.
467   * 
468   * @param job the job
469   * @param commaSeparatedPaths Comma separated paths to be set as 
470   *        the list of inputs for the map-reduce job.
471   */
472  public static void setInputPaths(Job job, 
473                                   String commaSeparatedPaths
474                                   ) throws IOException {
475    setInputPaths(job, StringUtils.stringToPath(
476                        getPathStrings(commaSeparatedPaths)));
477  }
478
479  /**
480   * Add the given comma separated paths to the list of inputs for
481   *  the map-reduce job.
482   * 
483   * @param job The job to modify
484   * @param commaSeparatedPaths Comma separated paths to be added to
485   *        the list of inputs for the map-reduce job.
486   */
487  public static void addInputPaths(Job job, 
488                                   String commaSeparatedPaths
489                                   ) throws IOException {
490    for (String str : getPathStrings(commaSeparatedPaths)) {
491      addInputPath(job, new Path(str));
492    }
493  }
494
495  /**
496   * Set the array of {@link Path}s as the list of inputs
497   * for the map-reduce job.
498   * 
499   * @param job The job to modify 
500   * @param inputPaths the {@link Path}s of the input directories/files 
501   * for the map-reduce job.
502   */ 
503  public static void setInputPaths(Job job, 
504                                   Path... inputPaths) throws IOException {
505    Configuration conf = job.getConfiguration();
506    Path path = inputPaths[0].getFileSystem(conf).makeQualified(inputPaths[0]);
507    StringBuffer str = new StringBuffer(StringUtils.escapeString(path.toString()));
508    for(int i = 1; i < inputPaths.length;i++) {
509      str.append(StringUtils.COMMA_STR);
510      path = inputPaths[i].getFileSystem(conf).makeQualified(inputPaths[i]);
511      str.append(StringUtils.escapeString(path.toString()));
512    }
513    conf.set(INPUT_DIR, str.toString());
514  }
515
516  /**
517   * Add a {@link Path} to the list of inputs for the map-reduce job.
518   * 
519   * @param job The {@link Job} to modify
520   * @param path {@link Path} to be added to the list of inputs for 
521   *            the map-reduce job.
522   */
523  public static void addInputPath(Job job, 
524                                  Path path) throws IOException {
525    Configuration conf = job.getConfiguration();
526    path = path.getFileSystem(conf).makeQualified(path);
527    String dirStr = StringUtils.escapeString(path.toString());
528    String dirs = conf.get(INPUT_DIR);
529    conf.set(INPUT_DIR, dirs == null ? dirStr : dirs + "," + dirStr);
530  }
531  
532  // This method escapes commas in the glob pattern of the given paths.
533  private static String[] getPathStrings(String commaSeparatedPaths) {
534    int length = commaSeparatedPaths.length();
535    int curlyOpen = 0;
536    int pathStart = 0;
537    boolean globPattern = false;
538    List<String> pathStrings = new ArrayList<String>();
539    
540    for (int i=0; i<length; i++) {
541      char ch = commaSeparatedPaths.charAt(i);
542      switch(ch) {
543        case '{' : {
544          curlyOpen++;
545          if (!globPattern) {
546            globPattern = true;
547          }
548          break;
549        }
550        case '}' : {
551          curlyOpen--;
552          if (curlyOpen == 0 && globPattern) {
553            globPattern = false;
554          }
555          break;
556        }
557        case ',' : {
558          if (!globPattern) {
559            pathStrings.add(commaSeparatedPaths.substring(pathStart, i));
560            pathStart = i + 1 ;
561          }
562          break;
563        }
564        default:
565          continue; // nothing special to do for this character
566      }
567    }
568    pathStrings.add(commaSeparatedPaths.substring(pathStart, length));
569    
570    return pathStrings.toArray(new String[0]);
571  }
572  
573  /**
574   * Get the list of input {@link Path}s for the map-reduce job.
575   * 
576   * @param context The job
577   * @return the list of input {@link Path}s for the map-reduce job.
578   */
579  public static Path[] getInputPaths(JobContext context) {
580    String dirs = context.getConfiguration().get(INPUT_DIR, "");
581    String [] list = StringUtils.split(dirs);
582    Path[] result = new Path[list.length];
583    for (int i = 0; i < list.length; i++) {
584      result[i] = new Path(StringUtils.unEscapeString(list[i]));
585    }
586    return result;
587  }
588
589}