runner.js 8.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247
  1. 'use strict';
  2. exports.__esModule = true;
  3. var _isArray2 = require('lodash/isArray');
  4. var _isArray3 = _interopRequireDefault(_isArray2);
  5. var _assign2 = require('lodash/assign');
  6. var _assign3 = _interopRequireDefault(_assign2);
  7. var _bluebird = require('bluebird');
  8. var _bluebird2 = _interopRequireDefault(_bluebird);
  9. var _helpers = require('./helpers');
  10. var helpers = _interopRequireWildcard(_helpers);
  // Babel interop helper: returns ES modules untouched; for anything else,
  // builds a namespace-like object holding a copy of the value's own
  // enumerable properties plus a `default` property pointing at the value.
  function _interopRequireWildcard(obj) {
    if (obj && obj.__esModule) {
      return obj;
    }
    var namespace = {};
    if (obj != null) {
      var ownKeys = Object.keys(obj);
      for (var i = 0; i < ownKeys.length; i++) {
        namespace[ownKeys[i]] = obj[ownKeys[i]];
      }
    }
    // Assigned last so it wins over any copied property named "default".
    namespace.default = obj;
    return namespace;
  }
  // Babel interop helper: ES modules are returned as-is; any other value is
  // wrapped so that it can be read through a `default` property.
  function _interopRequireDefault(obj) {
    if (obj && obj.__esModule) {
      return obj;
    }
    return { default: obj };
  }
  // Cache slot for the `readable-stream` PassThrough constructor; it stays
  // undefined until `stream()` loads the dependency on first use.
  var PassThrough = undefined;
  // Runner constructor. Pairs a client with a "builder" (query, schema, or
  // raw) whose statements the prototype methods execute.
  //
  //   client  - stored as-is on `this.client`
  //   builder - stored as-is on `this.builder`
  function Runner(client, builder) {
    this.client = client;
    this.builder = builder;

    // Starts out as an empty list.
    this.queries = [];

    // No connection yet: `run` and `stream` assign one once it is acquired.
    this.connection = undefined;
  }
  25. (0, _assign3.default)(Runner.prototype, {
  26. // "Run" the target, calling "toSQL" on the builder, returning
  27. // an object or array of queries to run, each of which are run on
  28. // a single connection.
  29. run: function run() {
  30. var runner = this;
  31. return _bluebird2.default.using(this.ensureConnection(), function (connection) {
  32. runner.connection = connection;
  33. runner.client.emit('start', runner.builder);
  34. runner.builder.emit('start', runner.builder);
  35. var sql = runner.builder.toSQL();
  36. if (runner.builder._debug) {
  37. helpers.debugLog(sql);
  38. }
  39. if ((0, _isArray3.default)(sql)) {
  40. return runner.queryArray(sql);
  41. }
  42. return runner.query(sql);
  43. })
  44. // If there are any "error" listeners, we fire an error event
  45. // and then re-throw the error to be eventually handled by
  46. // the promise chain. Useful if you're wrapping in a custom `Promise`.
  47. .catch(function (err) {
  48. if (runner.builder._events && runner.builder._events.error) {
  49. runner.builder.emit('error', err);
  50. }
  51. throw err;
  52. })
  53. // Fire a single "end" event on the builder when
  54. // all queries have successfully completed.
  55. .tap(function () {
  56. runner.builder.emit('end');
  57. });
  58. },
  59. // Stream the result set, by passing through to the dialect's streaming
  60. // capabilities. If the options are
  61. stream: function stream(options, handler) {
  62. // If we specify stream(handler).then(...
  63. if (arguments.length === 1) {
  64. if (typeof options === 'function') {
  65. handler = options;
  66. options = {};
  67. }
  68. }
  69. // Determines whether we emit an error or throw here.
  70. var hasHandler = typeof handler === 'function';
  71. // Lazy-load the "PassThrough" dependency.
  72. PassThrough = PassThrough || require('readable-stream').PassThrough;
  73. var runner = this;
  74. var stream = new PassThrough({ objectMode: true });
  75. var hasConnection = false;
  76. var promise = _bluebird2.default.using(this.ensureConnection(), function (connection) {
  77. hasConnection = true;
  78. runner.connection = connection;
  79. var sql = runner.builder.toSQL();
  80. var err = new Error('The stream may only be used with a single query statement.');
  81. if ((0, _isArray3.default)(sql)) {
  82. if (hasHandler) throw err;
  83. stream.emit('error', err);
  84. }
  85. return runner.client.stream(runner.connection, sql, stream, options);
  86. });
  87. // If a function is passed to handle the stream, send the stream
  88. // there and return the promise, otherwise just return the stream
  89. // and the promise will take care of itsself.
  90. if (hasHandler) {
  91. handler(stream);
  92. return promise;
  93. }
  94. // Emit errors on the stream if the error occurred before a connection
  95. // could be acquired.
  96. // If the connection was acquired, assume the error occured in the client
  97. // code and has already been emitted on the stream. Don't emit it twice.
  98. promise.catch(function (err) {
  99. if (!hasConnection) stream.emit('error', err);
  100. });
  101. return stream;
  102. },
  103. // Allow you to pipe the stream to a writable stream.
  104. pipe: function pipe(writable, options) {
  105. return this.stream(options).pipe(writable);
  106. },
  107. // "Runs" a query, returning a promise. All queries specified by the builder are guaranteed
  108. // to run in sequence, and on the same connection, especially helpful when schema building
  109. // and dealing with foreign key constraints, etc.
  110. query: _bluebird2.default.method(function (obj) {
  111. var _this = this;
  112. var _connection = this.connection,
  113. __knexUid = _connection.__knexUid,
  114. __knexTxId = _connection.__knexTxId;
  115. this.builder.emit('query', (0, _assign3.default)({ __knexUid: __knexUid, __knexTxId: __knexTxId }, obj));
  116. var runner = this;
  117. var queryPromise = this.client.query(this.connection, obj);
  118. if (obj.timeout) {
  119. queryPromise = queryPromise.timeout(obj.timeout);
  120. }
  121. return queryPromise.then(function (resp) {
  122. var processedResponse = _this.client.processResponse(resp, runner);
  123. var queryContext = _this.builder.queryContext();
  124. var postProcessedResponse = _this.client.postProcessResponse(processedResponse, queryContext);
  125. _this.builder.emit('query-response', postProcessedResponse, (0, _assign3.default)({ __knexUid: _this.connection.__knexUid }, obj), _this.builder);
  126. _this.client.emit('query-response', postProcessedResponse, (0, _assign3.default)({ __knexUid: _this.connection.__knexUid }, obj), _this.builder);
  127. return postProcessedResponse;
  128. }).catch(_bluebird2.default.TimeoutError, function (error) {
  129. var timeout = obj.timeout,
  130. sql = obj.sql,
  131. bindings = obj.bindings;
  132. var cancelQuery = void 0;
  133. if (obj.cancelOnTimeout) {
  134. cancelQuery = _this.client.cancelQuery(_this.connection);
  135. } else {
  136. // If we don't cancel the query, we need to mark the connection as disposed so that
  137. // it gets destroyed by the pool and is never used again. If we don't do this and
  138. // return the connection to the pool, it will be useless until the current operation
  139. // that timed out, finally finishes.
  140. _this.connection.__knex__disposed = error;
  141. cancelQuery = _bluebird2.default.resolve();
  142. }
  143. return cancelQuery.catch(function (cancelError) {
  144. // If the cancellation failed, we need to mark the connection as disposed so that
  145. // it gets destroyed by the pool and is never used again. If we don't do this and
  146. // return the connection to the pool, it will be useless until the current operation
  147. // that timed out, finally finishes.
  148. _this.connection.__knex__disposed = error;
  149. // cancellation failed
  150. throw (0, _assign3.default)(cancelError, {
  151. message: 'After query timeout of ' + timeout + 'ms exceeded, cancelling of query failed.',
  152. sql: sql, bindings: bindings, timeout: timeout
  153. });
  154. }).then(function () {
  155. // cancellation succeeded, rethrow timeout error
  156. throw (0, _assign3.default)(error, {
  157. message: 'Defined query timeout of ' + timeout + 'ms exceeded when running query.',
  158. sql: sql, bindings: bindings, timeout: timeout
  159. });
  160. });
  161. }).catch(function (error) {
  162. _this.builder.emit('query-error', error, (0, _assign3.default)({ __knexUid: _this.connection.__knexUid }, obj));
  163. throw error;
  164. });
  165. }),
  166. // In the case of the "schema builder" we call `queryArray`, which runs each
  167. // of the queries in sequence.
  168. queryArray: function queryArray(queries) {
  169. return queries.length === 1 ? this.query(queries[0]) : _bluebird2.default.bind(this).return(queries).reduce(function (memo, query) {
  170. return this.query(query).then(function (resp) {
  171. memo.push(resp);
  172. return memo;
  173. });
  174. }, []);
  175. },
  176. // Check whether there's a transaction flag, and that it has a connection.
  177. ensureConnection: function ensureConnection() {
  178. var _this2 = this;
  179. if (this.connection) {
  180. return _bluebird2.default.resolve(this.connection);
  181. }
  182. return this.client.acquireConnection().catch(_bluebird2.default.TimeoutError, function (error) {
  183. if (_this2.builder) {
  184. error.sql = _this2.builder.sql;
  185. error.bindings = _this2.builder.bindings;
  186. }
  187. throw error;
  188. }).disposer(function () {
  189. // need to return promise or null from handler to prevent warning from bluebird
  190. return _this2.client.releaseConnection(_this2.connection);
  191. });
  192. }
  193. });
  // Expose Runner as the ES-module default export, then make that same
  // value the CommonJS export.
  exports.default = Runner;
  module.exports = exports.default;