OpenStack Authenticator
AuthToken
covers the case where SL1 needs to reach out to a token server
using basic authentication. The received token is then used in subsequent
requests to the API endpoints. There are many different flavors of exactly
how this works.
The out-of-the-box AuthToken
implementation assumes the following:
There is a URL to retrieve the token from
The request is a POST
The request’s body is
{"username": username, "password": password}
The response from the token request must include the token in the body.
The key to access the token from the response body is configurable in the credential.
The token must then be used in subsequent requests and included in the header.
See Authentication
For this example, we will be looking to make custom authenticator
to support the Openstack authentcation flow. Openstack supports token
authentication and we can re-use some components of out-of-the-box AuthToken
to accomplish this.
The following are sample request/response payloads from an Openstack server. We will
use this information to build out the various components of the new AuthOpenstack
authenticator. See Openstack Curl Examples
The following shows an example of making a request to the Openstack token server.
curl -i \
-H "Content-Type: application/json" \
-d '
{
"auth": {
"identity": {
"methods": ["password"],
"password": {
"user": {
"name": "admin",
"domain": { "id": "default" },
"password": "adminpwd"
}
}
}
}
}' \
"http://myopenstackserver:5000/v3/auth/tokens"
This does not match the POST body of AuthToken
as it formats
its POST body as {'username': username, 'password': password}
.
Now let us examine the response from the token request:
HTTP/1.1 201 Created
X-Subject-Token: MIIFvgY...
Vary: X-Auth-Token
Content-Type: application/json
Content-Length: 1025
Date: Tue, 10 Jun 2014 20:55:16 GMT
{
"token": {
"methods": ["password"],
"roles": [{
"id": "9fe2ff9ee4384b1894a90878d3e92bab",
"name": "_member_"
}, {
"id": "c703057be878458588961ce9a0ce686b",
"name": "admin"
}],
"expires_at": "2014-06-10T2:55:16.806001Z",
"project": {
"domain": {
"id": "default",
"name": "Default"
},
"id": "8538a3f13f9541b28c2620eb19065e45",
"name": "admin"
},
"catalog": [{
"endpoints": [{
"url": "http://localhost:3537/v2.0",
"region": "RegionOne",
"interface": "admin",
"id": "29beb2f1567642eb810b042b6719ea88"
}, {
"url": "http://localhost:5000/v2.0",
"region": "RegionOne",
"interface": "internal",
"id": "8707e3735d4415c97ae231b4841eb1c"
}, {
"url": "http://localhost:5000/v2.0",
"region": "RegionOne",
"interface": "public",
"id": "ef303187fc8d41668f25199c298396a5"
}],
"type": "identity",
"id": "bd73972c0e14fb69bae8ff76e112a90",
"name": "keystone"
}],
"extras": {},
"user": {
"domain": {
"id": "default",
"name": "Default"
},
"id": "3ec3164f750146be97f21559ee4d9c51",
"name": "admin"
},
"audit_ids": ["yRt0UrxJSs6-WYJgwEMMmg"],
"issued_at": "201406-10T20:55:16.806027Z"
}
}
AuthToken
expects to find the token from the POST body of the
response. Notice that the token is not included in the payload but
rather in the header. See
Openstack Auth API
Our AuthOpenstack
authenticator need to send the proper payload to
the token server and then retrieve the token from the header instead
of the payload. First, the request post body will need to be properly
formatted.
The code for AuthToken
is shown below. Highlighted is the authentication request’s
POST handling.
1from collections import namedtuple
2
3import requests
4from requests.auth import AuthBase
5
6from silo.auth.authenticator import extract_refresh_data
7from silo.auth.base_token import TokenBasedAuthenticator
8from silo.auth.exceptions import AuthenticationFailed
9from silo.auth.logger import CredentialFilter
10
11from silo.auth import add_auth, Authenticator, auth_logger
12
13DEFAULT_TIME_TO_NEXT_REQUEST_SECS = 60
14NEXT_REQUEST_TIME = "next_request_time"
15token_info = namedtuple("token_info", (NEXT_REQUEST_TIME, "token"))
16
17
18class TokenAuth(AuthBase):
19 """Requests auth implementation for use in Requests.Sessions.
20 :param AuthBase AuthBase: Parent class that TokenAuth subclasses
21 :param str token_label: Key in the request that will hold the Token value
22 :param str token: The token to retrieve resources with
23 """
24
25 def __init__(self, token_label, token):
26 self.token_label = token_label
27 self.token = token
28
29 def __call__(self, r):
30 r.headers[self.token_label] = self.token
31 return r
32
33
34@add_auth
35class AuthToken(TokenBasedAuthenticator):
36 """AuthToken is a class that encapsulates the authentication lifecycle for TokenAuth.
37 This implementation is responsible to logging into the auth_endpoint using
38 a basic username/password and retrieving a Token from the response where the
39 key in the response is defined by "auth_header", the `token_label` in this class.
40 :ivar TokenBasedAuthenticator TokenBasedAuthenticator: Helper class for token authentication
41 :ivar silo.apps.collection.credentials.Credential credentials: credentials
42 retrieved from em7/UCF
43 """
44
45 def __init__(self, credentials, **kwargs):
46 """Constructor Method"""
47 Authenticator.__init__(self, credentials)
48 self.ssl_verify = bool(
49 int(credentials.fields.get("options", {}).get("SSLVERIFYPEER", "1"))
50 )
51
52 self.url = credentials.fields["auth_endpoint"]
53 self.token_label = credentials.fields["auth_header"]
54 self.token_format = credentials.fields.get("token_format", "{}")
55 self.token_key = credentials.fields["token_key"]
56
57 self.username = credentials.fields["username"]
58 self.password = credentials.fields["password"]
59 self.client_info = {"username": self.username, "password": self.password}
60
61 self.auth_refresh = extract_refresh_data(credentials)
62 self.cred_filter = CredentialFilter(credentials)
63 self.headers = credentials.fields.get("headers", {})
64
65 try:
66 time_to_next_request = credentials.fields["time_to_next_request"]
67 except KeyError:
68 time_to_next_request = DEFAULT_TIME_TO_NEXT_REQUEST_SECS # 1 minute
69 auth_logger.debug(
70 "Time to Next Request missing from credential. Default: %d seconds.",
71 time_to_next_request,
72 )
73
74 TokenBasedAuthenticator.__init__(
75 self,
76 self.url,
77 self.token_key,
78 self._safe_cache,
79 self.get_authenticator_id(),
80 time_to_next_request,
81 **kwargs
82 )
83
84 def execute_auth_workflow(self):
85 """Perform the authentication to get the token
86 Once authenticated, save the information to the cache to re-use at a
87 later time
88 :return: Token information related to the payload
89 :rtype: token_info
90 """
91 token_resp = AuthToken.request_token(
92 self.url, self.client_info, self.headers, verify=self.ssl_verify
93 )
94 self.debug(token_resp)
95 return self.parse_json_result(token_resp)
96
97 def set_auth(self, token):
98 """Set the auth info
99 Set the auth information for the given token and update the filters
100 :param str token: Token
101 """
102 self.auth = TokenAuth(self.token_label, self.token_format.format(token))
103 # Add rendered token in token format
104 self.cred_filter.update_secret_from_auth(self.auth)
105 # Add stand-alone token
106 self.cred_filter.update_secret("token", token)
107
108 def get_authenticator_id(self):
109 """Builds the unique identifier based on the
110 properties that make this auth unique
111 :return: A unique identifier for this auth
112 :rtype: str
113 """
114 header_list = [(key, value) for key, value in self.headers.items()]
115 header_list.sort()
116 headers_string = "{}".format(header_list)
117 key = "{0}-{1}-{2}-{3}-{4}-{5}".format(
118 self.username,
119 self.url,
120 headers_string,
121 self.token_label,
122 self.token_format,
123 self.token_key,
124 )
125
126 return key
127
128 @staticmethod
129 def build(credentials, **kwargs):
130 """Build method to be called by get_authmethod in low_code"""
131 return AuthToken(credentials, **kwargs)
132
133 @staticmethod
134 def get_name():
135 """Look-up name/key for the Authenticator in get_authmethod() and register_auth()"""
136 return "AuthToken"
137
138 @staticmethod
139 def get_desc():
140 """Describes the purpose of the Authenticator"""
141 return "Token Authentication for HTTP"
142
143 @staticmethod
144 def request_token(url, auth, headers, **kwargs):
145 """Perform the request to gather the result from the endpoint
146 :param str url: URL to query to obtain the token
147 :param dict auth: Authentication information for the request
148 :param dict headers: Headers to use when requesting the token
149 :rtype: request.response
150 """
151 request_payload = {
152 "json": auth,
153 "headers": headers,
154 }
155 request_payload.update(kwargs)
156 try:
157 return requests.post(url, **request_payload)
158 except requests.ConnectionError as err:
159 auth_logger.warning("Cannot connect to [%s]: %s", url, type(err))
160 raise AuthenticationFailed(
161 "Unable to Authenticate. Cannot connect to the Token Retrieval Endpoint"
162 )
Request Body Update
The function that must be modified is execute_auth_workflow()
as we will
be replacing the AuthToken.request_token()
function. This new function
will be part of the authenticator class called AuthOpenstack
. The
code changes shown below formats the POST body into the expected format.
1@add_auth
2class AuthOpenstack(AuthToken):
3 def execute_auth_workflow(self):
4 """Perform the authentication to get the token
5 Once authenticated, save the information to the cache to re-use at a
6 later time
7 :return: Token information related to the payload
8 :rtype: token_info
9 """
10 token_resp = AuthOpenstack.request_token(
11 self.url, self.client_info, self.headers, verify=self.ssl_verify
12 )
13 return self.parse_json_result(token_resp)
14
15 @staticmethod
16 def request_token(self, url, auth, headers, kwargs**):
17 """Perform the request to gather the result from the endpoint
18 :param str url: URL to query to obtain the token
19 :param dict auth: Authentication information for the request
20 :param dict headers: Headers to use when requesting the token
21 :rtype: request.response
22 """
23
24 body = {
25 "auth": {
26 "methods": ["password"],
27 "password": {
28 "user": {
29 "name": auth["username"],
30 "domain": {
31 "id": "default"
32 },
33 "password": auth["password"]
34 }
35 }
36 }
37 }
38
39 request_payload = {
40 "json": body,
41 "headers": headers,
42 }
43 request_payload.update(kwargs)
44 try:
45 return requests.post(url, **request_payload)
46 except requests.ConnectionError as err:
47 auth_logger.warning("Cannot connect to [%s]: %s", url, type(err))
48 raise AuthenticationFailed(
49 "Unable to Authenticate. Cannot connect to the Token Retrieval Endpoint"
50 )
To recap, a new authenticator called AuthOpenstack
which inherits from
AuthToken
was defined. Next the execute_auth_workflow()
function was
modified to call a new static method called request_token()
. This function
prepares the expected request body for the authentication server.
Next, the response handle will need to be changed to handle receiving the token
from the header. Below is our current update and the next section that will need
to be updated. The function parse_json_result()
is part of AuthToken
’s
parent class TokenBasedAuthenticator
and the function’s code is show below.
1@add_auth
2class AuthOpenstack(AuthToken):
3 def execute_auth_workflow(self):
4 """Perform the authentication to get the token
5 Once authenticated, save the information to the cache to re-use at a
6 later time
7 :return: Token information related to the payload
8 :rtype: token_info
9 """
10 token_resp = AuthOpenstack.request_token(
11 self.url, self.client_info, self.headers, verify=self.ssl_verify
12 )
13 return self.parse_json_result(token_resp)
14
15 def parse_json_result(self, resp):
16 """Parses the JSON response after requesting token info
17 Parse the response, store any required information, and set the auth
18 so future requests can use the token.
19 :param requests.models.Response resp: Response from the request
20 :return: Information from the payload
21 :rtype: dict
22 """
23 if not self.check_result(resp):
24 self.auth = None
25 self.mark_as_failed()
26 auth_logger.warning("Invalid response received during token generation, %s", resp)
27 return
28
29 try:
30 token_resp_json = resp.json()
31 except ValueError:
32 self.mark_as_failed()
33 auth_logger.warning("Non-JSON information returned from %s", self.url)
34 return
35
36 self.extract_access_token(token_resp_json)
37 self.extract_expires_in(token_resp_json)
38
39 self.set_auth(token_resp_json[DEFAULT_ACCESS_TOKEN_SELECTOR])
40 return token_resp_json
41
42 def extract_access_token(self, token_resp_json):
43 """Extracts a token out of a JSON response payload
44 :param dict token_resp_json: response from
45 :raises AuthenticationFailed: token/access key not found
46 """
47 try:
48 token_resp_json[DEFAULT_ACCESS_TOKEN_SELECTOR] = token_resp_json[
49 self.payload_token_key
50 ]
51 except KeyError:
52 avail_keys = '", "'.join(token_resp_json.keys())
53 raise AuthenticationFailed(
54 'Unable to find the key, {}, within the payload. Available keys: "{}"'.format(
55 self.payload_token_key, avail_keys
56 )
57 )
58
59 def extract_expires_in(self, token_resp_json):
60 """Populates expires_in field in cachable token data
61 Adds the default expires_in populated with when the token
62 will expire for the dynamic refresh. Otherwise, this will
63 put a static time-to-live for the token.
64 :param str token_resp_json: JSON response from request
65 """
66 if self.auth_refresh.type == REFRESH_TOKEN_STATIC:
67 auth_logger.debug("Static Token refresh:: ttl: %s", self.auth_refresh.info.ttl)
68 token_resp_json[DEFAULT_EXPIRES_IN_SELECTOR] = self.handle_expiry_time(
69 self.auth_refresh.info.ttl
70 )
71 else: # dynamic case
72 auth_logger.debug(
73 "Dynamic Token refresh:: expiry_Key: %s", self.auth_refresh.info.expiry_key
74 )
75 if self.auth_refresh.info.expiry_key:
76 # User has defined an expires_in key to look-up
77 try:
78 token_resp_json[DEFAULT_EXPIRES_IN_SELECTOR] = self.handle_expiry_time(
79 token_resp_json[self.auth_refresh.info.expiry_key]
80 )
81 except KeyError:
82 avail_keys = '", "'.join(token_resp_json.keys())
83 auth_logger.warning(
84 "Cannot find %s in authentication response. This is required for setting "
85 "a token refresh timer. Defaulting token refresh timer to %d hour. "
86 "Available keys: '%s'",
87 self.auth_refresh.info.expiry_key,
88 self.auth_refresh.info.ttl / 60**2,
89 avail_keys,
90 )
91 token_resp_json[DEFAULT_EXPIRES_IN_SELECTOR] = self.handle_expiry_time(
92 self.auth_refresh.info.ttl
93 )
94 else:
95 auth_logger.debug(
96 "Dynamic token refresh selected, but no expiry key has been defined. "
97 "Setting token refresh to %s",
98 self.auth_refresh.info.ttl,
99 )
100 token_resp_json[DEFAULT_EXPIRES_IN_SELECTOR] = self.handle_expiry_time(
101 self.auth_refresh.info.ttl
102 )
The responsibility of parse_json_result()
is to extract the received
response into an access-token
and expires-in
. access-token
is
the token that is coming from the header and expires-in
is located in
the body of response under the key expires_at
which follows ISO 8601.
Extracting the Token
The token is located in the header of the response under the key
X-Subject-Token
. The function parse_json_result()
needs the
following update.
1@add_auth
2class AuthOpenstack(AuthToken):
3 PAYLOAD_TOKEN_KEY = "X-Subject-Token"
4
5 def parse_json_result(self, resp):
6 """Parses the JSON response after requesting token info
7 Parse the response, store any required information, and set the auth
8 so future requests can use the token.
9 :param requests.models.Response resp: Response from the request
10 :return: Information from the payload
11 :rtype: dict
12 """
13 if not self.check_result(resp):
14 self.auth = None
15 self.mark_as_failed()
16 auth_logger.warning("Invalid response received during token generation, %s", resp)
17 return
18
19 try:
20 token_resp_json = resp.json()
21 except ValueError:
22 self.mark_as_failed()
23 auth_logger.warning("Non-JSON information returned from %s", self.url)
24 return
25
26 self.extract_access_token(resp.headers, token_resp_json)
27 self.extract_expires_in(token_resp_json)
28
29 self.set_auth(token_resp_json[DEFAULT_ACCESS_TOKEN_SELECTOR])
30 return token_resp_json
31
32 def extract_access_token(self, headers, token_resp_json):
33 """Extracts a token out of a JSON header payload
34 :param dict headers: auth header response
35 :param dict token_resp_json: auth body response
36 :raises AuthenticationFailed: token/access key not found
37 """
38 try:
39 token_resp_json[DEFAULT_ACCESS_TOKEN_SELECTOR] = headers[
40 AuthOpenstack.PAYLOAD_TOKEN_KEY
41 ]
42 except KeyError:
43 avail_keys = '", "'.join(headers.keys())
44 raise AuthenticationFailed(
45 'Unable to find the key, {}, within the header. Available keys: "{}"'.format(
46 AuthOpenstack.PAYLOAD_TOKEN_KEY, avail_keys
47 )
48 )
The function extract_access_token()
was overriden and the headers are now a
parameter. The function still assumes following an OAuth2 flow where access_in
is a field in the payload. So token_resp_json
needs to be provided such that
the inherited class functions can cache the token for other collections.
Extracting the Expiry Time
The extract_expires_in()
function will be updated to handle the ISO 8601
format and allow for dynamic token refreshes. It uses a function called
handle_expiry_time()
which is designed to work with the expires_in
field for OAuth2 workflows. expires_in
is some amount of seconds in
the future the token will expire. However, ISO 8601 is a definitive time
in the future and not a duration. The good news is that the ISO 8601 can
be converted to epoch time.
Below is the existing handle_expiry_time()
. This code needs to be
reused for the static refresh intervals. So we will copy and
paste the contents to a new function. There we will modify it to handle
the new time format.
def handle_expiry_time(self, expires_in_time):
"""Converters expires_in seconds to linux epoch time
Allows manipulation of the expires_in_time that is
received from the json payload after authentications. The
default implementation assumes that the expires_in field
is in seconds and a time from response. However, this can be
a fixed time where an override to this method would be required.
:param str expires_in_time: expires_in response time in seconds
:rtype: int
:return: linux epoch expiration time
"""
auth_logger.debug("handle_expiry_time --> expires_in:: %s", expires_in_time)
try:
expires_in_time_int = int(expires_in_time)
except ValueError:
raise AuthenticationFailed(
(
"Unable to convert expires in time: {} to linux epoch time. "
"Expires in time must be castable to an integer and is expected "
"to be some duration of time since the authentication response. "
"Consider overriding this method if the input is a fixed calendar time."
).format(expires_in_time)
)
return int(time.time()) + expires_in_time_int
Below is the new handle_iso8601_expiry_time()
. It makes use of datetime to
parse and convert to time since epoch. It is similar in style to previous function,
handle_expiry_time()
.
from datetime import datetime
def handle_iso8601_expiry_time(self, expires_at_time):
"""Converters expires_at ISO 8601 to linux epoch time
example format: CCYY-MM-DDThh:mm:ss.sssZ
example input: 2015-08-27T09:49:58.000000Z
There is a special case were this value can be `null`
:param str expires_at_time: ISO 8601 expiry time
:rtype: int
:return: linux epoch expiration time
"""
auth_logger.debug("handle_iso8601_expiry_time --> expires_at:: %s", expires_at_time)
try:
utc_time = datetime.strptime(expires_at_time, "%Y-%m-%dT%H:%M:%S.%fZ")
except ValueError:
raise AuthenticationFailed(
(
"Unable to parse expires at time: {}"
).format(expires_at_time)
)
return utc_time.strftime("%s")
Finally, the expires_at
extract method needs to be updated to correctly
select the correct value. Since the expiry key is static and won’t change, the
class is updated to handle the selection token.expires_at
from the body
of the response. Again, we are using handle_expiry_time()
for the static
refresh case and handle_iso8601_expiry_time()
for the data extracted from
the Openstack authentication response.
1class AuthOpenstack(AuthToken):
2
3 def extract_expires_in(self, token_resp_json):
4 """Populates expires_in field in cachable token data
5 Adds the default expires_in populated with when the token
6 will expire for the dynamic refresh. Otherwise, this will
7 put a static time-to-live for the token.
8 :param str token_resp_json: JSON response from request
9 """
10 if self.auth_refresh.type == REFRESH_TOKEN_STATIC:
11 auth_logger.debug("Static Token refresh:: ttl: %s", self.auth_refresh.info.ttl)
12 token_resp_json[DEFAULT_EXPIRES_IN_SELECTOR] = self.handle_expiry_time(
13 self.auth_refresh.info.ttl
14 )
15 else: # dynamic case
16 try:
17 token_resp_json[DEFAULT_EXPIRES_IN_SELECTOR] = self.handle_iso8601_expiry_time(
18 token_resp_json["token"]["expires_at"]
19 )
20 except KeyError:
21 avail_keys = '", "'.join(token_resp_json.keys())
22 auth_logger.warning(
23 "Cannot find %s in authentication response. This is required for setting "
24 "a token refresh timer. Defaulting token refresh timer to %d hour. "
25 "Available keys: '%s'",
26 self.auth_refresh.info.expiry_key,
27 self.auth_refresh.info.ttl / 60**2,
28 avail_keys,
29 )
30 token_resp_json[DEFAULT_EXPIRES_IN_SELECTOR] = self.handle_expiry_time(
31 self.auth_refresh.info.ttl
32 )
Putting It All Together
Taking all the components that were developed above, we can now pull them into SL1. Below are the different components that are required.
Snippet
import requests
from datetime import datetime
from silo.auth import add_auth, AuthToken, auth_logger
from silo.auth.exceptions import AuthenticationFailed
from silo.auth.authenticator import DEFAULT_EXPIRES_IN_SELECTOR, REFRESH_TOKEN_STATIC
@add_auth
class AuthOpenstack(AuthToken):
PAYLOAD_TOKEN_KEY = "X-Subject-Token"
def __init__(self, credentials, **kwargs):
AuthToken.__init__(self,credentials, **kwargs)
@staticmethod
def build(credentials, **kwargs):
"""Build method to be called by get_authmethod in low_code"""
return AuthOpenstack(credentials, **kwargs)
@staticmethod
def get_name():
"""Look-up name/key for the Authenticator in get_authmethod() and register_auth()"""
return "AuthOpenstack"
@staticmethod
def get_desc():
"""Describes the purpose of the Authenticator"""
return "Openstack Authentication for HTTP"
def execute_auth_workflow(self):
"""Perform the authentication to get the token
Once authenticated, save the information to the cache to re-use at a
later time
:return: Token information related to the payload
:rtype: token_info
"""
token_resp = AuthOpenstack.request_token(
self.url, self.client_info, self.headers, verify=self.ssl_verify
)
return self.parse_json_result(token_resp)
@staticmethod
def request_token(self, url, auth, headers, kwargs**):
"""Perform the request to gather the result from the endpoint
:param str url: URL to query to obtain the token
:param dict auth: Authentication information for the request
:param dict headers: Headers to use when requesting the token
:rtype: request.response
"""
body = {
"auth": {
"methods": ["password"],
"password": {
"user": {
"name": auth["username"],
"domain": {
"id": "default"
},
"password": auth["password"]
}
}
}
}
request_payload = {
"json": body,
"headers": headers,
}
request_payload.update(kwargs)
try:
return requests.post(url, **request_payload)
except requests.ConnectionError as err:
auth_logger.warning("Cannot connect to [%s]: %s", url, type(err))
raise AuthenticationFailed(
"Unable to Authenticate. Cannot connect to the Token Retrieval Endpoint"
)
def parse_json_result(self, resp):
"""Parses the JSON response after requesting token info
Parse the response, store any required information, and set the auth
so future requests can use the token.
:param requests.models.Response resp: Response from the request
:return: Information from the payload
:rtype: dict
"""
if not self.check_result(resp):
self.auth = None
self.mark_as_failed()
auth_logger.warning("Invalid response received during token generation, %s", resp)
return
try:
token_resp_json = resp.json()
except ValueError:
self.mark_as_failed()
auth_logger.warning("Non-JSON information returned from %s", self.url)
return
self.extract_access_token(resp.headers, token_resp_json)
self.extract_expires_in(token_resp_json)
self.set_auth(token_resp_json[DEFAULT_ACCESS_TOKEN_SELECTOR])
return token_resp_json
def extract_access_token(self, headers, token_resp_json):
"""Extracts a token out of a JSON header payload
:param dict headers: auth header response
:param dict token_resp_json: auth body response
:raises AuthenticationFailed: token/access key not found
"""
try:
token_resp_json[DEFAULT_ACCESS_TOKEN_SELECTOR] = headers[
AuthOpenstack.PAYLOAD_TOKEN_KEY
]
except KeyError:
avail_keys = '", "'.join(headers.keys())
raise AuthenticationFailed(
'Unable to find the key, {}, within the header. Available keys: "{}"'.format(
AuthOpenstack.PAYLOAD_TOKEN_KEY, avail_keys
)
)
def extract_expires_in(self, token_resp_json):
"""Populates expires_in field in cachable token data
Adds the default expires_in populated with when the token
will expire for the dynamic refresh. Otherwise, this will
put a static time-to-live for the token.
:param str token_resp_json: JSON response from request
"""
if self.auth_refresh.type == REFRESH_TOKEN_STATIC:
auth_logger.debug("Static Token refresh:: ttl: %s", self.auth_refresh.info.ttl)
token_resp_json[DEFAULT_EXPIRES_IN_SELECTOR] = self.handle_expiry_time(
self.auth_refresh.info.ttl
)
else: # dynamic case
try:
token_resp_json[DEFAULT_EXPIRES_IN_SELECTOR] = self.handle_iso8601_xpiry_time(
token_resp_json["token"]["expires_at"]
)
except KeyError:
avail_keys = '", "'.join(token_resp_json.keys())
auth_logger.warning(
"Cannot find %s in authentication response. This is required for setting "
"a token refresh timer. Defaulting token refresh timer to %d hour. "
"Available keys: '%s'",
self.auth_refresh.info.expiry_key,
self.auth_refresh.info.ttl / 60**2,
avail_keys,
)
token_resp_json[DEFAULT_EXPIRES_IN_SELECTOR] = self.handle_expiry_time(
self.auth_refresh.info.ttl
)
def handle_iso8601_expiry_time(self, expires_at_time):
"""Converters expires_at ISO 8601 to linux epoch time
example format: CCYY-MM-DDThh:mm:ss.sssZ
example input: 2015-08-27T09:49:58.000000Z
There is a special case were this value can be `null`
:param str expires_at_time: ISO 8601 expiry time
:rtype: int
:return: linux epoch expiration time
"""
auth_logger.debug("handle_expiry_time --> expires_at:: %s", expires_at_time)
try:
utc_time = datetime.strptime(expires_at_time, "%Y-%m-%dT%H:%M:%S.%fZ")
except ValueError:
raise AuthenticationFailed(
(
"Unable to parse expires at time: {}"
).format(expires_at_time)
)
return utc_time.strftime("%s")
Snippet Argument
low_code:
version: 2
steps:
- http:
uri: "/v3/users"
- json
Credential
Authentication Type -
Token Authentication
Authenticator Override -
AuthOpenstack
URL -
http://myopenstackserver:5000
Token Retrieval Endpoint -
http://myopenstackserver:5000/v3/auth/tokens
Authorization Header -
X-Auth-Token
Bearer Token Format -
{}
Note
Replace the URL
and Token Retrieval Endpoint
with
your Openstack URL.