// buffer.js

  // Shared helpers; this file only relies on utils.invokeCallback.
  var utils = require('../util/utils');
  /** Delay between two flushes of the per-session queues, in milliseconds. */
  var DEFAULT_FLUSH_INTERVAL = 20;

  /**
   * Buffering push scheduler: messages are queued per session and handed to
   * the session in batches by a periodic timer.
   *
   * May be called with or without `new`.
   *
   * @param {Object} app                   application object; must expose `get(name)`
   * @param {Object} [opts]                optional settings
   * @param {Number} [opts.flushInterval]  timer period in ms; any falsy value
   *                                       (including 0) falls back to the default
   */
  var Service = function(app, opts) {
    // Support factory-style invocation: Service(app, opts).
    if (!(this instanceof Service)) {
      return new Service(app, opts);
    }

    opts = opts || {};

    this.app = app;
    this.flushInterval = opts.flushInterval || DEFAULT_FLUSH_INTERVAL;
    this.sessions = {};   // session id -> array of messages awaiting the next flush
    this.tid = null;      // handle of the flush timer, null while not running
  };
  // The constructor itself is the module's export.
  module.exports = Service;
  /**
   * Arm the periodic timer that flushes the queued messages.
   *
   * @param {Function} [cb] invoked (through utils.invokeCallback) on the next tick
   */
  Service.prototype.start = function(cb) {
    // A second start() must not orphan the timer armed by the first one:
    // overwriting this.tid would leave the old interval running forever,
    // out of reach of stop().
    if (this.tid) {
      clearInterval(this.tid);
    }

    this.tid = setInterval(flush.bind(null, this), this.flushInterval);

    process.nextTick(function() {
      utils.invokeCallback(cb);
    });
  };
  /**
   * Cancel the periodic flush timer, if one is running.
   *
   * Messages still sitting in the queues are left untouched: they are neither
   * flushed nor discarded here.
   *
   * @param {Boolean}  force  accepted for interface compatibility; not used
   * @param {Function} [cb]   invoked (through utils.invokeCallback) on the next tick
   */
  Service.prototype.stop = function(force, cb) {
    if (this.tid) {
      clearInterval(this.tid);
      this.tid = null;
    }

    process.nextTick(function() {
      utils.invokeCallback(cb);
    });
  };
  /**
   * Queue a message for later delivery; nothing is sent from here.
   *
   * @param {*}        reqId   accepted for interface compatibility; not used
   * @param {*}        route   accepted for interface compatibility; not used
   * @param {*}        msg     message to queue
   * @param {Array}    recvs   session ids to push to (ignored for broadcasts)
   * @param {Object}   [opts]  `type === 'broadcast'` selects broadcasting, in
   *                           which case `userOptions` carries `binded` and
   *                           `filterParam`
   * @param {Function} [cb]    invoked (through utils.invokeCallback) on the next tick
   */
  Service.prototype.schedule = function(reqId, route, msg, recvs, opts, cb) {
    opts = opts || {};

    if (opts.type === 'broadcast') {
      // userOptions is optional; without a fallback the broadcast path would
      // dereference undefined.
      doBroadcast(this, msg, opts.userOptions || {});
    } else {
      doBatchPush(this, msg, recvs);
    }

    process.nextTick(function() {
      utils.invokeCallback(cb);
    });
  };
  /**
   * Queue `msg` for every session (or only every bound session) known to the
   * application's session service.
   *
   * @param {Object} self                 the Service instance
   * @param {*}      msg                  message to queue
   * @param {Object} [opts]               broadcast options
   * @param {Boolean} [opts.binded]       truthy: iterate with forEachBindedSession,
   *                                      otherwise with forEachSession
   * @param {*}      [opts.filterParam]   passed through to the broadcast filter
   */
  var doBroadcast = function(self, msg, opts) {
    // Callers may omit the options entirely.
    opts = opts || {};

    var channelService = self.app.get('channelService');
    var sessionService = self.app.get('sessionService');

    // Shared by both iteration modes. The filter is looked up and called on
    // channelService for each session, so it keeps channelService as `this`.
    var queueIfAccepted = function(session) {
      if (channelService.broadcastFilter &&
          !channelService.broadcastFilter(session, msg, opts.filterParam)) {
        return;
      }
      enqueue(self, session, msg);
    };

    if (opts.binded) {
      sessionService.forEachBindedSession(queueIfAccepted);
    } else {
      sessionService.forEachSession(queueIfAccepted);
    }
  };
  /**
   * Queue `msg` for each listed session id that the session service can
   * resolve; ids without a session are skipped silently.
   *
   * @param {Object} self     the Service instance
   * @param {*}      msg      message to queue
   * @param {Array}  [recvs]  session ids; a missing or empty list is a no-op
   */
  var doBatchPush = function(self, msg, recvs) {
    // Nothing to push to; also avoids reading `.length` of null/undefined.
    if (!recvs || recvs.length === 0) {
      return;
    }

    var sessionService = self.app.get('sessionService');

    for (var i = 0, total = recvs.length; i < total; i++) {
      var session = sessionService.get(recvs[i]);
      if (session) {
        enqueue(self, session, msg);
      }
    }
  };
  /**
   * Append `msg` to the queue kept for `session`, creating the queue on
   * first use.
   *
   * @param {Object} self     the Service instance (owner of `sessions`)
   * @param {Object} session  must expose `id` and `once(event, handler)`
   * @param {*}      msg      message to queue
   */
  var enqueue = function(self, session, msg) {
    var pending = self.sessions[session.id];

    if (!pending) {
      pending = [];
      self.sessions[session.id] = pending;
      // Registered only when the queue is created, so the queue is dropped
      // when the session reports 'closed'.
      session.once('closed', onClose.bind(null, self));
    }

    pending.push(msg);
  };
  /**
   * 'closed' handler: forget the queue kept for the closed session. Any
   * messages still in that queue are dropped with it.
   *
   * @param {Object} self     the Service instance (bound in by enqueue)
   * @param {Object} session  the closed session; only `id` is read
   */
  var onClose = function(self, session) {
    delete self.sessions[session.id];
  };
  /**
   * Timer callback: hand every non-empty queue to its session in a single
   * `sendBatch` call, then start a fresh queue for that session.
   *
   * @param {Object} self  the Service instance
   */
  var flush = function(self) {
    var sessionService = self.app.get('sessionService');
    var sids = Object.keys(self.sessions);

    for (var i = 0; i < sids.length; i++) {
      var sid = sids[i];
      var session = sessionService.get(sid);

      if (!session) {
        // The session can no longer be resolved, so its queue can never be
        // delivered. Drop it instead of keeping it (and skipping it on every
        // tick) forever.
        delete self.sessions[sid];
        continue;
      }

      var pending = self.sessions[sid];
      if (!pending || pending.length === 0) {
        continue;
      }

      session.sendBatch(pending);
      // The array just handed over is not reused; later messages go into a
      // new one.
      self.sessions[sid] = [];
    }
  };
  97. };