001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018
019package org.apache.hadoop.metrics2.sink;
020
021import com.google.common.annotations.VisibleForTesting;
022import java.io.Closeable;
023import java.io.IOException;
024import java.io.PrintStream;
025import java.net.InetAddress;
026import java.net.URI;
027import java.net.URISyntaxException;
028import java.nio.charset.StandardCharsets;
029import java.util.Calendar;
030import java.util.Date;
031import java.util.TimeZone;
032import java.util.Timer;
033import java.util.TimerTask;
034
035import org.apache.commons.configuration.SubsetConfiguration;
036import org.apache.commons.lang.time.FastDateFormat;
037import org.apache.hadoop.classification.InterfaceAudience;
038import org.apache.hadoop.classification.InterfaceStability;
039import org.apache.hadoop.conf.Configuration;
040import org.apache.hadoop.fs.FSDataOutputStream;
041import org.apache.hadoop.fs.FileSystem;
042import org.apache.hadoop.fs.LocatedFileStatus;
043import org.apache.hadoop.fs.Path;
044import org.apache.hadoop.fs.RemoteIterator;
045import org.apache.hadoop.metrics2.AbstractMetric;
046import org.apache.hadoop.metrics2.MetricsException;
047import org.apache.hadoop.metrics2.MetricsRecord;
048import org.apache.hadoop.metrics2.MetricsSink;
049import org.apache.hadoop.metrics2.MetricsTag;
050import org.apache.hadoop.security.SecurityUtil;
051import org.apache.hadoop.security.UserGroupInformation;
052
053/**
054 * <p>This class is a metrics sink that uses
055 * {@link org.apache.hadoop.fs.FileSystem} to write the metrics logs.  Every
056 * hour a new directory will be created under the path specified by the
057 * <code>basepath</code> property. All metrics will be logged to a file in the
058 * current hour's directory in a file named &lt;hostname&gt;.log, where
059 * &lt;hostname&gt; is the name of the host on which the metrics logging
060 * process is running. The base path is set by the
061 * <code>&lt;prefix&gt;.sink.&lt;instance&gt;.basepath</code> property.  The
062 * time zone used to create the current hour's directory name is GMT.  If the
063 * <code>basepath</code> property isn't specified, it will default to
064 * &quot;/tmp&quot;, which is the temp directory on whatever default file
065 * system is configured for the cluster.</p>
066 *
 * <p>The <code>&lt;prefix&gt;.sink.&lt;instance&gt;.ignore-error</code>
 * property controls whether an exception is thrown when an error is encountered
 * writing a log file.  The default value is <code>false</code>, in which case
 * file errors are reported by throwing an exception.  When set to
 * <code>true</code>, file errors are quietly swallowed.</p>
071 *
072 * <p>The primary use of this class is for logging to HDFS.  As it uses
073 * {@link org.apache.hadoop.fs.FileSystem} to access the target file system,
074 * however, it can be used to write to the local file system, Amazon S3, or any
075 * other supported file system.  The base path for the sink will determine the
076 * file system used.  An unqualified path will write to the default file system
077 * set by the configuration.</p>
078 *
079 * <p>Not all file systems support the ability to append to files.  In file
080 * systems without the ability to append to files, only one writer can write to
081 * a file at a time.  To allow for concurrent writes from multiple daemons on a
082 * single host, the <code>source</code> property should be set to the name of
083 * the source daemon, e.g. <i>namenode</i>.  The value of the
084 * <code>source</code> property should typically be the same as the property's
085 * prefix.  If this property is not set, the source is taken to be
086 * <i>unknown</i>.</p>
087 *
088 * <p>Instead of appending to an existing file, by default the sink
 * will create a new file with a suffix of &quot;.&lt;n&gt;&quot;, where
090 * <i>n</i> is the next lowest integer that isn't already used in a file name,
091 * similar to the Hadoop daemon logs.  NOTE: the file with the <b>highest</b>
092 * sequence number is the <b>newest</b> file, unlike the Hadoop daemon logs.</p>
093 *
094 * <p>For file systems that allow append, the sink supports appending to the
095 * existing file instead. If the <code>allow-append</code> property is set to
096 * true, the sink will instead append to the existing file on file systems that
097 * support appends. By default, the <code>allow-append</code> property is
098 * false.</p>
099 *
100 * <p>Note that when writing to HDFS with <code>allow-append</code> set to true,
101 * there is a minimum acceptable number of data nodes.  If the number of data
102 * nodes drops below that minimum, the append will succeed, but reading the
103 * data will fail with an IOException in the DataStreamer class.  The minimum
104 * number of data nodes required for a successful append is generally 2 or
105 * 3.</p>
106 *
107 * <p>Note also that when writing to HDFS, the file size information is not
108 * updated until the file is closed (e.g. at the top of the hour) even though
109 * the data is being written successfully. This is a known HDFS limitation that
110 * exists because of the performance cost of updating the metadata.  See
111 * <a href="https://issues.apache.org/jira/browse/HDFS-5478">HDFS-5478</a>.</p>
112 *
113 * <p>When using this sink in a secure (Kerberos) environment, two additional
114 * properties must be set: <code>keytab-key</code> and
115 * <code>principal-key</code>. <code>keytab-key</code> should contain the key by
116 * which the keytab file can be found in the configuration, for example,
117 * <code>yarn.nodemanager.keytab</code>. <code>principal-key</code> should
118 * contain the key by which the principal can be found in the configuration,
119 * for example, <code>yarn.nodemanager.principal</code>.
120 */
121@InterfaceAudience.Public
122@InterfaceStability.Evolving
123public class RollingFileSystemSink implements MetricsSink, Closeable {
  // Names of the sink properties read from the metrics2 configuration.
  private static final String BASEPATH_KEY = "basepath";
  private static final String SOURCE_KEY = "source";
  private static final String IGNORE_ERROR_KEY = "ignore-error";
  private static final String ALLOW_APPEND_KEY = "allow-append";
  // These two properties hold the *names* of the configuration keys under
  // which the keytab file and the principal are found.
  private static final String KEYTAB_PROPERTY_KEY = "keytab-key";
  private static final String USERNAME_PROPERTY_KEY = "principal-key";
  // Values used when the corresponding property is not set.
  private static final String SOURCE_DEFAULT = "unknown";
  private static final String BASEPATH_DEFAULT = "/tmp";
  // Format of the per-hour directory name, rendered in GMT.
  private static final FastDateFormat DATE_FORMAT =
      FastDateFormat.getInstance("yyyyMMddHH", TimeZone.getTimeZone("GMT"));
  // Guards the stream state shared by putMetrics(), flush(), close() and the
  // flusher task.
  private final Object lock = new Object();
  // Set once initFs() has succeeded.
  private boolean initialized = false;
  private SubsetConfiguration properties;
  private Configuration conf;
  private String source;
  private boolean ignoreError;
  private boolean allowAppend;
  private Path basePath;
  private FileSystem fileSystem;
  // The current directory path into which we're writing files
  private Path currentDirPath;
  // The path to the current file into which we're writing data
  private Path currentFilePath;
  // The stream to which we're currently writing.
  private PrintStream currentOutStream;
  // We keep this only to be able to call hflush() on it.
  private FSDataOutputStream currentFSOutStream;
  // Timer on which the close of each hour's stream is scheduled.
  private Timer flushTimer;

  // This flag is used during testing to make the flusher thread run after only
  // a short pause instead of waiting for the top of the hour.
  @VisibleForTesting
  protected static boolean flushQuickly = false;
  // This flag is used by the flusher thread to indicate that it has run. Used
  // only for testing purposes.
  @VisibleForTesting
  protected static volatile boolean hasFlushed = false;
  // Use this configuration instead of loading a new one.
  @VisibleForTesting
  protected static Configuration suppliedConf = null;
  // Use this file system instead of getting a new one.
  @VisibleForTesting
  protected static FileSystem suppliedFilesystem = null;
167
168  @Override
169  public void init(SubsetConfiguration metrics2Properties) {
170    properties = metrics2Properties;
171    basePath = new Path(properties.getString(BASEPATH_KEY, BASEPATH_DEFAULT));
172    source = properties.getString(SOURCE_KEY, SOURCE_DEFAULT);
173    ignoreError = properties.getBoolean(IGNORE_ERROR_KEY, false);
174    allowAppend = properties.getBoolean(ALLOW_APPEND_KEY, false);
175
176    conf = loadConf();
177    UserGroupInformation.setConfiguration(conf);
178
179    // Don't do secure setup if it's not needed.
180    if (UserGroupInformation.isSecurityEnabled()) {
181      // Validate config so that we don't get an NPE
182      checkForProperty(properties, KEYTAB_PROPERTY_KEY);
183      checkForProperty(properties, USERNAME_PROPERTY_KEY);
184
185
186      try {
187        // Login as whoever we're supposed to be and let the hostname be pulled
188        // from localhost. If security isn't enabled, this does nothing.
189        SecurityUtil.login(conf, properties.getString(KEYTAB_PROPERTY_KEY),
190            properties.getString(USERNAME_PROPERTY_KEY));
191      } catch (IOException ex) {
192        throw new MetricsException("Error logging in securely: ["
193            + ex.toString() + "]", ex);
194      }
195    }
196  }
197
198  /**
199   * Initialize the connection to HDFS and create the base directory. Also
200   * launch the flush thread.
201   */
202  private boolean initFs() {
203    boolean success = false;
204
205    fileSystem = getFileSystem();
206
207    // This step isn't strictly necessary, but it makes debugging issues much
208    // easier. We try to create the base directory eagerly and fail with
209    // copious debug info if it fails.
210    try {
211      fileSystem.mkdirs(basePath);
212      success = true;
213    } catch (Exception ex) {
214      if (!ignoreError) {
215        throw new MetricsException("Failed to create " + basePath + "["
216            + SOURCE_KEY + "=" + source + ", "
217            + ALLOW_APPEND_KEY + "=" + allowAppend + ", "
218            + stringifySecurityProperty(KEYTAB_PROPERTY_KEY) + ", "
219            + stringifySecurityProperty(USERNAME_PROPERTY_KEY)
220            + "] -- " + ex.toString(), ex);
221      }
222    }
223
224    if (success) {
225      // If we're permitted to append, check if we actually can
226      if (allowAppend) {
227        allowAppend = checkAppend(fileSystem);
228      }
229
230      flushTimer = new Timer("RollingFileSystemSink Flusher", true);
231    }
232
233    return success;
234  }
235
236  /**
237   * Turn a security property into a nicely formatted set of <i>name=value</i>
238   * strings, allowing for either the property or the configuration not to be
239   * set.
240   *
241   * @param properties the sink properties
242   * @param conf the conf
243   * @param property the property to stringify
244   * @return the stringified property
245   */
246  private String stringifySecurityProperty(String property) {
247    String securityProperty;
248
249    if (properties.containsKey(property)) {
250      String propertyValue = properties.getString(property);
251      String confValue = conf.get(properties.getString(property));
252
253      if (confValue != null) {
254        securityProperty = property + "=" + propertyValue
255            + ", " + properties.getString(property) + "=" + confValue;
256      } else {
257        securityProperty = property + "=" + propertyValue
258            + ", " + properties.getString(property) + "=<NOT SET>";
259      }
260    } else {
261      securityProperty = property + "=<NOT SET>";
262    }
263
264    return securityProperty;
265  }
266
267  /**
268   * Throw a {@link MetricsException} if the given property is not set.
269   *
270   * @param conf the configuration to test
271   * @param key the key to validate
272   */
273  private static void checkForProperty(SubsetConfiguration conf, String key) {
274    if (!conf.containsKey(key)) {
275      throw new MetricsException("Configuration is missing " + key
276          + " property");
277    }
278  }
279
280  /**
281   * Return the supplied configuration for testing or otherwise load a new
282   * configuration.
283   *
284   * @return the configuration to use
285   */
286  private Configuration loadConf() {
287    Configuration c;
288
289    if (suppliedConf != null) {
290      c = suppliedConf;
291    } else {
292      // The config we're handed in init() isn't the one we want here, so we
293      // create a new one to pick up the full settings.
294      c = new Configuration();
295    }
296
297    return c;
298  }
299
300  /**
301   * Return the supplied file system for testing or otherwise get a new file
302   * system.
303   *
304   * @param conf the configuration
305   * @return the file system to use
306   * @throws MetricsException thrown if the file system could not be retrieved
307   */
308  private FileSystem getFileSystem() throws MetricsException {
309    FileSystem fs = null;
310
311    if (suppliedFilesystem != null) {
312      fs = suppliedFilesystem;
313    } else {
314      try {
315        fs = FileSystem.get(new URI(basePath.toString()), conf);
316      } catch (URISyntaxException ex) {
317        throw new MetricsException("The supplied filesystem base path URI"
318            + " is not a valid URI: " + basePath.toString(), ex);
319      } catch (IOException ex) {
320        throw new MetricsException("Error connecting to file system: "
321            + basePath + " [" + ex.toString() + "]", ex);
322      }
323    }
324
325    return fs;
326  }
327
328  /**
329   * Test whether the file system supports append and return the answer.
330   * @param fs the target file system
331   */
332  private boolean checkAppend(FileSystem fs) {
333    boolean canAppend = true;
334
335    try {
336      fs.append(basePath);
337    } catch (IOException ex) {
338      if (ex.getMessage().equals("Not supported")) {
339        canAppend = false;
340      }
341    }
342
343    return canAppend;
344  }
345
346  /**
347   * Check the current directory against the time stamp.  If they're not
348   * the same, create a new directory and a new log file in that directory.
349   *
350   * @throws MetricsException thrown if an error occurs while creating the
351   * new directory or new log file
352   */
353  private void rollLogDirIfNeeded() throws MetricsException {
354    Date now = new Date();
355    String currentDir = DATE_FORMAT.format(now);
356    Path path = new Path(basePath, currentDir);
357
358    // We check whether currentOutStream is null instead of currentDirPath,
359    // because if currentDirPath is null, then currentOutStream is null, but
360    // currentOutStream can be null for other reasons.
361    if ((currentOutStream == null) || !path.equals(currentDirPath)) {
362      // If we're not yet connected to HDFS, create the connection
363      if (!initialized) {
364        initialized = initFs();
365      }
366
367      if (initialized) {
368        // Close the stream. This step could have been handled already by the
369        // flusher thread, but if it has, the PrintStream will just swallow the
370        // exception, which is fine.
371        if (currentOutStream != null) {
372          currentOutStream.close();
373        }
374
375        currentDirPath = path;
376
377        try {
378          rollLogDir();
379        } catch (IOException ex) {
380          throwMetricsException("Failed to create new log file", ex);
381        }
382
383        scheduleFlush(now);
384      }
385    }
386  }
387
388  /**
389   * Schedule the current hour's directory to be flushed at the top of the next
390   * hour. If this ends up running after the top of the next hour, it will
391   * execute immediately.
392   *
393   * @param now the current time
394   */
395  private void scheduleFlush(Date now) {
396    // Store the current currentDirPath to close later
397    final PrintStream toClose = currentOutStream;
398    Calendar next = Calendar.getInstance();
399
400    next.setTime(now);
401
402    if (flushQuickly) {
403      // If we're running unit tests, flush after a short pause
404      next.add(Calendar.MILLISECOND, 400);
405    } else {
406      // Otherwise flush at the top of the hour
407      next.set(Calendar.SECOND, 0);
408      next.set(Calendar.MINUTE, 0);
409      next.add(Calendar.HOUR, 1);
410    }
411
412    flushTimer.schedule(new TimerTask() {
413      @Override
414      public void run() {
415        synchronized (lock) {
416          // This close may have already been done by a putMetrics() call. If it
417          // has, the PrintStream will swallow the exception, which is fine.
418          toClose.close();
419        }
420
421        hasFlushed = true;
422      }
423    }, next.getTime());
424  }
425
426  /**
427   * Create a new directory based on the current hour and a new log file in
428   * that directory.
429   *
430   * @throws IOException thrown if an error occurs while creating the
431   * new directory or new log file
432   */
433  private void rollLogDir() throws IOException {
434    String fileName =
435        source + "-" + InetAddress.getLocalHost().getHostName() + ".log";
436
437    Path targetFile = new Path(currentDirPath, fileName);
438    fileSystem.mkdirs(currentDirPath);
439
440    if (allowAppend) {
441      createOrAppendLogFile(targetFile);
442    } else {
443      createLogFile(targetFile);
444    }
445  }
446
447  /**
448   * Create a new log file and return the {@link FSDataOutputStream}. If a
449   * file with the specified path already exists, add a suffix, starting with 1
450   * and try again. Keep incrementing the suffix until a nonexistent target
451   * path is found.
452   *
453   * Once the file is open, update {@link #currentFSOutStream},
454   * {@link #currentOutStream}, and {@#link #currentFile} are set appropriately.
455   *
456   * @param initial the target path
457   * @throws IOException thrown if the call to see if the exists fails
458   */
459  private void createLogFile(Path initial) throws IOException {
460    Path currentAttempt = initial;
461    // Start at 0 so that if the base filname exists, we start with the suffix
462    // ".1".
463    int id = 0;
464
465    while (true) {
466      // First try blindly creating the file. If we fail, it either means
467      // the file exists, or the operation actually failed.  We do it this way
468      // because if we check whether the file exists, it might still be created
469      // by the time we try to create it. Creating first works like a
470      // test-and-set.
471      try {
472        currentFSOutStream = fileSystem.create(currentAttempt, false);
473        currentOutStream = new PrintStream(currentFSOutStream, true,
474            StandardCharsets.UTF_8.name());
475        currentFilePath = currentAttempt;
476        break;
477      } catch (IOException ex) {
478        // Now we can check to see if the file exists to know why we failed
479        if (fileSystem.exists(currentAttempt)) {
480          id = getNextIdToTry(initial, id);
481          currentAttempt = new Path(initial.toString() + "." + id);
482        } else {
483          throw ex;
484        }
485      }
486    }
487  }
488
489  /**
490   * Return the next ID suffix to use when creating the log file. This method
491   * will look at the files in the directory, find the one with the highest
492   * ID suffix, and 1 to that suffix, and return it. This approach saves a full
493   * linear probe, which matters in the case where there are a large number of
494   * log files.
495   *
496   * @param initial the base file path
497   * @param lastId the last ID value that was used
498   * @return the next ID to try
499   * @throws IOException thrown if there's an issue querying the files in the
500   * directory
501   */
502  private int getNextIdToTry(Path initial, int lastId)
503      throws IOException {
504    RemoteIterator<LocatedFileStatus> files =
505        fileSystem.listFiles(currentDirPath, true);
506    String base = initial.toString();
507    int id = lastId;
508
509    while (files.hasNext()) {
510      String file = files.next().getPath().getName();
511
512      if (file.startsWith(base)) {
513        int fileId = extractId(file);
514
515        if (fileId > id) {
516          id = fileId;
517        }
518      }
519    }
520
521    // Return either 1 more than the highest we found or 1 more than the last
522    // ID used (if no ID was found).
523    return id + 1;
524  }
525
526  /**
527   * Extract the ID from the suffix of the given file name.
528   *
529   * @param file the file name
530   * @return the ID or -1 if no ID could be extracted
531   */
532  private int extractId(String file) {
533    int index = file.lastIndexOf(".");
534    int id = -1;
535
536    // A hostname has to have at least 1 character
537    if (index > 0) {
538      try {
539        id = Integer.parseInt(file.substring(index + 1));
540      } catch (NumberFormatException ex) {
541        // This can happen if there's no suffix, but there is a dot in the
542        // hostname.  Just ignore it.
543      }
544    }
545
546    return id;
547  }
548
549  /**
550   * Create a new log file and return the {@link FSDataOutputStream}. If a
551   * file with the specified path already exists, open the file for append
552   * instead.
553   *
554   * Once the file is open, update {@link #currentFSOutStream},
555   * {@link #currentOutStream}, and {@#link #currentFile} are set appropriately.
556   *
557   * @param initial the target path
558   * @throws IOException thrown if the call to see the append operation fails.
559   */
560  private void createOrAppendLogFile(Path targetFile) throws IOException {
561    // First try blindly creating the file. If we fail, it either means
562    // the file exists, or the operation actually failed.  We do it this way
563    // because if we check whether the file exists, it might still be created
564    // by the time we try to create it. Creating first works like a
565    // test-and-set.
566    try {
567      currentFSOutStream = fileSystem.create(targetFile, false);
568      currentOutStream = new PrintStream(currentFSOutStream, true,
569          StandardCharsets.UTF_8.name());
570    } catch (IOException ex) {
571      // Try appending instead.  If we fail, if means the file doesn't
572      // actually exist yet or the operation actually failed.
573      try {
574        currentFSOutStream = fileSystem.append(targetFile);
575        currentOutStream = new PrintStream(currentFSOutStream, true,
576            StandardCharsets.UTF_8.name());
577      } catch (IOException ex2) {
578        // If the original create failed for a legit but transitory
579        // reason, the append will fail because the file now doesn't exist,
580        // resulting in a confusing stack trace.  To avoid that, we set
581        // the cause of the second exception to be the first exception.
582        // It's still a tiny bit confusing, but it's enough
583        // information that someone should be able to figure it out.
584        ex2.initCause(ex);
585
586        throw ex2;
587      }
588    }
589
590    currentFilePath = targetFile;
591  }
592
593  @Override
594  public void putMetrics(MetricsRecord record) {
595    synchronized (lock) {
596      rollLogDirIfNeeded();
597
598      if (currentOutStream != null) {
599        currentOutStream.printf("%d %s.%s", record.timestamp(),
600            record.context(), record.name());
601
602        String separator = ": ";
603
604        for (MetricsTag tag : record.tags()) {
605          currentOutStream.printf("%s%s=%s", separator, tag.name(),
606              tag.value());
607          separator = ", ";
608        }
609
610        for (AbstractMetric metric : record.metrics()) {
611          currentOutStream.printf("%s%s=%s", separator, metric.name(),
612              metric.value());
613        }
614
615        currentOutStream.println();
616
617        // If we don't hflush(), the data may not be written until the file is
618        // closed. The file won't be closed until the top of the hour *AND*
619        // another record is received. Calling hflush() makes sure that the data
620        // is complete at the top of the hour.
621        try {
622          currentFSOutStream.hflush();
623        } catch (IOException ex) {
624          throwMetricsException("Failed flushing the stream", ex);
625        }
626
627        checkForErrors("Unable to write to log file");
628      } else if (!ignoreError) {
629        throwMetricsException("Unable to write to log file");
630      }
631    }
632  }
633
634  @Override
635  public void flush() {
636    synchronized (lock) {
637      // currentOutStream is null if currentFSOutStream is null
638      if (currentFSOutStream != null) {
639        try {
640          currentFSOutStream.hflush();
641        } catch (IOException ex) {
642          throwMetricsException("Unable to flush log file", ex);
643        }
644      }
645    }
646  }
647
648  @Override
649  public void close() {
650    synchronized (lock) {
651      if (currentOutStream != null) {
652        currentOutStream.close();
653
654        try {
655          checkForErrors("Unable to close log file");
656        } finally {
657          // Null out the streams just in case someone tries to reuse us.
658          currentOutStream = null;
659          currentFSOutStream = null;
660        }
661      }
662    }
663  }
664
665  /**
666   * If the sink isn't set to ignore errors, throw a {@link MetricsException}
667   * if the stream encountered an exception.  The message parameter will be used
668   * as the new exception's message with the current file name
669   * ({@link #currentFilePath}) appended to it.
670   *
671   * @param message the exception message. The message will have the current
672   * file name ({@link #currentFilePath}) appended to it.
673   * @throws MetricsException thrown if there was an error and the sink isn't
674   * ignoring errors
675   */
676  private void checkForErrors(String message)
677      throws MetricsException {
678    if (!ignoreError && currentOutStream.checkError()) {
679      throw new MetricsException(message + ": " + currentFilePath);
680    }
681  }
682
683  /**
684   * If the sink isn't set to ignore errors, wrap the Throwable in a
685   * {@link MetricsException} and throw it.  The message parameter will be used
686   * as the new exception's message with the current file name
687   * ({@link #currentFilePath}) and the Throwable's string representation
688   * appended to it.
689   *
690   * @param message the exception message. The message will have the current
691   * file name ({@link #currentFilePath}) and the Throwable's string
692   * representation appended to it.
693   * @param t the Throwable to wrap
694   */
695  private void throwMetricsException(String message, Throwable t) {
696    if (!ignoreError) {
697      throw new MetricsException(message + ": " + currentFilePath + " ["
698          + t.toString() + "]", t);
699    }
700  }
701
702  /**
703   * If the sink isn't set to ignore errors, throw a new
704   * {@link MetricsException}.  The message parameter will be used  as the
705   * new exception's message with the current file name
706   * ({@link #currentFilePath}) appended to it.
707   *
708   * @param message the exception message. The message will have the current
709   * file name ({@link #currentFilePath}) appended to it.
710   */
711  private void throwMetricsException(String message) {
712    if (!ignoreError) {
713      throw new MetricsException(message + ": " + currentFilePath);
714    }
715  }
716}