"""
The :py:class:`AWSClient` and :py:class:`AsyncAWSClient` implement low-level
AWS clients. The clients provide only the mechanism for submitting signed HTTP
requests to the AWS APIs and are generally meant to be used by service specific
client API implementations.
"""
import datetime
import hashlib
import hmac
import json
import logging
import os
import socket
import sys
try:
from urllib.parse import urlparse
except ImportError: # pragma: nocover
from urlparse import urlparse
try:
from urllib.parse import quote
except ImportError: # pragma: nocover
from urllib import quote
from tornado import concurrent, httpclient, ioloop
try:
from tornado import curl_httpclient
except ImportError: # pragma: nocover
curl_httpclient = None
from tornado_aws import config, exceptions, txml
# Module-level logger shared by both client classes.
LOGGER = logging.getLogger(__name__)

# Content type for AWS JSON protocol version 1.1 payloads.
MIME_AWZ_JSON = 'application/x-amz-json-1.1'

# ``True`` when the interpreter version is newer than 3.0.0.
PYTHON3 = sys.version_info > (3, 0, 0)

# Response content types that are parsed as AWS JSON error documents.
_AWZ_CONTENT_TYPES = [
    'application/json',
    'application/x-amz-json-1.0',
    'application/x-amz-json-1.1',
]

# JSON error ``__type`` values that indicate an authentication problem.
_REFRESH_EXCEPTIONS = [
    'AuthFailure',
    'AuthMissingFailure',
    'AWS.InvalidAccount',
    'ExpiredTokenException',
    'InvalidSignatureException',
    'MissingAuthenticationTokenException',
    'UnrecognizedClientException',
]

# XML error ``Code`` values that indicate an authentication problem.
_REFRESH_XML_EXCEPTIONS = [
    'AuthFailure',
    'ExpiredToken',
    'InvalidClientTokenId',
    'InvalidSecurity',
    'MissingAuthenticationToken',
    'SignatureDoesNotMatch',
]

# Template for the ``Authorization`` header value: algorithm, access key,
# credential scope, signed header names and signature.
_HEADER_FORMAT = '{0} Credential={1}/{2}, SignedHeaders={3}, Signature={4}'
[docs]class AWSClient(object):
"""Implement a low level AWS client that performs the request signing
required for AWS API requests.
``AWSClient`` uses the same configuration method and environment
variables as the AWS CLI. For configuration information visit the "Getting
Set Up" section of the `AWS Command Line Interface user guide
<http://docs.aws.amazon.com/cli/latest/userguide/>`_.
When creating the ``AWSClient`` instance you need to specify the
``service`` that you will be interacting with. This value is used when
signing the request headers and must match the service values as specified
in the `AWS General Reference documentation
<http://docs.aws.amazon.com/general/latest/gr/Welcome.html>`_.
The AWS configuration profile can be set when creating the
``AWSClient`` instance or by setting the ``AWS_DEFAULT_PROFILE``
environment variable. If neither are set, ``default`` will be used.
The AWS region is set by reading in configuration or by the
``AWS_DEFAULT_REGION`` environment variable. If neither is set, it will
attempt to be set by invoking the EC2 Instance Metadata and user data API,
if available.
The AWS access key can be set when creating a new instance. If it's not
passed in when creating the ``AWSClient``, the client will attempt to
get the key from the ``AWS_ACCESS_KEY_ID`` environment variable. If that is
not set, it will attempt to get the key from the AWS CLI credentials file.
The path to the credentials file can be overridden in the
``AWS_SHARED_CREDENTIALS_FILE`` environment variable. Note that a value
set in ``AWS_ACCESS_KEY_ID`` will only be used if there is an accompanying
value in ``AWS_SECRET_ACCESS_KEY`` environment variable.
If ``AWS_SECURITY_TOKEN`` or ``AWS_SESSION_TOKEN`` are set, they will
automatically be used as well.
Like the access key, the secret key can be set when creating a new client
instance. The configuration logic matches the access key with the exception
of the environment variable. The secret key can be set in the
``AWS_SECRET_ACCESS_KEY`` environment variable.
If there is no local configuration or credentials, the client will attempt
to load the information from the EC2 instance meta-data API, if it is
available.
The ``endpoint`` argument is primarily used for testing and allows for
the use of a specified base URL value instead of the auto-construction of
a URL using the service and region variables.
:param str service: The service for the API calls
:param str profile: Optionally specify the configuration profile name
:param str region: An optional AWS region to make requests to
:param str access_key: An optional access key
:param str secret_key: An optional secret access key
:param str security_token: An optional security token
:param str endpoint: Override the base endpoint URL
:raises: :exc:`tornado_aws.exceptions.ConfigNotFound`
:raises: :exc:`tornado_aws.exceptions.ConfigParserError`
:raises: :exc:`tornado_aws.exceptions.NoCredentialsError`
:raises: :exc:`tornado_aws.exceptions.NoProfileError`
"""
# Algorithm identifier written into the Authorization header and used as the
# first line of the string to sign.
ALGORITHM = 'AWS4-HMAC-SHA256'
# Not read by any code visible in this module; AsyncAWSClient overrides it
# with ``True``. Presumably a flag for callers -- TODO confirm.
ASYNC = False
# Passed as ``connect_timeout`` to every HTTPRequest that is created
# (presumably seconds, per Tornado's convention -- confirm).
CONNECT_TIMEOUT = 10
# Passed as ``request_timeout`` to every HTTPRequest that is created.
REQUEST_TIMEOUT = 30
# URL scheme used when the endpoint is built from the service and region.
SCHEME = 'https'
def __init__(self, service, profile=None, region=None, access_key=None,
             secret_key=None, security_token=None, endpoint=None):
    """Create the HTTP client adapter, resolve the profile and region,
    build the authorization configuration and compute the endpoint URL
    and host used when signing requests.

    :param str service: The service for the API calls
    :param str profile: Optionally specify the configuration profile name
    :param str region: An optional AWS region to make requests to
    :param str access_key: An optional access key
    :param str secret_key: An optional secret access key
    :param str security_token: An optional security token
    :param str endpoint: Override the base endpoint URL

    """
    # The adapter is created first because the authorization config is
    # handed a reference to it below.
    self._client = self._get_client_adapter()
    self._service = service

    if not profile:
        profile = os.getenv('AWS_DEFAULT_PROFILE', 'default')
    self._profile = profile

    if not region:
        region = config.get_region(self._profile)
    self._region = region

    self._auth_config = config.Authorization(
        self._profile, access_key, secret_key, security_token,
        self._client)

    # The endpoint depends on the service and region assigned above.
    self._endpoint_url = self._endpoint(endpoint)
    self._host = self._hostname(self._endpoint_url)
def fetch(self, method, path='/', query_args=None, headers=None, body=b'',
          recursed=False):
    """Sign and execute a request, returning the
    :class:`~tornado.httpclient.HTTPResponse`.

    Credentials are refreshed first when the authorization configuration
    reports that it needs them. When AWS rejects the request with an
    authentication error and the credentials are not local, the
    credentials are reset and the request is retried once.

    :param str method: HTTP request method
    :param str path: The request path
    :param dict query_args: Request query arguments
    :param dict headers: Request headers
    :param bytes body: The request body
    :param bool recursed: Internally invoked if it's a recursive fetch
    :rtype: :class:`~tornado.httpclient.HTTPResponse`
    :raises: :class:`~tornado.httpclient.HTTPError`
    :raises: :class:`~tornado_aws.exceptions.NoCredentialsError`
    :raises: :class:`~tornado_aws.exceptions.AWSError`
    :raises: :class:`~tornado_aws.exceptions.RequestException`

    """
    if self._auth_config.needs_credentials():
        self._auth_config.refresh()

    signed = self._create_request(method, path, query_args, headers, body)
    try:
        return self._client.fetch(signed, raise_error=True)
    except (OSError, socket.error) as error:
        LOGGER.error('Error making request: %s', error)
        raise exceptions.RequestException(error=error)
    except httpclient.HTTPError as error:
        refresh_needed, aws_error = self._process_error(error)
        if refresh_needed and not self._auth_config.local_credentials:
            self._auth_config.reset()
            # Only a single retry is attempted after resetting.
            if not recursed:
                return self.fetch(method, path, query_args,
                                  headers, body, True)
        # Prefer the parsed AWS error over the raw HTTP error.
        if aws_error:
            raise aws_error
        raise error
def close(self):
    """Close the underlying HTTP client by calling its ``close`` method."""
    http_client = self._client
    http_client.close()
def _process_error(self, error):
    """Attempt to process the error coming from AWS.

    Returns a two-item tuple: ``True`` if the error type indicates the
    client should attempt to fetch fresh credentials, and the
    :class:`~tornado_aws.exceptions.AWSError` to raise. When the response
    can not be interpreted (code 599, no response body, or a body that is
    neither a recognizable JSON nor XML error document) ``(False, None)``
    is returned so the caller can fall back to the original HTTP error.

    :param tornado.httpclient.HTTPError error: The HTTP error
    :rtype: (bool, tornado_aws.exceptions.AWSError or None)

    """
    LOGGER.error('Error: %r', error)
    if error.code == 599:
        return False, None

    # Without a response body there is nothing to parse.
    response = error.response
    body = response.body if response is not None else None
    if not body:
        return False, None

    if error.code == 400 and self._awz_response(response):
        try:
            awz_error = self._parse_awz_error(body)
        except ValueError:
            # Raised for both undecodable bytes and invalid JSON.
            LOGGER.debug('Could not parse JSON error body: %r', error)
            return False, None
        # ``_parse_awz_error`` returns ``None`` when the document has no
        # ``__type`` key; subscripting it would raise ``TypeError``.
        if not awz_error:
            return False, None
        return (awz_error['__type'] in _REFRESH_EXCEPTIONS,
                exceptions.AWSError(
                    type=awz_error['__type'],
                    message=awz_error.get('message', '(null)')))

    try:
        xml_error = self._parse_xml_error(body)
        aws_error = self._aws_error_from_xml(xml_error)
    except (KeyError, TypeError, ValueError):
        # KeyError/TypeError cover error documents that are missing the
        # ``Code`` or ``Message`` fields or are not mappings at all.
        LOGGER.debug('Could not fallback to XML: %r', error)
        return False, None
    return xml_error['Code'] in _REFRESH_XML_EXCEPTIONS, aws_error
@staticmethod
def _aws_error_from_xml(error):
    """Build an AWSError from a parsed XML error document.

    The request id is read from ``RequestId`` and falls back to
    ``x-amzn-RequestId``; ``Resource`` is optional.

    :param dict error: The parsed XML error
    :rtype: tornado_aws.exceptions.AWSError

    """
    code = error['Code']
    message = error['Message']
    fallback_id = error.get('x-amzn-RequestId')
    request_id = error.get('RequestId', fallback_id)
    return exceptions.AWSError(type=code, message=message,
                               request_id=request_id,
                               resource=error.get('Resource'))
@staticmethod
def _awz_response(response):
    """Report whether the response's ``Content-Type`` header is one of
    the AWS JSON content types.

    :param tornado.httpclient.HTTPResponse response: The HTTP response
    :rtype: bool

    """
    content_type = response.headers.get('Content-Type')
    return content_type in _AWZ_CONTENT_TYPES
@staticmethod
def _parse_awz_error(content):
"""Returns the AWZ error parsed out of the HTTPError that was raised.
:param bytes content: The response error content
:rtype: dict|None
"""
payload = json.loads(content.decode('utf-8'))
if isinstance(payload, dict) and '__type' in payload:
if '#' in payload['__type']:
payload['__type'] = \
payload['__type'][payload['__type'].index('#') + 1:]
return payload
@staticmethod
def _parse_xml_error(content):
    """Returns the XML error parsed out of the HTTPError that was raised.

    Several document layouts are recognized: a top-level ``Error``, an
    ``Error`` nested in ``Errors``, an ``Error`` nested in
    ``Response``/``Errors`` (where the response's ``RequestID`` is copied
    onto the error as ``RequestId``), and finally a single root element
    that contains a ``Message``, in which case the root element name is
    used as the error code.

    :param bytes content: The response error content
    :rtype: dict
    :raises: ValueError

    """
    payload = txml.loads(content.decode('utf-8'))
    if 'Error' in payload:
        return payload['Error']
    elif 'Errors' in payload and 'Error' in payload['Errors']:
        return payload['Errors']['Error']
    elif 'Response' in payload and 'Errors' in payload['Response'] \
            and 'Error' in payload['Response']['Errors']:
        payload['Response']['Errors']['Error']['RequestId'] = \
            payload['Response'].get('RequestID')
        return payload['Response']['Errors']['Error']
    try:
        key = tuple(payload.keys())[0]
        if 'Message' in payload[key]:
            return {'Code': key, 'Message': payload[key]['Message']}
    except (IndexError, TypeError):
        # An empty document (IndexError) or a root element whose value
        # is not a mapping (TypeError) is simply not a recognizable
        # error; report it with the documented ValueError below.
        pass
    raise ValueError('Unrecognized XML error document')
def _auth_header(self, amz_date, date_stamp, request_hash, signed_headers):
    """Build the value of the ``Authorization`` request header.

    :param str amz_date: The x-amz-date header value
    :param str date_stamp: The signing date_stamp
    :param str request_hash: The SHA-256 request hash
    :param str signed_headers: A semicolon delimited list of header keys
    :rtype: str

    """
    credential_scope, signature = self._signature(
        amz_date, date_stamp, request_hash)
    access_key = self._auth_config.access_key
    return _HEADER_FORMAT.format(
        self.ALGORITHM, access_key, credential_scope, signed_headers,
        signature)
def _create_request(self, method, path='/', query_args=None, headers=None,
                    body=b''):
    """Create the HTTPRequest instance that will be used to make the AWS
    API request.

    The caller's ``headers`` mapping is copied before signing so it is
    never mutated. An empty or missing body is signed as ``b''``.

    :param str method: HTTP request method
    :param str path: The request path
    :param dict query_args: Request query arguments
    :param dict headers: Request headers
    :param bytes body: The request body
    :rtype: tornado.httpclient.HTTPRequest

    """
    if headers is None:
        headers = {}
    signed_headers, signed_url = self._signed_request(
        method, path, query_args or {}, dict(headers), body or b'')
    if not body:
        # Tornado's HTTP clients validate the body against the method:
        # POST, PUT and PATCH require a body (an empty one is fine) and
        # every other method requires ``None``. Normalizing here lets
        # both the sync default (``b''``) and the async default
        # (``None``) work for any method. The method is compared
        # verbatim because Tornado's own check is case-sensitive.
        body = b'' if method in ('POST', 'PUT', 'PATCH') else None
    return httpclient.HTTPRequest(
        signed_url, method, signed_headers, body,
        connect_timeout=self.CONNECT_TIMEOUT,
        request_timeout=self.REQUEST_TIMEOUT)
def _endpoint(self, endpoint):
"""Return the user specified endpoint or dynamically create the
endpoint from the service and region.
:rtype: str
"""
if endpoint:
return endpoint
return '{}://{}.{}.amazonaws.com'.format(
self.SCHEME, self._service, self._region)
def _get_client_adapter(self):
    """Create the HTTP client used to perform requests.

    :rtype: :py:class:`tornado.httpclient.HTTPClient`

    """
    adapter = httpclient.HTTPClient(force_instance=True)
    return adapter
@staticmethod
def _hostname(url):
"""Parse the url returning a named tuple with the parts of the
the parsed URL
:param str url: The URL to parse
:return: str
"""
return urlparse(url).netloc
@staticmethod
def _quote(value):
"""Return the percent encoded value, ensuring there are no skipped
characters.
:param str value: The value to quote
:rtype: str
"""
return quote(value, safe='').replace('%7E', '~')
@staticmethod
def _sign(key, msg):
"""Sign the msg with the key
:param bytes key: The signing key
:param bytes msg: The value to sign
:return: bytes
"""
return hmac.new(key, msg, hashlib.sha256).digest()
def _signed_request(self, method, path, query_args, headers, body):
    """Sign the request, returning the updated headers and the full
    request URL.

    ``headers`` is modified in place: ``Content-Length``, ``Date`` and
    ``X-Amz-Content-sha256`` are always set, ``Host`` is added when
    absent, ``X-Amz-Security-Token`` is added when a security token is
    configured, and ``Authorization`` is set last from the signature.

    :param str method: HTTP request method
    :param str path: The request path
    :param dict query_args: Query string args
    :param dict headers: Request headers
    :param bytes body: The request body
    :rtype: (dict, str)

    """
    if PYTHON3 and isinstance(body, str):
        body = body.encode('utf-8')

    query_string = self._query_string(query_args)
    now = datetime.datetime.utcnow()
    amz_date = now.strftime('%Y%m%dT%H%M%SZ')
    date_stamp = now.strftime('%Y%m%d')
    payload_hash = hashlib.sha256(body).hexdigest()

    headers['Content-Length'] = str(len(body))
    headers['Date'] = amz_date
    headers['X-Amz-Content-sha256'] = payload_hash
    if 'Host' not in headers:
        headers['Host'] = self._host

    # Temporary auth security token
    token = self._auth_config.security_token
    if token:
        headers['X-Amz-Security-Token'] = token

    # Every header present at this point takes part in the signature;
    # the Authorization header is added only after signing.
    signed_headers, headers_string = self._signed_headers(headers)
    canonical_request = '\n'.join((
        method, path, query_string, headers_string, signed_headers,
        payload_hash))
    request_hash = hashlib.sha256(
        canonical_request.encode('utf-8')).hexdigest()
    headers['Authorization'] = self._auth_header(
        amz_date, date_stamp, request_hash, signed_headers)

    signed_url = '{0}{1}?{2}'.format(
        self._endpoint_url, path, query_string)
    return headers, signed_url
def _query_string(self, query_args):
"""Return the sorted query string from the query args dict
:param dict query_args: The dict of query arguments
:rtype: str
"""
return '&'.join(['{0}={1}'.format(self._quote(k),
self._quote(query_args[k]))
for k in sorted(query_args.keys())])
def _signature(self, amz_date, date_stamp, request_hash):
"""Return the request scope and signature
:param str date_stamp: The signing date stamp
:param str amz_date: The x-amz-date header value
:param str request_hash: The canonical request signature hash
:rtype: str, str
"""
scope = '/'.join([date_stamp, self._region, self._service,
'aws4_request'])
to_sign = '\n'.join([self.ALGORITHM, amz_date, scope, request_hash])
signing_key = self._signing_key(date_stamp)
return scope, hmac.new(signing_key, to_sign.encode('utf-8'),
hashlib.sha256).hexdigest()
@staticmethod
def _signed_headers(headers):
"""Create and return the canonical headers string and the signed
headers list.
Canonical header names must be trimmed and lowercase, and sorted in
ASCII order.
The signed headers lists the headers in the canonical_headers list,
delimited with ";" and in alpha order.
:param dict headers: The request headers
:rtype: str, str
"""
tmp = {key.lower(): value for key, value in headers.items()}
signed_headers = ';'.join([k.lower() for k in sorted(tmp.keys())])
headers_string = '\n'.join(['{0}:{1}'.format(k, tmp[k])
for k in sorted(tmp.keys())]) + '\n'
return signed_headers, headers_string
def _signing_key(self, date_stamp):
"""Create the signature key for the request.
:param str date_stamp: Date in %Y%m%d format for signing
:rtype: bytes
"""
key = 'AWS4{0}'.format(self._auth_config.secret_key)
date = self._sign(key.encode('utf-8'), date_stamp.encode('utf-8'))
region = self._sign(date, self._region.encode('utf-8'))
service = self._sign(region, self._service.encode('utf-8'))
return self._sign(service, b'aws4_request')
[docs]class AsyncAWSClient(AWSClient):
"""Implement a low level AWS client that performs the request signing
required for AWS API requests.
``AWSClient`` uses the same configuration method and environment
variables as the AWS CLI. For configuration information visit the "Getting
Set Up" section of the `AWS Command Line Interface user guide
<http://docs.aws.amazon.com/cli/latest/userguide/>`_.
When creating the ``AWSClient`` instance you need to specify the
``service`` that you will be interacting with. This value is used when
signing the request headers and must match the service values as specified
in the `AWS General Reference documentation
<http://docs.aws.amazon.com/general/latest/gr/Welcome.html>`_.
The AWS configuration profile can be set when creating the
``AWSClient`` instance or by setting the ``AWS_DEFAULT_PROFILE``
environment variable. If neither are set, ``default`` will be used.
The AWS access key can be set when creating a new instance. If it's not
passed in when creating the ``AWSClient``, the client will attempt to
get the key from the ``AWS_ACCESS_KEY_ID`` environment variable. If that is
not set, it will attempt to get the key from the AWS CLI credentials file.
The path to the credentials file can be overridden in the
``AWS_SHARED_CREDENTIALS_FILE`` environment variable. Note that a value
set in ``AWS_ACCESS_KEY_ID`` will only be used if there is an accompanying
value in ``AWS_SECRET_ACCESS_KEY`` environment variable.
If ``AWS_SECURITY_TOKEN`` or ``AWS_SESSION_TOKEN`` are set, they will
automatically be used as well.
Like the access key, the secret key can be set when creating a new client
instance. The configuration logic matches the access key with the exception
of the environment variable. The secret key can be set in the
``AWS_SECRET_ACCESS_KEY`` environment variable.
The ``endpoint`` argument is primarily used for testing and allows for
the use of a specified base URL value instead of the auto-construction of
a URL using the service and region variables.
``max_clients`` allows for the specification of the maximum number of
concurrent asynchronous HTTP requests that the client will perform.
:param str service: The service for the API calls
:param str profile: Specify the configuration profile name
:param str region: The AWS region to make requests to
:param str access_key: The access key
:param str secret_key: The secret access key
:param str security_token: An optional security token
:param str endpoint: Override the base endpoint URL
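:param int max_clients: Max simultaneous HTTP requests (Default: ``100``)
:param bool use_curl: Use Tornado's curl-based HTTP client; requires that ``tornado.curl_httpclient`` can be imported (Default: ``False``)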
:param tornado.ioloop.IOLoop io_loop: Specify the IOLoop to use
:param bool force_instance: Keep an isolated instance of the HTTP client
:raises: :exc:`tornado_aws.exceptions.ConfigNotFound`
:raises: :exc:`tornado_aws.exceptions.ConfigParserError`
:raises: :exc:`tornado_aws.exceptions.NoCredentialsError`
:raises: :exc:`tornado_aws.exceptions.NoProfileError`
:raises: :exc:`tornado_aws.exceptions.CurlNotInstalledError`
"""
# Overrides ``AWSClient.ASYNC``; not read by any code visible in this module.
# Presumably a flag callers use to tell the clients apart -- TODO confirm.
ASYNC = True
def __init__(self, service, profile=None, region=None, access_key=None,
             secret_key=None, security_token=None, endpoint=None,
             max_clients=100, use_curl=False, io_loop=None,
             force_instance=True):
    """Store the asynchronous client settings and then run the base
    class initialization.

    :param str service: The service for the API calls
    :param str profile: Specify the configuration profile name
    :param str region: The AWS region to make requests to
    :param str access_key: The access key
    :param str secret_key: The secret access key
    :param str security_token: An optional security token
    :param str endpoint: Override the base endpoint URL
    :param int max_clients: Max simultaneous HTTP requests
    :param bool use_curl: Use the curl based HTTP client
    :param tornado.ioloop.IOLoop io_loop: Specify the IOLoop to use
    :param bool force_instance: Keep an isolated HTTP client instance
    :raises: :exc:`tornado_aws.exceptions.CurlNotInstalledError`

    """
    # These attributes are assigned before calling the parent
    # constructor because it invokes ``_get_client_adapter``, which
    # reads them.
    self._force_instance = force_instance
    self._ioloop = io_loop if io_loop else ioloop.IOLoop.current()
    self._max_clients = max_clients
    self._use_curl = use_curl
    if use_curl:
        if not curl_httpclient:
            raise exceptions.CurlNotInstalledError
    super(AsyncAWSClient, self).__init__(
        service, profile, region, access_key, secret_key,
        security_token, endpoint)
def _get_client_adapter(self):
    """Create the asynchronous HTTP client, first configuring
    ``AsyncHTTPClient`` to use the curl implementation when curl
    support was requested.

    :rtype: :py:class:`tornado.httpclient.AsyncHTTPClient`

    """
    if self._use_curl:
        httpclient.AsyncHTTPClient.configure(
            'tornado.curl_httpclient.CurlAsyncHTTPClient')
    options = {'max_clients': self._max_clients,
               'force_instance': self._force_instance}
    return httpclient.AsyncHTTPClient(**options)
def fetch(self, method, path='/', query_args=None, headers=None, body=None,
          recursed=False):
    """Executes a request, returning a future that resolves to the
    :class:`~tornado.httpclient.HTTPResponse`.

    Credentials are refreshed first when the authorization configuration
    reports that it needs them. If AWS rejects the request with an
    authentication error, the credentials are not local, and this is
    not already a retry, the credentials are reset and the request is
    retried once. Any other failure is set on the returned future as
    the parsed :class:`~tornado_aws.exceptions.AWSError` or, when the
    error could not be parsed, as a
    :class:`~tornado_aws.exceptions.RequestException`.

    :param str method: HTTP request method
    :param str path: The request path
    :param dict query_args: Request query arguments
    :param dict headers: Request headers
    :param bytes body: The request body
    :param bool recursed: Internal use only
    :rtype: :class:`~tornado.concurrent.Future`
    :raises: :class:`~tornado_aws.exceptions.AWSError`
    :raises: :class:`~tornado_aws.exceptions.RequestException`
    :raises: :class:`~tornado_aws.exceptions.NoCredentialsError`

    """
    future = concurrent.Future()

    def on_response(response):
        aws_error, exc = None, response.exception()
        if exc:
            if isinstance(exc, httpclient.HTTPError):
                need_credentials, aws_error = self._process_error(exc)
                # As in the synchronous client, locally supplied
                # credentials are never reset: discarding them could
                # not produce a different result on retry.
                if (need_credentials and not recursed and
                        not self._auth_config.local_credentials):
                    self._auth_config.reset()

                    def on_retry(retry):
                        if not self._future_exception(retry, future):
                            future.set_result(retry.result())

                    request = self.fetch(method, path, query_args,
                                         headers, body, True)
                    self._ioloop.add_future(request, on_retry)
                    return
            LOGGER.error('Error making request: %s', aws_error or exc)
            future.set_exception(
                aws_error if aws_error else
                exceptions.RequestException(error=exc))
        else:
            future.set_result(response.result())

    def perform_request():
        self._ioloop.add_future(
            self._client.fetch(
                self._create_request(
                    method, path, query_args, headers, body),
                raise_error=True), on_response)

    def on_refreshed(response):
        # A failed credential refresh is propagated to the caller.
        if not self._future_exception(response, future):
            perform_request()

    if self._auth_config.needs_credentials():
        request_future = self._auth_config.refresh()
        self._ioloop.add_future(request_future, on_refreshed)
    else:
        perform_request()
    return future
@staticmethod
def _future_exception(inner, outer):
exception = inner.exception()
if exception:
outer.set_exception(exception)
return bool(exception)