/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.io;

import java.io.*;
import java.util.*;
import java.rmi.server.UID;
import java.security.MessageDigest;

import org.apache.commons.io.Charsets;
import org.apache.commons.logging.*;
import org.apache.hadoop.util.Options;
import org.apache.hadoop.fs.*;
import org.apache.hadoop.fs.Options.CreateOpts;
import org.apache.hadoop.io.compress.CodecPool;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.CompressionInputStream;
import org.apache.hadoop.io.compress.CompressionOutputStream;
import org.apache.hadoop.io.compress.Compressor;
import org.apache.hadoop.io.compress.Decompressor;
import org.apache.hadoop.io.compress.DefaultCodec;
import org.apache.hadoop.io.compress.GzipCodec;
import org.apache.hadoop.io.compress.zlib.ZlibFactory;
import org.apache.hadoop.io.serializer.Deserializer;
import org.apache.hadoop.io.serializer.Serializer;
import org.apache.hadoop.io.serializer.SerializationFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.*;
import org.apache.hadoop.util.Progressable;
import org.apache.hadoop.util.Progress;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hadoop.util.NativeCodeLoader;
import org.apache.hadoop.util.MergeSort;
import org.apache.hadoop.util.PriorityQueue;
import org.apache.hadoop.util.Time;

/**
 * <code>SequenceFile</code>s are flat files consisting of binary key/value
 * pairs.
 *
 * <p><code>SequenceFile</code> provides {@link SequenceFile.Writer},
 * {@link SequenceFile.Reader} and {@link Sorter} classes for writing,
 * reading and sorting respectively.</p>
 *
 * There are three <code>SequenceFile</code> <code>Writer</code>s based on the
 * {@link CompressionType} used to compress key/value pairs:
 * <ol>
 *   <li>
 *   <code>Writer</code> : Uncompressed records.
 *   </li>
 *   <li>
 *   <code>RecordCompressWriter</code> : Record-compressed files, only compress
 *                                       values.
 *   </li>
 *   <li>
 *   <code>BlockCompressWriter</code> : Block-compressed files, both keys &amp;
 *                                      values are collected in 'blocks'
 *                                      separately and compressed. The size of
 *                                      the 'block' is configurable.
 *   </li>
 * </ol>
 *
 * <p>The actual compression algorithm used to compress key and/or values can be
 * specified by using the appropriate {@link CompressionCodec}.</p>
 *
 * <p>The recommended way is to use the static <tt>createWriter</tt> methods
 * provided by the <code>SequenceFile</code> to choose the preferred format.</p>
 *
 * <p>The {@link SequenceFile.Reader} acts as the bridge and can read any of the
 * above <code>SequenceFile</code> formats.</p>
 *
 * <h4 id="Formats">SequenceFile Formats</h4>
 *
 * <p>Essentially there are 3 different formats for <code>SequenceFile</code>s
 * depending on the <code>CompressionType</code> specified. All of them share a
 * <a href="#Header">common header</a> described below.
 *
 * <h5 id="Header">SequenceFile Header</h5>
 * <ul>
 *   <li>
 *   version - 3 bytes of magic header <b>SEQ</b>, followed by 1 byte of actual
 *             version number (e.g. SEQ4 or SEQ6)
 *   </li>
 *   <li>
 *   keyClassName - key class
 *   </li>
 *   <li>
 *   valueClassName - value class
 *   </li>
 *   <li>
 *   compression - A boolean which specifies if compression is turned on for
 *                 keys/values in this file.
 *   </li>
 *   <li>
 *   blockCompression - A boolean which specifies if block-compression is
 *                      turned on for keys/values in this file.
 *   </li>
 *   <li>
 *   compression codec - <code>CompressionCodec</code> class which is used for
 *                       compression of keys and/or values (if compression is
 *                       enabled).
 *   </li>
 *   <li>
 *   metadata - {@link Metadata} for this file.
 *   </li>
 *   <li>
 *   sync - A sync marker to denote end of the header.
 *   </li>
 * </ul>
 *
 * <h5 id="UncompressedFormat">Uncompressed SequenceFile Format</h5>
 * <ul>
 *   <li>
 *   <a href="#Header">Header</a>
 *   </li>
 *   <li>
 *   Record
 *   <ul>
 *     <li>Record length</li>
 *     <li>Key length</li>
 *     <li>Key</li>
 *     <li>Value</li>
 *   </ul>
 *   </li>
 *   <li>
 *   A sync-marker approximately every {@link #SYNC_INTERVAL} bytes.
 *   </li>
 * </ul>
 *
 * <h5 id="RecordCompressedFormat">Record-Compressed SequenceFile Format</h5>
 * <ul>
 *   <li>
 *   <a href="#Header">Header</a>
 *   </li>
 *   <li>
 *   Record
 *   <ul>
 *     <li>Record length</li>
 *     <li>Key length</li>
 *     <li>Key</li>
 *     <li><i>Compressed</i> Value</li>
 *   </ul>
 *   </li>
 *   <li>
 *   A sync-marker approximately every {@link #SYNC_INTERVAL} bytes.
 *   </li>
 * </ul>
 *
 * <h5 id="BlockCompressedFormat">Block-Compressed SequenceFile Format</h5>
 * <ul>
 *   <li>
 *   <a href="#Header">Header</a>
 *   </li>
 *   <li>
 *   Record <i>Block</i>
 *   <ul>
 *     <li>Uncompressed number of records in the block</li>
 *     <li>Compressed key-lengths block-size</li>
 *     <li>Compressed key-lengths block</li>
 *     <li>Compressed keys block-size</li>
 *     <li>Compressed keys block</li>
 *     <li>Compressed value-lengths block-size</li>
 *     <li>Compressed value-lengths block</li>
 *     <li>Compressed values block-size</li>
 *     <li>Compressed values block</li>
 *   </ul>
 *   </li>
 *   <li>
 *   A sync-marker every block.
 *   </li>
 * </ul>
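 *
 * <p>For example, a file can be written and read back as follows (a minimal
 * sketch; the path and key/value types are illustrative and error handling
 * is omitted):</p>
 *
 * <pre>
 * Configuration conf = new Configuration();
 * Path path = new Path("/tmp/example.seq");
 *
 * // Write key/value pairs; unset options fall back to configured defaults.
 * SequenceFile.Writer writer = SequenceFile.createWriter(conf,
 *     SequenceFile.Writer.file(path),
 *     SequenceFile.Writer.keyClass(Text.class),
 *     SequenceFile.Writer.valueClass(IntWritable.class));
 * try {
 *   writer.append(new Text("apple"), new IntWritable(1));
 * } finally {
 *   writer.close();
 * }
 *
 * // Read the pairs back in file order.
 * SequenceFile.Reader reader =
 *     new SequenceFile.Reader(conf, SequenceFile.Reader.file(path));
 * try {
 *   Text key = new Text();
 *   IntWritable val = new IntWritable();
 *   while (reader.next(key, val)) {
 *     System.out.println(key + "\t" + val);
 *   }
 * } finally {
 *   reader.close();
 * }
 * </pre>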
 *
 * <p>The compressed blocks of key lengths and value lengths consist of the
 * actual lengths of individual keys/values encoded in ZeroCompressedInteger
 * format.</p>
 *
 * @see CompressionCodec
 */
@InterfaceAudience.Public
@InterfaceStability.Stable
public class SequenceFile {
  private static final Log LOG = LogFactory.getLog(SequenceFile.class);

  private SequenceFile() {}                         // no public ctor

  private static final byte BLOCK_COMPRESS_VERSION = (byte)4;
  private static final byte CUSTOM_COMPRESS_VERSION = (byte)5;
  private static final byte VERSION_WITH_METADATA = (byte)6;
  private static final byte[] VERSION = new byte[] {
    (byte)'S', (byte)'E', (byte)'Q', VERSION_WITH_METADATA
  };

  private static final int SYNC_ESCAPE = -1;      // "length" of sync entries
  private static final int SYNC_HASH_SIZE = 16;   // number of bytes in hash
  private static final int SYNC_SIZE = 4+SYNC_HASH_SIZE; // escape + hash

  /** The number of bytes between sync points. */
  public static final int SYNC_INTERVAL = 100*SYNC_SIZE;

  /**
   * The compression type used to compress key/value pairs in the
   * {@link SequenceFile}.
   *
   * @see SequenceFile.Writer
   */
  public static enum CompressionType {
    /** Do not compress records. */
    NONE,
    /** Compress values only, each separately. */
    RECORD,
    /** Compress sequences of records together in blocks. */
    BLOCK
  }

  /**
   * Get the default compression type for sequence files.
   * @param job the job config to look in
   * @return the kind of compression to use
   */
  public static CompressionType getDefaultCompressionType(Configuration job) {
    String name = job.get("io.seqfile.compression.type");
    return name == null ? CompressionType.RECORD :
      CompressionType.valueOf(name);
  }

  /**
   * Set the default compression type for sequence files.
   * @param job the configuration to modify
   * @param val the new compression type (none, block, record)
   */
  public static void setDefaultCompressionType(Configuration job,
                                               CompressionType val) {
    job.set("io.seqfile.compression.type", val.toString());
  }

  /**
   * Create a new Writer with the given options.
   * @param conf the configuration to use
   * @param opts the options to create the file with
   * @return a new Writer
   * @throws IOException
   */
  public static Writer createWriter(Configuration conf, Writer.Option... opts
                                    ) throws IOException {
    Writer.CompressionOption compressionOption =
      Options.getOption(Writer.CompressionOption.class, opts);
    CompressionType kind;
    if (compressionOption != null) {
      kind = compressionOption.getValue();
    } else {
      kind = getDefaultCompressionType(conf);
      opts = Options.prependOptions(opts, Writer.compression(kind));
    }
    switch (kind) {
      default:
      case NONE:
        return new Writer(conf, opts);
      case RECORD:
        return new RecordCompressWriter(conf, opts);
      case BLOCK:
        return new BlockCompressWriter(conf, opts);
    }
  }
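  // A sketch of how the compression option selects among the three writer
  // implementations (paths and types are illustrative):
  //
  //   // Uncompressed records        -> Writer
  //   SequenceFile.createWriter(conf, Writer.file(p),
  //       Writer.keyClass(Text.class), Writer.valueClass(Text.class),
  //       Writer.compression(CompressionType.NONE));
  //
  //   // Record-compressed values    -> RecordCompressWriter
  //   SequenceFile.createWriter(conf, Writer.file(p),
  //       Writer.keyClass(Text.class), Writer.valueClass(Text.class),
  //       Writer.compression(CompressionType.RECORD, new DefaultCodec()));
  //
  //   // Block-compressed key/values -> BlockCompressWriter
  //   SequenceFile.createWriter(conf, Writer.file(p),
  //       Writer.keyClass(Text.class), Writer.valueClass(Text.class),
  //       Writer.compression(CompressionType.BLOCK, new DefaultCodec()));
  //
  // With no compression option, the type falls back to the
  // "io.seqfile.compression.type" configuration key (RECORD by default).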
  /**
   * Construct the preferred type of SequenceFile Writer.
   * @param fs The configured filesystem.
   * @param conf The configuration.
   * @param name The name of the file.
   * @param keyClass The 'key' type.
   * @param valClass The 'value' type.
   * @return Returns the handle to the constructed SequenceFile Writer.
   * @throws IOException
   * @deprecated Use {@link #createWriter(Configuration, Writer.Option...)}
   *     instead.
   */
  @Deprecated
  public static Writer
    createWriter(FileSystem fs, Configuration conf, Path name,
                 Class keyClass, Class valClass) throws IOException {
    return createWriter(conf, Writer.filesystem(fs),
                        Writer.file(name), Writer.keyClass(keyClass),
                        Writer.valueClass(valClass));
  }

  /**
   * Construct the preferred type of SequenceFile Writer.
   * @param fs The configured filesystem.
   * @param conf The configuration.
   * @param name The name of the file.
   * @param keyClass The 'key' type.
   * @param valClass The 'value' type.
   * @param compressionType The compression type.
   * @return Returns the handle to the constructed SequenceFile Writer.
   * @throws IOException
   * @deprecated Use {@link #createWriter(Configuration, Writer.Option...)}
   *     instead.
   */
  @Deprecated
  public static Writer
    createWriter(FileSystem fs, Configuration conf, Path name,
                 Class keyClass, Class valClass,
                 CompressionType compressionType) throws IOException {
    return createWriter(conf, Writer.filesystem(fs),
                        Writer.file(name), Writer.keyClass(keyClass),
                        Writer.valueClass(valClass),
                        Writer.compression(compressionType));
  }

  /**
   * Construct the preferred type of SequenceFile Writer.
   * @param fs The configured filesystem.
   * @param conf The configuration.
   * @param name The name of the file.
   * @param keyClass The 'key' type.
   * @param valClass The 'value' type.
   * @param compressionType The compression type.
   * @param progress The Progressable object to track progress.
   * @return Returns the handle to the constructed SequenceFile Writer.
   * @throws IOException
   * @deprecated Use {@link #createWriter(Configuration, Writer.Option...)}
   *     instead.
   */
  @Deprecated
  public static Writer
    createWriter(FileSystem fs, Configuration conf, Path name,
                 Class keyClass, Class valClass, CompressionType compressionType,
                 Progressable progress) throws IOException {
    return createWriter(conf, Writer.file(name),
                        Writer.filesystem(fs),
                        Writer.keyClass(keyClass),
                        Writer.valueClass(valClass),
                        Writer.compression(compressionType),
                        Writer.progressable(progress));
  }

  /**
   * Construct the preferred type of SequenceFile Writer.
   * @param fs The configured filesystem.
   * @param conf The configuration.
   * @param name The name of the file.
   * @param keyClass The 'key' type.
   * @param valClass The 'value' type.
   * @param compressionType The compression type.
   * @param codec The compression codec.
   * @return Returns the handle to the constructed SequenceFile Writer.
   * @throws IOException
   * @deprecated Use {@link #createWriter(Configuration, Writer.Option...)}
   *     instead.
   */
  @Deprecated
  public static Writer
    createWriter(FileSystem fs, Configuration conf, Path name,
                 Class keyClass, Class valClass, CompressionType compressionType,
                 CompressionCodec codec) throws IOException {
    return createWriter(conf, Writer.file(name),
                        Writer.filesystem(fs),
                        Writer.keyClass(keyClass),
                        Writer.valueClass(valClass),
                        Writer.compression(compressionType, codec));
  }
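  // Migration sketch for the deprecated entry points above and below: each
  // positional argument becomes a Writer.Option (arguments are illustrative):
  //
  //   // Deprecated:
  //   SequenceFile.createWriter(fs, conf, name, Text.class, Text.class,
  //       CompressionType.BLOCK, codec);
  //
  //   // Preferred:
  //   SequenceFile.createWriter(conf,
  //       SequenceFile.Writer.file(name),
  //       SequenceFile.Writer.keyClass(Text.class),
  //       SequenceFile.Writer.valueClass(Text.class),
  //       SequenceFile.Writer.compression(CompressionType.BLOCK, codec));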
  /**
   * Construct the preferred type of SequenceFile Writer.
   * @param fs The configured filesystem.
   * @param conf The configuration.
   * @param name The name of the file.
   * @param keyClass The 'key' type.
   * @param valClass The 'value' type.
   * @param compressionType The compression type.
   * @param codec The compression codec.
   * @param progress The Progressable object to track progress.
   * @param metadata The metadata of the file.
   * @return Returns the handle to the constructed SequenceFile Writer.
   * @throws IOException
   * @deprecated Use {@link #createWriter(Configuration, Writer.Option...)}
   *     instead.
   */
  @Deprecated
  public static Writer
    createWriter(FileSystem fs, Configuration conf, Path name,
                 Class keyClass, Class valClass,
                 CompressionType compressionType, CompressionCodec codec,
                 Progressable progress, Metadata metadata) throws IOException {
    return createWriter(conf, Writer.file(name),
                        Writer.filesystem(fs),
                        Writer.keyClass(keyClass),
                        Writer.valueClass(valClass),
                        Writer.compression(compressionType, codec),
                        Writer.progressable(progress),
                        Writer.metadata(metadata));
  }

  /**
   * Construct the preferred type of SequenceFile Writer.
   * @param fs The configured filesystem.
   * @param conf The configuration.
   * @param name The name of the file.
   * @param keyClass The 'key' type.
   * @param valClass The 'value' type.
   * @param bufferSize buffer size for the underlying output stream.
   * @param replication replication factor for the file.
   * @param blockSize block size for the file.
   * @param compressionType The compression type.
   * @param codec The compression codec.
   * @param progress The Progressable object to track progress.
   * @param metadata The metadata of the file.
   * @return Returns the handle to the constructed SequenceFile Writer.
   * @throws IOException
   * @deprecated Use {@link #createWriter(Configuration, Writer.Option...)}
   *     instead.
   */
  @Deprecated
  public static Writer
    createWriter(FileSystem fs, Configuration conf, Path name,
                 Class keyClass, Class valClass, int bufferSize,
                 short replication, long blockSize,
                 CompressionType compressionType, CompressionCodec codec,
                 Progressable progress, Metadata metadata) throws IOException {
    return createWriter(conf, Writer.file(name),
                        Writer.filesystem(fs),
                        Writer.keyClass(keyClass),
                        Writer.valueClass(valClass),
                        Writer.bufferSize(bufferSize),
                        Writer.replication(replication),
                        Writer.blockSize(blockSize),
                        Writer.compression(compressionType, codec),
                        Writer.progressable(progress),
                        Writer.metadata(metadata));
  }
  /**
   * Construct the preferred type of SequenceFile Writer.
   * @param fs The configured filesystem.
   * @param conf The configuration.
   * @param name The name of the file.
   * @param keyClass The 'key' type.
   * @param valClass The 'value' type.
   * @param bufferSize buffer size for the underlying output stream.
   * @param replication replication factor for the file.
   * @param blockSize block size for the file.
   * @param createParent create parent directory if non-existent
   * @param compressionType The compression type.
   * @param codec The compression codec.
   * @param metadata The metadata of the file.
   * @return Returns the handle to the constructed SequenceFile Writer.
   * @throws IOException
   * @deprecated Use {@link #createWriter(Configuration, Writer.Option...)}
   *     instead.
   */
  @Deprecated
  public static Writer
    createWriter(FileSystem fs, Configuration conf, Path name,
                 Class keyClass, Class valClass, int bufferSize,
                 short replication, long blockSize, boolean createParent,
                 CompressionType compressionType, CompressionCodec codec,
                 Metadata metadata) throws IOException {
    return createWriter(FileContext.getFileContext(fs.getUri(), conf),
        conf, name, keyClass, valClass, compressionType, codec,
        metadata, EnumSet.of(CreateFlag.CREATE, CreateFlag.OVERWRITE),
        CreateOpts.bufferSize(bufferSize),
        createParent ? CreateOpts.createParent()
                     : CreateOpts.donotCreateParent(),
        CreateOpts.repFac(replication),
        CreateOpts.blockSize(blockSize)
      );
  }

  /**
   * Construct the preferred type of SequenceFile Writer.
   * @param fc The context for the specified file.
   * @param conf The configuration.
   * @param name The name of the file.
   * @param keyClass The 'key' type.
   * @param valClass The 'value' type.
   * @param compressionType The compression type.
   * @param codec The compression codec.
   * @param metadata The metadata of the file.
   * @param createFlag gives the semantics of create: overwrite, append etc.
   * @param opts file creation options; see {@link CreateOpts}.
   * @return Returns the handle to the constructed SequenceFile Writer.
   * @throws IOException
   */
  public static Writer
    createWriter(FileContext fc, Configuration conf, Path name,
                 Class keyClass, Class valClass,
                 CompressionType compressionType, CompressionCodec codec,
                 Metadata metadata,
                 final EnumSet<CreateFlag> createFlag, CreateOpts... opts)
    throws IOException {
    return createWriter(conf, fc.create(name, createFlag, opts),
        keyClass, valClass, compressionType, codec, metadata).ownStream();
  }

  /**
   * Construct the preferred type of SequenceFile Writer.
   * @param fs The configured filesystem.
   * @param conf The configuration.
   * @param name The name of the file.
   * @param keyClass The 'key' type.
   * @param valClass The 'value' type.
   * @param compressionType The compression type.
   * @param codec The compression codec.
   * @param progress The Progressable object to track progress.
   * @return Returns the handle to the constructed SequenceFile Writer.
   * @throws IOException
   * @deprecated Use {@link #createWriter(Configuration, Writer.Option...)}
   *     instead.
   */
  @Deprecated
  public static Writer
    createWriter(FileSystem fs, Configuration conf, Path name,
                 Class keyClass, Class valClass,
                 CompressionType compressionType, CompressionCodec codec,
                 Progressable progress) throws IOException {
    return createWriter(conf, Writer.file(name),
                        Writer.filesystem(fs),
                        Writer.keyClass(keyClass),
                        Writer.valueClass(valClass),
                        Writer.compression(compressionType, codec),
                        Writer.progressable(progress));
  }
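  // Usage sketch for the FileContext-based factory above (the path, types,
  // flags, and create options are illustrative):
  //
  //   FileContext fc = FileContext.getFileContext(conf);
  //   SequenceFile.Writer writer = SequenceFile.createWriter(fc, conf,
  //       new Path("/tmp/data.seq"), Text.class, Text.class,
  //       CompressionType.NONE, null, new SequenceFile.Metadata(),
  //       EnumSet.of(CreateFlag.CREATE, CreateFlag.OVERWRITE),
  //       CreateOpts.bufferSize(8192));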
  /**
   * Construct the preferred type of 'raw' SequenceFile Writer.
   * @param conf The configuration.
   * @param out The stream on top of which the writer is to be constructed.
   * @param keyClass The 'key' type.
   * @param valClass The 'value' type.
   * @param compressionType The compression type.
   * @param codec The compression codec.
   * @param metadata The metadata of the file.
   * @return Returns the handle to the constructed SequenceFile Writer.
   * @throws IOException
   * @deprecated Use {@link #createWriter(Configuration, Writer.Option...)}
   *     instead.
   */
  @Deprecated
  public static Writer
    createWriter(Configuration conf, FSDataOutputStream out,
                 Class keyClass, Class valClass,
                 CompressionType compressionType,
                 CompressionCodec codec, Metadata metadata) throws IOException {
    return createWriter(conf, Writer.stream(out), Writer.keyClass(keyClass),
                        Writer.valueClass(valClass),
                        Writer.compression(compressionType, codec),
                        Writer.metadata(metadata));
  }

  /**
   * Construct the preferred type of 'raw' SequenceFile Writer.
   * @param conf The configuration.
   * @param out The stream on top of which the writer is to be constructed.
   * @param keyClass The 'key' type.
   * @param valClass The 'value' type.
   * @param compressionType The compression type.
   * @param codec The compression codec.
   * @return Returns the handle to the constructed SequenceFile Writer.
   * @throws IOException
   * @deprecated Use {@link #createWriter(Configuration, Writer.Option...)}
   *     instead.
   */
  @Deprecated
  public static Writer
    createWriter(Configuration conf, FSDataOutputStream out,
                 Class keyClass, Class valClass, CompressionType compressionType,
                 CompressionCodec codec) throws IOException {
    return createWriter(conf, Writer.stream(out), Writer.keyClass(keyClass),
                        Writer.valueClass(valClass),
                        Writer.compression(compressionType, codec));
  }


  /** The interface to 'raw' values of SequenceFiles. */
  public static interface ValueBytes {

    /** Writes the uncompressed bytes to the outStream.
     * @param outStream : Stream to write uncompressed bytes into.
     * @throws IOException
     */
    public void writeUncompressedBytes(DataOutputStream outStream)
      throws IOException;

    /** Write compressed bytes to outStream.
     * Note: this does NOT compress the bytes if they are not already
     * compressed.
     * @param outStream : Stream to write compressed bytes into.
     */
    public void writeCompressedBytes(DataOutputStream outStream)
      throws IllegalArgumentException, IOException;

    /**
     * Size of stored data.
     */
    public int getSize();
  }
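  // A hypothetical minimal ValueBytes implementation holding uncompressed,
  // in-memory bytes, e.g. for use with Writer#appendRaw (a sketch only; this
  // class is not part of SequenceFile):
  //
  //   class RawValue implements SequenceFile.ValueBytes {
  //     private final byte[] buf;
  //     RawValue(byte[] buf) { this.buf = buf; }
  //     public void writeUncompressedBytes(DataOutputStream out)
  //         throws IOException {
  //       out.write(buf, 0, buf.length);             // bytes are already raw
  //     }
  //     public void writeCompressedBytes(DataOutputStream out)
  //         throws IllegalArgumentException, IOException {
  //       throw new IllegalArgumentException("value is not compressed");
  //     }
  //     public int getSize() { return buf.length; }  // size of stored data
  //   }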
  private static class UncompressedBytes implements ValueBytes {
    private int dataSize;
    private byte[] data;

    private UncompressedBytes() {
      data = null;
      dataSize = 0;
    }

    private void reset(DataInputStream in, int length) throws IOException {
      if (data == null) {
        data = new byte[length];
      } else if (length > data.length) {
        data = new byte[Math.max(length, data.length * 2)];
      }
      dataSize = -1;
      in.readFully(data, 0, length);
      dataSize = length;
    }

    @Override
    public int getSize() {
      return dataSize;
    }

    @Override
    public void writeUncompressedBytes(DataOutputStream outStream)
      throws IOException {
      outStream.write(data, 0, dataSize);
    }

    @Override
    public void writeCompressedBytes(DataOutputStream outStream)
      throws IllegalArgumentException, IOException {
      throw
        new IllegalArgumentException("UncompressedBytes cannot be compressed!");
    }

  } // UncompressedBytes

  private static class CompressedBytes implements ValueBytes {
    private int dataSize;
    private byte[] data;
    DataInputBuffer rawData = null;
    CompressionCodec codec = null;
    CompressionInputStream decompressedStream = null;

    private CompressedBytes(CompressionCodec codec) {
      data = null;
      dataSize = 0;
      this.codec = codec;
    }

    private void reset(DataInputStream in, int length) throws IOException {
      if (data == null) {
        data = new byte[length];
      } else if (length > data.length) {
        data = new byte[Math.max(length, data.length * 2)];
      }
      dataSize = -1;
      in.readFully(data, 0, length);
      dataSize = length;
    }

    @Override
    public int getSize() {
      return dataSize;
    }

    @Override
    public void writeUncompressedBytes(DataOutputStream outStream)
      throws IOException {
      if (decompressedStream == null) {
        rawData = new DataInputBuffer();
        decompressedStream = codec.createInputStream(rawData);
      } else {
        decompressedStream.resetState();
      }
      rawData.reset(data, 0, dataSize);

      byte[] buffer = new byte[8192];
      int bytesRead = 0;
      while ((bytesRead = decompressedStream.read(buffer, 0, 8192)) != -1) {
        outStream.write(buffer, 0, bytesRead);
      }
    }

    @Override
    public void writeCompressedBytes(DataOutputStream outStream)
      throws IllegalArgumentException, IOException {
      outStream.write(data, 0, dataSize);
    }

  } // CompressedBytes
  /**
   * The class encapsulating the metadata of a file.
   * The metadata of a file is a list of attribute name/value
   * pairs of Text type.
   */
  public static class Metadata implements Writable {

    private TreeMap<Text, Text> theMetadata;

    public Metadata() {
      this(new TreeMap<Text, Text>());
    }

    public Metadata(TreeMap<Text, Text> arg) {
      if (arg == null) {
        this.theMetadata = new TreeMap<Text, Text>();
      } else {
        this.theMetadata = arg;
      }
    }

    public Text get(Text name) {
      return this.theMetadata.get(name);
    }

    public void set(Text name, Text value) {
      this.theMetadata.put(name, value);
    }

    public TreeMap<Text, Text> getMetadata() {
      return new TreeMap<Text, Text>(this.theMetadata);
    }

    @Override
    public void write(DataOutput out) throws IOException {
      out.writeInt(this.theMetadata.size());
      Iterator<Map.Entry<Text, Text>> iter =
        this.theMetadata.entrySet().iterator();
      while (iter.hasNext()) {
        Map.Entry<Text, Text> en = iter.next();
        en.getKey().write(out);
        en.getValue().write(out);
      }
    }

    @Override
    public void readFields(DataInput in) throws IOException {
      int sz = in.readInt();
      if (sz < 0)
        throw new IOException("Invalid size: " + sz + " for file metadata object");
      this.theMetadata = new TreeMap<Text, Text>();
      for (int i = 0; i < sz; i++) {
        Text key = new Text();
        Text val = new Text();
        key.readFields(in);
        val.readFields(in);
        this.theMetadata.put(key, val);
      }
    }

    @Override
    public boolean equals(Object other) {
      if (other == null) {
        return false;
      }
      if (other.getClass() != this.getClass()) {
        return false;
      } else {
        return equals((Metadata)other);
      }
    }

    public boolean equals(Metadata other) {
      if (other == null) return false;
      if (this.theMetadata.size() != other.theMetadata.size()) {
        return false;
      }
      Iterator<Map.Entry<Text, Text>> iter1 =
        this.theMetadata.entrySet().iterator();
      Iterator<Map.Entry<Text, Text>> iter2 =
        other.theMetadata.entrySet().iterator();
      while (iter1.hasNext() && iter2.hasNext()) {
        Map.Entry<Text, Text> en1 = iter1.next();
        Map.Entry<Text, Text> en2 = iter2.next();
        if (!en1.getKey().equals(en2.getKey())) {
          return false;
        }
        if (!en1.getValue().equals(en2.getValue())) {
          return false;
        }
      }
      if (iter1.hasNext() || iter2.hasNext()) {
        return false;
      }
      return true;
    }

    @Override
    public int hashCode() {
      assert false : "hashCode not designed";
      return 42; // any arbitrary constant will do
    }

    @Override
    public String toString() {
      StringBuilder sb = new StringBuilder();
      sb.append("size: ").append(this.theMetadata.size()).append("\n");
      Iterator<Map.Entry<Text, Text>> iter =
        this.theMetadata.entrySet().iterator();
      while (iter.hasNext()) {
        Map.Entry<Text, Text> en = iter.next();
        sb.append("\t").append(en.getKey().toString())
          .append("\t").append(en.getValue().toString());
        sb.append("\n");
      }
      return sb.toString();
    }
  }
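  // For instance (a sketch; the attribute and path are illustrative),
  // metadata can be attached at creation time and read back from the header:
  //
  //   TreeMap<Text, Text> attrs = new TreeMap<Text, Text>();
  //   attrs.put(new Text("created-by"), new Text("example-job"));
  //   SequenceFile.createWriter(conf,
  //       SequenceFile.Writer.file(path),
  //       SequenceFile.Writer.keyClass(Text.class),
  //       SequenceFile.Writer.valueClass(Text.class),
  //       SequenceFile.Writer.metadata(new Metadata(attrs)));
  //
  //   SequenceFile.Reader reader =
  //       new SequenceFile.Reader(conf, SequenceFile.Reader.file(path));
  //   Text createdBy = reader.getMetadata().get(new Text("created-by"));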
  /** Write key/value pairs to a sequence-format file. */
  public static class Writer implements java.io.Closeable, Syncable {
    private Configuration conf;
    FSDataOutputStream out;
    boolean ownOutputStream = true;
    DataOutputBuffer buffer = new DataOutputBuffer();

    Class keyClass;
    Class valClass;

    private final CompressionType compress;
    CompressionCodec codec = null;
    CompressionOutputStream deflateFilter = null;
    DataOutputStream deflateOut = null;
    Metadata metadata = null;
    Compressor compressor = null;

    private boolean appendMode = false;

    protected Serializer keySerializer;
    protected Serializer uncompressedValSerializer;
    protected Serializer compressedValSerializer;

    // Insert a globally unique 16-byte value every few entries, so that one
    // can seek into the middle of a file and then synchronize with record
    // starts and ends by scanning for this value.
    long lastSyncPos;                     // position of last sync
    byte[] sync;                          // 16 random bytes
    {
      try {
        MessageDigest digester = MessageDigest.getInstance("MD5");
        long time = Time.now();
        digester.update((new UID()+"@"+time).getBytes(Charsets.UTF_8));
        sync = digester.digest();
      } catch (Exception e) {
        throw new RuntimeException(e);
      }
    }

    public static interface Option {}

    static class FileOption extends Options.PathOption
                            implements Option {
      FileOption(Path path) {
        super(path);
      }
    }

    /**
     * @deprecated only used for backwards-compatibility in the createWriter
     *     methods that take FileSystem.
     */
    @Deprecated
    private static class FileSystemOption implements Option {
      private final FileSystem value;
      protected FileSystemOption(FileSystem value) {
        this.value = value;
      }
      public FileSystem getValue() {
        return value;
      }
    }

    static class StreamOption extends Options.FSDataOutputStreamOption
                              implements Option {
      StreamOption(FSDataOutputStream stream) {
        super(stream);
      }
    }

    static class BufferSizeOption extends Options.IntegerOption
                                  implements Option {
      BufferSizeOption(int value) {
        super(value);
      }
    }

    static class BlockSizeOption extends Options.LongOption implements Option {
      BlockSizeOption(long value) {
        super(value);
      }
    }

    static class ReplicationOption extends Options.IntegerOption
                                   implements Option {
      ReplicationOption(int value) {
        super(value);
      }
    }

    static class AppendIfExistsOption extends Options.BooleanOption
                                      implements Option {
      AppendIfExistsOption(boolean value) {
        super(value);
      }
    }

    static class KeyClassOption extends Options.ClassOption implements Option {
      KeyClassOption(Class<?> value) {
        super(value);
      }
    }

    static class ValueClassOption extends Options.ClassOption
                                  implements Option {
      ValueClassOption(Class<?> value) {
        super(value);
      }
    }

    static class MetadataOption implements Option {
      private final Metadata value;
      MetadataOption(Metadata value) {
        this.value = value;
      }
      Metadata getValue() {
        return value;
      }
    }

    static class ProgressableOption extends Options.ProgressableOption
                                    implements Option {
      ProgressableOption(Progressable value) {
        super(value);
      }
    }
    private static class CompressionOption implements Option {
      private final CompressionType value;
      private final CompressionCodec codec;
      CompressionOption(CompressionType value) {
        this(value, null);
      }
      CompressionOption(CompressionType value, CompressionCodec codec) {
        this.value = value;
        this.codec = (CompressionType.NONE != value && null == codec)
          ? new DefaultCodec()
          : codec;
      }
      CompressionType getValue() {
        return value;
      }
      CompressionCodec getCodec() {
        return codec;
      }
    }

    public static Option file(Path value) {
      return new FileOption(value);
    }

    /**
     * @deprecated only used for backwards-compatibility in the createWriter
     *     methods that take FileSystem.
     */
    @Deprecated
    private static Option filesystem(FileSystem fs) {
      return new SequenceFile.Writer.FileSystemOption(fs);
    }

    public static Option bufferSize(int value) {
      return new BufferSizeOption(value);
    }

    public static Option stream(FSDataOutputStream value) {
      return new StreamOption(value);
    }

    public static Option replication(short value) {
      return new ReplicationOption(value);
    }

    public static Option appendIfExists(boolean value) {
      return new AppendIfExistsOption(value);
    }

    public static Option blockSize(long value) {
      return new BlockSizeOption(value);
    }

    public static Option progressable(Progressable value) {
      return new ProgressableOption(value);
    }

    public static Option keyClass(Class<?> value) {
      return new KeyClassOption(value);
    }

    public static Option valueClass(Class<?> value) {
      return new ValueClassOption(value);
    }

    public static Option metadata(Metadata value) {
      return new MetadataOption(value);
    }

    public static Option compression(CompressionType value) {
      return new CompressionOption(value);
    }

    public static Option compression(CompressionType value,
        CompressionCodec codec) {
      return new CompressionOption(value, codec);
    }
    /**
     * Construct an uncompressed writer from a set of options.
     * @param conf the configuration to use
     * @param opts the options used when creating the writer
     * @throws IOException if it fails
     */
    Writer(Configuration conf,
           Option... opts) throws IOException {
      BlockSizeOption blockSizeOption =
        Options.getOption(BlockSizeOption.class, opts);
      BufferSizeOption bufferSizeOption =
        Options.getOption(BufferSizeOption.class, opts);
      ReplicationOption replicationOption =
        Options.getOption(ReplicationOption.class, opts);
      ProgressableOption progressOption =
        Options.getOption(ProgressableOption.class, opts);
      FileOption fileOption = Options.getOption(FileOption.class, opts);
      AppendIfExistsOption appendIfExistsOption = Options.getOption(
        AppendIfExistsOption.class, opts);
      FileSystemOption fsOption = Options.getOption(FileSystemOption.class, opts);
      StreamOption streamOption = Options.getOption(StreamOption.class, opts);
      KeyClassOption keyClassOption =
        Options.getOption(KeyClassOption.class, opts);
      ValueClassOption valueClassOption =
        Options.getOption(ValueClassOption.class, opts);
      MetadataOption metadataOption =
        Options.getOption(MetadataOption.class, opts);
      CompressionOption compressionTypeOption =
        Options.getOption(CompressionOption.class, opts);
      // check consistency of options
      if ((fileOption == null) == (streamOption == null)) {
        throw new IllegalArgumentException(
            "Exactly one of file or stream must be specified");
      }
      if (fileOption == null && (blockSizeOption != null ||
                                 bufferSizeOption != null ||
                                 replicationOption != null ||
                                 progressOption != null)) {
        throw new IllegalArgumentException("file modifier options not " +
                                           "compatible with stream");
      }

      FSDataOutputStream out;
      boolean ownStream = fileOption != null;
      if (ownStream) {
        Path p = fileOption.getValue();
        FileSystem fs;
        if (fsOption != null) {
          fs = fsOption.getValue();
        } else {
          fs = p.getFileSystem(conf);
        }
        int bufferSize = bufferSizeOption == null ? getBufferSize(conf) :
          bufferSizeOption.getValue();
        short replication = replicationOption == null ?
          fs.getDefaultReplication(p) :
          (short) replicationOption.getValue();
        long blockSize = blockSizeOption == null ? fs.getDefaultBlockSize(p) :
          blockSizeOption.getValue();
        Progressable progress = progressOption == null ? null :
          progressOption.getValue();

        if (appendIfExistsOption != null && appendIfExistsOption.getValue()
            && fs.exists(p)) {

          // Read the file and verify header details
          SequenceFile.Reader reader = new SequenceFile.Reader(conf,
              SequenceFile.Reader.file(p), new Reader.OnlyHeaderOption());
          try {

            if (keyClassOption.getValue() != reader.getKeyClass()
                || valueClassOption.getValue() != reader.getValueClass()) {
              throw new IllegalArgumentException(
                  "Key/value class provided does not match the file");
            }

            if (reader.getVersion() != VERSION[3]) {
              throw new VersionMismatchException(VERSION[3],
                  reader.getVersion());
            }

            if (metadataOption != null) {
              LOG.info("MetaData Option is ignored during append");
            }
            metadataOption = (MetadataOption) SequenceFile.Writer
                .metadata(reader.getMetadata());

            CompressionOption readerCompressionOption = new CompressionOption(
                reader.getCompressionType(), reader.getCompressionCodec());

            // Codec comparison will be ignored if the compression is NONE
            if (readerCompressionOption.value != compressionTypeOption.value
                || (readerCompressionOption.value != CompressionType.NONE
                    && readerCompressionOption.codec
                        .getClass() != compressionTypeOption.codec
                        .getClass())) {
              throw new IllegalArgumentException(
                  "Compression option provided does not match the file");
            }

            sync = reader.getSync();

          } finally {
            reader.close();
          }

          out = fs.append(p, bufferSize, progress);
          this.appendMode = true;
        } else {
          out = fs
              .create(p, true, bufferSize, replication, blockSize, progress);
        }
      } else {
        out = streamOption.getValue();
      }
      Class<?> keyClass = keyClassOption == null ?
          Object.class : keyClassOption.getValue();
      Class<?> valueClass = valueClassOption == null ?
          Object.class : valueClassOption.getValue();
      Metadata metadata = metadataOption == null ?
          new Metadata() : metadataOption.getValue();
      this.compress = compressionTypeOption.getValue();
      final CompressionCodec codec = compressionTypeOption.getCodec();
      if (codec != null &&
          (codec instanceof GzipCodec) &&
          !NativeCodeLoader.isNativeCodeLoaded() &&
          !ZlibFactory.isNativeZlibLoaded(conf)) {
        throw new IllegalArgumentException("SequenceFile doesn't work with " +
                                           "GzipCodec without native-hadoop " +
                                           "code!");
      }
      init(conf, out, ownStream, keyClass, valueClass, codec, metadata);
    }
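    // Append-mode sketch: with appendIfExists(true) and an existing file, the
    // constructor above verifies key/value classes, version, and compression
    // against the file header, reuses its sync marker, and then appends
    // (the path and types below are illustrative):
    //
    //   SequenceFile.Writer writer = SequenceFile.createWriter(conf,
    //       SequenceFile.Writer.file(path),
    //       SequenceFile.Writer.keyClass(Text.class),
    //       SequenceFile.Writer.valueClass(Text.class),
    //       SequenceFile.Writer.appendIfExists(true),
    //       SequenceFile.Writer.compression(CompressionType.NONE));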
    /** Create the named file.
     * @deprecated Use
     *   {@link SequenceFile#createWriter(Configuration, Writer.Option...)}
     *   instead.
     */
    @Deprecated
    public Writer(FileSystem fs, Configuration conf, Path name,
                  Class keyClass, Class valClass) throws IOException {
      this.compress = CompressionType.NONE;
      init(conf, fs.create(name), true, keyClass, valClass, null,
           new Metadata());
    }

    /** Create the named file with write-progress reporter.
     * @deprecated Use
     *   {@link SequenceFile#createWriter(Configuration, Writer.Option...)}
     *   instead.
     */
    @Deprecated
    public Writer(FileSystem fs, Configuration conf, Path name,
                  Class keyClass, Class valClass,
                  Progressable progress, Metadata metadata) throws IOException {
      this.compress = CompressionType.NONE;
      init(conf, fs.create(name, progress), true, keyClass, valClass,
           null, metadata);
    }

    /** Create the named file with write-progress reporter.
     * @deprecated Use
     *   {@link SequenceFile#createWriter(Configuration, Writer.Option...)}
     *   instead.
     */
    @Deprecated
    public Writer(FileSystem fs, Configuration conf, Path name,
                  Class keyClass, Class valClass,
                  int bufferSize, short replication, long blockSize,
                  Progressable progress, Metadata metadata) throws IOException {
      this.compress = CompressionType.NONE;
      init(conf,
           fs.create(name, true, bufferSize, replication, blockSize, progress),
           true, keyClass, valClass, null, metadata);
    }

    boolean isCompressed() { return compress != CompressionType.NONE; }
    boolean isBlockCompressed() { return compress == CompressionType.BLOCK; }

    Writer ownStream() { this.ownOutputStream = true; return this; }

    /** Write and flush the file header. */
    private void writeFileHeader()
      throws IOException {
      out.write(VERSION);
      Text.writeString(out, keyClass.getName());
      Text.writeString(out, valClass.getName());

      out.writeBoolean(this.isCompressed());
      out.writeBoolean(this.isBlockCompressed());

      if (this.isCompressed()) {
        Text.writeString(out, (codec.getClass()).getName());
      }
      this.metadata.write(out);
      out.write(sync);                       // write the sync bytes
      out.flush();                           // flush header
    }

    /** Initialize. */
    @SuppressWarnings("unchecked")
    void init(Configuration conf, FSDataOutputStream out, boolean ownStream,
              Class keyClass, Class valClass,
              CompressionCodec codec, Metadata metadata)
      throws IOException {
      this.conf = conf;
      this.out = out;
      this.ownOutputStream = ownStream;
      this.keyClass = keyClass;
      this.valClass = valClass;
      this.codec = codec;
      this.metadata = metadata;
      SerializationFactory serializationFactory = new SerializationFactory(conf);
      this.keySerializer = serializationFactory.getSerializer(keyClass);
      if (this.keySerializer == null) {
        throw new IOException(
            "Could not find a serializer for the Key class: '"
                + keyClass.getCanonicalName() + "'. "
                + "Please ensure that the configuration '" +
                CommonConfigurationKeys.IO_SERIALIZATIONS_KEY + "' is "
                + "properly configured, if you're using "
                + "custom serialization.");
      }
      this.keySerializer.open(buffer);
      this.uncompressedValSerializer = serializationFactory.getSerializer(valClass);
      if (this.uncompressedValSerializer == null) {
        throw new IOException(
            "Could not find a serializer for the Value class: '"
                + valClass.getCanonicalName() + "'. "
                + "Please ensure that the configuration '" +
                CommonConfigurationKeys.IO_SERIALIZATIONS_KEY + "' is "
                + "properly configured, if you're using "
                + "custom serialization.");
      }
      this.uncompressedValSerializer.open(buffer);
      if (this.codec != null) {
        ReflectionUtils.setConf(this.codec, this.conf);
        this.compressor = CodecPool.getCompressor(this.codec);
        this.deflateFilter = this.codec.createOutputStream(buffer, compressor);
        this.deflateOut =
          new DataOutputStream(new BufferedOutputStream(deflateFilter));
        this.compressedValSerializer = serializationFactory.getSerializer(valClass);
        if (this.compressedValSerializer == null) {
          throw new IOException(
              "Could not find a serializer for the Value class: '"
                  + valClass.getCanonicalName() + "'. "
                  + "Please ensure that the configuration '" +
                  CommonConfigurationKeys.IO_SERIALIZATIONS_KEY + "' is "
                  + "properly configured, if you're using "
                  + "custom serialization.");
        }
        this.compressedValSerializer.open(deflateOut);
      }

      if (appendMode) {
        sync();
      } else {
        writeFileHeader();
      }
    }

    /** Returns the class of keys in this file. */
    public Class getKeyClass() { return keyClass; }

    /** Returns the class of values in this file. */
    public Class getValueClass() { return valClass; }

    /** Returns the compression codec of data in this file. */
    public CompressionCodec getCompressionCodec() { return codec; }

    /** Create a sync point. */
    public void sync() throws IOException {
      if (sync != null && lastSyncPos != out.getPos()) {
        out.writeInt(SYNC_ESCAPE);            // mark the start of the sync
        out.write(sync);                      // write sync
        lastSyncPos = out.getPos();           // update lastSyncPos
      }
    }

    /**
     * Flush all currently written data to the file system.
     * @deprecated Use {@link #hsync()} or {@link #hflush()} instead
     */
    @Deprecated
    public void syncFs() throws IOException {
      if (out != null) {
        out.hflush();  // flush contents to file system
      }
    }

    @Override
    public void hsync() throws IOException {
      if (out != null) {
        out.hsync();
      }
    }

    @Override
    public void hflush() throws IOException {
      if (out != null) {
        out.hflush();
      }
    }

    /** Returns the configuration of this file. */
    Configuration getConf() { return conf; }

    /** Close the file. */
    @Override
    public synchronized void close() throws IOException {
      keySerializer.close();
      uncompressedValSerializer.close();
      if (compressedValSerializer != null) {
        compressedValSerializer.close();
      }

      CodecPool.returnCompressor(compressor);
      compressor = null;

      if (out != null) {

        // Close the underlying stream iff we own it...
        if (ownOutputStream) {
          out.close();
        } else {
          out.flush();
        }
        out = null;
      }
    }

    synchronized void checkAndWriteSync() throws IOException {
      if (sync != null &&
          out.getPos() >= lastSyncPos+SYNC_INTERVAL) { // time to emit sync
        sync();
      }
    }

    /** Append a key/value pair. */
    public void append(Writable key, Writable val)
      throws IOException {
      append((Object) key, (Object) val);
    }
    /** Append a key/value pair. */
    @SuppressWarnings("unchecked")
    public synchronized void append(Object key, Object val)
      throws IOException {
      if (key.getClass() != keyClass)
        throw new IOException("wrong key class: "+key.getClass().getName()
                              +" is not "+keyClass);
      if (val.getClass() != valClass)
        throw new IOException("wrong value class: "+val.getClass().getName()
                              +" is not "+valClass);

      buffer.reset();

      // Append the 'key'
      keySerializer.serialize(key);
      int keyLength = buffer.getLength();
      if (keyLength < 0)
        throw new IOException("negative length keys not allowed: " + key);

      // Append the 'value'
      if (compress == CompressionType.RECORD) {
        deflateFilter.resetState();
        compressedValSerializer.serialize(val);
        deflateOut.flush();
        deflateFilter.finish();
      } else {
        uncompressedValSerializer.serialize(val);
      }

      // Write the record out
      checkAndWriteSync();                                // sync
      out.writeInt(buffer.getLength());                   // total record length
      out.writeInt(keyLength);                            // key portion length
      out.write(buffer.getData(), 0, buffer.getLength()); // data
    }

    public synchronized void appendRaw(byte[] keyData, int keyOffset,
        int keyLength, ValueBytes val) throws IOException {
      if (keyLength < 0)
        throw new IOException("negative length keys not allowed: " + keyLength);

      int valLength = val.getSize();

      checkAndWriteSync();

      out.writeInt(keyLength+valLength);          // total record length
      out.writeInt(keyLength);                    // key portion length
      out.write(keyData, keyOffset, keyLength);   // key
      val.writeUncompressedBytes(out);            // value
    }

    /** Returns the current length of the output file.
     *
     * <p>This always returns a synchronized position.  In other words,
     * immediately after calling {@link SequenceFile.Reader#seek(long)} with a
     * position returned by this method,
     * {@link SequenceFile.Reader#next(Writable)} may be called.  However the
     * key may be earlier in the file than the key last written when this
     * method was called (e.g., with block-compression, it may be the first key
     * in the block that was being written when this method was called).
     */
    public synchronized long getLength() throws IOException {
      return out.getPos();
    }

  } // class Writer
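  // A sketch of the getLength()/seek() contract documented above: a position
  // captured from a live writer is a valid seek target for a reader opened
  // later (the path, key, and bookkeeping below are illustrative):
  //
  //   long pos = writer.getLength();   // always a synchronized position
  //   // ... more appends, then writer.close() ...
  //   SequenceFile.Reader reader =
  //       new SequenceFile.Reader(conf, SequenceFile.Reader.file(path));
  //   reader.seek(pos);                // lands on a record boundary
  //   reader.next(key);                // key may be earlier than pos in
  //                                    // block-compressed files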
  /** Write key/compressed-value pairs to a sequence-format file. */
  static class RecordCompressWriter extends Writer {

    RecordCompressWriter(Configuration conf,
                         Option... options) throws IOException {
      super(conf, options);
    }

    /** Append a key/value pair. */
    @Override
    @SuppressWarnings("unchecked")
    public synchronized void append(Object key, Object val)
      throws IOException {
      if (key.getClass() != keyClass)
        throw new IOException("wrong key class: "+key.getClass().getName()
                              +" is not "+keyClass);
      if (val.getClass() != valClass)
        throw new IOException("wrong value class: "+val.getClass().getName()
                              +" is not "+valClass);

      buffer.reset();

      // Append the 'key'
      keySerializer.serialize(key);
      int keyLength = buffer.getLength();
      if (keyLength < 0)
        throw new IOException("negative length keys not allowed: " + key);

      // Compress 'value' and append it
      deflateFilter.resetState();
      compressedValSerializer.serialize(val);
      deflateOut.flush();
      deflateFilter.finish();

      // Write the record out
      checkAndWriteSync();                                // sync
      out.writeInt(buffer.getLength());                   // total record length
      out.writeInt(keyLength);                            // key portion length
      out.write(buffer.getData(), 0, buffer.getLength()); // data
    }

    /** Append a key/value pair. */
    @Override
    public synchronized void appendRaw(byte[] keyData, int keyOffset,
        int keyLength, ValueBytes val) throws IOException {

      if (keyLength < 0)
        throw new IOException("negative length keys not allowed: " + keyLength);

      int valLength = val.getSize();

      checkAndWriteSync();                        // sync
      out.writeInt(keyLength+valLength);          // total record length
      out.writeInt(keyLength);                    // key portion length
      out.write(keyData, keyOffset, keyLength);   // 'key' data
      val.writeCompressedBytes(out);              // 'value' data
    }

  } // RecordCompressWriter

  /** Write compressed key/value blocks to a sequence-format file. */
  static class BlockCompressWriter extends Writer {

    private int noBufferedRecords = 0;

    private DataOutputBuffer keyLenBuffer = new DataOutputBuffer();
    private DataOutputBuffer keyBuffer = new DataOutputBuffer();

    private DataOutputBuffer valLenBuffer = new DataOutputBuffer();
    private DataOutputBuffer valBuffer = new DataOutputBuffer();

    private final int compressionBlockSize;

    BlockCompressWriter(Configuration conf,
                        Option... options) throws IOException {
      super(conf, options);
      compressionBlockSize =
        conf.getInt("io.seqfile.compress.blocksize", 1000000);
      keySerializer.close();
      keySerializer.open(keyBuffer);
      uncompressedValSerializer.close();
      uncompressedValSerializer.open(valBuffer);
    }

    /** Workhorse to check and write out compressed data/lengths */
    private synchronized
      void writeBuffer(DataOutputBuffer uncompressedDataBuffer)
      throws IOException {
      deflateFilter.resetState();
      buffer.reset();
      deflateOut.write(uncompressedDataBuffer.getData(), 0,
                       uncompressedDataBuffer.getLength());
      deflateOut.flush();
      deflateFilter.finish();

      WritableUtils.writeVInt(out, buffer.getLength());
      out.write(buffer.getData(), 0, buffer.getLength());
    }
    /** Compress and flush contents to the underlying file system. */
    @Override
    public synchronized void sync() throws IOException {
      if (noBufferedRecords > 0) {
        super.sync();

        // No. of records
        WritableUtils.writeVInt(out, noBufferedRecords);

        // Write 'keys' and lengths
        writeBuffer(keyLenBuffer);
        writeBuffer(keyBuffer);

        // Write 'values' and lengths
        writeBuffer(valLenBuffer);
        writeBuffer(valBuffer);

        // Flush the file-stream
        out.flush();

        // Reset internal states
        keyLenBuffer.reset();
        keyBuffer.reset();
        valLenBuffer.reset();
        valBuffer.reset();
        noBufferedRecords = 0;
      }

    }

    /** Close the file. */
    @Override
    public synchronized void close() throws IOException {
      if (out != null) {
        sync();
      }
      super.close();
    }

    /** Append a key/value pair. */
    @Override
    @SuppressWarnings("unchecked")
    public synchronized void append(Object key, Object val)
      throws IOException {
      if (key.getClass() != keyClass)
        throw new IOException("wrong key class: "+key+" is not "+keyClass);
      if (val.getClass() != valClass)
        throw new IOException("wrong value class: "+val+" is not "+valClass);

      // Save key/value into respective buffers
      int oldKeyLength = keyBuffer.getLength();
      keySerializer.serialize(key);
      int keyLength = keyBuffer.getLength() - oldKeyLength;
      if (keyLength < 0)
        throw new IOException("negative length keys not allowed: " + key);
      WritableUtils.writeVInt(keyLenBuffer, keyLength);

      int oldValLength = valBuffer.getLength();
      uncompressedValSerializer.serialize(val);
      int valLength = valBuffer.getLength() - oldValLength;
      WritableUtils.writeVInt(valLenBuffer, valLength);

      // Added another key/value pair
      ++noBufferedRecords;

      // Compress and flush?
      int currentBlockSize = keyBuffer.getLength() + valBuffer.getLength();
      if (currentBlockSize >= compressionBlockSize) {
        sync();
      }
    }

    /** Append a key/value pair. */
    @Override
    public synchronized void appendRaw(byte[] keyData, int keyOffset,
        int keyLength, ValueBytes val) throws IOException {

      if (keyLength < 0)
        throw new IOException("negative length keys not allowed");

      int valLength = val.getSize();

      // Save key/value data in relevant buffers
      WritableUtils.writeVInt(keyLenBuffer, keyLength);
      keyBuffer.write(keyData, keyOffset, keyLength);
      WritableUtils.writeVInt(valLenBuffer, valLength);
      val.writeUncompressedBytes(valBuffer);

      // Added another key/value pair
      ++noBufferedRecords;

      // Compress and flush?
      int currentBlockSize = keyBuffer.getLength() + valBuffer.getLength();
      if (currentBlockSize >= compressionBlockSize) {
        sync();
      }
    }

  } // BlockCompressWriter

  /** Get the configured buffer size */
  private static int getBufferSize(Configuration conf) {
    return conf.getInt("io.file.buffer.size", 4096);
  }
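  // The 1,000,000-byte block threshold used by BlockCompressWriter above is
  // configurable; a sketch (the value and path are illustrative):
  //
  //   conf.setInt("io.seqfile.compress.blocksize", 4 * 1000 * 1000);
  //   SequenceFile.createWriter(conf,
  //       SequenceFile.Writer.file(path),
  //       SequenceFile.Writer.keyClass(Text.class),
  //       SequenceFile.Writer.valueClass(Text.class),
  //       SequenceFile.Writer.compression(CompressionType.BLOCK,
  //                                       new DefaultCodec()));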
  /** Reads key/value pairs from a sequence-format file. */
  public static class Reader implements java.io.Closeable {
    private String filename;
    private FSDataInputStream in;
    private DataOutputBuffer outBuf = new DataOutputBuffer();

    private byte version;

    private String keyClassName;
    private String valClassName;
    private Class keyClass;
    private Class valClass;

    private CompressionCodec codec = null;
    private Metadata metadata = null;

    private byte[] sync = new byte[SYNC_HASH_SIZE];
    private byte[] syncCheck = new byte[SYNC_HASH_SIZE];
    private boolean syncSeen;

    private long headerEnd;
    private long end;
    private int keyLength;
    private int recordLength;

    private boolean decompress;
    private boolean blockCompressed;

    private Configuration conf;

    private int noBufferedRecords = 0;
    private boolean lazyDecompress = true;
    private boolean valuesDecompressed = true;

    private int noBufferedKeys = 0;
    private int noBufferedValues = 0;

    private DataInputBuffer keyLenBuffer = null;
    private CompressionInputStream keyLenInFilter = null;
    private DataInputStream keyLenIn = null;
    private Decompressor keyLenDecompressor = null;
    private DataInputBuffer keyBuffer = null;
    private CompressionInputStream keyInFilter = null;
    private DataInputStream keyIn = null;
    private Decompressor keyDecompressor = null;

    private DataInputBuffer valLenBuffer = null;
    private CompressionInputStream valLenInFilter = null;
    private DataInputStream valLenIn = null;
    private Decompressor valLenDecompressor = null;
    private DataInputBuffer valBuffer = null;
    private CompressionInputStream valInFilter = null;
    private DataInputStream valIn = null;
    private Decompressor valDecompressor = null;

    private Deserializer keyDeserializer;
    private Deserializer valDeserializer;

    /**
     * A tag interface for all of the Reader options
     */
    public static interface Option {}

    /**
     * Create an option to specify the path name of the sequence file.
     * @param value the path to read
     * @return a new option
     */
    public static Option file(Path value) {
      return new FileOption(value);
    }

    /**
     * Create an option to specify the stream with the sequence file.
     * @param value the stream to read.
     * @return a new option
     */
    public static Option stream(FSDataInputStream value) {
      return new InputStreamOption(value);
    }

    /**
     * Create an option to specify the starting byte to read.
     * @param value the number of bytes to skip over
     * @return a new option
     */
    public static Option start(long value) {
      return new StartOption(value);
    }

    /**
     * Create an option to specify the number of bytes to read.
     * @param value the number of bytes to read
     * @return a new option
     */
    public static Option length(long value) {
      return new LengthOption(value);
    }
1744 * @param value the number of bytes to buffer 1745 * @return a new option 1746 */ 1747 public static Option bufferSize(int value) { 1748 return new BufferSizeOption(value); 1749 } 1750 1751 private static class FileOption extends Options.PathOption 1752 implements Option { 1753 private FileOption(Path value) { 1754 super(value); 1755 } 1756 } 1757 1758 private static class InputStreamOption 1759 extends Options.FSDataInputStreamOption 1760 implements Option { 1761 private InputStreamOption(FSDataInputStream value) { 1762 super(value); 1763 } 1764 } 1765 1766 private static class StartOption extends Options.LongOption 1767 implements Option { 1768 private StartOption(long value) { 1769 super(value); 1770 } 1771 } 1772 1773 private static class LengthOption extends Options.LongOption 1774 implements Option { 1775 private LengthOption(long value) { 1776 super(value); 1777 } 1778 } 1779 1780 private static class BufferSizeOption extends Options.IntegerOption 1781 implements Option { 1782 private BufferSizeOption(int value) { 1783 super(value); 1784 } 1785 } 1786 1787 // only used directly, never via a public factory method 1788 private static class OnlyHeaderOption extends Options.BooleanOption 1789 implements Option { 1790 private OnlyHeaderOption() { 1791 super(true); 1792 } 1793 } 1794 1795 public Reader(Configuration conf, Option... opts) throws IOException { 1796 // Look up the options, these are null if not set 1797 FileOption fileOpt = Options.getOption(FileOption.class, opts); 1798 InputStreamOption streamOpt = 1799 Options.getOption(InputStreamOption.class, opts); 1800 StartOption startOpt = Options.getOption(StartOption.class, opts); 1801 LengthOption lenOpt = Options.getOption(LengthOption.class, opts); 1802 BufferSizeOption bufOpt = Options.getOption(BufferSizeOption.class, opts); 1803 OnlyHeaderOption headerOnly = 1804 Options.getOption(OnlyHeaderOption.class, opts); 1805 // check for consistency 1806 if ((fileOpt == null) == (streamOpt == null)) { 1807 throw new 1808 IllegalArgumentException("Exactly one of the file or stream options must be specified"); 1809 } 1810 if (fileOpt == null && bufOpt != null) { 1811 throw new IllegalArgumentException("buffer size can only be set when" + 1812 " a file is specified."); 1813 } 1814 // figure out the real values 1815 Path filename = null; 1816 FSDataInputStream file; 1817 final long len; 1818 if (fileOpt != null) { 1819 filename = fileOpt.getValue(); 1820 FileSystem fs = filename.getFileSystem(conf); 1821 int bufSize = bufOpt == null ? getBufferSize(conf) : bufOpt.getValue(); 1822 len = null == lenOpt 1823 ? fs.getFileStatus(filename).getLen() 1824 : lenOpt.getValue(); 1825 file = openFile(fs, filename, bufSize, len); 1826 } else { 1827 len = null == lenOpt ? Long.MAX_VALUE : lenOpt.getValue(); 1828 file = streamOpt.getValue(); 1829 } 1830 long start = startOpt == null ? 0 : startOpt.getValue(); 1831 // really set up 1832 initialize(filename, file, start, len, conf, headerOnly != null); 1833 } 1834 1835 /** 1836 * Construct a reader by opening a file from the given file system. 1837 * @param fs The file system used to open the file. 1838 * @param file The file being read. 1839 * @param conf Configuration 1840 * @throws IOException 1841 * @deprecated Use Reader(Configuration, Option...) instead. 1842 */ 1843 @Deprecated 1844 public Reader(FileSystem fs, Path file, 1845 Configuration conf) throws IOException { 1846 this(conf, file(file.makeQualified(fs))); 1847 } 1848 1849 /** 1850 * Construct a reader from the given input stream. 1851 * @param in An input stream.
1852 * @param buffersize unused 1853 * @param start The starting position. 1854 * @param length The length being read. 1855 * @param conf Configuration 1856 * @throws IOException 1857 * @deprecated Use Reader(Configuration, Reader.Option...) instead. 1858 */ 1859 @Deprecated 1860 public Reader(FSDataInputStream in, int buffersize, 1861 long start, long length, Configuration conf) throws IOException { 1862 this(conf, stream(in), start(start), length(length)); 1863 } 1864 1865 /** Common work of the constructors. */ 1866 private void initialize(Path filename, FSDataInputStream in, 1867 long start, long length, Configuration conf, 1868 boolean tempReader) throws IOException { 1869 if (in == null) { 1870 throw new IllegalArgumentException("in == null"); 1871 } 1872 this.filename = filename == null ? "<unknown>" : filename.toString(); 1873 this.in = in; 1874 this.conf = conf; 1875 boolean succeeded = false; 1876 try { 1877 seek(start); 1878 this.end = this.in.getPos() + length; 1879 // if it wrapped around, use the max 1880 if (end < length) { 1881 end = Long.MAX_VALUE; 1882 } 1883 init(tempReader); 1884 succeeded = true; 1885 } finally { 1886 if (!succeeded) { 1887 IOUtils.cleanup(LOG, this.in); 1888 } 1889 } 1890 } 1891 1892 /** 1893 * Override this method to specialize the type of 1894 * {@link FSDataInputStream} returned. 1895 * @param fs The file system used to open the file. 1896 * @param file The file being read. 1897 * @param bufferSize The buffer size used to read the file. 1898 * @param length The length being read if it is >= 0. Otherwise, 1899 * the length is not available. 1900 * @return The opened stream. 1901 * @throws IOException 1902 */ 1903 protected FSDataInputStream openFile(FileSystem fs, Path file, 1904 int bufferSize, long length) throws IOException { 1905 return fs.open(file, bufferSize); 1906 } 1907 1908 /** 1909 * Initialize the {@link Reader}. 1910 * @param tempReader <code>true</code> if we are constructing a temporary 1911 * reader (see {@link SequenceFile.Sorter#cloneFileAttributes}), 1912 * and hence do not initialize every component; 1913 * <code>false</code> otherwise. 1914 * @throws IOException 1915 */ 1916 private void init(boolean tempReader) throws IOException { 1917 byte[] versionBlock = new byte[VERSION.length]; 1918 String exceptionMsg = this + " not a SequenceFile"; 1919 1920 // Try to read sequence file header. 1921 try { 1922 in.readFully(versionBlock); 1923 } catch (EOFException e) { 1924 throw new EOFException(exceptionMsg); 1925 } 1926 1927 if ((versionBlock[0] != VERSION[0]) || 1928 (versionBlock[1] != VERSION[1]) || 1929 (versionBlock[2] != VERSION[2])) { 1930 throw new IOException(this + " not a SequenceFile"); 1931 } 1932 1933 // Set 'version' 1934 version = versionBlock[3]; 1935 if (version > VERSION[3]) { 1936 throw new VersionMismatchException(VERSION[3], version); 1937 } 1938 1939 if (version < BLOCK_COMPRESS_VERSION) { 1940 UTF8 className = new UTF8(); 1941 1942 className.readFields(in); 1943 keyClassName = className.toStringChecked(); // key class name 1944 1945 className.readFields(in); 1946 valClassName = className.toStringChecked(); // val class name 1947 } else { 1948 keyClassName = Text.readString(in); 1949 valClassName = Text.readString(in); 1950 } 1951 1952 if (version > 2) { // if version > 2 1953 this.decompress = in.readBoolean(); // is compressed? 1954 } else { 1955 decompress = false; 1956 } 1957 1958 if (version >= BLOCK_COMPRESS_VERSION) { // if version >= 4 1959 this.blockCompressed = in.readBoolean(); // is block-compressed?
1960 } else { 1961 blockCompressed = false; 1962 } 1963 1964 // if version >= 5 1965 // setup the compression codec 1966 if (decompress) { 1967 if (version >= CUSTOM_COMPRESS_VERSION) { 1968 String codecClassname = Text.readString(in); 1969 try { 1970 Class<? extends CompressionCodec> codecClass 1971 = conf.getClassByName(codecClassname).asSubclass(CompressionCodec.class); 1972 this.codec = ReflectionUtils.newInstance(codecClass, conf); 1973 } catch (ClassNotFoundException cnfe) { 1974 throw new IllegalArgumentException("Unknown codec: " + 1975 codecClassname, cnfe); 1976 } 1977 } else { 1978 codec = new DefaultCodec(); 1979 ((Configurable)codec).setConf(conf); 1980 } 1981 } 1982 1983 this.metadata = new Metadata(); 1984 if (version >= VERSION_WITH_METADATA) { // if version >= 6 1985 this.metadata.readFields(in); 1986 } 1987 1988 if (version > 1) { // if version > 1 1989 in.readFully(sync); // read sync bytes 1990 headerEnd = in.getPos(); // record end of header 1991 } 1992 1993 // Initialize... *not* if we are constructing a temporary Reader 1994 if (!tempReader) { 1995 valBuffer = new DataInputBuffer(); 1996 if (decompress) { 1997 valDecompressor = CodecPool.getDecompressor(codec); 1998 valInFilter = codec.createInputStream(valBuffer, valDecompressor); 1999 valIn = new DataInputStream(valInFilter); 2000 } else { 2001 valIn = valBuffer; 2002 } 2003 2004 if (blockCompressed) { 2005 keyLenBuffer = new DataInputBuffer(); 2006 keyBuffer = new DataInputBuffer(); 2007 valLenBuffer = new DataInputBuffer(); 2008 2009 keyLenDecompressor = CodecPool.getDecompressor(codec); 2010 keyLenInFilter = codec.createInputStream(keyLenBuffer, 2011 keyLenDecompressor); 2012 keyLenIn = new DataInputStream(keyLenInFilter); 2013 2014 keyDecompressor = CodecPool.getDecompressor(codec); 2015 keyInFilter = codec.createInputStream(keyBuffer, keyDecompressor); 2016 keyIn = new DataInputStream(keyInFilter); 2017 2018 valLenDecompressor = CodecPool.getDecompressor(codec); 2019 valLenInFilter = codec.createInputStream(valLenBuffer, 2020 valLenDecompressor); 2021 valLenIn = new DataInputStream(valLenInFilter); 2022 } 2023 2024 SerializationFactory serializationFactory = 2025 new SerializationFactory(conf); 2026 this.keyDeserializer = 2027 getDeserializer(serializationFactory, getKeyClass()); 2028 if (this.keyDeserializer == null) { 2029 throw new IOException( 2030 "Could not find a deserializer for the Key class: '" 2031 + getKeyClass().getCanonicalName() + "'. " 2032 + "Please ensure that the configuration '" + 2033 CommonConfigurationKeys.IO_SERIALIZATIONS_KEY + "' is " 2034 + "properly configured, if you're using " 2035 + "custom serialization."); 2036 } 2037 if (!blockCompressed) { 2038 this.keyDeserializer.open(valBuffer); 2039 } else { 2040 this.keyDeserializer.open(keyIn); 2041 } 2042 this.valDeserializer = 2043 getDeserializer(serializationFactory, getValueClass()); 2044 if (this.valDeserializer == null) { 2045 throw new IOException( 2046 "Could not find a deserializer for the Value class: '" 2047 + getValueClass().getCanonicalName() + "'. " 2048 + "Please ensure that the configuration '" + 2049 CommonConfigurationKeys.IO_SERIALIZATIONS_KEY + "' is " 2050 + "properly configured, if you're using " 2051 + "custom serialization."); 2052 } 2053 this.valDeserializer.open(valIn); 2054 } 2055 } 2056 2057 @SuppressWarnings("unchecked") 2058 private Deserializer getDeserializer(SerializationFactory sf, Class c) { 2059 return sf.getDeserializer(c); 2060 } 2061 2062 /** Close the file.
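 * Closing returns any pooled decompressors and closes the underlying stream,
 * so every reader should be closed. Since {@link Reader} implements
 * {@link java.io.Closeable}, try-with-resources is one way to guarantee this
 * (a sketch, assuming Java 7+ and a hypothetical {@code path}):
 * <pre>{@code
 * try (SequenceFile.Reader reader =
 *          new SequenceFile.Reader(conf, SequenceFile.Reader.file(path))) {
 *   // ... read records ...
 * }   // close() runs automatically here
 * }</pre>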
*/ 2063 @Override 2064 public synchronized void close() throws IOException { 2065 // Return the decompressors to the pool 2066 CodecPool.returnDecompressor(keyLenDecompressor); 2067 CodecPool.returnDecompressor(keyDecompressor); 2068 CodecPool.returnDecompressor(valLenDecompressor); 2069 CodecPool.returnDecompressor(valDecompressor); 2070 keyLenDecompressor = keyDecompressor = null; 2071 valLenDecompressor = valDecompressor = null; 2072 2073 if (keyDeserializer != null) { 2074 keyDeserializer.close(); 2075 } 2076 if (valDeserializer != null) { 2077 valDeserializer.close(); 2078 } 2079 2080 // Close the input-stream 2081 in.close(); 2082 } 2083 2084 /** Returns the name of the key class. */ 2085 public String getKeyClassName() { 2086 return keyClassName; 2087 } 2088 2089 /** Returns the class of keys in this file. */ 2090 public synchronized Class<?> getKeyClass() { 2091 if (null == keyClass) { 2092 try { 2093 keyClass = WritableName.getClass(getKeyClassName(), conf); 2094 } catch (IOException e) { 2095 throw new RuntimeException(e); 2096 } 2097 } 2098 return keyClass; 2099 } 2100 2101 /** Returns the name of the value class. */ 2102 public String getValueClassName() { 2103 return valClassName; 2104 } 2105 2106 /** Returns the class of values in this file. */ 2107 public synchronized Class<?> getValueClass() { 2108 if (null == valClass) { 2109 try { 2110 valClass = WritableName.getClass(getValueClassName(), conf); 2111 } catch (IOException e) { 2112 throw new RuntimeException(e); 2113 } 2114 } 2115 return valClass; 2116 } 2117 2118 /** Returns true if values are compressed. */ 2119 public boolean isCompressed() { return decompress; } 2120 2121 /** Returns true if records are block-compressed. */ 2122 public boolean isBlockCompressed() { return blockCompressed; } 2123 2124 /** Returns the compression codec of data in this file. */ 2125 public CompressionCodec getCompressionCodec() { return codec; } 2126 2127 private byte[] getSync() { 2128 return sync; 2129 } 2130 2131 private byte getVersion() { 2132 return version; 2133 } 2134 2135 /** 2136 * Get the compression type for this file. 2137 * @return the compression type 2138 */ 2139 public CompressionType getCompressionType() { 2140 if (decompress) { 2141 return blockCompressed ? CompressionType.BLOCK : CompressionType.RECORD; 2142 } else { 2143 return CompressionType.NONE; 2144 } 2145 } 2146 2147 /** Returns the metadata object of the file */ 2148 public Metadata getMetadata() { 2149 return this.metadata; 2150 } 2151 2152 /** Returns the configuration used for this file. 
*/ 2153 Configuration getConf() { return conf; } 2154 2155 /** Read a compressed buffer */ 2156 private synchronized void readBuffer(DataInputBuffer buffer, 2157 CompressionInputStream filter) throws IOException { 2158 // Read data into a temporary buffer 2159 DataOutputBuffer dataBuffer = new DataOutputBuffer(); 2160 2161 try { 2162 int dataBufferLength = WritableUtils.readVInt(in); 2163 dataBuffer.write(in, dataBufferLength); 2164 2165 // Set up 'buffer' connected to the input-stream 2166 buffer.reset(dataBuffer.getData(), 0, dataBuffer.getLength()); 2167 } finally { 2168 dataBuffer.close(); 2169 } 2170 2171 // Reset the codec 2172 filter.resetState(); 2173 } 2174 2175 /** Read the next 'compressed' block */ 2176 private synchronized void readBlock() throws IOException { 2177 // Check if we need to throw away a whole block of 2178 // 'values' due to 'lazy decompression' 2179 if (lazyDecompress && !valuesDecompressed) { 2180 in.seek(WritableUtils.readVInt(in)+in.getPos()); 2181 in.seek(WritableUtils.readVInt(in)+in.getPos()); 2182 } 2183 2184 // Reset internal states 2185 noBufferedKeys = 0; noBufferedValues = 0; noBufferedRecords = 0; 2186 valuesDecompressed = false; 2187 2188 //Process sync 2189 if (sync != null) { 2190 in.readInt(); 2191 in.readFully(syncCheck); // read syncCheck 2192 if (!Arrays.equals(sync, syncCheck)) // check it 2193 throw new IOException("File is corrupt!"); 2194 } 2195 syncSeen = true; 2196 2197 // Read number of records in this block 2198 noBufferedRecords = WritableUtils.readVInt(in); 2199 2200 // Read key lengths and keys 2201 readBuffer(keyLenBuffer, keyLenInFilter); 2202 readBuffer(keyBuffer, keyInFilter); 2203 noBufferedKeys = noBufferedRecords; 2204 2205 // Read value lengths and values 2206 if (!lazyDecompress) { 2207 readBuffer(valLenBuffer, valLenInFilter); 2208 readBuffer(valBuffer, valInFilter); 2209 noBufferedValues = noBufferedRecords; 2210 valuesDecompressed = true; 2211 } 2212 } 2213 2214 /** 2215 * Position valLenIn/valIn to the 'value' 2216 * corresponding to the 'current' key 2217 */ 2218 private synchronized void seekToCurrentValue() throws IOException { 2219 if (!blockCompressed) { 2220 if (decompress) { 2221 valInFilter.resetState(); 2222 } 2223 valBuffer.reset(); 2224 } else { 2225 // Check if this is the first value in the 'block' to be read 2226 if (lazyDecompress && !valuesDecompressed) { 2227 // Read the value lengths and values 2228 readBuffer(valLenBuffer, valLenInFilter); 2229 readBuffer(valBuffer, valInFilter); 2230 noBufferedValues = noBufferedRecords; 2231 valuesDecompressed = true; 2232 } 2233 2234 // Calculate the no. of bytes to skip 2235 // Note: 'current' key has already been read! 2236 int skipValBytes = 0; 2237 int currentKey = noBufferedKeys + 1; 2238 for (int i=noBufferedValues; i > currentKey; --i) { 2239 skipValBytes += WritableUtils.readVInt(valLenIn); 2240 --noBufferedValues; 2241 } 2242 2243 // Skip to the 'val' corresponding to 'current' key 2244 if (skipValBytes > 0) { 2245 if (valIn.skipBytes(skipValBytes) != skipValBytes) { 2246 throw new IOException("Failed to seek to " + currentKey + 2247 "(th) value!"); 2248 } 2249 } 2250 } 2251 } 2252 2253 /** 2254 * Get the 'value' corresponding to the last read 'key'. 2255 * @param val : The 'value' to be read. 
2256 * @throws IOException 2257 */ 2258 public synchronized void getCurrentValue(Writable val) 2259 throws IOException { 2260 if (val instanceof Configurable) { 2261 ((Configurable) val).setConf(this.conf); 2262 } 2263 2264 // Position stream to 'current' value 2265 seekToCurrentValue(); 2266 2267 if (!blockCompressed) { 2268 val.readFields(valIn); 2269 2270 if (valIn.read() > 0) { 2271 LOG.info("available bytes: " + valIn.available()); 2272 throw new IOException(val+" read "+(valBuffer.getPosition()-keyLength) 2273 + " bytes, should read " + 2274 (valBuffer.getLength()-keyLength)); 2275 } 2276 } else { 2277 // Get the value 2278 int valLength = WritableUtils.readVInt(valLenIn); 2279 val.readFields(valIn); 2280 2281 // Read another compressed 'value' 2282 --noBufferedValues; 2283 2284 // Sanity check 2285 if ((valLength < 0) && LOG.isDebugEnabled()) { 2286 LOG.debug(val + " has a negative-length value"); 2287 } 2288 } 2289 2290 } 2291 2292 /** 2293 * Get the 'value' corresponding to the last read 'key'. 2294 * @param val : The 'value' to be read. 2295 * @throws IOException 2296 */ 2297 public synchronized Object getCurrentValue(Object val) 2298 throws IOException { 2299 if (val instanceof Configurable) { 2300 ((Configurable) val).setConf(this.conf); 2301 } 2302 2303 // Position stream to 'current' value 2304 seekToCurrentValue(); 2305 2306 if (!blockCompressed) { 2307 val = deserializeValue(val); 2308 2309 if (valIn.read() > 0) { 2310 LOG.info("available bytes: " + valIn.available()); 2311 throw new IOException(val+" read "+(valBuffer.getPosition()-keyLength) 2312 + " bytes, should read " + 2313 (valBuffer.getLength()-keyLength)); 2314 } 2315 } else { 2316 // Get the value 2317 int valLength = WritableUtils.readVInt(valLenIn); 2318 val = deserializeValue(val); 2319 2320 // Read another compressed 'value' 2321 --noBufferedValues; 2322 2323 // Sanity check 2324 if ((valLength < 0) && LOG.isDebugEnabled()) { 2325 LOG.debug(val + " has a negative-length value"); 2326 } 2327 } 2328 return val; 2329 2330 } 2331 2332 @SuppressWarnings("unchecked") 2333 private Object deserializeValue(Object val) throws IOException { 2334 return valDeserializer.deserialize(val); 2335 } 2336 2337 /** Read the next key in the file into <code>key</code>, skipping its 2338 * value. True if another entry exists, and false at end of file. */ 2339 public synchronized boolean next(Writable key) throws IOException { 2340 if (key.getClass() != getKeyClass()) 2341 throw new IOException("wrong key class: "+key.getClass().getName() 2342 +" is not "+keyClass); 2343 2344 if (!blockCompressed) { 2345 outBuf.reset(); 2346 2347 keyLength = next(outBuf); 2348 if (keyLength < 0) 2349 return false; 2350 2351 valBuffer.reset(outBuf.getData(), outBuf.getLength()); 2352 2353 key.readFields(valBuffer); 2354 valBuffer.mark(0); 2355 if (valBuffer.getPosition() != keyLength) 2356 throw new IOException(key + " read " + valBuffer.getPosition() 2357 + " bytes, should read " + keyLength); 2358 } else { 2359 //Reset syncSeen 2360 syncSeen = false; 2361 2362 if (noBufferedKeys == 0) { 2363 try { 2364 readBlock(); 2365 } catch (EOFException eof) { 2366 return false; 2367 } 2368 } 2369 2370 int keyLength = WritableUtils.readVInt(keyLenIn); 2371 2372 // Sanity check 2373 if (keyLength < 0) { 2374 return false; 2375 } 2376 2377 //Read another compressed 'key' 2378 key.readFields(keyIn); 2379 --noBufferedKeys; 2380 } 2381 2382 return true; 2383 } 2384 2385 /** Read the next key/value pair in the file into <code>key</code> and 2386 * <code>val</code>.
Returns true if such a pair exists and false when at 2387 * end of file */ 2388 public synchronized boolean next(Writable key, Writable val) 2389 throws IOException { 2390 if (val.getClass() != getValueClass()) 2391 throw new IOException("wrong value class: "+val+" is not "+valClass); 2392 2393 boolean more = next(key); 2394 2395 if (more) { 2396 getCurrentValue(val); 2397 } 2398 2399 return more; 2400 } 2401 2402 /** 2403 * Read and return the next record length, potentially skipping over 2404 * a sync block. 2405 * @return the length of the next record or -1 if there is no next record 2406 * @throws IOException 2407 */ 2408 private synchronized int readRecordLength() throws IOException { 2409 if (in.getPos() >= end) { 2410 return -1; 2411 } 2412 int length = in.readInt(); 2413 if (version > 1 && sync != null && 2414 length == SYNC_ESCAPE) { // process a sync entry 2415 in.readFully(syncCheck); // read syncCheck 2416 if (!Arrays.equals(sync, syncCheck)) // check it 2417 throw new IOException("File is corrupt!"); 2418 syncSeen = true; 2419 if (in.getPos() >= end) { 2420 return -1; 2421 } 2422 length = in.readInt(); // re-read length 2423 } else { 2424 syncSeen = false; 2425 } 2426 2427 return length; 2428 } 2429 2430 /** Read the next key/value pair in the file into <code>buffer</code>. 2431 * Returns the length of the key read, or -1 if at end of file. The length 2432 * of the value may be computed by calling buffer.getLength() before and 2433 * after calls to this method. 2434 * @deprecated Call {@link #nextRaw(DataOutputBuffer,SequenceFile.ValueBytes)}. */ 2435 @Deprecated 2436 synchronized int next(DataOutputBuffer buffer) throws IOException { 2437 // Unsupported for block-compressed sequence files 2438 if (blockCompressed) { 2439 throw new IOException("Unsupported call for block-compressed" + 2440 " SequenceFiles - use SequenceFile.Reader.nextRaw(DataOutputBuffer, ValueBytes)"); 2441 } 2442 try { 2443 int length = readRecordLength(); 2444 if (length == -1) { 2445 return -1; 2446 } 2447 int keyLength = in.readInt(); 2448 buffer.write(in, length); 2449 return keyLength; 2450 } catch (ChecksumException e) { // checksum failure 2451 handleChecksumException(e); 2452 return next(buffer); 2453 } 2454 } 2455 2456 public ValueBytes createValueBytes() { 2457 ValueBytes val = null; 2458 if (!decompress || blockCompressed) { 2459 val = new UncompressedBytes(); 2460 } else { 2461 val = new CompressedBytes(codec); 2462 } 2463 return val; 2464 } 2465 2466 /** 2467 * Read 'raw' records.
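 * <p>Raw records can be shuttled between files without deserializing the
 * keys and values, e.g. (a hedged sketch; reader and writer construction is
 * omitted and the two files are assumed to use compatible formats):</p>
 * <pre>{@code
 * DataOutputBuffer rawKey = new DataOutputBuffer();
 * SequenceFile.ValueBytes rawValue = reader.createValueBytes();
 * while (reader.nextRaw(rawKey, rawValue) != -1) {
 *   writer.appendRaw(rawKey.getData(), 0, rawKey.getLength(), rawValue);
 *   rawKey.reset();   // reuse the key buffer for the next record
 * }
 * }</pre>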
2468 * @param key - The buffer into which the key is read 2469 * @param val - The 'raw' value 2470 * @return Returns the total record length or -1 for end of file 2471 * @throws IOException 2472 */ 2473 public synchronized int nextRaw(DataOutputBuffer key, ValueBytes val) 2474 throws IOException { 2475 if (!blockCompressed) { 2476 int length = readRecordLength(); 2477 if (length == -1) { 2478 return -1; 2479 } 2480 int keyLength = in.readInt(); 2481 int valLength = length - keyLength; 2482 key.write(in, keyLength); 2483 if (decompress) { 2484 CompressedBytes value = (CompressedBytes)val; 2485 value.reset(in, valLength); 2486 } else { 2487 UncompressedBytes value = (UncompressedBytes)val; 2488 value.reset(in, valLength); 2489 } 2490 2491 return length; 2492 } else { 2493 //Reset syncSeen 2494 syncSeen = false; 2495 2496 // Read 'key' 2497 if (noBufferedKeys == 0) { 2498 if (in.getPos() >= end) 2499 return -1; 2500 2501 try { 2502 readBlock(); 2503 } catch (EOFException eof) { 2504 return -1; 2505 } 2506 } 2507 int keyLength = WritableUtils.readVInt(keyLenIn); 2508 if (keyLength < 0) { 2509 throw new IOException("negative length key found!"); 2510 } 2511 key.write(keyIn, keyLength); 2512 --noBufferedKeys; 2513 2514 // Read raw 'value' 2515 seekToCurrentValue(); 2516 int valLength = WritableUtils.readVInt(valLenIn); 2517 UncompressedBytes rawValue = (UncompressedBytes)val; 2518 rawValue.reset(valIn, valLength); 2519 --noBufferedValues; 2520 2521 return (keyLength+valLength); 2522 } 2523 2524 } 2525 2526 /** 2527 * Read 'raw' keys. 2528 * @param key - The buffer into which the key is read 2529 * @return Returns the key length or -1 for end of file 2530 * @throws IOException 2531 */ 2532 public synchronized int nextRawKey(DataOutputBuffer key) 2533 throws IOException { 2534 if (!blockCompressed) { 2535 recordLength = readRecordLength(); 2536 if (recordLength == -1) { 2537 return -1; 2538 } 2539 keyLength = in.readInt(); 2540 key.write(in, keyLength); 2541 return keyLength; 2542 } else { 2543 //Reset syncSeen 2544 syncSeen = false; 2545 2546 // Read 'key' 2547 if (noBufferedKeys == 0) { 2548 if (in.getPos() >= end) 2549 return -1; 2550 2551 try { 2552 readBlock(); 2553 } catch (EOFException eof) { 2554 return -1; 2555 } 2556 } 2557 int keyLength = WritableUtils.readVInt(keyLenIn); 2558 if (keyLength < 0) { 2559 throw new IOException("negative length key found!"); 2560 } 2561 key.write(keyIn, keyLength); 2562 --noBufferedKeys; 2563 2564 return keyLength; 2565 } 2566 2567 } 2568 2569 /** Read the next key in the file, skipping its 2570 * value. Return null at end of file.
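 * For example, scanning keys only (a sketch; passing the previous return
 * value back in lets the configured deserializer reuse the key object where
 * it supports that):
 * <pre>{@code
 * Object key = null;
 * while ((key = reader.next(key)) != null) {
 *   // ... inspect key ...
 * }
 * }</pre>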
*/ 2571 public synchronized Object next(Object key) throws IOException { 2572 if (key != null && key.getClass() != getKeyClass()) { 2573 throw new IOException("wrong key class: "+key.getClass().getName() 2574 +" is not "+keyClass); 2575 } 2576 2577 if (!blockCompressed) { 2578 outBuf.reset(); 2579 2580 keyLength = next(outBuf); 2581 if (keyLength < 0) 2582 return null; 2583 2584 valBuffer.reset(outBuf.getData(), outBuf.getLength()); 2585 2586 key = deserializeKey(key); 2587 valBuffer.mark(0); 2588 if (valBuffer.getPosition() != keyLength) 2589 throw new IOException(key + " read " + valBuffer.getPosition() 2590 + " bytes, should read " + keyLength); 2591 } else { 2592 //Reset syncSeen 2593 syncSeen = false; 2594 2595 if (noBufferedKeys == 0) { 2596 try { 2597 readBlock(); 2598 } catch (EOFException eof) { 2599 return null; 2600 } 2601 } 2602 2603 int keyLength = WritableUtils.readVInt(keyLenIn); 2604 2605 // Sanity check 2606 if (keyLength < 0) { 2607 return null; 2608 } 2609 2610 //Read another compressed 'key' 2611 key = deserializeKey(key); 2612 --noBufferedKeys; 2613 } 2614 2615 return key; 2616 } 2617 2618 @SuppressWarnings("unchecked") 2619 private Object deserializeKey(Object key) throws IOException { 2620 return keyDeserializer.deserialize(key); 2621 } 2622 2623 /** 2624 * Read 'raw' values. 2625 * @param val - The 'raw' value 2626 * @return Returns the value length 2627 * @throws IOException 2628 */ 2629 public synchronized int nextRawValue(ValueBytes val) 2630 throws IOException { 2631 2632 // Position stream to current value 2633 seekToCurrentValue(); 2634 2635 if (!blockCompressed) { 2636 int valLength = recordLength - keyLength; 2637 if (decompress) { 2638 CompressedBytes value = (CompressedBytes)val; 2639 value.reset(in, valLength); 2640 } else { 2641 UncompressedBytes value = (UncompressedBytes)val; 2642 value.reset(in, valLength); 2643 } 2644 2645 return valLength; 2646 } else { 2647 int valLength = WritableUtils.readVInt(valLenIn); 2648 UncompressedBytes rawValue = (UncompressedBytes)val; 2649 rawValue.reset(valIn, valLength); 2650 --noBufferedValues; 2651 return valLength; 2652 } 2653 2654 } 2655 2656 private void handleChecksumException(ChecksumException e) 2657 throws IOException { 2658 if (this.conf.getBoolean("io.skip.checksum.errors", false)) { 2659 LOG.warn("Bad checksum at "+getPosition()+". Skipping entries."); 2660 sync(getPosition()+this.conf.getInt("io.bytes.per.checksum", 512)); 2661 } else { 2662 throw e; 2663 } 2664 } 2665 2666 /** disables sync. often invoked for tmp files */ 2667 synchronized void ignoreSync() { 2668 sync = null; 2669 } 2670 2671 /** Set the current byte position in the input file. 2672 * 2673 * <p>The position passed must be a position returned by {@link 2674 * SequenceFile.Writer#getLength()} when writing this file. To seek to an arbitrary 2675 * position, use {@link SequenceFile.Reader#sync(long)}. 
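 * <p>For instance, to start reading from an arbitrary offset (a sketch;
 * {@code start} is a hypothetical byte offset that need not be
 * record-aligned, and {@code key}/{@code value} are instances of the file's
 * key and value classes):</p>
 * <pre>{@code
 * reader.sync(start);   // advance to the first sync mark after 'start'
 * while (reader.next(key, value)) {
 *   // ... records from the first record boundary after 'start' ...
 * }
 * }</pre>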
2676 */ 2677 public synchronized void seek(long position) throws IOException { 2678 in.seek(position); 2679 if (blockCompressed) { // trigger block read 2680 noBufferedKeys = 0; 2681 valuesDecompressed = true; 2682 } 2683 } 2684 2685 /** Seek to the next sync mark past a given position.*/ 2686 public synchronized void sync(long position) throws IOException { 2687 if (position+SYNC_SIZE >= end) { 2688 seek(end); 2689 return; 2690 } 2691 2692 if (position < headerEnd) { 2693 // seek directly to first record 2694 in.seek(headerEnd); 2695 // note the sync marker "seen" in the header 2696 syncSeen = true; 2697 return; 2698 } 2699 2700 try { 2701 seek(position+4); // skip escape 2702 in.readFully(syncCheck); 2703 int syncLen = sync.length; 2704 for (int i = 0; in.getPos() < end; i++) { 2705 int j = 0; 2706 for (; j < syncLen; j++) { 2707 if (sync[j] != syncCheck[(i+j)%syncLen]) 2708 break; 2709 } 2710 if (j == syncLen) { 2711 in.seek(in.getPos() - SYNC_SIZE); // position before sync 2712 return; 2713 } 2714 syncCheck[i%syncLen] = in.readByte(); 2715 } 2716 } catch (ChecksumException e) { // checksum failure 2717 handleChecksumException(e); 2718 } 2719 } 2720 2721 /** Returns true iff the previous call to next passed a sync mark.*/ 2722 public synchronized boolean syncSeen() { return syncSeen; } 2723 2724 /** Return the current byte position in the input file. */ 2725 public synchronized long getPosition() throws IOException { 2726 return in.getPos(); 2727 } 2728 2729 /** Returns the name of the file. */ 2730 @Override 2731 public String toString() { 2732 return filename; 2733 } 2734 2735 } 2736 2737 /** Sorts key/value pairs in a sequence-format file. 2738 * 2739 * <p>For best performance, applications should make sure that the {@link 2740 * Writable#readFields(DataInput)} implementation of their keys is 2741 * very efficient. In particular, it should avoid allocating memory. 2742 */ 2743 public static class Sorter { 2744 2745 private RawComparator comparator; 2746 2747 private MergeSort mergeSort; //the implementation of merge sort 2748 2749 private Path[] inFiles; // when merging or sorting 2750 2751 private Path outFile; 2752 2753 private int memory; // bytes 2754 private int factor; // merged per pass 2755 2756 private FileSystem fs = null; 2757 2758 private Class keyClass; 2759 private Class valClass; 2760 2761 private Configuration conf; 2762 private Metadata metadata; 2763 2764 private Progressable progressable = null; 2765 2766 /** Sort and merge files containing the named classes. */ 2767 public Sorter(FileSystem fs, Class<? extends WritableComparable> keyClass, 2768 Class valClass, Configuration conf) { 2769 this(fs, WritableComparator.get(keyClass, conf), keyClass, valClass, conf); 2770 } 2771 2772 /** Sort and merge using an arbitrary {@link RawComparator}. */ 2773 public Sorter(FileSystem fs, RawComparator comparator, Class keyClass, 2774 Class valClass, Configuration conf) { 2775 this(fs, comparator, keyClass, valClass, conf, new Metadata()); 2776 } 2777 2778 /** Sort and merge using an arbitrary {@link RawComparator}. 
*/ 2779 public Sorter(FileSystem fs, RawComparator comparator, Class keyClass, 2780 Class valClass, Configuration conf, Metadata metadata) { 2781 this.fs = fs; 2782 this.comparator = comparator; 2783 this.keyClass = keyClass; 2784 this.valClass = valClass; 2785 this.memory = conf.getInt("io.sort.mb", 100) * 1024 * 1024; 2786 this.factor = conf.getInt("io.sort.factor", 100); 2787 this.conf = conf; 2788 this.metadata = metadata; 2789 } 2790 2791 /** Set the number of streams to merge at once.*/ 2792 public void setFactor(int factor) { this.factor = factor; } 2793 2794 /** Get the number of streams to merge at once.*/ 2795 public int getFactor() { return factor; } 2796 2797 /** Set the total amount of buffer memory, in bytes.*/ 2798 public void setMemory(int memory) { this.memory = memory; } 2799 2800 /** Get the total amount of buffer memory, in bytes.*/ 2801 public int getMemory() { return memory; } 2802 2803 /** Set the progressable object in order to report progress. */ 2804 public void setProgressable(Progressable progressable) { 2805 this.progressable = progressable; 2806 } 2807 2808 /** 2809 * Perform a file sort from a set of input files into an output file. 2810 * @param inFiles the files to be sorted 2811 * @param outFile the sorted output file 2812 * @param deleteInput should the input files be deleted as they are read? 2813 */ 2814 public void sort(Path[] inFiles, Path outFile, 2815 boolean deleteInput) throws IOException { 2816 if (fs.exists(outFile)) { 2817 throw new IOException("already exists: " + outFile); 2818 } 2819 2820 this.inFiles = inFiles; 2821 this.outFile = outFile; 2822 2823 int segments = sortPass(deleteInput); 2824 if (segments > 1) { 2825 mergePass(outFile.getParent()); 2826 } 2827 } 2828 2829 /** 2830 * Perform a file sort from a set of input files and return an iterator. 2831 * @param inFiles the files to be sorted 2832 * @param tempDir the directory where temp files are created during sort 2833 * @param deleteInput should the input files be deleted as they are read? 2834 * @return the RawKeyValueIterator over the sorted records 2835 */ 2836 public RawKeyValueIterator sortAndIterate(Path[] inFiles, Path tempDir, 2837 boolean deleteInput) throws IOException { 2838 Path outFile = new Path(tempDir + Path.SEPARATOR + "all.2"); 2839 if (fs.exists(outFile)) { 2840 throw new IOException("already exists: " + outFile); 2841 } 2842 this.inFiles = inFiles; 2843 //outFile will basically be used as prefix for temp files in the cases 2844 //where sort outputs multiple sorted segments. For the single segment 2845 //case, the outputFile itself will contain the sorted data for that 2846 //segment 2847 this.outFile = outFile; 2848 2849 int segments = sortPass(deleteInput); 2850 if (segments > 1) 2851 return merge(outFile.suffix(".0"), outFile.suffix(".0.index"), 2852 tempDir); 2853 else if (segments == 1) 2854 return merge(new Path[]{outFile}, true, tempDir); 2855 else return null; 2856 } 2857 2858 /** 2859 * The backwards-compatible interface to sort.
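 * For example (a sketch; the paths are hypothetical, and the Text key class
 * is assumed only because the key must be a WritableComparable for this
 * constructor):
 * <pre>{@code
 * SequenceFile.Sorter sorter =
 *     new SequenceFile.Sorter(fs, Text.class, IntWritable.class, conf);
 * sorter.sort(new Path("/data/unsorted.seq"), new Path("/data/sorted.seq"));
 * }</pre>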
2860 * @param inFile the input file to sort 2861 * @param outFile the sorted output file 2862 */ 2863 public void sort(Path inFile, Path outFile) throws IOException { 2864 sort(new Path[]{inFile}, outFile, false); 2865 } 2866 2867 private int sortPass(boolean deleteInput) throws IOException { 2868 if(LOG.isDebugEnabled()) { 2869 LOG.debug("running sort pass"); 2870 } 2871 SortPass sortPass = new SortPass(); // make the SortPass 2872 sortPass.setProgressable(progressable); 2873 mergeSort = new MergeSort(sortPass.new SeqFileComparator()); 2874 try { 2875 return sortPass.run(deleteInput); // run it 2876 } finally { 2877 sortPass.close(); // close it 2878 } 2879 } 2880 2881 private class SortPass { 2882 private int memoryLimit = memory/4; 2883 private int recordLimit = 1000000; 2884 2885 private DataOutputBuffer rawKeys = new DataOutputBuffer(); 2886 private byte[] rawBuffer; 2887 2888 private int[] keyOffsets = new int[1024]; 2889 private int[] pointers = new int[keyOffsets.length]; 2890 private int[] pointersCopy = new int[keyOffsets.length]; 2891 private int[] keyLengths = new int[keyOffsets.length]; 2892 private ValueBytes[] rawValues = new ValueBytes[keyOffsets.length]; 2893 2894 private ArrayList segmentLengths = new ArrayList(); 2895 2896 private Reader in = null; 2897 private FSDataOutputStream out = null; 2898 private FSDataOutputStream indexOut = null; 2899 private Path outName; 2900 2901 private Progressable progressable = null; 2902 2903 public int run(boolean deleteInput) throws IOException { 2904 int segments = 0; 2905 int currentFile = 0; 2906 boolean atEof = (currentFile >= inFiles.length); 2907 CompressionType compressionType; 2908 CompressionCodec codec = null; 2909 segmentLengths.clear(); 2910 if (atEof) { 2911 return 0; 2912 } 2913 2914 // Initialize 2915 in = new Reader(fs, inFiles[currentFile], conf); 2916 compressionType = in.getCompressionType(); 2917 codec = in.getCompressionCodec(); 2918 2919 for (int i=0; i < rawValues.length; ++i) { 2920 rawValues[i] = null; 2921 } 2922 2923 while (!atEof) { 2924 int count = 0; 2925 int bytesProcessed = 0; 2926 rawKeys.reset(); 2927 while (!atEof && 2928 bytesProcessed < memoryLimit && count < recordLimit) { 2929 2930 // Read a record into buffer 2931 // Note: Attempt to re-use 'rawValue' as far as possible 2932 int keyOffset = rawKeys.getLength(); 2933 ValueBytes rawValue = 2934 (count == keyOffsets.length || rawValues[count] == null) ? 
2935 in.createValueBytes() : 2936 rawValues[count]; 2937 int recordLength = in.nextRaw(rawKeys, rawValue); 2938 if (recordLength == -1) { 2939 in.close(); 2940 if (deleteInput) { 2941 fs.delete(inFiles[currentFile], true); 2942 } 2943 currentFile += 1; 2944 atEof = currentFile >= inFiles.length; 2945 if (!atEof) { 2946 in = new Reader(fs, inFiles[currentFile], conf); 2947 } else { 2948 in = null; 2949 } 2950 continue; 2951 } 2952 2953 int keyLength = rawKeys.getLength() - keyOffset; 2954 2955 if (count == keyOffsets.length) 2956 grow(); 2957 2958 keyOffsets[count] = keyOffset; // update pointers 2959 pointers[count] = count; 2960 keyLengths[count] = keyLength; 2961 rawValues[count] = rawValue; 2962 2963 bytesProcessed += recordLength; 2964 count++; 2965 } 2966 2967 // buffer is full -- sort & flush it 2968 if(LOG.isDebugEnabled()) { 2969 LOG.debug("flushing segment " + segments); 2970 } 2971 rawBuffer = rawKeys.getData(); 2972 sort(count); 2973 // indicate we're making progress 2974 if (progressable != null) { 2975 progressable.progress(); 2976 } 2977 flush(count, bytesProcessed, compressionType, codec, 2978 segments==0 && atEof); 2979 segments++; 2980 } 2981 return segments; 2982 } 2983 2984 public void close() throws IOException { 2985 if (in != null) { 2986 in.close(); 2987 } 2988 if (out != null) { 2989 out.close(); 2990 } 2991 if (indexOut != null) { 2992 indexOut.close(); 2993 } 2994 } 2995 2996 private void grow() { 2997 int newLength = keyOffsets.length * 3 / 2; 2998 keyOffsets = grow(keyOffsets, newLength); 2999 pointers = grow(pointers, newLength); 3000 pointersCopy = new int[newLength]; 3001 keyLengths = grow(keyLengths, newLength); 3002 rawValues = grow(rawValues, newLength); 3003 } 3004 3005 private int[] grow(int[] old, int newLength) { 3006 int[] result = new int[newLength]; 3007 System.arraycopy(old, 0, result, 0, old.length); 3008 return result; 3009 } 3010 3011 private ValueBytes[] grow(ValueBytes[] old, int newLength) { 3012 ValueBytes[] result = new ValueBytes[newLength]; 3013 System.arraycopy(old, 0, result, 0, old.length); 3014 for (int i=old.length; i < newLength; ++i) { 3015 result[i] = null; 3016 } 3017 return result; 3018 } 3019 3020 private void flush(int count, int bytesProcessed, 3021 CompressionType compressionType, 3022 CompressionCodec codec, 3023 boolean done) throws IOException { 3024 if (out == null) { 3025 outName = done ? outFile : outFile.suffix(".0"); 3026 out = fs.create(outName); 3027 if (!done) { 3028 indexOut = fs.create(outName.suffix(".index")); 3029 } 3030 } 3031 3032 long segmentStart = out.getPos(); 3033 Writer writer = createWriter(conf, Writer.stream(out), 3034 Writer.keyClass(keyClass), Writer.valueClass(valClass), 3035 Writer.compression(compressionType, codec), 3036 Writer.metadata(done ? 
metadata : new Metadata())); 3037 3038 if (!done) { 3039 writer.sync = null; // disable sync on temp files 3040 } 3041 3042 for (int i = 0; i < count; i++) { // write in sorted order 3043 int p = pointers[i]; 3044 writer.appendRaw(rawBuffer, keyOffsets[p], keyLengths[p], rawValues[p]); 3045 } 3046 writer.close(); 3047 3048 if (!done) { 3049 // Save the segment length 3050 WritableUtils.writeVLong(indexOut, segmentStart); 3051 WritableUtils.writeVLong(indexOut, (out.getPos()-segmentStart)); 3052 indexOut.flush(); 3053 } 3054 } 3055 3056 private void sort(int count) { 3057 System.arraycopy(pointers, 0, pointersCopy, 0, count); 3058 mergeSort.mergeSort(pointersCopy, pointers, 0, count); 3059 } 3060 class SeqFileComparator implements Comparator<IntWritable> { 3061 @Override 3062 public int compare(IntWritable I, IntWritable J) { 3063 return comparator.compare(rawBuffer, keyOffsets[I.get()], 3064 keyLengths[I.get()], rawBuffer, 3065 keyOffsets[J.get()], keyLengths[J.get()]); 3066 } 3067 } 3068 3069 /** set the progressable object in order to report progress */ 3070 public void setProgressable(Progressable progressable) 3071 { 3072 this.progressable = progressable; 3073 } 3074 3075 } // SequenceFile.Sorter.SortPass 3076 3077 /** The interface to iterate over raw keys/values of SequenceFiles. */ 3078 public static interface RawKeyValueIterator { 3079 /** Gets the current raw key 3080 * @return DataOutputBuffer 3081 * @throws IOException 3082 */ 3083 DataOutputBuffer getKey() throws IOException; 3084 /** Gets the current raw value 3085 * @return ValueBytes 3086 * @throws IOException 3087 */ 3088 ValueBytes getValue() throws IOException; 3089 /** Sets up the current key and value (for getKey and getValue) 3090 * @return true if there exists a key/value, false otherwise 3091 * @throws IOException 3092 */ 3093 boolean next() throws IOException; 3094 /** closes the iterator so that the underlying streams can be closed 3095 * @throws IOException 3096 */ 3097 void close() throws IOException; 3098 /** Gets the Progress object; this has a float (0.0 - 1.0) 3099 * indicating the bytes processed by the iterator so far 3100 */ 3101 Progress getProgress(); 3102 } 3103 3104 /** 3105 * Merges the list of segments of type <code>SegmentDescriptor</code> 3106 * @param segments the list of SegmentDescriptors 3107 * @param tmpDir the directory to write temporary files into 3108 * @return RawKeyValueIterator 3109 * @throws IOException 3110 */ 3111 public RawKeyValueIterator merge(List <SegmentDescriptor> segments, 3112 Path tmpDir) 3113 throws IOException { 3114 // pass in object to report progress, if present 3115 MergeQueue mQueue = new MergeQueue(segments, tmpDir, progressable); 3116 return mQueue.merge(); 3117 } 3118 3119 /** 3120 * Merges the contents of files passed in Path[] using a max factor value 3121 * that is already set 3122 * @param inNames the array of path names 3123 * @param deleteInputs true if the input files should be deleted when 3124 * unnecessary 3125 * @param tmpDir the directory to write temporary files into 3126 * @return RawKeyValueIterator 3127 * @throws IOException 3128 */ 3129 public RawKeyValueIterator merge(Path [] inNames, boolean deleteInputs, 3130 Path tmpDir) 3131 throws IOException { 3132 return merge(inNames, deleteInputs, 3133 (inNames.length < factor) ?
inNames.length : factor, 3134 tmpDir); 3135 } 3136 3137 /** 3138 * Merges the contents of files passed in Path[] 3139 * @param inNames the array of path names 3140 * @param deleteInputs true if the input files should be deleted when 3141 * unnecessary 3142 * @param factor the factor that will be used as the maximum merge fan-in 3143 * @param tmpDir the directory to write temporary files into 3144 * @return RawKeyValueIterator 3145 * @throws IOException 3146 */ 3147 public RawKeyValueIterator merge(Path [] inNames, boolean deleteInputs, 3148 int factor, Path tmpDir) 3149 throws IOException { 3150 //get the segments from inNames 3151 ArrayList <SegmentDescriptor> a = new ArrayList <SegmentDescriptor>(); 3152 for (int i = 0; i < inNames.length; i++) { 3153 SegmentDescriptor s = new SegmentDescriptor(0, 3154 fs.getFileStatus(inNames[i]).getLen(), inNames[i]); 3155 s.preserveInput(!deleteInputs); 3156 s.doSync(); 3157 a.add(s); 3158 } 3159 this.factor = factor; 3160 MergeQueue mQueue = new MergeQueue(a, tmpDir, progressable); 3161 return mQueue.merge(); 3162 } 3163 3164 /** 3165 * Merges the contents of files passed in Path[] 3166 * @param inNames the array of path names 3167 * @param tempDir the directory for creating temp files during merge 3168 * @param deleteInputs true if the input files should be deleted when 3169 * unnecessary 3170 * @return RawKeyValueIterator 3171 * @throws IOException 3172 */ 3173 public RawKeyValueIterator merge(Path [] inNames, Path tempDir, 3174 boolean deleteInputs) 3175 throws IOException { 3176 //outFile will basically be used as prefix for temp files for the 3177 //intermediate merge outputs 3178 this.outFile = new Path(tempDir + Path.SEPARATOR + "merged"); 3179 //get the segments from inNames 3180 ArrayList <SegmentDescriptor> a = new ArrayList <SegmentDescriptor>(); 3181 for (int i = 0; i < inNames.length; i++) { 3182 SegmentDescriptor s = new SegmentDescriptor(0, 3183 fs.getFileStatus(inNames[i]).getLen(), inNames[i]); 3184 s.preserveInput(!deleteInputs); 3185 s.doSync(); 3186 a.add(s); 3187 } 3188 factor = (inNames.length < factor) ?
inNames.length : factor; 3189 // pass in object to report progress, if present 3190 MergeQueue mQueue = new MergeQueue(a, tempDir, progressable); 3191 return mQueue.merge(); 3192 } 3193 3194 /** 3195 * Clones the attributes (like compression) of the input file and creates a 3196 * corresponding Writer 3197 * @param inputFile the path of the input file whose attributes should be 3198 * cloned 3199 * @param outputFile the path of the output file 3200 * @param prog the Progressable to report status during the file write 3201 * @return Writer 3202 * @throws IOException 3203 */ 3204 public Writer cloneFileAttributes(Path inputFile, Path outputFile, 3205 Progressable prog) throws IOException { 3206 Reader reader = new Reader(conf, 3207 Reader.file(inputFile), 3208 new Reader.OnlyHeaderOption()); 3209 CompressionType compress = reader.getCompressionType(); 3210 CompressionCodec codec = reader.getCompressionCodec(); 3211 reader.close(); 3212 3213 Writer writer = createWriter(conf, 3214 Writer.file(outputFile), 3215 Writer.keyClass(keyClass), 3216 Writer.valueClass(valClass), 3217 Writer.compression(compress, codec), 3218 Writer.progressable(prog)); 3219 return writer; 3220 } 3221 3222 /** 3223 * Writes records from RawKeyValueIterator into a file represented by the 3224 * passed writer 3225 * @param records the RawKeyValueIterator 3226 * @param writer the Writer created earlier 3227 * @throws IOException 3228 */ 3229 public void writeFile(RawKeyValueIterator records, Writer writer) 3230 throws IOException { 3231 while(records.next()) { 3232 writer.appendRaw(records.getKey().getData(), 0, 3233 records.getKey().getLength(), records.getValue()); 3234 } 3235 writer.sync(); 3236 } 3237 3238 /** Merge the provided files. 3239 * @param inFiles the array of input path names 3240 * @param outFile the final output file 3241 * @throws IOException 3242 */ 3243 public void merge(Path[] inFiles, Path outFile) throws IOException { 3244 if (fs.exists(outFile)) { 3245 throw new IOException("already exists: " + outFile); 3246 } 3247 RawKeyValueIterator r = merge(inFiles, false, outFile.getParent()); 3248 Writer writer = cloneFileAttributes(inFiles[0], outFile, null); 3249 3250 writeFile(r, writer); 3251 3252 writer.close(); 3253 } 3254 3255 /** sort calls this to generate the final merged output */ 3256 private int mergePass(Path tmpDir) throws IOException { 3257 if(LOG.isDebugEnabled()) { 3258 LOG.debug("running merge pass"); 3259 } 3260 Writer writer = cloneFileAttributes( 3261 outFile.suffix(".0"), outFile, null); 3262 RawKeyValueIterator r = merge(outFile.suffix(".0"), 3263 outFile.suffix(".0.index"), tmpDir); 3264 writeFile(r, writer); 3265 3266 writer.close(); 3267 return 0; 3268 } 3269 3270 /** Used by mergePass to merge the output of the sort 3271 * @param inName the name of the input file containing sorted segments 3272 * @param indexIn the offsets of the sorted segments 3273 * @param tmpDir the relative directory to store intermediate results in 3274 * @return RawKeyValueIterator 3275 * @throws IOException 3276 */ 3277 private RawKeyValueIterator merge(Path inName, Path indexIn, Path tmpDir) 3278 throws IOException { 3279 //get the segments from indexIn 3280 //we create a SegmentContainer so that we can track segments belonging to 3281 //inName and delete inName as soon as we see that we have looked at all 3282 //the contained segments during the merge process & hence don't need 3283 //them anymore 3284 SegmentContainer container = new SegmentContainer(inName, indexIn); 3285 MergeQueue mQueue = new
MergeQueue(container.getSegmentList(), tmpDir, progressable); 3286 return mQueue.merge(); 3287 } 3288 3289 /** This class implements the core of the merge logic */ 3290 private class MergeQueue extends PriorityQueue 3291 implements RawKeyValueIterator { 3292 private boolean compress; 3293 private boolean blockCompress; 3294 private DataOutputBuffer rawKey = new DataOutputBuffer(); 3295 private ValueBytes rawValue; 3296 private long totalBytesProcessed; 3297 private float progPerByte; 3298 private Progress mergeProgress = new Progress(); 3299 private Path tmpDir; 3300 private Progressable progress = null; //handle to the progress reporting object 3301 private SegmentDescriptor minSegment; 3302 3303 //a TreeMap used to store the segments sorted by size (segment offset and 3304 //segment path name are used to break ties between segments of the same size) 3305 private Map<SegmentDescriptor, Void> sortedSegmentSizes = 3306 new TreeMap<SegmentDescriptor, Void>(); 3307 3308 @SuppressWarnings("unchecked") 3309 public void put(SegmentDescriptor stream) throws IOException { 3310 if (size() == 0) { 3311 compress = stream.in.isCompressed(); 3312 blockCompress = stream.in.isBlockCompressed(); 3313 } else if (compress != stream.in.isCompressed() || 3314 blockCompress != stream.in.isBlockCompressed()) { 3315 throw new IOException("All merged files must have the same compression settings."); 3316 } 3317 super.put(stream); 3318 } 3319 3320 /** 3321 * A queue of file segments to merge 3322 * @param segments the file segments to merge 3323 * @param tmpDir a relative local directory to save intermediate files in 3324 * @param progress the reference to the Progressable object 3325 */ 3326 public MergeQueue(List <SegmentDescriptor> segments, 3327 Path tmpDir, Progressable progress) { 3328 int size = segments.size(); 3329 for (int i = 0; i < size; i++) { 3330 sortedSegmentSizes.put(segments.get(i), null); 3331 } 3332 this.tmpDir = tmpDir; 3333 this.progress = progress; 3334 } 3335 @Override 3336 protected boolean lessThan(Object a, Object b) { 3337 // indicate we're making progress 3338 if (progress != null) { 3339 progress.progress(); 3340 } 3341 SegmentDescriptor msa = (SegmentDescriptor)a; 3342 SegmentDescriptor msb = (SegmentDescriptor)b; 3343 return comparator.compare(msa.getKey().getData(), 0, 3344 msa.getKey().getLength(), msb.getKey().getData(), 0, 3345 msb.getKey().getLength()) < 0; 3346 } 3347 @Override 3348 public void close() throws IOException { 3349 SegmentDescriptor ms; // close inputs 3350 while ((ms = (SegmentDescriptor)pop()) != null) { 3351 ms.cleanup(); 3352 } 3353 minSegment = null; 3354 } 3355 @Override 3356 public DataOutputBuffer getKey() throws IOException { 3357 return rawKey; 3358 } 3359 @Override 3360 public ValueBytes getValue() throws IOException { 3361 return rawValue; 3362 } 3363 @Override 3364 public boolean next() throws IOException { 3365 if (size() == 0) 3366 return false; 3367 if (minSegment != null) { 3368 //minSegment is non-null for all invocations of next except the first 3369 //one. For the first invocation, the priority queue is ready for use 3370 //but for the subsequent invocations, first adjust the queue 3371 adjustPriorityQueue(minSegment); 3372 if (size() == 0) { 3373 minSegment = null; 3374 return false; 3375 } 3376 } 3377 minSegment = (SegmentDescriptor)top(); 3378 long startPos = minSegment.in.getPosition(); // Current position in stream 3379 //save the raw key reference 3380 rawKey = minSegment.getKey(); 3381 //load the raw value.
Re-use the existing rawValue buffer 3382 if (rawValue == null) { 3383 rawValue = minSegment.in.createValueBytes(); 3384 } 3385 minSegment.nextRawValue(rawValue); 3386 long endPos = minSegment.in.getPosition(); // End position after reading value 3387 updateProgress(endPos - startPos); 3388 return true; 3389 } 3390 3391 @Override 3392 public Progress getProgress() { 3393 return mergeProgress; 3394 } 3395 3396 private void adjustPriorityQueue(SegmentDescriptor ms) throws IOException{ 3397 long startPos = ms.in.getPosition(); // Current position in stream 3398 boolean hasNext = ms.nextRawKey(); 3399 long endPos = ms.in.getPosition(); // End position after reading key 3400 updateProgress(endPos - startPos); 3401 if (hasNext) { 3402 adjustTop(); 3403 } else { 3404 pop(); 3405 ms.cleanup(); 3406 } 3407 } 3408 3409 private void updateProgress(long bytesProcessed) { 3410 totalBytesProcessed += bytesProcessed; 3411 if (progPerByte > 0) { 3412 mergeProgress.set(totalBytesProcessed * progPerByte); 3413 } 3414 } 3415 3416 /** This is the single level merge that is called multiple times 3417 * depending on the factor size and the number of segments 3418 * @return RawKeyValueIterator 3419 * @throws IOException 3420 */ 3421 public RawKeyValueIterator merge() throws IOException { 3422 //create the MergeStreams from the sorted map created in the constructor 3423 //and dump the final output to a file 3424 int numSegments = sortedSegmentSizes.size(); 3425 int origFactor = factor; 3426 int passNo = 1; 3427 LocalDirAllocator lDirAlloc = new LocalDirAllocator("io.seqfile.local.dir"); 3428 do { 3429 //get the factor for this pass of merge 3430 factor = getPassFactor(passNo, numSegments); 3431 List<SegmentDescriptor> segmentsToMerge = 3432 new ArrayList<SegmentDescriptor>(); 3433 int segmentsConsidered = 0; 3434 int numSegmentsToConsider = factor; 3435 while (true) { 3436 //extract the smallest 'factor' number of segment pointers from the 3437 //TreeMap. Call cleanup on the empty segments (no key/value data) 3438 SegmentDescriptor[] mStream = 3439 getSegmentDescriptors(numSegmentsToConsider); 3440 for (int i = 0; i < mStream.length; i++) { 3441 if (mStream[i].nextRawKey()) { 3442 segmentsToMerge.add(mStream[i]); 3443 segmentsConsidered++; 3444 // Count the fact that we read some bytes in calling nextRawKey() 3445 updateProgress(mStream[i].in.getPosition()); 3446 } 3447 else { 3448 mStream[i].cleanup(); 3449 numSegments--; //we ignore this segment for the merge 3450 } 3451 } 3452 //if we have the desired number of segments 3453 //or looked at all available segments, we break 3454 if (segmentsConsidered == factor || 3455 sortedSegmentSizes.size() == 0) { 3456 break; 3457 } 3458 3459 numSegmentsToConsider = factor - segmentsConsidered; 3460 } 3461 //feed the streams to the priority queue 3462 initialize(segmentsToMerge.size()); clear(); 3463 for (int i = 0; i < segmentsToMerge.size(); i++) { 3464 put(segmentsToMerge.get(i)); 3465 } 3466 //if we have lesser number of segments remaining, then just return the 3467 //iterator, else do another single level merge 3468 if (numSegments <= factor) { 3469 //calculate the length of the remaining segments. 
Required for 3470 //calculating the merge progress 3471 long totalBytes = 0; 3472 for (int i = 0; i < segmentsToMerge.size(); i++) { 3473 totalBytes += segmentsToMerge.get(i).segmentLength; 3474 } 3475 if (totalBytes != 0) //being paranoid 3476 progPerByte = 1.0f / (float)totalBytes; 3477 //reset factor to what it originally was 3478 factor = origFactor; 3479 return this; 3480 } else { 3481 //we want to spread the creation of temp files on multiple disks if 3482 //available under the space constraints 3483 long approxOutputSize = 0; 3484 for (SegmentDescriptor s : segmentsToMerge) { 3485 approxOutputSize += s.segmentLength + 3486 ChecksumFileSystem.getApproxChkSumLength( 3487 s.segmentLength); 3488 } 3489 Path tmpFilename = 3490 new Path(tmpDir, "intermediate").suffix("." + passNo); 3491 3492 Path outputFile = lDirAlloc.getLocalPathForWrite( 3493 tmpFilename.toString(), 3494 approxOutputSize, conf); 3495 if(LOG.isDebugEnabled()) { 3496 LOG.debug("writing intermediate results to " + outputFile); 3497 } 3498 Writer writer = cloneFileAttributes( 3499 fs.makeQualified(segmentsToMerge.get(0).segmentPathName), 3500 fs.makeQualified(outputFile), null); 3501 writer.sync = null; //disable sync for temp files 3502 writeFile(this, writer); 3503 writer.close(); 3504 3505 //we finished one single level merge; now clean up the priority 3506 //queue 3507 this.close(); 3508 3509 SegmentDescriptor tempSegment = 3510 new SegmentDescriptor(0, 3511 fs.getFileStatus(outputFile).getLen(), outputFile); 3512 //put the segment back in the TreeMap 3513 sortedSegmentSizes.put(tempSegment, null); 3514 numSegments = sortedSegmentSizes.size(); 3515 passNo++; 3516 } 3517 //we are worried about only the first pass merge factor. So reset the 3518 //factor to what it originally was 3519 factor = origFactor; 3520 } while(true); 3521 } 3522 3523 //Hadoop-591 3524 public int getPassFactor(int passNo, int numSegments) { 3525 if (passNo > 1 || numSegments <= factor || factor == 1) 3526 return factor; 3527 int mod = (numSegments - 1) % (factor - 1); 3528 if (mod == 0) 3529 return factor; 3530 return mod + 1; 3531 } 3532 3533 /** Return (& remove) the requested number of segment descriptors from the 3534 * sorted map. 3535 */ 3536 public SegmentDescriptor[] getSegmentDescriptors(int numDescriptors) { 3537 if (numDescriptors > sortedSegmentSizes.size()) 3538 numDescriptors = sortedSegmentSizes.size(); 3539 SegmentDescriptor[] SegmentDescriptors = 3540 new SegmentDescriptor[numDescriptors]; 3541 Iterator iter = sortedSegmentSizes.keySet().iterator(); 3542 int i = 0; 3543 while (i < numDescriptors) { 3544 SegmentDescriptors[i++] = (SegmentDescriptor)iter.next(); 3545 iter.remove(); 3546 } 3547 return SegmentDescriptors; 3548 } 3549 } // SequenceFile.Sorter.MergeQueue 3550 3551 /** This class defines a merge segment. This class can be subclassed to 3552 * provide a customized cleanup method implementation. In this 3553 * implementation, cleanup closes the file handle and deletes the file 3554 */ 3555 public class SegmentDescriptor implements Comparable { 3556 3557 long segmentOffset; //the start of the segment in the file 3558 long segmentLength; //the length of the segment 3559 Path segmentPathName; //the path name of the file containing the segment 3560 boolean ignoreSync = true; //set to true for temp files 3561 private Reader in = null; 3562 private DataOutputBuffer rawKey = null; //this will hold the current key 3563 private boolean preserveInput = false; //delete input segment files? 
    /** This class defines a merge segment. It can be subclassed to provide a
     * customized cleanup method implementation; in this implementation,
     * cleanup closes the file handle and deletes the file.
     */
    public class SegmentDescriptor implements Comparable {
      
      long segmentOffset;        //the start of the segment in the file
      long segmentLength;        //the length of the segment
      Path segmentPathName;      //the path name of the file containing the segment
      boolean ignoreSync = true; //set to true for temp files
      private Reader in = null;
      private DataOutputBuffer rawKey = null; //this will hold the current key
      private boolean preserveInput = false;  //whether to preserve the input segment files
      
      /** Constructs a segment.
       * @param segmentOffset the offset of the segment in the file
       * @param segmentLength the length of the segment
       * @param segmentPathName the path name of the file containing the segment
       */
      public SegmentDescriptor(long segmentOffset, long segmentLength, 
                               Path segmentPathName) {
        this.segmentOffset = segmentOffset;
        this.segmentLength = segmentLength;
        this.segmentPathName = segmentPathName;
      }
      
      /** Enables the sync checks that are disabled by default for temp files. */
      public void doSync() { ignoreSync = false; }
      
      /** Sets whether to preserve the input files when they are no longer needed. */
      public void preserveInput(boolean preserve) {
        preserveInput = preserve;
      }

      public boolean shouldPreserveInput() {
        return preserveInput;
      }
      
      @Override
      public int compareTo(Object o) {
        SegmentDescriptor that = (SegmentDescriptor) o;
        if (this.segmentLength != that.segmentLength) {
          return (this.segmentLength < that.segmentLength ? -1 : 1);
        }
        if (this.segmentOffset != that.segmentOffset) {
          return (this.segmentOffset < that.segmentOffset ? -1 : 1);
        }
        return (this.segmentPathName.toString()).
          compareTo(that.segmentPathName.toString());
      }

      @Override
      public boolean equals(Object o) {
        if (!(o instanceof SegmentDescriptor)) {
          return false;
        }
        SegmentDescriptor that = (SegmentDescriptor) o;
        if (this.segmentLength == that.segmentLength &&
            this.segmentOffset == that.segmentOffset &&
            this.segmentPathName.toString().equals(
              that.segmentPathName.toString())) {
          return true;
        }
        return false;
      }

      @Override
      public int hashCode() {
        //mixes only the offset; weak, but consistent with equals()
        return 37 * 17 + (int) (segmentOffset ^ (segmentOffset >>> 32));
      }

      /** Fills up the rawKey object with the key returned by the Reader.
       * @return true if a key was read; false otherwise
       * @throws IOException if opening or reading the segment fails
       */
      public boolean nextRawKey() throws IOException {
        if (in == null) {
          int bufferSize = getBufferSize(conf); 
          Reader reader = new Reader(conf,
                                     Reader.file(segmentPathName), 
                                     Reader.bufferSize(bufferSize),
                                     Reader.start(segmentOffset), 
                                     Reader.length(segmentLength));
        
          //sometimes we ignore syncs, especially for temp merge files
          if (ignoreSync) reader.ignoreSync();

          if (reader.getKeyClass() != keyClass) {
            throw new IOException("wrong key class: " + reader.getKeyClass() +
                                  " is not " + keyClass);
          }
          if (reader.getValueClass() != valClass) {
            throw new IOException("wrong value class: " + reader.getValueClass() +
                                  " is not " + valClass);
          }
          this.in = reader;
          rawKey = new DataOutputBuffer();
        }
        rawKey.reset();
        int keyLength = in.nextRawKey(rawKey);
        return (keyLength >= 0);
      }

      /** Fills up the passed rawValue with the value corresponding to the key
       * read earlier.
       * @param rawValue the buffer to fill
       * @return the length of the value
       * @throws IOException if reading the value fails
       */
      public int nextRawValue(ValueBytes rawValue) throws IOException {
        return in.nextRawValue(rawValue);
      }
      
      /** Returns the stored rawKey. */
      public DataOutputBuffer getKey() {
        return rawKey;
      }
      
      /** Closes the underlying reader. */
      private void close() throws IOException {
        this.in.close();
        this.in = null;
      }

      /** The default cleanup. Subclasses can override this with a custom
       * cleanup.
       */
      public void cleanup() throws IOException {
        close();
        if (!preserveInput) {
          fs.delete(segmentPathName, true);
        }
      }
    } // SequenceFile.Sorter.SegmentDescriptor
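    /* The options-based Reader construction used by nextRawKey() above can
     * also be used directly to scan a byte range of a SequenceFile. A
     * minimal sketch, given a Configuration conf and assuming the start
     * offset falls on a record boundary, as segment offsets here do (the
     * path and range are hypothetical):
     *
     *   Reader reader = new Reader(conf,
     *                              Reader.file(new Path("part-0")),
     *                              Reader.start(0L),
     *                              Reader.length(1024L * 1024));
     *   DataOutputBuffer key = new DataOutputBuffer();
     *   while (true) {
     *     key.reset();
     *     if (reader.nextRawKey(key) < 0) break; //-1 signals end of range
     *     ... //key.getData() and key.getLength() expose the raw key bytes
     *   }
     *   reader.close();
     */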
    /** This class defines a segment that is one of several segments packed
     * into a single file.
     */
    private class LinkedSegmentsDescriptor extends SegmentDescriptor {

      SegmentContainer parentContainer = null;

      /** Constructs a segment.
       * @param segmentOffset the offset of the segment in the file
       * @param segmentLength the length of the segment
       * @param segmentPathName the path name of the file containing the segment
       * @param parent the parent SegmentContainer that holds the segment
       */
      public LinkedSegmentsDescriptor(long segmentOffset, long segmentLength, 
                                      Path segmentPathName,
                                      SegmentContainer parent) {
        super(segmentOffset, segmentLength, segmentPathName);
        this.parentContainer = parent;
      }

      /** Closes the reader and defers deletion of the shared file to the
       * parent container.
       */
      @Override
      public void cleanup() throws IOException {
        super.close();
        if (super.shouldPreserveInput()) return;
        parentContainer.cleanup();
      }

      @Override
      public boolean equals(Object o) {
        if (!(o instanceof LinkedSegmentsDescriptor)) {
          return false;
        }
        return super.equals(o);
      }
    } //SequenceFile.Sorter.LinkedSegmentsDescriptor

    /** The class that defines a container for segments to be merged. Primarily
     * required to delete temp files as soon as all the contained segments
     * have been looked at.
     */
    private class SegmentContainer {
      private int numSegmentsCleanedUp = 0; //track the number of segment cleanups
      private int numSegmentsContained;     //number of segments contained
      private Path inName;                  //input file from which segments are created
      
      //the list of segments read from the file
      private ArrayList<SegmentDescriptor> segments = 
        new ArrayList<SegmentDescriptor>();

      /** This constructor is there primarily to serve the sort routine that
       * generates a single output file with an associated index file.
       */
      public SegmentContainer(Path inName, Path indexIn) throws IOException {
        //get the segments from indexIn
        FSDataInputStream fsIndexIn = fs.open(indexIn);
        long end = fs.getFileStatus(indexIn).getLen();
        while (fsIndexIn.getPos() < end) {
          long segmentOffset = WritableUtils.readVLong(fsIndexIn);
          long segmentLength = WritableUtils.readVLong(fsIndexIn);
          Path segmentName = inName;
          segments.add(new LinkedSegmentsDescriptor(segmentOffset, 
                                                    segmentLength, segmentName,
                                                    this));
        }
        fsIndexIn.close();
        fs.delete(indexIn, true); //delete the index file once it has been read
        numSegmentsContained = segments.size();
        this.inName = inName;
      }

      public List<SegmentDescriptor> getSegmentList() {
        return segments;
      }

      public void cleanup() throws IOException {
        numSegmentsCleanedUp++;
        if (numSegmentsCleanedUp == numSegmentsContained) {
          fs.delete(inName, true);
        }
      }
    } //SequenceFile.Sorter.SegmentContainer

  } // SequenceFile.Sorter

} // SequenceFile