Ductworks Message Ducts

This page provides examples of using the ductworks.message_duct module, followed by API documentation for its public objects.

Examples

Creating a Local Anonymous Duct Pair

This example creates a connected duct pair that is a drop-in, interface-compatible replacement for the multiprocessing.Connection objects returned by multiprocessing.Pipe.

from ductworks.message_duct import create_psuedo_anonymous_duct_pair

# Create a new duct pair, much as you would create a Connection pair with multiprocessing.Pipe()
parent_duct, child_duct = create_psuedo_anonymous_duct_pair()

# Send the message across the wire
parent_duct.send("hello!")

# Wait for a message to come across
assert child_duct.poll(1)
received = child_duct.recv()

# Check to ensure we've received the right data and send it back
assert received == "hello!"
child_duct.send(received)

# Wait for parent to get data back
assert parent_duct.poll(1)
received = parent_duct.recv()

# Ensure that the bounced data is correct
assert received == "hello!"
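
Because the duct pair follows the send()/poll()/recv() interface of multiprocessing.Connection, helper code written against that interface should not need to know which kind of object it receives. The sketch below illustrates this with a hypothetical helper, echo_once, and is exercised only with multiprocessing.Pipe(True); handing it a duct pair instead relies on the interface compatibility this page describes.

```python
from multiprocessing import Pipe


def echo_once(sender, receiver, payload, timeout=1):
    """Send payload one way, bounce it back, and return what comes home.

    Only send(), poll() and recv() are called, so any pair of objects with
    the multiprocessing.Connection interface can be passed in.
    """
    sender.send(payload)
    if not receiver.poll(timeout):
        raise TimeoutError("no message arrived within %s seconds" % timeout)
    receiver.send(receiver.recv())
    if not sender.poll(timeout):
        raise TimeoutError("no reply arrived within %s seconds" % timeout)
    return sender.recv()


# Exercised here with a plain multiprocessing.Pipe(True) duplex pair.
parent_conn, child_conn = Pipe(True)
result = echo_once(parent_conn, child_conn, {"greeting": "hello!"})
print(result)  # {'greeting': 'hello!'}
parent_conn.close()
child_conn.close()
```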

Creating an Explicit TCP Duct Pair

In the situations Ductworks usually targets, you will want to instantiate each duct individually. For example, to open a duct over TCP (for instance, to reach a remote system), you can do the following:

from __future__ import print_function
from ductworks.message_duct import MessageDuctParent, MessageDuctChild

# Create an anonymous TCP parent duct
# This is by default bound to the first available open port on the
# "localhost" interface; to talk to a remote system, specify the address
# of the interface to listen on (preferred) or "0.0.0.0" (all interfaces,
# which exposes the duct more widely than may be intended)
parent_duct = MessageDuctParent.psuedo_anonymous_tcp_parent_duct()

# Bind parent duct to begin listening for incoming connections
parent_duct.bind()

# Get the interface address and port from the parent_duct
# Note that this may be different from the specified bind address
# and that the proper address to connect to should usually be extracted
# this way.
parent_interface_address, parent_port = parent_duct.listener_address

# At some point later, the child is given the parent's address and port
# and begins to connect
child_duct = MessageDuctChild.psuedo_anonymous_tcp_child_duct(
    parent_interface_address, parent_port
)
child_duct.connect()

# While the child is connecting, the parent keeps listening until
# the child is connected...
while not parent_duct.listen(1):
    # Check at 1-second intervals, indefinitely, for a child to connect.
    pass

# The parent and child may now exchange data
parent_duct.send("test")

assert child_duct.poll(1)
assert child_duct.recv() == "test"

# After communication is finished, either end can be closed explicitly.
child_duct.close()

# At this point, any attempt to receive data on the parent will raise
# RemoteDuctClosed, which inherits from both EOFError and the Ductworks
# exception base, keeping this a drop-in replacement for multiprocessing.Pipe

try:
    parent_duct.recv()
except EOFError:
    print("parent_duct.recv() triggered an EOFError! Other side must be closed!")
else:
    # Should never get here: the child end has already been closed.
    raise AssertionError("expected EOFError after the child closed")

# Clean up the parent end as well.
parent_duct.close()

Message Duct Objects

Parent Message Duct

class ductworks.message_duct.MessageDuctParent(socket_duct, serialize=<function serialize>, deserialize=<function deserialize>, lock=None)

The MessageDuctParent is an abstraction over the SocketDuctParent and provides an interface compatible with Python’s multiprocessing.Connection (created by multiprocessing.Pipe). Like a multiprocessing Connection, a message duct can send native Python data structures over the wire, and it does so without the developer having to work out how to carry arbitrarily sized messages over the underlying stream-based connection. For the end user, this means a single send() is all that is needed to send data over the connection, and a single recv() on the other side produces the corresponding data structure without any additional work.
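
As a rough illustration of the problem being hidden: a stream socket delivers bytes, not messages, so one recv() may return part of a message or parts of two. A common remedy is to prefix each serialized message with its length. The following is a minimal standard-library sketch of that technique, using the JSON default described here; the 4-byte header and the send_message, recv_message, and recv_exactly helpers are assumptions for illustration, not necessarily the wire format ductworks actually uses.

```python
import json
import socket
import struct

# 4-byte big-endian unsigned length header. This layout is an assumption
# for illustration; it is not ductworks' documented wire format.
HEADER = struct.Struct("!I")


def send_message(sock, obj):
    """Serialize obj to UTF-8 JSON and write it as one length-prefixed frame."""
    body = json.dumps(obj).encode("utf-8")
    sock.sendall(HEADER.pack(len(body)) + body)


def recv_exactly(sock, size):
    """Read exactly size bytes, looping because recv() may return fewer."""
    chunks = []
    remaining = size
    while remaining:
        chunk = sock.recv(remaining)
        if not chunk:
            raise EOFError("peer closed the connection mid-message")
        chunks.append(chunk)
        remaining -= len(chunk)
    return b"".join(chunks)


def recv_message(sock):
    """Read one frame and deserialize it back into a Python object."""
    (length,) = HEADER.unpack(recv_exactly(sock, HEADER.size))
    return json.loads(recv_exactly(sock, length).decode("utf-8"))


left, right = socket.socketpair()
send_message(left, {"numbers": list(range(5))})
send_message(left, "second message")
print(recv_message(right))  # {'numbers': [0, 1, 2, 3, 4]}
print(recv_message(right))  # second message
left.close()
right.close()
```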

Unlike multiprocessing.Connection, MessageDuctParent supports several additional features. First, while a reasonable default is chosen for serialization and deserialization (UTF-8 encoded JSON), the user may supply their own serialization and deserialization routines. These can include faster or more Python-centric libraries such as marshal, pickle, or dill, or general-purpose high-performance libraries such as msgpack. Other serialization libraries, such as Cap’n Proto or messagebuffers, could conceivably be bolted in as well, though they are not a direct fit because they follow a different paradigm.
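
As an example of swapping in a different serializer, the pair below wraps pickle. Going by the documented signatures of serializer_with_encoder_constructor and deserializer_with_decoder_constructor, a serializer maps an object to bytes and a deserializer maps bytes back to an object; pickle_serialize and pickle_deserialize are hypothetical names chosen for this sketch.

```python
import pickle


def pickle_serialize(obj):
    """Turn any picklable object into bytes (object -> bytes)."""
    return pickle.dumps(obj, protocol=pickle.HIGHEST_PROTOCOL)


def pickle_deserialize(data):
    """Rebuild an object from bytes made by pickle_serialize (bytes -> object)."""
    return pickle.loads(data)


# Unlike the JSON default, pickle round-trips tuples, sets and bytes intact.
payload = {"point": (1, 2), "tags": {"a", "b"}, "raw": b"\x00\x01"}
restored = pickle_deserialize(pickle_serialize(payload))
print(restored == payload)  # True
```

These could then be passed through the serialize and deserialize keyword arguments, for example create_psuedo_anonymous_duct_pair(serialize=pickle_serialize, deserialize=pickle_deserialize); that call is not exercised here. As with multiprocessing, only unpickle data received from peers you trust.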

Additionally, the message ducts allow an arbitrary lock object to be used when communicating; the sole requirement is that it obeys the same semantics as Python’s built-in lock types. This can range from a simple thread lock (threading.Lock), which eliminates race conditions when a duct is shared between competing threads, to multiprocessing.Lock, a wrapper around flock(2), or a named POSIX lock library, which synchronize access between local processes. It is even conceivable to bolt in a distributed lock system such as etcd.
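
To show what the lock buys you when one duct end is shared between threads, the sketch below guards each length-prefixed write with a threading.Lock so that frames from different threads cannot interleave. LockedWriter, read_frames, and the 4-byte length prefix are hypothetical stand-ins, not ductworks classes or its wire format; the real ducts accept the lock through their lock parameter and apply it to send/recv calls.

```python
import socket
import struct
import threading

# Assumed 4-byte big-endian length prefix, for illustration only.
HEADER = struct.Struct("!I")


class LockedWriter(object):
    """Hypothetical stand-in for a duct end whose sends are guarded by a lock.

    Any object supporting acquire()/release() and the with statement will do,
    e.g. threading.Lock or multiprocessing.Lock.
    """

    def __init__(self, sock, lock):
        self.sock = sock
        self.lock = lock

    def send(self, payload):
        frame = HEADER.pack(len(payload)) + payload
        # Holding the lock for the whole write means frames written by
        # different threads can never be interleaved on the shared socket.
        with self.lock:
            self.sock.sendall(frame)


def read_frames(sock, count):
    """Read length-prefixed frames from sock until count have arrived."""
    buf = b""
    frames = []
    while len(frames) < count:
        chunk = sock.recv(4096)
        if not chunk:
            raise EOFError("writer closed before all frames arrived")
        buf += chunk
        # Peel off every complete frame currently in the buffer.
        while len(buf) >= HEADER.size:
            (length,) = HEADER.unpack(buf[:HEADER.size])
            if len(buf) < HEADER.size + length:
                break
            frames.append(buf[HEADER.size:HEADER.size + length])
            buf = buf[HEADER.size + length:]
    return frames


left, right = socket.socketpair()
writer = LockedWriter(left, threading.Lock())


def worker(n):
    # Each thread sends 50 small messages through the shared writer.
    for _ in range(50):
        writer.send(("thread-%d" % n).encode("ascii"))


threads = [threading.Thread(target=worker, args=(n,)) for n in range(4)]
for t in threads:
    t.start()
for t in threads:
    t.join()

# 200 frames of 12 bytes each fit comfortably in the socket buffer, so
# reading after the writers finish does not deadlock.
frames = read_frames(right, 200)
print(len(frames))  # 200
left.close()
right.close()
```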

Finally, the message duct retains the full flexibility of the underlying socket duct in choosing the connection constructor and destructor. As a result, both TCP and stream-oriented Unix Domain Sockets are provided and supported, and other networking libraries such as nanomsg or ZeroMQ could be used with minimal extra work if needed. This lets the library offer simple pipe-like semantics even when the two ends are not children of the same parent process, or are not even located on the same physical system.

This side of the message duct must listen and wait for a MessageDuctChild to connect in order to begin communication.

bind()

Bind the underlying socket duct.

Returns: None
Return type: NoneType

bind_address

Access the address the user specified to bind to. This may not be the actual address the parent is (or was) listening on.

Returns: The user-specified bind address
Return type: (str, int) | str

close()

Close the underlying socket duct.

Returns: None
Return type: NoneType

fileno()

Get the file descriptor of the connection socket in the socket duct. This is useful for integrating the duct into other event loops.

Returns: The connection file descriptor.
Return type: int
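
A sketch of such integration using the standard selectors module: a socketpair stands in for the duct's connection socket, and with ductworks the descriptor passed to register() would instead be the value returned by duct.fileno(). Readiness only means that some bytes have arrived, not necessarily a complete message.

```python
import selectors
import socket

# Stand-in for a duct's connection socket; with ductworks the descriptor
# would come from duct.fileno() instead.
left, right = socket.socketpair()

selector = selectors.DefaultSelector()
# register() accepts a socket object or a raw integer file descriptor.
selector.register(right.fileno(), selectors.EVENT_READ, data="duct")

left.sendall(b"ping")

ready = []
for key, events in selector.select(timeout=1):
    # key.data is whatever tag was attached at registration time.
    ready.append(key.data)
print(ready)  # ['duct']

selector.unregister(right.fileno())
selector.close()
left.close()
right.close()
```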

listen()

Listen on the underlying socket duct.

Returns: None
Return type: NoneType

listener_address

Access the actual address the parent socket is (or was) listening on.

Returns: The address the parent is actually listening on
Return type: (str, int) | str
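
The gap between bind_address and listener_address mirrors ordinary socket behaviour. The standard-library sketch below shows two plausible reasons the values can differ (an assumption about the cause, not a description of ductworks internals): the host name is resolved to an IP address, and a requested port of 0 is replaced by whichever free port the OS picks, which only getsockname() reveals.

```python
import socket

requested = ("localhost", 0)  # port 0 asks the OS to pick any free port

listener = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
listener.bind(requested)
listener.listen(1)

# getsockname() reports where the socket is really bound: the host name has
# been resolved to an IP address and port 0 replaced by the chosen port.
actual = listener.getsockname()
print(requested, "->", actual)  # e.g. ('localhost', 0) -> ('127.0.0.1', 54321)
listener.close()
```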

poll(timeout=60)

Poll the underlying socket duct to check for new messages.

Parameters: timeout (int) – The amount of time, in seconds, to wait for a new message if none is present. Default: 60 seconds.
Returns: True if a message is waiting, False otherwise.
Return type: bool
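
Timeout-based polling of this kind is commonly built on select(). The sketch below, a hypothetical poll helper rather than the library's code, waits at most timeout seconds and reports whether anything is waiting without consuming it. It checks raw byte readiness, whereas the duct's poll() is documented in terms of messages.

```python
import select
import socket


def poll(sock, timeout=60):
    """Return True if sock has data to read within timeout seconds, else False.

    Hypothetical helper: it only checks readiness and consumes nothing.
    """
    readable, _, _ = select.select([sock], [], [], timeout)
    return bool(readable)


left, right = socket.socketpair()
print(poll(right, 0.1))  # False: nothing has been sent yet
left.sendall(b"data")
print(poll(right, 0.1))  # True
left.close()
right.close()
```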

classmethod psuedo_anonymous_parent_duct(bind_address=None, serialize=<function serialize>, deserialize=<function deserialize>, lock=None, timeout=30)

Create a new pseudo-anonymous parent message duct with Unix Domain sockets.

Parameters:
  • bind_address (basestring | None) – The (filesystem) address to listen on, if any. If None, a new random address will be chosen. Default: None.
  • serialize – The serialization function for sending messages. Default: Encoded JSON.
  • deserialize – The deserialization function for receiving messages. Default: Encoded JSON.
  • lock – A lock object to lock send/recv calls.
  • timeout (int | float) – The number of seconds to block a send/recv call waiting for completion.
Returns: A new MessageDuctParent.
Return type: ductworks.message_duct.MessageDuctParent
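
On POSIX systems, choosing a random filesystem address and connecting to it can look roughly like the following standard-library sketch. random_socket_path is hypothetical and not necessarily the naming scheme ductworks uses; the child connects to whatever address the parent reports, as psuedo_anonymous_child_duct expects when given parent_duct.listener_address.

```python
import os
import socket
import tempfile
import uuid


def random_socket_path():
    """Pick a fresh filesystem address inside a new private temp directory.

    Hypothetical helper: not necessarily the naming scheme ductworks uses.
    """
    directory = tempfile.mkdtemp()
    # Keep the name short: Unix socket paths are limited to roughly 100 bytes.
    return os.path.join(directory, "duct-%s.sock" % uuid.uuid4().hex[:12])


path = random_socket_path()

# Parent side: bind to the filesystem address and start listening.
server = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
server.bind(path)
server.listen(1)

# Child side: connect to the address the parent reports via getsockname().
client = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
client.connect(server.getsockname())
conn, _ = server.accept()

client.sendall(b"hi")
received = conn.recv(2)
print(received)  # b'hi'

for sock in (client, conn, server):
    sock.close()
# Closing does not delete the socket file, so remove it and its directory.
os.unlink(path)
os.rmdir(os.path.dirname(path))
```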

classmethod psuedo_anonymous_tcp_parent_duct(bind_address='localhost', bind_port=0, serialize=<function serialize>, deserialize=<function deserialize>, lock=None, timeout=30)

Create a new pseudo-anonymous parent message duct with TCP sockets.

Parameters:
  • bind_address (basestring) – The interface address to listen on. Default: ‘localhost’.
  • bind_port (int) – The port to bind the interface to. If 0, a random open port is chosen. Default: 0.
  • serialize – The serialization function for sending messages. Default: Encoded JSON.
  • deserialize – The deserialization function for receiving messages. Default: Encoded JSON.
  • lock – A lock object to lock send/recv calls.
  • timeout (int | float) – The number of seconds to block a send/recv call waiting for completion.
Returns: A new MessageDuctParent.
Return type: ductworks.message_duct.MessageDuctParent

recv()

Receive a payload from the other end, if connected and data is present.

Returns: A deserialized Python object from the other end of the duct.

send(payload)

Send a payload to the other end, if connected.

Parameters: payload – A serializable Python object to send to the other duct.
Returns: None
Return type: NoneType

Child Message Duct

class ductworks.message_duct.MessageDuctChild(socket_duct, serialize=<function serialize>, deserialize=<function deserialize>, lock=None)

The MessageDuctChild is an abstraction over the SocketDuctChild and provides an interface compatible with Python’s multiprocessing.Connection (created by multiprocessing.Pipe).

This side must connect to a listening MessageDuctParent in order to begin communication.

close()

Close the underlying socket duct.

Returns: None
Return type: NoneType

connect()

Call connect() on the underlying socket duct.

Returns: None
Return type: NoneType

fileno()

Get the file descriptor of the connection socket in the socket duct. This is useful for integrating the duct into other event loops.

Returns: The connection file descriptor.
Return type: int

poll(timeout=60)

Poll the underlying socket duct to check for new messages.

Parameters: timeout (int) – The amount of time, in seconds, to wait for a new message if none is present. Default: 60 seconds.
Returns: True if a message is waiting, False otherwise.
Return type: bool

classmethod psuedo_anonymous_child_duct(connect_address, serialize=<function serialize>, deserialize=<function deserialize>, lock=None, timeout=30)

Create a new pseudo-anonymous child message duct with Unix Domain sockets. The connect_address parameter should be taken from the parent duct's listener_address property.

Parameters:
  • connect_address (basestring) – The filesystem address to connect to. Get this from parent_duct.listener_address.
  • serialize – The serialization function for sending messages. Default: Encoded JSON.
  • deserialize – The deserialization function for receiving messages. Default: Encoded JSON.
  • lock – A lock object to lock send/recv calls.
  • timeout (int | float) – The number of seconds to block a send/recv call waiting for completion.
Returns: A new MessageDuctChild.
Return type: ductworks.message_duct.MessageDuctChild

classmethod psuedo_anonymous_tcp_child_duct(connect_address, connect_port, serialize=<function serialize>, deserialize=<function deserialize>, lock=None, timeout=30)

Create a new pseudo-anonymous child message duct with TCP sockets. The connect_address and connect_port parameters should be taken from the parent duct's listener_address property.

Parameters:
  • connect_address (basestring) – The interface address to connect to. Get this from parent_duct.listener_address[0].
  • connect_port (int) – The TCP port to connect to. Get this from parent_duct.listener_address[1].
  • serialize – The serialization function for sending messages. Default: Encoded JSON.
  • deserialize – The deserialization function for receiving messages. Default: Encoded JSON.
  • lock – A lock object to lock send/recv calls.
  • timeout (int | float) – The number of seconds to block a send/recv call waiting for completion.
Returns: A new MessageDuctChild.
Return type: ductworks.message_duct.MessageDuctChild

recv()

Receive a payload from the other end, if connected and data is present.

Returns: A deserialized Python object from the other end of the duct.

send(payload)

Send a payload to the other end, if connected.

Parameters: payload – A serializable Python object to send to the other duct.
Returns: None
Return type: NoneType

Supporting Functions and Datastructures

ductworks.message_duct.create_psuedo_anonymous_duct_pair(serialize=<function serialize>, deserialize=<function deserialize>, parent_lock=None, child_lock=None)

Create an already-connected pair of anonymous ducts. This works much like multiprocessing.Pipe(True).

Parameters:
  • serialize – The serializer function for the pair. Defaults to encoded JSON.
  • deserialize – The deserializer function for the pair. Defaults to encoded JSON.
  • parent_lock – An optional lock object to give to the “parent” duct.
  • child_lock – An optional lock object to give to the “child” duct.
Returns: A parent/child pair of ducts.
Return type: (ductworks.message_duct.MessageDuctParent, ductworks.message_duct.MessageDuctChild)

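exception ductworks.message_duct.MessageProtocolException

exception ductworks.message_duct.RemoteDuctClosed

Raised when the other end of the duct has been closed. It inherits from EOFError, so code that catches EOFError when using multiprocessing.Connection handles it as well.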
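
The TCP example above notes that RemoteDuctClosed inherits from EOFError as well as from a Ductworks exception class. The sketch below re-creates that arrangement to show why code written for multiprocessing.Connection keeps working: an except EOFError clause also catches RemoteDuctClosed. DuctworksError is a hypothetical stand-in for the library's base exception, and this RemoteDuctClosed is an illustrative re-creation, not the real class.

```python
class DuctworksError(Exception):
    """Hypothetical stand-in for the library's base exception class."""


class RemoteDuctClosed(DuctworksError, EOFError):
    """Illustrative re-creation: raised when the far end has closed."""


def read_or_none(recv):
    """Call recv(); treat a closed peer as 'no more data', as Pipe users do."""
    try:
        return recv()
    except EOFError:
        # Also catches RemoteDuctClosed, because it subclasses EOFError.
        return None


def closed_peer():
    raise RemoteDuctClosed("remote end closed")


print(read_or_none(closed_peer))  # None
print(issubclass(RemoteDuctClosed, DuctworksError))  # True
```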

ductworks.message_duct.serializer_with_encoder_constructor(serialization_func, encoder_type='utf-8', encoder_error_mode='strict')

Wrap a serialization function with string encoding. This is important for JSON, which serializes objects into strings (potentially unicode), NOT bytestreams, so an extra encoding step is needed to get to a bytestream.

Parameters:
  • serialization_func – The base serialization function.
  • encoder_type – The encoder type. Default: ‘utf-8’.
  • encoder_error_mode – The encode error mode. Default: ‘strict’.
Returns: The serializer function wrapped with the specified encoder.
Return type: T -> bytes | bytearray | str
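
A minimal sketch of what such a wrapper does, under the assumption that it simply composes the serializer with str.encode; encoder_wrapped is a hypothetical re-creation that mirrors the documented parameters, not the library's source.

```python
import functools
import json


def encoder_wrapped(serialization_func, encoder_type="utf-8", encoder_error_mode="strict"):
    """Hypothetical re-creation: run the serializer, then encode its str output."""
    def serialize(obj):
        return serialization_func(obj).encode(encoder_type, encoder_error_mode)
    return serialize


# ensure_ascii=False makes json.dumps emit a real non-ASCII character, so the
# effect of the encoding step is visible in the output.
to_bytes = encoder_wrapped(functools.partial(json.dumps, ensure_ascii=False))
print(to_bytes({"caf\u00e9": 1}))  # b'{"caf\xc3\xa9": 1}'
```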

ductworks.message_duct.deserializer_with_decoder_constructor(deserialization_func, decoder_type='utf-8', decoder_error_mode='replace')

Wrap a deserialization function with string decoding. This is important for JSON, which expects to operate on strings (potentially unicode), NOT bytestreams, so a decoding step is needed in between.

Parameters:
  • deserialization_func – The base deserialization function.
  • decoder_type – The decoder type. Default: ‘utf-8’.
  • decoder_error_mode – The decode error mode. Default: ‘replace’.
Returns: The deserializer function wrapped with the specified decoder.
Return type: bytes | bytearray | str -> T
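
The decoding counterpart can be sketched the same way, again as a hypothetical re-creation rather than the library's source. It accepts bytes or bytearray input, and it shows the effect of the 'replace' default: an invalid byte turns into U+FFFD instead of raising UnicodeDecodeError.

```python
import json


def decoder_wrapped(deserialization_func, decoder_type="utf-8", decoder_error_mode="replace"):
    """Hypothetical re-creation: decode bytes to str, then deserialize."""
    def deserialize(data):
        return deserialization_func(data.decode(decoder_type, decoder_error_mode))
    return deserialize


from_bytes = decoder_wrapped(json.loads)
print(from_bytes(b'{"caf\xc3\xa9": 1}'))  # {'café': 1}

# With 'replace', the invalid byte 0xff becomes U+FFFD instead of raising.
print(from_bytes(b'"bad \xff byte"') == "bad \ufffd byte")  # True
```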

ductworks.message_duct.default_serializer = <function serialize>
ductworks.message_duct.default_deserializer = <function deserialize>
ductworks.message_duct.MAGIC_BYTE = 'T'
