multiprocess.js 5.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180
  1. const debug = require('debug')('log4js:multiprocess');
  2. const net = require('net');
  3. const LoggingEvent = require('../LoggingEvent');
  4. const END_MSG = '__LOG4JS__';
  /**
   * Starts the master-side TCP server of the multiprocess appender.
   *
   * The server listens on `config.loggerPort` (default 5000) and
   * `config.loggerHost` (default 'localhost'). Each client connection carries
   * serialised logging events, each followed by END_MSG; every complete event
   * is deserialised, tagged with the client's remote address and port, and
   * handed to `actualAppender`.
   *
   * @param {Object} config - appender configuration (`loggerPort`, `loggerHost`).
   * @param {Function} actualAppender - appender that receives every event.
   * @param {Object} levels - level definitions; `levels.ERROR` is used for the
   *   event synthesised when a client socket reports an error.
   * @returns {Function} appender for events logged locally in the master; its
   *   `shutdown(cb)` method closes the server.
   */
  function logServer(config, actualAppender, levels) {
    /**
     * Takes a utf-8 string and returns a logging event that also records
     * which remote address/port it arrived from.
     */
    function deserializeLoggingEvent(clientSocket, msg) {
      debug('(master) deserialising log event');
      const loggingEvent = LoggingEvent.deserialise(msg);
      loggingEvent.remoteAddress = clientSocket.remoteAddress;
      loggingEvent.remotePort = clientSocket.remotePort;
      return loggingEvent;
    }

    /* eslint prefer-arrow-callback:0 */
    const server = net.createServer(function connectionHandler(clientSocket) {
      debug('(master) connection received');
      clientSocket.setEncoding('utf8');

      // Data received so far that has not yet been terminated by END_MSG.
      let logMessage = '';

      function logTheMessage(msg) {
        // Guard on the extracted frame itself: the receive buffer is never
        // empty here (it still holds the delimiter), so testing it would let
        // empty frames through to the deserialiser.
        if (msg.length > 0) {
          debug('(master) deserialising log event and sending to actual appender');
          actualAppender(deserializeLoggingEvent(clientSocket, msg));
        }
      }

      function chunkReceived(chunk) {
        debug('(master) chunk of data received');
        // The 'end' event passes no argument, hence the fallback.
        logMessage += chunk || '';

        // A single chunk may contain several complete events; drain them in a
        // loop so a large burst cannot grow the call stack.
        let delimiterAt = logMessage.indexOf(END_MSG);
        while (delimiterAt > -1) {
          logTheMessage(logMessage.substring(0, delimiterAt));
          logMessage = logMessage.substring(delimiterAt + END_MSG.length);
          delimiterAt = logMessage.indexOf(END_MSG);
        }
      }

      function handleError(error) {
        // Report the broken connection through the real appender so that it
        // shows up in the master's log output.
        const loggingEvent = {
          startTime: new Date(),
          categoryName: 'log4js',
          level: levels.ERROR,
          data: ['A worker log process hung up unexpectedly', error],
          remoteAddress: clientSocket.remoteAddress,
          remotePort: clientSocket.remotePort
        };
        actualAppender(loggingEvent);
      }

      clientSocket.on('data', chunkReceived);
      clientSocket.on('end', chunkReceived);
      clientSocket.on('error', handleError);
    });

    server.listen(config.loggerPort || 5000, config.loggerHost || 'localhost', function (e) {
      debug('(master) master server listening, error was ', e);
      // allow the process to exit, if this is the only socket active
      server.unref();
    });

    function app(event) {
      debug('(master) log event sent directly to actual appender (local event)');
      return actualAppender(event);
    }

    app.shutdown = function (cb) {
      debug('(master) master shutdown called, closing server');
      server.close(cb);
    };

    return app;
  }
  /**
   * Creates the worker-side appender: logging events are serialised and sent
   * over a TCP socket to `config.loggerHost` (default 'localhost') on
   * `config.loggerPort` (default 5000), each followed by END_MSG.
   *
   * Events logged while the socket is not connected are held in an in-memory
   * buffer and flushed on the next successful connect. A new connection is
   * attempted every time the socket closes.
   *
   * @param {Object} config - appender configuration (`loggerPort`, `loggerHost`).
   * @returns {Function} appender function; its `shutdown(cb)` method retries
   *   up to three times, 100ms apart, while the buffer is non-empty and then
   *   ends the socket.
   */
  function workerAppender(config) {
    let canWrite = false;
    const buffer = [];
    let socket;
    let shutdownAttempts = 3;

    function write(loggingEvent) {
      debug('(worker) Writing log event to socket');
      socket.write(loggingEvent.serialise(), 'utf8');
      socket.write(END_MSG, 'utf8');
    }

    function emptyBuffer() {
      let evt;
      debug('(worker) emptying worker buffer');
      /* eslint no-cond-assign:0 */
      while ((evt = buffer.shift())) {
        write(evt);
      }
    }

    function createSocket() {
      debug(
        `(worker) worker appender creating socket to ${config.loggerHost || 'localhost'}:${config.loggerPort || 5000}`
      );
      socket = net.createConnection(config.loggerPort || 5000, config.loggerHost || 'localhost');
      socket.on('connect', () => {
        debug('(worker) worker socket connected');
        emptyBuffer();
        canWrite = true;
      });
      socket.on('timeout', socket.end.bind(socket));
      // An 'error' event with no listener is thrown by the emitter, which
      // would take the whole worker down on a refused or reset connection.
      // Swallow it here; the 'close' that follows triggers the reconnect.
      socket.on('error', (e) => {
        debug('(worker) worker socket error', e);
        canWrite = false;
      });
      socket.on('close', () => {
        // Go back to buffering until the replacement socket has connected.
        canWrite = false;
        createSocket();
      });
    }

    createSocket();

    function log(loggingEvent) {
      if (canWrite) {
        write(loggingEvent);
      } else {
        debug('(worker) worker buffering log event because it cannot write at the moment');
        buffer.push(loggingEvent);
      }
    }

    log.shutdown = function (cb) {
      debug('(worker) worker shutdown called');
      if (buffer.length && shutdownAttempts) {
        debug('(worker) worker buffer has items, waiting 100ms to empty');
        shutdownAttempts -= 1;
        setTimeout(() => {
          log.shutdown(cb);
        }, 100);
      } else {
        // Drop the reconnect-on-close listener so ending the socket is final.
        socket.removeAllListeners('close');
        socket.end(cb);
      }
    };

    return log;
  }
  /**
   * Builds the appender matching `config.mode`: the log server when the mode
   * is 'master', the socket-writing worker appender for anything else.
   *
   * @param {Object} config - appender configuration.
   * @param {Function} appender - real appender, only used in master mode.
   * @param {Object} levels - level definitions, only used in master mode.
   * @returns {Function} the master or worker appender function.
   */
  function createAppender(config, appender, levels) {
    const isMaster = config.mode === 'master';
    debug(isMaster ? 'Creating master appender' : 'Creating worker appender');
    return isMaster ? logServer(config, appender, levels) : workerAppender(config);
  }
  /**
   * Appender entry point. In master mode the appender named by
   * `config.appender` is looked up through `findAppender` and must exist;
   * in any other mode no lookup is done.
   *
   * @param {Object} config - appender configuration (`mode`, `appender`, ...).
   * @param {Object} layouts - not used by this appender.
   * @param {Function} findAppender - resolves an appender by name.
   * @param {Object} levels - level definitions passed on to createAppender.
   * @returns {Function} the appender produced by createAppender.
   * @throws {Error} in master mode when `config.appender` is missing or
   *   cannot be resolved.
   */
  function configure(config, layouts, findAppender, levels) {
    debug(`configure with mode = ${config.mode}`);

    // Only the master needs a real appender to forward events to.
    if (config.mode !== 'master') {
      return createAppender(config, undefined, levels);
    }

    if (!config.appender) {
      debug(`no appender found in config ${config}`);
      throw new Error('multiprocess master must have an "appender" defined');
    }

    debug(`actual appender is ${config.appender}`);
    const target = findAppender(config.appender);
    if (!target) {
      debug(`actual appender "${config.appender}" not found`);
      throw new Error(`multiprocess master appender "${config.appender}" not defined`);
    }

    return createAppender(config, target, levels);
  }
  156. module.exports.configure = configure;