001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018 019package org.apache.hadoop.hdfs; 020 021import com.google.common.collect.Iterators; 022import org.apache.hadoop.classification.InterfaceAudience; 023import org.apache.hadoop.classification.InterfaceStability; 024import org.apache.hadoop.hdfs.inotify.EventBatch; 025import org.apache.hadoop.hdfs.inotify.EventBatchList; 026import org.apache.hadoop.hdfs.inotify.MissingEventsException; 027import org.apache.hadoop.hdfs.protocol.ClientProtocol; 028import org.apache.hadoop.util.Time; 029import org.apache.htrace.Sampler; 030import org.apache.htrace.Trace; 031import org.apache.htrace.TraceScope; 032import org.slf4j.Logger; 033import org.slf4j.LoggerFactory; 034 035import java.io.IOException; 036import java.util.Iterator; 037import java.util.Random; 038import java.util.concurrent.TimeUnit; 039 040/** 041 * Stream for reading inotify events. DFSInotifyEventInputStreams should not 042 * be shared among multiple threads. 043 */ 044@InterfaceAudience.Public 045@InterfaceStability.Unstable 046public class DFSInotifyEventInputStream { 047 public static Logger LOG = LoggerFactory.getLogger(DFSInotifyEventInputStream 048 .class); 049 050 /** 051 * The trace sampler to use when making RPCs to the NameNode. 052 */ 053 private final Sampler<?> traceSampler; 054 055 private final ClientProtocol namenode; 056 private Iterator<EventBatch> it; 057 private long lastReadTxid; 058 /** 059 * The most recent txid the NameNode told us it has sync'ed -- helps us 060 * determine how far behind we are in the edit stream. 061 */ 062 private long syncTxid; 063 /** 064 * Used to generate wait times in {@link DFSInotifyEventInputStream#take()}. 065 */ 066 private Random rng = new Random(); 067 068 private static final int INITIAL_WAIT_MS = 10; 069 070 DFSInotifyEventInputStream(Sampler<?> traceSampler, ClientProtocol namenode) 071 throws IOException { 072 // Only consider new transaction IDs. 073 this(traceSampler, namenode, namenode.getCurrentEditLogTxid()); 074 } 075 076 DFSInotifyEventInputStream(Sampler traceSampler, ClientProtocol namenode, 077 long lastReadTxid) throws IOException { 078 this.traceSampler = traceSampler; 079 this.namenode = namenode; 080 this.it = Iterators.emptyIterator(); 081 this.lastReadTxid = lastReadTxid; 082 } 083 084 /** 085 * Returns the next batch of events in the stream or null if no new 086 * batches are currently available. 087 * 088 * @throws IOException because of network error or edit log 089 * corruption. Also possible if JournalNodes are unresponsive in the 090 * QJM setting (even one unresponsive JournalNode is enough in rare cases), 091 * so catching this exception and retrying at least a few times is 092 * recommended. 093 * @throws MissingEventsException if we cannot return the next batch in the 094 * stream because the data for the events (and possibly some subsequent 095 * events) has been deleted (generally because this stream is a very large 096 * number of transactions behind the current state of the NameNode). It is 097 * safe to continue reading from the stream after this exception is thrown 098 * The next available batch of events will be returned. 099 */ 100 public EventBatch poll() throws IOException, MissingEventsException { 101 TraceScope scope = 102 Trace.startSpan("inotifyPoll", traceSampler); 103 try { 104 // need to keep retrying until the NN sends us the latest committed txid 105 if (lastReadTxid == -1) { 106 LOG.debug("poll(): lastReadTxid is -1, reading current txid from NN"); 107 lastReadTxid = namenode.getCurrentEditLogTxid(); 108 return null; 109 } 110 if (!it.hasNext()) { 111 EventBatchList el = namenode.getEditsFromTxid(lastReadTxid + 1); 112 if (el.getLastTxid() != -1) { 113 // we only want to set syncTxid when we were actually able to read some 114 // edits on the NN -- otherwise it will seem like edits are being 115 // generated faster than we can read them when the problem is really 116 // that we are temporarily unable to read edits 117 syncTxid = el.getSyncTxid(); 118 it = el.getBatches().iterator(); 119 long formerLastReadTxid = lastReadTxid; 120 lastReadTxid = el.getLastTxid(); 121 if (el.getFirstTxid() != formerLastReadTxid + 1) { 122 throw new MissingEventsException(formerLastReadTxid + 1, 123 el.getFirstTxid()); 124 } 125 } else { 126 LOG.debug("poll(): read no edits from the NN when requesting edits " + 127 "after txid {}", lastReadTxid); 128 return null; 129 } 130 } 131 132 if (it.hasNext()) { // can be empty if el.getLastTxid != -1 but none of the 133 // newly seen edit log ops actually got converted to events 134 return it.next(); 135 } else { 136 return null; 137 } 138 } finally { 139 scope.close(); 140 } 141 } 142 143 /** 144 * Return a estimate of how many transaction IDs behind the NameNode's 145 * current state this stream is. Clients should periodically call this method 146 * and check if its result is steadily increasing, which indicates that they 147 * are falling behind (i.e. transaction are being generated faster than the 148 * client is reading them). If a client falls too far behind events may be 149 * deleted before the client can read them. 150 * <p/> 151 * A return value of -1 indicates that an estimate could not be produced, and 152 * should be ignored. The value returned by this method is really only useful 153 * when compared to previous or subsequent returned values. 154 */ 155 public long getTxidsBehindEstimate() { 156 if (syncTxid == 0) { 157 return -1; 158 } else { 159 assert syncTxid >= lastReadTxid; 160 // this gives the difference between the last txid we have fetched to the 161 // client and syncTxid at the time we last fetched events from the 162 // NameNode 163 return syncTxid - lastReadTxid; 164 } 165 } 166 167 /** 168 * Returns the next event batch in the stream, waiting up to the specified 169 * amount of time for a new batch. Returns null if one is not available at the 170 * end of the specified amount of time. The time before the method returns may 171 * exceed the specified amount of time by up to the time required for an RPC 172 * to the NameNode. 173 * 174 * @param time number of units of the given TimeUnit to wait 175 * @param tu the desired TimeUnit 176 * @throws IOException see {@link DFSInotifyEventInputStream#poll()} 177 * @throws MissingEventsException 178 * see {@link DFSInotifyEventInputStream#poll()} 179 * @throws InterruptedException if the calling thread is interrupted 180 */ 181 public EventBatch poll(long time, TimeUnit tu) throws IOException, 182 InterruptedException, MissingEventsException { 183 TraceScope scope = Trace.startSpan("inotifyPollWithTimeout", traceSampler); 184 EventBatch next = null; 185 try { 186 long initialTime = Time.monotonicNow(); 187 long totalWait = TimeUnit.MILLISECONDS.convert(time, tu); 188 long nextWait = INITIAL_WAIT_MS; 189 while ((next = poll()) == null) { 190 long timeLeft = totalWait - (Time.monotonicNow() - initialTime); 191 if (timeLeft <= 0) { 192 LOG.debug("timed poll(): timed out"); 193 break; 194 } else if (timeLeft < nextWait * 2) { 195 nextWait = timeLeft; 196 } else { 197 nextWait *= 2; 198 } 199 LOG.debug("timed poll(): poll() returned null, sleeping for {} ms", 200 nextWait); 201 Thread.sleep(nextWait); 202 } 203 } finally { 204 scope.close(); 205 } 206 return next; 207 } 208 209 /** 210 * Returns the next batch of events in the stream, waiting indefinitely if 211 * a new batch is not immediately available. 212 * 213 * @throws IOException see {@link DFSInotifyEventInputStream#poll()} 214 * @throws MissingEventsException see 215 * {@link DFSInotifyEventInputStream#poll()} 216 * @throws InterruptedException if the calling thread is interrupted 217 */ 218 public EventBatch take() throws IOException, InterruptedException, 219 MissingEventsException { 220 TraceScope scope = Trace.startSpan("inotifyTake", traceSampler); 221 EventBatch next = null; 222 try { 223 int nextWaitMin = INITIAL_WAIT_MS; 224 while ((next = poll()) == null) { 225 // sleep for a random period between nextWaitMin and nextWaitMin * 2 226 // to avoid stampedes at the NN if there are multiple clients 227 int sleepTime = nextWaitMin + rng.nextInt(nextWaitMin); 228 LOG.debug("take(): poll() returned null, sleeping for {} ms", sleepTime); 229 Thread.sleep(sleepTime); 230 // the maximum sleep is 2 minutes 231 nextWaitMin = Math.min(60000, nextWaitMin * 2); 232 } 233 } finally { 234 scope.close(); 235 } 236 237 return next; 238 } 239}