Streaming

The Stream Object

The Stream object allows a user to connect to a Kafka broker and read in a variety of alerts, such as GCN circulars. It also allows one to specify default settings used across all streams opened from the Stream instance.

Let’s open up a stream and show the Stream object in action:

from hop import Stream

stream = Stream(until_eos=True)
with stream.open("kafka://hostname:port/topic", "r") as s:
    for message in s:
        print(message.content)

The until_eos option allows one to listen to messages until no more are available (EOS, or end of stream). By default, the connection is kept open indefinitely. This allows long-lived connections in which one may set up a service to process incoming GCNs, for example.

A common use case is not to specify any defaults ahead of time, so a shorthand Stream instance is provided:

from hop import stream

with stream.open("kafka://hostname:port/topic", "r") as s:
    for message in s:
        print(message.content)

A complete list of the configurable options in Stream is below, followed by a sketch that uses them together:

  • auth: A bool or auth.Auth instance to provide authentication

  • start_at: The message offset to start at, by passing in an io.StartPosition

  • until_eos: Whether to stop processing messages after an EOS is received
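
As a minimal sketch (the hostname and topic are placeholders; StartPosition is imported from hop.io, as noted above), one might authenticate with locally stored credentials, start from the earliest available message, and stop at the end of the stream:

from hop import Stream
from hop.io import StartPosition

# use stored credentials, start from the earliest retained message,
# and stop once an EOS is received
stream = Stream(auth=True, start_at=StartPosition.EARLIEST, until_eos=True)
with stream.open("kafka://hostname:port/topic", "r") as s:
    for message in s:
        print(message.content)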

One doesn’t have to use the context manager protocol (with block) to open up a stream as long as the stream is explicitly closed afterwards:

from hop import stream

s = stream.open("kafka://hostname:port/topic", "r")
for message in s:
    print(message.content)
s.close()

So far, all examples have shown the iterator interface for reading messages from an open stream. But one can instead call s.read() directly, or, for more specialized workflows, make use of extra keyword arguments to configure an open stream. For example, the metadata option allows one to retrieve Kafka message metadata alongside the message itself, such as the Kafka topic, key, timestamp, and offset. This may be useful when listening to multiple topics at once:

from hop import stream

with stream.open("kafka://hostname:port/topic1,topic2", "r") as s:
    for message, metadata in s.read(metadata=True):
        print(message.content, metadata.topic)

Anatomy of a Kafka URL

Both the CLI and the Python API take a URL that describes how to connect to various Kafka topics. It takes the form:

kafka://[username@]broker/topic[,topic2[,...]]

The broker takes the form hostname[:port] and gives the URL to connect to a Kafka broker. Optionally, a username can be provided, which is used to select among available credentials to use when communicating with the broker. Finally, one can publish to a topic or subscribe to one or more topics to consume messages from.
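
For instance, a subscriber with a credential for the username alice might use a URL like the following (the username, hostname, and topic names here are hypothetical):

kafka://alice@broker.example.org:9092/topic1,topic2

The same URL can be passed to stream.open or to CLI commands such as hop subscribe.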

Committing Messages Manually

By default, messages read from the stream are marked as read immediately after an open stream instance returns them, for a given group ID. This is suitable for most cases, but some workflows have stricter fault tolerance requirements and cannot afford to lose messages when a failure occurs while the current message is being processed. We can instead commit messages after we are done processing them, so that after a failure a restarted process receives the same message again and can finish processing it before moving on to the next. This requires requesting broker-specific metadata as well as assigning yourself a specific group ID. A workflow to do this is shown below:

from hop import stream

with stream.open("kafka://hostname:port/topic1", "r", "mygroup") as s:
    for message, metadata in s.read(metadata=True, autocommit=False):
        print(message.content, metadata.topic)
        s.mark_done(metadata)

Attaching Metadata to Messages

Apache Kafka supports headers to associate metadata with messages, separate from the message body, and the hop python API supports this feature as well. Headers should generally be small and ideally optional information; most of a message’s content should be in its body.

Each header has a string key, and a binary or unicode value. A collection of headers may be provided either as a dictionary or as a list of (key, value) tuples. Duplicate header keys are permitted; the list representation is necessary to utilize this allowance.

It is important to note that Hopskotch reserves all header names starting with an underscore (_) for internal use; users should not set their own headers with such names.

Sending messages with headers and viewing the headers attached to received messages can be done as shown below:

from hop import stream

with stream.open("kafka://hostname:port/topic1", "w") as s:
    s.write({"my": "message"}, headers={"priority": "1", "sender": "test"})
    s.write({"my": "other message"}, headers=[("priority", "2"), ("sender", "test")])

from hop import stream

with stream.open("kafka://hostname:port/topic1", "r") as s:
    for message, metadata in s.read(metadata=True):
        print(message, metadata.headers)

Standard Headers

The Hop client produces and uses certain message headers automatically. Each of these headers is intended to be optional, in the sense that messages lacking them can still be processed, but when a header is missing, functionality based on it may not be available. The headers currently produced and used automatically are listed below, followed by a sketch of inspecting them on received messages:

  • _id: The value of this header is a unique ID intended to allow referring to the specific message without requiring context like its position within a Kafka topic. Message IDs are currently generated as version 4 RFC 4122 UUIDs. If the message ID header is missing, other users may not be able to send messages which refer to the message, and systems which store messages may not be able to look it up directly.

  • _sender: The value of this header is the username associated with the credential used to send the message, if any.

  • _test: The presence of this header, with any value, should be interpreted to mean that the message is a test, whose content may be safely ignored, or should otherwise not necessarily be acted upon normally.

  • _format: The value of this header is a UTF-8 string which is used to identify which message model should be used to decode the message content. If the format header is missing, an attempt will be made to decode the message content as JSON for backwards compatibility with old client versions, and if it is not valid JSON the message content will be left raw (treated as a Blob).
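
As a minimal sketch of consuming these headers (the hostname and topic are placeholders, and header values are assumed to arrive as raw bytes), one might skip test messages and print the ID and sender of each remaining one:

from hop import stream

with stream.open("kafka://hostname:port/topic", "r") as s:
    for message, metadata in s.read(metadata=True):
        # metadata.headers is a list of (key, value) pairs, or None;
        # collapsing to a dict drops any duplicate keys
        headers = dict(metadata.headers or [])
        if "_test" in headers:
            # the presence of _test, with any value, marks a test message
            continue
        print(headers.get("_id"), headers.get("_sender"), message)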

Because these header values are attached to messages by the publishing client, subscribers and systems receiving messages should be careful about the degree to which they trust the header values. For example, an ill-behaved publisher might re-use a message ID, or set an incorrect sender username. In most cases, however, due to the authentication and authorization systems enforced by the Kafka broker, subscribers receiving a message can generally trust its header values to the same extent that they trust the data in the message body, based on the entities they know are authorized to publish to the topic on which the message appears.