|
- "use strict";
- const EventEmitter = require("events").EventEmitter;
- const factoryValidator = require("./factoryValidator");
- const PoolOptions = require("./PoolOptions");
- const ResourceRequest = require("./ResourceRequest");
- const ResourceLoan = require("./ResourceLoan");
- const PooledResource = require("./PooledResource");
- const DefaultEvictor = require("./DefaultEvictor");
- const Deque = require("./Deque");
- const Deferred = require("./Deferred");
- const PriorityQueue = require("./PriorityQueue");
- const DequeIterator = require("./DequeIterator");
- const reflector = require("./utils").reflector;
- /**
- * TODO: move me
- */
- const FACTORY_CREATE_ERROR = "factoryCreateError";
- const FACTORY_DESTROY_ERROR = "factoryDestroyError";
- class Pool extends EventEmitter {
- /**
- * Generate an Object pool with a specified `factory` and `config`.
- *
- * @param {typeof DefaultEvictor} Evictor
- * @param {typeof Deque} Deque
- * @param {typeof PriorityQueue} PriorityQueue
- * @param {Object} factory
- * Factory to be used for generating and destroying the items.
- * @param {Function} factory.create
- * Should create the item to be acquired,
- * and call it's first callback argument with the generated item as it's argument.
- * @param {Function} factory.destroy
- * Should gently close any resources that the item is using.
- * Called before the items is destroyed.
- * @param {Function} factory.validate
- * Test if a resource is still valid .Should return a promise that resolves to a boolean, true if resource is still valid and false
- * If it should be removed from pool.
- * @param {Object} options
- */
- constructor(Evictor, Deque, PriorityQueue, factory, options) {
- super();
- factoryValidator(factory);
- this._config = new PoolOptions(options);
- // TODO: fix up this ugly glue-ing
- this._Promise = this._config.Promise;
- this._factory = factory;
- this._draining = false;
- this._started = false;
- /**
- * Holds waiting clients
- * @type {PriorityQueue}
- */
- this._waitingClientsQueue = new PriorityQueue(this._config.priorityRange);
- /**
- * Collection of promises for resource creation calls made by the pool to factory.create
- * @type {Set}
- */
- this._factoryCreateOperations = new Set();
- /**
- * Collection of promises for resource destruction calls made by the pool to factory.destroy
- * @type {Set}
- */
- this._factoryDestroyOperations = new Set();
- /**
- * A queue/stack of pooledResources awaiting acquisition
- * TODO: replace with LinkedList backed array
- * @type {Deque}
- */
- this._availableObjects = new Deque();
- /**
- * Collection of references for any resource that are undergoing validation before being acquired
- * @type {Set}
- */
- this._testOnBorrowResources = new Set();
- /**
- * Collection of references for any resource that are undergoing validation before being returned
- * @type {Set}
- */
- this._testOnReturnResources = new Set();
- /**
- * Collection of promises for any validations currently in process
- * @type {Set}
- */
- this._validationOperations = new Set();
- /**
- * All objects associated with this pool in any state (except destroyed)
- * @type {Set}
- */
- this._allObjects = new Set();
- /**
- * Loans keyed by the borrowed resource
- * @type {Map}
- */
- this._resourceLoans = new Map();
- /**
- * Infinitely looping iterator over available object
- * @type {DequeIterator}
- */
- this._evictionIterator = this._availableObjects.iterator();
- this._evictor = new Evictor();
- /**
- * handle for setTimeout for next eviction run
- * @type {(number|null)}
- */
- this._scheduledEviction = null;
- // create initial resources (if factory.min > 0)
- if (this._config.autostart === true) {
- this.start();
- }
- }
- _destroy(pooledResource) {
- // FIXME: do we need another state for "in destruction"?
- pooledResource.invalidate();
- this._allObjects.delete(pooledResource);
- // NOTE: this maybe very bad promise usage?
- const destroyPromise = this._factory.destroy(pooledResource.obj);
- const wrappedDestroyPromise = this._Promise.resolve(destroyPromise);
- this._trackOperation(
- wrappedDestroyPromise,
- this._factoryDestroyOperations
- ).catch(reason => {
- this.emit(FACTORY_DESTROY_ERROR, reason);
- });
- // TODO: maybe ensuring minimum pool size should live outside here
- this._ensureMinimum();
- }
- /**
- * Attempt to move an available resource into test and then onto a waiting client
- * @return {Boolean} could we move an available resource into test
- */
- _testOnBorrow() {
- if (this._availableObjects.length < 1) {
- return false;
- }
- const pooledResource = this._availableObjects.shift();
- // Mark the resource as in test
- pooledResource.test();
- this._testOnBorrowResources.add(pooledResource);
- const validationPromise = this._factory.validate(pooledResource.obj);
- const wrappedValidationPromise = this._Promise.resolve(validationPromise);
- this._trackOperation(
- wrappedValidationPromise,
- this._validationOperations
- ).then(isValid => {
- this._testOnBorrowResources.delete(pooledResource);
- if (isValid === false) {
- pooledResource.invalidate();
- this._destroy(pooledResource);
- this._dispense();
- return;
- }
- this._dispatchPooledResourceToNextWaitingClient(pooledResource);
- });
- return true;
- }
- /**
- * Attempt to move an available resource to a waiting client
- * @return {Boolean} [description]
- */
- _dispatchResource() {
- if (this._availableObjects.length < 1) {
- return false;
- }
- const pooledResource = this._availableObjects.shift();
- this._dispatchPooledResourceToNextWaitingClient(pooledResource);
- return false;
- }
- /**
- * Attempt to resolve an outstanding resource request using an available resource from
- * the pool, or creating new ones
- *
- * @private
- */
- _dispense() {
- /**
- * Local variables for ease of reading/writing
- * these don't (shouldn't) change across the execution of this fn
- */
- const numWaitingClients = this._waitingClientsQueue.length;
- // If there aren't any waiting requests then there is nothing to do
- // so lets short-circuit
- if (numWaitingClients < 1) {
- return;
- }
- const resourceShortfall =
- numWaitingClients - this._potentiallyAllocableResourceCount;
- const actualNumberOfResourcesToCreate = Math.min(
- this.spareResourceCapacity,
- resourceShortfall
- );
- for (let i = 0; actualNumberOfResourcesToCreate > i; i++) {
- this._createResource();
- }
- // If we are doing test-on-borrow see how many more resources need to be moved into test
- // to help satisfy waitingClients
- if (this._config.testOnBorrow === true) {
- // how many available resources do we need to shift into test
- const desiredNumberOfResourcesToMoveIntoTest =
- numWaitingClients - this._testOnBorrowResources.size;
- const actualNumberOfResourcesToMoveIntoTest = Math.min(
- this._availableObjects.length,
- desiredNumberOfResourcesToMoveIntoTest
- );
- for (let i = 0; actualNumberOfResourcesToMoveIntoTest > i; i++) {
- this._testOnBorrow();
- }
- }
- // if we aren't testing-on-borrow then lets try to allocate what we can
- if (this._config.testOnBorrow === false) {
- const actualNumberOfResourcesToDispatch = Math.min(
- this._availableObjects.length,
- numWaitingClients
- );
- for (let i = 0; actualNumberOfResourcesToDispatch > i; i++) {
- this._dispatchResource();
- }
- }
- }
- /**
- * Dispatches a pooledResource to the next waiting client (if any) else
- * puts the PooledResource back on the available list
- * @param {PooledResource} pooledResource [description]
- * @return {Boolean} [description]
- */
- _dispatchPooledResourceToNextWaitingClient(pooledResource) {
- const clientResourceRequest = this._waitingClientsQueue.dequeue();
- if (
- clientResourceRequest === undefined ||
- clientResourceRequest.state !== Deferred.PENDING
- ) {
- // While we were away either all the waiting clients timed out
- // or were somehow fulfilled. put our pooledResource back.
- this._addPooledResourceToAvailableObjects(pooledResource);
- // TODO: do need to trigger anything before we leave?
- return false;
- }
- const loan = new ResourceLoan(pooledResource, this._Promise);
- this._resourceLoans.set(pooledResource.obj, loan);
- pooledResource.allocate();
- clientResourceRequest.resolve(pooledResource.obj);
- return true;
- }
- /**
- * tracks on operation using given set
- * handles adding/removing from the set and resolve/rejects the value/reason
- * @param {Promise} operation
- * @param {Set} set Set holding operations
- * @return {Promise} Promise that resolves once operation has been removed from set
- */
- _trackOperation(operation, set) {
- set.add(operation);
- return operation.then(
- v => {
- set.delete(operation);
- return this._Promise.resolve(v);
- },
- e => {
- set.delete(operation);
- return this._Promise.reject(e);
- }
- );
- }
- /**
- * @private
- */
- _createResource() {
- // An attempt to create a resource
- const factoryPromise = this._factory.create();
- const wrappedFactoryPromise = this._Promise
- .resolve(factoryPromise)
- .then(resource => {
- const pooledResource = new PooledResource(resource);
- this._allObjects.add(pooledResource);
- this._addPooledResourceToAvailableObjects(pooledResource);
- });
- this._trackOperation(wrappedFactoryPromise, this._factoryCreateOperations)
- .then(() => {
- this._dispense();
- // Stop bluebird complaining about this side-effect only handler
- // - a promise was created in a handler but was not returned from it
- // https://goo.gl/rRqMUw
- return null;
- })
- .catch(reason => {
- this.emit(FACTORY_CREATE_ERROR, reason);
- this._dispense();
- });
- }
- /**
- * @private
- */
- _ensureMinimum() {
- if (this._draining === true) {
- return;
- }
- const minShortfall = this._config.min - this._count;
- for (let i = 0; i < minShortfall; i++) {
- this._createResource();
- }
- }
- _evict() {
- const testsToRun = Math.min(
- this._config.numTestsPerEvictionRun,
- this._availableObjects.length
- );
- const evictionConfig = {
- softIdleTimeoutMillis: this._config.softIdleTimeoutMillis,
- idleTimeoutMillis: this._config.idleTimeoutMillis,
- min: this._config.min
- };
- for (let testsHaveRun = 0; testsHaveRun < testsToRun; ) {
- const iterationResult = this._evictionIterator.next();
- // Safety check incase we could get stuck in infinite loop because we
- // somehow emptied the array after chekcing it's length
- if (iterationResult.done === true && this._availableObjects.length < 1) {
- this._evictionIterator.reset();
- return;
- }
- // if this happens it should just mean we reached the end of the
- // list and can reset the cursor.
- if (iterationResult.done === true && this._availableObjects.length > 0) {
- this._evictionIterator.reset();
- continue;
- }
- const resource = iterationResult.value;
- const shouldEvict = this._evictor.evict(
- evictionConfig,
- resource,
- this._availableObjects.length
- );
- testsHaveRun++;
- if (shouldEvict === true) {
- // take it out of the _availableObjects list
- this._evictionIterator.remove();
- this._destroy(resource);
- }
- }
- }
- _scheduleEvictorRun() {
- // Start eviction if set
- if (this._config.evictionRunIntervalMillis > 0) {
- // @ts-ignore
- this._scheduledEviction = setTimeout(() => {
- this._evict();
- this._scheduleEvictorRun();
- }, this._config.evictionRunIntervalMillis);
- }
- }
- _descheduleEvictorRun() {
- if (this._scheduledEviction) {
- clearTimeout(this._scheduledEviction);
- }
- this._scheduledEviction = null;
- }
- start() {
- if (this._draining === true) {
- return;
- }
- if (this._started === true) {
- return;
- }
- this._started = true;
- this._scheduleEvictorRun();
- this._ensureMinimum();
- }
- /**
- * Request a new resource. The callback will be called,
- * when a new resource is available, passing the resource to the callback.
- * TODO: should we add a seperate "acquireWithPriority" function
- *
- * @param {Number} [priority=0]
- * Optional. Integer between 0 and (priorityRange - 1). Specifies the priority
- * of the caller if there are no available resources. Lower numbers mean higher
- * priority.
- *
- * @returns {Promise}
- */
- acquire(priority) {
- if (this._started === false && this._config.autostart === false) {
- this.start();
- }
- if (this._draining) {
- return this._Promise.reject(
- new Error("pool is draining and cannot accept work")
- );
- }
- // TODO: should we defer this check till after this event loop incase "the situation" changes in the meantime
- if (
- this.spareResourceCapacity < 1 &&
- this._availableObjects.length < 1 &&
- this._config.maxWaitingClients !== undefined &&
- this._waitingClientsQueue.length >= this._config.maxWaitingClients
- ) {
- return this._Promise.reject(
- new Error("max waitingClients count exceeded")
- );
- }
- const resourceRequest = new ResourceRequest(
- this._config.acquireTimeoutMillis,
- this._Promise
- );
- this._waitingClientsQueue.enqueue(resourceRequest, priority);
- this._dispense();
- return resourceRequest.promise;
- }
- /**
- * [use method, aquires a resource, passes the resource to a user supplied function and releases it]
- * @param {Function} fn [a function that accepts a resource and returns a promise that resolves/rejects once it has finished using the resource]
- * @return {Promise} [resolves once the resource is released to the pool]
- */
- use(fn) {
- return this.acquire().then(resource => {
- return fn(resource).then(
- result => {
- this.release(resource);
- return result;
- },
- err => {
- this.release(resource);
- throw err;
- }
- );
- });
- }
- /**
- * Check if resource is currently on loan from the pool
- *
- * @param {Function} resource
- * Resource for checking.
- *
- * @returns {Boolean}
- * True if resource belongs to this pool and false otherwise
- */
- isBorrowedResource(resource) {
- return this._resourceLoans.has(resource);
- }
- /**
- * Return the resource to the pool when it is no longer required.
- *
- * @param {Object} resource
- * The acquired object to be put back to the pool.
- */
- release(resource) {
- // check for an outstanding loan
- const loan = this._resourceLoans.get(resource);
- if (loan === undefined) {
- return this._Promise.reject(
- new Error("Resource not currently part of this pool")
- );
- }
- this._resourceLoans.delete(resource);
- loan.resolve();
- const pooledResource = loan.pooledResource;
- pooledResource.deallocate();
- this._addPooledResourceToAvailableObjects(pooledResource);
- this._dispense();
- return this._Promise.resolve();
- }
- /**
- * Request the resource to be destroyed. The factory's destroy handler
- * will also be called.
- *
- * This should be called within an acquire() block as an alternative to release().
- *
- * @param {Object} resource
- * The acquired resource to be destoyed.
- */
- destroy(resource) {
- // check for an outstanding loan
- const loan = this._resourceLoans.get(resource);
- if (loan === undefined) {
- return this._Promise.reject(
- new Error("Resource not currently part of this pool")
- );
- }
- this._resourceLoans.delete(resource);
- loan.resolve();
- const pooledResource = loan.pooledResource;
- pooledResource.deallocate();
- this._destroy(pooledResource);
- this._dispense();
- return this._Promise.resolve();
- }
- _addPooledResourceToAvailableObjects(pooledResource) {
- pooledResource.idle();
- if (this._config.fifo === true) {
- this._availableObjects.push(pooledResource);
- } else {
- this._availableObjects.unshift(pooledResource);
- }
- }
- /**
- * Disallow any new acquire calls and let the request backlog dissapate.
- * The Pool will no longer attempt to maintain a "min" number of resources
- * and will only make new resources on demand.
- * Resolves once all resource requests are fulfilled and all resources are returned to pool and available...
- * Should probably be called "drain work"
- * @returns {Promise}
- */
- drain() {
- this._draining = true;
- return this.__allResourceRequestsSettled()
- .then(() => {
- return this.__allResourcesReturned();
- })
- .then(() => {
- this._descheduleEvictorRun();
- });
- }
- __allResourceRequestsSettled() {
- if (this._waitingClientsQueue.length > 0) {
- // wait for last waiting client to be settled
- // FIXME: what if they can "resolve" out of order....?
- return reflector(this._waitingClientsQueue.tail.promise);
- }
- return this._Promise.resolve();
- }
- // FIXME: this is a horrific mess
- __allResourcesReturned() {
- const ps = Array.from(this._resourceLoans.values())
- .map(loan => loan.promise)
- .map(reflector);
- return this._Promise.all(ps);
- }
- /**
- * Forcibly destroys all available resources regardless of timeout. Intended to be
- * invoked as part of a drain. Does not prevent the creation of new
- * resources as a result of subsequent calls to acquire.
- *
- * Note that if factory.min > 0 and the pool isn't "draining", the pool will destroy all idle resources
- * in the pool, but replace them with newly created resources up to the
- * specified factory.min value. If this is not desired, set factory.min
- * to zero before calling clear()
- *
- */
- clear() {
- const reflectedCreatePromises = Array.from(
- this._factoryCreateOperations
- ).map(reflector);
- // wait for outstanding factory.create to complete
- return this._Promise.all(reflectedCreatePromises).then(() => {
- // Destroy existing resources
- // @ts-ignore
- for (const resource of this._availableObjects) {
- this._destroy(resource);
- }
- const reflectedDestroyPromises = Array.from(
- this._factoryDestroyOperations
- ).map(reflector);
- return reflector(this._Promise.all(reflectedDestroyPromises));
- });
- }
- /**
- * How many resources are available to allocated
- * (includes resources that have not been tested and may faul validation)
- * NOTE: internal for now as the name is awful and might not be useful to anyone
- * @return {Number} number of resources the pool has to allocate
- */
- get _potentiallyAllocableResourceCount() {
- return (
- this._availableObjects.length +
- this._testOnBorrowResources.size +
- this._testOnReturnResources.size +
- this._factoryCreateOperations.size
- );
- }
- /**
- * The combined count of the currently created objects and those in the
- * process of being created
- * Does NOT include resources in the process of being destroyed
- * sort of legacy...
- * @return {Number}
- */
- get _count() {
- return this._allObjects.size + this._factoryCreateOperations.size;
- }
- /**
- * How many more resources does the pool have room for
- * @return {Number} number of resources the pool could create before hitting any limits
- */
- get spareResourceCapacity() {
- return (
- this._config.max -
- (this._allObjects.size + this._factoryCreateOperations.size)
- );
- }
- /**
- * see _count above
- * @return {Number} [description]
- */
- get size() {
- return this._count;
- }
- /**
- * number of available resources
- * @return {Number} [description]
- */
- get available() {
- return this._availableObjects.length;
- }
- /**
- * number of resources that are currently acquired
- * @return {Number} [description]
- */
- get borrowed() {
- return this._resourceLoans.size;
- }
- /**
- * number of waiting acquire calls
- * @return {Number} [description]
- */
- get pending() {
- return this._waitingClientsQueue.length;
- }
- /**
- * maximum size of the pool
- * @return {Number} [description]
- */
- get max() {
- return this._config.max;
- }
- /**
- * minimum size of the pool
- * @return {Number} [description]
- */
- get min() {
- return this._config.min;
- }
- }
- module.exports = Pool;
|