001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018
019package org.apache.hadoop.hdfs;
020
import com.google.common.collect.Iterators;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.hdfs.inotify.EventBatch;
import org.apache.hadoop.hdfs.inotify.EventBatchList;
import org.apache.hadoop.hdfs.inotify.MissingEventsException;
import org.apache.hadoop.hdfs.protocol.ClientProtocol;
import org.apache.hadoop.util.Time;
import org.apache.htrace.core.TraceScope;
import org.apache.htrace.core.Tracer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.util.Collections;
import java.util.Iterator;
import java.util.Random;
import java.util.concurrent.TimeUnit;
038
039/**
040 * Stream for reading inotify events. DFSInotifyEventInputStreams should not
041 * be shared among multiple threads.
042 */
043@InterfaceAudience.Public
044@InterfaceStability.Unstable
045public class DFSInotifyEventInputStream {
046  public static final Logger LOG = LoggerFactory.getLogger(
047      DFSInotifyEventInputStream.class);
048
049  private final ClientProtocol namenode;
050  private Iterator<EventBatch> it;
051  private long lastReadTxid;
052  /**
053   * The most recent txid the NameNode told us it has sync'ed -- helps us
054   * determine how far behind we are in the edit stream.
055   */
056  private long syncTxid;
057  /**
058   * Used to generate wait times in {@link DFSInotifyEventInputStream#take()}.
059   */
060  private Random rng = new Random();
061
062  private final Tracer tracer;
063
064  private static final int INITIAL_WAIT_MS = 10;
065
066  DFSInotifyEventInputStream(ClientProtocol namenode, Tracer tracer)
067        throws IOException {
068    // Only consider new transaction IDs.
069    this(namenode, tracer, namenode.getCurrentEditLogTxid());
070  }
071
072  DFSInotifyEventInputStream(ClientProtocol namenode, Tracer tracer,
073      long lastReadTxid) {
074    this.namenode = namenode;
075    this.it = Iterators.emptyIterator();
076    this.lastReadTxid = lastReadTxid;
077    this.tracer = tracer;
078  }
079
080  /**
081   * Returns the next batch of events in the stream or null if no new
082   * batches are currently available.
083   *
084   * @throws IOException because of network error or edit log
085   * corruption. Also possible if JournalNodes are unresponsive in the
086   * QJM setting (even one unresponsive JournalNode is enough in rare cases),
087   * so catching this exception and retrying at least a few times is
088   * recommended.
089   * @throws MissingEventsException if we cannot return the next batch in the
090   * stream because the data for the events (and possibly some subsequent
091   * events) has been deleted (generally because this stream is a very large
092   * number of transactions behind the current state of the NameNode). It is
093   * safe to continue reading from the stream after this exception is thrown
094   * The next available batch of events will be returned.
095   */
096  public EventBatch poll() throws IOException, MissingEventsException {
097    try (TraceScope ignored = tracer.newScope("inotifyPoll")) {
098      // need to keep retrying until the NN sends us the latest committed txid
099      if (lastReadTxid == -1) {
100        LOG.debug("poll(): lastReadTxid is -1, reading current txid from NN");
101        lastReadTxid = namenode.getCurrentEditLogTxid();
102        return null;
103      }
104      if (!it.hasNext()) {
105        EventBatchList el = namenode.getEditsFromTxid(lastReadTxid + 1);
106        if (el.getLastTxid() != -1) {
107          // we only want to set syncTxid when we were actually able to read some
108          // edits on the NN -- otherwise it will seem like edits are being
109          // generated faster than we can read them when the problem is really
110          // that we are temporarily unable to read edits
111          syncTxid = el.getSyncTxid();
112          it = el.getBatches().iterator();
113          long formerLastReadTxid = lastReadTxid;
114          lastReadTxid = el.getLastTxid();
115          if (el.getFirstTxid() != formerLastReadTxid + 1) {
116            throw new MissingEventsException(formerLastReadTxid + 1,
117                el.getFirstTxid());
118          }
119        } else {
120          LOG.debug("poll(): read no edits from the NN when requesting edits " +
121              "after txid {}", lastReadTxid);
122          return null;
123        }
124      }
125
126      if (it.hasNext()) { // can be empty if el.getLastTxid != -1 but none of the
127        // newly seen edit log ops actually got converted to events
128        return it.next();
129      } else {
130        return null;
131      }
132    }
133  }
134
135  /**
136   * Return a estimate of how many transaction IDs behind the NameNode's
137   * current state this stream is. Clients should periodically call this method
138   * and check if its result is steadily increasing, which indicates that they
139   * are falling behind (i.e. transaction are being generated faster than the
140   * client is reading them). If a client falls too far behind events may be
141   * deleted before the client can read them.
142   * <p/>
143   * A return value of -1 indicates that an estimate could not be produced, and
144   * should be ignored. The value returned by this method is really only useful
145   * when compared to previous or subsequent returned values.
146   */
147  public long getTxidsBehindEstimate() {
148    if (syncTxid == 0) {
149      return -1;
150    } else {
151      assert syncTxid >= lastReadTxid;
152      // this gives the difference between the last txid we have fetched to the
153      // client and syncTxid at the time we last fetched events from the
154      // NameNode
155      return syncTxid - lastReadTxid;
156    }
157  }
158
159  /**
160   * Returns the next event batch in the stream, waiting up to the specified
161   * amount of time for a new batch. Returns null if one is not available at the
162   * end of the specified amount of time. The time before the method returns may
163   * exceed the specified amount of time by up to the time required for an RPC
164   * to the NameNode.
165   *
166   * @param time number of units of the given TimeUnit to wait
167   * @param tu the desired TimeUnit
168   * @throws IOException see {@link DFSInotifyEventInputStream#poll()}
169   * @throws MissingEventsException
170   * see {@link DFSInotifyEventInputStream#poll()}
171   * @throws InterruptedException if the calling thread is interrupted
172   */
173  public EventBatch poll(long time, TimeUnit tu) throws IOException,
174      InterruptedException, MissingEventsException {
175    EventBatch next;
176    try (TraceScope ignored = tracer.newScope("inotifyPollWithTimeout")) {
177      long initialTime = Time.monotonicNow();
178      long totalWait = TimeUnit.MILLISECONDS.convert(time, tu);
179      long nextWait = INITIAL_WAIT_MS;
180      while ((next = poll()) == null) {
181        long timeLeft = totalWait - (Time.monotonicNow() - initialTime);
182        if (timeLeft <= 0) {
183          LOG.debug("timed poll(): timed out");
184          break;
185        } else if (timeLeft < nextWait * 2) {
186          nextWait = timeLeft;
187        } else {
188          nextWait *= 2;
189        }
190        LOG.debug("timed poll(): poll() returned null, sleeping for {} ms",
191            nextWait);
192        Thread.sleep(nextWait);
193      }
194    }
195    return next;
196  }
197
198  /**
199   * Returns the next batch of events in the stream, waiting indefinitely if
200   * a new batch  is not immediately available.
201   *
202   * @throws IOException see {@link DFSInotifyEventInputStream#poll()}
203   * @throws MissingEventsException see
204   * {@link DFSInotifyEventInputStream#poll()}
205   * @throws InterruptedException if the calling thread is interrupted
206   */
207  public EventBatch take() throws IOException, InterruptedException,
208      MissingEventsException {
209    EventBatch next;
210    try (TraceScope ignored = tracer.newScope("inotifyTake")) {
211      int nextWaitMin = INITIAL_WAIT_MS;
212      while ((next = poll()) == null) {
213        // sleep for a random period between nextWaitMin and nextWaitMin * 2
214        // to avoid stampedes at the NN if there are multiple clients
215        int sleepTime = nextWaitMin + rng.nextInt(nextWaitMin);
216        LOG.debug("take(): poll() returned null, sleeping for {} ms", sleepTime);
217        Thread.sleep(sleepTime);
218        // the maximum sleep is 2 minutes
219        nextWaitMin = Math.min(60000, nextWaitMin * 2);
220      }
221    }
222
223    return next;
224  }
225}