File: //opt/alt/python37/lib/python3.7/site-packages/pika/adapters/utils/io_services_utils.py
"""Utilities for implementing `nbio_interface.AbstractIOServices` for
pika connection adapters.
"""
import collections
import errno
import functools
import logging
import numbers
import os
import socket
import ssl
import sys
import traceback
from pika.adapters.utils.nbio_interface import (AbstractIOReference,
AbstractStreamTransport)
import pika.compat
import pika.diagnostic_utils
# "Try again" error codes for non-blocking socket I/O - send()/recv().
# NOTE: POSIX.1 allows either error to be returned for this case and doesn't require
# them to have the same value.
_TRY_IO_AGAIN_SOCK_ERROR_CODES = (
errno.EAGAIN,
errno.EWOULDBLOCK,
)
# "Connection establishment pending" error codes for non-blocking socket
# connect() call.
# NOTE: EINPROGRESS for Posix and EWOULDBLOCK for Windows
_CONNECTION_IN_PROGRESS_SOCK_ERROR_CODES = (
errno.EINPROGRESS,
errno.EWOULDBLOCK,
)
_LOGGER = logging.getLogger(__name__)
# Decorator that logs exceptions escaping from the decorated function
_log_exceptions = pika.diagnostic_utils.create_log_exception_decorator(_LOGGER) # pylint: disable=C0103
def check_callback_arg(callback, name):
"""Raise TypeError if callback is not callable
:param callback: callback to check
:param name: Name to include in exception text
:raises TypeError:
"""
if not callable(callback):
raise TypeError('{} must be callable, but got {!r}'.format(
name, callback))
def check_fd_arg(fd):
"""Raise TypeError if file descriptor is not an integer
:param fd: file descriptor
:raises TypeError:
"""
if not isinstance(fd, numbers.Integral):
raise TypeError(
'Paramter must be a file descriptor, but got {!r}'.format(fd))
def _retry_on_sigint(func):
"""Function decorator for retrying on SIGINT.
"""
@functools.wraps(func)
def retry_sigint_wrap(*args, **kwargs):
"""Wrapper for decorated function"""
while True:
try:
return func(*args, **kwargs)
except pika.compat.SOCKET_ERROR as error:
if error.errno == errno.EINTR:
continue
else:
raise
return retry_sigint_wrap
class SocketConnectionMixin(object):
"""Implements
`pika.adapters.utils.nbio_interface.AbstractIOServices.connect_socket()`
on top of
`pika.adapters.utils.nbio_interface.AbstractFileDescriptorServices` and
basic `pika.adapters.utils.nbio_interface.AbstractIOServices`.
"""
def connect_socket(self, sock, resolved_addr, on_done):
"""Implement
:py:meth:`.nbio_interface.AbstractIOServices.connect_socket()`.
"""
return _AsyncSocketConnector(
nbio=self, sock=sock, resolved_addr=resolved_addr,
on_done=on_done).start()
class StreamingConnectionMixin(object):
"""Implements
`.nbio_interface.AbstractIOServices.create_streaming_connection()` on
top of `.nbio_interface.AbstractFileDescriptorServices` and basic
`nbio_interface.AbstractIOServices` services.
"""
def create_streaming_connection(self,
protocol_factory,
sock,
on_done,
ssl_context=None,
server_hostname=None):
"""Implement
:py:meth:`.nbio_interface.AbstractIOServices.create_streaming_connection()`.
"""
try:
return _AsyncStreamConnector(
nbio=self,
protocol_factory=protocol_factory,
sock=sock,
ssl_context=ssl_context,
server_hostname=server_hostname,
on_done=on_done).start()
except Exception as error:
_LOGGER.error('create_streaming_connection(%s) failed: %r', sock,
error)
# Close the socket since this function takes ownership
try:
sock.close()
except Exception as error: # pylint: disable=W0703
# We log and suppress the exception from sock.close() so that
# the original error from _AsyncStreamConnector constructor will
# percolate
_LOGGER.error('%s.close() failed: %r', sock, error)
raise
class _AsyncServiceAsyncHandle(AbstractIOReference):
"""This module's adaptation of `.nbio_interface.AbstractIOReference`
"""
def __init__(self, subject):
"""
:param subject: subject of the reference containing a `cancel()` method
"""
self._cancel = subject.cancel
def cancel(self):
"""Cancel pending operation
:returns: False if was already done or cancelled; True otherwise
:rtype: bool
"""
return self._cancel()
class _AsyncSocketConnector(object):
"""Connects the given non-blocking socket asynchronously using
`.nbio_interface.AbstractFileDescriptorServices` and basic
`.nbio_interface.AbstractIOServices`. Used for implementing
`.nbio_interface.AbstractIOServices.connect_socket()`.
"""
_STATE_NOT_STARTED = 0 # start() not called yet
_STATE_ACTIVE = 1 # workflow started
_STATE_CANCELED = 2 # workflow aborted by user's cancel() call
_STATE_COMPLETED = 3 # workflow completed: succeeded or failed
def __init__(self, nbio, sock, resolved_addr, on_done):
"""
:param AbstractIOServices | AbstractFileDescriptorServices nbio:
:param socket.socket sock: non-blocking socket that needs to be
connected via `socket.socket.connect()`
:param tuple resolved_addr: resolved destination address/port two-tuple
which is compatible with the given's socket's address family
:param callable on_done: user callback that takes None upon successful
completion or exception upon error (check for `BaseException`) as
its only arg. It will not be called if the operation was cancelled.
:raises ValueError: if host portion of `resolved_addr` is not an IP
address or is inconsistent with the socket's address family as
validated via `socket.inet_pton()`
"""
check_callback_arg(on_done, 'on_done')
try:
socket.inet_pton(sock.family, resolved_addr[0])
except Exception as error: # pylint: disable=W0703
if not hasattr(socket, 'inet_pton'):
_LOGGER.debug(
'Unable to check resolved address: no socket.inet_pton().')
else:
msg = ('Invalid or unresolved IP address '
'{!r} for socket {}: {!r}').format(
resolved_addr, sock, error)
_LOGGER.error(msg)
raise ValueError(msg)
self._nbio = nbio
self._sock = sock
self._addr = resolved_addr
self._on_done = on_done
self._state = self._STATE_NOT_STARTED
self._watching_socket_events = False
@_log_exceptions
def _cleanup(self):
"""Remove socket watcher, if any
"""
if self._watching_socket_events:
self._watching_socket_events = False
self._nbio.remove_writer(self._sock.fileno())
def start(self):
"""Start asynchronous connection establishment.
:rtype: AbstractIOReference
"""
assert self._state == self._STATE_NOT_STARTED, (
'_AsyncSocketConnector.start(): expected _STATE_NOT_STARTED',
self._state)
self._state = self._STATE_ACTIVE
# Continue the rest of the operation on the I/O loop to avoid calling
# user's completion callback from the scope of user's call
self._nbio.add_callback_threadsafe(self._start_async)
return _AsyncServiceAsyncHandle(self)
def cancel(self):
"""Cancel pending connection request without calling user's completion
callback.
:returns: False if was already done or cancelled; True otherwise
:rtype: bool
"""
if self._state == self._STATE_ACTIVE:
self._state = self._STATE_CANCELED
_LOGGER.debug('User canceled connection request for %s to %s',
self._sock, self._addr)
self._cleanup()
return True
_LOGGER.debug(
'_AsyncSocketConnector cancel requested when not ACTIVE: '
'state=%s; %s', self._state, self._sock)
return False
@_log_exceptions
def _report_completion(self, result):
"""Advance to COMPLETED state, remove socket watcher, and invoke user's
completion callback.
:param BaseException | None result: value to pass in user's callback
"""
_LOGGER.debug('_AsyncSocketConnector._report_completion(%r); %s',
result, self._sock)
assert isinstance(result, (BaseException, type(None))), (
'_AsyncSocketConnector._report_completion() expected exception or '
'None as result.', result)
assert self._state == self._STATE_ACTIVE, (
'_AsyncSocketConnector._report_completion() expected '
'_STATE_NOT_STARTED', self._state)
self._state = self._STATE_COMPLETED
self._cleanup()
self._on_done(result)
@_log_exceptions
def _start_async(self):
"""Called as callback from I/O loop to kick-start the workflow, so it's
safe to call user's completion callback from here, if needed
"""
if self._state != self._STATE_ACTIVE:
# Must have been canceled by user before we were called
_LOGGER.debug(
'Abandoning sock=%s connection establishment to %s '
'due to inactive state=%s', self._sock, self._addr, self._state)
return
try:
self._sock.connect(self._addr)
except (Exception, pika.compat.SOCKET_ERROR) as error: # pylint: disable=W0703
if (isinstance(error, pika.compat.SOCKET_ERROR) and
error.errno in _CONNECTION_IN_PROGRESS_SOCK_ERROR_CODES):
# Connection establishment is pending
pass
else:
_LOGGER.error('%s.connect(%s) failed: %r', self._sock,
self._addr, error)
self._report_completion(error)
return
# Get notified when the socket becomes writable
try:
self._nbio.set_writer(self._sock.fileno(), self._on_writable)
except Exception as error: # pylint: disable=W0703
_LOGGER.exception('async.set_writer(%s) failed: %r', self._sock,
error)
self._report_completion(error)
return
else:
self._watching_socket_events = True
_LOGGER.debug('Connection-establishment is in progress for %s.',
self._sock)
@_log_exceptions
def _on_writable(self):
"""Called when socket connects or fails to. Check for predicament and
invoke user's completion callback.
"""
if self._state != self._STATE_ACTIVE:
# This should never happen since we remove the watcher upon
# `cancel()`
_LOGGER.error(
'Socket connection-establishment event watcher '
'called in inactive state (ignoring): %s; state=%s', self._sock,
self._state)
return
# The moment of truth...
error_code = self._sock.getsockopt(socket.SOL_SOCKET, socket.SO_ERROR)
if not error_code:
_LOGGER.info('Socket connected: %s', self._sock)
result = None
else:
error_msg = os.strerror(error_code)
_LOGGER.error('Socket failed to connect: %s; error=%s (%s)',
self._sock, error_code, error_msg)
result = pika.compat.SOCKET_ERROR(error_code, error_msg)
self._report_completion(result)
class _AsyncStreamConnector(object):
"""Performs asynchronous SSL session establishment, if requested, on the
already-connected socket and links the streaming transport to protocol.
Used for implementing
`.nbio_interface.AbstractIOServices.create_streaming_connection()`.
"""
_STATE_NOT_STARTED = 0 # start() not called yet
_STATE_ACTIVE = 1 # start() called and kicked off the workflow
_STATE_CANCELED = 2 # workflow terminated by cancel() request
_STATE_COMPLETED = 3 # workflow terminated by success or failure
def __init__(self, nbio, protocol_factory, sock, ssl_context,
server_hostname, on_done):
"""
NOTE: We take ownership of the given socket upon successful completion
of the constructor.
See `AbstractIOServices.create_streaming_connection()` for detailed
documentation of the corresponding args.
:param AbstractIOServices | AbstractFileDescriptorServices nbio:
:param callable protocol_factory:
:param socket.socket sock:
:param ssl.SSLContext | None ssl_context:
:param str | None server_hostname:
:param callable on_done:
"""
check_callback_arg(protocol_factory, 'protocol_factory')
check_callback_arg(on_done, 'on_done')
if not isinstance(ssl_context, (type(None), ssl.SSLContext)):
raise ValueError('Expected ssl_context=None | ssl.SSLContext, but '
'got {!r}'.format(ssl_context))
if server_hostname is not None and ssl_context is None:
raise ValueError('Non-None server_hostname must not be passed '
'without ssl context')
# Check that the socket connection establishment had completed in order
# to avoid stalling while waiting for the socket to become readable
# and/or writable.
try:
sock.getpeername()
except Exception as error:
raise ValueError(
'Expected connected socket, but getpeername() failed: '
'error={!r}; {}; '.format(error, sock))
self._nbio = nbio
self._protocol_factory = protocol_factory
self._sock = sock
self._ssl_context = ssl_context
self._server_hostname = server_hostname
self._on_done = on_done
self._state = self._STATE_NOT_STARTED
self._watching_socket = False
@_log_exceptions
def _cleanup(self, close):
"""Cancel pending async operations, if any
:param bool close: close the socket if true
"""
_LOGGER.debug('_AsyncStreamConnector._cleanup(%r)', close)
if self._watching_socket:
_LOGGER.debug(
'_AsyncStreamConnector._cleanup(%r): removing RdWr; %s', close,
self._sock)
self._watching_socket = False
self._nbio.remove_reader(self._sock.fileno())
self._nbio.remove_writer(self._sock.fileno())
try:
if close:
_LOGGER.debug(
'_AsyncStreamConnector._cleanup(%r): closing socket; %s',
close, self._sock)
try:
self._sock.close()
except Exception as error: # pylint: disable=W0703
_LOGGER.exception('_sock.close() failed: error=%r; %s',
error, self._sock)
raise
finally:
self._sock = None
self._nbio = None
self._protocol_factory = None
self._ssl_context = None
self._server_hostname = None
self._on_done = None
def start(self):
"""Kick off the workflow
:rtype: AbstractIOReference
"""
_LOGGER.debug('_AsyncStreamConnector.start(); %s', self._sock)
assert self._state == self._STATE_NOT_STARTED, (
'_AsyncStreamConnector.start() expected '
'_STATE_NOT_STARTED', self._state)
self._state = self._STATE_ACTIVE
# Request callback from I/O loop to start processing so that we don't
# end up making callbacks from the caller's scope
self._nbio.add_callback_threadsafe(self._start_async)
return _AsyncServiceAsyncHandle(self)
def cancel(self):
"""Cancel pending connection request without calling user's completion
callback.
:returns: False if was already done or cancelled; True otherwise
:rtype: bool
"""
if self._state == self._STATE_ACTIVE:
self._state = self._STATE_CANCELED
_LOGGER.debug('User canceled streaming linkup for %s', self._sock)
# Close the socket, since we took ownership
self._cleanup(close=True)
return True
_LOGGER.debug(
'_AsyncStreamConnector cancel requested when not ACTIVE: '
'state=%s; %s', self._state, self._sock)
return False
@_log_exceptions
def _report_completion(self, result):
"""Advance to COMPLETED state, cancel async operation(s), and invoke
user's completion callback.
:param BaseException | tuple result: value to pass in user's callback.
`tuple(transport, protocol)` on success, exception on error
"""
_LOGGER.debug('_AsyncStreamConnector._report_completion(%r); %s',
result, self._sock)
assert isinstance(result, (BaseException, tuple)), (
'_AsyncStreamConnector._report_completion() expected exception or '
'tuple as result.', result, self._state)
assert self._state == self._STATE_ACTIVE, (
'_AsyncStreamConnector._report_completion() expected '
'_STATE_ACTIVE', self._state)
self._state = self._STATE_COMPLETED
# Notify user
try:
self._on_done(result)
except Exception:
_LOGGER.exception('%r: _on_done(%r) failed.',
self._report_completion, result)
raise
finally:
# NOTE: Close the socket on error, since we took ownership of it
self._cleanup(close=isinstance(result, BaseException))
@_log_exceptions
def _start_async(self):
"""Called as callback from I/O loop to kick-start the workflow, so it's
safe to call user's completion callback from here if needed
"""
_LOGGER.debug('_AsyncStreamConnector._start_async(); %s', self._sock)
if self._state != self._STATE_ACTIVE:
# Must have been canceled by user before we were called
_LOGGER.debug(
'Abandoning streaming linkup due to inactive state '
'transition; state=%s; %s; .', self._state, self._sock)
return
# Link up protocol and transport if this is a plaintext linkup;
# otherwise kick-off SSL workflow first
if self._ssl_context is None:
self._linkup()
else:
_LOGGER.debug('Starting SSL handshake on %s', self._sock)
# Wrap our plain socket in ssl socket
try:
self._sock = self._ssl_context.wrap_socket(
self._sock,
server_side=False,
do_handshake_on_connect=False,
suppress_ragged_eofs=False, # False = error on incoming EOF
server_hostname=self._server_hostname)
except Exception as error: # pylint: disable=W0703
_LOGGER.exception('SSL wrap_socket(%s) failed: %r', self._sock,
error)
self._report_completion(error)
return
self._do_ssl_handshake()
@_log_exceptions
def _linkup(self):
"""Connection is ready: instantiate and link up transport and protocol,
and invoke user's completion callback.
"""
_LOGGER.debug('_AsyncStreamConnector._linkup()')
transport = None
try:
# Create the protocol
try:
protocol = self._protocol_factory()
except Exception as error:
_LOGGER.exception('protocol_factory() failed: error=%r; %s',
error, self._sock)
raise
if self._ssl_context is None:
# Create plaintext streaming transport
try:
transport = _AsyncPlaintextTransport(
self._sock, protocol, self._nbio)
except Exception as error:
_LOGGER.exception('PlainTransport() failed: error=%r; %s',
error, self._sock)
raise
else:
# Create SSL streaming transport
try:
transport = _AsyncSSLTransport(self._sock, protocol,
self._nbio)
except Exception as error:
_LOGGER.exception('SSLTransport() failed: error=%r; %s',
error, self._sock)
raise
_LOGGER.debug('_linkup(): created transport %r', transport)
# Acquaint protocol with its transport
try:
protocol.connection_made(transport)
except Exception as error:
_LOGGER.exception(
'protocol.connection_made(%r) failed: error=%r; %s',
transport, error, self._sock)
raise
_LOGGER.debug('_linkup(): introduced transport to protocol %r; %r',
transport, protocol)
except Exception as error: # pylint: disable=W0703
result = error
else:
result = (transport, protocol)
self._report_completion(result)
@_log_exceptions
def _do_ssl_handshake(self):
"""Perform asynchronous SSL handshake on the already wrapped socket
"""
_LOGGER.debug('_AsyncStreamConnector._do_ssl_handshake()')
if self._state != self._STATE_ACTIVE:
_LOGGER.debug(
'_do_ssl_handshake: Abandoning streaming linkup due '
'to inactive state transition; state=%s; %s; .', self._state,
self._sock)
return
done = False
try:
try:
self._sock.do_handshake()
except ssl.SSLError as error:
if error.errno == ssl.SSL_ERROR_WANT_READ:
_LOGGER.debug('SSL handshake wants read; %s.', self._sock)
self._watching_socket = True
self._nbio.set_reader(self._sock.fileno(),
self._do_ssl_handshake)
self._nbio.remove_writer(self._sock.fileno())
elif error.errno == ssl.SSL_ERROR_WANT_WRITE:
_LOGGER.debug('SSL handshake wants write. %s', self._sock)
self._watching_socket = True
self._nbio.set_writer(self._sock.fileno(),
self._do_ssl_handshake)
self._nbio.remove_reader(self._sock.fileno())
else:
# Outer catch will report it
raise
else:
done = True
_LOGGER.info('SSL handshake completed successfully: %s',
self._sock)
except Exception as error: # pylint: disable=W0703
_LOGGER.exception('SSL do_handshake failed: error=%r; %s', error,
self._sock)
self._report_completion(error)
return
if done:
# Suspend I/O and link up transport with protocol
_LOGGER.debug(
'_do_ssl_handshake: removing watchers ahead of linkup: %s',
self._sock)
self._nbio.remove_reader(self._sock.fileno())
self._nbio.remove_writer(self._sock.fileno())
# So that our `_cleanup()` won't interfere with the transport's
# socket watcher configuration.
self._watching_socket = False
_LOGGER.debug(
'_do_ssl_handshake: pre-linkup removal of watchers is done; %s',
self._sock)
self._linkup()
class _AsyncTransportBase( # pylint: disable=W0223
AbstractStreamTransport):
"""Base class for `_AsyncPlaintextTransport` and `_AsyncSSLTransport`.
"""
_STATE_ACTIVE = 1
_STATE_FAILED = 2 # connection failed
_STATE_ABORTED_BY_USER = 3 # cancel() called
_STATE_COMPLETED = 4 # done with connection
_MAX_RECV_BYTES = 4096 # per socket.recv() documentation recommendation
# Max per consume call to prevent event starvation
_MAX_CONSUME_BYTES = 1024 * 100
class RxEndOfFile(OSError):
"""We raise this internally when EOF (empty read) is detected on input.
"""
def __init__(self):
super(_AsyncTransportBase.RxEndOfFile, self).__init__(
-1, 'End of input stream (EOF)')
def __init__(self, sock, protocol, nbio):
"""
:param socket.socket | ssl.SSLSocket sock: connected socket
:param pika.adapters.utils.nbio_interface.AbstractStreamProtocol protocol:
corresponding protocol in this transport/protocol pairing; the
protocol already had its `connection_made()` method called.
:param AbstractIOServices | AbstractFileDescriptorServices nbio:
"""
_LOGGER.debug('_AsyncTransportBase.__init__: %s', sock)
self._sock = sock
self._protocol = protocol
self._nbio = nbio
self._state = self._STATE_ACTIVE
self._tx_buffers = collections.deque()
self._tx_buffered_byte_count = 0
def abort(self):
"""Close connection abruptly without waiting for pending I/O to
complete. Will invoke the corresponding protocol's `connection_lost()`
method asynchronously (not in context of the abort() call).
:raises Exception: Exception-based exception on error
"""
_LOGGER.info('Aborting transport connection: state=%s; %s', self._state,
self._sock)
self._initiate_abort(None)
def get_protocol(self):
"""Return the protocol linked to this transport.
:rtype: pika.adapters.utils.nbio_interface.AbstractStreamProtocol
"""
return self._protocol
def get_write_buffer_size(self):
"""
:returns: Current size of output data buffered by the transport
:rtype: int
"""
return self._tx_buffered_byte_count
def _buffer_tx_data(self, data):
"""Buffer the given data until it can be sent asynchronously.
:param bytes data:
:raises ValueError: if called with empty data
"""
if not data:
_LOGGER.error('write() called with empty data: state=%s; %s',
self._state, self._sock)
raise ValueError('write() called with empty data {!r}'.format(data))
if self._state != self._STATE_ACTIVE:
_LOGGER.debug(
'Ignoring write() called during inactive state: '
'state=%s; %s', self._state, self._sock)
return
self._tx_buffers.append(data)
self._tx_buffered_byte_count += len(data)
def _consume(self):
"""Utility method for use by subclasses to ingest data from socket and
dispatch it to protocol's `data_received()` method socket-specific
"try again" exception, per-event data consumption limit is reached,
transport becomes inactive, or a fatal failure.
Consumes up to `self._MAX_CONSUME_BYTES` to prevent event starvation or
until state becomes inactive (e.g., `protocol.data_received()` callback
aborts the transport)
:raises: Whatever the corresponding `sock.recv()` raises except the
socket error with errno.EINTR
:raises: Whatever the `protocol.data_received()` callback raises
:raises _AsyncTransportBase.RxEndOfFile: upon shutdown of input stream
"""
bytes_consumed = 0
while (self._state == self._STATE_ACTIVE and
bytes_consumed < self._MAX_CONSUME_BYTES):
data = self._sigint_safe_recv(self._sock, self._MAX_RECV_BYTES)
bytes_consumed += len(data)
# Empty data, should disconnect
if not data:
_LOGGER.error('Socket EOF; %s', self._sock)
raise self.RxEndOfFile()
# Pass the data to the protocol
try:
self._protocol.data_received(data)
except Exception as error:
_LOGGER.exception(
'protocol.data_received() failed: error=%r; %s', error,
self._sock)
raise
def _produce(self):
"""Utility method for use by subclasses to emit data from tx_buffers.
This method sends chunks from `tx_buffers` until all chunks are
exhausted or sending is interrupted by an exception. Maintains integrity
of `self.tx_buffers`.
:raises: whatever the corresponding `sock.send()` raises except the
socket error with errno.EINTR
"""
while self._tx_buffers:
num_bytes_sent = self._sigint_safe_send(self._sock,
self._tx_buffers[0])
chunk = self._tx_buffers.popleft()
if num_bytes_sent < len(chunk):
_LOGGER.debug('Partial send, requeing remaining data; %s of %s',
num_bytes_sent, len(chunk))
self._tx_buffers.appendleft(chunk[num_bytes_sent:])
self._tx_buffered_byte_count -= num_bytes_sent
assert self._tx_buffered_byte_count >= 0, (
'_AsyncTransportBase._produce() tx buffer size underflow',
self._tx_buffered_byte_count, self._state)
@staticmethod
@_retry_on_sigint
def _sigint_safe_recv(sock, max_bytes):
"""Receive data from socket, retrying on SIGINT.
:param sock: stream or SSL socket
:param max_bytes: maximum number of bytes to receive
:returns: received data or empty bytes uppon end of file
:rtype: bytes
:raises: whatever the corresponding `sock.recv()` raises except socket
error with errno.EINTR
"""
return sock.recv(max_bytes)
@staticmethod
@_retry_on_sigint
def _sigint_safe_send(sock, data):
"""Send data to socket, retrying on SIGINT.
:param sock: stream or SSL socket
:param data: data bytes to send
:returns: number of bytes actually sent
:rtype: int
:raises: whatever the corresponding `sock.send()` raises except socket
error with errno.EINTR
"""
return sock.send(data)
@_log_exceptions
def _deactivate(self):
"""Unregister the transport from I/O events
"""
if self._state == self._STATE_ACTIVE:
_LOGGER.info('Deactivating transport: state=%s; %s', self._state,
self._sock)
self._nbio.remove_reader(self._sock.fileno())
self._nbio.remove_writer(self._sock.fileno())
self._tx_buffers.clear()
@_log_exceptions
def _close_and_finalize(self):
"""Close the transport's socket and unlink the transport it from
references to other assets (protocol, etc.)
"""
if self._state != self._STATE_COMPLETED:
_LOGGER.info('Closing transport socket and unlinking: state=%s; %s',
self._state, self._sock)
try:
self._sock.shutdown(socket.SHUT_RDWR)
except pika.compat.SOCKET_ERROR:
pass
self._sock.close()
self._sock = None
self._protocol = None
self._nbio = None
self._state = self._STATE_COMPLETED
@_log_exceptions
def _initiate_abort(self, error):
"""Initiate asynchronous abort of the transport that concludes with a
call to the protocol's `connection_lost()` method. No flushing of
output buffers will take place.
:param BaseException | None error: None if being canceled by user,
including via falsie return value from protocol.eof_received;
otherwise the exception corresponding to the the failed connection.
"""
_LOGGER.info(
'_AsyncTransportBase._initate_abort(): Initiating abrupt '
'asynchronous transport shutdown: state=%s; error=%r; %s',
self._state, error, self._sock)
assert self._state != self._STATE_COMPLETED, (
'_AsyncTransportBase._initate_abort() expected '
'non-_STATE_COMPLETED', self._state)
if self._state == self._STATE_COMPLETED:
return
self._deactivate()
# Update state
if error is None:
# Being aborted by user
if self._state == self._STATE_ABORTED_BY_USER:
# Abort by user already pending
_LOGGER.debug('_AsyncTransportBase._initiate_abort(): '
'ignoring - user-abort already pending.')
return
# Notification priority is given to user-initiated abort over
# failed connection
self._state = self._STATE_ABORTED_BY_USER
else:
# Connection failed
if self._state != self._STATE_ACTIVE:
assert self._state == self._STATE_ABORTED_BY_USER, (
'_AsyncTransportBase._initate_abort() expected '
'_STATE_ABORTED_BY_USER', self._state)
return
self._state = self._STATE_FAILED
# Schedule callback from I/O loop to avoid potential reentry into user
# code
self._nbio.add_callback_threadsafe(
functools.partial(self._connection_lost_notify_async, error))
@_log_exceptions
def _connection_lost_notify_async(self, error):
"""Handle aborting of transport either due to socket error or user-
initiated `abort()` call. Must be called from an I/O loop callback owned
by us in order to avoid reentry into user code from user's API call into
the transport.
:param BaseException | None error: None if being canceled by user;
otherwise the exception corresponding to the the failed connection.
"""
_LOGGER.debug('Concluding transport shutdown: state=%s; error=%r',
self._state, error)
if self._state == self._STATE_COMPLETED:
return
if error is not None and self._state != self._STATE_FAILED:
# Priority is given to user-initiated abort notification
assert self._state == self._STATE_ABORTED_BY_USER, (
'_AsyncTransportBase._connection_lost_notify_async() '
'expected _STATE_ABORTED_BY_USER', self._state)
return
# Inform protocol
try:
self._protocol.connection_lost(error)
except Exception as exc: # pylint: disable=W0703
_LOGGER.exception('protocol.connection_lost(%r) failed: exc=%r; %s',
error, exc, self._sock)
# Re-raise, since we've exhausted our normal failure notification
# mechanism (i.e., connection_lost())
raise
finally:
self._close_and_finalize()
class _AsyncPlaintextTransport(_AsyncTransportBase):
"""Implementation of `nbio_interface.AbstractStreamTransport` for a
plaintext connection.
"""
def __init__(self, sock, protocol, nbio):
"""
:param socket.socket sock: non-blocking connected socket
:param pika.adapters.utils.nbio_interface.AbstractStreamProtocol protocol:
corresponding protocol in this transport/protocol pairing; the
protocol already had its `connection_made()` method called.
:param AbstractIOServices | AbstractFileDescriptorServices nbio:
"""
super(_AsyncPlaintextTransport, self).__init__(sock, protocol, nbio)
# Request to be notified of incoming data; we'll watch for writability
# only when our write buffer is non-empty
self._nbio.set_reader(self._sock.fileno(), self._on_socket_readable)
def write(self, data):
"""Buffer the given data until it can be sent asynchronously.
:param bytes data:
:raises ValueError: if called with empty data
"""
if self._state != self._STATE_ACTIVE:
_LOGGER.debug(
'Ignoring write() called during inactive state: '
'state=%s; %s', self._state, self._sock)
return
assert data, ('_AsyncPlaintextTransport.write(): empty data from user.',
data, self._state)
# pika/pika#1286
# NOTE: Modify code to write data to buffer before setting writer.
# Otherwise a race condition can occur where ioloop executes writer
# while buffer is still empty.
tx_buffer_was_empty = self.get_write_buffer_size() == 0
self._buffer_tx_data(data)
if tx_buffer_was_empty:
self._nbio.set_writer(self._sock.fileno(), self._on_socket_writable)
_LOGGER.debug('Turned on writability watcher: %s', self._sock)
@_log_exceptions
def _on_socket_readable(self):
"""Ingest data from socket and dispatch it to protocol until exception
occurs (typically EAGAIN or EWOULDBLOCK), per-event data consumption
limit is reached, transport becomes inactive, or failure.
"""
if self._state != self._STATE_ACTIVE:
_LOGGER.debug(
'Ignoring readability notification due to inactive '
'state: state=%s; %s', self._state, self._sock)
return
try:
self._consume()
except self.RxEndOfFile:
try:
keep_open = self._protocol.eof_received()
except Exception as error: # pylint: disable=W0703
_LOGGER.exception(
'protocol.eof_received() failed: error=%r; %s', error,
self._sock)
self._initiate_abort(error)
else:
if keep_open:
_LOGGER.info(
'protocol.eof_received() elected to keep open: %s',
self._sock)
self._nbio.remove_reader(self._sock.fileno())
else:
_LOGGER.info('protocol.eof_received() elected to close: %s',
self._sock)
self._initiate_abort(None)
except (Exception, pika.compat.SOCKET_ERROR) as error: # pylint: disable=W0703
if (isinstance(error, pika.compat.SOCKET_ERROR) and
error.errno in _TRY_IO_AGAIN_SOCK_ERROR_CODES):
_LOGGER.debug('Recv would block on %s', self._sock)
else:
_LOGGER.exception(
'_AsyncBaseTransport._consume() failed, aborting '
'connection: error=%r; sock=%s; Caller\'s stack:\n%s',
error, self._sock, ''.join(
traceback.format_exception(*sys.exc_info())))
self._initiate_abort(error)
else:
if self._state != self._STATE_ACTIVE:
# Most likely our protocol's `data_received()` aborted the
# transport
_LOGGER.debug(
'Leaving Plaintext consumer due to inactive '
'state: state=%s; %s', self._state, self._sock)
@_log_exceptions
def _on_socket_writable(self):
"""Handle writable socket notification
"""
if self._state != self._STATE_ACTIVE:
_LOGGER.debug(
'Ignoring writability notification due to inactive '
'state: state=%s; %s', self._state, self._sock)
return
# We shouldn't be getting called with empty tx buffers
assert self._tx_buffers, (
'_AsyncPlaintextTransport._on_socket_writable() called, '
'but _tx_buffers is empty.', self._state)
try:
# Transmit buffered data to remote socket
self._produce()
except (Exception, pika.compat.SOCKET_ERROR) as error: # pylint: disable=W0703
if (isinstance(error, pika.compat.SOCKET_ERROR) and
error.errno in _TRY_IO_AGAIN_SOCK_ERROR_CODES):
_LOGGER.debug('Send would block on %s', self._sock)
else:
_LOGGER.exception(
'_AsyncBaseTransport._produce() failed, aborting '
'connection: error=%r; sock=%s; Caller\'s stack:\n%s',
error, self._sock, ''.join(
traceback.format_exception(*sys.exc_info())))
self._initiate_abort(error)
else:
if not self._tx_buffers:
self._nbio.remove_writer(self._sock.fileno())
_LOGGER.debug('Turned off writability watcher: %s', self._sock)
class _AsyncSSLTransport(_AsyncTransportBase):
"""Implementation of `.nbio_interface.AbstractStreamTransport` for an SSL
connection.
"""
def __init__(self, sock, protocol, nbio):
"""
:param ssl.SSLSocket sock: non-blocking connected socket
:param pika.adapters.utils.nbio_interface.AbstractStreamProtocol protocol:
corresponding protocol in this transport/protocol pairing; the
protocol already had its `connection_made()` method called.
:param AbstractIOServices | AbstractFileDescriptorServices nbio:
"""
super(_AsyncSSLTransport, self).__init__(sock, protocol, nbio)
self._ssl_readable_action = self._consume
self._ssl_writable_action = None
# Bootstrap consumer; we'll take care of producer once data is buffered
self._nbio.set_reader(self._sock.fileno(), self._on_socket_readable)
# Try reading asap just in case read-ahead caused some
self._nbio.add_callback_threadsafe(self._on_socket_readable)
def write(self, data):
"""Buffer the given data until it can be sent asynchronously.
:param bytes data:
:raises ValueError: if called with empty data
"""
if self._state != self._STATE_ACTIVE:
_LOGGER.debug(
'Ignoring write() called during inactive state: '
'state=%s; %s', self._state, self._sock)
return
assert data, ('_AsyncSSLTransport.write(): empty data from user.',
data, self._state)
# pika/pika#1286
# NOTE: Modify code to write data to buffer before setting writer.
# Otherwise a race condition can occur where ioloop executes writer
# while buffer is still empty.
tx_buffer_was_empty = self.get_write_buffer_size() == 0
self._buffer_tx_data(data)
if tx_buffer_was_empty and self._ssl_writable_action is None:
self._ssl_writable_action = self._produce
self._nbio.set_writer(self._sock.fileno(), self._on_socket_writable)
_LOGGER.debug('Turned on writability watcher: %s', self._sock)
@_log_exceptions
def _on_socket_readable(self):
"""Handle readable socket indication
"""
if self._state != self._STATE_ACTIVE:
_LOGGER.debug(
'Ignoring readability notification due to inactive '
'state: state=%s; %s', self._state, self._sock)
return
if self._ssl_readable_action:
try:
self._ssl_readable_action()
except Exception as error: # pylint: disable=W0703
self._initiate_abort(error)
else:
_LOGGER.debug(
'SSL readable action was suppressed: '
'ssl_writable_action=%r; %s', self._ssl_writable_action,
self._sock)
@_log_exceptions
def _on_socket_writable(self):
"""Handle writable socket notification
"""
if self._state != self._STATE_ACTIVE:
_LOGGER.debug(
'Ignoring writability notification due to inactive '
'state: state=%s; %s', self._state, self._sock)
return
if self._ssl_writable_action:
try:
self._ssl_writable_action()
except Exception as error: # pylint: disable=W0703
self._initiate_abort(error)
else:
_LOGGER.debug(
'SSL writable action was suppressed: '
'ssl_readable_action=%r; %s', self._ssl_readable_action,
self._sock)
@_log_exceptions
def _consume(self):
"""[override] Ingest data from socket and dispatch it to protocol until
exception occurs (typically ssl.SSLError with
SSL_ERROR_WANT_READ/WRITE), per-event data consumption limit is reached,
transport becomes inactive, or failure.
Update consumer/producer registration.
:raises Exception: error that signals that connection needs to be
aborted
"""
next_consume_on_readable = True
try:
super(_AsyncSSLTransport, self)._consume()
except ssl.SSLError as error:
if error.errno == ssl.SSL_ERROR_WANT_READ:
_LOGGER.debug('SSL ingester wants read: %s', self._sock)
elif error.errno == ssl.SSL_ERROR_WANT_WRITE:
# Looks like SSL re-negotiation
_LOGGER.debug('SSL ingester wants write: %s', self._sock)
next_consume_on_readable = False
else:
_LOGGER.exception(
'_AsyncBaseTransport._consume() failed, aborting '
'connection: error=%r; sock=%s; Caller\'s stack:\n%s',
error, self._sock, ''.join(
traceback.format_exception(*sys.exc_info())))
raise # let outer catch block abort the transport
else:
if self._state != self._STATE_ACTIVE:
# Most likely our protocol's `data_received()` aborted the
# transport
_LOGGER.debug(
'Leaving SSL consumer due to inactive '
'state: state=%s; %s', self._state, self._sock)
return
# Consumer exited without exception; there may still be more,
# possibly unprocessed, data records in SSL input buffers that
# can be read without waiting for socket to become readable.
# In case buffered input SSL data records still remain
self._nbio.add_callback_threadsafe(self._on_socket_readable)
# Update consumer registration
if next_consume_on_readable:
if not self._ssl_readable_action:
self._nbio.set_reader(self._sock.fileno(),
self._on_socket_readable)
self._ssl_readable_action = self._consume
# NOTE: can't use identity check, it fails for instance methods
if self._ssl_writable_action == self._consume: # pylint: disable=W0143
self._nbio.remove_writer(self._sock.fileno())
self._ssl_writable_action = None
else:
# WANT_WRITE
if not self._ssl_writable_action:
self._nbio.set_writer(self._sock.fileno(),
self._on_socket_writable)
self._ssl_writable_action = self._consume
if self._ssl_readable_action:
self._nbio.remove_reader(self._sock.fileno())
self._ssl_readable_action = None
# Update producer registration
if self._tx_buffers and not self._ssl_writable_action:
self._ssl_writable_action = self._produce
self._nbio.set_writer(self._sock.fileno(), self._on_socket_writable)
@_log_exceptions
def _produce(self):
"""[override] Emit data from tx_buffers all chunks are exhausted or
sending is interrupted by an exception (typically ssl.SSLError with
SSL_ERROR_WANT_READ/WRITE).
Update consumer/producer registration.
:raises Exception: error that signals that connection needs to be
aborted
"""
next_produce_on_writable = None # None means no need to produce
try:
super(_AsyncSSLTransport, self)._produce()
except ssl.SSLError as error:
if error.errno == ssl.SSL_ERROR_WANT_READ:
# Looks like SSL re-negotiation
_LOGGER.debug('SSL emitter wants read: %s', self._sock)
next_produce_on_writable = False
elif error.errno == ssl.SSL_ERROR_WANT_WRITE:
_LOGGER.debug('SSL emitter wants write: %s', self._sock)
next_produce_on_writable = True
else:
_LOGGER.exception(
'_AsyncBaseTransport._produce() failed, aborting '
'connection: error=%r; sock=%s; Caller\'s stack:\n%s',
error, self._sock, ''.join(
traceback.format_exception(*sys.exc_info())))
raise # let outer catch block abort the transport
else:
# No exception, so everything must have been written to the socket
assert not self._tx_buffers, (
'_AsyncSSLTransport._produce(): no exception from parent '
'class, but data remains in _tx_buffers.', len(
self._tx_buffers))
# Update producer registration
if self._tx_buffers:
assert next_produce_on_writable is not None, (
'_AsyncSSLTransport._produce(): next_produce_on_writable is '
'still None', self._state)
if next_produce_on_writable:
if not self._ssl_writable_action:
self._nbio.set_writer(self._sock.fileno(),
self._on_socket_writable)
self._ssl_writable_action = self._produce
# NOTE: can't use identity check, it fails for instance methods
if self._ssl_readable_action == self._produce: # pylint: disable=W0143
self._nbio.remove_reader(self._sock.fileno())
self._ssl_readable_action = None
else:
# WANT_READ
if not self._ssl_readable_action:
self._nbio.set_reader(self._sock.fileno(),
self._on_socket_readable)
self._ssl_readable_action = self._produce
if self._ssl_writable_action:
self._nbio.remove_writer(self._sock.fileno())
self._ssl_writable_action = None
else:
# NOTE: can't use identity check, it fails for instance methods
if self._ssl_readable_action == self._produce: # pylint: disable=W0143
self._nbio.remove_reader(self._sock.fileno())
self._ssl_readable_action = None
assert self._ssl_writable_action != self._produce, ( # pylint: disable=W0143
'_AsyncSSLTransport._produce(): with empty tx_buffers, '
'writable_action cannot be _produce when readable is '
'_produce', self._state)
else:
# NOTE: can't use identity check, it fails for instance methods
assert self._ssl_writable_action == self._produce, ( # pylint: disable=W0143
'_AsyncSSLTransport._produce(): with empty tx_buffers, '
'expected writable_action as _produce when readable_action '
'is not _produce', 'writable_action:',
self._ssl_writable_action, 'readable_action:',
self._ssl_readable_action, 'state:', self._state)
self._ssl_writable_action = None
self._nbio.remove_writer(self._sock.fileno())
# Update consumer registration
if not self._ssl_readable_action:
self._ssl_readable_action = self._consume
self._nbio.set_reader(self._sock.fileno(), self._on_socket_readable)
# In case input SSL data records have been buffered
self._nbio.add_callback_threadsafe(self._on_socket_readable)
elif self._sock.pending():
self._nbio.add_callback_threadsafe(self._on_socket_readable)