/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.hdfs;

import com.google.common.collect.Iterators;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.hdfs.inotify.EventBatch;
import org.apache.hadoop.hdfs.inotify.EventBatchList;
import org.apache.hadoop.hdfs.inotify.MissingEventsException;
import org.apache.hadoop.hdfs.protocol.ClientProtocol;
import org.apache.hadoop.util.Time;
import org.apache.htrace.core.TraceScope;
import org.apache.htrace.core.Tracer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.util.Collections;
import java.util.Iterator;
import java.util.Random;
import java.util.concurrent.TimeUnit;

/**
 * Stream for reading inotify events. DFSInotifyEventInputStreams should not
 * be shared among multiple threads.
 */
@InterfaceAudience.Public
@InterfaceStability.Unstable
public class DFSInotifyEventInputStream {
  public static final Logger LOG = LoggerFactory.getLogger(
      DFSInotifyEventInputStream.class);

  private final ClientProtocol namenode;
  /** Iterator over the batches returned by the most recent NameNode fetch. */
  private Iterator<EventBatch> it;
  /**
   * Last txid covered by the most recent fetch from the NameNode; the next
   * fetch asks for edits starting at {@code lastReadTxid + 1}. A value of -1
   * means the starting txid still has to be obtained from the NameNode.
   */
  private long lastReadTxid;
  /**
   * The most recent txid the NameNode told us it has sync'ed -- helps us
   * determine how far behind we are in the edit stream.
   */
  private long syncTxid;
  /**
   * Used to generate wait times in {@link DFSInotifyEventInputStream#take()}.
   */
  private final Random rng = new Random();

  private final Tracer tracer;

  /** Base wait, in milliseconds, for the back-off used by the blocking reads. */
  private static final int INITIAL_WAIT_MS = 10;

  /**
   * Creates a stream that only reports events for transactions newer than the
   * NameNode's current edit log txid.
   *
   * @param namenode the NameNode proxy used to fetch edits
   * @param tracer tracer used to create a scope around each read call
   * @throws IOException if the current txid cannot be read from the NameNode
   */
  DFSInotifyEventInputStream(ClientProtocol namenode, Tracer tracer)
      throws IOException {
    // Only consider new transaction IDs.
    this(namenode, tracer, namenode.getCurrentEditLogTxid());
  }

  /**
   * Creates a stream that reports events following the given txid.
   *
   * @param namenode the NameNode proxy used to fetch edits
   * @param tracer tracer used to create a scope around each read call
   * @param lastReadTxid txid after which events are requested; -1 makes the
   *     first {@link #poll()} read the current txid from the NameNode instead
   */
  DFSInotifyEventInputStream(ClientProtocol namenode, Tracer tracer,
      long lastReadTxid) {
    this.namenode = namenode;
    // Use the JDK's empty iterator: Guava has deprecated
    // Iterators.emptyIterator() in favor of this method.
    this.it = Collections.emptyIterator();
    this.lastReadTxid = lastReadTxid;
    this.tracer = tracer;
  }

  /**
   * Returns the next batch of events in the stream or null if no new
   * batches are currently available.
   *
   * @return the next batch, or null if none is currently available
   * @throws IOException because of network error or edit log
   * corruption. Also possible if JournalNodes are unresponsive in the
   * QJM setting (even one unresponsive JournalNode is enough in rare cases),
   * so catching this exception and retrying at least a few times is
   * recommended.
   * @throws MissingEventsException if we cannot return the next batch in the
   * stream because the data for the events (and possibly some subsequent
   * events) has been deleted (generally because this stream is a very large
   * number of transactions behind the current state of the NameNode). It is
   * safe to continue reading from the stream after this exception is thrown.
   * The next available batch of events will be returned.
   */
  public EventBatch poll() throws IOException, MissingEventsException {
    try (TraceScope ignored = tracer.newScope("inotifyPoll")) {
      // need to keep retrying until the NN sends us the latest committed txid
      if (lastReadTxid == -1) {
        LOG.debug("poll(): lastReadTxid is -1, reading current txid from NN");
        lastReadTxid = namenode.getCurrentEditLogTxid();
        return null;
      }
      if (!it.hasNext()) {
        EventBatchList el = namenode.getEditsFromTxid(lastReadTxid + 1);
        if (el.getLastTxid() != -1) {
          // we only want to set syncTxid when we were actually able to read some
          // edits on the NN -- otherwise it will seem like edits are being
          // generated faster than we can read them when the problem is really
          // that we are temporarily unable to read edits
          syncTxid = el.getSyncTxid();
          // The iterator and lastReadTxid are advanced before the gap check
          // below, so the stream is already positioned on the fetched batches
          // if MissingEventsException is thrown.
          it = el.getBatches().iterator();
          long formerLastReadTxid = lastReadTxid;
          lastReadTxid = el.getLastTxid();
          if (el.getFirstTxid() != formerLastReadTxid + 1) {
            throw new MissingEventsException(formerLastReadTxid + 1,
                el.getFirstTxid());
          }
        } else {
          LOG.debug("poll(): read no edits from the NN when requesting edits " +
              "after txid {}", lastReadTxid);
          return null;
        }
      }

      if (it.hasNext()) { // can be empty if el.getLastTxid != -1 but none of the
        // newly seen edit log ops actually got converted to events
        return it.next();
      } else {
        return null;
      }
    }
  }

  /**
   * Return an estimate of how many transaction IDs behind the NameNode's
   * current state this stream is. Clients should periodically call this method
   * and check if its result is steadily increasing, which indicates that they
   * are falling behind (i.e. transactions are being generated faster than the
   * client is reading them). If a client falls too far behind events may be
   * deleted before the client can read them.
   * <p>
   * A return value of -1 indicates that an estimate could not be produced, and
   * should be ignored. The value returned by this method is really only useful
   * when compared to previous or subsequent returned values.
   *
   * @return the estimated number of txids this stream is behind, or -1 if no
   *     estimate is available yet
   */
  public long getTxidsBehindEstimate() {
    // syncTxid keeps its default of 0 until a fetch has returned edits.
    if (syncTxid == 0) {
      return -1;
    } else {
      assert syncTxid >= lastReadTxid;
      // this gives the difference between the last txid we have fetched to the
      // client and syncTxid at the time we last fetched events from the
      // NameNode
      return syncTxid - lastReadTxid;
    }
  }

  /**
   * Returns the next event batch in the stream, waiting up to the specified
   * amount of time for a new batch. Returns null if one is not available at the
   * end of the specified amount of time. The time before the method returns may
   * exceed the specified amount of time by up to the time required for an RPC
   * to the NameNode.
   *
   * @param time number of units of the given TimeUnit to wait
   * @param tu the desired TimeUnit
   * @return the next batch, or null if none became available in time
   * @throws IOException see {@link DFSInotifyEventInputStream#poll()}
   * @throws MissingEventsException
   * see {@link DFSInotifyEventInputStream#poll()}
   * @throws InterruptedException if the calling thread is interrupted
   */
  public EventBatch poll(long time, TimeUnit tu) throws IOException,
      InterruptedException, MissingEventsException {
    EventBatch next;
    try (TraceScope ignored = tracer.newScope("inotifyPollWithTimeout")) {
      long initialTime = Time.monotonicNow();
      long totalWait = TimeUnit.MILLISECONDS.convert(time, tu);
      long nextWait = INITIAL_WAIT_MS;
      while ((next = poll()) == null) {
        long timeLeft = totalWait - (Time.monotonicNow() - initialTime);
        if (timeLeft <= 0) {
          LOG.debug("timed poll(): timed out");
          break;
        } else if (timeLeft < nextWait * 2) {
          // Doubling would overshoot the deadline: sleep only for what is left.
          nextWait = timeLeft;
        } else {
          nextWait *= 2;
        }
        LOG.debug("timed poll(): poll() returned null, sleeping for {} ms",
            nextWait);
        Thread.sleep(nextWait);
      }
    }
    return next;
  }

  /**
   * Returns the next batch of events in the stream, waiting indefinitely if
   * a new batch is not immediately available.
   *
   * @return the next batch; never null
   * @throws IOException see {@link DFSInotifyEventInputStream#poll()}
   * @throws MissingEventsException see
   * {@link DFSInotifyEventInputStream#poll()}
   * @throws InterruptedException if the calling thread is interrupted
   */
  public EventBatch take() throws IOException, InterruptedException,
      MissingEventsException {
    EventBatch next;
    try (TraceScope ignored = tracer.newScope("inotifyTake")) {
      int nextWaitMin = INITIAL_WAIT_MS;
      while ((next = poll()) == null) {
        // sleep for a random period between nextWaitMin and nextWaitMin * 2
        // to avoid stampedes at the NN if there are multiple clients
        int sleepTime = nextWaitMin + rng.nextInt(nextWaitMin);
        LOG.debug("take(): poll() returned null, sleeping for {} ms", sleepTime);
        Thread.sleep(sleepTime);
        // the maximum sleep is 2 minutes: the lower bound is capped at 60000 ms
        // and the random part added above is always below that lower bound
        nextWaitMin = Math.min(60000, nextWaitMin * 2);
      }
    }

    return next;
  }
}