/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.mapreduce.lib.partition;

import java.io.IOException;
import java.lang.reflect.Array;
import java.util.ArrayList;
import java.util.Arrays;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BinaryComparable;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.RawComparator;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Partitioner;
import org.apache.hadoop.util.ReflectionUtils;

/**
 * Partitioner effecting a total order by reading split points from
 * an externally generated source.
 */
@InterfaceAudience.Public
@InterfaceStability.Stable
public class TotalOrderPartitioner<K,V>
    extends Partitioner<K,V> implements Configurable {

  private Node partitions;
  public static final String DEFAULT_PATH = "_partition.lst";
  public static final String PARTITIONER_PATH =
    "mapreduce.totalorderpartitioner.path";
  public static final String MAX_TRIE_DEPTH =
    "mapreduce.totalorderpartitioner.trie.maxdepth";
  public static final String NATURAL_ORDER =
    "mapreduce.totalorderpartitioner.naturalorder";
  Configuration conf;
  private static final Log LOG = LogFactory.getLog(TotalOrderPartitioner.class);

  public TotalOrderPartitioner() { }
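  // Usage sketch (illustrative, not part of this class): a typical driver
  // pairs this partitioner with InputSampler, which samples the input and
  // writes the sorted split-point file that setConf below reads. The job
  // name, path, and sampler parameters here are assumptions, not
  // requirements.
  //
  //   Job job = Job.getInstance(conf, "total-order-sort");  // name assumed
  //   job.setNumReduceTasks(numReduces);
  //   job.setPartitionerClass(TotalOrderPartitioner.class);
  //   TotalOrderPartitioner.setPartitionFile(job.getConfiguration(),
  //       new Path("/tmp/partitions.lst"));                 // path assumed
  //   InputSampler.writePartitionFile(job,
  //       new InputSampler.RandomSampler<K, V>(0.01, 10000));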

  /**
   * Read in the partition file and build indexing data structures.
   * If the keytype is {@link org.apache.hadoop.io.BinaryComparable} and
   * <tt>mapreduce.totalorderpartitioner.naturalorder</tt> is not false, a
   * trie of the first <tt>mapreduce.totalorderpartitioner.trie.maxdepth</tt>
   * (default: 200) bytes will be built. Otherwise, keys will be located
   * using a binary search of the partition keyset using the
   * {@link org.apache.hadoop.io.RawComparator} defined for this job. The
   * input file must be sorted with the same comparator and contain
   * {@link Job#getNumReduceTasks()} - 1 keys.
   */
  @SuppressWarnings("unchecked") // keytype from conf not static
  public void setConf(Configuration conf) {
    try {
      this.conf = conf;
      String parts = getPartitionFile(conf);
      final Path partFile = new Path(parts);
      final FileSystem fs = (DEFAULT_PATH.equals(parts))
        ? FileSystem.getLocal(conf)     // assume in DistributedCache
        : partFile.getFileSystem(conf);

      Job job = Job.getInstance(conf);
      Class<K> keyClass = (Class<K>)job.getMapOutputKeyClass();
      K[] splitPoints = readPartitions(fs, partFile, keyClass, conf);
      if (splitPoints.length != job.getNumReduceTasks() - 1) {
        throw new IOException("Wrong number of partitions in keyset: found " +
            splitPoints.length + ", expected " +
            (job.getNumReduceTasks() - 1));
      }
      RawComparator<K> comparator =
        (RawComparator<K>) job.getSortComparator();
      for (int i = 0; i < splitPoints.length - 1; ++i) {
        if (comparator.compare(splitPoints[i], splitPoints[i+1]) >= 0) {
          throw new IOException("Split points are out of order");
        }
      }
      boolean natOrder =
        conf.getBoolean(NATURAL_ORDER, true);
      if (natOrder && BinaryComparable.class.isAssignableFrom(keyClass)) {
        partitions = buildTrie((BinaryComparable[])splitPoints, 0,
            splitPoints.length, new byte[0],
            // Now that blocks of identical splitless trie nodes are
            // represented reentrantly, and we develop a leaf for any trie
            // node with only one split point, the only reason for a depth
            // limit is to prevent stack overflow or bloat in the pathological
            // case where the split points are long and mostly look like bytes
            // iii...iixii...iii. Therefore, we make the default depth
            // limit large but not huge.
            conf.getInt(MAX_TRIE_DEPTH, 200));
      } else {
        partitions = new BinarySearchNode(splitPoints, comparator);
      }
    } catch (IOException e) {
      throw new IllegalArgumentException("Can't read partitions file", e);
    }
  }

  public Configuration getConf() {
    return conf;
  }

  // by construction, we know if our keytype
  @SuppressWarnings("unchecked") // is memcmp-able and uses the trie
  public int getPartition(K key, V value, int numPartitions) {
    return partitions.findPartition(key);
  }

  /**
   * Set the path to the SequenceFile storing the sorted partition keyset.
   * It must be the case that for <tt>R</tt> reduces, there are <tt>R-1</tt>
   * keys in the SequenceFile.
   */
  public static void setPartitionFile(Configuration conf, Path p) {
    conf.set(PARTITIONER_PATH, p.toString());
  }

  /**
   * Get the path to the SequenceFile storing the sorted partition keyset.
   * @see #setPartitionFile(Configuration, Path)
   */
  public static String getPartitionFile(Configuration conf) {
    return conf.get(PARTITIONER_PATH, DEFAULT_PATH);
  }

  /**
   * Interface to the partitioner to locate a key in the partition keyset.
   */
  interface Node<T> {
    /**
     * Locate partition in keyset K, such that [Ki..Ki+1) defines a
     * partition, with implicit K0 = -inf, Kn = +inf, and
     * |K| = #partitions - 1.
     */
    int findPartition(T key);
  }
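  // Worked example of the Node contract (illustrative values): with the two
  // split points {"b", "f"}, and therefore three reduce tasks, findPartition
  // maps keys < "b" to partition 0, keys in ["b", "f") to partition 1, and
  // keys >= "f" to partition 2. The implicit -inf/+inf sentinels are never
  // materialized.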

  /**
   * Base class for trie nodes. If the keytype is memcomp-able, this builds
   * tries of the first <tt>mapreduce.totalorderpartitioner.trie.maxdepth</tt>
   * bytes.
   */
  static abstract class TrieNode implements Node<BinaryComparable> {
    private final int level;
    TrieNode(int level) {
      this.level = level;
    }
    int getLevel() {
      return level;
    }
  }

  /**
   * For types that are not {@link org.apache.hadoop.io.BinaryComparable} or
   * where the trie is disabled by
   * <tt>mapreduce.totalorderpartitioner.naturalorder</tt>,
   * search the partition keyset with a binary search.
   */
  class BinarySearchNode implements Node<K> {
    private final K[] splitPoints;
    private final RawComparator<K> comparator;
    BinarySearchNode(K[] splitPoints, RawComparator<K> comparator) {
      this.splitPoints = splitPoints;
      this.comparator = comparator;
    }
    public int findPartition(K key) {
      final int pos = Arrays.binarySearch(splitPoints, key, comparator) + 1;
      return (pos < 0) ? -pos : pos;
    }
  }

  /**
   * An inner trie node that contains 256 children based on the next
   * byte of the key.
   */
  class InnerTrieNode extends TrieNode {
    private TrieNode[] child = new TrieNode[256];

    InnerTrieNode(int level) {
      super(level);
    }
    public int findPartition(BinaryComparable key) {
      int level = getLevel();
      if (key.getLength() <= level) {
        return child[0].findPartition(key);
      }
      return child[0xFF & key.getBytes()[level]].findPartition(key);
    }
  }

  /**
   * @param level the tree depth at this node
   * @param splitPoints the full split point vector, which holds
   *                    the split point or points this leaf node
   *                    should contain
   * @param lower first INcluded element of splitPoints
   * @param upper first EXcluded element of splitPoints
   * @return a leaf node. They come in three kinds: no split points
   *         [and findPartition returns a canned index], one split
   *         point [and we compare with a single comparand], or more
   *         than one [and we do a binary search]. The last case is
   *         rare.
   */
  private TrieNode LeafTrieNodeFactory
            (int level, BinaryComparable[] splitPoints, int lower, int upper) {
    switch (upper - lower) {
    case 0:
      return new UnsplitTrieNode(level, lower);

    case 1:
      return new SinglySplitTrieNode(level, splitPoints, lower);

    default:
      return new LeafTrieNode(level, splitPoints, lower, upper);
    }
  }

  /**
   * A leaf trie node that scans for the key between lower..upper.
   *
   * We don't generate many of these now, since we usually continue trie-ing
   * when more than one split point remains at this level, and we make
   * different objects for nodes with 0 or 1 split point.
   */
  private class LeafTrieNode extends TrieNode {
    final int lower;
    final int upper;
    final BinaryComparable[] splitPoints;
    LeafTrieNode(int level, BinaryComparable[] splitPoints, int lower, int upper) {
      super(level);
      this.lower = lower;
      this.upper = upper;
      this.splitPoints = splitPoints;
    }
    public int findPartition(BinaryComparable key) {
      final int pos = Arrays.binarySearch(splitPoints, lower, upper, key) + 1;
      return (pos < 0) ? -pos : pos;
    }
  }
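  // Index arithmetic note (worked example): Arrays.binarySearch returns i
  // when the key equals splitPoints[i], and -(insertionPoint) - 1 otherwise.
  // After adding 1, an exact hit at i yields partition i + 1 (a key equal to
  // split point i belongs to the range [Ki, Ki+1)), while a miss yields
  // -(insertionPoint), which the (pos < 0) ? -pos : pos idiom maps back to
  // the insertion point. E.g., a key falling between splitPoints[1] and
  // splitPoints[2] has insertion point 2 and lands in partition 2.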

  private class UnsplitTrieNode extends TrieNode {
    final int result;

    UnsplitTrieNode(int level, int value) {
      super(level);
      this.result = value;
    }

    public int findPartition(BinaryComparable key) {
      return result;
    }
  }

  private class SinglySplitTrieNode extends TrieNode {
    final int lower;
    final BinaryComparable mySplitPoint;

    SinglySplitTrieNode(int level, BinaryComparable[] splitPoints, int lower) {
      super(level);
      this.lower = lower;
      this.mySplitPoint = splitPoints[lower];
    }

    public int findPartition(BinaryComparable key) {
      return lower + (key.compareTo(mySplitPoint) < 0 ? 0 : 1);
    }
  }


  /**
   * Read the cut points from the given sequence file.
   * @param fs The file system
   * @param p The path to read
   * @param keyClass The map output key class
   * @param conf The job config
   * @throws IOException
   */
  // matching key types enforced by passing in
  @SuppressWarnings("unchecked") // map output key class
  private K[] readPartitions(FileSystem fs, Path p, Class<K> keyClass,
      Configuration conf) throws IOException {
    SequenceFile.Reader reader = new SequenceFile.Reader(
        conf,
        SequenceFile.Reader.file(p));
    ArrayList<K> parts = new ArrayList<K>();
    K key = ReflectionUtils.newInstance(keyClass, conf);
    try {
      while ((key = (K) reader.next(key)) != null) {
        parts.add(key);
        key = ReflectionUtils.newInstance(keyClass, conf);
      }
      reader.close();
      reader = null;
    } finally {
      IOUtils.cleanup(LOG, reader);
    }
    return parts.toArray((K[])Array.newInstance(keyClass, parts.size()));
  }
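  // Companion writer sketch (illustrative; InputSampler.writePartitionFile
  // normally produces this file): split points are stored as the keys of a
  // SequenceFile with NullWritable values, written in sorted order. The
  // sortedSamples variable below is an assumption standing in for R-1
  // sorted sample keys.
  //
  //   SequenceFile.Writer writer = SequenceFile.createWriter(conf,
  //       SequenceFile.Writer.file(p),
  //       SequenceFile.Writer.keyClass(keyClass),
  //       SequenceFile.Writer.valueClass(NullWritable.class));
  //   for (K splitPoint : sortedSamples) {
  //     writer.append(splitPoint, NullWritable.get());
  //   }
  //   writer.close();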

  /**
   * This object carries a trie node that can be reused. Two adjacent
   * trie node slots that contain no split points can be filled with
   * the same trie node, even if they are not on the same level. See
   * buildTrieRec, below.
   */
  private class CarriedTrieNodeRef
  {
    TrieNode content;

    CarriedTrieNodeRef() {
      content = null;
    }
  }


  /**
   * Given a sorted set of cut points, build a trie that will find the correct
   * partition quickly.
   * @param splits the list of cut points
   * @param lower first (included) index into splits
   * @param upper first (excluded) index into splits
   * @param prefix the prefix that we have already checked against
   * @param maxDepth the maximum depth we will build a trie for
   * @return the trie node that will divide the splits correctly
   */
  private TrieNode buildTrie(BinaryComparable[] splits, int lower,
      int upper, byte[] prefix, int maxDepth) {
    return buildTrieRec
             (splits, lower, upper, prefix, maxDepth, new CarriedTrieNodeRef());
  }

  /**
   * This is the core of buildTrie. The interface and stub above just add
   * an empty CarriedTrieNodeRef.
   *
   * We build trie nodes in depth first order, which is also in key space
   * order. Every leaf node is referenced as a slot in a parent internal
   * node. If two adjacent slots [in the DFO] hold leaf nodes that have
   * no split point, then they are not separated by a split point either,
   * because there's no place in key space for that split point to exist.
   *
   * When that happens, the leaf nodes would be semantically identical, and
   * we reuse the object. A single CarriedTrieNodeRef "ref" lives for the
   * duration of the tree-walk. It carries a potentially reusable, unsplit
   * leaf node until a leaf node with a split point arises, which breaks
   * the chain until a new unsplit leaf node is needed.
   *
   * Note that this use of CarriedTrieNodeRef means that if this code is
   * modified in any way, we still need to make or fill in the subnodes
   * in key space order.
   */
  private TrieNode buildTrieRec(BinaryComparable[] splits, int lower,
      int upper, byte[] prefix, int maxDepth, CarriedTrieNodeRef ref) {
    final int depth = prefix.length;
    // We generate leaves for a single split point as well as for
    // no split points.
    if (depth >= maxDepth || lower >= upper - 1) {
      // If we have two consecutive requests for an unsplit trie node, we
      // can deliver the same one the second time.
      if (lower == upper && ref.content != null) {
        return ref.content;
      }
      TrieNode result = LeafTrieNodeFactory(depth, splits, lower, upper);
      ref.content = lower == upper ? result : null;
      return result;
    }
    InnerTrieNode result = new InnerTrieNode(depth);
    byte[] trial = Arrays.copyOf(prefix, prefix.length + 1);
    // append an extra byte onto the prefix
    int currentBound = lower;
    for (int ch = 0; ch < 0xFF; ++ch) {
      trial[depth] = (byte) (ch + 1);
      lower = currentBound;
      while (currentBound < upper) {
        if (splits[currentBound].compareTo(trial, 0, trial.length) >= 0) {
          break;
        }
        currentBound += 1;
      }
      trial[depth] = (byte) ch;
      result.child[0xFF & ch]
        = buildTrieRec(splits, lower, currentBound, trial, maxDepth, ref);
    }
    // pick up the rest
    trial[depth] = (byte) 0xFF;
    result.child[0xFF]
      = buildTrieRec(splits, lower, currentBound, trial, maxDepth, ref);

    return result;
  }
}
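// Worked example of the trie construction (illustrative, derived from the
// algorithm above): with split points {"bb", "bd"} (three partitions) and
// maxDepth 2, the root InnerTrieNode gives every byte below 'b' one shared
// UnsplitTrieNode returning 0 (the CarriedTrieNodeRef reuse), and every byte
// above 'b' a shared UnsplitTrieNode returning 2. The 'b' child is a
// second-level InnerTrieNode under which "ba..." keys resolve to 0, "bb..."
// keys to a SinglySplitTrieNode yielding 1, "bc..." keys to 1, and "bd..."
// keys and beyond to 2.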