Streaming
The Stream Object
The Stream object allows a user to connect to a Kafka broker and read in a variety of alerts, such as GCN circulars. It also lets the user specify default settings that apply to every stream opened from the Stream instance.
Let’s open a stream and show the Stream object in action:
from hop import Stream

stream = Stream(persist=True)
with stream.open("kafka://hostname:port/topic", "r") as s:
    for message in s:
        print(message)
The persist option keeps the connection open after an end of stream (EOS) is received, so the stream keeps listening for messages indefinitely. This allows long-lived connections, for example a service that processes incoming GCNs.
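For a long-lived service of this kind, one bad message should usually not end the loop. The following is a minimal sketch of that idea, not part of hop itself: process_messages, run_service, and the logger name are hypothetical helpers introduced here for illustration, and run_service assumes that hop is installed and a broker is reachable.

```python
import logging

logger = logging.getLogger("alert_service")  # hypothetical logger name


def process_messages(messages, handler):
    """Apply handler to every message, logging and skipping any that fail.

    Returns the number of messages handled successfully.
    """
    handled = 0
    for message in messages:
        try:
            handler(message)
        except Exception:
            # A single malformed alert should not stop a long-lived service.
            logger.exception("failed to process message: %r", message)
        else:
            handled += 1
    return handled


def run_service(url, handler):
    """Listen on url with persist=True (needs hop and a reachable broker)."""
    from hop import Stream  # imported lazily so process_messages has no dependencies

    stream = Stream(persist=True)
    with stream.open(url, "r") as s:
        return process_messages(s, handler)
```

Because process_messages accepts any iterable, it can be exercised with a plain list before being pointed at a live stream.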
A common use case is to open a stream without specifying any defaults ahead of time, so the module-level stream object is provided as a shorthand:
from hop import stream

with stream.open("kafka://hostname:port/topic", "r") as s:
    for message in s:
        print(message)
The configurable options in Stream are:
auth: A bool or auth.Auth instance to provide authentication
start_at: The message offset to start at, specified by passing in an io.StartPosition
persist: Whether to keep a long-lived connection to the client beyond EOS
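As a rough sketch of how these options might be combined, the helper below builds a Stream that reads a topic from its earliest available offset and stops at EOS. The function name open_from_earliest is hypothetical, and the sketch assumes that StartPosition can be imported from hop.io and has an EARLIEST member; check the installed version before relying on it.

```python
def open_from_earliest(url, auth=True):
    """Return a context manager that reads url from the earliest offset.

    Requires hop to be installed; the imports are deferred until the call.
    """
    from hop import Stream
    from hop.io import StartPosition  # assumption: EARLIEST is a member

    # persist=False ends the iteration once an EOS is received.
    stream = Stream(auth=auth, start_at=StartPosition.EARLIEST, persist=False)
    return stream.open(url, "r")
```

It would be used like the examples above, for instance `with open_from_earliest("kafka://hostname:port/topic") as s:` followed by a loop over `s`.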
In addition, stream.open provides an option to retrieve Kafka message metadata along with the message itself, such as the Kafka topic, key, timestamp, and offset. This can be useful when listening to multiple topics at once:
from hop import stream

with stream.open("kafka://hostname:port/topic1,topic2", "r", metadata=True) as s:
    for message, metadata in s:
        print(message, metadata.topic)
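One way to use the topic metadata is to route each message to a topic-specific handler. The sketch below illustrates this with stand-in data; dispatch_by_topic is a hypothetical helper, and the SimpleNamespace objects only mimic the topic attribute of the metadata yielded by a stream opened with metadata=True.

```python
from types import SimpleNamespace


def dispatch_by_topic(pairs, handlers, default=None):
    """Route each (message, metadata) pair to the handler for metadata.topic.

    Returns a dict mapping each topic to the number of messages routed.
    """
    counts = {}
    for message, metadata in pairs:
        handler = handlers.get(metadata.topic, default)
        if handler is None:
            continue  # no handler registered for this topic
        handler(message)
        counts[metadata.topic] = counts.get(metadata.topic, 0) + 1
    return counts


# Stand-in data for illustration; with hop, `pairs` would be the stream `s`.
first, second = [], []
fake_pairs = [
    ("alert 1", SimpleNamespace(topic="topic1")),
    ("alert 2", SimpleNamespace(topic="topic2")),
    ("alert 3", SimpleNamespace(topic="topic1")),
]
counts = dispatch_by_topic(
    fake_pairs, {"topic1": first.append, "topic2": second.append}
)
```

Keeping the routing table in a dict means that supporting a new topic only requires adding an entry, without touching the loop.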