package mqtt
|
|
|
|
import (
|
|
"fmt"
|
|
"net/url"
|
|
"time"
|
|
|
|
"github.com/eclipse/paho.mqtt.golang"
|
|
|
|
"cygnux.net/kepler/config"
|
|
)
|
|
|
|
type Broker interface {
|
|
mqtt.Client
|
|
}
|
|
|
|
type broker struct {
|
|
mqtt.Client
|
|
}
|
|
|
|
func SetupBroker(cfg config.Config) (Broker, error) {
|
|
var opts = mqtt.NewClientOptions()
|
|
if svr := cfg.GetString("server"); svr != "" {
|
|
if _, e := url.Parse(svr); nil != e {
|
|
return nil, e
|
|
} else {
|
|
opts.AddBroker(svr)
|
|
}
|
|
} else if svrs := cfg.GetStringSlice("servers"); len(svrs) > 0 {
|
|
for _, svr := range svrs {
|
|
if _, e := url.Parse(svr); nil != e {
|
|
return nil, e
|
|
} else {
|
|
opts.AddBroker(svr)
|
|
}
|
|
}
|
|
} else {
|
|
return nil, fmt.Errorf(" server or servers should be provided")
|
|
}
|
|
opts.SetClientID(cfg.GetString("clientId"))
|
|
if s := cfg.GetString("username"); s != "" {
|
|
opts.SetUsername(s)
|
|
}
|
|
if s := cfg.GetString("password"); s != "" {
|
|
opts.SetPassword(s)
|
|
}
|
|
if d := cfg.GetDuration("keepAlive"); d > time.Second {
|
|
opts.SetKeepAlive(d)
|
|
}
|
|
client := mqtt.NewClient(opts)
|
|
return broker{Client: client}, nil
|
|
}
|