/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.io;

import java.io.*;
import java.util.*;
import java.rmi.server.UID;
import java.security.MessageDigest;

import org.apache.commons.io.Charsets;
import org.apache.commons.logging.*;
import org.apache.hadoop.util.Options;
import org.apache.hadoop.fs.*;
import org.apache.hadoop.fs.Options.CreateOpts;
import org.apache.hadoop.io.compress.CodecPool;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.CompressionInputStream;
import org.apache.hadoop.io.compress.CompressionOutputStream;
import org.apache.hadoop.io.compress.Compressor;
import org.apache.hadoop.io.compress.Decompressor;
import org.apache.hadoop.io.compress.DefaultCodec;
import org.apache.hadoop.io.compress.GzipCodec;
import org.apache.hadoop.io.compress.zlib.ZlibFactory;
import org.apache.hadoop.io.serializer.Deserializer;
import org.apache.hadoop.io.serializer.Serializer;
import org.apache.hadoop.io.serializer.SerializationFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.*;
import org.apache.hadoop.util.Progressable;
import org.apache.hadoop.util.Progress;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hadoop.util.NativeCodeLoader;
import org.apache.hadoop.util.MergeSort;
import org.apache.hadoop.util.PriorityQueue;
import org.apache.hadoop.util.Time;

/** 
 * <code>SequenceFile</code>s are flat files consisting of binary key/value 
 * pairs.
 * 
 * <p><code>SequenceFile</code> provides {@link SequenceFile.Writer},
 * {@link SequenceFile.Reader} and {@link Sorter} classes for writing,
 * reading and sorting respectively.</p>
 * 
 * There are three <code>SequenceFile</code> <code>Writer</code>s based on the 
 * {@link CompressionType} used to compress key/value pairs:
 * <ol>
 *   <li>
 *   <code>Writer</code> : Uncompressed records.
 *   </li>
 *   <li>
 *   <code>RecordCompressWriter</code> : Record-compressed files, only compress 
 *                                       values.
 *   </li>
 *   <li>
 *   <code>BlockCompressWriter</code> : Block-compressed files, both keys &amp; 
 *                                      values are collected in 'blocks' 
 *                                      separately and compressed. The size of 
 *                                      the 'block' is configurable.
 *   </li>
 * </ol>
 * 
 * <p>The actual compression algorithm used to compress key and/or values can be
 * specified by using the appropriate {@link CompressionCodec}.</p>
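 *
 * <p>For instance, a sketch of requesting block compression with an explicit
 * codec ({@link DefaultCodec} is also what the writer falls back to when a
 * compressed type is requested without a codec):</p>
 *
 * <pre>
 * Writer.Option compressOpt =
 *     Writer.compression(CompressionType.BLOCK, new DefaultCodec());
 * </pre>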
 * 
 * <p>The recommended way is to use the static <code>createWriter</code> methods
 * provided by <code>SequenceFile</code> to choose the preferred format.</p>
 *
 * <p>The {@link SequenceFile.Reader} acts as the bridge and can read any of the
 * above <code>SequenceFile</code> formats.</p>
 *
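 * <p>A minimal end-to-end usage sketch, assuming <code>Text</code> keys and
 * values and an illustrative path:</p>
 *
 * <pre>
 * Configuration conf = new Configuration();
 * Path path = new Path("example.seq");
 *
 * SequenceFile.Writer writer = SequenceFile.createWriter(conf,
 *     SequenceFile.Writer.file(path),
 *     SequenceFile.Writer.keyClass(Text.class),
 *     SequenceFile.Writer.valueClass(Text.class));
 * try {
 *   writer.append(new Text("key"), new Text("value"));
 * } finally {
 *   writer.close();
 * }
 *
 * SequenceFile.Reader reader = new SequenceFile.Reader(conf,
 *     SequenceFile.Reader.file(path));
 * try {
 *   Text key = new Text();
 *   Text value = new Text();
 *   while (reader.next(key, value)) {
 *     System.out.println(key + "\t" + value);
 *   }
 * } finally {
 *   reader.close();
 * }
 * </pre>
 *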
 * <h4 id="Formats">SequenceFile Formats</h4>
 * 
 * <p>Essentially there are 3 different formats for <code>SequenceFile</code>s
 * depending on the <code>CompressionType</code> specified. All of them share a
 * <a href="#Header">common header</a> described below.
 * 
 * <h5 id="Header">SequenceFile Header</h5>
 * <ul>
 *   <li>
 *   version - 3 bytes of magic header <b>SEQ</b>, followed by 1 byte of actual 
 *             version number (e.g. SEQ4 or SEQ6)
 *   </li>
 *   <li>
 *   keyClassName - key class
 *   </li>
 *   <li>
 *   valueClassName - value class
 *   </li>
 *   <li>
 *   compression - A boolean which specifies if compression is turned on for 
 *                 keys/values in this file.
 *   </li>
 *   <li>
 *   blockCompression - A boolean which specifies if block-compression is 
 *                      turned on for keys/values in this file.
 *   </li>
 *   <li>
 *   compression codec - <code>CompressionCodec</code> class which is used for 
 *                       compression of keys and/or values (if compression is 
 *                       enabled).
 *   </li>
 *   <li>
 *   metadata - {@link Metadata} for this file.
 *   </li>
 *   <li>
 *   sync - A sync marker to denote end of the header.
 *   </li>
 * </ul>
 * 
 * <h5 id="UncompressedFormat">Uncompressed SequenceFile Format</h5>
 * <ul>
 * <li>
 * <a href="#Header">Header</a>
 * </li>
 * <li>
 * Record
 *   <ul>
 *     <li>Record length</li>
 *     <li>Key length</li>
 *     <li>Key</li>
 *     <li>Value</li>
 *   </ul>
 * </li>
 * <li>
 * A sync-marker approximately every {@link #SYNC_INTERVAL} bytes.
 * </li>
 * </ul>
 *
 * <h5 id="RecordCompressedFormat">Record-Compressed SequenceFile Format</h5>
 * <ul>
 * <li>
 * <a href="#Header">Header</a>
 * </li>
 * <li>
 * Record
 *   <ul>
 *     <li>Record length</li>
 *     <li>Key length</li>
 *     <li>Key</li>
 *     <li><i>Compressed</i> Value</li>
 *   </ul>
 * </li>
 * <li>
 * A sync-marker approximately every {@link #SYNC_INTERVAL} bytes.
 * </li>
 * </ul>
 * 
 * <h5 id="BlockCompressedFormat">Block-Compressed SequenceFile Format</h5>
 * <ul>
 * <li>
 * <a href="#Header">Header</a>
 * </li>
 * <li>
 * Record <i>Block</i>
 *   <ul>
 *     <li>Uncompressed number of records in the block</li>
 *     <li>Compressed key-lengths block-size</li>
 *     <li>Compressed key-lengths block</li>
 *     <li>Compressed keys block-size</li>
 *     <li>Compressed keys block</li>
 *     <li>Compressed value-lengths block-size</li>
 *     <li>Compressed value-lengths block</li>
 *     <li>Compressed values block-size</li>
 *     <li>Compressed values block</li>
 *   </ul>
 * </li>
 * <li>
 * A sync-marker every block.
 * </li>
 * </ul>
 * 
 * <p>The compressed blocks of key lengths and value lengths consist of the 
 * actual lengths of individual keys/values encoded in ZeroCompressedInteger 
 * format.</p>
 * 
 * @see CompressionCodec
 */
@InterfaceAudience.Public
@InterfaceStability.Stable
public class SequenceFile {
  private static final Log LOG = LogFactory.getLog(SequenceFile.class);

  private SequenceFile() {}                         // no public ctor

  private static final byte BLOCK_COMPRESS_VERSION = (byte)4;
  private static final byte CUSTOM_COMPRESS_VERSION = (byte)5;
  private static final byte VERSION_WITH_METADATA = (byte)6;
  private static byte[] VERSION = new byte[] {
    (byte)'S', (byte)'E', (byte)'Q', VERSION_WITH_METADATA
  };

  private static final int SYNC_ESCAPE = -1;      // "length" of sync entries
  private static final int SYNC_HASH_SIZE = 16;   // number of bytes in hash
  private static final int SYNC_SIZE = 4+SYNC_HASH_SIZE; // escape + hash

  /** The number of bytes between sync points: 100 * (4 + 16) = 2000 bytes. */
  public static final int SYNC_INTERVAL = 100*SYNC_SIZE;

  /** 
   * The compression type used to compress key/value pairs in the 
   * {@link SequenceFile}.
   * 
   * @see SequenceFile.Writer
   */
  public static enum CompressionType {
    /** Do not compress records. */
    NONE, 
    /** Compress values only, each separately. */
    RECORD,
    /** Compress sequences of records together in blocks. */
    BLOCK
  }

  /**
   * Get the default compression type for sequence files.
   * @param job the configuration to look in
   * @return the kind of compression to use
   */
  static public CompressionType getDefaultCompressionType(Configuration job) {
    String name = job.get("io.seqfile.compression.type");
    return name == null ? CompressionType.RECORD : 
      CompressionType.valueOf(name);
  }
  
  /**
   * Set the default compression type for sequence files.
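   *
   * <p>A usage sketch; the property name is the one this method sets
   * (<code>io.seqfile.compression.type</code>):</p>
   * <pre>
   * Configuration conf = new Configuration();
   * SequenceFile.setDefaultCompressionType(conf, CompressionType.BLOCK);
   * // conf.get("io.seqfile.compression.type") now returns "BLOCK"
   * </pre>
   *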
   * @param job the configuration to modify
   * @param val the new compression type (none, block, record)
   */
  static public void setDefaultCompressionType(Configuration job, 
                                               CompressionType val) {
    job.set("io.seqfile.compression.type", val.toString());
  }

  /**
   * Create a new Writer with the given options.
   * @param conf the configuration to use
   * @param opts the options to create the file with
   * @return a new Writer
   * @throws IOException
   */
  public static Writer createWriter(Configuration conf, Writer.Option... opts
                                    ) throws IOException {
    Writer.CompressionOption compressionOption = 
      Options.getOption(Writer.CompressionOption.class, opts);
    CompressionType kind;
    if (compressionOption != null) {
      kind = compressionOption.getValue();
    } else {
      kind = getDefaultCompressionType(conf);
      opts = Options.prependOptions(opts, Writer.compression(kind));
    }
    switch (kind) {
      default:
      case NONE:
        return new Writer(conf, opts);
      case RECORD:
        return new RecordCompressWriter(conf, opts);
      case BLOCK:
        return new BlockCompressWriter(conf, opts);
    }
  }

  /**
   * Construct the preferred type of SequenceFile Writer.
   * @param fs The configured filesystem. 
   * @param conf The configuration.
   * @param name The name of the file. 
   * @param keyClass The 'key' type.
   * @param valClass The 'value' type.
   * @return Returns the handle to the constructed SequenceFile Writer.
   * @throws IOException
   * @deprecated Use {@link #createWriter(Configuration, Writer.Option...)}
   *     instead.
   */
  @Deprecated
  public static Writer 
    createWriter(FileSystem fs, Configuration conf, Path name, 
                 Class keyClass, Class valClass) throws IOException {
    return createWriter(conf, Writer.filesystem(fs),
                        Writer.file(name), Writer.keyClass(keyClass),
                        Writer.valueClass(valClass));
  }
  
  /**
   * Construct the preferred type of SequenceFile Writer.
   * @param fs The configured filesystem. 
   * @param conf The configuration.
   * @param name The name of the file. 
   * @param keyClass The 'key' type.
   * @param valClass The 'value' type.
   * @param compressionType The compression type.
   * @return Returns the handle to the constructed SequenceFile Writer.
   * @throws IOException
   * @deprecated Use {@link #createWriter(Configuration, Writer.Option...)}
   *     instead.
   */
  @Deprecated
  public static Writer 
    createWriter(FileSystem fs, Configuration conf, Path name, 
                 Class keyClass, Class valClass, 
                 CompressionType compressionType) throws IOException {
    return createWriter(conf, Writer.filesystem(fs),
                        Writer.file(name), Writer.keyClass(keyClass),
                        Writer.valueClass(valClass), 
                        Writer.compression(compressionType));
  }
  
  /**
   * Construct the preferred type of SequenceFile Writer.
   * @param fs The configured filesystem. 
   * @param conf The configuration.
   * @param name The name of the file. 
   * @param keyClass The 'key' type.
   * @param valClass The 'value' type.
   * @param compressionType The compression type.
   * @param progress The Progressable object to track progress.
   * @return Returns the handle to the constructed SequenceFile Writer.
   * @throws IOException
   * @deprecated Use {@link #createWriter(Configuration, Writer.Option...)}
   *     instead.
   */
  @Deprecated
  public static Writer
    createWriter(FileSystem fs, Configuration conf, Path name, 
                 Class keyClass, Class valClass, CompressionType compressionType,
                 Progressable progress) throws IOException {
    return createWriter(conf, Writer.file(name),
                        Writer.filesystem(fs),
                        Writer.keyClass(keyClass),
                        Writer.valueClass(valClass), 
                        Writer.compression(compressionType),
                        Writer.progressable(progress));
  }

  /**
   * Construct the preferred type of SequenceFile Writer.
   * @param fs The configured filesystem. 
   * @param conf The configuration.
   * @param name The name of the file. 
   * @param keyClass The 'key' type.
   * @param valClass The 'value' type.
   * @param compressionType The compression type.
   * @param codec The compression codec.
   * @return Returns the handle to the constructed SequenceFile Writer.
   * @throws IOException
   * @deprecated Use {@link #createWriter(Configuration, Writer.Option...)}
   *     instead.
   */
  @Deprecated
  public static Writer 
    createWriter(FileSystem fs, Configuration conf, Path name, 
                 Class keyClass, Class valClass, CompressionType compressionType, 
                 CompressionCodec codec) throws IOException {
    return createWriter(conf, Writer.file(name),
                        Writer.filesystem(fs),
                        Writer.keyClass(keyClass),
                        Writer.valueClass(valClass), 
                        Writer.compression(compressionType, codec));
  }
  
  /**
   * Construct the preferred type of SequenceFile Writer.
   * @param fs The configured filesystem. 
   * @param conf The configuration.
   * @param name The name of the file. 
   * @param keyClass The 'key' type.
   * @param valClass The 'value' type.
   * @param compressionType The compression type.
   * @param codec The compression codec.
   * @param progress The Progressable object to track progress.
   * @param metadata The metadata of the file.
   * @return Returns the handle to the constructed SequenceFile Writer.
   * @throws IOException
   * @deprecated Use {@link #createWriter(Configuration, Writer.Option...)}
   *     instead.
   */
  @Deprecated
  public static Writer
    createWriter(FileSystem fs, Configuration conf, Path name, 
                 Class keyClass, Class valClass, 
                 CompressionType compressionType, CompressionCodec codec,
                 Progressable progress, Metadata metadata) throws IOException {
    return createWriter(conf, Writer.file(name),
                        Writer.filesystem(fs),
                        Writer.keyClass(keyClass),
                        Writer.valueClass(valClass),
                        Writer.compression(compressionType, codec),
                        Writer.progressable(progress),
                        Writer.metadata(metadata));
  }

  /**
   * Construct the preferred type of SequenceFile Writer.
   * @param fs The configured filesystem.
   * @param conf The configuration.
   * @param name The name of the file.
   * @param keyClass The 'key' type.
   * @param valClass The 'value' type.
   * @param bufferSize buffer size for the underlying output stream.
   * @param replication replication factor for the file.
   * @param blockSize block size for the file.
   * @param compressionType The compression type.
   * @param codec The compression codec.
   * @param progress The Progressable object to track progress.
   * @param metadata The metadata of the file.
   * @return Returns the handle to the constructed SequenceFile Writer.
   * @throws IOException
   * @deprecated Use {@link #createWriter(Configuration, Writer.Option...)}
   *     instead.
   */
  @Deprecated
  public static Writer
    createWriter(FileSystem fs, Configuration conf, Path name,
                 Class keyClass, Class valClass, int bufferSize,
                 short replication, long blockSize,
                 CompressionType compressionType, CompressionCodec codec,
                 Progressable progress, Metadata metadata) throws IOException {
    return createWriter(conf, Writer.file(name),
                        Writer.filesystem(fs),
                        Writer.keyClass(keyClass),
                        Writer.valueClass(valClass), 
                        Writer.bufferSize(bufferSize), 
                        Writer.replication(replication),
                        Writer.blockSize(blockSize),
                        Writer.compression(compressionType, codec),
                        Writer.progressable(progress),
                        Writer.metadata(metadata));
  }

  /**
   * Construct the preferred type of SequenceFile Writer.
   * @param fs The configured filesystem.
   * @param conf The configuration.
   * @param name The name of the file.
   * @param keyClass The 'key' type.
   * @param valClass The 'value' type.
   * @param bufferSize buffer size for the underlying output stream.
   * @param replication replication factor for the file.
   * @param blockSize block size for the file.
   * @param createParent create parent directory if non-existent
   * @param compressionType The compression type.
   * @param codec The compression codec.
   * @param metadata The metadata of the file.
   * @return Returns the handle to the constructed SequenceFile Writer.
   * @throws IOException
   */
  @Deprecated
  public static Writer
  createWriter(FileSystem fs, Configuration conf, Path name,
               Class keyClass, Class valClass, int bufferSize,
               short replication, long blockSize, boolean createParent,
               CompressionType compressionType, CompressionCodec codec,
               Metadata metadata) throws IOException {
    return createWriter(FileContext.getFileContext(fs.getUri(), conf),
        conf, name, keyClass, valClass, compressionType, codec,
        metadata, EnumSet.of(CreateFlag.CREATE,CreateFlag.OVERWRITE),
        CreateOpts.bufferSize(bufferSize),
        createParent ? CreateOpts.createParent()
                     : CreateOpts.donotCreateParent(),
        CreateOpts.repFac(replication),
        CreateOpts.blockSize(blockSize)
      );
  }

  /**
   * Construct the preferred type of SequenceFile Writer.
   * @param fc The context for the specified file.
   * @param conf The configuration.
   * @param name The name of the file.
   * @param keyClass The 'key' type.
   * @param valClass The 'value' type.
   * @param compressionType The compression type.
   * @param codec The compression codec.
   * @param metadata The metadata of the file.
   * @param createFlag gives the semantics of create: overwrite, append etc.
   * @param opts file creation options; see {@link CreateOpts}.
   * @return Returns the handle to the constructed SequenceFile Writer.
   * @throws IOException
   */
  public static Writer
  createWriter(FileContext fc, Configuration conf, Path name,
               Class keyClass, Class valClass,
               CompressionType compressionType, CompressionCodec codec,
               Metadata metadata,
               final EnumSet<CreateFlag> createFlag, CreateOpts... opts)
               throws IOException {
    return createWriter(conf, fc.create(name, createFlag, opts),
          keyClass, valClass, compressionType, codec, metadata).ownStream();
  }

  /**
   * Construct the preferred type of SequenceFile Writer.
   * @param fs The configured filesystem. 
   * @param conf The configuration.
   * @param name The name of the file. 
   * @param keyClass The 'key' type.
   * @param valClass The 'value' type.
   * @param compressionType The compression type.
   * @param codec The compression codec.
   * @param progress The Progressable object to track progress.
   * @return Returns the handle to the constructed SequenceFile Writer.
   * @throws IOException
   * @deprecated Use {@link #createWriter(Configuration, Writer.Option...)}
   *     instead.
   */
  @Deprecated
  public static Writer
    createWriter(FileSystem fs, Configuration conf, Path name, 
                 Class keyClass, Class valClass, 
                 CompressionType compressionType, CompressionCodec codec,
                 Progressable progress) throws IOException {
    return createWriter(conf, Writer.file(name),
                        Writer.filesystem(fs),
                        Writer.keyClass(keyClass),
                        Writer.valueClass(valClass),
                        Writer.compression(compressionType, codec),
                        Writer.progressable(progress));
  }

  /**
   * Construct the preferred type of 'raw' SequenceFile Writer.
   * @param conf The configuration.
   * @param out The stream on top of which the writer is to be constructed.
   * @param keyClass The 'key' type.
   * @param valClass The 'value' type.
   * @param compressionType The compression type.
   * @param codec The compression codec.
   * @param metadata The metadata of the file.
   * @return Returns the handle to the constructed SequenceFile Writer.
   * @throws IOException
   * @deprecated Use {@link #createWriter(Configuration, Writer.Option...)}
   *     instead.
   */
  @Deprecated
  public static Writer
    createWriter(Configuration conf, FSDataOutputStream out, 
                 Class keyClass, Class valClass,
                 CompressionType compressionType,
                 CompressionCodec codec, Metadata metadata) throws IOException {
    return createWriter(conf, Writer.stream(out), Writer.keyClass(keyClass),
                        Writer.valueClass(valClass), 
                        Writer.compression(compressionType, codec),
                        Writer.metadata(metadata));
  }
  
  /**
   * Construct the preferred type of 'raw' SequenceFile Writer.
   * @param conf The configuration.
   * @param out The stream on top of which the writer is to be constructed.
   * @param keyClass The 'key' type.
   * @param valClass The 'value' type.
   * @param compressionType The compression type.
   * @param codec The compression codec.
   * @return Returns the handle to the constructed SequenceFile Writer.
   * @throws IOException
   * @deprecated Use {@link #createWriter(Configuration, Writer.Option...)}
   *     instead.
   */
  @Deprecated
  public static Writer
    createWriter(Configuration conf, FSDataOutputStream out, 
                 Class keyClass, Class valClass, CompressionType compressionType,
                 CompressionCodec codec) throws IOException {
    return createWriter(conf, Writer.stream(out), Writer.keyClass(keyClass),
                        Writer.valueClass(valClass),
                        Writer.compression(compressionType, codec));
  }
  

  /** The interface to 'raw' values of SequenceFiles. */
  public static interface ValueBytes {

    /** Writes the uncompressed bytes to the outStream.
     * @param outStream : Stream to write uncompressed bytes into.
     * @throws IOException
     */
    public void writeUncompressedBytes(DataOutputStream outStream)
      throws IOException;

    /** Write compressed bytes to outStream. 
     * Note that it will NOT compress the bytes if they are not already compressed.
     * @param outStream : Stream to write compressed bytes into.
     */
    public void writeCompressedBytes(DataOutputStream outStream) 
      throws IllegalArgumentException, IOException;

    /**
     * Size of stored data.
     */
    public int getSize();
  }
  
  private static class UncompressedBytes implements ValueBytes {
    private int dataSize;
    private byte[] data;
    
    private UncompressedBytes() {
      data = null;
      dataSize = 0;
    }
    
    private void reset(DataInputStream in, int length) throws IOException {
      if (data == null) {
        data = new byte[length];
      } else if (length > data.length) {
        data = new byte[Math.max(length, data.length * 2)];
      }
      dataSize = -1;
      in.readFully(data, 0, length);
      dataSize = length;
    }
    
    @Override
    public int getSize() {
      return dataSize;
    }
    
    @Override
    public void writeUncompressedBytes(DataOutputStream outStream)
      throws IOException {
      outStream.write(data, 0, dataSize);
    }

    @Override
    public void writeCompressedBytes(DataOutputStream outStream) 
      throws IllegalArgumentException, IOException {
      throw 
        new IllegalArgumentException("UncompressedBytes cannot be compressed!");
    }

  } // UncompressedBytes
  
  private static class CompressedBytes implements ValueBytes {
    private int dataSize;
    private byte[] data;
    DataInputBuffer rawData = null;
    CompressionCodec codec = null;
    CompressionInputStream decompressedStream = null;

    private CompressedBytes(CompressionCodec codec) {
      data = null;
      dataSize = 0;
      this.codec = codec;
    }

    private void reset(DataInputStream in, int length) throws IOException {
      if (data == null) {
        data = new byte[length];
      } else if (length > data.length) {
        data = new byte[Math.max(length, data.length * 2)];
      } 
      dataSize = -1;
      in.readFully(data, 0, length);
      dataSize = length;
    }
    
    @Override
    public int getSize() {
      return dataSize;
    }
    
    @Override
    public void writeUncompressedBytes(DataOutputStream outStream)
      throws IOException {
      if (decompressedStream == null) {
        rawData = new DataInputBuffer();
        decompressedStream = codec.createInputStream(rawData);
      } else {
        decompressedStream.resetState();
      }
      rawData.reset(data, 0, dataSize);

      byte[] buffer = new byte[8192];
      int bytesRead = 0;
      while ((bytesRead = decompressedStream.read(buffer, 0, 8192)) != -1) {
        outStream.write(buffer, 0, bytesRead);
      }
    }

    @Override
    public void writeCompressedBytes(DataOutputStream outStream) 
      throws IllegalArgumentException, IOException {
      outStream.write(data, 0, dataSize);
    }

  } // CompressedBytes
  
  /**
   * The class encapsulating the metadata of a file.
   * The metadata of a file is a list of attribute name/value
   * pairs of Text type.
   *
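   * <p>A construction sketch (the attribute name and value are illustrative,
   * and <code>conf</code>/<code>path</code> are assumed to be in scope):</p>
   * <pre>
   * SequenceFile.Metadata metadata = new SequenceFile.Metadata();
   * metadata.set(new Text("created-by"), new Text("example"));
   * SequenceFile.Writer writer = SequenceFile.createWriter(conf,
   *     SequenceFile.Writer.file(path),
   *     SequenceFile.Writer.keyClass(Text.class),
   *     SequenceFile.Writer.valueClass(Text.class),
   *     SequenceFile.Writer.metadata(metadata));
   * </pre>
   *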
   */
  public static class Metadata implements Writable {

    private TreeMap<Text, Text> theMetadata;
    
    public Metadata() {
      this(new TreeMap<Text, Text>());
    }
    
    public Metadata(TreeMap<Text, Text> arg) {
      if (arg == null) {
        this.theMetadata = new TreeMap<Text, Text>();
      } else {
        this.theMetadata = arg;
      }
    }
    
    public Text get(Text name) {
      return this.theMetadata.get(name);
    }
    
    public void set(Text name, Text value) {
      this.theMetadata.put(name, value);
    }
    
    public TreeMap<Text, Text> getMetadata() {
      return new TreeMap<Text, Text>(this.theMetadata);
    }
    
    @Override
    public void write(DataOutput out) throws IOException {
      out.writeInt(this.theMetadata.size());
      Iterator<Map.Entry<Text, Text>> iter =
        this.theMetadata.entrySet().iterator();
      while (iter.hasNext()) {
        Map.Entry<Text, Text> en = iter.next();
        en.getKey().write(out);
        en.getValue().write(out);
      }
    }

    @Override
    public void readFields(DataInput in) throws IOException {
      int sz = in.readInt();
      if (sz < 0) throw new IOException("Invalid size: " + sz + " for file metadata object");
      this.theMetadata = new TreeMap<Text, Text>();
      for (int i = 0; i < sz; i++) {
        Text key = new Text();
        Text val = new Text();
        key.readFields(in);
        val.readFields(in);
        this.theMetadata.put(key, val);
      }    
    }

    @Override
    public boolean equals(Object other) {
      if (other == null) {
        return false;
      }
      if (other.getClass() != this.getClass()) {
        return false;
      } else {
        return equals((Metadata)other);
      }
    }
    
    public boolean equals(Metadata other) {
      if (other == null) return false;
      if (this.theMetadata.size() != other.theMetadata.size()) {
        return false;
      }
      Iterator<Map.Entry<Text, Text>> iter1 =
        this.theMetadata.entrySet().iterator();
      Iterator<Map.Entry<Text, Text>> iter2 =
        other.theMetadata.entrySet().iterator();
      while (iter1.hasNext() && iter2.hasNext()) {
        Map.Entry<Text, Text> en1 = iter1.next();
        Map.Entry<Text, Text> en2 = iter2.next();
        if (!en1.getKey().equals(en2.getKey())) {
          return false;
        }
        if (!en1.getValue().equals(en2.getValue())) {
          return false;
        }
      }
      if (iter1.hasNext() || iter2.hasNext()) {
        return false;
      }
      return true;
    }

    @Override
    public int hashCode() {
      assert false : "hashCode not designed";
      return 42; // any arbitrary constant will do 
    }
    
    @Override
    public String toString() {
      StringBuilder sb = new StringBuilder();
      sb.append("size: ").append(this.theMetadata.size()).append("\n");
      Iterator<Map.Entry<Text, Text>> iter =
        this.theMetadata.entrySet().iterator();
      while (iter.hasNext()) {
        Map.Entry<Text, Text> en = iter.next();
        sb.append("\t").append(en.getKey().toString()).append("\t").append(en.getValue().toString());
        sb.append("\n");
      }
      return sb.toString();
    }
  }
  
  /** Write key/value pairs to a sequence-format file. */
  public static class Writer implements java.io.Closeable, Syncable {
    private Configuration conf;
    FSDataOutputStream out;
    boolean ownOutputStream = true;
    DataOutputBuffer buffer = new DataOutputBuffer();

    Class keyClass;
    Class valClass;

    private final CompressionType compress;
    CompressionCodec codec = null;
    CompressionOutputStream deflateFilter = null;
    DataOutputStream deflateOut = null;
    Metadata metadata = null;
    Compressor compressor = null;

    private boolean appendMode = false;

    protected Serializer keySerializer;
    protected Serializer uncompressedValSerializer;
    protected Serializer compressedValSerializer;
    
    // Insert a globally unique 16-byte value every few entries, so that one
    // can seek into the middle of a file and then synchronize with record
    // starts and ends by scanning for this value.
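    // For example (a sketch; the byte offset is arbitrary), a reader can
    // realign on a record boundary after an arbitrary seek with:
    //   reader.sync(someByteOffset);  // skip to the next sync marker
    //   reader.next(key, value);      // now positioned on a record start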
    long lastSyncPos;                     // position of last sync
    byte[] sync;                          // 16 random bytes
    {
      try {
        MessageDigest digester = MessageDigest.getInstance("MD5");
        long time = Time.now();
        digester.update((new UID()+"@"+time).getBytes(Charsets.UTF_8));
        sync = digester.digest();
      } catch (Exception e) {
        throw new RuntimeException(e);
      }
    }

    public static interface Option {}
    
    static class FileOption extends Options.PathOption 
                                    implements Option {
      FileOption(Path path) {
        super(path);
      }
    }

    /**
     * @deprecated only used for backwards-compatibility in the createWriter methods
     * that take FileSystem.
     */
    @Deprecated
    private static class FileSystemOption implements Option {
      private final FileSystem value;
      protected FileSystemOption(FileSystem value) {
        this.value = value;
      }
      public FileSystem getValue() {
        return value;
      }
    }

    static class StreamOption extends Options.FSDataOutputStreamOption 
                              implements Option {
      StreamOption(FSDataOutputStream stream) {
        super(stream);
      }
    }

    static class BufferSizeOption extends Options.IntegerOption
                                  implements Option {
      BufferSizeOption(int value) {
        super(value);
      }
    }
    
    static class BlockSizeOption extends Options.LongOption implements Option {
      BlockSizeOption(long value) {
        super(value);
      }
    }

    static class ReplicationOption extends Options.IntegerOption
                                   implements Option {
      ReplicationOption(int value) {
        super(value);
      }
    }

    static class AppendIfExistsOption extends Options.BooleanOption implements
        Option {
      AppendIfExistsOption(boolean value) {
        super(value);
      }
    }

    static class KeyClassOption extends Options.ClassOption implements Option {
      KeyClassOption(Class<?> value) {
        super(value);
      }
    }

    static class ValueClassOption extends Options.ClassOption
                                          implements Option {
      ValueClassOption(Class<?> value) {
        super(value);
      }
    }

    static class MetadataOption implements Option {
      private final Metadata value;
      MetadataOption(Metadata value) {
        this.value = value;
      }
      Metadata getValue() {
        return value;
      }
    }

    static class ProgressableOption extends Options.ProgressableOption
                                    implements Option {
      ProgressableOption(Progressable value) {
        super(value);
      }
    }

    private static class CompressionOption implements Option {
      private final CompressionType value;
      private final CompressionCodec codec;
      CompressionOption(CompressionType value) {
        this(value, null);
      }
      CompressionOption(CompressionType value, CompressionCodec codec) {
        this.value = value;
        this.codec = (CompressionType.NONE != value && null == codec)
          ? new DefaultCodec()
          : codec;
      }
      CompressionType getValue() {
        return value;
      }
      CompressionCodec getCodec() {
        return codec;
      }
    }

    public static Option file(Path value) {
      return new FileOption(value);
    }

    /**
     * @deprecated only used for backwards-compatibility in the createWriter methods
     * that take FileSystem.
     */
    @Deprecated
    private static Option filesystem(FileSystem fs) {
      return new SequenceFile.Writer.FileSystemOption(fs);
    }
    
    public static Option bufferSize(int value) {
      return new BufferSizeOption(value);
    }
    
    public static Option stream(FSDataOutputStream value) {
      return new StreamOption(value);
    }
    
    public static Option replication(short value) {
      return new ReplicationOption(value);
    }
    
    public static Option appendIfExists(boolean value) {
      return new AppendIfExistsOption(value);
    }

    public static Option blockSize(long value) {
      return new BlockSizeOption(value);
    }
    
    public static Option progressable(Progressable value) {
      return new ProgressableOption(value);
    }

    public static Option keyClass(Class<?> value) {
      return new KeyClassOption(value);
    }
    
    public static Option valueClass(Class<?> value) {
      return new ValueClassOption(value);
    }
    
    public static Option metadata(Metadata value) {
      return new MetadataOption(value);
    }

    public static Option compression(CompressionType value) {
      return new CompressionOption(value);
    }

    public static Option compression(CompressionType value,
        CompressionCodec codec) {
      return new CompressionOption(value, codec);
    }
    
    /**
     * Construct an uncompressed writer from a set of options.
     * @param conf the configuration to use
     * @param opts the options used when creating the writer
     * @throws IOException if it fails
     */
    Writer(Configuration conf, 
           Option... opts) throws IOException {
      BlockSizeOption blockSizeOption = 
        Options.getOption(BlockSizeOption.class, opts);
      BufferSizeOption bufferSizeOption = 
        Options.getOption(BufferSizeOption.class, opts);
      ReplicationOption replicationOption = 
        Options.getOption(ReplicationOption.class, opts);
      ProgressableOption progressOption = 
        Options.getOption(ProgressableOption.class, opts);
      FileOption fileOption = Options.getOption(FileOption.class, opts);
      AppendIfExistsOption appendIfExistsOption = Options.getOption(
          AppendIfExistsOption.class, opts);
      FileSystemOption fsOption = Options.getOption(FileSystemOption.class, opts);
      StreamOption streamOption = Options.getOption(StreamOption.class, opts);
      KeyClassOption keyClassOption = 
        Options.getOption(KeyClassOption.class, opts);
      ValueClassOption valueClassOption = 
        Options.getOption(ValueClassOption.class, opts);
      MetadataOption metadataOption = 
        Options.getOption(MetadataOption.class, opts);
      CompressionOption compressionTypeOption =
        Options.getOption(CompressionOption.class, opts);
      // check consistency of options
      if ((fileOption == null) == (streamOption == null)) {
        throw new IllegalArgumentException("file or stream must be specified");
      }
      if (fileOption == null && (blockSizeOption != null ||
                                 bufferSizeOption != null ||
                                 replicationOption != null ||
                                 progressOption != null)) {
        throw new IllegalArgumentException("file modifier options not " +
                                           "compatible with stream");
      }

      FSDataOutputStream out;
      boolean ownStream = fileOption != null;
      if (ownStream) {
        Path p = fileOption.getValue();
        FileSystem fs;
        if (fsOption != null) {
          fs = fsOption.getValue();
        } else {
          fs = p.getFileSystem(conf);
        }
        int bufferSize = bufferSizeOption == null ? getBufferSize(conf) :
          bufferSizeOption.getValue();
        short replication = replicationOption == null ? 
          fs.getDefaultReplication(p) :
          (short) replicationOption.getValue();
        long blockSize = blockSizeOption == null ? fs.getDefaultBlockSize(p) :
          blockSizeOption.getValue();
        Progressable progress = progressOption == null ? null :
          progressOption.getValue();

        if (appendIfExistsOption != null && appendIfExistsOption.getValue()
            && fs.exists(p)) {

          // Read the file and verify header details
          SequenceFile.Reader reader = new SequenceFile.Reader(conf,
              SequenceFile.Reader.file(p), new Reader.OnlyHeaderOption());
          try {

            if (keyClassOption.getValue() != reader.getKeyClass()
                || valueClassOption.getValue() != reader.getValueClass()) {
              throw new IllegalArgumentException(
                  "Key/value class provided does not match the file");
            }

            if (reader.getVersion() != VERSION[3]) {
              throw new VersionMismatchException(VERSION[3],
                  reader.getVersion());
            }

            if (metadataOption != null) {
              LOG.info("MetaData Option is ignored during append");
            }
            metadataOption = (MetadataOption) SequenceFile.Writer
                .metadata(reader.getMetadata());

            CompressionOption readerCompressionOption = new CompressionOption(
                reader.getCompressionType(), reader.getCompressionCodec());

            if (readerCompressionOption.value != compressionTypeOption.value
                || !readerCompressionOption.codec.getClass().getName()
                    .equals(compressionTypeOption.codec.getClass().getName())) {
              throw new IllegalArgumentException(
                  "Compression option provided does not match the file");
            }

            sync = reader.getSync();

          } finally {
            reader.close();
          }

          out = fs.append(p, bufferSize, progress);
          this.appendMode = true;
        } else {
          out = fs
              .create(p, true, bufferSize, replication, blockSize, progress);
        }
      } else {
        out = streamOption.getValue();
      }
      Class<?> keyClass = keyClassOption == null ?
          Object.class : keyClassOption.getValue();
      Class<?> valueClass = valueClassOption == null ?
          Object.class : valueClassOption.getValue();
      Metadata metadata = metadataOption == null ?
          new Metadata() : metadataOption.getValue();
      this.compress = compressionTypeOption.getValue();
      final CompressionCodec codec = compressionTypeOption.getCodec();
      if (codec != null &&
          (codec instanceof GzipCodec) &&
          !NativeCodeLoader.isNativeCodeLoaded() &&
          !ZlibFactory.isNativeZlibLoaded(conf)) {
        throw new IllegalArgumentException("SequenceFile doesn't work with " +
                                           "GzipCodec without native-hadoop " +
                                           "code!");
      }
      init(conf, out, ownStream, keyClass, valueClass, codec, metadata);
    }

    /** Create the named file.
     * @deprecated Use 
     *   {@link SequenceFile#createWriter(Configuration, Writer.Option...)} 
     *   instead.
     */
    @Deprecated
    public Writer(FileSystem fs, Configuration conf, Path name, 
                  Class keyClass, Class valClass) throws IOException {
      this.compress = CompressionType.NONE;
      init(conf, fs.create(name), true, keyClass, valClass, null, 
           new Metadata());
    }
    
    /** Create the named file with write-progress reporter.
     * @deprecated Use 
     *   {@link SequenceFile#createWriter(Configuration, Writer.Option...)} 
     *   instead.
     */
    @Deprecated
    public Writer(FileSystem fs, Configuration conf, Path name, 
                  Class keyClass, Class valClass,
                  Progressable progress, Metadata metadata) throws IOException {
      this.compress = CompressionType.NONE;
      init(conf, fs.create(name, progress), true, keyClass, valClass,
           null, metadata);
    }
    
    /** Create the named file with write-progress reporter. 
     * @deprecated Use 
     *   {@link SequenceFile#createWriter(Configuration, Writer.Option...)} 
     *   instead.
     */
    @Deprecated
    public Writer(FileSystem fs, Configuration conf, Path name,
                  Class keyClass, Class valClass,
                  int bufferSize, short replication, long blockSize,
                  Progressable progress, Metadata metadata) throws IOException {
      this.compress = CompressionType.NONE;
      init(conf,
           fs.create(name, true, bufferSize, replication, blockSize, progress),
           true, keyClass, valClass, null, metadata);
    }

    boolean isCompressed() { return compress != CompressionType.NONE; }
    boolean isBlockCompressed() { return compress == CompressionType.BLOCK; }
    
    Writer ownStream() { this.ownOutputStream = true; return this; }

    /** Write and flush the file header. */
    private void writeFileHeader() 
      throws IOException {
      out.write(VERSION);
      Text.writeString(out, keyClass.getName());
      Text.writeString(out, valClass.getName());
      
      out.writeBoolean(this.isCompressed());
      out.writeBoolean(this.isBlockCompressed());
      
      if (this.isCompressed()) {
        Text.writeString(out, (codec.getClass()).getName());
      }
      this.metadata.write(out);
      out.write(sync);                       // write the sync bytes
      out.flush();                           // flush header
    }

    /** Initialize. */
    @SuppressWarnings("unchecked")
    void init(Configuration conf, FSDataOutputStream out, boolean ownStream,
              Class keyClass, Class valClass,
              CompressionCodec codec, Metadata metadata) 
      throws IOException {
      this.conf = conf;
      this.out = out;
      this.ownOutputStream = ownStream;
      this.keyClass = keyClass;
      this.valClass = valClass;
      this.codec = codec;
      this.metadata = metadata;
      SerializationFactory serializationFactory = new SerializationFactory(conf);
      this.keySerializer = serializationFactory.getSerializer(keyClass);
      if (this.keySerializer == null) {
        throw new IOException(
            "Could not find a serializer for the Key class: '"
                + keyClass.getCanonicalName() + "'. "
                + "Please ensure that the configuration '" +
                CommonConfigurationKeys.IO_SERIALIZATIONS_KEY + "' is "
                + "properly configured, if you're using "
                + "custom serialization.");
      }
      this.keySerializer.open(buffer);
      this.uncompressedValSerializer = serializationFactory.getSerializer(valClass);
      if (this.uncompressedValSerializer == null) {
        throw new IOException(
            "Could not find a serializer for the Value class: '"
                + valClass.getCanonicalName() + "'. "
                + "Please ensure that the configuration '" +
                CommonConfigurationKeys.IO_SERIALIZATIONS_KEY + "' is "
                + "properly configured, if you're using "
                + "custom serialization.");
      }
      this.uncompressedValSerializer.open(buffer);
      if (this.codec != null) {
        ReflectionUtils.setConf(this.codec, this.conf);
        this.compressor = CodecPool.getCompressor(this.codec);
        this.deflateFilter = this.codec.createOutputStream(buffer, compressor);
        this.deflateOut = 
          new DataOutputStream(new BufferedOutputStream(deflateFilter));
        this.compressedValSerializer = serializationFactory.getSerializer(valClass);
        if (this.compressedValSerializer == null) {
          throw new IOException(
              "Could not find a serializer for the Value class: '"
                  + valClass.getCanonicalName() + "'. "
                  + "Please ensure that the configuration '" +
                  CommonConfigurationKeys.IO_SERIALIZATIONS_KEY + "' is "
                  + "properly configured, if you're using "
                  + "custom serialization.");
        }
        this.compressedValSerializer.open(deflateOut);
      }

      if (appendMode) {
        sync();
      } else {
        writeFileHeader();
      }
    }
    
    /** Returns the class of keys in this file. */
    public Class getKeyClass() { return keyClass; }

    /** Returns the class of values in this file. */
    public Class getValueClass() { return valClass; }

    /** Returns the compression codec of data in this file. */
    public CompressionCodec getCompressionCodec() { return codec; }
    
    /** Create a sync point. */
    public void sync() throws IOException {
      if (sync != null && lastSyncPos != out.getPos()) {
        out.writeInt(SYNC_ESCAPE);                // mark the start of the sync
        out.write(sync);                          // write sync
        lastSyncPos = out.getPos();               // update lastSyncPos
      }
    }

    /**
     * flush all currently written data to the file system
     * @deprecated Use {@link #hsync()} or {@link #hflush()} instead
     */
    @Deprecated
    public void syncFs() throws IOException {
      if (out != null) {
        out.hflush();  // flush contents to file system
      }
    }

    @Override
    public void hsync() throws IOException {
      if (out != null) {
        out.hsync();
      }
    }

    @Override
    public void hflush() throws IOException {
      if (out != null) {
        out.hflush();
      }
    }
    
    /** Returns the configuration of this file. */
    Configuration getConf() { return conf; }
    
    /** Close the file. */
    @Override
    public synchronized void close() throws IOException {
      keySerializer.close();
      uncompressedValSerializer.close();
      if (compressedValSerializer != null) {
        compressedValSerializer.close();
      }

      CodecPool.returnCompressor(compressor);
      compressor = null;
      
      if (out != null) {
        
        // Close the underlying stream iff we own it...
        if (ownOutputStream) {
          out.close();
        } else {
          out.flush();
        }
        out = null;
      }
    }

    synchronized void checkAndWriteSync() throws IOException {
      if (sync != null &&
          out.getPos() >= lastSyncPos+SYNC_INTERVAL) { // time to emit sync
        sync();
      }
    }

    /** Append a key/value pair. */
    public void append(Writable key, Writable val)
      throws IOException {
      append((Object) key, (Object) val);
    }

    /** Append a key/value pair. */
    @SuppressWarnings("unchecked")
    public synchronized void append(Object key, Object val)
      throws IOException {
      if (key.getClass() != keyClass)
        throw new IOException("wrong key class: "+key.getClass().getName()
                              +" is not "+keyClass);
      if (val.getClass() != valClass)
        throw new IOException("wrong value class: "+val.getClass().getName()
                              +" is not "+valClass);

      buffer.reset();

      // Append the 'key'
      keySerializer.serialize(key);
      int keyLength = buffer.getLength();
      if (keyLength < 0)
        throw new IOException("negative length keys not allowed: " + key);

      // Append the 'value'
      if (compress == CompressionType.RECORD) {
        deflateFilter.resetState();
        compressedValSerializer.serialize(val);
        deflateOut.flush();
        deflateFilter.finish();
      } else {
        uncompressedValSerializer.serialize(val);
      }

      // Write the record out
      checkAndWriteSync();                                // sync
      out.writeInt(buffer.getLength());                   // total record length
      out.writeInt(keyLength);                            // key portion length
      out.write(buffer.getData(), 0, buffer.getLength()); // data
    }

    public synchronized void appendRaw(byte[] keyData, int keyOffset,
        int keyLength, ValueBytes val) throws IOException {
      if (keyLength < 0)
        throw new IOException("negative length keys not allowed: " + keyLength);

      int valLength = val.getSize();

      checkAndWriteSync();
      
      out.writeInt(keyLength+valLength);          // total record length
      out.writeInt(keyLength);                    // key portion length
      out.write(keyData, keyOffset, keyLength);   // key
      val.writeUncompressedBytes(out);            // value
    }

    /** Returns the current length of the output file.
     *
     * <p>This always returns a synchronized position.  In other words,
     * immediately after calling {@link SequenceFile.Reader#seek(long)} with a position
     * returned by this method, {@link SequenceFile.Reader#next(Writable)} may be called.  However
     * the key may be earlier in the file than the key last written when this
     * method was called (e.g., with block-compression, it may be the first key
     * in the block that was being written when this method was called).
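     *
     * <p>A usage sketch (assuming a writer and a reader over the same file):</p>
     * <pre>
     * long pos = writer.getLength();  // a safe position to seek to
     * reader.seek(pos);
     * Text key = new Text();
     * Text value = new Text();
     * reader.next(key, value);        // reads the next record, if any
     * </pre>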
1429     */
1430    public synchronized long getLength() throws IOException {
1431      return out.getPos();
1432    }
1433
1434  } // class Writer
1435
1436  /** Write key/compressed-value pairs to a sequence-format file. */
1437  static class RecordCompressWriter extends Writer {
1438    
1439    RecordCompressWriter(Configuration conf, 
1440                         Option... options) throws IOException {
1441      super(conf, options);
1442    }
1443
1444    /** Append a key/value pair. */
1445    @Override
1446    @SuppressWarnings("unchecked")
1447    public synchronized void append(Object key, Object val)
1448      throws IOException {
1449      if (key.getClass() != keyClass)
1450        throw new IOException("wrong key class: "+key.getClass().getName()
1451                              +" is not "+keyClass);
1452      if (val.getClass() != valClass)
1453        throw new IOException("wrong value class: "+val.getClass().getName()
1454                              +" is not "+valClass);
1455
1456      buffer.reset();
1457
1458      // Append the 'key'
1459      keySerializer.serialize(key);
1460      int keyLength = buffer.getLength();
1461      if (keyLength < 0)
1462        throw new IOException("negative length keys not allowed: " + key);
1463
1464      // Compress 'value' and append it
1465      deflateFilter.resetState();
1466      compressedValSerializer.serialize(val);
1467      deflateOut.flush();
1468      deflateFilter.finish();
1469
1470      // Write the record out
1471      checkAndWriteSync();                                // sync
1472      out.writeInt(buffer.getLength());                   // total record length
1473      out.writeInt(keyLength);                            // key portion length
1474      out.write(buffer.getData(), 0, buffer.getLength()); // data
1475    }
1476
    /** Append a key/value pair represented as raw bytes. */
1478    @Override
1479    public synchronized void appendRaw(byte[] keyData, int keyOffset,
1480        int keyLength, ValueBytes val) throws IOException {
1481
1482      if (keyLength < 0)
1483        throw new IOException("negative length keys not allowed: " + keyLength);
1484
1485      int valLength = val.getSize();
1486      
1487      checkAndWriteSync();                        // sync
1488      out.writeInt(keyLength+valLength);          // total record length
1489      out.writeInt(keyLength);                    // key portion length
1490      out.write(keyData, keyOffset, keyLength);   // 'key' data
1491      val.writeCompressedBytes(out);              // 'value' data
1492    }
1493    
  } // RecordCompressWriter
1495
1496  /** Write compressed key/value blocks to a sequence-format file. */
1497  static class BlockCompressWriter extends Writer {
1498    
1499    private int noBufferedRecords = 0;
1500    
1501    private DataOutputBuffer keyLenBuffer = new DataOutputBuffer();
1502    private DataOutputBuffer keyBuffer = new DataOutputBuffer();
1503
1504    private DataOutputBuffer valLenBuffer = new DataOutputBuffer();
1505    private DataOutputBuffer valBuffer = new DataOutputBuffer();
1506
1507    private final int compressionBlockSize;
1508    
1509    BlockCompressWriter(Configuration conf,
1510                        Option... options) throws IOException {
1511      super(conf, options);
1512      compressionBlockSize = 
1513        conf.getInt("io.seqfile.compress.blocksize", 1000000);
1514      keySerializer.close();
1515      keySerializer.open(keyBuffer);
1516      uncompressedValSerializer.close();
1517      uncompressedValSerializer.open(valBuffer);
1518    }
1519
1520    /** Workhorse to check and write out compressed data/lengths */
1521    private synchronized 
1522      void writeBuffer(DataOutputBuffer uncompressedDataBuffer) 
1523      throws IOException {
1524      deflateFilter.resetState();
1525      buffer.reset();
1526      deflateOut.write(uncompressedDataBuffer.getData(), 0, 
1527                       uncompressedDataBuffer.getLength());
1528      deflateOut.flush();
1529      deflateFilter.finish();
1530      
1531      WritableUtils.writeVInt(out, buffer.getLength());
1532      out.write(buffer.getData(), 0, buffer.getLength());
1533    }
1534    
    /** Compress and flush buffered contents to the underlying stream. */
1536    @Override
1537    public synchronized void sync() throws IOException {
1538      if (noBufferedRecords > 0) {
1539        super.sync();
1540        
1541        // No. of records
1542        WritableUtils.writeVInt(out, noBufferedRecords);
1543        
1544        // Write 'keys' and lengths
1545        writeBuffer(keyLenBuffer);
1546        writeBuffer(keyBuffer);
1547        
1548        // Write 'values' and lengths
1549        writeBuffer(valLenBuffer);
1550        writeBuffer(valBuffer);
1551        
1552        // Flush the file-stream
1553        out.flush();
1554        
1555        // Reset internal states
1556        keyLenBuffer.reset();
1557        keyBuffer.reset();
1558        valLenBuffer.reset();
1559        valBuffer.reset();
1560        noBufferedRecords = 0;
1561      }
1562      
1563    }
1564    
1565    /** Close the file. */
1566    @Override
1567    public synchronized void close() throws IOException {
1568      if (out != null) {
1569        sync();
1570      }
1571      super.close();
1572    }
1573
1574    /** Append a key/value pair. */
1575    @Override
1576    @SuppressWarnings("unchecked")
1577    public synchronized void append(Object key, Object val)
1578      throws IOException {
      if (key.getClass() != keyClass)
        throw new IOException("wrong key class: "+key.getClass().getName()
                              +" is not "+keyClass);
      if (val.getClass() != valClass)
        throw new IOException("wrong value class: "+val.getClass().getName()
                              +" is not "+valClass);
1583
1584      // Save key/value into respective buffers 
1585      int oldKeyLength = keyBuffer.getLength();
1586      keySerializer.serialize(key);
1587      int keyLength = keyBuffer.getLength() - oldKeyLength;
1588      if (keyLength < 0)
1589        throw new IOException("negative length keys not allowed: " + key);
1590      WritableUtils.writeVInt(keyLenBuffer, keyLength);
1591
1592      int oldValLength = valBuffer.getLength();
1593      uncompressedValSerializer.serialize(val);
1594      int valLength = valBuffer.getLength() - oldValLength;
1595      WritableUtils.writeVInt(valLenBuffer, valLength);
1596      
1597      // Added another key/value pair
1598      ++noBufferedRecords;
1599      
1600      // Compress and flush?
1601      int currentBlockSize = keyBuffer.getLength() + valBuffer.getLength();
1602      if (currentBlockSize >= compressionBlockSize) {
1603        sync();
1604      }
1605    }
1606    
    /** Append a key/value pair represented as raw bytes. */
1608    @Override
1609    public synchronized void appendRaw(byte[] keyData, int keyOffset,
1610        int keyLength, ValueBytes val) throws IOException {
1611      
1612      if (keyLength < 0)
1613        throw new IOException("negative length keys not allowed");
1614
1615      int valLength = val.getSize();
1616      
1617      // Save key/value data in relevant buffers
1618      WritableUtils.writeVInt(keyLenBuffer, keyLength);
1619      keyBuffer.write(keyData, keyOffset, keyLength);
1620      WritableUtils.writeVInt(valLenBuffer, valLength);
1621      val.writeUncompressedBytes(valBuffer);
1622
1623      // Added another key/value pair
1624      ++noBufferedRecords;
1625
1626      // Compress and flush?
1627      int currentBlockSize = keyBuffer.getLength() + valBuffer.getLength(); 
1628      if (currentBlockSize >= compressionBlockSize) {
1629        sync();
1630      }
1631    }
1632  
  } // BlockCompressWriter
1634
1635  /** Get the configured buffer size */
1636  private static int getBufferSize(Configuration conf) {
1637    return conf.getInt("io.file.buffer.size", 4096);
1638  }
1639
1640  /** Reads key/value pairs from a sequence-format file. */
1641  public static class Reader implements java.io.Closeable {
1642    private String filename;
1643    private FSDataInputStream in;
1644    private DataOutputBuffer outBuf = new DataOutputBuffer();
1645
1646    private byte version;
1647
1648    private String keyClassName;
1649    private String valClassName;
1650    private Class keyClass;
1651    private Class valClass;
1652
1653    private CompressionCodec codec = null;
1654    private Metadata metadata = null;
1655    
1656    private byte[] sync = new byte[SYNC_HASH_SIZE];
1657    private byte[] syncCheck = new byte[SYNC_HASH_SIZE];
1658    private boolean syncSeen;
1659
1660    private long headerEnd;
1661    private long end;
1662    private int keyLength;
1663    private int recordLength;
1664
1665    private boolean decompress;
1666    private boolean blockCompressed;
1667    
1668    private Configuration conf;
1669
1670    private int noBufferedRecords = 0;
1671    private boolean lazyDecompress = true;
1672    private boolean valuesDecompressed = true;
1673    
1674    private int noBufferedKeys = 0;
1675    private int noBufferedValues = 0;
1676    
1677    private DataInputBuffer keyLenBuffer = null;
1678    private CompressionInputStream keyLenInFilter = null;
1679    private DataInputStream keyLenIn = null;
1680    private Decompressor keyLenDecompressor = null;
1681    private DataInputBuffer keyBuffer = null;
1682    private CompressionInputStream keyInFilter = null;
1683    private DataInputStream keyIn = null;
1684    private Decompressor keyDecompressor = null;
1685
1686    private DataInputBuffer valLenBuffer = null;
1687    private CompressionInputStream valLenInFilter = null;
1688    private DataInputStream valLenIn = null;
1689    private Decompressor valLenDecompressor = null;
1690    private DataInputBuffer valBuffer = null;
1691    private CompressionInputStream valInFilter = null;
1692    private DataInputStream valIn = null;
1693    private Decompressor valDecompressor = null;
1694    
1695    private Deserializer keyDeserializer;
1696    private Deserializer valDeserializer;
1697
1698    /**
1699     * A tag interface for all of the Reader options
1700     */
1701    public static interface Option {}
1702    
1703    /**
1704     * Create an option to specify the path name of the sequence file.
1705     * @param value the path to read
1706     * @return a new option
1707     */
1708    public static Option file(Path value) {
1709      return new FileOption(value);
1710    }
1711    
1712    /**
1713     * Create an option to specify the stream with the sequence file.
1714     * @param value the stream to read.
1715     * @return a new option
1716     */
1717    public static Option stream(FSDataInputStream value) {
1718      return new InputStreamOption(value);
1719    }
1720    
1721    /**
1722     * Create an option to specify the starting byte to read.
1723     * @param value the number of bytes to skip over
1724     * @return a new option
1725     */
1726    public static Option start(long value) {
1727      return new StartOption(value);
1728    }
1729    
1730    /**
1731     * Create an option to specify the number of bytes to read.
1732     * @param value the number of bytes to read
1733     * @return a new option
1734     */
1735    public static Option length(long value) {
1736      return new LengthOption(value);
1737    }
1738    
1739    /**
1740     * Create an option with the buffer size for reading the given pathname.
1741     * @param value the number of bytes to buffer
1742     * @return a new option
1743     */
1744    public static Option bufferSize(int value) {
1745      return new BufferSizeOption(value);
1746    }
1747
1748    private static class FileOption extends Options.PathOption 
1749                                    implements Option {
1750      private FileOption(Path value) {
1751        super(value);
1752      }
1753    }
1754    
1755    private static class InputStreamOption
1756        extends Options.FSDataInputStreamOption 
1757        implements Option {
1758      private InputStreamOption(FSDataInputStream value) {
1759        super(value);
1760      }
1761    }
1762
1763    private static class StartOption extends Options.LongOption
1764                                     implements Option {
1765      private StartOption(long value) {
1766        super(value);
1767      }
1768    }
1769
1770    private static class LengthOption extends Options.LongOption
1771                                      implements Option {
1772      private LengthOption(long value) {
1773        super(value);
1774      }
1775    }
1776
1777    private static class BufferSizeOption extends Options.IntegerOption
1778                                      implements Option {
1779      private BufferSizeOption(int value) {
1780        super(value);
1781      }
1782    }
1783
    // used only internally; there is no public factory method for it
1785    private static class OnlyHeaderOption extends Options.BooleanOption 
1786                                          implements Option {
1787      private OnlyHeaderOption() {
1788        super(true);
1789      }
1790    }
1791
1792    public Reader(Configuration conf, Option... opts) throws IOException {
1793      // Look up the options, these are null if not set
1794      FileOption fileOpt = Options.getOption(FileOption.class, opts);
1795      InputStreamOption streamOpt = 
1796        Options.getOption(InputStreamOption.class, opts);
1797      StartOption startOpt = Options.getOption(StartOption.class, opts);
1798      LengthOption lenOpt = Options.getOption(LengthOption.class, opts);
1799      BufferSizeOption bufOpt = Options.getOption(BufferSizeOption.class,opts);
1800      OnlyHeaderOption headerOnly = 
1801        Options.getOption(OnlyHeaderOption.class, opts);
1802      // check for consistency
1803      if ((fileOpt == null) == (streamOpt == null)) {
1804        throw new 
1805          IllegalArgumentException("File or stream option must be specified");
1806      }
1807      if (fileOpt == null && bufOpt != null) {
1808        throw new IllegalArgumentException("buffer size can only be set when" +
1809                                           " a file is specified.");
1810      }
1811      // figure out the real values
1812      Path filename = null;
1813      FSDataInputStream file;
1814      final long len;
1815      if (fileOpt != null) {
1816        filename = fileOpt.getValue();
1817        FileSystem fs = filename.getFileSystem(conf);
1818        int bufSize = bufOpt == null ? getBufferSize(conf): bufOpt.getValue();
1819        len = null == lenOpt
1820          ? fs.getFileStatus(filename).getLen()
1821          : lenOpt.getValue();
1822        file = openFile(fs, filename, bufSize, len);
1823      } else {
1824        len = null == lenOpt ? Long.MAX_VALUE : lenOpt.getValue();
1825        file = streamOpt.getValue();
1826      }
1827      long start = startOpt == null ? 0 : startOpt.getValue();
1828      // really set up
1829      initialize(filename, file, start, len, conf, headerOnly != null);
1830    }
1831
1832    /**
1833     * Construct a reader by opening a file from the given file system.
1834     * @param fs The file system used to open the file.
1835     * @param file The file being read.
1836     * @param conf Configuration
1837     * @throws IOException
1838     * @deprecated Use Reader(Configuration, Option...) instead.
1839     */
1840    @Deprecated
1841    public Reader(FileSystem fs, Path file, 
1842                  Configuration conf) throws IOException {
1843      this(conf, file(file.makeQualified(fs)));
1844    }
1845
1846    /**
1847     * Construct a reader by the given input stream.
1848     * @param in An input stream.
1849     * @param buffersize unused
1850     * @param start The starting position.
1851     * @param length The length being read.
1852     * @param conf Configuration
1853     * @throws IOException
1854     * @deprecated Use Reader(Configuration, Reader.Option...) instead.
1855     */
1856    @Deprecated
1857    public Reader(FSDataInputStream in, int buffersize,
1858        long start, long length, Configuration conf) throws IOException {
1859      this(conf, stream(in), start(start), length(length));
1860    }
1861
1862    /** Common work of the constructors. */
1863    private void initialize(Path filename, FSDataInputStream in,
1864                            long start, long length, Configuration conf,
1865                            boolean tempReader) throws IOException {
1866      if (in == null) {
1867        throw new IllegalArgumentException("in == null");
1868      }
1869      this.filename = filename == null ? "<unknown>" : filename.toString();
1870      this.in = in;
1871      this.conf = conf;
1872      boolean succeeded = false;
1873      try {
1874        seek(start);
1875        this.end = this.in.getPos() + length;
1876        // if it wrapped around, use the max
1877        if (end < length) {
1878          end = Long.MAX_VALUE;
1879        }
1880        init(tempReader);
1881        succeeded = true;
1882      } finally {
1883        if (!succeeded) {
1884          IOUtils.cleanup(LOG, this.in);
1885        }
1886      }
1887    }
1888
1889    /**
1890     * Override this method to specialize the type of
1891     * {@link FSDataInputStream} returned.
1892     * @param fs The file system used to open the file.
1893     * @param file The file being read.
1894     * @param bufferSize The buffer size used to read the file.
1895     * @param length The length being read if it is >= 0.  Otherwise,
1896     *               the length is not available.
1897     * @return The opened stream.
1898     * @throws IOException
1899     */
1900    protected FSDataInputStream openFile(FileSystem fs, Path file,
1901        int bufferSize, long length) throws IOException {
1902      return fs.open(file, bufferSize);
1903    }
1904    
1905    /**
1906     * Initialize the {@link Reader}
1907     * @param tmpReader <code>true</code> if we are constructing a temporary
     *                  reader {@link SequenceFile.Sorter#cloneFileAttributes}, 
1909     *                  and hence do not initialize every component; 
1910     *                  <code>false</code> otherwise.
1911     * @throws IOException
1912     */
1913    private void init(boolean tempReader) throws IOException {
1914      byte[] versionBlock = new byte[VERSION.length];
1915      String exceptionMsg = this + " not a SequenceFile";
1916
1917      // Try to read sequence file header.
1918      try {
1919        in.readFully(versionBlock);
1920      } catch (EOFException e) {
1921        throw new EOFException(exceptionMsg);
1922      }
1923
1924      if ((versionBlock[0] != VERSION[0]) ||
1925          (versionBlock[1] != VERSION[1]) ||
1926          (versionBlock[2] != VERSION[2])) {
        throw new IOException(exceptionMsg);
1928      }
1929
1930      // Set 'version'
1931      version = versionBlock[3];
1932      if (version > VERSION[3]) {
1933        throw new VersionMismatchException(VERSION[3], version);
1934      }
1935
1936      if (version < BLOCK_COMPRESS_VERSION) {
1937        UTF8 className = new UTF8();
1938
1939        className.readFields(in);
1940        keyClassName = className.toStringChecked(); // key class name
1941
1942        className.readFields(in);
1943        valClassName = className.toStringChecked(); // val class name
1944      } else {
1945        keyClassName = Text.readString(in);
1946        valClassName = Text.readString(in);
1947      }
1948
1949      if (version > 2) {                          // if version > 2
1950        this.decompress = in.readBoolean();       // is compressed?
1951      } else {
1952        decompress = false;
1953      }
1954
1955      if (version >= BLOCK_COMPRESS_VERSION) {    // if version >= 4
1956        this.blockCompressed = in.readBoolean();  // is block-compressed?
1957      } else {
1958        blockCompressed = false;
1959      }
1960      
1961      // if version >= 5
1962      // setup the compression codec
1963      if (decompress) {
1964        if (version >= CUSTOM_COMPRESS_VERSION) {
1965          String codecClassname = Text.readString(in);
1966          try {
1967            Class<? extends CompressionCodec> codecClass
1968              = conf.getClassByName(codecClassname).asSubclass(CompressionCodec.class);
1969            this.codec = ReflectionUtils.newInstance(codecClass, conf);
1970          } catch (ClassNotFoundException cnfe) {
1971            throw new IllegalArgumentException("Unknown codec: " + 
1972                                               codecClassname, cnfe);
1973          }
1974        } else {
1975          codec = new DefaultCodec();
1976          ((Configurable)codec).setConf(conf);
1977        }
1978      }
1979      
1980      this.metadata = new Metadata();
1981      if (version >= VERSION_WITH_METADATA) {    // if version >= 6
1982        this.metadata.readFields(in);
1983      }
1984      
1985      if (version > 1) {                          // if version > 1
1986        in.readFully(sync);                       // read sync bytes
1987        headerEnd = in.getPos();                  // record end of header
1988      }
1989      
      // Initialize... but *not* if we are constructing a temporary Reader
1991      if (!tempReader) {
1992        valBuffer = new DataInputBuffer();
1993        if (decompress) {
1994          valDecompressor = CodecPool.getDecompressor(codec);
1995          valInFilter = codec.createInputStream(valBuffer, valDecompressor);
1996          valIn = new DataInputStream(valInFilter);
1997        } else {
1998          valIn = valBuffer;
1999        }
2000
2001        if (blockCompressed) {
2002          keyLenBuffer = new DataInputBuffer();
2003          keyBuffer = new DataInputBuffer();
2004          valLenBuffer = new DataInputBuffer();
2005
2006          keyLenDecompressor = CodecPool.getDecompressor(codec);
2007          keyLenInFilter = codec.createInputStream(keyLenBuffer, 
2008                                                   keyLenDecompressor);
2009          keyLenIn = new DataInputStream(keyLenInFilter);
2010
2011          keyDecompressor = CodecPool.getDecompressor(codec);
2012          keyInFilter = codec.createInputStream(keyBuffer, keyDecompressor);
2013          keyIn = new DataInputStream(keyInFilter);
2014
2015          valLenDecompressor = CodecPool.getDecompressor(codec);
2016          valLenInFilter = codec.createInputStream(valLenBuffer, 
2017                                                   valLenDecompressor);
2018          valLenIn = new DataInputStream(valLenInFilter);
2019        }
2020        
2021        SerializationFactory serializationFactory =
2022          new SerializationFactory(conf);
2023        this.keyDeserializer =
2024          getDeserializer(serializationFactory, getKeyClass());
2025        if (this.keyDeserializer == null) {
2026          throw new IOException(
2027              "Could not find a deserializer for the Key class: '"
2028                  + getKeyClass().getCanonicalName() + "'. "
2029                  + "Please ensure that the configuration '" +
2030                  CommonConfigurationKeys.IO_SERIALIZATIONS_KEY + "' is "
2031                  + "properly configured, if you're using "
2032                  + "custom serialization.");
2033        }
2034        if (!blockCompressed) {
2035          this.keyDeserializer.open(valBuffer);
2036        } else {
2037          this.keyDeserializer.open(keyIn);
2038        }
2039        this.valDeserializer =
2040          getDeserializer(serializationFactory, getValueClass());
2041        if (this.valDeserializer == null) {
2042          throw new IOException(
2043              "Could not find a deserializer for the Value class: '"
2044                  + getValueClass().getCanonicalName() + "'. "
2045                  + "Please ensure that the configuration '" +
2046                  CommonConfigurationKeys.IO_SERIALIZATIONS_KEY + "' is "
2047                  + "properly configured, if you're using "
2048                  + "custom serialization.");
2049        }
2050        this.valDeserializer.open(valIn);
2051      }
2052    }
2053    
2054    @SuppressWarnings("unchecked")
2055    private Deserializer getDeserializer(SerializationFactory sf, Class c) {
2056      return sf.getDeserializer(c);
2057    }
2058    
2059    /** Close the file. */
2060    @Override
2061    public synchronized void close() throws IOException {
2062      // Return the decompressors to the pool
2063      CodecPool.returnDecompressor(keyLenDecompressor);
2064      CodecPool.returnDecompressor(keyDecompressor);
2065      CodecPool.returnDecompressor(valLenDecompressor);
2066      CodecPool.returnDecompressor(valDecompressor);
2067      keyLenDecompressor = keyDecompressor = null;
2068      valLenDecompressor = valDecompressor = null;
2069      
2070      if (keyDeserializer != null) {
2071        keyDeserializer.close();
2072      }
2073      if (valDeserializer != null) {
2074        valDeserializer.close();
2075      }
2076      
2077      // Close the input-stream
2078      in.close();
2079    }
2080
2081    /** Returns the name of the key class. */
2082    public String getKeyClassName() {
2083      return keyClassName;
2084    }
2085
2086    /** Returns the class of keys in this file. */
2087    public synchronized Class<?> getKeyClass() {
2088      if (null == keyClass) {
2089        try {
2090          keyClass = WritableName.getClass(getKeyClassName(), conf);
2091        } catch (IOException e) {
2092          throw new RuntimeException(e);
2093        }
2094      }
2095      return keyClass;
2096    }
2097
2098    /** Returns the name of the value class. */
2099    public String getValueClassName() {
2100      return valClassName;
2101    }
2102
2103    /** Returns the class of values in this file. */
2104    public synchronized Class<?> getValueClass() {
2105      if (null == valClass) {
2106        try {
2107          valClass = WritableName.getClass(getValueClassName(), conf);
2108        } catch (IOException e) {
2109          throw new RuntimeException(e);
2110        }
2111      }
2112      return valClass;
2113    }
2114
2115    /** Returns true if values are compressed. */
2116    public boolean isCompressed() { return decompress; }
2117    
2118    /** Returns true if records are block-compressed. */
2119    public boolean isBlockCompressed() { return blockCompressed; }
2120    
2121    /** Returns the compression codec of data in this file. */
2122    public CompressionCodec getCompressionCodec() { return codec; }
2123    
2124    private byte[] getSync() {
2125      return sync;
2126    }
2127
2128    private byte getVersion() {
2129      return version;
2130    }
2131
2132    /**
2133     * Get the compression type for this file.
2134     * @return the compression type
2135     */
2136    public CompressionType getCompressionType() {
2137      if (decompress) {
2138        return blockCompressed ? CompressionType.BLOCK : CompressionType.RECORD;
2139      } else {
2140        return CompressionType.NONE;
2141      }
2142    }
2143
2144    /** Returns the metadata object of the file */
2145    public Metadata getMetadata() {
2146      return this.metadata;
2147    }
2148    
2149    /** Returns the configuration used for this file. */
2150    Configuration getConf() { return conf; }
2151    
2152    /** Read a compressed buffer */
2153    private synchronized void readBuffer(DataInputBuffer buffer, 
2154                                         CompressionInputStream filter) throws IOException {
2155      // Read data into a temporary buffer
2156      DataOutputBuffer dataBuffer = new DataOutputBuffer();
2157
2158      try {
2159        int dataBufferLength = WritableUtils.readVInt(in);
2160        dataBuffer.write(in, dataBufferLength);
2161      
2162        // Set up 'buffer' connected to the input-stream
2163        buffer.reset(dataBuffer.getData(), 0, dataBuffer.getLength());
2164      } finally {
2165        dataBuffer.close();
2166      }
2167
2168      // Reset the codec
2169      filter.resetState();
2170    }
2171    
2172    /** Read the next 'compressed' block */
2173    private synchronized void readBlock() throws IOException {
2174      // Check if we need to throw away a whole block of 
2175      // 'values' due to 'lazy decompression' 
2176      if (lazyDecompress && !valuesDecompressed) {
2177        in.seek(WritableUtils.readVInt(in)+in.getPos());
2178        in.seek(WritableUtils.readVInt(in)+in.getPos());
2179      }
2180      
2181      // Reset internal states
2182      noBufferedKeys = 0; noBufferedValues = 0; noBufferedRecords = 0;
2183      valuesDecompressed = false;
2184
2185      //Process sync
2186      if (sync != null) {
2187        in.readInt();
2188        in.readFully(syncCheck);                // read syncCheck
2189        if (!Arrays.equals(sync, syncCheck))    // check it
2190          throw new IOException("File is corrupt!");
2191      }
2192      syncSeen = true;
2193
2194      // Read number of records in this block
2195      noBufferedRecords = WritableUtils.readVInt(in);
2196      
2197      // Read key lengths and keys
2198      readBuffer(keyLenBuffer, keyLenInFilter);
2199      readBuffer(keyBuffer, keyInFilter);
2200      noBufferedKeys = noBufferedRecords;
2201      
2202      // Read value lengths and values
2203      if (!lazyDecompress) {
2204        readBuffer(valLenBuffer, valLenInFilter);
2205        readBuffer(valBuffer, valInFilter);
2206        noBufferedValues = noBufferedRecords;
2207        valuesDecompressed = true;
2208      }
2209    }
2210
2211    /** 
2212     * Position valLenIn/valIn to the 'value' 
2213     * corresponding to the 'current' key 
2214     */
2215    private synchronized void seekToCurrentValue() throws IOException {
2216      if (!blockCompressed) {
2217        if (decompress) {
2218          valInFilter.resetState();
2219        }
2220        valBuffer.reset();
2221      } else {
2222        // Check if this is the first value in the 'block' to be read
2223        if (lazyDecompress && !valuesDecompressed) {
2224          // Read the value lengths and values
2225          readBuffer(valLenBuffer, valLenInFilter);
2226          readBuffer(valBuffer, valInFilter);
2227          noBufferedValues = noBufferedRecords;
2228          valuesDecompressed = true;
2229        }
2230        
2231        // Calculate the no. of bytes to skip
2232        // Note: 'current' key has already been read!
2233        int skipValBytes = 0;
2234        int currentKey = noBufferedKeys + 1;          
2235        for (int i=noBufferedValues; i > currentKey; --i) {
2236          skipValBytes += WritableUtils.readVInt(valLenIn);
2237          --noBufferedValues;
2238        }
2239        
2240        // Skip to the 'val' corresponding to 'current' key
2241        if (skipValBytes > 0) {
2242          if (valIn.skipBytes(skipValBytes) != skipValBytes) {
2243            throw new IOException("Failed to seek to " + currentKey + 
2244                                  "(th) value!");
2245          }
2246        }
2247      }
2248    }
2249
2250    /**
2251     * Get the 'value' corresponding to the last read 'key'.
2252     * @param val : The 'value' to be read.
2253     * @throws IOException
2254     */
2255    public synchronized void getCurrentValue(Writable val) 
2256      throws IOException {
2257      if (val instanceof Configurable) {
2258        ((Configurable) val).setConf(this.conf);
2259      }
2260
2261      // Position stream to 'current' value
2262      seekToCurrentValue();
2263
2264      if (!blockCompressed) {
2265        val.readFields(valIn);
2266        
2267        if (valIn.read() > 0) {
2268          LOG.info("available bytes: " + valIn.available());
2269          throw new IOException(val+" read "+(valBuffer.getPosition()-keyLength)
2270                                + " bytes, should read " +
2271                                (valBuffer.getLength()-keyLength));
2272        }
2273      } else {
2274        // Get the value
2275        int valLength = WritableUtils.readVInt(valLenIn);
2276        val.readFields(valIn);
2277        
2278        // Read another compressed 'value'
2279        --noBufferedValues;
2280        
2281        // Sanity check
2282        if ((valLength < 0) && LOG.isDebugEnabled()) {
2283          LOG.debug(val + " is a zero-length value");
2284        }
2285      }
2286
2287    }
2288    
2289    /**
2290     * Get the 'value' corresponding to the last read 'key'.
2291     * @param val : The 'value' to be read.
2292     * @throws IOException
2293     */
2294    public synchronized Object getCurrentValue(Object val) 
2295      throws IOException {
2296      if (val instanceof Configurable) {
2297        ((Configurable) val).setConf(this.conf);
2298      }
2299
2300      // Position stream to 'current' value
2301      seekToCurrentValue();
2302
2303      if (!blockCompressed) {
2304        val = deserializeValue(val);
2305        
2306        if (valIn.read() > 0) {
2307          LOG.info("available bytes: " + valIn.available());
2308          throw new IOException(val+" read "+(valBuffer.getPosition()-keyLength)
2309                                + " bytes, should read " +
2310                                (valBuffer.getLength()-keyLength));
2311        }
2312      } else {
2313        // Get the value
2314        int valLength = WritableUtils.readVInt(valLenIn);
2315        val = deserializeValue(val);
2316        
2317        // Read another compressed 'value'
2318        --noBufferedValues;
2319        
2320        // Sanity check
2321        if ((valLength < 0) && LOG.isDebugEnabled()) {
2322          LOG.debug(val + " is a zero-length value");
2323        }
2324      }
2325      return val;
2326
2327    }
2328
2329    @SuppressWarnings("unchecked")
2330    private Object deserializeValue(Object val) throws IOException {
2331      return valDeserializer.deserialize(val);
2332    }
2333    
2334    /** Read the next key in the file into <code>key</code>, skipping its
     * value.  Returns true if another entry exists, and false at end of file. */
2336    public synchronized boolean next(Writable key) throws IOException {
2337      if (key.getClass() != getKeyClass())
2338        throw new IOException("wrong key class: "+key.getClass().getName()
2339                              +" is not "+keyClass);
2340
2341      if (!blockCompressed) {
2342        outBuf.reset();
2343        
2344        keyLength = next(outBuf);
2345        if (keyLength < 0)
2346          return false;
2347        
2348        valBuffer.reset(outBuf.getData(), outBuf.getLength());
2349        
2350        key.readFields(valBuffer);
2351        valBuffer.mark(0);
2352        if (valBuffer.getPosition() != keyLength)
2353          throw new IOException(key + " read " + valBuffer.getPosition()
2354                                + " bytes, should read " + keyLength);
2355      } else {
2356        //Reset syncSeen
2357        syncSeen = false;
2358        
2359        if (noBufferedKeys == 0) {
2360          try {
2361            readBlock();
2362          } catch (EOFException eof) {
2363            return false;
2364          }
2365        }
2366        
2367        int keyLength = WritableUtils.readVInt(keyLenIn);
2368        
2369        // Sanity check
2370        if (keyLength < 0) {
2371          return false;
2372        }
2373        
2374        //Read another compressed 'key'
2375        key.readFields(keyIn);
2376        --noBufferedKeys;
2377      }
2378
2379      return true;
2380    }
2381
2382    /** Read the next key/value pair in the file into <code>key</code> and
2383     * <code>val</code>.  Returns true if such a pair exists and false when at
     * end of file. */
2385    public synchronized boolean next(Writable key, Writable val)
2386      throws IOException {
2387      if (val.getClass() != getValueClass())
2388        throw new IOException("wrong value class: "+val+" is not "+valClass);
2389
2390      boolean more = next(key);
2391      
2392      if (more) {
2393        getCurrentValue(val);
2394      }
2395
2396      return more;
2397    }
2398    
2399    /**
2400     * Read and return the next record length, potentially skipping over 
2401     * a sync block.
2402     * @return the length of the next record or -1 if there is no next record
2403     * @throws IOException
2404     */
2405    private synchronized int readRecordLength() throws IOException {
2406      if (in.getPos() >= end) {
2407        return -1;
2408      }      
2409      int length = in.readInt();
2410      if (version > 1 && sync != null &&
2411          length == SYNC_ESCAPE) {              // process a sync entry
2412        in.readFully(syncCheck);                // read syncCheck
2413        if (!Arrays.equals(sync, syncCheck))    // check it
2414          throw new IOException("File is corrupt!");
2415        syncSeen = true;
2416        if (in.getPos() >= end) {
2417          return -1;
2418        }
2419        length = in.readInt();                  // re-read length
2420      } else {
2421        syncSeen = false;
2422      }
2423      
2424      return length;
2425    }
2426    
    /** Read the next key/value pair in the file into <code>buffer</code>.
     * Returns the length of the key read, or -1 if at end of file.  The length
     * of the value may be computed by calling buffer.getLength() before and
     * after calls to this method.
     * @deprecated Call {@link #nextRaw(DataOutputBuffer,SequenceFile.ValueBytes)}. */
    @Deprecated
2433    synchronized int next(DataOutputBuffer buffer) throws IOException {
2434      // Unsupported for block-compressed sequence files
2435      if (blockCompressed) {
2436        throw new IOException("Unsupported call for block-compressed" +
2437                              " SequenceFiles - use SequenceFile.Reader.next(DataOutputStream, ValueBytes)");
2438      }
2439      try {
2440        int length = readRecordLength();
2441        if (length == -1) {
2442          return -1;
2443        }
2444        int keyLength = in.readInt();
2445        buffer.write(in, length);
2446        return keyLength;
2447      } catch (ChecksumException e) {             // checksum failure
2448        handleChecksumException(e);
2449        return next(buffer);
2450      }
2451    }
2452
2453    public ValueBytes createValueBytes() {
2454      ValueBytes val = null;
2455      if (!decompress || blockCompressed) {
2456        val = new UncompressedBytes();
2457      } else {
2458        val = new CompressedBytes(codec);
2459      }
2460      return val;
2461    }
2462
2463    /**
2464     * Read 'raw' records.
2465     * @param key - The buffer into which the key is read
2466     * @param val - The 'raw' value
2467     * @return Returns the total record length or -1 for end of file
2468     * @throws IOException
2469     */
2470    public synchronized int nextRaw(DataOutputBuffer key, ValueBytes val) 
2471      throws IOException {
2472      if (!blockCompressed) {
2473        int length = readRecordLength();
2474        if (length == -1) {
2475          return -1;
2476        }
2477        int keyLength = in.readInt();
2478        int valLength = length - keyLength;
2479        key.write(in, keyLength);
2480        if (decompress) {
2481          CompressedBytes value = (CompressedBytes)val;
2482          value.reset(in, valLength);
2483        } else {
2484          UncompressedBytes value = (UncompressedBytes)val;
2485          value.reset(in, valLength);
2486        }
2487        
2488        return length;
2489      } else {
2490        //Reset syncSeen
2491        syncSeen = false;
2492        
2493        // Read 'key'
2494        if (noBufferedKeys == 0) {
2495          if (in.getPos() >= end) 
2496            return -1;
2497
2498          try { 
2499            readBlock();
2500          } catch (EOFException eof) {
2501            return -1;
2502          }
2503        }
2504        int keyLength = WritableUtils.readVInt(keyLenIn);
2505        if (keyLength < 0) {
2506          throw new IOException("zero length key found!");
2507        }
2508        key.write(keyIn, keyLength);
2509        --noBufferedKeys;
2510        
2511        // Read raw 'value'
2512        seekToCurrentValue();
2513        int valLength = WritableUtils.readVInt(valLenIn);
2514        UncompressedBytes rawValue = (UncompressedBytes)val;
2515        rawValue.reset(valIn, valLength);
2516        --noBufferedValues;
2517        
2518        return (keyLength+valLength);
2519      }
2520      
2521    }
2522
2523    /**
2524     * Read 'raw' keys.
2525     * @param key - The buffer into which the key is read
2526     * @return Returns the key length or -1 for end of file
2527     * @throws IOException
2528     */
2529    public synchronized int nextRawKey(DataOutputBuffer key) 
2530      throws IOException {
2531      if (!blockCompressed) {
2532        recordLength = readRecordLength();
2533        if (recordLength == -1) {
2534          return -1;
2535        }
2536        keyLength = in.readInt();
2537        key.write(in, keyLength);
2538        return keyLength;
2539      } else {
2540        //Reset syncSeen
2541        syncSeen = false;
2542        
2543        // Read 'key'
2544        if (noBufferedKeys == 0) {
2545          if (in.getPos() >= end) 
2546            return -1;
2547
2548          try { 
2549            readBlock();
2550          } catch (EOFException eof) {
2551            return -1;
2552          }
2553        }
2554        int keyLength = WritableUtils.readVInt(keyLenIn);
2555        if (keyLength < 0) {
2556          throw new IOException("zero length key found!");
2557        }
2558        key.write(keyIn, keyLength);
2559        --noBufferedKeys;
2560        
2561        return keyLength;
2562      }
2563      
2564    }
2565
2566    /** Read the next key in the file, skipping its
2567     * value.  Return null at end of file. */
2568    public synchronized Object next(Object key) throws IOException {
2569      if (key != null && key.getClass() != getKeyClass()) {
2570        throw new IOException("wrong key class: "+key.getClass().getName()
2571                              +" is not "+keyClass);
2572      }
2573
2574      if (!blockCompressed) {
2575        outBuf.reset();
2576        
2577        keyLength = next(outBuf);
2578        if (keyLength < 0)
2579          return null;
2580        
2581        valBuffer.reset(outBuf.getData(), outBuf.getLength());
2582        
2583        key = deserializeKey(key);
2584        valBuffer.mark(0);
2585        if (valBuffer.getPosition() != keyLength)
2586          throw new IOException(key + " read " + valBuffer.getPosition()
2587                                + " bytes, should read " + keyLength);
2588      } else {
2589        //Reset syncSeen
2590        syncSeen = false;
2591        
2592        if (noBufferedKeys == 0) {
2593          try {
2594            readBlock();
2595          } catch (EOFException eof) {
2596            return null;
2597          }
2598        }
2599        
2600        int keyLength = WritableUtils.readVInt(keyLenIn);
2601        
2602        // Sanity check
2603        if (keyLength < 0) {
2604          return null;
2605        }
2606        
2607        //Read another compressed 'key'
2608        key = deserializeKey(key);
2609        --noBufferedKeys;
2610      }
2611
2612      return key;
2613    }
2614
2615    @SuppressWarnings("unchecked")
2616    private Object deserializeKey(Object key) throws IOException {
2617      return keyDeserializer.deserialize(key);
2618    }
2619
2620    /**
2621     * Read 'raw' values.
2622     * @param val - The 'raw' value
2623     * @return Returns the value length
2624     * @throws IOException
2625     */
2626    public synchronized int nextRawValue(ValueBytes val) 
2627      throws IOException {
2628      
2629      // Position stream to current value
2630      seekToCurrentValue();
2631 
2632      if (!blockCompressed) {
2633        int valLength = recordLength - keyLength;
2634        if (decompress) {
2635          CompressedBytes value = (CompressedBytes)val;
2636          value.reset(in, valLength);
2637        } else {
2638          UncompressedBytes value = (UncompressedBytes)val;
2639          value.reset(in, valLength);
2640        }
2641         
2642        return valLength;
2643      } else {
2644        int valLength = WritableUtils.readVInt(valLenIn);
2645        UncompressedBytes rawValue = (UncompressedBytes)val;
2646        rawValue.reset(valIn, valLength);
2647        --noBufferedValues;
2648        return valLength;
2649      }
2650      
2651    }
2652
2653    private void handleChecksumException(ChecksumException e)
2654      throws IOException {
2655      if (this.conf.getBoolean("io.skip.checksum.errors", false)) {
2656        LOG.warn("Bad checksum at "+getPosition()+". Skipping entries.");
2657        sync(getPosition()+this.conf.getInt("io.bytes.per.checksum", 512));
2658      } else {
2659        throw e;
2660      }
2661    }
2662
    /** Disables sync; often invoked for temporary files. */
2664    synchronized void ignoreSync() {
2665      sync = null;
2666    }
2667    
2668    /** Set the current byte position in the input file.
2669     *
2670     * <p>The position passed must be a position returned by {@link
2671     * SequenceFile.Writer#getLength()} when writing this file.  To seek to an arbitrary
2672     * position, use {@link SequenceFile.Reader#sync(long)}.
2673     */
2674    public synchronized void seek(long position) throws IOException {
2675      in.seek(position);
2676      if (blockCompressed) {                      // trigger block read
2677        noBufferedKeys = 0;
2678        valuesDecompressed = true;
2679      }
2680    }
2681
2682    /** Seek to the next sync mark past a given position.*/
2683    public synchronized void sync(long position) throws IOException {
2684      if (position+SYNC_SIZE >= end) {
2685        seek(end);
2686        return;
2687      }
2688
2689      if (position < headerEnd) {
2690        // seek directly to first record
2691        in.seek(headerEnd);
2692        // note the sync marker "seen" in the header
2693        syncSeen = true;
2694        return;
2695      }
2696
2697      try {
2698        seek(position+4);                         // skip escape
2699        in.readFully(syncCheck);
2700        int syncLen = sync.length;
2701        for (int i = 0; in.getPos() < end; i++) {
2702          int j = 0;
2703          for (; j < syncLen; j++) {
2704            if (sync[j] != syncCheck[(i+j)%syncLen])
2705              break;
2706          }
2707          if (j == syncLen) {
2708            in.seek(in.getPos() - SYNC_SIZE);     // position before sync
2709            return;
2710          }
2711          syncCheck[i%syncLen] = in.readByte();
2712        }
2713      } catch (ChecksumException e) {             // checksum failure
2714        handleChecksumException(e);
2715      }
2716    }
2717
2718    /** Returns true iff the previous call to next passed a sync mark.*/
2719    public synchronized boolean syncSeen() { return syncSeen; }
2720
2721    /** Return the current byte position in the input file. */
2722    public synchronized long getPosition() throws IOException {
2723      return in.getPos();
2724    }
2725
2726    /** Returns the name of the file. */
2727    @Override
2728    public String toString() {
2729      return filename;
2730    }
2731
2732  }
2733
2734  /** Sorts key/value pairs in a sequence-format file.
2735   *
2736   * <p>For best performance, applications should make sure that the {@link
2737   * Writable#readFields(DataInput)} implementation of their keys is
2738   * very efficient.  In particular, it should avoid allocating memory.
2739   */
2740  public static class Sorter {
2741
2742    private RawComparator comparator;
2743
2744    private MergeSort mergeSort; //the implementation of merge sort
2745    
2746    private Path[] inFiles;                     // when merging or sorting
2747
2748    private Path outFile;
2749
2750    private int memory; // bytes
2751    private int factor; // merged per pass
2752
2753    private FileSystem fs = null;
2754
2755    private Class keyClass;
2756    private Class valClass;
2757
2758    private Configuration conf;
2759    private Metadata metadata;
2760    
2761    private Progressable progressable = null;
2762
2763    /** Sort and merge files containing the named classes. */
2764    public Sorter(FileSystem fs, Class<? extends WritableComparable> keyClass,
2765                  Class valClass, Configuration conf)  {
2766      this(fs, WritableComparator.get(keyClass, conf), keyClass, valClass, conf);
2767    }
2768
2769    /** Sort and merge using an arbitrary {@link RawComparator}. */
2770    public Sorter(FileSystem fs, RawComparator comparator, Class keyClass, 
2771                  Class valClass, Configuration conf) {
2772      this(fs, comparator, keyClass, valClass, conf, new Metadata());
2773    }
2774
2775    /** Sort and merge using an arbitrary {@link RawComparator}. */
2776    public Sorter(FileSystem fs, RawComparator comparator, Class keyClass,
2777                  Class valClass, Configuration conf, Metadata metadata) {
2778      this.fs = fs;
2779      this.comparator = comparator;
2780      this.keyClass = keyClass;
2781      this.valClass = valClass;
2782      this.memory = conf.getInt("io.sort.mb", 100) * 1024 * 1024;
2783      this.factor = conf.getInt("io.sort.factor", 100);
2784      this.conf = conf;
2785      this.metadata = metadata;
2786    }
2787
2788    /** Set the number of streams to merge at once.*/
2789    public void setFactor(int factor) { this.factor = factor; }
2790
2791    /** Get the number of streams to merge at once.*/
2792    public int getFactor() { return factor; }
2793
2794    /** Set the total amount of buffer memory, in bytes.*/
2795    public void setMemory(int memory) { this.memory = memory; }
2796
2797    /** Get the total amount of buffer memory, in bytes.*/
2798    public int getMemory() { return memory; }
2799
2800    /** Set the progressable object in order to report progress. */
2801    public void setProgressable(Progressable progressable) {
2802      this.progressable = progressable;
2803    }
2804    
2805    /** 
2806     * Perform a file sort from a set of input files into an output file.
2807     * @param inFiles the files to be sorted
2808     * @param outFile the sorted output file
2809     * @param deleteInput should the input files be deleted as they are read?
2810     */
2811    public void sort(Path[] inFiles, Path outFile,
2812                     boolean deleteInput) throws IOException {
2813      if (fs.exists(outFile)) {
2814        throw new IOException("already exists: " + outFile);
2815      }
2816
2817      this.inFiles = inFiles;
2818      this.outFile = outFile;
2819
2820      int segments = sortPass(deleteInput);
2821      if (segments > 1) {
2822        mergePass(outFile.getParent());
2823      }
2824    }
2825
2826    /** 
2827     * Perform a file sort from a set of input files and return an iterator.
2828     * @param inFiles the files to be sorted
2829     * @param tempDir the directory where temp files are created during sort
2830     * @param deleteInput should the input files be deleted as they are read?
     * @return the {@link RawKeyValueIterator} over the sorted data,
     *         or null if there was no input
2832     */
2833    public RawKeyValueIterator sortAndIterate(Path[] inFiles, Path tempDir, 
2834                                              boolean deleteInput) throws IOException {
2835      Path outFile = new Path(tempDir + Path.SEPARATOR + "all.2");
2836      if (fs.exists(outFile)) {
2837        throw new IOException("already exists: " + outFile);
2838      }
2839      this.inFiles = inFiles;
      // outFile is used as a prefix for temp files when the sort produces
      // multiple sorted segments; in the single-segment case, outFile itself
      // contains the sorted data.
2844      this.outFile = outFile;
2845
2846      int segments = sortPass(deleteInput);
2847      if (segments > 1)
2848        return merge(outFile.suffix(".0"), outFile.suffix(".0.index"), 
2849                     tempDir);
2850      else if (segments == 1)
2851        return merge(new Path[]{outFile}, true, tempDir);
2852      else return null;
2853    }
2854
2855    /**
2856     * The backwards compatible interface to sort.
2857     * @param inFile the input file to sort
2858     * @param outFile the sorted output file
2859     */
2860    public void sort(Path inFile, Path outFile) throws IOException {
2861      sort(new Path[]{inFile}, outFile, false);
2862    }
2863    
2864    private int sortPass(boolean deleteInput) throws IOException {
2865      if(LOG.isDebugEnabled()) {
2866        LOG.debug("running sort pass");
2867      }
2868      SortPass sortPass = new SortPass();         // make the SortPass
2869      sortPass.setProgressable(progressable);
2870      mergeSort = new MergeSort(sortPass.new SeqFileComparator());
2871      try {
2872        return sortPass.run(deleteInput);         // run it
2873      } finally {
2874        sortPass.close();                         // close it
2875      }
2876    }
2877
2878    private class SortPass {
2879      private int memoryLimit = memory/4;
2880      private int recordLimit = 1000000;
2881      
2882      private DataOutputBuffer rawKeys = new DataOutputBuffer();
2883      private byte[] rawBuffer;
2884
2885      private int[] keyOffsets = new int[1024];
2886      private int[] pointers = new int[keyOffsets.length];
2887      private int[] pointersCopy = new int[keyOffsets.length];
2888      private int[] keyLengths = new int[keyOffsets.length];
2889      private ValueBytes[] rawValues = new ValueBytes[keyOffsets.length];
2890      
2891      private ArrayList segmentLengths = new ArrayList();
2892      
2893      private Reader in = null;
2894      private FSDataOutputStream out = null;
2895      private FSDataOutputStream indexOut = null;
2896      private Path outName;
2897
2898      private Progressable progressable = null;
2899
2900      public int run(boolean deleteInput) throws IOException {
2901        int segments = 0;
2902        int currentFile = 0;
2903        boolean atEof = (currentFile >= inFiles.length);
2904        CompressionType compressionType;
2905        CompressionCodec codec = null;
2906        segmentLengths.clear();
2907        if (atEof) {
2908          return 0;
2909        }
2910        
2911        // Initialize
2912        in = new Reader(fs, inFiles[currentFile], conf);
2913        compressionType = in.getCompressionType();
2914        codec = in.getCompressionCodec();
2915        
2916        for (int i=0; i < rawValues.length; ++i) {
2917          rawValues[i] = null;
2918        }
2919        
2920        while (!atEof) {
2921          int count = 0;
2922          int bytesProcessed = 0;
2923          rawKeys.reset();
2924          while (!atEof && 
2925                 bytesProcessed < memoryLimit && count < recordLimit) {
2926
2927            // Read a record into buffer
2928            // Note: Attempt to re-use 'rawValue' as far as possible
2929            int keyOffset = rawKeys.getLength();       
2930            ValueBytes rawValue = 
2931              (count == keyOffsets.length || rawValues[count] == null) ? 
2932              in.createValueBytes() : 
2933              rawValues[count];
2934            int recordLength = in.nextRaw(rawKeys, rawValue);
2935            if (recordLength == -1) {
2936              in.close();
2937              if (deleteInput) {
2938                fs.delete(inFiles[currentFile], true);
2939              }
2940              currentFile += 1;
2941              atEof = currentFile >= inFiles.length;
2942              if (!atEof) {
2943                in = new Reader(fs, inFiles[currentFile], conf);
2944              } else {
2945                in = null;
2946              }
2947              continue;
2948            }
2949
2950            int keyLength = rawKeys.getLength() - keyOffset;
2951
2952            if (count == keyOffsets.length)
2953              grow();
2954
2955            keyOffsets[count] = keyOffset;                // update pointers
2956            pointers[count] = count;
2957            keyLengths[count] = keyLength;
2958            rawValues[count] = rawValue;
2959
2960            bytesProcessed += recordLength; 
2961            count++;
2962          }
2963
2964          // buffer is full -- sort & flush it
2965          if(LOG.isDebugEnabled()) {
2966            LOG.debug("flushing segment " + segments);
2967          }
2968          rawBuffer = rawKeys.getData();
2969          sort(count);
2970          // indicate we're making progress
2971          if (progressable != null) {
2972            progressable.progress();
2973          }
2974          flush(count, bytesProcessed, compressionType, codec, 
2975                segments==0 && atEof);
2976          segments++;
2977        }
2978        return segments;
2979      }
2980
2981      public void close() throws IOException {
2982        if (in != null) {
2983          in.close();
2984        }
2985        if (out != null) {
2986          out.close();
2987        }
2988        if (indexOut != null) {
2989          indexOut.close();
2990        }
2991      }
2992
2993      private void grow() {
2994        int newLength = keyOffsets.length * 3 / 2;
2995        keyOffsets = grow(keyOffsets, newLength);
2996        pointers = grow(pointers, newLength);
2997        pointersCopy = new int[newLength];
2998        keyLengths = grow(keyLengths, newLength);
2999        rawValues = grow(rawValues, newLength);
3000      }
3001
3002      private int[] grow(int[] old, int newLength) {
3003        int[] result = new int[newLength];
3004        System.arraycopy(old, 0, result, 0, old.length);
3005        return result;
3006      }
3007      
3008      private ValueBytes[] grow(ValueBytes[] old, int newLength) {
3009        ValueBytes[] result = new ValueBytes[newLength];
3010        System.arraycopy(old, 0, result, 0, old.length);
3011        for (int i=old.length; i < newLength; ++i) {
3012          result[i] = null;
3013        }
3014        return result;
3015      }
3016
3017      private void flush(int count, int bytesProcessed, 
3018                         CompressionType compressionType, 
3019                         CompressionCodec codec, 
3020                         boolean done) throws IOException {
3021        if (out == null) {
3022          outName = done ? outFile : outFile.suffix(".0");
3023          out = fs.create(outName);
3024          if (!done) {
3025            indexOut = fs.create(outName.suffix(".index"));
3026          }
3027        }
3028
3029        long segmentStart = out.getPos();
3030        Writer writer = createWriter(conf, Writer.stream(out), 
3031            Writer.keyClass(keyClass), Writer.valueClass(valClass),
3032            Writer.compression(compressionType, codec),
3033            Writer.metadata(done ? metadata : new Metadata()));
3034        
3035        if (!done) {
3036          writer.sync = null;                     // disable sync on temp files
3037        }
3038
3039        for (int i = 0; i < count; i++) {         // write in sorted order
3040          int p = pointers[i];
3041          writer.appendRaw(rawBuffer, keyOffsets[p], keyLengths[p], rawValues[p]);
3042        }
3043        writer.close();
3044        
3045        if (!done) {
3046          // Save the segment length
3047          WritableUtils.writeVLong(indexOut, segmentStart);
3048          WritableUtils.writeVLong(indexOut, (out.getPos()-segmentStart));
3049          indexOut.flush();
3050        }
3051      }
3052
3053      private void sort(int count) {
3054        System.arraycopy(pointers, 0, pointersCopy, 0, count);
3055        mergeSort.mergeSort(pointersCopy, pointers, 0, count);
3056      }
3057      class SeqFileComparator implements Comparator<IntWritable> {
3058        @Override
3059        public int compare(IntWritable I, IntWritable J) {
3060          return comparator.compare(rawBuffer, keyOffsets[I.get()], 
3061                                    keyLengths[I.get()], rawBuffer, 
3062                                    keyOffsets[J.get()], keyLengths[J.get()]);
3063        }
3064      }
3065      
      /** Set the progressable object in order to report progress. */
      public void setProgressable(Progressable progressable) {
3069        this.progressable = progressable;
3070      }
3071      
3072    } // SequenceFile.Sorter.SortPass
3073
3074    /** The interface to iterate over raw keys/values of SequenceFiles. */
3075    public static interface RawKeyValueIterator {
3076      /** Gets the current raw key
3077       * @return DataOutputBuffer
3078       * @throws IOException
3079       */
3080      DataOutputBuffer getKey() throws IOException; 
3081      /** Gets the current raw value
3082       * @return ValueBytes 
3083       * @throws IOException
3084       */
3085      ValueBytes getValue() throws IOException; 
3086      /** Sets up the current key and value (for getKey and getValue)
3087       * @return true if there exists a key/value, false otherwise 
3088       * @throws IOException
3089       */
3090      boolean next() throws IOException;
      /** Closes the iterator and any underlying streams
3092       * @throws IOException
3093       */
3094      void close() throws IOException;
3095      /** Gets the Progress object; this has a float (0.0 - 1.0) 
       * indicating the fraction of bytes processed by the iterator so far
3097       */
3098      Progress getProgress();
3099    }    
3100    
3101    /**
3102     * Merges the list of segments of type <code>SegmentDescriptor</code>
3103     * @param segments the list of SegmentDescriptors
3104     * @param tmpDir the directory to write temporary files into
3105     * @return RawKeyValueIterator
3106     * @throws IOException
3107     */
3108    public RawKeyValueIterator merge(List <SegmentDescriptor> segments, 
3109                                     Path tmpDir) 
3110      throws IOException {
3111      // pass in object to report progress, if present
3112      MergeQueue mQueue = new MergeQueue(segments, tmpDir, progressable);
3113      return mQueue.merge();
3114    }
3115
3116    /**
3117     * Merges the contents of files passed in Path[] using a max factor value
3118     * that is already set
3119     * @param inNames the array of path names
     * @param deleteInputs true if the input files should be deleted when 
     * no longer needed
     * @param tmpDir the directory to write temporary files into
     * @return RawKeyValueIterator
3124     * @throws IOException
3125     */
3126    public RawKeyValueIterator merge(Path [] inNames, boolean deleteInputs,
3127                                     Path tmpDir) 
3128      throws IOException {
3129      return merge(inNames, deleteInputs, 
3130                   (inNames.length < factor) ? inNames.length : factor,
3131                   tmpDir);
3132    }
3133
3134    /**
3135     * Merges the contents of files passed in Path[]
3136     * @param inNames the array of path names
     * @param deleteInputs true if the input files should be deleted when 
     * no longer needed
     * @param factor the factor that will be used as the maximum merge fan-in
     * @param tmpDir the directory to write temporary files into
     * @return RawKeyValueIterator
3142     * @throws IOException
3143     */
3144    public RawKeyValueIterator merge(Path [] inNames, boolean deleteInputs,
3145                                     int factor, Path tmpDir) 
3146      throws IOException {
3147      //get the segments from inNames
3148      ArrayList <SegmentDescriptor> a = new ArrayList <SegmentDescriptor>();
3149      for (int i = 0; i < inNames.length; i++) {
3150        SegmentDescriptor s = new SegmentDescriptor(0,
3151            fs.getFileStatus(inNames[i]).getLen(), inNames[i]);
3152        s.preserveInput(!deleteInputs);
3153        s.doSync();
3154        a.add(s);
3155      }
3156      this.factor = factor;
3157      MergeQueue mQueue = new MergeQueue(a, tmpDir, progressable);
3158      return mQueue.merge();
3159    }
3160
3161    /**
3162     * Merges the contents of files passed in Path[]
3163     * @param inNames the array of path names
3164     * @param tempDir the directory for creating temp files during merge
     * @param deleteInputs true if the input files should be deleted when 
     * no longer needed
     * @return RawKeyValueIterator
3168     * @throws IOException
3169     */
3170    public RawKeyValueIterator merge(Path [] inNames, Path tempDir, 
3171                                     boolean deleteInputs) 
3172      throws IOException {
      //outFile will be used as the prefix for temp files for the
      //intermediate merge outputs
3175      this.outFile = new Path(tempDir + Path.SEPARATOR + "merged");
3176      //get the segments from inNames
3177      ArrayList <SegmentDescriptor> a = new ArrayList <SegmentDescriptor>();
3178      for (int i = 0; i < inNames.length; i++) {
3179        SegmentDescriptor s = new SegmentDescriptor(0,
3180            fs.getFileStatus(inNames[i]).getLen(), inNames[i]);
3181        s.preserveInput(!deleteInputs);
3182        s.doSync();
3183        a.add(s);
3184      }
3185      factor = (inNames.length < factor) ? inNames.length : factor;
3186      // pass in object to report progress, if present
3187      MergeQueue mQueue = new MergeQueue(a, tempDir, progressable);
3188      return mQueue.merge();
3189    }
3190
3191    /**
     * Clones the attributes (like compression) of the input file and creates a 
     * corresponding Writer
3194     * @param inputFile the path of the input file whose attributes should be 
3195     * cloned
3196     * @param outputFile the path of the output file 
3197     * @param prog the Progressable to report status during the file write
3198     * @return Writer
3199     * @throws IOException
3200     */
3201    public Writer cloneFileAttributes(Path inputFile, Path outputFile, 
3202                                      Progressable prog) throws IOException {
3203      Reader reader = new Reader(conf,
3204                                 Reader.file(inputFile),
3205                                 new Reader.OnlyHeaderOption());
3206      CompressionType compress = reader.getCompressionType();
3207      CompressionCodec codec = reader.getCompressionCodec();
3208      reader.close();
3209
3210      Writer writer = createWriter(conf, 
3211                                   Writer.file(outputFile), 
3212                                   Writer.keyClass(keyClass), 
3213                                   Writer.valueClass(valClass), 
3214                                   Writer.compression(compress, codec), 
3215                                   Writer.progressable(prog));
3216      return writer;
3217    }
3218
3219    /**
3220     * Writes records from RawKeyValueIterator into a file represented by the 
3221     * passed writer
3222     * @param records the RawKeyValueIterator
3223     * @param writer the Writer created earlier 
3224     * @throws IOException
3225     */
3226    public void writeFile(RawKeyValueIterator records, Writer writer) 
3227      throws IOException {
3228      while(records.next()) {
3229        writer.appendRaw(records.getKey().getData(), 0, 
3230                         records.getKey().getLength(), records.getValue());
3231      }
3232      writer.sync();
3233    }
3234        
3235    /** Merge the provided files.
3236     * @param inFiles the array of input path names
3237     * @param outFile the final output file
3238     * @throws IOException
3239     */
3240    public void merge(Path[] inFiles, Path outFile) throws IOException {
3241      if (fs.exists(outFile)) {
3242        throw new IOException("already exists: " + outFile);
3243      }
3244      RawKeyValueIterator r = merge(inFiles, false, outFile.getParent());
3245      Writer writer = cloneFileAttributes(inFiles[0], outFile, null);
3246      
3247      writeFile(r, writer);
3248
3249      writer.close();
3250    }
3251
3252    /** sort calls this to generate the final merged output */
3253    private int mergePass(Path tmpDir) throws IOException {
3254      if(LOG.isDebugEnabled()) {
3255        LOG.debug("running merge pass");
3256      }
3257      Writer writer = cloneFileAttributes(
3258                                          outFile.suffix(".0"), outFile, null);
3259      RawKeyValueIterator r = merge(outFile.suffix(".0"), 
3260                                    outFile.suffix(".0.index"), tmpDir);
3261      writeFile(r, writer);
3262
3263      writer.close();
3264      return 0;
3265    }
3266
3267    /** Used by mergePass to merge the output of the sort
3268     * @param inName the name of the input file containing sorted segments
3269     * @param indexIn the offsets of the sorted segments
3270     * @param tmpDir the relative directory to store intermediate results in
3271     * @return RawKeyValueIterator
3272     * @throws IOException
3273     */
3274    private RawKeyValueIterator merge(Path inName, Path indexIn, Path tmpDir) 
3275      throws IOException {
3276      //get the segments from indexIn
3277      //we create a SegmentContainer so that we can track segments belonging to
3278      //inName and delete inName as soon as we see that we have looked at all
3279      //the contained segments during the merge process & hence don't need 
3280      //them anymore
3281      SegmentContainer container = new SegmentContainer(inName, indexIn);
3282      MergeQueue mQueue = new MergeQueue(container.getSegmentList(), tmpDir, progressable);
3283      return mQueue.merge();
3284    }
3285    
3286    /** This class implements the core of the merge logic */
3287    private class MergeQueue extends PriorityQueue 
3288      implements RawKeyValueIterator {
3289      private boolean compress;
3290      private boolean blockCompress;
3291      private DataOutputBuffer rawKey = new DataOutputBuffer();
3292      private ValueBytes rawValue;
3293      private long totalBytesProcessed;
3294      private float progPerByte;
3295      private Progress mergeProgress = new Progress();
3296      private Path tmpDir;
3297      private Progressable progress = null; //handle to the progress reporting object
3298      private SegmentDescriptor minSegment;
3299      
      //a TreeMap used to store the segments sorted by size (segment offset and
      //segment path name are used to break ties between segments of the same size)
3302      private Map<SegmentDescriptor, Void> sortedSegmentSizes =
3303        new TreeMap<SegmentDescriptor, Void>();
3304            
3305      @SuppressWarnings("unchecked")
3306      public void put(SegmentDescriptor stream) throws IOException {
3307        if (size() == 0) {
3308          compress = stream.in.isCompressed();
3309          blockCompress = stream.in.isBlockCompressed();
3310        } else if (compress != stream.in.isCompressed() || 
3311                   blockCompress != stream.in.isBlockCompressed()) {
3312          throw new IOException("All merged files must be compressed or not.");
3313        } 
3314        super.put(stream);
3315      }
3316      
3317      /**
3318       * A queue of file segments to merge
3319       * @param segments the file segments to merge
3320       * @param tmpDir a relative local directory to save intermediate files in
3321       * @param progress the reference to the Progressable object
3322       */
3323      public MergeQueue(List <SegmentDescriptor> segments,
3324          Path tmpDir, Progressable progress) {
3325        int size = segments.size();
3326        for (int i = 0; i < size; i++) {
3327          sortedSegmentSizes.put(segments.get(i), null);
3328        }
3329        this.tmpDir = tmpDir;
3330        this.progress = progress;
3331      }
3332      @Override
3333      protected boolean lessThan(Object a, Object b) {
3334        // indicate we're making progress
3335        if (progress != null) {
3336          progress.progress();
3337        }
3338        SegmentDescriptor msa = (SegmentDescriptor)a;
3339        SegmentDescriptor msb = (SegmentDescriptor)b;
3340        return comparator.compare(msa.getKey().getData(), 0, 
3341                                  msa.getKey().getLength(), msb.getKey().getData(), 0, 
3342                                  msb.getKey().getLength()) < 0;
3343      }
3344      @Override
3345      public void close() throws IOException {
3346        SegmentDescriptor ms;                           // close inputs
3347        while ((ms = (SegmentDescriptor)pop()) != null) {
3348          ms.cleanup();
3349        }
3350        minSegment = null;
3351      }
3352      @Override
3353      public DataOutputBuffer getKey() throws IOException {
3354        return rawKey;
3355      }
3356      @Override
3357      public ValueBytes getValue() throws IOException {
3358        return rawValue;
3359      }
3360      @Override
3361      public boolean next() throws IOException {
3362        if (size() == 0)
3363          return false;
3364        if (minSegment != null) {
3365          //minSegment is non-null for all invocations of next except the first
3366          //one. For the first invocation, the priority queue is ready for use
3367          //but for the subsequent invocations, first adjust the queue 
3368          adjustPriorityQueue(minSegment);
3369          if (size() == 0) {
3370            minSegment = null;
3371            return false;
3372          }
3373        }
3374        minSegment = (SegmentDescriptor)top();
3375        long startPos = minSegment.in.getPosition(); // Current position in stream
3376        //save the raw key reference
3377        rawKey = minSegment.getKey();
3378        //load the raw value. Re-use the existing rawValue buffer
3379        if (rawValue == null) {
3380          rawValue = minSegment.in.createValueBytes();
3381        }
3382        minSegment.nextRawValue(rawValue);
3383        long endPos = minSegment.in.getPosition(); // End position after reading value
3384        updateProgress(endPos - startPos);
3385        return true;
3386      }
3387      
3388      @Override
3389      public Progress getProgress() {
3390        return mergeProgress; 
3391      }
3392
3393      private void adjustPriorityQueue(SegmentDescriptor ms) throws IOException{
3394        long startPos = ms.in.getPosition(); // Current position in stream
3395        boolean hasNext = ms.nextRawKey();
3396        long endPos = ms.in.getPosition(); // End position after reading key
3397        updateProgress(endPos - startPos);
3398        if (hasNext) {
3399          adjustTop();
3400        } else {
3401          pop();
3402          ms.cleanup();
3403        }
3404      }
3405
3406      private void updateProgress(long bytesProcessed) {
3407        totalBytesProcessed += bytesProcessed;
3408        if (progPerByte > 0) {
3409          mergeProgress.set(totalBytesProcessed * progPerByte);
3410        }
3411      }
3412      
      /** This is the single-level merge that is called multiple times 
       * depending on the merge factor and the number of segments
3415       * @return RawKeyValueIterator
3416       * @throws IOException
3417       */
3418      public RawKeyValueIterator merge() throws IOException {
        //create the merge streams from the sorted map built in the
        //constructor and dump the final output to a file
3421        int numSegments = sortedSegmentSizes.size();
3422        int origFactor = factor;
3423        int passNo = 1;
3424        LocalDirAllocator lDirAlloc = new LocalDirAllocator("io.seqfile.local.dir");
3425        do {
3426          //get the factor for this pass of merge
3427          factor = getPassFactor(passNo, numSegments);
3428          List<SegmentDescriptor> segmentsToMerge =
3429            new ArrayList<SegmentDescriptor>();
3430          int segmentsConsidered = 0;
3431          int numSegmentsToConsider = factor;
3432          while (true) {
3433            //extract the smallest 'factor' number of segment pointers from the 
3434            //TreeMap. Call cleanup on the empty segments (no key/value data)
3435            SegmentDescriptor[] mStream = 
3436              getSegmentDescriptors(numSegmentsToConsider);
3437            for (int i = 0; i < mStream.length; i++) {
3438              if (mStream[i].nextRawKey()) {
3439                segmentsToMerge.add(mStream[i]);
3440                segmentsConsidered++;
                // Account for the bytes read by the nextRawKey() call
3442                updateProgress(mStream[i].in.getPosition());
3443              }
3444              else {
3445                mStream[i].cleanup();
3446                numSegments--; //we ignore this segment for the merge
3447              }
3448            }
3449            //if we have the desired number of segments
3450            //or looked at all available segments, we break
3451            if (segmentsConsidered == factor || 
3452                sortedSegmentSizes.size() == 0) {
3453              break;
3454            }
3455              
3456            numSegmentsToConsider = factor - segmentsConsidered;
3457          }
3458          //feed the streams to the priority queue
3459          initialize(segmentsToMerge.size()); clear();
3460          for (int i = 0; i < segmentsToMerge.size(); i++) {
3461            put(segmentsToMerge.get(i));
3462          }
          //if the number of remaining segments is within the merge factor,
          //return the iterator; otherwise do another single-level merge
3465          if (numSegments <= factor) {
3466            //calculate the length of the remaining segments. Required for 
3467            //calculating the merge progress
3468            long totalBytes = 0;
3469            for (int i = 0; i < segmentsToMerge.size(); i++) {
3470              totalBytes += segmentsToMerge.get(i).segmentLength;
3471            }
3472            if (totalBytes != 0) //being paranoid
3473              progPerByte = 1.0f / (float)totalBytes;
3474            //reset factor to what it originally was
3475            factor = origFactor;
3476            return this;
3477          } else {
            //we want to spread the creation of temp files across multiple
            //disks, if available, within the space constraints
3480            long approxOutputSize = 0; 
3481            for (SegmentDescriptor s : segmentsToMerge) {
3482              approxOutputSize += s.segmentLength + 
3483                                  ChecksumFileSystem.getApproxChkSumLength(
3484                                  s.segmentLength);
3485            }
3486            Path tmpFilename = 
3487              new Path(tmpDir, "intermediate").suffix("." + passNo);
3488
3489            Path outputFile =  lDirAlloc.getLocalPathForWrite(
3490                                                tmpFilename.toString(),
3491                                                approxOutputSize, conf);
3492            if(LOG.isDebugEnabled()) { 
3493              LOG.debug("writing intermediate results to " + outputFile);
3494            }
3495            Writer writer = cloneFileAttributes(
3496                                                fs.makeQualified(segmentsToMerge.get(0).segmentPathName), 
3497                                                fs.makeQualified(outputFile), null);
3498            writer.sync = null; //disable sync for temp files
3499            writeFile(this, writer);
3500            writer.close();
3501            
3502            //we finished one single level merge; now clean up the priority 
3503            //queue
3504            this.close();
3505            
3506            SegmentDescriptor tempSegment = 
3507              new SegmentDescriptor(0,
3508                  fs.getFileStatus(outputFile).getLen(), outputFile);
3509            //put the segment back in the TreeMap
3510            sortedSegmentSizes.put(tempSegment, null);
3511            numSegments = sortedSegmentSizes.size();
3512            passNo++;
3513          }
          //only the first pass may use a reduced merge factor, so reset the 
          //factor to what it originally was
3516          factor = origFactor;
3517        } while(true);
3518      }
3519  
      /** Computes the merge fan-in for a given pass (HADOOP-591): on the
       * first pass, merge only enough segments so that every subsequent pass
       * can merge exactly <code>factor</code> segments.
       * @param passNo the current pass number (1-based)
       * @param numSegments the number of segments still to be merged
       * @return the number of segments to merge in this pass
       */
3521      public int getPassFactor(int passNo, int numSegments) {
3522        if (passNo > 1 || numSegments <= factor || factor == 1) 
3523          return factor;
3524        int mod = (numSegments - 1) % (factor - 1);
3525        if (mod == 0)
3526          return factor;
3527        return mod + 1;
3528      }
3529      
3530      /** Return (& remove) the requested number of segment descriptors from the
3531       * sorted map.
3532       */
      public SegmentDescriptor[] getSegmentDescriptors(int numDescriptors) {
        if (numDescriptors > sortedSegmentSizes.size())
          numDescriptors = sortedSegmentSizes.size();
        SegmentDescriptor[] descriptors = 
          new SegmentDescriptor[numDescriptors];
        Iterator<SegmentDescriptor> iter = sortedSegmentSizes.keySet().iterator();
        int i = 0;
        while (i < numDescriptors) {
          descriptors[i++] = iter.next();
          iter.remove();
        }
        return descriptors;
3545      }
3546    } // SequenceFile.Sorter.MergeQueue
3547
    /** This class defines a merge segment. It can be subclassed to provide a 
     * customized cleanup method implementation; in this implementation, 
     * cleanup closes the file handle and deletes the file. 
     */
3552    public class SegmentDescriptor implements Comparable {
3553      
3554      long segmentOffset; //the start of the segment in the file
3555      long segmentLength; //the length of the segment
3556      Path segmentPathName; //the path name of the file containing the segment
      boolean ignoreSync = true; //true (the default) for temp files
3558      private Reader in = null; 
3559      private DataOutputBuffer rawKey = null; //this will hold the current key
3560      private boolean preserveInput = false; //delete input segment files?
3561      
3562      /** Constructs a segment
3563       * @param segmentOffset the offset of the segment in the file
3564       * @param segmentLength the length of the segment
3565       * @param segmentPathName the path name of the file containing the segment
3566       */
3567      public SegmentDescriptor (long segmentOffset, long segmentLength, 
3568                                Path segmentPathName) {
3569        this.segmentOffset = segmentOffset;
3570        this.segmentLength = segmentLength;
3571        this.segmentPathName = segmentPathName;
3572      }
3573      
      /** Enable sync-marker checks while reading this segment; checks are 
       * skipped by default, as is appropriate for temporary merge files. */
      public void doSync() {ignoreSync = false;}
3576      
      /** Sets whether the input segment file should be preserved (true) or 
       * deleted when no longer needed (false). */
3578      public void preserveInput(boolean preserve) {
3579        preserveInput = preserve;
3580      }
3581
3582      public boolean shouldPreserveInput() {
3583        return preserveInput;
3584      }
3585      
3586      @Override
3587      public int compareTo(Object o) {
3588        SegmentDescriptor that = (SegmentDescriptor)o;
3589        if (this.segmentLength != that.segmentLength) {
3590          return (this.segmentLength < that.segmentLength ? -1 : 1);
3591        }
3592        if (this.segmentOffset != that.segmentOffset) {
3593          return (this.segmentOffset < that.segmentOffset ? -1 : 1);
3594        }
3595        return (this.segmentPathName.toString()).
3596          compareTo(that.segmentPathName.toString());
3597      }
3598
3599      @Override
3600      public boolean equals(Object o) {
3601        if (!(o instanceof SegmentDescriptor)) {
3602          return false;
3603        }
3604        SegmentDescriptor that = (SegmentDescriptor)o;
3605        if (this.segmentLength == that.segmentLength &&
3606            this.segmentOffset == that.segmentOffset &&
3607            this.segmentPathName.toString().equals(
3608              that.segmentPathName.toString())) {
3609          return true;
3610        }
3611        return false;
3612      }
3613
3614      @Override
3615      public int hashCode() {
3616        return 37 * 17 + (int) (segmentOffset^(segmentOffset>>>32));
3617      }
3618
3619      /** Fills up the rawKey object with the key returned by the Reader
3620       * @return true if there is a key returned; false, otherwise
3621       * @throws IOException
3622       */
3623      public boolean nextRawKey() throws IOException {
3624        if (in == null) {
3625          int bufferSize = getBufferSize(conf); 
3626          Reader reader = new Reader(conf,
3627                                     Reader.file(segmentPathName), 
3628                                     Reader.bufferSize(bufferSize),
3629                                     Reader.start(segmentOffset), 
3630                                     Reader.length(segmentLength));
3631        
3632          //sometimes we ignore syncs especially for temp merge files
3633          if (ignoreSync) reader.ignoreSync();
3634
3635          if (reader.getKeyClass() != keyClass)
3636            throw new IOException("wrong key class: " + reader.getKeyClass() +
3637                                  " is not " + keyClass);
3638          if (reader.getValueClass() != valClass)
3639            throw new IOException("wrong value class: "+reader.getValueClass()+
3640                                  " is not " + valClass);
3641          this.in = reader;
3642          rawKey = new DataOutputBuffer();
3643        }
3644        rawKey.reset();
        int keyLength = in.nextRawKey(rawKey);
        return (keyLength >= 0);
3648      }
3649
3650      /** Fills up the passed rawValue with the value corresponding to the key
3651       * read earlier
       * @param rawValue the buffer to read the value into
3653       * @return the length of the value
3654       * @throws IOException
3655       */
      public int nextRawValue(ValueBytes rawValue) throws IOException {
        return in.nextRawValue(rawValue);
      }
3660      
3661      /** Returns the stored rawKey */
3662      public DataOutputBuffer getKey() {
3663        return rawKey;
3664      }
3665      
3666      /** closes the underlying reader */
3667      private void close() throws IOException {
3668        this.in.close();
3669        this.in = null;
3670      }
3671
3672      /** The default cleanup. Subclasses can override this with a custom 
3673       * cleanup 
3674       */
3675      public void cleanup() throws IOException {
3676        close();
3677        if (!preserveInput) {
3678          fs.delete(segmentPathName, true);
3679        }
3680      }
3681    } // SequenceFile.Sorter.SegmentDescriptor
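
    /*
     * Illustrative sketch of the customized-cleanup hook described above
     * (a hypothetical subclass, not part of the original source; like
     * LinkedSegmentsDescriptor below, it must live inside Sorter because
     * SegmentDescriptor is a non-static inner class):
     *
     *   class LoggingSegmentDescriptor extends SegmentDescriptor {
     *     LoggingSegmentDescriptor(long off, long len, Path path) {
     *       super(off, len, path);
     *     }
     *     @Override
     *     public void cleanup() throws IOException {
     *       LOG.debug("cleaning up segment of " + segmentPathName);
     *       super.cleanup();   // close the reader; delete unless preserved
     *     }
     *   }
     */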
3682    
    /** A descriptor for one of multiple segments that share a single
     *  backing file; the shared file is cleaned up through the parent
     *  {@link SegmentContainer} once all contained segments have been read.
     */
3686    private class LinkedSegmentsDescriptor extends SegmentDescriptor {
3687
3688      SegmentContainer parentContainer = null;
3689
3690      /** Constructs a segment
3691       * @param segmentOffset the offset of the segment in the file
3692       * @param segmentLength the length of the segment
3693       * @param segmentPathName the path name of the file containing the segment
3694       * @param parent the parent SegmentContainer that holds the segment
3695       */
3696      public LinkedSegmentsDescriptor (long segmentOffset, long segmentLength, 
3697                                       Path segmentPathName, SegmentContainer parent) {
3698        super(segmentOffset, segmentLength, segmentPathName);
3699        this.parentContainer = parent;
3700      }
      /** Closes this segment's reader and defers deletion to the parent
       * container, which removes the shared file once all contained
       * segments have been cleaned up.
       */
3704      @Override
3705      public void cleanup() throws IOException {
3706        super.close();
3707        if (super.shouldPreserveInput()) return;
3708        parentContainer.cleanup();
3709      }
3710      
3711      @Override
3712      public boolean equals(Object o) {
3713        if (!(o instanceof LinkedSegmentsDescriptor)) {
3714          return false;
3715        }
3716        return super.equals(o);
3717      }
3718    } //SequenceFile.Sorter.LinkedSegmentsDescriptor
3719
3720    /** The class that defines a container for segments to be merged. Primarily
3721     * required to delete temp files as soon as all the contained segments
3722     * have been looked at */
3723    private class SegmentContainer {
3724      private int numSegmentsCleanedUp = 0; //track the no. of segment cleanups
3725      private int numSegmentsContained; //# of segments contained
3726      private Path inName; //input file from where segments are created
3727      
3728      //the list of segments read from the file
3729      private ArrayList <SegmentDescriptor> segments = 
3730        new ArrayList <SegmentDescriptor>();
3731      /** This constructor is there primarily to serve the sort routine that 
3732       * generates a single output file with an associated index file */
3733      public SegmentContainer(Path inName, Path indexIn) throws IOException {
3734        //get the segments from indexIn
3735        FSDataInputStream fsIndexIn = fs.open(indexIn);
3736        long end = fs.getFileStatus(indexIn).getLen();
3737        while (fsIndexIn.getPos() < end) {
3738          long segmentOffset = WritableUtils.readVLong(fsIndexIn);
3739          long segmentLength = WritableUtils.readVLong(fsIndexIn);
3740          Path segmentName = inName;
3741          segments.add(new LinkedSegmentsDescriptor(segmentOffset, 
3742                                                    segmentLength, segmentName, this));
3743        }
3744        fsIndexIn.close();
3745        fs.delete(indexIn, true);
3746        numSegmentsContained = segments.size();
3747        this.inName = inName;
3748      }
3749
3750      public List <SegmentDescriptor> getSegmentList() {
3751        return segments;
3752      }
3753      public void cleanup() throws IOException {
3754        numSegmentsCleanedUp++;
3755        if (numSegmentsCleanedUp == numSegmentsContained) {
3756          fs.delete(inName, true);
3757        }
3758      }
3759    } //SequenceFile.Sorter.SegmentContainer
3760
3761  } // SequenceFile.Sorter
3762
3763} // SequenceFile