/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.mapreduce.lib.input;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.LinkedHashSet;
import java.util.HashSet;
import java.util.List;
import java.util.HashMap;
import java.util.Set;
import java.util.Iterator;
import java.util.Map;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocatedFileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.BlockLocation;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.PathFilter;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.CompressionCodecFactory;
import org.apache.hadoop.io.compress.SplittableCompressionCodec;
import org.apache.hadoop.mapreduce.InputFormat;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.net.NodeBase;
import org.apache.hadoop.net.NetworkTopology;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.HashMultiset;
import com.google.common.collect.Multiset;

/**
 * An abstract {@link InputFormat} that returns {@link CombineFileSplit}'s from
 * the {@link InputFormat#getSplits(JobContext)} method.
 *
 * Splits are constructed from the files under the input paths.
 * A split cannot have files from different pools.
 * Each split returned may contain blocks from different files.
 * If a maxSplitSize is specified, then blocks on the same node are
 * combined to form a single split. Blocks that are left over are
 * then combined with other blocks in the same rack.
 * If maxSplitSize is not specified, then blocks from the same rack
 * are combined in a single split; no attempt is made to create
 * node-local splits.
 * If the maxSplitSize is equal to the block size, then this class
 * is similar to the default splitting behavior in Hadoop: each
 * block is a locally processed split.
 * Subclasses implement
 * {@link InputFormat#createRecordReader(InputSplit, TaskAttemptContext)}
 * to construct <code>RecordReader</code>'s for
 * <code>CombineFileSplit</code>'s.
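 *
 * <p>For illustration, a minimal subclass might look like the sketch below.
 * ({@code MyRecordReader} is a hypothetical per-file reader supplied by the
 * application; it must provide the constructor expected by
 * {@link CombineFileRecordReader}.)
 * <pre>
 * public class MyCombineInputFormat
 *     extends CombineFileInputFormat&lt;LongWritable, Text&gt; {
 *
 *   public MyCombineInputFormat() {
 *     // combine blocks into splits of roughly 128 MB each
 *     setMaxSplitSize(128 * 1024 * 1024);
 *   }
 *
 *   public RecordReader&lt;LongWritable, Text&gt; createRecordReader(
 *       InputSplit split, TaskAttemptContext context) throws IOException {
 *     return new CombineFileRecordReader&lt;LongWritable, Text&gt;(
 *         (CombineFileSplit) split, context, MyRecordReader.class);
 *   }
 * }
 * </pre>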
 *
 * @see CombineFileSplit
 */
@InterfaceAudience.Public
@InterfaceStability.Stable
public abstract class CombineFileInputFormat<K, V>
  extends FileInputFormat<K, V> {

  private static final Log LOG = LogFactory.getLog(CombineFileInputFormat.class);

  public static final String SPLIT_MINSIZE_PERNODE =
    "mapreduce.input.fileinputformat.split.minsize.per.node";
  public static final String SPLIT_MINSIZE_PERRACK =
    "mapreduce.input.fileinputformat.split.minsize.per.rack";
  // ability to limit the size of a single split
  private long maxSplitSize = 0;
  private long minSplitSizeNode = 0;
  private long minSplitSizeRack = 0;

  // A pool of input path filters. A split cannot have blocks from files
  // across multiple pools.
  private ArrayList<MultiPathFilter> pools = new ArrayList<MultiPathFilter>();

  // mapping from a rack name to the set of nodes in the rack
  private HashMap<String, Set<String>> rackToNodes =
    new HashMap<String, Set<String>>();

  /**
   * Specify the maximum size (in bytes) of each split. Each split is
   * approximately equal to the specified size.
   */
  protected void setMaxSplitSize(long maxSplitSize) {
    this.maxSplitSize = maxSplitSize;
  }

  /**
   * Specify the minimum size (in bytes) of each split per node.
   * This applies to data that is left over after combining data on a single
   * node into splits that are of maximum size specified by maxSplitSize.
   * This leftover data will be combined into its own split if its size
   * exceeds minSplitSizeNode.
   */
  protected void setMinSplitSizeNode(long minSplitSizeNode) {
    this.minSplitSizeNode = minSplitSizeNode;
  }

  /**
   * Specify the minimum size (in bytes) of each split per rack.
   * This applies to data that is left over after combining data on a single
   * rack into splits that are of maximum size specified by maxSplitSize.
   * This leftover data will be combined into its own split if its size
   * exceeds minSplitSizeRack.
   */
  protected void setMinSplitSizeRack(long minSplitSizeRack) {
    this.minSplitSizeRack = minSplitSizeRack;
  }

  /**
   * Create a new pool and add the filters to it.
   * A split cannot have files from different pools.
   */
  protected void createPool(List<PathFilter> filters) {
    pools.add(new MultiPathFilter(filters));
  }

  /**
   * Create a new pool and add the filters to it.
   * A pathname can satisfy any one of the specified filters.
   * A split cannot have files from different pools.
   */
  protected void createPool(PathFilter... filters) {
    MultiPathFilter multi = new MultiPathFilter();
    for (PathFilter f: filters) {
      multi.add(f);
    }
    pools.add(multi);
  }

  @Override
  protected boolean isSplitable(JobContext context, Path file) {
    final CompressionCodec codec =
      new CompressionCodecFactory(context.getConfiguration()).getCodec(file);
    if (null == codec) {
      return true;
    }
    return codec instanceof SplittableCompressionCodec;
  }

  /**
   * default constructor
   */
  public CombineFileInputFormat() {
  }

  @Override
  public List<InputSplit> getSplits(JobContext job)
    throws IOException {
    long minSizeNode = 0;
    long minSizeRack = 0;
    long maxSize = 0;
    Configuration conf = job.getConfiguration();

    // the values specified by setXxxSplitSize() take precedence over the
    // values that might have been specified in the config
    if (minSplitSizeNode != 0) {
      minSizeNode = minSplitSizeNode;
    } else {
      minSizeNode = conf.getLong(SPLIT_MINSIZE_PERNODE, 0);
    }
    if (minSplitSizeRack != 0) {
      minSizeRack = minSplitSizeRack;
    } else {
      minSizeRack = conf.getLong(SPLIT_MINSIZE_PERRACK, 0);
    }
    if (maxSplitSize != 0) {
      maxSize = maxSplitSize;
    } else {
      maxSize = conf.getLong("mapreduce.input.fileinputformat.split.maxsize", 0);
      // If maxSize is not configured, a single split will be generated per
      // node.
    }
    if (minSizeNode != 0 && maxSize != 0 && minSizeNode > maxSize) {
      throw new IOException("Minimum split size per node " + minSizeNode +
                            " cannot be larger than maximum split size " +
                            maxSize);
    }
    if (minSizeRack != 0 && maxSize != 0 && minSizeRack > maxSize) {
      throw new IOException("Minimum split size per rack " + minSizeRack +
                            " cannot be larger than maximum split size " +
                            maxSize);
    }
    if (minSizeRack != 0 && minSizeNode > minSizeRack) {
      throw new IOException("Minimum split size per node " + minSizeNode +
                            " cannot be larger than minimum split " +
                            "size per rack " + minSizeRack);
    }

    // all the files in input set
    List<FileStatus> stats = listStatus(job);
    List<InputSplit> splits = new ArrayList<InputSplit>();
    if (stats.size() == 0) {
      return splits;
    }

    // In one single iteration, process all the paths in a single pool.
    // Processing one pool at a time ensures that a split contains paths
    // from a single pool only.
    for (MultiPathFilter onepool : pools) {
      ArrayList<FileStatus> myPaths = new ArrayList<FileStatus>();

      // pick one input path. If it matches any of the filters in the pool,
      // add it to this pool's set
      for (Iterator<FileStatus> iter = stats.iterator(); iter.hasNext();) {
        FileStatus p = iter.next();
        if (onepool.accept(p.getPath())) {
          myPaths.add(p); // add it to my output set
          iter.remove();
        }
      }
      // create splits for all files in this pool.
      getMoreSplits(job, myPaths, maxSize, minSizeNode, minSizeRack, splits);
    }

    // create splits for all files that are not in any pool.
    getMoreSplits(job, stats, maxSize, minSizeNode, minSizeRack, splits);

    // free up rackToNodes map
    rackToNodes.clear();
    return splits;
  }

  /**
   * Return all the splits in the specified set of paths.
   */
  private void getMoreSplits(JobContext job, List<FileStatus> stats,
                             long maxSize, long minSizeNode, long minSizeRack,
                             List<InputSplit> splits)
    throws IOException {
    Configuration conf = job.getConfiguration();

    // all blocks for all the files in input set
    OneFileInfo[] files;

    // mapping from a rack name to the list of blocks it has
    HashMap<String, List<OneBlockInfo>> rackToBlocks =
      new HashMap<String, List<OneBlockInfo>>();

    // mapping from a block to the nodes on which it has replicas
    HashMap<OneBlockInfo, String[]> blockToNodes =
      new HashMap<OneBlockInfo, String[]>();

    // mapping from a node to the list of blocks that it contains
    HashMap<String, Set<OneBlockInfo>> nodeToBlocks =
      new HashMap<String, Set<OneBlockInfo>>();

    files = new OneFileInfo[stats.size()];
    if (stats.size() == 0) {
      return;
    }

    // populate all the blocks for all files
    long totLength = 0;
    int i = 0;
    for (FileStatus stat : stats) {
      files[i] = new OneFileInfo(stat, conf, isSplitable(job, stat.getPath()),
                                 rackToBlocks, blockToNodes, nodeToBlocks,
                                 rackToNodes, maxSize);
      totLength += files[i].getLength();
      i++;
    }
    createSplits(nodeToBlocks, blockToNodes, rackToBlocks, totLength,
                 maxSize, minSizeNode, minSizeRack, splits);
  }

  /**
   * Process all the nodes and create splits that are local to a node.
   * Generate one split per node iteration, and walk over nodes multiple times
   * to distribute the splits across nodes.
   * <p>
   * Note: The order of processing the nodes is undetermined because the
   * implementation of nodeToBlocks is {@link java.util.HashMap} and the order
   * of its entries is undetermined.
   * @param nodeToBlocks Mapping from a node to the list of blocks that
   *                     it contains.
   * @param blockToNodes Mapping from a block to the nodes on which
   *                     it has replicas.
   * @param rackToBlocks Mapping from a rack name to the list of blocks it has.
   * @param totLength Total length of the input files.
   * @param maxSize Max size of each split.
   *                If set to 0, no maximum split size is enforced.
   * @param minSizeNode Minimum split size per node.
   * @param minSizeRack Minimum split size per rack.
   * @param splits New splits created by this method are added to the list.
   */
  @VisibleForTesting
  void createSplits(Map<String, Set<OneBlockInfo>> nodeToBlocks,
                    Map<OneBlockInfo, String[]> blockToNodes,
                    Map<String, List<OneBlockInfo>> rackToBlocks,
                    long totLength,
                    long maxSize,
                    long minSizeNode,
                    long minSizeRack,
                    List<InputSplit> splits) {
    ArrayList<OneBlockInfo> validBlocks = new ArrayList<OneBlockInfo>();
    long curSplitSize = 0;

    int totalNodes = nodeToBlocks.size();
    long totalLength = totLength;

    Multiset<String> splitsPerNode = HashMultiset.create();
    Set<String> completedNodes = new HashSet<String>();

    while(true) {
      for (Iterator<Map.Entry<String, Set<OneBlockInfo>>> iter = nodeToBlocks
          .entrySet().iterator(); iter.hasNext();) {
        Map.Entry<String, Set<OneBlockInfo>> one = iter.next();

        String node = one.getKey();

        // Skip the node if it has previously been marked as completed.
        if (completedNodes.contains(node)) {
          continue;
        }

        Set<OneBlockInfo> blocksInCurrentNode = one.getValue();

        // for each block, copy it into validBlocks. Delete it from
        // blockToNodes so that the same block does not appear in
        // two different splits.
        Iterator<OneBlockInfo> oneBlockIter = blocksInCurrentNode.iterator();
        while (oneBlockIter.hasNext()) {
          OneBlockInfo oneblock = oneBlockIter.next();

          // Remove all blocks which may already have been assigned to other
          // splits.
          if(!blockToNodes.containsKey(oneblock)) {
            oneBlockIter.remove();
            continue;
          }

          validBlocks.add(oneblock);
          blockToNodes.remove(oneblock);
          curSplitSize += oneblock.length;

          // if the accumulated split size exceeds the maximum, then
          // create this split.
          if (maxSize != 0 && curSplitSize >= maxSize) {
            // create an input split and add it to the splits array
            addCreatedSplit(splits, Collections.singleton(node), validBlocks);
            totalLength -= curSplitSize;
            curSplitSize = 0;

            splitsPerNode.add(node);

            // Remove entries from blocksInNode so that we don't walk these
            // again.
            blocksInCurrentNode.removeAll(validBlocks);
            validBlocks.clear();

            // Done creating a single split for this node. Move on to the next
            // node so that splits are distributed across nodes.
            break;
          }

        }
        if (validBlocks.size() != 0) {
          // This implies that the last few blocks (or all in case maxSize=0)
          // were not part of a split. The node is complete.

          // if there were any blocks left over and their combined size is
          // larger than minSizeNode, then combine them into one split.
          // Otherwise add them back to the unprocessed pool. It is likely
          // that they will be combined with other blocks from the
          // same rack later on.
          // This condition also kicks in when max split size is not set. All
          // blocks on a node will be grouped together into a single split.
          if (minSizeNode != 0 && curSplitSize >= minSizeNode
              && splitsPerNode.count(node) == 0) {
            // No split has been created for this node yet, so it is OK to add
            // a smaller one to improve parallelism. Otherwise, group the blocks
            // at the rack level for better-balanced split sizes. Create an
            // input split and add it to the splits array.
            addCreatedSplit(splits, Collections.singleton(node), validBlocks);
            totalLength -= curSplitSize;
            splitsPerNode.add(node);
            // Remove entries from blocksInNode so that we don't walk this again.
            blocksInCurrentNode.removeAll(validBlocks);
            // The node is done. This was the last set of blocks for this node.
          } else {
            // Put the unplaced blocks back into the pool for later rack-allocation.
            for (OneBlockInfo oneblock : validBlocks) {
              blockToNodes.put(oneblock, oneblock.hosts);
            }
          }
          validBlocks.clear();
          curSplitSize = 0;
          completedNodes.add(node);
        } else { // No in-flight blocks.
          if (blocksInCurrentNode.size() == 0) {
            // Node is done. All blocks were fit into node-local splits.
            completedNodes.add(node);
          } // else Run through the node again.
        }
      }

      // Check if node-local assignments are complete.
      if (completedNodes.size() == totalNodes || totalLength == 0) {
        // All nodes have been walked over and marked as completed or all blocks
        // have been assigned. The rest should be handled via rack-local assignment.
        LOG.info("DEBUG: Terminated node allocation with : CompletedNodes: "
            + completedNodes.size() + ", size left: " + totalLength);
        break;
      }
    }

    // if blocks in a rack are below the specified minimum size, then keep them
    // in 'overflow'. After the processing of all racks is complete, these
    // overflow blocks will be combined into splits.
    ArrayList<OneBlockInfo> overflowBlocks = new ArrayList<OneBlockInfo>();
    Set<String> racks = new HashSet<String>();

    // Process all racks over and over again until there is no more work to do.
    while (blockToNodes.size() > 0) {

      // Create one split for this rack before moving over to the next rack.
      // Come back to this rack after creating a single split for each of the
      // remaining racks.
      // Process one rack at a time, combining all possible blocks that reside
      // on this rack into one split (constrained by the minimum and maximum
      // split sizes).

      // iterate over all racks
      for (Iterator<Map.Entry<String, List<OneBlockInfo>>> iter =
           rackToBlocks.entrySet().iterator(); iter.hasNext();) {

        Map.Entry<String, List<OneBlockInfo>> one = iter.next();
        racks.add(one.getKey());
        List<OneBlockInfo> blocks = one.getValue();

        // for each block, copy it into validBlocks. Delete it from
        // blockToNodes so that the same block does not appear in
        // two different splits.
        boolean createdSplit = false;
        for (OneBlockInfo oneblock : blocks) {
          if (blockToNodes.containsKey(oneblock)) {
            validBlocks.add(oneblock);
            blockToNodes.remove(oneblock);
            curSplitSize += oneblock.length;

            // if the accumulated split size exceeds the maximum, then
            // create this split.
            if (maxSize != 0 && curSplitSize >= maxSize) {
              // create an input split and add it to the splits array
              addCreatedSplit(splits, getHosts(racks), validBlocks);
              createdSplit = true;
              break;
            }
          }
        }

        // if we created a split, then just go to the next rack
        if (createdSplit) {
          curSplitSize = 0;
          validBlocks.clear();
          racks.clear();
          continue;
        }

        if (!validBlocks.isEmpty()) {
          if (minSizeRack != 0 && curSplitSize >= minSizeRack) {
            // if there is a minimum size specified, then create a single split
            // otherwise, store these blocks into overflow data structure
            addCreatedSplit(splits, getHosts(racks), validBlocks);
          } else {
            // There were a few blocks in this rack that
            // remained to be processed. Keep them in 'overflow' block list.
            // These will be combined later.
            overflowBlocks.addAll(validBlocks);
          }
        }
        curSplitSize = 0;
        validBlocks.clear();
        racks.clear();
      }
    }

    assert blockToNodes.isEmpty();
    assert curSplitSize == 0;
    assert validBlocks.isEmpty();
    assert racks.isEmpty();

    // Process all overflow blocks
    for (OneBlockInfo oneblock : overflowBlocks) {
      validBlocks.add(oneblock);
      curSplitSize += oneblock.length;

      // This might cause an existing rack location to be re-added,
      // but it should be ok.
      for (int i = 0; i < oneblock.racks.length; i++) {
        racks.add(oneblock.racks[i]);
      }

      // if the accumulated split size exceeds the maximum, then
      // create this split.
      if (maxSize != 0 && curSplitSize >= maxSize) {
        // create an input split and add it to the splits array
        addCreatedSplit(splits, getHosts(racks), validBlocks);
        curSplitSize = 0;
        validBlocks.clear();
        racks.clear();
      }
    }

    // Process any remaining blocks.
    if (!validBlocks.isEmpty()) {
      addCreatedSplit(splits, getHosts(racks), validBlocks);
    }
  }

  /**
   * Create a single split from the list of blocks specified in validBlocks.
   * Add this new split into splitList.
   */
  private void addCreatedSplit(List<InputSplit> splitList,
                               Collection<String> locations,
                               ArrayList<OneBlockInfo> validBlocks) {
    // create an input split
    Path[] fl = new Path[validBlocks.size()];
    long[] offset = new long[validBlocks.size()];
    long[] length = new long[validBlocks.size()];
    for (int i = 0; i < validBlocks.size(); i++) {
      fl[i] = validBlocks.get(i).onepath;
      offset[i] = validBlocks.get(i).offset;
      length[i] = validBlocks.get(i).length;
    }
    // add this split to the list that is returned
    CombineFileSplit thissplit = new CombineFileSplit(fl, offset,
                                   length, locations.toArray(new String[0]));
    splitList.add(thissplit);
  }

  /**
   * Subclasses implement this to construct a <code>RecordReader</code>
   * for a <code>CombineFileSplit</code>.
   */
  public abstract RecordReader<K, V> createRecordReader(InputSplit split,
      TaskAttemptContext context) throws IOException;

  /**
   * information about one file from the File System
   */
  @VisibleForTesting
  static class OneFileInfo {
    private long fileSize;               // size of the file
    private OneBlockInfo[] blocks;       // all blocks in this file

    OneFileInfo(FileStatus stat, Configuration conf,
                boolean isSplitable,
                HashMap<String, List<OneBlockInfo>> rackToBlocks,
                HashMap<OneBlockInfo, String[]> blockToNodes,
                HashMap<String, Set<OneBlockInfo>> nodeToBlocks,
                HashMap<String, Set<String>> rackToNodes,
                long maxSize)
                throws IOException {
      this.fileSize = 0;

      // get block locations from file system
      BlockLocation[] locations;
      if (stat instanceof LocatedFileStatus) {
        locations = ((LocatedFileStatus) stat).getBlockLocations();
      } else {
        FileSystem fs = stat.getPath().getFileSystem(conf);
        locations = fs.getFileBlockLocations(stat, 0, stat.getLen());
      }
      // create a list of all blocks and their locations
      if (locations == null) {
        blocks = new OneBlockInfo[0];
      } else {

        if(locations.length == 0 && !stat.isDirectory()) {
          locations = new BlockLocation[] { new BlockLocation() };
        }

        if (!isSplitable) {
          // if the file is not splitable, just create the one block with
          // full file length
          blocks = new OneBlockInfo[1];
          fileSize = stat.getLen();
          blocks[0] = new OneBlockInfo(stat.getPath(), 0, fileSize,
              locations[0].getHosts(), locations[0].getTopologyPaths());
        } else {
          ArrayList<OneBlockInfo> blocksList = new ArrayList<OneBlockInfo>(
              locations.length);
          for (int i = 0; i < locations.length; i++) {
            fileSize += locations[i].getLength();

            // each split can be a maximum of maxSize
            long left = locations[i].getLength();
            long myOffset = locations[i].getOffset();
            long myLength = 0;
            do {
              if (maxSize == 0) {
                myLength = left;
              } else {
                if (left > maxSize && left < 2 * maxSize) {
                  // if remainder is between max and 2*max - then
                  // instead of creating splits of size max, left-max we
                  // create splits of size left/2 and left/2.
                  // This is a heuristic to avoid creating really really small
                  // splits.
                  myLength = left / 2;
                } else {
                  myLength = Math.min(maxSize, left);
                }
              }
              OneBlockInfo oneblock = new OneBlockInfo(stat.getPath(),
                  myOffset, myLength, locations[i].getHosts(),
                  locations[i].getTopologyPaths());
              left -= myLength;
              myOffset += myLength;

              blocksList.add(oneblock);
            } while (left > 0);
          }
          blocks = blocksList.toArray(new OneBlockInfo[blocksList.size()]);
        }

        populateBlockInfo(blocks, rackToBlocks, blockToNodes,
                          nodeToBlocks, rackToNodes);
      }
    }

    @VisibleForTesting
    static void populateBlockInfo(OneBlockInfo[] blocks,
                          Map<String, List<OneBlockInfo>> rackToBlocks,
                          Map<OneBlockInfo, String[]> blockToNodes,
                          Map<String, Set<OneBlockInfo>> nodeToBlocks,
                          Map<String, Set<String>> rackToNodes) {
      for (OneBlockInfo oneblock : blocks) {
        // add this block to the block --> node locations map
        blockToNodes.put(oneblock, oneblock.hosts);

        // For blocks that do not have host/rack information,
        // assign to default rack.
        String[] racks = null;
        if (oneblock.hosts.length == 0) {
          racks = new String[]{NetworkTopology.DEFAULT_RACK};
        } else {
          racks = oneblock.racks;
        }

        // add this block to the rack --> block map
        for (int j = 0; j < racks.length; j++) {
          String rack = racks[j];
          List<OneBlockInfo> blklist = rackToBlocks.get(rack);
          if (blklist == null) {
            blklist = new ArrayList<OneBlockInfo>();
            rackToBlocks.put(rack, blklist);
          }
          blklist.add(oneblock);
          if (!racks[j].equals(NetworkTopology.DEFAULT_RACK)) {
            // Add this host to rackToNodes map
            addHostToRack(rackToNodes, racks[j], oneblock.hosts[j]);
          }
        }

        // add this block to the node --> block map
        for (int j = 0; j < oneblock.hosts.length; j++) {
          String node = oneblock.hosts[j];
          Set<OneBlockInfo> blklist = nodeToBlocks.get(node);
          if (blklist == null) {
            blklist = new LinkedHashSet<OneBlockInfo>();
            nodeToBlocks.put(node, blklist);
          }
          blklist.add(oneblock);
        }
      }
    }

    long getLength() {
      return fileSize;
    }

    OneBlockInfo[] getBlocks() {
      return blocks;
    }
  }

  /**
   * information about one block from the File System
   */
  @VisibleForTesting
  static class OneBlockInfo {
    Path onepath;                // name of this file
    long offset;                 // offset in file
    long length;                 // length of this block
    String[] hosts;              // nodes on which this block resides
    String[] racks;              // network topology of hosts

    OneBlockInfo(Path path, long offset, long len,
                 String[] hosts, String[] topologyPaths) {
      this.onepath = path;
      this.offset = offset;
      this.hosts = hosts;
      this.length = len;
      assert (hosts.length == topologyPaths.length ||
              topologyPaths.length == 0);

      // if the file system does not have any rack information, then
      // use dummy rack location.
      if (topologyPaths.length == 0) {
        topologyPaths = new String[hosts.length];
        for (int i = 0; i < topologyPaths.length; i++) {
          topologyPaths[i] = (new NodeBase(hosts[i],
                              NetworkTopology.DEFAULT_RACK)).toString();
        }
      }

      // The topology paths have the host name included as the last
      // component. Strip it.
      this.racks = new String[topologyPaths.length];
      for (int i = 0; i < topologyPaths.length; i++) {
        this.racks[i] = (new NodeBase(topologyPaths[i])).getNetworkLocation();
      }
    }
  }

  protected BlockLocation[] getFileBlockLocations(
    FileSystem fs, FileStatus stat) throws IOException {
    if (stat instanceof LocatedFileStatus) {
      return ((LocatedFileStatus) stat).getBlockLocations();
    }
    return fs.getFileBlockLocations(stat, 0, stat.getLen());
  }

  private static void addHostToRack(Map<String, Set<String>> rackToNodes,
                                    String rack, String host) {
    Set<String> hosts = rackToNodes.get(rack);
    if (hosts == null) {
      hosts = new HashSet<String>();
      rackToNodes.put(rack, hosts);
    }
    hosts.add(host);
  }

  private Set<String> getHosts(Set<String> racks) {
    Set<String> hosts = new HashSet<String>();
    for (String rack : racks) {
      if (rackToNodes.containsKey(rack)) {
        hosts.addAll(rackToNodes.get(rack));
      }
    }
    return hosts;
  }

  /**
   * Accept a path only if at least one of the filters given in the
   * constructor accepts it.
   */
  private static class MultiPathFilter implements PathFilter {
    private List<PathFilter> filters;

    public MultiPathFilter() {
      this.filters = new ArrayList<PathFilter>();
    }

    public MultiPathFilter(List<PathFilter> filters) {
      this.filters = filters;
    }

    public void add(PathFilter one) {
      filters.add(one);
    }

    public boolean accept(Path path) {
      for (PathFilter filter : filters) {
        if (filter.accept(path)) {
          return true;
        }
      }
      return false;
    }

    public String toString() {
      StringBuffer buf = new StringBuffer();
      buf.append("[");
      for (PathFilter f: filters) {
        buf.append(f);
        buf.append(",");
      }
      buf.append("]");
      return buf.toString();
    }
  }
}