File: //opt/alt/python37/lib/python3.7/site-packages/pika/adapters/utils/connection_workflow.py
"""Implements `AMQPConnectionWorkflow` - the default workflow of performing
multiple TCP/[SSL]/AMQP connection attempts with timeouts and retries until one
succeeds or all attempts fail.
Defines the interface `AbstractAMQPConnectionWorkflow` that facilitates
implementing custom connection workflows.
"""
import functools
import logging
import socket
import pika.compat
import pika.exceptions
import pika.tcp_socket_opts
from pika import __version__
_LOG = logging.getLogger(__name__)
class AMQPConnectorException(Exception):
    """Root of the exception hierarchy defined by this module."""
class AMQPConnectorStackTimeout(AMQPConnectorException):
    """The overall TCP/[SSL]/AMQP stack connection attempt timed out."""
class AMQPConnectorAborted(AMQPConnectorException):
    """The asynchronous connection request was aborted."""
class AMQPConnectorWrongState(AMQPConnectorException):
    """An AMQPConnector operation was requested in a state that does not
    permit it, e.g., aborting after completion was already reported.

    """
class AMQPConnectorPhaseErrorBase(AMQPConnectorException):
    """Base for errors that wrap the exception raised during one specific
    bring-up phase. The wrapped error is available as `self.exception`.

    """

    def __init__(self, exception, *args):
        """
        :param BaseException exception: the error that occurred while waiting
            for the subclass-specific protocol bring-up phase to complete.
        :param args: positional args forwarded to the parent class

        """
        super(AMQPConnectorPhaseErrorBase, self).__init__(*args)
        self.exception = exception

    def __repr__(self):
        class_name = self.__class__.__name__
        return '{}: {!r}'.format(class_name, self.exception)
class AMQPConnectorSocketConnectError(AMQPConnectorPhaseErrorBase):
    """Failure while connecting the TCP socket to the remote peer."""
class AMQPConnectorTransportSetupError(AMQPConnectorPhaseErrorBase):
    """Failure while setting up the transport, i.e., after TCP connected but
    before the AMQP handshake.

    """
class AMQPConnectorAMQPHandshakeError(AMQPConnectorPhaseErrorBase):
    """Failure during the AMQP handshake."""
class AMQPConnectionWorkflowAborted(AMQPConnectorException):
    """The AMQP connection workflow was aborted."""
class AMQPConnectionWorkflowWrongState(AMQPConnectorException):
    """An AMQP Connection Workflow operation was requested in a state that
    does not permit it, e.g., aborting after completion was already reported.

    """
class AMQPConnectionWorkflowFailed(AMQPConnectorException):
    """Indicates that AMQP connection workflow failed.

    The errors collected during the workflow are available as the
    `exceptions` tuple, in the order they were supplied.

    """

    def __init__(self, exceptions, *args):
        """
        :param sequence exceptions: Exceptions that occurred during the
            workflow. May be empty.
        :param args: args to pass to base class

        """
        super(AMQPConnectionWorkflowFailed, self).__init__(*args)
        self.exceptions = tuple(exceptions)

    def __repr__(self):
        errors = self.exceptions
        # Guard against an empty sequence: indexing [-1] unconditionally would
        # raise IndexError while merely formatting the error (e.g., in a log
        # call).
        last_error = errors[-1] if errors else None
        # The first error is reported only when it differs from the last one.
        first_error = errors[0] if len(errors) > 1 else None
        return ('{}: {} exceptions in all; last exception - {!r}; first '
                'exception - {!r}').format(self.__class__.__name__,
                                           len(errors), last_error,
                                           first_error)
class AMQPConnector(object):
    """Performs a single TCP/[SSL]/AMQP connection workflow.

    The attempt advances through the TCP, TRANSPORT and AMQP states. Its
    outcome (a connection object or an exception) is delivered exactly once
    through the `on_done` callback given to `start()`, after which the
    connector is in the DONE state and cannot be reused.

    """

    _STATE_INIT = 0  # start() hasn't been called yet
    _STATE_TCP = 1  # TCP/IP connection establishment
    _STATE_TRANSPORT = 2  # [SSL] and transport linkup
    _STATE_AMQP = 3  # AMQP connection handshake
    _STATE_TIMEOUT = 4  # overall TCP/[SSL]/AMQP timeout
    _STATE_ABORTING = 5  # abort() called - aborting workflow
    _STATE_DONE = 6  # result reported to client

    def __init__(self, conn_factory, nbio):
        """
        :param callable conn_factory: A function that takes
            `pika.connection.Parameters` as its only arg and returns a brand new
            `pika.connection.Connection`-based adapter instance each time it is
            called. The factory must instantiate the connection with
            `internal_connection_workflow=False`.
        :param pika.adapters.utils.nbio_interface.AbstractIOServices nbio:

        """
        self._conn_factory = conn_factory
        self._nbio = nbio
        self._addr_record = None  # type: tuple
        self._conn_params = None  # type: pika.connection.Parameters
        self._on_done = None  # will be provided via start()
        # TCP connection timeout
        # pylint: disable=C0301
        self._tcp_timeout_ref = None  # type: pika.adapters.utils.nbio_interface.AbstractTimerReference
        # Overall TCP/[SSL]/AMQP timeout
        self._stack_timeout_ref = None  # type: pika.adapters.utils.nbio_interface.AbstractTimerReference
        # Current task
        self._task_ref = None  # type: pika.adapters.utils.nbio_interface.AbstractIOReference
        self._sock = None  # type: socket.socket
        self._amqp_conn = None  # type: pika.connection.Connection

        self._state = self._STATE_INIT

    def start(self, addr_record, conn_params, on_done):
        """Asynchronously perform a single TCP/[SSL]/AMQP connection attempt.

        :param tuple addr_record: a single resolved address record compatible
            with `socket.getaddrinfo()` format.
        :param pika.connection.Parameters conn_params:
        :param callable on_done: Function to call upon completion of the
            workflow: `on_done(pika.connection.Connection | BaseException)`. If
            exception, it's going to be one of the following:
                `AMQPConnectorSocketConnectError`
                `AMQPConnectorTransportSetupError`
                `AMQPConnectorAMQPHandshakeError`
                `AMQPConnectorAborted`
        :raises AMQPConnectorWrongState: If the connector was already started.

        """
        if self._state != self._STATE_INIT:
            raise AMQPConnectorWrongState(
                'Already in progress or finished; state={}'.format(self._state))

        self._addr_record = addr_record
        self._conn_params = conn_params
        self._on_done = on_done

        # Create socket and initiate TCP/IP connection
        self._state = self._STATE_TCP
        # The first three fields of the address record are passed to the
        # socket constructor as-is.
        self._sock = socket.socket(*self._addr_record[:3])
        self._sock.setsockopt(pika.compat.SOL_TCP, socket.TCP_NODELAY, 1)
        pika.tcp_socket_opts.set_sock_opts(self._conn_params.tcp_options,
                                           self._sock)
        self._sock.setblocking(False)

        addr = self._addr_record[4]
        _LOG.info('Pika version %s connecting to %r', __version__, addr)
        self._task_ref = self._nbio.connect_socket(
            self._sock, addr, on_done=self._on_tcp_connection_done)

        # Start socket connection timeout timer
        self._tcp_timeout_ref = None
        if self._conn_params.socket_timeout is not None:
            self._tcp_timeout_ref = self._nbio.call_later(
                self._conn_params.socket_timeout,
                self._on_tcp_connection_timeout)

        # Start overall TCP/[SSL]/AMQP stack connection timeout timer
        self._stack_timeout_ref = None
        if self._conn_params.stack_timeout is not None:
            self._stack_timeout_ref = self._nbio.call_later(
                self._conn_params.stack_timeout, self._on_overall_timeout)

    def abort(self):
        """Abort the workflow asynchronously. The completion callback will be
        called with an instance of AMQPConnectorAborted.

        NOTE: we can't cancel/close synchronously because aborting pika
        Connection and its transport requires an asynchronous operation.

        Calling this again while an abort is already in progress is a no-op;
        completion is still reported only once.

        :raises AMQPConnectorWrongState: If called after completion has been
            reported or the workflow not started yet.
        """
        if self._state == self._STATE_INIT:
            raise AMQPConnectorWrongState('Cannot abort before starting.')

        if self._state == self._STATE_DONE:
            raise AMQPConnectorWrongState(
                'Cannot abort after completion was reported')

        if self._state == self._STATE_ABORTING:
            # A second completion report would invoke an already-cleared
            # callback, so don't schedule/initiate anything again.
            _LOG.debug('AMQPConnector.abort(): abort is already in progress.')
            return

        # Remember where we came from; needed for the sanity check below once
        # the state has been overwritten.
        prev_state = self._state
        self._state = self._STATE_ABORTING
        # Cancels timers and the pending I/O task. This is valid even while an
        # AMQP connection exists; the connection is closed asynchronously
        # below.
        self._deactivate()

        _LOG.info(
            'AMQPConnector: beginning client-initiated asynchronous '
            'abort; %r/%s', self._conn_params.host, self._addr_record)

        if self._amqp_conn is None:
            _LOG.debug('AMQPConnector.abort(): no connection, so just '
                       'scheduling completion report via I/O loop.')
            self._nbio.add_callback_threadsafe(
                functools.partial(self._report_completion_and_cleanup,
                                  AMQPConnectorAborted()))
        else:
            if not self._amqp_conn.is_closing:
                # Initiate close of AMQP connection and wait for asynchronous
                # callback from the Connection instance before reporting
                # completion to client
                _LOG.debug('AMQPConnector.abort(): closing Connection.')
                self._amqp_conn.close(
                    320, 'Client-initiated abort of AMQP Connection Workflow.')
            else:
                # It's already closing, must be due to our timeout processing,
                # so we'll just piggy back on the callback it registered
                _LOG.debug('AMQPConnector.abort(): closing of Connection was '
                           'already initiated.')
                assert prev_state == self._STATE_TIMEOUT, \
                    ('Connection is closing, but not in TIMEOUT state; state={}'
                     .format(prev_state))

    def _close(self):
        """Cancel asynchronous tasks and clean up to assist garbage collection.

        Transition to STATE_DONE.

        """
        # NOTE: self._amqp_conn doesn't support synchronous closing, so by the
        # time we finalize, its asynchronous completion must already have been
        # processed (which resets the reference to None).
        assert self._amqp_conn is None, \
            '_close called with self._amqp_conn not None; state={}'.format(
                self._state)

        self._deactivate()

        if self._sock is not None:
            self._sock.close()
            self._sock = None

        self._conn_factory = None
        self._nbio = None
        self._addr_record = None
        self._on_done = None

        self._state = self._STATE_DONE

    def _deactivate(self):
        """Cancel asynchronous tasks: both timeout timers and the pending I/O
        task, if any.

        NOTE: self._amqp_conn is not touched here because it doesn't support
        synchronous closing; it is special-cased by the callers where needed.

        """
        if self._tcp_timeout_ref is not None:
            self._tcp_timeout_ref.cancel()
            self._tcp_timeout_ref = None

        if self._stack_timeout_ref is not None:
            self._stack_timeout_ref.cancel()
            self._stack_timeout_ref = None

        if self._task_ref is not None:
            self._task_ref.cancel()
            self._task_ref = None

    def _report_completion_and_cleanup(self, result):
        """Clean up and invoke client's `on_done` callback.

        :param pika.connection.Connection | BaseException result: value to pass
            to user's `on_done` callback.
        """
        if isinstance(result, BaseException):
            _LOG.error('AMQPConnector - reporting failure: %r', result)
        else:
            _LOG.info('AMQPConnector - reporting success: %r', result)

        # _close() clears self._on_done, so grab the callback first
        on_done = self._on_done
        self._close()

        on_done(result)

    def _on_tcp_connection_timeout(self):
        """Handle TCP connection timeout

        Reports AMQPConnectorSocketConnectError with socket.timeout inside.

        """
        self._tcp_timeout_ref = None

        error = AMQPConnectorSocketConnectError(
            socket.timeout('TCP connection attempt timed out: {!r}/{}'.format(
                self._conn_params.host, self._addr_record)))
        self._report_completion_and_cleanup(error)

    def _on_overall_timeout(self):
        """Handle overall TCP/[SSL]/AMQP connection attempt timeout by reporting
        `Timeout` error to the client.

        Reports AMQPConnectorSocketConnectError if timeout occurred during
            socket TCP connection attempt.
        Reports AMQPConnectorTransportSetupError if timeout occurred during
            transport [SSL] setup attempt.
        Reports AMQPConnectorAMQPHandshakeError if timeout occurred during
            AMQP handshake.

        """
        self._stack_timeout_ref = None

        prev_state = self._state
        self._state = self._STATE_TIMEOUT

        if prev_state == self._STATE_AMQP:
            msg = ('Timeout while setting up AMQP to {!r}/{}; ssl={}'.format(
                self._conn_params.host, self._addr_record,
                bool(self._conn_params.ssl_options)))
            _LOG.error(msg)
            # Initiate close of AMQP connection and wait for asynchronous
            # callback from the Connection instance before reporting completion
            # to client
            assert not self._amqp_conn.is_open, \
                'Unexpected open state of {!r}'.format(self._amqp_conn)
            if not self._amqp_conn.is_closing:
                self._amqp_conn.close(320, msg)
            return

        if prev_state == self._STATE_TCP:
            error = AMQPConnectorSocketConnectError(
                AMQPConnectorStackTimeout(
                    'Timeout while connecting socket to {!r}/{}'.format(
                        self._conn_params.host, self._addr_record)))
        else:
            assert prev_state == self._STATE_TRANSPORT
            error = AMQPConnectorTransportSetupError(
                AMQPConnectorStackTimeout(
                    'Timeout while setting up transport to {!r}/{}; ssl={}'.
                    format(self._conn_params.host, self._addr_record,
                           bool(self._conn_params.ssl_options))))

        self._report_completion_and_cleanup(error)

    def _on_tcp_connection_done(self, exc):
        """Handle completion of asynchronous socket connection attempt.

        Reports AMQPConnectorSocketConnectError if TCP socket connection
            failed.

        :param None|BaseException exc: None on success; exception object on
            failure

        """
        self._task_ref = None
        # The TCP-phase timer is no longer relevant regardless of outcome
        if self._tcp_timeout_ref is not None:
            self._tcp_timeout_ref.cancel()
            self._tcp_timeout_ref = None

        if exc is not None:
            _LOG.error('TCP Connection attempt failed: %r; dest=%r', exc,
                       self._addr_record)
            self._report_completion_and_cleanup(
                AMQPConnectorSocketConnectError(exc))
            return

        # We succeeded in making a TCP/IP connection to the server
        _LOG.debug('TCP connection to broker established: %r.', self._sock)

        # Now set up the transport
        self._state = self._STATE_TRANSPORT

        ssl_context = server_hostname = None
        if self._conn_params.ssl_options is not None:
            ssl_context = self._conn_params.ssl_options.context
            server_hostname = self._conn_params.ssl_options.server_hostname
            if server_hostname is None:
                # Fall back to the configured host name
                server_hostname = self._conn_params.host

        self._task_ref = self._nbio.create_streaming_connection(
            protocol_factory=functools.partial(self._conn_factory,
                                               self._conn_params),
            sock=self._sock,
            ssl_context=ssl_context,
            server_hostname=server_hostname,
            on_done=self._on_transport_establishment_done)

        self._sock = None  # create_streaming_connection() takes ownership

    def _on_transport_establishment_done(self, result):
        """Handle asynchronous completion of
        `AbstractIOServices.create_streaming_connection()`

        Reports AMQPConnectorTransportSetupError if transport ([SSL]) setup
            failed.

        :param sequence|BaseException result: On success, a two-tuple
            (transport, protocol); on failure, exception instance.

        """
        self._task_ref = None

        if isinstance(result, BaseException):
            _LOG.error(
                'Attempt to create the streaming transport failed: %r; '
                '%r/%s; ssl=%s', result, self._conn_params.host,
                self._addr_record, bool(self._conn_params.ssl_options))
            self._report_completion_and_cleanup(
                AMQPConnectorTransportSetupError(result))
            return

        # We succeeded in setting up the streaming transport!
        # result is a two-tuple (transport, protocol)
        _LOG.info('Streaming transport linked up: %r.', result)
        _transport, self._amqp_conn = result

        # AMQP handshake is in progress - initiated during transport link-up
        self._state = self._STATE_AMQP
        # We explicitly remove default handler because it raises an exception.
        self._amqp_conn.add_on_open_error_callback(
            self._on_amqp_handshake_done, remove_default=True)
        self._amqp_conn.add_on_open_callback(self._on_amqp_handshake_done)

    def _on_amqp_handshake_done(self, connection, error=None):
        """Handle completion of AMQP connection handshake attempt.

        NOTE: we handle two types of callbacks - success with just connection
        arg as well as the open-error callback with connection and error

        Reports AMQPConnectorAMQPHandshakeError if AMQP handshake failed.

        :param pika.connection.Connection connection:
        :param BaseException | None error: None on success, otherwise
            failure

        """
        _LOG.debug(
            'AMQPConnector: AMQP handshake attempt completed; state=%s; '
            'error=%r; %r/%s', self._state, error, self._conn_params.host,
            self._addr_record)

        # Don't need it any more; and _close() checks that it's None
        self._amqp_conn = None

        if self._state == self._STATE_ABORTING:
            # Client-initiated abort takes precedence over timeout
            result = AMQPConnectorAborted()
        elif self._state == self._STATE_TIMEOUT:
            result = AMQPConnectorAMQPHandshakeError(
                AMQPConnectorStackTimeout(
                    'Timeout during AMQP handshake with {!r}/{}; ssl={}'.format(
                        self._conn_params.host, self._addr_record,
                        bool(self._conn_params.ssl_options))))
        elif self._state == self._STATE_AMQP:
            if error is None:
                _LOG.debug(
                    'AMQPConnector: AMQP connection established for %r/%s: %r',
                    self._conn_params.host, self._addr_record, connection)
                result = connection
            else:
                _LOG.debug(
                    'AMQPConnector: AMQP connection handshake failed for '
                    '%r/%s: %r', self._conn_params.host, self._addr_record,
                    error)
                result = AMQPConnectorAMQPHandshakeError(error)
        else:
            # We timed out or aborted and initiated closing of the connection,
            # but this callback snuck in
            _LOG.debug(
                'AMQPConnector: Ignoring AMQP handshake completion '
                'notification due to wrong state=%s; error=%r; conn=%r',
                self._state, error, connection)
            return

        self._report_completion_and_cleanup(result)
class AbstractAMQPConnectionWorkflow(pika.compat.AbstractBase):
    """Interface to be implemented by custom TCP/[SSL]/AMQP connection
    workflows.

    """

    def start(self, connection_configs, connector_factory, native_loop,
              on_done):
        """Asynchronously run the workflow until it succeeds or all retries
        are exhausted. Called by the adapter.

        :param sequence connection_configs: A sequence of one or more
            `pika.connection.Parameters`-based objects. Connection will be
            attempted with each config in the given order.
        :param callable connector_factory: call it without args to obtain a new
            instance of `AMQPConnector` for each connection attempt.
            See `AMQPConnector` for details.
        :param native_loop: Native I/O loop passed by app to the adapter or
            obtained by the adapter by default.
        :param callable on_done: Function to call upon completion of the
            workflow:
            `on_done(pika.connection.Connection |
                     AMQPConnectionWorkflowFailed |
                     AMQPConnectionWorkflowAborted)`.
            `Connection`-based adapter on success,
            `AMQPConnectionWorkflowFailed` on failure,
            `AMQPConnectionWorkflowAborted` if workflow was aborted.

        :raises AMQPConnectionWorkflowWrongState: If called in wrong state, such
            as after starting the workflow.
        """
        raise NotImplementedError

    def abort(self):
        """Abort the workflow asynchronously. The completion callback will be
        invoked with an instance of AMQPConnectionWorkflowAborted.

        NOTE: cancelling/closing synchronously is not possible because aborting
        pika Connection and its transport requires an asynchronous operation.

        :raises AMQPConnectionWorkflowWrongState: If called in wrong state, such
            as before starting or after completion has been reported.
        """
        raise NotImplementedError
class AMQPConnectionWorkflow(AbstractAMQPConnectionWorkflow):
    """Implements Pika's default workflow for performing multiple TCP/[SSL]/AMQP
    connection attempts with timeouts and retries until one succeeds or all
    attempts fail.

    The workflow:
        while not success and retries remain:
            1. For each given config (pika.connection.Parameters object):
                A. Perform DNS resolution of the config's host.
                B. Attempt to establish TCP/[SSL]/AMQP for each resolved address
                   until one succeeds, in which case we're done.
            2. If all configs failed but retries remain, resume from beginning
               after the given retry pause. NOTE: failure of DNS resolution
               is equivalent to one cycle and will be retried after the pause
               if retries remain.

    """

    _SOCK_TYPE = socket.SOCK_STREAM
    _IPPROTO = socket.IPPROTO_TCP

    _STATE_INIT = 0
    _STATE_ACTIVE = 1
    _STATE_ABORTING = 2
    _STATE_DONE = 3

    def __init__(self, _until_first_amqp_attempt=False):
        """
        NOTE: the number of attempts and the pause between retry cycles are
        not constructor args; they are taken from the connection configs
        passed to `start()`.

        :param bool _until_first_amqp_attempt: INTERNAL USE ONLY; ends workflow
            after first AMQP handshake attempt, regardless of outcome (success
            or failure). The automatic connection logic in
            `pika.connection.Connection` enables this because it's not
            designed/tested to reset all state properly to handle more than one
            AMQP handshake attempt.

        TODO: Do we need getaddrinfo timeout?
        TODO: Would it be useful to implement exponential back-off?

        """
        self._attempts_remaining = None  # supplied by start()
        self._retry_pause = None  # supplied by start()
        self._until_first_amqp_attempt = _until_first_amqp_attempt

        # Provided by set_io_services()
        # pylint: disable=C0301
        self._nbio = None  # type: pika.adapters.utils.nbio_interface.AbstractIOServices

        # Current index within `_connection_configs`; initialized when
        # starting a new connection sequence.
        self._current_config_index = None

        self._connection_configs = None  # supplied by start()
        self._connector_factory = None  # supplied by start()
        self._on_done = None  # supplied by start()

        self._connector = None  # type: AMQPConnector

        self._task_ref = None  # current cancelable asynchronous task or timer
        self._addrinfo_iter = None

        # Exceptions from all failed connection attempts in this workflow
        self._connection_errors = []

        self._state = self._STATE_INIT

    def set_io_services(self, nbio):
        """Called by the connection adapter only on pika's
        `AMQPConnectionWorkflow` instance to provide it the adapter-specific
        `AbstractIOServices` object before calling the `start()` method.

        NOTE: Custom workflow implementations should use the native I/O loop
        directly because `AbstractIOServices` is private to Pika
        implementation and its interface may change without notice.

        :param pika.adapters.utils.nbio_interface.AbstractIOServices nbio:

        """
        self._nbio = nbio

    def start(
            self,
            connection_configs,
            connector_factory,
            native_loop,  # pylint: disable=W0613
            on_done):
        """Override `AbstractAMQPConnectionWorkflow.start()`.

        NOTE: This implementation uses `connection_attempts` and `retry_delay`
        values from the last element of the given `connection_configs` sequence
        as the overall number of connection attempts of the entire
        `connection_configs` sequence and pause between each sequence.

        :raises AMQPConnectorWrongState: If the workflow was already started.
            NOTE: the abstract interface documents
            `AMQPConnectionWorkflowWrongState` for this condition; the
            exception type raised here is kept as-is for compatibility with
            existing callers.
        :raises TypeError: If `connection_configs` does not support iteration.
        :raises ValueError: If `connection_configs` is empty.

        """
        if self._state != self._STATE_INIT:
            raise AMQPConnectorWrongState(
                'Already in progress or finished; state={}'.format(self._state))

        try:
            iter(connection_configs)
        except Exception as error:
            raise TypeError(
                'connection_configs does not support iteration: {!r}'.format(
                    error))
        if not connection_configs:
            raise ValueError(
                'connection_configs is empty: {!r}.'.format(connection_configs))

        self._connection_configs = connection_configs
        self._connector_factory = connector_factory
        self._on_done = on_done

        # Retry policy comes from the last config in the sequence
        self._attempts_remaining = connection_configs[-1].connection_attempts
        self._retry_pause = connection_configs[-1].retry_delay

        self._state = self._STATE_ACTIVE

        _LOG.debug('Starting AMQP Connection workflow asynchronously.')

        # Begin from our own I/O loop context to avoid calling back into client
        # from client's call here
        self._task_ref = self._nbio.call_later(
            0, functools.partial(self._start_new_cycle_async, first=True))

    def abort(self):
        """Override `AbstractAMQPConnectionWorkflow.abort()`.

        Calling this again while an abort is already in progress is a no-op;
        completion is still reported only once.

        :raises AMQPConnectorWrongState: If called before starting or after
            completion has been reported. NOTE: the abstract interface
            documents `AMQPConnectionWorkflowWrongState` for this condition;
            the exception type raised here is kept as-is for compatibility
            with existing callers.

        """
        if self._state == self._STATE_INIT:
            raise AMQPConnectorWrongState('Cannot abort before starting.')
        elif self._state == self._STATE_DONE:
            raise AMQPConnectorWrongState(
                'Cannot abort after completion was reported')

        if self._state == self._STATE_ABORTING:
            # A second completion report would invoke an already-cleared
            # callback, so don't schedule/request anything again.
            _LOG.debug('AMQPConnectionWorkflow.abort(): abort is already in '
                       'progress.')
            return

        self._state = self._STATE_ABORTING
        self._deactivate()

        _LOG.info('AMQPConnectionWorkflow: beginning client-initiated '
                  'asynchronous abort.')

        if self._connector is None:
            _LOG.debug('AMQPConnectionWorkflow.abort(): no connector, so just '
                       'scheduling completion report via I/O loop.')
            self._nbio.add_callback_threadsafe(
                functools.partial(self._report_completion_and_cleanup,
                                  AMQPConnectionWorkflowAborted()))
        else:
            # Completion will be reported from `_on_connector_done()` once the
            # connector delivers its result
            _LOG.debug('AMQPConnectionWorkflow.abort(): requesting '
                       'connector.abort().')
            self._connector.abort()

    def _close(self):
        """Cancel asynchronous tasks and clean up to assist garbage collection.

        Transition to _STATE_DONE.

        """
        self._deactivate()

        self._connection_configs = None
        self._nbio = None
        self._connector_factory = None
        self._on_done = None
        self._connector = None
        self._addrinfo_iter = None
        self._connection_errors = None

        self._state = self._STATE_DONE

    def _deactivate(self):
        """Cancel asynchronous tasks.

        """
        if self._task_ref is not None:
            self._task_ref.cancel()
            self._task_ref = None

    def _report_completion_and_cleanup(self, result):
        """Clean up and invoke client's `on_done` callback.

        :param pika.connection.Connection | AMQPConnectionWorkflowFailed result:
            value to pass to user's `on_done` callback.
        """
        if isinstance(result, BaseException):
            _LOG.error('AMQPConnectionWorkflow - reporting failure: %r', result)
        else:
            _LOG.info('AMQPConnectionWorkflow - reporting success: %r', result)

        # _close() clears self._on_done, so grab the callback first
        on_done = self._on_done
        self._close()

        on_done(result)

    def _start_new_cycle_async(self, first):
        """Start a new workflow cycle (if any more attempts are left) beginning
        with the first Parameters object in self._connection_configs. If out of
        attempts, report `AMQPConnectionWorkflowFailed`.

        :param bool first: if True, don't delay; otherwise delay next attempt by
            `self._retry_pause` seconds.
        """
        self._task_ref = None

        assert self._attempts_remaining >= 0, self._attempts_remaining

        if self._attempts_remaining <= 0:
            error = AMQPConnectionWorkflowFailed(self._connection_errors)
            _LOG.error('AMQP connection workflow failed: %r.', error)
            self._report_completion_and_cleanup(error)
            return

        self._attempts_remaining -= 1
        _LOG.debug(
            'Beginning a new AMQP connection workflow cycle; attempts '
            'remaining after this: %s', self._attempts_remaining)

        # None makes `_try_next_config_async()` begin with the first config
        self._current_config_index = None

        self._task_ref = self._nbio.call_later(
            0 if first else self._retry_pause, self._try_next_config_async)

    def _try_next_config_async(self):
        """Attempt to connect using the next Parameters config. If there are no
        more configs, start a new cycle.

        """
        self._task_ref = None

        if self._current_config_index is None:
            self._current_config_index = 0
        else:
            self._current_config_index += 1

        if self._current_config_index >= len(self._connection_configs):
            _LOG.debug('_try_next_config_async: starting a new cycle.')
            self._start_new_cycle_async(first=False)
            return

        params = self._connection_configs[self._current_config_index]

        _LOG.debug('_try_next_config_async: %r:%s', params.host, params.port)

        # Begin with host address resolution
        assert self._task_ref is None
        self._task_ref = self._nbio.getaddrinfo(
            host=params.host,
            port=params.port,
            socktype=self._SOCK_TYPE,
            proto=self._IPPROTO,
            on_done=self._on_getaddrinfo_async_done)

    def _on_getaddrinfo_async_done(self, addrinfos_or_exc):
        """Handles completion callback from asynchronous `getaddrinfo()`.

        :param list | BaseException addrinfos_or_exc: resolved address records
            returned by `getaddrinfo()` or an exception object from failure.
        """
        self._task_ref = None

        if isinstance(addrinfos_or_exc, BaseException):
            _LOG.error('getaddrinfo failed: %r.', addrinfos_or_exc)
            self._connection_errors.append(addrinfos_or_exc)
            # Resolution failure ends the current cycle
            self._start_new_cycle_async(first=False)
            return

        _LOG.debug('getaddrinfo returned %s records', len(addrinfos_or_exc))
        self._addrinfo_iter = iter(addrinfos_or_exc)

        self._try_next_resolved_address()

    def _try_next_resolved_address(self):
        """Try connecting using next resolved address. If there aren't any left,
        continue with next Parameters config.

        """
        try:
            addr_record = next(self._addrinfo_iter)
        except StopIteration:
            _LOG.debug(
                '_try_next_resolved_address: continuing with next config.')
            self._try_next_config_async()
            return

        _LOG.debug('Attempting to connect using address record %r', addr_record)

        self._connector = self._connector_factory()  # type: AMQPConnector

        self._connector.start(
            addr_record=addr_record,
            conn_params=self._connection_configs[self._current_config_index],
            on_done=self._on_connector_done)

    def _on_connector_done(self, conn_or_exc):
        """Handle completion of connection attempt by `AMQPConnector`.

        :param pika.connection.Connection | BaseException conn_or_exc: See
            `AMQPConnector.start()` for exception details.

        """
        self._connector = None
        _LOG.debug('Connection attempt completed with %r', conn_or_exc)

        if isinstance(conn_or_exc, BaseException):
            self._connection_errors.append(conn_or_exc)

            if isinstance(conn_or_exc, AMQPConnectorAborted):
                assert self._state == self._STATE_ABORTING, \
                    'Expected _STATE_ABORTING, but got {!r}'.format(self._state)

                self._report_completion_and_cleanup(
                    AMQPConnectionWorkflowAborted())
            elif (self._until_first_amqp_attempt and
                  isinstance(conn_or_exc, AMQPConnectorAMQPHandshakeError)):
                _LOG.debug('Ending AMQP connection workflow after first failed '
                           'AMQP handshake due to _until_first_amqp_attempt.')
                if isinstance(conn_or_exc.exception,
                              pika.exceptions.ConnectionOpenAborted):
                    # Report an exception *instance*, as documented for
                    # `on_done`; passing the class itself would also be
                    # misreported as success by the completion logging.
                    error = AMQPConnectionWorkflowAborted()
                else:
                    error = AMQPConnectionWorkflowFailed(
                        self._connection_errors)

                self._report_completion_and_cleanup(error)
            else:
                self._try_next_resolved_address()
        else:
            # Success!
            self._report_completion_and_cleanup(conn_or_exc)