control-freak-ide/server/nodejs/nxapp/protocols/JSON-RPC-2.js
plastic-hub-dev-node-saturn 538369cff7 latest
2021-05-12 18:35:18 +02:00

331 lines
12 KiB
JavaScript

/** @module nxapp/protocols/JSON-RPC-2 */
define([
'dcl/dcl',
'xdojo/declare',
"xide/utils",
"nxapp/types/Types",
'nxapp/protocols/ProtocolBase',
"dojo/node!net",
"dojo/node!path",
"dojo/node!child_process",
"dojo/node!util",
"dojo/node!deferred",
"nxapp/utils/_console",
"dojo/node!mqtt",
"dojo/Deferred"
],function(dcl,declare,utils,types,ProtocolBase,net,path,child,nUtil,deferred,_console,mqtt,Deferred){
var debug = true;
var debugSubscriptions = false;
var debugPublish = true;
var debugData = true;
var console = _console;
/**
* MQTT protocol mqttClient
* @class module:nxapp/protocols/MQTT
* @extends module:nxapp/protocols/ProtocolBase
*/
var Module = dcl(ProtocolBase,{
declaredClass:"nxapp.protocols.JSON-RPC-2",
_socket:null,
protocolName:'mqtt',
instance:null,
subscriptions:null,
mqttClient:null,
/**
* Creates a MQTT-Client options
* @link https://github.com/mqttjs/MQTT.js
* @param host
* @param port
* @param deviceOptions
* @returns {Object}
*/
getOptions:function(host,port,deviceOptions){
var options = {
clean:false,
clientId:deviceOptions.clientId || (host+':'+port),
port:port
};
return utils.mixin(options,deviceOptions);
},
onConnected:function(){
debug && console.log('# ' +this.protocolName + ' - Protocol:: onConnected ' + this.options.host + ' : ' + this.options.port + ' @ ' + this.protocolName);
this.connection.connected = true;
this.delegate.onConnect2(this.connection);
},
onError:function(error,options) {
this.delegate.onError(this.connection,utils.mixin({
code:error
},options),this.options);
},
onClose:function(data) {
this.delegate.onClose(this.connection,data);
},
connect:function(){
var _options = this.options;
if(!_options || !_options.driver){
debug || this.isDebug() && console.error('no driver in options',_options);
return this;
}
var self = this;
var host = _options.host;
var port = _options.port;
var deviceOptions = utils.getJson(_options.options || {});
if(deviceOptions.debug===true){
debug = true;
}
var options = this.getOptions(host,port,deviceOptions);
this.host = host;
this.port = port;
this.protocol = this.protocolName;
debug || this.isDebug() && console.log('MQTT->connecting to ' + host +' with',options);
var mqttClient = mqtt.connect(host,options);
this.mqttClient = mqttClient;
try{
mqttClient.on('connect', function (err) {
self.onConnected();
});
}catch(e){
debug || this.isDebug() && console.error('MQTT->connect error :: code: ',e);
}
mqttClient.on('close', function (code) {
debug || self.isDebug() && console.log('MQTT->close :: code: ', code);
self.onClose();
mqttClient.end();
});
//@TODO: remove back - compat
this._socket = {};
this._socket.writable=true;
return this;
},
onData:function(evt,buffer){
debugData || this.isDebug() && console.log('MQTT->onData ' + evt.topic,utils.inspect(evt));
this.delegate.onData(this.connection,evt,buffer);
},
onCommandError:function(cmd,options){
debug || this.isDebug() && console.log('MQTT->CommandError ' + cmd + ' id ' + options.id + ' src ' + options.src);
try {
this.delegate.onData(this.connection, utils.mixin({
cmd: cmd,
event: types.EVENTS.ON_COMMAND_ERROR
},options));
}catch(e){
console.error('---',e);
}
},
onFinish:function(cmd,options,buffer){
debug || this.isDebug() && console.log('MQTT onFinish ' + cmd + ' id ' + options.id + ' src ' + options.src);
try {
this.delegate.onData(this.connection, utils.mixin({
cmd: cmd,
event: types.EVENTS.ON_COMMAND_FINISH
},options),buffer);
}catch(e){
console.error('onFinish-Error:',e);
}
},
addSubscription:function(_topic,args,data){
!this.subscriptions && (this.subscriptions = {});
if(!_topic){
console.error('invalid topic');
return;
}
var self = this;
var topic = '' + _topic;
if(!this.subscriptions[topic]){
var options = {
qos:parseInt(args.qos) || 0
}
this.mqttClient.subscribe(topic,options);
debugSubscriptions || this.isDebug() && console.log('add subscription : ' + _topic + ' for ',nUtil.inspect(data, {depth: null, colors: true}));
var handle = this.subscriptions[topic] = {
id:data.params.id,
src:data.params.src,
topic:topic
};
this.mqttClient.on('message', function (__topic, message) {
if(__topic===topic) {
var _data = utils.getJson(message.toString());
if(_.isArray(_data)){
self.onData(utils.mixin({
payload:_data,
topic:__topic
}, handle));
}else if(_.isObject(_data)) {
if(!_data.topic){
_data.topic=__topic;
}
self.onData(utils.mixin(_data, handle));
}else if(_.isString(_data) || _.isNumber(_data)){
self.onData(utils.mixin({
payload:_data,
topic:__topic
}, handle));
}else{
console.error('-unknown ' + __topic);
}
}
});
}
},
unSubscribeTopic:function(args,data,mqttClient){
args = utils.getJson(args);
var topic = '' + args.topic;
if(topic) {
this.unsubscribe(topic);
}
},
subscribeTopic:function(args,data,mqttClient){
args = utils.getJson(args);
data = utils.getJson(data);
var topic = '' + args.topic;
delete args['topic'];
this.addSubscription(topic,args,data);
},
publishTopic:function(args,data,mqttClient){
try {
args = utils.getJson(args);
data = utils.getJson(data);
var topic = '' + args.topic;
var qos = args.qos;
var retain = args.retain;
delete args['topic'];
delete args['qos'];
delete args['retain'];
var payload = args.payload || args;
if(_.isObject(payload)){
try{
payload = JSON.stringify(payload);
}catch(e){
console.error('error generating payload',e);
return;
}
}
debugPublish || this.isDebug() && console.log('MQTT->publish: ' +topic + ' _ ' + _.isString(payload) + ' l = ' + payload.length,payload);
this.mqttClient.publish(topic, payload,{
qos:qos,
retain:retain
});
}catch(e){
debug || this.isDebug() && console.log('---error ',e);
}
},
send:function(cmd,options) {
return;
},
unsubscribe:function(topic){
if(topic && this.mqttClient && this.subscriptions && this.subscriptions[topic]){
this.mqttClient.unsubscribe(topic);
delete this.subscriptions[topic];
}
},
close:function() {
if(this.mqttClient){
_.each(this.subscriptions,this.unsubscribe,this);
this.mqttClient.end();
this.mqttClient = null;
}
}
});
Module.options = function (query) {
try {
var dfd = new Deferred();
var ECIType = types.ECIType;
var NetworkGroup = 'Network';
function createOption(label, value) {
return {
label: label,
value: label || value
}
}
var cis = [
utils.createCI('clientId', ECIType.STRING,'', {
group: NetworkGroup,
title:'Client id',
description:"mqttjs_' + Math.random().toString(16).substr(2, 8)"
}),
utils.createCI('protocolVersion', ECIType.INTEGER,4, {
group: NetworkGroup,
title:'Protocol id',
description:""
}),
utils.createCI('protocolId', ECIType.STRING,'', {
group: NetworkGroup,
title:'Protocol id',
description:""
}),
utils.createCI('username', ECIType.STRING,'', {
group: NetworkGroup,
title:'User name',
description:"the username required by your broker, if any"
}),
utils.createCI('password', ECIType.STRING,'', {
group: NetworkGroup,
title:'Password',
password:true,
description:"the password required by your broker, if any"
}),
utils.createCI('keepalive', ECIType.INTEGER,10, {
group: NetworkGroup,
title:'Keep alive',
description:'10 seconds, set to 0 to disable'
}),
utils.createCI('reconnectPeriod', ECIType.INTEGER,1000, {
group: NetworkGroup,
title:'Reconnect period',
description:'1000 milliseconds, interval between two re-connections'
}),
utils.createCI('connectTimeout', ECIType.INTEGER,30 * 1000, {
group: NetworkGroup,
title:'Connect timeout',
description:'30 * 1000 milliseconds, time to wait before a CONNACK is received'
}),
utils.createCI('reschedulePings', ECIType.BOOL,true, {
group: NetworkGroup,
title:'Reschedule',
description:'reschedule ping messages after sending packets'
}),
utils.createCI('queueQoSZero', ECIType.BOOL,true, {
group: NetworkGroup,
title:'Queue QoS Zero',
description:' if connection is broken, queue outgoing QoS zero messages (default true)'
}),
utils.createCI('clean', ECIType.BOOL,true, {
group: NetworkGroup,
title:'Clean',
description:'set to false to receive QoS 1 and 2 messages while offline'
})
]
dfd.resolve(cis);
return dfd;
} catch (e) {
console.error('error', e);
}
return dfd;
}
return Module;
});