001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018
019package org.apache.hadoop.hdfs;
020
021import com.google.common.collect.Iterators;
022import org.apache.hadoop.classification.InterfaceAudience;
023import org.apache.hadoop.classification.InterfaceStability;
024import org.apache.hadoop.hdfs.inotify.EventBatch;
025import org.apache.hadoop.hdfs.inotify.EventBatchList;
026import org.apache.hadoop.hdfs.inotify.MissingEventsException;
027import org.apache.hadoop.hdfs.protocol.ClientProtocol;
028import org.apache.hadoop.util.Time;
029import org.apache.htrace.Sampler;
030import org.apache.htrace.Trace;
031import org.apache.htrace.TraceScope;
032import org.slf4j.Logger;
033import org.slf4j.LoggerFactory;
034
035import java.io.IOException;
036import java.util.Iterator;
037import java.util.Random;
038import java.util.concurrent.TimeUnit;
039
040/**
041 * Stream for reading inotify events. DFSInotifyEventInputStreams should not
042 * be shared among multiple threads.
043 */
044@InterfaceAudience.Public
045@InterfaceStability.Unstable
046public class DFSInotifyEventInputStream {
047  public static Logger LOG = LoggerFactory.getLogger(DFSInotifyEventInputStream
048      .class);
049
050  /**
051   * The trace sampler to use when making RPCs to the NameNode.
052   */
053  private final Sampler<?> traceSampler;
054
055  private final ClientProtocol namenode;
056  private Iterator<EventBatch> it;
057  private long lastReadTxid;
058  /**
059   * The most recent txid the NameNode told us it has sync'ed -- helps us
060   * determine how far behind we are in the edit stream.
061   */
062  private long syncTxid;
063  /**
064   * Used to generate wait times in {@link DFSInotifyEventInputStream#take()}.
065   */
066  private Random rng = new Random();
067
068  private static final int INITIAL_WAIT_MS = 10;
069
070  DFSInotifyEventInputStream(Sampler<?> traceSampler, ClientProtocol namenode)
071        throws IOException {
072    // Only consider new transaction IDs.
073    this(traceSampler, namenode, namenode.getCurrentEditLogTxid());
074  }
075
076  DFSInotifyEventInputStream(Sampler traceSampler, ClientProtocol namenode,
077        long lastReadTxid) throws IOException {
078    this.traceSampler = traceSampler;
079    this.namenode = namenode;
080    this.it = Iterators.emptyIterator();
081    this.lastReadTxid = lastReadTxid;
082  }
083
084  /**
085   * Returns the next batch of events in the stream or null if no new
086   * batches are currently available.
087   *
088   * @throws IOException because of network error or edit log
089   * corruption. Also possible if JournalNodes are unresponsive in the
090   * QJM setting (even one unresponsive JournalNode is enough in rare cases),
091   * so catching this exception and retrying at least a few times is
092   * recommended.
093   * @throws MissingEventsException if we cannot return the next batch in the
094   * stream because the data for the events (and possibly some subsequent
095   * events) has been deleted (generally because this stream is a very large
096   * number of transactions behind the current state of the NameNode). It is
097   * safe to continue reading from the stream after this exception is thrown
098   * The next available batch of events will be returned.
099   */
100  public EventBatch poll() throws IOException, MissingEventsException {
101    TraceScope scope =
102        Trace.startSpan("inotifyPoll", traceSampler);
103    try {
104      // need to keep retrying until the NN sends us the latest committed txid
105      if (lastReadTxid == -1) {
106        LOG.debug("poll(): lastReadTxid is -1, reading current txid from NN");
107        lastReadTxid = namenode.getCurrentEditLogTxid();
108        return null;
109      }
110      if (!it.hasNext()) {
111        EventBatchList el = namenode.getEditsFromTxid(lastReadTxid + 1);
112        if (el.getLastTxid() != -1) {
113          // we only want to set syncTxid when we were actually able to read some
114          // edits on the NN -- otherwise it will seem like edits are being
115          // generated faster than we can read them when the problem is really
116          // that we are temporarily unable to read edits
117          syncTxid = el.getSyncTxid();
118          it = el.getBatches().iterator();
119          long formerLastReadTxid = lastReadTxid;
120          lastReadTxid = el.getLastTxid();
121          if (el.getFirstTxid() != formerLastReadTxid + 1) {
122            throw new MissingEventsException(formerLastReadTxid + 1,
123                el.getFirstTxid());
124          }
125        } else {
126          LOG.debug("poll(): read no edits from the NN when requesting edits " +
127            "after txid {}", lastReadTxid);
128          return null;
129        }
130      }
131
132      if (it.hasNext()) { // can be empty if el.getLastTxid != -1 but none of the
133        // newly seen edit log ops actually got converted to events
134        return it.next();
135      } else {
136        return null;
137      }
138    } finally {
139      scope.close();
140    }
141  }
142
143  /**
144   * Return a estimate of how many transaction IDs behind the NameNode's
145   * current state this stream is. Clients should periodically call this method
146   * and check if its result is steadily increasing, which indicates that they
147   * are falling behind (i.e. transaction are being generated faster than the
148   * client is reading them). If a client falls too far behind events may be
149   * deleted before the client can read them.
150   * <p/>
151   * A return value of -1 indicates that an estimate could not be produced, and
152   * should be ignored. The value returned by this method is really only useful
153   * when compared to previous or subsequent returned values.
154   */
155  public long getTxidsBehindEstimate() {
156    if (syncTxid == 0) {
157      return -1;
158    } else {
159      assert syncTxid >= lastReadTxid;
160      // this gives the difference between the last txid we have fetched to the
161      // client and syncTxid at the time we last fetched events from the
162      // NameNode
163      return syncTxid - lastReadTxid;
164    }
165  }
166
167  /**
168   * Returns the next event batch in the stream, waiting up to the specified
169   * amount of time for a new batch. Returns null if one is not available at the
170   * end of the specified amount of time. The time before the method returns may
171   * exceed the specified amount of time by up to the time required for an RPC
172   * to the NameNode.
173   *
174   * @param time number of units of the given TimeUnit to wait
175   * @param tu the desired TimeUnit
176   * @throws IOException see {@link DFSInotifyEventInputStream#poll()}
177   * @throws MissingEventsException
178   * see {@link DFSInotifyEventInputStream#poll()}
179   * @throws InterruptedException if the calling thread is interrupted
180   */
181  public EventBatch poll(long time, TimeUnit tu) throws IOException,
182      InterruptedException, MissingEventsException {
183    TraceScope scope = Trace.startSpan("inotifyPollWithTimeout", traceSampler);
184    EventBatch next = null;
185    try {
186      long initialTime = Time.monotonicNow();
187      long totalWait = TimeUnit.MILLISECONDS.convert(time, tu);
188      long nextWait = INITIAL_WAIT_MS;
189      while ((next = poll()) == null) {
190        long timeLeft = totalWait - (Time.monotonicNow() - initialTime);
191        if (timeLeft <= 0) {
192          LOG.debug("timed poll(): timed out");
193          break;
194        } else if (timeLeft < nextWait * 2) {
195          nextWait = timeLeft;
196        } else {
197          nextWait *= 2;
198        }
199        LOG.debug("timed poll(): poll() returned null, sleeping for {} ms",
200            nextWait);
201        Thread.sleep(nextWait);
202      }
203    } finally {
204      scope.close();
205    }
206    return next;
207  }
208
209  /**
210   * Returns the next batch of events in the stream, waiting indefinitely if
211   * a new batch  is not immediately available.
212   *
213   * @throws IOException see {@link DFSInotifyEventInputStream#poll()}
214   * @throws MissingEventsException see
215   * {@link DFSInotifyEventInputStream#poll()}
216   * @throws InterruptedException if the calling thread is interrupted
217   */
218  public EventBatch take() throws IOException, InterruptedException,
219      MissingEventsException {
220    TraceScope scope = Trace.startSpan("inotifyTake", traceSampler);
221    EventBatch next = null;
222    try {
223      int nextWaitMin = INITIAL_WAIT_MS;
224      while ((next = poll()) == null) {
225        // sleep for a random period between nextWaitMin and nextWaitMin * 2
226        // to avoid stampedes at the NN if there are multiple clients
227        int sleepTime = nextWaitMin + rng.nextInt(nextWaitMin);
228        LOG.debug("take(): poll() returned null, sleeping for {} ms", sleepTime);
229        Thread.sleep(sleepTime);
230        // the maximum sleep is 2 minutes
231        nextWaitMin = Math.min(60000, nextWaitMin * 2);
232      }
233    } finally {
234      scope.close();
235    }
236
237    return next;
238  }
239}