001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018 019package org.apache.hadoop.metrics2.sink; 020 021import com.google.common.annotations.VisibleForTesting; 022import java.io.Closeable; 023import java.io.IOException; 024import java.io.PrintStream; 025import java.net.InetAddress; 026import java.net.URI; 027import java.net.URISyntaxException; 028import java.nio.charset.StandardCharsets; 029import java.util.Calendar; 030import java.util.Date; 031import java.util.TimeZone; 032import java.util.Timer; 033import java.util.TimerTask; 034import java.util.concurrent.ThreadLocalRandom; 035import java.util.concurrent.TimeUnit; 036import java.util.regex.Matcher; 037import java.util.regex.Pattern; 038 039import org.apache.commons.configuration.SubsetConfiguration; 040import org.apache.commons.lang.time.FastDateFormat; 041import org.apache.hadoop.classification.InterfaceAudience; 042import org.apache.hadoop.classification.InterfaceStability; 043import org.apache.hadoop.conf.Configuration; 044import org.apache.hadoop.fs.FSDataOutputStream; 045import org.apache.hadoop.fs.FileSystem; 046import org.apache.hadoop.fs.LocatedFileStatus; 047import org.apache.hadoop.fs.Path; 048import org.apache.hadoop.fs.RemoteIterator; 049import org.apache.hadoop.metrics2.AbstractMetric; 050import org.apache.hadoop.metrics2.MetricsException; 051import org.apache.hadoop.metrics2.MetricsRecord; 052import org.apache.hadoop.metrics2.MetricsSink; 053import org.apache.hadoop.metrics2.MetricsTag; 054import org.apache.hadoop.security.SecurityUtil; 055import org.apache.hadoop.security.UserGroupInformation; 056 057/** 058 * <p>This class is a metrics sink that uses 059 * {@link org.apache.hadoop.fs.FileSystem} to write the metrics logs. Every 060 * roll interval a new directory will be created under the path specified by the 061 * <code>basepath</code> property. All metrics will be logged to a file in the 062 * current interval's directory in a file named <hostname>.log, where 063 * <hostname> is the name of the host on which the metrics logging 064 * process is running. The base path is set by the 065 * <code><prefix>.sink.<instance>.basepath</code> property. The 066 * time zone used to create the current interval's directory name is GMT. If 067 * the <code>basepath</code> property isn't specified, it will default to 068 * "/tmp", which is the temp directory on whatever default file 069 * system is configured for the cluster.</p> 070 * 071 * <p>The <code><prefix>.sink.<instance>.ignore-error</code> 072 * property controls whether an exception is thrown when an error is encountered 073 * writing a log file. The default value is <code>true</code>. When set to 074 * <code>false</code>, file errors are quietly swallowed.</p> 075 * 076 * <p>The <code>roll-interval</code> property sets the amount of time before 077 * rolling the directory. The default value is 1 hour. The roll interval may 078 * not be less than 1 minute. The property's value should be given as 079 * <i>number unit</i>, where <i>number</i> is an integer value, and 080 * <i>unit</i> is a valid unit. Valid units are <i>minute</i>, <i>hour</i>, 081 * and <i>day</i>. The units are case insensitive and may be abbreviated or 082 * plural. If no units are specified, hours are assumed. For example, 083 * "2", "2h", "2 hour", and 084 * "2 hours" are all valid ways to specify two hours.</p> 085 * 086 * <p>The <code>roll-offset-interval-millis</code> property sets the upper 087 * bound on a random time interval (in milliseconds) that is used to delay 088 * before the initial roll. All subsequent rolls will happen an integer 089 * number of roll intervals after the initial roll, hence retaining the original 090 * offset. The purpose of this property is to insert some variance in the roll 091 * times so that large clusters using this sink on every node don't cause a 092 * performance impact on HDFS by rolling simultaneously. The default value is 093 * 30000 (30s). When writing to HDFS, as a rule of thumb, the roll offset in 094 * millis should be no less than the number of sink instances times 5. 095 * 096 * <p>The primary use of this class is for logging to HDFS. As it uses 097 * {@link org.apache.hadoop.fs.FileSystem} to access the target file system, 098 * however, it can be used to write to the local file system, Amazon S3, or any 099 * other supported file system. The base path for the sink will determine the 100 * file system used. An unqualified path will write to the default file system 101 * set by the configuration.</p> 102 * 103 * <p>Not all file systems support the ability to append to files. In file 104 * systems without the ability to append to files, only one writer can write to 105 * a file at a time. To allow for concurrent writes from multiple daemons on a 106 * single host, the <code>source</code> property is used to set unique headers 107 * for the log files. The property should be set to the name of 108 * the source daemon, e.g. <i>namenode</i>. The value of the 109 * <code>source</code> property should typically be the same as the property's 110 * prefix. If this property is not set, the source is taken to be 111 * <i>unknown</i>.</p> 112 * 113 * <p>Instead of appending to an existing file, by default the sink 114 * will create a new file with a suffix of ".<n>&quet;, where 115 * <i>n</i> is the next lowest integer that isn't already used in a file name, 116 * similar to the Hadoop daemon logs. NOTE: the file with the <b>highest</b> 117 * sequence number is the <b>newest</b> file, unlike the Hadoop daemon logs.</p> 118 * 119 * <p>For file systems that allow append, the sink supports appending to the 120 * existing file instead. If the <code>allow-append</code> property is set to 121 * true, the sink will instead append to the existing file on file systems that 122 * support appends. By default, the <code>allow-append</code> property is 123 * false.</p> 124 * 125 * <p>Note that when writing to HDFS with <code>allow-append</code> set to true, 126 * there is a minimum acceptable number of data nodes. If the number of data 127 * nodes drops below that minimum, the append will succeed, but reading the 128 * data will fail with an IOException in the DataStreamer class. The minimum 129 * number of data nodes required for a successful append is generally 2 or 130 * 3.</p> 131 * 132 * <p>Note also that when writing to HDFS, the file size information is not 133 * updated until the file is closed (at the end of the interval) even though 134 * the data is being written successfully. This is a known HDFS limitation that 135 * exists because of the performance cost of updating the metadata. See 136 * <a href="https://issues.apache.org/jira/browse/HDFS-5478">HDFS-5478</a>.</p> 137 * 138 * <p>When using this sink in a secure (Kerberos) environment, two additional 139 * properties must be set: <code>keytab-key</code> and 140 * <code>principal-key</code>. <code>keytab-key</code> should contain the key by 141 * which the keytab file can be found in the configuration, for example, 142 * <code>yarn.nodemanager.keytab</code>. <code>principal-key</code> should 143 * contain the key by which the principal can be found in the configuration, 144 * for example, <code>yarn.nodemanager.principal</code>. 145 */ 146@InterfaceAudience.Public 147@InterfaceStability.Evolving 148public class RollingFileSystemSink implements MetricsSink, Closeable { 149 private static final String BASEPATH_KEY = "basepath"; 150 private static final String SOURCE_KEY = "source"; 151 private static final String IGNORE_ERROR_KEY = "ignore-error"; 152 private static final boolean DEFAULT_IGNORE_ERROR = false; 153 private static final String ALLOW_APPEND_KEY = "allow-append"; 154 private static final boolean DEFAULT_ALLOW_APPEND = false; 155 private static final String KEYTAB_PROPERTY_KEY = "keytab-key"; 156 private static final String USERNAME_PROPERTY_KEY = "principal-key"; 157 private static final String ROLL_INTERVAL_KEY = "roll-interval"; 158 private static final String DEFAULT_ROLL_INTERVAL = "1h"; 159 private static final String ROLL_OFFSET_INTERVAL_MILLIS_KEY = 160 "roll-offset-interval-millis"; 161 private static final int DEFAULT_ROLL_OFFSET_INTERVAL_MILLIS = 30000; 162 private static final String SOURCE_DEFAULT = "unknown"; 163 private static final String BASEPATH_DEFAULT = "/tmp"; 164 private static final FastDateFormat DATE_FORMAT = 165 FastDateFormat.getInstance("yyyyMMddHHmm", TimeZone.getTimeZone("GMT")); 166 private final Object lock = new Object(); 167 private boolean initialized = false; 168 private SubsetConfiguration properties; 169 private Configuration conf; 170 @VisibleForTesting 171 protected String source; 172 @VisibleForTesting 173 protected boolean ignoreError; 174 @VisibleForTesting 175 protected boolean allowAppend; 176 @VisibleForTesting 177 protected Path basePath; 178 private FileSystem fileSystem; 179 // The current directory path into which we're writing files 180 private Path currentDirPath; 181 // The path to the current file into which we're writing data 182 private Path currentFilePath; 183 // The stream to which we're currently writing. 184 private PrintStream currentOutStream; 185 // We keep this only to be able to call hsynch() on it. 186 private FSDataOutputStream currentFSOutStream; 187 private Timer flushTimer; 188 // The amount of time between rolls 189 @VisibleForTesting 190 protected long rollIntervalMillis; 191 // The maximum amount of random time to add to the initial roll 192 @VisibleForTesting 193 protected long rollOffsetIntervalMillis; 194 // The time for the nextFlush 195 @VisibleForTesting 196 protected Calendar nextFlush = null; 197 // This flag when true causes a metrics write to schedule a flush thread to 198 // run immediately, but only if a flush thread is already scheduled. (It's a 199 // timing thing. If the first write forces the flush, it will strand the 200 // second write.) 201 @VisibleForTesting 202 protected static boolean forceFlush = false; 203 // This flag is used by the flusher thread to indicate that it has run. Used 204 // only for testing purposes. 205 @VisibleForTesting 206 protected static volatile boolean hasFlushed = false; 207 // Use this configuration instead of loading a new one. 208 @VisibleForTesting 209 protected static Configuration suppliedConf = null; 210 // Use this file system instead of getting a new one. 211 @VisibleForTesting 212 protected static FileSystem suppliedFilesystem = null; 213 214 /** 215 * Create an empty instance. Required for reflection. 216 */ 217 public RollingFileSystemSink() { 218 } 219 220 /** 221 * Create an instance for testing. 222 * 223 * @param flushIntervalMillis the roll interval in millis 224 * @param flushOffsetIntervalMillis the roll offset interval in millis 225 */ 226 @VisibleForTesting 227 protected RollingFileSystemSink(long flushIntervalMillis, 228 long flushOffsetIntervalMillis) { 229 this.rollIntervalMillis = flushIntervalMillis; 230 this.rollOffsetIntervalMillis = flushOffsetIntervalMillis; 231 } 232 233 @Override 234 public void init(SubsetConfiguration metrics2Properties) { 235 properties = metrics2Properties; 236 basePath = new Path(properties.getString(BASEPATH_KEY, BASEPATH_DEFAULT)); 237 source = properties.getString(SOURCE_KEY, SOURCE_DEFAULT); 238 ignoreError = properties.getBoolean(IGNORE_ERROR_KEY, DEFAULT_IGNORE_ERROR); 239 allowAppend = properties.getBoolean(ALLOW_APPEND_KEY, DEFAULT_ALLOW_APPEND); 240 rollOffsetIntervalMillis = 241 getNonNegative(ROLL_OFFSET_INTERVAL_MILLIS_KEY, 242 DEFAULT_ROLL_OFFSET_INTERVAL_MILLIS); 243 rollIntervalMillis = getRollInterval(); 244 245 conf = loadConf(); 246 UserGroupInformation.setConfiguration(conf); 247 248 // Don't do secure setup if it's not needed. 249 if (UserGroupInformation.isSecurityEnabled()) { 250 // Validate config so that we don't get an NPE 251 checkIfPropertyExists(KEYTAB_PROPERTY_KEY); 252 checkIfPropertyExists(USERNAME_PROPERTY_KEY); 253 254 255 try { 256 // Login as whoever we're supposed to be and let the hostname be pulled 257 // from localhost. If security isn't enabled, this does nothing. 258 SecurityUtil.login(conf, properties.getString(KEYTAB_PROPERTY_KEY), 259 properties.getString(USERNAME_PROPERTY_KEY)); 260 } catch (IOException ex) { 261 throw new MetricsException("Error logging in securely: [" 262 + ex.toString() + "]", ex); 263 } 264 } 265 } 266 267 /** 268 * Initialize the connection to HDFS and create the base directory. Also 269 * launch the flush thread. 270 */ 271 private boolean initFs() { 272 boolean success = false; 273 274 fileSystem = getFileSystem(); 275 276 // This step isn't strictly necessary, but it makes debugging issues much 277 // easier. We try to create the base directory eagerly and fail with 278 // copious debug info if it fails. 279 try { 280 fileSystem.mkdirs(basePath); 281 success = true; 282 } catch (Exception ex) { 283 if (!ignoreError) { 284 throw new MetricsException("Failed to create " + basePath + "[" 285 + SOURCE_KEY + "=" + source + ", " 286 + ALLOW_APPEND_KEY + "=" + allowAppend + ", " 287 + stringifySecurityProperty(KEYTAB_PROPERTY_KEY) + ", " 288 + stringifySecurityProperty(USERNAME_PROPERTY_KEY) 289 + "] -- " + ex.toString(), ex); 290 } 291 } 292 293 if (success) { 294 // If we're permitted to append, check if we actually can 295 if (allowAppend) { 296 allowAppend = checkAppend(fileSystem); 297 } 298 299 flushTimer = new Timer("RollingFileSystemSink Flusher", true); 300 setInitialFlushTime(new Date()); 301 } 302 303 return success; 304 } 305 306 /** 307 * Turn a security property into a nicely formatted set of <i>name=value</i> 308 * strings, allowing for either the property or the configuration not to be 309 * set. 310 * 311 * @param property the property to stringify 312 * @return the stringified property 313 */ 314 private String stringifySecurityProperty(String property) { 315 String securityProperty; 316 317 if (properties.containsKey(property)) { 318 String propertyValue = properties.getString(property); 319 String confValue = conf.get(properties.getString(property)); 320 321 if (confValue != null) { 322 securityProperty = property + "=" + propertyValue 323 + ", " + properties.getString(property) + "=" + confValue; 324 } else { 325 securityProperty = property + "=" + propertyValue 326 + ", " + properties.getString(property) + "=<NOT SET>"; 327 } 328 } else { 329 securityProperty = property + "=<NOT SET>"; 330 } 331 332 return securityProperty; 333 } 334 335 /** 336 * Extract the roll interval from the configuration and return it in 337 * milliseconds. 338 * 339 * @return the roll interval in millis 340 */ 341 @VisibleForTesting 342 protected long getRollInterval() { 343 String rollInterval = 344 properties.getString(ROLL_INTERVAL_KEY, DEFAULT_ROLL_INTERVAL); 345 Pattern pattern = Pattern.compile("^\\s*(\\d+)\\s*([A-Za-z]*)\\s*$"); 346 Matcher match = pattern.matcher(rollInterval); 347 long millis; 348 349 if (match.matches()) { 350 String flushUnit = match.group(2); 351 int rollIntervalInt; 352 353 try { 354 rollIntervalInt = Integer.parseInt(match.group(1)); 355 } catch (NumberFormatException ex) { 356 throw new MetricsException("Unrecognized flush interval: " 357 + rollInterval + ". Must be a number followed by an optional " 358 + "unit. The unit must be one of: minute, hour, day", ex); 359 } 360 361 if ("".equals(flushUnit)) { 362 millis = TimeUnit.HOURS.toMillis(rollIntervalInt); 363 } else { 364 switch (flushUnit.toLowerCase()) { 365 case "m": 366 case "min": 367 case "minute": 368 case "minutes": 369 millis = TimeUnit.MINUTES.toMillis(rollIntervalInt); 370 break; 371 case "h": 372 case "hr": 373 case "hour": 374 case "hours": 375 millis = TimeUnit.HOURS.toMillis(rollIntervalInt); 376 break; 377 case "d": 378 case "day": 379 case "days": 380 millis = TimeUnit.DAYS.toMillis(rollIntervalInt); 381 break; 382 default: 383 throw new MetricsException("Unrecognized unit for flush interval: " 384 + flushUnit + ". Must be one of: minute, hour, day"); 385 } 386 } 387 } else { 388 throw new MetricsException("Unrecognized flush interval: " 389 + rollInterval + ". Must be a number followed by an optional unit." 390 + " The unit must be one of: minute, hour, day"); 391 } 392 393 if (millis < 60000) { 394 throw new MetricsException("The flush interval property must be " 395 + "at least 1 minute. Value was " + rollInterval); 396 } 397 398 return millis; 399 } 400 401 /** 402 * Return the property value if it's non-negative and throw an exception if 403 * it's not. 404 * 405 * @param key the property key 406 * @param defaultValue the default value 407 */ 408 private long getNonNegative(String key, int defaultValue) { 409 int flushOffsetIntervalMillis = properties.getInt(key, defaultValue); 410 411 if (flushOffsetIntervalMillis < 0) { 412 throw new MetricsException("The " + key + " property must be " 413 + "non-negative. Value was " + flushOffsetIntervalMillis); 414 } 415 416 return flushOffsetIntervalMillis; 417 } 418 419 /** 420 * Throw a {@link MetricsException} if the given property is not set. 421 * 422 * @param key the key to validate 423 */ 424 private void checkIfPropertyExists(String key) { 425 if (!properties.containsKey(key)) { 426 throw new MetricsException("Metrics2 configuration is missing " + key 427 + " property"); 428 } 429 } 430 431 /** 432 * Return the supplied configuration for testing or otherwise load a new 433 * configuration. 434 * 435 * @return the configuration to use 436 */ 437 private Configuration loadConf() { 438 Configuration c; 439 440 if (suppliedConf != null) { 441 c = suppliedConf; 442 } else { 443 // The config we're handed in init() isn't the one we want here, so we 444 // create a new one to pick up the full settings. 445 c = new Configuration(); 446 } 447 448 return c; 449 } 450 451 /** 452 * Return the supplied file system for testing or otherwise get a new file 453 * system. 454 * 455 * @return the file system to use 456 * @throws MetricsException thrown if the file system could not be retrieved 457 */ 458 private FileSystem getFileSystem() throws MetricsException { 459 FileSystem fs = null; 460 461 if (suppliedFilesystem != null) { 462 fs = suppliedFilesystem; 463 } else { 464 try { 465 fs = FileSystem.get(new URI(basePath.toString()), conf); 466 } catch (URISyntaxException ex) { 467 throw new MetricsException("The supplied filesystem base path URI" 468 + " is not a valid URI: " + basePath.toString(), ex); 469 } catch (IOException ex) { 470 throw new MetricsException("Error connecting to file system: " 471 + basePath + " [" + ex.toString() + "]", ex); 472 } 473 } 474 475 return fs; 476 } 477 478 /** 479 * Test whether the file system supports append and return the answer. 480 * 481 * @param fs the target file system 482 */ 483 private boolean checkAppend(FileSystem fs) { 484 boolean canAppend = true; 485 486 try { 487 fs.append(basePath); 488 } catch (IOException ex) { 489 if (ex.getMessage().equals("Not supported")) { 490 canAppend = false; 491 } 492 } 493 494 return canAppend; 495 } 496 497 /** 498 * Check the current directory against the time stamp. If they're not 499 * the same, create a new directory and a new log file in that directory. 500 * 501 * @throws MetricsException thrown if an error occurs while creating the 502 * new directory or new log file 503 */ 504 private void rollLogDirIfNeeded() throws MetricsException { 505 // Because we're working relative to the clock, we use a Date instead 506 // of Time.monotonicNow(). 507 Date now = new Date(); 508 509 // We check whether currentOutStream is null instead of currentDirPath, 510 // because if currentDirPath is null, then currentOutStream is null, but 511 // currentOutStream can be null for other reasons. Same for nextFlush. 512 if ((currentOutStream == null) || now.after(nextFlush.getTime())) { 513 // If we're not yet connected to HDFS, create the connection 514 if (!initialized) { 515 initialized = initFs(); 516 } 517 518 if (initialized) { 519 // Close the stream. This step could have been handled already by the 520 // flusher thread, but if it has, the PrintStream will just swallow the 521 // exception, which is fine. 522 if (currentOutStream != null) { 523 currentOutStream.close(); 524 } 525 526 currentDirPath = findCurrentDirectory(now); 527 528 try { 529 rollLogDir(); 530 } catch (IOException ex) { 531 throwMetricsException("Failed to create new log file", ex); 532 } 533 534 // Update the time of the next flush 535 updateFlushTime(now); 536 // Schedule the next flush at that time 537 scheduleFlush(nextFlush.getTime()); 538 } 539 } else if (forceFlush) { 540 scheduleFlush(new Date()); 541 } 542 } 543 544 /** 545 * Use the given time to determine the current directory. The current 546 * directory will be based on the {@link #rollIntervalMinutes}. 547 * 548 * @param now the current time 549 * @return the current directory 550 */ 551 private Path findCurrentDirectory(Date now) { 552 long offset = ((now.getTime() - nextFlush.getTimeInMillis()) 553 / rollIntervalMillis) * rollIntervalMillis; 554 String currentDir = 555 DATE_FORMAT.format(new Date(nextFlush.getTimeInMillis() + offset)); 556 557 return new Path(basePath, currentDir); 558 } 559 560 /** 561 * Schedule the current interval's directory to be flushed. If this ends up 562 * running after the top of the next interval, it will execute immediately. 563 * 564 * @param when the time the thread should run 565 */ 566 private void scheduleFlush(Date when) { 567 // Store the current currentDirPath to close later 568 final PrintStream toClose = currentOutStream; 569 570 flushTimer.schedule(new TimerTask() { 571 @Override 572 public void run() { 573 synchronized (lock) { 574 // This close may have already been done by a putMetrics() call. If it 575 // has, the PrintStream will swallow the exception, which is fine. 576 toClose.close(); 577 } 578 579 hasFlushed = true; 580 } 581 }, when); 582 } 583 584 /** 585 * Update the {@link #nextFlush} variable to the next flush time. Add 586 * an integer number of flush intervals, preserving the initial random offset. 587 * 588 * @param now the current time 589 */ 590 @VisibleForTesting 591 protected void updateFlushTime(Date now) { 592 // In non-initial rounds, add an integer number of intervals to the last 593 // flush until a time in the future is achieved, thus preserving the 594 // original random offset. 595 int millis = 596 (int) (((now.getTime() - nextFlush.getTimeInMillis()) 597 / rollIntervalMillis + 1) * rollIntervalMillis); 598 599 nextFlush.add(Calendar.MILLISECOND, millis); 600 } 601 602 /** 603 * Set the {@link #nextFlush} variable to the initial flush time. The initial 604 * flush will be an integer number of flush intervals past the beginning of 605 * the current hour and will have a random offset added, up to 606 * {@link #rollOffsetIntervalMillis}. The initial flush will be a time in 607 * past that can be used from which to calculate future flush times. 608 * 609 * @param now the current time 610 */ 611 @VisibleForTesting 612 protected void setInitialFlushTime(Date now) { 613 // Start with the beginning of the current hour 614 nextFlush = Calendar.getInstance(); 615 nextFlush.setTime(now); 616 nextFlush.set(Calendar.MILLISECOND, 0); 617 nextFlush.set(Calendar.SECOND, 0); 618 nextFlush.set(Calendar.MINUTE, 0); 619 620 // In the first round, calculate the first flush as the largest number of 621 // intervals from the beginning of the current hour that's not in the 622 // future by: 623 // 1. Subtract the beginning of the hour from the current time 624 // 2. Divide by the roll interval and round down to get the number of whole 625 // intervals that have passed since the beginning of the hour 626 // 3. Multiply by the roll interval to get the number of millis between 627 // the beginning of the current hour and the beginning of the current 628 // interval. 629 int millis = (int) (((now.getTime() - nextFlush.getTimeInMillis()) 630 / rollIntervalMillis) * rollIntervalMillis); 631 632 // Then add some noise to help prevent all the nodes from 633 // closing their files at the same time. 634 if (rollOffsetIntervalMillis > 0) { 635 millis += ThreadLocalRandom.current().nextLong(rollOffsetIntervalMillis); 636 637 // If the added time puts us into the future, step back one roll interval 638 // because the code to increment nextFlush to the next flush expects that 639 // nextFlush is the next flush from the previous interval. There wasn't 640 // a previous interval, so we just fake it with the time in the past that 641 // would have been the previous interval if there had been one. 642 // 643 // It's OK if millis comes out negative. 644 while (nextFlush.getTimeInMillis() + millis > now.getTime()) { 645 millis -= rollIntervalMillis; 646 } 647 } 648 649 // Adjust the next flush time by millis to get the time of our ficticious 650 // previous next flush 651 nextFlush.add(Calendar.MILLISECOND, millis); 652 } 653 654 /** 655 * Create a new directory based on the current interval and a new log file in 656 * that directory. 657 * 658 * @throws IOException thrown if an error occurs while creating the 659 * new directory or new log file 660 */ 661 private void rollLogDir() throws IOException { 662 String fileName = 663 source + "-" + InetAddress.getLocalHost().getHostName() + ".log"; 664 665 Path targetFile = new Path(currentDirPath, fileName); 666 fileSystem.mkdirs(currentDirPath); 667 668 if (allowAppend) { 669 createOrAppendLogFile(targetFile); 670 } else { 671 createLogFile(targetFile); 672 } 673 } 674 675 /** 676 * Create a new log file and return the {@link FSDataOutputStream}. If a 677 * file with the specified path already exists, add a suffix, starting with 1 678 * and try again. Keep incrementing the suffix until a nonexistent target 679 * path is found. 680 * 681 * Once the file is open, update {@link #currentFSOutStream}, 682 * {@link #currentOutStream}, and {@#link #currentFilePath} are set 683 * appropriately. 684 * 685 * @param initial the target path 686 * @throws IOException thrown if the call to see if the exists fails 687 */ 688 private void createLogFile(Path initial) throws IOException { 689 Path currentAttempt = initial; 690 // Start at 0 so that if the base filname exists, we start with the suffix 691 // ".1". 692 int id = 0; 693 694 while (true) { 695 // First try blindly creating the file. If we fail, it either means 696 // the file exists, or the operation actually failed. We do it this way 697 // because if we check whether the file exists, it might still be created 698 // by the time we try to create it. Creating first works like a 699 // test-and-set. 700 try { 701 currentFSOutStream = fileSystem.create(currentAttempt, false); 702 currentOutStream = new PrintStream(currentFSOutStream, true, 703 StandardCharsets.UTF_8.name()); 704 currentFilePath = currentAttempt; 705 break; 706 } catch (IOException ex) { 707 // Now we can check to see if the file exists to know why we failed 708 if (fileSystem.exists(currentAttempt)) { 709 id = getNextIdToTry(initial, id); 710 currentAttempt = new Path(initial.toString() + "." + id); 711 } else { 712 throw ex; 713 } 714 } 715 } 716 } 717 718 /** 719 * Return the next ID suffix to use when creating the log file. This method 720 * will look at the files in the directory, find the one with the highest 721 * ID suffix, and 1 to that suffix, and return it. This approach saves a full 722 * linear probe, which matters in the case where there are a large number of 723 * log files. 724 * 725 * @param initial the base file path 726 * @param lastId the last ID value that was used 727 * @return the next ID to try 728 * @throws IOException thrown if there's an issue querying the files in the 729 * directory 730 */ 731 private int getNextIdToTry(Path initial, int lastId) 732 throws IOException { 733 RemoteIterator<LocatedFileStatus> files = 734 fileSystem.listFiles(currentDirPath, true); 735 String base = initial.toString(); 736 int id = lastId; 737 738 while (files.hasNext()) { 739 String file = files.next().getPath().getName(); 740 741 if (file.startsWith(base)) { 742 int fileId = extractId(file); 743 744 if (fileId > id) { 745 id = fileId; 746 } 747 } 748 } 749 750 // Return either 1 more than the highest we found or 1 more than the last 751 // ID used (if no ID was found). 752 return id + 1; 753 } 754 755 /** 756 * Extract the ID from the suffix of the given file name. 757 * 758 * @param file the file name 759 * @return the ID or -1 if no ID could be extracted 760 */ 761 private int extractId(String file) { 762 int index = file.lastIndexOf("."); 763 int id = -1; 764 765 // A hostname has to have at least 1 character 766 if (index > 0) { 767 try { 768 id = Integer.parseInt(file.substring(index + 1)); 769 } catch (NumberFormatException ex) { 770 // This can happen if there's no suffix, but there is a dot in the 771 // hostname. Just ignore it. 772 } 773 } 774 775 return id; 776 } 777 778 /** 779 * Create a new log file and return the {@link FSDataOutputStream}. If a 780 * file with the specified path already exists, open the file for append 781 * instead. 782 * 783 * Once the file is open, update {@link #currentFSOutStream}, 784 * {@link #currentOutStream}, and {@#link #currentFilePath}. 785 * 786 * @param initial the target path 787 * @throws IOException thrown if the call to see the append operation fails. 788 */ 789 private void createOrAppendLogFile(Path targetFile) throws IOException { 790 // First try blindly creating the file. If we fail, it either means 791 // the file exists, or the operation actually failed. We do it this way 792 // because if we check whether the file exists, it might still be created 793 // by the time we try to create it. Creating first works like a 794 // test-and-set. 795 try { 796 currentFSOutStream = fileSystem.create(targetFile, false); 797 currentOutStream = new PrintStream(currentFSOutStream, true, 798 StandardCharsets.UTF_8.name()); 799 } catch (IOException ex) { 800 // Try appending instead. If we fail, if means the file doesn't 801 // actually exist yet or the operation actually failed. 802 try { 803 currentFSOutStream = fileSystem.append(targetFile); 804 currentOutStream = new PrintStream(currentFSOutStream, true, 805 StandardCharsets.UTF_8.name()); 806 } catch (IOException ex2) { 807 // If the original create failed for a legit but transitory 808 // reason, the append will fail because the file now doesn't exist, 809 // resulting in a confusing stack trace. To avoid that, we set 810 // the cause of the second exception to be the first exception. 811 // It's still a tiny bit confusing, but it's enough 812 // information that someone should be able to figure it out. 813 ex2.initCause(ex); 814 815 throw ex2; 816 } 817 } 818 819 currentFilePath = targetFile; 820 } 821 822 @Override 823 public void putMetrics(MetricsRecord record) { 824 synchronized (lock) { 825 rollLogDirIfNeeded(); 826 827 if (currentOutStream != null) { 828 currentOutStream.printf("%d %s.%s", record.timestamp(), 829 record.context(), record.name()); 830 831 String separator = ": "; 832 833 for (MetricsTag tag : record.tags()) { 834 currentOutStream.printf("%s%s=%s", separator, tag.name(), 835 tag.value()); 836 separator = ", "; 837 } 838 839 for (AbstractMetric metric : record.metrics()) { 840 currentOutStream.printf("%s%s=%s", separator, metric.name(), 841 metric.value()); 842 } 843 844 currentOutStream.println(); 845 846 // If we don't hflush(), the data may not be written until the file is 847 // closed. The file won't be closed until the end of the interval *AND* 848 // another record is received. Calling hflush() makes sure that the data 849 // is complete at the end of the interval. 850 try { 851 currentFSOutStream.hflush(); 852 } catch (IOException ex) { 853 throwMetricsException("Failed flushing the stream", ex); 854 } 855 856 checkForErrors("Unable to write to log file"); 857 } else if (!ignoreError) { 858 throwMetricsException("Unable to write to log file"); 859 } 860 } 861 } 862 863 @Override 864 public void flush() { 865 synchronized (lock) { 866 // currentOutStream is null if currentFSOutStream is null 867 if (currentFSOutStream != null) { 868 try { 869 currentFSOutStream.hflush(); 870 } catch (IOException ex) { 871 throwMetricsException("Unable to flush log file", ex); 872 } 873 } 874 } 875 } 876 877 @Override 878 public void close() { 879 synchronized (lock) { 880 if (currentOutStream != null) { 881 currentOutStream.close(); 882 883 try { 884 checkForErrors("Unable to close log file"); 885 } finally { 886 // Null out the streams just in case someone tries to reuse us. 887 currentOutStream = null; 888 currentFSOutStream = null; 889 } 890 } 891 } 892 } 893 894 /** 895 * If the sink isn't set to ignore errors, throw a {@link MetricsException} 896 * if the stream encountered an exception. The message parameter will be used 897 * as the new exception's message with the current file name 898 * ({@link #currentFilePath}) appended to it. 899 * 900 * @param message the exception message. The message will have a colon and 901 * the current file name ({@link #currentFilePath}) appended to it. 902 * @throws MetricsException thrown if there was an error and the sink isn't 903 * ignoring errors 904 */ 905 private void checkForErrors(String message) 906 throws MetricsException { 907 if (!ignoreError && currentOutStream.checkError()) { 908 throw new MetricsException(message + ": " + currentFilePath); 909 } 910 } 911 912 /** 913 * If the sink isn't set to ignore errors, wrap the Throwable in a 914 * {@link MetricsException} and throw it. The message parameter will be used 915 * as the new exception's message with the current file name 916 * ({@link #currentFilePath}) and the Throwable's string representation 917 * appended to it. 918 * 919 * @param message the exception message. The message will have a colon, the 920 * current file name ({@link #currentFilePath}), and the Throwable's string 921 * representation (wrapped in square brackets) appended to it. 922 * @param t the Throwable to wrap 923 */ 924 private void throwMetricsException(String message, Throwable t) { 925 if (!ignoreError) { 926 throw new MetricsException(message + ": " + currentFilePath + " [" 927 + t.toString() + "]", t); 928 } 929 } 930 931 /** 932 * If the sink isn't set to ignore errors, throw a new 933 * {@link MetricsException}. The message parameter will be used as the 934 * new exception's message with the current file name 935 * ({@link #currentFilePath}) appended to it. 936 * 937 * @param message the exception message. The message will have a colon and 938 * the current file name ({@link #currentFilePath}) appended to it. 939 */ 940 private void throwMetricsException(String message) { 941 if (!ignoreError) { 942 throw new MetricsException(message + ": " + currentFilePath); 943 } 944 } 945}