OpenStack Authenticator

AuthToken covers the case where SL1 needs to reach out to a token server using basic authentication. The received token is then used in subsequent requests to the API endpoints. There are many different flavors of exactly how this works.

The out-of-the-box AuthToken implementation assumes the following:

  1. There is a URL to retrieve the token from

  2. The request is a POST

  3. The request’s body is {"username": username, "password": password}

  4. The response from the token request must include the token in the body.

  5. The key to access the token from the response body is configurable in the credential.

  6. The token must then be used in subsequent requests and included in the header.

See REST Authentication

For this example, we will be looking to make custom authenticator to support the Openstack authentcation flow. Openstack supports token authentication and we can re-use some components of out-of-the-box AuthToken to accomplish this.

The following are sample request/response payloads from an Openstack server. We will use this information to build out the various components of the new AuthOpenstack authenticator. See Openstack Curl Examples

The following shows an example of making a request to the Openstack token server.

curl -i \
-H "Content-Type: application/json" \
-d '
    {
        "auth": {
            "identity": {
                "methods": ["password"],
                "password": {
                    "user": {
                        "name": "admin",
                        "domain": { "id": "default" },
                        "password": "adminpwd"
                    }
                }
            }
        }
    }' \
"http://myopenstackserver:5000/v3/auth/tokens"

This does not match the POST body of AuthToken as it formats its POST body as {'username': username, 'password': password}.

Now let us examine the response from the token request:

HTTP/1.1 201 Created
X-Subject-Token: MIIFvgY...

Vary: X-Auth-Token

Content-Type: application/json
Content-Length: 1025
Date: Tue, 10 Jun 2014 20:55:16 GMT

{
"token": {
    "methods": ["password"],
    "roles": [{
    "id": "9fe2ff9ee4384b1894a90878d3e92bab",
    "name": "_member_"
    }, {
    "id": "c703057be878458588961ce9a0ce686b",
    "name": "admin"
    }],

    "expires_at": "2014-06-10T2:55:16.806001Z",

    "project": {
    "domain": {
        "id": "default",
        "name": "Default"
    },
    "id": "8538a3f13f9541b28c2620eb19065e45",
    "name": "admin"
    },

    "catalog": [{
    "endpoints": [{
        "url": "http://localhost:3537/v2.0",
        "region": "RegionOne",
        "interface": "admin",
        "id": "29beb2f1567642eb810b042b6719ea88"
    }, {
        "url": "http://localhost:5000/v2.0",
        "region": "RegionOne",
        "interface": "internal",
        "id": "8707e3735d4415c97ae231b4841eb1c"
    }, {
        "url": "http://localhost:5000/v2.0",
        "region": "RegionOne",
        "interface": "public",
        "id": "ef303187fc8d41668f25199c298396a5"
    }],
    "type": "identity",
    "id": "bd73972c0e14fb69bae8ff76e112a90",
    "name": "keystone"
    }],

    "extras": {},

    "user": {
    "domain": {
        "id": "default",
        "name": "Default"
    },

    "id": "3ec3164f750146be97f21559ee4d9c51",
    "name": "admin"
    },

    "audit_ids": ["yRt0UrxJSs6-WYJgwEMMmg"],
    "issued_at": "201406-10T20:55:16.806027Z"
  }
}

AuthToken expects to find the token from the POST body of the response. Notice that the token is not included in the payload but rather in the header. See Openstack Auth API

Our AuthOpenstack authenticator need to send the proper payload to the token server and then retrieve the token from the header instead of the payload. First, the request post body will need to be properly formatted.

The code for AuthToken is shown below. Highlighted is the authentication request’s POST handling.

  1from collections import namedtuple
  2
  3import requests
  4from requests.auth import AuthBase
  5
  6from silo.auth.authenticator import extract_refresh_data
  7from silo.auth.base_token import TokenBasedAuthenticator
  8from silo.auth.exceptions import AuthenticationFailed
  9from silo.auth.logger import CredentialFilter
 10
 11from silo.auth import add_auth, Authenticator, auth_logger
 12
 13DEFAULT_TIME_TO_NEXT_REQUEST_SECS = 60
 14NEXT_REQUEST_TIME = "next_request_time"
 15token_info = namedtuple("token_info", (NEXT_REQUEST_TIME, "token"))
 16
 17
 18class TokenAuth(AuthBase):
 19    """Requests auth implementation for use in Requests.Sessions.
 20    :param AuthBase AuthBase: Parent class that TokenAuth subclasses
 21    :param str token_label: Key in the request that will hold the Token value
 22    :param str token: The token to retrieve resources with
 23    """
 24
 25    def __init__(self, token_label, token):
 26        self.token_label = token_label
 27        self.token = token
 28
 29    def __call__(self, r):
 30        r.headers[self.token_label] = self.token
 31        return r
 32
 33
 34@add_auth
 35class AuthToken(TokenBasedAuthenticator):
 36    """AuthToken is a class that encapsulates the authentication lifecycle for TokenAuth.
 37    This implementation is responsible to logging into the auth_endpoint using
 38    a basic username/password and retrieving a Token from the response where the
 39    key in the response is defined by "auth_header", the `token_label` in this class.
 40    :ivar TokenBasedAuthenticator TokenBasedAuthenticator: Helper class for token authentication
 41    :ivar silo.apps.collection.credentials.Credential credentials: credentials
 42        retrieved from em7/UCF
 43    """
 44
 45    def __init__(self, credentials, **kwargs):
 46        """Constructor Method"""
 47        Authenticator.__init__(self, credentials)
 48        self.ssl_verify = bool(
 49            int(credentials.fields.get("options", {}).get("SSLVERIFYPEER", "1"))
 50        )
 51
 52        self.url = credentials.fields["auth_endpoint"]
 53        self.token_label = credentials.fields["auth_header"]
 54        self.token_format = credentials.fields.get("token_format", "{}")
 55        self.token_key = credentials.fields["token_key"]
 56
 57        self.username = credentials.fields["username"]
 58        self.password = credentials.fields["password"]
 59        self.client_info = {"username": self.username, "password": self.password}
 60
 61        self.auth_refresh = extract_refresh_data(credentials)
 62        self.cred_filter = CredentialFilter(credentials)
 63        self.headers = credentials.fields.get("headers", {})
 64
 65        try:
 66            time_to_next_request = credentials.fields["time_to_next_request"]
 67        except KeyError:
 68            time_to_next_request = DEFAULT_TIME_TO_NEXT_REQUEST_SECS  # 1 minute
 69            auth_logger.debug(
 70                "Time to Next Request missing from credential. Default: %d seconds.",
 71                time_to_next_request,
 72            )
 73
 74        TokenBasedAuthenticator.__init__(
 75            self,
 76            self.url,
 77            self.token_key,
 78            self._safe_cache,
 79            self.get_authenticator_id(),
 80            time_to_next_request,
 81            **kwargs
 82        )
 83
 84    def execute_auth_workflow(self):
 85        """Perform the authentication to get the token
 86        Once authenticated, save the information to the cache to re-use at a
 87        later time
 88        :return: Token information related to the payload
 89        :rtype: token_info
 90        """
 91        token_resp = AuthToken.request_token(
 92            self.url, self.client_info, self.headers, verify=self.ssl_verify
 93        )
 94        self.debug(token_resp)
 95        return self.parse_json_result(token_resp)
 96
 97    def set_auth(self, token):
 98        """Set the auth info
 99        Set the auth information for the given token and update the filters
100        :param str token: Token
101        """
102        self.auth = TokenAuth(self.token_label, self.token_format.format(token))
103        # Add rendered token in token format
104        self.cred_filter.update_secret_from_auth(self.auth)
105        # Add stand-alone token
106        self.cred_filter.update_secret("token", token)
107
108    def get_authenticator_id(self):
109        """Builds the unique identifier based on the
110        properties that make this auth unique
111        :return: A unique identifier for this auth
112        :rtype: str
113        """
114        header_list = [(key, value) for key, value in self.headers.items()]
115        header_list.sort()
116        headers_string = "{}".format(header_list)
117        key = "{0}-{1}-{2}-{3}-{4}-{5}".format(
118            self.username,
119            self.url,
120            headers_string,
121            self.token_label,
122            self.token_format,
123            self.token_key,
124        )
125
126        return key
127
128    @staticmethod
129    def build(credentials, **kwargs):
130        """Build method to be called by get_authmethod in low_code"""
131        return AuthToken(credentials, **kwargs)
132
133    @staticmethod
134    def get_name():
135        """Look-up name/key for the Authenticator in get_authmethod() and register_auth()"""
136        return "AuthToken"
137
138    @staticmethod
139    def get_desc():
140        """Describes the purpose of the Authenticator"""
141        return "Token Authentication for HTTP"
142
143    @staticmethod
144    def request_token(url, auth, headers, **kwargs):
145        """Perform the request to gather the result from the endpoint
146        :param str url: URL to query to obtain the token
147        :param dict auth: Authentication information for the request
148        :param dict headers: Headers to use when requesting the token
149        :rtype: request.response
150        """
151        request_payload = {
152            "json": auth,
153            "headers": headers,
154        }
155        request_payload.update(kwargs)
156        try:
157            return requests.post(url, **request_payload)
158        except requests.ConnectionError as err:
159            auth_logger.warning("Cannot connect to [%s]: %s", url, type(err))
160            raise AuthenticationFailed(
161                "Unable to Authenticate. Cannot connect to the Token Retrieval Endpoint"
162            )

Request Body Update

The function that must be modified is execute_auth_workflow() as we will be replacing the AuthToken.request_token() function. This new function will be part of the authenticator class called AuthOpenstack. The code changes shown below formats the POST body into the expected format.

 1@add_auth
 2class AuthOpenstack(AuthToken):
 3    def execute_auth_workflow(self):
 4        """Perform the authentication to get the token
 5        Once authenticated, save the information to the cache to re-use at a
 6        later time
 7        :return: Token information related to the payload
 8        :rtype: token_info
 9        """
10        token_resp = AuthOpenstack.request_token(
11            self.url, self.client_info, self.headers, verify=self.ssl_verify
12        )
13        return self.parse_json_result(token_resp)
14
15    @staticmethod
16    def request_token(self, url, auth, headers, kwargs**):
17        """Perform the request to gather the result from the endpoint
18        :param str url: URL to query to obtain the token
19        :param dict auth: Authentication information for the request
20        :param dict headers: Headers to use when requesting the token
21        :rtype: request.response
22        """
23
24        body = {
25            "auth": {
26                "methods": ["password"],
27                "password": {
28                    "user": {
29                        "name": auth["username"],
30                        "domain": {
31                            "id": "default"
32                        },
33                        "password": auth["password"]
34                    }
35                }
36            }
37        }
38
39        request_payload = {
40            "json": body,
41            "headers": headers,
42        }
43        request_payload.update(kwargs)
44        try:
45            return requests.post(url, **request_payload)
46        except requests.ConnectionError as err:
47            auth_logger.warning("Cannot connect to [%s]: %s", url, type(err))
48            raise AuthenticationFailed(
49                "Unable to Authenticate. Cannot connect to the Token Retrieval Endpoint"
50            )

To recap, a new authenticator called AuthOpenstack which inherits from AuthToken was defined. Next the execute_auth_workflow() function was modified to call a new static method called request_token(). This function prepares the expected request body for the authentication server.

Next, the response handle will need to be changed to handle receiving the token from the header. Below is our current update and the next section that will need to be updated. The function parse_json_result() is part of AuthToken’s parent class TokenBasedAuthenticator and the function’s code is show below.

  1@add_auth
  2class AuthOpenstack(AuthToken):
  3    def execute_auth_workflow(self):
  4        """Perform the authentication to get the token
  5        Once authenticated, save the information to the cache to re-use at a
  6        later time
  7        :return: Token information related to the payload
  8        :rtype: token_info
  9        """
 10        token_resp = AuthOpenstack.request_token(
 11            self.url, self.client_info, self.headers, verify=self.ssl_verify
 12        )
 13        return self.parse_json_result(token_resp)
 14
 15    def parse_json_result(self, resp):
 16        """Parses the JSON response after requesting token info
 17        Parse the response, store any required information, and set the auth
 18        so future requests can use the token.
 19        :param requests.models.Response resp: Response from the request
 20        :return: Information from the payload
 21        :rtype: dict
 22        """
 23        if not self.check_result(resp):
 24            self.auth = None
 25            self.mark_as_failed()
 26            auth_logger.warning("Invalid response received during token generation, %s", resp)
 27            return
 28
 29        try:
 30            token_resp_json = resp.json()
 31        except ValueError:
 32            self.mark_as_failed()
 33            auth_logger.warning("Non-JSON information returned from %s", self.url)
 34            return
 35
 36        self.extract_access_token(token_resp_json)
 37        self.extract_expires_in(token_resp_json)
 38
 39        self.set_auth(token_resp_json[DEFAULT_ACCESS_TOKEN_SELECTOR])
 40        return token_resp_json
 41
 42    def extract_access_token(self, token_resp_json):
 43        """Extracts a token out of a JSON response payload
 44        :param dict token_resp_json: response from
 45        :raises AuthenticationFailed: token/access key not found
 46        """
 47        try:
 48            token_resp_json[DEFAULT_ACCESS_TOKEN_SELECTOR] = token_resp_json[
 49                self.payload_token_key
 50            ]
 51        except KeyError:
 52            avail_keys = '", "'.join(token_resp_json.keys())
 53            raise AuthenticationFailed(
 54                'Unable to find the key, {}, within the payload. Available keys: "{}"'.format(
 55                    self.payload_token_key, avail_keys
 56                )
 57            )
 58
 59    def extract_expires_in(self, token_resp_json):
 60        """Populates expires_in field in cachable token data
 61        Adds the default expires_in populated with when the token
 62        will expire for the dynamic refresh. Otherwise, this will
 63        put a static time-to-live for the token.
 64        :param str token_resp_json: JSON response from request
 65        """
 66        if self.auth_refresh.type == REFRESH_TOKEN_STATIC:
 67            auth_logger.debug("Static Token refresh:: ttl: %s", self.auth_refresh.info.ttl)
 68            token_resp_json[DEFAULT_EXPIRES_IN_SELECTOR] = self.handle_expiry_time(
 69                self.auth_refresh.info.ttl
 70            )
 71        else:  # dynamic case
 72            auth_logger.debug(
 73                "Dynamic Token refresh:: expiry_Key: %s", self.auth_refresh.info.expiry_key
 74            )
 75            if self.auth_refresh.info.expiry_key:
 76                # User has defined an expires_in key to look-up
 77                try:
 78                    token_resp_json[DEFAULT_EXPIRES_IN_SELECTOR] = self.handle_expiry_time(
 79                        token_resp_json[self.auth_refresh.info.expiry_key]
 80                    )
 81                except KeyError:
 82                    avail_keys = '", "'.join(token_resp_json.keys())
 83                    auth_logger.warning(
 84                        "Cannot find %s in authentication response. This is required for setting "
 85                        "a token refresh timer. Defaulting token refresh timer to %d hour. "
 86                        "Available keys: '%s'",
 87                        self.auth_refresh.info.expiry_key,
 88                        self.auth_refresh.info.ttl / 60**2,
 89                        avail_keys,
 90                    )
 91                    token_resp_json[DEFAULT_EXPIRES_IN_SELECTOR] = self.handle_expiry_time(
 92                        self.auth_refresh.info.ttl
 93                    )
 94            else:
 95                auth_logger.debug(
 96                    "Dynamic token refresh selected, but no expiry key has been defined. "
 97                    "Setting token refresh to %s",
 98                    self.auth_refresh.info.ttl,
 99                )
100                token_resp_json[DEFAULT_EXPIRES_IN_SELECTOR] = self.handle_expiry_time(
101                    self.auth_refresh.info.ttl
102                )

The responsibility of parse_json_result() is to extract the received response into an access-token and expires-in. access-token is the token that is coming from the header and expires-in is located in the body of response under the key expires_at which follows ISO 8601.

Extracting the Token

The token is located in the header of the response under the key X-Subject-Token. The function parse_json_result() needs the following update.

 1@add_auth
 2class AuthOpenstack(AuthToken):
 3    PAYLOAD_TOKEN_KEY = "X-Subject-Token"
 4
 5    def parse_json_result(self, resp):
 6        """Parses the JSON response after requesting token info
 7        Parse the response, store any required information, and set the auth
 8        so future requests can use the token.
 9        :param requests.models.Response resp: Response from the request
10        :return: Information from the payload
11        :rtype: dict
12        """
13        if not self.check_result(resp):
14            self.auth = None
15            self.mark_as_failed()
16            auth_logger.warning("Invalid response received during token generation, %s", resp)
17            return
18
19        try:
20            token_resp_json = resp.json()
21        except ValueError:
22            self.mark_as_failed()
23            auth_logger.warning("Non-JSON information returned from %s", self.url)
24            return
25
26        self.extract_access_token(resp.headers, token_resp_json)
27        self.extract_expires_in(token_resp_json)
28
29        self.set_auth(token_resp_json[DEFAULT_ACCESS_TOKEN_SELECTOR])
30        return token_resp_json
31
32    def extract_access_token(self, headers, token_resp_json):
33        """Extracts a token out of a JSON header payload
34        :param dict headers: auth header response
35        :param dict token_resp_json: auth body response
36        :raises AuthenticationFailed: token/access key not found
37        """
38        try:
39            token_resp_json[DEFAULT_ACCESS_TOKEN_SELECTOR] = headers[
40                AuthOpenstack.PAYLOAD_TOKEN_KEY
41            ]
42        except KeyError:
43            avail_keys = '", "'.join(headers.keys())
44            raise AuthenticationFailed(
45                'Unable to find the key, {}, within the header. Available keys: "{}"'.format(
46                    AuthOpenstack.PAYLOAD_TOKEN_KEY, avail_keys
47                )
48            )

The function extract_access_token() was overriden and the headers are now a parameter. The function still assumes following an OAuth2 flow where access_in is a field in the payload. So token_resp_json needs to be provided such that the inherited class functions can cache the token for other collections.

Extracting the Expiry Time

The extract_expires_in() function will be updated to handle the ISO 8601 format and allow for dynamic token refreshes. It uses a function called handle_expiry_time() which is designed to work with the expires_in field for OAuth2 workflows. expires_in is some amount of seconds in the future the token will expire. However, ISO 8601 is a definitive time in the future and not a duration. The good news is that the ISO 8601 can be converted to epoch time.

Below is the existing handle_expiry_time(). This code needs to be reused for the static refresh intervals. So we will copy and paste the contents to a new function. There we will modify it to handle the new time format.

def handle_expiry_time(self, expires_in_time):
    """Converters expires_in seconds to linux epoch time
    Allows manipulation of the expires_in_time that is
    received from the json payload after authentications. The
    default implementation assumes that the expires_in field
    is in seconds and a time from response. However, this can be
    a fixed time where an override to this method would be required.
    :param str expires_in_time: expires_in response time in seconds
    :rtype: int
    :return: linux epoch expiration time
    """
    auth_logger.debug("handle_expiry_time --> expires_in:: %s", expires_in_time)

    try:
        expires_in_time_int = int(expires_in_time)
    except ValueError:
        raise AuthenticationFailed(
            (
                "Unable to convert expires in time: {} to linux epoch time. "
                "Expires in time must be castable to an integer and is expected "
                "to be some duration of time since the authentication response. "
                "Consider overriding this method if the input is a fixed calendar time."
            ).format(expires_in_time)
        )

    return int(time.time()) + expires_in_time_int

Below is the new handle_iso8601_expiry_time(). It makes use of datetime to parse and convert to time since epoch. It is similar in style to previous function, handle_expiry_time().

from datetime import datetime

def handle_iso8601_expiry_time(self, expires_at_time):
    """Converters expires_at ISO 8601 to linux epoch time

    example format: CCYY-MM-DDThh:mm:ss.sssZ
    example input: 2015-08-27T09:49:58.000000Z

    There is a special case were this value can be `null`

    :param str expires_at_time: ISO 8601 expiry time
    :rtype: int
    :return: linux epoch expiration time
    """
    auth_logger.debug("handle_iso8601_expiry_time --> expires_at:: %s", expires_at_time)

    try:
        utc_time = datetime.strptime(expires_at_time, "%Y-%m-%dT%H:%M:%S.%fZ")
    except ValueError:
        raise AuthenticationFailed(
            (
                "Unable to parse expires at time: {}"
            ).format(expires_at_time)
        )

    return utc_time.strftime("%s")

Finally, the expires_at extract method needs to be updated to correctly select the correct value. Since the expiry key is static and won’t change, the class is updated to handle the selection token.expires_at from the body of the response. Again, we are using handle_expiry_time() for the static refresh case and handle_iso8601_expiry_time() for the data extracted from the Openstack authentication response.

 1class AuthOpenstack(AuthToken):
 2
 3    def extract_expires_in(self, token_resp_json):
 4        """Populates expires_in field in cachable token data
 5        Adds the default expires_in populated with when the token
 6        will expire for the dynamic refresh. Otherwise, this will
 7        put a static time-to-live for the token.
 8        :param str token_resp_json: JSON response from request
 9        """
10        if self.auth_refresh.type == REFRESH_TOKEN_STATIC:
11            auth_logger.debug("Static Token refresh:: ttl: %s", self.auth_refresh.info.ttl)
12            token_resp_json[DEFAULT_EXPIRES_IN_SELECTOR] = self.handle_expiry_time(
13                self.auth_refresh.info.ttl
14            )
15        else:  # dynamic case
16            try:
17                token_resp_json[DEFAULT_EXPIRES_IN_SELECTOR] = self.handle_iso8601_expiry_time(
18                    token_resp_json["token"]["expires_at"]
19                )
20            except KeyError:
21                avail_keys = '", "'.join(token_resp_json.keys())
22                auth_logger.warning(
23                    "Cannot find %s in authentication response. This is required for setting "
24                    "a token refresh timer. Defaulting token refresh timer to %d hour. "
25                    "Available keys: '%s'",
26                    self.auth_refresh.info.expiry_key,
27                    self.auth_refresh.info.ttl / 60**2,
28                    avail_keys,
29                )
30                token_resp_json[DEFAULT_EXPIRES_IN_SELECTOR] = self.handle_expiry_time(
31                    self.auth_refresh.info.ttl
32                )

Putting It All Together

Taking all the components that were developed above, we can now pull them into SL1. Below are the different components that are required.

Snippet

import requests

from datetime import datetime
from silo.auth import add_auth, AuthToken, auth_logger
from silo.auth.exceptions import AuthenticationFailed
from silo.auth.authenticator import DEFAULT_EXPIRES_IN_SELECTOR, REFRESH_TOKEN_STATIC

@add_auth
class AuthOpenstack(AuthToken):
    PAYLOAD_TOKEN_KEY = "X-Subject-Token"

    def __init__(self, credentials, **kwargs):
        AuthToken.__init__(self,credentials, **kwargs)

    @staticmethod
    def build(credentials, **kwargs):
        """Build method to be called by get_authmethod in low_code"""
        return AuthOpenstack(credentials, **kwargs)

    @staticmethod
    def get_name():
        """Look-up name/key for the Authenticator in get_authmethod() and register_auth()"""
        return "AuthOpenstack"

    @staticmethod
    def get_desc():
        """Describes the purpose of the Authenticator"""
        return "Openstack Authentication for HTTP"

    def execute_auth_workflow(self):
        """Perform the authentication to get the token
        Once authenticated, save the information to the cache to re-use at a
        later time
        :return: Token information related to the payload
        :rtype: token_info
        """
        token_resp = AuthOpenstack.request_token(
            self.url, self.client_info, self.headers, verify=self.ssl_verify
        )
        return self.parse_json_result(token_resp)

    @staticmethod
    def request_token(self, url, auth, headers, kwargs**):
        """Perform the request to gather the result from the endpoint
        :param str url: URL to query to obtain the token
        :param dict auth: Authentication information for the request
        :param dict headers: Headers to use when requesting the token
        :rtype: request.response
        """

        body = {
            "auth": {
                "methods": ["password"],
                "password": {
                    "user": {
                        "name": auth["username"],
                        "domain": {
                            "id": "default"
                        },
                        "password": auth["password"]
                    }
                }
            }
        }

        request_payload = {
            "json": body,
            "headers": headers,
        }
        request_payload.update(kwargs)
        try:
            return requests.post(url, **request_payload)
        except requests.ConnectionError as err:
            auth_logger.warning("Cannot connect to [%s]: %s", url, type(err))
            raise AuthenticationFailed(
                "Unable to Authenticate. Cannot connect to the Token Retrieval Endpoint"
            )

    def parse_json_result(self, resp):
        """Parses the JSON response after requesting token info
        Parse the response, store any required information, and set the auth
        so future requests can use the token.
        :param requests.models.Response resp: Response from the request
        :return: Information from the payload
        :rtype: dict
        """
        if not self.check_result(resp):
            self.auth = None
            self.mark_as_failed()
            auth_logger.warning("Invalid response received during token generation, %s", resp)
            return

        try:
            token_resp_json = resp.json()
        except ValueError:
            self.mark_as_failed()
            auth_logger.warning("Non-JSON information returned from %s", self.url)
            return

        self.extract_access_token(resp.headers, token_resp_json)
        self.extract_expires_in(token_resp_json)

        self.set_auth(token_resp_json[DEFAULT_ACCESS_TOKEN_SELECTOR])
        return token_resp_json

    def extract_access_token(self, headers, token_resp_json):
        """Extracts a token out of a JSON header payload
        :param dict headers: auth header response
        :param dict token_resp_json: auth body response
        :raises AuthenticationFailed: token/access key not found
        """
        try:
            token_resp_json[DEFAULT_ACCESS_TOKEN_SELECTOR] = headers[
                AuthOpenstack.PAYLOAD_TOKEN_KEY
            ]
        except KeyError:
            avail_keys = '", "'.join(headers.keys())
            raise AuthenticationFailed(
                'Unable to find the key, {}, within the header. Available keys: "{}"'.format(
                    AuthOpenstack.PAYLOAD_TOKEN_KEY, avail_keys
                )
            )

    def extract_expires_in(self, token_resp_json):
        """Populates expires_in field in cachable token data
        Adds the default expires_in populated with when the token
        will expire for the dynamic refresh. Otherwise, this will
        put a static time-to-live for the token.
        :param str token_resp_json: JSON response from request
        """
        if self.auth_refresh.type == REFRESH_TOKEN_STATIC:
            auth_logger.debug("Static Token refresh:: ttl: %s", self.auth_refresh.info.ttl)
            token_resp_json[DEFAULT_EXPIRES_IN_SELECTOR] = self.handle_expiry_time(
                self.auth_refresh.info.ttl
            )
        else:  # dynamic case
            try:
                token_resp_json[DEFAULT_EXPIRES_IN_SELECTOR] = self.handle_iso8601_xpiry_time(
                    token_resp_json["token"]["expires_at"]
                )
            except KeyError:
                avail_keys = '", "'.join(token_resp_json.keys())
                auth_logger.warning(
                    "Cannot find %s in authentication response. This is required for setting "
                    "a token refresh timer. Defaulting token refresh timer to %d hour. "
                    "Available keys: '%s'",
                    self.auth_refresh.info.expiry_key,
                    self.auth_refresh.info.ttl / 60**2,
                    avail_keys,
                )
                token_resp_json[DEFAULT_EXPIRES_IN_SELECTOR] = self.handle_expiry_time(
                    self.auth_refresh.info.ttl
                )

    def handle_iso8601_expiry_time(self, expires_at_time):
        """Converters expires_at ISO 8601 to linux epoch time

        example format: CCYY-MM-DDThh:mm:ss.sssZ
        example input: 2015-08-27T09:49:58.000000Z

        There is a special case were this value can be `null`

        :param str expires_at_time: ISO 8601 expiry time
        :rtype: int
        :return: linux epoch expiration time
        """
        auth_logger.debug("handle_expiry_time --> expires_at:: %s", expires_at_time)

        try:
            utc_time = datetime.strptime(expires_at_time, "%Y-%m-%dT%H:%M:%S.%fZ")
        except ValueError:
            raise AuthenticationFailed(
                (
                    "Unable to parse expires at time: {}"
                ).format(expires_at_time)
            )

        return utc_time.strftime("%s")

Snippet Argument

low_code:
    version: 2
    steps:
        - http:
            uri: "/v3/users"
        - json

Credential

  • Authentication Type - Token Authentication

  • Authenticator Override - AuthOpenstack

  • URL - http://myopenstackserver:5000

  • Token Retrieval Endpoint - http://myopenstackserver:5000/v3/auth/tokens

  • Authorization Header - X-Auth-Token

  • Bearer Token Format - {}

Note

Replace the URL and Token Retrieval Endpoint with your Openstack URL.