12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788 |
- var util = require('util');
- var EventEmitter = require('events').EventEmitter;
- var mqtt = require('mqtt');
- var MQTTSocket = require('./mqttsocket');
- var Adaptor = require('./mqtt/mqttadaptor');
- var generate = require('./mqtt/generate');
- var logger = require('pomelo-logger').getLogger('pomelo', __filename);
- var curId = 1;
/**
 * Connector that manages the low-level connection and protocol between
 * server and client. Developers can provide their own connector to switch
 * the low-level protocol, such as tcp or protobuf.
 *
 * May be invoked with or without `new`; a plain call re-dispatches to `new`.
 *
 * @param {Number} port port stored on the instance; start() listens on it
 * @param {String} host host stored on the instance
 * @param {Object} [opts] options passed to the mqtt Adaptor (defaults to {})
 */
var Connector = function(port, host, opts) {
  var calledWithNew = this instanceof Connector;
  if (!calledWithNew) {
    return new Connector(port, host, opts);
  }

  EventEmitter.call(this);

  var adaptorOpts = opts || {};
  this.port = port;
  this.host = host;
  this.adaptor = new Adaptor(adaptorOpts);
};

// Connector instances are event emitters.
util.inherits(Connector, EventEmitter);

module.exports = Connector;
/**
 * Start the connector: create the mqtt server, attach the per-client
 * handlers and listen on the configured port.
 *
 * For every client, 'error', 'close' and 'disconnect' destroy the client's
 * stream. On 'connect' the client is acknowledged, wrapped in an MQTTSocket
 * with a fresh id, and announced through this connector's 'connection' event.
 *
 * @param {Function} [cb] invoked on the next tick after listen() has been called
 */
Connector.prototype.start = function(cb) {
  var self = this;

  this.mqttServer = mqtt.createServer();

  this.mqttServer.on('client', function(client) {
    // Shared teardown for every terminal client event.
    var destroyStream = function() {
      client.stream.destroy();
    };

    client.on('error', function(err) {
      // Record the failure instead of discarding it silently.
      logger.error('mqtt client error: ' + (err && err.stack ? err.stack : err));
      destroyStream();
    });
    client.on('close', destroyStream);
    client.on('disconnect', destroyStream);

    client.on('connect', function(packet) {
      // Acknowledge the CONNECT with return code 0, then hand the wrapped socket to listeners.
      client.connack({returnCode: 0});
      var mqttsocket = new MQTTSocket(curId++, client, self.adaptor);
      self.emit('connection', mqttsocket);
    });
  });

  // NOTE(review): only the port is passed to listen(); this.host is not used for binding - confirm this is intended.
  this.mqttServer.listen(this.port);

  // process.nextTick requires a function, so skip scheduling when no callback was supplied.
  if (typeof cb === 'function') {
    process.nextTick(cb);
  }
};
/**
 * Stop the connector: close the mqtt server, then terminate the current
 * process with exit code 0.
 */
Connector.prototype.stop = function() {
  var server = this.mqttServer;
  server.close();
  process.exit(0);
};
/**
 * Build the response object for a request.
 *
 * @param {*} msgId request id, copied to `id`
 * @param {String} route accepted for signature parity with composePush; not used
 * @param {*} msgBody payload, copied by reference to `body`
 * @return {Object} `{id: msgId, body: msgBody}`
 */
var composeResponse = function(msgId, route, msgBody) {
  var response = {};
  response.id = msgId;
  response.body = msgBody;
  return response;
};
/**
 * Build a push message by passing the body to generate.publish.
 * When the result is falsy an error is logged; the result is returned
 * to the caller either way.
 *
 * @param {String} route not used by this function
 * @param {*} msgBody payload handed to generate.publish
 * @return {*} whatever generate.publish returned
 */
var composePush = function(route, msgBody) {
  var published = generate.publish(msgBody);
  if (published) {
    return published;
  }
  logger.error('invalid mqtt publish message: %j', msgBody);
  return published;
};
/**
 * Encode an outgoing message. A truthy reqId yields a response object
 * (composeResponse); otherwise the message is treated as a push
 * (composePush).
 *
 * @param {*} reqId request id; any falsy value selects the push path
 * @param {String} route message route, forwarded to the chosen composer
 * @param {*} msgBody message payload
 * @return {*} result of composeResponse or composePush
 */
Connector.prototype.encode = function(reqId, route, msgBody) {
  return reqId
    ? composeResponse(reqId, route, msgBody)
    : composePush(route, msgBody);
};
/**
 * Close the mqtt server created by start(). Unlike stop(), the process is
 * left running.
 */
Connector.prototype.close = function() {
  var server = this.mqttServer;
  server.close();
};
|