hop.robust_publisher

class hop.robust_publisher.PublicationJournal(journal_path='publisher.journal')

An object which tracks the state of messages which are being sent, persists that state to disk, and enables it to be restored if the program stops unexpectedly.

__init__(journal_path='publisher.journal')

Prepare a journal, including loading any data previously persisted to disk.

Parameters

journal_path – The filesystem path from/to which the journal data should be read/written.

Raises
  • PermissionError – If the existing journal file does not have suitable permissions.

  • RuntimeError – If the existing journal data cannot be read.
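To illustrate the persist-and-restore idea, the sketch below appends one JSON record per journal event and rebuilds the set of unconfirmed messages by replaying the file. The record format and the helper names (append_record, journal_queue, journal_sent, load_unsent) are hypothetical and do not describe the on-disk format hop actually uses. The sketch only mirrors the documented behaviour of raising RuntimeError when existing data cannot be read.

```python
import base64
import json
import os


def append_record(path, record):
    """Append one JSON record as a line and force it to disk before returning."""
    with open(path, "a", encoding="utf-8") as f:
        f.write(json.dumps(record) + "\n")
        f.flush()
        os.fsync(f.fileno())


def journal_queue(path, seq_num, message: bytes):
    # Hypothetical record format: message bytes are base64-encoded to fit in JSON text.
    append_record(path, {"op": "queue", "seq": seq_num,
                         "data": base64.b64encode(message).decode("ascii")})


def journal_sent(path, seq_num):
    append_record(path, {"op": "sent", "seq": seq_num})


def load_unsent(path):
    """Replay the journal and return {seq_num: message} for unconfirmed messages."""
    pending = {}
    if not os.path.exists(path):
        return pending
    with open(path, encoding="utf-8") as f:
        for line_no, line in enumerate(f, start=1):
            try:
                record = json.loads(line)
                op, seq = record["op"], record["seq"]
                if op == "queue":
                    pending[seq] = base64.b64decode(record["data"])
                elif op == "sent":
                    pending.pop(seq, None)
            except (ValueError, KeyError, TypeError) as err:
                # Malformed JSON, missing fields, or bad base64 all count as corruption.
                raise RuntimeError(f"corrupt journal record on line {line_no}") from err
    return pending
```

Replaying the log in order restores a message only if its "queue" record was never followed by a matching "sent" record, which is exactly the information needed to resume sending after a crash.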

class NullLock

A trivial context manager-compatible class which can be used in place of a lock when no locking is needed.
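A null lock only needs the two context-manager methods. The class below is an illustrative stand-in, not hop's own code. The record_event helper is hypothetical and shows how one function can accept either a real threading.Lock or the no-op default through the same with statement. The standard library's contextlib.nullcontext offers equivalent behaviour.

```python
import threading


class NullLock:
    """Stands in for a lock: entering and leaving the context does nothing."""

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        # Returning False lets any exception raised inside the block propagate.
        return False


def record_event(log, event, lock=NullLock()):
    # Hypothetical helper: multi-threaded callers pass a real lock,
    # single-threaded callers get the stateless no-op default.
    with lock:
        log.append(event)
    return log
```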

static error_callback(kafka_error: KafkaError)

A safe callback handler for reporting Kafka errors.

get_delivery_callback(seq_num, lock=<hop.robust_publisher.PublicationJournal.NullLock object>)

Construct a callback handler specific to a particular message which will either mark it successfully sent or requeue it to send again.

The callback which is produced takes two arguments: a confluent_kafka.KafkaError describing any error in sending the message, and a confluent_kafka.Message containing the message itself.

Parameters
  • seq_num – The sequence number of the message in question, previously returned by get_next_message_to_send().

  • lock – An optional reference to a lock object which the callback should hold when invoked, e.g. to protect concurrent access to the journal.
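The per-message callback can be built as a closure that captures the sequence number, so when the delivery report arrives the callback already knows which journal entry to update. In the sketch below, ToyJournal and make_delivery_callback are hypothetical stand-ins. A None first argument is treated as success, following the confluent_kafka convention for delivery reports.

```python
import threading


class ToyJournal:
    """Hypothetical stand-in that only tracks which sequence numbers are in flight."""

    def __init__(self):
        self.in_flight = set()
        self.queued = []
        self.sent = []

    def mark_message_sent(self, seq_num):
        if seq_num not in self.in_flight:
            raise RuntimeError(f"message {seq_num} is not in flight")
        self.in_flight.remove(seq_num)
        self.sent.append(seq_num)

    def requeue_message(self, seq_num):
        if seq_num not in self.in_flight:
            raise RuntimeError(f"message {seq_num} is not in flight")
        self.in_flight.remove(seq_num)
        self.queued.append(seq_num)


def make_delivery_callback(journal, seq_num, lock):
    """Bind seq_num into a callback taking (err, msg), like a Kafka delivery report."""

    def callback(err, msg):
        # Hold the lock so concurrent callbacks cannot corrupt the journal state.
        with lock:
            if err is None:
                journal.mark_message_sent(seq_num)
            else:
                journal.requeue_message(seq_num)

    return callback
```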

get_next_message_to_send()

Fetch the next message which should be sent.

Returns

The next message in the form of a tuple of (sequence number, message, message headers), or None if no messages currently need to be sent.

has_messages_in_flight()

Check whether there are messages for which a sending attempt has been started but has not yet conclusively succeeded or failed.

has_messages_to_send()

Check whether there are messages queued for sending (which have either not been sent at all, or for which all sending attempts so far have failed, causing them to be requeued).

mark_message_sent(sequence_number)

Mark a message as successfully sent, and remove it from further consideration.

If the number of messages in flight and waiting to be sent falls to zero, also truncate and restart the backing journal file, and restart sequence number assignment.

Raises

RuntimeError – If no message with the specified sequence number is currently recorded as being in flight.

queue_message(message: bytes, headers=None)

Record to the journal a message which is to be sent.

Parameters
  • message – A message to send, encoded as a bytes-like object.

  • headers – Headers to be sent with the message, as a list of 2-tuples of bytes-like objects.

Returns

The sequence number assigned to the message. Sequence numbers are unique among all messages which are ‘live’ at the same time, but will otherwise be recycled.

Raises
  • RuntimeError – If appending the new message to the on-disk journal fails.

  • TypeError – If the message is not a suitable type (bytes).

requeue_message(sequence_number)

Record a message send attempt as having failed by moving the message back from the in-flight pool to the queue of messages needing to be sent.

Raises

RuntimeError – If no message with the specified sequence number is currently recorded as being in flight.
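The lifecycle described by queue_message, get_next_message_to_send, mark_message_sent and requeue_message can be modelled as a small state machine. The class below is a hypothetical in-memory model with no persistence; it is not hop's implementation. Placing requeued messages at the back of the queue is an assumption, since the documentation only says they are moved back to the queue.

```python
from collections import deque


class InMemoryJournal:
    """Hypothetical, non-persistent model of the queued / in-flight lifecycle."""

    def __init__(self):
        self.next_seq = 0
        self.messages = {}       # seq_num -> (message, headers)
        self.to_send = deque()   # seq_nums waiting to be sent
        self.in_flight = set()   # seq_nums sent but not yet acknowledged

    def queue_message(self, message, headers=None):
        if not isinstance(message, (bytes, bytearray)):
            raise TypeError("message must be bytes")
        seq = self.next_seq
        self.next_seq += 1
        self.messages[seq] = (bytes(message), headers)
        self.to_send.append(seq)
        return seq

    def get_next_message_to_send(self):
        if not self.to_send:
            return None
        seq = self.to_send.popleft()
        self.in_flight.add(seq)
        message, headers = self.messages[seq]
        return seq, message, headers

    def _take_in_flight(self, seq):
        if seq not in self.in_flight:
            raise RuntimeError(f"no message {seq} is in flight")
        self.in_flight.remove(seq)

    def mark_message_sent(self, seq):
        self._take_in_flight(seq)
        del self.messages[seq]
        if not self.to_send and not self.in_flight:
            self.next_seq = 0    # nothing is live, so numbering can restart

    def requeue_message(self, seq):
        self._take_in_flight(seq)
        self.to_send.append(seq)  # assumption: retry after already-queued messages
```

Resetting the counter only when nothing is queued or in flight keeps sequence numbers unique among live messages while still allowing them to be recycled.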

class hop.robust_publisher.RobustProducer(url, auth=True, journal_path='publisher.journal', poll_wait=0.0001, **kwargs)

__init__(url, auth=True, journal_path='publisher.journal', poll_wait=0.0001, **kwargs)

Construct a publisher which will retry sending messages if it does not receive confirmation that they have arrived, including if it is itself taken offline (i.e. crashes) for some reason.

This is intended to provide at-least-once delivery of messages. If a message is confirmed received by the broker, it will not be sent again. However, if any disruption of the network or of the publisher itself prevents the publisher from receiving that confirmation, the publisher will assume the worst and send the message again, even if the broker did in fact receive it. Users of this class (and, more generally, consumers of data published with it) should be prepared to discard duplicate messages.
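Because a message may arrive more than once, consumers need some way to recognize repeats. The sketch below keeps a bounded, least-recently-used memory of the keys it has seen. Deduplicator and payload_key are hypothetical helpers, and keying on a SHA-256 hash of the payload assumes that two byte-identical messages are always redundant. An application that can legitimately send identical payloads would need a unique identifier inside each message instead.

```python
import hashlib
from collections import OrderedDict


class Deduplicator:
    """Remember recently seen message keys and report repeats.

    Only the most recent `capacity` keys are kept, so a duplicate arriving
    after that many other distinct messages will not be detected.
    """

    def __init__(self, capacity=10_000):
        self.capacity = capacity
        self.seen = OrderedDict()

    def is_duplicate(self, key):
        if key in self.seen:
            self.seen.move_to_end(key)     # mark as recently seen
            return True
        self.seen[key] = None
        if len(self.seen) > self.capacity:
            self.seen.popitem(last=False)  # evict the least recently seen key
        return False


def payload_key(message: bytes) -> str:
    # Assumption: identical payloads are redundant for this application.
    return hashlib.sha256(message).hexdigest()
```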

Parameters
  • url – The URL for the Kafka topic to which messages will be published.

  • auth – A bool or Auth instance. Defaults to loading from auth.load_auth if set to True. To disable authentication, set to False.

  • journal_path – The path on the filesystem where the messages being sent should be recorded until they are known to have been successfully received. This path should be located somewhere that will survive system restarts, and if messages contain sensitive data, note that they will be written unencrypted to this path. The journal size is generally limited to the sum of the sizes of the messages queued for sending or in flight at the same time, plus a small bookkeeping overhead (a few tens of bytes per message). This size can become large if a lengthy network disruption prevents messages from being sent; enough disk space should be available to cover this possibility for the expected message rate and the duration of disruptions which may need to be handled.

  • poll_wait – The time the publisher should spend checking for receipt of each message directly after sending it. Tuning this parameter controls a tradeoff between low latency discovery of successful message delivery and throughput. If the time between sending messages is large compared to the latency for a message to be sent and for a confirmation of receipt to return, it is useful to increase this value so that the publisher will wait to discover that each message has been sent (in the success case) instead of sleeping and waiting for another message to send. If this value is ‘too low’ (much smaller than both the time for a message to be sent and acknowledged and the time for the next message to be ready for sending), the publisher will waste CPU time entering and exiting the internal function used to receive event notifications. If this value is too large (larger than or similar in size to the time between messages needing to be sent) throughput will be lost as time will be spent waiting to see if the previous message has been acknowledged which could be better spent getting the next message sent out. When in doubt, it is probably best to err on the side of choosing a small value.

  • kwargs – Any additional arguments to be passed to hop.io.open.

Raises
  • OSError – If a journal file exists but cannot be read.

  • RuntimeError – If the contents of the journal file are corrupted.
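The sizing guidance for journal_path can be turned into a back-of-the-envelope calculation. During an outage nothing is acknowledged, so the journal grows by roughly the message size plus the bookkeeping overhead for every message produced. The function below is a hypothetical helper, and its default of 64 bytes of overhead per message is an assumed value standing in for the documented "few tens of bytes".

```python
def estimate_journal_bytes(messages_per_second, mean_message_bytes,
                           outage_seconds, overhead_per_message=64):
    """Rough journal size if no message is acknowledged for outage_seconds.

    mean_message_bytes should include headers; overhead_per_message is an
    assumed figure, not a value taken from hop.
    """
    backlog = messages_per_second * outage_seconds
    return int(backlog * (mean_message_bytes + overhead_per_message))


# 10 messages/s of 2048 bytes through a one-hour outage: 36,000 queued messages.
print(estimate_journal_bytes(10, 2048, 3600))  # 76032000 bytes, about 76 MB
```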

run()

This method is not part of the public interface of this class, and should not be called directly by users.

start()

Start the background communication thread used by the publisher to send messages. This should be called prior to any calls to RobustProducer.write. This method should not be called more than once.

stop()

Stop the background communication thread used by the publisher to send messages. This method will block until the thread completes, which includes sending all queued messages. RobustProducer.write should not be called after this method has been called. This method should not be called more than once.

write(message, headers=None)

Queue a message to be sent. Message sending occurs asynchronously on a background thread, so this method returns immediately unless an error occurs queuing the message. RobustProducer.start must be called prior to calling this method.

Parameters
  • message – A message to send.

  • headers – Headers to be sent with the message, as a list of 2-tuples of strings.

Raises
  • RuntimeError – If appending the new message to the on-disk journal fails.

  • TypeError – If the message is not a suitable type.
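The start, write and stop methods follow a producer/consumer pattern: write hands a message to a background thread and returns at once, and stop blocks until that thread has drained everything queued before it. The class below is a hypothetical, standard-library-only model of that contract, with no journal and no Kafka connection. With the real class, the calls are made in the same order: start(), any number of write() calls, then stop().

```python
import queue
import threading


class ToyBackgroundSender:
    """Hypothetical model of the start() / write() / stop() contract.

    `send` stands in for network transmission; here it just receives each message.
    """

    _STOP = object()  # sentinel telling the worker thread to exit

    def __init__(self, send):
        self._send = send
        self._queue = queue.Queue()
        self._thread = threading.Thread(target=self._run, daemon=True)

    def start(self):
        self._thread.start()  # Thread.start() raises RuntimeError if called twice

    def write(self, message, headers=None):
        self._queue.put((message, headers))  # returns without waiting for delivery

    def stop(self):
        self._queue.put(self._STOP)
        self._thread.join()  # blocks until every earlier message has been handled

    def _run(self):
        while True:
            item = self._queue.get()
            if item is self._STOP:
                break
            self._send(*item)
```

Because the sentinel travels on the same queue as the messages, the worker reaches it only after handling everything written before stop() was called.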