mqttsocket.js 1.5 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768
  1. var util = require('util');
  2. var EventEmitter = require('events').EventEmitter;
  3. var ST_INITED = 1; // state assigned at the end of the Socket constructor; send() only transmits while in this state
  4. var ST_CLOSED = 2; // state assigned by disconnect(); a second disconnect() call returns early when it sees this
  /**
   * Socket class that wraps an underlying connection socket to provide a
   * unified, EventEmitter-based interface for the upper layer.
   *
   * Events re-emitted on this object:
   *   'disconnect' - whenever the wrapped socket emits 'close', 'error' or
   *                  'disconnect' (any arguments of the source event are
   *                  forwarded unchanged, e.g. the error object).
   *
   * @param {*}      id      Identifier stored on the instance as `id`.
   * @param {Object} socket  Wrapped connection. Must provide `on()`,
   *                         `pingresp()` and a `stream` exposing
   *                         `remoteAddress` / `remotePort`.
   * @param {Object} adaptor Protocol adaptor. Must provide `onSubscribe` and
   *                         `onPublish`; both are invoked with this Socket as
   *                         their first argument, followed by the event
   *                         arguments.
   */
  var Socket = function(id, socket, adaptor) {
    EventEmitter.call(this);

    this.id = id;
    this.socket = socket;
    this.remoteAddress = {
      ip: socket.stream.remoteAddress,
      port: socket.stream.remotePort
    };
    this.adaptor = adaptor;

    // Every way the wrapped socket can go away is surfaced as one event name.
    var emitDisconnect = this.emit.bind(this, 'disconnect');
    socket.on('close', emitDisconnect);
    socket.on('error', emitDisconnect);
    socket.on('disconnect', emitDisconnect);

    // Answer ping requests directly; the request packet itself is not needed.
    socket.on('pingreq', function() {
      socket.pingresp();
    });

    // Delegate protocol traffic to the adaptor, passing this Socket first.
    socket.on('subscribe', this.adaptor.onSubscribe.bind(this.adaptor, this));
    socket.on('publish', this.adaptor.onPublish.bind(this.adaptor, this));

    this.state = ST_INITED;

    // TODO: any other events?
  };

  util.inherits(Socket, EventEmitter);
  30. module.exports = Socket;
  /**
   * Send a single message through this socket.
   *
   * Nothing is sent unless the socket is in the ST_INITED state. A Buffer is
   * treated as already encoded and written straight to the underlying stream;
   * anything else is handed to the adaptor's `publish`.
   *
   * @param {Buffer|*} msg Message to send.
   */
  Socket.prototype.send = function(msg) {
    if (this.state === ST_INITED) {
      if (!(msg instanceof Buffer)) {
        // not encoded yet: let the adaptor publish it
        this.adaptor.publish(this, msg);
      } else {
        // already encoded: write it out directly
        this.socket.stream.write(msg);
      }
    }
  };
  /**
   * Send several messages, in index order, by calling `send` on each one.
   *
   * The length is read once before the first message is sent.
   *
   * @param {Array} msgs Messages to send; each entry is passed to `send`.
   */
  Socket.prototype.sendBatch = function(msgs) {
    var total = msgs.length;
    var idx = 0;
    while (idx < total) {
      this.send(msgs[idx]);
      idx += 1;
    }
  };
  /**
   * Close this socket: mark it ST_CLOSED and destroy the underlying stream.
   *
   * Calling it again once the state is ST_CLOSED does nothing.
   */
  Socket.prototype.disconnect = function() {
    if (this.state !== ST_CLOSED) {
      // flip the state first, then tear down the stream
      this.state = ST_CLOSED;
      this.socket.stream.destroy();
    }
  };