transaction.js 10 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315
  1. 'use strict';
  2. exports.__esModule = true;
  3. var _create = require('babel-runtime/core-js/object/create');
  4. var _create2 = _interopRequireDefault(_create);
  5. var _classCallCheck2 = require('babel-runtime/helpers/classCallCheck');
  6. var _classCallCheck3 = _interopRequireDefault(_classCallCheck2);
  7. var _possibleConstructorReturn2 = require('babel-runtime/helpers/possibleConstructorReturn');
  8. var _possibleConstructorReturn3 = _interopRequireDefault(_possibleConstructorReturn2);
  9. var _inherits2 = require('babel-runtime/helpers/inherits');
  10. var _inherits3 = _interopRequireDefault(_inherits2);
  11. var _isUndefined2 = require('lodash/isUndefined');
  12. var _isUndefined3 = _interopRequireDefault(_isUndefined2);
  13. var _uniqueId2 = require('lodash/uniqueId');
  14. var _uniqueId3 = _interopRequireDefault(_uniqueId2);
  15. var _bluebird = require('bluebird');
  16. var _bluebird2 = _interopRequireDefault(_bluebird);
  17. var _events = require('events');
  18. var _debug = require('debug');
  19. var _debug2 = _interopRequireDefault(_debug);
  20. var _makeKnex = require('./util/make-knex');
  21. var _makeKnex2 = _interopRequireDefault(_makeKnex);
  // Babel interop helper: normalises a required module so that callers can
  // always read `.default`. Objects flagged with `__esModule` are returned
  // untouched; anything else (including falsy values) is wrapped.
  function _interopRequireDefault(obj) {
    if (obj && obj.__esModule) {
      return obj;
    }
    return { default: obj };
  }
  // Transaction
  // -----------

  // Module-level logger created under the `knex:tx` namespace - the same
  // namespace the "already complete" error in this file tells users to put in
  // the DEBUG environment variable.
  var debug = (0, _debug2['default'])('knex:tx');
  // Acts as a facade for a Promise, keeping the internal state
  // and managing any child transactions.
  //
  // The instance is also an EventEmitter; the transaction-bound client built by
  // makeTxClient re-emits 'query', 'query-error' and 'query-response' on it.
  var Transaction = function (_EventEmitter) {
    (0, _inherits3.default)(Transaction, _EventEmitter);

    // Upper bound, in milliseconds, on how long a rollback statement may run
    // before the transaction is settled without waiting for it any longer.
    var ROLLBACK_TIMEOUT_MS = 5000;

    // Normalises the reason a transaction is rejected with: `undefined` is
    // replaced by an Error so consumers never receive an empty rejection.
    function toRejection(value) {
      if ((0, _isUndefined3.default)(value)) {
        return new Error('Transaction rejected with non-error: ' + value);
      }
      return value;
    }

    /**
     * Starts the transaction: acquires a connection, issues BEGIN (or SAVEPOINT
     * when `client.transacting` is truthy) and then calls `container` with a
     * transactor.
     *
     * @param {Object} client - Client the transaction runs against. Its
     *   `config.debug` and `transacting` properties are read here.
     * @param {Function} container - Called with the transactor. If it returns a
     *   thenable, fulfilment commits and rejection rolls back; a synchronous
     *   throw is handled like a rejection. Any other return value leaves
     *   commit / rollback to the caller.
     * @param {Object} [config] - Optional settings; `config.connection`, when
     *   set, is used instead of acquiring a connection from the client.
     * @param {Transaction} [outerTx] - Enclosing transaction when nested.
     */
    function Transaction(client, container, config, outerTx) {
      (0, _classCallCheck3.default)(this, Transaction);

      var _this = (0, _possibleConstructorReturn3.default)(this, _EventEmitter.call(this));

      var txid = _this.txid = (0, _uniqueId3.default)('trx');
      _this.client = client;
      _this.outerTx = outerTx;
      _this.trxClient = undefined;
      _this._debug = client.config && client.config.debug;

      debug('%s: Starting %s transaction', txid, outerTx ? 'nested' : 'top level');

      // `using` keeps the connection checked out until the promise returned by
      // the callback settles, then runs the disposer from acquireConnection.
      _this._promise = _bluebird2.default.using(_this.acquireConnection(client, config, txid), function (connection) {
        var trxClient = _this.trxClient = makeTxClient(_this, client, connection);
        var init = client.transacting ? _this.savepoint(connection) : _this.begin(connection);

        init.then(function () {
          return makeTransactor(_this, connection, trxClient);
        }).then(function (transactor) {
          // If we've returned a "thenable" from the transaction container, assume
          // the rollback and commit are chained to this object's success / failure.
          // Directly thrown errors are treated as automatic rollbacks.
          var result = void 0;
          try {
            result = container(transactor);
          } catch (err) {
            result = _bluebird2.default.reject(err);
          }
          if (result && result.then && typeof result.then === 'function') {
            result.then(function (val) {
              return transactor.commit(val);
            }).catch(function (err) {
              return transactor.rollback(err);
            });
          }
          // The commit / rollback chain above is deliberately not returned.
          return null;
        }).catch(function (e) {
          return _this._rejecter(e);
        });

        // The promise that represents the transaction. It is settled from the
        // outside through `_resolver` / `_rejecter` (see `query`). The executor
        // runs synchronously, so both are assigned before any handler above fires.
        return new _bluebird2.default(function (resolver, rejecter) {
          _this._resolver = resolver;
          _this._rejecter = rejecter;
        });
      });

      _this._completed = false;

      // If there's a wrapping transaction, we need to wait for any older sibling
      // transactions to settle (commit or rollback) before we can start, and we
      // need to register ourselves with the parent transaction so any younger
      // siblings can wait for us to complete before they can start.
      // NOTE(review): `_previousSibling` is only recorded here; nothing in this
      // file awaits it - confirm whether the ordering is enforced elsewhere.
      _this._previousSibling = _bluebird2.default.resolve(true);
      if (outerTx) {
        if (outerTx._lastChild) _this._previousSibling = outerTx._lastChild;
        outerTx._lastChild = _this._promise;
      }
      return _this;
    }

    // True once this transaction, or any enclosing one, has been completed.
    Transaction.prototype.isCompleted = function isCompleted() {
      return this._completed || this.outerTx && this.outerTx.isCompleted() || false;
    };

    // Opens a top-level transaction on `conn`.
    Transaction.prototype.begin = function begin(conn) {
      return this.query(conn, 'BEGIN;');
    };

    // Opens a nested transaction as a savepoint named after this txid.
    Transaction.prototype.savepoint = function savepoint(conn) {
      return this.query(conn, 'SAVEPOINT ' + this.txid + ';');
    };

    // Commits and, once the statement has run, resolves the transaction with `value`.
    Transaction.prototype.commit = function commit(conn, value) {
      return this.query(conn, 'COMMIT;', 1, value);
    };

    // Nested counterpart of `commit`: releases this transaction's savepoint.
    Transaction.prototype.release = function release(conn, value) {
      return this.query(conn, 'RELEASE SAVEPOINT ' + this.txid + ';', 1, value);
    };

    // Rolls back and rejects the transaction with `error`.
    // A rollback that times out must still fail the transaction: rejecting
    // (rather than resolving) keeps a stalled ROLLBACK from being reported to
    // the caller as a successful transaction and preserves the original reason.
    Transaction.prototype.rollback = function rollback(conn, error) {
      var self = this;
      return this.query(conn, 'ROLLBACK;', 2, error).timeout(ROLLBACK_TIMEOUT_MS).catch(_bluebird2.default.TimeoutError, function () {
        self._rejecter(toRejection(error));
      });
    };

    // Nested counterpart of `rollback`: rolls back to this transaction's
    // savepoint, with the same timeout handling.
    Transaction.prototype.rollbackTo = function rollbackTo(conn, error) {
      var self = this;
      return this.query(conn, 'ROLLBACK TO SAVEPOINT ' + this.txid, 2, error).timeout(ROLLBACK_TIMEOUT_MS).catch(_bluebird2.default.TimeoutError, function () {
        self._rejecter(toRejection(error));
      });
    };

    /**
     * Runs a transaction-control statement through the transaction client.
     *
     * @param {Object} conn - Connection the statement is issued on.
     * @param {string} sql - Statement text.
     * @param {number} [status] - 1 resolves the transaction with `value` after
     *   the statement runs, 2 rejects it with `value`; omitted leaves it pending.
     * @param {*} [value] - Resolution value or rejection reason.
     * @returns {Promise} Settles after the statement and the bookkeeping; a
     *   failing statement does not reject it but rejects the transaction instead.
     */
    Transaction.prototype.query = function query(conn, sql, status, value) {
      var self = this;
      var q = this.trxClient.query(conn, sql).catch(function (err) {
        // A failing control statement turns the outcome into a rejection.
        status = 2;
        value = err;
        self._completed = true;
        debug('%s error running transaction query', self.txid);
      }).tap(function () {
        if (status === 1) {
          self._resolver(value);
        }
        if (status === 2) {
          self._rejecter(toRejection(value));
        }
      });
      // Marked completed synchronously, i.e. before the statement has finished,
      // so queries issued after commit / rollback was requested are refused.
      if (status === 1 || status === 2) {
        this._completed = true;
      }
      return q;
    };

    // Sets the debug flag; calling with no argument enables it. Chainable.
    Transaction.prototype.debug = function debug(enabled) {
      this._debug = arguments.length ? enabled : true;
      return this;
    };

    // Acquire a connection and create a disposer - either using the one passed
    // via config or getting one off the client. The disposer will be called once
    // the original promise is marked completed. Connections supplied through
    // `config.connection` are never released here.
    Transaction.prototype.acquireConnection = function acquireConnection(client, config, txid) {
      var configConnection = config && config.connection;
      return _bluebird2.default.try(function () {
        return configConnection || client.acquireConnection();
      }).then(function (connection) {
        // Tag the connection with the id of the transaction using it.
        connection.__knexTxId = txid;
        return connection;
      }).disposer(function (connection) {
        if (!configConnection) {
          debug('%s: releasing connection', txid);
          client.releaseConnection(connection);
        } else {
          debug('%s: not releasing external connection', txid);
        }
      });
    };

    return Transaction;
  }(_events.EventEmitter);
  // Default export of this module: the Transaction class.
  exports['default'] = Transaction;

  // The transactor (built by makeTransactor) is a full-featured knex object,
  // with a "commit", a "rollback" and a "savepoint" function. The "savepoint"
  // is just sugar for creating a new transaction. If the rollback is run inside
  // a savepoint, it rolls back to the last savepoint - otherwise it rolls back
  // the transaction.
  // Builds the object handed to the transaction container: a knex instance
  // bound to `trxClient`, extended with `transaction`, `savepoint`, `commit`
  // and `rollback`. Whether commit / rollback act on the whole transaction or
  // on a savepoint is decided once, from `trx.client.transacting`, at creation.
  function makeTransactor(trx, connection, trxClient) {
    var transactor = (0, _makeKnex2.default)(trxClient);

    // Child transactions are started on the transaction client, with `trx`
    // passed along as their parent.
    transactor.transaction = function (container, options) {
      return trxClient.transaction(container, options, trx);
    };

    // "savepoint" is an alias for starting a child transaction.
    transactor.savepoint = function (container, options) {
      return transactor.transaction(container, options);
    };

    // Nested transactions finish via their savepoint; top-level ones via
    // COMMIT / ROLLBACK.
    var isNested = trx.client.transacting;
    var commitMethod = isNested ? 'release' : 'commit';
    var rollbackMethod = isNested ? 'rollbackTo' : 'rollback';

    transactor.commit = function (value) {
      return trx[commitMethod](connection, value);
    };
    transactor.rollback = function (error) {
      return trx[rollbackMethod](connection, error);
    };

    return transactor;
  }
  // We need to make a client object which always acquires the same
  // connection and does not release back into the pool.
  //
  // The returned object inherits from the original client's prototype, copies
  // its configuration, is flagged as `transacting`, forwards its query events
  // to both the transaction and the original client, and refuses queries that
  // use a different connection or arrive after the transaction has completed.
  function makeTxClient(trx, client, connection) {
    var txClient = (0, _create2.default)(client.constructor.prototype);

    txClient.config = client.config;
    txClient.driver = client.driver;
    txClient.connectionSettings = client.connectionSettings;
    txClient.transacting = true;
    txClient.valueForUndefined = client.valueForUndefined;

    // Re-emit query lifecycle events: first on the transaction, then on the
    // client the transaction was started from.
    txClient.on('query', function (queryInfo) {
      trx.emit('query', queryInfo);
      client.emit('query', queryInfo);
    });
    txClient.on('query-error', function (error, queryInfo) {
      trx.emit('query-error', error, queryInfo);
      client.emit('query-error', error, queryInfo);
    });
    txClient.on('query-response', function (response, queryInfo, builder) {
      trx.emit('query-response', response, queryInfo, builder);
      client.emit('query-response', response, queryInfo, builder);
    });

    // Throws when the call must not reach the driver: wrong connection first,
    // then "transaction already completed".
    function ensureUsable(conn, alreadyCompleted, obj) {
      if (conn !== connection) {
        throw new Error('Invalid connection for transaction query.');
      }
      if (alreadyCompleted) {
        completedError(trx, obj);
      }
    }

    var inheritedQuery = txClient.query;
    txClient.query = function (conn, obj) {
      // Completion is sampled when the query is requested, not when it runs.
      var alreadyCompleted = trx.isCompleted();
      return _bluebird2.default.try(function () {
        ensureUsable(conn, alreadyCompleted, obj);
        return inheritedQuery.call(txClient, conn, obj);
      });
    };

    var inheritedStream = txClient.stream;
    txClient.stream = function (conn, obj, stream, options) {
      var alreadyCompleted = trx.isCompleted();
      return _bluebird2.default.try(function () {
        ensureUsable(conn, alreadyCompleted, obj);
        return inheritedStream.call(txClient, conn, obj, stream, options);
      });
    };

    // Always hand out the transaction's connection...
    txClient.acquireConnection = function () {
      return _bluebird2.default.resolve(connection);
    };
    // ...and never give it back to the pool from here.
    txClient.releaseConnection = function () {
      return _bluebird2.default.resolve();
    };

    return txClient;
  }
  // Logs the statement that was attempted on an already-completed transaction
  // and then always throws. `obj` may be the SQL string itself or an object
  // carrying it in `.sql`.
  function completedError(trx, obj) {
    var sql;
    if (typeof obj === 'string') {
      sql = obj;
    } else {
      sql = obj && obj.sql;
    }
    debug('%s: Transaction completed: %s', trx.txid, sql);
    throw new Error('Transaction query already complete, run with DEBUG=knex:tx for more info');
  }
  // Names of the promise methods exposed directly on Transaction instances.
  var promiseInterface = [
    'then', 'bind', 'catch', 'finally', 'asCallback', 'spread',
    'map', 'reduce', 'tap', 'thenReturn', 'return', 'yield',
    'ensure', 'exec', 'reflect', 'get', 'mapSeries', 'delay'
  ];

  // Creates methods which proxy promise interface methods to
  // internal transaction resolution promise. Each proxy forwards all of its
  // arguments and returns whatever the underlying promise method returns.
  promiseInterface.forEach(function (methodName) {
    Transaction.prototype[methodName] = function () {
      var promise = this._promise;
      return promise[methodName].apply(promise, arguments);
    };
  });
  // CommonJS interop: replace the exports object with the default export.
  module.exports = exports.default;