Message Formats

The hop client provides a few in-memory representations for common message types for easy access to various message properties, as well as loading messages from their serialized forms or from disk. These message formats, or models, can be sent directly to an open Stream to provide seamless serialization of messages through Hopskotch.

Structured Messages

Currently, the structured messages available through the hop client are VOEvent and GCNCircular. For example, to load a VOEvent from an XML file and publish it to a stream:

from hop import Stream
from hop.auth import load_auth
from hop.models import VOEvent

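# Load the VOEvent model from its XML representation on disk
xml_path = "/path/to/voevent.xml"
voevent = VOEvent.load_file(xml_path)

# Open the topic for writing; the model is serialized on write
stream = Stream(auth=load_auth())
with stream.open("kafka://hostname:port/topic", "w") as s:
    s.write(voevent)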

Unstructured Messages

Unstructured (or less structured) messages can be sent directly to an open Stream instance. Any Python object that can be serialized to JSON can be sent, such as a dictionary, a string, or a list. At an even more raw level, bytes objects can be sent without any further encoding or interpretation.
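The split between raw bytes and JSON-encodable objects can be sketched in a few lines of standalone Python. encode_payload is a hypothetical helper, not part of the hop client, and the client's actual encoding details (such as whitespace in the JSON output) may differ; the sketch only captures the rule that bytes pass through unchanged while everything else must survive json.dumps.

```python
import json
from typing import Any


def encode_payload(message: Any) -> bytes:
    """Encode an unstructured message to bytes (illustrative only).

    bytes-like objects pass through untouched; anything else must be
    JSON-serializable (dict, list, str, int, float, bool, or None).
    """
    if isinstance(message, (bytes, bytearray)):
        # Raw bytes: no further encoding or interpretation
        return bytes(message)
    # json.dumps raises TypeError for objects it cannot represent
    return json.dumps(message).encode("utf-8")


print(encode_payload({"priority": 1, "payload": "data"}))
print(encode_payload(b"\x02Binary data\x1DMessage ends\x03"))
try:
    encode_payload({1, 2, 3})  # a set is not JSON-serializable
except TypeError as err:
    print("rejected:", err)
```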

On the more structured end of the spectrum, the hop client understands some general data formats and can automatically encode and decode them. These include JSON, via the JSONBlob class, and Apache Avro, via the AvroBlob class. Using JSONBlob is equivalent to simply writing an unstructured but JSON-encodable Python object. AvroBlob supports efficiently including bytes subobjects, as well as schemas. If no schema is supplied, it will create a schema to describe the object(s) it is given, but a deliberately designed schema may also be used.

from hop import Stream
from hop.auth import load_auth
from hop.models import JSONBlob, AvroBlob
import fastavro

stream = Stream(auth=load_auth())
with stream.open("kafka://hostname:port/topic", "w") as s:

    # Writing simple, unstructured messages
    s.write("a string message")
    s.write(["some", "data", "with", "numbers:", 5, 6, 7])
    s.write({"priority": 1, "payload": "data"})
    s.write(b'\x02Binary data\x1DMessage ends\x03')

    # Explicitly writing a partially-structured message as JSON
    s.write(JSONBlob({"priority": 1, "payload": "data"}))

    # Write an Avro message with an ad-hoc schema.
    # Avro may contain arbitrarily many records,
    # so it always expects a list of records to be written
    s.write(AvroBlob([{"priority": 1, "payload": b'\x02Binary data\x03'}]))

    # Write an Avro message with a specific schema loaded from an .avsc file
    schema = fastavro.schema.load_schema("my_schema.avsc")
    s.write(AvroBlob([{"priority": 1, "payload": b'\x02Binary data\x03'}],
                     schema=schema))
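AvroBlob's ad-hoc schema generation, mentioned above, can be approximated for flat records of primitive values. The sketch below is a simplification and not the hop client's actual inference: it inspects only the first record, maps a handful of Python types to Avro primitive names, and does not handle nested structures or fields whose types vary between records. infer_record_schema, avro_type_for, and the record name "Record" are hypothetical.

```python
from typing import Any, Dict, List

# Python type -> Avro primitive type name; bool is listed before int
# because bool is a subclass of int in Python
_AVRO_PRIMITIVES = [
    (bool, "boolean"),
    (int, "long"),
    (float, "double"),
    (str, "string"),
    (bytes, "bytes"),
    (type(None), "null"),
]


def avro_type_for(value: Any) -> str:
    """Return the Avro primitive type name for a single Python value."""
    for py_type, avro_name in _AVRO_PRIMITIVES:
        if isinstance(value, py_type):
            return avro_name
    raise TypeError(f"no Avro primitive type for {type(value).__name__}")


def infer_record_schema(records: List[Dict[str, Any]], name: str = "Record") -> Dict[str, Any]:
    """Build a flat Avro record schema from the first record's keys and value types."""
    if not records:
        raise ValueError("need at least one record to infer a schema")
    first = records[0]
    return {
        "type": "record",
        "name": name,
        "fields": [
            {"name": key, "type": avro_type_for(value)}
            for key, value in first.items()
        ],
    }


schema = infer_record_schema([{"priority": 1, "payload": b"\x02Binary data\x03"}])
print(schema)
```

Python ints are unbounded, so the sketch maps them to Avro's 64-bit "long" rather than the 32-bit "int". fastavro's parse_schema can be used to check that a dictionary like this is a valid Avro schema.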

When they are read from a stream, all unstructured messages are unpacked by the hop client back into message model objects containing Python objects equivalent to what was sent. The decoded objects are available through the content property of each unstructured message model type. Some model types also make additional information available; for example, AvroBlob also has a schema property, which contains the schema with which the message was sent.
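On the reading side, the wrapping described above can be sketched with a standalone stand-in. DecodedMessage and decode_payload are hypothetical names, not hop classes, and how the client identifies a message's format is outside the scope of this sketch, so an explicit is_json flag stands in for it.

```python
import json
from dataclasses import dataclass
from typing import Any


@dataclass
class DecodedMessage:
    """Hypothetical stand-in for an unstructured message model."""

    content: Any  # the decoded Python object


def decode_payload(raw: bytes, is_json: bool) -> DecodedMessage:
    """Wrap a received payload; is_json stands in for real format detection."""
    if is_json:
        return DecodedMessage(content=json.loads(raw.decode("utf-8")))
    # Raw bytes are handed back unchanged
    return DecodedMessage(content=raw)


msg = decode_payload(b'{"priority": 1, "payload": "data"}', is_json=True)
print(msg.content["payload"])
```

Equivalent is not always identical: a tuple sent as JSON, for instance, comes back as a list, because JSON has only one array type.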

Please note that the AvroBlob message model serializes using the Avro container format, not the Avro variant of the Confluent wire format.
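The difference in framing shows up in the first few bytes of each message. An Avro object container file begins with the four magic bytes Obj followed by a byte with value 1, while a message in the Confluent wire format begins with a zero magic byte followed by a 4-byte big-endian schema ID. The heuristic below (describe_avro_framing is a hypothetical name) distinguishes the two; it is only a sketch, since arbitrary data could also happen to start with a zero byte.

```python
import struct

AVRO_CONTAINER_MAGIC = b"Obj\x01"  # header of an Avro object container file


def describe_avro_framing(payload: bytes) -> str:
    """Guess which Avro framing a message value uses (heuristic only)."""
    if payload.startswith(AVRO_CONTAINER_MAGIC):
        return "avro-container"
    if len(payload) >= 5 and payload[0] == 0:
        # Confluent wire format: magic byte 0, then a 4-byte big-endian schema ID
        (schema_id,) = struct.unpack(">I", payload[1:5])
        return f"confluent-wire (schema id {schema_id})"
    return "unknown"


print(describe_avro_framing(b"Obj\x01...container body..."))
print(describe_avro_framing(b"\x00\x00\x00\x00\x2a...avro binary..."))
```

In practice this means a consumer that expects Confluent framing (for example, one backed by a schema registry deserializer) should not be expected to decode AvroBlob messages, and vice versa.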

Register External Message Models

Sometimes it may be useful to use custom structured messages that aren’t currently available in the stock client, for instance, to send specialized messages between services that are internal to a specific observatory. The hop client provides a mechanism for registering custom message types from your own project, which hop then recognizes when publishing and subscribing. This requires creating an external Python library and setting up an entry point so that hop can discover it when the client is imported.

There are three steps involved in creating and registering a custom message model:

  1. Define the message model.

  2. Register the message model.

  3. Set up an entry point within your package.

Define a message model

To do this, define a dataclass that subclasses hop.models.MessageModel, choose an identifier (name) that will be used to refer to your model, and implement loading of your message model via the load() class method. As an example, assuming the message is represented as JSON on disk:

from dataclasses import dataclass
import json

from hop.models import MessageModel

@dataclass
class Donut(MessageModel):

    category: str
    flavor: str
    has_filling: bool

    format_name = "donut"  # optional; a plain class attribute, not a dataclass field

    @classmethod
    def load(cls, input_):
        if hasattr(input_, "read"):
            # input_ is a file object
            donut = json.load(input_)
        else:
            # input_ is serialized data (str or bytes)
            donut = json.loads(input_)

        # unpack the JSON dictionary and return the model
        return cls(**donut)

If you do not explicitly define a format name for your model (as a string class attribute named format_name), the class name converted to lowercase will be used.
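The naming fallback described above amounts to a single getattr call. resolve_format_name is a hypothetical helper that restates the documented rule; it is not hop.models.format_name itself, whose implementation may differ.

```python
def resolve_format_name(model_class: type) -> str:
    """Use an explicit format_name attribute if present, else the lowercased class name."""
    return getattr(model_class, "format_name", model_class.__name__.lower())


class Donut:
    format_name = "donut"  # explicit name


class GlazedDonut:
    pass  # no format_name defined


print(resolve_format_name(Donut))
print(resolve_format_name(GlazedDonut))
```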

By default, the base MessageModel class provides serialization and deserialization of the fields defined in your model to and from JSON. If you want greater control over how these processes work, your model class can define its own serialize and deserialize methods.

If you choose to implement these methods yourself, serialize must return a dictionary with two keys: “format”, which maps to your model’s identifier string, and “content”, which maps to the encoded form of the model instance’s data as a bytes object. Using hop.models.format_name is the recommended way to determine the value for the “format” key, as it automatically follows the standard convention. deserialize must be a class method which accepts encoded data (as bytes) and produces an instance of your model after decoding.

It is also possible to customize the load_file convenience class method, which normally just attempts to open the specified path as a file for reading and passes the resulting file object to load. The most common reason to customize this method is for models which need to ensure that input files are opened in binary mode.
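The contract for custom serialize, deserialize, and load_file methods can be sketched as follows. To keep the example runnable without hop installed, Donut here is a plain dataclass standing in for a hop.models.MessageModel subclass, and the “format” value is read from the class attribute directly instead of through hop.models.format_name; both are simplifications for illustration. JSON is used as the encoding only because the earlier Donut example does.

```python
import json
from dataclasses import asdict, dataclass
from typing import Any, Dict


@dataclass
class Donut:
    """Plain stand-in for a hop.models.MessageModel subclass (hop not required)."""

    category: str
    flavor: str
    has_filling: bool

    format_name = "donut"  # class attribute, so not a dataclass field

    def serialize(self) -> Dict[str, Any]:
        # Contract: the format identifier plus the encoded data as bytes.
        # The docs recommend hop.models.format_name for the "format" value.
        return {
            "format": self.format_name,
            "content": json.dumps(asdict(self)).encode("utf-8"),
        }

    @classmethod
    def deserialize(cls, data: bytes) -> "Donut":
        # Contract: accept encoded bytes and return a model instance
        return cls(**json.loads(data.decode("utf-8")))

    @classmethod
    def load(cls, input_) -> "Donut":
        if hasattr(input_, "read"):
            # input_ is a file object (text or binary)
            return cls(**json.load(input_))
        # input_ is serialized data (str or bytes)
        return cls(**json.loads(input_))

    @classmethod
    def load_file(cls, path: str) -> "Donut":
        # Customized to open the file in binary mode before delegating to load;
        # json.load accepts the bytes returned by a binary file's read()
        with open(path, "rb") as f:
            return cls.load(f)


original = Donut(category="yeast", flavor="glazed", has_filling=False)
wire = original.serialize()
restored = Donut.deserialize(wire["content"])
print(wire["format"], restored == original)
```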

For more information on dataclasses, see the Python Docs.

Register a message model

Once you have defined your message model, registering it involves defining a function, decorated with hop.plugins.register, that returns a dictionary mapping message model names to model classes:

from hop import plugins
from hop.models import format_name

...

@plugins.register
def get_models():
    model_classes = [Donut]
    return {format_name(cls): cls for cls in model_classes}

Using hop.models.format_name to compose the keys is recommended because it means that you only need to define the format name once, as part of the class definition.
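To make the decorator-plus-dictionary pattern concrete, the following toy registry mimics the idea in plain Python. register_models, collect_models, and the Bagel class are hypothetical and are not part of hop.plugins; the local format_name helper applies the naming rule described earlier rather than importing hop's. Rejecting a name claimed by two different classes is a design choice of this sketch, not documented hop behavior.

```python
from typing import Callable, Dict, List

ModelHook = Callable[[], Dict[str, type]]

# Toy registry that collects hook functions; not hop.plugins' implementation
_HOOKS: List[ModelHook] = []


def register_models(hook: ModelHook) -> ModelHook:
    """Decorator that records a function returning {format name: model class}."""
    _HOOKS.append(hook)
    return hook


def format_name(cls: type) -> str:
    # Same naming rule as described earlier: explicit attribute or lowercased class name
    return getattr(cls, "format_name", cls.__name__.lower())


def collect_models() -> Dict[str, type]:
    """Merge every registered hook's mapping, rejecting conflicting names."""
    models: Dict[str, type] = {}
    for hook in _HOOKS:
        for name, cls in hook().items():
            if name in models and models[name] is not cls:
                raise ValueError(f"format name {name!r} claimed by two classes")
            models[name] = cls
    return models


class Donut:
    format_name = "donut"


class Bagel:
    pass  # falls back to "bagel"


@register_models
def get_models() -> Dict[str, type]:
    model_classes = [Donut, Bagel]
    return {format_name(cls): cls for cls in model_classes}


print(sorted(collect_models()))
```

Because Bagel defines no format_name, its key falls back to the lowercased class name; composing the keys with format_name keeps each name defined in exactly one place.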

Set up entry points within your package

After registering your model, you’ll need to set up an entry point in your package under the hop_plugin group, since hop explicitly looks up that entry point group to auto-discover new plugins. The entry point should refer to the module in which you registered your model.

Setting up entry points may be different depending on how your package is set up. Below we’ll give an example for setuptools and setup.py. In setup.py:

from setuptools import setup

...

setup(
    ...

    entry_points={"hop_plugin": ["donut-plugin = my.custom.module"]},
)
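Once the package is installed (for example with pip install -e .), one way to confirm that the entry point was registered is to list the hop_plugin group with the standard library's importlib.metadata. This sketch does not use hop at all, and find_hop_plugins is a hypothetical helper name; it only shows what the installed packaging metadata exposes.

```python
import sys
from importlib import metadata
from typing import List


def find_hop_plugins(group: str = "hop_plugin") -> List[str]:
    """List 'name = module' strings for installed entry points in a group."""
    if sys.version_info >= (3, 10):
        eps = metadata.entry_points(group=group)
    else:
        # Python 3.8/3.9: entry_points() returns a dict keyed by group name
        eps = metadata.entry_points().get(group, [])
    return sorted(f"{ep.name} = {ep.value}" for ep in eps)


print(find_hop_plugins())
```

With the setup.py above installed, the output would be expected to include an entry reading donut-plugin = my.custom.module.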

Some further resources on entry points: