tcpsocket.js 5.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197
  1. var Stream = require('stream');
  2. var util = require('util');
  3. var protocol = require('pomelo-protocol');
  4. var Package = protocol.Package;
  5. var logger = require('pomelo-logger').getLogger('pomelo', __filename);
  6. /**
  7. * Work states
  8. */
  9. var ST_HEAD = 1; // wait for head
  10. var ST_BODY = 2; // wait for body
  11. var ST_CLOSED = 3; // closed
  12. /**
  13. * Tcp socket wrapper with package compositing.
  14. * Collect the package from socket and emit a completed package with 'data' event.
  15. * Uniform with ws.WebSocket interfaces.
  16. *
  17. * @param {Object} socket origin socket from node.js net module
  18. * @param {Object} opts options parameter.
  19. * opts.headSize size of package head
  20. * opts.headHandler(headBuffer) handler for package head. caculate and return body size from head data.
  21. */
  22. var Socket = function(socket, opts) {
  23. if(!(this instanceof Socket)) {
  24. return new Socket(socket, opts);
  25. }
  26. if(!socket || !opts) {
  27. throw new Error('invalid socket or opts');
  28. }
  29. if(!opts.headSize || typeof opts.headHandler !== 'function') {
  30. throw new Error('invalid opts.headSize or opts.headHandler');
  31. }
  32. // stream style interfaces.
  33. // TODO: need to port to stream2 after node 0.9
  34. Stream.call(this);
  35. this.readable = true;
  36. this.writeable = true;
  37. this._socket = socket;
  38. this.headSize = opts.headSize;
  39. this.closeMethod = opts.closeMethod;
  40. this.headBuffer = new Buffer(opts.headSize);
  41. this.headHandler = opts.headHandler;
  42. this.headOffset = 0;
  43. this.packageOffset = 0;
  44. this.packageSize = 0;
  45. this.packageBuffer = null;
  46. // bind event form the origin socket
  47. this._socket.on('data', ondata.bind(null, this));
  48. this._socket.on('end', onend.bind(null, this));
  49. this._socket.on('error', this.emit.bind(this, 'error'));
  50. this._socket.on('close', this.emit.bind(this, 'close'));
  51. this.state = ST_HEAD;
  52. };
  53. util.inherits(Socket, Stream);
  54. module.exports = Socket;
  55. Socket.prototype.send = function(msg, encode, cb) {
  56. this._socket.write(msg, encode, cb);
  57. };
  58. Socket.prototype.close = function() {
  59. if(!!this.closeMethod && this.closeMethod === 'end') {
  60. this._socket.end();
  61. } else {
  62. try {
  63. this._socket.destroy();
  64. } catch (e) {
  65. logger.error('socket close with destroy error: %j', e.stack);
  66. }
  67. }
  68. };
  69. var ondata = function(socket, chunk) {
  70. if(socket.state === ST_CLOSED) {
  71. throw new Error('socket has closed');
  72. }
  73. if(typeof chunk !== 'string' && !Buffer.isBuffer(chunk)) {
  74. throw new Error('invalid data');
  75. }
  76. if(typeof chunk === 'string') {
  77. chunk = new Buffer(chunk, 'utf8');
  78. }
  79. var offset = 0, end = chunk.length;
  80. if(socket.state === ST_HEAD && socket.packageBuffer === null && !checkTypeData(chunk[0])) {
  81. logger.error('close the connection with invalid head message, the remote ip is %s && port is %s && message is %j', socket._socket.remoteAddress, socket._socket.remotePort, chunk);
  82. socket.close();
  83. }
  84. while(offset < end) {
  85. if(socket.state === ST_HEAD) {
  86. offset = readHead(socket, chunk, offset);
  87. }
  88. if(socket.state === ST_BODY) {
  89. offset = readBody(socket, chunk, offset);
  90. }
  91. }
  92. return true;
  93. };
  94. var onend = function(socket, chunk) {
  95. if(chunk) {
  96. socket._socket.write(chunk);
  97. }
  98. socket.state = ST_CLOSED;
  99. reset(socket);
  100. socket.emit('end');
  101. };
  102. /**
  103. * Read head segment from data to socket.headBuffer.
  104. *
  105. * @param {Object} socket Socket instance
  106. * @param {Object} data Buffer instance
  107. * @param {Number} offset offset read star from data
  108. * @return {Number} new offset of data after read
  109. */
  110. var readHead = function(socket, data, offset) {
  111. var hlen = socket.headSize - socket.headOffset;
  112. var dlen = data.length - offset;
  113. var len = Math.min(hlen, dlen);
  114. var dend = offset + len;
  115. data.copy(socket.headBuffer, socket.headOffset, offset, dend);
  116. socket.headOffset += len;
  117. if(socket.headOffset === socket.headSize) {
  118. // if head segment finished
  119. var size = socket.headHandler(socket.headBuffer);
  120. if(size < 0) {
  121. throw new Error('invalid body size: ' + size);
  122. }
  123. socket.packageSize = size + socket.headSize;
  124. socket.packageBuffer = new Buffer(socket.packageSize);
  125. socket.headBuffer.copy(socket.packageBuffer, 0, 0, socket.headSize);
  126. socket.packageOffset = socket.headSize;
  127. socket.state = ST_BODY;
  128. }
  129. return dend;
  130. };
  131. /**
  132. * Read body segment from data buffer to socket.packageBuffer;
  133. *
  134. * @param {Object} socket Socket instance
  135. * @param {Object} data Buffer instance
  136. * @param {Number} offset offset read star from data
  137. * @return {Number} new offset of data after read
  138. */
  139. var readBody = function(socket, data, offset) {
  140. var blen = socket.packageSize - socket.packageOffset;
  141. var dlen = data.length - offset;
  142. var len = Math.min(blen, dlen);
  143. var dend = offset + len;
  144. data.copy(socket.packageBuffer, socket.packageOffset, offset, dend);
  145. socket.packageOffset += len;
  146. if(socket.packageOffset === socket.packageSize) {
  147. // if all the package finished
  148. var buffer = socket.packageBuffer;
  149. socket.emit('message', buffer);
  150. reset(socket);
  151. }
  152. return dend;
  153. };
  154. var reset = function(socket) {
  155. socket.headOffset = 0;
  156. socket.packageOffset = 0;
  157. socket.packageSize = 0;
  158. socket.packageBuffer = null;
  159. socket.state = ST_HEAD;
  160. };
  161. var checkTypeData = function(data) {
  162. return data === Package.TYPE_HANDSHAKE || data === Package.TYPE_HANDSHAKE_ACK || data === Package.TYPE_HEARTBEAT || data === Package.TYPE_DATA || data === Package.TYPE_KICK;
  163. };