|
- /**
- * Module dependencies
- */
- var EventEmitter = require('events').EventEmitter;
- var util = require('util');
- var Protocol = require('./protocol');
- var Protobuf = require('./protobuf');
- var Package = Protocol.Package;
- var Message = Protocol.Message;
- var wsClient = require('websocket').client;
- var JS_WS_CLIENT_TYPE = 'js-websocket';
- var JS_WS_CLIENT_VERSION = '0.0.1';
- var RES_OK = 200;
- var RES_OLD_CLIENT = 501;
- var ePomelo = function (opts) {
- EventEmitter.call(this);
- this._socket = null;
- this._id = 1;
- this._callbacks = {};
- this._routeMap = {};
- this.dict = {};
- this.abbrs = {};
- this.protoVersion = 0;
- this.serverProtos = {};
- this.clientProtos = {};
- this.handshakeBuffer = {};
- this.heartbeatInterval = 0;
- this.heartbeatTimeout = 0;
- this.nextHeartbeatTimeout = 0;
- this.gapThreshold = 100; // heartbeat gap threashold
- this.heartbeatId = null;
- this.heartbeatTimeoutId = null;
- this.initCallback = null;
- };
- util.inherits(ePomelo, EventEmitter);
- module.exports = ePomelo;
- var pomelo = ePomelo.prototype;
- pomelo.init = function(params, cb) {
- var self = this;
- self.handshakeBuffer = {
- 'sys': {
- type: JS_WS_CLIENT_TYPE,
- version: JS_WS_CLIENT_VERSION,
- protoVersion: 0,
- rsa: {}
- },
- 'user': {
- }
- };
- if(!!cb && typeof cb === 'function') {
- self.initCallback = cb;
- }
- var client = new wsClient();
- client.connect({protocol:"ws:", hostname: params.host, port:params.port});
- client.on('connect', function (socket) { //socket 其实就是WebScoketConnection对象
- var obj = Package.encode(Package.TYPE_HANDSHAKE, Protocol.strencode(JSON.stringify(self.handshakeBuffer)));
- socket.send(obj);
- socket.on('message', function(msg){
- processPackage(self, Package.decode(msg.binaryData));
- });
- socket.on('error', function(msg){
- console.log('socket error', msg);
- self.emit('error', msg);
- });
- socket.on('close', function(msg){
- console.log('socket close', msg);
- self.emit('close', msg);
- });
- self._socket = socket;
- });
- }
- pomelo.request = function(route, msg, cb) {
- var self = this;
- if(arguments.length === 2 && typeof msg === 'function') {
- cb = msg;
- msg = {};
- } else {
- msg = msg || {};
- }
- route = route || msg.route;
- if(!route) {
- return;
- }
- var reqId = self._id++;
- sendMessage(self, reqId, route, msg);
- self._callbacks[reqId] = cb;
- self._routeMap[reqId] = route;
- };
- pomelo.notify = function (route, msg) {
- this.request(route, msg);
- };
- pomelo.disconnect = function() {
- var self = this;
- self._socket.emit('end');
- self._socket.close();
- if(self.heartbeatId) {
- clearTimeout(self.heartbeatId);
- self.heartbeatId = null;
- }
- if(self.heartbeatTimeoutId) {
- clearTimeout(self.heartbeatTimeoutId);
- self.heartbeatTimeoutId = null;
- }
- };
- /*
- * handshake、heartbeat、onData、onKick四个函数对应于与服务通信中不同的事件处理
- */
- var handshake = function(pomelo, data){
- data = JSON.parse(Protocol.strdecode(data));
- if(data.code === RES_OLD_CLIENT) {
- pomelo.emit('error', 'client version not fullfill');
- return;
- }
- if(data.code !== RES_OK) {
- pomelo.emit('error', 'handshake fail');
- return;
- }
- if(data.sys && data.sys.heartbeat) {
- pomelo.heartbeatInterval = data.sys.heartbeat * 1000; // heartbeat interval
- pomelo.heartbeatTimeout = pomelo.heartbeatInterval * 2; // max heartbeat timeout
- } else {
- pomelo.heartbeatInterval = 0;
- pomelo.heartbeatTimeout = 0;
- }
- //Init compress dict 保存全局的dict信息
- pomelo.dict = data.sys.dict;
- if(pomelo.dict){
- pomelo.abbrs = {};
- for(var route in pomelo.dict){ //保存dict的映射信息
- pomelo.abbrs[pomelo.dict[route]] = route;
- }
- }
- //Init protobuf protos
- var protos = data.sys.protos;
- if(protos){
- pomelo.protoVersion = protos.version || 0;
- pomelo.serverProtos = protos.server || {};
- pomelo.clientProtos = protos.client || {};
- Protobuf.init({encoderProtos: protos.client, decoderProtos: protos.server});
- }
- var obj = Package.encode(Package.TYPE_HANDSHAKE_ACK);
- pomelo._socket.send(obj);
- if(!!pomelo.initCallback && 'function' === typeof pomelo.initCallback) { //注意:只所以初使化的回调在这里返回,是为了保证C-S连接全部完成之后,client才可以请求
- pomelo.initCallback();
- }
- };
- var heartbeat = function(pomelo, data) {
- if(!pomelo.heartbeatInterval) {
- // no heartbeat
- return;
- }
- var obj = Package.encode(Package.TYPE_HEARTBEAT);
- if(pomelo.heartbeatTimeoutId) {
- clearTimeout(pomelo.heartbeatTimeoutId);
- pomelo.heartbeatTimeoutId = null;
- }
- if(pomelo.heartbeatId) {
- // already in a heartbeat interval
- return;
- }
- pomelo.heartbeatId = setTimeout(function() {
- pomelo.heartbeatId = null;
- pomelo._socket.send(obj);
- pomelo.nextHeartbeatTimeout = Date.now() + pomelo.heartbeatTimeout;
- pomelo.heartbeatTimeoutId = setTimeout(heartbeatTimeoutCb.bind(null, pomelo), pomelo.heartbeatTimeout);
- }, pomelo.heartbeatInterval);
- };
- var heartbeatTimeoutCb = function(pomelo) {
- var gap = pomelo.nextHeartbeatTimeout - Date.now();
- if(gap > pomelo.gapThreshold) {
- pomelo.heartbeatTimeoutId = setTimeout(heartbeatTimeoutCb.bind(null, pomelo), gap);
- } else {
- pomelo.emit('heartbeat timeout');
- pomelo.disconnect();
- }
- }
- var onData = function(pomelo, data) {
- var msg = data;
- msg = defaultDecode(pomelo, msg);
- processMessage(pomelo, msg);
- };
- var onKick = function(pomelo, data) {
- data = JSON.parse(Protocol.strdecode(data));
- pomelo.emit('onKick', data);
- };
- var handlers = {};
- handlers[Package.TYPE_HANDSHAKE] = handshake;
- handlers[Package.TYPE_HEARTBEAT] = heartbeat;
- handlers[Package.TYPE_DATA] = onData;
- handlers[Package.TYPE_KICK] = onKick;
- /*
- * processPackage、processMessage 两函数处理服务器返回的消息
- */
- var processPackage = function(pomelo, msgs) {
- if(Array.isArray(msgs)) {
- for(var i=0; i<msgs.length; i++) {
- var msg = msgs[i];
- handlers[msg.type](pomelo, msg.body);
- }
- } else {
- handlers[msgs.type](pomelo, msgs.body);
- }
- };
- var processMessage = function(pomelo, msg) {
- if(!msg.id) {
- // server push message
- //pomelo.emit(msg.route, msg.body);
- pomelo.emit('processMessage', msg);
- return;
- }
- //if have a id then find the callback function with the request
- var cb = pomelo._callbacks[msg.id];
- delete pomelo._callbacks[msg.id];
- if(typeof cb !== 'function') {
- return;
- }
- cb(msg.body);
- return;
- };
- /*
- * sendMessage发送给服务器
- */
- var sendMessage = function(pomelo, reqId, route, msg) {
- /* 暂时不处理Crypto
- if(useCrypto) {
- msg = JSON.stringify(msg);
- var sig = rsa.signString(msg, "sha256");
- msg = JSON.parse(msg);
- msg['__crypto__'] = sig;
- }*/
- msg = defaultEncode(pomelo, reqId, route, msg);
- var packet = Package.encode(Package.TYPE_DATA, msg);
- pomelo._socket.send(packet);
- };
- var defaultEncode = function(pomelo, reqId, route, msg) {
- var type = reqId ? Message.TYPE_REQUEST : Message.TYPE_NOTIFY;
- //compress message by protobuf
- if(pomelo.clientProtos && pomelo.clientProtos[route]) {
- msg = Protobuf.encode(route, msg);
- } else {
- msg = Protocol.strencode(JSON.stringify(msg));
- }
- var compressRoute = 0;
- if(pomelo.dict && pomelo.dict[route]) {
- route = pomelo.dict[route];
- compressRoute = 1;
- }
- return Message.encode(reqId, type, compressRoute, route, msg);
- };
- var defaultDecode = function(pomelo, data) {
- //probuff decode
- var msg = Message.decode(data);
- if(msg.id > 0){
- msg.route = pomelo._routeMap[msg.id];
- delete pomelo._routeMap[msg.id];
- if(!msg.route){
- return;
- }
- }
- msg.body = deCompose(pomelo, msg);
- return msg;
- };
- var deCompose = function(pomelo, msg){
- var route = msg.route;
- //Decompose route from dict
- if(msg.compressRoute) {
- if(!pomelo.abbrs[route]){
- return {};
- }
- route = msg.route = pomelo.abbrs[route];
- }
- if(pomelo.serverProtos && pomelo.serverProtos[route]){
- return Protobuf.decode(route, msg.body);
- }else{
- return JSON.parse(Protocol.strdecode(msg.body));
- }
- return msg;
- };
|