123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261 |
- /**
- * Component for proxy.
- * Generate proxies for rpc client.
- */
- var crc = require('crc');
- var utils = require('../util/utils');
- var events = require('../util/events');
- var Client = require('pomelo-rpc').client;
- var pathUtil = require('../util/pathUtil');
- var Constants = require('../util/constants');
- var logger = require('pomelo-logger').getLogger('pomelo', __filename);
- /**
- * Component factory function
- *
- * @param {Object} app current application context
- * @param {Object} opts construct parameters
- * opts.router: (optional) rpc message route function, route(routeParam, msg, cb),
- * opts.mailBoxFactory: (optional) mail box factory instance.
- * @return {Object} component instance
- */
- module.exports = function(app, opts) {
- opts = opts || {};
- // proxy default config
- // cacheMsg is deprecated, just for compatibility here.
- opts.bufferMsg = opts.bufferMsg || opts.cacheMsg || false;
- opts.interval = opts.interval || 30;
- opts.router = genRouteFun();
- opts.context = app;
- opts.routeContext = app;
- if (app.enabled('rpcDebugLog')) {
- opts.rpcDebugLog = true;
- opts.rpcLogger = require('pomelo-logger').getLogger('rpc-debug', __filename);
- }
- return new Component(app, opts);
- };
- /**
- * Proxy component class
- *
- * @param {Object} app current application context
- * @param {Object} opts construct parameters
- */
- var Component = function(app, opts) {
- this.app = app;
- this.opts = opts;
- this.client = genRpcClient(this.app, opts);
- this.app.event.on(events.ADD_SERVERS, this.addServers.bind(this));
- this.app.event.on(events.REMOVE_SERVERS, this.removeServers.bind(this));
- this.app.event.on(events.REPLACE_SERVERS, this.replaceServers.bind(this));
- };
- var pro = Component.prototype;
- pro.name = '__proxy__';
- /**
- * Proxy component lifecycle function
- *
- * @param {Function} cb
- * @return {Void}
- */
- pro.start = function(cb) {
- if(this.opts.enableRpcLog) {
- logger.warn('enableRpcLog is deprecated in 0.8.0, please use app.rpcFilter(pomelo.rpcFilters.rpcLog())');
- }
- var rpcBefores = this.app.get(Constants.KEYWORDS.RPC_BEFORE_FILTER);
- var rpcAfters = this.app.get(Constants.KEYWORDS.RPC_AFTER_FILTER);
- var rpcErrorHandler = this.app.get(Constants.RESERVED.RPC_ERROR_HANDLER);
- if(!!rpcBefores) {
- this.client.before(rpcBefores);
- }
- if(!!rpcAfters) {
- this.client.after(rpcAfters);
- }
- if(!!rpcErrorHandler) {
- this.client.setErrorHandler(rpcErrorHandler);
- }
- process.nextTick(cb);
- };
- /**
- * Component lifecycle callback
- *
- * @param {Function} cb
- * @return {Void}
- */
- pro.afterStart = function(cb) {
- var self = this;
- this.app.__defineGetter__('rpc', function() {
- return self.client.proxies.user;
- });
- this.app.__defineGetter__('sysrpc', function() {
- return self.client.proxies.sys;
- });
- this.app.set('rpcInvoke', this.client.rpcInvoke.bind(this.client), true);
- this.client.start(cb);
- };
- /**
- * Add remote server to the rpc client.
- *
- * @param {Array} servers server info list, {id, serverType, host, port}
- */
- pro.addServers = function(servers) {
- if (!servers || !servers.length) {
- return;
- }
- genProxies(this.client, this.app, servers);
- this.client.addServers(servers);
- };
- /**
- * Remove remote server from the rpc client.
- *
- * @param {Array} ids server id list
- */
- pro.removeServers = function(ids) {
- this.client.removeServers(ids);
- };
- /**
- * Replace remote servers from the rpc client.
- *
- * @param {Array} ids server id list
- */
- pro.replaceServers = function(servers) {
- if (!servers || !servers.length) {
- return;
- }
- // update proxies
- this.client.proxies = {};
- genProxies(this.client, this.app, servers);
- this.client.replaceServers(servers);
- };
- /**
- * Proxy for rpc client rpcInvoke.
- *
- * @param {String} serverId remote server id
- * @param {Object} msg rpc message: {serverType: serverType, service: serviceName, method: methodName, args: arguments}
- * @param {Function} cb callback function
- */
- pro.rpcInvoke = function(serverId, msg, cb) {
- this.client.rpcInvoke(serverId, msg, cb);
- };
- /**
- * Generate rpc client
- *
- * @param {Object} app current application context
- * @param {Object} opts contructor parameters for rpc client
- * @return {Object} rpc client
- */
- var genRpcClient = function(app, opts) {
- opts.context = app;
- opts.routeContext = app;
- if(!!opts.rpcClient) {
- return opts.rpcClient.create(opts);
- } else {
- return Client.create(opts);
- }
- };
- /**
- * Generate proxy for the server infos.
- *
- * @param {Object} client rpc client instance
- * @param {Object} app application context
- * @param {Array} sinfos server info list
- */
- var genProxies = function(client, app, sinfos) {
- var item;
- for (var i = 0, l = sinfos.length; i < l; i++) {
- item = sinfos[i];
- if (hasProxy(client, item)) {
- continue;
- }
- client.addProxies(getProxyRecords(app, item));
- }
- };
- /**
- * Check a server whether has generated proxy before
- *
- * @param {Object} client rpc client instance
- * @param {Object} sinfo server info
- * @return {Boolean} true or false
- */
- var hasProxy = function(client, sinfo) {
- var proxy = client.proxies;
- return !!proxy.sys && !! proxy.sys[sinfo.serverType];
- };
- /**
- * Get proxy path for rpc client.
- * Iterate all the remote service path and create remote path record.
- *
- * @param {Object} app current application context
- * @param {Object} sinfo server info, format: {id, serverType, host, port}
- * @return {Array} remote path record array
- */
- var getProxyRecords = function(app, sinfo) {
- var records = [],
- appBase = app.getBase(),
- record;
- // sys remote service path record
- if (app.isFrontend(sinfo)) {
- record = pathUtil.getSysRemotePath('frontend');
- } else {
- record = pathUtil.getSysRemotePath('backend');
- }
- if (record) {
- records.push(pathUtil.remotePathRecord('sys', sinfo.serverType, record));
- }
- // userstate remote service path record
- record = pathUtil.getUserRemotePath(appBase, sinfo.serverType);
- if (record) {
- records.push(pathUtil.remotePathRecord('user', sinfo.serverType, record));
- }
- return records;
- };
- var genRouteFun = function() {
- return function(session, msg, app, cb) {
- var routes = app.get('__routes__');
- if (!routes) {
- defaultRoute(session, msg, app, cb);
- return;
- }
- var type = msg.serverType,
- route = routes[type] || routes['default'];
- if (route) {
- route(session, msg, app, cb);
- } else {
- defaultRoute(session, msg, app, cb);
- }
- };
- };
- var defaultRoute = function(session, msg, app, cb) {
- var list = app.getServersByType(msg.serverType);
- if (!list || !list.length) {
- cb(new Error('can not find server info for type:' + msg.serverType));
- return;
- }
- var uid = session ? (session.uid || '') : '';
- var index = Math.abs(crc.crc32(uid.toString())) % list.length;
- utils.invokeCallback(cb, null, list[index].id);
- };
|