001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018 019package org.apache.hadoop.mapreduce.lib.output; 020 021import java.io.IOException; 022import java.util.Arrays; 023 024import org.apache.hadoop.fs.FileSystem; 025import org.apache.hadoop.fs.Path; 026import org.apache.hadoop.fs.FileUtil; 027import org.apache.hadoop.fs.PathFilter; 028 029import org.apache.hadoop.io.MapFile; 030import org.apache.hadoop.io.WritableComparable; 031import org.apache.hadoop.io.Writable; 032import org.apache.hadoop.io.SequenceFile.CompressionType; 033import org.apache.hadoop.io.compress.CompressionCodec; 034import org.apache.hadoop.io.compress.DefaultCodec; 035import org.apache.hadoop.mapreduce.Partitioner; 036import org.apache.hadoop.mapreduce.RecordWriter; 037import org.apache.hadoop.mapreduce.TaskAttemptContext; 038import org.apache.hadoop.util.ReflectionUtils; 039import org.apache.hadoop.classification.InterfaceAudience; 040import org.apache.hadoop.classification.InterfaceStability; 041import org.apache.hadoop.conf.Configuration; 042 043/** 044 * An {@link org.apache.hadoop.mapreduce.OutputFormat} that writes 045 * {@link MapFile}s. 046 */ 047@InterfaceAudience.Public 048@InterfaceStability.Stable 049public class MapFileOutputFormat 050 extends FileOutputFormat<WritableComparable<?>, Writable> { 051 052 public RecordWriter<WritableComparable<?>, Writable> getRecordWriter( 053 TaskAttemptContext context) throws IOException { 054 Configuration conf = context.getConfiguration(); 055 CompressionCodec codec = null; 056 CompressionType compressionType = CompressionType.NONE; 057 if (getCompressOutput(context)) { 058 // find the kind of compression to do 059 compressionType = SequenceFileOutputFormat.getOutputCompressionType(context); 060 061 // find the right codec 062 Class<?> codecClass = getOutputCompressorClass(context, 063 DefaultCodec.class); 064 codec = (CompressionCodec) ReflectionUtils.newInstance(codecClass, conf); 065 } 066 067 Path file = getDefaultWorkFile(context, ""); 068 FileSystem fs = file.getFileSystem(conf); 069 // ignore the progress parameter, since MapFile is local 070 final MapFile.Writer out = 071 new MapFile.Writer(conf, fs, file.toString(), 072 context.getOutputKeyClass().asSubclass(WritableComparable.class), 073 context.getOutputValueClass().asSubclass(Writable.class), 074 compressionType, codec, context); 075 076 return new RecordWriter<WritableComparable<?>, Writable>() { 077 public void write(WritableComparable<?> key, Writable value) 078 throws IOException { 079 out.append(key, value); 080 } 081 082 public void close(TaskAttemptContext context) throws IOException { 083 out.close(); 084 } 085 }; 086 } 087 088 /** Open the output generated by this format. */ 089 public static MapFile.Reader[] getReaders(Path dir, 090 Configuration conf) throws IOException { 091 FileSystem fs = dir.getFileSystem(conf); 092 PathFilter filter = new PathFilter() { 093 @Override 094 public boolean accept(Path path) { 095 String name = path.getName(); 096 if (name.startsWith("_") || name.startsWith(".")) 097 return false; 098 return true; 099 } 100 }; 101 Path[] names = FileUtil.stat2Paths(fs.listStatus(dir, filter)); 102 103 // sort names, so that hash partitioning works 104 Arrays.sort(names); 105 106 MapFile.Reader[] parts = new MapFile.Reader[names.length]; 107 for (int i = 0; i < names.length; i++) { 108 parts[i] = new MapFile.Reader(fs, names[i].toString(), conf); 109 } 110 return parts; 111 } 112 113 /** Get an entry from output generated by this class. */ 114 public static <K extends WritableComparable<?>, V extends Writable> 115 Writable getEntry(MapFile.Reader[] readers, 116 Partitioner<K, V> partitioner, K key, V value) throws IOException { 117 int part = partitioner.getPartition(key, value, readers.length); 118 return readers[part].get(key, value); 119 } 120} 121