/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.mapreduce.lib.output;

import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.*;
import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl;
import org.apache.hadoop.util.ReflectionUtils;

import java.io.IOException;
import java.util.*;
/**
 * The MultipleOutputs class simplifies writing output data
 * to multiple outputs.
 *
 * <p>
 * Case one: writing to additional outputs other than the job default output.
 *
 * Each additional output, or named output, may be configured with its own
 * <code>OutputFormat</code>, with its own key class and with its own value
 * class.
 * </p>
 *
 * <p>
 * Case two: writing data to different files provided by the user.
 * </p>
 *
 * <p>
 * MultipleOutputs supports counters; by default they are disabled. The
 * counters group is the {@link MultipleOutputs} class name. The names of the
 * counters are the same as the output names. These count the number of
 * records written to each output name.
 * </p>
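 *
 * <p>
 * For example, to enable the counters and read one back after the job
 * completes (a sketch; "text" is the named output defined in the usage
 * pattern below):
 * </p>
 * <pre>
 * MultipleOutputs.setCountersEnabled(job, true);
 * ...
 * long written = job.getCounters()
 *   .findCounter(MultipleOutputs.class.getName(), "text").getValue();
 * </pre>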
 *
 * Usage pattern for job submission:
 * <pre>
 *
 * Job job = Job.getInstance();
 *
 * FileInputFormat.setInputPath(job, inDir);
 * FileOutputFormat.setOutputPath(job, outDir);
 *
 * job.setMapperClass(MOMap.class);
 * job.setReducerClass(MOReduce.class);
 * ...
 *
 * // Defines additional single text based output 'text' for the job
 * MultipleOutputs.addNamedOutput(job, "text", TextOutputFormat.class,
 *   LongWritable.class, Text.class);
 *
 * // Defines additional sequence-file based output 'seq' for the job
 * MultipleOutputs.addNamedOutput(job, "seq",
 *   SequenceFileOutputFormat.class,
 *   LongWritable.class, Text.class);
 * ...
 *
 * job.waitForCompletion(true);
 * ...
 * </pre>
 * <p>
 * Usage in Reducer:
 * <pre>
 * private &lt;K, V&gt; String generateFileName(K k, V v) {
 *   return k.toString() + "_" + v.toString();
 * }
 *
 * public class MOReduce extends
 *     Reducer&lt;WritableComparable, Writable, WritableComparable, Writable&gt; {
 *   private MultipleOutputs mos;
 *
 *   public void setup(Context context) {
 *     ...
 *     mos = new MultipleOutputs(context);
 *   }
 *
 *   public void reduce(WritableComparable key, Iterable&lt;Writable&gt; values,
 *       Context context)
 *       throws IOException, InterruptedException {
 *     ...
 *     mos.write("text", key, new Text("Hello"));
 *     mos.write("seq", new LongWritable(1), new Text("Bye"), "seq_a");
 *     mos.write("seq", new LongWritable(2), new Text("Chau"), "seq_b");
 *     mos.write(key, new Text("value"), generateFileName(key, new Text("value")));
 *     ...
 *   }
 *
 *   public void cleanup(Context context) throws IOException, InterruptedException {
 *     mos.close();
 *     ...
 *   }
 * }
 * </pre>
 *
 * <p>
 * When used in conjunction with org.apache.hadoop.mapreduce.lib.output.LazyOutputFormat,
 * MultipleOutputs can mimic the behaviour of MultipleTextOutputFormat and MultipleSequenceFileOutputFormat
 * from the old Hadoop API, i.e., output can be written from the Reducer to more than one location.
 * </p>
 *
 * <p>
 * Use <code>MultipleOutputs.write(KEYOUT key, VALUEOUT value, String baseOutputPath)</code> to write key and
 * value to a path specified by <code>baseOutputPath</code>, with no need to specify a named output.
 * <b>Warning</b>: when the baseOutputPath passed to MultipleOutputs.write
 * is a path that resolves outside of the final job output directory, the
 * directory is created immediately and then persists through subsequent
 * task retries, breaking the concept of output committing:
 * </p>
 *
 * <pre>
 * private MultipleOutputs&lt;Text, Text&gt; out;
 *
 * public void setup(Context context) {
 *   out = new MultipleOutputs&lt;Text, Text&gt;(context);
 *   ...
 * }
 *
 * public void reduce(Text key, Iterable&lt;Text&gt; values, Context context) throws IOException, InterruptedException {
 *   for (Text t : values) {
 *     out.write(key, t, generateFileName(&lt;<i>parameter list...</i>&gt;));
 *   }
 * }
 *
 * protected void cleanup(Context context) throws IOException, InterruptedException {
 *   out.close();
 * }
 * </pre>
 *
 * <p>
 * Use your own code in <code>generateFileName()</code> to create a custom path to your results.
 * '/' characters in <code>baseOutputPath</code> will be translated into directory levels in your file system.
 * Also, append your custom-generated path with "part" or similar, otherwise your output files will be
 * named -00000, -00001, etc. No call to <code>context.write()</code> is necessary. See the example
 * <code>generateFileName()</code> code below.
 * </p>
 *
 * <pre>
 * private String generateFileName(Text k) {
 *   // expect Text k in format "Surname|Forename"
 *   String[] kStr = k.toString().split("\\|");
 *
 *   String sName = kStr[0];
 *   String fName = kStr[1];
 *
 *   // example for k = Smith|John
 *   // output written to /user/hadoop/path/to/output/Smith/John-r-00000 (etc)
 *   return sName + "/" + fName;
 * }
 * </pre>
 *
 * <p>
 * Using MultipleOutputs in this way will still create zero-sized default output, e.g. part-00000.
 * To prevent this, use <code>LazyOutputFormat.setOutputFormatClass(job, TextOutputFormat.class);</code>
 * instead of <code>job.setOutputFormatClass(TextOutputFormat.class);</code> in your Hadoop job configuration.
 * </p>
 *
 */
@InterfaceAudience.Public
@InterfaceStability.Stable
public class MultipleOutputs<KEYOUT, VALUEOUT> {

  private static final String MULTIPLE_OUTPUTS = "mapreduce.multipleoutputs";

  private static final String MO_PREFIX =
    "mapreduce.multipleoutputs.namedOutput.";

  private static final String FORMAT = ".format";
  private static final String KEY = ".key";
  private static final String VALUE = ".value";
  private static final String COUNTERS_ENABLED =
    "mapreduce.multipleoutputs.counters";

  /**
   * Counters group used by the counters of MultipleOutputs.
   */
  private static final String COUNTERS_GROUP = MultipleOutputs.class.getName();

  /**
   * Cache for the taskContexts.
   */
  private Map<String, TaskAttemptContext> taskContexts =
      new HashMap<String, TaskAttemptContext>();
  /**
   * Cached TaskAttemptContext which uses the job's configured settings.
   */
  private TaskAttemptContext jobOutputFormatContext;
  /**
   * Checks if a named output name is a valid token.
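   * A valid token contains only letters and digits; for example, "text1"
   * is valid while "my-output" is not (it contains '-').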
   *
   * @param namedOutput named output name
   * @throws IllegalArgumentException if the output name is not valid.
   */
  private static void checkTokenName(String namedOutput) {
    if (namedOutput == null || namedOutput.length() == 0) {
      throw new IllegalArgumentException(
        "Name cannot be null or empty");
    }
    for (char ch : namedOutput.toCharArray()) {
      if ((ch >= 'A') && (ch <= 'Z')) {
        continue;
      }
      if ((ch >= 'a') && (ch <= 'z')) {
        continue;
      }
      if ((ch >= '0') && (ch <= '9')) {
        continue;
      }
      throw new IllegalArgumentException(
        "Name cannot have a '" + ch + "' char");
    }
  }

  /**
   * Checks if an output name is valid.
   *
   * The name cannot be the name used for the default output.
   * @param outputPath base output name
   * @throws IllegalArgumentException if the output name is not valid.
   */
  private static void checkBaseOutputPath(String outputPath) {
    if (outputPath.equals(FileOutputFormat.PART)) {
      throw new IllegalArgumentException("output name cannot be 'part'");
    }
  }

  /**
   * Checks if a named output name is valid.
   *
   * @param namedOutput named output name
   * @throws IllegalArgumentException if the output name is not valid.
   */
  private static void checkNamedOutputName(JobContext job,
      String namedOutput, boolean alreadyDefined) {
    checkTokenName(namedOutput);
    checkBaseOutputPath(namedOutput);
    List<String> definedChannels = getNamedOutputsList(job);
    if (alreadyDefined && definedChannels.contains(namedOutput)) {
      throw new IllegalArgumentException("Named output '" + namedOutput +
        "' already defined");
    } else if (!alreadyDefined && !definedChannels.contains(namedOutput)) {
      throw new IllegalArgumentException("Named output '" + namedOutput +
        "' not defined");
    }
  }

  // Returns list of channel names.
  private static List<String> getNamedOutputsList(JobContext job) {
    List<String> names = new ArrayList<String>();
    StringTokenizer st = new StringTokenizer(
      job.getConfiguration().get(MULTIPLE_OUTPUTS, ""), " ");
    while (st.hasMoreTokens()) {
      names.add(st.nextToken());
    }
    return names;
  }

  // Returns the named output OutputFormat.
  @SuppressWarnings("unchecked")
  private static Class<? extends OutputFormat<?, ?>> getNamedOutputFormatClass(
    JobContext job, String namedOutput) {
    return (Class<? extends OutputFormat<?, ?>>)
      job.getConfiguration().getClass(MO_PREFIX + namedOutput + FORMAT, null,
      OutputFormat.class);
  }

  // Returns the key class for a named output.
  private static Class<?> getNamedOutputKeyClass(JobContext job,
                                                 String namedOutput) {
    return job.getConfiguration().getClass(MO_PREFIX + namedOutput + KEY, null,
      Object.class);
  }

  // Returns the value class for a named output.
  private static Class<?> getNamedOutputValueClass(
      JobContext job, String namedOutput) {
    return job.getConfiguration().getClass(MO_PREFIX + namedOutput + VALUE,
      null, Object.class);
  }

  /**
   * Adds a named output for the job.
   *
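   * <p>
   * For example, mirroring the class-level usage pattern above:
   * </p>
   * <pre>
   * MultipleOutputs.addNamedOutput(job, "seq",
   *   SequenceFileOutputFormat.class, LongWritable.class, Text.class);
   * </pre>
   *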
   * @param job               job to add the named output
   * @param namedOutput       named output name; it has to be a word,
   *                          containing only letters and numbers, and cannot
   *                          be the word 'part' as that is reserved for the
   *                          default output.
   * @param outputFormatClass OutputFormat class.
   * @param keyClass          key class
   * @param valueClass        value class
   */
  @SuppressWarnings("unchecked")
  public static void addNamedOutput(Job job, String namedOutput,
      Class<? extends OutputFormat> outputFormatClass,
      Class<?> keyClass, Class<?> valueClass) {
    checkNamedOutputName(job, namedOutput, true);
    Configuration conf = job.getConfiguration();
    conf.set(MULTIPLE_OUTPUTS,
      conf.get(MULTIPLE_OUTPUTS, "") + " " + namedOutput);
    conf.setClass(MO_PREFIX + namedOutput + FORMAT, outputFormatClass,
      OutputFormat.class);
    conf.setClass(MO_PREFIX + namedOutput + KEY, keyClass, Object.class);
    conf.setClass(MO_PREFIX + namedOutput + VALUE, valueClass, Object.class);
  }

  /**
   * Enables or disables counters for the named outputs.
   *
   * The counters group is the {@link MultipleOutputs} class name.
   * The names of the counters are the same as the named outputs. These
   * counters count the number of records written to each output name.
   * By default these counters are disabled.
   *
   * @param job     job to enable counters for
   * @param enabled indicates if the counters will be enabled or not.
   */
  public static void setCountersEnabled(Job job, boolean enabled) {
    job.getConfiguration().setBoolean(COUNTERS_ENABLED, enabled);
  }

  /**
   * Returns whether the counters for the named outputs are enabled or not.
   * By default these counters are disabled.
   *
   * @param job the job
   * @return TRUE if the counters are enabled, FALSE if they are disabled.
   */
  public static boolean getCountersEnabled(JobContext job) {
    return job.getConfiguration().getBoolean(COUNTERS_ENABLED, false);
  }

  /**
   * Wraps RecordWriter to increment counters.
   */
  @SuppressWarnings("unchecked")
  private static class RecordWriterWithCounter extends RecordWriter {
    private RecordWriter writer;
    private String counterName;
    private TaskInputOutputContext context;

    public RecordWriterWithCounter(RecordWriter writer, String counterName,
                                   TaskInputOutputContext context) {
      this.writer = writer;
      this.counterName = counterName;
      this.context = context;
    }

    @SuppressWarnings({"unchecked"})
    public void write(Object key, Object value)
        throws IOException, InterruptedException {
      context.getCounter(COUNTERS_GROUP, counterName).increment(1);
      writer.write(key, value);
    }

    public void close(TaskAttemptContext context)
        throws IOException, InterruptedException {
      writer.close(context);
    }
  }

  // instance code, to be used from Mapper/Reducer code

  private TaskInputOutputContext<?, ?, KEYOUT, VALUEOUT> context;
  private Set<String> namedOutputs;
  private Map<String, RecordWriter<?, ?>> recordWriters;
  private boolean countersEnabled;

  /**
   * Creates and initializes multiple outputs support;
   * it should be instantiated in the Mapper/Reducer setup method.
   *
   * @param context the TaskInputOutputContext object
   */
  public MultipleOutputs(
      TaskInputOutputContext<?, ?, KEYOUT, VALUEOUT> context) {
    this.context = context;
    namedOutputs = Collections.unmodifiableSet(
      new HashSet<String>(MultipleOutputs.getNamedOutputsList(context)));
    recordWriters = new HashMap<String, RecordWriter<?, ?>>();
    countersEnabled = getCountersEnabled(context);
  }

  /**
   * Write key and value to the namedOutput.
   *
   * Output is written to a unique file generated for the namedOutput,
   * for example {namedOutput}-(m|r)-{part-number}.
   *
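   * For example:
   * <pre>
   * mos.write("text", key, new Text("Hello"));
   * </pre>
   *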
   * @param namedOutput the named output name
   * @param key         the key
   * @param value       the value
   */
  @SuppressWarnings("unchecked")
  public <K, V> void write(String namedOutput, K key, V value)
      throws IOException, InterruptedException {
    write(namedOutput, key, value, namedOutput);
  }

  /**
   * Write key and value to baseOutputPath using the namedOutput.
   *
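   * For example (writing under the 'seq' output spec; the framework appends
   * the task suffix, e.g. seq_a-r-00000):
   * <pre>
   * mos.write("seq", new LongWritable(1), new Text("Bye"), "seq_a");
   * </pre>
   *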
   * @param namedOutput    the named output name
   * @param key            the key
   * @param value          the value
   * @param baseOutputPath base-output path to write the record to.
   * Note: the framework will generate a unique filename for the baseOutputPath.
   * <b>Warning</b>: when the baseOutputPath is a path that resolves
   * outside of the final job output directory, the directory is created
   * immediately and then persists through subsequent task retries, breaking
   * the concept of output committing.
   */
  @SuppressWarnings("unchecked")
  public <K, V> void write(String namedOutput, K key, V value,
      String baseOutputPath) throws IOException, InterruptedException {
    checkNamedOutputName(context, namedOutput, false);
    checkBaseOutputPath(baseOutputPath);
    if (!namedOutputs.contains(namedOutput)) {
      throw new IllegalArgumentException("Undefined named output '" +
        namedOutput + "'");
    }
    TaskAttemptContext taskContext = getContext(namedOutput);
    getRecordWriter(taskContext, baseOutputPath).write(key, value);
  }

  /**
   * Write key and value to an output file name.
   *
   * Gets the record writer from the job's output format.
   * The job's output format should be a FileOutputFormat.
   *
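   * For example, with a user-supplied <code>generateFileName()</code> as in
   * the class documentation above:
   * <pre>
   * mos.write(key, new Text("value"), generateFileName(key, new Text("value")));
   * </pre>
   *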
   * @param key       the key
   * @param value     the value
   * @param baseOutputPath base-output path to write the record to.
   * Note: the framework will generate a unique filename for the baseOutputPath.
   * <b>Warning</b>: when the baseOutputPath is a path that resolves
   * outside of the final job output directory, the directory is created
   * immediately and then persists through subsequent task retries, breaking
   * the concept of output committing.
   */
  @SuppressWarnings("unchecked")
  public void write(KEYOUT key, VALUEOUT value, String baseOutputPath)
      throws IOException, InterruptedException {
    checkBaseOutputPath(baseOutputPath);
    if (jobOutputFormatContext == null) {
      jobOutputFormatContext =
        new TaskAttemptContextImpl(context.getConfiguration(),
                                   context.getTaskAttemptID(),
                                   new WrappedStatusReporter(context));
    }
    getRecordWriter(jobOutputFormatContext, baseOutputPath).write(key, value);
  }

  // By being synchronized, MultipleOutputs can be used with a
  // MultithreadedMapper.
  @SuppressWarnings("unchecked")
  private synchronized RecordWriter getRecordWriter(
      TaskAttemptContext taskContext, String baseFileName)
      throws IOException, InterruptedException {

    // look for the record-writer in the cache
    RecordWriter writer = recordWriters.get(baseFileName);

    // If not in cache, create a new one
    if (writer == null) {
      // get the record writer from context output format
      FileOutputFormat.setOutputName(taskContext, baseFileName);
      try {
        writer = ((OutputFormat) ReflectionUtils.newInstance(
          taskContext.getOutputFormatClass(), taskContext.getConfiguration()))
          .getRecordWriter(taskContext);
      } catch (ClassNotFoundException e) {
        throw new IOException(e);
      }

      // if counters are enabled, wrap the writer with context
      // to increment counters
      if (countersEnabled) {
        writer = new RecordWriterWithCounter(writer, baseFileName, context);
      }

      // add the record-writer to the cache
      recordWriters.put(baseFileName, writer);
    }
    return writer;
  }

  // Create a taskAttemptContext for the named output with
  // output format and output key/value types put in the context.
  private TaskAttemptContext getContext(String nameOutput) throws IOException {

    TaskAttemptContext taskContext = taskContexts.get(nameOutput);

    if (taskContext != null) {
      return taskContext;
    }

    // The following trick leverages the instantiation of a record writer via
    // the job thus supporting arbitrary output formats.
    Job job = Job.getInstance(context.getConfiguration());
    job.setOutputFormatClass(getNamedOutputFormatClass(context, nameOutput));
    job.setOutputKeyClass(getNamedOutputKeyClass(context, nameOutput));
    job.setOutputValueClass(getNamedOutputValueClass(context, nameOutput));
    taskContext = new TaskAttemptContextImpl(job.getConfiguration(),
        context.getTaskAttemptID(), new WrappedStatusReporter(context));

    taskContexts.put(nameOutput, taskContext);

    return taskContext;
  }

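  // Forwards counter lookups, progress and status calls from the synthetic
  // per-output task contexts back to the real task context.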
  private static class WrappedStatusReporter extends StatusReporter {

    TaskAttemptContext context;

    public WrappedStatusReporter(TaskAttemptContext context) {
      this.context = context;
    }

    @Override
    public Counter getCounter(Enum<?> name) {
      return context.getCounter(name);
    }

    @Override
    public Counter getCounter(String group, String name) {
      return context.getCounter(group, name);
    }

    @Override
    public void progress() {
      context.progress();
    }

    @Override
    public float getProgress() {
      return context.getProgress();
    }

    @Override
    public void setStatus(String status) {
      context.setStatus(status);
    }
  }

  /**
   * Closes all the opened outputs.
   *
   * This should be called from the cleanup method of a map/reduce task.
   * If overridden, subclasses must invoke <code>super.close()</code> at the
   * end of their <code>close()</code>.
   */
  @SuppressWarnings("unchecked")
  public void close() throws IOException, InterruptedException {
    for (RecordWriter writer : recordWriters.values()) {
      writer.close(context);
    }
  }
}