Develop plugins using VerneMQ Webhooks

The VerneMQ Webhooks plugin provides an easy and flexible way to build powerful plugins for VerneMQ using web hooks. With VerneMQ Webhooks you are free to select the implementation language that matches your technical requirements or the one in which you feel most comfortable and productive. You can use any modern language such as Python, Go or C#/.NET, and indeed any language in which you can build something that handles HTTP requests.

The idea of VerneMQ Webhooks is very simple: you register an HTTP endpoint with a VerneMQ plugin hook, and whenever the hook (such as auth_on_register) is called, the VerneMQ Webhooks plugin dispatches an HTTP POST request to the registered endpoint. The HTTP POST request contains an HTTP header like vernemq-hook: auth_on_register and a JSON encoded payload. The endpoint then responds with code 200 on success and with a JSON encoded payload informing the VerneMQ Webhooks plugin which action to take (if any).

Configuring webhooks

To enable webhooks make sure to set:

plugins.vmq_webhooks = on

And then each webhook can be configured like this:

vmq_webhooks.mywebhook1.hook = auth_on_register
vmq_webhooks.mywebhook1.endpoint = http://127.0.0.1/myendpoints

It is also possible to dynamically register webhooks at run-time:

$ vmq-admin webhooks register hook=auth_on_register endpoint="http://localhost"

See which endpoints are registered:

$ vmq-admin webhooks show

And finally, deregister an endpoint:

$ vmq-admin webhooks deregister hook=auth_on_register endpoint="http://localhost"

Connection pool configuration

By default, each registered hook uses a connection pool containing at most 100 connections. This can be changed by setting vmq_webhooks.pool_max_connections to a different value. Similarly, the vmq_webhooks.pool_timeout configuration (value in milliseconds) controls how long an unused connection stays in the connection pool before being closed and removed. The default value is 60000 (60 seconds).

These options are available in VerneMQ 1.4.0.

Caching

VerneMQ webhooks support caching of the auth_on_register, auth_on_publish and auth_on_subscribe hooks.

This can be used to speed up authentication and authorization considerably, because a cached call does not require a request to the endpoint. All data passed to these hooks is used to look up whether the call is in the cache, except in the case of auth_on_publish, where the payload is omitted.

To enable caching for an endpoint, return the header cache-control: max-age=AgeInSeconds in the response to one of the mentioned hooks. If the call was successful (authentication granted), the request will be cached together with any modifiers, except for the payload modifier in the auth_on_publish hook.

Whenever a non-expired entry is looked up in the cache the endpoint will not be called and the modifiers of the cached entry will be returned, if any.

It is possible to inspect the cache using:

$ vmq-admin webhooks cache show
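
As a rough illustration of the caching header described above, the following sketch builds a response for one of the cacheable hooks. The helper name build_auth_response and its return shape are hypothetical and not part of VerneMQ; only the cache-control: max-age header and the result values come from this page. The content-type header is a conventional choice rather than a documented requirement.

```python
import json

def build_auth_response(granted, max_age_seconds=None):
    """Return (status, headers, body) for an auth_on_* webhook response.

    Hypothetical helper: the name and return shape are illustrative only.
    """
    headers = {"content-type": "application/json"}
    if granted:
        body = {"result": "ok"}
        # Only successful calls are cached, so the header is only added here.
        if max_age_seconds is not None:
            headers["cache-control"] = "max-age=%d" % max_age_seconds
    else:
        body = {"result": {"error": "not allowed"}}
    return 200, headers, json.dumps(body)
```

Calling build_auth_response(True, 300) would ask the plugin to cache the successful result for 300 seconds; a denied request carries no caching header.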

Webhook specs

All webhooks are called with method POST. All hooks need to be answered with the HTTP code 200 to be considered successful. Any hook called that does not return the 200 code will be logged as an error as will any hook with an unparseable payload.

All hooks are called with the header vernemq-hook which contains the name of the hook in question.

For detailed information about the hooks and when they are called, see the Plugin Development Guide and the relevant subsections.
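
The request and response conventions above can be sketched with Python's standard library alone. This is a minimal sketch, not a reference implementation: the names handle_hook, WebhookHandler and run, the port 8080, and the choice to answer unknown hooks with 501 and invalid JSON with 400 are all assumptions made for illustration.

```python
import json
from http.server import BaseHTTPRequestHandler, HTTPServer

def handle_hook(hook, payload):
    """Map a hook name and its decoded JSON payload to (status, response)."""
    if hook in ("auth_on_register", "auth_on_publish", "auth_on_subscribe"):
        # Illustrative policy only: accept everything.
        return 200, {"result": "ok"}
    # Any non-200 answer is logged as an error by the plugin.
    return 501, {"result": {"error": "not implemented"}}

class WebhookHandler(BaseHTTPRequestHandler):
    def do_POST(self):
        # The hook name arrives in the vernemq-hook header.
        hook = self.headers.get("vernemq-hook")
        length = int(self.headers.get("Content-Length", 0))
        try:
            payload = json.loads(self.rfile.read(length) or b"{}")
        except ValueError:
            status, response = 400, {"result": {"error": "invalid JSON"}}
        else:
            status, response = handle_hook(hook, payload)
        body = json.dumps(response).encode("utf-8")
        self.send_response(status)
        self.send_header("Content-Type", "application/json")
        self.send_header("Content-Length", str(len(body)))
        self.end_headers()
        self.wfile.write(body)

def run(host="127.0.0.1", port=8080):
    """Serve webhook requests until interrupted (hypothetical address)."""
    HTTPServer((host, port), WebhookHandler).serve_forever()
```

Keeping the decision logic in handle_hook, separate from the HTTP plumbing, makes it possible to exercise the hook behaviour without starting a server.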

auth_on_register

Header: vernemq-hook: auth_on_register

Webhook example payload:

{
    "peer_addr": "127.0.0.1",
    "peer_port": 8888,
    "username": "username",
    "password": "password",
    "mountpoint": "",
    "client_id": "clientid",
    "clean_session": false
}

A minimal response indicating the authentication was successful looks like:

{
    "result": "ok"
}

It is also possible to override various client specific settings by returning a modifiers object:

{
    "result": "ok",
    "modifiers": {
        "max_message_size": 65535,
        "max_inflight_messages": 10000,
        "retry_interval": 20000
    }
}

Note, the retry_interval is in milliseconds. It is possible to override many more settings, see the Session Lifecycle for more information.
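
A handler producing these responses might look like the sketch below. The USERS dictionary and the function name are hypothetical, and the chosen modifier values are arbitrary; the response shapes follow the examples in this section, with the error form being the one listed under the other return values.

```python
import hmac

# Hypothetical in-memory credential store, for illustration only.
USERS = {"joe": "secret"}

def auth_on_register(payload):
    """Build the response dict for an auth_on_register request payload."""
    username = payload.get("username")
    password = payload.get("password")
    expected = USERS.get(username)
    if (
        expected is not None
        and isinstance(password, str)
        # Constant-time comparison of the stored and supplied password.
        and hmac.compare_digest(expected.encode(), password.encode())
    ):
        return {
            "result": "ok",
            # retry_interval is in milliseconds.
            "modifiers": {"max_inflight_messages": 100, "retry_interval": 20000},
        }
    return {"result": {"error": "not allowed"}}
```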

Other possible return values:

"result": "next"
"result": { "error": "some error message" }

auth_on_subscribe

Header: vernemq-hook: auth_on_subscribe

Webhook example payload:

{
    "client_id": "clientid",
    "mountpoint": "",
    "username": "username",
    "topics":
        [{"topic": "a/b",
          "qos": 1},
         {"topic": "c/d",
          "qos": 2}]
}

A minimal response indicating the subscription authorization was successful looks like:

{
    "result": "ok"
}

Another example, where the topics to subscribe to have been rewritten, looks like:

{
    "result": "ok",
    "topics":
        [{"topic": "rewritten/topic",
          "qos": 0}]
}

Note, you can also pass a qos with value 128, which means that either it was not possible to subscribe or the client was not allowed to subscribe to that specific topic.
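
To illustrate the use of qos 128, the sketch below authorizes only topics under a given prefix and marks every other topic as refused. The function name and the prefix-based policy are assumptions made for this example; the response shape follows the rewritten-topics example above.

```python
QOS_NOT_ALLOWED = 128  # value described above for a refused subscription

def auth_on_subscribe(payload, allowed_prefix):
    """Allow topics starting with allowed_prefix, refuse all others."""
    topics = []
    for entry in payload["topics"]:
        if entry["topic"].startswith(allowed_prefix):
            topics.append({"topic": entry["topic"], "qos": entry["qos"]})
        else:
            topics.append({"topic": entry["topic"], "qos": QOS_NOT_ALLOWED})
    return {"result": "ok", "topics": topics}
```

With the example payload from this section and the prefix "a/", the topic a/b keeps its requested qos while c/d is returned with qos 128.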

Other possible result values:

"result": "next"
"result": { "error": "some error message" }

auth_on_publish

Header: vernemq-hook: auth_on_publish

Note, the payload in the example below is shown as plain text. This is not the default: payloads are base64 encoded unless configured otherwise.

Webhook example payload:

{
    "username": "username",
    "client_id": "clientid",
    "mountpoint": "",
    "qos": 1,
    "topic": "a/b",
    "payload": "hello",
    "retain": false
}

A minimal response indicating the publish was authorized looks like:

{
    "result": "ok"
}

A more complex example, where the publish topic, qos, payload and retain flag are rewritten, looks like:

{
    "result": "ok",
    "modifiers": {
        "topic": "rewritten/topic",
        "qos": 2,
        "payload": "rewritten payload",
        "retain": true
    }
}
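
The following sketch shows one way a handler could inspect a publish request and answer with modifiers. It assumes the default base64 encoded payload, and it returns the payload exactly as received so that no assumption is needed about how the plugin decodes a rewritten payload. The function name and the limits MAX_PAYLOAD_BYTES and MAX_QOS are hypothetical.

```python
import base64

MAX_PAYLOAD_BYTES = 1024  # hypothetical size limit
MAX_QOS = 1               # hypothetical cap on the publish qos

def auth_on_publish(request):
    """Reject oversized payloads and cap the qos of accepted publishes."""
    # Assumes the payload is base64 encoded (the default).
    raw = base64.b64decode(request["payload"])
    if len(raw) > MAX_PAYLOAD_BYTES:
        return {"result": {"error": "payload too large"}}
    return {
        "result": "ok",
        "modifiers": {
            "topic": request["topic"],
            "qos": min(request["qos"], MAX_QOS),
            # Passed back unchanged, in the encoding it arrived in.
            "payload": request["payload"],
            "retain": request["retain"],
        },
    }
```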

Other result values:

"result": "next"
"result": { "error": "some error message" }

on_register

Header: vernemq-hook: on_register

Webhook example payload:

{
    "peer_addr": "127.0.0.1",
    "peer_port": 8888,
    "username": "username",
    "mountpoint": "",
    "client_id": "clientid"
}

The response of this hook should be empty as it is ignored.

on_publish

Header: vernemq-hook: on_publish

Note, the payload in the example below is shown as plain text. This is not the default: payloads are base64 encoded unless configured otherwise.

Webhook example payload:

{
    "username": "username",
    "client_id": "clientid",
    "mountpoint": "",
    "qos": 1,
    "topic": "a/b",
    "payload": "hello",
    "retain": false
}

The response of this hook should be empty as it is ignored.

on_subscribe

Header: vernemq-hook: on_subscribe

Webhook example payload:

{
    "client_id": "clientid",
    "mountpoint": "",
    "username": "username",
    "topics":
        [{"topic": "a/b",
          "qos": 1},
         {"topic": "c/d",
          "qos": 2}]
}

The response of this hook should be empty as it is ignored.

on_unsubscribe

Header: vernemq-hook: on_unsubscribe

Webhook example payload:

{
    "username": "username",
    "client_id": "clientid",
    "mountpoint": "",
    "topics":
        ["a/b", "c/d"]
}

Example response:

{
    "result": "ok",
    "topics":
        ["rewritten/topic"]
}
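
A handler for this hook could rewrite topics with a simple lookup table, as in the sketch below. The function name and the rewrites mapping are hypothetical; topics without an entry in the mapping are passed through unchanged.

```python
def on_unsubscribe(payload, rewrites):
    """Rewrite unsubscribe topics using a {old_topic: new_topic} mapping."""
    topics = [rewrites.get(topic, topic) for topic in payload["topics"]]
    return {"result": "ok", "topics": topics}
```

With the example payload above and the mapping {"a/b": "rewritten/topic"}, the response lists rewritten/topic and c/d.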

Other result values:

"result": "next"
"result": { "error": "some error message" }

on_deliver

Header: vernemq-hook: on_deliver

Note, the payload in the example below is shown as plain text. This is not the default: payloads are base64 encoded unless configured otherwise.

Webhook example payload:

{
    "username": "username",
    "client_id": "clientid",
    "mountpoint": "",
    "topic": "a/b",
    "payload": "hello"
}

Example response:

{
    "result": "ok",
    "modifiers": {
        "topic": "rewritten/topic",
        "payload": "rewritten payload"
    }
}

Other result values:

"result": "next"

on_offline_message

Header: vernemq-hook: on_offline_message

Note, the payload in the example below is shown as plain text. This is not the default: payloads are base64 encoded unless configured otherwise.

Webhook example payload:

{
    "client_id": "clientid",
    "mountpoint": "",
    "qos": 1,
    "topic": "sometopic",
    "payload": "payload",
    "retain": false
}

The response of this hook should be empty as it is ignored.

on_client_wakeup

Header: vernemq-hook: on_client_wakeup

Webhook example payload:

{
    "client_id": "clientid",
    "mountpoint": ""
}

The response of this hook should be empty as it is ignored.

on_client_offline

Header: vernemq-hook: on_client_offline

Webhook example payload:

{
    "client_id": "clientid",
    "mountpoint": ""
}

The response of this hook should be empty as it is ignored.

on_client_gone

Header: vernemq-hook: on_client_gone

Webhook example payload:

{
    "client_id": "clientid",
    "mountpoint": ""
}

The response of this hook should be empty as it is ignored.

Example Webhook in Python

Below is a very simple example of an endpoint implemented in Python. It uses the web module (provided by the web.py framework) and the json module, and implements handlers for three different hooks: auth_on_register, auth_on_publish and auth_on_subscribe.

The auth_on_register hook restricts access to the user with username joe and password secret. The auth_on_subscribe and auth_on_publish hooks allow any subscription or publish to continue as is. These last two hooks are needed as the default policy is deny.

import web
import json

urls = ('/.*', 'hooks')
app = web.application(urls, globals())

class hooks:
    def POST(self):

        # fetch hook and request data
        hook = web.ctx.env.get('HTTP_VERNEMQ_HOOK')
        data = json.loads(web.data())

        # print the hook and request data to the console
        print()
        print('hook:', hook)
        print('  data: ', data)

        # dispatch to appropriate function based on the hook.
        if hook == 'auth_on_register':
            return handle_auth_on_register(data)
        elif hook == 'auth_on_publish':
            return handle_auth_on_publish(data)
        elif hook == 'auth_on_subscribe':
            return handle_auth_on_subscribe(data)
        else:
            web.ctx.status = '501 Not Implemented'
            return "not implemented"

def handle_auth_on_register(data):
    # only allow user 'joe' with password 'secret', reject all others.
    if "joe" == data['username']:
        if "secret" == data['password']:
            return json.dumps({'result': 'ok'})

    return json.dumps({'result': {'error': 'not allowed'}})

def handle_auth_on_publish(data):
    # accept all publish requests
    return json.dumps({'result': 'ok'})

def handle_auth_on_subscribe(data):
    # accept all subscribe requests
    return json.dumps({'result': 'ok'})

if __name__ == '__main__':
    app.run()