Telemetry bus at the edge – Part 3: Consuming and producing

In the previous post in this blog series, we walked through several implementation examples of an edge-native telemetry bus.

Take a step back: Telemetry bus at the edge – Part 1: An overview

Now, let’s move along! In this blog post, I’ll show a couple of different ways of consuming from and producing to a Volga topic.

Producing and consuming is easy!

It may sound complicated to produce and consume messages on our built-in pub/sub bus. Hopefully, this article will change your mind.

Python producers and consumers

We will use a mix of Python, supctl and the Control Tower UI in this article. In my example, I’ll use a site called gbg.

First, we will create a topic called demo on that site. We’ll use replication factor 3 for this example.

$ supctl do --site gbg volga create-topic --replication-factor 3 demo string

Let’s look at what was created:

$ supctl show --site gbg volga topics demo

name: demo
tenant: the-company
labels: {}
format: string
number-of-chunks: 100
creation-time: 2023-06-16T12:29:04.307Z
requested-replication-factor: 3
current-replication-factor: 1
persistence: disk
assigned-hosts:
  - gbg-1
leader-host: gbg-1
worker-hosts: []
size: 0 B
entries: 0
seqno: 0
chunkno: 0
dropped-chunks: 0
producers: []
consumers: []

Note that current-replication-factor is 1, since this site has only one host. That’s OK. If we add more hosts in the future, this topic will be replicated to two more hosts.
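The relationship between the requested and the current replication factor can be pictured with a tiny sketch. The function name and the min() rule are assumptions inferred from this single-host example, not taken from Volga's implementation.

```python
def current_replication_factor(requested: int, available_hosts: int) -> int:
    """Hypothetical model: a topic cannot have more replicas than the site has hosts."""
    return min(requested, available_hosts)

# gbg has one host, and we requested replication factor 3
print(current_replication_factor(3, 1))  # 1

# A site with five hosts could satisfy the full request
print(current_replication_factor(3, 5))  # 3
```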

Let’s create a small Python program that produces on this topic.

First, install the latest version of the avassa-client package:

$ python -m pip install --upgrade avassa-client

Then create a Python script called produce.py:

#!/usr/bin/env python3

import asyncio
import avassa_client
import avassa_client.volga as volga
import sys

async def main():
    # In a real world app, use app_login
    session = avassa_client.login(
            host="https://192.168.3.109:4646",
            username="fred@ec.com",
            password="verysecret")

    # Used if the topic doesn't exist
    create_opts = volga.CreateOptions.create(fmt='string')
    topic = volga.Topic.local('demo')

    async with volga.Producer(session=session,
                              producer_name='demo-producer',
                              topic=topic,
                              on_no_exists=create_opts) as producer:
        message = sys.argv[1]
        print(f"producing '{message}'")
        await producer.produce(message)

if __name__ == "__main__":
    asyncio.run(main())

Run it:

$ ./produce.py "hello world"
producing 'hello world'
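Because the demo topic was created with the string format, structured telemetry would need to be serialized to a string before it is produced. A minimal sketch using JSON follows; the helper names and the sensor/value fields are hypothetical and only illustrate the idea.

```python
import json

def encode_reading(sensor: str, value: float) -> str:
    """Serialize a (hypothetical) sensor reading to a JSON string."""
    # sort_keys gives a stable key order, which makes payloads easy to compare
    return json.dumps({"sensor": sensor, "value": value}, sort_keys=True)

def decode_reading(payload: str) -> dict:
    """Turn a JSON payload string back into a dict on the consuming side."""
    return json.loads(payload)

payload = encode_reading("temp-1", 21.5)
print(payload)  # {"sensor": "temp-1", "value": 21.5}
```

The resulting string could then be passed to producer.produce() in place of the command-line argument.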

In another shell, you can try consuming using supctl:

$ supctl do --site gbg volga topics demo consume --follow
{
  "time": "2023-06-19T10:51:50.578Z",
  "seqno": 2,
  "remain": 23,
  "producer-name": "demo-producer",
  "payload": "hello world",
  "mtime": 1687171910578,
  "host": "gbg-1"
}
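The consumed message is plain JSON, so it is easy to pick apart in Python. In the sample above, mtime appears to be the same instant as time, expressed as milliseconds since the Unix epoch. The sketch below checks that reading against the sample message; the helper name mtime_to_iso is made up for illustration.

```python
import json
from datetime import datetime, timedelta, timezone

# The message printed by supctl above, pasted in as-is
raw = """
{
  "time": "2023-06-19T10:51:50.578Z",
  "seqno": 2,
  "remain": 23,
  "producer-name": "demo-producer",
  "payload": "hello world",
  "mtime": 1687171910578,
  "host": "gbg-1"
}
"""

EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)

def mtime_to_iso(mtime_ms: int) -> str:
    """Convert epoch milliseconds to an ISO 8601 UTC string with a Z suffix."""
    # Integer timedelta arithmetic avoids float rounding of the milliseconds
    ts = EPOCH + timedelta(milliseconds=mtime_ms)
    return ts.isoformat(timespec="milliseconds").replace("+00:00", "Z")

msg = json.loads(raw)
print(msg["payload"])                # hello world
print(mtime_to_iso(msg["mtime"]))    # 2023-06-19T10:51:50.578Z
```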

You can of course also see this topic in the Control Tower UI: select the site and then the topic.

Now let’s create a Python consumer called consumer.py:

#!/usr/bin/env python3

import asyncio
import avassa_client
import avassa_client.volga as volga

async def main():
    # In a real world app, use app_login
    session = avassa_client.login(
            host="https://192.168.3.109:4646",
            username="fred@ec.com",
            password="verysecret")

    # Used if the topic doesn't exist
    create_opts = volga.CreateOptions.create(fmt='string')
    topic = volga.Topic.local('demo')

    # Start from the unread messages; a message only counts as read once it is acked
    position = volga.Position.unread()
    async with volga.Consumer(session=session,
                              consumer_name='demo-consumer',
                              mode='exclusive',
                              position=position,
                              topic=topic,
                              on_no_exists=create_opts) as consumer:
        # Kick it off with one message
        await consumer.more(1)
        while True:
            msg = await consumer.recv()
            print(msg)

if __name__ == "__main__":
    asyncio.run(main())

If we run this, we will get all old messages and then wait for new ones:

$ ./consumer.py

{'time': '2023-06-19T10:49:35.801Z', 'seqno': 1, 'remain': 0, 'producer-name': 'demo-producer', 'payload': 'hej', 'mtime': 1687171775801, 'host': 'gbg-1'}
{'time': '2023-06-19T10:51:50.578Z', 'seqno': 2, 'remain': 9, 'producer-name': 'demo-producer', 'payload': 'hello world', 'mtime': 1687171910578, 'host': 'gbg-1'}
{'time': '2023-06-19T11:27:34.541Z', 'seqno': 3, 'remain': 8, 'producer-name': 'demo-producer', 'payload': 'hello world', 'mtime': 1687174054541, 'host': 'gbg-1'}
{'time': '2023-06-19T11:30:42.194Z', 'seqno': 4, 'remain': 7, 'producer-name': 'demo-producer', 'payload': 'hello world again', 'mtime': 1687174242194, 'host': 'gbg-1'}
{'time': '2023-06-19T11:30:58.346Z', 'seqno': 5, 'remain': 6, 'producer-name': 'demo-producer', 'payload': 'hello world again', 'mtime': 1687174258346, 'host': 'gbg-1'}

If we produce again:

$ ./produce.py "Happy Monday"
producing 'Happy Monday'

The consumer will output:

{'time': '2023-06-19T11:35:18.551Z', 'seqno': 6, 'remain': 5, 'producer-name': 'demo-producer', 'payload': 'Happy Monday', 'mtime': 1687174518551, 'host': 'gbg-1'}
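An endless receive loop is awkward to test against a live site. One option is to move the per-message logic into a function that accepts any object with an async recv() method and to drive it with a stand-in. Everything below is a hypothetical sketch: StubConsumer only mimics recv() and is not part of avassa_client, and the seqno check is an illustrative guard against processing the same message twice.

```python
import asyncio

class StubConsumer:
    """Hypothetical stand-in for a consumer; it only mimics recv()."""
    def __init__(self, messages):
        self._messages = list(messages)

    async def recv(self):
        # Hand out the canned messages in order
        return self._messages.pop(0)

async def collect_payloads(consumer, count):
    """Receive `count` messages and return their payloads in arrival order,
    skipping any message whose seqno is not higher than the last one seen."""
    payloads = []
    last_seqno = 0
    for _ in range(count):
        msg = await consumer.recv()
        if msg["seqno"] <= last_seqno:
            continue
        last_seqno = msg["seqno"]
        payloads.append(msg["payload"])
    return payloads

sample = [
    {"seqno": 5, "payload": "hello world again"},
    {"seqno": 6, "payload": "Happy Monday"},
]
print(asyncio.run(collect_payloads(StubConsumer(sample), 2)))
# ['hello world again', 'Happy Monday']
```

The same function could be handed the real consumer object from the script above, since it only relies on recv().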

Conclusion

As seen in this blog post, with just a handful of lines of Python code, you can both publish and subscribe to messages on the pub/sub bus.

Notes

This concludes the third and final part of this article series, where we provide a few examples of what an edge-native telemetry bus implementation can look like. If you’d like to revisit one of the previous posts, you can find them here:

Telemetry bus at the edge – Part 1: An overview
Telemetry bus at the edge – Part 2: Examples