/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.io;

import java.io.*;
import java.util.*;
import java.rmi.server.UID;
import java.security.MessageDigest;

import org.apache.commons.io.Charsets;
import org.apache.commons.logging.*;
import org.apache.hadoop.util.Options;
import org.apache.hadoop.fs.*;
import org.apache.hadoop.fs.Options.CreateOpts;
import org.apache.hadoop.io.compress.CodecPool;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.CompressionInputStream;
import org.apache.hadoop.io.compress.CompressionOutputStream;
import org.apache.hadoop.io.compress.Compressor;
import org.apache.hadoop.io.compress.Decompressor;
import org.apache.hadoop.io.compress.DefaultCodec;
import org.apache.hadoop.io.compress.GzipCodec;
import org.apache.hadoop.io.compress.zlib.ZlibFactory;
import org.apache.hadoop.io.serializer.Deserializer;
import org.apache.hadoop.io.serializer.Serializer;
import org.apache.hadoop.io.serializer.SerializationFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.*;
import org.apache.hadoop.util.Progressable;
import org.apache.hadoop.util.Progress;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hadoop.util.NativeCodeLoader;
import org.apache.hadoop.util.MergeSort;
import org.apache.hadoop.util.PriorityQueue;
import org.apache.hadoop.util.Time;

/**
 * <code>SequenceFile</code>s are flat files consisting of binary key/value
 * pairs.
 *
 * <p><code>SequenceFile</code> provides {@link SequenceFile.Writer},
 * {@link SequenceFile.Reader} and {@link Sorter} classes for writing,
 * reading and sorting respectively.</p>
 *
 * There are three <code>SequenceFile</code> <code>Writer</code>s based on the
 * {@link CompressionType} used to compress key/value pairs:
 * <ol>
 *   <li>
 *   <code>Writer</code> : Uncompressed records.
 *   </li>
 *   <li>
 *   <code>RecordCompressWriter</code> : Record-compressed files, only compress
 *                                       values.
 *   </li>
 *   <li>
 *   <code>BlockCompressWriter</code> : Block-compressed files, both keys and
 *                                      values are collected in 'blocks'
 *                                      separately and compressed. The size of
 *                                      the 'block' is configurable.
 *   </li>
 * </ol>
 *
 * <p>The actual compression algorithm used to compress keys and/or values can
 * be specified by using the appropriate {@link CompressionCodec}.</p>
 *
 * <p>The recommended way is to use the static <code>createWriter</code>
 * methods provided by <code>SequenceFile</code> to choose the preferred
 * format.</p>
 *
 * <p>The {@link SequenceFile.Reader} acts as the bridge and can read any of
 * the above <code>SequenceFile</code> formats.</p>
 *
 * <h4 id="Formats">SequenceFile Formats</h4>
 *
 * <p>Essentially there are three different formats for
 * <code>SequenceFile</code>s depending on the <code>CompressionType</code>
 * specified. All of them share a <a href="#Header">common header</a>
 * described below.
 *
 * <h5 id="Header">SequenceFile Header</h5>
 * <ul>
 *   <li>
 *   version - 3 bytes of magic header <b>SEQ</b>, followed by 1 byte of actual
 *             version number (e.g. SEQ4 or SEQ6)
 *   </li>
 *   <li>
 *   keyClassName - key class
 *   </li>
 *   <li>
 *   valueClassName - value class
 *   </li>
 *   <li>
 *   compression - A boolean which specifies if compression is turned on for
 *                 keys/values in this file.
 *   </li>
 *   <li>
 *   blockCompression - A boolean which specifies if block-compression is
 *                      turned on for keys/values in this file.
 *   </li>
 *   <li>
 *   compression codec - <code>CompressionCodec</code> class which is used for
 *                       compression of keys and/or values (if compression is
 *                       enabled).
 *   </li>
 *   <li>
 *   metadata - {@link Metadata} for this file.
 *   </li>
 *   <li>
 *   sync - A sync marker to denote end of the header.
 *   </li>
 * </ul>
 *
 * <h5 id="UncompressedFormat">Uncompressed SequenceFile Format</h5>
 * <ul>
 *   <li>
 *   <a href="#Header">Header</a>
 *   </li>
 *   <li>
 *   Record
 *   <ul>
 *     <li>Record length</li>
 *     <li>Key length</li>
 *     <li>Key</li>
 *     <li>Value</li>
 *   </ul>
 *   </li>
 *   <li>
 *   A sync-marker approximately every {@link #SYNC_INTERVAL} bytes.
 *   </li>
 * </ul>
 *
 * <h5 id="RecordCompressedFormat">Record-Compressed SequenceFile Format</h5>
 * <ul>
 *   <li>
 *   <a href="#Header">Header</a>
 *   </li>
 *   <li>
 *   Record
 *   <ul>
 *     <li>Record length</li>
 *     <li>Key length</li>
 *     <li>Key</li>
 *     <li><i>Compressed</i> Value</li>
 *   </ul>
 *   </li>
 *   <li>
 *   A sync-marker approximately every {@link #SYNC_INTERVAL} bytes.
 *   </li>
 * </ul>
 *
 * <h5 id="BlockCompressedFormat">Block-Compressed SequenceFile Format</h5>
 * <ul>
 *   <li>
 *   <a href="#Header">Header</a>
 *   </li>
 *   <li>
 *   Record <i>Block</i>
 *   <ul>
 *     <li>Uncompressed number of records in the block</li>
 *     <li>Compressed key-lengths block-size</li>
 *     <li>Compressed key-lengths block</li>
 *     <li>Compressed keys block-size</li>
 *     <li>Compressed keys block</li>
 *     <li>Compressed value-lengths block-size</li>
 *     <li>Compressed value-lengths block</li>
 *     <li>Compressed values block-size</li>
 *     <li>Compressed values block</li>
 *   </ul>
 *   </li>
 *   <li>
 *   A sync-marker every block.
 *   </li>
 * </ul>
 *
 * <p>The compressed blocks of key lengths and value lengths consist of the
 * actual lengths of individual keys/values encoded in ZeroCompressedInteger
 * format.</p>
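 *
 * <p>For example, a file can be written and read back as follows. This is a
 * minimal sketch only; the path and the <code>IntWritable</code>/<code>Text</code>
 * key/value types are illustrative, not required by this class:</p>
 *
 * <pre>
 * Configuration conf = new Configuration();
 * Path path = new Path("/tmp/example.seq");
 *
 * SequenceFile.Writer writer = SequenceFile.createWriter(conf,
 *     SequenceFile.Writer.file(path),
 *     SequenceFile.Writer.keyClass(IntWritable.class),
 *     SequenceFile.Writer.valueClass(Text.class),
 *     SequenceFile.Writer.compression(SequenceFile.CompressionType.BLOCK));
 * try {
 *   writer.append(new IntWritable(1), new Text("one"));
 * } finally {
 *   writer.close();
 * }
 *
 * SequenceFile.Reader reader =
 *     new SequenceFile.Reader(conf, SequenceFile.Reader.file(path));
 * try {
 *   IntWritable key = new IntWritable();
 *   Text value = new Text();
 *   while (reader.next(key, value)) {
 *     System.out.println(key + "\t" + value);
 *   }
 * } finally {
 *   reader.close();
 * }
 * </pre>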
 *
 * @see CompressionCodec
 */
@InterfaceAudience.Public
@InterfaceStability.Stable
public class SequenceFile {
  private static final Log LOG = LogFactory.getLog(SequenceFile.class);

  private SequenceFile() {}                         // no public ctor

  private static final byte BLOCK_COMPRESS_VERSION = (byte)4;
  private static final byte CUSTOM_COMPRESS_VERSION = (byte)5;
  private static final byte VERSION_WITH_METADATA = (byte)6;
  private static byte[] VERSION = new byte[] {
    (byte)'S', (byte)'E', (byte)'Q', VERSION_WITH_METADATA
  };

  private static final int SYNC_ESCAPE = -1;      // "length" of sync entries
  private static final int SYNC_HASH_SIZE = 16;   // number of bytes in hash
  private static final int SYNC_SIZE = 4+SYNC_HASH_SIZE; // escape + hash

  /** The number of bytes between sync points. */
  public static final int SYNC_INTERVAL = 100*SYNC_SIZE;

  /**
   * The compression type used to compress key/value pairs in the
   * {@link SequenceFile}.
   *
   * @see SequenceFile.Writer
   */
  public static enum CompressionType {
    /** Do not compress records. */
    NONE,
    /** Compress values only, each separately. */
    RECORD,
    /** Compress sequences of records together in blocks. */
    BLOCK
  }

  /**
   * Get the default compression type for sequence files.
   * @param job the job config to look in
   * @return the kind of compression to use
   */
  public static CompressionType getDefaultCompressionType(Configuration job) {
    String name = job.get("io.seqfile.compression.type");
    return name == null ? CompressionType.RECORD :
      CompressionType.valueOf(name);
  }

  /**
   * Set the default compression type for sequence files.
   * @param job the configuration to modify
   * @param val the new compression type (none, block, record)
   */
  public static void setDefaultCompressionType(Configuration job,
                                               CompressionType val) {
    job.set("io.seqfile.compression.type", val.toString());
  }

  /**
   * Create a new Writer with the given options.
   * @param conf the configuration to use
   * @param opts the options to create the file with
   * @return a new Writer
   * @throws IOException
   */
  public static Writer createWriter(Configuration conf, Writer.Option... opts
                                    ) throws IOException {
    Writer.CompressionOption compressionOption =
      Options.getOption(Writer.CompressionOption.class, opts);
    CompressionType kind;
    if (compressionOption != null) {
      kind = compressionOption.getValue();
    } else {
      kind = getDefaultCompressionType(conf);
      opts = Options.prependOptions(opts, Writer.compression(kind));
    }
    switch (kind) {
      default:
      case NONE:
        return new Writer(conf, opts);
      case RECORD:
        return new RecordCompressWriter(conf, opts);
      case BLOCK:
        return new BlockCompressWriter(conf, opts);
    }
  }
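
  // Illustrative only: when no compression option is passed, the default
  // from "io.seqfile.compression.type" is prepended above, so under a
  // default of CompressionType.BLOCK the two calls below produce the same
  // kind of writer (the path and key/value classes are hypothetical):
  //
  //   setDefaultCompressionType(conf, CompressionType.BLOCK);
  //   Writer a = createWriter(conf, Writer.file(path),
  //       Writer.keyClass(Text.class), Writer.valueClass(Text.class));
  //   Writer b = createWriter(conf, Writer.file(path),
  //       Writer.keyClass(Text.class), Writer.valueClass(Text.class),
  //       Writer.compression(CompressionType.BLOCK));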

  /**
   * Construct the preferred type of SequenceFile Writer.
   * @param fs The configured filesystem.
   * @param conf The configuration.
   * @param name The name of the file.
   * @param keyClass The 'key' type.
   * @param valClass The 'value' type.
   * @return Returns the handle to the constructed SequenceFile Writer.
   * @throws IOException
   * @deprecated Use {@link #createWriter(Configuration, Writer.Option...)}
   *     instead.
   */
  @Deprecated
  public static Writer
    createWriter(FileSystem fs, Configuration conf, Path name,
                 Class keyClass, Class valClass) throws IOException {
    return createWriter(conf, Writer.filesystem(fs),
                        Writer.file(name), Writer.keyClass(keyClass),
                        Writer.valueClass(valClass));
  }

  /**
   * Construct the preferred type of SequenceFile Writer.
   * @param fs The configured filesystem.
   * @param conf The configuration.
   * @param name The name of the file.
   * @param keyClass The 'key' type.
   * @param valClass The 'value' type.
   * @param compressionType The compression type.
   * @return Returns the handle to the constructed SequenceFile Writer.
   * @throws IOException
   * @deprecated Use {@link #createWriter(Configuration, Writer.Option...)}
   *     instead.
   */
  @Deprecated
  public static Writer
    createWriter(FileSystem fs, Configuration conf, Path name,
                 Class keyClass, Class valClass,
                 CompressionType compressionType) throws IOException {
    return createWriter(conf, Writer.filesystem(fs),
                        Writer.file(name), Writer.keyClass(keyClass),
                        Writer.valueClass(valClass),
                        Writer.compression(compressionType));
  }

  /**
   * Construct the preferred type of SequenceFile Writer.
   * @param fs The configured filesystem.
   * @param conf The configuration.
   * @param name The name of the file.
   * @param keyClass The 'key' type.
   * @param valClass The 'value' type.
   * @param compressionType The compression type.
   * @param progress The Progressable object to track progress.
   * @return Returns the handle to the constructed SequenceFile Writer.
   * @throws IOException
   * @deprecated Use {@link #createWriter(Configuration, Writer.Option...)}
   *     instead.
   */
  @Deprecated
  public static Writer
    createWriter(FileSystem fs, Configuration conf, Path name,
                 Class keyClass, Class valClass, CompressionType compressionType,
                 Progressable progress) throws IOException {
    return createWriter(conf, Writer.file(name),
                        Writer.filesystem(fs),
                        Writer.keyClass(keyClass),
                        Writer.valueClass(valClass),
                        Writer.compression(compressionType),
                        Writer.progressable(progress));
  }

  /**
   * Construct the preferred type of SequenceFile Writer.
   * @param fs The configured filesystem.
   * @param conf The configuration.
   * @param name The name of the file.
   * @param keyClass The 'key' type.
   * @param valClass The 'value' type.
   * @param compressionType The compression type.
   * @param codec The compression codec.
   * @return Returns the handle to the constructed SequenceFile Writer.
   * @throws IOException
   * @deprecated Use {@link #createWriter(Configuration, Writer.Option...)}
   *     instead.
   */
  @Deprecated
  public static Writer
    createWriter(FileSystem fs, Configuration conf, Path name,
                 Class keyClass, Class valClass, CompressionType compressionType,
                 CompressionCodec codec) throws IOException {
    return createWriter(conf, Writer.file(name),
                        Writer.filesystem(fs),
                        Writer.keyClass(keyClass),
                        Writer.valueClass(valClass),
                        Writer.compression(compressionType, codec));
  }

  /**
   * Construct the preferred type of SequenceFile Writer.
   * @param fs The configured filesystem.
   * @param conf The configuration.
   * @param name The name of the file.
   * @param keyClass The 'key' type.
   * @param valClass The 'value' type.
   * @param compressionType The compression type.
   * @param codec The compression codec.
   * @param progress The Progressable object to track progress.
   * @param metadata The metadata of the file.
   * @return Returns the handle to the constructed SequenceFile Writer.
   * @throws IOException
   * @deprecated Use {@link #createWriter(Configuration, Writer.Option...)}
   *     instead.
   */
  @Deprecated
  public static Writer
    createWriter(FileSystem fs, Configuration conf, Path name,
                 Class keyClass, Class valClass,
                 CompressionType compressionType, CompressionCodec codec,
                 Progressable progress, Metadata metadata) throws IOException {
    return createWriter(conf, Writer.file(name),
                        Writer.filesystem(fs),
                        Writer.keyClass(keyClass),
                        Writer.valueClass(valClass),
                        Writer.compression(compressionType, codec),
                        Writer.progressable(progress),
                        Writer.metadata(metadata));
  }

  /**
   * Construct the preferred type of SequenceFile Writer.
   * @param fs The configured filesystem.
   * @param conf The configuration.
   * @param name The name of the file.
   * @param keyClass The 'key' type.
   * @param valClass The 'value' type.
   * @param bufferSize buffer size for the underlying output stream.
   * @param replication replication factor for the file.
   * @param blockSize block size for the file.
   * @param compressionType The compression type.
   * @param codec The compression codec.
   * @param progress The Progressable object to track progress.
   * @param metadata The metadata of the file.
   * @return Returns the handle to the constructed SequenceFile Writer.
   * @throws IOException
   * @deprecated Use {@link #createWriter(Configuration, Writer.Option...)}
   *     instead.
   */
  @Deprecated
  public static Writer
    createWriter(FileSystem fs, Configuration conf, Path name,
                 Class keyClass, Class valClass, int bufferSize,
                 short replication, long blockSize,
                 CompressionType compressionType, CompressionCodec codec,
                 Progressable progress, Metadata metadata) throws IOException {
    return createWriter(conf, Writer.file(name),
                        Writer.filesystem(fs),
                        Writer.keyClass(keyClass),
                        Writer.valueClass(valClass),
                        Writer.bufferSize(bufferSize),
                        Writer.replication(replication),
                        Writer.blockSize(blockSize),
                        Writer.compression(compressionType, codec),
                        Writer.progressable(progress),
                        Writer.metadata(metadata));
  }

  /**
   * Construct the preferred type of SequenceFile Writer.
   * @param fs The configured filesystem.
   * @param conf The configuration.
   * @param name The name of the file.
   * @param keyClass The 'key' type.
   * @param valClass The 'value' type.
   * @param bufferSize buffer size for the underlying output stream.
   * @param replication replication factor for the file.
   * @param blockSize block size for the file.
   * @param createParent create parent directory if non-existent
   * @param compressionType The compression type.
   * @param codec The compression codec.
   * @param metadata The metadata of the file.
   * @return Returns the handle to the constructed SequenceFile Writer.
   * @throws IOException
   */
  @Deprecated
  public static Writer
    createWriter(FileSystem fs, Configuration conf, Path name,
                 Class keyClass, Class valClass, int bufferSize,
                 short replication, long blockSize, boolean createParent,
                 CompressionType compressionType, CompressionCodec codec,
                 Metadata metadata) throws IOException {
    return createWriter(FileContext.getFileContext(fs.getUri(), conf),
        conf, name, keyClass, valClass, compressionType, codec,
        metadata, EnumSet.of(CreateFlag.CREATE,CreateFlag.OVERWRITE),
        CreateOpts.bufferSize(bufferSize),
        createParent ? CreateOpts.createParent()
                     : CreateOpts.donotCreateParent(),
        CreateOpts.repFac(replication),
        CreateOpts.blockSize(blockSize)
      );
  }

  /**
   * Construct the preferred type of SequenceFile Writer.
   * @param fc The context for the specified file.
   * @param conf The configuration.
   * @param name The name of the file.
   * @param keyClass The 'key' type.
   * @param valClass The 'value' type.
   * @param compressionType The compression type.
   * @param codec The compression codec.
   * @param metadata The metadata of the file.
   * @param createFlag gives the semantics of create: overwrite, append etc.
   * @param opts file creation options; see {@link CreateOpts}.
   * @return Returns the handle to the constructed SequenceFile Writer.
   * @throws IOException
   */
  public static Writer
    createWriter(FileContext fc, Configuration conf, Path name,
                 Class keyClass, Class valClass,
                 CompressionType compressionType, CompressionCodec codec,
                 Metadata metadata,
                 final EnumSet<CreateFlag> createFlag, CreateOpts... opts)
    throws IOException {
    return createWriter(conf, fc.create(name, createFlag, opts),
        keyClass, valClass, compressionType, codec, metadata).ownStream();
  }

  /**
   * Construct the preferred type of SequenceFile Writer.
   * @param fs The configured filesystem.
   * @param conf The configuration.
   * @param name The name of the file.
   * @param keyClass The 'key' type.
   * @param valClass The 'value' type.
   * @param compressionType The compression type.
   * @param codec The compression codec.
   * @param progress The Progressable object to track progress.
   * @return Returns the handle to the constructed SequenceFile Writer.
   * @throws IOException
   * @deprecated Use {@link #createWriter(Configuration, Writer.Option...)}
   *     instead.
   */
  @Deprecated
  public static Writer
    createWriter(FileSystem fs, Configuration conf, Path name,
                 Class keyClass, Class valClass,
                 CompressionType compressionType, CompressionCodec codec,
                 Progressable progress) throws IOException {
    return createWriter(conf, Writer.file(name),
                        Writer.filesystem(fs),
                        Writer.keyClass(keyClass),
                        Writer.valueClass(valClass),
                        Writer.compression(compressionType, codec),
                        Writer.progressable(progress));
  }

  /**
   * Construct the preferred type of 'raw' SequenceFile Writer.
   * @param conf The configuration.
   * @param out The stream on top of which the writer is to be constructed.
   * @param keyClass The 'key' type.
   * @param valClass The 'value' type.
   * @param compressionType The compression type.
   * @param codec The compression codec.
   * @param metadata The metadata of the file.
   * @return Returns the handle to the constructed SequenceFile Writer.
   * @throws IOException
   * @deprecated Use {@link #createWriter(Configuration, Writer.Option...)}
   *     instead.
   */
  @Deprecated
  public static Writer
    createWriter(Configuration conf, FSDataOutputStream out,
                 Class keyClass, Class valClass,
                 CompressionType compressionType,
                 CompressionCodec codec, Metadata metadata) throws IOException {
    return createWriter(conf, Writer.stream(out), Writer.keyClass(keyClass),
                        Writer.valueClass(valClass),
                        Writer.compression(compressionType, codec),
                        Writer.metadata(metadata));
  }

  /**
   * Construct the preferred type of 'raw' SequenceFile Writer.
   * @param conf The configuration.
   * @param out The stream on top of which the writer is to be constructed.
   * @param keyClass The 'key' type.
   * @param valClass The 'value' type.
   * @param compressionType The compression type.
   * @param codec The compression codec.
   * @return Returns the handle to the constructed SequenceFile Writer.
   * @throws IOException
   * @deprecated Use {@link #createWriter(Configuration, Writer.Option...)}
   *     instead.
   */
  @Deprecated
  public static Writer
    createWriter(Configuration conf, FSDataOutputStream out,
                 Class keyClass, Class valClass, CompressionType compressionType,
                 CompressionCodec codec) throws IOException {
    return createWriter(conf, Writer.stream(out), Writer.keyClass(keyClass),
                        Writer.valueClass(valClass),
                        Writer.compression(compressionType, codec));
  }

  /** The interface to 'raw' values of SequenceFiles. */
  public static interface ValueBytes {

    /**
     * Writes the uncompressed bytes to the outStream.
     * @param outStream Stream to write uncompressed bytes into.
     * @throws IOException
     */
    public void writeUncompressedBytes(DataOutputStream outStream)
      throws IOException;

    /**
     * Write compressed bytes to outStream.
     * Note that this method will NOT compress the bytes if they are not
     * already compressed.
     * @param outStream Stream to write compressed bytes into.
     */
    public void writeCompressedBytes(DataOutputStream outStream)
      throws IllegalArgumentException, IOException;

    /**
     * Size of stored data.
     */
    public int getSize();
  }
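
  // Sketch of the typical 'raw' copy pattern this interface supports. It
  // assumes a Reader over an existing file and relies on
  // Reader#createValueBytes() and Reader#nextRaw(...), defined further down
  // in this class; values move between files without being deserialized:
  //
  //   DataOutputBuffer rawKey = new DataOutputBuffer();
  //   ValueBytes rawValue = reader.createValueBytes();
  //   while (reader.nextRaw(rawKey, rawValue) != -1) {
  //     writer.appendRaw(rawKey.getData(), 0, rawKey.getLength(), rawValue);
  //     rawKey.reset();
  //   }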

  private static class UncompressedBytes implements ValueBytes {
    private int dataSize;
    private byte[] data;

    private UncompressedBytes() {
      data = null;
      dataSize = 0;
    }

    private void reset(DataInputStream in, int length) throws IOException {
      if (data == null) {
        data = new byte[length];
      } else if (length > data.length) {
        // Grow at least geometrically to limit reallocations.
        data = new byte[Math.max(length, data.length * 2)];
      }
      dataSize = -1;
      in.readFully(data, 0, length);
      dataSize = length;
    }

    @Override
    public int getSize() {
      return dataSize;
    }

    @Override
    public void writeUncompressedBytes(DataOutputStream outStream)
      throws IOException {
      outStream.write(data, 0, dataSize);
    }

    @Override
    public void writeCompressedBytes(DataOutputStream outStream)
      throws IllegalArgumentException, IOException {
      throw
        new IllegalArgumentException("UncompressedBytes cannot be compressed!");
    }

  } // UncompressedBytes

  private static class CompressedBytes implements ValueBytes {
    private int dataSize;
    private byte[] data;
    DataInputBuffer rawData = null;
    CompressionCodec codec = null;
    CompressionInputStream decompressedStream = null;

    private CompressedBytes(CompressionCodec codec) {
      data = null;
      dataSize = 0;
      this.codec = codec;
    }

    private void reset(DataInputStream in, int length) throws IOException {
      if (data == null) {
        data = new byte[length];
      } else if (length > data.length) {
        // Grow at least geometrically to limit reallocations.
        data = new byte[Math.max(length, data.length * 2)];
      }
      dataSize = -1;
      in.readFully(data, 0, length);
      dataSize = length;
    }

    @Override
    public int getSize() {
      return dataSize;
    }

    @Override
    public void writeUncompressedBytes(DataOutputStream outStream)
      throws IOException {
      if (decompressedStream == null) {
        rawData = new DataInputBuffer();
        decompressedStream = codec.createInputStream(rawData);
      } else {
        decompressedStream.resetState();
      }
      rawData.reset(data, 0, dataSize);

      // Decompress the stored bytes and copy them through to the output.
      byte[] buffer = new byte[8192];
      int bytesRead = 0;
      while ((bytesRead = decompressedStream.read(buffer, 0, 8192)) != -1) {
        outStream.write(buffer, 0, bytesRead);
      }
    }

    @Override
    public void writeCompressedBytes(DataOutputStream outStream)
      throws IllegalArgumentException, IOException {
      outStream.write(data, 0, dataSize);
    }

  } // CompressedBytes

  /**
   * A class encapsulating the metadata of a file.
   * The metadata of a file is a list of attribute name/value
   * pairs of Text type.
   */
  public static class Metadata implements Writable {

    private TreeMap<Text, Text> theMetadata;

    public Metadata() {
      this(new TreeMap<Text, Text>());
    }

    public Metadata(TreeMap<Text, Text> arg) {
      if (arg == null) {
        this.theMetadata = new TreeMap<Text, Text>();
      } else {
        this.theMetadata = arg;
      }
    }

    public Text get(Text name) {
      return this.theMetadata.get(name);
    }

    public void set(Text name, Text value) {
      this.theMetadata.put(name, value);
    }

    public TreeMap<Text, Text> getMetadata() {
      return new TreeMap<Text, Text>(this.theMetadata);
    }

    @Override
    public void write(DataOutput out) throws IOException {
      out.writeInt(this.theMetadata.size());
      Iterator<Map.Entry<Text, Text>> iter =
        this.theMetadata.entrySet().iterator();
      while (iter.hasNext()) {
        Map.Entry<Text, Text> en = iter.next();
        en.getKey().write(out);
        en.getValue().write(out);
      }
    }

    @Override
    public void readFields(DataInput in) throws IOException {
      int sz = in.readInt();
      if (sz < 0) {
        throw new IOException("Invalid size: " + sz +
                              " for file metadata object");
      }
      this.theMetadata = new TreeMap<Text, Text>();
      for (int i = 0; i < sz; i++) {
        Text key = new Text();
        Text val = new Text();
        key.readFields(in);
        val.readFields(in);
        this.theMetadata.put(key, val);
      }
    }

    @Override
    public boolean equals(Object other) {
      if (other == null) {
        return false;
      }
      if (other.getClass() != this.getClass()) {
        return false;
      } else {
        return equals((Metadata)other);
      }
    }

    public boolean equals(Metadata other) {
      if (other == null) {
        return false;
      }
      if (this.theMetadata.size() != other.theMetadata.size()) {
        return false;
      }
      Iterator<Map.Entry<Text, Text>> iter1 =
        this.theMetadata.entrySet().iterator();
      Iterator<Map.Entry<Text, Text>> iter2 =
        other.theMetadata.entrySet().iterator();
      while (iter1.hasNext() && iter2.hasNext()) {
        Map.Entry<Text, Text> en1 = iter1.next();
        Map.Entry<Text, Text> en2 = iter2.next();
        if (!en1.getKey().equals(en2.getKey())) {
          return false;
        }
        if (!en1.getValue().equals(en2.getValue())) {
          return false;
        }
      }
      if (iter1.hasNext() || iter2.hasNext()) {
        return false;
      }
      return true;
    }

    @Override
    public int hashCode() {
      assert false : "hashCode not designed";
      return 42; // any arbitrary constant will do
    }

    @Override
    public String toString() {
      StringBuilder sb = new StringBuilder();
      sb.append("size: ").append(this.theMetadata.size()).append("\n");
      Iterator<Map.Entry<Text, Text>> iter =
        this.theMetadata.entrySet().iterator();
      while (iter.hasNext()) {
        Map.Entry<Text, Text> en = iter.next();
        sb.append("\t").append(en.getKey().toString())
          .append("\t").append(en.getValue().toString());
        sb.append("\n");
      }
      return sb.toString();
    }
  }
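
  // Illustrative Metadata use (the attribute names and the path are
  // hypothetical): attach name/value pairs when creating a file, and read
  // them back later via Reader#getMetadata().
  //
  //   Metadata meta = new Metadata();
  //   meta.set(new Text("schema"), new Text("id:int,name:string"));
  //   Writer w = createWriter(conf, Writer.file(path),
  //       Writer.keyClass(IntWritable.class), Writer.valueClass(Text.class),
  //       Writer.metadata(meta));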

  /** Write key/value pairs to a sequence-format file. */
  public static class Writer implements java.io.Closeable, Syncable {
    private Configuration conf;
    FSDataOutputStream out;
    boolean ownOutputStream = true;
    DataOutputBuffer buffer = new DataOutputBuffer();

    Class keyClass;
    Class valClass;

    private final CompressionType compress;
    CompressionCodec codec = null;
    CompressionOutputStream deflateFilter = null;
    DataOutputStream deflateOut = null;
    Metadata metadata = null;
    Compressor compressor = null;

    private boolean appendMode = false;

    protected Serializer keySerializer;
    protected Serializer uncompressedValSerializer;
    protected Serializer compressedValSerializer;

    // Insert a globally unique 16-byte value every few entries, so that one
    // can seek into the middle of a file and then synchronize with record
    // starts and ends by scanning for this value.
    long lastSyncPos;                     // position of last sync
    byte[] sync;                          // 16 random bytes
    {
      try {
        MessageDigest digester = MessageDigest.getInstance("MD5");
        long time = Time.now();
        digester.update((new UID()+"@"+time).getBytes(Charsets.UTF_8));
        sync = digester.digest();
      } catch (Exception e) {
        throw new RuntimeException(e);
      }
    }

    public static interface Option {}

    static class FileOption extends Options.PathOption
                            implements Option {
      FileOption(Path path) {
        super(path);
      }
    }

    /**
     * @deprecated only used for backwards-compatibility in the createWriter
     * methods that take FileSystem.
     */
    @Deprecated
    private static class FileSystemOption implements Option {
      private final FileSystem value;
      protected FileSystemOption(FileSystem value) {
        this.value = value;
      }
      public FileSystem getValue() {
        return value;
      }
    }

    static class StreamOption extends Options.FSDataOutputStreamOption
                              implements Option {
      StreamOption(FSDataOutputStream stream) {
        super(stream);
      }
    }

    static class BufferSizeOption extends Options.IntegerOption
                                  implements Option {
      BufferSizeOption(int value) {
        super(value);
      }
    }

    static class BlockSizeOption extends Options.LongOption implements Option {
      BlockSizeOption(long value) {
        super(value);
      }
    }

    static class ReplicationOption extends Options.IntegerOption
                                   implements Option {
      ReplicationOption(int value) {
        super(value);
      }
    }

    static class AppendIfExistsOption extends Options.BooleanOption
                                      implements Option {
      AppendIfExistsOption(boolean value) {
        super(value);
      }
    }

    static class KeyClassOption extends Options.ClassOption implements Option {
      KeyClassOption(Class<?> value) {
        super(value);
      }
    }

    static class ValueClassOption extends Options.ClassOption
                                  implements Option {
      ValueClassOption(Class<?> value) {
        super(value);
      }
    }

    static class MetadataOption implements Option {
      private final Metadata value;
      MetadataOption(Metadata value) {
        this.value = value;
      }
      Metadata getValue() {
        return value;
      }
    }

    static class ProgressableOption extends Options.ProgressableOption
                                    implements Option {
      ProgressableOption(Progressable value) {
        super(value);
      }
    }

    private static class CompressionOption implements Option {
      private final CompressionType value;
      private final CompressionCodec codec;
      CompressionOption(CompressionType value) {
        this(value, null);
      }
      CompressionOption(CompressionType value, CompressionCodec codec) {
        this.value = value;
        this.codec = (CompressionType.NONE != value && null == codec)
          ? new DefaultCodec()
          : codec;
      }
      CompressionType getValue() {
        return value;
      }
      CompressionCodec getCodec() {
        return codec;
      }
    }

    public static Option file(Path value) {
      return new FileOption(value);
    }

    /**
     * @deprecated only used for backwards-compatibility in the createWriter
     * methods that take FileSystem.
     */
    @Deprecated
    private static Option filesystem(FileSystem fs) {
      return new SequenceFile.Writer.FileSystemOption(fs);
    }

    public static Option bufferSize(int value) {
      return new BufferSizeOption(value);
    }

    public static Option stream(FSDataOutputStream value) {
      return new StreamOption(value);
    }

    public static Option replication(short value) {
      return new ReplicationOption(value);
    }

    public static Option appendIfExists(boolean value) {
      return new AppendIfExistsOption(value);
    }

    public static Option blockSize(long value) {
      return new BlockSizeOption(value);
    }

    public static Option progressable(Progressable value) {
      return new ProgressableOption(value);
    }

    public static Option keyClass(Class<?> value) {
      return new KeyClassOption(value);
    }

    public static Option valueClass(Class<?> value) {
      return new ValueClassOption(value);
    }

    public static Option metadata(Metadata value) {
      return new MetadataOption(value);
    }

    public static Option compression(CompressionType value) {
      return new CompressionOption(value);
    }

    public static Option compression(CompressionType value,
        CompressionCodec codec) {
      return new CompressionOption(value, codec);
    }

    /**
     * Construct an uncompressed writer from a set of options.
     * @param conf the configuration to use
     * @param options the options used when creating the writer
     * @throws IOException if it fails
     */
    Writer(Configuration conf,
           Option... opts) throws IOException {
      BlockSizeOption blockSizeOption =
        Options.getOption(BlockSizeOption.class, opts);
      BufferSizeOption bufferSizeOption =
        Options.getOption(BufferSizeOption.class, opts);
      ReplicationOption replicationOption =
        Options.getOption(ReplicationOption.class, opts);
      ProgressableOption progressOption =
        Options.getOption(ProgressableOption.class, opts);
      FileOption fileOption = Options.getOption(FileOption.class, opts);
      AppendIfExistsOption appendIfExistsOption = Options.getOption(
        AppendIfExistsOption.class, opts);
      FileSystemOption fsOption = Options.getOption(FileSystemOption.class, opts);
      StreamOption streamOption = Options.getOption(StreamOption.class, opts);
      KeyClassOption keyClassOption =
        Options.getOption(KeyClassOption.class, opts);
      ValueClassOption valueClassOption =
        Options.getOption(ValueClassOption.class, opts);
      MetadataOption metadataOption =
        Options.getOption(MetadataOption.class, opts);
      CompressionOption compressionTypeOption =
        Options.getOption(CompressionOption.class, opts);
      // check consistency of options
      if ((fileOption == null) == (streamOption == null)) {
        throw new IllegalArgumentException(
            "exactly one of file or stream must be specified");
      }
      if (fileOption == null && (blockSizeOption != null ||
                                 bufferSizeOption != null ||
                                 replicationOption != null ||
                                 progressOption != null)) {
        throw new IllegalArgumentException("file modifier options not " +
                                           "compatible with stream");
      }

      FSDataOutputStream out;
      boolean ownStream = fileOption != null;
      if (ownStream) {
        Path p = fileOption.getValue();
        FileSystem fs;
        if (fsOption != null) {
          fs = fsOption.getValue();
        } else {
          fs = p.getFileSystem(conf);
        }
        int bufferSize = bufferSizeOption == null ? getBufferSize(conf) :
          bufferSizeOption.getValue();
        short replication = replicationOption == null ?
          fs.getDefaultReplication(p) :
          (short) replicationOption.getValue();
        long blockSize = blockSizeOption == null ? fs.getDefaultBlockSize(p) :
          blockSizeOption.getValue();
        Progressable progress = progressOption == null ? null :
          progressOption.getValue();

        if (appendIfExistsOption != null && appendIfExistsOption.getValue()
            && fs.exists(p)) {

          // Read the file and verify header details
          SequenceFile.Reader reader = new SequenceFile.Reader(conf,
              SequenceFile.Reader.file(p), new Reader.OnlyHeaderOption());
          try {

            if (keyClassOption.getValue() != reader.getKeyClass()
                || valueClassOption.getValue() != reader.getValueClass()) {
              throw new IllegalArgumentException(
                  "Key/value class provided does not match the file");
            }

            if (reader.getVersion() != VERSION[3]) {
              throw new VersionMismatchException(VERSION[3],
                  reader.getVersion());
            }

            if (metadataOption != null) {
              LOG.info("MetaData Option is ignored during append");
            }
            metadataOption = (MetadataOption) SequenceFile.Writer
                .metadata(reader.getMetadata());

            CompressionOption readerCompressionOption = new CompressionOption(
                reader.getCompressionType(), reader.getCompressionCodec());

            if (readerCompressionOption.value != compressionTypeOption.value
                || !readerCompressionOption.codec.getClass().getName()
                    .equals(compressionTypeOption.codec.getClass().getName())) {
              throw new IllegalArgumentException(
                  "Compression option provided does not match the file");
            }

            sync = reader.getSync();

          } finally {
            reader.close();
          }

          out = fs.append(p, bufferSize, progress);
          this.appendMode = true;
        } else {
          out = fs
              .create(p, true, bufferSize, replication, blockSize, progress);
        }
      } else {
        out = streamOption.getValue();
      }
      Class<?> keyClass = keyClassOption == null ?
          Object.class : keyClassOption.getValue();
      Class<?> valueClass = valueClassOption == null ?
          Object.class : valueClassOption.getValue();
      Metadata metadata = metadataOption == null ?
          new Metadata() : metadataOption.getValue();
      this.compress = compressionTypeOption.getValue();
      final CompressionCodec codec = compressionTypeOption.getCodec();
      if (codec != null &&
          (codec instanceof GzipCodec) &&
          !NativeCodeLoader.isNativeCodeLoaded() &&
          !ZlibFactory.isNativeZlibLoaded(conf)) {
        throw new IllegalArgumentException("SequenceFile doesn't work with " +
                                           "GzipCodec without native-hadoop " +
                                           "code!");
      }
      init(conf, out, ownStream, keyClass, valueClass, codec, metadata);
    }

    /** Create the named file.
     * @deprecated Use
     *   {@link SequenceFile#createWriter(Configuration, Writer.Option...)}
     *   instead.
     */
    @Deprecated
    public Writer(FileSystem fs, Configuration conf, Path name,
                  Class keyClass, Class valClass) throws IOException {
      this.compress = CompressionType.NONE;
      init(conf, fs.create(name), true, keyClass, valClass, null,
           new Metadata());
    }

    /** Create the named file with write-progress reporter.
     * @deprecated Use
     *   {@link SequenceFile#createWriter(Configuration, Writer.Option...)}
     *   instead.
     */
    @Deprecated
    public Writer(FileSystem fs, Configuration conf, Path name,
                  Class keyClass, Class valClass,
                  Progressable progress, Metadata metadata) throws IOException {
      this.compress = CompressionType.NONE;
      init(conf, fs.create(name, progress), true, keyClass, valClass,
           null, metadata);
    }

    /** Create the named file with write-progress reporter.
     * @deprecated Use
     *   {@link SequenceFile#createWriter(Configuration, Writer.Option...)}
     *   instead.
     */
    @Deprecated
    public Writer(FileSystem fs, Configuration conf, Path name,
                  Class keyClass, Class valClass,
                  int bufferSize, short replication, long blockSize,
                  Progressable progress, Metadata metadata) throws IOException {
      this.compress = CompressionType.NONE;
      init(conf,
           fs.create(name, true, bufferSize, replication, blockSize, progress),
           true, keyClass, valClass, null, metadata);
    }

    boolean isCompressed() { return compress != CompressionType.NONE; }
    boolean isBlockCompressed() { return compress == CompressionType.BLOCK; }

    Writer ownStream() { this.ownOutputStream = true; return this; }

    /** Write and flush the file header. */
    private void writeFileHeader()
      throws IOException {
      out.write(VERSION);
      Text.writeString(out, keyClass.getName());
      Text.writeString(out, valClass.getName());

      out.writeBoolean(this.isCompressed());
      out.writeBoolean(this.isBlockCompressed());

      if (this.isCompressed()) {
        Text.writeString(out, (codec.getClass()).getName());
      }
      this.metadata.write(out);
      out.write(sync);                       // write the sync bytes
      out.flush();                           // flush header
    }

    /** Initialize. */
    @SuppressWarnings("unchecked")
    void init(Configuration conf, FSDataOutputStream out, boolean ownStream,
              Class keyClass, Class valClass,
              CompressionCodec codec, Metadata metadata)
      throws IOException {
      this.conf = conf;
      this.out = out;
      this.ownOutputStream = ownStream;
      this.keyClass = keyClass;
      this.valClass = valClass;
      this.codec = codec;
      this.metadata = metadata;
      SerializationFactory serializationFactory = new SerializationFactory(conf);
      this.keySerializer = serializationFactory.getSerializer(keyClass);
      if (this.keySerializer == null) {
        throw new IOException(
            "Could not find a serializer for the Key class: '"
                + keyClass.getCanonicalName() + "'. "
                + "Please ensure that the configuration '" +
                CommonConfigurationKeys.IO_SERIALIZATIONS_KEY + "' is "
                + "properly configured, if you're using "
                + "custom serialization.");
      }
      this.keySerializer.open(buffer);
      this.uncompressedValSerializer = serializationFactory.getSerializer(valClass);
      if (this.uncompressedValSerializer == null) {
        throw new IOException(
            "Could not find a serializer for the Value class: '"
                + valClass.getCanonicalName() + "'. "
                + "Please ensure that the configuration '" +
                CommonConfigurationKeys.IO_SERIALIZATIONS_KEY + "' is "
                + "properly configured, if you're using "
                + "custom serialization.");
      }
      this.uncompressedValSerializer.open(buffer);
" 1272 + "Please ensure that the configuration '" + 1273 CommonConfigurationKeys.IO_SERIALIZATIONS_KEY + "' is " 1274 + "properly configured, if you're using" 1275 + "custom serialization."); 1276 } 1277 this.compressedValSerializer.open(deflateOut); 1278 } 1279 1280 if (appendMode) { 1281 sync(); 1282 } else { 1283 writeFileHeader(); 1284 } 1285 } 1286 1287 /** Returns the class of keys in this file. */ 1288 public Class getKeyClass() { return keyClass; } 1289 1290 /** Returns the class of values in this file. */ 1291 public Class getValueClass() { return valClass; } 1292 1293 /** Returns the compression codec of data in this file. */ 1294 public CompressionCodec getCompressionCodec() { return codec; } 1295 1296 /** create a sync point */ 1297 public void sync() throws IOException { 1298 if (sync != null && lastSyncPos != out.getPos()) { 1299 out.writeInt(SYNC_ESCAPE); // mark the start of the sync 1300 out.write(sync); // write sync 1301 lastSyncPos = out.getPos(); // update lastSyncPos 1302 } 1303 } 1304 1305 /** 1306 * flush all currently written data to the file system 1307 * @deprecated Use {@link #hsync()} or {@link #hflush()} instead 1308 */ 1309 @Deprecated 1310 public void syncFs() throws IOException { 1311 if (out != null) { 1312 out.hflush(); // flush contents to file system 1313 } 1314 } 1315 1316 @Override 1317 public void hsync() throws IOException { 1318 if (out != null) { 1319 out.hsync(); 1320 } 1321 } 1322 1323 @Override 1324 public void hflush() throws IOException { 1325 if (out != null) { 1326 out.hflush(); 1327 } 1328 } 1329 1330 /** Returns the configuration of this file. */ 1331 Configuration getConf() { return conf; } 1332 1333 /** Close the file. */ 1334 @Override 1335 public synchronized void close() throws IOException { 1336 keySerializer.close(); 1337 uncompressedValSerializer.close(); 1338 if (compressedValSerializer != null) { 1339 compressedValSerializer.close(); 1340 } 1341 1342 CodecPool.returnCompressor(compressor); 1343 compressor = null; 1344 1345 if (out != null) { 1346 1347 // Close the underlying stream iff we own it... 1348 if (ownOutputStream) { 1349 out.close(); 1350 } else { 1351 out.flush(); 1352 } 1353 out = null; 1354 } 1355 } 1356 1357 synchronized void checkAndWriteSync() throws IOException { 1358 if (sync != null && 1359 out.getPos() >= lastSyncPos+SYNC_INTERVAL) { // time to emit sync 1360 sync(); 1361 } 1362 } 1363 1364 /** Append a key/value pair. */ 1365 public void append(Writable key, Writable val) 1366 throws IOException { 1367 append((Object) key, (Object) val); 1368 } 1369 1370 /** Append a key/value pair. 

    /** Append a key/value pair. */
    @SuppressWarnings("unchecked")
    public synchronized void append(Object key, Object val)
      throws IOException {
      if (key.getClass() != keyClass)
        throw new IOException("wrong key class: "+key.getClass().getName()
                              +" is not "+keyClass);
      if (val.getClass() != valClass)
        throw new IOException("wrong value class: "+val.getClass().getName()
                              +" is not "+valClass);

      buffer.reset();

      // Append the 'key'
      keySerializer.serialize(key);
      int keyLength = buffer.getLength();
      if (keyLength < 0)
        throw new IOException("negative length keys not allowed: " + key);

      // Append the 'value'
      if (compress == CompressionType.RECORD) {
        deflateFilter.resetState();
        compressedValSerializer.serialize(val);
        deflateOut.flush();
        deflateFilter.finish();
      } else {
        uncompressedValSerializer.serialize(val);
      }

      // Write the record out
      checkAndWriteSync();                                // sync
      out.writeInt(buffer.getLength());                   // total record length
      out.writeInt(keyLength);                            // key portion length
      out.write(buffer.getData(), 0, buffer.getLength()); // data
    }

    public synchronized void appendRaw(byte[] keyData, int keyOffset,
        int keyLength, ValueBytes val) throws IOException {
      if (keyLength < 0)
        throw new IOException("negative length keys not allowed: " + keyLength);

      int valLength = val.getSize();

      checkAndWriteSync();

      out.writeInt(keyLength+valLength);          // total record length
      out.writeInt(keyLength);                    // key portion length
      out.write(keyData, keyOffset, keyLength);   // key
      val.writeUncompressedBytes(out);            // value
    }

    /** Returns the current length of the output file.
     *
     * <p>This always returns a synchronized position. In other words,
     * immediately after calling {@link SequenceFile.Reader#seek(long)} with a
     * position returned by this method, {@link SequenceFile.Reader#next(Writable)}
     * may be called. However, the key may be earlier in the file than the key
     * last written when this method was called (e.g., with block-compression,
     * it may be the first key in the block that was being written when this
     * method was called).
     */
    public synchronized long getLength() throws IOException {
      return out.getPos();
    }

  } // class Writer
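
  // Illustrative pairing of Writer#getLength() with Reader#seek(long), per
  // the Javadoc above (assumes the file is later reopened for reading):
  //
  //   long pos = writer.getLength();  // a synchronized position
  //   ...
  //   reader.seek(pos);               // safe to call next(...) from here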

  /** Write key/compressed-value pairs to a sequence-format file. */
  static class RecordCompressWriter extends Writer {

    RecordCompressWriter(Configuration conf,
                         Option... options) throws IOException {
      super(conf, options);
    }

    /** Append a key/value pair. */
    @Override
    @SuppressWarnings("unchecked")
    public synchronized void append(Object key, Object val)
      throws IOException {
      if (key.getClass() != keyClass)
        throw new IOException("wrong key class: "+key.getClass().getName()
                              +" is not "+keyClass);
      if (val.getClass() != valClass)
        throw new IOException("wrong value class: "+val.getClass().getName()
                              +" is not "+valClass);

      buffer.reset();

      // Append the 'key'
      keySerializer.serialize(key);
      int keyLength = buffer.getLength();
      if (keyLength < 0)
        throw new IOException("negative length keys not allowed: " + key);

      // Compress 'value' and append it
      deflateFilter.resetState();
      compressedValSerializer.serialize(val);
      deflateOut.flush();
      deflateFilter.finish();

      // Write the record out
      checkAndWriteSync();                                // sync
      out.writeInt(buffer.getLength());                   // total record length
      out.writeInt(keyLength);                            // key portion length
      out.write(buffer.getData(), 0, buffer.getLength()); // data
    }

    /** Append a pre-serialized key/value pair. */
    @Override
    public synchronized void appendRaw(byte[] keyData, int keyOffset,
        int keyLength, ValueBytes val) throws IOException {

      if (keyLength < 0)
        throw new IOException("negative length keys not allowed: " + keyLength);

      int valLength = val.getSize();

      checkAndWriteSync();                        // sync
      out.writeInt(keyLength+valLength);          // total record length
      out.writeInt(keyLength);                    // key portion length
      out.write(keyData, keyOffset, keyLength);   // 'key' data
      val.writeCompressedBytes(out);              // 'value' data
    }

  } // RecordCompressionWriter

  /** Write compressed key/value blocks to a sequence-format file. */
  static class BlockCompressWriter extends Writer {

    private int noBufferedRecords = 0;

    private DataOutputBuffer keyLenBuffer = new DataOutputBuffer();
    private DataOutputBuffer keyBuffer = new DataOutputBuffer();

    private DataOutputBuffer valLenBuffer = new DataOutputBuffer();
    private DataOutputBuffer valBuffer = new DataOutputBuffer();

    private final int compressionBlockSize;

    BlockCompressWriter(Configuration conf,
                        Option... options) throws IOException {
      super(conf, options);
      compressionBlockSize =
        conf.getInt("io.seqfile.compress.blocksize", 1000000);
      keySerializer.close();
      keySerializer.open(keyBuffer);
      uncompressedValSerializer.close();
      uncompressedValSerializer.open(valBuffer);
    }

    /** Workhorse to check and write out compressed data/lengths */
    private synchronized
      void writeBuffer(DataOutputBuffer uncompressedDataBuffer)
      throws IOException {
      deflateFilter.resetState();
      buffer.reset();
      deflateOut.write(uncompressedDataBuffer.getData(), 0,
                       uncompressedDataBuffer.getLength());
      deflateOut.flush();
      deflateFilter.finish();

      WritableUtils.writeVInt(out, buffer.getLength());
      out.write(buffer.getData(), 0, buffer.getLength());
    }

    /** Compress and flush contents to the file system. */
    @Override
    public synchronized void sync() throws IOException {
      if (noBufferedRecords > 0) {
        super.sync();

        // No. of records
        WritableUtils.writeVInt(out, noBufferedRecords);

        // Write 'keys' and lengths
        writeBuffer(keyLenBuffer);
        writeBuffer(keyBuffer);

        // Write 'values' and lengths
        writeBuffer(valLenBuffer);
        writeBuffer(valBuffer);

        // Flush the file-stream
        out.flush();

        // Reset internal states
        keyLenBuffer.reset();
        keyBuffer.reset();
        valLenBuffer.reset();
        valBuffer.reset();
        noBufferedRecords = 0;
      }
    }

    /** Close the file. */
    @Override
    public synchronized void close() throws IOException {
      if (out != null) {
        sync();
      }
      super.close();
    }

    /** Append a key/value pair. */
    @Override
    @SuppressWarnings("unchecked")
    public synchronized void append(Object key, Object val)
      throws IOException {
      if (key.getClass() != keyClass)
        throw new IOException("wrong key class: "+key+" is not "+keyClass);
      if (val.getClass() != valClass)
        throw new IOException("wrong value class: "+val+" is not "+valClass);

      // Save key/value into respective buffers
      int oldKeyLength = keyBuffer.getLength();
      keySerializer.serialize(key);
      int keyLength = keyBuffer.getLength() - oldKeyLength;
      if (keyLength < 0)
        throw new IOException("negative length keys not allowed: " + key);
      WritableUtils.writeVInt(keyLenBuffer, keyLength);

      int oldValLength = valBuffer.getLength();
      uncompressedValSerializer.serialize(val);
      int valLength = valBuffer.getLength() - oldValLength;
      WritableUtils.writeVInt(valLenBuffer, valLength);

      // Added another key/value pair
      ++noBufferedRecords;

      // Compress and flush?
      int currentBlockSize = keyBuffer.getLength() + valBuffer.getLength();
      if (currentBlockSize >= compressionBlockSize) {
        sync();
      }
    }

    /** Append a pre-serialized key/value pair. */
    @Override
    public synchronized void appendRaw(byte[] keyData, int keyOffset,
        int keyLength, ValueBytes val) throws IOException {

      if (keyLength < 0)
        throw new IOException("negative length keys not allowed");

      int valLength = val.getSize();

      // Save key/value data in relevant buffers
      WritableUtils.writeVInt(keyLenBuffer, keyLength);
      keyBuffer.write(keyData, keyOffset, keyLength);
      WritableUtils.writeVInt(valLenBuffer, valLength);
      val.writeUncompressedBytes(valBuffer);

      // Added another key/value pair
      ++noBufferedRecords;

      // Compress and flush?
      int currentBlockSize = keyBuffer.getLength() + valBuffer.getLength();
      if (currentBlockSize >= compressionBlockSize) {
        sync();
      }
    }

  } // BlockCompressionWriter

  /** Get the configured buffer size */
  private static int getBufferSize(Configuration conf) {
    return conf.getInt("io.file.buffer.size", 4096);
  }
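
  // The block size that triggers a flush in BlockCompressWriter is read from
  // "io.seqfile.compress.blocksize" (default 1000000 bytes). For example, to
  // trade more buffering for better compression (illustrative):
  //
  //   conf.setInt("io.seqfile.compress.blocksize", 4 * 1024 * 1024);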

  /** Reads key/value pairs from a sequence-format file. */
  public static class Reader implements java.io.Closeable {
    private String filename;
    private FSDataInputStream in;
    private DataOutputBuffer outBuf = new DataOutputBuffer();

    private byte version;

    private String keyClassName;
    private String valClassName;
    private Class keyClass;
    private Class valClass;

    private CompressionCodec codec = null;
    private Metadata metadata = null;

    private byte[] sync = new byte[SYNC_HASH_SIZE];
    private byte[] syncCheck = new byte[SYNC_HASH_SIZE];
    private boolean syncSeen;

    private long headerEnd;
    private long end;
    private int keyLength;
    private int recordLength;

    private boolean decompress;
    private boolean blockCompressed;

    private Configuration conf;

    private int noBufferedRecords = 0;
    private boolean lazyDecompress = true;
    private boolean valuesDecompressed = true;

    private int noBufferedKeys = 0;
    private int noBufferedValues = 0;

    private DataInputBuffer keyLenBuffer = null;
    private CompressionInputStream keyLenInFilter = null;
    private DataInputStream keyLenIn = null;
    private Decompressor keyLenDecompressor = null;
    private DataInputBuffer keyBuffer = null;
    private CompressionInputStream keyInFilter = null;
    private DataInputStream keyIn = null;
    private Decompressor keyDecompressor = null;

    private DataInputBuffer valLenBuffer = null;
    private CompressionInputStream valLenInFilter = null;
    private DataInputStream valLenIn = null;
    private Decompressor valLenDecompressor = null;
    private DataInputBuffer valBuffer = null;
    private CompressionInputStream valInFilter = null;
    private DataInputStream valIn = null;
    private Decompressor valDecompressor = null;

    private Deserializer keyDeserializer;
    private Deserializer valDeserializer;

    /**
     * A tag interface for all of the Reader options
     */
    public static interface Option {}

    /**
     * Create an option to specify the path name of the sequence file.
     * @param value the path to read
     * @return a new option
     */
    public static Option file(Path value) {
      return new FileOption(value);
    }

    /**
     * Create an option to specify the stream with the sequence file.
     * @param value the stream to read.
     * @return a new option
     */
    public static Option stream(FSDataInputStream value) {
      return new InputStreamOption(value);
    }

    /**
     * Create an option to specify the starting byte to read.
     * @param value the number of bytes to skip over
     * @return a new option
     */
    public static Option start(long value) {
      return new StartOption(value);
    }

    /**
     * Create an option to specify the number of bytes to read.
     * @param value the number of bytes to read
     * @return a new option
     */
    public static Option length(long value) {
      return new LengthOption(value);
    }
1741 * @param value the number of bytes to buffer 1742 * @return a new option 1743 */ 1744 public static Option bufferSize(int value) { 1745 return new BufferSizeOption(value); 1746 } 1747 1748 private static class FileOption extends Options.PathOption 1749 implements Option { 1750 private FileOption(Path value) { 1751 super(value); 1752 } 1753 } 1754 1755 private static class InputStreamOption 1756 extends Options.FSDataInputStreamOption 1757 implements Option { 1758 private InputStreamOption(FSDataInputStream value) { 1759 super(value); 1760 } 1761 } 1762 1763 private static class StartOption extends Options.LongOption 1764 implements Option { 1765 private StartOption(long value) { 1766 super(value); 1767 } 1768 } 1769 1770 private static class LengthOption extends Options.LongOption 1771 implements Option { 1772 private LengthOption(long value) { 1773 super(value); 1774 } 1775 } 1776 1777 private static class BufferSizeOption extends Options.IntegerOption 1778 implements Option { 1779 private BufferSizeOption(int value) { 1780 super(value); 1781 } 1782 } 1783 1784 // only used directly 1785 private static class OnlyHeaderOption extends Options.BooleanOption 1786 implements Option { 1787 private OnlyHeaderOption() { 1788 super(true); 1789 } 1790 } 1791 1792 public Reader(Configuration conf, Option... opts) throws IOException { 1793 // Look up the options, these are null if not set 1794 FileOption fileOpt = Options.getOption(FileOption.class, opts); 1795 InputStreamOption streamOpt = 1796 Options.getOption(InputStreamOption.class, opts); 1797 StartOption startOpt = Options.getOption(StartOption.class, opts); 1798 LengthOption lenOpt = Options.getOption(LengthOption.class, opts); 1799 BufferSizeOption bufOpt = Options.getOption(BufferSizeOption.class,opts); 1800 OnlyHeaderOption headerOnly = 1801 Options.getOption(OnlyHeaderOption.class, opts); 1802 // check for consistency 1803 if ((fileOpt == null) == (streamOpt == null)) { 1804 throw new 1805 IllegalArgumentException("File or stream option must be specified"); 1806 } 1807 if (fileOpt == null && bufOpt != null) { 1808 throw new IllegalArgumentException("buffer size can only be set when" + 1809 " a file is specified."); 1810 } 1811 // figure out the real values 1812 Path filename = null; 1813 FSDataInputStream file; 1814 final long len; 1815 if (fileOpt != null) { 1816 filename = fileOpt.getValue(); 1817 FileSystem fs = filename.getFileSystem(conf); 1818 int bufSize = bufOpt == null ? getBufferSize(conf): bufOpt.getValue(); 1819 len = null == lenOpt 1820 ? fs.getFileStatus(filename).getLen() 1821 : lenOpt.getValue(); 1822 file = openFile(fs, filename, bufSize, len); 1823 } else { 1824 len = null == lenOpt ? Long.MAX_VALUE : lenOpt.getValue(); 1825 file = streamOpt.getValue(); 1826 } 1827 long start = startOpt == null ? 0 : startOpt.getValue(); 1828 // really set up 1829 initialize(filename, file, start, len, conf, headerOnly != null); 1830 } 1831 1832 /** 1833 * Construct a reader by opening a file from the given file system. 1834 * @param fs The file system used to open the file. 1835 * @param file The file being read. 1836 * @param conf Configuration 1837 * @throws IOException 1838 * @deprecated Use Reader(Configuration, Option...) instead. 1839 */ 1840 @Deprecated 1841 public Reader(FileSystem fs, Path file, 1842 Configuration conf) throws IOException { 1843 this(conf, file(file.makeQualified(fs))); 1844 } 1845 1846 /** 1847 * Construct a reader by the given input stream. 1848 * @param in An input stream. 
1849 * @param buffersize unused 1850 * @param start The starting position. 1851 * @param length The length being read. 1852 * @param conf Configuration 1853 * @throws IOException 1854 * @deprecated Use Reader(Configuration, Reader.Option...) instead. 1855 */ 1856 @Deprecated 1857 public Reader(FSDataInputStream in, int buffersize, 1858 long start, long length, Configuration conf) throws IOException { 1859 this(conf, stream(in), start(start), length(length)); 1860 } 1861 1862 /** Common work of the constructors. */ 1863 private void initialize(Path filename, FSDataInputStream in, 1864 long start, long length, Configuration conf, 1865 boolean tempReader) throws IOException { 1866 if (in == null) { 1867 throw new IllegalArgumentException("in == null"); 1868 } 1869 this.filename = filename == null ? "<unknown>" : filename.toString(); 1870 this.in = in; 1871 this.conf = conf; 1872 boolean succeeded = false; 1873 try { 1874 seek(start); 1875 this.end = this.in.getPos() + length; 1876 // if it wrapped around, use the max 1877 if (end < length) { 1878 end = Long.MAX_VALUE; 1879 } 1880 init(tempReader); 1881 succeeded = true; 1882 } finally { 1883 if (!succeeded) { 1884 IOUtils.cleanup(LOG, this.in); 1885 } 1886 } 1887 } 1888 1889 /** 1890 * Override this method to specialize the type of 1891 * {@link FSDataInputStream} returned. 1892 * @param fs The file system used to open the file. 1893 * @param file The file being read. 1894 * @param bufferSize The buffer size used to read the file. 1895 * @param length The length being read if it is >= 0. Otherwise, 1896 * the length is not available. 1897 * @return The opened stream. 1898 * @throws IOException 1899 */ 1900 protected FSDataInputStream openFile(FileSystem fs, Path file, 1901 int bufferSize, long length) throws IOException { 1902 return fs.open(file, bufferSize); 1903 } 1904 1905 /** 1906 * Initialize the {@link Reader} 1907 * @param tmpReader <code>true</code> if we are constructing a temporary 1908 * reader {@link SequenceFile.Sorter.cloneFileAttributes}, 1909 * and hence do not initialize every component; 1910 * <code>false</code> otherwise. 1911 * @throws IOException 1912 */ 1913 private void init(boolean tempReader) throws IOException { 1914 byte[] versionBlock = new byte[VERSION.length]; 1915 in.readFully(versionBlock); 1916 1917 if ((versionBlock[0] != VERSION[0]) || 1918 (versionBlock[1] != VERSION[1]) || 1919 (versionBlock[2] != VERSION[2])) 1920 throw new IOException(this + " not a SequenceFile"); 1921 1922 // Set 'version' 1923 version = versionBlock[3]; 1924 if (version > VERSION[3]) 1925 throw new VersionMismatchException(VERSION[3], version); 1926 1927 if (version < BLOCK_COMPRESS_VERSION) { 1928 UTF8 className = new UTF8(); 1929 1930 className.readFields(in); 1931 keyClassName = className.toStringChecked(); // key class name 1932 1933 className.readFields(in); 1934 valClassName = className.toStringChecked(); // val class name 1935 } else { 1936 keyClassName = Text.readString(in); 1937 valClassName = Text.readString(in); 1938 } 1939 1940 if (version > 2) { // if version > 2 1941 this.decompress = in.readBoolean(); // is compressed? 1942 } else { 1943 decompress = false; 1944 } 1945 1946 if (version >= BLOCK_COMPRESS_VERSION) { // if version >= 4 1947 this.blockCompressed = in.readBoolean(); // is block-compressed? 
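        // (the explicit block-compression flag only exists in files of
        // version BLOCK_COMPRESS_VERSION or later; older files can be at
        // most record-compressed)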
      } else {
        blockCompressed = false;
      }

      // if version >= 5
      // setup the compression codec
      if (decompress) {
        if (version >= CUSTOM_COMPRESS_VERSION) {
          String codecClassname = Text.readString(in);
          try {
            Class<? extends CompressionCodec> codecClass
              = conf.getClassByName(codecClassname).asSubclass(CompressionCodec.class);
            this.codec = ReflectionUtils.newInstance(codecClass, conf);
          } catch (ClassNotFoundException cnfe) {
            throw new IllegalArgumentException("Unknown codec: " +
                                               codecClassname, cnfe);
          }
        } else {
          codec = new DefaultCodec();
          ((Configurable)codec).setConf(conf);
        }
      }

      this.metadata = new Metadata();
      if (version >= VERSION_WITH_METADATA) {    // if version >= 6
        this.metadata.readFields(in);
      }

      if (version > 1) {                         // if version > 1
        in.readFully(sync);                      // read sync bytes
        headerEnd = in.getPos();                 // record end of header
      }

      // Initialize... *not* if we are constructing a temporary Reader
      if (!tempReader) {
        valBuffer = new DataInputBuffer();
        if (decompress) {
          valDecompressor = CodecPool.getDecompressor(codec);
          valInFilter = codec.createInputStream(valBuffer, valDecompressor);
          valIn = new DataInputStream(valInFilter);
        } else {
          valIn = valBuffer;
        }

        if (blockCompressed) {
          keyLenBuffer = new DataInputBuffer();
          keyBuffer = new DataInputBuffer();
          valLenBuffer = new DataInputBuffer();

          keyLenDecompressor = CodecPool.getDecompressor(codec);
          keyLenInFilter = codec.createInputStream(keyLenBuffer,
                                                   keyLenDecompressor);
          keyLenIn = new DataInputStream(keyLenInFilter);

          keyDecompressor = CodecPool.getDecompressor(codec);
          keyInFilter = codec.createInputStream(keyBuffer, keyDecompressor);
          keyIn = new DataInputStream(keyInFilter);

          valLenDecompressor = CodecPool.getDecompressor(codec);
          valLenInFilter = codec.createInputStream(valLenBuffer,
                                                   valLenDecompressor);
          valLenIn = new DataInputStream(valLenInFilter);
        }

        SerializationFactory serializationFactory =
          new SerializationFactory(conf);
        this.keyDeserializer =
          getDeserializer(serializationFactory, getKeyClass());
        if (this.keyDeserializer == null) {
          throw new IOException(
              "Could not find a deserializer for the Key class: '"
                  + getKeyClass().getCanonicalName() + "'. "
                  + "Please ensure that the configuration '" +
                  CommonConfigurationKeys.IO_SERIALIZATIONS_KEY + "' is "
                  + "properly configured, if you're using "
                  + "custom serialization.");
        }
        if (!blockCompressed) {
          this.keyDeserializer.open(valBuffer);
        } else {
          this.keyDeserializer.open(keyIn);
        }
        this.valDeserializer =
          getDeserializer(serializationFactory, getValueClass());
        if (this.valDeserializer == null) {
          throw new IOException(
              "Could not find a deserializer for the Value class: '"
                  + getValueClass().getCanonicalName() + "'. "
                  + "Please ensure that the configuration '" +
                  CommonConfigurationKeys.IO_SERIALIZATIONS_KEY + "' is "
                  + "properly configured, if you're using "
                  + "custom serialization.");
        }
        this.valDeserializer.open(valIn);
      }
    }

    @SuppressWarnings("unchecked")
    private Deserializer getDeserializer(SerializationFactory sf, Class c) {
      return sf.getDeserializer(c);
    }

    /** Close the file.
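     * Returns any pooled decompressors, closes the key/value deserializers,
     * and closes the underlying input stream; the reader must not be used
     * after this call.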
*/ 2051 @Override 2052 public synchronized void close() throws IOException { 2053 // Return the decompressors to the pool 2054 CodecPool.returnDecompressor(keyLenDecompressor); 2055 CodecPool.returnDecompressor(keyDecompressor); 2056 CodecPool.returnDecompressor(valLenDecompressor); 2057 CodecPool.returnDecompressor(valDecompressor); 2058 keyLenDecompressor = keyDecompressor = null; 2059 valLenDecompressor = valDecompressor = null; 2060 2061 if (keyDeserializer != null) { 2062 keyDeserializer.close(); 2063 } 2064 if (valDeserializer != null) { 2065 valDeserializer.close(); 2066 } 2067 2068 // Close the input-stream 2069 in.close(); 2070 } 2071 2072 /** Returns the name of the key class. */ 2073 public String getKeyClassName() { 2074 return keyClassName; 2075 } 2076 2077 /** Returns the class of keys in this file. */ 2078 public synchronized Class<?> getKeyClass() { 2079 if (null == keyClass) { 2080 try { 2081 keyClass = WritableName.getClass(getKeyClassName(), conf); 2082 } catch (IOException e) { 2083 throw new RuntimeException(e); 2084 } 2085 } 2086 return keyClass; 2087 } 2088 2089 /** Returns the name of the value class. */ 2090 public String getValueClassName() { 2091 return valClassName; 2092 } 2093 2094 /** Returns the class of values in this file. */ 2095 public synchronized Class<?> getValueClass() { 2096 if (null == valClass) { 2097 try { 2098 valClass = WritableName.getClass(getValueClassName(), conf); 2099 } catch (IOException e) { 2100 throw new RuntimeException(e); 2101 } 2102 } 2103 return valClass; 2104 } 2105 2106 /** Returns true if values are compressed. */ 2107 public boolean isCompressed() { return decompress; } 2108 2109 /** Returns true if records are block-compressed. */ 2110 public boolean isBlockCompressed() { return blockCompressed; } 2111 2112 /** Returns the compression codec of data in this file. */ 2113 public CompressionCodec getCompressionCodec() { return codec; } 2114 2115 private byte[] getSync() { 2116 return sync; 2117 } 2118 2119 private byte getVersion() { 2120 return version; 2121 } 2122 2123 /** 2124 * Get the compression type for this file. 2125 * @return the compression type 2126 */ 2127 public CompressionType getCompressionType() { 2128 if (decompress) { 2129 return blockCompressed ? CompressionType.BLOCK : CompressionType.RECORD; 2130 } else { 2131 return CompressionType.NONE; 2132 } 2133 } 2134 2135 /** Returns the metadata object of the file */ 2136 public Metadata getMetadata() { 2137 return this.metadata; 2138 } 2139 2140 /** Returns the configuration used for this file. 
*/ 2141 Configuration getConf() { return conf; } 2142 2143 /** Read a compressed buffer */ 2144 private synchronized void readBuffer(DataInputBuffer buffer, 2145 CompressionInputStream filter) throws IOException { 2146 // Read data into a temporary buffer 2147 DataOutputBuffer dataBuffer = new DataOutputBuffer(); 2148 2149 try { 2150 int dataBufferLength = WritableUtils.readVInt(in); 2151 dataBuffer.write(in, dataBufferLength); 2152 2153 // Set up 'buffer' connected to the input-stream 2154 buffer.reset(dataBuffer.getData(), 0, dataBuffer.getLength()); 2155 } finally { 2156 dataBuffer.close(); 2157 } 2158 2159 // Reset the codec 2160 filter.resetState(); 2161 } 2162 2163 /** Read the next 'compressed' block */ 2164 private synchronized void readBlock() throws IOException { 2165 // Check if we need to throw away a whole block of 2166 // 'values' due to 'lazy decompression' 2167 if (lazyDecompress && !valuesDecompressed) { 2168 in.seek(WritableUtils.readVInt(in)+in.getPos()); 2169 in.seek(WritableUtils.readVInt(in)+in.getPos()); 2170 } 2171 2172 // Reset internal states 2173 noBufferedKeys = 0; noBufferedValues = 0; noBufferedRecords = 0; 2174 valuesDecompressed = false; 2175 2176 //Process sync 2177 if (sync != null) { 2178 in.readInt(); 2179 in.readFully(syncCheck); // read syncCheck 2180 if (!Arrays.equals(sync, syncCheck)) // check it 2181 throw new IOException("File is corrupt!"); 2182 } 2183 syncSeen = true; 2184 2185 // Read number of records in this block 2186 noBufferedRecords = WritableUtils.readVInt(in); 2187 2188 // Read key lengths and keys 2189 readBuffer(keyLenBuffer, keyLenInFilter); 2190 readBuffer(keyBuffer, keyInFilter); 2191 noBufferedKeys = noBufferedRecords; 2192 2193 // Read value lengths and values 2194 if (!lazyDecompress) { 2195 readBuffer(valLenBuffer, valLenInFilter); 2196 readBuffer(valBuffer, valInFilter); 2197 noBufferedValues = noBufferedRecords; 2198 valuesDecompressed = true; 2199 } 2200 } 2201 2202 /** 2203 * Position valLenIn/valIn to the 'value' 2204 * corresponding to the 'current' key 2205 */ 2206 private synchronized void seekToCurrentValue() throws IOException { 2207 if (!blockCompressed) { 2208 if (decompress) { 2209 valInFilter.resetState(); 2210 } 2211 valBuffer.reset(); 2212 } else { 2213 // Check if this is the first value in the 'block' to be read 2214 if (lazyDecompress && !valuesDecompressed) { 2215 // Read the value lengths and values 2216 readBuffer(valLenBuffer, valLenInFilter); 2217 readBuffer(valBuffer, valInFilter); 2218 noBufferedValues = noBufferedRecords; 2219 valuesDecompressed = true; 2220 } 2221 2222 // Calculate the no. of bytes to skip 2223 // Note: 'current' key has already been read! 2224 int skipValBytes = 0; 2225 int currentKey = noBufferedKeys + 1; 2226 for (int i=noBufferedValues; i > currentKey; --i) { 2227 skipValBytes += WritableUtils.readVInt(valLenIn); 2228 --noBufferedValues; 2229 } 2230 2231 // Skip to the 'val' corresponding to 'current' key 2232 if (skipValBytes > 0) { 2233 if (valIn.skipBytes(skipValBytes) != skipValBytes) { 2234 throw new IOException("Failed to seek to " + currentKey + 2235 "(th) value!"); 2236 } 2237 } 2238 } 2239 } 2240 2241 /** 2242 * Get the 'value' corresponding to the last read 'key'. 2243 * @param val : The 'value' to be read. 
2244 * @throws IOException 2245 */ 2246 public synchronized void getCurrentValue(Writable val) 2247 throws IOException { 2248 if (val instanceof Configurable) { 2249 ((Configurable) val).setConf(this.conf); 2250 } 2251 2252 // Position stream to 'current' value 2253 seekToCurrentValue(); 2254 2255 if (!blockCompressed) { 2256 val.readFields(valIn); 2257 2258 if (valIn.read() > 0) { 2259 LOG.info("available bytes: " + valIn.available()); 2260 throw new IOException(val+" read "+(valBuffer.getPosition()-keyLength) 2261 + " bytes, should read " + 2262 (valBuffer.getLength()-keyLength)); 2263 } 2264 } else { 2265 // Get the value 2266 int valLength = WritableUtils.readVInt(valLenIn); 2267 val.readFields(valIn); 2268 2269 // Read another compressed 'value' 2270 --noBufferedValues; 2271 2272 // Sanity check 2273 if ((valLength < 0) && LOG.isDebugEnabled()) { 2274 LOG.debug(val + " is a zero-length value"); 2275 } 2276 } 2277 2278 } 2279 2280 /** 2281 * Get the 'value' corresponding to the last read 'key'. 2282 * @param val : The 'value' to be read. 2283 * @throws IOException 2284 */ 2285 public synchronized Object getCurrentValue(Object val) 2286 throws IOException { 2287 if (val instanceof Configurable) { 2288 ((Configurable) val).setConf(this.conf); 2289 } 2290 2291 // Position stream to 'current' value 2292 seekToCurrentValue(); 2293 2294 if (!blockCompressed) { 2295 val = deserializeValue(val); 2296 2297 if (valIn.read() > 0) { 2298 LOG.info("available bytes: " + valIn.available()); 2299 throw new IOException(val+" read "+(valBuffer.getPosition()-keyLength) 2300 + " bytes, should read " + 2301 (valBuffer.getLength()-keyLength)); 2302 } 2303 } else { 2304 // Get the value 2305 int valLength = WritableUtils.readVInt(valLenIn); 2306 val = deserializeValue(val); 2307 2308 // Read another compressed 'value' 2309 --noBufferedValues; 2310 2311 // Sanity check 2312 if ((valLength < 0) && LOG.isDebugEnabled()) { 2313 LOG.debug(val + " is a zero-length value"); 2314 } 2315 } 2316 return val; 2317 2318 } 2319 2320 @SuppressWarnings("unchecked") 2321 private Object deserializeValue(Object val) throws IOException { 2322 return valDeserializer.deserialize(val); 2323 } 2324 2325 /** Read the next key in the file into <code>key</code>, skipping its 2326 * value. True if another entry exists, and false at end of file. */ 2327 public synchronized boolean next(Writable key) throws IOException { 2328 if (key.getClass() != getKeyClass()) 2329 throw new IOException("wrong key class: "+key.getClass().getName() 2330 +" is not "+keyClass); 2331 2332 if (!blockCompressed) { 2333 outBuf.reset(); 2334 2335 keyLength = next(outBuf); 2336 if (keyLength < 0) 2337 return false; 2338 2339 valBuffer.reset(outBuf.getData(), outBuf.getLength()); 2340 2341 key.readFields(valBuffer); 2342 valBuffer.mark(0); 2343 if (valBuffer.getPosition() != keyLength) 2344 throw new IOException(key + " read " + valBuffer.getPosition() 2345 + " bytes, should read " + keyLength); 2346 } else { 2347 //Reset syncSeen 2348 syncSeen = false; 2349 2350 if (noBufferedKeys == 0) { 2351 try { 2352 readBlock(); 2353 } catch (EOFException eof) { 2354 return false; 2355 } 2356 } 2357 2358 int keyLength = WritableUtils.readVInt(keyLenIn); 2359 2360 // Sanity check 2361 if (keyLength < 0) { 2362 return false; 2363 } 2364 2365 //Read another compressed 'key' 2366 key.readFields(keyIn); 2367 --noBufferedKeys; 2368 } 2369 2370 return true; 2371 } 2372 2373 /** Read the next key/value pair in the file into <code>key</code> and 2374 * <code>val</code>. 
Returns true if such a pair exists and false when at 2375 * end of file */ 2376 public synchronized boolean next(Writable key, Writable val) 2377 throws IOException { 2378 if (val.getClass() != getValueClass()) 2379 throw new IOException("wrong value class: "+val+" is not "+valClass); 2380 2381 boolean more = next(key); 2382 2383 if (more) { 2384 getCurrentValue(val); 2385 } 2386 2387 return more; 2388 } 2389 2390 /** 2391 * Read and return the next record length, potentially skipping over 2392 * a sync block. 2393 * @return the length of the next record or -1 if there is no next record 2394 * @throws IOException 2395 */ 2396 private synchronized int readRecordLength() throws IOException { 2397 if (in.getPos() >= end) { 2398 return -1; 2399 } 2400 int length = in.readInt(); 2401 if (version > 1 && sync != null && 2402 length == SYNC_ESCAPE) { // process a sync entry 2403 in.readFully(syncCheck); // read syncCheck 2404 if (!Arrays.equals(sync, syncCheck)) // check it 2405 throw new IOException("File is corrupt!"); 2406 syncSeen = true; 2407 if (in.getPos() >= end) { 2408 return -1; 2409 } 2410 length = in.readInt(); // re-read length 2411 } else { 2412 syncSeen = false; 2413 } 2414 2415 return length; 2416 } 2417 2418 /** Read the next key/value pair in the file into <code>buffer</code>. 2419 * Returns the length of the key read, or -1 if at end of file. The length 2420 * of the value may be computed by calling buffer.getLength() before and 2421 * after calls to this method. */ 2422 /** @deprecated Call {@link #nextRaw(DataOutputBuffer,SequenceFile.ValueBytes)}. */ 2423 @Deprecated 2424 synchronized int next(DataOutputBuffer buffer) throws IOException { 2425 // Unsupported for block-compressed sequence files 2426 if (blockCompressed) { 2427 throw new IOException("Unsupported call for block-compressed" + 2428 " SequenceFiles - use SequenceFile.Reader.next(DataOutputStream, ValueBytes)"); 2429 } 2430 try { 2431 int length = readRecordLength(); 2432 if (length == -1) { 2433 return -1; 2434 } 2435 int keyLength = in.readInt(); 2436 buffer.write(in, length); 2437 return keyLength; 2438 } catch (ChecksumException e) { // checksum failure 2439 handleChecksumException(e); 2440 return next(buffer); 2441 } 2442 } 2443 2444 public ValueBytes createValueBytes() { 2445 ValueBytes val = null; 2446 if (!decompress || blockCompressed) { 2447 val = new UncompressedBytes(); 2448 } else { 2449 val = new CompressedBytes(codec); 2450 } 2451 return val; 2452 } 2453 2454 /** 2455 * Read 'raw' records. 
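     * Raw reads copy serialized bytes without deserializing them; this is
     * how {@link Sorter} moves records. A sketch of a raw copy loop, with
     * <code>reader</code> and <code>writer</code> assumed to exist already:
     * <pre>
     *   DataOutputBuffer keyBuf = new DataOutputBuffer();
     *   SequenceFile.ValueBytes valBytes = reader.createValueBytes();
     *   while (reader.nextRaw(keyBuf, valBytes) != -1) {
     *     writer.appendRaw(keyBuf.getData(), 0, keyBuf.getLength(), valBytes);
     *     keyBuf.reset();
     *   }
     * </pre>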
2456 * @param key - The buffer into which the key is read 2457 * @param val - The 'raw' value 2458 * @return Returns the total record length or -1 for end of file 2459 * @throws IOException 2460 */ 2461 public synchronized int nextRaw(DataOutputBuffer key, ValueBytes val) 2462 throws IOException { 2463 if (!blockCompressed) { 2464 int length = readRecordLength(); 2465 if (length == -1) { 2466 return -1; 2467 } 2468 int keyLength = in.readInt(); 2469 int valLength = length - keyLength; 2470 key.write(in, keyLength); 2471 if (decompress) { 2472 CompressedBytes value = (CompressedBytes)val; 2473 value.reset(in, valLength); 2474 } else { 2475 UncompressedBytes value = (UncompressedBytes)val; 2476 value.reset(in, valLength); 2477 } 2478 2479 return length; 2480 } else { 2481 //Reset syncSeen 2482 syncSeen = false; 2483 2484 // Read 'key' 2485 if (noBufferedKeys == 0) { 2486 if (in.getPos() >= end) 2487 return -1; 2488 2489 try { 2490 readBlock(); 2491 } catch (EOFException eof) { 2492 return -1; 2493 } 2494 } 2495 int keyLength = WritableUtils.readVInt(keyLenIn); 2496 if (keyLength < 0) { 2497 throw new IOException("zero length key found!"); 2498 } 2499 key.write(keyIn, keyLength); 2500 --noBufferedKeys; 2501 2502 // Read raw 'value' 2503 seekToCurrentValue(); 2504 int valLength = WritableUtils.readVInt(valLenIn); 2505 UncompressedBytes rawValue = (UncompressedBytes)val; 2506 rawValue.reset(valIn, valLength); 2507 --noBufferedValues; 2508 2509 return (keyLength+valLength); 2510 } 2511 2512 } 2513 2514 /** 2515 * Read 'raw' keys. 2516 * @param key - The buffer into which the key is read 2517 * @return Returns the key length or -1 for end of file 2518 * @throws IOException 2519 */ 2520 public synchronized int nextRawKey(DataOutputBuffer key) 2521 throws IOException { 2522 if (!blockCompressed) { 2523 recordLength = readRecordLength(); 2524 if (recordLength == -1) { 2525 return -1; 2526 } 2527 keyLength = in.readInt(); 2528 key.write(in, keyLength); 2529 return keyLength; 2530 } else { 2531 //Reset syncSeen 2532 syncSeen = false; 2533 2534 // Read 'key' 2535 if (noBufferedKeys == 0) { 2536 if (in.getPos() >= end) 2537 return -1; 2538 2539 try { 2540 readBlock(); 2541 } catch (EOFException eof) { 2542 return -1; 2543 } 2544 } 2545 int keyLength = WritableUtils.readVInt(keyLenIn); 2546 if (keyLength < 0) { 2547 throw new IOException("zero length key found!"); 2548 } 2549 key.write(keyIn, keyLength); 2550 --noBufferedKeys; 2551 2552 return keyLength; 2553 } 2554 2555 } 2556 2557 /** Read the next key in the file, skipping its 2558 * value. Return null at end of file. 
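     * The object returned may be the passed <code>key</code> itself or a
     * fresh instance, depending on the deserializer in use, so callers
     * should rely on the return value rather than on in-place reuse.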
*/ 2559 public synchronized Object next(Object key) throws IOException { 2560 if (key != null && key.getClass() != getKeyClass()) { 2561 throw new IOException("wrong key class: "+key.getClass().getName() 2562 +" is not "+keyClass); 2563 } 2564 2565 if (!blockCompressed) { 2566 outBuf.reset(); 2567 2568 keyLength = next(outBuf); 2569 if (keyLength < 0) 2570 return null; 2571 2572 valBuffer.reset(outBuf.getData(), outBuf.getLength()); 2573 2574 key = deserializeKey(key); 2575 valBuffer.mark(0); 2576 if (valBuffer.getPosition() != keyLength) 2577 throw new IOException(key + " read " + valBuffer.getPosition() 2578 + " bytes, should read " + keyLength); 2579 } else { 2580 //Reset syncSeen 2581 syncSeen = false; 2582 2583 if (noBufferedKeys == 0) { 2584 try { 2585 readBlock(); 2586 } catch (EOFException eof) { 2587 return null; 2588 } 2589 } 2590 2591 int keyLength = WritableUtils.readVInt(keyLenIn); 2592 2593 // Sanity check 2594 if (keyLength < 0) { 2595 return null; 2596 } 2597 2598 //Read another compressed 'key' 2599 key = deserializeKey(key); 2600 --noBufferedKeys; 2601 } 2602 2603 return key; 2604 } 2605 2606 @SuppressWarnings("unchecked") 2607 private Object deserializeKey(Object key) throws IOException { 2608 return keyDeserializer.deserialize(key); 2609 } 2610 2611 /** 2612 * Read 'raw' values. 2613 * @param val - The 'raw' value 2614 * @return Returns the value length 2615 * @throws IOException 2616 */ 2617 public synchronized int nextRawValue(ValueBytes val) 2618 throws IOException { 2619 2620 // Position stream to current value 2621 seekToCurrentValue(); 2622 2623 if (!blockCompressed) { 2624 int valLength = recordLength - keyLength; 2625 if (decompress) { 2626 CompressedBytes value = (CompressedBytes)val; 2627 value.reset(in, valLength); 2628 } else { 2629 UncompressedBytes value = (UncompressedBytes)val; 2630 value.reset(in, valLength); 2631 } 2632 2633 return valLength; 2634 } else { 2635 int valLength = WritableUtils.readVInt(valLenIn); 2636 UncompressedBytes rawValue = (UncompressedBytes)val; 2637 rawValue.reset(valIn, valLength); 2638 --noBufferedValues; 2639 return valLength; 2640 } 2641 2642 } 2643 2644 private void handleChecksumException(ChecksumException e) 2645 throws IOException { 2646 if (this.conf.getBoolean("io.skip.checksum.errors", false)) { 2647 LOG.warn("Bad checksum at "+getPosition()+". Skipping entries."); 2648 sync(getPosition()+this.conf.getInt("io.bytes.per.checksum", 512)); 2649 } else { 2650 throw e; 2651 } 2652 } 2653 2654 /** disables sync. often invoked for tmp files */ 2655 synchronized void ignoreSync() { 2656 sync = null; 2657 } 2658 2659 /** Set the current byte position in the input file. 2660 * 2661 * <p>The position passed must be a position returned by {@link 2662 * SequenceFile.Writer#getLength()} when writing this file. To seek to an arbitrary 2663 * position, use {@link SequenceFile.Reader#sync(long)}. 
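     * For example, <code>sync(0)</code> positions the reader at the first
     * record after the header.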
2664 */ 2665 public synchronized void seek(long position) throws IOException { 2666 in.seek(position); 2667 if (blockCompressed) { // trigger block read 2668 noBufferedKeys = 0; 2669 valuesDecompressed = true; 2670 } 2671 } 2672 2673 /** Seek to the next sync mark past a given position.*/ 2674 public synchronized void sync(long position) throws IOException { 2675 if (position+SYNC_SIZE >= end) { 2676 seek(end); 2677 return; 2678 } 2679 2680 if (position < headerEnd) { 2681 // seek directly to first record 2682 in.seek(headerEnd); 2683 // note the sync marker "seen" in the header 2684 syncSeen = true; 2685 return; 2686 } 2687 2688 try { 2689 seek(position+4); // skip escape 2690 in.readFully(syncCheck); 2691 int syncLen = sync.length; 2692 for (int i = 0; in.getPos() < end; i++) { 2693 int j = 0; 2694 for (; j < syncLen; j++) { 2695 if (sync[j] != syncCheck[(i+j)%syncLen]) 2696 break; 2697 } 2698 if (j == syncLen) { 2699 in.seek(in.getPos() - SYNC_SIZE); // position before sync 2700 return; 2701 } 2702 syncCheck[i%syncLen] = in.readByte(); 2703 } 2704 } catch (ChecksumException e) { // checksum failure 2705 handleChecksumException(e); 2706 } 2707 } 2708 2709 /** Returns true iff the previous call to next passed a sync mark.*/ 2710 public synchronized boolean syncSeen() { return syncSeen; } 2711 2712 /** Return the current byte position in the input file. */ 2713 public synchronized long getPosition() throws IOException { 2714 return in.getPos(); 2715 } 2716 2717 /** Returns the name of the file. */ 2718 @Override 2719 public String toString() { 2720 return filename; 2721 } 2722 2723 } 2724 2725 /** Sorts key/value pairs in a sequence-format file. 2726 * 2727 * <p>For best performance, applications should make sure that the {@link 2728 * Writable#readFields(DataInput)} implementation of their keys is 2729 * very efficient. In particular, it should avoid allocating memory. 2730 */ 2731 public static class Sorter { 2732 2733 private RawComparator comparator; 2734 2735 private MergeSort mergeSort; //the implementation of merge sort 2736 2737 private Path[] inFiles; // when merging or sorting 2738 2739 private Path outFile; 2740 2741 private int memory; // bytes 2742 private int factor; // merged per pass 2743 2744 private FileSystem fs = null; 2745 2746 private Class keyClass; 2747 private Class valClass; 2748 2749 private Configuration conf; 2750 private Metadata metadata; 2751 2752 private Progressable progressable = null; 2753 2754 /** Sort and merge files containing the named classes. */ 2755 public Sorter(FileSystem fs, Class<? extends WritableComparable> keyClass, 2756 Class valClass, Configuration conf) { 2757 this(fs, WritableComparator.get(keyClass, conf), keyClass, valClass, conf); 2758 } 2759 2760 /** Sort and merge using an arbitrary {@link RawComparator}. */ 2761 public Sorter(FileSystem fs, RawComparator comparator, Class keyClass, 2762 Class valClass, Configuration conf) { 2763 this(fs, comparator, keyClass, valClass, conf, new Metadata()); 2764 } 2765 2766 /** Sort and merge using an arbitrary {@link RawComparator}. 
*/ 2767 public Sorter(FileSystem fs, RawComparator comparator, Class keyClass, 2768 Class valClass, Configuration conf, Metadata metadata) { 2769 this.fs = fs; 2770 this.comparator = comparator; 2771 this.keyClass = keyClass; 2772 this.valClass = valClass; 2773 this.memory = conf.getInt("io.sort.mb", 100) * 1024 * 1024; 2774 this.factor = conf.getInt("io.sort.factor", 100); 2775 this.conf = conf; 2776 this.metadata = metadata; 2777 } 2778 2779 /** Set the number of streams to merge at once.*/ 2780 public void setFactor(int factor) { this.factor = factor; } 2781 2782 /** Get the number of streams to merge at once.*/ 2783 public int getFactor() { return factor; } 2784 2785 /** Set the total amount of buffer memory, in bytes.*/ 2786 public void setMemory(int memory) { this.memory = memory; } 2787 2788 /** Get the total amount of buffer memory, in bytes.*/ 2789 public int getMemory() { return memory; } 2790 2791 /** Set the progressable object in order to report progress. */ 2792 public void setProgressable(Progressable progressable) { 2793 this.progressable = progressable; 2794 } 2795 2796 /** 2797 * Perform a file sort from a set of input files into an output file. 2798 * @param inFiles the files to be sorted 2799 * @param outFile the sorted output file 2800 * @param deleteInput should the input files be deleted as they are read? 2801 */ 2802 public void sort(Path[] inFiles, Path outFile, 2803 boolean deleteInput) throws IOException { 2804 if (fs.exists(outFile)) { 2805 throw new IOException("already exists: " + outFile); 2806 } 2807 2808 this.inFiles = inFiles; 2809 this.outFile = outFile; 2810 2811 int segments = sortPass(deleteInput); 2812 if (segments > 1) { 2813 mergePass(outFile.getParent()); 2814 } 2815 } 2816 2817 /** 2818 * Perform a file sort from a set of input files and return an iterator. 2819 * @param inFiles the files to be sorted 2820 * @param tempDir the directory where temp files are created during sort 2821 * @param deleteInput should the input files be deleted as they are read? 2822 * @return iterator the RawKeyValueIterator 2823 */ 2824 public RawKeyValueIterator sortAndIterate(Path[] inFiles, Path tempDir, 2825 boolean deleteInput) throws IOException { 2826 Path outFile = new Path(tempDir + Path.SEPARATOR + "all.2"); 2827 if (fs.exists(outFile)) { 2828 throw new IOException("already exists: " + outFile); 2829 } 2830 this.inFiles = inFiles; 2831 //outFile will basically be used as prefix for temp files in the cases 2832 //where sort outputs multiple sorted segments. For the single segment 2833 //case, the outputFile itself will contain the sorted data for that 2834 //segment 2835 this.outFile = outFile; 2836 2837 int segments = sortPass(deleteInput); 2838 if (segments > 1) 2839 return merge(outFile.suffix(".0"), outFile.suffix(".0.index"), 2840 tempDir); 2841 else if (segments == 1) 2842 return merge(new Path[]{outFile}, true, tempDir); 2843 else return null; 2844 } 2845 2846 /** 2847 * The backwards compatible interface to sort. 
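     * For example (paths and types here are illustrative):
     * <pre>
     *   SequenceFile.Sorter sorter =
     *     new SequenceFile.Sorter(fs, Text.class, Text.class, conf);
     *   sorter.sort(new Path("input.seq"), new Path("sorted.seq"));
     * </pre>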
2848 * @param inFile the input file to sort 2849 * @param outFile the sorted output file 2850 */ 2851 public void sort(Path inFile, Path outFile) throws IOException { 2852 sort(new Path[]{inFile}, outFile, false); 2853 } 2854 2855 private int sortPass(boolean deleteInput) throws IOException { 2856 if(LOG.isDebugEnabled()) { 2857 LOG.debug("running sort pass"); 2858 } 2859 SortPass sortPass = new SortPass(); // make the SortPass 2860 sortPass.setProgressable(progressable); 2861 mergeSort = new MergeSort(sortPass.new SeqFileComparator()); 2862 try { 2863 return sortPass.run(deleteInput); // run it 2864 } finally { 2865 sortPass.close(); // close it 2866 } 2867 } 2868 2869 private class SortPass { 2870 private int memoryLimit = memory/4; 2871 private int recordLimit = 1000000; 2872 2873 private DataOutputBuffer rawKeys = new DataOutputBuffer(); 2874 private byte[] rawBuffer; 2875 2876 private int[] keyOffsets = new int[1024]; 2877 private int[] pointers = new int[keyOffsets.length]; 2878 private int[] pointersCopy = new int[keyOffsets.length]; 2879 private int[] keyLengths = new int[keyOffsets.length]; 2880 private ValueBytes[] rawValues = new ValueBytes[keyOffsets.length]; 2881 2882 private ArrayList segmentLengths = new ArrayList(); 2883 2884 private Reader in = null; 2885 private FSDataOutputStream out = null; 2886 private FSDataOutputStream indexOut = null; 2887 private Path outName; 2888 2889 private Progressable progressable = null; 2890 2891 public int run(boolean deleteInput) throws IOException { 2892 int segments = 0; 2893 int currentFile = 0; 2894 boolean atEof = (currentFile >= inFiles.length); 2895 CompressionType compressionType; 2896 CompressionCodec codec = null; 2897 segmentLengths.clear(); 2898 if (atEof) { 2899 return 0; 2900 } 2901 2902 // Initialize 2903 in = new Reader(fs, inFiles[currentFile], conf); 2904 compressionType = in.getCompressionType(); 2905 codec = in.getCompressionCodec(); 2906 2907 for (int i=0; i < rawValues.length; ++i) { 2908 rawValues[i] = null; 2909 } 2910 2911 while (!atEof) { 2912 int count = 0; 2913 int bytesProcessed = 0; 2914 rawKeys.reset(); 2915 while (!atEof && 2916 bytesProcessed < memoryLimit && count < recordLimit) { 2917 2918 // Read a record into buffer 2919 // Note: Attempt to re-use 'rawValue' as far as possible 2920 int keyOffset = rawKeys.getLength(); 2921 ValueBytes rawValue = 2922 (count == keyOffsets.length || rawValues[count] == null) ? 
2923 in.createValueBytes() : 2924 rawValues[count]; 2925 int recordLength = in.nextRaw(rawKeys, rawValue); 2926 if (recordLength == -1) { 2927 in.close(); 2928 if (deleteInput) { 2929 fs.delete(inFiles[currentFile], true); 2930 } 2931 currentFile += 1; 2932 atEof = currentFile >= inFiles.length; 2933 if (!atEof) { 2934 in = new Reader(fs, inFiles[currentFile], conf); 2935 } else { 2936 in = null; 2937 } 2938 continue; 2939 } 2940 2941 int keyLength = rawKeys.getLength() - keyOffset; 2942 2943 if (count == keyOffsets.length) 2944 grow(); 2945 2946 keyOffsets[count] = keyOffset; // update pointers 2947 pointers[count] = count; 2948 keyLengths[count] = keyLength; 2949 rawValues[count] = rawValue; 2950 2951 bytesProcessed += recordLength; 2952 count++; 2953 } 2954 2955 // buffer is full -- sort & flush it 2956 if(LOG.isDebugEnabled()) { 2957 LOG.debug("flushing segment " + segments); 2958 } 2959 rawBuffer = rawKeys.getData(); 2960 sort(count); 2961 // indicate we're making progress 2962 if (progressable != null) { 2963 progressable.progress(); 2964 } 2965 flush(count, bytesProcessed, compressionType, codec, 2966 segments==0 && atEof); 2967 segments++; 2968 } 2969 return segments; 2970 } 2971 2972 public void close() throws IOException { 2973 if (in != null) { 2974 in.close(); 2975 } 2976 if (out != null) { 2977 out.close(); 2978 } 2979 if (indexOut != null) { 2980 indexOut.close(); 2981 } 2982 } 2983 2984 private void grow() { 2985 int newLength = keyOffsets.length * 3 / 2; 2986 keyOffsets = grow(keyOffsets, newLength); 2987 pointers = grow(pointers, newLength); 2988 pointersCopy = new int[newLength]; 2989 keyLengths = grow(keyLengths, newLength); 2990 rawValues = grow(rawValues, newLength); 2991 } 2992 2993 private int[] grow(int[] old, int newLength) { 2994 int[] result = new int[newLength]; 2995 System.arraycopy(old, 0, result, 0, old.length); 2996 return result; 2997 } 2998 2999 private ValueBytes[] grow(ValueBytes[] old, int newLength) { 3000 ValueBytes[] result = new ValueBytes[newLength]; 3001 System.arraycopy(old, 0, result, 0, old.length); 3002 for (int i=old.length; i < newLength; ++i) { 3003 result[i] = null; 3004 } 3005 return result; 3006 } 3007 3008 private void flush(int count, int bytesProcessed, 3009 CompressionType compressionType, 3010 CompressionCodec codec, 3011 boolean done) throws IOException { 3012 if (out == null) { 3013 outName = done ? outFile : outFile.suffix(".0"); 3014 out = fs.create(outName); 3015 if (!done) { 3016 indexOut = fs.create(outName.suffix(".index")); 3017 } 3018 } 3019 3020 long segmentStart = out.getPos(); 3021 Writer writer = createWriter(conf, Writer.stream(out), 3022 Writer.keyClass(keyClass), Writer.valueClass(valClass), 3023 Writer.compression(compressionType, codec), 3024 Writer.metadata(done ? 
                                               metadata : new Metadata()));

        if (!done) {
          writer.sync = null;                   // disable sync on temp files
        }

        for (int i = 0; i < count; i++) {       // write in sorted order
          int p = pointers[i];
          writer.appendRaw(rawBuffer, keyOffsets[p], keyLengths[p], rawValues[p]);
        }
        writer.close();

        if (!done) {
          // Save the segment length
          WritableUtils.writeVLong(indexOut, segmentStart);
          WritableUtils.writeVLong(indexOut, (out.getPos()-segmentStart));
          indexOut.flush();
        }
      }

      private void sort(int count) {
        System.arraycopy(pointers, 0, pointersCopy, 0, count);
        mergeSort.mergeSort(pointersCopy, pointers, 0, count);
      }

      class SeqFileComparator implements Comparator<IntWritable> {
        @Override
        public int compare(IntWritable I, IntWritable J) {
          return comparator.compare(rawBuffer, keyOffsets[I.get()],
                                    keyLengths[I.get()], rawBuffer,
                                    keyOffsets[J.get()], keyLengths[J.get()]);
        }
      }

      /** Set the progressable object in order to report progress */
      public void setProgressable(Progressable progressable)
      {
        this.progressable = progressable;
      }

    } // SequenceFile.Sorter.SortPass

    /** The interface to iterate over raw keys/values of SequenceFiles. */
    public static interface RawKeyValueIterator {
      /** Gets the current raw key
       * @return DataOutputBuffer
       * @throws IOException
       */
      DataOutputBuffer getKey() throws IOException;
      /** Gets the current raw value
       * @return ValueBytes
       * @throws IOException
       */
      ValueBytes getValue() throws IOException;
      /** Sets up the current key and value (for getKey and getValue)
       * @return true if there exists a key/value, false otherwise
       * @throws IOException
       */
      boolean next() throws IOException;
      /** Closes the iterator so that the underlying streams can be closed
       * @throws IOException
       */
      void close() throws IOException;
      /** Gets the Progress object; this has a float (0.0 - 1.0)
       * indicating the bytes processed by the iterator so far
       */
      Progress getProgress();
    }

    /**
     * Merges the list of segments of type <code>SegmentDescriptor</code>
     * @param segments the list of SegmentDescriptors
     * @param tmpDir the directory to write temporary files into
     * @return RawKeyValueIterator
     * @throws IOException
     */
    public RawKeyValueIterator merge(List<SegmentDescriptor> segments,
                                     Path tmpDir)
      throws IOException {
      // pass in object to report progress, if present
      MergeQueue mQueue = new MergeQueue(segments, tmpDir, progressable);
      return mQueue.merge();
    }

    /**
     * Merges the contents of files passed in Path[] using the merge
     * factor that is already set
     * @param inNames the array of path names
     * @param deleteInputs true if the input files should be deleted when
     *        unnecessary
     * @param tmpDir the directory to write temporary files into
     * @return RawKeyValueIterator
     * @throws IOException
     */
    public RawKeyValueIterator merge(Path[] inNames, boolean deleteInputs,
                                     Path tmpDir)
      throws IOException {
      return merge(inNames, deleteInputs,
                   (inNames.length < factor) ? inNames.length : factor,
                   tmpDir);
    }

    /**
     * Merges the contents of files passed in Path[]
     * @param inNames the array of path names
     * @param deleteInputs true if the input files should be deleted when
     *        unnecessary
     * @param factor the factor that will be used as the maximum merge fan-in
     * @param tmpDir the directory to write temporary files into
     * @return RawKeyValueIterator
     * @throws IOException
     */
    public RawKeyValueIterator merge(Path[] inNames, boolean deleteInputs,
                                     int factor, Path tmpDir)
      throws IOException {
      // get the segments from inNames
      ArrayList<SegmentDescriptor> a = new ArrayList<SegmentDescriptor>();
      for (int i = 0; i < inNames.length; i++) {
        SegmentDescriptor s = new SegmentDescriptor(0,
            fs.getFileStatus(inNames[i]).getLen(), inNames[i]);
        s.preserveInput(!deleteInputs);
        s.doSync();
        a.add(s);
      }
      this.factor = factor;
      MergeQueue mQueue = new MergeQueue(a, tmpDir, progressable);
      return mQueue.merge();
    }

    /**
     * Merges the contents of files passed in Path[]
     * @param inNames the array of path names
     * @param tempDir the directory for creating temp files during merge
     * @param deleteInputs true if the input files should be deleted when
     *        unnecessary
     * @return RawKeyValueIterator
     * @throws IOException
     */
    public RawKeyValueIterator merge(Path[] inNames, Path tempDir,
                                     boolean deleteInputs)
      throws IOException {
      // outFile will basically be used as prefix for temp files for the
      // intermediate merge outputs
      this.outFile = new Path(tempDir + Path.SEPARATOR + "merged");
      // get the segments from inNames
      ArrayList<SegmentDescriptor> a = new ArrayList<SegmentDescriptor>();
      for (int i = 0; i < inNames.length; i++) {
        SegmentDescriptor s = new SegmentDescriptor(0,
            fs.getFileStatus(inNames[i]).getLen(), inNames[i]);
        s.preserveInput(!deleteInputs);
        s.doSync();
        a.add(s);
      }
      factor = (inNames.length < factor) ? inNames.length : factor;
      // pass in object to report progress, if present
      MergeQueue mQueue = new MergeQueue(a, tempDir, progressable);
      return mQueue.merge();
    }

    /**
     * Clones the attributes (like compression) of the input file and
     * creates a corresponding Writer
     * @param inputFile the path of the input file whose attributes should be
     *        cloned
     * @param outputFile the path of the output file
     * @param prog the Progressable to report status during the file write
     * @return Writer
     * @throws IOException
     */
    public Writer cloneFileAttributes(Path inputFile, Path outputFile,
                                      Progressable prog) throws IOException {
      Reader reader = new Reader(conf,
                                 Reader.file(inputFile),
                                 new Reader.OnlyHeaderOption());
      CompressionType compress = reader.getCompressionType();
      CompressionCodec codec = reader.getCompressionCodec();
      reader.close();

      Writer writer = createWriter(conf,
                                   Writer.file(outputFile),
                                   Writer.keyClass(keyClass),
                                   Writer.valueClass(valClass),
                                   Writer.compression(compress, codec),
                                   Writer.progressable(prog));
      return writer;
    }

    /**
     * Writes records from RawKeyValueIterator into a file represented by the
     * passed writer
     * @param records the RawKeyValueIterator
     * @param writer the Writer created earlier
     * @throws IOException
     */
    public void writeFile(RawKeyValueIterator records, Writer writer)
      throws IOException {
      while(records.next()) {
        writer.appendRaw(records.getKey().getData(), 0,
                         records.getKey().getLength(), records.getValue());
      }
      writer.sync();
    }

    /** Merge the provided files.
     * @param inFiles the array of input path names
     * @param outFile the final output file
     * @throws IOException
     */
    public void merge(Path[] inFiles, Path outFile) throws IOException {
      if (fs.exists(outFile)) {
        throw new IOException("already exists: " + outFile);
      }
      RawKeyValueIterator r = merge(inFiles, false, outFile.getParent());
      Writer writer = cloneFileAttributes(inFiles[0], outFile, null);

      writeFile(r, writer);

      writer.close();
    }

    /** sort calls this to generate the final merged output */
    private int mergePass(Path tmpDir) throws IOException {
      if(LOG.isDebugEnabled()) {
        LOG.debug("running merge pass");
      }
      Writer writer = cloneFileAttributes(
          outFile.suffix(".0"), outFile, null);
      RawKeyValueIterator r = merge(outFile.suffix(".0"),
                                    outFile.suffix(".0.index"), tmpDir);
      writeFile(r, writer);

      writer.close();
      return 0;
    }

    /** Used by mergePass to merge the output of the sort
     * @param inName the name of the input file containing sorted segments
     * @param indexIn the offsets of the sorted segments
     * @param tmpDir the relative directory to store intermediate results in
     * @return RawKeyValueIterator
     * @throws IOException
     */
    private RawKeyValueIterator merge(Path inName, Path indexIn, Path tmpDir)
      throws IOException {
      // get the segments from indexIn
      // we create a SegmentContainer so that we can track segments belonging to
      // inName and delete inName as soon as we see that we have looked at all
      // the contained segments during the merge process & hence don't need
      // them anymore
      SegmentContainer container = new SegmentContainer(inName, indexIn);
      MergeQueue mQueue = new
MergeQueue(container.getSegmentList(), tmpDir, progressable); 3274 return mQueue.merge(); 3275 } 3276 3277 /** This class implements the core of the merge logic */ 3278 private class MergeQueue extends PriorityQueue 3279 implements RawKeyValueIterator { 3280 private boolean compress; 3281 private boolean blockCompress; 3282 private DataOutputBuffer rawKey = new DataOutputBuffer(); 3283 private ValueBytes rawValue; 3284 private long totalBytesProcessed; 3285 private float progPerByte; 3286 private Progress mergeProgress = new Progress(); 3287 private Path tmpDir; 3288 private Progressable progress = null; //handle to the progress reporting object 3289 private SegmentDescriptor minSegment; 3290 3291 //a TreeMap used to store the segments sorted by size (segment offset and 3292 //segment path name is used to break ties between segments of same sizes) 3293 private Map<SegmentDescriptor, Void> sortedSegmentSizes = 3294 new TreeMap<SegmentDescriptor, Void>(); 3295 3296 @SuppressWarnings("unchecked") 3297 public void put(SegmentDescriptor stream) throws IOException { 3298 if (size() == 0) { 3299 compress = stream.in.isCompressed(); 3300 blockCompress = stream.in.isBlockCompressed(); 3301 } else if (compress != stream.in.isCompressed() || 3302 blockCompress != stream.in.isBlockCompressed()) { 3303 throw new IOException("All merged files must be compressed or not."); 3304 } 3305 super.put(stream); 3306 } 3307 3308 /** 3309 * A queue of file segments to merge 3310 * @param segments the file segments to merge 3311 * @param tmpDir a relative local directory to save intermediate files in 3312 * @param progress the reference to the Progressable object 3313 */ 3314 public MergeQueue(List <SegmentDescriptor> segments, 3315 Path tmpDir, Progressable progress) { 3316 int size = segments.size(); 3317 for (int i = 0; i < size; i++) { 3318 sortedSegmentSizes.put(segments.get(i), null); 3319 } 3320 this.tmpDir = tmpDir; 3321 this.progress = progress; 3322 } 3323 @Override 3324 protected boolean lessThan(Object a, Object b) { 3325 // indicate we're making progress 3326 if (progress != null) { 3327 progress.progress(); 3328 } 3329 SegmentDescriptor msa = (SegmentDescriptor)a; 3330 SegmentDescriptor msb = (SegmentDescriptor)b; 3331 return comparator.compare(msa.getKey().getData(), 0, 3332 msa.getKey().getLength(), msb.getKey().getData(), 0, 3333 msb.getKey().getLength()) < 0; 3334 } 3335 @Override 3336 public void close() throws IOException { 3337 SegmentDescriptor ms; // close inputs 3338 while ((ms = (SegmentDescriptor)pop()) != null) { 3339 ms.cleanup(); 3340 } 3341 minSegment = null; 3342 } 3343 @Override 3344 public DataOutputBuffer getKey() throws IOException { 3345 return rawKey; 3346 } 3347 @Override 3348 public ValueBytes getValue() throws IOException { 3349 return rawValue; 3350 } 3351 @Override 3352 public boolean next() throws IOException { 3353 if (size() == 0) 3354 return false; 3355 if (minSegment != null) { 3356 //minSegment is non-null for all invocations of next except the first 3357 //one. For the first invocation, the priority queue is ready for use 3358 //but for the subsequent invocations, first adjust the queue 3359 adjustPriorityQueue(minSegment); 3360 if (size() == 0) { 3361 minSegment = null; 3362 return false; 3363 } 3364 } 3365 minSegment = (SegmentDescriptor)top(); 3366 long startPos = minSegment.in.getPosition(); // Current position in stream 3367 //save the raw key reference 3368 rawKey = minSegment.getKey(); 3369 //load the raw value. 
Re-use the existing rawValue buffer 3370 if (rawValue == null) { 3371 rawValue = minSegment.in.createValueBytes(); 3372 } 3373 minSegment.nextRawValue(rawValue); 3374 long endPos = minSegment.in.getPosition(); // End position after reading value 3375 updateProgress(endPos - startPos); 3376 return true; 3377 } 3378 3379 @Override 3380 public Progress getProgress() { 3381 return mergeProgress; 3382 } 3383 3384 private void adjustPriorityQueue(SegmentDescriptor ms) throws IOException{ 3385 long startPos = ms.in.getPosition(); // Current position in stream 3386 boolean hasNext = ms.nextRawKey(); 3387 long endPos = ms.in.getPosition(); // End position after reading key 3388 updateProgress(endPos - startPos); 3389 if (hasNext) { 3390 adjustTop(); 3391 } else { 3392 pop(); 3393 ms.cleanup(); 3394 } 3395 } 3396 3397 private void updateProgress(long bytesProcessed) { 3398 totalBytesProcessed += bytesProcessed; 3399 if (progPerByte > 0) { 3400 mergeProgress.set(totalBytesProcessed * progPerByte); 3401 } 3402 } 3403 3404 /** This is the single level merge that is called multiple times 3405 * depending on the factor size and the number of segments 3406 * @return RawKeyValueIterator 3407 * @throws IOException 3408 */ 3409 public RawKeyValueIterator merge() throws IOException { 3410 //create the MergeStreams from the sorted map created in the constructor 3411 //and dump the final output to a file 3412 int numSegments = sortedSegmentSizes.size(); 3413 int origFactor = factor; 3414 int passNo = 1; 3415 LocalDirAllocator lDirAlloc = new LocalDirAllocator("io.seqfile.local.dir"); 3416 do { 3417 //get the factor for this pass of merge 3418 factor = getPassFactor(passNo, numSegments); 3419 List<SegmentDescriptor> segmentsToMerge = 3420 new ArrayList<SegmentDescriptor>(); 3421 int segmentsConsidered = 0; 3422 int numSegmentsToConsider = factor; 3423 while (true) { 3424 //extract the smallest 'factor' number of segment pointers from the 3425 //TreeMap. Call cleanup on the empty segments (no key/value data) 3426 SegmentDescriptor[] mStream = 3427 getSegmentDescriptors(numSegmentsToConsider); 3428 for (int i = 0; i < mStream.length; i++) { 3429 if (mStream[i].nextRawKey()) { 3430 segmentsToMerge.add(mStream[i]); 3431 segmentsConsidered++; 3432 // Count the fact that we read some bytes in calling nextRawKey() 3433 updateProgress(mStream[i].in.getPosition()); 3434 } 3435 else { 3436 mStream[i].cleanup(); 3437 numSegments--; //we ignore this segment for the merge 3438 } 3439 } 3440 //if we have the desired number of segments 3441 //or looked at all available segments, we break 3442 if (segmentsConsidered == factor || 3443 sortedSegmentSizes.size() == 0) { 3444 break; 3445 } 3446 3447 numSegmentsToConsider = factor - segmentsConsidered; 3448 } 3449 //feed the streams to the priority queue 3450 initialize(segmentsToMerge.size()); clear(); 3451 for (int i = 0; i < segmentsToMerge.size(); i++) { 3452 put(segmentsToMerge.get(i)); 3453 } 3454 //if we have lesser number of segments remaining, then just return the 3455 //iterator, else do another single level merge 3456 if (numSegments <= factor) { 3457 //calculate the length of the remaining segments. 
Required for 3458 //calculating the merge progress 3459 long totalBytes = 0; 3460 for (int i = 0; i < segmentsToMerge.size(); i++) { 3461 totalBytes += segmentsToMerge.get(i).segmentLength; 3462 } 3463 if (totalBytes != 0) //being paranoid 3464 progPerByte = 1.0f / (float)totalBytes; 3465 //reset factor to what it originally was 3466 factor = origFactor; 3467 return this; 3468 } else { 3469 //we want to spread the creation of temp files on multiple disks if 3470 //available under the space constraints 3471 long approxOutputSize = 0; 3472 for (SegmentDescriptor s : segmentsToMerge) { 3473 approxOutputSize += s.segmentLength + 3474 ChecksumFileSystem.getApproxChkSumLength( 3475 s.segmentLength); 3476 } 3477 Path tmpFilename = 3478 new Path(tmpDir, "intermediate").suffix("." + passNo); 3479 3480 Path outputFile = lDirAlloc.getLocalPathForWrite( 3481 tmpFilename.toString(), 3482 approxOutputSize, conf); 3483 if(LOG.isDebugEnabled()) { 3484 LOG.debug("writing intermediate results to " + outputFile); 3485 } 3486 Writer writer = cloneFileAttributes( 3487 fs.makeQualified(segmentsToMerge.get(0).segmentPathName), 3488 fs.makeQualified(outputFile), null); 3489 writer.sync = null; //disable sync for temp files 3490 writeFile(this, writer); 3491 writer.close(); 3492 3493 //we finished one single level merge; now clean up the priority 3494 //queue 3495 this.close(); 3496 3497 SegmentDescriptor tempSegment = 3498 new SegmentDescriptor(0, 3499 fs.getFileStatus(outputFile).getLen(), outputFile); 3500 //put the segment back in the TreeMap 3501 sortedSegmentSizes.put(tempSegment, null); 3502 numSegments = sortedSegmentSizes.size(); 3503 passNo++; 3504 } 3505 //we are worried about only the first pass merge factor. So reset the 3506 //factor to what it originally was 3507 factor = origFactor; 3508 } while(true); 3509 } 3510 3511 //Hadoop-591 3512 public int getPassFactor(int passNo, int numSegments) { 3513 if (passNo > 1 || numSegments <= factor || factor == 1) 3514 return factor; 3515 int mod = (numSegments - 1) % (factor - 1); 3516 if (mod == 0) 3517 return factor; 3518 return mod + 1; 3519 } 3520 3521 /** Return (& remove) the requested number of segment descriptors from the 3522 * sorted map. 3523 */ 3524 public SegmentDescriptor[] getSegmentDescriptors(int numDescriptors) { 3525 if (numDescriptors > sortedSegmentSizes.size()) 3526 numDescriptors = sortedSegmentSizes.size(); 3527 SegmentDescriptor[] SegmentDescriptors = 3528 new SegmentDescriptor[numDescriptors]; 3529 Iterator iter = sortedSegmentSizes.keySet().iterator(); 3530 int i = 0; 3531 while (i < numDescriptors) { 3532 SegmentDescriptors[i++] = (SegmentDescriptor)iter.next(); 3533 iter.remove(); 3534 } 3535 return SegmentDescriptors; 3536 } 3537 } // SequenceFile.Sorter.MergeQueue 3538 3539 /** This class defines a merge segment. This class can be subclassed to 3540 * provide a customized cleanup method implementation. In this 3541 * implementation, cleanup closes the file handle and deletes the file 3542 */ 3543 public class SegmentDescriptor implements Comparable { 3544 3545 long segmentOffset; //the start of the segment in the file 3546 long segmentLength; //the length of the segment 3547 Path segmentPathName; //the path name of the file containing the segment 3548 boolean ignoreSync = true; //set to true for temp files 3549 private Reader in = null; 3550 private DataOutputBuffer rawKey = null; //this will hold the current key 3551 private boolean preserveInput = false; //delete input segment files? 
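      // Note: segments order by (length, offset, path); see compareTo below.
      // The merge therefore always prefers the smallest remaining segments.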
    /** This class defines a merge segment. It can be subclassed to provide a
     * customized cleanup method implementation; in this implementation,
     * cleanup closes the file handle and deletes the file.
     */
    public class SegmentDescriptor implements Comparable {

      long segmentOffset; //the start of the segment in the file
      long segmentLength; //the length of the segment
      Path segmentPathName; //the path name of the file containing the segment
      boolean ignoreSync = true; //set to true for temp files
      private Reader in = null;
      private DataOutputBuffer rawKey = null; //this will hold the current key
      private boolean preserveInput = false; //whether to preserve (not delete)
                                             //the input segment files

      /** Constructs a segment.
       * @param segmentOffset the offset of the segment in the file
       * @param segmentLength the length of the segment
       * @param segmentPathName the path name of the file containing the segment
       */
      public SegmentDescriptor(long segmentOffset, long segmentLength,
                               Path segmentPathName) {
        this.segmentOffset = segmentOffset;
        this.segmentLength = segmentLength;
        this.segmentPathName = segmentPathName;
      }

      /** Enables the sync checks (i.e. stops ignoring sync markers). */
      public void doSync() { ignoreSync = false; }

      /** Sets whether the input segment file should be preserved (that is,
       * not deleted) when it is no longer needed. */
      public void preserveInput(boolean preserve) {
        preserveInput = preserve;
      }

      public boolean shouldPreserveInput() {
        return preserveInput;
      }

      @Override
      public int compareTo(Object o) {
        SegmentDescriptor that = (SegmentDescriptor) o;
        if (this.segmentLength != that.segmentLength) {
          return (this.segmentLength < that.segmentLength ? -1 : 1);
        }
        if (this.segmentOffset != that.segmentOffset) {
          return (this.segmentOffset < that.segmentOffset ? -1 : 1);
        }
        return (this.segmentPathName.toString()).
          compareTo(that.segmentPathName.toString());
      }

      @Override
      public boolean equals(Object o) {
        if (!(o instanceof SegmentDescriptor)) {
          return false;
        }
        SegmentDescriptor that = (SegmentDescriptor) o;
        return this.segmentLength == that.segmentLength &&
          this.segmentOffset == that.segmentOffset &&
          this.segmentPathName.toString().equals(
            that.segmentPathName.toString());
      }

      @Override
      public int hashCode() {
        return 37 * 17 + (int) (segmentOffset ^ (segmentOffset >>> 32));
      }
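      /* Ordering sketch (hypothetical descriptors): the TreeMap in
       * MergeQueue.merge() relies on this compareTo(), so the shortest
       * segments are merged first; ties are broken by offset, then path name:
       *
       *   new SegmentDescriptor(0, 100, new Path("a.seq"))   // first: shortest
       *   new SegmentDescriptor(0, 500, new Path("a.seq"))   // second: longer
       *   new SegmentDescriptor(64, 500, new Path("a.seq"))  // third: same
       *                                                      // length, larger
       *                                                      // offset
       */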
      /** Fills the rawKey object with the key returned by the Reader.
       * @return true if a key was returned; false otherwise
       * @throws IOException
       */
      public boolean nextRawKey() throws IOException {
        if (in == null) {
          int bufferSize = getBufferSize(conf);
          Reader reader = new Reader(conf,
                                     Reader.file(segmentPathName),
                                     Reader.bufferSize(bufferSize),
                                     Reader.start(segmentOffset),
                                     Reader.length(segmentLength));

          //sometimes we ignore syncs, especially for temp merge files
          if (ignoreSync) reader.ignoreSync();

          if (reader.getKeyClass() != keyClass)
            throw new IOException("wrong key class: " + reader.getKeyClass() +
                                  " is not " + keyClass);
          if (reader.getValueClass() != valClass)
            throw new IOException("wrong value class: " + reader.getValueClass() +
                                  " is not " + valClass);
          this.in = reader;
          rawKey = new DataOutputBuffer();
        }
        rawKey.reset();
        int keyLength = in.nextRawKey(rawKey);
        return (keyLength >= 0);
      }

      /** Fills the passed rawValue with the value corresponding to the key
       * read earlier.
       * @param rawValue
       * @return the length of the value
       * @throws IOException
       */
      public int nextRawValue(ValueBytes rawValue) throws IOException {
        return in.nextRawValue(rawValue);
      }

      /** Returns the stored rawKey. */
      public DataOutputBuffer getKey() {
        return rawKey;
      }

      /** Closes the underlying reader. */
      private void close() throws IOException {
        this.in.close();
        this.in = null;
      }

      /** The default cleanup. Subclasses can override this with a custom
       * cleanup.
       */
      public void cleanup() throws IOException {
        close();
        if (!preserveInput) {
          fs.delete(segmentPathName, true);
        }
      }
    } // SequenceFile.Sorter.SegmentDescriptor

    /** This class describes multiple segments contained within a single
     * file.
     */
    private class LinkedSegmentsDescriptor extends SegmentDescriptor {

      SegmentContainer parentContainer = null;

      /** Constructs a segment.
       * @param segmentOffset the offset of the segment in the file
       * @param segmentLength the length of the segment
       * @param segmentPathName the path name of the file containing the segment
       * @param parent the parent SegmentContainer that holds the segment
       */
      public LinkedSegmentsDescriptor(long segmentOffset, long segmentLength,
                                      Path segmentPathName, SegmentContainer parent) {
        super(segmentOffset, segmentLength, segmentPathName);
        this.parentContainer = parent;
      }

      /** The default cleanup. Subclasses can override this with a custom
       * cleanup.
       */
      @Override
      public void cleanup() throws IOException {
        super.close();
        if (super.shouldPreserveInput()) return;
        parentContainer.cleanup();
      }

      @Override
      public boolean equals(Object o) {
        if (!(o instanceof LinkedSegmentsDescriptor)) {
          return false;
        }
        return super.equals(o);
      }
    } // SequenceFile.Sorter.LinkedSegmentsDescriptor

    /** The class that defines a container for segments to be merged. Primarily
     * required to delete temp files as soon as all of the contained segments
     * have been looked at.
     */
    private class SegmentContainer {
      private int numSegmentsCleanedUp = 0; //track the number of segment cleanups
      private int numSegmentsContained; //# of segments contained
      private Path inName; //input file from which the segments are created

      //the list of segments read from the file
      private ArrayList<SegmentDescriptor> segments =
        new ArrayList<SegmentDescriptor>();

      /** This constructor is there primarily to serve the sort routine that
       * generates a single output file with an associated index file.
       */
      public SegmentContainer(Path inName, Path indexIn) throws IOException {
        //get the segments from indexIn
        FSDataInputStream fsIndexIn = fs.open(indexIn);
        long end = fs.getFileStatus(indexIn).getLen();
        while (fsIndexIn.getPos() < end) {
          long segmentOffset = WritableUtils.readVLong(fsIndexIn);
          long segmentLength = WritableUtils.readVLong(fsIndexIn);
          Path segmentName = inName;
          segments.add(new LinkedSegmentsDescriptor(segmentOffset,
                                                    segmentLength, segmentName, this));
        }
        fsIndexIn.close();
        fs.delete(indexIn, true);
        numSegmentsContained = segments.size();
        this.inName = inName;
      }

      public List<SegmentDescriptor> getSegmentList() {
        return segments;
      }

      public void cleanup() throws IOException {
        numSegmentsCleanedUp++;
        if (numSegmentsCleanedUp == numSegmentsContained) {
          fs.delete(inName, true);
        }
      }
    } // SequenceFile.Sorter.SegmentContainer

  } // SequenceFile.Sorter

} // SequenceFile
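/* Cleanup sketch (hypothetical scenario): SegmentContainer implements a simple
 * reference count over the segments of one file. If the index of "out.seq"
 * describes three segments, the container builds three
 * LinkedSegmentsDescriptors; each cleanup() call increments
 * numSegmentsCleanedUp, and only the third call, when it equals
 * numSegmentsContained, deletes "out.seq" itself.
 */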