// mqttconnector.js
  1. var util = require('util');
  2. var EventEmitter = require('events').EventEmitter;
  3. var mqtt = require('mqtt');
  4. var MQTTSocket = require('./mqttsocket');
  5. var Adaptor = require('./mqtt/mqttadaptor');
  6. var generate = require('./mqtt/generate');
  7. var logger = require('pomelo-logger').getLogger('pomelo', __filename);
  8. var curId = 1;
  9. /**
  10. * Connector that manager low level connection and protocol bewteen server and client.
  11. * Develper can provide their own connector to switch the low level prototol, such as tcp or probuf.
  12. */
  13. var Connector = function(port, host, opts) {
  14. if (!(this instanceof Connector)) {
  15. return new Connector(port, host, opts);
  16. }
  17. EventEmitter.call(this);
  18. this.port = port;
  19. this.host = host;
  20. opts = opts || {};
  21. this.adaptor = new Adaptor(opts);
  22. };
  23. util.inherits(Connector, EventEmitter);
  24. module.exports = Connector;
  25. /**
  26. * Start connector to listen the specified port
  27. */
  28. Connector.prototype.start = function(cb) {
  29. var self = this;
  30. this.mqttServer = mqtt.createServer();
  31. this.mqttServer.on('client', function(client) {
  32. client.on('error', function(err) {
  33. client.stream.destroy();
  34. });
  35. client.on('close', function() {
  36. client.stream.destroy();
  37. });
  38. client.on('disconnect', function(packet) {
  39. client.stream.destroy();
  40. });
  41. client.on('connect', function(packet) {
  42. client.connack({returnCode: 0});
  43. var mqttsocket = new MQTTSocket(curId++, client, self.adaptor);
  44. self.emit('connection', mqttsocket);
  45. });
  46. });
  47. this.mqttServer.listen(this.port);
  48. process.nextTick(cb);
  49. };
  50. Connector.prototype.stop = function() {
  51. this.mqttServer.close();
  52. process.exit(0);
  53. };
  54. var composeResponse = function(msgId, route, msgBody) {
  55. return {
  56. id: msgId,
  57. body: msgBody
  58. };
  59. };
  60. var composePush = function(route, msgBody) {
  61. var msg = generate.publish(msgBody);
  62. if(!msg) {
  63. logger.error('invalid mqtt publish message: %j', msgBody);
  64. }
  65. return msg;
  66. };
  67. Connector.prototype.encode = function(reqId, route, msgBody) {
  68. if (!!reqId) {
  69. return composeResponse(reqId, route, msgBody);
  70. } else {
  71. return composePush(route, msgBody);
  72. }
  73. };
  74. Connector.prototype.close = function() {
  75. this.mqttServer.close();
  76. };