// Pool.js (9.9 KB)
  'use strict';

  // Project-local collaborators used by the pool implementation in this file.
  const Resource = require('./Resource').Resource;
  const PendingOperation = require('./PendingOperation').PendingOperation;
  const now = require('./utils').now;
  const duration = require('./utils').duration;
  const checkOptionalTime = require('./utils').checkOptionalTime;
  const delay = require('./utils').delay;
  const reflect = require('./utils').reflect;
  const tryPromise = require('./utils').tryPromise;
  /**
   * Resource pool.
   *
   * The pool keeps four lists: resources currently handed out (`used`),
   * resources ready to be handed out (`free`), creations in flight
   * (`pendingCreates`) and callers waiting for a resource (`pendingAcquires`).
   * Resources are created lazily through the user supplied `create` function
   * and disposed of through the user supplied `destroy` function.
   */
  class Pool {
    /**
     * @param {Object} opt
     * @param {Function} opt.create - Creates a resource. Invoked through
     *   `callbackOrPromise`, so it may call the node-style callback it receives
     *   or return a (promise of a) truthy resource.
     * @param {Function} opt.destroy - Called with a resource that is being
     *   discarded.
     * @param {Function} [opt.validate] - Called with a free resource before it
     *   is handed out; a falsy result (or a throw) discards the resource.
     *   Defaults to a function that always returns true.
     * @param {Function} [opt.log] - Called as `log(message, 'warn')`. Defaults
     *   to a no-op.
     * @param {number} opt.min - Integer >= 0.
     * @param {number} opt.max - Integer > 0 and >= `opt.min`.
     * @param {number} [opt.acquireTimeoutMillis=30000]
     * @param {number} [opt.createTimeoutMillis=30000]
     * @param {number} [opt.idleTimeoutMillis=30000]
     * @param {number} [opt.reapIntervalMillis=1000]
     * @param {number} [opt.createRetryIntervalMillis=200]
     * @param {boolean} [opt.propagateCreateError=false] - When truthy, a failed
     *   create rejects the first pending acquire immediately.
     * @throws {Error} When a required option is missing or an option is invalid.
     */
    constructor(opt) {
      opt = opt || {};

      if (!opt.create) {
        throw new Error('Tarn: opt.create function most be provided');
      }
      if (!opt.destroy) {
        throw new Error('Tarn: opt.destroy function most be provided');
      }
      if (typeof opt.min !== 'number' || opt.min < 0 || opt.min !== Math.round(opt.min)) {
        throw new Error('Tarn: opt.min must be an integer >= 0');
      }
      if (typeof opt.max !== 'number' || opt.max <= 0 || opt.max !== Math.round(opt.max)) {
        throw new Error('Tarn: opt.max must be an integer > 0');
      }
      if (opt.min > opt.max) {
        throw new Error('Tarn: opt.max is smaller than opt.min');
      }

      // All optional time settings are validated the same way. The order is
      // significant: the first invalid option (in this order) is the one reported.
      const timeOptionNames = [
        'acquireTimeoutMillis',
        'createTimeoutMillis',
        'idleTimeoutMillis',
        'reapIntervalMillis',
        'createRetryIntervalMillis'
      ];
      for (const name of timeOptionNames) {
        if (!checkOptionalTime(opt[name])) {
          throw new Error('Tarn: invalid opt.' + name + ' ' + JSON.stringify(opt[name]));
        }
      }

      this.creator = opt.create;
      this.destroyer = opt.destroy;
      this.validate = typeof opt.validate === 'function' ? opt.validate : () => true;
      this.log = opt.log || (() => {});

      this.acquireTimeoutMillis = opt.acquireTimeoutMillis || 30000;
      this.createTimeoutMillis = opt.createTimeoutMillis || 30000;
      this.idleTimeoutMillis = opt.idleTimeoutMillis || 30000;
      this.reapIntervalMillis = opt.reapIntervalMillis || 1000;
      this.createRetryIntervalMillis = opt.createRetryIntervalMillis || 200;
      this.propagateCreateError = !!opt.propagateCreateError;

      this.min = opt.min;
      this.max = opt.max;

      this.used = [];
      this.free = [];
      this.pendingCreates = [];
      this.pendingAcquires = [];
      this.destroyed = false;
      // Handle of the reaping timer, or null while reaping is stopped.
      this.interval = null;
    }

    /** @returns {number} Number of resources currently handed out. */
    numUsed() {
      return this.used.length;
    }

    /** @returns {number} Number of resources waiting to be handed out. */
    numFree() {
      return this.free.length;
    }

    /** @returns {number} Number of acquires that have not been served yet. */
    numPendingAcquires() {
      return this.pendingAcquires.length;
    }

    /** @returns {number} Number of resource creations in flight. */
    numPendingCreates() {
      return this.pendingCreates.length;
    }

    /**
     * Queues a request for a resource.
     *
     * @returns {PendingOperation} The pending acquire; its `promise` settles
     *   with the resource or with the failure reason.
     */
    acquire() {
      const pendingAcquire = new PendingOperation(this.acquireTimeoutMillis);
      this.pendingAcquires.push(pendingAcquire);

      // If the acquire fails for whatever reason
      // remove it from the pending queue.
      pendingAcquire.promise = pendingAcquire.promise.catch(err => {
        remove(this.pendingAcquires, pendingAcquire);
        return Promise.reject(err);
      });

      this._tryAcquireOrCreate();
      return pendingAcquire;
    }

    /**
     * Returns a previously acquired resource to the pool.
     *
     * @param {*} resource - The resource object that was handed out.
     * @returns {boolean} true when the resource was in use and is now free,
     *   false when the pool does not know it as a used resource.
     */
    release(resource) {
      const index = this.used.findIndex(used => used.resource === resource);
      if (index === -1) {
        return false;
      }

      const used = this.used[index];
      this.used.splice(index, 1);
      this.free.push(used.resolve());

      // A waiting acquire may now be served with the freed resource.
      this._tryAcquireOrCreate();
      return true;
    }

    /**
     * @returns {boolean} true when there are no free or used resources and no
     *   pending acquires or creates.
     */
    isEmpty() {
      return (
        this.numFree() + this.numUsed() + this.numPendingAcquires() + this.numPendingCreates() === 0
      );
    }

    /**
     * Reaping pass: destroys free resources whose idle time, as reported by
     * `duration(timestamp, free.timestamp)`, exceeds `idleTimeoutMillis`, while
     * keeping enough free resources that used + free does not drop below `min`.
     * Stops the reaping timer once the pool is completely empty.
     */
    check() {
      const timestamp = now();
      const newFree = [];
      // Number of free resources that must survive so that used + free >= min.
      const minKeep = this.min - this.used.length;
      const maxDestroy = this.free.length - minKeep;
      let numDestroyed = 0;

      this.free.forEach(free => {
        const idleTooLong = duration(timestamp, free.timestamp) > this.idleTimeoutMillis;
        if (idleTooLong && numDestroyed < maxDestroy) {
          numDestroyed++;
          this._destroy(free.resource);
        } else {
          newFree.push(free);
        }
      });

      this.free = newFree;

      //Pool is completely empty, stop reaping.
      //Next .acquire will start reaping interval again.
      if (this.isEmpty()) {
        this._stopReaping();
      }
    }

    /**
     * Shuts the pool down: stops reaping, waits for pending creates and for
     * used resources to be released, aborts pending acquires and finally
     * destroys every free resource.
     *
     * @returns {*} Whatever `reflect` returns for the shutdown promise chain.
     */
    destroy() {
      this._stopReaping();
      this.destroyed = true;

      // First wait for all the pending creates get ready.
      return reflect(
        Promise.all(this.pendingCreates.map(create => reflect(create.promise)))
          .then(() => {
            // Wait for all the used resources to be freed.
            return Promise.all(this.used.map(used => reflect(used.promise)));
          })
          .then(() => {
            // Abort all pending acquires.
            return Promise.all(
              this.pendingAcquires.map(acquire => {
                acquire.abort();
                return reflect(acquire.promise);
              })
            );
          })
          .then(() => {
            // Now we can destroy all the freed resources.
            this.free.forEach(free => this._destroy(free.resource));
            this.free = [];
            this.pendingAcquires = [];
          })
      );
    }

    /**
     * Serves pending acquires from the free list, or starts creating a new
     * resource when none is free and the limits allow it. Does nothing once
     * the pool has been destroyed.
     */
    _tryAcquireOrCreate() {
      if (this.destroyed) {
        return;
      }

      if (this._hasFreeResources()) {
        this._doAcquire();
      } else if (this._shouldCreateMoreResources()) {
        this._doCreate();
      }
    }

    /** @returns {boolean} true when at least one resource is free. */
    _hasFreeResources() {
      return this.free.length > 0;
    }

    /**
     * Hands free resources to pending acquires (oldest acquire first, most
     * recently freed resource first) for as long as both exist. Resources that
     * fail validation are destroyed instead of being handed out.
     */
    _doAcquire() {
      let didDestroyResources = false;

      while (this._canAcquire()) {
        const pendingAcquire = this.pendingAcquires[0];
        const free = this.free[this.free.length - 1];

        if (!this._validateResource(free.resource)) {
          this.free.pop();
          this._destroy(free.resource);
          didDestroyResources = true;
          continue;
        }

        this.pendingAcquires.shift();
        this.free.pop();
        this.used.push(free.resolve());

        //At least one active resource, start reaping
        this._startReaping();

        pendingAcquire.resolve(free.resource);
      }

      // If we destroyed invalid resources, we may need to create new ones.
      if (didDestroyResources) {
        this._tryAcquireOrCreate();
      }
    }

    /** @returns {boolean} true when a free resource and a pending acquire both exist. */
    _canAcquire() {
      return this.free.length > 0 && this.pendingAcquires.length > 0;
    }

    /**
     * Runs the user supplied validator against a resource.
     *
     * @param {*} resource
     * @returns {boolean} The validator's result coerced to boolean; false when
     *   the validator throws (the error is logged).
     */
    _validateResource(resource) {
      try {
        return !!this.validate(resource);
      } catch (err) {
        // There's nothing we can do here but log the error. This would otherwise
        // leak out as an unhandled exception.
        this.log('Tarn: resource validator threw an exception ' + this._errorDetail(err), 'warn');
        return false;
      }
    }

    /**
     * @returns {boolean} true when used + pending creates is below `max` and
     *   there are fewer pending creates than pending acquires.
     */
    _shouldCreateMoreResources() {
      return (
        this.used.length + this.pendingCreates.length < this.max &&
        this.pendingCreates.length < this.pendingAcquires.length
      );
    }

    /**
     * Starts one resource creation and wires up what happens afterwards:
     * on success another acquire attempt, on failure error bookkeeping and a
     * delayed retry.
     */
    _doCreate() {
      // Snapshot: only acquires that were waiting when this create started get
      // the create error recorded as their possible timeout cause.
      const pendingAcquiresBeforeCreate = this.pendingAcquires.slice();
      const pendingCreate = this._create();

      pendingCreate.promise
        .then(() => {
          // Not returned on purpose.
          this._tryAcquireOrCreate();
        })
        .catch(err => {
          if (this.propagateCreateError && this.pendingAcquires.length !== 0) {
            // If propagateCreateError is true, we don't retry the create
            // but reject the first pending acquire immediately. Intentionally
            // use `this.pendingAcquires` instead of `pendingAcquiresBeforeCreate`
            // in case some acquires in pendingAcquiresBeforeCreate have already
            // been resolved.
            this.pendingAcquires[0].reject(err);
          }

          // Save the create error to all pending acquires so that we can use it
          // as the error to reject the acquire if it times out.
          pendingAcquiresBeforeCreate.forEach(pendingAcquire => {
            pendingAcquire.possibleTimeoutCause = err;
          });

          // Not returned on purpose.
          delay(this.createRetryIntervalMillis).then(() => this._tryAcquireOrCreate());
        });
    }

    /**
     * Invokes the creator and tracks the creation as a pending operation.
     * A successfully created resource is added to the free list.
     *
     * @returns {PendingOperation} The pending create.
     */
    _create() {
      const pendingCreate = new PendingOperation(this.createTimeoutMillis);
      this.pendingCreates.push(pendingCreate);

      callbackOrPromise(this.creator)
        .then(resource => {
          remove(this.pendingCreates, pendingCreate);
          this.free.push(new Resource(resource));
          pendingCreate.resolve(resource);
        })
        .catch(err => {
          remove(this.pendingCreates, pendingCreate);
          pendingCreate.reject(err);
        });

      return pendingCreate;
    }

    /**
     * Hands a resource to the user supplied destroyer. Failures are logged and
     * never propagate to the caller, whether the destroyer throws synchronously
     * or returns a promise that rejects.
     *
     * @param {*} resource
     */
    _destroy(resource) {
      const logFailure = err => {
        // There's nothing we can do here but log the error. This would otherwise
        // leak out as an unhandled exception / unhandled rejection.
        this.log('Tarn: resource destroyer threw an exception ' + this._errorDetail(err), 'warn');
      };

      try {
        const result = this.destroyer(resource);
        // The destroyer may be asynchronous. The pool does not wait for it, but
        // a rejection must still be observed so it does not go unhandled.
        if (result && typeof result.then === 'function') {
          result.then(undefined, logFailure);
        }
      } catch (err) {
        logFailure(err);
      }
    }

    /**
     * Formats a thrown value for log output without assuming it is an Error.
     *
     * @param {*} err
     * @returns {string} `err.stack` when available, otherwise `String(err)`.
     */
    _errorDetail(err) {
      return String(err && err.stack ? err.stack : err);
    }

    /** Starts the periodic `check()` timer unless it is already running. */
    _startReaping() {
      if (!this.interval) {
        this.interval = setInterval(() => this.check(), this.reapIntervalMillis);
      }
    }

    /** Stops the periodic `check()` timer. */
    _stopReaping() {
      clearInterval(this.interval);
      this.interval = null;
    }
  }
  /**
   * Removes the first occurrence of `item` from `arr`, in place.
   *
   * @param {Array} arr - Array to modify.
   * @param {*} item - Value to look for (matched as `Array.prototype.indexOf` does).
   * @returns {boolean} true when the item was found and removed, false otherwise.
   */
  function remove(arr, item) {
    const idx = arr.indexOf(item);
    if (idx === -1) {
      return false;
    }

    arr.splice(idx, 1);
    return true;
  }
  /**
   * Calls `func` with a node-style callback and returns a promise for the
   * created value, supporting both calling conventions at once:
   *
   * - `func` calls `callback(err, resource)`: the promise rejects with a truthy
   *   `err`, otherwise resolves with `resource`.
   * - `func` returns a value or a promise: a truthy result resolves the
   *   promise, a rejection rejects it.
   *
   * A falsy result is deliberately ignored on the assumption that the callback
   * will be used instead; if the callback is never called in that case the
   * returned promise stays pending.
   *
   * @param {Function} func - Invoked once as `func(callback)`.
   * @returns {Promise<*>}
   */
  function callbackOrPromise(func) {
    return new Promise((resolve, reject) => {
      const callback = (err, resource) => {
        if (err) {
          reject(err);
        } else {
          resolve(resource);
        }
      };

      // NOTE(review): tryPromise comes from ./utils; presumably it turns a
      // synchronous throw from func into a rejection — confirm there.
      tryPromise(() => func(callback))
        .then(res => {
          // If the result is falsy, we assume that the callback will
          // be called instead of interpreting the falsy value as a
          // result value.
          if (res) {
            resolve(res);
          }
        })
        .catch(err => {
          reject(err);
        });
    });
  }
  314. module.exports = {
  315. Pool
  316. };