pomelo-client.js 8.7 KB


  1. /**
  2. * Module dependencies
  3. */
  4. var EventEmitter = require('events').EventEmitter;
  5. var util = require('util');
  6. var Protocol = require('./protocol');
  7. var Protobuf = require('./protobuf');
  8. var Package = Protocol.Package;
  9. var Message = Protocol.Message;
  10. var wsClient = require('websocket').client;
  11. var JS_WS_CLIENT_TYPE = 'js-websocket';
  12. var JS_WS_CLIENT_VERSION = '0.0.1';
  13. var RES_OK = 200;
  14. var RES_OLD_CLIENT = 501;
  15. var ePomelo = function (opts) {
  16. EventEmitter.call(this);
  17. this._socket = null;
  18. this._id = 1;
  19. this._callbacks = {};
  20. this._routeMap = {};
  21. this.dict = {};
  22. this.abbrs = {};
  23. this.protoVersion = 0;
  24. this.serverProtos = {};
  25. this.clientProtos = {};
  26. this.handshakeBuffer = {};
  27. this.heartbeatInterval = 0;
  28. this.heartbeatTimeout = 0;
  29. this.nextHeartbeatTimeout = 0;
  30. this.gapThreshold = 100; // heartbeat gap threashold
  31. this.heartbeatId = null;
  32. this.heartbeatTimeoutId = null;
  33. this.initCallback = null;
  34. };
  35. util.inherits(ePomelo, EventEmitter);
  36. module.exports = ePomelo;
  37. var pomelo = ePomelo.prototype;
  38. pomelo.init = function(params, cb) {
  39. var self = this;
  40. self.handshakeBuffer = {
  41. 'sys': {
  42. type: JS_WS_CLIENT_TYPE,
  43. version: JS_WS_CLIENT_VERSION,
  44. protoVersion: 0,
  45. rsa: {}
  46. },
  47. 'user': {
  48. }
  49. };
  50. if(!!cb && typeof cb === 'function') {
  51. self.initCallback = cb;
  52. }
  53. var client = new wsClient();
  54. client.connect({protocol:"ws:", hostname: params.host, port:params.port});
  55. client.on('connect', function (socket) { //socket 其实就是WebScoketConnection对象
  56. var obj = Package.encode(Package.TYPE_HANDSHAKE, Protocol.strencode(JSON.stringify(self.handshakeBuffer)));
  57. socket.send(obj);
  58. socket.on('message', function(msg){
  59. processPackage(self, Package.decode(msg.binaryData));
  60. });
  61. socket.on('error', function(msg){
  62. console.log('socket error', msg);
  63. self.emit('error', msg);
  64. });
  65. socket.on('close', function(msg){
  66. console.log('socket close', msg);
  67. self.emit('close', msg);
  68. });
  69. self._socket = socket;
  70. });
  71. }
  72. pomelo.request = function(route, msg, cb) {
  73. var self = this;
  74. if(arguments.length === 2 && typeof msg === 'function') {
  75. cb = msg;
  76. msg = {};
  77. } else {
  78. msg = msg || {};
  79. }
  80. route = route || msg.route;
  81. if(!route) {
  82. return;
  83. }
  84. var reqId = self._id++;
  85. sendMessage(self, reqId, route, msg);
  86. self._callbacks[reqId] = cb;
  87. self._routeMap[reqId] = route;
  88. };
  89. pomelo.notify = function (route, msg) {
  90. this.request(route, msg);
  91. };
  92. pomelo.disconnect = function() {
  93. var self = this;
  94. self._socket.emit('end');
  95. self._socket.close();
  96. if(self.heartbeatId) {
  97. clearTimeout(self.heartbeatId);
  98. self.heartbeatId = null;
  99. }
  100. if(self.heartbeatTimeoutId) {
  101. clearTimeout(self.heartbeatTimeoutId);
  102. self.heartbeatTimeoutId = null;
  103. }
  104. };
/*
 * handshake, heartbeat, onData and onKick each handle one kind of package
 * received from the server.
 */
  108. var handshake = function(pomelo, data){
  109. data = JSON.parse(Protocol.strdecode(data));
  110. if(data.code === RES_OLD_CLIENT) {
  111. pomelo.emit('error', 'client version not fullfill');
  112. return;
  113. }
  114. if(data.code !== RES_OK) {
  115. pomelo.emit('error', 'handshake fail');
  116. return;
  117. }
  118. if(data.sys && data.sys.heartbeat) {
  119. pomelo.heartbeatInterval = data.sys.heartbeat * 1000; // heartbeat interval
  120. pomelo.heartbeatTimeout = pomelo.heartbeatInterval * 2; // max heartbeat timeout
  121. } else {
  122. pomelo.heartbeatInterval = 0;
  123. pomelo.heartbeatTimeout = 0;
  124. }
  125. //Init compress dict 保存全局的dict信息
  126. pomelo.dict = data.sys.dict;
  127. if(pomelo.dict){
  128. pomelo.abbrs = {};
  129. for(var route in pomelo.dict){ //保存dict的映射信息
  130. pomelo.abbrs[pomelo.dict[route]] = route;
  131. }
  132. }
  133. //Init protobuf protos
  134. var protos = data.sys.protos;
  135. if(protos){
  136. pomelo.protoVersion = protos.version || 0;
  137. pomelo.serverProtos = protos.server || {};
  138. pomelo.clientProtos = protos.client || {};
  139. Protobuf.init({encoderProtos: protos.client, decoderProtos: protos.server});
  140. }
  141. var obj = Package.encode(Package.TYPE_HANDSHAKE_ACK);
  142. pomelo._socket.send(obj);
  143. if(!!pomelo.initCallback && 'function' === typeof pomelo.initCallback) { //注意:只所以初使化的回调在这里返回,是为了保证C-S连接全部完成之后,client才可以请求
  144. pomelo.initCallback();
  145. }
  146. };
  147. var heartbeat = function(pomelo, data) {
  148. if(!pomelo.heartbeatInterval) {
  149. // no heartbeat
  150. return;
  151. }
  152. var obj = Package.encode(Package.TYPE_HEARTBEAT);
  153. if(pomelo.heartbeatTimeoutId) {
  154. clearTimeout(pomelo.heartbeatTimeoutId);
  155. pomelo.heartbeatTimeoutId = null;
  156. }
  157. if(pomelo.heartbeatId) {
  158. // already in a heartbeat interval
  159. return;
  160. }
  161. pomelo.heartbeatId = setTimeout(function() {
  162. pomelo.heartbeatId = null;
  163. pomelo._socket.send(obj);
  164. pomelo.nextHeartbeatTimeout = Date.now() + pomelo.heartbeatTimeout;
  165. pomelo.heartbeatTimeoutId = setTimeout(heartbeatTimeoutCb.bind(null, pomelo), pomelo.heartbeatTimeout);
  166. }, pomelo.heartbeatInterval);
  167. };
  168. var heartbeatTimeoutCb = function(pomelo) {
  169. var gap = pomelo.nextHeartbeatTimeout - Date.now();
  170. if(gap > pomelo.gapThreshold) {
  171. pomelo.heartbeatTimeoutId = setTimeout(heartbeatTimeoutCb.bind(null, pomelo), gap);
  172. } else {
  173. pomelo.emit('heartbeat timeout');
  174. pomelo.disconnect();
  175. }
  176. }
  177. var onData = function(pomelo, data) {
  178. var msg = data;
  179. msg = defaultDecode(pomelo, msg);
  180. processMessage(pomelo, msg);
  181. };
  182. var onKick = function(pomelo, data) {
  183. data = JSON.parse(Protocol.strdecode(data));
  184. pomelo.emit('onKick', data);
  185. };
  186. var handlers = {};
  187. handlers[Package.TYPE_HANDSHAKE] = handshake;
  188. handlers[Package.TYPE_HEARTBEAT] = heartbeat;
  189. handlers[Package.TYPE_DATA] = onData;
  190. handlers[Package.TYPE_KICK] = onKick;
/*
 * processPackage and processMessage handle the messages returned by the
 * server.
 */
  194. var processPackage = function(pomelo, msgs) {
  195. if(Array.isArray(msgs)) {
  196. for(var i=0; i<msgs.length; i++) {
  197. var msg = msgs[i];
  198. handlers[msg.type](pomelo, msg.body);
  199. }
  200. } else {
  201. handlers[msgs.type](pomelo, msgs.body);
  202. }
  203. };
  204. var processMessage = function(pomelo, msg) {
  205. if(!msg.id) {
  206. // server push message
  207. //pomelo.emit(msg.route, msg.body);
  208. pomelo.emit('processMessage', msg);
  209. return;
  210. }
  211. //if have a id then find the callback function with the request
  212. var cb = pomelo._callbacks[msg.id];
  213. delete pomelo._callbacks[msg.id];
  214. if(typeof cb !== 'function') {
  215. return;
  216. }
  217. cb(msg.body);
  218. return;
  219. };
/*
 * sendMessage sends a message to the server.
 */
  223. var sendMessage = function(pomelo, reqId, route, msg) {
  224. /* 暂时不处理Crypto
  225. if(useCrypto) {
  226. msg = JSON.stringify(msg);
  227. var sig = rsa.signString(msg, "sha256");
  228. msg = JSON.parse(msg);
  229. msg['__crypto__'] = sig;
  230. }*/
  231. msg = defaultEncode(pomelo, reqId, route, msg);
  232. var packet = Package.encode(Package.TYPE_DATA, msg);
  233. pomelo._socket.send(packet);
  234. };
  235. var defaultEncode = function(pomelo, reqId, route, msg) {
  236. var type = reqId ? Message.TYPE_REQUEST : Message.TYPE_NOTIFY;
  237. //compress message by protobuf
  238. if(pomelo.clientProtos && pomelo.clientProtos[route]) {
  239. msg = Protobuf.encode(route, msg);
  240. } else {
  241. msg = Protocol.strencode(JSON.stringify(msg));
  242. }
  243. var compressRoute = 0;
  244. if(pomelo.dict && pomelo.dict[route]) {
  245. route = pomelo.dict[route];
  246. compressRoute = 1;
  247. }
  248. return Message.encode(reqId, type, compressRoute, route, msg);
  249. };
  250. var defaultDecode = function(pomelo, data) {
  251. //probuff decode
  252. var msg = Message.decode(data);
  253. if(msg.id > 0){
  254. msg.route = pomelo._routeMap[msg.id];
  255. delete pomelo._routeMap[msg.id];
  256. if(!msg.route){
  257. return;
  258. }
  259. }
  260. msg.body = deCompose(pomelo, msg);
  261. return msg;
  262. };
  263. var deCompose = function(pomelo, msg){
  264. var route = msg.route;
  265. //Decompose route from dict
  266. if(msg.compressRoute) {
  267. if(!pomelo.abbrs[route]){
  268. return {};
  269. }
  270. route = msg.route = pomelo.abbrs[route];
  271. }
  272. if(pomelo.serverProtos && pomelo.serverProtos[route]){
  273. return Protobuf.decode(route, msg.body);
  274. }else{
  275. return JSON.parse(Protocol.strdecode(msg.body));
  276. }
  277. return msg;
  278. };