/** @module nxapp/protocols/JSON-RPC-2 */
define([
    'dcl/dcl',
    'xdojo/declare',
    "xide/utils",
    "nxapp/types/Types",
    'nxapp/protocols/ProtocolBase',
    "dojo/node!net",
    "dojo/node!path",
    "dojo/node!child_process",
    "dojo/node!util",
    "dojo/node!deferred",
    "nxapp/utils/_console",
    "dojo/node!mqtt",
    "dojo/Deferred"
], function (dcl, declare, utils, types, ProtocolBase, net, path, child, nUtil, deferred, _console, mqtt, Deferred) {

    // Module-level log switches. `debug` can also be turned on at runtime by a
    // device option (`debug: true`), see connect().
    var debug = true;
    var debugSubscriptions = false;
    var debugPublish = true;
    var debugData = true;
    var console = _console;

    /**
     * Tells whether a log statement should be emitted.
     *
     * The original code used `flag || this.isDebug() && console.log(...)`, which parses as
     * `flag || (this.isDebug() && console.log(...))` and therefore suppressed the
     * log whenever the flag was on. This helper implements `(flag || isDebug())`.
     *
     * @param {boolean} flag one of the module-level debug switches
     * @param {Object} instance the protocol instance (its isDebug() is consulted if present)
     * @returns {boolean}
     */
    function shouldLog(flag, instance) {
        return !!(flag || (instance && typeof instance.isDebug === 'function' && instance.isDebug()));
    }

    /**
     * Turns a decoded MQTT message into the event object handed to onData().
     * Arrays, strings and numbers are wrapped as `{payload, topic}`; plain objects are
     * passed through (receiving `topic` when they have none). Anything else yields null.
     *
     * @param {*} data the decoded message
     * @param {string} topic the topic the message arrived on
     * @returns {Object|null}
     */
    function createMessageEvent(data, topic) {
        // Arrays must be tested before the generic object branch.
        if (Array.isArray(data)) {
            return {payload: data, topic: topic};
        }
        if (data !== null && typeof data === 'object') {
            if (!data.topic) {
                data.topic = topic;
            }
            return data;
        }
        if (typeof data === 'string' || typeof data === 'number') {
            return {payload: data, topic: topic};
        }
        return null;
    }

    /**
     * MQTT protocol mqttClient
     * @class module:nxapp/protocols/MQTT
     * @extends module:nxapp/protocols/ProtocolBase
     */
    var Module = dcl(ProtocolBase, {
        // NOTE(review): declaredClass still says JSON-RPC-2 although this is the MQTT
        // protocol; left untouched because other code may look it up by this string.
        declaredClass: "nxapp.protocols.JSON-RPC-2",
        _socket: null,
        protocolName: 'mqtt',
        instance: null,
        /** topic -> {id, src, topic} for every active subscription */
        subscriptions: null,
        /** topic -> 'message' listener registered on mqttClient for that topic */
        _subscriptionListeners: null,
        mqttClient: null,
        /**
         * Creates a MQTT-Client options
         * @link https://github.com/mqttjs/MQTT.js
         * @param host
         * @param port
         * @param deviceOptions user supplied options; they override the defaults built here
         * @returns {Object}
         */
        getOptions: function (host, port, deviceOptions) {
            deviceOptions = deviceOptions || {};
            var options = {
                clean: false,
                clientId: deviceOptions.clientId || (host + ':' + port),
                port: port
            };
            return utils.mixin(options, deviceOptions);
        },
        /**
         * Called when the MQTT client reports 'connect'; flags the connection as
         * connected and notifies the delegate.
         */
        onConnected: function () {
            if (shouldLog(debug, this)) {
                console.log('# ' + this.protocolName + ' - Protocol:: onConnected ' + this.options.host + ' : ' + this.options.port + ' @ ' + this.protocolName);
            }
            this.connection.connected = true;
            this.delegate.onConnect2(this.connection);
        },
        /**
         * Forwards an error to the delegate.
         * @param error passed on as `code`
         * @param {Object} [options] mixed into the error object
         */
        onError: function (error, options) {
            this.delegate.onError(this.connection, utils.mixin({
                code: error
            }, options), this.options);
        },
        /**
         * Forwards a close notification to the delegate.
         * @param [data]
         */
        onClose: function (data) {
            this.delegate.onClose(this.connection, data);
        },
        /**
         * Connects to the broker described by `this.options` (host, port, options)
         * and wires the client events to this instance. Does nothing when
         * `this.options.driver` is missing.
         * @returns {this}
         */
        connect: function () {
            var _options = this.options;
            if (!_options || !_options.driver) {
                if (shouldLog(debug, this)) {
                    console.error('no driver in options', _options);
                }
                return this;
            }
            var self = this;
            var host = _options.host;
            var port = _options.port;
            var deviceOptions = utils.getJson(_options.options || {}) || {};
            if (deviceOptions.debug === true) {
                debug = true;
            }
            var options = this.getOptions(host, port, deviceOptions);
            this.host = host;
            this.port = port;
            this.protocol = this.protocolName;

            if (shouldLog(debug, this)) {
                console.log('MQTT->connecting to ' + host + ' with', options);
            }

            var mqttClient = mqtt.connect(host, options);
            this.mqttClient = mqttClient;

            mqttClient.on('connect', function () {
                self.onConnected();
            });

            // Without an 'error' listener an emitted 'error' event is thrown by the
            // event emitter; route it to the delegate through onError() instead.
            mqttClient.on('error', function (error) {
                if (shouldLog(debug, self)) {
                    console.error('MQTT->error :: ', error);
                }
                self.onError(error);
            });

            mqttClient.on('close', function (code) {
                if (shouldLog(debug, self)) {
                    console.log('MQTT->close :: code: ', code);
                }
                self.onClose();
                mqttClient.end();
            });

            //@TODO: remove back - compat
            this._socket = {};
            this._socket.writable = true;
            return this;
        },
        /**
         * Hands an incoming event to the delegate.
         * @param {Object} evt
         * @param [buffer]
         */
        onData: function (evt, buffer) {
            if (shouldLog(debugData, this)) {
                // Node's util.inspect (nUtil) is used here; whether xide/utils offers
                // an inspect() cannot be told from this file.
                console.log('MQTT->onData ' + evt.topic, nUtil.inspect(evt));
            }
            this.delegate.onData(this.connection, evt, buffer);
        },
        /**
         * Reports a failed command to the delegate as an ON_COMMAND_ERROR event.
         * @param cmd
         * @param {Object} [options] mixed into the event (id, src, ...)
         */
        onCommandError: function (cmd, options) {
            if (shouldLog(debug, this)) {
                var info = options || {};
                console.log('MQTT->CommandError ' + cmd + ' id ' + info.id + ' src ' + info.src);
            }
            try {
                this.delegate.onData(this.connection, utils.mixin({
                    cmd: cmd,
                    event: types.EVENTS.ON_COMMAND_ERROR
                }, options));
            } catch (e) {
                console.error('---', e);
            }
        },
        /**
         * Reports a finished command to the delegate as an ON_COMMAND_FINISH event.
         * @param cmd
         * @param {Object} [options] mixed into the event (id, src, ...)
         * @param [buffer]
         */
        onFinish: function (cmd, options, buffer) {
            if (shouldLog(debug, this)) {
                var info = options || {};
                console.log('MQTT onFinish ' + cmd + ' id ' + info.id + ' src ' + info.src);
            }
            try {
                this.delegate.onData(this.connection, utils.mixin({
                    cmd: cmd,
                    event: types.EVENTS.ON_COMMAND_FINISH
                }, options), buffer);
            } catch (e) {
                console.error('onFinish-Error:', e);
            }
        },
        /**
         * Subscribes to a topic (once) and forwards its messages to onData().
         *
         * Messages are matched by exact topic equality, so a subscription that
         * contains MQTT wildcards will not deliver messages through this handler.
         *
         * @param {string} _topic
         * @param {Object} args `qos` is read from here (defaults to 0)
         * @param {Object} data `params.id` and `params.src` are attached to every event
         */
        addSubscription: function (_topic, args, data) {
            !this.subscriptions && (this.subscriptions = {});
            !this._subscriptionListeners && (this._subscriptionListeners = {});
            if (!_topic) {
                console.error('invalid topic');
                return;
            }
            if (!this.mqttClient) {
                console.error('MQTT->addSubscription: no client, call connect() first');
                return;
            }
            var self = this;
            var topic = '' + _topic;
            if (this.subscriptions[topic]) {
                return;
            }

            var params = (data && data.params) || {};
            var options = {
                qos: parseInt(args && args.qos, 10) || 0
            };
            this.mqttClient.subscribe(topic, options);

            if (shouldLog(debugSubscriptions, this)) {
                console.log('add subscription : ' + _topic + ' for ', nUtil.inspect(data, {depth: null, colors: true}));
            }

            var handle = this.subscriptions[topic] = {
                id: params.id,
                src: params.src,
                topic: topic
            };

            var listener = function (__topic, message) {
                if (__topic !== topic) {
                    return;
                }
                var evt = createMessageEvent(utils.getJson(message.toString()), __topic);
                if (evt) {
                    self.onData(utils.mixin(evt, handle));
                } else {
                    console.error('-unknown ' + __topic);
                }
            };
            // Remember the listener so unsubscribe() can detach it again; otherwise a
            // later re-subscription would deliver every message more than once.
            this._subscriptionListeners[topic] = listener;
            this.mqttClient.on('message', listener);
        },
        /**
         * Command entry point: unsubscribes from `args.topic`.
         * @param {Object|string} args
         */
        unSubscribeTopic: function (args, data, mqttClient) {
            args = utils.getJson(args) || {};
            if (args.topic === undefined || args.topic === null) {
                return;
            }
            var topic = '' + args.topic;
            if (topic) {
                this.unsubscribe(topic);
            }
        },
        /**
         * Command entry point: subscribes to `args.topic`; the remaining args
         * (e.g. `qos`) are passed on to addSubscription().
         * @param {Object|string} args
         * @param {Object|string} data
         */
        subscribeTopic: function (args, data, mqttClient) {
            args = utils.getJson(args) || {};
            data = utils.getJson(data);
            if (args.topic === undefined || args.topic === null) {
                // '' + undefined would otherwise subscribe to the literal topic "undefined".
                console.error('invalid topic');
                return;
            }
            var topic = '' + args.topic;
            delete args['topic'];
            this.addSubscription(topic, args, data);
        },
        /**
         * Command entry point: publishes to `args.topic`.
         *
         * `args.qos` and `args.retain` are used as publish options. The payload is
         * `args.payload` or, when that is null/undefined, the remaining args.
         * Objects are sent as JSON, numbers and booleans as their string form.
         *
         * @param {Object|string} args
         * @param {Object|string} data
         */
        publishTopic: function (args, data, mqttClient) {
            try {
                args = utils.getJson(args);
                data = utils.getJson(data);
                var topic = '' + args.topic;
                var qos = args.qos;
                var retain = args.retain;

                delete args['topic'];
                delete args['qos'];
                delete args['retain'];

                // Only fall back to the remaining args when no payload was given, so
                // falsy payloads such as 0 or '' are published as such.
                var payload = (args.payload !== undefined && args.payload !== null) ? args.payload : args;

                if (payload !== null && typeof payload === 'object') {
                    try {
                        payload = JSON.stringify(payload);
                    } catch (e) {
                        console.error('error generating payload', e);
                        return;
                    }
                } else if (typeof payload === 'number' || typeof payload === 'boolean') {
                    payload = String(payload);
                }

                if (shouldLog(debugPublish, this)) {
                    console.log('MQTT->publish: ' + topic + ' _ ' + (typeof payload === 'string') + ' l = ' + payload.length, payload);
                }
                this.mqttClient.publish(topic, payload, {
                    qos: qos,
                    retain: retain
                });
            } catch (e) {
                if (shouldLog(debug, this)) {
                    console.log('---error ', e);
                }
            }
        },
        /**
         * Not used by this protocol; intentionally a no-op.
         */
        send: function (cmd, options) {
            return;
        },
        /**
         * Unsubscribes from a topic added through addSubscription() and detaches
         * its message listener. Unknown topics are ignored.
         * @param {string} topic
         */
        unsubscribe: function (topic) {
            if (topic && this.mqttClient && this.subscriptions && this.subscriptions[topic]) {
                this.mqttClient.unsubscribe(topic);
                var listener = this._subscriptionListeners && this._subscriptionListeners[topic];
                if (listener) {
                    this.mqttClient.removeListener('message', listener);
                    delete this._subscriptionListeners[topic];
                }
                delete this.subscriptions[topic];
            }
        },
        /**
         * Unsubscribes from every topic and ends the client.
         */
        close: function () {
            if (this.mqttClient) {
                // Iterate the topic keys: unsubscribe() expects a topic, not the
                // stored subscription record.
                Object.keys(this.subscriptions || {}).forEach(function (topic) {
                    this.unsubscribe(topic);
                }, this);
                this.mqttClient.end();
                this.mqttClient = null;
            }
        }
    });

    /**
     * Describes the connection options of this protocol as a list of CIs
     * (built with utils.createCI).
     *
     * @param [query] unused
     * @returns {Deferred} resolved with the CI array, rejected when building it fails
     */
    Module.options = function (query) {
        var dfd = new Deferred();
        try {
            var ECIType = types.ECIType;
            var NetworkGroup = 'Network';
            var cis = [
                utils.createCI('clientId', ECIType.STRING, '', {
                    group: NetworkGroup,
                    title: 'Client id',
                    description: "mqttjs_' + Math.random().toString(16).substr(2, 8)"
                }),
                utils.createCI('protocolVersion', ECIType.INTEGER, 4, {
                    group: NetworkGroup,
                    title: 'Protocol version',
                    description: ""
                }),
                utils.createCI('protocolId', ECIType.STRING, '', {
                    group: NetworkGroup,
                    title: 'Protocol id',
                    description: ""
                }),
                utils.createCI('username', ECIType.STRING, '', {
                    group: NetworkGroup,
                    title: 'User name',
                    description: "the username required by your broker, if any"
                }),
                utils.createCI('password', ECIType.STRING, '', {
                    group: NetworkGroup,
                    title: 'Password',
                    password: true,
                    description: "the password required by your broker, if any"
                }),
                utils.createCI('keepalive', ECIType.INTEGER, 10, {
                    group: NetworkGroup,
                    title: 'Keep alive',
                    description: '10 seconds, set to 0 to disable'
                }),
                utils.createCI('reconnectPeriod', ECIType.INTEGER, 1000, {
                    group: NetworkGroup,
                    title: 'Reconnect period',
                    description: '1000 milliseconds, interval between two re-connections'
                }),
                utils.createCI('connectTimeout', ECIType.INTEGER, 30 * 1000, {
                    group: NetworkGroup,
                    title: 'Connect timeout',
                    description: '30 * 1000 milliseconds, time to wait before a CONNACK is received'
                }),
                utils.createCI('reschedulePings', ECIType.BOOL, true, {
                    group: NetworkGroup,
                    title: 'Reschedule',
                    description: 'reschedule ping messages after sending packets'
                }),
                utils.createCI('queueQoSZero', ECIType.BOOL, true, {
                    group: NetworkGroup,
                    title: 'Queue QoS Zero',
                    description: ' if connection is broken, queue outgoing QoS zero messages (default true)'
                }),
                utils.createCI('clean', ECIType.BOOL, true, {
                    group: NetworkGroup,
                    title: 'Clean',
                    description: 'set to false to receive QoS 1 and 2 messages while offline'
                })
            ];
            dfd.resolve(cis);
        } catch (e) {
            console.error('error', e);
            // Settle the deferred so callers are not left waiting forever.
            dfd.reject(e);
        }
        return dfd;
    };

    return Module;
});