Untitled
unknown
python
4 years ago
12 kB
9
Indexable
from __future__ import division, absolute_import, print_function, unicode_literals
from .version import __version__
from struct import Struct as _Struct
import warnings
from . import wrapper
# Compatibility shim: probe for a ``buffer`` builtin and, when the name does
# not exist (NameError), bind ``buffer`` to ``memoryview`` so the rest of the
# module can call ``buffer(...)`` either way.
try:
    buffer
except NameError:
    buffer = memoryview # py3
# Module-wide flag recording whether nanoconfig has been used.  It is set to
# True by Socket.configure() and reset to False by terminate_all() after it
# calls wrapper.nc_term().
nanoconfig_started = False
# Import constants into the module namespace, stripping the NN_ prefix where
# present.  Symbols without the prefix (e.g. AF_SP, used as a default argument
# by Socket) are exported under their own name.
for name, value in wrapper.nn_symbols():
    name = name[3:] if name.startswith('NN_') else name
    globals()[name] = value
# Prefer the wrapper's own buffer factory when it provides one; otherwise
# fall back to a plain ``bytearray``.
try:
    create_writable_buffer = wrapper.create_writable_buffer
except AttributeError:
    def create_writable_buffer(size):
        """Return a writable buffer of ``size`` bytes."""
        return bytearray(size)
def create_message_buffer(size, type):
    """Allocate a message buffer through ``wrapper.nn_allocmsg``.

    ``size`` and ``type`` are forwarded unchanged to the wrapper call.

    Returns whatever ``wrapper.nn_allocmsg`` returns.
    Raises NanoMsgAPIError when the wrapper returns ``None``.
    """
    message = wrapper.nn_allocmsg(size, type)
    if message is not None:
        return message
    raise NanoMsgAPIError()
class NanoMsgError(Exception):
    """Root of the exception hierarchy used by the nanomsg python package.

    Catch this type to handle every error raised by the package.
    """
class NanoMsgAPIError(NanoMsgError):
    """Error reported by the nanomsg C API.

    On construction the current error number is read with
    ``wrapper.nn_errno()`` and its description with ``wrapper.nn_strerror()``;
    both are stored on the instance as ``errno`` and ``msg``, and the
    description is also used as the exception message.
    """

    __slots__ = ('msg', 'errno')

    def __init__(self):
        code = wrapper.nn_errno()
        text = wrapper.nn_strerror(code)
        super(NanoMsgAPIError, self).__init__(text)
        self.errno = code
        self.msg = text
def _nn_check_positive_rtn(rtn):
if rtn < 0:
raise NanoMsgAPIError()
return rtn
class Device(object):
    """A nanomsg device relaying messages between sockets.

    When only ``socket1`` is given the device loops messages on that
    socket; internally the missing second socket is represented by fd -1.
    """

    def __init__(self, socket1, socket2=None):
        # Only the socket ids are kept, not the Socket objects themselves.
        self._fd1 = socket1.fd
        if socket2 is None:
            self._fd2 = -1
        else:
            self._fd2 = socket2.fd

    def start(self):
        """Run the device in the calling thread.

        Does not return until the device stops because of an error or
        termination.  Raises NanoMsgAPIError when ``wrapper.nn_device``
        returns a negative value.
        """
        result = wrapper.nn_device(self._fd1, self._fd2)
        _nn_check_positive_rtn(result)
def terminate_all():
    """Close all sockets and devices.

    If the module-level ``nanoconfig_started`` flag is set,
    ``wrapper.nc_term()`` is called first and the flag is cleared.
    ``wrapper.nn_term()`` is then called unconditionally.
    """
    global nanoconfig_started
    if nanoconfig_started:
        wrapper.nc_term()
        # Cleared only after nc_term() returns, so a failing nc_term()
        # leaves the flag set.
        nanoconfig_started = False
    wrapper.nn_term()
class Socket(object):
    """Class wrapping nanomsg socket.

    protocol should be a nanomsg protocol constant e.g. nanomsg.PAIR

    This class supports being used as a context manager which should guarantee
    it is closed.

    e.g.:
        import time
        from nanomsg import PUB, Socket

        with Socket(PUB) as pub_socket:
            pub_socket.bind('tcp://127.0.0.1:49234')
            for i in range(100):
                pub_socket.send(b'hello all')
                time.sleep(0.5)
        #pub_socket is closed

    Socket.bind and Socket.connect return subclass of Endpoint which allow
    you to shutdown selected endpoints.

    The constructor also allows you to wrap existing sockets by passing in the
    socket fd instead of the protocol e.g.:
        from nanomsg import AF_SP, PAIR, Socket
        from nanomsg import wrapper as nn

        socket_fd = nn.nn_socket(AF_SP, PAIR)
        socket = Socket(socket_fd=socket_fd)
    """

    # str() keeps the format a native string even though the module enables
    # unicode_literals.
    _INT_PACKER = _Struct(str('i'))

    class _Endpoint(object):
        """Handle for one endpoint (address) attached to a Socket."""

        def __init__(self, socket, endpoint_id, address):
            self._endpoint_id = endpoint_id
            self._socket = socket
            self._address = address

        @property
        def address(self):
            """Address this endpoint was created with."""
            return self._address

        def shutdown(self):
            """Remove this endpoint from its socket and shut it down.

            Raises ValueError if the endpoint is no longer registered on the
            socket, and NanoMsgAPIError if the C API reports a failure.
            """
            # Removing first makes a second shutdown() fail with ValueError
            # before the (now stale) endpoint id reaches the C API.
            self._socket._endpoints.remove(self)
            _nn_check_positive_rtn(
                wrapper.nn_shutdown(self._socket.fd, self._endpoint_id)
            )

        def __repr__(self):
            return '<%s socket %r, id %r, address %r>' % (
                self.__class__.__name__,
                self._socket,
                self._endpoint_id,
                self._address,
            )

    class BindEndpoint(_Endpoint):
        """Endpoint returned by Socket.bind."""

    class ConnectEndpoint(_Endpoint):
        """Endpoint returned by Socket.connect."""

    class NanoconfigEndpoint(_Endpoint):
        """Endpoint returned by Socket.configure."""

        def shutdown(self):
            raise NotImplementedError(
                "Shutdown of nanoconfig endpoint is not supported")

    def __init__(self, protocol=None, socket_fd=None, domain=AF_SP):
        """Create a new socket or wrap an existing one.

        Exactly one of ``protocol`` (create a socket in ``domain``) or
        ``socket_fd`` (wrap an existing socket id) must be given.

        Raises NanoMsgError when both or neither are given, and
        NanoMsgAPIError when the socket cannot be created.
        """
        # Initialise state before anything can raise so that close() and
        # __del__ work on a partially constructed instance.
        self._fd = -1
        self._endpoints = []
        if protocol is not None and socket_fd is not None:
            raise NanoMsgError('Only one of protocol or socket_fd should be '
                               'passed to the Socket constructor')
        if protocol is None and socket_fd is None:
            raise NanoMsgError('One of protocol or socket_fd must be '
                               'passed to the Socket constructor')
        if protocol is not None:
            self._fd = _nn_check_positive_rtn(
                wrapper.nn_socket(domain, protocol)
            )
        else:
            self._fd = socket_fd

    def _get_send_fd(self):
        return self.get_int_option(SOL_SOCKET, SNDFD)

    def _get_recv_fd(self):
        return self.get_int_option(SOL_SOCKET, RCVFD)

    def _get_linger(self):
        return self.get_int_option(SOL_SOCKET, LINGER)

    def _set_linger(self, value):
        return self.set_int_option(SOL_SOCKET, LINGER, value)

    def _get_send_buffer_size(self):
        return self.get_int_option(SOL_SOCKET, SNDBUF)

    def _set_send_buffer_size(self, value):
        return self.set_int_option(SOL_SOCKET, SNDBUF, value)

    def _get_recv_buffer_size(self):
        return self.get_int_option(SOL_SOCKET, RCVBUF)

    def _set_recv_buffer_size(self, value):
        return self.set_int_option(SOL_SOCKET, RCVBUF, value)

    def _get_recv_max_size(self):
        return self.get_int_option(SOL_SOCKET, RCVMAXSIZE)

    def _set_recv_max_size(self, value):
        return self.set_int_option(SOL_SOCKET, RCVMAXSIZE, value)

    def _get_send_timeout(self):
        return self.get_int_option(SOL_SOCKET, SNDTIMEO)

    def _set_send_timeout(self, value):
        return self.set_int_option(SOL_SOCKET, SNDTIMEO, value)

    def _get_recv_timeout(self):
        return self.get_int_option(SOL_SOCKET, RCVTIMEO)

    def _set_recv_timeout(self, value):
        return self.set_int_option(SOL_SOCKET, RCVTIMEO, value)

    def _get_reconnect_interval(self):
        return self.get_int_option(SOL_SOCKET, RECONNECT_IVL)

    def _set_reconnect_interval(self, value):
        return self.set_int_option(SOL_SOCKET, RECONNECT_IVL, value)

    def _get_reconnect_interval_max(self):
        return self.get_int_option(SOL_SOCKET, RECONNECT_IVL_MAX)

    def _set_reconnect_interval_max(self, value):
        return self.set_int_option(SOL_SOCKET, RECONNECT_IVL_MAX, value)

    send_fd = property(_get_send_fd, doc='Send file descriptor')
    recv_fd = property(_get_recv_fd, doc='Receive file descriptor')
    linger = property(_get_linger, _set_linger,
                      doc='Socket linger in milliseconds (0.001 seconds)')
    recv_buffer_size = property(_get_recv_buffer_size, _set_recv_buffer_size,
                                doc='Receive buffer size in bytes')
    recv_max_size = property(_get_recv_max_size, _set_recv_max_size,
                             doc='Maximum message size that can be received '
                                 'in bytes')
    # The setter must be the buffer-size setter; wiring it to the timeout
    # setter would silently change SNDTIMEO instead of SNDBUF.
    send_buffer_size = property(_get_send_buffer_size, _set_send_buffer_size,
                                doc='Send buffer size in bytes')
    send_timeout = property(_get_send_timeout, _set_send_timeout,
                            doc='Send timeout in milliseconds (0.001 seconds)')
    recv_timeout = property(_get_recv_timeout, _set_recv_timeout,
                            doc='Receive timeout in milliseconds (0.001 '
                                'seconds)')
    reconnect_interval = property(
        _get_reconnect_interval,
        _set_reconnect_interval,
        doc='Base interval between connection failure and reconnect'
            ' attempt in milliseconds (0.001 seconds).'
    )
    reconnect_interval_max = property(
        _get_reconnect_interval_max,
        _set_reconnect_interval_max,
        doc='Max reconnect interval - see C API documentation.'
    )

    @property
    def fd(self):
        """Socket file descriptor.

        Note this is not an OS file descriptor (see .send_fd, .recv_fd).
        """
        return self._fd

    @property
    def endpoints(self):
        """Copy of the list of endpoints currently attached to the socket."""
        return list(self._endpoints)

    @property
    def uses_nanoconfig(self):
        """True when the socket's first endpoint is a nanoconfig endpoint."""
        return bool(self._endpoints) and isinstance(
            self._endpoints[0], Socket.NanoconfigEndpoint)

    def bind(self, address):
        """Add a local endpoint to the socket and return its BindEndpoint.

        Raises ValueError if the socket is configured through nanoconfig and
        NanoMsgAPIError if the C API reports a failure.
        """
        if self.uses_nanoconfig:
            raise ValueError("Nanoconfig address must be sole endpoint")
        endpoint_id = _nn_check_positive_rtn(
            wrapper.nn_bind(self._fd, address)
        )
        ep = Socket.BindEndpoint(self, endpoint_id, address)
        self._endpoints.append(ep)
        return ep

    def connect(self, address):
        """Add a remote endpoint to the socket and return its ConnectEndpoint.

        Raises ValueError if the socket is configured through nanoconfig and
        NanoMsgAPIError if the C API reports a failure.
        """
        if self.uses_nanoconfig:
            raise ValueError("Nanoconfig address must be sole endpoint")
        endpoint_id = _nn_check_positive_rtn(
            wrapper.nn_connect(self.fd, address)
        )
        ep = Socket.ConnectEndpoint(self, endpoint_id, address)
        self._endpoints.append(ep)
        return ep

    def configure(self, address):
        """Configure socket's addresses with nanoconfig.

        Returns a NanoconfigEndpoint.  Raises ValueError if the socket
        already has any endpoint and NanoMsgAPIError if the C API reports a
        failure.
        """
        global nanoconfig_started
        if self._endpoints:
            raise ValueError("Nanoconfig address must be sole endpoint")
        endpoint_id = _nn_check_positive_rtn(
            wrapper.nc_configure(self.fd, address)
        )
        # Remember that nanoconfig is in use so terminate_all() can shut it
        # down as well.
        if not nanoconfig_started:
            nanoconfig_started = True
        ep = Socket.NanoconfigEndpoint(self, endpoint_id, address)
        self._endpoints.append(ep)
        return ep

    def close(self):
        """Close the socket; does nothing if it is already closed."""
        if self.is_open():
            fd = self._fd
            # Mark closed before calling into the C API so a failing close
            # is never retried (e.g. from __del__).
            self._fd = -1
            if self.uses_nanoconfig:
                wrapper.nc_close(fd)
            else:
                _nn_check_positive_rtn(wrapper.nn_close(fd))

    def is_open(self):
        """Returns true if the socket has a valid socket id.

        If the underlying socket is closed by some other means than
        Socket.close this method may return True when the socket is actually
        closed.
        """
        return self.fd >= 0

    def recv(self, buf=None, flags=0):
        """Receive a message and return it as bytes.

        ``buf``, when given, is forwarded to ``wrapper.nn_recv`` together
        with ``flags``.  Raises NanoMsgAPIError if the C API reports a
        failure.
        """
        if buf is None:
            rtn, out_buf = wrapper.nn_recv(self.fd, flags)
        else:
            rtn, out_buf = wrapper.nn_recv(self.fd, buf, flags)
        _nn_check_positive_rtn(rtn)
        # Only the first ``rtn`` bytes of the returned buffer are the message.
        return bytes(buffer(out_buf))[:rtn]

    def set_string_option(self, level, option, value):
        """Set a socket option whose value is passed through as-is."""
        _nn_check_positive_rtn(wrapper.nn_setsockopt(self.fd, level, option,
                                                     value))

    def set_int_option(self, level, option, value):
        """Set a socket option whose value is a native C int."""
        buf = create_writable_buffer(Socket._INT_PACKER.size)
        Socket._INT_PACKER.pack_into(buf, 0, value)
        _nn_check_positive_rtn(wrapper.nn_setsockopt(self.fd, level, option,
                                                     buf))

    def get_int_option(self, level, option):
        """Return a socket option as an int.

        Raises NanoMsgAPIError if the C API reports a failure and
        NanoMsgError if the returned option is not the size of a C int.
        """
        size = Socket._INT_PACKER.size
        buf = create_writable_buffer(size)
        rtn, length = wrapper.nn_getsockopt(self._fd, level, option, buf)
        _nn_check_positive_rtn(rtn)
        if length != size:
            # Report the offending length, not the (successful) return code.
            raise NanoMsgError(('Returned option size (%r) should be the same'
                                ' as size of int (%r)') % (length, size))
        return Socket._INT_PACKER.unpack_from(buffer(buf))[0]

    def get_string_option(self, level, option, max_len=16*1024):
        """Return a socket option as bytes, reading at most ``max_len`` bytes.

        Raises NanoMsgAPIError if the C API reports a failure.
        """
        buf = create_writable_buffer(max_len)
        rtn, length = wrapper.nn_getsockopt(self._fd, level, option, buf)
        _nn_check_positive_rtn(rtn)
        return bytes(buffer(buf))[:length]

    def send(self, msg, flags=0):
        """Send a message; raises NanoMsgAPIError on failure."""
        _nn_check_positive_rtn(wrapper.nn_send(self.fd, msg, flags))

    def __enter__(self):
        return self

    def __exit__(self, *args):
        self.close()

    def __repr__(self):
        endpoints = self.endpoints
        connected = [ep.address for ep in endpoints
                     if isinstance(ep, Socket.ConnectEndpoint)]
        bound = [ep.address for ep in endpoints
                 if isinstance(ep, Socket.BindEndpoint)]
        return '<%s fd %r, connected to %r, bound to %r>' % (
            self.__class__.__name__,
            self.fd,
            connected,
            bound,
        )

    def __del__(self):
        # Best-effort cleanup: a failing close during garbage collection is
        # deliberately ignored.
        try:
            self.close()
        except NanoMsgError:
            pass