/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.metrics2.sink;

import com.google.common.annotations.VisibleForTesting;
import java.io.Closeable;
import java.io.IOException;
import java.io.PrintStream;
import java.net.InetAddress;
import java.net.URI;
import java.net.URISyntaxException;
import java.nio.charset.StandardCharsets;
import java.util.Calendar;
import java.util.Date;
import java.util.TimeZone;
import java.util.Timer;
import java.util.TimerTask;

import org.apache.commons.configuration.SubsetConfiguration;
import org.apache.commons.lang.time.FastDateFormat;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocatedFileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RemoteIterator;
import org.apache.hadoop.metrics2.AbstractMetric;
import org.apache.hadoop.metrics2.MetricsException;
import org.apache.hadoop.metrics2.MetricsRecord;
import org.apache.hadoop.metrics2.MetricsSink;
import org.apache.hadoop.metrics2.MetricsTag;
import org.apache.hadoop.security.SecurityUtil;
import org.apache.hadoop.security.UserGroupInformation;

/**
 * <p>This class is a metrics sink that uses
 * {@link org.apache.hadoop.fs.FileSystem} to write the metrics logs.  Every
 * hour a new directory will be created under the path specified by the
 * <code>basepath</code> property.  All metrics will be logged to a file in
 * the current hour's directory, named
 * <code>&lt;source&gt;-&lt;hostname&gt;.log</code>, where
 * <i>&lt;source&gt;</i> is the value of the <code>source</code> property and
 * <i>&lt;hostname&gt;</i> is the name of the host on which the metrics
 * logging process is running.  The base path is set by the
 * <code>&lt;prefix&gt;.sink.&lt;instance&gt;.basepath</code> property.  The
 * time zone used to create the current hour's directory name is GMT.  If the
 * <code>basepath</code> property isn't specified, it will default to
 * "/tmp", which is the temp directory on whatever default file
 * system is configured for the cluster.</p>
 *
 * <p>The <code>&lt;prefix&gt;.sink.&lt;instance&gt;.ignore-error</code>
 * property controls whether an exception is thrown when an error is
 * encountered writing a log file.  The default value is <code>false</code>,
 * meaning that errors are reported by throwing an exception.  When set to
 * <code>true</code>, file errors are quietly swallowed.</p>
 *
 * <p>The primary use of this class is for logging to HDFS.
 * As it uses {@link org.apache.hadoop.fs.FileSystem} to access the target
 * file system, however, it can be used to write to the local file system,
 * Amazon S3, or any other supported file system.  The base path for the sink
 * will determine the file system used.  An unqualified path will write to
 * the default file system set by the configuration.</p>
 *
 * <p>Not all file systems support the ability to append to files.  In file
 * systems without the ability to append to files, only one writer can write
 * to a file at a time.  To allow for concurrent writes from multiple daemons
 * on a single host, the <code>source</code> property should be set to the
 * name of the source daemon, e.g. <i>namenode</i>.  The value of the
 * <code>source</code> property should typically be the same as the
 * property's prefix.  If this property is not set, the source is taken to be
 * <i>unknown</i>.</p>
 *
 * <p>Instead of appending to an existing file, by default the sink
 * will create a new file with a suffix of &quot;.&lt;n&gt;&quot;, where
 * <i>n</i> is the next lowest integer that isn't already used in a file
 * name, similar to the Hadoop daemon logs.  NOTE: the file with the
 * <b>highest</b> sequence number is the <b>newest</b> file, unlike the
 * Hadoop daemon logs.</p>
 *
 * <p>For file systems that allow append, the sink supports appending to the
 * existing file instead.  If the <code>allow-append</code> property is set
 * to <code>true</code>, the sink will append to the existing file on file
 * systems that support appends.  By default, the <code>allow-append</code>
 * property is <code>false</code>.</p>
 *
 * <p>Note that when writing to HDFS with <code>allow-append</code> set to
 * <code>true</code>, there is a minimum acceptable number of data nodes.  If
 * the number of data nodes drops below that minimum, the append will
 * succeed, but reading the data will fail with an IOException in the
 * DataStreamer class.  The minimum number of data nodes required for a
 * successful append is generally 2 or 3.</p>
 *
 * <p>Note also that when writing to HDFS, the file size information is not
 * updated until the file is closed (e.g. at the top of the hour) even though
 * the data is being written successfully.  This is a known HDFS limitation
 * that exists because of the performance cost of updating the metadata.  See
 * <a href="https://issues.apache.org/jira/browse/HDFS-5478">HDFS-5478</a>.</p>
 *
 * <p>When using this sink in a secure (Kerberos) environment, two additional
 * properties must be set: <code>keytab-key</code> and
 * <code>principal-key</code>. <code>keytab-key</code> should contain the key
 * by which the keytab file can be found in the configuration, for example,
 * <code>yarn.nodemanager.keytab</code>. <code>principal-key</code> should
 * contain the key by which the principal can be found in the configuration,
 * for example, <code>yarn.nodemanager.principal</code>.</p>
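 *
 * <p>As an illustrative sketch (the instance name <i>roll</i>, the base
 * path, and the configuration keys below are examples, not prescribed
 * values), a metrics2 properties file might configure this sink for a
 * NameNode as follows:</p>
 *
 * <pre>
 * namenode.sink.roll.class=org.apache.hadoop.metrics2.sink.RollingFileSystemSink
 * namenode.sink.roll.basepath=hdfs:///tmp/namenode-metrics
 * namenode.sink.roll.source=namenode
 * namenode.sink.roll.allow-append=true
 * namenode.sink.roll.keytab-key=dfs.namenode.keytab.file
 * namenode.sink.roll.principal-key=dfs.namenode.kerberos.principal
 * </pre>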
 */
@InterfaceAudience.Public
@InterfaceStability.Evolving
public class RollingFileSystemSink implements MetricsSink, Closeable {
  private static final String BASEPATH_KEY = "basepath";
  private static final String SOURCE_KEY = "source";
  private static final String IGNORE_ERROR_KEY = "ignore-error";
  private static final String ALLOW_APPEND_KEY = "allow-append";
  private static final String KEYTAB_PROPERTY_KEY = "keytab-key";
  private static final String USERNAME_PROPERTY_KEY = "principal-key";
  private static final String SOURCE_DEFAULT = "unknown";
  private static final String BASEPATH_DEFAULT = "/tmp";
  private static final FastDateFormat DATE_FORMAT =
      FastDateFormat.getInstance("yyyyMMddHH", TimeZone.getTimeZone("GMT"));
  private final Object lock = new Object();
  private boolean initialized = false;
  private SubsetConfiguration properties;
  private Configuration conf;
  private String source;
  private boolean ignoreError;
  private boolean allowAppend;
  private Path basePath;
  private FileSystem fileSystem;
  // The current directory path into which we're writing files
  private Path currentDirPath;
  // The path to the current file into which we're writing data
  private Path currentFilePath;
  // The stream to which we're currently writing.
  private PrintStream currentOutStream;
  // We keep this only to be able to call hflush() on it.
  private FSDataOutputStream currentFSOutStream;
  private Timer flushTimer;

  // This flag is used during testing to make the flusher thread run after
  // only a short pause instead of waiting for the top of the hour.
  @VisibleForTesting
  protected static boolean flushQuickly = false;
  // This flag is used by the flusher thread to indicate that it has run.
  // Used only for testing purposes.
  @VisibleForTesting
  protected static volatile boolean hasFlushed = false;
  // Use this configuration instead of loading a new one.
  @VisibleForTesting
  protected static Configuration suppliedConf = null;
  // Use this file system instead of getting a new one.
  @VisibleForTesting
  protected static FileSystem suppliedFilesystem = null;

  @Override
  public void init(SubsetConfiguration metrics2Properties) {
    properties = metrics2Properties;
    basePath = new Path(properties.getString(BASEPATH_KEY, BASEPATH_DEFAULT));
    source = properties.getString(SOURCE_KEY, SOURCE_DEFAULT);
    ignoreError = properties.getBoolean(IGNORE_ERROR_KEY, false);
    allowAppend = properties.getBoolean(ALLOW_APPEND_KEY, false);

    conf = loadConf();
    UserGroupInformation.setConfiguration(conf);

    // Don't do secure setup if it's not needed.
    if (UserGroupInformation.isSecurityEnabled()) {
      // Validate config so that we don't get an NPE
      checkForProperty(properties, KEYTAB_PROPERTY_KEY);
      checkForProperty(properties, USERNAME_PROPERTY_KEY);

      try {
        // Login as whoever we're supposed to be and let the hostname be
        // pulled from localhost. If security isn't enabled, this does
        // nothing.
        SecurityUtil.login(conf, properties.getString(KEYTAB_PROPERTY_KEY),
            properties.getString(USERNAME_PROPERTY_KEY));
      } catch (IOException ex) {
        throw new MetricsException("Error logging in securely: ["
            + ex.toString() + "]", ex);
      }
    }
  }

  /**
   * Initialize the connection to HDFS and create the base directory.  Also
   * launch the flush thread.
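   *
   * @return true if the file system was initialized and the base directory
   * was created; false if directory creation failed and errors are being
   * ignored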
   */
  private boolean initFs() {
    boolean success = false;

    fileSystem = getFileSystem();

    // This step isn't strictly necessary, but it makes debugging issues much
    // easier. We try to create the base directory eagerly and fail with
    // copious debug info if it fails.
    try {
      fileSystem.mkdirs(basePath);
      success = true;
    } catch (Exception ex) {
      if (!ignoreError) {
        throw new MetricsException("Failed to create " + basePath + " ["
            + SOURCE_KEY + "=" + source + ", "
            + ALLOW_APPEND_KEY + "=" + allowAppend + ", "
            + stringifySecurityProperty(KEYTAB_PROPERTY_KEY) + ", "
            + stringifySecurityProperty(USERNAME_PROPERTY_KEY)
            + "] -- " + ex.toString(), ex);
      }
    }

    if (success) {
      // If we're permitted to append, check if we actually can
      if (allowAppend) {
        allowAppend = checkAppend(fileSystem);
      }

      flushTimer = new Timer("RollingFileSystemSink Flusher", true);
    }

    return success;
  }

  /**
   * Turn a security property into a nicely formatted set of <i>name=value</i>
   * strings, allowing for either the property or the configuration not to be
   * set.
   *
   * @param property the property to stringify
   * @return the stringified property
   */
  private String stringifySecurityProperty(String property) {
    String securityProperty;

    if (properties.containsKey(property)) {
      String propertyValue = properties.getString(property);
      String confValue = conf.get(properties.getString(property));

      if (confValue != null) {
        securityProperty = property + "=" + propertyValue
            + ", " + properties.getString(property) + "=" + confValue;
      } else {
        securityProperty = property + "=" + propertyValue
            + ", " + properties.getString(property) + "=<NOT SET>";
      }
    } else {
      securityProperty = property + "=<NOT SET>";
    }

    return securityProperty;
  }

  /**
   * Throw a {@link MetricsException} if the given property is not set.
   *
   * @param conf the configuration to test
   * @param key the key to validate
   */
  private static void checkForProperty(SubsetConfiguration conf, String key) {
    if (!conf.containsKey(key)) {
      throw new MetricsException("Configuration is missing " + key
          + " property");
    }
  }

  /**
   * Return the supplied configuration for testing or otherwise load a new
   * configuration.
   *
   * @return the configuration to use
   */
  private Configuration loadConf() {
    Configuration c;

    if (suppliedConf != null) {
      c = suppliedConf;
    } else {
      // The config we're handed in init() isn't the one we want here, so we
      // create a new one to pick up the full settings.
      c = new Configuration();
    }

    return c;
  }

  /**
   * Return the supplied file system for testing or otherwise get a new file
   * system.
   *
   * @return the file system to use
   * @throws MetricsException thrown if the file system could not be retrieved
   */
  private FileSystem getFileSystem() throws MetricsException {
    FileSystem fs = null;

    if (suppliedFilesystem != null) {
      fs = suppliedFilesystem;
    } else {
      try {
        fs = FileSystem.get(new URI(basePath.toString()), conf);
      } catch (URISyntaxException ex) {
        throw new MetricsException("The supplied filesystem base path URI"
            + " is not a valid URI: " + basePath.toString(), ex);
      } catch (IOException ex) {
        throw new MetricsException("Error connecting to file system: "
            + basePath + " [" + ex.toString() + "]", ex);
      }
    }

    return fs;
  }

  /**
   * Test whether the file system supports append and return the answer.
   *
   * @param fs the target file system
   * @return true if the file system supports append
   */
  private boolean checkAppend(FileSystem fs) {
    boolean canAppend = true;

    try {
      fs.append(basePath);
    } catch (IOException ex) {
      if (ex.getMessage().equals("Not supported")) {
        canAppend = false;
      }
    }

    return canAppend;
  }

  /**
   * Check the current directory against the time stamp.  If they're not
   * the same, create a new directory and a new log file in that directory.
   *
   * @throws MetricsException thrown if an error occurs while creating the
   * new directory or new log file
   */
  private void rollLogDirIfNeeded() throws MetricsException {
    Date now = new Date();
    String currentDir = DATE_FORMAT.format(now);
    Path path = new Path(basePath, currentDir);

    // We check whether currentOutStream is null instead of currentDirPath,
    // because if currentDirPath is null, then currentOutStream is null, but
    // currentOutStream can also be null for other reasons.
    if ((currentOutStream == null) || !path.equals(currentDirPath)) {
      // If we're not yet connected to HDFS, create the connection
      if (!initialized) {
        initialized = initFs();
      }

      if (initialized) {
        // Close the stream. This step could have been handled already by the
        // flusher thread, but if it has, the PrintStream will just swallow
        // the exception, which is fine.
        if (currentOutStream != null) {
          currentOutStream.close();
        }

        currentDirPath = path;

        try {
          rollLogDir();
        } catch (IOException ex) {
          throwMetricsException("Failed to create new log file", ex);
        }

        scheduleFlush(now);
      }
    }
  }

  /**
   * Schedule the current hour's directory to be flushed at the top of the
   * next hour.  If this ends up running after the top of the next hour, it
   * will execute immediately.
   *
   * @param now the current time
   */
  private void scheduleFlush(Date now) {
    // Store the current currentOutStream so that we can still close it after
    // it has been rolled over to a new stream.
    final PrintStream toClose = currentOutStream;
    Calendar next = Calendar.getInstance();

    next.setTime(now);

    if (flushQuickly) {
      // If we're running unit tests, flush after a short pause
      next.add(Calendar.MILLISECOND, 400);
    } else {
      // Otherwise flush at the top of the hour
      next.set(Calendar.SECOND, 0);
      next.set(Calendar.MINUTE, 0);
      next.add(Calendar.HOUR, 1);
    }

    flushTimer.schedule(new TimerTask() {
      @Override
      public void run() {
        synchronized (lock) {
          // This close may have already been done by a putMetrics() call.
          // If it has, the PrintStream will swallow the exception, which is
          // fine.
          toClose.close();
        }

        hasFlushed = true;
      }
    }, next.getTime());
  }

  /**
   * Create a new directory based on the current hour and a new log file in
   * that directory.
   *
   * @throws IOException thrown if an error occurs while creating the
   * new directory or new log file
   */
  private void rollLogDir() throws IOException {
    String fileName =
        source + "-" + InetAddress.getLocalHost().getHostName() + ".log";

    Path targetFile = new Path(currentDirPath, fileName);
    fileSystem.mkdirs(currentDirPath);

    if (allowAppend) {
      createOrAppendLogFile(targetFile);
    } else {
      createLogFile(targetFile);
    }
  }

  /**
   * Create a new log file.  If a file with the specified path already
   * exists, add a suffix, starting with 1, and try again.  Keep incrementing
   * the suffix until a nonexistent target path is found.
   *
   * Once the file is open, {@link #currentFSOutStream},
   * {@link #currentOutStream}, and {@link #currentFilePath} are set
   * appropriately.
   *
   * @param initial the target path
   * @throws IOException thrown if the file cannot be created or the check
   * for whether the file exists fails
   */
  private void createLogFile(Path initial) throws IOException {
    Path currentAttempt = initial;
    // Start at 0 so that if the base filename exists, we start with the
    // suffix ".1".
    int id = 0;

    while (true) {
      // First try blindly creating the file. If we fail, it either means
      // the file exists, or the operation actually failed. We do it this way
      // because if we check whether the file exists, it might still be
      // created by the time we try to create it. Creating first works like a
      // test-and-set.
      try {
        currentFSOutStream = fileSystem.create(currentAttempt, false);
        currentOutStream = new PrintStream(currentFSOutStream, true,
            StandardCharsets.UTF_8.name());
        currentFilePath = currentAttempt;
        break;
      } catch (IOException ex) {
        // Now we can check to see if the file exists to know why we failed
        if (fileSystem.exists(currentAttempt)) {
          id = getNextIdToTry(initial, id);
          currentAttempt = new Path(initial.toString() + "." + id);
        } else {
          throw ex;
        }
      }
    }
  }

  /**
   * Return the next ID suffix to use when creating the log file.  This
   * method will look at the files in the directory, find the one with the
   * highest ID suffix, add 1 to that suffix, and return the result.  This
   * approach saves a full linear probe, which matters in the case where
   * there are a large number of log files.
   *
   * @param initial the base file path
   * @param lastId the last ID value that was used
   * @return the next ID to try
   * @throws IOException thrown if there's an issue querying the files in the
   * directory
   */
  private int getNextIdToTry(Path initial, int lastId)
      throws IOException {
    RemoteIterator<LocatedFileStatus> files =
        fileSystem.listFiles(currentDirPath, true);
    // Compare against the file name only; initial.toString() would include
    // the full path, which a bare file name can never start with.
    String base = initial.getName();
    int id = lastId;

    while (files.hasNext()) {
      String file = files.next().getPath().getName();

      if (file.startsWith(base)) {
        int fileId = extractId(file);

        if (fileId > id) {
          id = fileId;
        }
      }
    }

    // Return either 1 more than the highest we found or 1 more than the last
    // ID used (if no ID was found).
    return id + 1;
  }

  /**
   * Extract the ID from the suffix of the given file name.
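   * For example, with a (hypothetical) file name
   * <code>namenode-host1.example.com.log.3</code>, the extracted ID is 3,
   * whereas <code>namenode-host1.example.com.log</code> yields -1 because
   * the text after the final dot is not a number.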
   *
   * @param file the file name
   * @return the ID or -1 if no ID could be extracted
   */
  private int extractId(String file) {
    int index = file.lastIndexOf(".");
    int id = -1;

    // A hostname has to have at least 1 character
    if (index > 0) {
      try {
        id = Integer.parseInt(file.substring(index + 1));
      } catch (NumberFormatException ex) {
        // This can happen if there's no suffix, but there is a dot in the
        // hostname. Just ignore it.
      }
    }

    return id;
  }

  /**
   * Create a new log file.  If a file with the specified path already
   * exists, open the file for append instead.
   *
   * Once the file is open, {@link #currentFSOutStream},
   * {@link #currentOutStream}, and {@link #currentFilePath} are set
   * appropriately.
   *
   * @param targetFile the target path
   * @throws IOException thrown if the create or append operation fails
   */
  private void createOrAppendLogFile(Path targetFile) throws IOException {
    // First try blindly creating the file. If we fail, it either means
    // the file exists, or the operation actually failed. We do it this way
    // because if we check whether the file exists, it might still be created
    // by the time we try to create it. Creating first works like a
    // test-and-set.
    try {
      currentFSOutStream = fileSystem.create(targetFile, false);
      currentOutStream = new PrintStream(currentFSOutStream, true,
          StandardCharsets.UTF_8.name());
    } catch (IOException ex) {
      // Try appending instead. If we fail, it means the file doesn't
      // actually exist yet or the operation actually failed.
      try {
        currentFSOutStream = fileSystem.append(targetFile);
        currentOutStream = new PrintStream(currentFSOutStream, true,
            StandardCharsets.UTF_8.name());
      } catch (IOException ex2) {
        // If the original create failed for a legit but transitory
        // reason, the append will fail because the file now doesn't exist,
        // resulting in a confusing stack trace. To avoid that, we set
        // the cause of the second exception to be the first exception.
        // It's still a tiny bit confusing, but it's enough
        // information that someone should be able to figure it out.
        ex2.initCause(ex);

        throw ex2;
      }
    }

    currentFilePath = targetFile;
  }

  @Override
  public void putMetrics(MetricsRecord record) {
    synchronized (lock) {
      rollLogDirIfNeeded();

      if (currentOutStream != null) {
        currentOutStream.printf("%d %s.%s", record.timestamp(),
            record.context(), record.name());

        String separator = ": ";

        for (MetricsTag tag : record.tags()) {
          currentOutStream.printf("%s%s=%s", separator, tag.name(),
              tag.value());
          separator = ", ";
        }

        for (AbstractMetric metric : record.metrics()) {
          currentOutStream.printf("%s%s=%s", separator, metric.name(),
              metric.value());
        }

        currentOutStream.println();

        // If we don't hflush(), the data may not be written until the file
        // is closed. The file won't be closed until the top of the hour
        // *AND* another record is received. Calling hflush() makes sure
        // that the data is complete at the top of the hour.
        try {
          currentFSOutStream.hflush();
        } catch (IOException ex) {
          throwMetricsException("Failed flushing the stream", ex);
        }

        checkForErrors("Unable to write to log file");
      } else if (!ignoreError) {
        throwMetricsException("Unable to write to log file");
      }
    }
  }

  @Override
  public void flush() {
    synchronized (lock) {
      // currentOutStream is null if currentFSOutStream is null
      if (currentFSOutStream != null) {
        try {
          currentFSOutStream.hflush();
        } catch (IOException ex) {
          throwMetricsException("Unable to flush log file", ex);
        }
      }
    }
  }

  @Override
  public void close() {
    synchronized (lock) {
      if (currentOutStream != null) {
        currentOutStream.close();

        try {
          checkForErrors("Unable to close log file");
        } finally {
          // Null out the streams just in case someone tries to reuse us.
          currentOutStream = null;
          currentFSOutStream = null;
        }
      }
    }
  }

  /**
   * If the sink isn't set to ignore errors, throw a {@link MetricsException}
   * if the stream encountered an exception.  The message parameter will be
   * used as the new exception's message with the current file name
   * ({@link #currentFilePath}) appended to it.
   *
   * @param message the exception message. The message will have the current
   * file name ({@link #currentFilePath}) appended to it.
   * @throws MetricsException thrown if there was an error and the sink isn't
   * ignoring errors
   */
  private void checkForErrors(String message)
      throws MetricsException {
    if (!ignoreError && currentOutStream.checkError()) {
      throw new MetricsException(message + ": " + currentFilePath);
    }
  }

  /**
   * If the sink isn't set to ignore errors, wrap the Throwable in a
   * {@link MetricsException} and throw it.  The message parameter will be
   * used as the new exception's message with the current file name
   * ({@link #currentFilePath}) and the Throwable's string representation
   * appended to it.
   *
   * @param message the exception message. The message will have the current
   * file name ({@link #currentFilePath}) and the Throwable's string
   * representation appended to it.
   * @param t the Throwable to wrap
   */
  private void throwMetricsException(String message, Throwable t) {
    if (!ignoreError) {
      throw new MetricsException(message + ": " + currentFilePath + " ["
          + t.toString() + "]", t);
    }
  }

  /**
   * If the sink isn't set to ignore errors, throw a new
   * {@link MetricsException}.  The message parameter will be used as the
   * new exception's message with the current file name
   * ({@link #currentFilePath}) appended to it.
   *
   * @param message the exception message. The message will have the current
   * file name ({@link #currentFilePath}) appended to it.
   */
  private void throwMetricsException(String message) {
    if (!ignoreError) {
      throw new MetricsException(message + ": " + currentFilePath);
    }
  }
}