001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018
019package org.apache.hadoop.io;
020
021import java.io.DataInput;
022import java.io.IOException;
023import java.util.concurrent.ConcurrentHashMap;
024
025import org.apache.hadoop.classification.InterfaceAudience;
026import org.apache.hadoop.classification.InterfaceStability;
027import org.apache.hadoop.conf.Configurable;
028import org.apache.hadoop.conf.Configuration;
029import org.apache.hadoop.util.ReflectionUtils;
030
/** A Comparator for {@link WritableComparable}s.
 *
 * <p>This base implementation uses the natural ordering.  To define alternate
 * orderings, override {@link #compare(WritableComparable,WritableComparable)}.
 *
 * <p>One may optimize compare-intensive operations by overriding
 * {@link #compare(byte[],int,int,byte[],int,int)}.  Static utility methods are
 * provided to assist in optimized implementations of this method.
 */
040@InterfaceAudience.Public
041@InterfaceStability.Stable
042public class WritableComparator implements RawComparator, Configurable {
043
044  private static final ConcurrentHashMap<Class, WritableComparator> comparators 
045          = new ConcurrentHashMap<Class, WritableComparator>(); // registry
046
047  private Configuration conf;
048
049  /** For backwards compatibility. **/
050  public static WritableComparator get(Class<? extends WritableComparable> c) {
051    return get(c, null);
052  }
053
054  /** Get a comparator for a {@link WritableComparable} implementation. */
055  public static WritableComparator get(
056      Class<? extends WritableComparable> c, Configuration conf) {
057    WritableComparator comparator = comparators.get(c);
058    if (comparator == null) {
059      // force the static initializers to run
060      forceInit(c);
061      // look to see if it is defined now
062      comparator = comparators.get(c);
063      // if not, use the generic one
064      if (comparator == null) {
065        comparator = new WritableComparator(c, conf, true);
066      }
067    }
068    // Newly passed Configuration objects should be used.
069    ReflectionUtils.setConf(comparator, conf);
070    return comparator;
071  }
072
073  @Override
074  public void setConf(Configuration conf) {
075    this.conf = conf;
076  }
077
078  @Override
079  public Configuration getConf() {
080    return conf;
081  }
082
083  /**
084   * Force initialization of the static members.
085   * As of Java 5, referencing a class doesn't force it to initialize. Since
086   * this class requires that the classes be initialized to declare their
087   * comparators, we force that initialization to happen.
088   * @param cls the class to initialize
089   */
090  private static void forceInit(Class<?> cls) {
091    try {
092      Class.forName(cls.getName(), true, cls.getClassLoader());
093    } catch (ClassNotFoundException e) {
094      throw new IllegalArgumentException("Can't initialize class " + cls, e);
095    }
096  } 
097
098  /** Register an optimized comparator for a {@link WritableComparable}
099   * implementation. Comparators registered with this method must be
100   * thread-safe. */
101  public static void define(Class c, WritableComparator comparator) {
102    comparators.put(c, comparator);
103  }
104
105  private final Class<? extends WritableComparable> keyClass;
106  private final WritableComparable key1;
107  private final WritableComparable key2;
108  private final DataInputBuffer buffer;
109
/** Construct a comparator with no key class and no scratch key instances. */
protected WritableComparator() {
  this(null);
}

/**
 * Construct for a {@link WritableComparable} implementation, without creating
 * scratch key instances.
 *
 * @param keyClass the key class this comparator handles
 */
protected WritableComparator(Class<? extends WritableComparable> keyClass) {
  this(keyClass, null, false);
}

/**
 * Construct for a {@link WritableComparable} implementation with a fresh
 * default {@link Configuration}.
 *
 * @param keyClass the key class this comparator handles
 * @param createInstances whether to create the scratch keys and buffer used
 *     by the default {@link #compare(byte[],int,int,byte[],int,int)}
 */
protected WritableComparator(Class<? extends WritableComparable> keyClass,
    boolean createInstances) {
  this(keyClass, null, createInstances);
}

/**
 * Construct for a {@link WritableComparable} implementation.
 *
 * @param keyClass the key class this comparator handles
 * @param conf configuration to hold; when {@code null} a new
 *     {@link Configuration} is created
 * @param createInstances whether to create the scratch keys and buffer used
 *     by the default {@link #compare(byte[],int,int,byte[],int,int)}
 */
protected WritableComparator(Class<? extends WritableComparable> keyClass,
                             Configuration conf,
                             boolean createInstances) {
  this.keyClass = keyClass;
  this.conf = (conf != null) ? conf : new Configuration();
  if (createInstances) {
    // newKey() reads keyClass and conf, so both must be assigned first.
    key1 = newKey();
    key2 = newKey();
    buffer = new DataInputBuffer();
  } else {
    key1 = key2 = null;
    buffer = null;
  }
}
138
139  /** Returns the WritableComparable implementation class. */
140  public Class<? extends WritableComparable> getKeyClass() { return keyClass; }
141
142  /** Construct a new {@link WritableComparable} instance. */
143  public WritableComparable newKey() {
144    return ReflectionUtils.newInstance(keyClass, conf);
145  }
146
147  /** Optimization hook.  Override this to make SequenceFile.Sorter's scream.
148   *
149   * <p>The default implementation reads the data into two {@link
150   * WritableComparable}s (using {@link
151   * Writable#readFields(DataInput)}, then calls {@link
152   * #compare(WritableComparable,WritableComparable)}.
153   */
154  @Override
155  public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
156    try {
157      buffer.reset(b1, s1, l1);                   // parse key1
158      key1.readFields(buffer);
159      
160      buffer.reset(b2, s2, l2);                   // parse key2
161      key2.readFields(buffer);
162      
163      buffer.reset(null, 0, 0);                   // clean up reference
164    } catch (IOException e) {
165      throw new RuntimeException(e);
166    }
167    
168    return compare(key1, key2);                   // compare them
169  }
170
171  /** Compare two WritableComparables.
172   *
173   * <p> The default implementation uses the natural ordering, calling {@link
174   * Comparable#compareTo(Object)}. */
175  @SuppressWarnings("unchecked")
176  public int compare(WritableComparable a, WritableComparable b) {
177    return a.compareTo(b);
178  }
179
180  @Override
181  public int compare(Object a, Object b) {
182    return compare((WritableComparable)a, (WritableComparable)b);
183  }
184
185  /** Lexicographic order of binary data. */
186  public static int compareBytes(byte[] b1, int s1, int l1,
187                                 byte[] b2, int s2, int l2) {
188    return FastByteComparisons.compareTo(b1, s1, l1, b2, s2, l2);
189  }
190
191  /** Compute hash for binary data. */
192  public static int hashBytes(byte[] bytes, int offset, int length) {
193    int hash = 1;
194    for (int i = offset; i < offset + length; i++)
195      hash = (31 * hash) + (int)bytes[i];
196    return hash;
197  }
198  
199  /** Compute hash for binary data. */
200  public static int hashBytes(byte[] bytes, int length) {
201    return hashBytes(bytes, 0, length);
202  }
203
204  /** Parse an unsigned short from a byte array. */
205  public static int readUnsignedShort(byte[] bytes, int start) {
206    return (((bytes[start]   & 0xff) <<  8) +
207            ((bytes[start+1] & 0xff)));
208  }
209
210  /** Parse an integer from a byte array. */
211  public static int readInt(byte[] bytes, int start) {
212    return (((bytes[start  ] & 0xff) << 24) +
213            ((bytes[start+1] & 0xff) << 16) +
214            ((bytes[start+2] & 0xff) <<  8) +
215            ((bytes[start+3] & 0xff)));
216
217  }
218
219  /** Parse a float from a byte array. */
220  public static float readFloat(byte[] bytes, int start) {
221    return Float.intBitsToFloat(readInt(bytes, start));
222  }
223
224  /** Parse a long from a byte array. */
225  public static long readLong(byte[] bytes, int start) {
226    return ((long)(readInt(bytes, start)) << 32) +
227      (readInt(bytes, start+4) & 0xFFFFFFFFL);
228  }
229
230  /** Parse a double from a byte array. */
231  public static double readDouble(byte[] bytes, int start) {
232    return Double.longBitsToDouble(readLong(bytes, start));
233  }
234
235  /**
236   * Reads a zero-compressed encoded long from a byte array and returns it.
237   * @param bytes byte array with decode long
238   * @param start starting index
239   * @throws java.io.IOException 
240   * @return deserialized long
241   */
242  public static long readVLong(byte[] bytes, int start) throws IOException {
243    int len = bytes[start];
244    if (len >= -112) {
245      return len;
246    }
247    boolean isNegative = (len < -120);
248    len = isNegative ? -(len + 120) : -(len + 112);
249    if (start+1+len>bytes.length)
250      throw new IOException(
251                            "Not enough number of bytes for a zero-compressed integer");
252    long i = 0;
253    for (int idx = 0; idx < len; idx++) {
254      i = i << 8;
255      i = i | (bytes[start+1+idx] & 0xFF);
256    }
257    return (isNegative ? (i ^ -1L) : i);
258  }
259  
260  /**
261   * Reads a zero-compressed encoded integer from a byte array and returns it.
262   * @param bytes byte array with the encoded integer
263   * @param start start index
264   * @throws java.io.IOException 
265   * @return deserialized integer
266   */
267  public static int readVInt(byte[] bytes, int start) throws IOException {
268    return (int) readVLong(bytes, start);
269  }
270}