mqttadaptor.js 2.0 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192
  1. var Adaptor = function(opts) {
  2. opts = opts || {};
  3. this.subReqs = {};
  4. this.publishRoute = opts.publishRoute;
  5. this.subscribeRoute = opts.subscribeRoute;
  6. };
  7. module.exports = Adaptor;
  8. Adaptor.prototype.onPublish = function(client, packet) {
  9. var route = this.publishRoute;
  10. if(!route) {
  11. throw new Error('unspecified publish route.');
  12. }
  13. var payload = packet.payload;
  14. if(payload instanceof Buffer) {
  15. payload = payload.toString('utf8');
  16. }
  17. var req = {
  18. id: packet.messageId,
  19. route: route,
  20. body: packet
  21. };
  22. client.emit('message', req);
  23. if(packet.qos === 1) {
  24. client.socket.puback({messageId: packet.messageId});
  25. }
  26. };
  27. Adaptor.prototype.onSubscribe = function(client, packet) {
  28. var route = this.subscribeRoute;
  29. if(!route) {
  30. throw new Error('unspecified subscribe route.');
  31. }
  32. var req = {
  33. id: packet.messageId,
  34. route: route,
  35. body: {
  36. subscriptions: packet.subscriptions
  37. }
  38. };
  39. this.subReqs[packet.messageId] = packet;
  40. client.emit('message', req);
  41. };
  42. Adaptor.prototype.onPubAck = function(client, packet) {
  43. var req = {
  44. id: packet.messageId,
  45. route: 'connector.mqttHandler.pubAck',
  46. body: {
  47. mid: packet.messageId
  48. }
  49. };
  50. this.subReqs[packet.messageId] = packet;
  51. client.emit('message', req);
  52. };
  53. /**
  54. * Publish message or subscription ack.
  55. *
  56. * if packet.id exist and this.subReqs[packet.id] exist then packet is a suback.
  57. * Subscription is request/response mode.
  58. * packet.id is pass from client in packet.messageId and record in Pomelo context and attached to the subscribe response packet.
  59. * packet.body is the context that returned by subscribe next callback.
  60. *
  61. * if packet.id not exist then packet is a publish message.
  62. *
  63. * otherwise packet is a illegal packet.
  64. */
  65. Adaptor.prototype.publish = function(client, packet) {
  66. var mid = packet.id;
  67. var subreq = this.subReqs[mid];
  68. if(subreq) {
  69. // is suback
  70. client.socket.suback({messageId: mid, granted: packet.body});
  71. delete this.subReqs[mid];
  72. return;
  73. }
  74. client.socket.publish(packet.body);
  75. };