001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *   http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing,
013 * software distributed under the License is distributed on an
014 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
015 * KIND, either express or implied.  See the License for the
016 * specific language governing permissions and limitations
017 * under the License.
018 */
019package org.apache.hadoop.hdfs.web.oauth2;
020
021import com.squareup.okhttp.OkHttpClient;
022import com.squareup.okhttp.Request;
023import com.squareup.okhttp.RequestBody;
024import com.squareup.okhttp.Response;
025import org.apache.hadoop.classification.InterfaceAudience;
026import org.apache.hadoop.classification.InterfaceStability;
027import org.apache.hadoop.conf.Configuration;
028import org.apache.hadoop.hdfs.web.URLConnectionFactory;
029import org.apache.hadoop.util.Timer;
030import org.apache.http.HttpStatus;
031import org.codehaus.jackson.map.ObjectMapper;
032import org.codehaus.jackson.map.ObjectReader;
033
034import java.io.IOException;
035import java.util.Map;
036import java.util.concurrent.TimeUnit;
037
038import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.OAUTH_CLIENT_ID_KEY;
039import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.OAUTH_REFRESH_URL_KEY;
040import static org.apache.hadoop.hdfs.web.oauth2.OAuth2Constants.ACCESS_TOKEN;
041import static org.apache.hadoop.hdfs.web.oauth2.OAuth2Constants.CLIENT_ID;
042import static org.apache.hadoop.hdfs.web.oauth2.OAuth2Constants.EXPIRES_IN;
043import static org.apache.hadoop.hdfs.web.oauth2.OAuth2Constants.GRANT_TYPE;
044import static org.apache.hadoop.hdfs.web.oauth2.OAuth2Constants.REFRESH_TOKEN;
045import static org.apache.hadoop.hdfs.web.oauth2.OAuth2Constants.URLENCODED;
046import static org.apache.hadoop.hdfs.web.oauth2.Utils.notNull;
047
048/**
049 * Supply a access token obtained via a refresh token (provided through the
050 * Configuration using the second half of the
051 * <a href="https://tools.ietf.org/html/rfc6749#section-4.1">
052 *   Authorization Code Grant workflow</a>.
053 */
054@InterfaceAudience.Public
055@InterfaceStability.Evolving
056public class ConfRefreshTokenBasedAccessTokenProvider
057    extends AccessTokenProvider {
058  private static final ObjectReader READER =
059      new ObjectMapper().reader(Map.class);
060
061  public static final String OAUTH_REFRESH_TOKEN_KEY
062      = "dfs.webhdfs.oauth2.refresh.token";
063  public static final String OAUTH_REFRESH_TOKEN_EXPIRES_KEY
064      = "dfs.webhdfs.oauth2.refresh.token.expires.ms.since.epoch";
065
066  private AccessTokenTimer accessTokenTimer;
067
068  private String accessToken;
069
070  private String refreshToken;
071
072  private String clientId;
073
074  private String refreshURL;
075
076
077  public ConfRefreshTokenBasedAccessTokenProvider() {
078    this.accessTokenTimer = new AccessTokenTimer();
079  }
080
081  public ConfRefreshTokenBasedAccessTokenProvider(Timer timer) {
082    this.accessTokenTimer = new AccessTokenTimer(timer);
083  }
084
085  @Override
086  public void setConf(Configuration conf) {
087    super.setConf(conf);
088    refreshToken = notNull(conf, (OAUTH_REFRESH_TOKEN_KEY));
089
090    accessTokenTimer.setExpiresInMSSinceEpoch(
091        notNull(conf, OAUTH_REFRESH_TOKEN_EXPIRES_KEY));
092
093    clientId = notNull(conf, OAUTH_CLIENT_ID_KEY);
094    refreshURL = notNull(conf, OAUTH_REFRESH_URL_KEY);
095
096  }
097
098  @Override
099  public synchronized String getAccessToken() throws IOException {
100    if(accessTokenTimer.shouldRefresh()) {
101      refresh();
102    }
103
104    return accessToken;
105  }
106
107  void refresh() throws IOException {
108    try {
109      OkHttpClient client = new OkHttpClient();
110      client.setConnectTimeout(URLConnectionFactory.DEFAULT_SOCKET_TIMEOUT,
111          TimeUnit.MILLISECONDS);
112      client.setReadTimeout(URLConnectionFactory.DEFAULT_SOCKET_TIMEOUT,
113                TimeUnit.MILLISECONDS);
114
115      String bodyString = Utils.postBody(GRANT_TYPE, REFRESH_TOKEN,
116          REFRESH_TOKEN, refreshToken,
117          CLIENT_ID, clientId);
118
119      RequestBody body = RequestBody.create(URLENCODED, bodyString);
120
121      Request request = new Request.Builder()
122          .url(refreshURL)
123          .post(body)
124          .build();
125      Response responseBody = client.newCall(request).execute();
126
127      if (responseBody.code() != HttpStatus.SC_OK) {
128        throw new IllegalArgumentException("Received invalid http response: "
129            + responseBody.code() + ", text = " + responseBody.toString());
130      }
131
132      Map<?, ?> response = READER.readValue(responseBody.body().string());
133
134      String newExpiresIn = response.get(EXPIRES_IN).toString();
135      accessTokenTimer.setExpiresIn(newExpiresIn);
136
137      accessToken = response.get(ACCESS_TOKEN).toString();
138    } catch (Exception e) {
139      throw new IOException("Exception while refreshing access token", e);
140    }
141  }
142
143  public String getRefreshToken() {
144    return refreshToken;
145  }
146}