/** * @class * @private */ function PriorityQueue (size) { if (!(this instanceof PriorityQueue)) { return new PriorityQueue() } this._size = size this._slots = null this._total = null // initialize arrays to hold queue elements size = Math.max(+size | 0, 1) this._slots = [] for (var i = 0; i < size; i += 1) { this._slots.push([]) } } PriorityQueue.prototype.size = function size () { if (this._total === null) { this._total = 0 for (var i = 0; i < this._size; i += 1) { this._total += this._slots[i].length } } return this._total } PriorityQueue.prototype.enqueue = function enqueue (obj, priority) { var priorityOrig // Convert to integer with a default value of 0. priority = priority && +priority | 0 || 0 // Clear cache for total. this._total = null if (priority) { priorityOrig = priority if (priority < 0 || priority >= this._size) { priority = (this._size - 1) // put obj at the end of the line console.error('invalid priority: ' + priorityOrig + ' must be between 0 and ' + priority) } } this._slots[priority].push(obj) } PriorityQueue.prototype.dequeue = function dequeue (callback) { var obj = null // Clear cache for total. this._total = null for (var i = 0, sl = this._slots.length; i < sl; i += 1) { if (this._slots[i].length) { obj = this._slots[i].shift() break } } return obj } function doWhileAsync (conditionFn, iterateFn, callbackFn) { var next = function () { if (conditionFn()) { iterateFn(next) } else { callbackFn() } } next() } /** * Generate an Object pool with a specified `factory`. * * @class * @param {Object} factory * Factory to be used for generating and destorying the items. * @param {String} factory.name * Name of the factory. Serves only logging purposes. * @param {Function} factory.create * Should create the item to be acquired, * and call it's first callback argument with the generated item as it's argument. * @param {Function} factory.destroy * Should gently close any resources that the item is using. * Called before the items is destroyed. * @param {Function} factory.validate * Should return true if connection is still valid and false * If it should be removed from pool. Called before item is * acquired from pool. * @param {Function} factory.validateAsync * Asynchronous validate function. Receives a callback function * as its second argument, that should be called with a single * boolean argument being true if the item is still valid and false * if it should be removed from pool. Called before item is * acquired from pool. Only one of validate/validateAsync may be specified * @param {Number} factory.max * Maximum number of items that can exist at the same time. Default: 1. * Any further acquire requests will be pushed to the waiting list. * @param {Number} factory.min * Minimum number of items in pool (including in-use). Default: 0. * When the pool is created, or a resource destroyed, this minimum will * be checked. If the pool resource count is below the minimum, a new * resource will be created and added to the pool. * @param {Number} factory.idleTimeoutMillis * Delay in milliseconds after the idle items in the pool will be destroyed. * And idle item is that is not acquired yet. Waiting items doesn't count here. * @param {Number} factory.reapIntervalMillis * Cleanup is scheduled in every `factory.reapIntervalMillis` milliseconds. * @param {Boolean|Function} factory.log * Whether the pool should log activity. If function is specified, * that will be used instead. The function expects the arguments msg, loglevel * @param {Number} factory.priorityRange * The range from 1 to be treated as a valid priority * @param {RefreshIdle} factory.refreshIdle * Should idle resources be destroyed and recreated every idleTimeoutMillis? Default: true. * @param {Bool} [factory.returnToHead=false] * Returns released object to head of available objects list */ function Pool (factory) { if (!(this instanceof Pool)) { return new Pool(factory) } if (factory.validate && factory.validateAsync) { throw new Error('Only one of validate or validateAsync may be specified') } // defaults factory.idleTimeoutMillis = factory.idleTimeoutMillis || 30000 factory.returnToHead = factory.returnToHead || false factory.refreshIdle = ('refreshIdle' in factory) ? factory.refreshIdle : true factory.reapInterval = factory.reapIntervalMillis || 1000 factory.priorityRange = factory.priorityRange || 1 factory.validate = factory.validate || function () { return true } factory.max = parseInt(factory.max, 10) factory.min = parseInt(factory.min, 10) factory.max = Math.max(isNaN(factory.max) ? 1 : factory.max, 1) factory.min = Math.min(isNaN(factory.min) ? 0 : factory.min, factory.max - 1) this._factory = factory this._inUseObjects = [] this._draining = false this._waitingClients = new PriorityQueue(factory.priorityRange) this._availableObjects = [] this._count = 0 this._removeIdleTimer = null this._removeIdleScheduled = false // create initial resources (if factory.min > 0) this._ensureMinimum() } /** * logs to console or user defined log function * @private * @param {string} str * @param {string} level */ Pool.prototype._log = function log (str, level) { if (typeof this._factory.log === 'function') { this._factory.log(str, level) } else if (this._factory.log) { console.log(level.toUpperCase() + ' pool ' + this._factory.name + ' - ' + str) } } /** * Request the client to be destroyed. The factory's destroy handler * will also be called. * * This should be called within an acquire() block as an alternative to release(). * * @param {Object} obj * The acquired item to be destoyed. */ Pool.prototype.destroy = function destroy (obj) { this._count -= 1 if (this._count < 0) this._count = 0 this._availableObjects = this._availableObjects.filter(function (objWithTimeout) { return (objWithTimeout.obj !== obj) }) this._inUseObjects = this._inUseObjects.filter(function (objInUse) { return (objInUse !== obj) }) this._factory.destroy(obj) this._ensureMinimum() } /** * Checks and removes the available (idle) clients that have timed out. * @private */ Pool.prototype._removeIdle = function removeIdle () { var toRemove = [] var now = new Date().getTime() var i var al var tr var timeout this._removeIdleScheduled = false // Go through the available (idle) items, // check if they have timed out for (i = 0, al = this._availableObjects.length; i < al && (this._factory.refreshIdle && (this._count - this._factory.min > toRemove.length)); i += 1) { timeout = this._availableObjects[i].timeout if (now >= timeout) { // Client timed out, so destroy it. this._log('removeIdle() destroying obj - now:' + now + ' timeout:' + timeout, 'verbose') toRemove.push(this._availableObjects[i].obj) } } for (i = 0, tr = toRemove.length; i < tr; i += 1) { this.destroy(toRemove[i]) } // Replace the available items with the ones to keep. al = this._availableObjects.length if (al > 0) { this._log('this._availableObjects.length=' + al, 'verbose') this._scheduleRemoveIdle() } else { this._log('removeIdle() all objects removed', 'verbose') } } /** * Schedule removal of idle items in the pool. * * More schedules cannot run concurrently. */ Pool.prototype._scheduleRemoveIdle = function scheduleRemoveIdle () { var self = this if (!this._removeIdleScheduled) { this._removeIdleScheduled = true this._removeIdleTimer = setTimeout(function () { self._removeIdle() }, this._factory.reapInterval) } } /** * Try to get a new client to work, and clean up pool unused (idle) items. * * - If there are available clients waiting, shift the first one out (LIFO), * and call its callback. * - If there are no waiting clients, try to create one if it won't exceed * the maximum number of clients. * - If creating a new client would exceed the maximum, add the client to * the wait list. * @private */ Pool.prototype._dispense = function dispense () { var self = this var objWithTimeout = null var err = null var clientCb = null var waitingCount = this._waitingClients.size() this._log('dispense() clients=' + waitingCount + ' available=' + this._availableObjects.length, 'info') if (waitingCount > 0) { if (this._factory.validateAsync) { doWhileAsync(function () { return self._availableObjects.length > 0 }, function (next) { self._log('dispense() - reusing obj', 'verbose') objWithTimeout = self._availableObjects[0] self._factory.validateAsync(objWithTimeout.obj, function (valid) { if (!valid) { self.destroy(objWithTimeout.obj) next() } else { self._availableObjects.shift() self._inUseObjects.push(objWithTimeout.obj) clientCb = self._waitingClients.dequeue() clientCb(err, objWithTimeout.obj) } }) }, function () { if (self._count < self._factory.max) { self._createResource() } }) return } while (this._availableObjects.length > 0) { this._log('dispense() - reusing obj', 'verbose') objWithTimeout = this._availableObjects[0] if (!this._factory.validate(objWithTimeout.obj)) { this.destroy(objWithTimeout.obj) continue } this._availableObjects.shift() this._inUseObjects.push(objWithTimeout.obj) clientCb = this._waitingClients.dequeue() return clientCb(err, objWithTimeout.obj) } if (this._count < this._factory.max) { this._createResource() } } } /** * @private */ Pool.prototype._createResource = function _createResource () { this._count += 1 this._log('createResource() - creating obj - count=' + this._count + ' min=' + this._factory.min + ' max=' + this._factory.max, 'verbose') var self = this this._factory.create(function () { var err, obj var clientCb = self._waitingClients.dequeue() if (arguments.length > 1) { err = arguments[0] obj = arguments[1] } else { err = (arguments[0] instanceof Error) ? arguments[0] : null obj = (arguments[0] instanceof Error) ? null : arguments[0] } if (err) { self._count -= 1 if (self._count < 0) self._count = 0 if (clientCb) { clientCb(err, obj) } process.nextTick(function () { self._dispense() }) } else { self._inUseObjects.push(obj) if (clientCb) { clientCb(err, obj) } else { self.release(obj) } } }) } /** * @private */ Pool.prototype._ensureMinimum = function _ensureMinimum () { var i, diff if (!this._draining && (this._count < this._factory.min)) { diff = this._factory.min - this._count for (i = 0; i < diff; i++) { this._createResource() } } } /** * Request a new client. The callback will be called, * when a new client will be availabe, passing the client to it. * * @param {Function} callback * Callback function to be called after the acquire is successful. * The function will receive the acquired item as the first parameter. * * @param {Number} priority * Optional. Integer between 0 and (priorityRange - 1). Specifies the priority * of the caller if there are no available resources. Lower numbers mean higher * priority. * * @returns {boolean} `true` if the pool is not fully utilized, `false` otherwise. */ Pool.prototype.acquire = function acquire (callback, priority) { if (this._draining) { throw new Error('pool is draining and cannot accept work') } if (process.domain) { callback = process.domain.bind(callback) } this._waitingClients.enqueue(callback, priority) this._dispense() return (this._count < this._factory.max) } /** * @deprecated */ Pool.prototype.borrow = function borrow (callback, priority) { this._log('borrow() is deprecated. use acquire() instead', 'warn') this.acquire(callback, priority) } /** * Return the client to the pool, in case it is no longer required. * * @param {Object} obj * The acquired object to be put back to the pool. */ Pool.prototype.release = function release (obj) { // check to see if this object has already been released (i.e., is back in the pool of this._availableObjects) if (this._availableObjects.some(function (objWithTimeout) { return (objWithTimeout.obj === obj) })) { this._log('release called twice for the same resource: ' + (new Error().stack), 'error') return } // check to see if this object exists in the `in use` list and remove it var index = this._inUseObjects.indexOf(obj) if (index < 0) { this._log('attempt to release an invalid resource: ' + (new Error().stack), 'error') return } // this._log("return to pool") this._inUseObjects.splice(index, 1) var objWithTimeout = { obj: obj, timeout: (new Date().getTime() + this._factory.idleTimeoutMillis) } if (this._factory.returnToHead) { this._availableObjects.splice(0, 0, objWithTimeout) } else { this._availableObjects.push(objWithTimeout) } this._log('timeout: ' + objWithTimeout.timeout, 'verbose') this._dispense() this._scheduleRemoveIdle() } /** * @deprecated */ Pool.prototype.returnToPool = function returnToPool (obj) { this._log('returnToPool() is deprecated. use release() instead', 'warn') this.release(obj) } /** * Disallow any new requests and let the request backlog dissapate. * * @param {Function} callback * Optional. Callback invoked when all work is done and all clients have been * released. */ Pool.prototype.drain = function drain (callback) { this._log('draining', 'info') // disable the ability to put more work on the queue. this._draining = true var self = this var check = function () { if (self._waitingClients.size() > 0) { // wait until all client requests have been satisfied. setTimeout(check, 100) } else if (self._availableObjects.length !== self._count) { // wait until all objects have been released. setTimeout(check, 100) } else if (callback) { callback() } } check() } /** * Forcibly destroys all clients regardless of timeout. Intended to be * invoked as part of a drain. Does not prevent the creation of new * clients as a result of subsequent calls to acquire. * * Note that if factory.min > 0, the pool will destroy all idle resources * in the pool, but replace them with newly created resources up to the * specified factory.min value. If this is not desired, set factory.min * to zero before calling destroyAllNow() * * @param {Function} callback * Optional. Callback invoked after all existing clients are destroyed. */ Pool.prototype.destroyAllNow = function destroyAllNow (callback) { this._log('force destroying all objects', 'info') var willDie = this._availableObjects this._availableObjects = [] var obj = willDie.shift() while (obj !== null && obj !== undefined) { this.destroy(obj.obj) obj = willDie.shift() } this._removeIdleScheduled = false clearTimeout(this._removeIdleTimer) if (callback) { callback() } } /** * Decorates a function to use a acquired client from the object pool when called. * * @param {Function} decorated * The decorated function, accepting a client as the first argument and * (optionally) a callback as the final argument. * * @param {Number} priority * Optional. Integer between 0 and (priorityRange - 1). Specifies the priority * of the caller if there are no available resources. Lower numbers mean higher * priority. */ Pool.prototype.pooled = function pooled (decorated, priority) { var self = this return function () { var callerArgs = arguments var callerCallback = callerArgs[callerArgs.length - 1] var callerHasCallback = typeof callerCallback === 'function' self.acquire(function (err, client) { if (err) { if (callerHasCallback) { callerCallback(err) } return } var args = [client].concat(Array.prototype.slice.call(callerArgs, 0, callerHasCallback ? -1 : undefined)) args.push(function () { self.release(client) if (callerHasCallback) { callerCallback.apply(null, arguments) } }) decorated.apply(null, args) }, priority) } } Pool.prototype.getPoolSize = function getPoolSize () { return this._count } Pool.prototype.getName = function getName () { return this._factory.name } Pool.prototype.availableObjectsCount = function availableObjectsCount () { return this._availableObjects.length } Pool.prototype.inUseObjectsCount = function inUseObjectsCount () { return this._inUseObjects.length } Pool.prototype.waitingClientsCount = function waitingClientsCount () { return this._waitingClients.size() } Pool.prototype.getMaxPoolSize = function getMaxPoolSize () { return this._factory.max } Pool.prototype.getMinPoolSize = function getMinPoolSize () { return this._factory.min } exports.Pool = Pool