control-freak-ide/server/nodejs/nxapp/manager/MQTTManager.js
plastic-hub-dev-node-saturn 538369cff7 latest
2021-05-12 18:35:18 +02:00

244 lines
6.4 KiB
JavaScript

define([
"dcl/dcl",
'nxapp/manager/ManagerBase',
'nxapp/utils/_LogMixin',
'nxapp/types/Types',
"dojo/node!mosca-memory",
"dojo/node!mqtt",
"dojo/node!node-notifier",
"dojo/node!colors",
"xide/utils",
"nxapp/utils/_console",
"dojo/node!tcp-port-used"
], function (dcl, ManagerBase, _LogMixin, types, mosca, mqtt, nodeNotifier, colors, utils, _console, tcpPortUsed) {
var debug = false;
var console = _console;
return dcl([ManagerBase, _LogMixin], {
declaredClass: "nxapp.manager.MQTTManager",
options: null,
profile: null,
lastUpTime: null,
mqttClients: null,
destroy: function () {
if (this.server && this.server.close) {
this.server.close();
}
},
publishTopic: function (topic, data) {
var message = {
topic: topic,
payload: JSON.stringify(data),
qos: 1
};
this.server.publish(message, function () {
debug && console.log('did send message on mqtt server ');
});
},
createMQTTClient2: function (deviceHost, devicePort, mqttOptions) {
var hash = deviceHost + ':' + devicePort;
var clientId = hash;
var client = mqtt.connect('mqtt://localhost', {
clean: false,
clientId: clientId
});
var ctx = this.ctx;
client.on('message', function (topic, message) {
// message is Buffer
debug && console.log('MQTTManager :: LocalMQTT - Client for ' + hash + ' : received topic' + topic + "\n");
//client.end();
ctx.getDeviceServer().broadCastMessage(types.EVENTS.ON_MQTT_MESSAGE, {
host: deviceHost,
port: devicePort,
topic: topic,
message: message.toString(),
mqtt: mqttOptions
});
});
return client;
},
createMQTTClient: function (deviceHost, devicePort, mqttOptions) {
var hash = deviceHost + ':' + devicePort;
debug && console.log('create mqtt client ' + deviceHost + ' : ' + devicePort);;
if (this.mqttClients[hash]) {
return this.mqttClients[hash];
}
if (!this.mqttClients[hash]) {
var clientId = hash;
var client = mqtt.connect('mqtt://localhost', {
clean: false,
clientId: clientId
});
var ctx = this.ctx;
debug && console.log('create mqtt - client for ' + hash);
client.on('connect', function () {
debug && console.log('on connected local mqtt client for ' + hash);
});
this.mqttClients[hash] = client;
return client;
}
},
onConnectedToDevice: function (info, client, mqtt) {
},
onWakeup: function () {
this.lastUpTime = (new Date()).getTime();
for (var i = 0; i < this.pool.length; i++) {
var con = this.pool[i];
{
if (con.socket && con.socket.close) {
con.socket.close();
}
if (con.socket && con.socket._socket && con.socket._socket.close) {
con.socket._socket.close();
}
}
}
this.pool = [];
},
init: function (profile) {
this.profile = profile;
this.lastUpTime = (new Date()).getTime();
var thiz = this;
this.mqttClients = {};
var ascoltatore = {
//using ascoltatore
type: 'mongo',
url: 'mongodb://localhost:27017/mqtt',
pubsubCollection: 'xcf',
mongo: {}
};
utils.mixin(ascoltatore, this.profile.ascoltatore);
var moscaSettings = {
port: 1883,
//backend: ascoltatore,
//factory: mosca.persistence.Memory
persistence: {
factory: mosca.persistence.Memory
//url: 'mongodb://localhost:27017/mqtt'
}
};
if (this.profile.mosca.port) {
this.profile.mosca.port = parseInt(this.profile.mosca.port);
}
utils.mixin(moscaSettings, this.profile.mosca);
function setup() {
if (thiz.__started) {
return;
}
thiz.__started = true;
try {
nodeNotifier.notify({
title: 'Control-Freak',
message: 'All servers up and running, you can start now Control-Freak IDE',
//icon: path.join(__dirname, 'coulson.jpg'), // Absolute path (doesn't work on balloons)
sound: true, // Only Notification Center or Windows Toasters
wait: true, // Wait with callback, until user action is taken against notification
'open': 'file:///'
}, function (err, response) {
// Response is response from notification
});
} catch (e) {
}
console.log(colors.green('\n\r\t Device server up and running\n\r \t *********** You can start Control Freak now **************\n\r'));
thiz.ctx.onReady();
}
if (this.profile.mqtt === 'false') {
console.warn('MQTT is disabled, proceed without');
this.server = {};
setup();
return;
}
try {
function boot() {
var server = new mosca.Server(moscaSettings);
server.on('error', function (err) {
console.error('Unable to connect to Mongo:' + err.message);
setup();
});
server.on('ready', setup);
server.on('clientConnected', function () {
debug && console.log('MQTTManager :: mqtt client connected');
});
server.on('clientDisconnected', function () {
debug && console.log('MQTTManager :: mqtt client disconnected');
});
thiz.server = server;
}
tcpPortUsed.check(profile.socket_server.port, profile.socket_server.host).then(function (inUse) {
if (inUse) {
} else {
boot();
}
}, function () {
boot();
});
} catch (e) {
console.error('error starting mosca ', e);
}
},
///////////////////////////////////////////////////////////////////////////
//
// Client Connection callbacks
//
///////////////////////////////////////////////////////////////////////////
onTimeout: function (connection) {
debug && console.log("Timeout!");
},
onClose: function (connection) {
debug && console.log("Connection closed", connection);
this.removeConnection(connection);
},
onDrain: function (connection) {
debug && console.log("Drain");
},
onError: function (connection, exception) {
debug && console.log(" Error: ");
debug && console.dir(exception);
},
onData: function (connection, data) {
throw new Error();
},
onConnect: function (host, port, protocol, deviceScope) {
console.log("Connected to " + host + ":" + port + " via " + protocol, "protocol_messages");
var broadCastMessage = {
device: {
host: host,
port: port,
protocol: protocol,
deviceScope: deviceScope
}
};
this.ctx.getDeviceServer().broadCastMessage(types.EVENTS.ON_DEVICE_CONNECTED, broadCastMessage);
},
onHandle: function (connection, data) {
if (this.showDebugMsg("protocol_messages")) {
debug && console.log("Handle: ");
debug && console.dir(data);
}
}
});
});