/** * Module dependencies */ var EventEmitter = require('events').EventEmitter; var util = require('util'); var Protocol = require('./protocol'); var Protobuf = require('./protobuf'); var Package = Protocol.Package; var Message = Protocol.Message; var wsClient = require('websocket').client; var JS_WS_CLIENT_TYPE = 'js-websocket'; var JS_WS_CLIENT_VERSION = '0.0.1'; var RES_OK = 200; var RES_OLD_CLIENT = 501; var ePomelo = function (opts) { EventEmitter.call(this); this._socket = null; this._id = 1; this._callbacks = {}; this._routeMap = {}; this.dict = {}; this.abbrs = {}; this.protoVersion = 0; this.serverProtos = {}; this.clientProtos = {}; this.handshakeBuffer = {}; this.heartbeatInterval = 0; this.heartbeatTimeout = 0; this.nextHeartbeatTimeout = 0; this.gapThreshold = 100; // heartbeat gap threashold this.heartbeatId = null; this.heartbeatTimeoutId = null; this.initCallback = null; }; util.inherits(ePomelo, EventEmitter); module.exports = ePomelo; var pomelo = ePomelo.prototype; pomelo.init = function(params, cb) { var self = this; self.handshakeBuffer = { 'sys': { type: JS_WS_CLIENT_TYPE, version: JS_WS_CLIENT_VERSION, protoVersion: 0, rsa: {} }, 'user': { } }; if(!!cb && typeof cb === 'function') { self.initCallback = cb; } var client = new wsClient(); client.connect({protocol:"ws:", hostname: params.host, port:params.port}); client.on('connect', function (socket) { //socket 其实就是WebScoketConnection对象 var obj = Package.encode(Package.TYPE_HANDSHAKE, Protocol.strencode(JSON.stringify(self.handshakeBuffer))); socket.send(obj); socket.on('message', function(msg){ processPackage(self, Package.decode(msg.binaryData)); }); socket.on('error', function(msg){ console.log('socket error', msg); self.emit('error', msg); }); socket.on('close', function(msg){ console.log('socket close', msg); self.emit('close', msg); }); self._socket = socket; }); } pomelo.request = function(route, msg, cb) { var self = this; if(arguments.length === 2 && typeof msg === 'function') { cb = msg; msg = {}; } else { msg = msg || {}; } route = route || msg.route; if(!route) { return; } var reqId = self._id++; sendMessage(self, reqId, route, msg); self._callbacks[reqId] = cb; self._routeMap[reqId] = route; }; pomelo.notify = function (route, msg) { this.request(route, msg); }; pomelo.disconnect = function() { var self = this; self._socket.emit('end'); self._socket.close(); if(self.heartbeatId) { clearTimeout(self.heartbeatId); self.heartbeatId = null; } if(self.heartbeatTimeoutId) { clearTimeout(self.heartbeatTimeoutId); self.heartbeatTimeoutId = null; } }; /* * handshake、heartbeat、onData、onKick四个函数对应于与服务通信中不同的事件处理 */ var handshake = function(pomelo, data){ data = JSON.parse(Protocol.strdecode(data)); if(data.code === RES_OLD_CLIENT) { pomelo.emit('error', 'client version not fullfill'); return; } if(data.code !== RES_OK) { pomelo.emit('error', 'handshake fail'); return; } if(data.sys && data.sys.heartbeat) { pomelo.heartbeatInterval = data.sys.heartbeat * 1000; // heartbeat interval pomelo.heartbeatTimeout = pomelo.heartbeatInterval * 2; // max heartbeat timeout } else { pomelo.heartbeatInterval = 0; pomelo.heartbeatTimeout = 0; } //Init compress dict 保存全局的dict信息 pomelo.dict = data.sys.dict; if(pomelo.dict){ pomelo.abbrs = {}; for(var route in pomelo.dict){ //保存dict的映射信息 pomelo.abbrs[pomelo.dict[route]] = route; } } //Init protobuf protos var protos = data.sys.protos; if(protos){ pomelo.protoVersion = protos.version || 0; pomelo.serverProtos = protos.server || {}; pomelo.clientProtos = protos.client || {}; Protobuf.init({encoderProtos: protos.client, decoderProtos: protos.server}); } var obj = Package.encode(Package.TYPE_HANDSHAKE_ACK); pomelo._socket.send(obj); if(!!pomelo.initCallback && 'function' === typeof pomelo.initCallback) { //注意:只所以初使化的回调在这里返回,是为了保证C-S连接全部完成之后,client才可以请求 pomelo.initCallback(); } }; var heartbeat = function(pomelo, data) { if(!pomelo.heartbeatInterval) { // no heartbeat return; } var obj = Package.encode(Package.TYPE_HEARTBEAT); if(pomelo.heartbeatTimeoutId) { clearTimeout(pomelo.heartbeatTimeoutId); pomelo.heartbeatTimeoutId = null; } if(pomelo.heartbeatId) { // already in a heartbeat interval return; } pomelo.heartbeatId = setTimeout(function() { pomelo.heartbeatId = null; pomelo._socket.send(obj); pomelo.nextHeartbeatTimeout = Date.now() + pomelo.heartbeatTimeout; pomelo.heartbeatTimeoutId = setTimeout(heartbeatTimeoutCb.bind(null, pomelo), pomelo.heartbeatTimeout); }, pomelo.heartbeatInterval); }; var heartbeatTimeoutCb = function(pomelo) { var gap = pomelo.nextHeartbeatTimeout - Date.now(); if(gap > pomelo.gapThreshold) { pomelo.heartbeatTimeoutId = setTimeout(heartbeatTimeoutCb.bind(null, pomelo), gap); } else { pomelo.emit('heartbeat timeout'); pomelo.disconnect(); } } var onData = function(pomelo, data) { var msg = data; msg = defaultDecode(pomelo, msg); processMessage(pomelo, msg); }; var onKick = function(pomelo, data) { data = JSON.parse(Protocol.strdecode(data)); pomelo.emit('onKick', data); }; var handlers = {}; handlers[Package.TYPE_HANDSHAKE] = handshake; handlers[Package.TYPE_HEARTBEAT] = heartbeat; handlers[Package.TYPE_DATA] = onData; handlers[Package.TYPE_KICK] = onKick; /* * processPackage、processMessage 两函数处理服务器返回的消息 */ var processPackage = function(pomelo, msgs) { if(Array.isArray(msgs)) { for(var i=0; i 0){ msg.route = pomelo._routeMap[msg.id]; delete pomelo._routeMap[msg.id]; if(!msg.route){ return; } } msg.body = deCompose(pomelo, msg); return msg; }; var deCompose = function(pomelo, msg){ var route = msg.route; //Decompose route from dict if(msg.compressRoute) { if(!pomelo.abbrs[route]){ return {}; } route = msg.route = pomelo.abbrs[route]; } if(pomelo.serverProtos && pomelo.serverProtos[route]){ return Protobuf.decode(route, msg.body); }else{ return JSON.parse(Protocol.strdecode(msg.body)); } return msg; };