001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *   http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing,
013 * software distributed under the License is distributed on an
014 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
015 * KIND, either express or implied.  See the License for the
016 * specific language governing permissions and limitations
017 * under the License.
018 */
019package org.apache.hadoop.hdfs.web.oauth2;
020
021import com.squareup.okhttp.OkHttpClient;
022import com.squareup.okhttp.Request;
023import com.squareup.okhttp.RequestBody;
024import com.squareup.okhttp.Response;
025import org.apache.hadoop.classification.InterfaceAudience;
026import org.apache.hadoop.classification.InterfaceStability;
027import org.apache.hadoop.conf.Configuration;
028import org.apache.hadoop.hdfs.web.URLConnectionFactory;
029import org.apache.hadoop.util.Timer;
030import org.apache.http.HttpStatus;
031import org.codehaus.jackson.map.ObjectMapper;
032import org.codehaus.jackson.map.ObjectReader;
033
034import java.io.IOException;
035import java.util.Map;
036import java.util.concurrent.TimeUnit;
037
038import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.OAUTH_CLIENT_ID_KEY;
039import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.OAUTH_REFRESH_URL_KEY;
040import static org.apache.hadoop.hdfs.web.oauth2.OAuth2Constants.ACCESS_TOKEN;
041import static org.apache.hadoop.hdfs.web.oauth2.OAuth2Constants.CLIENT_CREDENTIALS;
042import static org.apache.hadoop.hdfs.web.oauth2.OAuth2Constants.CLIENT_ID;
043import static org.apache.hadoop.hdfs.web.oauth2.OAuth2Constants.CLIENT_SECRET;
044import static org.apache.hadoop.hdfs.web.oauth2.OAuth2Constants.EXPIRES_IN;
045import static org.apache.hadoop.hdfs.web.oauth2.OAuth2Constants.GRANT_TYPE;
046import static org.apache.hadoop.hdfs.web.oauth2.OAuth2Constants.URLENCODED;
047import static org.apache.hadoop.hdfs.web.oauth2.Utils.notNull;
048
049/**
050 * Obtain an access token via the credential-based OAuth2 workflow.  This
051 * abstract class requires only that implementations provide the credential,
052 * which the class then uses to obtain a refresh token.
053 */
054@InterfaceAudience.Public
055@InterfaceStability.Evolving
056public abstract class CredentialBasedAccessTokenProvider
057    extends AccessTokenProvider {
058  private static final ObjectReader READER =
059      new ObjectMapper().reader(Map.class);
060
061  public static final String OAUTH_CREDENTIAL_KEY
062      = "dfs.webhdfs.oauth2.credential";
063
064  private AccessTokenTimer timer;
065
066  private String clientId;
067
068  private String refreshURL;
069
070  private String accessToken;
071
072  private boolean initialCredentialObtained = false;
073
074  CredentialBasedAccessTokenProvider() {
075    this.timer = new AccessTokenTimer();
076  }
077
078  CredentialBasedAccessTokenProvider(Timer timer) {
079    this.timer = new AccessTokenTimer(timer);
080  }
081
082  public abstract String getCredential();
083
084  @Override
085  public void setConf(Configuration conf) {
086    super.setConf(conf);
087    clientId = notNull(conf, OAUTH_CLIENT_ID_KEY);
088    refreshURL = notNull(conf, OAUTH_REFRESH_URL_KEY);
089  }
090
091  @Override
092  public synchronized String getAccessToken() throws IOException {
093    if(timer.shouldRefresh() || !initialCredentialObtained) {
094      refresh();
095      initialCredentialObtained = true;
096    }
097
098    return accessToken;
099  }
100
101  void refresh() throws IOException {
102    try {
103      OkHttpClient client = new OkHttpClient();
104      client.setConnectTimeout(URLConnectionFactory.DEFAULT_SOCKET_TIMEOUT,
105          TimeUnit.MILLISECONDS);
106      client.setReadTimeout(URLConnectionFactory.DEFAULT_SOCKET_TIMEOUT,
107          TimeUnit.MILLISECONDS);
108
109      String bodyString = Utils.postBody(CLIENT_SECRET, getCredential(),
110          GRANT_TYPE, CLIENT_CREDENTIALS,
111          CLIENT_ID, clientId);
112
113      RequestBody body = RequestBody.create(URLENCODED, bodyString);
114
115      Request request = new Request.Builder()
116          .url(refreshURL)
117          .post(body)
118          .build();
119      Response responseBody = client.newCall(request).execute();
120
121      if (responseBody.code() != HttpStatus.SC_OK) {
122        throw new IllegalArgumentException("Received invalid http response: "
123            + responseBody.code() + ", text = " + responseBody.toString());
124      }
125
126      Map<?, ?> response = READER.readValue(responseBody.body().string());
127
128      String newExpiresIn = response.get(EXPIRES_IN).toString();
129      timer.setExpiresIn(newExpiresIn);
130
131      accessToken = response.get(ACCESS_TOKEN).toString();
132
133    } catch (Exception e) {
134      throw new IOException("Unable to obtain access token from credential", e);
135    }
136  }
137}