index.ts 7.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248
  1. import * as buffer from 'buffer';
  2. import { EventEmitter } from 'events';
  3. // import * as streampkg from 'stream-pkg';
  4. // import * as profiler from 'v8-profiler';
  5. // import * as fs from 'fs';
  6. const ST_LENGTH = 1; // state that we should read length
  7. const ST_DATA = 2; // state that we should read data
  8. const ST_ERROR = 3; // state that something wrong has happened
  9. export class Composer extends EventEmitter {
  10. private buf: Buffer;
  11. private headbuf = new Buffer(2);
  12. private headoffset = 0;
  13. private state = ST_LENGTH;
  14. private dataoffset = 0;
  15. private buffoffset = 0;
  16. private end = 0;
  17. private length = 0;
  18. private left = 0;
  19. private hugePackMode = 0;
  20. private hugePackModeFinished = 0;
  21. private hugeBuf = Buffer.alloc(0);
  22. constructor() {
  23. super();
  24. }
  25. public feed(data: Buffer) {
  26. if (!data) {
  27. return;
  28. }
  29. if (this.state === ST_ERROR) {
  30. throw new Error('compose in error state, reset it first');
  31. }
  32. this.dataoffset = 0;
  33. this.end = data.length;
  34. this.length = this.length || 0;
  35. // data.copy(this._buf, this.offset);
  36. // this._buf.write(data);
  37. // Buffer.concat([this._buf, data]);
  38. while (this.dataoffset < this.end) {
  39. if (this.state === ST_LENGTH) {
  40. if (this.headoffset && (this.end - this.dataoffset + this.headoffset) >= 2) {
  41. data.copy(this.headbuf, this.headoffset, this.dataoffset, this.dataoffset + 2 - this.headoffset);
  42. this.dataoffset += 2 - this.headoffset;
  43. this.headoffset = 0;
  44. this.left = this.headbuf.readUInt16LE(0);
  45. if (this.hugePackMode && this.left < 32701) {
  46. this.hugePackModeFinished = 1;
  47. this.hugePackMode = 0;
  48. }
  49. if (this.left > 32700) {
  50. this.left = 32700;
  51. this.hugePackMode = 1;
  52. this.hugePackModeFinished = 0;
  53. if (!this.hugeBuf) {
  54. this.hugeBuf = Buffer.alloc(0);
  55. }
  56. }
  57. this.state = ST_DATA;
  58. this.buf = new Buffer(this.left);
  59. } else {
  60. if (this.end - this.dataoffset >= 2) {
  61. this.left = data.readUInt16LE(this.dataoffset);
  62. if (this.hugePackMode && this.left < 32701) {
  63. this.hugePackModeFinished = 1;
  64. this.hugePackMode = 0;
  65. }
  66. if (this.left > 32700) {
  67. this.left = 32700;
  68. this.hugePackMode = 1;
  69. this.hugePackModeFinished = 0;
  70. if (!this.hugeBuf) {
  71. this.hugeBuf = Buffer.alloc(0);
  72. }
  73. }
  74. this.state = ST_DATA;
  75. this.dataoffset = this.dataoffset + 2;
  76. this.buf = new Buffer(this.left);
  77. } else {
  78. data.copy(this.headbuf, 0, this.dataoffset, this.end);
  79. this.headoffset = this.end - this.dataoffset;
  80. this.dataoffset += this.headoffset;
  81. }
  82. }
  83. }
  84. if (this.state === ST_DATA) {
  85. let length = Math.min(this.left, data.length - this.dataoffset);
  86. data.copy(this.buf, this.buf.length - this.left, this.dataoffset, this.dataoffset + length);
  87. this.left -= length;
  88. this.dataoffset += length;
  89. if ((this.hugePackMode || this.hugePackModeFinished) && this.left === 0) {
  90. if (this.hugePackModeFinished === 0) {
  91. this.hugeBuf = Buffer.concat([this.hugeBuf, this.buf]);
  92. this.left = 0;
  93. this.state = ST_LENGTH;
  94. delete this.buf;
  95. }
  96. if (this.hugePackModeFinished === 1) {
  97. this.hugeBuf = Buffer.concat([this.hugeBuf, this.buf]);
  98. this.emit('data', this.hugeBuf);
  99. this.hugePackModeFinished = 0;
  100. this.hugePackMode = 0;
  101. this.left = 0;
  102. this.state = ST_LENGTH;
  103. delete this.buf;
  104. delete this.hugeBuf;
  105. }
  106. } else {
  107. if (this.left === 0) {
  108. this.emit('data', this.buf);
  109. this.left = 0;
  110. this.state = ST_LENGTH;
  111. delete this.buf;
  112. }
  113. }
  114. // return;
  115. }
  116. if (this.state === ST_ERROR) {
  117. break;
  118. }
  119. }
  120. }
  121. public compose(data: string) {
  122. if (data.length === 0) {
  123. throw new Error('data should not be empty.');
  124. }
  125. let d = Buffer.from(data);
  126. let totalLength = d.length;
  127. let offset = 0;
  128. let buf = new Buffer(d.length + 2 * (Math.floor(totalLength / 32700) + 1));
  129. let loop = 0;
  130. while (totalLength !== 0) {
  131. let writelength = totalLength;
  132. let currentLength = totalLength;
  133. if (writelength > 32700) {
  134. writelength = 32701;
  135. currentLength = 32700;
  136. }
  137. buf.writeUInt16LE(writelength, offset + 2 * (loop));
  138. d.copy(buf, 2 * (loop + 1) + offset, offset, offset + currentLength);
  139. offset = offset + currentLength;
  140. totalLength = totalLength - currentLength;
  141. loop = loop + 1;
  142. }
  143. return buf;
  144. }
  145. }
  146. // profiler.startProfiling();
  147. // let comp = new Compose();
  148. // let buf = comp.compose(JSON.stringify({
  149. // 'id': 10,
  150. // 'msg': {
  151. // 'namespace': 'user', 'serverType': 'test', 'service': 'service', 'method': 'echo',
  152. // 'args': [{ 'index': 9, 'time': 1498919947021 }],
  153. // },
  154. // }));
  155. // console.time('1');
  156. // for (let i = 0; i < 1; i++) {
  157. // comp.compose('你好世界');
  158. // }
  159. // console.timeEnd('1');
  160. // comp.on('data', (d: Buffer) => {
  161. // let s = JSON.parse(d.toString());
  162. // console.log(s);
  163. // });
  164. // console.time('1');
  165. // for (let i = 0; i < 100; i++) {
  166. // buf = comp.compose(JSON.stringify({
  167. // 'id': 10,
  168. // 'msg': {
  169. // 'namespace': 'user', 'serverType': 'test', 'service': 'service', 'method': 'echo',
  170. // 'args': [{ 'index': 9, 'time': 1498919947021 }],
  171. // },
  172. // }));
  173. // comp.feed(buf);
  174. // comp.feed(buf.slice(0, 5));
  175. // comp.feed(buf.slice(5, buf.length));
  176. // }
  177. // console.timeEnd('1');
  178. // let sp = new streampkg();
  179. // buf = sp.compose('你好世界');
  180. // console.time('1');
  181. // for (let i = 0; i < 1000000; i++) {
  182. // sp.compose('你好世界');
  183. // }
  184. // console.timeEnd('1');
  185. // sp.on('data', (d) => {
  186. // // let s = d.toString();
  187. // //console.log(s);
  188. // });
  189. // console.time('1');
  190. // for (let i = 0; i < 1000000; i++) {
  191. // sp.feed(buf);
  192. // sp.feed(buf.slice(0, 5));
  193. // sp.feed(buf.slice(5, buf.length));
  194. // }
  195. // console.timeEnd('1');
  196. // profiler.stopProfiling().export((error, result) => {
  197. // fs.writeFileSync('client.cpuprofile', result);
  198. // console.log('clientside profile saved');
  199. // });