Telemetry bus at the edge – Part 3: Consuming and producing
In the previous post in this blog series, we explained and walked through a bunch of different implementation examples of an edge-native telemetry bus.
Take a step back: Telemetry bus at the edge – Part 1: An overview
Now, let’s move along! In this blog post, Iāll show a couple of different ways of consuming and producing on a volga topic.
Producing and consuming is easy!
It may sound like itād be complicated to produce and consume messages on our built in pub/sub bus. Hopefully, this article will change your mind.
Python producers and consumers
We will use a mix of Python, supctl and the Control Tower UI in this article. In my example, Iāll use a site called gbg
.
First we will create a topic called demo
on that site, weāll use replication factor 3 for this example.
$ supctl do --site gbg volga create-topic --replication-factor 3 demo string
Letās look at what was created:
$ supctl show --site gbg volga topics demo
name: demo
tenant: the-company
labels: {}
format: string
number-of-chunks: 100
creation-time: 2023-06-16T12:29:04.307Z
requested-replication-factor: 3
current-replication-factor: 1
persistence: disk
assigned-hosts:
- gbg-1
leader-host: gbg-1
worker-hosts: []
size: 0 B
entries: 0
seqno: 0
chunkno: 0
dropped-chunks: 0
producers: []
consumers: []
Since this site has only one host, note that current-replication-factor
is 1, thatās ok. If we ever add more hosts in the future, this topic will be replicated to two more hosts.
Letās create a small python program that produces on this topic.
First, install the latest version of the:
$ python -m pip install --upgrade avassa-client
Then create a python script:
#!/usr/bin/env python3
import asyncio
import avassa_client
import avassa_client.volga as volga
import sys
async def main():
# In a real world app, use app_login
session = avassa_client.login(
host="<https://192.168.3.109:4646>",
username="fred@ec.com",
password="verysecret")
# Used if the topic doesn't exist
create_opts = volga.CreateOptions.create(fmt='string')
topic = volga.Topic.local('demo')
async with volga.Producer(session=session,
producer_name='demo-producer',
topic=topic,
on_no_exists=create_opts) as producer:
message = sys.argv[1]
print(f"producing '{message}'")
await producer.produce(message)
if __name__ == "__main__":
asyncio.run(main())
$ ./produce.py "hello world"
producing 'hello world'
In another shell, you can try consuming using supctl:
$ supctl do --site gbg volga topics demo consume --follow
{
"time": "2023-06-19T10:51:50.578Z",
"seqno": 2,
"remain": 23,
"producer-name": "demo-producer",
"payload": "hello world",
"mtime": 1687171910578,
"host": "gbg-1"
}
In the UI, you can of course also see this topic, and select the site and topic.

Now letās create a python consumer:
#!/usr/bin/env python3
import asyncio
import avassa_client
import avassa_client.volga as volga
import sys
async def main():
# In a real world app, use app_login
session = avassa_client.login(
host="<https://192.168.3.109:4646>",
username="fred@ec.com",
password="verysecret")
# Used if the topic doesn't exist
create_opts = volga.CreateOptions.create(fmt='string')
topic = volga.Topic.local('demo')
# Just get new messages, make sure you ack, see below
position = volga.Position.unread()
async with volga.Consumer(session=session,
consumer_name='demo-producer',
mode='exclusive',
position=position,
topic=topic,
on_no_exists=create_opts) as consumer:
# Kick it off with one message
await consumer.more(1)
while True:
msg = await consumer.recv()
print(msg)
if __name__ == "__main__":
asyncio.run(main())
If we run this: we will get all old messages and then wait for new ones:
$ ./consumer.py
{'time': '2023-06-19T10:49:35.801Z', 'seqno': 1, 'remain': 0, 'producer-name': 'demo-producer', 'payload': 'hej', 'mtime': 1687171775801, 'host': 'gbg-1'}
{'time': '2023-06-19T10:51:50.578Z', 'seqno': 2, 'remain': 9, 'producer-name': 'demo-producer', 'payload': 'hello world', 'mtime': 1687171910578, 'host': 'gbg-1'}
{'time': '2023-06-19T11:27:34.541Z', 'seqno': 3, 'remain': 8, 'producer-name': 'demo-producer', 'payload': 'hello world', 'mtime': 1687174054541, 'host': 'gbg-1'}
{'time': '2023-06-19T11:30:42.194Z', 'seqno': 4, 'remain': 7, 'producer-name': 'demo-producer', 'payload': 'hello world again', 'mtime': 1687174242194, 'host': 'gbg-1'}
{'time': '2023-06-19T11:30:58.346Z', 'seqno': 5, 'remain': 6, 'producer-name': 'demo-producer', 'payload': 'hello world again', 'mtime': 1687174258346, 'host': 'gbg-1'}
If we produce again:
$ ./produce.py "Happy Monday"
producing 'Happy Monday'
The consumer will output:
{'time': '2023-06-19T11:35:18.551Z', 'seqno': 6, 'remain': 5, 'producer-name': 'demo-producer', 'payload': 'Happy Monday', 'mtime': 1687174518551, 'host': 'gbg-1'}
Conclusion
As seen in this blog, with just a handful of lines of Python code, you can both publish and subscribe to messages on the pub/sub bus.
Notes
This summarizes the third and final part of this article series where we provide a few examples of what an edge-native telemetry bus implementation can look like. If you’d like to revisit one of the previous posts, you find them here:
Telemetry bus at the edge – Part 1: An overview
Telemetry bus at the edge – Part 2: Examples