control-freak-ide/server/nodejs/nxapp/manager/ConnectionManager.js
plastic-hub-dev-node-saturn 538369cff7 latest
2021-05-12 18:35:18 +02:00

865 lines
22 KiB
JavaScript

/** @module nxapp/manager/ConnectionManager */
define([
"dcl/dcl",
'nxapp/manager/ManagerBase',
'nxapp/utils/_LogMixin',
'nxapp/protocols/Tcp',
'nxapp/protocols/Udp',
'nxapp/protocols/Driver',
'nxapp/protocols/SSH',
'nxapp/protocols/Serial',
'nxapp/protocols/MQTT',
'nxapp/protocols/ProtocolBase',
'nxapp/types/Types',
'xide/factory',
'xide/utils',
"nxapp/utils/_console",
"xide/data/_Base",
"nxapp/model/Connection",
'xide/data/TreeMemory',
'xide/data/ObservableStore',
'dstore/Trackable',
"dojo/_base/declare",
"dojo/Deferred",
"dojo/node!path",
'require'
], function (dcl, ManagerBase, _LogMixin, Tcp, Udp, Driver, SSH, Serial, MQTT, ProtocolBase,
types, factory, utils, _console,
_StoreBase, Connection, TreeMemory,
ObservableStore, Trackable, declare,
Deferred, path, require) {
var debug = false,
debugConnection = false,
debugData = false,
debugMethods = false,
debugLogging = false;
var console = _console;
/**
* Common base class for protocol connections
* @class module:nxapp/manager/ConnectionManager
* @extends module:nxapp/manager/ManagerBase
*/
return dcl([ManagerBase, _LogMixin], {
declaredClass: "nxapp.manager.ConnectionManager",
options: null,
profile: null,
lastUpTime: null,
mqttManager: null,
pool: [],
poolNotFound: -1,
store: null,
destroy: function () {
console.log('destroy cm')
this.closeAll();
},
getDriverModule: function (options) {
var dfd = new Deferred();
if (!options || !options.driver) {
console.error('cant get driver module. options invalid or no driver specified');
dfd.resolve(null);
return dfd;
}
try {
var driversRoot = options[options.driverScope];
var modulePath = driversRoot + '/' + options.driver;
modulePath = utils.replaceAll('/', path.sep, modulePath);
try {
require.undef(modulePath);
require([modulePath], function (what) {
dfd.resolve(what);
});
} catch (e) {
console.error('-----error loading driver module at drivers root ' + driversRoot + ' and module ' + options.driver + ' :: ' + e.message, e.stack);
dfd.resolve();
}
} catch (e) {
console.error('Error connecting driver ' + e.message);
dfd.resolve();
}
return dfd;
},
protocolMethod: function (protocol, method, options) {
options = options || {};
var dfd = new Deferred();
var self = this;
this.getDriverModule(options.device).then(function (driverModule) {
var sub = self._protocolMethod(protocol, method, options, driverModule);
if (sub && sub.then) {
sub.then(function (res) {
dfd.resolve(res);
});
} else {
dfd.resolve([]);
}
}, function (error) {
console.error('cant get driver : ' + options.device);
});
return dfd;
},
_protocolMethod: function (protocol, method, options, driverModule) {
options = options || {};
var args = options.args;
var device = options.device;
// a driver may be subclass from an existing protocol
if (protocol === types.PROTOCOL.DRIVER && driverModule) {
protocol = driverModule.is ? driverModule.is() || protocol : protocol;
}
var result = {};
switch (protocol) {
case types.PROTOCOL.DRIVER:
{
if (driverModule && driverModule[method]) {
result = driverModule[method](args);
break;
}
}
case types.PROTOCOL.UDP:
{
if (method in Udp) {
result = Udp[method].apply(this, args);
break;
}
}
case types.PROTOCOL.MQTT:
{
if (method in MQTT) {
result = MQTT[method].apply(this, args);
break;
}
}
case types.PROTOCOL.SSH:
{
if (method in SSH) {
result = SSH[method].apply(this, args);
break;
}
}
case types.PROTOCOL.TCP:
{
if (method in Tcp) {
result = Tcp[method].apply(this, args);
break;
}
}
case types.PROTOCOL.SERIAL:
{
if (method in Serial) {
var a = Serial[method].apply(this, args);
return a;
} else {
console.error('no such protcol method ' + method + ' in ' + protocol);
}
break;
}
default:
{
var dfd = new Deferred();
dfd.resolve([]);
return dfd;
}
}
return result;
},
onWakeup: function () {
this.lastUpTime = (new Date()).getTime();
for (var i = 0; i < this.pool.length; i++) {
var con = this.pool[i]; {
if (con.socket && con.socket.close) {
con.socket.close();
}
if (con.socket && con.socket._socket && con.socket._socket.close) {
con.socket._socket.close();
}
}
}
this.pool = [];
},
init: function (profile) {
this.profile = profile;
this.initLogger(profile.debug);
this.lastUpTime = (new Date()).getTime();
var StoreClass = declare("nxapp.data.Store", [TreeMemory, Trackable, ObservableStore], {});
this.store = new StoreClass({
idProperty: 'id',
Model: Connection
});
var thiz = this;
setInterval(function () {
var current = (new Date()).getTime();
if (current - thiz.lastUpTime > 3000) {
thiz.onWakeup();
}
thiz.lastUpTime = current;
}, 1000);
},
/**
*
* Connect to a device. Returns a connection from cache if already connected.
*
* @param host {string} The host
* @param protocol {string} The protocol
* @param port {string|int} The target port
* @param mqtt {object} MQTT parameters
* @param options {module:xide/types~DeviceInfo}
* @param clientConnection {module:node/net.Socket}
* @returns {module:nxapp/model/Connection|null}
*/
connect: function (host, protocol, port, mqtt, options, clientConnection) {
// console.log('connect ' + host + ':' + port);
if (!host) {
console.error('ConnectionManager::connect : have no host');
return null;
}
if (!protocol) {
console.error('ConnectionManager::connect : have no protocol');
return null;
}
if (!options) {
console.error('ConnectionManager::connect : have no options');
return null;
}
var connectionId = Connection.generateId(options, this);
var _Connection = this.store.getSync(connectionId);
if (_Connection) {
if (!_Connection.isServer() && !_Connection.isRunAsServer()) {
this.onConnect2(_Connection, true, true);
}
debugConnection && console.log('already have connection');
return _Connection;
} else {
debugConnection && console.log('create new connection');
}
/*
if(port==='9999'){
console.log(' '+ host + ' =' + connectionId,options.devicePath);
if(host ==='127.0.0.1'){
console.error('maeh',utils.stack(),options);
}
}*/
var client = null;
var socket = null;
var ctx = this.ctx;
switch (protocol) {
case types.PROTOCOL.TCP:
{
client = new Tcp({
owner: this,
options: options,
delegate: this,
ctx: ctx
});
break;
}
case types.PROTOCOL.UDP:
{
client = new Udp({
owner: this,
options: options,
delegate: this,
ctx: ctx
});
break;
}
case types.PROTOCOL.DRIVER:
{
debug && console.log('---create Custom driver connection');
client = new Driver({
owner: this,
options: options,
delegate: this,
ctx: ctx
});
break;
}
case types.PROTOCOL.SSH:
{
debug && console.log('---create SSH connection');
client = new SSH({
owner: this,
options: options,
delegate: this,
ctx: ctx
});
break;
}
case types.PROTOCOL.SERIAL:
{
debug && console.log('---create SSH connection');
client = new Serial({
owner: this,
options: options,
delegate: this,
ctx: ctx
});
break;
}
case types.PROTOCOL.MQTT:
{
debug && console.log('---create MQTT connection');
client = new MQTT({
owner: this,
options: options,
delegate: this,
ctx: ctx
});
break;
}
default:
{
debug && console.error('no such driver protocol ' + protocol);
}
}
if (client) {
var connection = this.addConnection2(client, options);
client.init();
client.connection = connection;
socket = client.connect();
connection.socket = socket;
factory.publish(types.EVENTS.ON_CONNECTION_CREATED, {
connection: connection
}, this);
if (this.mqttManager) {
this.mqttManager.onConnectedToDevice(options, client, mqtt);
}
return connection;
} else // Connection error
{
factory.publish(types.EVENTS.ON_CONNECTION_ERROR, {
connection: {
port: port,
host: host,
protocol: protocol
}
}, this);
return false;
}
},
/**
*
* @param connection {module:nxapp/model/Connection}
* @param cmd {string}
* @param options {module:xide/types~DeviceInfo|null}
* @returns {*}
*/
send: function (connection, cmd, options) {
if (!connection) {
console.error('send ' + cmd + ' have no connection');
return;
}
var client = connection.client;
if (!client) {
console.error('send ' + cmd + ' have no client');
return;
}
var socket = client._socket;
if (!connection.client) {
console.error('connection has no client');
}
if (socket) {
if (socket.writable) {
} else {
console.error('socket not writable, closing connection' + connection.lastError);
this.close(connection, true);
return false;
}
} else {
console.error('have no socket');
return false;
}
if (connection && connection.options) {
this.logEx(types.LOG_OUTPUT.SEND_COMMAND + ' : ' + cmd, 'info', 'server', types.LOG_OUTPUT.SEND_COMMAND, connection);
}
return client.send(cmd, options);
},
/**
*
* @param connection {module:nxapp/model/Connection}
* @param data
* @returns {*}
*/
callMethod: function (connection, data) {
data = utils.getJson(data);
var method = data.method;
var args = data.args;
if (!connection) {
console.error('call method : invalid connection');
return;
}
if (!connection.client) {
console.error('call method : connection has no client object');
return;
}
if (connection.client[method]) {
/*
var hash = MD5(JSON.stringify(args), 1);
if(connection[hash + '_cm']){
console.log('already done!');
return;
}
connection[hash + '_cm'] = true;
*/
console.log('call method : ' + method + ' with ', [args, data]);
// srcCon
return connection.client[method].apply(connection.client, [args, data, connection]);
} else {
console.error('no such method ' + method, connection.client.declaredClass);
}
},
/**
* @param connection {module:nxapp/model/Connection}
*/
close: function (connection) {
if (connection) {
connection._destroyed = true;
connection.client.destroy();
return;
/*
var client = connection.client;
var socket = client._socket;
connection.client = null;
if(socket && socket._socket && socket._socket.close) {
socket._socket.close();
}else{
console.log('socket doesnt support close');
}*/
}
},
closeAll: function () {
_.each(this.store.query({}), this.close, this);
},
///////////////////////////////////////////////////////////////////////////
//
// Client Connection callbacks
//
///////////////////////////////////////////////////////////////////////////
onTimeout: function (connection) {},
/**
* Called by the protocol client.
* @param connection {module:nxapp/model/Connection}
* @param data
*/
onClose: function (connection, data) {
if (connection && connection.options) {
debugConnection && console.log("onClosed Connection closed : " + connection.toString(), data);
this.logEx(types.LOG_OUTPUT.DEVICE_DISCONNECTED, 'info', 'server', types.LOG_OUTPUT.DEVICE_DISCONNECTED, connection);
this.publish(types.EVENTS.ON_DEVICE_DISCONNECTED, {
device: connection.options,
connection: connection
})
}
connection._destroyed = true;
this.removeConnection(connection, data);
},
onDrain: function (connection) {},
/**
* Called by the protocol client.
* @param connection {module:nxapp/model/Connection}
* @param exception {Exception}
* @param options {module:xide/types~DeviceInfo|null}
*/
onError: function (connection, exception, options) {
if (options) {
console.log(" Error: " + options.host + ':' + options.port + '@' + options.protocol, exception);
}
if (connection) {
connection.lastError = exception;
}
this.removeConnection(connection, options);
},
onFinish: function (connection, cmd, id) {
debug && console.log("Finish " + id);
var poolRef = this.connectionExists(connection.host, connection.port, connection.protocol);
if (poolRef != this.poolNotFound) {
this.callFeedBack(poolRef, data);
} else {
console.error('ConnectionManager::onData : cant find pool - reference:' + poolRef + ':' + connection.host + ':' + connection.port + ':' + connection.protocol);
}
},
/**
* This is called by a protocol client which implements onData.
* @param connection {module:nxapp/model/Connection}
* @param data {string}
* @param buffer
*/
onData: function (connection, data, buffer) {
debugData && console.log("Data: --", utils.inspect(data));
try {
this.callFeedBack2(connection, data, buffer);
} catch (e) {
console.error('call feedback error : ' + e.message);
console.log(e.stack);
}
if (connection && connection.options) {
this.logEx(types.LOG_OUTPUT.RESPONSE + ":" + data, 'info', 'server', types.LOG_OUTPUT.RESPONSE, connection);
}
},
/**
*
* @param options {module:xide/types~DeviceInfo|null}
* @param output
* @returns {boolean}
*/
hasFlag: function (deviceInfo, output) {
var LOGGING_FLAGS = types.LOGGING_FLAGS,
OUTPUT = types.LOG_OUTPUT,
flags = deviceInfo.loggingFlags;
flags = _.isString(flags) ? utils.fromJson(flags) : flags || {};
var flag = flags[output];
if (flag == null) {
return false;
}
if (!(flag & LOGGING_FLAGS.FILE)) {
return false;
}
return true;
},
/**
* Callback when protocol client connection has been established
* @param connection {module:nxapp/model/Connection}
* @param broadcast {boolean}
* @param isReplay {boolean}
*/
onConnect2: function (connection, broadcast, isReplay) {
if (!connection) {
debugConnection && console.error('onConnect2 : invalid connection');
}
if (!connection.connected) {
debugConnection && console.error('onConnect : connection = not connected,abort ' + connection.options.host + ' bc=' + broadcast + ' isReplay=' + isReplay);
return;
}
var broadCastMessage = {
device: connection.options,
isReplay: isReplay
};
var deviceServer = this.ctx.getDeviceServer();
var _connection = this.getConnection2(connection.options);
var isServer = false;
if (_connection && _connection.options.isServer) {
isServer = true;
}
deviceServer.broadCastMessage(types.EVENTS.ON_DEVICE_CONNECTED, broadCastMessage);
this.publish(types.EVENTS.ON_DEVICE_CONNECTED, {
device: connection.options,
isReplay: isReplay,
connection: connection
});
if (connection) {
if (!connection.__registered) {
this.registerFeedBack2(connection, function (_connection, _data, buffer) {
if (_data) {
_data.lastResponse = !_.isString(_data) ? JSON.stringify(_data, null, 2) : _data;
}
deviceServer.onDeviceMessage(_connection, _data, buffer);
});
connection.__registered = true;
}
}
if (connection && connection.options && !isReplay) {
this.logEx(types.LOG_OUTPUT.ON_DEVICE_CONNECTED, 'info', 'server', types.LOG_OUTPUT.ON_DEVICE_CONNECTED, connection);
}
if (connection.client && connection.client.onConnect) {
connection.client.onConnect();
}
return connection;
},
/**
*
* @param message {string}
* @param level {string}
* @param type {string}
* @param channel {string}
* @param connection {module:nxapp/model/Connection}
*/
logEx: function (message, level, type, channel, connection) {
var deviceServer = this.ctx.getDeviceServer();
if (connection && connection.options) {
var deviceInfo = connection.options;
if (this.hasFlag(deviceInfo, channel)) {
deviceServer.logEx(message + ' ' + connection.toString(), level || 'info', type || 'server', {
device: deviceInfo
});
}
}
},
/**
*
* @param host
* @param port
* @param protocol
* @param _connection
*/
_onConnect: function (host, port, protocol, _connection) {
console.log("Connected to " + _connection.declaredClass + ':' + host + ":" + port + " via " + protocol + ' : ', _connection.options);
if (!_connection.options) {
console.error('have no options: ', _connection);
throw new Error("Asdf");
}
var broadCastMessage = {
device: {
host: host,
port: port,
protocol: protocol,
options: _connection ? _connection.options : null
}
};
this.ctx.getDeviceServer().broadCastMessage(types.EVENTS.ON_DEVICE_CONNECTED, broadCastMessage);
var thiz = this,
deviceServer = this.ctx.getDeviceServer();
if (_connection) {
if (!_connection.__registered) {
this.registerFeedBack2(_connection, function (connection, _data, buffer) {
_data.lastResponse = '' + _data;
deviceServer.onDeviceMessage(connection, _data);
});
_connection.__registered = true;
} else {
console.error('already registered');
}
}
},
onHandle: function (connection, data) {},
getNumberOfConnections: function () {
return this.pool.length;
},
getDeviceConnection: function () {
var result = [];
if (this.getNumberOfConnections()) {
for (var i = 0; i < this.pool.length; i++) {}
}
},
addConnection: function (socket, host, port, protocol, user, customFields, connection, options) {
var _Connection = new Connection(connection, socket, options);
this.store.putSync(_Connection);
this.pool.push({
socket: socket,
host: host,
port: port,
protocol: protocol,
user: user,
customFields: customFields,
feedbackTo: []
});
return _Connection;
},
/**
*
* @param client
* @param options
* @returns {module:nxapp/model/Connection}
*/
addConnection2: function (client, options) {
var _Connection = new Connection(client, options);
return this.store.putSync(_Connection);
},
/**
* @param connection {module:nxapp/model/Connection}
*/
removeConnection: function (connection, error, stopped) {
var thiz = this;
if (connection) {
connection = this.getConnection2(connection.options);
}
if (connection) {
connection = this.getConnection2(connection.options);
debug && console.log('---remove connection ' + connection.id);
if (this.ctx.getDeviceServer()) {
setTimeout(function () {
var broadCastMessage = {
device: connection.options,
error: connection.lastError,
stopped: stopped
};
thiz.ctx.getDeviceServer().broadCastMessage(types.EVENTS.ON_DEVICE_DISCONNECTED, broadCastMessage);
}, 1000);
}
this.store.removeSync(connection.id);
connection._destroyed = true;
} else {
debug && console.error('remove connection : invalid connection');
}
},
clearPool: function () {
this.pool = [];
},
getConnection: function (poolID) {
if (arguments.length == 3) {
var _host = arguments[0],
_protocol = arguments[1],
_port = arguments[2];
for (var i = 0; i < this.pool.length; i++) {
var entry = this.pool[i];
if ((entry.host == _host) &&
(entry.port == _port) &&
(entry.protocol == _protocol)
) {
return entry;
}
}
return null;
}
return this.pool[poolID];
},
/**
* @param options {module:xide/types~DeviceInfo}
* @returns {module:nxapp/model/Connection}
*/
getConnection2: function (options) {
return this.store.getSync(Connection.generateId(options, this));
},
getConnectionById: function (id) {
return this.store.getSync(id);
},
/**
*
* @param con {module:nxapp/model/Connection}
* @param callback {function}
*/
registerFeedBack2: function (con, callback) {
if (con) {
if (!con.feedbackTo) {
con.feedbackTo = [];
}
if (con && con.feedbackTo.length > 0) {
return;
}
debug && console.info('register feedback : ' + con.options.host);
con.feedbackTo.push(callback);
} else {
debug && console.error('register feedback : have no connection object');
}
},
_registerFeedBack: function (poolRef, callback) {
var con = this.getConnection(poolRef);
if (con == null) {
console.error('cant find connection for pool ref:' + poolRef);
return;
}
if (con && con.feedbackTo.length > 0) {
//console.error('registerFeedBack : already, abort');
return;
}
con.feedbackTo.push(callback);
},
callFeedBack: function (poolRef, data) {
throw new Error();
var myself = this;
var connection = this.getConnection(poolRef);
if (!connection) {
console.error('-- call feed back: have no connection');
return;
}
if (!connection.feedbackTo || connection.feedbackTo.length == 0) {
console.error('-- call feed back: have no feedback targets', connection.feedbackTo);
return;
}
_.each(connection.feedbackTo, function (callback) {
callback(myself.getConnection(poolRef), data);
});
},
/**
*
* @param connection {module:nxapp/model/Connection}
* @param data {string}
* @param buffer {Buffer}
*/
callFeedBack2: function (connection, data, buffer) {
var myself = this;
if (!connection) {
console.error('-- call feed back: have no connection');
return;
}
if (!connection.feedbackTo || connection.feedbackTo.length == 0) {
console.error('-- call feed back: have no feedback targets', connection.options.host);
return;
}
for (var i = 0; i < connection.feedbackTo.length; i++) {
connection.feedbackTo[i](connection, data, buffer);
}
},
connectionExists: function (host, port, protocol) {
var found = this.poolNotFound;
/*console.log("Check if connection exists "+host+" "+protocol+" "+port + ' result : ' + found);*/
_.each(this.pool, function (entry, i) {
//console.log("Pool position: " + i +" => ["+entry.host+" "+entry.port+" "+entry.protocol+"]");
if (entry) {
if ((entry.host === host) &&
(entry.port === port) &&
(entry.protocol === protocol)
) {
found = i;
}
}
});
return found;
},
forEachPool: function (callback) {
_.each(this.pool, function (entry, i) {
callback(entry, i);
});
}
});
});