Pool.js 20 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713
  1. "use strict";
  2. const EventEmitter = require("events").EventEmitter;
  3. const factoryValidator = require("./factoryValidator");
  4. const PoolOptions = require("./PoolOptions");
  5. const ResourceRequest = require("./ResourceRequest");
  6. const ResourceLoan = require("./ResourceLoan");
  7. const PooledResource = require("./PooledResource");
  8. const DefaultEvictor = require("./DefaultEvictor");
  9. const Deque = require("./Deque");
  10. const Deferred = require("./Deferred");
  11. const PriorityQueue = require("./PriorityQueue");
  12. const DequeIterator = require("./DequeIterator");
  13. const reflector = require("./utils").reflector;
  14. /**
  15. * TODO: move me
  16. */
  17. const FACTORY_CREATE_ERROR = "factoryCreateError";
  18. const FACTORY_DESTROY_ERROR = "factoryDestroyError";
  19. class Pool extends EventEmitter {
  20. /**
  21. * Generate an Object pool with a specified `factory` and `config`.
  22. *
  23. * @param {typeof DefaultEvictor} Evictor
  24. * @param {typeof Deque} Deque
  25. * @param {typeof PriorityQueue} PriorityQueue
  26. * @param {Object} factory
  27. * Factory to be used for generating and destroying the items.
  28. * @param {Function} factory.create
  29. * Should create the item to be acquired,
  30. * and call it's first callback argument with the generated item as it's argument.
  31. * @param {Function} factory.destroy
  32. * Should gently close any resources that the item is using.
  33. * Called before the items is destroyed.
  34. * @param {Function} factory.validate
  35. * Test if a resource is still valid .Should return a promise that resolves to a boolean, true if resource is still valid and false
  36. * If it should be removed from pool.
  37. * @param {Object} options
  38. */
  39. constructor(Evictor, Deque, PriorityQueue, factory, options) {
  40. super();
  41. factoryValidator(factory);
  42. this._config = new PoolOptions(options);
  43. // TODO: fix up this ugly glue-ing
  44. this._Promise = this._config.Promise;
  45. this._factory = factory;
  46. this._draining = false;
  47. this._started = false;
  48. /**
  49. * Holds waiting clients
  50. * @type {PriorityQueue}
  51. */
  52. this._waitingClientsQueue = new PriorityQueue(this._config.priorityRange);
  53. /**
  54. * Collection of promises for resource creation calls made by the pool to factory.create
  55. * @type {Set}
  56. */
  57. this._factoryCreateOperations = new Set();
  58. /**
  59. * Collection of promises for resource destruction calls made by the pool to factory.destroy
  60. * @type {Set}
  61. */
  62. this._factoryDestroyOperations = new Set();
  63. /**
  64. * A queue/stack of pooledResources awaiting acquisition
  65. * TODO: replace with LinkedList backed array
  66. * @type {Deque}
  67. */
  68. this._availableObjects = new Deque();
  69. /**
  70. * Collection of references for any resource that are undergoing validation before being acquired
  71. * @type {Set}
  72. */
  73. this._testOnBorrowResources = new Set();
  74. /**
  75. * Collection of references for any resource that are undergoing validation before being returned
  76. * @type {Set}
  77. */
  78. this._testOnReturnResources = new Set();
  79. /**
  80. * Collection of promises for any validations currently in process
  81. * @type {Set}
  82. */
  83. this._validationOperations = new Set();
  84. /**
  85. * All objects associated with this pool in any state (except destroyed)
  86. * @type {Set}
  87. */
  88. this._allObjects = new Set();
  89. /**
  90. * Loans keyed by the borrowed resource
  91. * @type {Map}
  92. */
  93. this._resourceLoans = new Map();
  94. /**
  95. * Infinitely looping iterator over available object
  96. * @type {DequeIterator}
  97. */
  98. this._evictionIterator = this._availableObjects.iterator();
  99. this._evictor = new Evictor();
  100. /**
  101. * handle for setTimeout for next eviction run
  102. * @type {(number|null)}
  103. */
  104. this._scheduledEviction = null;
  105. // create initial resources (if factory.min > 0)
  106. if (this._config.autostart === true) {
  107. this.start();
  108. }
  109. }
  110. _destroy(pooledResource) {
  111. // FIXME: do we need another state for "in destruction"?
  112. pooledResource.invalidate();
  113. this._allObjects.delete(pooledResource);
  114. // NOTE: this maybe very bad promise usage?
  115. const destroyPromise = this._factory.destroy(pooledResource.obj);
  116. const wrappedDestroyPromise = this._Promise.resolve(destroyPromise);
  117. this._trackOperation(
  118. wrappedDestroyPromise,
  119. this._factoryDestroyOperations
  120. ).catch(reason => {
  121. this.emit(FACTORY_DESTROY_ERROR, reason);
  122. });
  123. // TODO: maybe ensuring minimum pool size should live outside here
  124. this._ensureMinimum();
  125. }
  126. /**
  127. * Attempt to move an available resource into test and then onto a waiting client
  128. * @return {Boolean} could we move an available resource into test
  129. */
  130. _testOnBorrow() {
  131. if (this._availableObjects.length < 1) {
  132. return false;
  133. }
  134. const pooledResource = this._availableObjects.shift();
  135. // Mark the resource as in test
  136. pooledResource.test();
  137. this._testOnBorrowResources.add(pooledResource);
  138. const validationPromise = this._factory.validate(pooledResource.obj);
  139. const wrappedValidationPromise = this._Promise.resolve(validationPromise);
  140. this._trackOperation(
  141. wrappedValidationPromise,
  142. this._validationOperations
  143. ).then(isValid => {
  144. this._testOnBorrowResources.delete(pooledResource);
  145. if (isValid === false) {
  146. pooledResource.invalidate();
  147. this._destroy(pooledResource);
  148. this._dispense();
  149. return;
  150. }
  151. this._dispatchPooledResourceToNextWaitingClient(pooledResource);
  152. });
  153. return true;
  154. }
  155. /**
  156. * Attempt to move an available resource to a waiting client
  157. * @return {Boolean} [description]
  158. */
  159. _dispatchResource() {
  160. if (this._availableObjects.length < 1) {
  161. return false;
  162. }
  163. const pooledResource = this._availableObjects.shift();
  164. this._dispatchPooledResourceToNextWaitingClient(pooledResource);
  165. return false;
  166. }
  167. /**
  168. * Attempt to resolve an outstanding resource request using an available resource from
  169. * the pool, or creating new ones
  170. *
  171. * @private
  172. */
  173. _dispense() {
  174. /**
  175. * Local variables for ease of reading/writing
  176. * these don't (shouldn't) change across the execution of this fn
  177. */
  178. const numWaitingClients = this._waitingClientsQueue.length;
  179. // If there aren't any waiting requests then there is nothing to do
  180. // so lets short-circuit
  181. if (numWaitingClients < 1) {
  182. return;
  183. }
  184. const resourceShortfall =
  185. numWaitingClients - this._potentiallyAllocableResourceCount;
  186. const actualNumberOfResourcesToCreate = Math.min(
  187. this.spareResourceCapacity,
  188. resourceShortfall
  189. );
  190. for (let i = 0; actualNumberOfResourcesToCreate > i; i++) {
  191. this._createResource();
  192. }
  193. // If we are doing test-on-borrow see how many more resources need to be moved into test
  194. // to help satisfy waitingClients
  195. if (this._config.testOnBorrow === true) {
  196. // how many available resources do we need to shift into test
  197. const desiredNumberOfResourcesToMoveIntoTest =
  198. numWaitingClients - this._testOnBorrowResources.size;
  199. const actualNumberOfResourcesToMoveIntoTest = Math.min(
  200. this._availableObjects.length,
  201. desiredNumberOfResourcesToMoveIntoTest
  202. );
  203. for (let i = 0; actualNumberOfResourcesToMoveIntoTest > i; i++) {
  204. this._testOnBorrow();
  205. }
  206. }
  207. // if we aren't testing-on-borrow then lets try to allocate what we can
  208. if (this._config.testOnBorrow === false) {
  209. const actualNumberOfResourcesToDispatch = Math.min(
  210. this._availableObjects.length,
  211. numWaitingClients
  212. );
  213. for (let i = 0; actualNumberOfResourcesToDispatch > i; i++) {
  214. this._dispatchResource();
  215. }
  216. }
  217. }
  218. /**
  219. * Dispatches a pooledResource to the next waiting client (if any) else
  220. * puts the PooledResource back on the available list
  221. * @param {PooledResource} pooledResource [description]
  222. * @return {Boolean} [description]
  223. */
  224. _dispatchPooledResourceToNextWaitingClient(pooledResource) {
  225. const clientResourceRequest = this._waitingClientsQueue.dequeue();
  226. if (
  227. clientResourceRequest === undefined ||
  228. clientResourceRequest.state !== Deferred.PENDING
  229. ) {
  230. // While we were away either all the waiting clients timed out
  231. // or were somehow fulfilled. put our pooledResource back.
  232. this._addPooledResourceToAvailableObjects(pooledResource);
  233. // TODO: do need to trigger anything before we leave?
  234. return false;
  235. }
  236. const loan = new ResourceLoan(pooledResource, this._Promise);
  237. this._resourceLoans.set(pooledResource.obj, loan);
  238. pooledResource.allocate();
  239. clientResourceRequest.resolve(pooledResource.obj);
  240. return true;
  241. }
  242. /**
  243. * tracks on operation using given set
  244. * handles adding/removing from the set and resolve/rejects the value/reason
  245. * @param {Promise} operation
  246. * @param {Set} set Set holding operations
  247. * @return {Promise} Promise that resolves once operation has been removed from set
  248. */
  249. _trackOperation(operation, set) {
  250. set.add(operation);
  251. return operation.then(
  252. v => {
  253. set.delete(operation);
  254. return this._Promise.resolve(v);
  255. },
  256. e => {
  257. set.delete(operation);
  258. return this._Promise.reject(e);
  259. }
  260. );
  261. }
  262. /**
  263. * @private
  264. */
  265. _createResource() {
  266. // An attempt to create a resource
  267. const factoryPromise = this._factory.create();
  268. const wrappedFactoryPromise = this._Promise
  269. .resolve(factoryPromise)
  270. .then(resource => {
  271. const pooledResource = new PooledResource(resource);
  272. this._allObjects.add(pooledResource);
  273. this._addPooledResourceToAvailableObjects(pooledResource);
  274. });
  275. this._trackOperation(wrappedFactoryPromise, this._factoryCreateOperations)
  276. .then(() => {
  277. this._dispense();
  278. // Stop bluebird complaining about this side-effect only handler
  279. // - a promise was created in a handler but was not returned from it
  280. // https://goo.gl/rRqMUw
  281. return null;
  282. })
  283. .catch(reason => {
  284. this.emit(FACTORY_CREATE_ERROR, reason);
  285. this._dispense();
  286. });
  287. }
  288. /**
  289. * @private
  290. */
  291. _ensureMinimum() {
  292. if (this._draining === true) {
  293. return;
  294. }
  295. const minShortfall = this._config.min - this._count;
  296. for (let i = 0; i < minShortfall; i++) {
  297. this._createResource();
  298. }
  299. }
  300. _evict() {
  301. const testsToRun = Math.min(
  302. this._config.numTestsPerEvictionRun,
  303. this._availableObjects.length
  304. );
  305. const evictionConfig = {
  306. softIdleTimeoutMillis: this._config.softIdleTimeoutMillis,
  307. idleTimeoutMillis: this._config.idleTimeoutMillis,
  308. min: this._config.min
  309. };
  310. for (let testsHaveRun = 0; testsHaveRun < testsToRun; ) {
  311. const iterationResult = this._evictionIterator.next();
  312. // Safety check incase we could get stuck in infinite loop because we
  313. // somehow emptied the array after chekcing it's length
  314. if (iterationResult.done === true && this._availableObjects.length < 1) {
  315. this._evictionIterator.reset();
  316. return;
  317. }
  318. // if this happens it should just mean we reached the end of the
  319. // list and can reset the cursor.
  320. if (iterationResult.done === true && this._availableObjects.length > 0) {
  321. this._evictionIterator.reset();
  322. continue;
  323. }
  324. const resource = iterationResult.value;
  325. const shouldEvict = this._evictor.evict(
  326. evictionConfig,
  327. resource,
  328. this._availableObjects.length
  329. );
  330. testsHaveRun++;
  331. if (shouldEvict === true) {
  332. // take it out of the _availableObjects list
  333. this._evictionIterator.remove();
  334. this._destroy(resource);
  335. }
  336. }
  337. }
  338. _scheduleEvictorRun() {
  339. // Start eviction if set
  340. if (this._config.evictionRunIntervalMillis > 0) {
  341. // @ts-ignore
  342. this._scheduledEviction = setTimeout(() => {
  343. this._evict();
  344. this._scheduleEvictorRun();
  345. }, this._config.evictionRunIntervalMillis);
  346. }
  347. }
  348. _descheduleEvictorRun() {
  349. if (this._scheduledEviction) {
  350. clearTimeout(this._scheduledEviction);
  351. }
  352. this._scheduledEviction = null;
  353. }
  354. start() {
  355. if (this._draining === true) {
  356. return;
  357. }
  358. if (this._started === true) {
  359. return;
  360. }
  361. this._started = true;
  362. this._scheduleEvictorRun();
  363. this._ensureMinimum();
  364. }
  365. /**
  366. * Request a new resource. The callback will be called,
  367. * when a new resource is available, passing the resource to the callback.
  368. * TODO: should we add a seperate "acquireWithPriority" function
  369. *
  370. * @param {Number} [priority=0]
  371. * Optional. Integer between 0 and (priorityRange - 1). Specifies the priority
  372. * of the caller if there are no available resources. Lower numbers mean higher
  373. * priority.
  374. *
  375. * @returns {Promise}
  376. */
  377. acquire(priority) {
  378. if (this._started === false && this._config.autostart === false) {
  379. this.start();
  380. }
  381. if (this._draining) {
  382. return this._Promise.reject(
  383. new Error("pool is draining and cannot accept work")
  384. );
  385. }
  386. // TODO: should we defer this check till after this event loop incase "the situation" changes in the meantime
  387. if (
  388. this.spareResourceCapacity < 1 &&
  389. this._availableObjects.length < 1 &&
  390. this._config.maxWaitingClients !== undefined &&
  391. this._waitingClientsQueue.length >= this._config.maxWaitingClients
  392. ) {
  393. return this._Promise.reject(
  394. new Error("max waitingClients count exceeded")
  395. );
  396. }
  397. const resourceRequest = new ResourceRequest(
  398. this._config.acquireTimeoutMillis,
  399. this._Promise
  400. );
  401. this._waitingClientsQueue.enqueue(resourceRequest, priority);
  402. this._dispense();
  403. return resourceRequest.promise;
  404. }
  405. /**
  406. * [use method, aquires a resource, passes the resource to a user supplied function and releases it]
  407. * @param {Function} fn [a function that accepts a resource and returns a promise that resolves/rejects once it has finished using the resource]
  408. * @return {Promise} [resolves once the resource is released to the pool]
  409. */
  410. use(fn) {
  411. return this.acquire().then(resource => {
  412. return fn(resource).then(
  413. result => {
  414. this.release(resource);
  415. return result;
  416. },
  417. err => {
  418. this.release(resource);
  419. throw err;
  420. }
  421. );
  422. });
  423. }
  424. /**
  425. * Check if resource is currently on loan from the pool
  426. *
  427. * @param {Function} resource
  428. * Resource for checking.
  429. *
  430. * @returns {Boolean}
  431. * True if resource belongs to this pool and false otherwise
  432. */
  433. isBorrowedResource(resource) {
  434. return this._resourceLoans.has(resource);
  435. }
  436. /**
  437. * Return the resource to the pool when it is no longer required.
  438. *
  439. * @param {Object} resource
  440. * The acquired object to be put back to the pool.
  441. */
  442. release(resource) {
  443. // check for an outstanding loan
  444. const loan = this._resourceLoans.get(resource);
  445. if (loan === undefined) {
  446. return this._Promise.reject(
  447. new Error("Resource not currently part of this pool")
  448. );
  449. }
  450. this._resourceLoans.delete(resource);
  451. loan.resolve();
  452. const pooledResource = loan.pooledResource;
  453. pooledResource.deallocate();
  454. this._addPooledResourceToAvailableObjects(pooledResource);
  455. this._dispense();
  456. return this._Promise.resolve();
  457. }
  458. /**
  459. * Request the resource to be destroyed. The factory's destroy handler
  460. * will also be called.
  461. *
  462. * This should be called within an acquire() block as an alternative to release().
  463. *
  464. * @param {Object} resource
  465. * The acquired resource to be destoyed.
  466. */
  467. destroy(resource) {
  468. // check for an outstanding loan
  469. const loan = this._resourceLoans.get(resource);
  470. if (loan === undefined) {
  471. return this._Promise.reject(
  472. new Error("Resource not currently part of this pool")
  473. );
  474. }
  475. this._resourceLoans.delete(resource);
  476. loan.resolve();
  477. const pooledResource = loan.pooledResource;
  478. pooledResource.deallocate();
  479. this._destroy(pooledResource);
  480. this._dispense();
  481. return this._Promise.resolve();
  482. }
  483. _addPooledResourceToAvailableObjects(pooledResource) {
  484. pooledResource.idle();
  485. if (this._config.fifo === true) {
  486. this._availableObjects.push(pooledResource);
  487. } else {
  488. this._availableObjects.unshift(pooledResource);
  489. }
  490. }
  491. /**
  492. * Disallow any new acquire calls and let the request backlog dissapate.
  493. * The Pool will no longer attempt to maintain a "min" number of resources
  494. * and will only make new resources on demand.
  495. * Resolves once all resource requests are fulfilled and all resources are returned to pool and available...
  496. * Should probably be called "drain work"
  497. * @returns {Promise}
  498. */
  499. drain() {
  500. this._draining = true;
  501. return this.__allResourceRequestsSettled()
  502. .then(() => {
  503. return this.__allResourcesReturned();
  504. })
  505. .then(() => {
  506. this._descheduleEvictorRun();
  507. });
  508. }
  509. __allResourceRequestsSettled() {
  510. if (this._waitingClientsQueue.length > 0) {
  511. // wait for last waiting client to be settled
  512. // FIXME: what if they can "resolve" out of order....?
  513. return reflector(this._waitingClientsQueue.tail.promise);
  514. }
  515. return this._Promise.resolve();
  516. }
  517. // FIXME: this is a horrific mess
  518. __allResourcesReturned() {
  519. const ps = Array.from(this._resourceLoans.values())
  520. .map(loan => loan.promise)
  521. .map(reflector);
  522. return this._Promise.all(ps);
  523. }
  524. /**
  525. * Forcibly destroys all available resources regardless of timeout. Intended to be
  526. * invoked as part of a drain. Does not prevent the creation of new
  527. * resources as a result of subsequent calls to acquire.
  528. *
  529. * Note that if factory.min > 0 and the pool isn't "draining", the pool will destroy all idle resources
  530. * in the pool, but replace them with newly created resources up to the
  531. * specified factory.min value. If this is not desired, set factory.min
  532. * to zero before calling clear()
  533. *
  534. */
  535. clear() {
  536. const reflectedCreatePromises = Array.from(
  537. this._factoryCreateOperations
  538. ).map(reflector);
  539. // wait for outstanding factory.create to complete
  540. return this._Promise.all(reflectedCreatePromises).then(() => {
  541. // Destroy existing resources
  542. // @ts-ignore
  543. for (const resource of this._availableObjects) {
  544. this._destroy(resource);
  545. }
  546. const reflectedDestroyPromises = Array.from(
  547. this._factoryDestroyOperations
  548. ).map(reflector);
  549. return reflector(this._Promise.all(reflectedDestroyPromises));
  550. });
  551. }
  552. /**
  553. * How many resources are available to allocated
  554. * (includes resources that have not been tested and may faul validation)
  555. * NOTE: internal for now as the name is awful and might not be useful to anyone
  556. * @return {Number} number of resources the pool has to allocate
  557. */
  558. get _potentiallyAllocableResourceCount() {
  559. return (
  560. this._availableObjects.length +
  561. this._testOnBorrowResources.size +
  562. this._testOnReturnResources.size +
  563. this._factoryCreateOperations.size
  564. );
  565. }
  566. /**
  567. * The combined count of the currently created objects and those in the
  568. * process of being created
  569. * Does NOT include resources in the process of being destroyed
  570. * sort of legacy...
  571. * @return {Number}
  572. */
  573. get _count() {
  574. return this._allObjects.size + this._factoryCreateOperations.size;
  575. }
  576. /**
  577. * How many more resources does the pool have room for
  578. * @return {Number} number of resources the pool could create before hitting any limits
  579. */
  580. get spareResourceCapacity() {
  581. return (
  582. this._config.max -
  583. (this._allObjects.size + this._factoryCreateOperations.size)
  584. );
  585. }
  586. /**
  587. * see _count above
  588. * @return {Number} [description]
  589. */
  590. get size() {
  591. return this._count;
  592. }
  593. /**
  594. * number of available resources
  595. * @return {Number} [description]
  596. */
  597. get available() {
  598. return this._availableObjects.length;
  599. }
  600. /**
  601. * number of resources that are currently acquired
  602. * @return {Number} [description]
  603. */
  604. get borrowed() {
  605. return this._resourceLoans.size;
  606. }
  607. /**
  608. * number of waiting acquire calls
  609. * @return {Number} [description]
  610. */
  611. get pending() {
  612. return this._waitingClientsQueue.length;
  613. }
  614. /**
  615. * maximum size of the pool
  616. * @return {Number} [description]
  617. */
  618. get max() {
  619. return this._config.max;
  620. }
  621. /**
  622. * minimum size of the pool
  623. * @return {Number} [description]
  624. */
  625. get min() {
  626. return this._config.min;
  627. }
  628. }
  629. module.exports = Pool;