RollingFileWriteStream.js 8.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267
  1. const debug = require("debug")("streamroller:RollingFileWriteStream");
  2. const fs = require("fs-extra");
  3. const path = require("path");
  4. const newNow = require("./now");
  5. const format = require("date-format");
  6. const { Writable } = require("stream");
  7. const fileNameFormatter = require("./fileNameFormatter");
  8. const fileNameParser = require("./fileNameParser");
  9. const moveAndMaybeCompressFile = require("./moveAndMaybeCompressFile");
  /**
   * RollingFileWriteStream is mainly used when writing to a file rolling by date or size.
   * RollingFileWriteStream inherits from stream.Writable
   */
  class RollingFileWriteStream extends Writable {
    /**
     * Create a RollingFileWriteStream
     * @constructor
     * @param {string} filePath - The file path to write.
     * @param {object} options - The extra options
     * @param {number} options.numToKeep - The max numbers of files to keep.
     * @param {number} options.maxSize - The maxSize one file can reach. Unit is Byte.
     * This should be more than 1024. The default is Number.MAX_SAFE_INTEGER.
     * @param {string} options.mode - The mode of the files. The default is '0644'. Refer to stream.writable for more.
     * @param {string} options.flags - The default is 'a'. Refer to stream.flags for more.
     * @param {string} options.encoding - The encoding of the files. The default is 'utf8'.
     * @param {boolean} options.compress - Whether to compress backup files.
     * @param {boolean} options.keepFileExt - Whether to keep the file extension.
     * @param {string} options.pattern - The date string pattern in the file name.
     * @param {boolean} options.alwaysIncludePattern - Whether to add date to the name of the first file.
     * @throws {Error} When options.maxSize or options.numToKeep is <= 0.
     */
    constructor(filePath, options) {
      debug(`constructor: creating RollingFileWriteStream. path=${filePath}`);
      super(options);
      this.options = this._parseOption(options);
      this.fileObject = path.parse(filePath);
      if (this.fileObject.dir === "") {
        // A bare file name has no directory part: anchor it to the working directory.
        this.fileObject = path.parse(path.join(process.cwd(), filePath));
      }
      this.fileFormatter = fileNameFormatter({
        file: this.fileObject,
        alwaysIncludeDate: this.options.alwaysIncludePattern,
        // An index is only requested when a size limit was actually configured.
        needsIndex: this.options.maxSize < Number.MAX_SAFE_INTEGER,
        compress: this.options.compress,
        keepFileExt: this.options.keepFileExt
      });
      this.fileNameParser = fileNameParser({
        file: this.fileObject,
        keepFileExt: this.options.keepFileExt,
        pattern: this.options.pattern
      });
      this.state = {
        currentSize: 0
      };
      if (this.options.pattern) {
        this.state.currentDate = format(this.options.pattern, newNow());
      }
      this.filename = this.fileFormatter({
        index: 0,
        date: this.state.currentDate
      });
      // Only the append flags keep existing content, so only then does the
      // current on-disk size/date matter.
      if (["a", "a+", "as", "as+"].includes(this.options.flags)) {
        this._setExistingSizeAndDate();
      }
      debug(
        `constructor: create new file ${this.filename}, state=${JSON.stringify(
          this.state
        )}`
      );
      this._renewWriteStream();
    }

    /**
     * Seed `state.currentSize` (and `state.currentDate` when a pattern is set)
     * from the stats of the file at `this.filename`. Any error from statSync is
     * ignored, leaving the state untouched.
     */
    _setExistingSizeAndDate() {
      try {
        const stats = fs.statSync(this.filename);
        this.state.currentSize = stats.size;
        if (this.options.pattern) {
          this.state.currentDate = format(this.options.pattern, stats.mtime);
        }
      } catch (e) {
        //file does not exist, that's fine - move along
        return;
      }
    }

    /**
     * Merge the caller's options over the defaults and validate them.
     * @param {object} rawOptions - Options as passed to the constructor.
     * @returns {object} A new options object (the input is not mutated).
     * @throws {Error} When maxSize or numToKeep is <= 0.
     */
    _parseOption(rawOptions) {
      const defaultOptions = {
        maxSize: Number.MAX_SAFE_INTEGER,
        numToKeep: Number.MAX_SAFE_INTEGER,
        encoding: "utf8",
        mode: parseInt("0644", 8),
        flags: "a",
        compress: false,
        keepFileExt: false,
        alwaysIncludePattern: false
      };
      const options = Object.assign({}, defaultOptions, rawOptions);
      if (options.maxSize <= 0) {
        throw new Error(`options.maxSize (${options.maxSize}) should be > 0`);
      }
      if (options.numToKeep <= 0) {
        throw new Error(`options.numToKeep (${options.numToKeep}) should be > 0`);
      }
      debug(
        `_parseOption: creating stream with option=${JSON.stringify(options)}`
      );
      return options;
    }

    /**
     * Writable hook: end the underlying file stream, then signal completion.
     * @param {Function} callback - Invoked once the file stream has ended.
     */
    _final(callback) {
      this.currentFileStream.end("", this.options.encoding, callback);
    }

    /**
     * Writable hook: roll the file first if required, then write the chunk to
     * the current file stream and add its length to `state.currentSize`.
     * @param {Buffer|string} chunk - The data to write.
     * @param {string} encoding - Encoding forwarded to the file stream.
     * @param {Function} callback - Invoked with the write error (if any), or
     * with the roll error when rolling failed.
     */
    _write(chunk, encoding, callback) {
      const writeChunk = () => {
        debug(
          `_write: writing chunk. ` +
            `file=${this.currentFileStream.path} ` +
            `state=${JSON.stringify(this.state)} ` +
            `chunk=${chunk}`
        );
        this.currentFileStream.write(chunk, encoding, e => {
          this.state.currentSize += chunk.length;
          callback(e);
        });
      };
      // A rejected roll must reach the callback; otherwise the write never
      // completes and the rejection goes unhandled.
      this._shouldRoll().then(writeChunk, callback);
    }

    /**
     * Roll the files when the formatted date has changed or the size limit
     * has been reached.
     * @returns {Promise<void>} Settles once any required roll has finished.
     */
    async _shouldRoll() {
      if (this._dateChanged() || this._tooBig()) {
        debug(
          `_shouldRoll: rolling because dateChanged? ${this._dateChanged()} or tooBig? ${this._tooBig()}`
        );
        await this._roll();
      }
    }

    /**
     * @returns {*} Truthy when a current date is tracked and it differs from
     * the date formatted from now; falsy otherwise (not necessarily a boolean).
     */
    _dateChanged() {
      return (
        this.state.currentDate &&
        this.state.currentDate !== format(this.options.pattern, newNow())
      );
    }

    /**
     * @returns {boolean} True when the tracked size has reached options.maxSize.
     */
    _tooBig() {
      return this.state.currentSize >= this.options.maxSize;
    }

    /**
     * End the current file stream and then shift the existing files.
     * @returns {Promise<void>} Settles with the outcome of `_moveOldFiles`.
     */
    _roll() {
      debug(`_roll: closing the current stream`);
      return new Promise((resolve, reject) => {
        this.currentFileStream.end("", this.options.encoding, () => {
          this._moveOldFiles()
            .then(resolve)
            .catch(reject);
        });
      });
    }

    /**
     * Shift every file of the current date (or every file when no date is
     * tracked) from index i to index i + 1, reset the state, open a fresh
     * write stream and finally clean up surplus files.
     * @returns {Promise<void>}
     */
    async _moveOldFiles() {
      const files = await this._getExistingFiles();
      const todaysFiles = this.state.currentDate
        ? files.filter(f => f.date === this.state.currentDate)
        : files;
      // Walk from the highest index down so a move never targets a file that
      // has yet to be shifted.
      for (let i = todaysFiles.length; i >= 0; i--) {
        debug(`_moveOldFiles: i = ${i}`);
        const sourceFilePath = this.fileFormatter({
          date: this.state.currentDate,
          index: i
        });
        const targetFilePath = this.fileFormatter({
          date: this.state.currentDate,
          index: i + 1
        });
        await moveAndMaybeCompressFile(
          sourceFilePath,
          targetFilePath,
          // Only the move out of index 0 requests compression.
          this.options.compress && i === 0
        );
      }
      this.state.currentSize = 0;
      this.state.currentDate = this.state.currentDate
        ? format(this.options.pattern, newNow())
        : null;
      debug(
        `_moveOldFiles: finished rolling files. state=${JSON.stringify(
          this.state
        )}`
      );
      this._renewWriteStream();
      // wait for the file to be open before cleaning up old ones,
      // otherwise the daysToKeep calculations can be off
      await new Promise((resolve, reject) => {
        this.currentFileStream.write("", "utf8", () => {
          this._clean()
            .then(resolve)
            .catch(reject);
        });
      });
    }

    /**
     * List the directory entries that the file name parser recognises.
     * Sorted from the oldest to the latest: the sort key is the parsed
     * timestamp (or the current time when there is none) minus the index.
     * A failing readdir is treated as an empty directory.
     * @returns {Promise<object[]>} Parsed file details in ascending key order.
     */
    async _getExistingFiles() {
      const files = await fs.readdir(this.fileObject.dir).catch(() => []);
      debug(`_getExistingFiles: files=${files}`);
      const existingFileDetails = files
        .map(n => this.fileNameParser(n))
        .filter(n => n);
      const getKey = n =>
        (n.timestamp ? n.timestamp : newNow().getTime()) - n.index;
      existingFileDetails.sort((a, b) => getKey(a) - getKey(b));
      return existingFileDetails;
    }

    /**
     * Make sure the target directory exists and open a new write stream on
     * the index-0 file for the current date. Errors of the file stream are
     * re-emitted on this stream.
     */
    _renewWriteStream() {
      fs.ensureDirSync(this.fileObject.dir);
      const filePath = this.fileFormatter({
        date: this.state.currentDate,
        index: 0
      });
      const ops = {
        flags: this.options.flags,
        encoding: this.options.encoding,
        mode: this.options.mode
      };
      this.currentFileStream = fs.createWriteStream(filePath, ops);
      this.currentFileStream.on("error", e => {
        this.emit("error", e);
      });
    }

    /**
     * Delete the oldest files when more than options.numToKeep exist.
     * The first (length - numToKeep - 1) entries of the sorted list are
     * removed, so numToKeep + 1 entries remain.
     * NOTE(review): the extra one presumably accounts for the file currently
     * being written - confirm against fileNameParser.
     * @returns {Promise<void>}
     */
    async _clean() {
      const existingFileDetails = await this._getExistingFiles();
      debug(
        `_clean: numToKeep = ${this.options.numToKeep}, existingFiles = ${existingFileDetails.length}`
      );
      debug("_clean: existing files are: ", existingFileDetails);
      if (this._tooManyFiles(existingFileDetails.length)) {
        const fileNamesToRemove = existingFileDetails
          .slice(0, existingFileDetails.length - this.options.numToKeep - 1)
          .map(f => path.format({ dir: this.fileObject.dir, base: f.filename }));
        await deleteFiles(fileNamesToRemove);
      }
    }

    /**
     * @param {number} numFiles - Number of existing files found.
     * @returns {boolean} True when numToKeep is positive and numFiles exceeds it.
     */
    _tooManyFiles(numFiles) {
      return this.options.numToKeep > 0 && numFiles > this.options.numToKeep;
    }
  }
  /**
   * Best-effort removal of a list of files.
   * Every path is unlinked independently; a failing unlink is only logged
   * through `debug`, so the returned promise does not reject because of it.
   * @param {string[]} fileNames - Paths of the files to remove.
   * @returns {Promise<Array>} Settles once every unlink attempt has finished.
   */
  const deleteFiles = fileNames => {
    debug(`deleteFiles: files to delete: ${fileNames}`);
    // Unlink a single file, logging (not propagating) any failure.
    const unlinkQuietly = target =>
      fs.unlink(target).catch(err => {
        debug(`deleteFiles: error when unlinking ${target}, ignoring. Error was ${err}`);
      });
    const pendingRemovals = fileNames.map(target => unlinkQuietly(target));
    return Promise.all(pendingRemovals);
  };
  241. module.exports = RollingFileWriteStream;