/** @module nxapp/manager/ConnectionManager */
define([
    "dcl/dcl",
    'nxapp/manager/ManagerBase',
    'nxapp/utils/_LogMixin',
    'nxapp/protocols/Tcp',
    'nxapp/protocols/Udp',
    'nxapp/protocols/Driver',
    'nxapp/protocols/SSH',
    'nxapp/protocols/Serial',
    'nxapp/protocols/MQTT',
    'nxapp/protocols/ProtocolBase',
    'nxapp/types/Types',
    'xide/factory',
    'xide/utils',
    "nxapp/utils/_console",
    "xide/data/_Base",
    "nxapp/model/Connection",
    'xide/data/TreeMemory',
    'xide/data/ObservableStore',
    'dstore/Trackable',
    "dojo/_base/declare",
    "dojo/Deferred",
    "dojo/node!path",
    'require'
], function (dcl, ManagerBase, _LogMixin, Tcp, Udp, Driver, SSH, Serial, MQTT, ProtocolBase, types, factory, utils, _console, _StoreBase, Connection, TreeMemory, ObservableStore, Trackable, declare, Deferred, path, require) {

    // Compile-time debug switches; every use below is `flag && console.xxx(...)`.
    var debug = false,
        debugConnection = false,
        debugData = false,
        debugMethods = false,
        debugLogging = false;

    // Route all console output of this module through the project's console wrapper.
    var console = _console;

    /**
     * Creates protocol clients (TCP, UDP, SSH, serial, MQTT, custom drivers), keeps the
     * resulting connections in a store keyed by `Connection.generateId` and relays the
     * client callbacks (connect, data, close, error) to the device server.
     *
     * Methods working on `this.pool` are the older, index based API; methods with a
     * `2` suffix work on the store.
     *
     * @class module:nxapp/manager/ConnectionManager
     * @extends module:nxapp/manager/ManagerBase
     */
    return dcl([ManagerBase, _LogMixin], {
        declaredClass: "nxapp.manager.ConnectionManager",
        options: null,
        profile: null,
        /** Timestamp (ms) of the last tick of the wake-up watchdog started in `init`. */
        lastUpTime: null,
        mqttManager: null,
        /** Legacy connection pool. Replaced by an instance-owned array in `init`. */
        pool: [],
        /** Value returned by `connectionExists` when nothing matched. */
        poolNotFound: -1,
        /** Connection store, created in `init`. */
        store: null,
        /** Handle of the watchdog interval created in `init`, cleared in `destroy`. */
        _wakeupTimer: null,

        /**
         * Stops the wake-up watchdog and closes every stored connection.
         */
        destroy: function () {
            console.log('destroy cm');
            if (this._wakeupTimer) {
                // without this the interval keeps firing (and keeps the process alive)
                clearInterval(this._wakeupTimer);
                this._wakeupTimer = null;
            }
            this.closeAll();
        },

        /**
         * Loads the driver module named in a device description through the AMD loader.
         * The module path is `options[options.driverScope] + '/' + options.driver`.
         *
         * @param options {module:xide/types~DeviceInfo}
         * @returns {module:dojo/Deferred} resolves with the module, with `null` when
         * `options` is unusable, or with `undefined` when loading threw synchronously.
         */
        getDriverModule: function (options) {
            var dfd = new Deferred();
            if (!options || !options.driver) {
                console.error('cant get driver module. options invalid or no driver specified');
                dfd.resolve(null);
                return dfd;
            }
            try {
                var driversRoot = options[options.driverScope];
                var modulePath = driversRoot + '/' + options.driver;
                modulePath = utils.replaceAll('/', path.sep, modulePath);
                try {
                    // drop any cached copy so the module is loaded again
                    require.undef(modulePath);
                    require([modulePath], function (what) {
                        dfd.resolve(what);
                    });
                } catch (e) {
                    console.error('-----error loading driver module at drivers root ' + driversRoot + ' and module ' + options.driver + ' :: ' + e.message, e.stack);
                    dfd.resolve();
                }
            } catch (e) {
                console.error('Error connecting driver ' + e.message);
                dfd.resolve();
            }
            return dfd;
        },

        /**
         * Runs a static protocol method (see `_protocolMethod`) after loading the driver
         * module of `options.device`.
         *
         * The returned deferred always settles: it resolves with the method's result when
         * that result is a promise, resolves with `[]` when the method returned no promise
         * or the driver lookup failed, and rejects when the method threw or its promise
         * rejected.
         *
         * @param protocol {string}
         * @param method {string}
         * @param options {object} `device` and `args` are read
         * @returns {module:dojo/Deferred}
         */
        protocolMethod: function (protocol, method, options) {
            options = options || {};
            var dfd = new Deferred();
            var self = this;
            this.getDriverModule(options.device).then(function (driverModule) {
                var sub;
                try {
                    sub = self._protocolMethod(protocol, method, options, driverModule);
                } catch (e) {
                    console.error('protocol method ' + method + ' failed : ' + e.message, e.stack);
                    dfd.reject(e);
                    return;
                }
                if (sub && sub.then) {
                    sub.then(function (res) {
                        dfd.resolve(res);
                    }, function (err) {
                        dfd.reject(err);
                    });
                } else {
                    dfd.resolve([]);
                }
            }, function (error) {
                console.error('cant get driver : ' + options.device, error);
                dfd.resolve([]);
            });
            return dfd;
        },

        /**
         * Dispatches `method` to the implementation of `protocol`.
         *
         * For the DRIVER protocol, a driver module exposing `is()` may name the protocol
         * it derives from; that protocol is used instead.
         *
         * Lookup walks a fixed order (DRIVER, UDP, MQTT, SSH, TCP, SERIAL) starting at the
         * requested protocol: if that implementation does not provide `method`, the next
         * one in the list is tried. This reproduces the fall-through of the original
         * `switch` statement.
         * NOTE(review): it is unclear whether falling through to other protocols is
         * intended - confirm before tightening it.
         *
         * @param protocol {string}
         * @param method {string}
         * @param options {object} `args` is forwarded to the method
         * @param driverModule {object|null}
         * @returns {*} the method's return value, `{}` when no implementation provides
         * `method`, or a deferred resolved with `[]` for an unknown protocol.
         */
        _protocolMethod: function (protocol, method, options, driverModule) {
            options = options || {};
            var args = options.args;

            // a driver may be subclass from an existing protocol
            if (protocol === types.PROTOCOL.DRIVER && driverModule && driverModule.is) {
                protocol = driverModule.is() || protocol;
            }

            // `impl: null` marks the driver entry, which is served by driverModule
            var chain = [
                {protocol: types.PROTOCOL.DRIVER, impl: null},
                {protocol: types.PROTOCOL.UDP, impl: Udp},
                {protocol: types.PROTOCOL.MQTT, impl: MQTT},
                {protocol: types.PROTOCOL.SSH, impl: SSH},
                {protocol: types.PROTOCOL.TCP, impl: Tcp},
                {protocol: types.PROTOCOL.SERIAL, impl: Serial}
            ];

            var start = -1;
            for (var i = 0; i < chain.length; i++) {
                if (chain[i].protocol === protocol) {
                    start = i;
                    break;
                }
            }
            if (start === -1) {
                var dfd = new Deferred();
                dfd.resolve([]);
                return dfd;
            }

            for (var j = start; j < chain.length; j++) {
                var impl = chain[j].impl;
                if (impl === null) {
                    if (driverModule && driverModule[method]) {
                        // driver methods receive the argument list as ONE parameter
                        return driverModule[method](args);
                    }
                } else if (method in impl) {
                    return impl[method].apply(this, args);
                }
            }
            console.error('no such protcol method ' + method + ' in ' + protocol);
            return {};
        },

        /**
         * Called by the watchdog in `init` when the timer was stalled: closes the sockets
         * of all legacy pool entries and empties the pool.
         */
        onWakeup: function () {
            this.lastUpTime = (new Date()).getTime();
            for (var i = 0; i < this.pool.length; i++) {
                var socket = this.pool[i].socket;
                if (socket && socket.close) {
                    socket.close();
                }
                if (socket && socket._socket && socket._socket.close) {
                    socket._socket.close();
                }
            }
            this.pool = [];
        },

        /**
         * Stores the profile, initializes the logger, creates the connection store and
         * starts a 1s watchdog which calls `onWakeup` when two ticks are more than 3s apart.
         *
         * @param profile {object} `debug` is passed to `initLogger`
         */
        init: function (profile) {
            this.profile = profile;
            this.initLogger(profile.debug);
            this.lastUpTime = (new Date()).getTime();

            // `pool` is declared on the prototype; give this instance its own array so
            // that instances do not push into a shared one
            if (!Object.prototype.hasOwnProperty.call(this, 'pool')) {
                this.pool = [];
            }

            var StoreClass = declare("nxapp.data.Store", [TreeMemory, Trackable, ObservableStore], {});
            this.store = new StoreClass({
                idProperty: 'id',
                Model: Connection
            });

            var thiz = this;
            if (this._wakeupTimer) {
                // init called again: never run two watchdogs
                clearInterval(this._wakeupTimer);
            }
            this._wakeupTimer = setInterval(function () {
                var current = (new Date()).getTime();
                if (current - thiz.lastUpTime > 3000) {
                    thiz.onWakeup();
                }
                thiz.lastUpTime = current;
            }, 1000);
        },

        /**
         * Creates the protocol client for `protocol`.
         *
         * @param protocol {string}
         * @param options {module:xide/types~DeviceInfo}
         * @returns {object|null} the client, or `null` for an unknown protocol
         * @private
         */
        _createClient: function (protocol, options) {
            var args = {
                owner: this,
                options: options,
                delegate: this,
                ctx: this.ctx
            };
            switch (protocol) {
                case types.PROTOCOL.TCP:
                    return new Tcp(args);
                case types.PROTOCOL.UDP:
                    return new Udp(args);
                case types.PROTOCOL.DRIVER:
                    debug && console.log('---create Custom driver connection');
                    return new Driver(args);
                case types.PROTOCOL.SSH:
                    debug && console.log('---create SSH connection');
                    return new SSH(args);
                case types.PROTOCOL.SERIAL:
                    debug && console.log('---create Serial connection');
                    return new Serial(args);
                case types.PROTOCOL.MQTT:
                    debug && console.log('---create MQTT connection');
                    return new MQTT(args);
                default:
                    debug && console.error('no such driver protocol ' + protocol);
                    return null;
            }
        },

        /**
         * Connect to a device. Returns the stored connection if one already exists for
         * `options`; in that case `onConnect2` is replayed unless the connection is a server.
         *
         * @param host {string} The host
         * @param protocol {string} The protocol
         * @param port {string|int} The target port
         * @param mqtt {object} MQTT parameters, forwarded to the MQTT manager if there is one
         * @param options {module:xide/types~DeviceInfo}
         * @param clientConnection {module:node/net.Socket} not used by this method
         * @returns {module:nxapp/model/Connection|null|boolean} the connection, `null` when
         * host, protocol or options are missing, `false` when the protocol is unknown.
         */
        connect: function (host, protocol, port, mqtt, options, clientConnection) {
            if (!host) {
                console.error('ConnectionManager::connect : have no host');
                return null;
            }
            if (!protocol) {
                console.error('ConnectionManager::connect : have no protocol');
                return null;
            }
            if (!options) {
                console.error('ConnectionManager::connect : have no options');
                return null;
            }

            var connectionId = Connection.generateId(options, this);
            var existing = this.store.getSync(connectionId);
            if (existing) {
                if (!existing.isServer() && !existing.isRunAsServer()) {
                    this.onConnect2(existing, true, true);
                }
                debugConnection && console.log('already have connection');
                return existing;
            }
            debugConnection && console.log('create new connection');

            var client = this._createClient(protocol, options);
            if (!client) {
                // Connection error
                factory.publish(types.EVENTS.ON_CONNECTION_ERROR, {
                    connection: {
                        port: port,
                        host: host,
                        protocol: protocol
                    }
                }, this);
                return false;
            }

            var connection = this.addConnection2(client, options);
            client.init();
            client.connection = connection;
            connection.socket = client.connect();
            factory.publish(types.EVENTS.ON_CONNECTION_CREATED, {
                connection: connection
            }, this);
            if (this.mqttManager) {
                this.mqttManager.onConnectedToDevice(options, client, mqtt);
            }
            return connection;
        },

        /**
         * Sends a command through the connection's client. A connection whose socket is
         * not writable is closed.
         *
         * @param connection {module:nxapp/model/Connection}
         * @param cmd {string}
         * @param options {module:xide/types~DeviceInfo|null}
         * @returns {*} the result of `client.send`, `false` when there is no writable
         * socket, `undefined` when connection or client are missing.
         */
        send: function (connection, cmd, options) {
            if (!connection) {
                console.error('send ' + cmd + ' have no connection');
                return;
            }
            var client = connection.client;
            if (!client) {
                console.error('send ' + cmd + ' have no client');
                return;
            }
            var socket = client._socket;
            if (!socket) {
                console.error('have no socket');
                return false;
            }
            if (!socket.writable) {
                console.error('socket not writable, closing connection' + connection.lastError);
                this.close(connection, true);
                return false;
            }
            if (connection.options) {
                this.logEx(types.LOG_OUTPUT.SEND_COMMAND + ' : ' + cmd, 'info', 'server', types.LOG_OUTPUT.SEND_COMMAND, connection);
            }
            return client.send(cmd, options);
        },

        /**
         * Calls a method of the connection's client. The method is invoked with
         * `(args, data, connection)`.
         *
         * @param connection {module:nxapp/model/Connection}
         * @param data {string|object} converted with `utils.getJson`; `method` and `args` are read
         * @returns {*} the method's return value, or `undefined` when it could not be called
         */
        callMethod: function (connection, data) {
            data = utils.getJson(data);
            if (!data) {
                console.error('call method : invalid data');
                return;
            }
            var method = data.method;
            var args = data.args;
            if (!connection) {
                console.error('call method : invalid connection');
                return;
            }
            var client = connection.client;
            if (!client) {
                console.error('call method : connection has no client object');
                return;
            }
            if (!client[method]) {
                console.error('no such method ' + method, client.declaredClass);
                return;
            }
            console.log('call method : ' + method + ' with ', [args, data]);
            return client[method].apply(client, [args, data, connection]);
        },

        /**
         * Marks the connection as destroyed and destroys its client, if it has one.
         *
         * @param connection {module:nxapp/model/Connection}
         */
        close: function (connection) {
            if (!connection) {
                return;
            }
            connection._destroyed = true;
            if (connection.client && connection.client.destroy) {
                connection.client.destroy();
            }
        },

        /**
         * Closes every connection in the store. Does nothing before `init` created the store.
         */
        closeAll: function () {
            if (!this.store) {
                return;
            }
            _.each(this.store.query({}), this.close, this);
        },

        ///////////////////////////////////////////////////////////////////////////
        //
        //  Client Connection callbacks
        //
        ///////////////////////////////////////////////////////////////////////////

        /** Called by the protocol client; intentionally empty. */
        onTimeout: function (connection) {},

        /**
         * Called by the protocol client. Logs and publishes the disconnect, then removes
         * the connection from the store.
         *
         * @param connection {module:nxapp/model/Connection}
         * @param data
         */
        onClose: function (connection, data) {
            if (connection && connection.options) {
                debugConnection && console.log("onClosed Connection closed : " + connection.toString(), data);
                this.logEx(types.LOG_OUTPUT.DEVICE_DISCONNECTED, 'info', 'server', types.LOG_OUTPUT.DEVICE_DISCONNECTED, connection);
                this.publish(types.EVENTS.ON_DEVICE_DISCONNECTED, {
                    device: connection.options,
                    connection: connection
                });
            }
            if (connection) {
                connection._destroyed = true;
            }
            this.removeConnection(connection, data);
        },

        /** Called by the protocol client; intentionally empty. */
        onDrain: function (connection) {},

        /**
         * Called by the protocol client. Remembers the exception on the connection and
         * removes the connection from the store.
         *
         * @param connection {module:nxapp/model/Connection}
         * @param exception {Exception}
         * @param options {module:xide/types~DeviceInfo|null}
         */
        onError: function (connection, exception, options) {
            if (options) {
                console.log(" Error: " + options.host + ':' + options.port + '@' + options.protocol, exception);
            }
            if (connection) {
                connection.lastError = exception;
            }
            this.removeConnection(connection, options);
        },

        /**
         * Legacy, pool based callback. Looks the connection up in `this.pool` and hands
         * `cmd` to `callFeedBack`, which currently always throws.
         * NOTE(review): the original passed an undefined identifier as payload; `cmd` is
         * the closest available value - confirm against callers.
         *
         * @param connection {module:nxapp/model/Connection}
         * @param cmd
         * @param id
         */
        onFinish: function (connection, cmd, id) {
            debug && console.log("Finish " + id);
            if (!connection) {
                console.error('ConnectionManager::onFinish : have no connection');
                return;
            }
            var poolRef = this.connectionExists(connection.host, connection.port, connection.protocol);
            if (poolRef != this.poolNotFound) {
                this.callFeedBack(poolRef, cmd);
            } else {
                console.error('ConnectionManager::onFinish : cant find pool - reference:' + poolRef + ':' + connection.host + ':' + connection.port + ':' + connection.protocol);
            }
        },

        /**
         * This is called by a protocol client which implements onData. Forwards the data
         * to the registered feedback callbacks (errors are logged, not rethrown) and logs
         * the response.
         *
         * @param connection {module:nxapp/model/Connection}
         * @param data {string}
         * @param buffer
         */
        onData: function (connection, data, buffer) {
            debugData && console.log("Data: --", utils.inspect(data));
            try {
                this.callFeedBack2(connection, data, buffer);
            } catch (e) {
                console.error('call feedback error : ' + e.message);
                console.log(e.stack);
            }
            if (connection && connection.options) {
                this.logEx(types.LOG_OUTPUT.RESPONSE + ":" + data, 'info', 'server', types.LOG_OUTPUT.RESPONSE, connection);
            }
        },

        /**
         * Tells whether the device's logging flags enable FILE logging for an output channel.
         *
         * @param deviceInfo {module:xide/types~DeviceInfo|null} `loggingFlags` may be an
         * object or its JSON string
         * @param output {string} key into the logging flags
         * @returns {boolean}
         */
        hasFlag: function (deviceInfo, output) {
            if (!deviceInfo) {
                return false;
            }
            var flags = deviceInfo.loggingFlags;
            if (_.isString(flags)) {
                flags = utils.fromJson(flags);
            }
            flags = flags || {};
            var flag = flags[output];
            if (flag == null) {
                return false;
            }
            return !!(flag & types.LOGGING_FLAGS.FILE);
        },

        /**
         * Callback when protocol client connection has been established. Broadcasts and
         * publishes ON_DEVICE_CONNECTED, registers (once) the feedback callback that
         * forwards device data to the device server, and calls the client's `onConnect`.
         *
         * @param connection {module:nxapp/model/Connection}
         * @param broadcast {boolean}
         * @param isReplay {boolean} true when called for an already existing connection
         * @returns {module:nxapp/model/Connection|undefined} the connection, or `undefined`
         * when it is missing or not connected
         */
        onConnect2: function (connection, broadcast, isReplay) {
            if (!connection) {
                debugConnection && console.error('onConnect2 : invalid connection');
                return;
            }
            if (!connection.connected) {
                debugConnection && console.error('onConnect : connection = not connected,abort ' + connection.options.host + ' bc=' + broadcast + ' isReplay=' + isReplay);
                return;
            }

            var deviceServer = this.ctx.getDeviceServer();
            deviceServer.broadCastMessage(types.EVENTS.ON_DEVICE_CONNECTED, {
                device: connection.options,
                isReplay: isReplay
            });
            this.publish(types.EVENTS.ON_DEVICE_CONNECTED, {
                device: connection.options,
                isReplay: isReplay,
                connection: connection
            });

            if (!connection.__registered) {
                this.registerFeedBack2(connection, function (_connection, _data, buffer) {
                    // only objects can carry the extra property; primitives pass through as-is
                    if (_data !== null && typeof _data === 'object') {
                        _data.lastResponse = !_.isString(_data) ? JSON.stringify(_data, null, 2) : _data;
                    }
                    deviceServer.onDeviceMessage(_connection, _data, buffer);
                });
                connection.__registered = true;
            }

            if (connection.options && !isReplay) {
                this.logEx(types.LOG_OUTPUT.ON_DEVICE_CONNECTED, 'info', 'server', types.LOG_OUTPUT.ON_DEVICE_CONNECTED, connection);
            }
            if (connection.client && connection.client.onConnect) {
                connection.client.onConnect();
            }
            return connection;
        },

        /**
         * Forwards a log message to the device server when the device's logging flags
         * enable FILE logging for `channel` (see `hasFlag`).
         *
         * @param message {string}
         * @param level {string} defaults to 'info'
         * @param type {string} defaults to 'server'
         * @param channel {string}
         * @param connection {module:nxapp/model/Connection}
         */
        logEx: function (message, level, type, channel, connection) {
            if (!connection || !connection.options) {
                return;
            }
            var deviceInfo = connection.options;
            if (this.hasFlag(deviceInfo, channel)) {
                this.ctx.getDeviceServer().logEx(message + ' ' + connection.toString(), level || 'info', type || 'server', {
                    device: deviceInfo
                });
            }
        },

        /**
         * Legacy connect callback. Broadcasts ON_DEVICE_CONNECTED and registers (once) a
         * feedback callback forwarding device data to the device server.
         *
         * @param host
         * @param port
         * @param protocol
         * @param _connection
         * @throws {Error} when the connection or its options are missing
         */
        _onConnect: function (host, port, protocol, _connection) {
            if (!_connection || !_connection.options) {
                console.error('have no options: ', _connection);
                throw new Error('ConnectionManager::_onConnect : connection has no options');
            }
            console.log("Connected to " + _connection.declaredClass + ':' + host + ":" + port + " via " + protocol + ' : ', _connection.options);

            var deviceServer = this.ctx.getDeviceServer();
            deviceServer.broadCastMessage(types.EVENTS.ON_DEVICE_CONNECTED, {
                device: {
                    host: host,
                    port: port,
                    protocol: protocol,
                    options: _connection.options
                }
            });

            if (_connection.__registered) {
                console.error('already registered');
                return;
            }
            this.registerFeedBack2(_connection, function (connection, _data, buffer) {
                // only objects can carry the extra property; primitives pass through as-is
                if (_data !== null && typeof _data === 'object') {
                    _data.lastResponse = '' + _data;
                }
                deviceServer.onDeviceMessage(connection, _data);
            });
            _connection.__registered = true;
        },

        /** Called by the protocol client; intentionally empty. */
        onHandle: function (connection, data) {},

        /**
         * @returns {number} number of entries in the legacy pool
         */
        getNumberOfConnections: function () {
            return this.pool.length;
        },

        /**
         * Stub: the original looped over the pool without doing anything and returned nothing.
         * NOTE(review): intended result is unknown - implement or remove.
         */
        getDeviceConnection: function () {},

        /**
         * Legacy: stores a connection and adds a matching entry to the pool.
         * NOTE(review): `Connection` is constructed with three arguments here but with
         * `(client, options)` in `addConnection2` - confirm the constructor signature.
         *
         * @returns {module:nxapp/model/Connection}
         */
        addConnection: function (socket, host, port, protocol, user, customFields, connection, options) {
            var _Connection = new Connection(connection, socket, options);
            this.store.putSync(_Connection);
            this.pool.push({
                socket: socket,
                host: host,
                port: port,
                protocol: protocol,
                user: user,
                customFields: customFields,
                feedbackTo: []
            });
            return _Connection;
        },

        /**
         * Wraps a protocol client into a connection and puts it into the store.
         *
         * @param client
         * @param options
         * @returns {module:nxapp/model/Connection} whatever `store.putSync` returns
         */
        addConnection2: function (client, options) {
            return this.store.putSync(new Connection(client, options));
        },

        /**
         * Removes the stored connection that belongs to `connection.options` and, if a
         * device server exists, broadcasts ON_DEVICE_DISCONNECTED one second later.
         *
         * @param connection {module:nxapp/model/Connection}
         * @param error not used
         * @param stopped forwarded in the broadcast message
         */
        removeConnection: function (connection, error, stopped) {
            var thiz = this;
            // always work on the stored instance, whatever the caller handed in
            var stored = connection ? this.getConnection2(connection.options) : null;
            if (!stored) {
                debug && console.error('remove connection : invalid connection');
                return;
            }
            debug && console.log('---remove connection ' + stored.id);
            if (this.ctx.getDeviceServer()) {
                setTimeout(function () {
                    thiz.ctx.getDeviceServer().broadCastMessage(types.EVENTS.ON_DEVICE_DISCONNECTED, {
                        device: stored.options,
                        error: stored.lastError,
                        stopped: stopped
                    });
                }, 1000);
            }
            this.store.removeSync(stored.id);
            stored._destroyed = true;
        },

        /** Empties the legacy pool without closing anything. */
        clearPool: function () {
            this.pool = [];
        },

        /**
         * Legacy pool lookup. Called with one argument it returns the pool entry at that
         * index; called with three arguments `(host, protocol, port)` it returns the first
         * entry matching all three (loose equality, so '80' matches 80) or `null`.
         *
         * @param poolID {number|string}
         * @returns {object|null|undefined}
         */
        getConnection: function (poolID) {
            if (arguments.length == 3) {
                var _host = arguments[0],
                    _protocol = arguments[1],
                    _port = arguments[2];
                for (var i = 0; i < this.pool.length; i++) {
                    var entry = this.pool[i];
                    if ((entry.host == _host) && (entry.port == _port) && (entry.protocol == _protocol)) {
                        return entry;
                    }
                }
                return null;
            }
            return this.pool[poolID];
        },

        /**
         * @param options {module:xide/types~DeviceInfo}
         * @returns {module:nxapp/model/Connection} the stored connection for `options`
         */
        getConnection2: function (options) {
            return this.store.getSync(Connection.generateId(options, this));
        },

        /**
         * @param id {string}
         * @returns {module:nxapp/model/Connection}
         */
        getConnectionById: function (id) {
            return this.store.getSync(id);
        },

        /**
         * Registers a feedback callback on a connection. Only the first callback is
         * accepted; further registrations are ignored.
         *
         * @param con {module:nxapp/model/Connection}
         * @param callback {function}
         */
        registerFeedBack2: function (con, callback) {
            if (!con) {
                debug && console.error('register feedback : have no connection object');
                return;
            }
            if (!con.feedbackTo) {
                con.feedbackTo = [];
            }
            if (con.feedbackTo.length > 0) {
                return;
            }
            debug && console.info('register feedback : ' + con.options.host);
            con.feedbackTo.push(callback);
        },

        /**
         * Legacy: registers a feedback callback on a pool entry. Only the first callback
         * is accepted.
         *
         * @param poolRef {number}
         * @param callback {function}
         */
        _registerFeedBack: function (poolRef, callback) {
            var con = this.getConnection(poolRef);
            if (con == null) {
                console.error('cant find connection for pool ref:' + poolRef);
                return;
            }
            if (con.feedbackTo.length > 0) {
                return;
            }
            con.feedbackTo.push(callback);
        },

        /**
         * Legacy pool based feedback. The original implementation threw unconditionally
         * before reaching its dispatch code; that behavior is kept, with a message.
         *
         * @param poolRef {number}
         * @param data
         * @throws {Error} always
         */
        callFeedBack: function (poolRef, data) {
            throw new Error('ConnectionManager::callFeedBack is disabled, use callFeedBack2');
        },

        /**
         * Invokes every feedback callback of the connection with `(connection, data, buffer)`.
         *
         * @param connection {module:nxapp/model/Connection}
         * @param data {string}
         * @param buffer {Buffer}
         */
        callFeedBack2: function (connection, data, buffer) {
            if (!connection) {
                console.error('-- call feed back: have no connection');
                return;
            }
            var targets = connection.feedbackTo;
            if (!targets || targets.length == 0) {
                console.error('-- call feed back: have no feedback targets', connection.options && connection.options.host);
                return;
            }
            // length is re-read on purpose: a callback may register or remove targets
            for (var i = 0; i < connection.feedbackTo.length; i++) {
                connection.feedbackTo[i](connection, data, buffer);
            }
        },

        /**
         * Legacy pool search using strict equality.
         *
         * @param host
         * @param port
         * @param protocol
         * @returns {number} index of the LAST matching pool entry, or `poolNotFound`
         */
        connectionExists: function (host, port, protocol) {
            var found = this.poolNotFound;
            for (var i = 0; i < this.pool.length; i++) {
                var entry = this.pool[i];
                if (entry && entry.host === host && entry.port === port && entry.protocol === protocol) {
                    found = i;
                }
            }
            return found;
        },

        /**
         * Calls `callback(entry, index)` for every legacy pool entry.
         *
         * @param callback {function}
         */
        forEachPool: function (callback) {
            _.each(this.pool, function (entry, i) {
                callback(entry, i);
            });
        }
    });
});