var countDownLatch = require('../../util/countDownLatch'); var utils = require('../../util/utils'); var ChannelRemote = require('../remote/frontend/channelRemote'); var logger = require('pomelo-logger').getLogger('pomelo', __filename); /** * constant */ var ST_INITED = 0; var ST_DESTROYED = 1; /** * Create and maintain channels for server local. * * ChannelService is created by channel component which is a default loaded * component of pomelo and channel service would be accessed by `app.get('channelService')`. * * @class * @constructor */ var ChannelService = function(app, opts) { opts = opts || {}; this.app = app; this.channels = {}; this.prefix = opts.prefix; this.store = opts.store; this.broadcastFilter = opts.broadcastFilter; this.channelRemote = new ChannelRemote(app); }; module.exports = ChannelService; ChannelService.prototype.start = function(cb) { restoreChannel(this, cb); }; /** * Create channel with name. * * @param {String} name channel's name * @memberOf ChannelService */ ChannelService.prototype.createChannel = function(name) { if(this.channels[name]) { return this.channels[name]; } var c = new Channel(name, this); addToStore(this, genKey(this), genKey(this, name)); this.channels[name] = c; return c; }; /** * Get channel by name. * * @param {String} name channel's name * @param {Boolean} create if true, create channel * @return {Channel} * @memberOf ChannelService */ ChannelService.prototype.getChannel = function(name, create) { var channel = this.channels[name]; if(!channel && !!create) { channel = this.channels[name] = new Channel(name, this); addToStore(this, genKey(this), genKey(this, name)); } return channel; }; /** * Destroy channel by name. * * @param {String} name channel name * @memberOf ChannelService */ ChannelService.prototype.destroyChannel = function(name) { delete this.channels[name]; removeFromStore(this, genKey(this), genKey(this, name)); removeAllFromStore(this, genKey(this, name)); }; /** * Push message by uids. * Group the uids by group. ignore any uid if sid not specified. * * @param {String} route message route * @param {Object} msg message that would be sent to client * @param {Array} uids the receiver info list, [{uid: userId, sid: frontendServerId}] * @param {Object} opts userstate-defined push options, optional * @param {Function} cb cb(err) * @memberOf ChannelService */ ChannelService.prototype.pushMessageByUids = function(route, msg, uids, opts, cb) { if(typeof route !== 'string') { cb = opts; opts = uids; uids = msg; msg = route; route = msg.route; } if(!cb && typeof opts === 'function') { cb = opts; opts = {}; } if(!uids || uids.length === 0) { utils.invokeCallback(cb, new Error('uids should not be empty')); return; } var groups = {}, record; for(var i=0, l=uids.length; i ST_INITED) { return false; } else { var res = add(uid, sid, this.groups); if(res) { this.records[uid] = {sid: sid, uid: uid}; this.userAmount =this.userAmount+1; } addToStore(this.__channelService__, genKey(this.__channelService__, this.name), genValue(sid, uid)); return res; } }; /** * Remove userstate from channel. * * @param {Number} uid userstate id * @param {String} sid frontend server id which userstate has connected to. * @return [Boolean] true if success or false if fail */ Channel.prototype.leave = function(uid, sid) { if(!uid || !sid) { return false; } delete this.records[uid]; this.userAmount =this.userAmount-1; if(this.userAmount<0) this.userAmount=0;//robust removeFromStore(this.__channelService__, genKey(this.__channelService__, this.name), genValue(sid, uid)); var res = deleteFrom(uid, sid, this.groups[sid]); if(this.groups[sid] && this.groups[sid].length === 0) { delete this.groups[sid]; } return res; }; /** * Get channel UserAmount in a channel. * * @return {number } channel member amount */ Channel.prototype.getUserAmount = function() { return this.userAmount; }; /** * Get channel members. * * Notice: Heavy operation. * * @return {Array} channel member uid list */ Channel.prototype.getMembers = function() { var res = [], groups = this.groups; var group, i, l; for(var sid in groups) { group = groups[sid]; for(i=0, l=group.length; i 0) { sendMessage(sid); } else { // empty group process.nextTick(rpcCB(sid)); } } }; var restoreChannel = function(self, cb) { if(!self.store) { utils.invokeCallback(cb); return; } else { loadAllFromStore(self, genKey(self), function(err, list) { if(!!err) { utils.invokeCallback(cb, err); return; } else { if(!list.length || !Array.isArray(list)) { utils.invokeCallback(cb); return; } var load = function(key) { return (function() { loadAllFromStore(self, key, function(err, items) { for(var j=0; j