hop documentation¶
hop-client
is a pub-sub client library for Multimessenger Astrophysics.
User’s Guide¶
Installation¶
You can install hop-client either via pip, conda, or from source.
To install with pip:
pip install -U hop-client
To install with conda, you must use the channel from the SCiMMA Anaconda organization:
conda install --channel scimma hop-client
To install from source:
tar -xzf hop-client-x.y.z.tar.gz
cd hop-client-x.y.z
python setup.py install
Quickstart¶
Using the CLI¶
By default, authentication is enabled, reading in credentials
from auth.toml
. The path to this configuration can be found by running
hop auth locate
. One can initialize this configuration with default
settings by running hop auth add
. To disable authentication in the CLI
client, one can use the --no-auth
option.
Publish messages¶
hop publish kafka://hostname:port/gcn -f CIRCULAR example.gcn3
Example messages are provided in tests/data
including:
A GCN circular (
example.gcn3
)A VOEvent (
example_voevent.xml
)
Consume messages¶
hop subscribe kafka://hostname:port/gcn -s EARLIEST
This will read messages from the gcn topic from the earliest offset and read messages until an end of stream (EOS) is received.
View Available Topics¶
hop list-topics kafka://hostname:port/
This will list all of the topics on the given server which you are currently authorized to read or write.
Using the Python API¶
Publish messages¶
Using the python API, we can publish various types of messages, including structured messages such as GCN Circulars and VOEvents:
from hop import stream
from hop.models import GCNCircular
# read in a GCN circular
with open("path/to/circular.gcn3", "r") as f:
circular = GCNCircular.load(f)
with stream.open("kafka://hostname:port/topic", "w") as s:
s.write(circular)
In addition, we can also publish unstructured messages as long as they are JSON serializable:
from hop import stream
with stream.open("kafka://hostname:port/topic", "w") as s:
s.write({"my": "message"})
By default, authentication is enabled for the Hop broker, reading in configuration
settings from config.toml
. In order to modify various authentication options, one
can configure a Stream
instance and pass in an Auth
instance with credentials:
from hop import Stream
from hop.auth import Auth
auth = Auth("my-username", "my-password")
stream = Stream(auth=auth)
with stream.open("kafka://hostname:port/topic", "w") as s:
s.write({"my": "message"})
To explicitly disable authentication, one can set auth
to False
.
Consume messages¶
One can consume messages through the python API as follows:
from hop import stream
with stream.open("kafka://hostname:port/topic", "r") as s:
for message in s:
print(message)
This will listen to the Hop broker, listening to new messages and printing them to
stdout as they arrive until there are no more messages in the stream.
By default, this will only process new messages since the connection was opened.
The start_at
option lets you control where in the stream you can start listening
from. For example, if you’d like to listen to all messages stored in a topic, you can do:
from hop import stream
from hop.io import StartPosition
stream = Stream(start_at=StartPosition.EARLIEST)
with stream.open("kafka://hostname:port/topic", "r") as s:
for message in s:
print(message)
Streaming¶
The Stream Object¶
The Stream
object allows a user to connect to a Kafka broker and read
in a variety of alerts, such as GCN circulars. It also allows one to
specify default settings used across all streams opened from the Stream
instance.
Let’s open up a stream and show the Stream
object in action:
from hop import Stream
stream = Stream(persist=True)
with stream.open("kafka://hostname:port/topic", "r") as s:
for message in s:
print(message)
The persist
option allows one to listen to messages forever
and keeps the connection open after an end of stream (EOS) is received.
This is to allow long-lived connections where one may set up a service
to process incoming GCNs, for example.
A common use case is to not specify any defaults ahead of time, so a shorthand is provided for using one:
from hop import stream
with stream.open("kafka://hostname:port/topic", "r") as s:
for message in s:
print(message)
A complete list of configurable options in Stream
are:
auth
: A bool orauth.Auth
instance to provide authenticationstart_at
: The message offset to start at, by passing in anio.StartPosition
persist
: Whether to keep a long-live connection to the client beyond EOS
One doesn’t have to use the context manager protocol (with
block)
to open up a stream as long as the stream is explicitly closed afterwards:
from hop import stream
s = stream.open("kafka://hostname:port/topic", "r")
for message in s:
print(message)
s.close()
So far, all examples have shown the iterator interface for reading messages from an open
stream. But one can instead call s.read()
directly or in the case of more specialized
workflows, may make use of extra keyword arguments to configure an open stream. For example,
the metadata
option allows one to retrieve Kafka message metadata as well
as the message itself, such as the Kafka topic, key, timestamp and offset. This may
be useful in the case of listening to multiple topics at once:
from hop import stream
with stream.open("kafka://hostname:port/topic1,topic2", "r") as s:
for message, metadata in s.read(metadata=True):
print(message, metadata.topic)
Anatomy of a Kafka URL¶
Both the CLI and python API take a URL that describes how to connect to various Kafka topics, and takes the form:
kafka://[username@]broker/topic[,topic2[,...]]
The broker takes the form hostname[:port]
and gives the URL to connect to a
Kafka broker. Optionally, a username
can be provided, which is used to select
among available credentials to use when communicating with the broker.
Finally, one can publish to a topic or subscribe to one or more topics to consume messages
from.
Committing Messages Manually¶
By default, messages that are read in by the stream are marked as read immediately after returning them from an open stream instance for a given group ID. This is suitable for most cases, but some workflows have more strict fault tolerance requirements and don’t want to lose messages in the case of a failure while processing the current message. We can instead commit messages after we are done processing them so that in the case of a failure, a process that is restarted can get the same message back and finish processing it before moving on to the next. This requires returning broker-specific metadata as well as assigning yourself to a specific group ID. A workflow to do this is shown below:
from hop import stream
with stream.open("kafka://hostname:port/topic1", "r", "mygroup") as s:
for message, metadata in s.read(metadata=True, autocommit=False):
print(message, metadata.topic)
s.mark_done(metadata)
Authentication¶
Configuration¶
Since connections to the Hopskotch server require authentication, there
are several utilities exposed to generate and provide credentials for
both the CLI and python API. hop auth
provides command line
options to generate a configuration file with proper credentials needed
to authenticate.
In order to generate a configuration file, one can run hop auth add
,
which prompts for a username and password to connect to Hopskotch
to publish or subscribe to messages. If you have the credentials csv file, you can
use it directly with hop auth add <CREDENTIALS_FILE>
.
The default location for the authentication data file can be found with hop auth locate
,
which points by default to ${XDG_CONFIG_HOME}/hop/auth.toml
or
${HOME}/.config/hop/auth.toml
if the XDG_CONFIG_HOME
variable is not set.
Using Credentials¶
Authentication is enabled by default and will read credentials from the
path resolved by hop auth locate
.
Multiple credentials may be stored together using this mechanism.
Additional credentials may be added using hop auth add
, while the currently available
credentials may be displayed with hop auth list
and unwanted credentials can be removed
with hop auth remove
. Credentials can be added either interactively or from CSV files.
For removal, credentials are specified by username, or <username>@<hostname>
in case of ambiguity.
When using the hop CLI to connect to connect to a kafka server, a credential will be selected according to the following rules:
A credential with a matching hostname will be selected, unless no stored credential has a matching hostname, in which case a credential with no specific hostname can be selected.
If a username is specified as part of the authority component of the URL (e.g.
kafka://username@example.com/topic
) only credentials with that username will be considered.If no username is specified and there is only one credential, which is not specifically associated with any hostname, it will be used for all hosts.
For the python API, one can modify various authentication options by passing
in an Auth
instance with credentials to a Stream
instance.
This provides a similar interface to authenticating as with the requests library.
from hop import Stream
from hop.auth import Auth
auth = Auth("my-username", "my-password")
stream = Stream(auth=auth)
with stream.open("kafka://hostname:port/topic", "w") as s:
s.write({"my": "message"})
A list of multiple Auth
instance may also be passed, in which case the best match for the
connection being opened will be selected as described above.
In order to disable authentication in the command line interface, you can
pass --no-auth
for various CLI commands. For the python API, you
can set auth
to False
.
Message Formats¶
The hop client provides a few in-memory representations for common
message types for easy access to various message properties, as well
as loading messages from their serialized forms or from disk. These
message formats, or models, can be sent directly to an open Stream
to provide seamless serialization of messages through Hopskotch.
Structured Messages¶
Currently, the structured messages available through the hop client
are VOEvent
and GCNCircular
.
To give an example of its usage:
from hop import Stream
from hop.auth import load_auth
from hop.models import VOEvent
xml_path = "/path/to/voevent.xml"
voevent = VOEvent.load_file(xml_path)
stream = Stream(auth=load_auth())
with stream.open("kafka://hostname:port/topic", "w") as s:
s.write(voevent)
Unstructured Messages¶
Unstructured messages can be sent directly to an open Stream
instance
and will be serialized appropriately. Any python object that can be JSON
serialized can be sent. Examples include a dictionary, a byte64 encoded
string, and a list.
Register External Message Models¶
Sometimes it may be useful to use custom structured messages that aren’t currently available in the stock client. For instance, sending specialized messages between services that are internal to a specific observatory. The hop client provides a mechanism in which to register custom message types that are discoverable within hop when publishing and subscribing for your own project. This requires creating an external python library and setting up an entry point so that hop that discover it upon importing the client.
There are three steps involved in creating and registering a custom message model:
Define the message model.
Register the message model.
Set up an entry point within your package.
Define a message model¶
To do this, you need to define a dataclass that subclasses hop.models.MessageModel
and implement functionality to load your message mode via
the load()
class method. As an example, assuming the message is represented as
JSON on disk:
from dataclasses import dataclass
import json
from hop.models import MessageModel
@dataclass
class Donut(MessageModel):
category: str
flavor: str
has_filling: bool
@classmethod
def load(cls, input_):
# input_ is a file object
if hasattr(donut_input, "read"):
donut = json.load(input_)
# serialized input_
else:
donut = json.loads(input_)
# unpack the JSON dictionary and return the model
return cls(**donut)
For more information on dataclasses, see the Python Docs.
Register a message model¶
Once you have defined your message model, registering the message model involves
defining a function with the hop.plugins.register
decorator with key-value
pairs mapping a message model name and the model:
from hop import plugins
...
@plugins.register
def get_models():
return {
"donut": Donut,
}
Set up entry points within your package¶
After registering your model, you’ll need to set up an entry point to your package
named hop_plugin
as that entry point is explicitly used to auto-discover
new plugins. The module used for the entry point is wherever you registered your
model.
Setting up entry points may be different depending on how your package is set up. Below we’ll give an example for setuptools and setup.py. In setup.py:
from setuptools import setup
...
setup(
...
entrypoints = {"hop_plugin": ["donut-plugin = my.custom.module"]}
)
Some further resources on entry points:
Commands¶
hop-client provides a command line interface for various tasks:
hop auth
: Authentication utilitieshop list-topics
: SHow accessible Kafka topicshop publish
: Publish messages such as GCN circulars and noticeshop subscribe
: Listen to messages such as GCN circulars and noticeshop version
: Show version dependencies ofhop-client
hop auth
¶
This command allows a user to configure credentials for authentication.
usage: hop auth [-h] <command> ...
Authentication configuration utilities.
optional arguments:
-h, --help show this help message and exit
commands:
<command>
locate display configuration path
list Display all stored credentials
add Load a credential, specified either via a CSV file or
interactively
remove Delete a stored credential
No valid credential data found
You can get a credential from https://my.hop.scimma.org
To load your credential, run `hop auth add`
hop list-topics
¶
This command allows a user to view the topics that are available for subscribing or publishing on a given Hopskotch server.
Note that other topics may exist which the current user does not have permission to access.
usage: hop list-topics [-h] [--no-auth] URL
List available topics.
positional arguments:
URL Sets the URL (kafka://host[:port]/topic) to publish messages to.
optional arguments:
-h, --help show this help message and exit
--no-auth If set, disable authentication.
No valid credential data found
You can get a credential from https://my.hop.scimma.org
To load your credential, run `hop auth add`
hop publish
¶
This command allows a user to publish various structured and unstructured messages, including:
An XML formatted GCN/VOEvent notice
Unstructured messages such as JSON-serializable data.
Structured messages such as GCN circulars and VOEvents are published as JSON-formatted text.
Unstructured messages may be piped to this command to be published. This mode of operation requires JSON input with individual messages separated by newlines, and the Blob format (-f BLOB) to be selected.
usage: hop publish [-h] [--no-auth] [-f {VOEVENT,CIRCULAR,BLOB}]
URL [MESSAGE [MESSAGE ...]]
Publish messages.
positional arguments:
URL Sets the URL (kafka://host[:port]/topic) to publish
messages to.
MESSAGE Messages to publish.
optional arguments:
-h, --help show this help message and exit
--no-auth If set, disable authentication.
-f {VOEVENT,CIRCULAR,BLOB}, --format {VOEVENT,CIRCULAR,BLOB}
Specify the message format. Defaults to BLOB for an
unstructured message.
No valid credential data found
You can get a credential from https://my.hop.scimma.org
To load your credential, run `hop auth add`
hop subscribe
¶
This command allows a user to subscribe to messages and print them to stdout.
usage: hop subscribe [-h] [--no-auth] [-s {EARLIEST,LATEST}] [-p]
[-g GROUP_ID] [-j]
URL
Subscribe to messages.
positional arguments:
URL Sets the URL (kafka://host[:port]/topic) to publish
messages to.
optional arguments:
-h, --help show this help message and exit
--no-auth If set, disable authentication.
-s {EARLIEST,LATEST}, --start-at {EARLIEST,LATEST}
Set the message offset offset to start at. Default:
LATEST.
-p, --persist If set, persist or listen to messages indefinitely.
Otherwise, will stop listening when EOS is received.
-g GROUP_ID, --group-id GROUP_ID
Consumer group ID. If unset, a random ID will be
generated.
-j, --json Request message output as raw json
No valid credential data found
You can get a credential from https://my.hop.scimma.org
To load your credential, run `hop auth add`
hop version
¶
This command prints all the versions of the dependencies
usage: hop version [-h]
List all the dependencies' versions.
optional arguments:
-h, --help show this help message and exit
No valid credential data found
You can get a credential from https://my.hop.scimma.org
To load your credential, run `hop auth add`
API Reference¶
hop-client API¶
hop.auth¶
- class hop.auth.Auth(user, password, host='', ssl=True, method=SASLMethod.SCRAM_SHA_512, **kwargs)[source]¶
Attach SASL-based authentication to a client.
Returns client-based auth options when called.
- Parameters
user (str) – Username to authenticate with.
password (str) – Password to authenticate with.
host (str, optional) – The name of the host for which this authentication is valid.
ssl (bool, optional) – Whether to enable SSL (enabled by default).
method (SASLMethod, optional) – The SASL method to authenticate, default = SASLMethod.SCRAM_SHA_512. See valid SASL methods in SASLMethod.
ssl_ca_location (str, optional) – If using SSL via a self-signed cert, a path/location to the certificate.
- hop.auth.add_credential(args)[source]¶
Load a new credential and store it to the persistent configuration.
- Parameters
args – Command line options/arguments object. args.cred_file is taken as the path to a CSV file to import, or if None the user is prompted to enter a credential directly. args.force controls whether an existing credential with an identical name will be overwritten.
- hop.auth.delete_credential(name: str)[source]¶
Delete a credential from the persistent configuration.
- Parameters
name – The username, or username and hostname separated by an ‘@’ character of the credential
delete. (to) –
- Raises
RuntimeError – If no credentials or more than one credential matches the specified name, making the operation impossible or ambiguous.
- hop.auth.load_auth(config_file=None)[source]¶
Configures an Auth instance given a configuration file.
- Parameters
config_file – Path to a configuration file, loading from the default location if not given.
- Returns
A list of configured Auth instances.
- Raises
RuntimeError – The config file exists, but has unsafe permissions and will not be read until they are corrected.
KeyError – An error occurred parsing the configuration file.
FileNotFoundError – The configuration file, either as specified explicitly or found automatically, does not exist
- hop.auth.prune_outdated_auth(config_file=None)[source]¶
Remove auth data from a general configuration file.
This can be needed when updating auth data which was read from the general config for backwards compatibility, but is then written out to the correct new location in a separate auth config, as is now proper. With no further action, this would leave a vestigial copy from before the update in the general config file, which would not be rewritten, so this function exists to perform the necessary rewrite.
- Parameters
config_file – Path to a configuration file, rewriting the default location if not given.
- Raises
RuntimeError – The config file is malformed.
- hop.auth.read_new_credential(csv_file=None)[source]¶
Import a credential from a CSV file or obtain it interactively from the user.
- Parameters
csv_file – Path to a file from which to read credential data in CSV format. If unspecified, the user will be prompted to enter data instead.
- Returns
A configured Auth object containing the new credential.
- Raises
FileNotFoundError – If csv_file is not None and refers to a nonexistent path.
KeyError – If csv_file is not None and the specified file does not contain either a username or password field.
RuntimeError – If csv_file is None and the interactively entered username or passwod is empty.
- hop.auth.select_matching_auth(creds, hostname, username=None)[source]¶
Selects the most appropriate credential to use when attempting to contact the given host.
- Parameters
creds – A list of configured Auth objects. These can be obtained from
load_auth()
.hostname – The name of the host for which to select suitable credentials.
username – str, optional The name of the credential to use.
- Returns
A single Auth object which should be used to authenticate.
- Raises
RuntimeError – Too many or too few credentials matched.
hop.cli¶
hop.configure¶
- hop.configure.get_config_path(type: str = 'general')[source]¶
Determines the default location for auth configuration.
- Parameters
type – The type of configuration data for which the path should be looked up. Recognized types are ‘general’ and ‘auth’.
- Returns
The path to the requested configuration file.
- Raises
ValueError – Unrecognized config type requested.
hop.io¶
- class hop.io.Consumer(group_id, broker_addresses, topics, **kwargs)[source]¶
An event stream opened for reading one or more topics. Instances of this class should be obtained from
Stream.open()
.- mark_done(metadata)[source]¶
Mark a message as fully-processed.
- Parameters
metadata – A Metadata instance containing broker-specific metadata.
- read(metadata=False, autocommit=True, **kwargs)[source]¶
Read messages from a stream.
- Parameters
metadata – Whether to receive message metadata alongside messages.
autocommit – Whether messages are automatically marked as handled via mark_done when the next message is yielded. Defaults to True.
batch_size – The number of messages to request from Kafka at a time. Lower numbers can give lower latency, while higher numbers will be more efficient, but may add latency.
batch_timeout – The period of time to wait to get a full batch of messages from Kafka. Similar to batch_size, lower numbers can reduce latency while higher numbers can be more efficient at the cost of greater latency. If specified, this argument should be a datetime.timedelta object.
- class hop.io.Deserializer(value)¶
An enumeration.
- class hop.io.Metadata(topic: str, partition: int, offset: int, timestamp: int, key: Union[str, bytes], _raw: cimpl.Message)[source]¶
Broker-specific metadata that accompanies a consumed message.
- class hop.io.Producer(broker_addresses, topic, **kwargs)[source]¶
An event stream opened for writing to a topic. Instances of this class should be obtained from
Stream.open()
.
- class hop.io.Stream(auth=True, start_at=ConsumerStartPosition.LATEST, persist=False)[source]¶
Defines an event stream.
Sets up defaults used within the client so that when a stream connection is opened, it will use defaults specified here.
- Parameters
auth – A bool or
Auth
instance. Defaults to loading fromauth.load_auth
if set to True. To disable authentication, set to False.start_at – The message offset to start at in read mode. Defaults to LATEST.
persist – Whether to listen to new messages forever or stop when EOS is received in read mode. Defaults to False.
- open(url, mode='r', group_id=None)[source]¶
Opens a connection to an event stream.
- Parameters
url – Sets the broker URL to connect to.
mode – Read (‘r’) or write (‘w’) from the stream.
group_id – The consumer group ID from which to read. Generated automatically if not specified.
- Returns
An open connection to the client, either a
Producer
instance in write mode or aConsumer
instance in read mode.- Raises
ValueError – If the mode is not set to read/write, if more than one topic is specified in write mode, or if more than one broker is specified
hop.publish¶
hop.subscribe¶
hop.models¶
- class hop.models.Blob(content: Union[str, int, float, bool, None, Dict[str, Any], List[Any]], missing_schema: bool = False)[source]¶
Defines an unformatted message blob.
- asdict()[source]¶
Represents the message as a dictionary.
- Returns
The dictionary representation of the message.
- class hop.models.GCNCircular(header: dict, body: str)[source]¶
Defines a GCN Circular structure.
The parsed GCN circular is formatted as a dictionary with the following schema:
{‘headers’: {‘title’: …, ‘number’: …, …}, ‘body’: …}
- class hop.models.MessageModel[source]¶
An abstract message model.
- abstract classmethod load(input_)[source]¶
Create a new message model from a file object or string. This base implementation has no functionality and should not be called.
- Parameters
input – A file object or string.
- Returns
The message model.
- class hop.models.VOEvent(ivorn: str, role: str = 'observation', version: str = '2.0', Who: dict = <factory>, What: dict = <factory>, WhereWhen: dict = <factory>, How: dict = <factory>, Why: dict = <factory>, Citations: dict = <factory>, Description: dict = <factory>, Reference: dict = <factory>)[source]¶
Defines a VOEvent 2.0 structure.
- Implements the schema defined by: