123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263 |
- "use strict";
- /**
- * A worker that does nothing but passing chunks to the next one. This is like
- * a nodejs stream but with some differences. On the good side :
- * - it works on IE 6-9 without any issue / polyfill
- * - it weights less than the full dependencies bundled with browserify
- * - it forwards errors (no need to declare an error handler EVERYWHERE)
- *
- * A chunk is an object with 2 attributes : `meta` and `data`. The former is an
- * object containing anything (`percent` for example), see each worker for more
- * details. The latter is the real data (String, Uint8Array, etc).
- *
- * @constructor
- * @param {String} name the name of the stream (mainly used for debugging purposes)
- */
- function GenericWorker(name) {
- // the name of the worker
- this.name = name || "default";
- // an object containing metadata about the workers chain
- this.streamInfo = {};
- // an error which happened when the worker was paused
- this.generatedError = null;
- // an object containing metadata to be merged by this worker into the general metadata
- this.extraStreamInfo = {};
- // true if the stream is paused (and should not do anything), false otherwise
- this.isPaused = true;
- // true if the stream is finished (and should not do anything), false otherwise
- this.isFinished = false;
- // true if the stream is locked to prevent further structure updates (pipe), false otherwise
- this.isLocked = false;
- // the event listeners
- this._listeners = {
- "data":[],
- "end":[],
- "error":[]
- };
- // the previous worker, if any
- this.previous = null;
- }
- GenericWorker.prototype = {
- /**
- * Push a chunk to the next workers.
- * @param {Object} chunk the chunk to push
- */
- push : function (chunk) {
- this.emit("data", chunk);
- },
- /**
- * End the stream.
- * @return {Boolean} true if this call ended the worker, false otherwise.
- */
- end : function () {
- if (this.isFinished) {
- return false;
- }
- this.flush();
- try {
- this.emit("end");
- this.cleanUp();
- this.isFinished = true;
- } catch (e) {
- this.emit("error", e);
- }
- return true;
- },
- /**
- * End the stream with an error.
- * @param {Error} e the error which caused the premature end.
- * @return {Boolean} true if this call ended the worker with an error, false otherwise.
- */
- error : function (e) {
- if (this.isFinished) {
- return false;
- }
- if(this.isPaused) {
- this.generatedError = e;
- } else {
- this.isFinished = true;
- this.emit("error", e);
- // in the workers chain exploded in the middle of the chain,
- // the error event will go downward but we also need to notify
- // workers upward that there has been an error.
- if(this.previous) {
- this.previous.error(e);
- }
- this.cleanUp();
- }
- return true;
- },
- /**
- * Add a callback on an event.
- * @param {String} name the name of the event (data, end, error)
- * @param {Function} listener the function to call when the event is triggered
- * @return {GenericWorker} the current object for chainability
- */
- on : function (name, listener) {
- this._listeners[name].push(listener);
- return this;
- },
- /**
- * Clean any references when a worker is ending.
- */
- cleanUp : function () {
- this.streamInfo = this.generatedError = this.extraStreamInfo = null;
- this._listeners = [];
- },
- /**
- * Trigger an event. This will call registered callback with the provided arg.
- * @param {String} name the name of the event (data, end, error)
- * @param {Object} arg the argument to call the callback with.
- */
- emit : function (name, arg) {
- if (this._listeners[name]) {
- for(var i = 0; i < this._listeners[name].length; i++) {
- this._listeners[name][i].call(this, arg);
- }
- }
- },
- /**
- * Chain a worker with an other.
- * @param {Worker} next the worker receiving events from the current one.
- * @return {worker} the next worker for chainability
- */
- pipe : function (next) {
- return next.registerPrevious(this);
- },
- /**
- * Same as `pipe` in the other direction.
- * Using an API with `pipe(next)` is very easy.
- * Implementing the API with the point of view of the next one registering
- * a source is easier, see the ZipFileWorker.
- * @param {Worker} previous the previous worker, sending events to this one
- * @return {Worker} the current worker for chainability
- */
- registerPrevious : function (previous) {
- if (this.isLocked) {
- throw new Error("The stream '" + this + "' has already been used.");
- }
- // sharing the streamInfo...
- this.streamInfo = previous.streamInfo;
- // ... and adding our own bits
- this.mergeStreamInfo();
- this.previous = previous;
- var self = this;
- previous.on("data", function (chunk) {
- self.processChunk(chunk);
- });
- previous.on("end", function () {
- self.end();
- });
- previous.on("error", function (e) {
- self.error(e);
- });
- return this;
- },
- /**
- * Pause the stream so it doesn't send events anymore.
- * @return {Boolean} true if this call paused the worker, false otherwise.
- */
- pause : function () {
- if(this.isPaused || this.isFinished) {
- return false;
- }
- this.isPaused = true;
- if(this.previous) {
- this.previous.pause();
- }
- return true;
- },
- /**
- * Resume a paused stream.
- * @return {Boolean} true if this call resumed the worker, false otherwise.
- */
- resume : function () {
- if(!this.isPaused || this.isFinished) {
- return false;
- }
- this.isPaused = false;
- // if true, the worker tried to resume but failed
- var withError = false;
- if(this.generatedError) {
- this.error(this.generatedError);
- withError = true;
- }
- if(this.previous) {
- this.previous.resume();
- }
- return !withError;
- },
- /**
- * Flush any remaining bytes as the stream is ending.
- */
- flush : function () {},
- /**
- * Process a chunk. This is usually the method overridden.
- * @param {Object} chunk the chunk to process.
- */
- processChunk : function(chunk) {
- this.push(chunk);
- },
- /**
- * Add a key/value to be added in the workers chain streamInfo once activated.
- * @param {String} key the key to use
- * @param {Object} value the associated value
- * @return {Worker} the current worker for chainability
- */
- withStreamInfo : function (key, value) {
- this.extraStreamInfo[key] = value;
- this.mergeStreamInfo();
- return this;
- },
- /**
- * Merge this worker's streamInfo into the chain's streamInfo.
- */
- mergeStreamInfo : function () {
- for(var key in this.extraStreamInfo) {
- if (!Object.prototype.hasOwnProperty.call(this.extraStreamInfo, key)) {
- continue;
- }
- this.streamInfo[key] = this.extraStreamInfo[key];
- }
- },
- /**
- * Lock the stream to prevent further updates on the workers chain.
- * After calling this method, all calls to pipe will fail.
- */
- lock: function () {
- if (this.isLocked) {
- throw new Error("The stream '" + this + "' has already been used.");
- }
- this.isLocked = true;
- if (this.previous) {
- this.previous.lock();
- }
- },
- /**
- *
- * Pretty print the workers chain.
- */
- toString : function () {
- var me = "Worker " + this.name;
- if (this.previous) {
- return this.previous + " -> " + me;
- } else {
- return me;
- }
- }
- };
- module.exports = GenericWorker;
|