Source: upload/batch.js

const extend = require('extend');
const Queue = require('promise-queue');

const Base = require('../base');
const join = require('../deps/utils/join');
const flatten = require('../deps/utils/flatten');
const BatchBlob = require('./blob');

const DEFAULT_OPTS = {
  concurrency: 5,
};

/**
 * The **BatchUpload** class allows to upload {@link Blob} objets to a Nuxeo Platform instance
 * using the batch upload API.
 *
 * It creates and maintains a batch id from the Nuxeo Platform instance.
 *
 * **Cannot directly be instantiated**
 *
 * @example
 * var Nuxeo = require('nuxeo')
 * var nuxeo = new Nuxeo({
 *  baseURL: 'http://localhost:8080/nuxeo',
 *  auth: {
 *    method: 'basic',
 *    username: 'Administrator',
 *    password: 'Administrator'
 *  }
 * });
 * var batch = nuxeo.batchUpload();
 * var nuxeoBlob = new Nuxeo.Blob(...);
 * batch.upload(nuxeoBlob)
 *   .then(function(res) {
 *     // res.blob instanceof BatchBlob === true
 *   })
 *   .catch(function(error) {
 *     throw new Error(error);
 *   });
 */
class BatchUpload extends Base {
  /**
   * Creates a BatchUpload.
   * @param {object} opts - The configuration options.
   * @param {string} opts.nuxeo - The {@link Nuxeo} object linked to this BatchUpload object.
   * @param {Number} [opts.concurrency=5] - Number of concurrent uploads.
   */
  constructor(opts = {}) {
    const options = extend(true, {}, DEFAULT_OPTS, opts);
    super(options);
    this._url = join(options.url, 'upload/');
    this._nuxeo = options.nuxeo;
    this._uploadIndex = 0;
    Queue.configure(this._nuxeo.Promise);
    this._queue = new Queue(options.concurrency, Infinity);
    this._batchIdPromise = null;
    this._batchId = null;
    this._promises = [];
  }

  /**
   * Upload one or more blobs.
   * @param {...Blob} blobs - Blobs to be uploaded.
   * @returns {Promise} A Promise object resolved when all blobs are uploaded.
   *
   * @example
   * ...
   * nuxeoBatch.upload(blob1, blob2, blob3)
   *   .then(function(res) {
   *     // res.batch === nuxeoBatch
   *     // res.blobs[0] is the BatchBlob object related to blob1
   *     // res.blobs[1] is the BatchBlob object related to blob2
   *     // res.blobs[2] is the BatchBlob object related to blob3
   *   })
   *   .catch(function(error) {
   *     throw new Error(error);
   *   });
   */
  upload(...blobs) {
    const allBlobs = flatten(blobs);
    const promises = allBlobs.map((blob) => {
      const promise = this._queue.add(this._upload.bind(this, blob));
      this._promises.push(promise);
      return promise;
    });
    if (promises.length === 1) {
      return promises[0];
    }

    const { Promise } = this._nuxeo;
    return Promise.all(promises).then((batchBlobs) => {
      return {
        blobs: batchBlobs.map((batchBlob) => batchBlob.blob),
        batch: this,
      };
    });
  }

  _upload(blob) {
    if (!this._batchIdPromise) {
      this._batchIdPromise = this._fetchBatchId();
    }

    const uploadIndex = this._uploadIndex;
    this._uploadIndex += 1;
    return this._batchIdPromise.then(() => {
      const opts = {
        json: false,
        method: 'POST',
        url: join(this._url, this._batchId, uploadIndex),
        body: blob.content,
        headers: {
          'Cache-Control': 'no-cache',
          'X-File-Name': encodeURIComponent(blob.name),
          'X-File-Size': blob.size,
          'X-File-Type': blob.mimeType,
          'Content-Length': blob.size,
        },
      };
      const options = this._computeOptions(opts);
      return this._nuxeo.http(options);
    }).then((res) => {
      res.batchId = this._batchId;
      res.index = uploadIndex;
      return {
        blob: new BatchBlob(res),
        batch: this,
      };
    });
  }

  _fetchBatchId() {
    const opts = {
      method: 'POST',
      url: this._url,
    };

    const { Promise } = this._nuxeo;
    if (this._batchId) {
      return Promise.resolve(this);
    }
    const options = this._computeOptions(opts);
    return this._nuxeo.http(options).then((res) => {
      this._batchId = res.batchId;
      return this;
    });
  }

  /**
   * Wait for all the current uploads to be finished. Note that it won't wait for uploads added after done() being call.
   * If an uploaded is added, you should call again done().
   * The {@link BatchUpload#isFinished} method can be used to know if the batch is finished.
   * @returns {Promise} A Promise object resolved when all the current uploads are finished.
   *
   * @example
   * ...
   * nuxeoBatch.upload(blob1, blob2, blob3);
   * nuxeoBatch.done()
   *   .then(function(res) {
   *     // res.batch === nuxeoBatch
   *     // res.blobs[0] is the BatchBlob object related to blob1
   *     // res.blobs[1] is the BatchBlob object related to blob2
   *     // res.blobs[2] is the BatchBlob object related to blob3
   *   })
   *   .catch(function(error) {
   *     throw new Error(error);
   *   });
   */
  done() {
    const { Promise } = this._nuxeo;
    return Promise.all(this._promises).then((batchBlobs) => {
      return {
        blobs: batchBlobs.map((batchBlob) => batchBlob.blob),
        batch: this,
      };
    });
  }

  /**
   * Returns whether the BatchUpload is finished, ie. has uploads running, or not.
   * @returns {Boolean} true if the BatchUpload is finished, false otherwise.
   */
  isFinished() {
    return this._queue.getQueueLength() === 0 && this._queue.getPendingLength() === 0;
  }

  /**
   * Cancels a BatchUpload.
   * @returns {Promise} A Promise object resolved with the BatchUpload itself.
   */
  cancel(opts) {
    const { Promise } = this._nuxeo;
    if (!this._batchIdPromise) {
      return Promise.resolve(this);
    }

    const path = join('upload', this._batchId);
    return this._batchIdPromise.then(() => {
      const options = this._computeOptions(opts);
      return this._nuxeo.request(path)
        .delete(options);
    }).then(() => {
      this._batchIdPromise = null;
      this._batchId = null;
      return this;
    });
  }

  /**
   * Fetches a blob at a given index from the batch.
   * @returns {Promise} A Promise object resolved with the BatchUpload itself and the BatchBlob.
   */
  fetchBlob(index, opts = {}) {
    const { Promise } = this._nuxeo;
    if (!this._batchId) {
      return Promise.reject(new Error('No \'batchId\' set'));
    }

    let options = {
      method: 'GET',
      url: join(this._url, this._batchId, index),
    };
    options = extend(true, options, opts);
    options = this._computeOptions(options);
    return this._nuxeo.http(options).then((res) => {
      res.batchId = this._batchId;
      res.index = index;
      return {
        batch: this,
        blob: new BatchBlob(res),
      };
    });
  }

  /**
   * Removes a blob at a given index from the batch.
   * @returns {Promise} A Promise object resolved with the result of the DELETE request.
   */
  removeBlob(index, opts = {}) {
    const { Promise } = this._nuxeo;
    if (!this._batchId) {
      return Promise.reject(new Error('No \'batchId\' set'));
    }

    let options = {
      method: 'DELETE',
      url: join(this._url, this._batchId, index),
    };
    options = extend(true, options, opts);
    options = this._computeOptions(options);
    return this._nuxeo.http(options);
  }

  /**
   * Fetches the blobs from the batch.
   * @returns {Promise} A Promise object resolved with the BatchUpload itself and the BatchBlobs.
   */
  fetchBlobs(opts = {}) {
    const { Promise } = this._nuxeo;
    if (!this._batchId) {
      return Promise.reject(new Error('No \'batchId\' set'));
    }

    let options = {
      method: 'GET',
      url: join(this._url, this._batchId),
    };
    options = extend(true, options, opts);
    options = this._computeOptions(options);
    return this._nuxeo.http(options).then((blobs) => {
      const batchBlobs = blobs.map((blob, index) => {
        blob.batchId = this._batchId;
        blob.index = index;
        return new BatchBlob(blob);
      });
      return {
        batch: this,
        blobs: batchBlobs,
      };
    });
  }
}

module.exports = BatchUpload;