001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018 019package org.apache.hadoop.mapred; 020 021import java.io.IOException; 022import java.util.ArrayList; 023import java.util.Collections; 024import java.util.Comparator; 025import java.util.HashSet; 026import java.util.IdentityHashMap; 027import java.util.LinkedList; 028import java.util.List; 029import java.util.Map; 030import java.util.Set; 031import java.util.concurrent.TimeUnit; 032 033import org.apache.commons.logging.Log; 034import org.apache.commons.logging.LogFactory; 035import org.apache.hadoop.classification.InterfaceAudience; 036import org.apache.hadoop.classification.InterfaceStability; 037import org.apache.hadoop.fs.BlockLocation; 038import org.apache.hadoop.fs.FileStatus; 039import org.apache.hadoop.fs.FileSystem; 040import org.apache.hadoop.fs.LocatedFileStatus; 041import org.apache.hadoop.fs.Path; 042import org.apache.hadoop.fs.PathFilter; 043import org.apache.hadoop.fs.RemoteIterator; 044import org.apache.hadoop.mapreduce.security.TokenCache; 045import org.apache.hadoop.net.NetworkTopology; 046import org.apache.hadoop.net.Node; 047import org.apache.hadoop.net.NodeBase; 048import org.apache.hadoop.util.ReflectionUtils; 049import org.apache.hadoop.util.StopWatch; 050import org.apache.hadoop.util.StringUtils; 051 052import com.google.common.collect.Iterables; 053 054/** 055 * A base class for file-based {@link InputFormat}. 056 * 057 * <p><code>FileInputFormat</code> is the base class for all file-based 058 * <code>InputFormat</code>s. This provides a generic implementation of 059 * {@link #getSplits(JobConf, int)}. 060 * 061 * Implementations of <code>FileInputFormat</code> can also override the 062 * {@link #isSplitable(FileSystem, Path)} method to prevent input files 063 * from being split-up in certain situations. Implementations that may 064 * deal with non-splittable files <i>must</i> override this method, since 065 * the default implementation assumes splitting is always possible. 066 */ 067@InterfaceAudience.Public 068@InterfaceStability.Stable 069public abstract class FileInputFormat<K, V> implements InputFormat<K, V> { 070 071 public static final Log LOG = 072 LogFactory.getLog(FileInputFormat.class); 073 074 @Deprecated 075 public static enum Counter { 076 BYTES_READ 077 } 078 079 public static final String NUM_INPUT_FILES = 080 org.apache.hadoop.mapreduce.lib.input.FileInputFormat.NUM_INPUT_FILES; 081 082 public static final String INPUT_DIR_RECURSIVE = 083 org.apache.hadoop.mapreduce.lib.input.FileInputFormat.INPUT_DIR_RECURSIVE; 084 085 086 private static final double SPLIT_SLOP = 1.1; // 10% slop 087 088 private long minSplitSize = 1; 089 private static final PathFilter hiddenFileFilter = new PathFilter(){ 090 public boolean accept(Path p){ 091 String name = p.getName(); 092 return !name.startsWith("_") && !name.startsWith("."); 093 } 094 }; 095 protected void setMinSplitSize(long minSplitSize) { 096 this.minSplitSize = minSplitSize; 097 } 098 099 /** 100 * Proxy PathFilter that accepts a path only if all filters given in the 101 * constructor do. Used by the listPaths() to apply the built-in 102 * hiddenFileFilter together with a user provided one (if any). 103 */ 104 private static class MultiPathFilter implements PathFilter { 105 private List<PathFilter> filters; 106 107 public MultiPathFilter(List<PathFilter> filters) { 108 this.filters = filters; 109 } 110 111 public boolean accept(Path path) { 112 for (PathFilter filter : filters) { 113 if (!filter.accept(path)) { 114 return false; 115 } 116 } 117 return true; 118 } 119 } 120 121 /** 122 * Is the given filename splittable? Usually, true, but if the file is 123 * stream compressed, it will not be. 124 * 125 * The default implementation in <code>FileInputFormat</code> always returns 126 * true. Implementations that may deal with non-splittable files <i>must</i> 127 * override this method. 128 * 129 * <code>FileInputFormat</code> implementations can override this and return 130 * <code>false</code> to ensure that individual input files are never split-up 131 * so that {@link Mapper}s process entire files. 132 * 133 * @param fs the file system that the file is on 134 * @param filename the file name to check 135 * @return is this file splitable? 136 */ 137 protected boolean isSplitable(FileSystem fs, Path filename) { 138 return true; 139 } 140 141 public abstract RecordReader<K, V> getRecordReader(InputSplit split, 142 JobConf job, 143 Reporter reporter) 144 throws IOException; 145 146 /** 147 * Set a PathFilter to be applied to the input paths for the map-reduce job. 148 * 149 * @param filter the PathFilter class use for filtering the input paths. 150 */ 151 public static void setInputPathFilter(JobConf conf, 152 Class<? extends PathFilter> filter) { 153 conf.setClass(org.apache.hadoop.mapreduce.lib.input. 154 FileInputFormat.PATHFILTER_CLASS, filter, PathFilter.class); 155 } 156 157 /** 158 * Get a PathFilter instance of the filter set for the input paths. 159 * 160 * @return the PathFilter instance set for the job, NULL if none has been set. 161 */ 162 public static PathFilter getInputPathFilter(JobConf conf) { 163 Class<? extends PathFilter> filterClass = conf.getClass( 164 org.apache.hadoop.mapreduce.lib.input.FileInputFormat.PATHFILTER_CLASS, 165 null, PathFilter.class); 166 return (filterClass != null) ? 167 ReflectionUtils.newInstance(filterClass, conf) : null; 168 } 169 170 /** 171 * Add files in the input path recursively into the results. 172 * @param result 173 * The List to store all files. 174 * @param fs 175 * The FileSystem. 176 * @param path 177 * The input path. 178 * @param inputFilter 179 * The input filter that can be used to filter files/dirs. 180 * @throws IOException 181 */ 182 protected void addInputPathRecursively(List<FileStatus> result, 183 FileSystem fs, Path path, PathFilter inputFilter) 184 throws IOException { 185 RemoteIterator<LocatedFileStatus> iter = fs.listLocatedStatus(path); 186 while (iter.hasNext()) { 187 LocatedFileStatus stat = iter.next(); 188 if (inputFilter.accept(stat.getPath())) { 189 if (stat.isDirectory()) { 190 addInputPathRecursively(result, fs, stat.getPath(), inputFilter); 191 } else { 192 result.add(stat); 193 } 194 } 195 } 196 } 197 198 /** List input directories. 199 * Subclasses may override to, e.g., select only files matching a regular 200 * expression. 201 * 202 * @param job the job to list input paths for 203 * @return array of FileStatus objects 204 * @throws IOException if zero items. 205 */ 206 protected FileStatus[] listStatus(JobConf job) throws IOException { 207 Path[] dirs = getInputPaths(job); 208 if (dirs.length == 0) { 209 throw new IOException("No input paths specified in job"); 210 } 211 212 // get tokens for all the required FileSystems.. 213 TokenCache.obtainTokensForNamenodes(job.getCredentials(), dirs, job); 214 215 // Whether we need to recursive look into the directory structure 216 boolean recursive = job.getBoolean(INPUT_DIR_RECURSIVE, false); 217 218 // creates a MultiPathFilter with the hiddenFileFilter and the 219 // user provided one (if any). 220 List<PathFilter> filters = new ArrayList<PathFilter>(); 221 filters.add(hiddenFileFilter); 222 PathFilter jobFilter = getInputPathFilter(job); 223 if (jobFilter != null) { 224 filters.add(jobFilter); 225 } 226 PathFilter inputFilter = new MultiPathFilter(filters); 227 228 FileStatus[] result; 229 int numThreads = job 230 .getInt( 231 org.apache.hadoop.mapreduce.lib.input.FileInputFormat.LIST_STATUS_NUM_THREADS, 232 org.apache.hadoop.mapreduce.lib.input.FileInputFormat.DEFAULT_LIST_STATUS_NUM_THREADS); 233 234 StopWatch sw = new StopWatch().start(); 235 if (numThreads == 1) { 236 List<FileStatus> locatedFiles = singleThreadedListStatus(job, dirs, inputFilter, recursive); 237 result = locatedFiles.toArray(new FileStatus[locatedFiles.size()]); 238 } else { 239 Iterable<FileStatus> locatedFiles = null; 240 try { 241 242 LocatedFileStatusFetcher locatedFileStatusFetcher = new LocatedFileStatusFetcher( 243 job, dirs, recursive, inputFilter, false); 244 locatedFiles = locatedFileStatusFetcher.getFileStatuses(); 245 } catch (InterruptedException e) { 246 throw new IOException("Interrupted while getting file statuses"); 247 } 248 result = Iterables.toArray(locatedFiles, FileStatus.class); 249 } 250 251 sw.stop(); 252 if (LOG.isDebugEnabled()) { 253 LOG.debug("Time taken to get FileStatuses: " 254 + sw.now(TimeUnit.MILLISECONDS)); 255 } 256 LOG.info("Total input files to process : " + result.length); 257 return result; 258 } 259 260 private List<FileStatus> singleThreadedListStatus(JobConf job, Path[] dirs, 261 PathFilter inputFilter, boolean recursive) throws IOException { 262 List<FileStatus> result = new ArrayList<FileStatus>(); 263 List<IOException> errors = new ArrayList<IOException>(); 264 for (Path p: dirs) { 265 FileSystem fs = p.getFileSystem(job); 266 FileStatus[] matches = fs.globStatus(p, inputFilter); 267 if (matches == null) { 268 errors.add(new IOException("Input path does not exist: " + p)); 269 } else if (matches.length == 0) { 270 errors.add(new IOException("Input Pattern " + p + " matches 0 files")); 271 } else { 272 for (FileStatus globStat: matches) { 273 if (globStat.isDirectory()) { 274 RemoteIterator<LocatedFileStatus> iter = 275 fs.listLocatedStatus(globStat.getPath()); 276 while (iter.hasNext()) { 277 LocatedFileStatus stat = iter.next(); 278 if (inputFilter.accept(stat.getPath())) { 279 if (recursive && stat.isDirectory()) { 280 addInputPathRecursively(result, fs, stat.getPath(), 281 inputFilter); 282 } else { 283 result.add(stat); 284 } 285 } 286 } 287 } else { 288 result.add(globStat); 289 } 290 } 291 } 292 } 293 if (!errors.isEmpty()) { 294 throw new InvalidInputException(errors); 295 } 296 return result; 297 } 298 299 /** 300 * A factory that makes the split for this class. It can be overridden 301 * by sub-classes to make sub-types 302 */ 303 protected FileSplit makeSplit(Path file, long start, long length, 304 String[] hosts) { 305 return new FileSplit(file, start, length, hosts); 306 } 307 308 /** 309 * A factory that makes the split for this class. It can be overridden 310 * by sub-classes to make sub-types 311 */ 312 protected FileSplit makeSplit(Path file, long start, long length, 313 String[] hosts, String[] inMemoryHosts) { 314 return new FileSplit(file, start, length, hosts, inMemoryHosts); 315 } 316 317 /** Splits files returned by {@link #listStatus(JobConf)} when 318 * they're too big.*/ 319 public InputSplit[] getSplits(JobConf job, int numSplits) 320 throws IOException { 321 StopWatch sw = new StopWatch().start(); 322 FileStatus[] files = listStatus(job); 323 324 // Save the number of input files for metrics/loadgen 325 job.setLong(NUM_INPUT_FILES, files.length); 326 long totalSize = 0; // compute total size 327 for (FileStatus file: files) { // check we have valid files 328 if (file.isDirectory()) { 329 throw new IOException("Not a file: "+ file.getPath()); 330 } 331 totalSize += file.getLen(); 332 } 333 334 long goalSize = totalSize / (numSplits == 0 ? 1 : numSplits); 335 long minSize = Math.max(job.getLong(org.apache.hadoop.mapreduce.lib.input. 336 FileInputFormat.SPLIT_MINSIZE, 1), minSplitSize); 337 338 // generate splits 339 ArrayList<FileSplit> splits = new ArrayList<FileSplit>(numSplits); 340 NetworkTopology clusterMap = new NetworkTopology(); 341 for (FileStatus file: files) { 342 Path path = file.getPath(); 343 long length = file.getLen(); 344 if (length != 0) { 345 FileSystem fs = path.getFileSystem(job); 346 BlockLocation[] blkLocations; 347 if (file instanceof LocatedFileStatus) { 348 blkLocations = ((LocatedFileStatus) file).getBlockLocations(); 349 } else { 350 blkLocations = fs.getFileBlockLocations(file, 0, length); 351 } 352 if (isSplitable(fs, path)) { 353 long blockSize = file.getBlockSize(); 354 long splitSize = computeSplitSize(goalSize, minSize, blockSize); 355 356 long bytesRemaining = length; 357 while (((double) bytesRemaining)/splitSize > SPLIT_SLOP) { 358 String[][] splitHosts = getSplitHostsAndCachedHosts(blkLocations, 359 length-bytesRemaining, splitSize, clusterMap); 360 splits.add(makeSplit(path, length-bytesRemaining, splitSize, 361 splitHosts[0], splitHosts[1])); 362 bytesRemaining -= splitSize; 363 } 364 365 if (bytesRemaining != 0) { 366 String[][] splitHosts = getSplitHostsAndCachedHosts(blkLocations, length 367 - bytesRemaining, bytesRemaining, clusterMap); 368 splits.add(makeSplit(path, length - bytesRemaining, bytesRemaining, 369 splitHosts[0], splitHosts[1])); 370 } 371 } else { 372 String[][] splitHosts = getSplitHostsAndCachedHosts(blkLocations,0,length,clusterMap); 373 splits.add(makeSplit(path, 0, length, splitHosts[0], splitHosts[1])); 374 } 375 } else { 376 //Create empty hosts array for zero length files 377 splits.add(makeSplit(path, 0, length, new String[0])); 378 } 379 } 380 sw.stop(); 381 if (LOG.isDebugEnabled()) { 382 LOG.debug("Total # of splits generated by getSplits: " + splits.size() 383 + ", TimeTaken: " + sw.now(TimeUnit.MILLISECONDS)); 384 } 385 return splits.toArray(new FileSplit[splits.size()]); 386 } 387 388 protected long computeSplitSize(long goalSize, long minSize, 389 long blockSize) { 390 return Math.max(minSize, Math.min(goalSize, blockSize)); 391 } 392 393 protected int getBlockIndex(BlockLocation[] blkLocations, 394 long offset) { 395 for (int i = 0 ; i < blkLocations.length; i++) { 396 // is the offset inside this block? 397 if ((blkLocations[i].getOffset() <= offset) && 398 (offset < blkLocations[i].getOffset() + blkLocations[i].getLength())){ 399 return i; 400 } 401 } 402 BlockLocation last = blkLocations[blkLocations.length -1]; 403 long fileLength = last.getOffset() + last.getLength() -1; 404 throw new IllegalArgumentException("Offset " + offset + 405 " is outside of file (0.." + 406 fileLength + ")"); 407 } 408 409 /** 410 * Sets the given comma separated paths as the list of inputs 411 * for the map-reduce job. 412 * 413 * @param conf Configuration of the job 414 * @param commaSeparatedPaths Comma separated paths to be set as 415 * the list of inputs for the map-reduce job. 416 */ 417 public static void setInputPaths(JobConf conf, String commaSeparatedPaths) { 418 setInputPaths(conf, StringUtils.stringToPath( 419 getPathStrings(commaSeparatedPaths))); 420 } 421 422 /** 423 * Add the given comma separated paths to the list of inputs for 424 * the map-reduce job. 425 * 426 * @param conf The configuration of the job 427 * @param commaSeparatedPaths Comma separated paths to be added to 428 * the list of inputs for the map-reduce job. 429 */ 430 public static void addInputPaths(JobConf conf, String commaSeparatedPaths) { 431 for (String str : getPathStrings(commaSeparatedPaths)) { 432 addInputPath(conf, new Path(str)); 433 } 434 } 435 436 /** 437 * Set the array of {@link Path}s as the list of inputs 438 * for the map-reduce job. 439 * 440 * @param conf Configuration of the job. 441 * @param inputPaths the {@link Path}s of the input directories/files 442 * for the map-reduce job. 443 */ 444 public static void setInputPaths(JobConf conf, Path... inputPaths) { 445 Path path = new Path(conf.getWorkingDirectory(), inputPaths[0]); 446 StringBuffer str = new StringBuffer(StringUtils.escapeString(path.toString())); 447 for(int i = 1; i < inputPaths.length;i++) { 448 str.append(StringUtils.COMMA_STR); 449 path = new Path(conf.getWorkingDirectory(), inputPaths[i]); 450 str.append(StringUtils.escapeString(path.toString())); 451 } 452 conf.set(org.apache.hadoop.mapreduce.lib.input. 453 FileInputFormat.INPUT_DIR, str.toString()); 454 } 455 456 /** 457 * Add a {@link Path} to the list of inputs for the map-reduce job. 458 * 459 * @param conf The configuration of the job 460 * @param path {@link Path} to be added to the list of inputs for 461 * the map-reduce job. 462 */ 463 public static void addInputPath(JobConf conf, Path path ) { 464 path = new Path(conf.getWorkingDirectory(), path); 465 String dirStr = StringUtils.escapeString(path.toString()); 466 String dirs = conf.get(org.apache.hadoop.mapreduce.lib.input. 467 FileInputFormat.INPUT_DIR); 468 conf.set(org.apache.hadoop.mapreduce.lib.input. 469 FileInputFormat.INPUT_DIR, dirs == null ? dirStr : 470 dirs + StringUtils.COMMA_STR + dirStr); 471 } 472 473 // This method escapes commas in the glob pattern of the given paths. 474 private static String[] getPathStrings(String commaSeparatedPaths) { 475 int length = commaSeparatedPaths.length(); 476 int curlyOpen = 0; 477 int pathStart = 0; 478 boolean globPattern = false; 479 List<String> pathStrings = new ArrayList<String>(); 480 481 for (int i=0; i<length; i++) { 482 char ch = commaSeparatedPaths.charAt(i); 483 switch(ch) { 484 case '{' : { 485 curlyOpen++; 486 if (!globPattern) { 487 globPattern = true; 488 } 489 break; 490 } 491 case '}' : { 492 curlyOpen--; 493 if (curlyOpen == 0 && globPattern) { 494 globPattern = false; 495 } 496 break; 497 } 498 case ',' : { 499 if (!globPattern) { 500 pathStrings.add(commaSeparatedPaths.substring(pathStart, i)); 501 pathStart = i + 1 ; 502 } 503 break; 504 } 505 default: 506 continue; // nothing special to do for this character 507 } 508 } 509 pathStrings.add(commaSeparatedPaths.substring(pathStart, length)); 510 511 return pathStrings.toArray(new String[0]); 512 } 513 514 /** 515 * Get the list of input {@link Path}s for the map-reduce job. 516 * 517 * @param conf The configuration of the job 518 * @return the list of input {@link Path}s for the map-reduce job. 519 */ 520 public static Path[] getInputPaths(JobConf conf) { 521 String dirs = conf.get(org.apache.hadoop.mapreduce.lib.input. 522 FileInputFormat.INPUT_DIR, ""); 523 String [] list = StringUtils.split(dirs); 524 Path[] result = new Path[list.length]; 525 for (int i = 0; i < list.length; i++) { 526 result[i] = new Path(StringUtils.unEscapeString(list[i])); 527 } 528 return result; 529 } 530 531 532 private void sortInDescendingOrder(List<NodeInfo> mylist) { 533 Collections.sort(mylist, new Comparator<NodeInfo> () { 534 public int compare(NodeInfo obj1, NodeInfo obj2) { 535 536 if (obj1 == null || obj2 == null) 537 return -1; 538 539 if (obj1.getValue() == obj2.getValue()) { 540 return 0; 541 } 542 else { 543 return ((obj1.getValue() < obj2.getValue()) ? 1 : -1); 544 } 545 } 546 } 547 ); 548 } 549 550 /** 551 * This function identifies and returns the hosts that contribute 552 * most for a given split. For calculating the contribution, rack 553 * locality is treated on par with host locality, so hosts from racks 554 * that contribute the most are preferred over hosts on racks that 555 * contribute less 556 * @param blkLocations The list of block locations 557 * @param offset 558 * @param splitSize 559 * @return an array of hosts that contribute most to this split 560 * @throws IOException 561 */ 562 protected String[] getSplitHosts(BlockLocation[] blkLocations, 563 long offset, long splitSize, NetworkTopology clusterMap) throws IOException { 564 return getSplitHostsAndCachedHosts(blkLocations, offset, splitSize, 565 clusterMap)[0]; 566 } 567 568 /** 569 * This function identifies and returns the hosts that contribute 570 * most for a given split. For calculating the contribution, rack 571 * locality is treated on par with host locality, so hosts from racks 572 * that contribute the most are preferred over hosts on racks that 573 * contribute less 574 * @param blkLocations The list of block locations 575 * @param offset 576 * @param splitSize 577 * @return two arrays - one of hosts that contribute most to this split, and 578 * one of hosts that contribute most to this split that have the data 579 * cached on them 580 * @throws IOException 581 */ 582 private String[][] getSplitHostsAndCachedHosts(BlockLocation[] blkLocations, 583 long offset, long splitSize, NetworkTopology clusterMap) 584 throws IOException { 585 586 int startIndex = getBlockIndex(blkLocations, offset); 587 588 long bytesInThisBlock = blkLocations[startIndex].getOffset() + 589 blkLocations[startIndex].getLength() - offset; 590 591 //If this is the only block, just return 592 if (bytesInThisBlock >= splitSize) { 593 return new String[][] { blkLocations[startIndex].getHosts(), 594 blkLocations[startIndex].getCachedHosts() }; 595 } 596 597 long bytesInFirstBlock = bytesInThisBlock; 598 int index = startIndex + 1; 599 splitSize -= bytesInThisBlock; 600 601 while (splitSize > 0) { 602 bytesInThisBlock = 603 Math.min(splitSize, blkLocations[index++].getLength()); 604 splitSize -= bytesInThisBlock; 605 } 606 607 long bytesInLastBlock = bytesInThisBlock; 608 int endIndex = index - 1; 609 610 Map <Node,NodeInfo> hostsMap = new IdentityHashMap<Node,NodeInfo>(); 611 Map <Node,NodeInfo> racksMap = new IdentityHashMap<Node,NodeInfo>(); 612 String [] allTopos = new String[0]; 613 614 // Build the hierarchy and aggregate the contribution of 615 // bytes at each level. See TestGetSplitHosts.java 616 617 for (index = startIndex; index <= endIndex; index++) { 618 619 // Establish the bytes in this block 620 if (index == startIndex) { 621 bytesInThisBlock = bytesInFirstBlock; 622 } 623 else if (index == endIndex) { 624 bytesInThisBlock = bytesInLastBlock; 625 } 626 else { 627 bytesInThisBlock = blkLocations[index].getLength(); 628 } 629 630 allTopos = blkLocations[index].getTopologyPaths(); 631 632 // If no topology information is available, just 633 // prefix a fakeRack 634 if (allTopos.length == 0) { 635 allTopos = fakeRacks(blkLocations, index); 636 } 637 638 // NOTE: This code currently works only for one level of 639 // hierarchy (rack/host). However, it is relatively easy 640 // to extend this to support aggregation at different 641 // levels 642 643 for (String topo: allTopos) { 644 645 Node node, parentNode; 646 NodeInfo nodeInfo, parentNodeInfo; 647 648 node = clusterMap.getNode(topo); 649 650 if (node == null) { 651 node = new NodeBase(topo); 652 clusterMap.add(node); 653 } 654 655 nodeInfo = hostsMap.get(node); 656 657 if (nodeInfo == null) { 658 nodeInfo = new NodeInfo(node); 659 hostsMap.put(node,nodeInfo); 660 parentNode = node.getParent(); 661 parentNodeInfo = racksMap.get(parentNode); 662 if (parentNodeInfo == null) { 663 parentNodeInfo = new NodeInfo(parentNode); 664 racksMap.put(parentNode,parentNodeInfo); 665 } 666 parentNodeInfo.addLeaf(nodeInfo); 667 } 668 else { 669 nodeInfo = hostsMap.get(node); 670 parentNode = node.getParent(); 671 parentNodeInfo = racksMap.get(parentNode); 672 } 673 674 nodeInfo.addValue(index, bytesInThisBlock); 675 parentNodeInfo.addValue(index, bytesInThisBlock); 676 677 } // for all topos 678 679 } // for all indices 680 681 // We don't yet support cached hosts when bytesInThisBlock > splitSize 682 return new String[][] { identifyHosts(allTopos.length, racksMap), 683 new String[0]}; 684 } 685 686 private String[] identifyHosts(int replicationFactor, 687 Map<Node,NodeInfo> racksMap) { 688 689 String [] retVal = new String[replicationFactor]; 690 691 List <NodeInfo> rackList = new LinkedList<NodeInfo>(); 692 693 rackList.addAll(racksMap.values()); 694 695 // Sort the racks based on their contribution to this split 696 sortInDescendingOrder(rackList); 697 698 boolean done = false; 699 int index = 0; 700 701 // Get the host list for all our aggregated items, sort 702 // them and return the top entries 703 for (NodeInfo ni: rackList) { 704 705 Set<NodeInfo> hostSet = ni.getLeaves(); 706 707 List<NodeInfo>hostList = new LinkedList<NodeInfo>(); 708 hostList.addAll(hostSet); 709 710 // Sort the hosts in this rack based on their contribution 711 sortInDescendingOrder(hostList); 712 713 for (NodeInfo host: hostList) { 714 // Strip out the port number from the host name 715 retVal[index++] = host.node.getName().split(":")[0]; 716 if (index == replicationFactor) { 717 done = true; 718 break; 719 } 720 } 721 722 if (done == true) { 723 break; 724 } 725 } 726 return retVal; 727 } 728 729 private String[] fakeRacks(BlockLocation[] blkLocations, int index) 730 throws IOException { 731 String[] allHosts = blkLocations[index].getHosts(); 732 String[] allTopos = new String[allHosts.length]; 733 for (int i = 0; i < allHosts.length; i++) { 734 allTopos[i] = NetworkTopology.DEFAULT_RACK + "/" + allHosts[i]; 735 } 736 return allTopos; 737 } 738 739 740 private static class NodeInfo { 741 final Node node; 742 final Set<Integer> blockIds; 743 final Set<NodeInfo> leaves; 744 745 private long value; 746 747 NodeInfo(Node node) { 748 this.node = node; 749 blockIds = new HashSet<Integer>(); 750 leaves = new HashSet<NodeInfo>(); 751 } 752 753 long getValue() {return value;} 754 755 void addValue(int blockIndex, long value) { 756 if (blockIds.add(blockIndex) == true) { 757 this.value += value; 758 } 759 } 760 761 Set<NodeInfo> getLeaves() { return leaves;} 762 763 void addLeaf(NodeInfo nodeInfo) { 764 leaves.add(nodeInfo); 765 } 766 } 767}