Telemetry bus at the edge – Part 3: Consuming and producing

In the previous post in this blog series, we explained and walked through a bunch of different implementation examples of an edge-native telemetry bus.

Take a step back: Telemetry bus at the edge – Part 1: An overview

Now, let’s move along! In this blog post, I’ll show a couple of different ways of consuming and producing on a volga topic.

Producing and consuming is easy!

It may sound like it’d be complicated to produce and consume messages on our built in pub/sub bus. Hopefully, this article will change your mind.

Python producers and consumers

We will use a mix of Python, supctl and the Control Tower UI in this article. In my example, I’ll use a site called gbg.

First we will create a topic called demo on that site, we’ll use replication factor 3 for this example.

$ supctl do --site gbg volga create-topic --replication-factor 3 demo string

Let’s look at what was created:

$ supctl show --site gbg volga topics demo

name: demo
tenant: the-company
labels: {}
format: string
number-of-chunks: 100
creation-time: 2023-06-16T12:29:04.307Z
requested-replication-factor: 3
current-replication-factor: 1
persistence: disk
assigned-hosts:
  - gbg-1
leader-host: gbg-1
worker-hosts: []
size: 0 B
entries: 0
seqno: 0
chunkno: 0
dropped-chunks: 0
producers: []
consumers: []

Since this site has only one host, note that current-replication-factor is 1, that’s ok. If we ever add more hosts in the future, this topic will be replicated to two more hosts.

Let’s create a small python program that produces on this topic.

First, install the latest version of the:

$ python -m pip install --upgrade avassa-client

Then create a python script:

#!/usr/bin/env python3

import asyncio
import avassa_client
import avassa_client.volga as volga
import sys

async def main():
    # In a real world app, use app_login
    session = avassa_client.login(
            host="<https://192.168.3.109:4646>",
            username="fred@ec.com",
            password="verysecret")

    # Used if the topic doesn't exist
    create_opts = volga.CreateOptions.create(fmt='string')
    topic = volga.Topic.local('demo')

    async with volga.Producer(session=session,
                              producer_name='demo-producer',
                              topic=topic,
                              on_no_exists=create_opts) as producer:
        message = sys.argv[1]
        print(f"producing '{message}'")
        await producer.produce(message)

if __name__ == "__main__":
    asyncio.run(main())
$ ./produce.py "hello world"
producing 'hello world'

In another shell, you can try consuming using supctl:

$ supctl do --site gbg volga topics demo consume --follow
{
  "time": "2023-06-19T10:51:50.578Z",
  "seqno": 2,
  "remain": 23,
  "producer-name": "demo-producer",
  "payload": "hello world",
  "mtime": 1687171910578,
  "host": "gbg-1"
}

In the UI, you can of course also see this topic, and select the site and topic.

UI view displaying the topic selection along with site and topic options.

Now let’s create a python consumer:

#!/usr/bin/env python3

import asyncio
import avassa_client
import avassa_client.volga as volga
import sys

async def main():
    # In a real world app, use app_login
    session = avassa_client.login(
            host="<https://192.168.3.109:4646>",
            username="fred@ec.com",
            password="verysecret")

    # Used if the topic doesn't exist
    create_opts = volga.CreateOptions.create(fmt='string')
    topic = volga.Topic.local('demo')

    # Just get new messages, make sure you ack, see below
    position = volga.Position.unread()
    async with volga.Consumer(session=session,
                              consumer_name='demo-producer',
                              mode='exclusive',
                              position=position,
                              topic=topic,
                              on_no_exists=create_opts) as consumer:
        # Kick it off with one message
        await consumer.more(1)
        while True:
            msg = await consumer.recv()
            print(msg)

if __name__ == "__main__":
    asyncio.run(main())

If we run this: we will get all old messages and then wait for new ones:

$ ./consumer.py

{'time': '2023-06-19T10:49:35.801Z', 'seqno': 1, 'remain': 0, 'producer-name': 'demo-producer', 'payload': 'hej', 'mtime': 1687171775801, 'host': 'gbg-1'}
{'time': '2023-06-19T10:51:50.578Z', 'seqno': 2, 'remain': 9, 'producer-name': 'demo-producer', 'payload': 'hello world', 'mtime': 1687171910578, 'host': 'gbg-1'}
{'time': '2023-06-19T11:27:34.541Z', 'seqno': 3, 'remain': 8, 'producer-name': 'demo-producer', 'payload': 'hello world', 'mtime': 1687174054541, 'host': 'gbg-1'}
{'time': '2023-06-19T11:30:42.194Z', 'seqno': 4, 'remain': 7, 'producer-name': 'demo-producer', 'payload': 'hello world again', 'mtime': 1687174242194, 'host': 'gbg-1'}
{'time': '2023-06-19T11:30:58.346Z', 'seqno': 5, 'remain': 6, 'producer-name': 'demo-producer', 'payload': 'hello world again', 'mtime': 1687174258346, 'host': 'gbg-1'}

If we produce again:

$ ./produce.py "Happy Monday"
producing 'Happy Monday'

The consumer will output:

{'time': '2023-06-19T11:35:18.551Z', 'seqno': 6, 'remain': 5, 'producer-name': 'demo-producer', 'payload': 'Happy Monday', 'mtime': 1687174518551, 'host': 'gbg-1'}

Conclusion

As seen in this blog, with just a handful of lines of Python code, you can both publish and subscribe to messages on the pub/sub bus.

Notes

This summarizes the third and final part of this article series where we provide a few examples of what an edge-native telemetry bus implementation can look like. If you’d like to revisit one of the previous posts, you find them here:

Telemetry bus at the edge – Part 1: An overview
Telemetry bus at the edge – Part 2: Examples