from dataclasses import dataclass
from functools import lru_cache
from enum import Enum
import json
import logging
import random
import string
from typing import Union
import warnings

import confluent_kafka
import pluggy

from adc import consumer, errors, kafka, producer

from .configure import get_config_path
from .auth import Auth
from .auth import load_auth
from .auth import select_matching_auth
from . import models
from . import plugins

# Logger used throughout this module, registered under the name "hop".
logger = logging.getLogger("hop")

# Short alias for adc's ConsumerStartPosition; `Stream` uses
# `StartPosition.LATEST` as the default for its `start_at` argument.
StartPosition = consumer.ConsumerStartPosition

class Stream(object):
    """Defines an event stream.

    Sets up defaults used within the client so that when a
    stream connection is opened, it will use defaults specified
    here.

    Args:
        auth: A `bool` or :class:`Auth <hop.auth.Auth>` instance. Defaults to
            loading from :meth:`auth.load_auth <hop.auth.load_auth>` if set to
            True. To disable authentication, set to False.
        start_at: The message offset to start at in read mode. Defaults to LATEST.
        persist: Whether to listen to new messages forever or stop
            when EOS is received in read mode. Defaults to False.

    """

    def __init__(self, auth=True, start_at=StartPosition.LATEST, persist=False):
        # A single Auth instance is wrapped in a list so that `open` can hand
        # `select_matching_auth` the same kind of value in both cases.
        self._auth = [auth] if isinstance(auth, Auth) else auth
        self.start_at = start_at
        self.persist = persist

    @property
    def auth(self):
        """The credentials this stream will use, or None if auth is disabled.

        The value is resolved on first access and then cached on the
        instance, so the configuration file is read at most once per stream.

        Raises:
            FileNotFoundError: If `auth=True` was requested and the
                configuration file cannot be found.

        """
        # The cache lives on the instance rather than in a shared lru_cache,
        # so several Stream objects neither evict each other's entry nor get
        # kept alive by the cache.
        try:
            return self._resolved_auth
        except AttributeError:
            pass
        resolved = self._resolve_auth()
        self._resolved_auth = resolved
        return resolved

    def _resolve_auth(self):
        """Turn the constructor's `auth` argument into usable credentials."""
        # configuration is disabled in adc-streaming by passing None,
        # so to provide a nicer interface, we allow boolean flags as well.
        # this also explicitly gets around a problem in setting
        # configuration to True by default in the convenience class `stream`
        # which is set to `Stream()`. instead, configuration is first loaded
        # during the first open stream and cached for future use.
        if not isinstance(self._auth, bool):
            return self._auth
        if not self._auth:
            return None
        try:
            return load_auth()
        except FileNotFoundError:
            logger.error(
                "configuration set to True and configuration file "
                f"not found at {get_config_path('auth')} to authenticate"
            )
            raise

    def open(self, url, mode="r", group_id=None):
        """Opens a connection to an event stream.

        Args:
            url: Sets the broker URL to connect to.
            mode: Read ('r') or write ('w') from the stream.
            group_id: The consumer group ID from which to read.
                Generated automatically if not specified.

        Returns:
            An open connection to the client, either a :class:`Producer` instance
            in write mode or a :class:`Consumer` instance in read mode.

        Raises:
            ValueError: If the mode is not set to read/write, if more than
                one topic is specified in write mode, or if more than one broker is specified

        """
        username, broker_addresses, topics = kafka.parse_kafka_url(url)
        if len(broker_addresses) > 1:
            raise ValueError("Multiple broker addresses are not supported")
        # Log the username and the group ID under their own labels; the
        # previous message printed the group ID under the "username" label.
        logger.debug(
            "connecting to addresses=%s username=%s group_id=%s topics=%s",
            broker_addresses, username, group_id, topics,
        )

        if topics is None:
            raise ValueError("no topic(s) specified in kafka URL")

        auth = self.auth
        if auth is not None:
            credential = select_matching_auth(auth, broker_addresses[0], username)
        else:
            credential = None

        if mode == "w":
            if len(topics) != 1:
                raise ValueError("must specify exactly one topic in write mode")
            if group_id is not None:
                warnings.warn("group ID has no effect when opening a stream in write mode")
            return Producer(broker_addresses, topics[0], auth=credential)
        elif mode == "r":
            if group_id is None:
                # Prefix the random group ID with the credential's username
                # when there is one.
                username = credential.username if credential is not None else None
                group_id = _generate_group_id(username, 10)
                # NOTE(review): log level reconstructed as info - confirm.
                logger.info(f"group ID not specified, generating a random group ID: {group_id}")
            return Consumer(
                group_id,
                broker_addresses,
                topics,
                start_at=self.start_at,
                auth=credential,
                read_forever=self.persist,
            )
        else:
            raise ValueError("mode must be either 'w' or 'r'")
def _load_deserializer_plugins():
    """Load all registered deserializer plugins.

    Returns:
        A dict mapping each upper-cased plugin name to the model registered
        under it. When two plugins share a name, a warning is logged and the
        one seen last is kept.

    """
    # set up plugin manager
    manager = pluggy.PluginManager("hop")
    manager.add_hookspecs(plugins)

    # load in base models
    manager.register(models)

    # load external models
    try:
        manager.load_setuptools_entrypoints("hop_plugin")
    except Exception:
        # A broken third-party plugin must not prevent the built-in models
        # from loading, so report the failure and carry on.
        logger.warning(
            "Could not load external message plugins as one or more plugins "
            "generated errors upon import. to fix this issue, uninstall or fix "
            "any problematic plugins that are currently installed."
        )
        import sys
        import traceback
        traceback.print_exc(file=sys.stderr)

    # add all registered plugins to registry
    registered = {}
    for model_plugins in manager.hook.get_models():
        for name, model in model_plugins.items():
            plugin_name = name.upper()
            if plugin_name in registered:
                logger.warning(
                    f"Identified duplicate message plugin {plugin_name} registered under "
                    "the same name. this may cause unexpected behavior when using this "
                    "message format."
                )
            registered[plugin_name] = model

    return registered


class _DeserializerMixin:
    """Behaviour shared by every member of the `Deserializer` enum."""

    @classmethod
    def deserialize(cls, message):
        """Deserialize a stream message and instantiate a model.

        Args:
            message: A serialized message.

        Returns:
            A data container corresponding to the format
            in the serialized message.

        Raises:
            ValueError: If the message is incorrectly formatted or
                if the message format is not recognized.

        """
        try:
            fmt = message["format"].upper()
            content = message["content"]
        except (KeyError, TypeError, AttributeError) as err:
            # AttributeError covers a "format" field that is not a string,
            # which would otherwise escape as an undocumented error.
            raise ValueError("Message is incorrectly formatted") from err

        # NOTE(review): comparison target reconstructed; "BLOB" mirrors the
        # "blob" envelope that Producer._pack writes for raw messages - confirm.
        if fmt == "BLOB":
            return content
        if fmt in cls.__members__:
            return cls[fmt].value(**content)
        logger.warning(f"Message format {fmt} not recognized, returning a Blob")
        return models.Blob(content=content, missing_schema=True)

    def load(self, input_):
        """Delegate to the `load` of the model stored as this member's value."""
        return self.value.load(input_)

    def load_file(self, input_file):
        """Delegate to the `load_file` of the model stored as this member's value."""
        return self.value.load_file(input_file)


# Enum with one member per registered plugin name; built once at import time.
Deserializer = Enum(
    "Deserializer",
    _load_deserializer_plugins(),
    module=__name__,
    type=_DeserializerMixin
)


def _generate_group_id(user, n):
    """Generate a random Kafka group ID.

    Args:
        user: Username associated with the credential being used
        n: Length of randomly generated string suffix.

    Returns:
        The generated group ID: the random suffix alone when `user` is None,
        otherwise `user` and the suffix joined by a hyphen.

    """
    alphanum = string.ascii_uppercase + string.digits
    rand_str = ''.join(random.SystemRandom().choice(alphanum) for _ in range(n))
    if user is None:
        return rand_str
    return '-'.join((user, rand_str))
@dataclass(frozen=True)
class Metadata:
    """Broker-specific metadata that accompanies a consumed message.

    Every field except `_raw` is copied from the accessor of the same name
    on the Kafka message; `_raw` keeps the message object itself.

    """

    topic: str
    partition: int
    offset: int
    timestamp: int
    key: Union[str, bytes]
    _raw: confluent_kafka.Message

    @classmethod
    def from_message(cls, msg: confluent_kafka.Message) -> 'Metadata':
        """Build a Metadata record from a consumed Kafka message.

        Args:
            msg: The message to take the metadata from.

        Returns:
            A new Metadata instance describing `msg`.

        """
        extracted = {
            "topic": msg.topic(),
            "partition": msg.partition(),
            "offset": msg.offset(),
            # only the second element of the timestamp() result is kept
            "timestamp": msg.timestamp()[1],
            "key": msg.key(),
            "_raw": msg,
        }
        return cls(**extracted)
class Consumer:
    """
    An event stream opened for reading one or more topics.
    Instances of this class should be obtained from :meth:`Stream.open`.
    """

    def __init__(self, group_id, broker_addresses, topics, **kwargs):
        """
        Args:
            group_id: The Kafka consumer group to join for reading messages.
            broker_addresses: The list of bootstrap Kafka broker URLs.
            topics: The list of names of topics to which to subscribe.
            read_forever: If true, keep the stream open to wait for more messages
                after reading the last currently available message.
            start_at: The position in the topic stream at which to start
                reading, specified as a StartPosition object.
            auth: An adc.auth.SASLAuth object specifying client authentication
                to use.
            error_callback: A callback which will be called with any
                confluent_kafka.KafkaError objects produced representing internal
                Kafka errors.
            offset_commit_interval: A datetime.timedelta specifying how often to
                report progress to Kafka.

        :meta private:
        """
        self._consumer = consumer.Consumer(consumer.ConsumerConfig(
            broker_urls=broker_addresses,
            group_id=group_id,
            **kwargs,
        ))
        # NOTE(review): topics are subscribed one call at a time; confirm that
        # adc's subscribe() adds to, rather than replaces, the subscription.
        for t in topics:
            self._consumer.subscribe(t)

    def read(self, metadata=False, autocommit=True, **kwargs):
        """Read messages from a stream.

        Args:
            metadata: Whether to receive message metadata alongside messages.
            autocommit: Whether messages are automatically marked as handled
                via `mark_done` when the next message is yielded. Defaults to True.
            batch_size: The number of messages to request from Kafka at a time.
                Lower numbers can give lower latency, while higher numbers will
                be more efficient, but may add latency.
            batch_timeout: The period of time to wait to get a full batch of
                messages from Kafka. Similar to batch_size, lower numbers can
                reduce latency while higher numbers can be more efficient at the
                cost of greater latency.
                If specified, this argument should be a datetime.timedelta
                object.

        Yields:
            Each deserialized message, or a `(message, Metadata)` tuple when
            `metadata` is true.

        """
        # `autocommit` and any batching options are forwarded unchanged to
        # the underlying adc consumer.
        for message in self._consumer.stream(autocommit=autocommit, **kwargs):
            yield self._unpack(message, metadata=metadata)

    @staticmethod
    def _unpack(message, metadata=False):
        """Deserialize and unpack messages.

        Args:
            message: The message to deserialize and unpack.
            metadata: Whether to receive message metadata alongside messages.

        Returns:
            The deserialized payload, or a `(payload, Metadata)` tuple when
            `metadata` is true.

        """
        # The message value is UTF-8 encoded JSON; decode it, then let the
        # Deserializer turn the resulting envelope into a model.
        payload = json.loads(message.value().decode("utf-8"))
        payload = Deserializer.deserialize(payload)
        if metadata:
            return (payload, Metadata.from_message(message))
        else:
            return payload

    def mark_done(self, metadata):
        """Mark a message as fully-processed.

        Args:
            metadata: A Metadata instance containing broker-specific metadata.

        """
        self._consumer.mark_done(metadata._raw)

    def close(self):
        """End all subscriptions and shut down.

        """
        self._consumer.close()

    def __iter__(self):
        # Iterating the consumer is the same as calling read() with defaults.
        yield from self.read()

    def __enter__(self):
        return self

    def __exit__(self, *exc):
        self.close()
class Producer:
    """
    An event stream opened for writing to a topic.
    Instances of this class should be obtained from :meth:`Stream.open`.
    """

    def __init__(self, broker_addresses, topic, **kwargs):
        """
        Args:
            broker_addresses: The list of bootstrap Kafka broker URLs.
            topic: The name of the topic to which to write.
            auth: An adc.auth.SASLAuth object specifying client authentication
                to use.
            delivery_callback: A callback which will be called when each message
                is either delivered or permanently fails to be delivered.
                Defaults to adc's `errors.raise_delivery_errors`.
            error_callback: A callback which will be called with any
                confluent_kafka.KafkaError objects produced representing internal
                Kafka errors.
            produce_timeout: A datetime.timedelta object specifying the maximum
                time to wait for a message to be sent to Kafka. If zero, sending
                will never time out.

        :meta private:
        """
        # Merge the default callback with the caller's options so that an
        # explicit `delivery_callback` overrides the default instead of
        # raising "multiple values for keyword argument".
        config_kwargs = {"delivery_callback": errors.raise_delivery_errors, **kwargs}
        self._producer = producer.Producer(producer.ProducerConfig(
            broker_urls=broker_addresses,
            topic=topic,
            **config_kwargs,
        ))

    def write(self, message):
        """Write messages to a stream.

        Args:
            message: The message to write.

        """
        self._producer.write(self._pack(message))

    @staticmethod
    def _pack(message):
        """Pack and serialize messages.

        Args:
            message: The message to pack and serialize.

        Returns:
            The message envelope as UTF-8 encoded JSON bytes.

        Raises:
            TypeError: If the resulting payload cannot be serialized to JSON.

        :meta private:
        """
        # Objects that provide serialize() build their own envelope; anything
        # else is wrapped as-is in a "blob" envelope.
        try:
            payload = message.serialize()
        except AttributeError:
            payload = {"format": "blob", "content": message}
        try:
            return json.dumps(payload).encode("utf-8")
        except TypeError as err:
            # Chain the JSON error so the offending value stays visible.
            raise TypeError("Unable to pack a message of type "
                            + message.__class__.__name__
                            + " which cannot be serialized to JSON") from err

    def close(self):
        """Wait for enqueued messages to be written and shut down.

        """
        return self._producer.close()

    def __enter__(self):
        return self

    def __exit__(self, *exc):
        # Delegate to the underlying producer's own context-manager exit.
        return self._producer.__exit__(*exc)