connector.js 9.9 KB


  1. var logger = require('pomelo-logger').getLogger('pomelo', __filename);
  2. var taskManager = require('../common/manager/taskManager');
  3. var pomelo = require('../pomelo');
  4. var rsa = require("node-bignumber");
  5. var events = require('../util/events');
  6. var utils = require('../util/utils');
  7. module.exports = function(app, opts) {
  8. return new Component(app, opts);
  9. };
  /**
   * Connector component. Receives client requests and attaches a session to
   * each socket.
   *
   * The constructor only records configuration: it resolves the low level
   * connector, stores the optional codec / crypto / blacklist hooks and, when
   * requested, loads the dictionary and protobuf components into the app.
   *
   * @param {Object} app  current application context
   * @param {Object} opts attach parameters (optional)
   *                      opts.connector {Object|Function} low level network and protocol implementation
   *                      opts.encode / opts.decode {Function} custom codec hooks
   *                      opts.useCrypto {Boolean} verify incoming messages with rsa signatures
   *                      opts.blacklistFun {Function} provider of blacklist patterns
   *                      opts.useDict / opts.useProtobuf {Boolean} load the matching pomelo component
   */
  var Component = function(app, opts) {
    var settings = opts || {};

    this.app = app;
    this.connector = getConnector(app, settings);

    // optional hooks and switches supplied by the caller
    this.encode = settings.encode;
    this.decode = settings.decode;
    this.useCrypto = settings.useCrypto;
    this.blacklistFun = settings.blacklistFun;

    // per-session public keys and the ip blacklist start out empty
    this.keys = {};
    this.blacklist = [];

    if(settings.useDict) {
      app.load(pomelo.dictionary, app.get('dictionaryConfig'));
    }
    if(settings.useProtobuf) {
      app.load(pomelo.protobuf, app.get('protobufConfig'));
    }

    // component dependencies, resolved in start()
    this.server = null;
    this.session = null;
    this.connection = null;
  };
  // Shorthand for the prototype; the component methods below are attached to it.
  var pro = Component.prototype;

  // Identifier carried by every connector component instance.
  pro.name = '__connector__';
  /**
   * Component lifecycle hook: resolve the sibling components this connector
   * depends on and report the outcome asynchronously.
   *
   * The server and session components are mandatory; the connection component
   * is optional and simply stays undefined when it is not loaded.
   *
   * @param {Function} cb invoked on the next tick, with an Error when a
   *                      mandatory component is missing
   */
  pro.start = function(cb) {
    var loaded = this.app.components;
    this.server = loaded.__server__;
    this.session = loaded.__session__;
    this.connection = loaded.__connection__;

    // defer the failure so the callback never fires synchronously
    var failLater = function(message) {
      process.nextTick(function() {
        utils.invokeCallback(cb, new Error(message));
      });
    };

    // check component dependencies
    if(!this.server) {
      failLater('fail to start connector component for no server component loaded');
      return;
    }
    if(!this.session) {
      failLater('fail to start connector component for no session component loaded');
      return;
    }

    process.nextTick(cb);
  };
  /**
   * Component lifecycle hook: start the low level connector and begin
   * accepting connections.
   *
   * Every 'connection' event is routed through hostFilter (bound to this
   * component), which is given bindEvents as the callback for accepted sockets.
   *
   * @param {Function} cb passed straight to the connector's start()
   */
  pro.afterStart = function(cb) {
    this.connector.start(cb);

    var onConnection = hostFilter.bind(this, bindEvents);
    this.connector.on('connection', onConnection);
  };
  /**
   * Component lifecycle hook: shut the low level connector down.
   *
   * The connector reference is dropped after the first call, so a repeated
   * stop only schedules the callback for the next tick.
   *
   * @param {Boolean}  force forwarded to the connector's stop()
   * @param {Function} cb    completion callback
   */
  pro.stop = function(force, cb) {
    var active = this.connector;

    if(!active) {
      // nothing left to stop
      process.nextTick(cb);
      return;
    }

    active.stop(force, cb);
    this.connector = null;
  };
  /**
   * Encode an outgoing message and hand it to the push scheduler.
   *
   * The custom encoder (opts.encode) wins over the connector's own encoder;
   * with neither present the message is scheduled as is.
   *
   * When encoding yields an empty result the message is NOT scheduled: the
   * callback receives an Error on the next tick and nothing else happens.
   * (Previously the early `return` sat inside the nextTick callback, so the
   * empty payload was still scheduled and `cb` was handed to both paths.)
   *
   * @param {Number}   reqId request id
   * @param {String}   route route string of the message
   * @param {Object}   msg   message body
   * @param {Array}    recvs receiver session ids
   * @param {Object}   opts  options forwarded to the push scheduler
   * @param {Function} cb    completion callback
   */
  pro.send = function(reqId, route, msg, recvs, opts, cb) {
    logger.debug('[%s] send message reqId: %s, route: %s, msg: %j, receivers: %j, opts: %j', this.app.serverId, reqId, route, msg, recvs, opts);

    var emsg = msg;
    if(this.encode) {
      // use customized encode
      emsg = this.encode.call(this, reqId, route, msg);
    } else if(this.connector.encode) {
      // use connector default encode
      emsg = this.connector.encode(reqId, route, msg);
    }

    if(!emsg) {
      process.nextTick(function() {
        utils.invokeCallback(cb, new Error('fail to send message for encode result is empty.'));
      });
      // nothing deliverable: stop here instead of scheduling an empty payload
      return;
    }

    this.app.components.__pushScheduler__.schedule(reqId, route, emsg,
      recvs, opts, cb);
  };
  /**
   * Build an rsa key from the supplied parts and remember it under `id`.
   *
   * @param {String|Number} id  key under which the public key is stored
   * @param {Object}        key key material: rsa_n is parsed as a base 16
   *                            big integer, rsa_e is stored as given
   */
  pro.setPubKey = function(id, key) {
    var rsaKey = new rsa.Key();
    var modulus = new rsa.BigInteger(key.rsa_n, 16);

    rsaKey.n = modulus;
    rsaKey.e = key.rsa_e;

    this.keys[id] = rsaKey;
  };
  /**
   * Look up the public key stored by setPubKey.
   *
   * @param {String|Number} id key used when the public key was stored
   * @return {Object|undefined} the stored key, or undefined when none exists
   */
  pro.getPubKey = function(id) {
    var stored = this.keys[id];
    return stored;
  };
  /**
   * Resolve the low level connector for this component.
   *
   *  - no opts.connector        -> the default connector is created
   *  - opts.connector is a function -> it is called with
   *    (clientPort, host, opts) of the current server and its result is used
   *  - anything else            -> treated as a ready-made connector instance
   *
   * @param {Object} app  current application context
   * @param {Object} opts component options
   * @return {Object} connector instance
   */
  var getConnector = function(app, opts) {
    var supplied = opts.connector;

    if(typeof supplied === 'function') {
      var serverInfo = app.getCurServer();
      return supplied(serverInfo.clientPort, serverInfo.host, opts);
    }

    return supplied ? supplied : getDefaultConnector(app, opts);
  };
  110. var getDefaultConnector = function(app, opts) {
  111. var DefaultConnector = require('../connectors/sioconnector');
  112. var curServer = app.getCurServer();
  113. return new DefaultConnector(curServer.clientPort, curServer.host, opts);
  114. };
  /**
   * Gatekeeper for new connections: sockets whose remote ip matches a
   * blacklist pattern are disconnected, every other socket is handed to `cb`.
   *
   * Must be invoked with `this` bound to the connector component. Two lists
   * are consulted, in this order:
   *  1. this.blacklist, checked synchronously;
   *  2. the list delivered by this.blacklistFun(callback(err, list)), if that
   *     hook is a function. An error or a non-array result is logged and the
   *     socket is accepted.
   *
   * Each list entry is compiled with `new RegExp(entry)` and tested against
   * the ip. Only the list's own entries are used; the previous `for...in`
   * walk also picked up enumerable properties inherited from the prototype
   * chain and treated them as patterns.
   *
   * @param {Function} cb     called as cb(component, socket) for accepted sockets
   * @param {Object}   socket new connection; reads socket.remoteAddress.ip
   */
  var hostFilter = function(cb, socket) {
    var self = this;
    var ip = socket.remoteAddress.ip;

    // Disconnects the socket and returns true as soon as one pattern matches.
    var check = function(list) {
      var banned = Object.keys(list).some(function(index) {
        return new RegExp(list[index]).test(ip);
      });
      if(banned) {
        socket.disconnect();
      }
      return banned;
    };

    // list kept on the component itself
    if(self.blacklist.length !== 0 && check(self.blacklist)) {
      return;
    }

    // without a list provider there is nothing more to check
    if(typeof self.blacklistFun !== 'function') {
      utils.invokeCallback(cb, self, socket);
      return;
    }

    // list delivered by the user supplied provider
    self.blacklistFun(function(err, list) {
      if(err) {
        logger.error('connector blacklist error: %j', err.stack);
        utils.invokeCallback(cb, self, socket);
        return;
      }
      if(!Array.isArray(list)) {
        logger.error('connector blacklist is not array: %j', list);
        utils.invokeCallback(cb, self, socket);
        return;
      }
      if(!check(list)) {
        utils.invokeCallback(cb, self, socket);
      }
    });
  };
  /**
   * Wire an accepted socket into the component: enforce the connection limit,
   * attach a session and route incoming messages.
   *
   * When a connection component is loaded, the connection counter is
   * increased first. If the counter then exceeds the current server's
   * 'max-connections' setting the socket is disconnected, and the counter is
   * decreased again so that rejected sockets do not occupy a slot forever
   * (previously the increment was never undone on this path).
   *
   * For accepted sockets the counter is decreased exactly once, on whichever
   * of 'disconnect' / 'error' fires first.
   *
   * @param {Object} self   connector component
   * @param {Object} socket accepted connection
   */
  var bindEvents = function(self, socket) {
    if(self.connection) {
      self.connection.increaseConnectionCount();
      var statisticInfo = self.connection.getStatisticsInfo();
      var curServer = self.app.getCurServer();
      if(statisticInfo.totalConnCount > curServer['max-connections']) {
        logger.warn('the server %s has reached the max connections %s', curServer.id, curServer['max-connections']);
        // give the slot back: no close handler is registered for this socket
        self.connection.decreaseConnectionCount();
        socket.disconnect();
        return;
      }
    }

    //create session for connection
    var session = getSession(self, socket);
    var closed = false;

    // shared by 'disconnect' and 'error'; the flag keeps the counter from
    // being decreased twice when both events fire
    var releaseConnection = function() {
      if(closed) {
        return;
      }
      closed = true;
      if(self.connection) {
        self.connection.decreaseConnectionCount(session.uid);
      }
    };

    socket.on('disconnect', releaseConnection);
    socket.on('error', releaseConnection);

    // new message
    socket.on('message', function(msg) {
      var dmsg = msg;
      if(self.decode) {
        dmsg = self.decode(msg);
      } else if(self.connector.decode) {
        dmsg = self.connector.decode(msg);
      }

      if(!dmsg) {
        // discard invalid message
        return;
      }

      // use rsa crypto
      if(self.useCrypto) {
        var verified = verifyMessage(self, session, dmsg);
        if(!verified) {
          logger.error('fail to verify the data received from client.');
          return;
        }
      }

      handleMessage(self, session, dmsg);
    }); //on message end
  };
  /**
   * Return the session that belongs to a socket, creating and wiring it on
   * first use.
   *
   * An existing session (looked up by socket.id) is returned untouched. A new
   * session is created through the session component and then connected to:
   *  - the socket's 'disconnect' and 'error' events, which close the session;
   *  - its own 'closed' event, handled by onSessionClose;
   *  - its own 'bind' / 'unbind' events, which keep the optional connection
   *    statistics up to date and emit BIND_SESSION / UNBIND_SESSION on the
   *    application event bus.
   *
   * @param {Object} self   connector component
   * @param {Object} socket connection the session belongs to
   * @return {Object} session for the socket
   */
  var getSession = function(self, socket) {
    var app = self.app;
    var sid = socket.id;

    var existing = self.session.get(sid);
    if(existing) {
      return existing;
    }

    var session = self.session.create(sid, app.getServerId(), socket);
    logger.debug('[%s] getSession session is created with session id: %s', app.getServerId(), sid);

    var onBind = function(uid) {
      logger.debug('session on [%s] bind with uid: %s', self.app.serverId, uid);
      // update connection statistics if necessary
      if(self.connection) {
        self.connection.addLoginedUser(uid, {
          loginTime: Date.now(),
          uid: uid,
          address: socket.remoteAddress.ip + ':' + socket.remoteAddress.port
        });
      }
      self.app.event.emit(events.BIND_SESSION, session);
    };

    var onUnbind = function(uid) {
      if(self.connection) {
        self.connection.removeLoginedUser(uid);
      }
      self.app.event.emit(events.UNBIND_SESSION, session);
    };

    // bind events for session
    socket.on('disconnect', session.closed.bind(session));
    socket.on('error', session.closed.bind(session));
    session.on('closed', onSessionClose.bind(null, app));
    session.on('bind', onBind);
    session.on('unbind', onUnbind);

    return session;
  };
  /**
   * Handler for a session's 'closed' event: closes the session's task queue
   * and announces CLOSE_SESSION on the application event bus.
   *
   * @param {Object} app     current application context
   * @param {Object} session session that was closed
   * @param {String} reason  close reason (not used here)
   */
  var onSessionClose = function(app, session, reason) {
    var queueId = session.id;
    taskManager.closeQueue(queueId, true);

    app.event.emit(events.CLOSE_SESSION, session);
  };
  /**
   * Dispatch a decoded client message to the server component and send the
   * handler's response back to the originating session.
   *
   * Messages whose route has no server type prefix are logged and dropped.
   * Messages without an id are notifications: nothing is sent back, and a
   * handler that produces a response for one only triggers a warning.
   *
   * @param {Object} self    connector component
   * @param {Object} session session the message arrived on
   * @param {Object} msg     decoded message; reads msg.id and msg.route
   */
  var handleMessage = function(self, session, msg) {
    logger.debug('[%s] handleMessage session id: %s, msg: %j', self.app.serverId, session.id, msg);

    if(!checkServerType(msg.route)) {
      logger.error('invalid route string. route : %j', msg.route);
      return;
    }

    var respond = function(err, resp, opts) {
      if(!msg.id) {
        // a notify never gets an answer; complain if the handler produced one
        if(resp) {
          logger.warn('try to response to a notify: %j', msg.route);
        }
        return;
      }

      var payload = resp ? resp : {};
      if(err) {
        payload.code = 500;
      }

      var sendOpts = {
        type: 'response',
        userOptions: opts || {},
        // kept for compatibility
        isResponse: true
      };

      self.send(msg.id, msg.route, payload, [session.id], sendOpts,
        function() {
          /**
           * TODO(feitianbubu, 2015-11-02 23:04:04): the original note here is
           * mis-encoded and can no longer be read.
           * NOTE(review): it accompanies the disabled code below, which used
           * to close the session after a failed request — presumably switched
           * off on purpose; confirm before re-enabling.
           */
          //if(!!err){
          // session.closed('session close by error');
          //}
        });
    };

    self.server.globalHandle(msg, session.toFrontendSession(), respond);
  };
  /**
   * Extract the server type from a route string, i.e. everything before the
   * first '.'.
   *
   * @param {String} route route of the request message
   * @return {String|null} server type, or null when the route is empty or
   *                       contains no '.'
   */
  var checkServerType = function (route) {
    var dot = route ? route.indexOf('.') : -1;
    return dot < 0 ? null : route.substring(0, dot);
  };
  /**
   * Verify the rsa signature a client attached to a message.
   *
   * The signature is read from msg.body.__crypto__ and removed from the body
   * before the remaining body is serialised and checked. The public key is
   * taken from the session; on first use it is moved from the component's key
   * table (see setPubKey) into the session under 'pubKey'.
   *
   * A message without a body is treated like a message without a signature
   * and rejected, instead of throwing on client-controlled input.
   *
   * @param {Object} self    connector component
   * @param {Object} session session the message arrived on
   * @param {Object} msg     decoded message
   * @return {Boolean} true only when the signature matches the message body
   */
  var verifyMessage = function (self, session, msg) {
    var body = msg ? msg.body : null;
    var sig = body ? body.__crypto__ : undefined;
    if(!sig) {
      logger.error('receive data from client has no signature [%s]', self.app.serverId);
      return false;
    }

    if(!session) {
      logger.error('could not find session.');
      return false;
    }

    var pubKey = session.get('pubKey');
    if(!pubKey) {
      // first signed message on this session: adopt the key registered for it
      pubKey = self.getPubKey(session.id);
      if(!pubKey) {
        logger.error('could not get public key, session id is %s', session.id);
        return false;
      }
      delete self.keys[session.id];
      session.set('pubKey', pubKey);
    }

    if(!pubKey.n || !pubKey.e) {
      logger.error('could not verify message without public key [%s]', self.app.serverId);
      return false;
    }

    // the signature itself is not part of the signed payload
    delete body.__crypto__;

    var message = JSON.stringify(body);
    if(utils.hasChineseChar(message)) {
      message = utils.unicodeToUtf8(message);
    }

    return pubKey.verifyString(message, sig);
  };