Consuming Messages from a Publisher Subscription

Download this manual as a PDF file

This section describes how to read the messages sent from SL1 Publisher through a subscription to a Kafka topic.

Use the following menu options to navigate the SL1 user interface:

  • To view a pop-out list of menu options, click the menu icon ().
  • To view a page containing all of the menu options, click the Advanced menu icon ().

Configuring Your System to Consume Messages from a Publisher Subscription

After you create a subscription in SL1 Publisher, you will want to verify the messages are being sent. The easiest way to ensure that messages are being sent is to check the logs for the subscription's pods and to verify that they report messages numbering greater than 0.

After you verify that messages are being sent, you can then read the messages from the destination Kafka topic. Two things are required to read the messages:

  • A Kafka Python library that allows you to connect and consume content from a Kafka topic

  • The sl-schema-registry library provided by ScienceLogic

The messages sent from SL1 Publisher to the Kafka topic are encoded by the sl-schema-registry library, so you need to install this library to unpack and decode the messages.

Installing the sl-schema-registry Library

The sl-schema-registry library is provided as a wheel (.whl file) available from the ScienceLogic Support Site.

To install the sl-schema-registry library on your third-party system:

  1. Log on to the ScienceLogic Support Site and download the sl_schema_registry_whl_and_docs.zip file, which contains the library, as well as the documentation.

  2. Copy the compressed file to the third-party system where you will unpack the binary data bundles sent by Publisher.

  3. Unzip the compressed file to access the wheel and documentation files.

  4. (Option 1) To deploy in a standard environment, enter the following at the command prompt:

pip3 install sl_schema_registry-1.5.13-py2.py3-none-any.whl

  1. (Option 2) To deploy in a virtual environment, if you have virtualenvwrapper installed, enter the following at the command prompt:

mkvirtualenv sl_schema_registry

(sl_schema_registry) pip3 install sl_schema_registry-1.5.13-py2.py3-none-any.whl

  1. The installation process will complete, and the license will be displayed.

Using a Kafka Python Library to Read Subscription Messages

You can use any Python library that is Python 3.6 or higher to read the messages sent to the Kafka topic. The example below uses the kafka-python library found here: https://pypi.org/project/kafka-python/

Example Script

The following is a sample script that you can use to read and print messages from a Kafka topic using the kafka-python library and the sl-schema-registry library. This script takes messages from the topic you specify on the Kafka server you specify, converts them to JSON, and prints the resulting JSON messages.

To run this example script, save it to a file (for example, kafka_consumer.py), and change the topic and kafka_servers variables to a valid topic and server in your environment. Run the script as you would run any Python 3 code on that appliance (for example, python3.6 kafka_consumer.py).

import datetime
import json

from json import JSONEncoder
from contextlib import closing

from kafka import KafkaConsumer
from sl_schema_registry import registry, objectgraph


class DateTimeEncoder(JSONEncoder):
    """
    Handles decoding dates and datetimes into a JSON compatible format.
    """
    def default(self, obj):
        if isinstance(obj, (datetime.date, datetime.datetime)):
            return obj.isoformat()
        return None

def get_messages(topic, kafka_servers):
    """
    Generator that yields JSON-formated messages from the a Kafka Topic.
    """
    with closing(KafkaConsumer(topic, bootstrap_servers=kafka_servers)) as consumer:
        for message in consumer:
            env = registry.loads(message.value)
            json_msg = json.dumps(env, cls=DateTimeEncoder)
            yield json_msg


# Establish the Schema Registry to output in Python Dictionaries and Lists, which are
# the most compatible with JSON.
objectgraph.set_auto_registration_hook(registry)


if __name__ == "__main__":
    topic = "destination-topic" # destination topic where the subscription is populating
    kafka_servers = ["kafka:9092"] # kafka server address where the destination topic can be consumed from

    for json_msg in get_messages(topic, kafka_servers):
        print(json_msg) # place operational logic here