server.js 11 KB


  1. /**
  2. * Implementation of server component.
  3. * Init and start server instance.
  4. */
  5. var logger = require('pomelo-logger').getLogger('pomelo', __filename);
  6. var fs = require('fs');
  7. var path = require('path');
  8. var pathUtil = require('../util/pathUtil');
  9. var Loader = require('pomelo-loader');
  10. var utils = require('../util/utils');
  11. var schedule = require('pomelo-scheduler');
  12. var events = require('../util/events');
  13. var Constants = require('../util/constants');
  14. var FilterService = require('../common/service/filterService');
  15. var HandlerService = require('../common/service/handlerService');
  16. var ST_INITED = 0; // server inited
  17. var ST_STARTED = 1; // server started
  18. var ST_STOPED = 2; // server stoped
  19. /**
  20. * Server factory function.
  21. *
  22. * @param {Object} app current application context
  23. * @return {Object} erver instance
  24. */
  25. module.exports.create = function(app, opts) {
  26. return new Server(app, opts);
  27. };
  28. var Server = function (app, opts) {
  29. this.opts = opts || {};
  30. this.app = app;
  31. this.globalFilterService = null;
  32. this.filterService = null;
  33. this.handlerService = null;
  34. this.crons = [];
  35. this.jobs = {};
  36. this.state = ST_INITED;
  37. app.event.on(events.ADD_CRONS, this.addCrons.bind(this));
  38. app.event.on(events.REMOVE_CRONS, this.removeCrons.bind(this));
  39. };
  40. var pro = Server.prototype;
  41. /**
  42. * Server lifecycle callback
  43. */
  44. pro.start = function() {
  45. if(this.state > ST_INITED) {
  46. return;
  47. }
  48. this.globalFilterService = initFilter(true, this.app);
  49. this.filterService = initFilter(false, this.app);
  50. this.handlerService = initHandler(this.app, this.opts);
  51. this.cronHandlers = loadCronHandlers(this.app);
  52. loadCrons(this, this.app);
  53. this.state = ST_STARTED;
  54. };
  55. pro.afterStart = function() {
  56. scheduleCrons(this, this.crons);
  57. };
  58. /**
  59. * Stop server
  60. */
  61. pro.stop = function() {
  62. this.state = ST_STOPED;
  63. };
  64. /**
  65. * Global handler.
  66. *
  67. * @param {Object} msg request message
  68. * @param {Object} session session object
  69. * @param {Callback} callback function
  70. */
  71. pro.globalHandle = function(msg, session, cb) {
  72. if(this.state !== ST_STARTED) {
  73. utils.invokeCallback(cb, new Error('server not started'));
  74. return;
  75. }
  76. var routeRecord = parseRoute(msg.route);
  77. if(!routeRecord) {
  78. utils.invokeCallback(cb, new Error('meet unknown route message %j', msg.route));
  79. return;
  80. }
  81. var self = this;
  82. var dispatch = function(err, resp, opts) {
  83. if(err) {
  84. handleError(true, self, err, msg, session, resp, opts, function(err, resp, opts) {
  85. response(true, self, err, msg, session, resp, opts, cb);
  86. });
  87. return;
  88. }
  89. if(self.app.getServerType() !== routeRecord.serverType) {
  90. doForward(self.app, msg, session, routeRecord, function(err, resp, opts) {
  91. response(true, self, err, msg, session, resp, opts, cb);
  92. });
  93. } else {
  94. doHandle(self, msg, session, routeRecord, function(err, resp, opts) {
  95. response(true, self, err, msg, session, resp, opts, cb);
  96. });
  97. }
  98. };
  99. beforeFilter(true, self, msg, session, dispatch);
  100. };
  101. /**
  102. * Handle request
  103. */
  104. pro.handle = function(msg, session, cb) {
  105. if(this.state !== ST_STARTED) {
  106. cb(new Error('server not started'));
  107. return;
  108. }
  109. var routeRecord = parseRoute(msg.route);
  110. doHandle(this, msg, session, routeRecord, cb);
  111. };
  112. /**
  113. * Add crons at runtime.
  114. *
  115. * @param {Array} crons would be added in application
  116. */
  117. pro.addCrons = function(crons) {
  118. this.cronHandlers = loadCronHandlers(this.app);
  119. for(var i=0, l=crons.length; i<l; i++) {
  120. var cron = crons[i];
  121. checkAndAdd(cron, this.crons, this);
  122. }
  123. scheduleCrons(this, crons);
  124. };
  125. /**
  126. * Remove crons at runtime.
  127. *
  128. * @param {Array} crons would be removed in application
  129. */
  130. pro.removeCrons = function(crons) {
  131. for(var i=0, l=crons.length; i<l; i++) {
  132. var cron = crons[i];
  133. var id = parseInt(cron.id);
  134. if(!!this.jobs[id]) {
  135. schedule.cancelJob(this.jobs[id]);
  136. } else {
  137. logger.warn('cron is not in application: %j', cron);
  138. }
  139. }
  140. };
  141. var initFilter = function(isGlobal, app) {
  142. var service = new FilterService();
  143. var befores, afters;
  144. if(isGlobal) {
  145. befores = app.get(Constants.KEYWORDS.GLOBAL_BEFORE_FILTER);
  146. afters = app.get(Constants.KEYWORDS.GLOBAL_AFTER_FILTER);
  147. } else {
  148. befores = app.get(Constants.KEYWORDS.BEFORE_FILTER);
  149. afters = app.get(Constants.KEYWORDS.AFTER_FILTER);
  150. }
  151. var i, l;
  152. if(befores) {
  153. for(i=0, l=befores.length; i<l; i++) {
  154. service.before(befores[i]);
  155. }
  156. }
  157. if(afters) {
  158. for(i=0, l=afters.length; i<l; i++) {
  159. service.after(afters[i]);
  160. }
  161. }
  162. return service;
  163. };
  164. var initHandler = function(app, opts) {
  165. return new HandlerService(app, opts);
  166. };
  167. /**
  168. * Load cron handlers from current application
  169. */
  170. var loadCronHandlers = function(app) {
  171. var p = pathUtil.getCronPath(app.getBase(), app.getServerType());
  172. if(p) {
  173. return Loader.load(p, app);
  174. }
  175. };
  176. /**
  177. * Load crons from configure file
  178. */
  179. var loadCrons = function(server, app) {
  180. var env = app.get(Constants.RESERVED.ENV);
  181. var p = path.join(app.getBase(), Constants.FILEPATH.CRON);
  182. if(!fs.existsSync(p)) {
  183. p = path.join(app.getBase(), Constants.FILEPATH.CONFIG_DIR, env, path.basename(Constants.FILEPATH.CRON));
  184. if (!fs.existsSync(p)) {
  185. return;
  186. }
  187. }
  188. app.loadConfigBaseApp(Constants.RESERVED.CRONS, Constants.FILEPATH.CRON);
  189. var crons = app.get(Constants.RESERVED.CRONS);
  190. for(var serverType in crons) {
  191. if(app.serverType === serverType) {
  192. var list = crons[serverType];
  193. for(var i = 0; i<list.length; i++) {
  194. if(!list[i].serverId) {
  195. checkAndAdd(list[i], server.crons, server);
  196. } else {
  197. if(app.serverId === list[i].serverId) {
  198. checkAndAdd(list[i], server.crons, server);
  199. }
  200. }
  201. }
  202. }
  203. }
  204. };
  205. /**
  206. * Fire before filter chain if any
  207. */
  208. var beforeFilter = function(isGlobal, server, msg, session, cb) {
  209. var fm;
  210. if(isGlobal) {
  211. fm = server.globalFilterService;
  212. } else {
  213. fm = server.filterService;
  214. }
  215. if(fm) {
  216. fm.beforeFilter(msg, session, cb);
  217. } else {
  218. utils.invokeCallback(cb);
  219. }
  220. };
  221. /**
  222. * Fire after filter chain if have
  223. */
  224. var afterFilter = function(isGlobal, server, err, msg, session, resp, opts, cb) {
  225. var fm;
  226. if(isGlobal) {
  227. fm = server.globalFilterService;
  228. } else {
  229. fm = server.filterService;
  230. }
  231. if(fm) {
  232. if(isGlobal) {
  233. fm.afterFilter(err, msg, session, resp, function() {
  234. // do nothing
  235. });
  236. } else {
  237. fm.afterFilter(err, msg, session, resp, function(err) {
  238. cb(err, resp, opts);
  239. });
  240. }
  241. }
  242. };
  243. /**
  244. * pass err to the global error handler if specified
  245. */
  246. var handleError = function(isGlobal, server, err, msg, session, resp, opts, cb) {
  247. var handler;
  248. if(isGlobal) {
  249. handler = server.app.get(Constants.RESERVED.GLOBAL_ERROR_HANDLER);
  250. } else {
  251. handler = server.app.get(Constants.RESERVED.ERROR_HANDLER);
  252. }
  253. if(!handler) {
  254. logger.debug('no default error handler to resolve unknown exception. ' + err.stack);
  255. utils.invokeCallback(cb, err, resp, opts);
  256. } else {
  257. if(handler.length === 5) {
  258. handler(err, msg, resp, session, cb);
  259. } else {
  260. handler(err, msg, resp, session, opts, cb);
  261. }
  262. }
  263. };
  264. /**
  265. * Send response to client and fire after filter chain if any.
  266. */
  267. var response = function(isGlobal, server, err, msg, session, resp, opts, cb) {
  268. if(isGlobal) {
  269. cb(err, resp, opts);
  270. // after filter should not interfere response
  271. afterFilter(isGlobal, server, err, msg, session, resp, opts, cb);
  272. } else {
  273. afterFilter(isGlobal, server, err, msg, session, resp, opts, cb);
  274. }
  275. };
  276. /**
  277. * Parse route string.
  278. *
  279. * @param {String} route route string, such as: serverName.handlerName.methodName
  280. * @return {Object} parse result object or null for illeagle route string
  281. */
  282. var parseRoute = function(route) {
  283. if(!route) {
  284. return null;
  285. }
  286. var ts = route.split('.');
  287. if(ts.length !== 3) {
  288. return null;
  289. }
  290. return {
  291. route: route,
  292. serverType: ts[0],
  293. handler: ts[1],
  294. method: ts[2]
  295. };
  296. };
  297. var doForward = function(app, msg, session, routeRecord, cb) {
  298. var finished = false;
  299. //should route to other servers
  300. try {
  301. app.sysrpc[routeRecord.serverType].msgRemote.forwardMessage(
  302. session,
  303. msg,
  304. session.export(),
  305. function(err, resp, opts) {
  306. if(err) {
  307. logger.error('fail to process remote message:' + err.stack);
  308. }
  309. finished = true;
  310. utils.invokeCallback(cb, err, resp, opts);
  311. }
  312. );
  313. } catch(err) {
  314. if(!finished) {
  315. logger.error('fail to forward message:' + err.stack);
  316. utils.invokeCallback(cb, err);
  317. }
  318. }
  319. };
  320. var doHandle = function(server, msg, session, routeRecord, cb) {
  321. var originMsg = msg;
  322. msg = msg.body || {};
  323. msg.__route__ = originMsg.route;
  324. var self = server;
  325. var handle = function(err, resp, opts) {
  326. if(err) {
  327. // error from before filter
  328. handleError(false, self, err, msg, session, resp, opts, function(err, resp, opts) {
  329. response(false, self, err, msg, session, resp, opts, cb);
  330. });
  331. return;
  332. }
  333. self.handlerService.handle(routeRecord, msg, session, function(err, resp, opts) {
  334. if(err) {
  335. //error from handler
  336. handleError(false, self, err, msg, session, resp, opts, function(err, resp, opts) {
  337. response(false, self, err, msg, session, resp, opts, cb);
  338. });
  339. return;
  340. }
  341. response(false, self, err, msg, session, resp, opts, cb);
  342. });
  343. }; //end of handle
  344. beforeFilter(false, server, msg, session, handle);
  345. };
  346. /**
  347. * Schedule crons
  348. */
  349. var scheduleCrons = function(server, crons) {
  350. var handlers = server.cronHandlers;
  351. for(var i = 0; i<crons.length; i++) {
  352. var cronInfo = crons[i];
  353. var time = cronInfo.time;
  354. var action = cronInfo.action;
  355. var jobId = cronInfo.id;
  356. if(!time || !action || !jobId) {
  357. logger.error('cron miss necessary parameters: %j', cronInfo);
  358. continue;
  359. }
  360. if(action.indexOf('.') < 0) {
  361. logger.error('cron action is error format: %j', cronInfo);
  362. continue;
  363. }
  364. var cron = action.split('.')[0];
  365. var job = action.split('.')[1];
  366. var handler = handlers[cron];
  367. if(!handler) {
  368. logger.error('could not find cron: %j', cronInfo);
  369. continue;
  370. }
  371. if(typeof handler[job] !== 'function') {
  372. logger.error('could not find cron job: %j, %s', cronInfo, job);
  373. continue;
  374. }
  375. var id = schedule.scheduleJob(time, handler[job].bind(handler));
  376. server.jobs[jobId] = id;
  377. }
  378. };
  379. /**
  380. * If cron is not in crons then put it in the array.
  381. */
  382. var checkAndAdd = function(cron, crons, server) {
  383. if(!containCron(cron.id, crons)) {
  384. server.crons.push(cron);
  385. } else {
  386. logger.warn('cron is duplicated: %j', cron);
  387. }
  388. };
  389. /**
  390. * Check if cron is in crons.
  391. */
  392. var containCron = function(id, crons) {
  393. for(var i=0, l=crons.length; i<l; i++) {
  394. if(id === crons[i].id) {
  395. return true;
  396. }
  397. }
  398. return false;
  399. };