API Documentation¶
ZeroMQ integration into Twisted reactor.
Factory¶
All ØMQ connections should belong to some context, txZMQ wraps that into concept of factory that tracks all connections created and wraps context.
Factory could be used as an easy way to close all connections and clean up Twisted reactor.
-
class
txzmq.
ZmqFactory
[source]¶ I control individual ZeroMQ connections.
Factory creates and destroys ZeroMQ context.
Variables: - reactor – reference to Twisted reactor used by all the connections
- ioThreads (int) – number of IO threads ZeroMQ will be using for this context
- lingerPeriod (int) – number of milliseconds to block when closing socket (terminating context), when there are some messages pending to be sent
- connections (set) – set of instanciated
ZmqConnection
- context – ZeroMQ context
Base Connection¶
ZmqConnection
isn’t supposed to be used explicitly, it is base
for different socket types.
-
class
txzmq.
ZmqEndpointType
[source]¶ Endpoint could be “bound” or “connected”.
-
bind
= 'bind'¶ Bind, listen for connection.
-
connect
= 'connect'¶ Connect to another endpoint.
-
-
class
txzmq.
ZmqEndpoint
[source]¶ ZeroMQ endpoint used when connecting or listening for connections.
Consists of two members: type and address.
Variables: - type – Could be either
ZmqEndpointType.bind
orZmqEndpointType.connect
. - address (str) – ZeroMQ address of endpoint, could be IP address, filename, see ZeroMQ docs for more details.
- type – Could be either
-
class
txzmq.
ZmqConnection
(factory, endpoint=None, identity=None)[source]¶ Connection through ZeroMQ, wraps up ZeroMQ socket.
This class isn’t supposed to be used directly, instead use one of the descendants like
ZmqPushConnection
.ZmqConnection
implements glue between ZeroMQ and Twisted reactor: putting polling ZeroMQ file descriptor into reactor, processing events, reading data from socket.Variables: - socketType – socket type, from ZeroMQ
- allowLoopbackMulticast (bool) – is loopback multicast allowed?
- multicastRate (int) – maximum allowed multicast rate, kbps
- highWaterMark (int) – hard limit on the maximum number of outstanding messages 0MQ shall queue in memory for any single peer
- tcpKeepalive (int) – if set to 1, enable TCP keepalive, otherwise leave it as default
- tcpKeepaliveCount (int) – override TCP_KEEPCNT socket option (where supported by OS)
- tcpKeepaliveIdle (int) – override TCP_KEEPCNT(or TCP_KEEPALIVE on some OS) socket option(where supported by OS).
- tcpKeepaliveInterval (int) – override TCP_KEEPINTVL socket option(where supported by OS)
- reconnectInterval (int) – set reconnection interval
- reconnectIntervalMax (int) – set maximum reconnection interval
- factory (
ZmqFactory
) – ZeroMQ Twisted factory reference - socket (zmq.Socket) – ZeroMQ Socket
- endpoints (list of
ZmqEndpoint
) – ZeroMQ addresses for connect/bind - fd (int) – file descriptor of zmq mailbox
- queue (deque) – output message queue
-
__init__
(self, factory, endpoint=None, identity=None)[source]¶ Constructor.
One endpoint is passed to the constructor, more could be added via call to
addEndpoints()
.Parameters: - factory (
ZmqFactory
) – ZeroMQ Twisted factory - endpoint (
ZmqEndpoint
) – ZeroMQ address for connect/bind - identity (str) – socket identity (ZeroMQ), don’t set unless you know how it works
- factory (
-
addEndpoints
(endpoints)[source]¶ Add more connection endpoints.
Connection may have many endpoints, mixing ZeroMQ protocols (TCP, IPC, ...) and types (connect or bind).
Parameters: endpoints (list of ZmqEndpoint
) – list of endpoints to add
-
fileno
()[source]¶ Implementation of IFileDescriptor.
Returns ZeroMQ polling file descriptor.
Returns: The platform-specified representation of a file descriptor number.
-
connectionLost
(reason)[source]¶ Called when the connection was lost.
Implementation of IFileDescriptor.
This is called when the connection on a selectable object has been lost. It will be called whether the connection was closed explicitly, an exception occurred in an event handler, or the other end of the connection closed it first.
-
doRead
()[source]¶ Some data is available for reading on ZeroMQ descriptor.
ZeroMQ is signalling that we should process some events, we’re starting to receive incoming messages.
Implementation of IReadDescriptor.
-
logPrefix
()[source]¶ Implementation of ILoggingContext.
Returns: Prefix used during log formatting to indicate context. Return type: str
-
send
(message)[source]¶ Send message via ZeroMQ socket.
Sending is performed directly to ZeroMQ without queueing. If HWM is reached on ZeroMQ side, sending operation is aborted with exception from ZeroMQ (EAGAIN).
After writing read is scheduled as ZeroMQ may not signal incoming messages after we touched socket with write request.
Parameters: message (str or list of str) – message data, could be either list of str (multipart message) or just str
-
messageReceived
(message)[source]¶ Called when complete message is received.
Not implemented in
ZmqConnection
, should be overridden to handle incoming messages.Parameters: message – message data
Publish-Subscribe¶
For information on publish-subscribe in ØMQ, please read either reference or guide (look for publish-subscribe).
Note
These classes use PUB and SUB sockets from ØMQ. Special framing is implemented to support sending tag: tag and message are separated by zero byte and sent over as single message. This is related to the way PUB-SUB works with PGM (UDP multicast): multipart messages are sent as multiple datagrams and they get mixed together if several publishers exist in the same broadcast domain.
-
class
txzmq.
ZmqPubConnection
(factory, endpoint=None, identity=None)[source]¶ Bases:
txzmq.connection.ZmqConnection
Publishing in broadcast manner.
-
class
txzmq.
ZmqSubConnection
(factory, endpoint=None, identity=None)[source]¶ Bases:
txzmq.connection.ZmqConnection
Subscribing to messages published by publishers.
Subclass this class and implement
gotMessage()
to handle incoming messages.-
subscribe
(tag)[source]¶ Subscribe to messages with specified tag (prefix).
Function may be called several times.
Parameters: tag (str) – message tag
-
unsubscribe
(tag)[source]¶ Unsubscribe from messages with specified tag (prefix).
Function may be called several times.
Parameters: tag (str) – message tag
-
messageReceived
(message)[source]¶ Overridden from
ZmqConnection
to process and unframe incoming messages.All parsed messages are passed to
gotMessage()
.Parameters: message – message data
-
Push-Pull¶
For information on push and pull sockets in ØMQ, please read either reference or guide (look for pull or push).
-
class
txzmq.
ZmqPushConnection
(factory, endpoint=None, identity=None)[source]¶ Bases:
txzmq.connection.ZmqConnection
Pushing messages to the socket.
Wrapper around ZeroMQ PUSH socket.
-
class
txzmq.
ZmqPullConnection
(factory, endpoint=None, identity=None)[source]¶ Bases:
txzmq.connection.ZmqConnection
Pull messages from a socket.
Wrapper around ZeroMQ PULL socket.
Subclass and override
onPull()
.
Request-Reply and Router-Dealer¶
For information on these socket types in ØMQ, please read either reference or guide (look for router/dealer and request/reply).
-
class
txzmq.
ZmqREQConnection
(*args, **kwargs)[source]¶ Bases:
txzmq.connection.ZmqConnection
A Request ZeroMQ connection.
This is implemented with an underlying DEALER socket, even though semantics are closer to REQ socket.
Socket mimics request-reply behavior by sending each message with unique uuid and recording Deferred associated with the message. When reply comes, it uses that Deferred to pass response back to the caller.
Variables: defaultRequestTimeout – default timeout for requests, disabled by default (seconds)
-
class
txzmq.
ZmqREPConnection
(*args, **kwargs)[source]¶ Bases:
txzmq.connection.ZmqConnection
A Reply ZeroMQ connection.
This is implemented with an underlying ROUTER socket, but the semantics are close to REP socket.
-
reply
(messageId, *messageParts)[source]¶ Send reply to request with specified
messageId
.Parameters: - messageId (str) – message uuid
- messageParts (list) – message data
-