/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.io;

import java.io.*;
import java.util.*;
import java.rmi.server.UID;
import java.security.MessageDigest;

import org.apache.commons.io.Charsets;
import org.apache.commons.logging.*;
import org.apache.hadoop.util.Options;
import org.apache.hadoop.fs.*;
import org.apache.hadoop.fs.Options.CreateOpts;
import org.apache.hadoop.io.compress.CodecPool;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.CompressionInputStream;
import org.apache.hadoop.io.compress.CompressionOutputStream;
import org.apache.hadoop.io.compress.Compressor;
import org.apache.hadoop.io.compress.Decompressor;
import org.apache.hadoop.io.compress.DefaultCodec;
import org.apache.hadoop.io.compress.GzipCodec;
import org.apache.hadoop.io.compress.zlib.ZlibFactory;
import org.apache.hadoop.io.serializer.Deserializer;
import org.apache.hadoop.io.serializer.Serializer;
import org.apache.hadoop.io.serializer.SerializationFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.*;
import org.apache.hadoop.util.Progressable;
import org.apache.hadoop.util.Progress;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hadoop.util.NativeCodeLoader;
import org.apache.hadoop.util.MergeSort;
import org.apache.hadoop.util.PriorityQueue;
import org.apache.hadoop.util.Time;

/**
 * <code>SequenceFile</code>s are flat files consisting of binary key/value
 * pairs.
 *
 * <p><code>SequenceFile</code> provides {@link SequenceFile.Writer},
 * {@link SequenceFile.Reader} and {@link Sorter} classes for writing,
 * reading and sorting respectively.</p>
 *
 * There are three <code>SequenceFile</code> <code>Writer</code>s based on the
 * {@link CompressionType} used to compress key/value pairs:
 * <ol>
 *   <li>
 *   <code>Writer</code> : Uncompressed records.
 *   </li>
 *   <li>
 *   <code>RecordCompressWriter</code> : Record-compressed files, only compress
 *                                       values.
 *   </li>
 *   <li>
 *   <code>BlockCompressWriter</code> : Block-compressed files, both keys &amp;
 *                                      values are collected in 'blocks'
 *                                      separately and compressed. The size of
 *                                      the 'block' is configurable.
 *   </li>
 * </ol>
 *
 * <p>The actual compression algorithm used to compress key and/or values can be
 * specified by using the appropriate {@link CompressionCodec}.</p>
 *
 * <p>The recommended way is to use the static <tt>createWriter</tt> methods
 * provided by the <code>SequenceFile</code> to choose the preferred format.</p>
 *
 * <p>The {@link SequenceFile.Reader} acts as the bridge and can read any of the
 * above <code>SequenceFile</code> formats.</p>
 *
 * <h4 id="Formats">SequenceFile Formats</h4>
 *
 * <p>Essentially there are 3 different formats for <code>SequenceFile</code>s
 * depending on the <code>CompressionType</code> specified. All of them share a
 * <a href="#Header">common header</a> described below.
 *
 * <h5 id="Header">SequenceFile Header</h5>
 * <ul>
 *   <li>
 *   version - 3 bytes of magic header <b>SEQ</b>, followed by 1 byte of actual
 *             version number (e.g. SEQ4 or SEQ6)
 *   </li>
 *   <li>
 *   keyClassName - key class
 *   </li>
 *   <li>
 *   valueClassName - value class
 *   </li>
 *   <li>
 *   compression - A boolean which specifies if compression is turned on for
 *                 keys/values in this file.
 *   </li>
 *   <li>
 *   blockCompression - A boolean which specifies if block-compression is
 *                      turned on for keys/values in this file.
 *   </li>
 *   <li>
 *   compression codec - <code>CompressionCodec</code> class which is used for
 *                       compression of keys and/or values (if compression is
 *                       enabled).
 *   </li>
 *   <li>
 *   metadata - {@link Metadata} for this file.
 *   </li>
 *   <li>
 *   sync - A sync marker to denote end of the header.
 *   </li>
 * </ul>
 *
 * <h5 id="UncompressedFormat">Uncompressed SequenceFile Format</h5>
 * <ul>
 *   <li>
 *   <a href="#Header">Header</a>
 *   </li>
 *   <li>
 *   Record
 *   <ul>
 *     <li>Record length</li>
 *     <li>Key length</li>
 *     <li>Key</li>
 *     <li>Value</li>
 *   </ul>
 *   </li>
 *   <li>
 *   A sync-marker approximately every {@link #SYNC_INTERVAL} bytes.
 *   </li>
 * </ul>
 *
 * <h5 id="RecordCompressedFormat">Record-Compressed SequenceFile Format</h5>
 * <ul>
 *   <li>
 *   <a href="#Header">Header</a>
 *   </li>
 *   <li>
 *   Record
 *   <ul>
 *     <li>Record length</li>
 *     <li>Key length</li>
 *     <li>Key</li>
 *     <li><i>Compressed</i> Value</li>
 *   </ul>
 *   </li>
 *   <li>
 *   A sync-marker approximately every {@link #SYNC_INTERVAL} bytes.
 *   </li>
 * </ul>
 *
 * <h5 id="BlockCompressedFormat">Block-Compressed SequenceFile Format</h5>
 * <ul>
 *   <li>
 *   <a href="#Header">Header</a>
 *   </li>
 *   <li>
 *   Record <i>Block</i>
 *   <ul>
 *     <li>Uncompressed number of records in the block</li>
 *     <li>Compressed key-lengths block-size</li>
 *     <li>Compressed key-lengths block</li>
 *     <li>Compressed keys block-size</li>
 *     <li>Compressed keys block</li>
 *     <li>Compressed value-lengths block-size</li>
 *     <li>Compressed value-lengths block</li>
 *     <li>Compressed values block-size</li>
 *     <li>Compressed values block</li>
 *   </ul>
 *   </li>
 *   <li>
 *   A sync-marker every block.
 *   </li>
 * </ul>
 *
 * <p>The compressed blocks of key lengths and value lengths consist of the
 * actual lengths of individual keys/values encoded in ZeroCompressedInteger
 * format.</p>
 *
 * @see CompressionCodec
 */
@InterfaceAudience.Public
@InterfaceStability.Stable
public class SequenceFile {
  private static final Log LOG = LogFactory.getLog(SequenceFile.class);

  private SequenceFile() {}                         // no public ctor

  private static final byte BLOCK_COMPRESS_VERSION = (byte)4;
  private static final byte CUSTOM_COMPRESS_VERSION = (byte)5;
  private static final byte VERSION_WITH_METADATA = (byte)6;
  private static byte[] VERSION = new byte[] {
    (byte)'S', (byte)'E', (byte)'Q', VERSION_WITH_METADATA
  };

  private static final int SYNC_ESCAPE = -1;      // "length" of sync entries
  private static final int SYNC_HASH_SIZE = 16;   // number of bytes in hash
  private static final int SYNC_SIZE = 4+SYNC_HASH_SIZE; // escape + hash

  /** The number of bytes between sync points. */
  public static final int SYNC_INTERVAL = 100*SYNC_SIZE;

  /**
   * The compression type used to compress key/value pairs in the
   * {@link SequenceFile}.
   *
   * @see SequenceFile.Writer
   */
  public static enum CompressionType {
    /** Do not compress records. */
    NONE,
    /** Compress values only, each separately. */
    RECORD,
    /** Compress sequences of records together in blocks. */
    BLOCK
  }

  /**
   * Get the default compression type for sequence files.
   * @param job the job config to look in
   * @return the kind of compression to use
   */
  static public CompressionType getDefaultCompressionType(Configuration job) {
    String name = job.get("io.seqfile.compression.type");
    return name == null ? CompressionType.RECORD :
      CompressionType.valueOf(name);
  }

  /**
   * Set the default compression type for sequence files.
   * @param job the configuration to modify
   * @param val the new compression type (none, block, record)
   */
  static public void setDefaultCompressionType(Configuration job,
                                               CompressionType val) {
    job.set("io.seqfile.compression.type", val.toString());
  }

  /**
   * Create a new Writer with the given options.
   * @param conf the configuration to use
   * @param opts the options to create the file with
   * @return a new Writer
   * @throws IOException
   */
  public static Writer createWriter(Configuration conf, Writer.Option... opts
                                    ) throws IOException {
    Writer.CompressionOption compressionOption =
      Options.getOption(Writer.CompressionOption.class, opts);
    CompressionType kind;
    if (compressionOption != null) {
      kind = compressionOption.getValue();
    } else {
      kind = getDefaultCompressionType(conf);
      opts = Options.prependOptions(opts, Writer.compression(kind));
    }
    switch (kind) {
      default:
      case NONE:
        return new Writer(conf, opts);
      case RECORD:
        return new RecordCompressWriter(conf, opts);
      case BLOCK:
        return new BlockCompressWriter(conf, opts);
    }
  }
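
  // Illustrative usage (a sketch, not part of the original source): creating a
  // block-compressed writer through the option-based factory above and
  // appending a few records. The path is hypothetical.
  //
  //   Configuration conf = new Configuration();
  //   SequenceFile.Writer writer = SequenceFile.createWriter(conf,
  //       SequenceFile.Writer.file(new Path("/tmp/example.seq")),
  //       SequenceFile.Writer.keyClass(LongWritable.class),
  //       SequenceFile.Writer.valueClass(Text.class),
  //       SequenceFile.Writer.compression(CompressionType.BLOCK));
  //   try {
  //     writer.append(new LongWritable(1), new Text("first record"));
  //     writer.append(new LongWritable(2), new Text("second record"));
  //   } finally {
  //     writer.close();
  //   }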

  /**
   * Construct the preferred type of SequenceFile Writer.
   * @param fs The configured filesystem.
   * @param conf The configuration.
   * @param name The name of the file.
   * @param keyClass The 'key' type.
   * @param valClass The 'value' type.
   * @return Returns the handle to the constructed SequenceFile Writer.
   * @throws IOException
   * @deprecated Use {@link #createWriter(Configuration, Writer.Option...)}
   *     instead.
   */
  @Deprecated
  public static Writer
    createWriter(FileSystem fs, Configuration conf, Path name,
                 Class keyClass, Class valClass) throws IOException {
    return createWriter(conf, Writer.filesystem(fs),
                        Writer.file(name), Writer.keyClass(keyClass),
                        Writer.valueClass(valClass));
  }

  /**
   * Construct the preferred type of SequenceFile Writer.
   * @param fs The configured filesystem.
   * @param conf The configuration.
   * @param name The name of the file.
   * @param keyClass The 'key' type.
   * @param valClass The 'value' type.
   * @param compressionType The compression type.
   * @return Returns the handle to the constructed SequenceFile Writer.
   * @throws IOException
   * @deprecated Use {@link #createWriter(Configuration, Writer.Option...)}
   *     instead.
   */
  @Deprecated
  public static Writer
    createWriter(FileSystem fs, Configuration conf, Path name,
                 Class keyClass, Class valClass,
                 CompressionType compressionType) throws IOException {
    return createWriter(conf, Writer.filesystem(fs),
                        Writer.file(name), Writer.keyClass(keyClass),
                        Writer.valueClass(valClass),
                        Writer.compression(compressionType));
  }

  /**
   * Construct the preferred type of SequenceFile Writer.
   * @param fs The configured filesystem.
   * @param conf The configuration.
   * @param name The name of the file.
   * @param keyClass The 'key' type.
   * @param valClass The 'value' type.
   * @param compressionType The compression type.
   * @param progress The Progressable object to track progress.
   * @return Returns the handle to the constructed SequenceFile Writer.
   * @throws IOException
   * @deprecated Use {@link #createWriter(Configuration, Writer.Option...)}
   *     instead.
   */
  @Deprecated
  public static Writer
    createWriter(FileSystem fs, Configuration conf, Path name,
                 Class keyClass, Class valClass, CompressionType compressionType,
                 Progressable progress) throws IOException {
    return createWriter(conf, Writer.file(name),
                        Writer.filesystem(fs),
                        Writer.keyClass(keyClass),
                        Writer.valueClass(valClass),
                        Writer.compression(compressionType),
                        Writer.progressable(progress));
  }

  /**
   * Construct the preferred type of SequenceFile Writer.
   * @param fs The configured filesystem.
   * @param conf The configuration.
   * @param name The name of the file.
   * @param keyClass The 'key' type.
   * @param valClass The 'value' type.
   * @param compressionType The compression type.
   * @param codec The compression codec.
   * @return Returns the handle to the constructed SequenceFile Writer.
   * @throws IOException
   * @deprecated Use {@link #createWriter(Configuration, Writer.Option...)}
   *     instead.
   */
  @Deprecated
  public static Writer
    createWriter(FileSystem fs, Configuration conf, Path name,
                 Class keyClass, Class valClass, CompressionType compressionType,
                 CompressionCodec codec) throws IOException {
    return createWriter(conf, Writer.file(name),
                        Writer.filesystem(fs),
                        Writer.keyClass(keyClass),
                        Writer.valueClass(valClass),
                        Writer.compression(compressionType, codec));
  }

  /**
   * Construct the preferred type of SequenceFile Writer.
   * @param fs The configured filesystem.
   * @param conf The configuration.
   * @param name The name of the file.
   * @param keyClass The 'key' type.
   * @param valClass The 'value' type.
   * @param compressionType The compression type.
   * @param codec The compression codec.
   * @param progress The Progressable object to track progress.
   * @param metadata The metadata of the file.
   * @return Returns the handle to the constructed SequenceFile Writer.
   * @throws IOException
   * @deprecated Use {@link #createWriter(Configuration, Writer.Option...)}
   *     instead.
   */
  @Deprecated
  public static Writer
    createWriter(FileSystem fs, Configuration conf, Path name,
                 Class keyClass, Class valClass,
                 CompressionType compressionType, CompressionCodec codec,
                 Progressable progress, Metadata metadata) throws IOException {
    return createWriter(conf, Writer.file(name),
                        Writer.filesystem(fs),
                        Writer.keyClass(keyClass),
                        Writer.valueClass(valClass),
                        Writer.compression(compressionType, codec),
                        Writer.progressable(progress),
                        Writer.metadata(metadata));
  }

  /**
   * Construct the preferred type of SequenceFile Writer.
   * @param fs The configured filesystem.
   * @param conf The configuration.
   * @param name The name of the file.
   * @param keyClass The 'key' type.
   * @param valClass The 'value' type.
   * @param bufferSize buffer size for the underlying output stream.
   * @param replication replication factor for the file.
   * @param blockSize block size for the file.
   * @param compressionType The compression type.
   * @param codec The compression codec.
   * @param progress The Progressable object to track progress.
   * @param metadata The metadata of the file.
   * @return Returns the handle to the constructed SequenceFile Writer.
   * @throws IOException
   * @deprecated Use {@link #createWriter(Configuration, Writer.Option...)}
   *     instead.
   */
  @Deprecated
  public static Writer
    createWriter(FileSystem fs, Configuration conf, Path name,
                 Class keyClass, Class valClass, int bufferSize,
                 short replication, long blockSize,
                 CompressionType compressionType, CompressionCodec codec,
                 Progressable progress, Metadata metadata) throws IOException {
    return createWriter(conf, Writer.file(name),
                        Writer.filesystem(fs),
                        Writer.keyClass(keyClass),
                        Writer.valueClass(valClass),
                        Writer.bufferSize(bufferSize),
                        Writer.replication(replication),
                        Writer.blockSize(blockSize),
                        Writer.compression(compressionType, codec),
                        Writer.progressable(progress),
                        Writer.metadata(metadata));
  }

  /**
   * Construct the preferred type of SequenceFile Writer.
   * @param fs The configured filesystem.
   * @param conf The configuration.
   * @param name The name of the file.
   * @param keyClass The 'key' type.
   * @param valClass The 'value' type.
   * @param bufferSize buffer size for the underlying output stream.
   * @param replication replication factor for the file.
   * @param blockSize block size for the file.
   * @param createParent create parent directory if non-existent
   * @param compressionType The compression type.
   * @param codec The compression codec.
   * @param metadata The metadata of the file.
   * @return Returns the handle to the constructed SequenceFile Writer.
   * @throws IOException
   * @deprecated Use {@link #createWriter(Configuration, Writer.Option...)}
   *     instead.
   */
  @Deprecated
  public static Writer
    createWriter(FileSystem fs, Configuration conf, Path name,
                 Class keyClass, Class valClass, int bufferSize,
                 short replication, long blockSize, boolean createParent,
                 CompressionType compressionType, CompressionCodec codec,
                 Metadata metadata) throws IOException {
    return createWriter(FileContext.getFileContext(fs.getUri(), conf),
        conf, name, keyClass, valClass, compressionType, codec,
        metadata, EnumSet.of(CreateFlag.CREATE,CreateFlag.OVERWRITE),
        CreateOpts.bufferSize(bufferSize),
        createParent ? CreateOpts.createParent()
                     : CreateOpts.donotCreateParent(),
        CreateOpts.repFac(replication),
        CreateOpts.blockSize(blockSize)
      );
  }

  /**
   * Construct the preferred type of SequenceFile Writer.
   * @param fc The context for the specified file.
   * @param conf The configuration.
   * @param name The name of the file.
   * @param keyClass The 'key' type.
   * @param valClass The 'value' type.
   * @param compressionType The compression type.
   * @param codec The compression codec.
   * @param metadata The metadata of the file.
   * @param createFlag gives the semantics of create: overwrite, append etc.
   * @param opts file creation options; see {@link CreateOpts}.
   * @return Returns the handle to the constructed SequenceFile Writer.
   * @throws IOException
   */
  public static Writer
    createWriter(FileContext fc, Configuration conf, Path name,
                 Class keyClass, Class valClass,
                 CompressionType compressionType, CompressionCodec codec,
                 Metadata metadata,
                 final EnumSet<CreateFlag> createFlag, CreateOpts... opts)
    throws IOException {
    return createWriter(conf, fc.create(name, createFlag, opts),
          keyClass, valClass, compressionType, codec, metadata).ownStream();
  }
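
  // Illustrative usage (a sketch, not part of the original source): creating a
  // writer through the non-deprecated FileContext overload above. The path is
  // hypothetical.
  //
  //   Configuration conf = new Configuration();
  //   FileContext fc = FileContext.getFileContext(conf);
  //   SequenceFile.Writer writer = SequenceFile.createWriter(fc, conf,
  //       new Path("/tmp/example.seq"), LongWritable.class, Text.class,
  //       CompressionType.RECORD, new DefaultCodec(),
  //       new SequenceFile.Metadata(),
  //       EnumSet.of(CreateFlag.CREATE, CreateFlag.OVERWRITE));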

  /**
   * Construct the preferred type of SequenceFile Writer.
   * @param fs The configured filesystem.
   * @param conf The configuration.
   * @param name The name of the file.
   * @param keyClass The 'key' type.
   * @param valClass The 'value' type.
   * @param compressionType The compression type.
   * @param codec The compression codec.
   * @param progress The Progressable object to track progress.
   * @return Returns the handle to the constructed SequenceFile Writer.
   * @throws IOException
   * @deprecated Use {@link #createWriter(Configuration, Writer.Option...)}
   *     instead.
   */
  @Deprecated
  public static Writer
    createWriter(FileSystem fs, Configuration conf, Path name,
                 Class keyClass, Class valClass,
                 CompressionType compressionType, CompressionCodec codec,
                 Progressable progress) throws IOException {
    return createWriter(conf, Writer.file(name),
                        Writer.filesystem(fs),
                        Writer.keyClass(keyClass),
                        Writer.valueClass(valClass),
                        Writer.compression(compressionType, codec),
                        Writer.progressable(progress));
  }

  /**
   * Construct the preferred type of 'raw' SequenceFile Writer.
   * @param conf The configuration.
   * @param out The stream on top of which the writer is to be constructed.
   * @param keyClass The 'key' type.
   * @param valClass The 'value' type.
   * @param compressionType The compression type.
   * @param codec The compression codec.
   * @param metadata The metadata of the file.
   * @return Returns the handle to the constructed SequenceFile Writer.
   * @throws IOException
   * @deprecated Use {@link #createWriter(Configuration, Writer.Option...)}
   *     instead.
   */
  @Deprecated
  public static Writer
    createWriter(Configuration conf, FSDataOutputStream out,
                 Class keyClass, Class valClass,
                 CompressionType compressionType,
                 CompressionCodec codec, Metadata metadata) throws IOException {
    return createWriter(conf, Writer.stream(out), Writer.keyClass(keyClass),
                        Writer.valueClass(valClass),
                        Writer.compression(compressionType, codec),
                        Writer.metadata(metadata));
  }

  /**
   * Construct the preferred type of 'raw' SequenceFile Writer.
   * @param conf The configuration.
   * @param out The stream on top of which the writer is to be constructed.
   * @param keyClass The 'key' type.
   * @param valClass The 'value' type.
   * @param compressionType The compression type.
   * @param codec The compression codec.
   * @return Returns the handle to the constructed SequenceFile Writer.
   * @throws IOException
   * @deprecated Use {@link #createWriter(Configuration, Writer.Option...)}
   *     instead.
   */
  @Deprecated
  public static Writer
    createWriter(Configuration conf, FSDataOutputStream out,
                 Class keyClass, Class valClass, CompressionType compressionType,
                 CompressionCodec codec) throws IOException {
    return createWriter(conf, Writer.stream(out), Writer.keyClass(keyClass),
                        Writer.valueClass(valClass),
                        Writer.compression(compressionType, codec));
  }


  /** The interface to 'raw' values of SequenceFiles. */
  public static interface ValueBytes {

    /** Writes the uncompressed bytes to the outStream.
     * @param outStream : Stream to write uncompressed bytes into.
     * @throws IOException
     */
    public void writeUncompressedBytes(DataOutputStream outStream)
      throws IOException;

    /** Write compressed bytes to outStream.
     * Note that it will NOT compress the bytes if they are not already
     * compressed.
     * @param outStream : Stream to write compressed bytes into.
     */
    public void writeCompressedBytes(DataOutputStream outStream)
      throws IllegalArgumentException, IOException;

    /**
     * Size of stored data.
     */
    public int getSize();
  }

  private static class UncompressedBytes implements ValueBytes {
    private int dataSize;
    private byte[] data;

    private UncompressedBytes() {
      data = null;
      dataSize = 0;
    }

    private void reset(DataInputStream in, int length) throws IOException {
      if (data == null) {
        data = new byte[length];
      } else if (length > data.length) {
        data = new byte[Math.max(length, data.length * 2)];
      }
      dataSize = -1;
      in.readFully(data, 0, length);
      dataSize = length;
    }

    @Override
    public int getSize() {
      return dataSize;
    }

    @Override
    public void writeUncompressedBytes(DataOutputStream outStream)
      throws IOException {
      outStream.write(data, 0, dataSize);
    }

    @Override
    public void writeCompressedBytes(DataOutputStream outStream)
      throws IllegalArgumentException, IOException {
      throw
        new IllegalArgumentException("UncompressedBytes cannot be compressed!");
    }

  } // UncompressedBytes

  private static class CompressedBytes implements ValueBytes {
    private int dataSize;
    private byte[] data;
    DataInputBuffer rawData = null;
    CompressionCodec codec = null;
    CompressionInputStream decompressedStream = null;

    private CompressedBytes(CompressionCodec codec) {
      data = null;
      dataSize = 0;
      this.codec = codec;
    }

    private void reset(DataInputStream in, int length) throws IOException {
      if (data == null) {
        data = new byte[length];
      } else if (length > data.length) {
        data = new byte[Math.max(length, data.length * 2)];
      }
      dataSize = -1;
      in.readFully(data, 0, length);
      dataSize = length;
    }

    @Override
    public int getSize() {
      return dataSize;
    }

    @Override
    public void writeUncompressedBytes(DataOutputStream outStream)
      throws IOException {
      if (decompressedStream == null) {
        rawData = new DataInputBuffer();
        decompressedStream = codec.createInputStream(rawData);
      } else {
        decompressedStream.resetState();
      }
      rawData.reset(data, 0, dataSize);

      byte[] buffer = new byte[8192];
      int bytesRead = 0;
      while ((bytesRead = decompressedStream.read(buffer, 0, 8192)) != -1) {
        outStream.write(buffer, 0, bytesRead);
      }
    }

    @Override
    public void writeCompressedBytes(DataOutputStream outStream)
      throws IllegalArgumentException, IOException {
      outStream.write(data, 0, dataSize);
    }

  } // CompressedBytes

  /**
   * The class encapsulating the metadata of a file.
   * The metadata of a file is a list of attribute name/value
   * pairs of Text type.
   */
  public static class Metadata implements Writable {

    private TreeMap<Text, Text> theMetadata;

    public Metadata() {
      this(new TreeMap<Text, Text>());
    }

    public Metadata(TreeMap<Text, Text> arg) {
      if (arg == null) {
        this.theMetadata = new TreeMap<Text, Text>();
      } else {
        this.theMetadata = arg;
      }
    }

    public Text get(Text name) {
      return this.theMetadata.get(name);
    }

    public void set(Text name, Text value) {
      this.theMetadata.put(name, value);
    }

    public TreeMap<Text, Text> getMetadata() {
      return new TreeMap<Text, Text>(this.theMetadata);
    }

    @Override
    public void write(DataOutput out) throws IOException {
      out.writeInt(this.theMetadata.size());
      Iterator<Map.Entry<Text, Text>> iter =
        this.theMetadata.entrySet().iterator();
      while (iter.hasNext()) {
        Map.Entry<Text, Text> en = iter.next();
        en.getKey().write(out);
        en.getValue().write(out);
      }
    }

    @Override
    public void readFields(DataInput in) throws IOException {
      int sz = in.readInt();
      if (sz < 0) {
        throw new IOException("Invalid size: " + sz + " for file metadata object");
      }
      this.theMetadata = new TreeMap<Text, Text>();
      for (int i = 0; i < sz; i++) {
        Text key = new Text();
        Text val = new Text();
        key.readFields(in);
        val.readFields(in);
        this.theMetadata.put(key, val);
      }
    }

    @Override
    public boolean equals(Object other) {
      if (other == null) {
        return false;
      }
      if (other.getClass() != this.getClass()) {
        return false;
      } else {
        return equals((Metadata)other);
      }
    }

    public boolean equals(Metadata other) {
      if (other == null) return false;
      if (this.theMetadata.size() != other.theMetadata.size()) {
        return false;
      }
      Iterator<Map.Entry<Text, Text>> iter1 =
        this.theMetadata.entrySet().iterator();
      Iterator<Map.Entry<Text, Text>> iter2 =
        other.theMetadata.entrySet().iterator();
      while (iter1.hasNext() && iter2.hasNext()) {
        Map.Entry<Text, Text> en1 = iter1.next();
        Map.Entry<Text, Text> en2 = iter2.next();
        if (!en1.getKey().equals(en2.getKey())) {
          return false;
        }
        if (!en1.getValue().equals(en2.getValue())) {
          return false;
        }
      }
      if (iter1.hasNext() || iter2.hasNext()) {
        return false;
      }
      return true;
    }

    @Override
    public int hashCode() {
      assert false : "hashCode not designed";
      return 42; // any arbitrary constant will do
    }

    @Override
    public String toString() {
      StringBuilder sb = new StringBuilder();
      sb.append("size: ").append(this.theMetadata.size()).append("\n");
      Iterator<Map.Entry<Text, Text>> iter =
        this.theMetadata.entrySet().iterator();
      while (iter.hasNext()) {
        Map.Entry<Text, Text> en = iter.next();
        sb.append("\t").append(en.getKey().toString()).append("\t")
          .append(en.getValue().toString());
        sb.append("\n");
      }
      return sb.toString();
    }
  }
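
  // Illustrative usage (a sketch, not part of the original source): attaching
  // metadata to a new file and reading it back later. Names are hypothetical.
  //
  //   SequenceFile.Metadata meta = new SequenceFile.Metadata();
  //   meta.set(new Text("created-by"), new Text("example-job"));
  //   SequenceFile.Writer writer = SequenceFile.createWriter(conf,
  //       SequenceFile.Writer.file(path),
  //       SequenceFile.Writer.keyClass(Text.class),
  //       SequenceFile.Writer.valueClass(Text.class),
  //       SequenceFile.Writer.metadata(meta));
  //   ...
  //   Text v = reader.getMetadata().get(new Text("created-by"));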

  /** Write key/value pairs to a sequence-format file. */
  public static class Writer implements java.io.Closeable, Syncable {
    private Configuration conf;
    FSDataOutputStream out;
    boolean ownOutputStream = true;
    DataOutputBuffer buffer = new DataOutputBuffer();

    Class keyClass;
    Class valClass;

    private final CompressionType compress;
    CompressionCodec codec = null;
    CompressionOutputStream deflateFilter = null;
    DataOutputStream deflateOut = null;
    Metadata metadata = null;
    Compressor compressor = null;

    private boolean appendMode = false;

    protected Serializer keySerializer;
    protected Serializer uncompressedValSerializer;
    protected Serializer compressedValSerializer;

    // Insert a globally unique 16-byte value every few entries, so that one
    // can seek into the middle of a file and then synchronize with record
    // starts and ends by scanning for this value.
    long lastSyncPos;                     // position of last sync
    byte[] sync;                          // 16 random bytes
    {
      try {
        MessageDigest digester = MessageDigest.getInstance("MD5");
        long time = Time.now();
        digester.update((new UID()+"@"+time).getBytes(Charsets.UTF_8));
        sync = digester.digest();
      } catch (Exception e) {
        throw new RuntimeException(e);
      }
    }

    public static interface Option {}

    static class FileOption extends Options.PathOption
                            implements Option {
      FileOption(Path path) {
        super(path);
      }
    }

    /**
     * @deprecated only used for backwards-compatibility in the createWriter methods
     * that take FileSystem.
     */
    @Deprecated
    private static class FileSystemOption implements Option {
      private final FileSystem value;
      protected FileSystemOption(FileSystem value) {
        this.value = value;
      }
      public FileSystem getValue() {
        return value;
      }
    }

    static class StreamOption extends Options.FSDataOutputStreamOption
                              implements Option {
      StreamOption(FSDataOutputStream stream) {
        super(stream);
      }
    }

    static class BufferSizeOption extends Options.IntegerOption
                                  implements Option {
      BufferSizeOption(int value) {
        super(value);
      }
    }

    static class BlockSizeOption extends Options.LongOption implements Option {
      BlockSizeOption(long value) {
        super(value);
      }
    }

    static class ReplicationOption extends Options.IntegerOption
                                   implements Option {
      ReplicationOption(int value) {
        super(value);
      }
    }

    static class AppendIfExistsOption extends Options.BooleanOption
                                      implements Option {
      AppendIfExistsOption(boolean value) {
        super(value);
      }
    }

    static class KeyClassOption extends Options.ClassOption implements Option {
      KeyClassOption(Class<?> value) {
        super(value);
      }
    }

    static class ValueClassOption extends Options.ClassOption
                                  implements Option {
      ValueClassOption(Class<?> value) {
        super(value);
      }
    }

    static class MetadataOption implements Option {
      private final Metadata value;
      MetadataOption(Metadata value) {
        this.value = value;
      }
      Metadata getValue() {
        return value;
      }
    }

    static class ProgressableOption extends Options.ProgressableOption
                                    implements Option {
      ProgressableOption(Progressable value) {
        super(value);
      }
    }

    private static class CompressionOption implements Option {
      private final CompressionType value;
      private final CompressionCodec codec;
      CompressionOption(CompressionType value) {
        this(value,
             null);
      }
      CompressionOption(CompressionType value, CompressionCodec codec) {
        this.value = value;
        this.codec = (CompressionType.NONE != value && null == codec)
          ? new DefaultCodec()
          : codec;
      }
      CompressionType getValue() {
        return value;
      }
      CompressionCodec getCodec() {
        return codec;
      }
    }

    public static Option file(Path value) {
      return new FileOption(value);
    }

    /**
     * @deprecated only used for backwards-compatibility in the createWriter methods
     * that take FileSystem.
     */
    @Deprecated
    private static Option filesystem(FileSystem fs) {
      return new SequenceFile.Writer.FileSystemOption(fs);
    }

    public static Option bufferSize(int value) {
      return new BufferSizeOption(value);
    }

    public static Option stream(FSDataOutputStream value) {
      return new StreamOption(value);
    }

    public static Option replication(short value) {
      return new ReplicationOption(value);
    }

    public static Option appendIfExists(boolean value) {
      return new AppendIfExistsOption(value);
    }

    public static Option blockSize(long value) {
      return new BlockSizeOption(value);
    }

    public static Option progressable(Progressable value) {
      return new ProgressableOption(value);
    }

    public static Option keyClass(Class<?> value) {
      return new KeyClassOption(value);
    }

    public static Option valueClass(Class<?> value) {
      return new ValueClassOption(value);
    }

    public static Option metadata(Metadata value) {
      return new MetadataOption(value);
    }

    public static Option compression(CompressionType value) {
      return new CompressionOption(value);
    }

    public static Option compression(CompressionType value,
        CompressionCodec codec) {
      return new CompressionOption(value, codec);
    }

    /**
     * Construct an uncompressed writer from a set of options.
     * @param conf the configuration to use
     * @param options the options used when creating the writer
     * @throws IOException if it fails
     */
    Writer(Configuration conf,
           Option...
               opts) throws IOException {
      BlockSizeOption blockSizeOption =
        Options.getOption(BlockSizeOption.class, opts);
      BufferSizeOption bufferSizeOption =
        Options.getOption(BufferSizeOption.class, opts);
      ReplicationOption replicationOption =
        Options.getOption(ReplicationOption.class, opts);
      ProgressableOption progressOption =
        Options.getOption(ProgressableOption.class, opts);
      FileOption fileOption = Options.getOption(FileOption.class, opts);
      AppendIfExistsOption appendIfExistsOption = Options.getOption(
        AppendIfExistsOption.class, opts);
      FileSystemOption fsOption = Options.getOption(FileSystemOption.class, opts);
      StreamOption streamOption = Options.getOption(StreamOption.class, opts);
      KeyClassOption keyClassOption =
        Options.getOption(KeyClassOption.class, opts);
      ValueClassOption valueClassOption =
        Options.getOption(ValueClassOption.class, opts);
      MetadataOption metadataOption =
        Options.getOption(MetadataOption.class, opts);
      CompressionOption compressionTypeOption =
        Options.getOption(CompressionOption.class, opts);
      // check consistency of options
      if ((fileOption == null) == (streamOption == null)) {
        throw new IllegalArgumentException("file or stream must be specified");
      }
      if (fileOption == null && (blockSizeOption != null ||
                                 bufferSizeOption != null ||
                                 replicationOption != null ||
                                 progressOption != null)) {
        throw new IllegalArgumentException("file modifier options not " +
                                           "compatible with stream");
      }

      FSDataOutputStream out;
      boolean ownStream = fileOption != null;
      if (ownStream) {
        Path p = fileOption.getValue();
        FileSystem fs;
        if (fsOption != null) {
          fs = fsOption.getValue();
        } else {
          fs = p.getFileSystem(conf);
        }
        int bufferSize = bufferSizeOption == null ? getBufferSize(conf) :
          bufferSizeOption.getValue();
        short replication = replicationOption == null ?
          fs.getDefaultReplication(p) :
          (short) replicationOption.getValue();
        long blockSize = blockSizeOption == null ? fs.getDefaultBlockSize(p) :
          blockSizeOption.getValue();
        Progressable progress = progressOption == null ?
            null : progressOption.getValue();

        if (appendIfExistsOption != null && appendIfExistsOption.getValue()
            && fs.exists(p)) {

          // Read the file and verify header details
          SequenceFile.Reader reader = new SequenceFile.Reader(conf,
              SequenceFile.Reader.file(p), new Reader.OnlyHeaderOption());
          try {

            if (keyClassOption.getValue() != reader.getKeyClass()
                || valueClassOption.getValue() != reader.getValueClass()) {
              throw new IllegalArgumentException(
                  "Key/value class provided does not match the file");
            }

            if (reader.getVersion() != VERSION[3]) {
              throw new VersionMismatchException(VERSION[3],
                  reader.getVersion());
            }

            if (metadataOption != null) {
              LOG.info("MetaData Option is ignored during append");
            }
            metadataOption = (MetadataOption) SequenceFile.Writer
                .metadata(reader.getMetadata());

            CompressionOption readerCompressionOption = new CompressionOption(
                reader.getCompressionType(), reader.getCompressionCodec());

            if (readerCompressionOption.value != compressionTypeOption.value
                || !readerCompressionOption.codec.getClass().getName()
                    .equals(compressionTypeOption.codec.getClass().getName())) {
              throw new IllegalArgumentException(
                  "Compression option provided does not match the file");
            }

            sync = reader.getSync();

          } finally {
            reader.close();
          }

          out = fs.append(p, bufferSize, progress);
          this.appendMode = true;
        } else {
          out = fs
              .create(p, true, bufferSize, replication, blockSize, progress);
        }
      } else {
        out = streamOption.getValue();
      }
      Class<?> keyClass = keyClassOption == null ?
          Object.class : keyClassOption.getValue();
      Class<?> valueClass = valueClassOption == null ?
          Object.class : valueClassOption.getValue();
      Metadata metadata = metadataOption == null ?
          new Metadata() : metadataOption.getValue();
      this.compress = compressionTypeOption.getValue();
      final CompressionCodec codec = compressionTypeOption.getCodec();
      if (codec != null &&
          (codec instanceof GzipCodec) &&
          !NativeCodeLoader.isNativeCodeLoaded() &&
          !ZlibFactory.isNativeZlibLoaded(conf)) {
        throw new IllegalArgumentException("SequenceFile doesn't work with " +
                                           "GzipCodec without native-hadoop " +
                                           "code!");
      }
      init(conf, out, ownStream, keyClass, valueClass, codec, metadata);
    }

    /** Create the named file.
     * @deprecated Use
     *   {@link SequenceFile#createWriter(Configuration, Writer.Option...)}
     *   instead.
     */
    @Deprecated
    public Writer(FileSystem fs, Configuration conf, Path name,
                  Class keyClass, Class valClass) throws IOException {
      this.compress = CompressionType.NONE;
      init(conf, fs.create(name), true, keyClass, valClass, null,
           new Metadata());
    }

    /** Create the named file with write-progress reporter.
     * @deprecated Use
     *   {@link SequenceFile#createWriter(Configuration, Writer.Option...)}
     *   instead.
     */
    @Deprecated
    public Writer(FileSystem fs, Configuration conf, Path name,
                  Class keyClass, Class valClass,
                  Progressable progress, Metadata metadata) throws IOException {
      this.compress = CompressionType.NONE;
      init(conf, fs.create(name, progress), true, keyClass, valClass,
           null, metadata);
    }

    /** Create the named file with write-progress reporter.
     * @deprecated Use
     *   {@link SequenceFile#createWriter(Configuration, Writer.Option...)}
     *   instead.
     */
    @Deprecated
    public Writer(FileSystem fs, Configuration conf, Path name,
                  Class keyClass, Class valClass,
                  int bufferSize, short replication, long blockSize,
                  Progressable progress, Metadata metadata) throws IOException {
      this.compress = CompressionType.NONE;
      init(conf,
           fs.create(name, true, bufferSize, replication, blockSize, progress),
           true, keyClass, valClass, null, metadata);
    }

    boolean isCompressed() { return compress != CompressionType.NONE; }
    boolean isBlockCompressed() { return compress == CompressionType.BLOCK; }

    Writer ownStream() { this.ownOutputStream = true; return this; }

    /** Write and flush the file header. */
    private void writeFileHeader()
      throws IOException {
      out.write(VERSION);
      Text.writeString(out, keyClass.getName());
      Text.writeString(out, valClass.getName());

      out.writeBoolean(this.isCompressed());
      out.writeBoolean(this.isBlockCompressed());

      if (this.isCompressed()) {
        Text.writeString(out, (codec.getClass()).getName());
      }
      this.metadata.write(out);
      out.write(sync);                       // write the sync bytes
      out.flush();                           // flush header
    }

    /** Initialize. */
    @SuppressWarnings("unchecked")
    void init(Configuration conf, FSDataOutputStream out, boolean ownStream,
              Class keyClass, Class valClass,
              CompressionCodec codec, Metadata metadata)
      throws IOException {
      this.conf = conf;
      this.out = out;
      this.ownOutputStream = ownStream;
      this.keyClass = keyClass;
      this.valClass = valClass;
      this.codec = codec;
      this.metadata = metadata;
      SerializationFactory serializationFactory = new SerializationFactory(conf);
      this.keySerializer = serializationFactory.getSerializer(keyClass);
      if (this.keySerializer == null) {
        throw new IOException(
            "Could not find a serializer for the Key class: '"
                + keyClass.getCanonicalName() + "'. "
                + "Please ensure that the configuration '" +
                CommonConfigurationKeys.IO_SERIALIZATIONS_KEY + "' is "
                + "properly configured, if you're using "
                + "custom serialization.");
      }
      this.keySerializer.open(buffer);
      this.uncompressedValSerializer = serializationFactory.getSerializer(valClass);
      if (this.uncompressedValSerializer == null) {
        throw new IOException(
            "Could not find a serializer for the Value class: '"
                + valClass.getCanonicalName() + "'. "
                + "Please ensure that the configuration '" +
                CommonConfigurationKeys.IO_SERIALIZATIONS_KEY + "' is "
                + "properly configured, if you're using "
                + "custom serialization.");
      }
      this.uncompressedValSerializer.open(buffer);
      if (this.codec != null) {
        ReflectionUtils.setConf(this.codec, this.conf);
        this.compressor = CodecPool.getCompressor(this.codec);
        this.deflateFilter = this.codec.createOutputStream(buffer, compressor);
        this.deflateOut =
          new DataOutputStream(new BufferedOutputStream(deflateFilter));
        this.compressedValSerializer = serializationFactory.getSerializer(valClass);
        if (this.compressedValSerializer == null) {
          throw new IOException(
              "Could not find a serializer for the Value class: '"
                  + valClass.getCanonicalName() + "'. "
                  + "Please ensure that the configuration '" +
                  CommonConfigurationKeys.IO_SERIALIZATIONS_KEY + "' is "
                  + "properly configured, if you're using "
                  + "custom serialization.");
        }
        this.compressedValSerializer.open(deflateOut);
      }

      if (appendMode) {
        sync();
      } else {
        writeFileHeader();
      }
    }
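
    // Illustrative note (not part of the original source): the serializer
    // lookup in init() above is driven by the "io.serializations"
    // configuration key. A sketch of registering a custom serialization next
    // to the default Writable one (com.example.MySerialization is
    // hypothetical):
    //
    //   conf.set("io.serializations",
    //       "org.apache.hadoop.io.serializer.WritableSerialization,"
    //           + "com.example.MySerialization");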
" 1272 + "Please ensure that the configuration '" + 1273 CommonConfigurationKeys.IO_SERIALIZATIONS_KEY + "' is " 1274 + "properly configured, if you're using" 1275 + "custom serialization."); 1276 } 1277 this.compressedValSerializer.open(deflateOut); 1278 } 1279 1280 if (appendMode) { 1281 sync(); 1282 } else { 1283 writeFileHeader(); 1284 } 1285 } 1286 1287 /** Returns the class of keys in this file. */ 1288 public Class getKeyClass() { return keyClass; } 1289 1290 /** Returns the class of values in this file. */ 1291 public Class getValueClass() { return valClass; } 1292 1293 /** Returns the compression codec of data in this file. */ 1294 public CompressionCodec getCompressionCodec() { return codec; } 1295 1296 /** create a sync point */ 1297 public void sync() throws IOException { 1298 if (sync != null && lastSyncPos != out.getPos()) { 1299 out.writeInt(SYNC_ESCAPE); // mark the start of the sync 1300 out.write(sync); // write sync 1301 lastSyncPos = out.getPos(); // update lastSyncPos 1302 } 1303 } 1304 1305 /** 1306 * flush all currently written data to the file system 1307 * @deprecated Use {@link #hsync()} or {@link #hflush()} instead 1308 */ 1309 @Deprecated 1310 public void syncFs() throws IOException { 1311 if (out != null) { 1312 out.hflush(); // flush contents to file system 1313 } 1314 } 1315 1316 @Override 1317 public void hsync() throws IOException { 1318 if (out != null) { 1319 out.hsync(); 1320 } 1321 } 1322 1323 @Override 1324 public void hflush() throws IOException { 1325 if (out != null) { 1326 out.hflush(); 1327 } 1328 } 1329 1330 /** Returns the configuration of this file. */ 1331 Configuration getConf() { return conf; } 1332 1333 /** Close the file. */ 1334 @Override 1335 public synchronized void close() throws IOException { 1336 keySerializer.close(); 1337 uncompressedValSerializer.close(); 1338 if (compressedValSerializer != null) { 1339 compressedValSerializer.close(); 1340 } 1341 1342 CodecPool.returnCompressor(compressor); 1343 compressor = null; 1344 1345 if (out != null) { 1346 1347 // Close the underlying stream iff we own it... 1348 if (ownOutputStream) { 1349 out.close(); 1350 } else { 1351 out.flush(); 1352 } 1353 out = null; 1354 } 1355 } 1356 1357 synchronized void checkAndWriteSync() throws IOException { 1358 if (sync != null && 1359 out.getPos() >= lastSyncPos+SYNC_INTERVAL) { // time to emit sync 1360 sync(); 1361 } 1362 } 1363 1364 /** Append a key/value pair. */ 1365 public void append(Writable key, Writable val) 1366 throws IOException { 1367 append((Object) key, (Object) val); 1368 } 1369 1370 /** Append a key/value pair. 

    /** Append a key/value pair. */
    @SuppressWarnings("unchecked")
    public synchronized void append(Object key, Object val)
      throws IOException {
      if (key.getClass() != keyClass)
        throw new IOException("wrong key class: "+key.getClass().getName()
                              +" is not "+keyClass);
      if (val.getClass() != valClass)
        throw new IOException("wrong value class: "+val.getClass().getName()
                              +" is not "+valClass);

      buffer.reset();

      // Append the 'key'
      keySerializer.serialize(key);
      int keyLength = buffer.getLength();
      if (keyLength < 0)
        throw new IOException("negative length keys not allowed: " + key);

      // Append the 'value'
      if (compress == CompressionType.RECORD) {
        deflateFilter.resetState();
        compressedValSerializer.serialize(val);
        deflateOut.flush();
        deflateFilter.finish();
      } else {
        uncompressedValSerializer.serialize(val);
      }

      // Write the record out
      checkAndWriteSync();                                // sync
      out.writeInt(buffer.getLength());                   // total record length
      out.writeInt(keyLength);                            // key portion length
      out.write(buffer.getData(), 0, buffer.getLength()); // data
    }

    public synchronized void appendRaw(byte[] keyData, int keyOffset,
        int keyLength, ValueBytes val) throws IOException {
      if (keyLength < 0)
        throw new IOException("negative length keys not allowed: " + keyLength);

      int valLength = val.getSize();

      checkAndWriteSync();

      out.writeInt(keyLength+valLength);          // total record length
      out.writeInt(keyLength);                    // key portion length
      out.write(keyData, keyOffset, keyLength);   // key
      val.writeUncompressedBytes(out);            // value
    }

    /** Returns the current length of the output file.
     *
     * <p>This always returns a synchronized position.  In other words,
     * immediately after calling {@link SequenceFile.Reader#seek(long)} with a position
     * returned by this method, {@link SequenceFile.Reader#next(Writable)} may be called.  However
     * the key may be earlier in the file than the key last written when this
     * method was called (e.g., with block-compression, it may be the first key
     * in the block that was being written when this method was called).
     */
    public synchronized long getLength() throws IOException {
      return out.getPos();
    }

  } // class Writer
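
  // Illustrative usage (a sketch, not part of the original source): because
  // Writer.getLength() returns a sync'd position, a Reader can seek straight
  // to it, as the getLength() javadoc above describes. Names are hypothetical.
  //
  //   long pos = writer.getLength();
  //   ...
  //   SequenceFile.Reader reader = new SequenceFile.Reader(conf,
  //       SequenceFile.Reader.file(path));
  //   reader.seek(pos);
  //   boolean more = reader.next(key, value);  // resumes at a record boundary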

  /** Write key/compressed-value pairs to a sequence-format file. */
  static class RecordCompressWriter extends Writer {

    RecordCompressWriter(Configuration conf,
                         Option... options) throws IOException {
      super(conf, options);
    }

    /** Append a key/value pair. */
    @Override
    @SuppressWarnings("unchecked")
    public synchronized void append(Object key, Object val)
      throws IOException {
      if (key.getClass() != keyClass)
        throw new IOException("wrong key class: "+key.getClass().getName()
                              +" is not "+keyClass);
      if (val.getClass() != valClass)
        throw new IOException("wrong value class: "+val.getClass().getName()
                              +" is not "+valClass);

      buffer.reset();

      // Append the 'key'
      keySerializer.serialize(key);
      int keyLength = buffer.getLength();
      if (keyLength < 0)
        throw new IOException("negative length keys not allowed: " + key);

      // Compress 'value' and append it
      deflateFilter.resetState();
      compressedValSerializer.serialize(val);
      deflateOut.flush();
      deflateFilter.finish();

      // Write the record out
      checkAndWriteSync();                                // sync
      out.writeInt(buffer.getLength());                   // total record length
      out.writeInt(keyLength);                            // key portion length
      out.write(buffer.getData(), 0, buffer.getLength()); // data
    }

    /** Append a key/value pair. */
    @Override
    public synchronized void appendRaw(byte[] keyData, int keyOffset,
        int keyLength, ValueBytes val) throws IOException {

      if (keyLength < 0)
        throw new IOException("negative length keys not allowed: " + keyLength);

      int valLength = val.getSize();

      checkAndWriteSync();                        // sync
      out.writeInt(keyLength+valLength);          // total record length
      out.writeInt(keyLength);                    // key portion length
      out.write(keyData, keyOffset, keyLength);   // 'key' data
      val.writeCompressedBytes(out);              // 'value' data
    }

  } // RecordCompressWriter

  /** Write compressed key/value blocks to a sequence-format file. */
  static class BlockCompressWriter extends Writer {

    private int noBufferedRecords = 0;

    private DataOutputBuffer keyLenBuffer = new DataOutputBuffer();
    private DataOutputBuffer keyBuffer = new DataOutputBuffer();

    private DataOutputBuffer valLenBuffer = new DataOutputBuffer();
    private DataOutputBuffer valBuffer = new DataOutputBuffer();

    private final int compressionBlockSize;

    BlockCompressWriter(Configuration conf,
                        Option... options) throws IOException {
      super(conf, options);
      compressionBlockSize =
        conf.getInt("io.seqfile.compress.blocksize", 1000000);
      keySerializer.close();
      keySerializer.open(keyBuffer);
      uncompressedValSerializer.close();
      uncompressedValSerializer.open(valBuffer);
    }

    /** Workhorse to check and write out compressed data/lengths */
    private synchronized
      void writeBuffer(DataOutputBuffer uncompressedDataBuffer)
      throws IOException {
      deflateFilter.resetState();
      buffer.reset();
      deflateOut.write(uncompressedDataBuffer.getData(), 0,
                       uncompressedDataBuffer.getLength());
      deflateOut.flush();
      deflateFilter.finish();

      WritableUtils.writeVInt(out, buffer.getLength());
      out.write(buffer.getData(), 0, buffer.getLength());
    }

    /** Compress and flush contents to dfs */
    @Override
    public synchronized void sync() throws IOException {
      if (noBufferedRecords > 0) {
        super.sync();

        // No. of records
        WritableUtils.writeVInt(out, noBufferedRecords);

        // Write 'keys' and lengths
        writeBuffer(keyLenBuffer);
        writeBuffer(keyBuffer);

        // Write 'values' and lengths
        writeBuffer(valLenBuffer);
        writeBuffer(valBuffer);

        // Flush the file-stream
        out.flush();

        // Reset internal states
        keyLenBuffer.reset();
        keyBuffer.reset();
        valLenBuffer.reset();
        valBuffer.reset();
        noBufferedRecords = 0;
      }

    }

    /** Close the file. */
    @Override
    public synchronized void close() throws IOException {
      if (out != null) {
        sync();
      }
      super.close();
    }

    /** Append a key/value pair. */
    @Override
    @SuppressWarnings("unchecked")
    public synchronized void append(Object key, Object val)
      throws IOException {
      if (key.getClass() != keyClass)
        throw new IOException("wrong key class: "+key+" is not "+keyClass);
      if (val.getClass() != valClass)
        throw new IOException("wrong value class: "+val+" is not "+valClass);

      // Save key/value into respective buffers
      int oldKeyLength = keyBuffer.getLength();
      keySerializer.serialize(key);
      int keyLength = keyBuffer.getLength() - oldKeyLength;
      if (keyLength < 0)
        throw new IOException("negative length keys not allowed: " + key);
      WritableUtils.writeVInt(keyLenBuffer, keyLength);

      int oldValLength = valBuffer.getLength();
      uncompressedValSerializer.serialize(val);
      int valLength = valBuffer.getLength() - oldValLength;
      WritableUtils.writeVInt(valLenBuffer, valLength);

      // Added another key/value pair
      ++noBufferedRecords;

      // Compress and flush?
      int currentBlockSize = keyBuffer.getLength() + valBuffer.getLength();
      if (currentBlockSize >= compressionBlockSize) {
        sync();
      }
    }

    /** Append a key/value pair. */
    @Override
    public synchronized void appendRaw(byte[] keyData, int keyOffset,
        int keyLength, ValueBytes val) throws IOException {

      if (keyLength < 0)
        throw new IOException("negative length keys not allowed");

      int valLength = val.getSize();

      // Save key/value data in relevant buffers
      WritableUtils.writeVInt(keyLenBuffer, keyLength);
      keyBuffer.write(keyData, keyOffset, keyLength);
      WritableUtils.writeVInt(valLenBuffer, valLength);
      val.writeUncompressedBytes(valBuffer);

      // Added another key/value pair
      ++noBufferedRecords;

      // Compress and flush?
      int currentBlockSize = keyBuffer.getLength() + valBuffer.getLength();
      if (currentBlockSize >= compressionBlockSize) {
        sync();
      }
    }

  } // BlockCompressWriter

  /** Get the configured buffer size */
  private static int getBufferSize(Configuration conf) {
    return conf.getInt("io.file.buffer.size", 4096);
  }
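
  // Illustrative note (not part of the original source): the block size used
  // by BlockCompressWriter is tunable, e.g.
  //
  //   conf.setInt("io.seqfile.compress.blocksize", 4 * 1024 * 1024);
  //
  // buffers roughly 4 MB of serialized records per compressed block before
  // the block is flushed.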

  /** Reads key/value pairs from a sequence-format file. */
  public static class Reader implements java.io.Closeable {
    private String filename;
    private FSDataInputStream in;
    private DataOutputBuffer outBuf = new DataOutputBuffer();

    private byte version;

    private String keyClassName;
    private String valClassName;
    private Class keyClass;
    private Class valClass;

    private CompressionCodec codec = null;
    private Metadata metadata = null;

    private byte[] sync = new byte[SYNC_HASH_SIZE];
    private byte[] syncCheck = new byte[SYNC_HASH_SIZE];
    private boolean syncSeen;

    private long headerEnd;
    private long end;
    private int keyLength;
    private int recordLength;

    private boolean decompress;
    private boolean blockCompressed;

    private Configuration conf;

    private int noBufferedRecords = 0;
    private boolean lazyDecompress = true;
    private boolean valuesDecompressed = true;

    private int noBufferedKeys = 0;
    private int noBufferedValues = 0;

    private DataInputBuffer keyLenBuffer = null;
    private CompressionInputStream keyLenInFilter = null;
    private DataInputStream keyLenIn = null;
    private Decompressor keyLenDecompressor = null;
    private DataInputBuffer keyBuffer = null;
    private CompressionInputStream keyInFilter = null;
    private DataInputStream keyIn = null;
    private Decompressor keyDecompressor = null;

    private DataInputBuffer valLenBuffer = null;
    private CompressionInputStream valLenInFilter = null;
    private DataInputStream valLenIn = null;
    private Decompressor valLenDecompressor = null;
    private DataInputBuffer valBuffer = null;
    private CompressionInputStream valInFilter = null;
    private DataInputStream valIn = null;
    private Decompressor valDecompressor = null;

    private Deserializer keyDeserializer;
    private Deserializer valDeserializer;

    /**
     * A tag interface for all of the Reader options
     */
    public static interface Option {}

    /**
     * Create an option to specify the path name of the sequence file.
     * @param value the path to read
     * @return a new option
     */
    public static Option file(Path value) {
      return new FileOption(value);
    }

    /**
     * Create an option to specify the stream with the sequence file.
     * @param value the stream to read.
     * @return a new option
     */
    public static Option stream(FSDataInputStream value) {
      return new InputStreamOption(value);
    }

    /**
     * Create an option to specify the starting byte to read.
     * @param value the number of bytes to skip over
     * @return a new option
     */
    public static Option start(long value) {
      return new StartOption(value);
    }

    /**
     * Create an option to specify the number of bytes to read.
     * @param value the number of bytes to read
     * @return a new option
     */
    public static Option length(long value) {
      return new LengthOption(value);
    }

    /**
     * Create an option with the buffer size for reading the given pathname.
1741 * @param value the number of bytes to buffer 1742 * @return a new option 1743 */ 1744 public static Option bufferSize(int value) { 1745 return new BufferSizeOption(value); 1746 } 1747 1748 private static class FileOption extends Options.PathOption 1749 implements Option { 1750 private FileOption(Path value) { 1751 super(value); 1752 } 1753 } 1754 1755 private static class InputStreamOption 1756 extends Options.FSDataInputStreamOption 1757 implements Option { 1758 private InputStreamOption(FSDataInputStream value) { 1759 super(value); 1760 } 1761 } 1762 1763 private static class StartOption extends Options.LongOption 1764 implements Option { 1765 private StartOption(long value) { 1766 super(value); 1767 } 1768 } 1769 1770 private static class LengthOption extends Options.LongOption 1771 implements Option { 1772 private LengthOption(long value) { 1773 super(value); 1774 } 1775 } 1776 1777 private static class BufferSizeOption extends Options.IntegerOption 1778 implements Option { 1779 private BufferSizeOption(int value) { 1780 super(value); 1781 } 1782 } 1783 1784 // only used directly 1785 private static class OnlyHeaderOption extends Options.BooleanOption 1786 implements Option { 1787 private OnlyHeaderOption() { 1788 super(true); 1789 } 1790 } 1791 1792 public Reader(Configuration conf, Option... opts) throws IOException { 1793 // Look up the options, these are null if not set 1794 FileOption fileOpt = Options.getOption(FileOption.class, opts); 1795 InputStreamOption streamOpt = 1796 Options.getOption(InputStreamOption.class, opts); 1797 StartOption startOpt = Options.getOption(StartOption.class, opts); 1798 LengthOption lenOpt = Options.getOption(LengthOption.class, opts); 1799 BufferSizeOption bufOpt = Options.getOption(BufferSizeOption.class,opts); 1800 OnlyHeaderOption headerOnly = 1801 Options.getOption(OnlyHeaderOption.class, opts); 1802 // check for consistency 1803 if ((fileOpt == null) == (streamOpt == null)) { 1804 throw new 1805 IllegalArgumentException("File or stream option must be specified"); 1806 } 1807 if (fileOpt == null && bufOpt != null) { 1808 throw new IllegalArgumentException("buffer size can only be set when" + 1809 " a file is specified."); 1810 } 1811 // figure out the real values 1812 Path filename = null; 1813 FSDataInputStream file; 1814 final long len; 1815 if (fileOpt != null) { 1816 filename = fileOpt.getValue(); 1817 FileSystem fs = filename.getFileSystem(conf); 1818 int bufSize = bufOpt == null ? getBufferSize(conf): bufOpt.getValue(); 1819 len = null == lenOpt 1820 ? fs.getFileStatus(filename).getLen() 1821 : lenOpt.getValue(); 1822 file = openFile(fs, filename, bufSize, len); 1823 } else { 1824 len = null == lenOpt ? Long.MAX_VALUE : lenOpt.getValue(); 1825 file = streamOpt.getValue(); 1826 } 1827 long start = startOpt == null ? 0 : startOpt.getValue(); 1828 // really set up 1829 initialize(filename, file, start, len, conf, headerOnly != null); 1830 } 1831 1832 /** 1833 * Construct a reader by opening a file from the given file system. 1834 * @param fs The file system used to open the file. 1835 * @param file The file being read. 1836 * @param conf Configuration 1837 * @throws IOException 1838 * @deprecated Use Reader(Configuration, Option...) instead. 1839 */ 1840 @Deprecated 1841 public Reader(FileSystem fs, Path file, 1842 Configuration conf) throws IOException { 1843 this(conf, file(file.makeQualified(fs))); 1844 } 1845 1846 /** 1847 * Construct a reader by the given input stream. 1848 * @param in An input stream. 
1849 * @param buffersize unused 1850 * @param start The starting position. 1851 * @param length The length being read. 1852 * @param conf Configuration 1853 * @throws IOException 1854 * @deprecated Use Reader(Configuration, Reader.Option...) instead. 1855 */ 1856 @Deprecated 1857 public Reader(FSDataInputStream in, int buffersize, 1858 long start, long length, Configuration conf) throws IOException { 1859 this(conf, stream(in), start(start), length(length)); 1860 } 1861 1862 /** Common work of the constructors. */ 1863 private void initialize(Path filename, FSDataInputStream in, 1864 long start, long length, Configuration conf, 1865 boolean tempReader) throws IOException { 1866 if (in == null) { 1867 throw new IllegalArgumentException("in == null"); 1868 } 1869 this.filename = filename == null ? "<unknown>" : filename.toString(); 1870 this.in = in; 1871 this.conf = conf; 1872 boolean succeeded = false; 1873 try { 1874 seek(start); 1875 this.end = this.in.getPos() + length; 1876 // if it wrapped around, use the max 1877 if (end < length) { 1878 end = Long.MAX_VALUE; 1879 } 1880 init(tempReader); 1881 succeeded = true; 1882 } finally { 1883 if (!succeeded) { 1884 IOUtils.cleanup(LOG, this.in); 1885 } 1886 } 1887 } 1888 1889 /** 1890 * Override this method to specialize the type of 1891 * {@link FSDataInputStream} returned. 1892 * @param fs The file system used to open the file. 1893 * @param file The file being read. 1894 * @param bufferSize The buffer size used to read the file. 1895 * @param length The length being read if it is >= 0. Otherwise, 1896 * the length is not available. 1897 * @return The opened stream. 1898 * @throws IOException 1899 */ 1900 protected FSDataInputStream openFile(FileSystem fs, Path file, 1901 int bufferSize, long length) throws IOException { 1902 return fs.open(file, bufferSize); 1903 } 1904 1905 /** 1906 * Initialize the {@link Reader} 1907 * @param tmpReader <code>true</code> if we are constructing a temporary 1908 * reader {@link SequenceFile.Sorter.cloneFileAttributes}, 1909 * and hence do not initialize every component; 1910 * <code>false</code> otherwise. 1911 * @throws IOException 1912 */ 1913 private void init(boolean tempReader) throws IOException { 1914 byte[] versionBlock = new byte[VERSION.length]; 1915 String exceptionMsg = this + " not a SequenceFile"; 1916 1917 // Try to read sequence file header. 1918 try { 1919 in.readFully(versionBlock); 1920 } catch (EOFException e) { 1921 throw new EOFException(exceptionMsg); 1922 } 1923 1924 if ((versionBlock[0] != VERSION[0]) || 1925 (versionBlock[1] != VERSION[1]) || 1926 (versionBlock[2] != VERSION[2])) { 1927 throw new IOException(this + " not a SequenceFile"); 1928 } 1929 1930 // Set 'version' 1931 version = versionBlock[3]; 1932 if (version > VERSION[3]) { 1933 throw new VersionMismatchException(VERSION[3], version); 1934 } 1935 1936 if (version < BLOCK_COMPRESS_VERSION) { 1937 UTF8 className = new UTF8(); 1938 1939 className.readFields(in); 1940 keyClassName = className.toStringChecked(); // key class name 1941 1942 className.readFields(in); 1943 valClassName = className.toStringChecked(); // val class name 1944 } else { 1945 keyClassName = Text.readString(in); 1946 valClassName = Text.readString(in); 1947 } 1948 1949 if (version > 2) { // if version > 2 1950 this.decompress = in.readBoolean(); // is compressed? 1951 } else { 1952 decompress = false; 1953 } 1954 1955 if (version >= BLOCK_COMPRESS_VERSION) { // if version >= 4 1956 this.blockCompressed = in.readBoolean(); // is block-compressed? 
1957 } else { 1958 blockCompressed = false; 1959 } 1960 1961 // if version >= 5 1962 // setup the compression codec 1963 if (decompress) { 1964 if (version >= CUSTOM_COMPRESS_VERSION) { 1965 String codecClassname = Text.readString(in); 1966 try { 1967 Class<? extends CompressionCodec> codecClass 1968 = conf.getClassByName(codecClassname).asSubclass(CompressionCodec.class); 1969 this.codec = ReflectionUtils.newInstance(codecClass, conf); 1970 } catch (ClassNotFoundException cnfe) { 1971 throw new IllegalArgumentException("Unknown codec: " + 1972 codecClassname, cnfe); 1973 } 1974 } else { 1975 codec = new DefaultCodec(); 1976 ((Configurable)codec).setConf(conf); 1977 } 1978 } 1979 1980 this.metadata = new Metadata(); 1981 if (version >= VERSION_WITH_METADATA) { // if version >= 6 1982 this.metadata.readFields(in); 1983 } 1984 1985 if (version > 1) { // if version > 1 1986 in.readFully(sync); // read sync bytes 1987 headerEnd = in.getPos(); // record end of header 1988 } 1989 1990 // Initialize... *not* if we are constructing a temporary Reader 1991 if (!tempReader) { 1992 valBuffer = new DataInputBuffer(); 1993 if (decompress) { 1994 valDecompressor = CodecPool.getDecompressor(codec); 1995 valInFilter = codec.createInputStream(valBuffer, valDecompressor); 1996 valIn = new DataInputStream(valInFilter); 1997 } else { 1998 valIn = valBuffer; 1999 } 2000 2001 if (blockCompressed) { 2002 keyLenBuffer = new DataInputBuffer(); 2003 keyBuffer = new DataInputBuffer(); 2004 valLenBuffer = new DataInputBuffer(); 2005 2006 keyLenDecompressor = CodecPool.getDecompressor(codec); 2007 keyLenInFilter = codec.createInputStream(keyLenBuffer, 2008 keyLenDecompressor); 2009 keyLenIn = new DataInputStream(keyLenInFilter); 2010 2011 keyDecompressor = CodecPool.getDecompressor(codec); 2012 keyInFilter = codec.createInputStream(keyBuffer, keyDecompressor); 2013 keyIn = new DataInputStream(keyInFilter); 2014 2015 valLenDecompressor = CodecPool.getDecompressor(codec); 2016 valLenInFilter = codec.createInputStream(valLenBuffer, 2017 valLenDecompressor); 2018 valLenIn = new DataInputStream(valLenInFilter); 2019 } 2020 2021 SerializationFactory serializationFactory = 2022 new SerializationFactory(conf); 2023 this.keyDeserializer = 2024 getDeserializer(serializationFactory, getKeyClass()); 2025 if (this.keyDeserializer == null) { 2026 throw new IOException( 2027 "Could not find a deserializer for the Key class: '" 2028 + getKeyClass().getCanonicalName() + "'. " 2029 + "Please ensure that the configuration '" + 2030 CommonConfigurationKeys.IO_SERIALIZATIONS_KEY + "' is " 2031 + "properly configured, if you're using " 2032 + "custom serialization."); 2033 } 2034 if (!blockCompressed) { 2035 this.keyDeserializer.open(valBuffer); 2036 } else { 2037 this.keyDeserializer.open(keyIn); 2038 } 2039 this.valDeserializer = 2040 getDeserializer(serializationFactory, getValueClass()); 2041 if (this.valDeserializer == null) { 2042 throw new IOException( 2043 "Could not find a deserializer for the Value class: '" 2044 + getValueClass().getCanonicalName() + "'. " 2045 + "Please ensure that the configuration '" + 2046 CommonConfigurationKeys.IO_SERIALIZATIONS_KEY + "' is " 2047 + "properly configured, if you're using " 2048 + "custom serialization."); 2049 } 2050 this.valDeserializer.open(valIn); 2051 } 2052 } 2053 2054 @SuppressWarnings("unchecked") 2055 private Deserializer getDeserializer(SerializationFactory sf, Class c) { 2056 return sf.getDeserializer(c); 2057 } 2058 2059 /** Close the file.
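 * Returns any pooled decompressors to the {@link CodecPool} and closes
 * the underlying input stream.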
*/ 2060 @Override 2061 public synchronized void close() throws IOException { 2062 // Return the decompressors to the pool 2063 CodecPool.returnDecompressor(keyLenDecompressor); 2064 CodecPool.returnDecompressor(keyDecompressor); 2065 CodecPool.returnDecompressor(valLenDecompressor); 2066 CodecPool.returnDecompressor(valDecompressor); 2067 keyLenDecompressor = keyDecompressor = null; 2068 valLenDecompressor = valDecompressor = null; 2069 2070 if (keyDeserializer != null) { 2071 keyDeserializer.close(); 2072 } 2073 if (valDeserializer != null) { 2074 valDeserializer.close(); 2075 } 2076 2077 // Close the input-stream 2078 in.close(); 2079 } 2080 2081 /** Returns the name of the key class. */ 2082 public String getKeyClassName() { 2083 return keyClassName; 2084 } 2085 2086 /** Returns the class of keys in this file. */ 2087 public synchronized Class<?> getKeyClass() { 2088 if (null == keyClass) { 2089 try { 2090 keyClass = WritableName.getClass(getKeyClassName(), conf); 2091 } catch (IOException e) { 2092 throw new RuntimeException(e); 2093 } 2094 } 2095 return keyClass; 2096 } 2097 2098 /** Returns the name of the value class. */ 2099 public String getValueClassName() { 2100 return valClassName; 2101 } 2102 2103 /** Returns the class of values in this file. */ 2104 public synchronized Class<?> getValueClass() { 2105 if (null == valClass) { 2106 try { 2107 valClass = WritableName.getClass(getValueClassName(), conf); 2108 } catch (IOException e) { 2109 throw new RuntimeException(e); 2110 } 2111 } 2112 return valClass; 2113 } 2114 2115 /** Returns true if values are compressed. */ 2116 public boolean isCompressed() { return decompress; } 2117 2118 /** Returns true if records are block-compressed. */ 2119 public boolean isBlockCompressed() { return blockCompressed; } 2120 2121 /** Returns the compression codec of data in this file. */ 2122 public CompressionCodec getCompressionCodec() { return codec; } 2123 2124 private byte[] getSync() { 2125 return sync; 2126 } 2127 2128 private byte getVersion() { 2129 return version; 2130 } 2131 2132 /** 2133 * Get the compression type for this file. 2134 * @return the compression type 2135 */ 2136 public CompressionType getCompressionType() { 2137 if (decompress) { 2138 return blockCompressed ? CompressionType.BLOCK : CompressionType.RECORD; 2139 } else { 2140 return CompressionType.NONE; 2141 } 2142 } 2143 2144 /** Returns the metadata object of the file */ 2145 public Metadata getMetadata() { 2146 return this.metadata; 2147 } 2148 2149 /** Returns the configuration used for this file. 
*/ 2150 Configuration getConf() { return conf; } 2151 2152 /** Read a compressed buffer */ 2153 private synchronized void readBuffer(DataInputBuffer buffer, 2154 CompressionInputStream filter) throws IOException { 2155 // Read data into a temporary buffer 2156 DataOutputBuffer dataBuffer = new DataOutputBuffer(); 2157 2158 try { 2159 int dataBufferLength = WritableUtils.readVInt(in); 2160 dataBuffer.write(in, dataBufferLength); 2161 2162 // Set up 'buffer' connected to the input-stream 2163 buffer.reset(dataBuffer.getData(), 0, dataBuffer.getLength()); 2164 } finally { 2165 dataBuffer.close(); 2166 } 2167 2168 // Reset the codec 2169 filter.resetState(); 2170 } 2171 2172 /** Read the next 'compressed' block */ 2173 private synchronized void readBlock() throws IOException { 2174 // Check if we need to throw away a whole block of 2175 // 'values' due to 'lazy decompression' 2176 if (lazyDecompress && !valuesDecompressed) { 2177 in.seek(WritableUtils.readVInt(in)+in.getPos()); 2178 in.seek(WritableUtils.readVInt(in)+in.getPos()); 2179 } 2180 2181 // Reset internal states 2182 noBufferedKeys = 0; noBufferedValues = 0; noBufferedRecords = 0; 2183 valuesDecompressed = false; 2184 2185 //Process sync 2186 if (sync != null) { 2187 in.readInt(); 2188 in.readFully(syncCheck); // read syncCheck 2189 if (!Arrays.equals(sync, syncCheck)) // check it 2190 throw new IOException("File is corrupt!"); 2191 } 2192 syncSeen = true; 2193 2194 // Read number of records in this block 2195 noBufferedRecords = WritableUtils.readVInt(in); 2196 2197 // Read key lengths and keys 2198 readBuffer(keyLenBuffer, keyLenInFilter); 2199 readBuffer(keyBuffer, keyInFilter); 2200 noBufferedKeys = noBufferedRecords; 2201 2202 // Read value lengths and values 2203 if (!lazyDecompress) { 2204 readBuffer(valLenBuffer, valLenInFilter); 2205 readBuffer(valBuffer, valInFilter); 2206 noBufferedValues = noBufferedRecords; 2207 valuesDecompressed = true; 2208 } 2209 } 2210 2211 /** 2212 * Position valLenIn/valIn to the 'value' 2213 * corresponding to the 'current' key 2214 */ 2215 private synchronized void seekToCurrentValue() throws IOException { 2216 if (!blockCompressed) { 2217 if (decompress) { 2218 valInFilter.resetState(); 2219 } 2220 valBuffer.reset(); 2221 } else { 2222 // Check if this is the first value in the 'block' to be read 2223 if (lazyDecompress && !valuesDecompressed) { 2224 // Read the value lengths and values 2225 readBuffer(valLenBuffer, valLenInFilter); 2226 readBuffer(valBuffer, valInFilter); 2227 noBufferedValues = noBufferedRecords; 2228 valuesDecompressed = true; 2229 } 2230 2231 // Calculate the no. of bytes to skip 2232 // Note: 'current' key has already been read! 2233 int skipValBytes = 0; 2234 int currentKey = noBufferedKeys + 1; 2235 for (int i=noBufferedValues; i > currentKey; --i) { 2236 skipValBytes += WritableUtils.readVInt(valLenIn); 2237 --noBufferedValues; 2238 } 2239 2240 // Skip to the 'val' corresponding to 'current' key 2241 if (skipValBytes > 0) { 2242 if (valIn.skipBytes(skipValBytes) != skipValBytes) { 2243 throw new IOException("Failed to seek to " + currentKey + 2244 "(th) value!"); 2245 } 2246 } 2247 } 2248 } 2249 2250 /** 2251 * Get the 'value' corresponding to the last read 'key'. 2252 * @param val : The 'value' to be read. 
2253 * @throws IOException 2254 */ 2255 public synchronized void getCurrentValue(Writable val) 2256 throws IOException { 2257 if (val instanceof Configurable) { 2258 ((Configurable) val).setConf(this.conf); 2259 } 2260 2261 // Position stream to 'current' value 2262 seekToCurrentValue(); 2263 2264 if (!blockCompressed) { 2265 val.readFields(valIn); 2266 2267 if (valIn.read() > 0) { 2268 LOG.info("available bytes: " + valIn.available()); 2269 throw new IOException(val+" read "+(valBuffer.getPosition()-keyLength) 2270 + " bytes, should read " + 2271 (valBuffer.getLength()-keyLength)); 2272 } 2273 } else { 2274 // Get the value 2275 int valLength = WritableUtils.readVInt(valLenIn); 2276 val.readFields(valIn); 2277 2278 // Read another compressed 'value' 2279 --noBufferedValues; 2280 2281 // Sanity check 2282 if ((valLength < 0) && LOG.isDebugEnabled()) { 2283 LOG.debug(val + " is a zero-length value"); 2284 } 2285 } 2286 2287 } 2288 2289 /** 2290 * Get the 'value' corresponding to the last read 'key'. 2291 * @param val : The 'value' to be read. 2292 * @throws IOException 2293 */ 2294 public synchronized Object getCurrentValue(Object val) 2295 throws IOException { 2296 if (val instanceof Configurable) { 2297 ((Configurable) val).setConf(this.conf); 2298 } 2299 2300 // Position stream to 'current' value 2301 seekToCurrentValue(); 2302 2303 if (!blockCompressed) { 2304 val = deserializeValue(val); 2305 2306 if (valIn.read() > 0) { 2307 LOG.info("available bytes: " + valIn.available()); 2308 throw new IOException(val+" read "+(valBuffer.getPosition()-keyLength) 2309 + " bytes, should read " + 2310 (valBuffer.getLength()-keyLength)); 2311 } 2312 } else { 2313 // Get the value 2314 int valLength = WritableUtils.readVInt(valLenIn); 2315 val = deserializeValue(val); 2316 2317 // Read another compressed 'value' 2318 --noBufferedValues; 2319 2320 // Sanity check 2321 if ((valLength < 0) && LOG.isDebugEnabled()) { 2322 LOG.debug(val + " is a zero-length value"); 2323 } 2324 } 2325 return val; 2326 2327 } 2328 2329 @SuppressWarnings("unchecked") 2330 private Object deserializeValue(Object val) throws IOException { 2331 return valDeserializer.deserialize(val); 2332 } 2333 2334 /** Read the next key in the file into <code>key</code>, skipping its 2335 * value. True if another entry exists, and false at end of file. */ 2336 public synchronized boolean next(Writable key) throws IOException { 2337 if (key.getClass() != getKeyClass()) 2338 throw new IOException("wrong key class: "+key.getClass().getName() 2339 +" is not "+keyClass); 2340 2341 if (!blockCompressed) { 2342 outBuf.reset(); 2343 2344 keyLength = next(outBuf); 2345 if (keyLength < 0) 2346 return false; 2347 2348 valBuffer.reset(outBuf.getData(), outBuf.getLength()); 2349 2350 key.readFields(valBuffer); 2351 valBuffer.mark(0); 2352 if (valBuffer.getPosition() != keyLength) 2353 throw new IOException(key + " read " + valBuffer.getPosition() 2354 + " bytes, should read " + keyLength); 2355 } else { 2356 //Reset syncSeen 2357 syncSeen = false; 2358 2359 if (noBufferedKeys == 0) { 2360 try { 2361 readBlock(); 2362 } catch (EOFException eof) { 2363 return false; 2364 } 2365 } 2366 2367 int keyLength = WritableUtils.readVInt(keyLenIn); 2368 2369 // Sanity check 2370 if (keyLength < 0) { 2371 return false; 2372 } 2373 2374 //Read another compressed 'key' 2375 key.readFields(keyIn); 2376 --noBufferedKeys; 2377 } 2378 2379 return true; 2380 } 2381 2382 /** Read the next key/value pair in the file into <code>key</code> and 2383 * <code>val</code>. 
Returns true if such a pair exists and false when at 2384 * end of file */ 2385 public synchronized boolean next(Writable key, Writable val) 2386 throws IOException { 2387 if (val.getClass() != getValueClass()) 2388 throw new IOException("wrong value class: "+val+" is not "+valClass); 2389 2390 boolean more = next(key); 2391 2392 if (more) { 2393 getCurrentValue(val); 2394 } 2395 2396 return more; 2397 } 2398 2399 /** 2400 * Read and return the next record length, potentially skipping over 2401 * a sync block. 2402 * @return the length of the next record or -1 if there is no next record 2403 * @throws IOException 2404 */ 2405 private synchronized int readRecordLength() throws IOException { 2406 if (in.getPos() >= end) { 2407 return -1; 2408 } 2409 int length = in.readInt(); 2410 if (version > 1 && sync != null && 2411 length == SYNC_ESCAPE) { // process a sync entry 2412 in.readFully(syncCheck); // read syncCheck 2413 if (!Arrays.equals(sync, syncCheck)) // check it 2414 throw new IOException("File is corrupt!"); 2415 syncSeen = true; 2416 if (in.getPos() >= end) { 2417 return -1; 2418 } 2419 length = in.readInt(); // re-read length 2420 } else { 2421 syncSeen = false; 2422 } 2423 2424 return length; 2425 } 2426 2427 /** Read the next key/value pair in the file into <code>buffer</code>. 2428 * Returns the length of the key read, or -1 if at end of file. The length 2429 * of the value may be computed by calling buffer.getLength() before and 2430 * after calls to this method. */ 2431 /** @deprecated Call {@link #nextRaw(DataOutputBuffer,SequenceFile.ValueBytes)}. */ 2432 @Deprecated 2433 synchronized int next(DataOutputBuffer buffer) throws IOException { 2434 // Unsupported for block-compressed sequence files 2435 if (blockCompressed) { 2436 throw new IOException("Unsupported call for block-compressed" + 2437 " SequenceFiles - use SequenceFile.Reader.next(DataOutputStream, ValueBytes)"); 2438 } 2439 try { 2440 int length = readRecordLength(); 2441 if (length == -1) { 2442 return -1; 2443 } 2444 int keyLength = in.readInt(); 2445 buffer.write(in, length); 2446 return keyLength; 2447 } catch (ChecksumException e) { // checksum failure 2448 handleChecksumException(e); 2449 return next(buffer); 2450 } 2451 } 2452 2453 public ValueBytes createValueBytes() { 2454 ValueBytes val = null; 2455 if (!decompress || blockCompressed) { 2456 val = new UncompressedBytes(); 2457 } else { 2458 val = new CompressedBytes(codec); 2459 } 2460 return val; 2461 } 2462 2463 /** 2464 * Read 'raw' records. 
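 * <p>A raw-scan sketch (variable names are illustrative):
 * <pre>
 *   DataOutputBuffer keyBuf = new DataOutputBuffer();
 *   ValueBytes valBytes = reader.createValueBytes();
 *   while (reader.nextRaw(keyBuf, valBytes) != -1) {
 *     // keyBuf holds the serialized key; valBytes holds the value bytes,
 *     // still compressed for record-compressed files
 *     keyBuf.reset();
 *   }
 * </pre>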
2465 * @param key - The buffer into which the key is read 2466 * @param val - The 'raw' value 2467 * @return Returns the total record length or -1 for end of file 2468 * @throws IOException 2469 */ 2470 public synchronized int nextRaw(DataOutputBuffer key, ValueBytes val) 2471 throws IOException { 2472 if (!blockCompressed) { 2473 int length = readRecordLength(); 2474 if (length == -1) { 2475 return -1; 2476 } 2477 int keyLength = in.readInt(); 2478 int valLength = length - keyLength; 2479 key.write(in, keyLength); 2480 if (decompress) { 2481 CompressedBytes value = (CompressedBytes)val; 2482 value.reset(in, valLength); 2483 } else { 2484 UncompressedBytes value = (UncompressedBytes)val; 2485 value.reset(in, valLength); 2486 } 2487 2488 return length; 2489 } else { 2490 //Reset syncSeen 2491 syncSeen = false; 2492 2493 // Read 'key' 2494 if (noBufferedKeys == 0) { 2495 if (in.getPos() >= end) 2496 return -1; 2497 2498 try { 2499 readBlock(); 2500 } catch (EOFException eof) { 2501 return -1; 2502 } 2503 } 2504 int keyLength = WritableUtils.readVInt(keyLenIn); 2505 if (keyLength < 0) { 2506 throw new IOException("zero length key found!"); 2507 } 2508 key.write(keyIn, keyLength); 2509 --noBufferedKeys; 2510 2511 // Read raw 'value' 2512 seekToCurrentValue(); 2513 int valLength = WritableUtils.readVInt(valLenIn); 2514 UncompressedBytes rawValue = (UncompressedBytes)val; 2515 rawValue.reset(valIn, valLength); 2516 --noBufferedValues; 2517 2518 return (keyLength+valLength); 2519 } 2520 2521 } 2522 2523 /** 2524 * Read 'raw' keys. 2525 * @param key - The buffer into which the key is read 2526 * @return Returns the key length or -1 for end of file 2527 * @throws IOException 2528 */ 2529 public synchronized int nextRawKey(DataOutputBuffer key) 2530 throws IOException { 2531 if (!blockCompressed) { 2532 recordLength = readRecordLength(); 2533 if (recordLength == -1) { 2534 return -1; 2535 } 2536 keyLength = in.readInt(); 2537 key.write(in, keyLength); 2538 return keyLength; 2539 } else { 2540 //Reset syncSeen 2541 syncSeen = false; 2542 2543 // Read 'key' 2544 if (noBufferedKeys == 0) { 2545 if (in.getPos() >= end) 2546 return -1; 2547 2548 try { 2549 readBlock(); 2550 } catch (EOFException eof) { 2551 return -1; 2552 } 2553 } 2554 int keyLength = WritableUtils.readVInt(keyLenIn); 2555 if (keyLength < 0) { 2556 throw new IOException("zero length key found!"); 2557 } 2558 key.write(keyIn, keyLength); 2559 --noBufferedKeys; 2560 2561 return keyLength; 2562 } 2563 2564 } 2565 2566 /** Read the next key in the file, skipping its 2567 * value. Return null at end of file. 
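 * <p>A typical loop (a sketch; the Writable serialization reuses the
 * passed object, other serializations may return a fresh one):
 * <pre>
 *   Object key = null;
 *   Object val = null;
 *   while ((key = reader.next(key)) != null) {
 *     val = reader.getCurrentValue(val);
 *     // process key/val
 *   }
 * </pre>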
*/ 2568 public synchronized Object next(Object key) throws IOException { 2569 if (key != null && key.getClass() != getKeyClass()) { 2570 throw new IOException("wrong key class: "+key.getClass().getName() 2571 +" is not "+keyClass); 2572 } 2573 2574 if (!blockCompressed) { 2575 outBuf.reset(); 2576 2577 keyLength = next(outBuf); 2578 if (keyLength < 0) 2579 return null; 2580 2581 valBuffer.reset(outBuf.getData(), outBuf.getLength()); 2582 2583 key = deserializeKey(key); 2584 valBuffer.mark(0); 2585 if (valBuffer.getPosition() != keyLength) 2586 throw new IOException(key + " read " + valBuffer.getPosition() 2587 + " bytes, should read " + keyLength); 2588 } else { 2589 //Reset syncSeen 2590 syncSeen = false; 2591 2592 if (noBufferedKeys == 0) { 2593 try { 2594 readBlock(); 2595 } catch (EOFException eof) { 2596 return null; 2597 } 2598 } 2599 2600 int keyLength = WritableUtils.readVInt(keyLenIn); 2601 2602 // Sanity check 2603 if (keyLength < 0) { 2604 return null; 2605 } 2606 2607 //Read another compressed 'key' 2608 key = deserializeKey(key); 2609 --noBufferedKeys; 2610 } 2611 2612 return key; 2613 } 2614 2615 @SuppressWarnings("unchecked") 2616 private Object deserializeKey(Object key) throws IOException { 2617 return keyDeserializer.deserialize(key); 2618 } 2619 2620 /** 2621 * Read 'raw' values. 2622 * @param val - The 'raw' value 2623 * @return Returns the value length 2624 * @throws IOException 2625 */ 2626 public synchronized int nextRawValue(ValueBytes val) 2627 throws IOException { 2628 2629 // Position stream to current value 2630 seekToCurrentValue(); 2631 2632 if (!blockCompressed) { 2633 int valLength = recordLength - keyLength; 2634 if (decompress) { 2635 CompressedBytes value = (CompressedBytes)val; 2636 value.reset(in, valLength); 2637 } else { 2638 UncompressedBytes value = (UncompressedBytes)val; 2639 value.reset(in, valLength); 2640 } 2641 2642 return valLength; 2643 } else { 2644 int valLength = WritableUtils.readVInt(valLenIn); 2645 UncompressedBytes rawValue = (UncompressedBytes)val; 2646 rawValue.reset(valIn, valLength); 2647 --noBufferedValues; 2648 return valLength; 2649 } 2650 2651 } 2652 2653 private void handleChecksumException(ChecksumException e) 2654 throws IOException { 2655 if (this.conf.getBoolean("io.skip.checksum.errors", false)) { 2656 LOG.warn("Bad checksum at "+getPosition()+". Skipping entries."); 2657 sync(getPosition()+this.conf.getInt("io.bytes.per.checksum", 512)); 2658 } else { 2659 throw e; 2660 } 2661 } 2662 2663 /** disables sync. often invoked for tmp files */ 2664 synchronized void ignoreSync() { 2665 sync = null; 2666 } 2667 2668 /** Set the current byte position in the input file. 2669 * 2670 * <p>The position passed must be a position returned by {@link 2671 * SequenceFile.Writer#getLength()} when writing this file. To seek to an arbitrary 2672 * position, use {@link SequenceFile.Reader#sync(long)}. 
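 *
 * <p>A simplified split-reading sketch (names are illustrative; exact
 * boundary handling would also consult {@link #syncSeen()}):
 * <pre>
 *   reader.sync(splitStart);  // advance to the first sync point in the split
 *   while (reader.getPosition() < splitEnd && reader.next(key, val)) {
 *     // process key/val
 *   }
 * </pre>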
2673 */ 2674 public synchronized void seek(long position) throws IOException { 2675 in.seek(position); 2676 if (blockCompressed) { // trigger block read 2677 noBufferedKeys = 0; 2678 valuesDecompressed = true; 2679 } 2680 } 2681 2682 /** Seek to the next sync mark past a given position.*/ 2683 public synchronized void sync(long position) throws IOException { 2684 if (position+SYNC_SIZE >= end) { 2685 seek(end); 2686 return; 2687 } 2688 2689 if (position < headerEnd) { 2690 // seek directly to first record 2691 in.seek(headerEnd); 2692 // note the sync marker "seen" in the header 2693 syncSeen = true; 2694 return; 2695 } 2696 2697 try { 2698 seek(position+4); // skip escape 2699 in.readFully(syncCheck); 2700 int syncLen = sync.length; 2701 for (int i = 0; in.getPos() < end; i++) { 2702 int j = 0; 2703 for (; j < syncLen; j++) { 2704 if (sync[j] != syncCheck[(i+j)%syncLen]) 2705 break; 2706 } 2707 if (j == syncLen) { 2708 in.seek(in.getPos() - SYNC_SIZE); // position before sync 2709 return; 2710 } 2711 syncCheck[i%syncLen] = in.readByte(); 2712 } 2713 } catch (ChecksumException e) { // checksum failure 2714 handleChecksumException(e); 2715 } 2716 } 2717 2718 /** Returns true iff the previous call to next passed a sync mark.*/ 2719 public synchronized boolean syncSeen() { return syncSeen; } 2720 2721 /** Return the current byte position in the input file. */ 2722 public synchronized long getPosition() throws IOException { 2723 return in.getPos(); 2724 } 2725 2726 /** Returns the name of the file. */ 2727 @Override 2728 public String toString() { 2729 return filename; 2730 } 2731 2732 } 2733 2734 /** Sorts key/value pairs in a sequence-format file. 2735 * 2736 * <p>For best performance, applications should make sure that the {@link 2737 * Writable#readFields(DataInput)} implementation of their keys is 2738 * very efficient. In particular, it should avoid allocating memory. 2739 */ 2740 public static class Sorter { 2741 2742 private RawComparator comparator; 2743 2744 private MergeSort mergeSort; //the implementation of merge sort 2745 2746 private Path[] inFiles; // when merging or sorting 2747 2748 private Path outFile; 2749 2750 private int memory; // bytes 2751 private int factor; // merged per pass 2752 2753 private FileSystem fs = null; 2754 2755 private Class keyClass; 2756 private Class valClass; 2757 2758 private Configuration conf; 2759 private Metadata metadata; 2760 2761 private Progressable progressable = null; 2762 2763 /** Sort and merge files containing the named classes. */ 2764 public Sorter(FileSystem fs, Class<? extends WritableComparable> keyClass, 2765 Class valClass, Configuration conf) { 2766 this(fs, WritableComparator.get(keyClass, conf), keyClass, valClass, conf); 2767 } 2768 2769 /** Sort and merge using an arbitrary {@link RawComparator}. */ 2770 public Sorter(FileSystem fs, RawComparator comparator, Class keyClass, 2771 Class valClass, Configuration conf) { 2772 this(fs, comparator, keyClass, valClass, conf, new Metadata()); 2773 } 2774 2775 /** Sort and merge using an arbitrary {@link RawComparator}. 
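 * The sort buffer defaults to <code>io.sort.mb</code> megabytes and the
 * merge fan-in to <code>io.sort.factor</code> streams; both can be
 * overridden later via {@link #setMemory(int)} and {@link #setFactor(int)}.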
*/ 2776 public Sorter(FileSystem fs, RawComparator comparator, Class keyClass, 2777 Class valClass, Configuration conf, Metadata metadata) { 2778 this.fs = fs; 2779 this.comparator = comparator; 2780 this.keyClass = keyClass; 2781 this.valClass = valClass; 2782 this.memory = conf.getInt("io.sort.mb", 100) * 1024 * 1024; 2783 this.factor = conf.getInt("io.sort.factor", 100); 2784 this.conf = conf; 2785 this.metadata = metadata; 2786 } 2787 2788 /** Set the number of streams to merge at once.*/ 2789 public void setFactor(int factor) { this.factor = factor; } 2790 2791 /** Get the number of streams to merge at once.*/ 2792 public int getFactor() { return factor; } 2793 2794 /** Set the total amount of buffer memory, in bytes.*/ 2795 public void setMemory(int memory) { this.memory = memory; } 2796 2797 /** Get the total amount of buffer memory, in bytes.*/ 2798 public int getMemory() { return memory; } 2799 2800 /** Set the progressable object in order to report progress. */ 2801 public void setProgressable(Progressable progressable) { 2802 this.progressable = progressable; 2803 } 2804 2805 /** 2806 * Perform a file sort from a set of input files into an output file. 2807 * @param inFiles the files to be sorted 2808 * @param outFile the sorted output file 2809 * @param deleteInput should the input files be deleted as they are read? 2810 */ 2811 public void sort(Path[] inFiles, Path outFile, 2812 boolean deleteInput) throws IOException { 2813 if (fs.exists(outFile)) { 2814 throw new IOException("already exists: " + outFile); 2815 } 2816 2817 this.inFiles = inFiles; 2818 this.outFile = outFile; 2819 2820 int segments = sortPass(deleteInput); 2821 if (segments > 1) { 2822 mergePass(outFile.getParent()); 2823 } 2824 } 2825 2826 /** 2827 * Perform a file sort from a set of input files and return an iterator. 2828 * @param inFiles the files to be sorted 2829 * @param tempDir the directory where temp files are created during sort 2830 * @param deleteInput should the input files be deleted as they are read? 2831 * @return iterator the RawKeyValueIterator 2832 */ 2833 public RawKeyValueIterator sortAndIterate(Path[] inFiles, Path tempDir, 2834 boolean deleteInput) throws IOException { 2835 Path outFile = new Path(tempDir + Path.SEPARATOR + "all.2"); 2836 if (fs.exists(outFile)) { 2837 throw new IOException("already exists: " + outFile); 2838 } 2839 this.inFiles = inFiles; 2840 //outFile will basically be used as prefix for temp files in the cases 2841 //where sort outputs multiple sorted segments. For the single segment 2842 //case, the outputFile itself will contain the sorted data for that 2843 //segment 2844 this.outFile = outFile; 2845 2846 int segments = sortPass(deleteInput); 2847 if (segments > 1) 2848 return merge(outFile.suffix(".0"), outFile.suffix(".0.index"), 2849 tempDir); 2850 else if (segments == 1) 2851 return merge(new Path[]{outFile}, true, tempDir); 2852 else return null; 2853 } 2854 2855 /** 2856 * The backwards compatible interface to sort. 
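 * <p>Equivalent to <code>sort(new Path[]{inFile}, outFile, false)</code>;
 * the input file is left in place.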
2857 * @param inFile the input file to sort 2858 * @param outFile the sorted output file 2859 */ 2860 public void sort(Path inFile, Path outFile) throws IOException { 2861 sort(new Path[]{inFile}, outFile, false); 2862 } 2863 2864 private int sortPass(boolean deleteInput) throws IOException { 2865 if(LOG.isDebugEnabled()) { 2866 LOG.debug("running sort pass"); 2867 } 2868 SortPass sortPass = new SortPass(); // make the SortPass 2869 sortPass.setProgressable(progressable); 2870 mergeSort = new MergeSort(sortPass.new SeqFileComparator()); 2871 try { 2872 return sortPass.run(deleteInput); // run it 2873 } finally { 2874 sortPass.close(); // close it 2875 } 2876 } 2877 2878 private class SortPass { 2879 private int memoryLimit = memory/4; 2880 private int recordLimit = 1000000; 2881 2882 private DataOutputBuffer rawKeys = new DataOutputBuffer(); 2883 private byte[] rawBuffer; 2884 2885 private int[] keyOffsets = new int[1024]; 2886 private int[] pointers = new int[keyOffsets.length]; 2887 private int[] pointersCopy = new int[keyOffsets.length]; 2888 private int[] keyLengths = new int[keyOffsets.length]; 2889 private ValueBytes[] rawValues = new ValueBytes[keyOffsets.length]; 2890 2891 private ArrayList segmentLengths = new ArrayList(); 2892 2893 private Reader in = null; 2894 private FSDataOutputStream out = null; 2895 private FSDataOutputStream indexOut = null; 2896 private Path outName; 2897 2898 private Progressable progressable = null; 2899 2900 public int run(boolean deleteInput) throws IOException { 2901 int segments = 0; 2902 int currentFile = 0; 2903 boolean atEof = (currentFile >= inFiles.length); 2904 CompressionType compressionType; 2905 CompressionCodec codec = null; 2906 segmentLengths.clear(); 2907 if (atEof) { 2908 return 0; 2909 } 2910 2911 // Initialize 2912 in = new Reader(fs, inFiles[currentFile], conf); 2913 compressionType = in.getCompressionType(); 2914 codec = in.getCompressionCodec(); 2915 2916 for (int i=0; i < rawValues.length; ++i) { 2917 rawValues[i] = null; 2918 } 2919 2920 while (!atEof) { 2921 int count = 0; 2922 int bytesProcessed = 0; 2923 rawKeys.reset(); 2924 while (!atEof && 2925 bytesProcessed < memoryLimit && count < recordLimit) { 2926 2927 // Read a record into buffer 2928 // Note: Attempt to re-use 'rawValue' as far as possible 2929 int keyOffset = rawKeys.getLength(); 2930 ValueBytes rawValue = 2931 (count == keyOffsets.length || rawValues[count] == null) ? 
2932 in.createValueBytes() : 2933 rawValues[count]; 2934 int recordLength = in.nextRaw(rawKeys, rawValue); 2935 if (recordLength == -1) { 2936 in.close(); 2937 if (deleteInput) { 2938 fs.delete(inFiles[currentFile], true); 2939 } 2940 currentFile += 1; 2941 atEof = currentFile >= inFiles.length; 2942 if (!atEof) { 2943 in = new Reader(fs, inFiles[currentFile], conf); 2944 } else { 2945 in = null; 2946 } 2947 continue; 2948 } 2949 2950 int keyLength = rawKeys.getLength() - keyOffset; 2951 2952 if (count == keyOffsets.length) 2953 grow(); 2954 2955 keyOffsets[count] = keyOffset; // update pointers 2956 pointers[count] = count; 2957 keyLengths[count] = keyLength; 2958 rawValues[count] = rawValue; 2959 2960 bytesProcessed += recordLength; 2961 count++; 2962 } 2963 2964 // buffer is full -- sort & flush it 2965 if(LOG.isDebugEnabled()) { 2966 LOG.debug("flushing segment " + segments); 2967 } 2968 rawBuffer = rawKeys.getData(); 2969 sort(count); 2970 // indicate we're making progress 2971 if (progressable != null) { 2972 progressable.progress(); 2973 } 2974 flush(count, bytesProcessed, compressionType, codec, 2975 segments==0 && atEof); 2976 segments++; 2977 } 2978 return segments; 2979 } 2980 2981 public void close() throws IOException { 2982 if (in != null) { 2983 in.close(); 2984 } 2985 if (out != null) { 2986 out.close(); 2987 } 2988 if (indexOut != null) { 2989 indexOut.close(); 2990 } 2991 } 2992 2993 private void grow() { 2994 int newLength = keyOffsets.length * 3 / 2; 2995 keyOffsets = grow(keyOffsets, newLength); 2996 pointers = grow(pointers, newLength); 2997 pointersCopy = new int[newLength]; 2998 keyLengths = grow(keyLengths, newLength); 2999 rawValues = grow(rawValues, newLength); 3000 } 3001 3002 private int[] grow(int[] old, int newLength) { 3003 int[] result = new int[newLength]; 3004 System.arraycopy(old, 0, result, 0, old.length); 3005 return result; 3006 } 3007 3008 private ValueBytes[] grow(ValueBytes[] old, int newLength) { 3009 ValueBytes[] result = new ValueBytes[newLength]; 3010 System.arraycopy(old, 0, result, 0, old.length); 3011 for (int i=old.length; i < newLength; ++i) { 3012 result[i] = null; 3013 } 3014 return result; 3015 } 3016 3017 private void flush(int count, int bytesProcessed, 3018 CompressionType compressionType, 3019 CompressionCodec codec, 3020 boolean done) throws IOException { 3021 if (out == null) { 3022 outName = done ? outFile : outFile.suffix(".0"); 3023 out = fs.create(outName); 3024 if (!done) { 3025 indexOut = fs.create(outName.suffix(".index")); 3026 } 3027 } 3028 3029 long segmentStart = out.getPos(); 3030 Writer writer = createWriter(conf, Writer.stream(out), 3031 Writer.keyClass(keyClass), Writer.valueClass(valClass), 3032 Writer.compression(compressionType, codec), 3033 Writer.metadata(done ? 
metadata : new Metadata())); 3034 3035 if (!done) { 3036 writer.sync = null; // disable sync on temp files 3037 } 3038 3039 for (int i = 0; i < count; i++) { // write in sorted order 3040 int p = pointers[i]; 3041 writer.appendRaw(rawBuffer, keyOffsets[p], keyLengths[p], rawValues[p]); 3042 } 3043 writer.close(); 3044 3045 if (!done) { 3046 // Save the segment length 3047 WritableUtils.writeVLong(indexOut, segmentStart); 3048 WritableUtils.writeVLong(indexOut, (out.getPos()-segmentStart)); 3049 indexOut.flush(); 3050 } 3051 } 3052 3053 private void sort(int count) { 3054 System.arraycopy(pointers, 0, pointersCopy, 0, count); 3055 mergeSort.mergeSort(pointersCopy, pointers, 0, count); 3056 } 3057 class SeqFileComparator implements Comparator<IntWritable> { 3058 @Override 3059 public int compare(IntWritable I, IntWritable J) { 3060 return comparator.compare(rawBuffer, keyOffsets[I.get()], 3061 keyLengths[I.get()], rawBuffer, 3062 keyOffsets[J.get()], keyLengths[J.get()]); 3063 } 3064 } 3065 3066 /** Set the progressable object in order to report progress. */ 3067 public void setProgressable(Progressable progressable) 3068 { 3069 this.progressable = progressable; 3070 } 3071 3072 } // SequenceFile.Sorter.SortPass 3073 3074 /** The interface to iterate over raw keys/values of SequenceFiles. */ 3075 public static interface RawKeyValueIterator { 3076 /** Gets the current raw key 3077 * @return DataOutputBuffer 3078 * @throws IOException 3079 */ 3080 DataOutputBuffer getKey() throws IOException; 3081 /** Gets the current raw value 3082 * @return ValueBytes 3083 * @throws IOException 3084 */ 3085 ValueBytes getValue() throws IOException; 3086 /** Sets up the current key and value (for getKey and getValue) 3087 * @return true if there exists a key/value, false otherwise 3088 * @throws IOException 3089 */ 3090 boolean next() throws IOException; 3091 /** closes the iterator so that the underlying streams can be closed 3092 * @throws IOException 3093 */ 3094 void close() throws IOException; 3095 /** Gets the Progress object; this has a float (0.0 - 1.0) 3096 * indicating the bytes processed by the iterator so far 3097 */ 3098 Progress getProgress(); 3099 } 3100 3101 /** 3102 * Merges the list of segments of type <code>SegmentDescriptor</code> 3103 * @param segments the list of SegmentDescriptors 3104 * @param tmpDir the directory to write temporary files into 3105 * @return RawKeyValueIterator 3106 * @throws IOException 3107 */ 3108 public RawKeyValueIterator merge(List <SegmentDescriptor> segments, 3109 Path tmpDir) 3110 throws IOException { 3111 // pass in object to report progress, if present 3112 MergeQueue mQueue = new MergeQueue(segments, tmpDir, progressable); 3113 return mQueue.merge(); 3114 } 3115 3116 /** 3117 * Merges the contents of files passed in Path[] using a max factor value 3118 * that is already set 3119 * @param inNames the array of path names 3120 * @param deleteInputs true if the input files should be deleted when 3121 * unnecessary 3122 * @param tmpDir the directory to write temporary files into 3123 * @return RawKeyValueIterator 3124 * @throws IOException 3125 */ 3126 public RawKeyValueIterator merge(Path [] inNames, boolean deleteInputs, 3127 Path tmpDir) 3128 throws IOException { 3129 return merge(inNames, deleteInputs, 3130 (inNames.length < factor) ?
inNames.length : factor, 3131 tmpDir); 3132 } 3133 3134 /** 3135 * Merges the contents of files passed in Path[] 3136 * @param inNames the array of path names 3137 * @param deleteInputs true if the input files should be deleted when 3138 * unnecessary 3139 * @param factor the factor that will be used as the maximum merge fan-in 3140 * @param tmpDir the directory to write temporary files into 3141 * @return RawKeyValueIterator 3142 * @throws IOException 3143 */ 3144 public RawKeyValueIterator merge(Path [] inNames, boolean deleteInputs, 3145 int factor, Path tmpDir) 3146 throws IOException { 3147 //get the segments from inNames 3148 ArrayList <SegmentDescriptor> a = new ArrayList <SegmentDescriptor>(); 3149 for (int i = 0; i < inNames.length; i++) { 3150 SegmentDescriptor s = new SegmentDescriptor(0, 3151 fs.getFileStatus(inNames[i]).getLen(), inNames[i]); 3152 s.preserveInput(!deleteInputs); 3153 s.doSync(); 3154 a.add(s); 3155 } 3156 this.factor = factor; 3157 MergeQueue mQueue = new MergeQueue(a, tmpDir, progressable); 3158 return mQueue.merge(); 3159 } 3160 3161 /** 3162 * Merges the contents of files passed in Path[] 3163 * @param inNames the array of path names 3164 * @param tempDir the directory for creating temp files during merge 3165 * @param deleteInputs true if the input files should be deleted when 3166 * unnecessary 3167 * @return RawKeyValueIterator 3168 * @throws IOException 3169 */ 3170 public RawKeyValueIterator merge(Path [] inNames, Path tempDir, 3171 boolean deleteInputs) 3172 throws IOException { 3173 //outFile will basically be used as prefix for temp files for the 3174 //intermediate merge outputs 3175 this.outFile = new Path(tempDir + Path.SEPARATOR + "merged"); 3176 //get the segments from inNames 3177 ArrayList <SegmentDescriptor> a = new ArrayList <SegmentDescriptor>(); 3178 for (int i = 0; i < inNames.length; i++) { 3179 SegmentDescriptor s = new SegmentDescriptor(0, 3180 fs.getFileStatus(inNames[i]).getLen(), inNames[i]); 3181 s.preserveInput(!deleteInputs); 3182 s.doSync(); 3183 a.add(s); 3184 } 3185 factor = (inNames.length < factor) ?
inNames.length : factor; 3186 // pass in object to report progress, if present 3187 MergeQueue mQueue = new MergeQueue(a, tempDir, progressable); 3188 return mQueue.merge(); 3189 } 3190 3191 /** 3192 * Clones the attributes (like compression) of the input file and creates a 3193 * corresponding Writer 3194 * @param inputFile the path of the input file whose attributes should be 3195 * cloned 3196 * @param outputFile the path of the output file 3197 * @param prog the Progressable to report status during the file write 3198 * @return Writer 3199 * @throws IOException 3200 */ 3201 public Writer cloneFileAttributes(Path inputFile, Path outputFile, 3202 Progressable prog) throws IOException { 3203 Reader reader = new Reader(conf, 3204 Reader.file(inputFile), 3205 new Reader.OnlyHeaderOption()); 3206 CompressionType compress = reader.getCompressionType(); 3207 CompressionCodec codec = reader.getCompressionCodec(); 3208 reader.close(); 3209 3210 Writer writer = createWriter(conf, 3211 Writer.file(outputFile), 3212 Writer.keyClass(keyClass), 3213 Writer.valueClass(valClass), 3214 Writer.compression(compress, codec), 3215 Writer.progressable(prog)); 3216 return writer; 3217 } 3218 3219 /** 3220 * Writes records from RawKeyValueIterator into a file represented by the 3221 * passed writer 3222 * @param records the RawKeyValueIterator 3223 * @param writer the Writer created earlier 3224 * @throws IOException 3225 */ 3226 public void writeFile(RawKeyValueIterator records, Writer writer) 3227 throws IOException { 3228 while(records.next()) { 3229 writer.appendRaw(records.getKey().getData(), 0, 3230 records.getKey().getLength(), records.getValue()); 3231 } 3232 writer.sync(); 3233 } 3234 3235 /** Merge the provided files. 3236 * @param inFiles the array of input path names 3237 * @param outFile the final output file 3238 * @throws IOException 3239 */ 3240 public void merge(Path[] inFiles, Path outFile) throws IOException { 3241 if (fs.exists(outFile)) { 3242 throw new IOException("already exists: " + outFile); 3243 } 3244 RawKeyValueIterator r = merge(inFiles, false, outFile.getParent()); 3245 Writer writer = cloneFileAttributes(inFiles[0], outFile, null); 3246 3247 writeFile(r, writer); 3248 3249 writer.close(); 3250 } 3251 3252 /** sort calls this to generate the final merged output */ 3253 private int mergePass(Path tmpDir) throws IOException { 3254 if(LOG.isDebugEnabled()) { 3255 LOG.debug("running merge pass"); 3256 } 3257 Writer writer = cloneFileAttributes( 3258 outFile.suffix(".0"), outFile, null); 3259 RawKeyValueIterator r = merge(outFile.suffix(".0"), 3260 outFile.suffix(".0.index"), tmpDir); 3261 writeFile(r, writer); 3262 3263 writer.close(); 3264 return 0; 3265 } 3266 3267 /** Used by mergePass to merge the output of the sort 3268 * @param inName the name of the input file containing sorted segments 3269 * @param indexIn the offsets of the sorted segments 3270 * @param tmpDir the relative directory to store intermediate results in 3271 * @return RawKeyValueIterator 3272 * @throws IOException 3273 */ 3274 private RawKeyValueIterator merge(Path inName, Path indexIn, Path tmpDir) 3275 throws IOException { 3276 //get the segments from indexIn 3277 //we create a SegmentContainer so that we can track segments belonging to 3278 //inName and delete inName as soon as we see that we have looked at all 3279 //the contained segments during the merge process & hence don't need 3280 //them anymore 3281 SegmentContainer container = new SegmentContainer(inName, indexIn); 3282 MergeQueue mQueue = new
MergeQueue(container.getSegmentList(), tmpDir, progressable); 3283 return mQueue.merge(); 3284 } 3285 3286 /** This class implements the core of the merge logic */ 3287 private class MergeQueue extends PriorityQueue 3288 implements RawKeyValueIterator { 3289 private boolean compress; 3290 private boolean blockCompress; 3291 private DataOutputBuffer rawKey = new DataOutputBuffer(); 3292 private ValueBytes rawValue; 3293 private long totalBytesProcessed; 3294 private float progPerByte; 3295 private Progress mergeProgress = new Progress(); 3296 private Path tmpDir; 3297 private Progressable progress = null; //handle to the progress reporting object 3298 private SegmentDescriptor minSegment; 3299 3300 //a TreeMap used to store the segments sorted by size (segment offset and 3301 //segment path name is used to break ties between segments of same sizes) 3302 private Map<SegmentDescriptor, Void> sortedSegmentSizes = 3303 new TreeMap<SegmentDescriptor, Void>(); 3304 3305 @SuppressWarnings("unchecked") 3306 public void put(SegmentDescriptor stream) throws IOException { 3307 if (size() == 0) { 3308 compress = stream.in.isCompressed(); 3309 blockCompress = stream.in.isBlockCompressed(); 3310 } else if (compress != stream.in.isCompressed() || 3311 blockCompress != stream.in.isBlockCompressed()) { 3312 throw new IOException("All merged files must be compressed or not."); 3313 } 3314 super.put(stream); 3315 } 3316 3317 /** 3318 * A queue of file segments to merge 3319 * @param segments the file segments to merge 3320 * @param tmpDir a relative local directory to save intermediate files in 3321 * @param progress the reference to the Progressable object 3322 */ 3323 public MergeQueue(List <SegmentDescriptor> segments, 3324 Path tmpDir, Progressable progress) { 3325 int size = segments.size(); 3326 for (int i = 0; i < size; i++) { 3327 sortedSegmentSizes.put(segments.get(i), null); 3328 } 3329 this.tmpDir = tmpDir; 3330 this.progress = progress; 3331 } 3332 @Override 3333 protected boolean lessThan(Object a, Object b) { 3334 // indicate we're making progress 3335 if (progress != null) { 3336 progress.progress(); 3337 } 3338 SegmentDescriptor msa = (SegmentDescriptor)a; 3339 SegmentDescriptor msb = (SegmentDescriptor)b; 3340 return comparator.compare(msa.getKey().getData(), 0, 3341 msa.getKey().getLength(), msb.getKey().getData(), 0, 3342 msb.getKey().getLength()) < 0; 3343 } 3344 @Override 3345 public void close() throws IOException { 3346 SegmentDescriptor ms; // close inputs 3347 while ((ms = (SegmentDescriptor)pop()) != null) { 3348 ms.cleanup(); 3349 } 3350 minSegment = null; 3351 } 3352 @Override 3353 public DataOutputBuffer getKey() throws IOException { 3354 return rawKey; 3355 } 3356 @Override 3357 public ValueBytes getValue() throws IOException { 3358 return rawValue; 3359 } 3360 @Override 3361 public boolean next() throws IOException { 3362 if (size() == 0) 3363 return false; 3364 if (minSegment != null) { 3365 //minSegment is non-null for all invocations of next except the first 3366 //one. For the first invocation, the priority queue is ready for use 3367 //but for the subsequent invocations, first adjust the queue 3368 adjustPriorityQueue(minSegment); 3369 if (size() == 0) { 3370 minSegment = null; 3371 return false; 3372 } 3373 } 3374 minSegment = (SegmentDescriptor)top(); 3375 long startPos = minSegment.in.getPosition(); // Current position in stream 3376 //save the raw key reference 3377 rawKey = minSegment.getKey(); 3378 //load the raw value. 
Re-use the existing rawValue buffer 3379 if (rawValue == null) { 3380 rawValue = minSegment.in.createValueBytes(); 3381 } 3382 minSegment.nextRawValue(rawValue); 3383 long endPos = minSegment.in.getPosition(); // End position after reading value 3384 updateProgress(endPos - startPos); 3385 return true; 3386 } 3387 3388 @Override 3389 public Progress getProgress() { 3390 return mergeProgress; 3391 } 3392 3393 private void adjustPriorityQueue(SegmentDescriptor ms) throws IOException{ 3394 long startPos = ms.in.getPosition(); // Current position in stream 3395 boolean hasNext = ms.nextRawKey(); 3396 long endPos = ms.in.getPosition(); // End position after reading key 3397 updateProgress(endPos - startPos); 3398 if (hasNext) { 3399 adjustTop(); 3400 } else { 3401 pop(); 3402 ms.cleanup(); 3403 } 3404 } 3405 3406 private void updateProgress(long bytesProcessed) { 3407 totalBytesProcessed += bytesProcessed; 3408 if (progPerByte > 0) { 3409 mergeProgress.set(totalBytesProcessed * progPerByte); 3410 } 3411 } 3412 3413 /** This is the single level merge that is called multiple times 3414 * depending on the factor size and the number of segments 3415 * @return RawKeyValueIterator 3416 * @throws IOException 3417 */ 3418 public RawKeyValueIterator merge() throws IOException { 3419 //create the MergeStreams from the sorted map created in the constructor 3420 //and dump the final output to a file 3421 int numSegments = sortedSegmentSizes.size(); 3422 int origFactor = factor; 3423 int passNo = 1; 3424 LocalDirAllocator lDirAlloc = new LocalDirAllocator("io.seqfile.local.dir"); 3425 do { 3426 //get the factor for this pass of merge 3427 factor = getPassFactor(passNo, numSegments); 3428 List<SegmentDescriptor> segmentsToMerge = 3429 new ArrayList<SegmentDescriptor>(); 3430 int segmentsConsidered = 0; 3431 int numSegmentsToConsider = factor; 3432 while (true) { 3433 //extract the smallest 'factor' number of segment pointers from the 3434 //TreeMap. Call cleanup on the empty segments (no key/value data) 3435 SegmentDescriptor[] mStream = 3436 getSegmentDescriptors(numSegmentsToConsider); 3437 for (int i = 0; i < mStream.length; i++) { 3438 if (mStream[i].nextRawKey()) { 3439 segmentsToMerge.add(mStream[i]); 3440 segmentsConsidered++; 3441 // Count the fact that we read some bytes in calling nextRawKey() 3442 updateProgress(mStream[i].in.getPosition()); 3443 } 3444 else { 3445 mStream[i].cleanup(); 3446 numSegments--; //we ignore this segment for the merge 3447 } 3448 } 3449 //if we have the desired number of segments 3450 //or looked at all available segments, we break 3451 if (segmentsConsidered == factor || 3452 sortedSegmentSizes.size() == 0) { 3453 break; 3454 } 3455 3456 numSegmentsToConsider = factor - segmentsConsidered; 3457 } 3458 //feed the streams to the priority queue 3459 initialize(segmentsToMerge.size()); clear(); 3460 for (int i = 0; i < segmentsToMerge.size(); i++) { 3461 put(segmentsToMerge.get(i)); 3462 } 3463 //if we have lesser number of segments remaining, then just return the 3464 //iterator, else do another single level merge 3465 if (numSegments <= factor) { 3466 //calculate the length of the remaining segments. 
Required for 3467 //calculating the merge progress 3468 long totalBytes = 0; 3469 for (int i = 0; i < segmentsToMerge.size(); i++) { 3470 totalBytes += segmentsToMerge.get(i).segmentLength; 3471 } 3472 if (totalBytes != 0) //being paranoid 3473 progPerByte = 1.0f / (float)totalBytes; 3474 //reset factor to what it originally was 3475 factor = origFactor; 3476 return this; 3477 } else { 3478 //we want to spread the creation of temp files on multiple disks if 3479 //available under the space constraints 3480 long approxOutputSize = 0; 3481 for (SegmentDescriptor s : segmentsToMerge) { 3482 approxOutputSize += s.segmentLength + 3483 ChecksumFileSystem.getApproxChkSumLength( 3484 s.segmentLength); 3485 } 3486 Path tmpFilename = 3487 new Path(tmpDir, "intermediate").suffix("." + passNo); 3488 3489 Path outputFile = lDirAlloc.getLocalPathForWrite( 3490 tmpFilename.toString(), 3491 approxOutputSize, conf); 3492 if(LOG.isDebugEnabled()) { 3493 LOG.debug("writing intermediate results to " + outputFile); 3494 } 3495 Writer writer = cloneFileAttributes( 3496 fs.makeQualified(segmentsToMerge.get(0).segmentPathName), 3497 fs.makeQualified(outputFile), null); 3498 writer.sync = null; //disable sync for temp files 3499 writeFile(this, writer); 3500 writer.close(); 3501 3502 //we finished one single level merge; now clean up the priority 3503 //queue 3504 this.close(); 3505 3506 SegmentDescriptor tempSegment = 3507 new SegmentDescriptor(0, 3508 fs.getFileStatus(outputFile).getLen(), outputFile); 3509 //put the segment back in the TreeMap 3510 sortedSegmentSizes.put(tempSegment, null); 3511 numSegments = sortedSegmentSizes.size(); 3512 passNo++; 3513 } 3514 //we are worried about only the first pass merge factor. So reset the 3515 //factor to what it originally was 3516 factor = origFactor; 3517 } while(true); 3518 } 3519 3520 //Hadoop-591 3521 public int getPassFactor(int passNo, int numSegments) { 3522 if (passNo > 1 || numSegments <= factor || factor == 1) 3523 return factor; 3524 int mod = (numSegments - 1) % (factor - 1); 3525 if (mod == 0) 3526 return factor; 3527 return mod + 1; 3528 } 3529 3530 /** Return (& remove) the requested number of segment descriptors from the 3531 * sorted map. 3532 */ 3533 public SegmentDescriptor[] getSegmentDescriptors(int numDescriptors) { 3534 if (numDescriptors > sortedSegmentSizes.size()) 3535 numDescriptors = sortedSegmentSizes.size(); 3536 SegmentDescriptor[] SegmentDescriptors = 3537 new SegmentDescriptor[numDescriptors]; 3538 Iterator iter = sortedSegmentSizes.keySet().iterator(); 3539 int i = 0; 3540 while (i < numDescriptors) { 3541 SegmentDescriptors[i++] = (SegmentDescriptor)iter.next(); 3542 iter.remove(); 3543 } 3544 return SegmentDescriptors; 3545 } 3546 } // SequenceFile.Sorter.MergeQueue 3547 3548 /** This class defines a merge segment. This class can be subclassed to 3549 * provide a customized cleanup method implementation. In this 3550 * implementation, cleanup closes the file handle and deletes the file 3551 */ 3552 public class SegmentDescriptor implements Comparable { 3553 3554 long segmentOffset; //the start of the segment in the file 3555 long segmentLength; //the length of the segment 3556 Path segmentPathName; //the path name of the file containing the segment 3557 boolean ignoreSync = true; //set to true for temp files 3558 private Reader in = null; 3559 private DataOutputBuffer rawKey = null; //this will hold the current key 3560 private boolean preserveInput = false; //delete input segment files? 
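      // Note: compareTo() orders segments by length, then offset, then
      // path name; MergeQueue's TreeMap of segments therefore always
      // yields the smallest remaining segments first.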

    /** Constructs a segment.
     * @param segmentOffset the offset of the segment in the file
     * @param segmentLength the length of the segment
     * @param segmentPathName the path name of the file containing the segment
     */
    public SegmentDescriptor(long segmentOffset, long segmentLength,
                             Path segmentPathName) {
      this.segmentOffset = segmentOffset;
      this.segmentLength = segmentLength;
      this.segmentPathName = segmentPathName;
    }

    /** Re-enable the sync checks that are skipped by default (temp files). */
    public void doSync() { ignoreSync = false; }

    /** Whether to preserve the input segment files, rather than delete them,
     * once they are no longer needed. */
    public void preserveInput(boolean preserve) {
      preserveInput = preserve;
    }

    public boolean shouldPreserveInput() {
      return preserveInput;
    }

    @Override
    public int compareTo(Object o) {
      SegmentDescriptor that = (SegmentDescriptor)o;
      if (this.segmentLength != that.segmentLength) {
        return (this.segmentLength < that.segmentLength ? -1 : 1);
      }
      if (this.segmentOffset != that.segmentOffset) {
        return (this.segmentOffset < that.segmentOffset ? -1 : 1);
      }
      return (this.segmentPathName.toString()).
        compareTo(that.segmentPathName.toString());
    }

    @Override
    public boolean equals(Object o) {
      if (!(o instanceof SegmentDescriptor)) {
        return false;
      }
      SegmentDescriptor that = (SegmentDescriptor)o;
      return this.segmentLength == that.segmentLength &&
             this.segmentOffset == that.segmentOffset &&
             this.segmentPathName.toString().equals(
               that.segmentPathName.toString());
    }

    @Override
    public int hashCode() {
      return 37 * 17 + (int) (segmentOffset^(segmentOffset>>>32));
    }

    /** Fills up the rawKey object with the key returned by the Reader.
     * @return true if there is a key returned; false otherwise
     * @throws IOException
     */
    public boolean nextRawKey() throws IOException {
      if (in == null) {
        int bufferSize = getBufferSize(conf);
        Reader reader = new Reader(conf,
                                   Reader.file(segmentPathName),
                                   Reader.bufferSize(bufferSize),
                                   Reader.start(segmentOffset),
                                   Reader.length(segmentLength));

        //sometimes we ignore syncs, especially for temp merge files
        if (ignoreSync) reader.ignoreSync();

        if (reader.getKeyClass() != keyClass)
          throw new IOException("wrong key class: " + reader.getKeyClass() +
                                " is not " + keyClass);
        if (reader.getValueClass() != valClass)
          throw new IOException("wrong value class: " + reader.getValueClass() +
                                " is not " + valClass);
        this.in = reader;
        rawKey = new DataOutputBuffer();
      }
      rawKey.reset();
      int keyLength = in.nextRawKey(rawKey);
      return (keyLength >= 0);
    }

    /** Fills up the passed rawValue with the value corresponding to the key
     * read earlier.
     * @param rawValue the buffer to fill
     * @return the length of the value
     * @throws IOException
     */
    public int nextRawValue(ValueBytes rawValue) throws IOException {
      return in.nextRawValue(rawValue);
    }

    /** Returns the stored rawKey. */
    public DataOutputBuffer getKey() {
      return rawKey;
    }

    /** Closes the underlying reader. */
    private void close() throws IOException {
      this.in.close();
      this.in = null;
    }
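    // The cleanup() below is the extension point the class Javadoc mentions.
    // A minimal sketch (hypothetical inner subclass, not part of this file)
    // that moves finished segment files aside instead of deleting them, in
    // the spirit of LinkedSegmentsDescriptor further down:
    //
    //   private class RenamingSegmentDescriptor extends SegmentDescriptor {
    //     RenamingSegmentDescriptor(long off, long len, Path path) {
    //       super(off, len, path);
    //     }
    //     @Override
    //     public void cleanup() throws IOException {
    //       super.close(); // release the file handle first
    //       fs.rename(segmentPathName, segmentPathName.suffix(".merged"));
    //     }
    //   }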
    /** The default cleanup. Subclasses can override this with a custom
     * cleanup.
     */
    public void cleanup() throws IOException {
      close();
      if (!preserveInput) {
        fs.delete(segmentPathName, true);
      }
    }
  } // SequenceFile.Sorter.SegmentDescriptor

  /** This class provisions multiple segments contained within a single
   * file.
   */
  private class LinkedSegmentsDescriptor extends SegmentDescriptor {

    SegmentContainer parentContainer = null;

    /** Constructs a segment.
     * @param segmentOffset the offset of the segment in the file
     * @param segmentLength the length of the segment
     * @param segmentPathName the path name of the file containing the segment
     * @param parent the parent SegmentContainer that holds the segment
     */
    public LinkedSegmentsDescriptor(long segmentOffset, long segmentLength,
                                    Path segmentPathName,
                                    SegmentContainer parent) {
      super(segmentOffset, segmentLength, segmentPathName);
      this.parentContainer = parent;
    }

    /** Closes the reader and lets the parent container decide when the
     * shared file can be deleted.
     */
    @Override
    public void cleanup() throws IOException {
      super.close();
      if (super.shouldPreserveInput()) return;
      parentContainer.cleanup();
    }

    @Override
    public boolean equals(Object o) {
      if (!(o instanceof LinkedSegmentsDescriptor)) {
        return false;
      }
      return super.equals(o);
    }
  } //SequenceFile.Sorter.LinkedSegmentsDescriptor

  /** The class that defines a container for segments to be merged. Primarily
   * required to delete temp files as soon as all the contained segments
   * have been looked at. */
  private class SegmentContainer {
    private int numSegmentsCleanedUp = 0; //track the no. of segment cleanups
    private int numSegmentsContained;     //# of segments contained
    private Path inName;                  //input file from where segments are created

    //the list of segments read from the file
    private ArrayList<SegmentDescriptor> segments =
      new ArrayList<SegmentDescriptor>();

    /** This constructor is there primarily to serve the sort routine that
     * generates a single output file with an associated index file. */
    public SegmentContainer(Path inName, Path indexIn) throws IOException {
      //get the segments from indexIn
      FSDataInputStream fsIndexIn = fs.open(indexIn);
      long end = fs.getFileStatus(indexIn).getLen();
      while (fsIndexIn.getPos() < end) {
        long segmentOffset = WritableUtils.readVLong(fsIndexIn);
        long segmentLength = WritableUtils.readVLong(fsIndexIn);
        Path segmentName = inName;
        segments.add(new LinkedSegmentsDescriptor(segmentOffset,
                                                  segmentLength,
                                                  segmentName, this));
      }
      fsIndexIn.close();
      fs.delete(indexIn, true);
      numSegmentsContained = segments.size();
      this.inName = inName;
    }

    public List<SegmentDescriptor> getSegmentList() {
      return segments;
    }

    public void cleanup() throws IOException {
      numSegmentsCleanedUp++;
      if (numSegmentsCleanedUp == numSegmentsContained) {
        fs.delete(inName, true);
      }
    }
  } //SequenceFile.Sorter.SegmentContainer

} // SequenceFile.Sorter

} // SequenceFile
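// Usage sketch (illustrative only; it assumes the Sorter constructor and the
// merge(Path[], Path) convenience method declared earlier in this file, and
// the input/output paths are hypothetical):
//
//   Configuration conf = new Configuration();
//   FileSystem fs = FileSystem.getLocal(conf);
//   SequenceFile.Sorter sorter =
//       new SequenceFile.Sorter(fs, Text.class, IntWritable.class, conf);
//   //merge several already-sorted SequenceFiles; MergeQueue.merge() above
//   //runs one single-level merge per pass until at most 'factor' segments
//   //remain
//   sorter.merge(new Path[] { new Path("part-0"), new Path("part-1") },
//                new Path("merged.seq"));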