channelService.js 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545
  1. var countDownLatch = require('../../util/countDownLatch');
  2. var utils = require('../../util/utils');
  3. var ChannelRemote = require('../remote/frontend/channelRemote');
  4. var logger = require('pomelo-logger').getLogger('pomelo', __filename);
  5. /**
  6. * constant
  7. */
  8. var ST_INITED = 0;
  9. var ST_DESTROYED = 1;
  10. /**
  11. * Create and maintain channels for server local.
  12. *
  13. * ChannelService is created by channel component which is a default loaded
  14. * component of pomelo and channel service would be accessed by `app.get('channelService')`.
  15. *
  16. * @class
  17. * @constructor
  18. */
  19. var ChannelService = function(app, opts) {
  20. opts = opts || {};
  21. this.app = app;
  22. this.channels = {};
  23. this.prefix = opts.prefix;
  24. this.store = opts.store;
  25. this.broadcastFilter = opts.broadcastFilter;
  26. this.channelRemote = new ChannelRemote(app);
  27. };
  28. module.exports = ChannelService;
  29. ChannelService.prototype.start = function(cb) {
  30. restoreChannel(this, cb);
  31. };
  32. /**
  33. * Create channel with name.
  34. *
  35. * @param {String} name channel's name
  36. * @memberOf ChannelService
  37. */
  38. ChannelService.prototype.createChannel = function(name) {
  39. if(this.channels[name]) {
  40. return this.channels[name];
  41. }
  42. var c = new Channel(name, this);
  43. addToStore(this, genKey(this), genKey(this, name));
  44. this.channels[name] = c;
  45. return c;
  46. };
  47. /**
  48. * Get channel by name.
  49. *
  50. * @param {String} name channel's name
  51. * @param {Boolean} create if true, create channel
  52. * @return {Channel}
  53. * @memberOf ChannelService
  54. */
  55. ChannelService.prototype.getChannel = function(name, create) {
  56. var channel = this.channels[name];
  57. if(!channel && !!create) {
  58. channel = this.channels[name] = new Channel(name, this);
  59. addToStore(this, genKey(this), genKey(this, name));
  60. }
  61. return channel;
  62. };
  63. /**
  64. * Destroy channel by name.
  65. *
  66. * @param {String} name channel name
  67. * @memberOf ChannelService
  68. */
  69. ChannelService.prototype.destroyChannel = function(name) {
  70. delete this.channels[name];
  71. removeFromStore(this, genKey(this), genKey(this, name));
  72. removeAllFromStore(this, genKey(this, name));
  73. };
  74. /**
  75. * Push message by uids.
  76. * Group the uids by group. ignore any uid if sid not specified.
  77. *
  78. * @param {String} route message route
  79. * @param {Object} msg message that would be sent to client
  80. * @param {Array} uids the receiver info list, [{uid: userId, sid: frontendServerId}]
  81. * @param {Object} opts userstate-defined push options, optional
  82. * @param {Function} cb cb(err)
  83. * @memberOf ChannelService
  84. */
  85. ChannelService.prototype.pushMessageByUids = function(route, msg, uids, opts, cb) {
  86. if(typeof route !== 'string') {
  87. cb = opts;
  88. opts = uids;
  89. uids = msg;
  90. msg = route;
  91. route = msg.route;
  92. }
  93. if(!cb && typeof opts === 'function') {
  94. cb = opts;
  95. opts = {};
  96. }
  97. if(!uids || uids.length === 0) {
  98. utils.invokeCallback(cb, new Error('uids should not be empty'));
  99. return;
  100. }
  101. var groups = {}, record;
  102. for(var i=0, l=uids.length; i<l; i++) {
  103. record = uids[i];
  104. add(record.uid, record.sid, groups);
  105. }
  106. sendMessageByGroup(this, route, msg, groups, opts, cb);
  107. };
  108. /**
  109. * Broadcast message to all the connected clients.
  110. *
  111. * @param {String} stype frontend server type string
  112. * @param {String} route route string
  113. * @param {Object} msg message
  114. * @param {Object} opts userstate-defined broadcast options, optional
  115. * opts.binded: push to binded sessions or all the sessions
  116. * opts.filterParam: parameters for broadcast filter.
  117. * @param {Function} cb callback
  118. * @memberOf ChannelService
  119. */
  120. ChannelService.prototype.broadcast = function(stype, route, msg, opts, cb) {
  121. var app = this.app;
  122. var namespace = 'sys';
  123. var service = 'channelRemote';
  124. var method = 'broadcast';
  125. var servers = app.getServersByType(stype);
  126. if(!servers || servers.length === 0) {
  127. // server list is empty
  128. utils.invokeCallback(cb);
  129. return;
  130. }
  131. var count = servers.length;
  132. var successFlag = false;
  133. var latch = countDownLatch.createCountDownLatch(count, function() {
  134. if(!successFlag) {
  135. utils.invokeCallback(cb, new Error('broadcast fails'));
  136. return;
  137. }
  138. utils.invokeCallback(cb, null);
  139. });
  140. var genCB = function(serverId) {
  141. return function(err) {
  142. if(err) {
  143. logger.error('[broadcast] fail to push message to serverId: ' + serverId + ', err:' + err.stack);
  144. latch.done();
  145. return;
  146. }
  147. successFlag = true;
  148. latch.done();
  149. };
  150. };
  151. var self = this;
  152. var sendMessage = function(serverId) {
  153. return (function() {
  154. if(serverId === app.serverId) {
  155. self.channelRemote[method](route, msg, opts, genCB());
  156. } else {
  157. app.rpcInvoke(serverId, {namespace: namespace, service: service,
  158. method: method, args: [route, msg, opts]}, genCB(serverId));
  159. }
  160. }());
  161. };
  162. opts = {type: 'broadcast', userOptions: opts || {}};
  163. // for compatiblity
  164. opts.isBroadcast = true;
  165. if(opts.userOptions) {
  166. opts.binded = opts.userOptions.binded;
  167. opts.filterParam = opts.userOptions.filterParam;
  168. }
  169. for(var i=0, l=count; i<l; i++) {
  170. sendMessage(servers[i].id);
  171. }
  172. };
  173. /**
  174. * Channel maintains the receiver collection for a subject. You can
  175. * add users into a channel and then broadcast message to them by channel.
  176. *
  177. * @class channel
  178. * @constructor
  179. */
  180. var Channel = function(name, service) {
  181. this.name = name;
  182. this.groups = {}; // group map for uids. key: sid, value: [uid]
  183. this.records = {}; // member records. key: uid
  184. this.__channelService__ = service;
  185. this.state = ST_INITED;
  186. this.userAmount =0;
  187. };
  188. /**
  189. * Add userstate to channel.
  190. *
  191. * @param {Number} uid userstate id
  192. * @param {String} sid frontend server id which userstate has connected to
  193. */
  194. Channel.prototype.add = function(uid, sid) {
  195. if(this.state > ST_INITED) {
  196. return false;
  197. } else {
  198. var res = add(uid, sid, this.groups);
  199. if(res) {
  200. this.records[uid] = {sid: sid, uid: uid};
  201. this.userAmount =this.userAmount+1;
  202. }
  203. addToStore(this.__channelService__, genKey(this.__channelService__, this.name), genValue(sid, uid));
  204. return res;
  205. }
  206. };
  207. /**
  208. * Remove userstate from channel.
  209. *
  210. * @param {Number} uid userstate id
  211. * @param {String} sid frontend server id which userstate has connected to.
  212. * @return [Boolean] true if success or false if fail
  213. */
  214. Channel.prototype.leave = function(uid, sid) {
  215. if(!uid || !sid) {
  216. return false;
  217. }
  218. delete this.records[uid];
  219. this.userAmount =this.userAmount-1;
  220. if(this.userAmount<0) this.userAmount=0;//robust
  221. removeFromStore(this.__channelService__, genKey(this.__channelService__, this.name), genValue(sid, uid));
  222. var res = deleteFrom(uid, sid, this.groups[sid]);
  223. if(this.groups[sid] && this.groups[sid].length === 0) {
  224. delete this.groups[sid];
  225. }
  226. return res;
  227. };
  228. /**
  229. * Get channel UserAmount in a channel.
  230. *
  231. * @return {number } channel member amount
  232. */
  233. Channel.prototype.getUserAmount = function() {
  234. return this.userAmount;
  235. };
  236. /**
  237. * Get channel members.
  238. *
  239. * <b>Notice:</b> Heavy operation.
  240. *
  241. * @return {Array} channel member uid list
  242. */
  243. Channel.prototype.getMembers = function() {
  244. var res = [], groups = this.groups;
  245. var group, i, l;
  246. for(var sid in groups) {
  247. group = groups[sid];
  248. for(i=0, l=group.length; i<l; i++) {
  249. res.push(group[i]);
  250. }
  251. }
  252. return res;
  253. };
  254. /**
  255. * Get Member info.
  256. *
  257. * @param {String} uid userstate id
  258. * @return {Object} member info
  259. */
  260. Channel.prototype.getMember = function(uid) {
  261. return this.records[uid];
  262. };
  263. /**
  264. * Destroy channel.
  265. */
  266. Channel.prototype.destroy = function() {
  267. this.state = ST_DESTROYED;
  268. this.__channelService__.destroyChannel(this.name);
  269. };
  270. /**
  271. * Push message to all the members in the channel
  272. *
  273. * @param {String} route message route
  274. * @param {Object} msg message that would be sent to client
  275. * @param {Object} opts userstate-defined push options, optional
  276. * @param {Function} cb callback function
  277. */
  278. Channel.prototype.pushMessage = function(route, msg, opts, cb) {
  279. if(this.state !== ST_INITED) {
  280. utils.invokeCallback(new Error('channel is not running now'));
  281. return;
  282. }
  283. if(typeof route !== 'string') {
  284. cb = opts;
  285. opts = msg;
  286. msg = route;
  287. route = msg.route;
  288. }
  289. if(!cb && typeof opts === 'function') {
  290. cb = opts;
  291. opts = {};
  292. }
  293. sendMessageByGroup(this.__channelService__, route, msg, this.groups, opts, cb);
  294. };
  295. /**
  296. * add uid and sid into group. ignore any uid that uid not specified.
  297. *
  298. * @param uid userstate id
  299. * @param sid server id
  300. * @param groups {Object} grouped uids, , key: sid, value: [uid]
  301. */
  302. var add = function(uid, sid, groups) {
  303. if(!sid) {
  304. logger.warn('ignore uid %j for sid not specified.', uid);
  305. return false;
  306. }
  307. var group = groups[sid];
  308. if(!group) {
  309. group = [];
  310. groups[sid] = group;
  311. }
  312. group.push(uid);
  313. return true;
  314. };
  315. /**
  316. * delete element from array
  317. */
  318. var deleteFrom = function(uid, sid, group) {
  319. if(!group) {
  320. return true;
  321. }
  322. for(var i=0, l=group.length; i<l; i++) {
  323. if(group[i] === uid) {
  324. group.splice(i, 1);
  325. return true;
  326. }
  327. }
  328. return false;
  329. };
  330. /**
  331. * push message by group
  332. *
  333. * @param route {String} route route message
  334. * @param msg {Object} message that would be sent to client
  335. * @param groups {Object} grouped uids, , key: sid, value: [uid]
  336. * @param opts {Object} push options
  337. * @param cb {Function} cb(err)
  338. *
  339. * @api private
  340. */
  341. var sendMessageByGroup = function(channelService, route, msg, groups, opts, cb) {
  342. var app = channelService.app;
  343. var namespace = 'sys';
  344. var service = 'channelRemote';
  345. var method = 'pushMessage';
  346. var count = utils.size(groups);
  347. var successFlag = false;
  348. var failIds = [];
  349. logger.debug('[%s] channelService sendMessageByGroup route: %s, msg: %j, groups: %j, opts: %j', app.serverId, route, msg, groups, opts);
  350. if(count === 0) {
  351. // group is empty
  352. utils.invokeCallback(cb);
  353. return;
  354. }
  355. var latch = countDownLatch.createCountDownLatch(count, function() {
  356. if(!successFlag) {
  357. utils.invokeCallback(cb, new Error('all uids push message fail'));
  358. return;
  359. }
  360. utils.invokeCallback(cb, null, failIds);
  361. });
  362. var rpcCB = function(serverId) {
  363. return function(err, fails) {
  364. if(err) {
  365. logger.error('[pushMessage] fail to dispatch msg to serverId: ' + serverId + ', err:' + err.stack);
  366. latch.done();
  367. return;
  368. }
  369. if(fails) {
  370. failIds = failIds.concat(fails);
  371. }
  372. successFlag = true;
  373. latch.done();
  374. };
  375. };
  376. opts = {type: 'push', userOptions: opts || {}};
  377. // for compatiblity
  378. opts.isPush = true;
  379. var sendMessage = function(sid) {
  380. return (function() {
  381. if(sid === app.serverId) {
  382. channelService.channelRemote[method](route, msg, groups[sid], opts, rpcCB(sid));
  383. } else {
  384. app.rpcInvoke(sid, {namespace: namespace, service: service,
  385. method: method, args: [route, msg, groups[sid], opts]}, rpcCB(sid));
  386. }
  387. })();
  388. };
  389. var group;
  390. for(var sid in groups) {
  391. group = groups[sid];
  392. if(group && group.length > 0) {
  393. sendMessage(sid);
  394. } else {
  395. // empty group
  396. process.nextTick(rpcCB(sid));
  397. }
  398. }
  399. };
  400. var restoreChannel = function(self, cb) {
  401. if(!self.store) {
  402. utils.invokeCallback(cb);
  403. return;
  404. } else {
  405. loadAllFromStore(self, genKey(self), function(err, list) {
  406. if(!!err) {
  407. utils.invokeCallback(cb, err);
  408. return;
  409. } else {
  410. if(!list.length || !Array.isArray(list)) {
  411. utils.invokeCallback(cb);
  412. return;
  413. }
  414. var load = function(key) {
  415. return (function() {
  416. loadAllFromStore(self, key, function(err, items) {
  417. for(var j=0; j<items.length; j++) {
  418. var array = items[j].split(':');
  419. var sid = array[0];
  420. var uid = array[1];
  421. var channel = self.channels[name];
  422. var res = add(uid, sid, channel.groups);
  423. if(res) {
  424. channel.records[uid] = {sid: sid, uid: uid};
  425. }
  426. }
  427. });
  428. })();
  429. };
  430. for(var i=0; i<list.length; i++) {
  431. var name = list[i].slice(genKey(self).length + 1);
  432. self.channels[name] = new Channel(name, self);
  433. load(list[i]);
  434. }
  435. utils.invokeCallback(cb);
  436. }
  437. });
  438. }
  439. };
  440. var addToStore = function(self, key, value) {
  441. if(!!self.store) {
  442. self.store.add(key, value, function(err) {
  443. if(!!err) {
  444. logger.error('add key: %s value: %s to store, with err: %j', key, value, err.stack);
  445. }
  446. });
  447. }
  448. };
  449. var removeFromStore = function(self, key, value) {
  450. if(!!self.store) {
  451. self.store.remove(key, value, function(err) {
  452. if(!!err) {
  453. logger.error('remove key: %s value: %s from store, with err: %j', key, value, err.stack);
  454. }
  455. });
  456. }
  457. };
  458. var loadAllFromStore = function(self, key, cb) {
  459. if(!!self.store) {
  460. self.store.load(key, function(err, list) {
  461. if(!!err) {
  462. logger.error('load key: %s from store, with err: %j', key, err.stack);
  463. utils.invokeCallback(cb, err);
  464. } else {
  465. utils.invokeCallback(cb, null, list);
  466. }
  467. });
  468. }
  469. };
  470. var removeAllFromStore = function(self, key) {
  471. if(!!self.store) {
  472. self.store.removeAll(key, function(err) {
  473. if(!!err) {
  474. logger.error('remove key: %s all members from store, with err: %j', key, err.stack);
  475. }
  476. });
  477. }
  478. };
  479. var genKey = function(self, name) {
  480. if(!!name) {
  481. return self.prefix + ':' + self.app.serverId + ':' + name;
  482. } else {
  483. return self.prefix + ':' + self.app.serverId;
  484. }
  485. };
  486. var genValue = function(sid, uid) {
  487. return sid + ':' + uid;
  488. };