multi.js 6.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187
  1. 'use strict';
  2. var Queue = require('double-ended-queue');
  3. var utils = require('./utils');
  4. var Command = require('./command');
  5. function Multi (client, args) {
  6. this._client = client;
  7. this.queue = new Queue();
  8. var command, tmp_args;
  9. if (args) { // Either undefined or an array. Fail hard if it's not an array
  10. for (var i = 0; i < args.length; i++) {
  11. command = args[i][0];
  12. tmp_args = args[i].slice(1);
  13. if (Array.isArray(command)) {
  14. this[command[0]].apply(this, command.slice(1).concat(tmp_args));
  15. } else {
  16. this[command].apply(this, tmp_args);
  17. }
  18. }
  19. }
  20. }
  21. function pipeline_transaction_command (self, command_obj, index) {
  22. // Queueing is done first, then the commands are executed
  23. var tmp = command_obj.callback;
  24. command_obj.callback = function (err, reply) {
  25. // Ignore the multi command. This is applied by node_redis and the user does not benefit by it
  26. if (err && index !== -1) {
  27. if (tmp) {
  28. tmp(err);
  29. }
  30. err.position = index;
  31. self.errors.push(err);
  32. }
  33. // Keep track of who wants buffer responses:
  34. // By the time the callback is called the command_obj got the buffer_args attribute attached
  35. self.wants_buffers[index] = command_obj.buffer_args;
  36. command_obj.callback = tmp;
  37. };
  38. self._client.internal_send_command(command_obj);
  39. }
  40. Multi.prototype.exec_atomic = Multi.prototype.EXEC_ATOMIC = Multi.prototype.execAtomic = function exec_atomic (callback) {
  41. if (this.queue.length < 2) {
  42. return this.exec_batch(callback);
  43. }
  44. return this.exec(callback);
  45. };
  46. function multi_callback (self, err, replies) {
  47. var i = 0, command_obj;
  48. if (err) {
  49. err.errors = self.errors;
  50. if (self.callback) {
  51. self.callback(err);
  52. // Exclude connection errors so that those errors won't be emitted twice
  53. } else if (err.code !== 'CONNECTION_BROKEN') {
  54. self._client.emit('error', err);
  55. }
  56. return;
  57. }
  58. if (replies) {
  59. while (command_obj = self.queue.shift()) {
  60. if (replies[i] instanceof Error) {
  61. var match = replies[i].message.match(utils.err_code);
  62. // LUA script could return user errors that don't behave like all other errors!
  63. if (match) {
  64. replies[i].code = match[1];
  65. }
  66. replies[i].command = command_obj.command.toUpperCase();
  67. if (typeof command_obj.callback === 'function') {
  68. command_obj.callback(replies[i]);
  69. }
  70. } else {
  71. // If we asked for strings, even in detect_buffers mode, then return strings:
  72. replies[i] = self._client.handle_reply(replies[i], command_obj.command, self.wants_buffers[i]);
  73. if (typeof command_obj.callback === 'function') {
  74. command_obj.callback(null, replies[i]);
  75. }
  76. }
  77. i++;
  78. }
  79. }
  80. if (self.callback) {
  81. self.callback(null, replies);
  82. }
  83. }
  84. Multi.prototype.exec_transaction = function exec_transaction (callback) {
  85. if (this.monitoring || this._client.monitoring) {
  86. var err = new RangeError(
  87. 'Using transaction with a client that is in monitor mode does not work due to faulty return values of Redis.'
  88. );
  89. err.command = 'EXEC';
  90. err.code = 'EXECABORT';
  91. return utils.reply_in_order(this._client, callback, err);
  92. }
  93. var self = this;
  94. var len = self.queue.length;
  95. self.errors = [];
  96. self.callback = callback;
  97. self._client.cork();
  98. self.wants_buffers = new Array(len);
  99. pipeline_transaction_command(self, new Command('multi', []), -1);
  100. // Drain queue, callback will catch 'QUEUED' or error
  101. for (var index = 0; index < len; index++) {
  102. // The commands may not be shifted off, since they are needed in the result handler
  103. pipeline_transaction_command(self, self.queue.get(index), index);
  104. }
  105. self._client.internal_send_command(new Command('exec', [], function (err, replies) {
  106. multi_callback(self, err, replies);
  107. }));
  108. self._client.uncork();
  109. return !self._client.should_buffer;
  110. };
  111. function batch_callback (self, cb, i) {
  112. return function batch_callback (err, res) {
  113. if (err) {
  114. self.results[i] = err;
  115. // Add the position to the error
  116. self.results[i].position = i;
  117. } else {
  118. self.results[i] = res;
  119. }
  120. cb(err, res);
  121. };
  122. }
  123. Multi.prototype.exec = Multi.prototype.EXEC = Multi.prototype.exec_batch = function exec_batch (callback) {
  124. var self = this;
  125. var len = self.queue.length;
  126. var index = 0;
  127. var command_obj;
  128. if (len === 0) {
  129. utils.reply_in_order(self._client, callback, null, []);
  130. return !self._client.should_buffer;
  131. }
  132. self._client.cork();
  133. if (!callback) {
  134. while (command_obj = self.queue.shift()) {
  135. self._client.internal_send_command(command_obj);
  136. }
  137. self._client.uncork();
  138. return !self._client.should_buffer;
  139. }
  140. var callback_without_own_cb = function (err, res) {
  141. if (err) {
  142. self.results.push(err);
  143. // Add the position to the error
  144. var i = self.results.length - 1;
  145. self.results[i].position = i;
  146. } else {
  147. self.results.push(res);
  148. }
  149. // Do not emit an error here. Otherwise each error would result in one emit.
  150. // The errors will be returned in the result anyway
  151. };
  152. var last_callback = function (cb) {
  153. return function (err, res) {
  154. cb(err, res);
  155. callback(null, self.results);
  156. };
  157. };
  158. self.results = [];
  159. while (command_obj = self.queue.shift()) {
  160. if (typeof command_obj.callback === 'function') {
  161. command_obj.callback = batch_callback(self, command_obj.callback, index);
  162. } else {
  163. command_obj.callback = callback_without_own_cb;
  164. }
  165. if (typeof callback === 'function' && index === len - 1) {
  166. command_obj.callback = last_callback(command_obj.callback);
  167. }
  168. this._client.internal_send_command(command_obj);
  169. index++;
  170. }
  171. self._client.uncork();
  172. return !self._client.should_buffer;
  173. };
  174. module.exports = Multi;