337 lines
12 KiB
JavaScript
337 lines
12 KiB
JavaScript
/** @module nxapp/protocols/MQTT */
|
|
define([
|
|
'dcl/dcl',
|
|
"xide/utils",
|
|
"nxapp/types/Types",
|
|
'nxapp/protocols/ProtocolBase',
|
|
"dojo/node!util",
|
|
"nxapp/utils/_console",
|
|
"dojo/node!mqtt",
|
|
"dojo/Deferred"
|
|
],function(dcl,utils,types,ProtocolBase,nUtil,_console,mqtt,Deferred){
|
|
|
|
// Route every console call in this module through the project's console wrapper.
var console = _console;

// Module-wide logging switches, shared by all instances.
// `debug` is also switched on at runtime by connect() when the device options carry debug === true.
var debug = true;
// Gates the log line written when a subscription is added.
var debugSubscriptions = false;
// Gates the log line written before a message is published.
var debugPublish = true;
// Gates the log line written for every incoming message.
var debugData = true;
|
|
|
|
/**
|
|
* MQTT protocol client built on the MQTT.js (`mqtt`) Node module.
|
|
* @class module:nxapp/protocols/MQTT
|
|
* @extends module:nxapp/protocols/ProtocolBase
|
|
*/
|
|
var Module = dcl(ProtocolBase,{
|
|
declaredClass:"nxapp.protocols.MQTT",
|
|
_socket:null,
|
|
protocolName:'mqtt',
|
|
instance:null,
|
|
subscriptions:null,
|
|
mqttClient:null,
|
|
/**
|
|
* Creates a MQTT-Client options
|
|
* @link https://github.com/mqttjs/MQTT.js
|
|
* @param host
|
|
* @param port
|
|
* @param deviceOptions
|
|
* @returns {Object}
|
|
*/
|
|
getOptions:function(host,port,deviceOptions){
|
|
var options = {
|
|
clean:false,
|
|
clientId:deviceOptions.clientId || (host+':'+port),
|
|
port:port,
|
|
protocol:""
|
|
};
|
|
return utils.mixin(options,deviceOptions);
|
|
},
|
|
onConnected:function(){
|
|
debug && console.log('# ' +this.protocolName + ' - Protocol:: onConnected ' + this.options.host + ' : ' + this.options.port + ' @ ' + this.protocolName);
|
|
this.connection.connected = true;
|
|
this.delegate.onConnect2(this.connection);
|
|
},
|
|
onError:function(error,options) {
|
|
this.delegate.onError(this.connection,utils.mixin({
|
|
code:error
|
|
},options),this.options);
|
|
},
|
|
onClose:function(data) {
|
|
this.delegate.onClose(this.connection,data);
|
|
},
|
|
connect:function(){
|
|
var _options = this.options;
|
|
if(!_options || !_options.driver){
|
|
debug || this.isDebug() && console.error('no driver in options',_options);
|
|
return this;
|
|
}
|
|
|
|
var self = this;
|
|
var host = _options.host;
|
|
var port = _options.port;
|
|
var deviceOptions = utils.getJson(_options.options || {});
|
|
if(deviceOptions.debug===true){
|
|
debug = true;
|
|
}
|
|
var options = this.getOptions(host,port,deviceOptions);
|
|
|
|
this.host = host;
|
|
this.port = port;
|
|
this.protocol = this.protocolName;
|
|
debug || this.isDebug() && console.log('MQTT->connecting to ' + host +' with',options);
|
|
var mqttClient = null;
|
|
try{
|
|
mqttClient = mqtt.connect(host,options);
|
|
mqttClient.on('error',function(e){
|
|
console.error('MQTT->error connecting to ' + host +' with',e);
|
|
});
|
|
}catch(e){
|
|
debug || this.isDebug() && console.error('MQTT->connect error :: code: ',e);
|
|
return;
|
|
}
|
|
this.mqttClient = mqttClient;
|
|
try{
|
|
mqttClient.on('connect', function (err) {
|
|
self.onConnected();
|
|
});
|
|
}catch(e){
|
|
debug || this.isDebug() && console.error('MQTT->connect error :: code: ',e);
|
|
}
|
|
|
|
mqttClient.on('close', function (code) {
|
|
debug || self.isDebug() && console.log('MQTT->close :: code: ', code);
|
|
self.onClose();
|
|
mqttClient.end();
|
|
});
|
|
|
|
//@TODO: remove back - compat
|
|
this._socket = {};
|
|
this._socket.writable=true;
|
|
return this;
|
|
},
|
|
onData:function(evt,buffer){
|
|
debugData || this.isDebug() && console.log('MQTT->onData ' + evt.topic,utils.inspect(evt));
|
|
this.delegate.onData(this.connection,evt,buffer);
|
|
},
|
|
onCommandError:function(cmd,options){
|
|
debug || this.isDebug() && console.log('MQTT->CommandError ' + cmd + ' id ' + options.id + ' src ' + options.src);
|
|
try {
|
|
this.delegate.onData(this.connection, utils.mixin({
|
|
cmd: cmd,
|
|
event: types.EVENTS.ON_COMMAND_ERROR
|
|
},options));
|
|
}catch(e){
|
|
console.error('---',e);
|
|
}
|
|
},
|
|
onFinish:function(cmd,options,buffer){
|
|
debug || this.isDebug() && console.log('MQTT onFinish ' + cmd + ' id ' + options.id + ' src ' + options.src);
|
|
try {
|
|
this.delegate.onData(this.connection, utils.mixin({
|
|
cmd: cmd,
|
|
event: types.EVENTS.ON_COMMAND_FINISH
|
|
},options),buffer);
|
|
}catch(e){
|
|
console.error('onFinish-Error:',e);
|
|
}
|
|
},
|
|
addSubscription:function(_topic,args,data){
|
|
|
|
!this.subscriptions && (this.subscriptions = {});
|
|
|
|
if(!_topic){
|
|
console.error('invalid topic');
|
|
return;
|
|
}
|
|
var self = this;
|
|
var topic = '' + _topic;
|
|
|
|
if(!this.subscriptions[topic]){
|
|
var options = {
|
|
qos:parseInt(args.qos) || 0
|
|
}
|
|
|
|
this.mqttClient.subscribe(topic,options);
|
|
|
|
debugSubscriptions || this.isDebug() && console.log('add subscription : ' + _topic + ' for ',nUtil.inspect(data, {depth: null, colors: true}));
|
|
|
|
var handle = this.subscriptions[topic] = {
|
|
id:data.params.id,
|
|
src:data.params.src,
|
|
topic:topic
|
|
};
|
|
|
|
this.mqttClient.on('message', function (__topic, message) {
|
|
if(__topic===topic) {
|
|
var _data = utils.getJson(message.toString());
|
|
if(_.isArray(_data)){
|
|
self.onData(utils.mixin({
|
|
payload:_data,
|
|
topic:__topic
|
|
}, handle));
|
|
|
|
}else if(_.isObject(_data)) {
|
|
if(!_data.topic){
|
|
_data.topic=__topic;
|
|
}
|
|
self.onData(utils.mixin(_data, handle));
|
|
}else if(_.isString(_data) || _.isNumber(_data)){
|
|
self.onData(utils.mixin({
|
|
payload:_data,
|
|
topic:__topic
|
|
}, handle));
|
|
}else{
|
|
console.error('-unknown ' + __topic);
|
|
}
|
|
}
|
|
});
|
|
}
|
|
},
|
|
unSubscribeTopic:function(args,data,mqttClient){
|
|
args = utils.getJson(args);
|
|
var topic = '' + args.topic;
|
|
if(topic) {
|
|
this.unsubscribe(topic);
|
|
}
|
|
},
|
|
subscribeTopic:function(args,data,mqttClient){
|
|
|
|
args = utils.getJson(args);
|
|
data = utils.getJson(data);
|
|
var topic = '' + args.topic;
|
|
delete args['topic'];
|
|
|
|
this.addSubscription(topic,args,data);
|
|
},
|
|
publishTopic:function(args,data,mqttClient){
|
|
|
|
try {
|
|
args = utils.getJson(args);
|
|
data = utils.getJson(data);
|
|
|
|
var topic = '' + args.topic;
|
|
var qos = args.qos;
|
|
var retain = args.retain;
|
|
|
|
delete args['topic'];
|
|
delete args['qos'];
|
|
delete args['retain'];
|
|
|
|
var payload = args.payload || args;
|
|
if(_.isObject(payload)){
|
|
try{
|
|
payload = JSON.stringify(payload);
|
|
}catch(e){
|
|
console.error('error generating payload',e);
|
|
return;
|
|
}
|
|
}
|
|
|
|
debugPublish || this.isDebug() && console.log('MQTT->publish: ' +topic + ' _ ' + _.isString(payload) + ' l = ' + payload.length,payload);
|
|
|
|
this.mqttClient.publish(topic, payload,{
|
|
qos:qos,
|
|
retain:retain
|
|
});
|
|
}catch(e){
|
|
debug || this.isDebug() && console.log('---error ',e);
|
|
}
|
|
},
|
|
send:function(cmd,options) {
|
|
return;
|
|
},
|
|
unsubscribe:function(topic){
|
|
if(topic && this.mqttClient && this.subscriptions && this.subscriptions[topic]){
|
|
this.mqttClient.unsubscribe(topic);
|
|
delete this.subscriptions[topic];
|
|
}
|
|
},
|
|
close:function() {
|
|
if(this.mqttClient){
|
|
_.each(this.subscriptions,this.unsubscribe,this);
|
|
this.mqttClient.end();
|
|
this.mqttClient = null;
|
|
}
|
|
}
|
|
});
|
|
|
|
/**
 * Describes the configurable connection options of this protocol as a list
 * of CIs created with `utils.createCI`, all in the 'Network' group.
 *
 * Fixes over the previous version:
 *  - the Deferred is created outside the try block and rejected on failure,
 *    so callers never receive `undefined` or a Deferred that stays pending;
 *  - `protocolVersion` carried the copy-pasted title 'Protocol id';
 *  - the unused `createOption` helper was removed.
 *
 * @param {*} [query] Unused.
 * @returns {Object} A dojo Deferred resolved with the array of CIs, or
 *          rejected with the error raised while building them.
 */
Module.options = function (query) {
    var dfd = new Deferred();
    try {
        var ECIType = types.ECIType;
        var NetworkGroup = 'Network';

        var cis = [
            utils.createCI('clientId', ECIType.STRING, '', {
                group: NetworkGroup,
                title: 'Client id',
                description: "mqttjs_' + Math.random().toString(16).substr(2, 8)"
            }),
            utils.createCI('protocolVersion', ECIType.INTEGER, 4, {
                group: NetworkGroup,
                title: 'Protocol version',
                description: ""
            }),
            utils.createCI('protocolId', ECIType.STRING, '', {
                group: NetworkGroup,
                title: 'Protocol id',
                description: ""
            }),
            utils.createCI('username', ECIType.STRING, '', {
                group: NetworkGroup,
                title: 'User name',
                description: "the username required by your broker, if any"
            }),
            utils.createCI('password', ECIType.STRING, '', {
                group: NetworkGroup,
                title: 'Password',
                password: true,
                description: "the password required by your broker, if any"
            }),
            utils.createCI('keepalive', ECIType.INTEGER, 10, {
                group: NetworkGroup,
                title: 'Keep alive',
                description: '10 seconds, set to 0 to disable'
            }),
            utils.createCI('reconnectPeriod', ECIType.INTEGER, 1000, {
                group: NetworkGroup,
                title: 'Reconnect period',
                description: '1000 milliseconds, interval between two re-connections'
            }),
            utils.createCI('connectTimeout', ECIType.INTEGER, 30 * 1000, {
                group: NetworkGroup,
                title: 'Connect timeout',
                description: '30 * 1000 milliseconds, time to wait before a CONNACK is received'
            }),
            utils.createCI('reschedulePings', ECIType.BOOL, true, {
                group: NetworkGroup,
                title: 'Reschedule',
                description: 'reschedule ping messages after sending packets'
            }),
            utils.createCI('queueQoSZero', ECIType.BOOL, true, {
                group: NetworkGroup,
                title: 'Queue QoS Zero',
                description: ' if connection is broken, queue outgoing QoS zero messages (default true)'
            }),
            utils.createCI('clean', ECIType.BOOL, true, {
                group: NetworkGroup,
                title: 'Clean',
                description: 'set to false to receive QoS 1 and 2 messages while offline'
            })
        ];
        dfd.resolve(cis);
    } catch (e) {
        console.error('error', e);
        // Settle the Deferred so the caller is not left waiting.
        dfd.reject(e);
    }
    return dfd;
};
|
|
|
|
return Module;
|
|
});
|