msgbus

package module
v1.0.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Aug 18, 2017 License: Apache-2.0 Imports: 9 Imported by: 7

README

msgbus

A simplified interface to MQTT.

More than a simple MQTT client implementation, it implements rooting a topic tree, logging and retained topic retrieval.

Uses https://github.com/maruel/paho.mqtt.golang, a fork of https://github.com/eclipse/paho.mqtt.golang that drops websocket support to reduce the dependency set.

GoDoc Go Report Card

Documentation

Overview

Package msgbus implements a generic PubSub message bus that follows MQTT guidelines.

The main difference with MQTT topic is the support for relative message on rebased bus.

  • "../" to backtrack closer to root
  • "//" to ignore the root

Spec

The MQTT specification lives at http://docs.oasis-open.org/mqtt/mqtt/v3.1.1/mqtt-v3.1.1.html

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type Bus

type Bus interface {
	io.Closer
	fmt.Stringer

	// Publish publishes a message to a topic.
	//
	// If msg.Payload is empty, the topic is deleted if it was retained.
	//
	// It is not guaranteed that messages are propagated in order, unless
	// qos ExactlyOnce is used.
	Publish(msg Message, qos QOS, retained bool) error

	// Subscribe sends updates to this topic query through the returned channel.
	Subscribe(topicQuery string, qos QOS) (<-chan Message, error)

	// Unsubscribe removes a previous subscription.
	//
	// Trying to unsubscribe from an invalid topic or a topic not currently
	// subscribed is ignored.
	//
	// BUG: while Subscribe() can be called multiple times with a topic query, a
	// single Unsubscribe() call will unregister all subscriptions.
	Unsubscribe(topicQuery string)

	// Retained retrieves a copy of all matching messages for a retained topic
	// query.
	Retained(topicQuery string) ([]Message, error)
}

Bus is a publisher-subscriber bus.

The topics are expected to use the MQTT definition. "Mosquitto" has good documentation about this: https://mosquitto.org/man/mqtt-7.html

For more information about retained message behavior, see http://www.hivemq.com/blog/mqtt-essentials-part-8-retained-messages

func Log

func Log(b Bus) Bus

Log returns a Bus that logs all operations done on it, via log standard package.

func New

func New() Bus

New returns a local thread safe memory backed Bus.

This Bus is thread safe. It is useful for unit tests or as a local broker.

func NewMQTT

func NewMQTT(server, clientID, user, password string, will Message) (Bus, error)

NewMQTT returns an initialized active MQTT connection.

The connection timeouts are fine tuned for a LAN.

This main purpose of this library is to hide the horror that paho.mqtt.golang is.

func RebasePub

func RebasePub(b Bus, root string) Bus

RebasePub rebases a Bus when publishing messages.

All Message published have their Topic prefixed with root.

Messages retrieved are unaffected.

It is possible to publish a message topic outside of root with:

  • "../" to backtrack closer to root
  • "//" to ignore the root

func RebaseSub

func RebaseSub(b Bus, root string) Bus

RebaseSub rebases a Bus when subscribing or getting topics.

All Message retrieved have their Topic prefix root stripped.

Messages published are unaffected.

It is possible to subscribe to a message topic outside of root with:

  • "../" to backtrack closer to root
  • "//" to ignore the root

type Message

type Message struct {
	// Topic is the MQTT topic. It may have a prefix stripped by RebaseSub() or
	// inserted by RebasePub().
	Topic string
	// Payload is the application specific data.
	//
	// Publishing a message with no Payload deleted a retained Topic, and has no
	// effect on non-retained topic.
	Payload []byte
}

Message represents a single message to a single topic.

type QOS

type QOS int8

QOS defines the quality of service to use when publishing and subscribing to messages.

const (
	// BestEffort means the broker/client will deliver the message once, with no
	// confirmation.
	//
	// This enables asynchronous operation.
	BestEffort QOS = 0
	// MinOnce means the broker/client will deliver the message at least once,
	// with confirmation required.
	//
	// Do not use if message duplication is problematic.
	MinOnce QOS = 1
	// ExactlyOnce means the broker/client will deliver the message exactly once
	// by using a four step handshake.
	//
	// This enforces synchronous operation.
	ExactlyOnce QOS = 2
)

func (QOS) String

func (i QOS) String() string

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL