GenericWorker.js 8.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263
  1. "use strict";
  2. /**
  3. * A worker that does nothing but passing chunks to the next one. This is like
  4. * a nodejs stream but with some differences. On the good side :
  5. * - it works on IE 6-9 without any issue / polyfill
  6. * - it weights less than the full dependencies bundled with browserify
  7. * - it forwards errors (no need to declare an error handler EVERYWHERE)
  8. *
  9. * A chunk is an object with 2 attributes : `meta` and `data`. The former is an
  10. * object containing anything (`percent` for example), see each worker for more
  11. * details. The latter is the real data (String, Uint8Array, etc).
  12. *
  13. * @constructor
  14. * @param {String} name the name of the stream (mainly used for debugging purposes)
  15. */
  16. function GenericWorker(name) {
  17. // the name of the worker
  18. this.name = name || "default";
  19. // an object containing metadata about the workers chain
  20. this.streamInfo = {};
  21. // an error which happened when the worker was paused
  22. this.generatedError = null;
  23. // an object containing metadata to be merged by this worker into the general metadata
  24. this.extraStreamInfo = {};
  25. // true if the stream is paused (and should not do anything), false otherwise
  26. this.isPaused = true;
  27. // true if the stream is finished (and should not do anything), false otherwise
  28. this.isFinished = false;
  29. // true if the stream is locked to prevent further structure updates (pipe), false otherwise
  30. this.isLocked = false;
  31. // the event listeners
  32. this._listeners = {
  33. "data":[],
  34. "end":[],
  35. "error":[]
  36. };
  37. // the previous worker, if any
  38. this.previous = null;
  39. }
  40. GenericWorker.prototype = {
  41. /**
  42. * Push a chunk to the next workers.
  43. * @param {Object} chunk the chunk to push
  44. */
  45. push : function (chunk) {
  46. this.emit("data", chunk);
  47. },
  48. /**
  49. * End the stream.
  50. * @return {Boolean} true if this call ended the worker, false otherwise.
  51. */
  52. end : function () {
  53. if (this.isFinished) {
  54. return false;
  55. }
  56. this.flush();
  57. try {
  58. this.emit("end");
  59. this.cleanUp();
  60. this.isFinished = true;
  61. } catch (e) {
  62. this.emit("error", e);
  63. }
  64. return true;
  65. },
  66. /**
  67. * End the stream with an error.
  68. * @param {Error} e the error which caused the premature end.
  69. * @return {Boolean} true if this call ended the worker with an error, false otherwise.
  70. */
  71. error : function (e) {
  72. if (this.isFinished) {
  73. return false;
  74. }
  75. if(this.isPaused) {
  76. this.generatedError = e;
  77. } else {
  78. this.isFinished = true;
  79. this.emit("error", e);
  80. // in the workers chain exploded in the middle of the chain,
  81. // the error event will go downward but we also need to notify
  82. // workers upward that there has been an error.
  83. if(this.previous) {
  84. this.previous.error(e);
  85. }
  86. this.cleanUp();
  87. }
  88. return true;
  89. },
  90. /**
  91. * Add a callback on an event.
  92. * @param {String} name the name of the event (data, end, error)
  93. * @param {Function} listener the function to call when the event is triggered
  94. * @return {GenericWorker} the current object for chainability
  95. */
  96. on : function (name, listener) {
  97. this._listeners[name].push(listener);
  98. return this;
  99. },
  100. /**
  101. * Clean any references when a worker is ending.
  102. */
  103. cleanUp : function () {
  104. this.streamInfo = this.generatedError = this.extraStreamInfo = null;
  105. this._listeners = [];
  106. },
  107. /**
  108. * Trigger an event. This will call registered callback with the provided arg.
  109. * @param {String} name the name of the event (data, end, error)
  110. * @param {Object} arg the argument to call the callback with.
  111. */
  112. emit : function (name, arg) {
  113. if (this._listeners[name]) {
  114. for(var i = 0; i < this._listeners[name].length; i++) {
  115. this._listeners[name][i].call(this, arg);
  116. }
  117. }
  118. },
  119. /**
  120. * Chain a worker with an other.
  121. * @param {Worker} next the worker receiving events from the current one.
  122. * @return {worker} the next worker for chainability
  123. */
  124. pipe : function (next) {
  125. return next.registerPrevious(this);
  126. },
  127. /**
  128. * Same as `pipe` in the other direction.
  129. * Using an API with `pipe(next)` is very easy.
  130. * Implementing the API with the point of view of the next one registering
  131. * a source is easier, see the ZipFileWorker.
  132. * @param {Worker} previous the previous worker, sending events to this one
  133. * @return {Worker} the current worker for chainability
  134. */
  135. registerPrevious : function (previous) {
  136. if (this.isLocked) {
  137. throw new Error("The stream '" + this + "' has already been used.");
  138. }
  139. // sharing the streamInfo...
  140. this.streamInfo = previous.streamInfo;
  141. // ... and adding our own bits
  142. this.mergeStreamInfo();
  143. this.previous = previous;
  144. var self = this;
  145. previous.on("data", function (chunk) {
  146. self.processChunk(chunk);
  147. });
  148. previous.on("end", function () {
  149. self.end();
  150. });
  151. previous.on("error", function (e) {
  152. self.error(e);
  153. });
  154. return this;
  155. },
  156. /**
  157. * Pause the stream so it doesn't send events anymore.
  158. * @return {Boolean} true if this call paused the worker, false otherwise.
  159. */
  160. pause : function () {
  161. if(this.isPaused || this.isFinished) {
  162. return false;
  163. }
  164. this.isPaused = true;
  165. if(this.previous) {
  166. this.previous.pause();
  167. }
  168. return true;
  169. },
  170. /**
  171. * Resume a paused stream.
  172. * @return {Boolean} true if this call resumed the worker, false otherwise.
  173. */
  174. resume : function () {
  175. if(!this.isPaused || this.isFinished) {
  176. return false;
  177. }
  178. this.isPaused = false;
  179. // if true, the worker tried to resume but failed
  180. var withError = false;
  181. if(this.generatedError) {
  182. this.error(this.generatedError);
  183. withError = true;
  184. }
  185. if(this.previous) {
  186. this.previous.resume();
  187. }
  188. return !withError;
  189. },
  190. /**
  191. * Flush any remaining bytes as the stream is ending.
  192. */
  193. flush : function () {},
  194. /**
  195. * Process a chunk. This is usually the method overridden.
  196. * @param {Object} chunk the chunk to process.
  197. */
  198. processChunk : function(chunk) {
  199. this.push(chunk);
  200. },
  201. /**
  202. * Add a key/value to be added in the workers chain streamInfo once activated.
  203. * @param {String} key the key to use
  204. * @param {Object} value the associated value
  205. * @return {Worker} the current worker for chainability
  206. */
  207. withStreamInfo : function (key, value) {
  208. this.extraStreamInfo[key] = value;
  209. this.mergeStreamInfo();
  210. return this;
  211. },
  212. /**
  213. * Merge this worker's streamInfo into the chain's streamInfo.
  214. */
  215. mergeStreamInfo : function () {
  216. for(var key in this.extraStreamInfo) {
  217. if (!Object.prototype.hasOwnProperty.call(this.extraStreamInfo, key)) {
  218. continue;
  219. }
  220. this.streamInfo[key] = this.extraStreamInfo[key];
  221. }
  222. },
  223. /**
  224. * Lock the stream to prevent further updates on the workers chain.
  225. * After calling this method, all calls to pipe will fail.
  226. */
  227. lock: function () {
  228. if (this.isLocked) {
  229. throw new Error("The stream '" + this + "' has already been used.");
  230. }
  231. this.isLocked = true;
  232. if (this.previous) {
  233. this.previous.lock();
  234. }
  235. },
  236. /**
  237. *
  238. * Pretty print the workers chain.
  239. */
  240. toString : function () {
  241. var me = "Worker " + this.name;
  242. if (this.previous) {
  243. return this.previous + " -> " + me;
  244. } else {
  245. return me;
  246. }
  247. }
  248. };
  249. module.exports = GenericWorker;