Implement Basic PubSub Concept in ZeroMQ with Python and Elasticsearch

Ridwan Fajar
5 min read · Mar 29, 2018
Naringgul Road, Cianjur Regency, West Java, Indonesia

Overview

Basically, ZeroMQ is hard to understand at first. It is not as simple as Redis for building a pub/sub mechanism, nor as a managed service like Google Cloud Pub/Sub. But it was still interesting for me to explore what ZeroMQ provides for creating a pub/sub mechanism.

So after I read this book: https://learning-0mq-with-pyzmq.readthedocs.io, I found that ZeroMQ has brokers that it refers to as devices. There are three kinds of device:

  • Queue
  • Forwarder
  • Streamer

So we could send messages from n publishers into a device, where they will be consumed by n subscribers. But we can also work without a device, and treat a single server as a data source that sends its information directly to n subscribers.

In this experiment, I only made a single publisher that is consumed by several subscribers. Of course, without a device, the publisher just sends messages to the subscribers without any queueing or routing system, so multiple subscribers will perform duplicate operations. That is still okay if each subscriber targets a different Elasticsearch host rather than the same one.

So let's check the sample code!

How will it be implemented?

Publisher.py Code

This is the code for publisher.py:

Publisher code for ZeroMQ in PUB/SUB mode, without using a device
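
Here is a minimal sketch of the publisher, following the description below; the exact payload field names and the value ranges for temperature and humidity are illustrative choices, not fixed by the protocol:

# publisher.py: a minimal sketch, assuming pyzmq is installed
import argparse
import json
import random
import time
import uuid
from datetime import datetime

import zmq

parser = argparse.ArgumentParser()
parser.add_argument('--device_id', required=True,
                    help='device id used as the topic prefix')
args = parser.parse_args()

# Initialize ZeroMQ and bind a PUB socket on port 5600
context = zmq.Context()
sock = context.socket(zmq.PUB)
sock.bind('tcp://*:5600')

while True:
    # Dummy weather payload: uuid for the message id,
    # random values for temperature and humidity
    payload = {
        'message_id': str(uuid.uuid4()),
        'device_id': args.device_id,
        'temperature_in_celsius': random.randint(20, 35),
        'humidity': random.randint(30, 90),
        'createdAt': datetime.now().strftime('%Y-%m-%d %H:%M:%S'),
    }
    # Prefix the message with the device_id so subscribers can filter on it
    sock.send_string('%s %s' % (args.device_id, json.dumps(payload)))
    # Limit the loop to one message per second
    time.sleep(1)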

Basically, I import the libraries required by the code, then build the argument parser using argparse, and finally initialize ZeroMQ. After the initialization part, I make the publisher listen on port 5600 with a PUB socket.

Then I validate that device_id was given via argparse, and send messages periodically. Yes, the data is still dummy. But each message has an id generated with Python's uuid library, and I generate the temperature and humidity data using the random library.

Finally, I wrap the payload with json.dumps() and send the message to the subscriber.

At the end of the loop body, I limit execution to one iteration per second using Python's time library.

Subscriber.py Code

This is the code for subscriber.py:

Subscriber code for ZeroMQ in PUB/SUB mode, without using a device
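
Likewise, here is a minimal sketch of the subscriber. It pairs with the publisher sketch above; note that the doc_type argument assumes the Elasticsearch 5.x/6.x Python client that was current when this was written:

# subscriber.py: a minimal sketch, assuming pyzmq and the elasticsearch client
import argparse
import json

import zmq
from elasticsearch import Elasticsearch

parser = argparse.ArgumentParser()
parser.add_argument('--device_id', required=True,
                    help='device id used as the subscription filter')
args = parser.parse_args()

# Initialize ZeroMQ and connect a SUB socket to the publisher on port 5600
context = zmq.Context()
sock = context.socket(zmq.SUB)
sock.connect('tcp://localhost:5600')
# Subscribe only to messages whose topic prefix matches our device_id
sock.setsockopt_string(zmq.SUBSCRIBE, args.device_id)

# Connect to the local Elasticsearch instance
es = Elasticsearch(['http://localhost:9200'])

while True:
    message = sock.recv_string()
    # Strip the topic prefix, then parse the JSON payload
    _, raw_payload = message.split(' ', 1)
    payload = json.loads(raw_payload)
    # Store the document under index "myiot" with type "weather"
    # (doc_type works with the 5.x/6.x elasticsearch client)
    es.index(index='myiot', doc_type='weather', body=payload)
    print('Stored message %s' % payload['message_id'])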

Basically, I import the libraries required by the code, then build the argument parser using argparse, and finally initialize ZeroMQ. After the initialization part, I make the subscriber connect to port 5600 with a SUB socket.

After the ZeroMQ initialization part, I initialize the Elasticsearch client to connect to the local Elasticsearch instance.

Then I validate that device_id was given via argparse. I receive each message from publisher.py using sock.recv(), load the payload with json.loads(), and send the document to Elasticsearch under the index myiot with the type weather.

Elasticsearch Index

And this is the Elasticsearch index structure for this demo. You can use any HTTP request tool to build the index; for example, you might use Postman:

// create it on Postman
// URL: PUT http://localhost:9200/myiot/
// body:
{
  "mappings": {
    "weather": {
      "properties": {
        "humidity": { "type": "integer" },
        "temperature_in_celsius": { "type": "integer" },
        "device_id": { "type": "text" },
        "message_id": { "type": "text" },
        "createdAt": {
          "type": "date",
          "format": "yyyy-MM-dd HH:mm:ss"
        }
      }
    }
  }
}
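
If you prefer Python over Postman, the same index can be created with a short script using the elasticsearch client; this is just a sketch, assuming a local instance on port 9200:

# create_index.py: a hypothetical helper; the demo itself uses Postman
from elasticsearch import Elasticsearch

es = Elasticsearch(['http://localhost:9200'])
es.indices.create(index='myiot', body={
    'mappings': {
        'weather': {
            'properties': {
                'humidity': {'type': 'integer'},
                'temperature_in_celsius': {'type': 'integer'},
                'device_id': {'type': 'text'},
                'message_id': {'type': 'text'},
                'createdAt': {'type': 'date', 'format': 'yyyy-MM-dd HH:mm:ss'},
            }
        }
    }
})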

How does it work?

First of all, you have to install Elasticsearch on your local machine. If you have not installed it yet, this quickstart won't work, so make sure Elasticsearch is already installed before continuing.

Now lets open your console then run the Elasticsearch via this command:

$ elasticsearch
Running an Elasticsearch instance on the local machine

How do we run publisher.py? Open a second tab in your terminal and run the command below. It starts the publisher producing data. For example, let's pass device_id as device-123:

$ python publisher.py --device_id device-123
Running the publisher script

Then open a third tab and execute the subscriber through this command, again passing device_id as device-123:

$ python subscriber.py --device_id device-123
The subscriber receives the messages from the publisher

Both the publisher and subscriber will only send/receive messages that are prefixed with device-123.

Let's check this URL to see whether the data is already there:

http://localhost:9200/myiot/weather/_search
Before and after a new message is inserted into Elasticsearch

If we want to use Kibana, we can run it via this command:

$ kibana serve -e http://localhost:9200/
Running Kibana to see dataset in Elasticsearch

Then access localhost:5601 via a web browser and you will see the data coming in continuously on the Discover page in Kibana.

The weather data comes continuously into Elasticsearch

Conclusion

That's all from me: the basic concept of pub/sub using ZeroMQ. We made the local machine a publisher that produces data. I think a single-board computer (SBC) could be the source of the messages, publishing data that is consumed by other SBCs. We could use this approach if we have clients with a lot of devices, and create a subscriber for each client.

So within this post, I created the publisher, made it serve and listen on a certain port, consumed its messages with the subscriber, and finally stored them in Elasticsearch.

Later, I will create an experiment using one of the devices provided by ZeroMQ; maybe the streamer and queue devices.

References

  • Learning 0MQ with PyZMQ: https://learning-0mq-with-pyzmq.readthedocs.io