123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447 |
- /**
- * Implementation of server component.
- * Init and start server instance.
- */
- var logger = require('pomelo-logger').getLogger('pomelo', __filename);
- var fs = require('fs');
- var path = require('path');
- var pathUtil = require('../util/pathUtil');
- var Loader = require('pomelo-loader');
- var utils = require('../util/utils');
- var schedule = require('pomelo-scheduler');
- var events = require('../util/events');
- var Constants = require('../util/constants');
- var FilterService = require('../common/service/filterService');
- var HandlerService = require('../common/service/handlerService');
// Lifecycle states of the server component.
var ST_INITED = 0,    // constructed, start() has not run yet
    ST_STARTED = 1,   // start() has completed
    ST_STOPED = 2;    // stop() has been called
/**
 * Server component factory.
 *
 * @param {Object} app  current application context
 * @param {Object} opts options passed unchanged to the Server constructor
 * @return {Object} a newly constructed server instance
 */
module.exports.create = function(app, opts) {
  var instance = new Server(app, opts);
  return instance;
};
/**
 * Server component constructor.
 *
 * Stores the application context and options, resets all services and cron
 * bookkeeping, and subscribes to the application's ADD_CRONS / REMOVE_CRONS
 * events so that crons can be changed at runtime.
 *
 * @param {Object} app  current application context
 * @param {Object} opts component options, an empty object is used when omitted
 */
var Server = function (app, opts) {
  this.opts = opts ? opts : {};
  this.app = app;

  // The services are created later, in start().
  this.globalFilterService = null;
  this.filterService = null;
  this.handlerService = null;

  // Known cron definitions and the scheduled job ids, keyed by cron id.
  this.crons = [];
  this.jobs = {};

  this.state = ST_INITED;

  var bus = app.event;
  bus.on(events.ADD_CRONS, this.addCrons.bind(this));
  bus.on(events.REMOVE_CRONS, this.removeCrons.bind(this));
};

// Shorthand used by the method definitions that follow.
var pro = Server.prototype;
/**
 * Server lifecycle callback: start.
 *
 * Builds the global and local filter services, the handler service and the
 * cron handlers, loads the configured crons and marks the server as started.
 * Does nothing unless the server is still in the inited state.
 */
pro.start = function() {
  if(ST_INITED < this.state) {
    return;
  }

  var app = this.app;
  this.globalFilterService = initFilter(true, app);
  this.filterService = initFilter(false, app);
  this.handlerService = initHandler(app, this.opts);
  this.cronHandlers = loadCronHandlers(app);
  loadCrons(this, app);

  this.state = ST_STARTED;
};
/**
 * Server lifecycle callback: after start.
 * Schedules every cron currently registered on this server.
 */
pro.afterStart = function() {
  var server = this;
  scheduleCrons(server, server.crons);
};
/**
 * Server lifecycle callback: stop.
 * Only flips the state flag; globalHandle() and handle() reject requests
 * once the state is no longer "started".
 */
pro.stop = function() {
  var server = this;
  server.state = ST_STOPED;
};
/**
 * Global handler.
 *
 * Runs the global before-filter chain and then either forwards the message
 * to the server type named in the route or handles it locally. The outcome
 * (or any error, after it passed the global error handler) is delivered
 * through the global response path.
 *
 * @param {Object} msg request message, msg.route must look like 'serverType.handler.method'
 * @param {Object} session session object
 * @param {Function} cb callback function, called as cb(err, resp, opts)
 */
pro.globalHandle = function(msg, session, cb) {
  if(this.state !== ST_STARTED) {
    utils.invokeCallback(cb, new Error('server not started'));
    return;
  }

  var routeRecord = parseRoute(msg.route);
  if(!routeRecord) {
    // Error() does not interpolate printf-style placeholders, so the route
    // has to be rendered into the message explicitly.
    utils.invokeCallback(cb, new Error('meet unknown route message ' + JSON.stringify(msg.route)));
    return;
  }

  var self = this;

  // Every outcome below ends in the same global response call.
  var reply = function(err, resp, opts) {
    response(true, self, err, msg, session, resp, opts, cb);
  };

  var dispatch = function(err, resp, opts) {
    if(err) {
      // error raised by the global before filters
      handleError(true, self, err, msg, session, resp, opts, reply);
      return;
    }

    if(self.app.getServerType() !== routeRecord.serverType) {
      doForward(self.app, msg, session, routeRecord, reply);
    } else {
      doHandle(self, msg, session, routeRecord, reply);
    }
  };

  beforeFilter(true, self, msg, session, dispatch);
};
/**
 * Handle a request on this server.
 *
 * @param {Object} msg request message, msg.route must look like 'serverType.handler.method'
 * @param {Object} session session object
 * @param {Function} cb callback function, called as cb(err, resp, opts)
 */
pro.handle = function(msg, session, cb) {
  if(this.state !== ST_STARTED) {
    utils.invokeCallback(cb, new Error('server not started'));
    return;
  }

  var routeRecord = parseRoute(msg.route);
  if(!routeRecord) {
    // parseRoute() returns null for a missing or malformed route; report it
    // instead of handing a null route record to the handler service.
    utils.invokeCallback(cb, new Error('meet unknown route message ' + JSON.stringify(msg.route)));
    return;
  }

  doHandle(this, msg, session, routeRecord, cb);
};
/**
 * Add crons at runtime.
 *
 * Reloads the cron handlers, registers every cron that is not known yet and
 * then schedules the given crons. Note that the complete `crons` array is
 * passed to the scheduler: duplicates are logged by checkAndAdd() but are
 * not filtered out here.
 *
 * @param {Array} crons would be added in application
 */
pro.addCrons = function(crons) {
  var self = this;
  self.cronHandlers = loadCronHandlers(self.app);

  var total = crons.length;
  var idx = 0;
  while(idx < total) {
    checkAndAdd(crons[idx], self.crons, self);
    idx++;
  }

  scheduleCrons(self, crons);
};
/**
 * Remove crons at runtime.
 *
 * Cancels the scheduled job of every given cron and forgets both the job id
 * and the cron definition, so that the same cron can be added again later.
 * Crons without a scheduled job are reported with a warning.
 *
 * @param {Array} crons would be removed in application
 */
pro.removeCrons = function(crons) {
  for(var i=0, l=crons.length; i<l; i++) {
    var cron = crons[i];

    // Jobs are stored under the raw cron id. The integer form is kept as a
    // fallback for callers that relied on the previous parseInt() lookup.
    var key = cron.id;
    if(!this.jobs[key]) {
      key = parseInt(cron.id, 10);
    }

    if(!this.jobs[key]) {
      logger.warn('cron is not in application: %j', cron);
      continue;
    }

    schedule.cancelJob(this.jobs[key]);
    delete this.jobs[key];

    // Drop the definition in place (object keys are strings, so compare the
    // string forms); otherwise a later add would be rejected as duplicate.
    for(var j=this.crons.length-1; j>=0; j--) {
      if(String(this.crons[j].id) === String(key)) {
        this.crons.splice(j, 1);
      }
    }
  }
};
/**
 * Create a filter service and register the configured filters on it.
 *
 * @param {Boolean} isGlobal true to read the global filter settings, false for the local ones
 * @param {Object} app current application context
 * @return {Object} filter service with all before/after filters registered
 */
var initFilter = function(isGlobal, app) {
  var service = new FilterService();
  var keys = Constants.KEYWORDS;

  var befores = app.get(isGlobal ? keys.GLOBAL_BEFORE_FILTER : keys.BEFORE_FILTER);
  var afters = app.get(isGlobal ? keys.GLOBAL_AFTER_FILTER : keys.AFTER_FILTER);

  var idx, count;
  if(befores) {
    count = befores.length;
    for(idx = 0; idx < count; idx++) {
      service.before(befores[idx]);
    }
  }

  if(afters) {
    count = afters.length;
    for(idx = 0; idx < count; idx++) {
      service.after(afters[idx]);
    }
  }

  return service;
};
/**
 * Create the handler service for this server.
 *
 * @param {Object} app current application context
 * @param {Object} opts component options passed on to the handler service
 * @return {Object} handler service instance
 */
var initHandler = function(app, opts) {
  var service = new HandlerService(app, opts);
  return service;
};
/**
 * Load cron handlers from current application.
 *
 * @param {Object} app current application context
 * @return {Object|undefined} loaded cron handlers, or undefined when no cron path is resolved
 */
var loadCronHandlers = function(app) {
  var cronPath = pathUtil.getCronPath(app.getBase(), app.getServerType());
  if(!cronPath) {
    return;
  }
  return Loader.load(cronPath, app);
};
/**
 * Load crons from the configuration file.
 *
 * Looks for the cron file in the application base first and in the
 * environment specific config directory second; returns silently when
 * neither exists. Crons listed for this server's type are registered when
 * they carry no serverId or when their serverId matches this server.
 *
 * @param {Object} server server component instance that collects the crons
 * @param {Object} app current application context
 */
var loadCrons = function(server, app) {
  var env = app.get(Constants.RESERVED.ENV);

  var cronFile = path.join(app.getBase(), Constants.FILEPATH.CRON);
  if(!fs.existsSync(cronFile)) {
    var envCronFile = path.join(app.getBase(), Constants.FILEPATH.CONFIG_DIR, env, path.basename(Constants.FILEPATH.CRON));
    if(!fs.existsSync(envCronFile)) {
      return;
    }
  }

  app.loadConfigBaseApp(Constants.RESERVED.CRONS, Constants.FILEPATH.CRON);
  var crons = app.get(Constants.RESERVED.CRONS);

  for(var serverType in crons) {
    if(app.serverType !== serverType) {
      continue;
    }

    var list = crons[serverType];
    for(var i = 0; i < list.length; i++) {
      var entry = list[i];
      // entries without a serverId apply to every server of this type
      if(!entry.serverId || app.serverId === entry.serverId) {
        checkAndAdd(entry, server.crons, server);
      }
    }
  }
};
/**
 * Fire the before filter chain if a filter service is available,
 * otherwise invoke the callback right away without arguments.
 *
 * @param {Boolean} isGlobal true to use the global filter service
 * @param {Object} server server component instance
 * @param {Object} msg request message
 * @param {Object} session session object
 * @param {Function} cb callback function
 */
var beforeFilter = function(isGlobal, server, msg, session, cb) {
  var fm = isGlobal ? server.globalFilterService : server.filterService;

  if(!fm) {
    utils.invokeCallback(cb);
    return;
  }

  fm.beforeFilter(msg, session, cb);
};
/**
 * Fire the after filter chain if there is one.
 *
 * Global mode: the response has already been sent by the caller, so the
 * chain only runs for its side effects and `cb` is never invoked here.
 * Local mode: `cb(err, resp, opts)` is invoked once the chain has finished,
 * or immediately when no filter service exists, so the response is not lost.
 *
 * @param {Boolean} isGlobal true to use the global filter service
 * @param {Object} server server component instance
 * @param {Object} err error object, if any
 * @param {Object} msg request message
 * @param {Object} session session object
 * @param {Object} resp response object
 * @param {Object} opts response options
 * @param {Function} cb callback function
 */
var afterFilter = function(isGlobal, server, err, msg, session, resp, opts, cb) {
  var fm = isGlobal ? server.globalFilterService : server.filterService;

  if(isGlobal) {
    if(fm) {
      fm.afterFilter(err, msg, session, resp, function() {
        // do nothing
      });
    }
    return;
  }

  if(!fm) {
    // no filters to run: hand the result straight back to the caller
    cb(err, resp, opts);
    return;
  }

  fm.afterFilter(err, msg, session, resp, function(err) {
    cb(err, resp, opts);
  });
};
/**
 * Pass err to the configured error handler (global or local) if there is
 * one; otherwise log it and pass it on through the callback.
 *
 * A handler declaring exactly five parameters is called without `opts`.
 *
 * @param {Boolean} isGlobal true to look up the global error handler
 * @param {Object} server server component instance
 * @param {Object} err error to resolve
 * @param {Object} msg request message
 * @param {Object} session session object
 * @param {Object} resp response object
 * @param {Object} opts response options
 * @param {Function} cb callback function
 */
var handleError = function(isGlobal, server, err, msg, session, resp, opts, cb) {
  var key = isGlobal ? Constants.RESERVED.GLOBAL_ERROR_HANDLER : Constants.RESERVED.ERROR_HANDLER;
  var handler = server.app.get(key);

  if(!handler) {
    logger.debug('no default error handler to resolve unknown exception. ' + err.stack);
    utils.invokeCallback(cb, err, resp, opts);
    return;
  }

  if(handler.length === 5) {
    handler(err, msg, resp, session, cb);
    return;
  }
  handler(err, msg, resp, session, opts, cb);
};
/**
 * Send response to client and fire after filter chain if any.
 *
 * In global mode the callback is invoked first, so the after filters cannot
 * interfere with the response; in local mode the after filter step is
 * responsible for the callback.
 */
var response = function(isGlobal, server, err, msg, session, resp, opts, cb) {
  if(isGlobal) {
    cb(err, resp, opts);
  }
  afterFilter(isGlobal, server, err, msg, session, resp, opts, cb);
};
/**
 * Parse route string.
 *
 * @param {String} route route string, such as: serverName.handlerName.methodName
 * @return {Object} parse result object, or null for an illegal route
 *                  (missing, not a string, or not exactly three dot-separated parts)
 */
var parseRoute = function(route) {
  // Routes may come from untrusted messages: anything that is not a
  // non-empty string is rejected instead of throwing on .split().
  if(typeof route !== 'string' || !route) {
    return null;
  }

  var ts = route.split('.');
  if(ts.length !== 3) {
    return null;
  }

  return {
    route: route,
    serverType: ts[0],
    handler: ts[1],
    method: ts[2]
  };
};
/**
 * Forward a message to the server type named in the route record through
 * the system rpc proxy and relay the remote result to the callback.
 *
 * @param {Object} app current application context
 * @param {Object} msg request message
 * @param {Object} session session object
 * @param {Object} routeRecord parsed route of the message
 * @param {Function} cb callback function, called as cb(err, resp, opts)
 */
var doForward = function(app, msg, session, routeRecord, cb) {
  // Set once the remote answer has been relayed, so that an exception thrown
  // after that point does not trigger the callback a second time.
  var replied = false;

  var onReply = function(err, resp, opts) {
    if(err) {
      logger.error('fail to process remote message:' + err.stack);
    }
    replied = true;
    utils.invokeCallback(cb, err, resp, opts);
  };

  //should route to other servers
  try {
    var remote = app.sysrpc[routeRecord.serverType].msgRemote;
    remote.forwardMessage(session, msg, session.export(), onReply);
  } catch(err) {
    if(replied) {
      return;
    }
    logger.error('fail to forward message:' + err.stack);
    utils.invokeCallback(cb, err);
  }
};
/**
 * Handle a message on this server: run the local before filters, call the
 * handler service and deliver the result through the local response path.
 * Errors from the filters or the handler go through the local error handler
 * first.
 *
 * The handler receives `msg.body` (or a fresh object when the body is
 * missing) with the original route attached as `__route__`.
 *
 * @param {Object} server server component instance
 * @param {Object} msg request message
 * @param {Object} session session object
 * @param {Object} routeRecord parsed route of the message
 * @param {Function} cb callback function, called as cb(err, resp, opts)
 */
var doHandle = function(server, msg, session, routeRecord, cb) {
  var body = msg.body || {};
  body.__route__ = msg.route;

  // final step of every path: local response
  var finish = function(err, resp, opts) {
    response(false, server, err, body, session, resp, opts, cb);
  };

  // error path: give the local error handler a chance, then respond
  var fail = function(err, resp, opts) {
    handleError(false, server, err, body, session, resp, opts, finish);
  };

  var onHandled = function(err, resp, opts) {
    if(err) {
      //error from handler
      fail(err, resp, opts);
      return;
    }
    finish(err, resp, opts);
  };

  var onFiltered = function(err, resp, opts) {
    if(err) {
      // error from before filter
      fail(err, resp, opts);
      return;
    }
    server.handlerService.handle(routeRecord, body, session, onHandled);
  };

  beforeFilter(false, server, body, session, onFiltered);
};
/**
 * Schedule crons.
 *
 * Every cron needs a `time`, an `action` of the form 'cronName.jobName' and
 * an `id`. Invalid crons are logged and skipped. The id returned by the
 * scheduler is stored in `server.jobs` under the cron id.
 *
 * @param {Object} server server component instance
 * @param {Array} crons cron definitions to schedule
 */
var scheduleCrons = function(server, crons) {
  // cronHandlers is undefined when no cron path could be resolved; treat
  // that as "no handlers" so the crons are reported instead of throwing.
  var handlers = server.cronHandlers || {};

  for(var i = 0; i<crons.length; i++) {
    var cronInfo = crons[i];
    var time = cronInfo.time;
    var action = cronInfo.action;
    var jobId = cronInfo.id;

    if(!time || !action || !jobId) {
      logger.error('cron miss necessary parameters: %j', cronInfo);
      continue;
    }

    if(typeof action !== 'string' || action.indexOf('.') < 0) {
      logger.error('cron action is error format: %j', cronInfo);
      continue;
    }

    var parts = action.split('.');
    var cron = parts[0];
    var job = parts[1];
    var handler = handlers[cron];

    if(!handler) {
      logger.error('could not find cron: %j', cronInfo);
      continue;
    }

    if(typeof handler[job] !== 'function') {
      logger.error('could not find cron job: %j, %s', cronInfo, job);
      continue;
    }

    var id = schedule.scheduleJob(time, handler[job].bind(handler));
    server.jobs[jobId] = id;
  }
};
/**
 * Register a cron on the server unless a cron with the same id is already
 * contained in `crons`; duplicates are only logged.
 *
 * @param {Object} cron cron definition to register
 * @param {Array} crons list that is searched for an existing cron with the same id
 * @param {Object} server server component instance whose `crons` list receives the cron
 */
var checkAndAdd = function(cron, crons, server) {
  if(containCron(cron.id, crons)) {
    logger.warn('cron is duplicated: %j', cron);
    return;
  }
  server.crons.push(cron);
};
/**
 * Check if a cron with the given id is in crons.
 *
 * Ids are compared by their string form: scheduled jobs are stored as
 * object properties keyed by cron id, so 1 and '1' name the same job and
 * have to count as the same cron.
 *
 * @param {Number|String} id cron id to look for
 * @param {Array} crons cron definitions to search
 * @return {Boolean} true if a cron with that id exists
 */
var containCron = function(id, crons) {
  var key = String(id);
  for(var i=0, l=crons.length; i<l; i++) {
    if(String(crons[i].id) === key) {
      return true;
    }
  }
  return false;
};
|