/**
 * nxapp.manager.MQTTManager
 *
 * Starts a local mosca MQTT broker (in-memory persistence), hands out mqtt.js
 * clients connected to `mqtt://localhost`, and forwards device/MQTT events to
 * the device server obtained from `this.ctx`.
 */
define([
    "dcl/dcl",
    'nxapp/manager/ManagerBase',
    'nxapp/utils/_LogMixin',
    'nxapp/types/Types',
    "dojo/node!mosca-memory",
    "dojo/node!mqtt",
    "dojo/node!node-notifier",
    "dojo/node!colors",
    "xide/utils",
    "nxapp/utils/_console",
    "dojo/node!tcp-port-used"
], function (dcl, ManagerBase, _LogMixin, types, mosca, mqtt, nodeNotifier, colors, utils, _console, tcpPortUsed) {
    // Flip to true to enable the verbose `debug && console.log(...)` traces below.
    var debug = false;
    // Route all logging of this module through the project console wrapper.
    var console = _console;

    return dcl([ManagerBase, _LogMixin], {
        declaredClass: "nxapp.manager.MQTTManager",
        options: null,
        // Profile object handed to init().
        profile: null,
        // Timestamp (ms since epoch) of the last init()/onWakeup().
        lastUpTime: null,
        // Cache of mqtt.js clients keyed by "host:port"; created in init() or lazily.
        mqttClients: null,

        /**
         * Shuts the manager down: ends every cached mqtt client and closes the
         * broker if it exposes `close`.
         *
         * Clients returned by createMQTTClient2() are not cached and therefore
         * have to be ended by their owner.
         */
        destroy: function () {
            var clients = this.mqttClients || {};
            for (var hash in clients) {
                if (Object.prototype.hasOwnProperty.call(clients, hash)) {
                    var cached = clients[hash];
                    // Without end() the client keeps its socket and reconnect timer alive.
                    if (cached && typeof cached.end === 'function') {
                        cached.end();
                    }
                }
            }
            this.mqttClients = {};

            if (this.server && this.server.close) {
                this.server.close();
            }
        },

        /**
         * Publishes `data` (JSON encoded) on `topic` through the local broker with QoS 1.
         * Does nothing when no broker capable of publishing is available, e.g.
         * when MQTT is disabled and init() installed the `{}` placeholder.
         *
         * @param {string} topic MQTT topic.
         * @param {*} data Payload, serialized with JSON.stringify.
         */
        publishTopic: function (topic, data) {
            if (!this.server || typeof this.server.publish !== 'function') {
                debug && console.log('MQTTManager :: no mqtt server, skip publishing on ' + topic);
                return;
            }
            var message = {
                topic: topic,
                payload: JSON.stringify(data),
                qos: 1
            };
            this.server.publish(message, function () {
                debug && console.log('did send message on mqtt server ');
            });
        },

        /**
         * Creates a new, uncached mqtt client for a device and re-broadcasts
         * every message it receives as ON_MQTT_MESSAGE via the device server.
         *
         * @param {string} deviceHost Device host, part of the client id.
         * @param {number|string} devicePort Device port, part of the client id.
         * @param {*} mqttOptions Passed through unchanged in the broadcast payload.
         * @returns {Object} The mqtt.js client.
         */
        createMQTTClient2: function (deviceHost, devicePort, mqttOptions) {
            var hash = deviceHost + ':' + devicePort;
            var client = mqtt.connect('mqtt://localhost', {
                clean: false,
                clientId: hash
            });
            var ctx = this.ctx;
            client.on('message', function (topic, message) {
                // message is a Buffer
                debug && console.log('MQTTManager :: LocalMQTT - Client for ' + hash + ' : received topic' + topic + "\n");
                ctx.getDeviceServer().broadCastMessage(types.EVENTS.ON_MQTT_MESSAGE, {
                    host: deviceHost,
                    port: devicePort,
                    topic: topic,
                    message: message.toString(),
                    mqtt: mqttOptions
                });
            });
            return client;
        },

        /**
         * Returns the cached mqtt client for `deviceHost:devicePort`, creating
         * and caching it on first use.
         *
         * @param {string} deviceHost Device host, part of the cache key / client id.
         * @param {number|string} devicePort Device port, part of the cache key / client id.
         * @param {*} mqttOptions Currently unused.
         * @returns {Object} The mqtt.js client.
         */
        createMQTTClient: function (deviceHost, devicePort, mqttOptions) {
            var hash = deviceHost + ':' + devicePort;
            debug && console.log('create mqtt client ' + deviceHost + ' : ' + devicePort);

            // Allow usage before init() populated the cache.
            if (!this.mqttClients) {
                this.mqttClients = {};
            }
            if (this.mqttClients[hash]) {
                return this.mqttClients[hash];
            }

            var client = mqtt.connect('mqtt://localhost', {
                clean: false,
                clientId: hash
            });
            debug && console.log('create mqtt - client for ' + hash);
            client.on('connect', function () {
                debug && console.log('on connected local mqtt client for ' + hash);
            });
            this.mqttClients[hash] = client;
            return client;
        },

        /**
         * Hook invoked when a device connection is established; intentionally empty.
         */
        onConnectedToDevice: function (info, client, mqtt) {
        },

        /**
         * Records the wake-up time and closes every pooled connection socket
         * (and its underlying `_socket`, when present) before resetting the pool.
         */
        onWakeup: function () {
            this.lastUpTime = (new Date()).getTime();
            // `pool` is not defined in this module; tolerate it being absent.
            var pool = this.pool || [];
            for (var i = 0; i < pool.length; i++) {
                var con = pool[i];
                if (con.socket && con.socket.close) {
                    con.socket.close();
                }
                if (con.socket && con.socket._socket && con.socket._socket.close) {
                    con.socket._socket.close();
                }
            }
            this.pool = [];
        },

        /**
         * Initializes the manager and boots the mosca broker.
         *
         * Reads from `profile`:
         *  - `mqtt`: `'false'` or `false` disables the broker; `this.server`
         *    becomes an empty placeholder object and the context is signalled ready.
         *  - `mosca`: optional settings mixed over the defaults (port 1883,
         *    in-memory persistence); a `port` given as string is parsed to a number.
         *  - `socket_server.port` / `socket_server.host`: probed with
         *    tcp-port-used; the broker is only started when that port is free
         *    or the probe fails.
         *
         * `this.ctx.onReady()` is called once, either when the broker reports
         * 'ready', when it reports 'error', or immediately when MQTT is disabled.
         *
         * @param {Object} profile Server profile.
         */
        init: function (profile) {
            this.profile = profile;
            this.lastUpTime = (new Date()).getTime();
            var thiz = this;
            this.mqttClients = {};

            // Ascoltatore (mongo) backend settings. NOTE: currently not passed to
            // mosca (the `backend` option is not set below); kept for re-enabling.
            var ascoltatore = {
                type: 'mongo',
                url: 'mongodb://localhost:27017/mqtt',
                pubsubCollection: 'xcf',
                mongo: {}
            };
            utils.mixin(ascoltatore, this.profile.ascoltatore);

            var moscaSettings = {
                port: 1883,
                persistence: {
                    factory: mosca.persistence.Memory
                }
            };
            var moscaProfile = this.profile.mosca;
            if (moscaProfile) {
                if (moscaProfile.port) {
                    // Explicit radix: the port may arrive as a string.
                    moscaProfile.port = parseInt(moscaProfile.port, 10);
                }
                utils.mixin(moscaSettings, moscaProfile);
            }

            // Announces readiness exactly once, no matter which path triggers it.
            function setup() {
                if (thiz.__started) {
                    return;
                }
                thiz.__started = true;
                try {
                    nodeNotifier.notify({
                        title: 'Control-Freak',
                        message: 'All servers up and running, you can start now Control-Freak IDE',
                        sound: true, // Only Notification Center or Windows Toasters
                        wait: true, // Wait with callback, until user action is taken against notification
                        'open': 'file:///'
                    }, function (err, response) {
                        // Response is response from notification
                    });
                } catch (e) {
                    // Desktop notifications are best effort; never block start-up on them.
                    debug && console.log('MQTTManager :: desktop notification failed', e);
                }
                console.log(colors.green('\n\r\t Device server up and running\n\r \t *********** You can start Control Freak now **************\n\r'));
                thiz.ctx.onReady();
            }

            // Creates the broker and wires its events. Guarded on its own because it
            // runs from a promise callback, outside of any surrounding try/catch.
            function boot() {
                try {
                    var server = new mosca.Server(moscaSettings);
                    server.on('error', function (err) {
                        console.error('MQTTManager :: mosca server error: ' + err.message);
                        // Proceed without a working broker so the rest of the app can start.
                        setup();
                    });
                    server.on('ready', setup);
                    server.on('clientConnected', function () {
                        debug && console.log('MQTTManager :: mqtt client connected');
                    });
                    server.on('clientDisconnected', function () {
                        debug && console.log('MQTTManager :: mqtt client disconnected');
                    });
                    thiz.server = server;
                } catch (e) {
                    console.error('error starting mosca ', e);
                }
            }

            if (this.profile.mqtt === 'false' || this.profile.mqtt === false) {
                console.warn('MQTT is disabled, proceed without');
                this.server = {};
                setup();
                return;
            }

            try {
                // NOTE(review): this probes the socket server port, not the mosca port —
                // presumably to detect an already running instance; confirm.
                tcpPortUsed.check(profile.socket_server.port, profile.socket_server.host).then(function (inUse) {
                    if (inUse) {
                        console.warn('MQTTManager :: port ' + profile.socket_server.port + ' is already in use, MQTT server not started');
                    } else {
                        boot();
                    }
                }, function () {
                    // The probe itself failed; try to start anyway.
                    boot();
                });
            } catch (e) {
                console.error('error starting mosca ', e);
            }
        },

        ///////////////////////////////////////////////////////////////////////////
        //
        //  Client Connection callbacks
        //
        ///////////////////////////////////////////////////////////////////////////

        /**
         * Connection timed out; only traced in debug mode.
         */
        onTimeout: function (connection) {
            debug && console.log("Timeout!");
        },

        /**
         * Connection closed: removes it via `this.removeConnection`.
         */
        onClose: function (connection) {
            debug && console.log("Connection closed", connection);
            this.removeConnection(connection);
        },

        /**
         * Connection drained; only traced in debug mode.
         */
        onDrain: function (connection) {
            debug && console.log("Drain");
        },

        /**
         * Connection error; only traced in debug mode.
         */
        onError: function (connection, exception) {
            debug && console.log(" Error: ");
            debug && console.dir(exception);
        },

        /**
         * Raw data handler; not supported by this manager.
         *
         * @throws {Error} Always.
         */
        onData: function (connection, data) {
            throw new Error('MQTTManager :: onData is not implemented');
        },

        /**
         * Logs the new device connection and broadcasts ON_DEVICE_CONNECTED
         * with the device description through the device server.
         */
        onConnect: function (host, port, protocol, deviceScope) {
            console.log("Connected to " + host + ":" + port + " via " + protocol, "protocol_messages");
            var broadCastMessage = {
                device: {
                    host: host,
                    port: port,
                    protocol: protocol,
                    deviceScope: deviceScope
                }
            };
            this.ctx.getDeviceServer().broadCastMessage(types.EVENTS.ON_DEVICE_CONNECTED, broadCastMessage);
        },

        /**
         * Traces handled data when the "protocol_messages" debug channel is
         * enabled (and the module-level debug flag is on).
         */
        onHandle: function (connection, data) {
            if (this.showDebugMsg("protocol_messages")) {
                debug && console.log("Handle: ");
                debug && console.dir(data);
            }
        }
    });
});