Untitled
unknown
python
3 years ago
12 kB
4
Indexable
from __future__ import division, absolute_import, print_function, unicode_literals

from .version import __version__
from struct import Struct as _Struct
import warnings

from . import wrapper

try:
    buffer
except NameError:
    buffer = memoryview  # py3

# Set to True once a socket has been configured through nanoconfig, so that
# terminate_all() knows it also has to call wrapper.nc_term().
nanoconfig_started = False

# Import constants into module with NN_ prefix stripped
for name, value in wrapper.nn_symbols():
    if name.startswith('NN_'):
        name = name[3:]
    globals()[name] = value


if hasattr(wrapper, 'create_writable_buffer'):
    create_writable_buffer = wrapper.create_writable_buffer
else:
    def create_writable_buffer(size):
        """Returns a writable buffer"""
        return bytearray(size)


def create_message_buffer(size, type):
    """Create a message buffer

    Calls wrapper.nn_allocmsg(size, type) and returns its result.
    Raises NanoMsgAPIError if the wrapper returns None.
    """
    rtn = wrapper.nn_allocmsg(size, type)
    if rtn is None:
        raise NanoMsgAPIError()
    return rtn


class NanoMsgError(Exception):
    """Base Exception for all errors in the nanomsg python package

    """
    pass


class NanoMsgAPIError(NanoMsgError):
    """Exception for all errors reported by the C API.

    msg and errno are from nanomsg C library.

    """
    __slots__ = ('msg', 'errno')

    def __init__(self):
        # The error is read from the wrapper at construction time, so the
        # exception must be created right after the failing call.
        errno = wrapper.nn_errno()
        msg = wrapper.nn_strerror(errno)
        NanoMsgError.__init__(self, msg)
        self.errno, self.msg = errno, msg


def _nn_check_positive_rtn(rtn):
    """Return rtn unchanged, raising NanoMsgAPIError if it is negative."""
    if rtn < 0:
        raise NanoMsgAPIError()
    return rtn


class Device(object):
    """Create a nanomsg device to relay messages between sockets.

    If only one socket is supplied the device loops messages on that socket.
    """
    def __init__(self, socket1, socket2=None):
        self._fd1 = socket1.fd
        # -1 is passed to nn_device when there is no second socket.
        self._fd2 = -1 if socket2 is None else socket2.fd

    def start(self):
        """Run the device in the current thread.

        This will not return until the device stops due to error or
        termination.
        """
        _nn_check_positive_rtn(wrapper.nn_device(self._fd1, self._fd2))


def terminate_all():
    """Close all sockets and devices"""
    global nanoconfig_started
    if nanoconfig_started:
        wrapper.nc_term()
        nanoconfig_started = False
    wrapper.nn_term()


class Socket(object):
    """Class wrapping nanomsg socket.

    protocol should be a nanomsg protocol constant e.g. nanomsg.PAIR

    This class supports being used as a context manager which should guarantee
    it is closed.

    e.g.:
        import time
        from nanomsg import PUB, Socket

        with Socket(PUB) as pub_socket:
            pub_socket.bind('tcp://127.0.0.1:49234')
            for i in range(100):
                pub_socket.send(b'hello all')
                time.sleep(0.5)
        #pub_socket is closed

    Socket.bind and Socket.connect return subclass of Endpoint which allow
    you to shutdown selected endpoints.

    The constructor also allows you to wrap existing sockets by passing in the
    socket fd instead of the protocol e.g.:
        from nanomsg import AF_SP, PAIR, Socket
        from nanomsg import wrapper as nn

        socket_fd = nn.nn_socket(AF_SP, PAIR)
        socket = Socket(socket_fd=socket_fd)

    """

    # str() yields the native string type even though unicode_literals is
    # active in this module.
    _INT_PACKER = _Struct(str('i'))

    class _Endpoint(object):
        """An address attached to a Socket, identified by its endpoint id."""

        def __init__(self, socket, endpoint_id, address):
            self._endpoint_id = endpoint_id
            self._socket = socket
            self._address = address

        @property
        def address(self):
            """Address this endpoint was created with."""
            return self._address

        def shutdown(self):
            """Remove this endpoint from its socket and shut it down.

            Raises NanoMsgAPIError if the C API reports a failure.
            """
            self._socket._endpoints.remove(self)
            _nn_check_positive_rtn(
                wrapper.nn_shutdown(self._socket.fd, self._endpoint_id)
            )

        def __repr__(self):
            return '<%s socket %r, id %r, addresss %r>' % (
                self.__class__.__name__,
                self._socket,
                self._endpoint_id,
                self._address
            )

    class BindEndpoint(_Endpoint):
        pass

    class ConnectEndpoint(_Endpoint):
        pass

    class NanoconfigEndpoint(_Endpoint):

        def shutdown(self):
            raise NotImplementedError(
                "Shutdown of nanoconfig endpoint is not supported")

    def __init__(self, protocol=None, socket_fd=None, domain=AF_SP):
        # Initialise state first so close()/__del__ work even when the
        # constructor raises below.
        self._fd = -1
        self._endpoints = []
        if protocol is not None and socket_fd is not None:
            raise NanoMsgError('Only one of protocol or socket_fd should be '
                               'passed to the Socket constructor')
        if protocol is not None:
            self._fd = _nn_check_positive_rtn(
                wrapper.nn_socket(domain, protocol)
            )
        else:
            self._fd = socket_fd

    def _get_send_fd(self):
        return self.get_int_option(SOL_SOCKET, SNDFD)

    def _get_recv_fd(self):
        return self.get_int_option(SOL_SOCKET, RCVFD)

    def _get_linger(self):
        return self.get_int_option(SOL_SOCKET, LINGER)

    def _set_linger(self, value):
        return self.set_int_option(SOL_SOCKET, LINGER, value)

    def _get_send_buffer_size(self):
        return self.get_int_option(SOL_SOCKET, SNDBUF)

    def _set_send_buffer_size(self, value):
        return self.set_int_option(SOL_SOCKET, SNDBUF, value)

    def _get_recv_buffer_size(self):
        return self.get_int_option(SOL_SOCKET, RCVBUF)

    def _set_recv_buffer_size(self, value):
        return self.set_int_option(SOL_SOCKET, RCVBUF, value)

    def _get_recv_max_size(self):
        return self.get_int_option(SOL_SOCKET, RCVMAXSIZE)

    def _set_recv_max_size(self, value):
        return self.set_int_option(SOL_SOCKET, RCVMAXSIZE, value)

    def _get_send_timeout(self):
        return self.get_int_option(SOL_SOCKET, SNDTIMEO)

    def _set_send_timeout(self, value):
        return self.set_int_option(SOL_SOCKET, SNDTIMEO, value)

    def _get_recv_timeout(self):
        return self.get_int_option(SOL_SOCKET, RCVTIMEO)

    def _set_recv_timeout(self, value):
        return self.set_int_option(SOL_SOCKET, RCVTIMEO, value)

    def _get_reconnect_interval(self):
        return self.get_int_option(SOL_SOCKET, RECONNECT_IVL)

    def _set_reconnect_interval(self, value):
        return self.set_int_option(SOL_SOCKET, RECONNECT_IVL, value)

    def _get_reconnect_interval_max(self):
        return self.get_int_option(SOL_SOCKET, RECONNECT_IVL_MAX)

    def _set_reconnect_interval_max(self, value):
        return self.set_int_option(SOL_SOCKET, RECONNECT_IVL_MAX, value)

    send_fd = property(_get_send_fd, doc='Send file descripter')
    recv_fd = property(_get_recv_fd, doc='Receive file descripter')
    linger = property(_get_linger, _set_linger, doc='Socket linger in '
                      'milliseconds (0.001 seconds)')
    recv_buffer_size = property(_get_recv_buffer_size, _set_recv_buffer_size,
                                doc='Receive buffer size in bytes')
    recv_max_size = property(_get_recv_max_size, _set_recv_max_size,
                             doc='Maximum message size that can be received '
                                 'in bytes')
    # The setter here must be the SNDBUF one; it was previously wired to the
    # send-timeout setter, so assigning the buffer size changed SNDTIMEO.
    send_buffer_size = property(_get_send_buffer_size, _set_send_buffer_size,
                                doc='Send buffer size in bytes')
    send_timeout = property(_get_send_timeout, _set_send_timeout,
                            doc='Send timeout in milliseconds (0.001 seconds)')
    recv_timeout = property(_get_recv_timeout, _set_recv_timeout,
                            doc='Receive timeout in milliseconds (0.001 '
                            'seconds)')
    reconnect_interval = property(
        _get_reconnect_interval,
        _set_reconnect_interval,
        doc='Base interval between connection failure and reconnect'
        ' attempt in milliseconds (0.001 seconds).'
    )
    reconnect_interval_max = property(
        _get_reconnect_interval_max,
        _set_reconnect_interval_max,
        doc='Max reconnect interval - see C API documentation.'
    )

    @property
    def fd(self):
        """Socket file descriptor.

        Note this is not an OS file descriptor (see .send_fd, .recv_fd).
        """
        return self._fd

    @property
    def endpoints(self):
        """Endpoints list (a copy; mutating it does not affect the socket)

        """
        return list(self._endpoints)

    @property
    def uses_nanoconfig(self):
        """True if the first endpoint was created by Socket.configure."""
        return bool(self._endpoints and
                    isinstance(self._endpoints[0], Socket.NanoconfigEndpoint))

    def bind(self, address):
        """Add a local endpoint to the socket"""
        if self.uses_nanoconfig:
            raise ValueError("Nanoconfig address must be sole endpoint")
        endpoint_id = _nn_check_positive_rtn(
            wrapper.nn_bind(self._fd, address)
        )
        ep = Socket.BindEndpoint(self, endpoint_id, address)
        self._endpoints.append(ep)
        return ep

    def connect(self, address):
        """Add a remote endpoint to the socket"""
        if self.uses_nanoconfig:
            raise ValueError("Nanoconfig address must be sole endpoint")
        endpoint_id = _nn_check_positive_rtn(
            wrapper.nn_connect(self.fd, address)
        )
        ep = Socket.ConnectEndpoint(self, endpoint_id, address)
        self._endpoints.append(ep)
        return ep

    def configure(self, address):
        """Configure socket's addresses with nanoconfig"""
        global nanoconfig_started
        if self._endpoints:
            raise ValueError("Nanoconfig address must be sole endpoint")
        endpoint_id = _nn_check_positive_rtn(
            wrapper.nc_configure(self.fd, address)
        )
        if not nanoconfig_started:
            nanoconfig_started = True
        ep = Socket.NanoconfigEndpoint(self, endpoint_id, address)
        self._endpoints.append(ep)
        return ep

    def close(self):
        """Close the socket"""
        if self.is_open():
            # Mark the socket closed before calling into the wrapper so a
            # failing close is not retried (e.g. from __del__).
            fd = self._fd
            self._fd = -1
            if self.uses_nanoconfig:
                wrapper.nc_close(fd)
            else:
                _nn_check_positive_rtn(wrapper.nn_close(fd))

    def is_open(self):
        """Returns true if the socket has a valid socket id.

        If the underlying socket is closed by some other means than
        Socket.close this method may return True when the socket is actually
        closed.
        """
        fd = self.fd
        # fd is None when the socket was built with neither a protocol nor
        # a socket_fd; treat that as "not open" rather than comparing None.
        return fd is not None and fd >= 0

    def recv(self, buf=None, flags=0):
        """Receive a message.

        If buf is given it is passed through to wrapper.nn_recv. Returns
        the received bytes, truncated to the length reported by the wrapper.
        Raises NanoMsgAPIError on failure.
        """
        if buf is None:
            rtn, out_buf = wrapper.nn_recv(self.fd, flags)
        else:
            rtn, out_buf = wrapper.nn_recv(self.fd, buf, flags)
        _nn_check_positive_rtn(rtn)
        return bytes(buffer(out_buf))[:rtn]

    def set_string_option(self, level, option, value):
        """Set a socket option from a string/bytes value."""
        _nn_check_positive_rtn(wrapper.nn_setsockopt(self.fd, level, option,
                               value))

    def set_int_option(self, level, option, value):
        """Set a socket option from an int, packed as a native C int."""
        buf = create_writable_buffer(Socket._INT_PACKER.size)
        Socket._INT_PACKER.pack_into(buf, 0, value)
        _nn_check_positive_rtn(wrapper.nn_setsockopt(self.fd, level, option,
                                                     buf))

    def get_int_option(self, level, option):
        """Return a socket option unpacked as a native C int.

        Raises NanoMsgAPIError on failure and NanoMsgError if the returned
        option is not exactly the size of an int.
        """
        size = Socket._INT_PACKER.size
        buf = create_writable_buffer(size)
        rtn, length = wrapper.nn_getsockopt(self._fd, level, option, buf)
        _nn_check_positive_rtn(rtn)
        if length != size:
            # Report the returned length, not the (already checked) status.
            raise NanoMsgError(('Returned option size (%r) should be the same'
                                ' as size of int (%r)') % (length, size))
        return Socket._INT_PACKER.unpack_from(buffer(buf))[0]

    def get_string_option(self, level, option, max_len=16*1024):
        """Return a socket option as bytes, reading at most max_len bytes."""
        buf = create_writable_buffer(max_len)
        rtn, length = wrapper.nn_getsockopt(self._fd, level, option, buf)
        _nn_check_positive_rtn(rtn)
        return bytes(buffer(buf))[:length]

    def send(self, msg, flags=0):
        """Send a message"""
        _nn_check_positive_rtn(wrapper.nn_send(self.fd, msg, flags))

    def __enter__(self):
        return self

    def __exit__(self, *args):
        self.close()

    def __repr__(self):
        endpoints = self.endpoints
        # Order must match the format string: connected first, then bound.
        return '<%s fd %r, connected to %r, bound to %r>' % (
            self.__class__.__name__,
            self.fd,
            [i.address for i in endpoints
             if isinstance(i, Socket.ConnectEndpoint)],
            [i.address for i in endpoints
             if isinstance(i, Socket.BindEndpoint)],
        )

    def __del__(self):
        # Best-effort cleanup: API errors while closing during garbage
        # collection are deliberately ignored.
        try:
            self.close()
        except NanoMsgError:
            pass
Editor is loading...