HEX
Server: Apache
System: Linux webd004.cluster130.gra.hosting.ovh.net 5.15.206-ovh-vps-grsec-zfs-classid #1 SMP Fri May 15 02:41:25 UTC 2026 x86_64
User: frenchy (106757)
PHP: 7.4.33
Disabled: _dyuweyrj4,_dyuweyrj4r,dl
Upload Files
File: /home/frenchy/www/french-american.org/current/node_modules/pullstream/test/pullStreamTest.js
'use strict';

var nodeunit = require('nodeunit');
var fs = require("fs");
var path = require("path");
var streamBuffers = require("stream-buffers");
var async = require('async')
var PullStream = require('../');

module.exports = {
  "source sending 1-byte at a time": function (t) {
    t.expect(3);
    var ps = new PullStream({ lowWaterMark : 0 });
    ps.on('finish', function () {
      sourceStream.destroy();
    });

    var sourceStream = new streamBuffers.ReadableStreamBuffer({
      frequency: 0,
      chunkSize: 1
    });

    sourceStream.pipe(ps);
    sourceStream.put("Hello World!");

    ps.pull('Hello'.length, function (err, data) {
      if (err) {
        return t.done(err);
      }
      t.equal('Hello', data.toString());

      var writableStream = new streamBuffers.WritableStreamBuffer({
        initialSize: 100
      });
      writableStream.on('close', function () {
        var str = writableStream.getContentsAsString('utf8');
        t.equal(' World', str);

        ps.pull(function (err, data) {
          if (err) {
            return t.done(err);
          }
          t.equal('!', data.toString());
          return t.done();
        });
      });

      ps.pipe(' World'.length, writableStream);
    });
  },

  "source sending twelve bytes at once": function (t) {
    t.expect(3);
    var ps = new PullStream({ lowWaterMark : 0 });
    ps.on('finish', function () {
      sourceStream.destroy();
    });

    var sourceStream = new streamBuffers.ReadableStreamBuffer({
      frequency: 0,
      chunkSize: 1000
    });

    sourceStream.pipe(ps);
    sourceStream.put("Hello World!");

    ps.pull('Hello'.length, function (err, data) {
      if (err) {
        return t.done(err);
      }
      t.equal('Hello', data.toString());

      var writableStream = new streamBuffers.WritableStreamBuffer({
        initialSize: 100
      });
      writableStream.on('close', function () {
        var str = writableStream.getContentsAsString('utf8');
        t.equal(' World', str);

        ps.pull(function (err, data) {
          if (err) {
            return t.done(err);
          }
          t.equal('!', data.toString());
          return t.done();
        });
      });

      ps.pipe(' World'.length, writableStream);
    });
  },

  "source sending 512 bytes at once": function (t) {
    t.expect(512 / 4);
    var ps = new PullStream({ lowWaterMark : 0 });
    ps.on('finish', function() {
      sourceStream.destroy();
    });

    var values = [];
    for (var i = 0; i < 512; i+=4) {
      values.push(i + 1000);
    }
    var sourceStream = new streamBuffers.ReadableStreamBuffer({
      frequency: 0,
      chunkSize: 1000
    });

    sourceStream.pipe(ps);
    values.forEach(function(val) {
      sourceStream.put(val);
    });

    async.forEachSeries(values, function (val, callback) {
      ps.pull(4, function (err, data) {
        if (err) {
          return callback(err);
        }
        t.equal(val, data.toString());
        return callback(null);
      });
    }, function (err) {
      t.done(err);
    });
  },

  "two length pulls": function (t) {
    t.expect(2);
    var ps = new PullStream({ lowWaterMark : 0 });
    ps.on('finish', function () {
      sourceStream.destroy();
    });

    var sourceStream = new streamBuffers.ReadableStreamBuffer({
      frequency: 0,
      chunkSize: 1000
    });

    sourceStream.pipe(ps);
    sourceStream.put("Hello World!");

    ps.pull('Hello'.length, function (err, data) {
      if (err) {
        return t.done(err);
      }
      t.equal('Hello', data.toString());

      ps.pull(' World!'.length, function (err, data) {
        if (err) {
          return t.done(err);
        }
        t.equal(' World!', data.toString());
        return t.done();
      });
    });
  },

  "pulling zero bytes returns empty data": function (t) {
    t.expect(1);
    var ps = new PullStream({ lowWaterMark : 0 });

    var sourceStream = new streamBuffers.ReadableStreamBuffer({
      chunkSize: 1000
    });

    sourceStream.pipe(ps);
    sourceStream.put("Hello World!");

    ps.pull(0, function (err, data) {
      if (err) {
        return t.done(err);
      }

      t.equal(0, data.length, "data is empty");
      sourceStream.destroy();
      return t.done();
    });
  },

  "read from file": function (t) {
    t.expect(2);
    var ps = new PullStream({ lowWaterMark : 0 });

    var sourceStream = fs.createReadStream(path.join(__dirname, 'testFile.txt'));

    sourceStream.pipe(ps);

    ps.pull('Hello'.length, function (err, data) {
      if (err) {
        return t.done(err);
      }
      t.equal('Hello', data.toString());

      ps.pull(' World!'.length, function (err, data) {
        if (err) {
          return t.done(err);
        }
        t.equal(' World!', data.toString());
        return t.done();
      });
    });
  },

  "read past end of stream": function (t) {
    t.expect(2);
    var ps = new PullStream({ lowWaterMark : 0 });
    ps.on('finish', function () {
      sourceStream.destroy();
    });

    var sourceStream = new streamBuffers.ReadableStreamBuffer({
      frequency: 1,
      chunkSize: 1000
    });

    sourceStream.pipe(ps);
    sourceStream.put("Hello World!");

    ps.pull('Hello World!'.length, function (err, data) {
      if (err) {
        return t.done(err);
      }
      t.equal('Hello World!', data.toString());

      ps.pull(1, function (err, data) {
        if (err) {
          t.ok(err, 'should get an error');
        }
        t.done();
      });
    });
  },

  "pipe with no length": function (t) {
    t.expect(2);
    var ps = new PullStream({ lowWaterMark : 0 });
    ps.on('end', function () {
      t.ok(true, "pullstream should end");
    });

    var writableStream = new streamBuffers.WritableStreamBuffer({
      initialSize: 100
    });
    writableStream.on('close', function () {
      var str = writableStream.getContentsAsString('utf8');
      t.equal('Hello World!', str);
      t.done();
    });

    ps.pipe(writableStream);

    process.nextTick(function () {
      ps.write(new Buffer('Hello', 'utf8'));
      ps.write(new Buffer(' World', 'utf8'));
      process.nextTick(function () {
        ps.write(new Buffer('!', 'utf8'));
        ps.end();
      });
    });
  },

  "throw on calling write() after end": function (t) {
    t.expect(1);
    var ps = new PullStream({ lowWaterMark : 0 });
    ps.end();

    try {
      ps.write(new Buffer('hello', 'utf8'));
      t.fail("should throw error");
    } catch (ex) {
      t.ok(ex);
    }

    t.done();
  },

  "pipe more bytes than the pullstream buffer size": function (t) {
    t.expect(1);
    var ps = new PullStream();
    ps.on('end', function() {
      sourceStream.destroy();
    });

    var aVals = "", bVals = "";
    for (var i = 0; i < 20 * 1000; i++) {
      aVals += 'a';
    }
    for (var i = 0; i < 180 * 1000; i++) {
      bVals += 'b';
    }
    var combined = aVals + bVals;

    var sourceStream = new streamBuffers.ReadableStreamBuffer({
      frequency: 0,
      chunkSize: 40 * 1024
    });

    sourceStream.pipe(ps);
    sourceStream.put(aVals);

    var writableStream = new streamBuffers.WritableStreamBuffer({
      initialSize: 200 * 1000
    });
    writableStream.on('close', function () {
      var str = writableStream.getContentsAsString('utf8');
      t.equal(combined, str);
      t.done();
    });

    ps.once('drain', function () {
      ps.pipe(200 * 1000, writableStream);
      process.nextTick(sourceStream.put.bind(null, bVals));
    });
  },

  "mix asynchronous pull with synchronous pullUpTo - exact number of bytes returned": function (t) {
    t.expect(2);
    var ps = new PullStream();

    var sourceStream = new streamBuffers.ReadableStreamBuffer({
      frequency: 0,
      chunkSize: 1000
    });

    sourceStream.pipe(ps);
    sourceStream.put("Hello World!");

    ps.pull('Hello'.length, function (err, data) {
      if (err) {
        return t.done(err);
      }
      t.equal('Hello', data.toString());
      var data = ps.pullUpTo(" World!".length);
      t.equal(" World!", data.toString());
      sourceStream.destroy();
      t.done();
    });
  },

  "mix asynchronous pull with pullUpTo - fewer bytes returned than requested": function (t) {
    t.expect(2);
    var ps = new PullStream();

    var sourceStream = new streamBuffers.ReadableStreamBuffer({
      frequency: 0,
      chunkSize: 1000
    });

    sourceStream.pipe(ps);
    sourceStream.put("Hello World!");


    ps.pull('Hello'.length, function (err, data) {
      if (err) {
        return t.done(err);
      }
      t.equal('Hello', data.toString());
      var data = ps.pullUpTo(1000);
      t.equal(" World!", data.toString());
      sourceStream.destroy();
      t.done();
    });
  },

  "retrieve all currently remaining bytes": function (t) {
    t.expect(2);
    var ps = new PullStream();

    var sourceStream = new streamBuffers.ReadableStreamBuffer({
      frequency: 0,
      chunkSize: 1000
    });

    sourceStream.pipe(ps);
    sourceStream.put("Hello World!");

    ps.pull('Hello'.length, function (err, data) {
      if (err) {
        return t.done(err);
      }
      t.equal('Hello', data.toString());
      var data = ps.pullUpTo();
      t.equal(" World!", data.toString());
      sourceStream.destroy();
      t.done();
    });
  },

// TODO: node PassThrough stream doesn't handle unshift the same way anymore.
//  "prepend": function (t) {
//    t.expect(1);
//    var ps = new PullStream();
//
//    var sourceStream = new streamBuffers.ReadableStreamBuffer();
//
//    sourceStream.pipe(ps);
//    sourceStream.put("World!");
//    ps.prepend("Hello ");
//
//    ps.pull('Hello World!'.length, function (err, data) {
//      if (err) {
//        return t.done(err);
//      }
//      t.equal('Hello World!', data.toString());
//      sourceStream.destroy();
//      t.done();
//    });
//  },

  "drain": function (t) {
    t.expect(1);
    var ps = new PullStream();

    var sourceStream = new streamBuffers.ReadableStreamBuffer();

    sourceStream.pipe(ps);
    sourceStream.put("Hello World!");

    ps.drain('Hello '.length, function (err) {
      if (err) {
        return t.done(err);
      }
      ps.pull('World!'.length, function (err, data) {
        t.equal('World!', data.toString());
        sourceStream.destroy();
        t.done();
      });
    });
  }
};