API Documentation

ZeroMQ integration into Twisted reactor.


All ØMQ connections should belong to some context, txZMQ wraps that into concept of factory that tracks all connections created and wraps context.

Factory could be used as an easy way to close all connections and clean up Twisted reactor.

class txzmq.ZmqFactory[source]

I control individual ZeroMQ connections.

Factory creates and destroys ZeroMQ context.

  • reactor – reference to Twisted reactor used by all the connections
  • ioThreads (int) – number of IO threads ZeroMQ will be using for this context
  • lingerPeriod (int) – number of milliseconds to block when closing socket (terminating context), when there are some messages pending to be sent
  • connections (set) – set of instanciated ZmqConnection
  • context – ZeroMQ context


Create ZeroMQ context.


Shutdown factory.

This is shutting down all created connections and terminating ZeroMQ context. Also cleans up Twisted reactor.


Register factory to be automatically shut down on reactor shutdown.

It is recommended that this method is called on any created factory.

Base Connection

ZmqConnection isn’t supposed to be used explicitly, it is base for different socket types.

class txzmq.ZmqEndpointType[source]

Endpoint could be “bound” or “connected”.

bind = 'bind'

Bind, listen for connection.

connect = 'connect'

Connect to another endpoint.

class txzmq.ZmqEndpoint[source]

ZeroMQ endpoint used when connecting or listening for connections.

Consists of two members: type and address.

class txzmq.ZmqConnection(factory, endpoint=None, identity=None)[source]

Connection through ZeroMQ, wraps up ZeroMQ socket.

This class isn’t supposed to be used directly, instead use one of the descendants like ZmqPushConnection.

ZmqConnection implements glue between ZeroMQ and Twisted reactor: putting polling ZeroMQ file descriptor into reactor, processing events, reading data from socket.

  • socketType – socket type, from ZeroMQ
  • allowLoopbackMulticast (bool) – is loopback multicast allowed?
  • multicastRate (int) – maximum allowed multicast rate, kbps
  • highWaterMark (int) – hard limit on the maximum number of outstanding messages 0MQ shall queue in memory for any single peer
  • tcpKeepalive (int) – if set to 1, enable TCP keepalive, otherwise leave it as default
  • tcpKeepaliveCount (int) – override TCP_KEEPCNT socket option (where supported by OS)
  • tcpKeepaliveIdle (int) – override TCP_KEEPCNT(or TCP_KEEPALIVE on some OS) socket option(where supported by OS).
  • tcpKeepaliveInterval (int) – override TCP_KEEPINTVL socket option(where supported by OS)
  • reconnectInterval (int) – set reconnection interval
  • reconnectIntervalMax (int) – set maximum reconnection interval
  • factory (ZmqFactory) – ZeroMQ Twisted factory reference
  • socket (zmq.Socket) – ZeroMQ Socket
  • endpoints (list of ZmqEndpoint) – ZeroMQ addresses for connect/bind
  • fd (int) – file descriptor of zmq mailbox
  • queue (deque) – output message queue
__init__(self, factory, endpoint=None, identity=None)[source]


One endpoint is passed to the constructor, more could be added via call to addEndpoints().

  • factory (ZmqFactory) – ZeroMQ Twisted factory
  • endpoint (ZmqEndpoint) – ZeroMQ address for connect/bind
  • identity (str) – socket identity (ZeroMQ), don’t set unless you know how it works

Add more connection endpoints.

Connection may have many endpoints, mixing ZeroMQ protocols (TCP, IPC, ...) and types (connect or bind).

Parameters:endpoints (list of ZmqEndpoint) – list of endpoints to add

Shutdown (close) connection and ZeroMQ socket.


Implementation of IFileDescriptor.

Returns ZeroMQ polling file descriptor.

Returns:The platform-specified representation of a file descriptor number.

Called when the connection was lost.

Implementation of IFileDescriptor.

This is called when the connection on a selectable object has been lost. It will be called whether the connection was closed explicitly, an exception occurred in an event handler, or the other end of the connection closed it first.


Some data is available for reading on ZeroMQ descriptor.

ZeroMQ is signalling that we should process some events, we’re starting to receive incoming messages.

Implementation of IReadDescriptor.


Implementation of ILoggingContext.

Returns:Prefix used during log formatting to indicate context.
Return type:str

Send message via ZeroMQ socket.

Sending is performed directly to ZeroMQ without queueing. If HWM is reached on ZeroMQ side, sending operation is aborted with exception from ZeroMQ (EAGAIN).

After writing read is scheduled as ZeroMQ may not signal incoming messages after we touched socket with write request.

Parameters:message (str or list of str) – message data, could be either list of str (multipart message) or just str

Called when complete message is received.

Not implemented in ZmqConnection, should be overridden to handle incoming messages.

Parameters:message – message data


For information on publish-subscribe in ØMQ, please read either reference or guide (look for publish-subscribe).


These classes use PUB and SUB sockets from ØMQ. Special framing is implemented to support sending tag: tag and message are separated by zero byte and sent over as single message. This is related to the way PUB-SUB works with PGM (UDP multicast): multipart messages are sent as multiple datagrams and they get mixed together if several publishers exist in the same broadcast domain.

class txzmq.ZmqPubConnection(factory, endpoint=None, identity=None)[source]

Bases: txzmq.connection.ZmqConnection

Publishing in broadcast manner.

publish(message, tag='')[source]

Publish message with specified tag.

  • message (str) – message data
  • tag (str) – message tag
class txzmq.ZmqSubConnection(factory, endpoint=None, identity=None)[source]

Bases: txzmq.connection.ZmqConnection

Subscribing to messages published by publishers.

Subclass this class and implement gotMessage() to handle incoming messages.


Subscribe to messages with specified tag (prefix).

Function may be called several times.

Parameters:tag (str) – message tag

Unsubscribe from messages with specified tag (prefix).

Function may be called several times.

Parameters:tag (str) – message tag

Overridden from ZmqConnection to process and unframe incoming messages.

All parsed messages are passed to gotMessage().

Parameters:message – message data
gotMessage(message, tag)[source]

Called on incoming message recevied by subscriber.

Should be overridden to handle incoming messages.

  • message – message data
  • tag – message tag


For information on push and pull sockets in ØMQ, please read either reference or guide (look for pull or push).

class txzmq.ZmqPushConnection(factory, endpoint=None, identity=None)[source]

Bases: txzmq.connection.ZmqConnection

Pushing messages to the socket.

Wrapper around ZeroMQ PUSH socket.


Push a message L{message}.

Parameters:message (str) – message data
class txzmq.ZmqPullConnection(factory, endpoint=None, identity=None)[source]

Bases: txzmq.connection.ZmqConnection

Pull messages from a socket.

Wrapper around ZeroMQ PULL socket.

Subclass and override onPull().


Called on incoming message from ZeroMQ.

Parameters:message – message data

Called on incoming message received by puller.

Parameters:message – message data

Request-Reply and Router-Dealer

For information on these socket types in ØMQ, please read either reference or guide (look for router/dealer and request/reply).

class txzmq.ZmqREQConnection(*args, **kwargs)[source]

Bases: txzmq.connection.ZmqConnection

A Request ZeroMQ connection.

This is implemented with an underlying DEALER socket, even though semantics are closer to REQ socket.

Socket mimics request-reply behavior by sending each message with unique uuid and recording Deferred associated with the message. When reply comes, it uses that Deferred to pass response back to the caller.

Variables:defaultRequestTimeout – default timeout for requests, disabled by default (seconds)
sendMsg(*messageParts, **kwargs)[source]

Send request and deliver response back when available.

  • messageParts (tuple) – message data
  • timeout (float) – as keyword argument, timeout on request

Deferred that will fire when response comes back


Called on incoming message from ZeroMQ.

Dispatches message to back to the requestor.

Parameters:message – message data
class txzmq.ZmqREPConnection(*args, **kwargs)[source]

Bases: txzmq.connection.ZmqConnection

A Reply ZeroMQ connection.

This is implemented with an underlying ROUTER socket, but the semantics are close to REP socket.

reply(messageId, *messageParts)[source]

Send reply to request with specified messageId.

  • messageId (str) – message uuid
  • messageParts (list) – message data

Called on incoming message from ZeroMQ.

Parameters:message – message data
gotMessage(messageId, *messageParts)[source]

Called on incoming request.

Override this method in subclass and reply using reply() using the same messageId.

  • messageId (str) – message uuid
  • messageParts – message data
class txzmq.ZmqRouterConnection(factory, endpoint=None, identity=None)[source]

Bases: txzmq.router_dealer.ZmqBase

Raw ZeroMQ ROUTER connection.

class txzmq.ZmqDealerConnection(factory, endpoint=None, identity=None)[source]

Bases: txzmq.router_dealer.ZmqBase

Raw ZeroMQ DEALER connection.