Streams

Phasis implements the full WHATWG Streams Standard: every class, every reader mode, every helper. Stream code that works in Node and modern browsers works in Phasis.

fetch uses this same implementation internally: Response.body is a real ReadableStream, Request accepts streaming bodies, and tee(), which cloning a Response relies on, works.

What's shipped

Class                                 Status
ReadableStream                        Full
ReadableStreamDefaultController       Full
ReadableByteStreamController          Full
ReadableStreamDefaultReader           Full
ReadableStreamBYOBReader              Full
ReadableStreamBYOBRequest             Full
WritableStream                        Full
WritableStreamDefaultController       Full
WritableStreamDefaultWriter           Full
TransformStream                       Full
TransformStreamDefaultController      Full
ByteLengthQueuingStrategy             Full
CountQueuingStrategy                  Full

Plus the methods you reach for: pipeTo, pipeThrough, tee, getReader, async iteration (Symbol.asyncIterator), and BYOB reads with { min }.

Producing

// Pull-based — only generates values when read
const counter = new ReadableStream({
  start(controller) { this.n = 0; },
  pull(controller) {
    if (this.n >= 5) { controller.close(); return; }
    controller.enqueue(this.n++);
  },
});

for await (const v of counter) console.log(v);  // 0,1,2,3,4

// Push-based: start enqueues everything eagerly
const small = new ReadableStream({
  start(c) {
    [10, 20, 30].forEach(n => c.enqueue(n));
    c.close();
  },
});
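
The pull-based pattern also adapts any iterator or generator to a stream. The following is a minimal sketch that assumes only the standard ReadableStream constructor; fromIterable and squares are hypothetical names used for illustration, not Phasis APIs.

```javascript
// Hypothetical helper: wrap a sync or async iterable in a pull-based stream.
function fromIterable(iterable) {
  const makeIterator =
    iterable[Symbol.asyncIterator] || iterable[Symbol.iterator];
  const iterator = makeIterator.call(iterable);

  return new ReadableStream({
    // Runs only when the consumer asks for more data.
    async pull(controller) {
      const { value, done } = await iterator.next();
      if (done) controller.close();
      else controller.enqueue(value);
    },
    // Give the iterator a chance to clean up if the consumer cancels early.
    async cancel(reason) {
      if (typeof iterator.return === 'function') await iterator.return(reason);
    },
  });
}

// Example source: a generator that is only advanced on demand.
function* squares(limit) {
  for (let i = 1; i <= limit; i++) yield i * i;
}
```

Reading fromIterable(squares(4)) with any of the consuming styles below yields 1, 4, 9, 16, and the generator body never runs ahead of the reader by more than the stream's queue allows.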

Consuming

Three ways to read:

// 1. Reader
const reader = stream.getReader();
while (true) {
  const { value, done } = await reader.read();
  if (done) break;
  console.log(value);
}
reader.releaseLock();

// 2. Async iteration (preferred)
for await (const chunk of stream) {
  console.log(chunk);
}

// 3. tee + parallel pipes
const [a, b] = stream.tee();
await Promise.all([a.pipeTo(sinkA), b.pipeTo(sinkB)]);
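
The sinkA and sinkB targets in the third style can be any WritableStream. As a rough sketch of what such a sink might look like, the block below builds one that simply stores its chunks; collectingSink and teeInto are hypothetical helper names.

```javascript
// Hypothetical helper: a WritableStream that records every chunk it receives.
function collectingSink() {
  const chunks = [];
  const writable = new WritableStream({
    write(chunk) {
      chunks.push(chunk);
    },
  });
  return { writable, chunks };
}

// Split one stream into two branches and drain both in parallel.
async function teeInto(stream) {
  const [a, b] = stream.tee();
  const sinkA = collectingSink();
  const sinkB = collectingSink();
  // pipeTo() resolves once the source is exhausted and the sink is closed.
  await Promise.all([a.pipeTo(sinkA.writable), b.pipeTo(sinkB.writable)]);
  return [sinkA.chunks, sinkB.chunks];
}
```

Both branches receive every chunk, so the two returned arrays have the same contents.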

Transforming

const upper = new TransformStream({
  transform(chunk, controller) {
    controller.enqueue(chunk.toUpperCase());
  },
});

const result = new ReadableStream({
  start(c) { ['hi', 'there'].forEach(s => c.enqueue(s)); c.close(); }
}).pipeThrough(upper);

for await (const v of result) console.log(v);  // HI, THERE
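
A transformer can also hold state between chunks and emit leftovers in flush(), which runs once after the last chunk. The sketch below re-chunks arbitrary string pieces into whole lines; lineSplitter is a hypothetical name, and the code assumes string chunks separated by "\n".

```javascript
// Re-chunk arbitrary string pieces into complete lines.
function lineSplitter() {
  let pending = ''; // text seen so far that does not yet end in a newline

  return new TransformStream({
    transform(chunk, controller) {
      const parts = (pending + chunk).split('\n');
      pending = parts.pop(); // the last piece may be an incomplete line
      for (const line of parts) controller.enqueue(line);
    },
    // Called once when the writable side closes: emit any trailing text.
    flush(controller) {
      if (pending !== '') controller.enqueue(pending);
    },
  });
}
```

Piping a stream through lineSplitter() with pipeThrough() gives the consumer one line per chunk regardless of how the producer split its output.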

Byte streams (BYOB)

const bytes = new ReadableStream({
  type: 'bytes',
  start(controller) {
    controller.enqueue(new Uint8Array([1, 2, 3, 4, 5, 6]));
    controller.close();
  },
});

const reader = bytes.getReader({ mode: 'byob' });
const buf = new Uint8Array(4);
const { value, done } = await reader.read(buf);
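// value is a Uint8Array of length 4 holding [1, 2, 3, 4].
// The read transfers buf's ArrayBuffer, so buf is detached afterwards;
// keep working with value (and value.buffer) instead of buf.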

{ min: N } is supported: read(view, { min: 2 }) resolves only once at least 2 elements of the view have been filled, or the stream closes.
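
Because each BYOB read hands back a view over the buffer you supplied, a loop can recycle a single allocation by passing value.buffer into the next read. The following is a minimal sketch under that assumption; readAllBytes is a hypothetical helper and the chunk size of 4 is arbitrary.

```javascript
// Hypothetical helper: drain a byte stream while reusing one ArrayBuffer.
async function readAllBytes(stream, chunkSize = 4) {
  const reader = stream.getReader({ mode: 'byob' });
  let buffer = new ArrayBuffer(chunkSize);
  const out = [];

  while (true) {
    // The view's buffer is transferred to the stream for the duration of the read.
    const { value, done } = await reader.read(new Uint8Array(buffer));
    if (done) break;
    out.push(...value);    // copy the bytes out before the buffer is reused
    buffer = value.buffer; // reclaim the transferred buffer for the next read
  }

  reader.releaseLock();
  return out;
}
```

For the six-byte stream above, this loop performs one read of four bytes, one of two bytes, and a final read that reports done.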

Backpressure

desiredSize (on controllers and writers) and writer.ready work as the spec describes. Pass a queuing strategy to set the high-water mark that gives the producer a back-channel:

const stream = new ReadableStream({
  pull(controller) {
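    // Defensive guard: with a positive highWaterMark the stream only calls
    // pull() while desiredSize > 0.
    if (controller.desiredSize <= 0) return;
    // nextChunk() is a placeholder for your data source.
    controller.enqueue(nextChunk());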
  },
}, new CountQueuingStrategy({ highWaterMark: 4 }));
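
On the writable side, the equivalent signal is writer.ready, which stays pending while the sink's queue is at or above its high-water mark. The sketch below shows the usual write loop; writeAll is a hypothetical helper name.

```javascript
// Write chunks one by one while honouring the sink's backpressure signal.
async function writeAll(writable, chunks) {
  const writer = writable.getWriter();

  for (const chunk of chunks) {
    // Resolves once the writer's desiredSize is positive again.
    await writer.ready;
    // Not awaited, so the queue can fill up to highWaterMark; a failed write
    // errors the stream, which also rejects ready and close().
    writer.write(chunk).catch(() => {});
  }

  await writer.close(); // waits for every queued write to finish
  writer.releaseLock();
}
```

Awaiting ready rather than each write() keeps the queue topped up to the high-water mark while still stopping the producer from running arbitrarily far ahead of the sink.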

With fetch

// Response body is a ReadableStream
const res = await fetch('https://example.com/big.txt');
let total = 0;
for await (const chunk of res.body) {
  total += chunk.byteLength;
}

// Request bodies can stream too
const upload = new ReadableStream({
  start(c) { c.enqueue(new TextEncoder().encode('hello')); c.close(); }
});
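// duplex: 'half' is required by the Fetch Standard (and enforced by Node and Chromium) when the body is a stream
await fetch('https://example.com/upload', { method: 'POST', body: upload, duplex: 'half' });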
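
Body chunks are Uint8Array values, and a multi-byte character may be split across two of them. One way to decode a body incrementally, sketched here on the assumption that TextDecoder is available alongside the TextEncoder used above, is to decode with { stream: true }; bodyToText is a hypothetical helper name.

```javascript
// Decode a stream of UTF-8 byte chunks into a single string.
async function bodyToText(body) {
  const decoder = new TextDecoder(); // defaults to UTF-8
  let text = '';

  for await (const chunk of body) {
    // stream: true buffers an incomplete byte sequence until the next chunk.
    text += decoder.decode(chunk, { stream: true });
  }

  // A final call with no input flushes anything still buffered.
  return text + decoder.decode();
}
```

This can be called as bodyToText(res.body) on a fetched response, though res.text() remains the shorter route when the whole body is wanted at once.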

Tee for caching

Common pattern: read the body twice.

const res = await fetch(url);
const [forCache, forUse] = res.body.tee();
await Promise.all([
  forCache.pipeTo(cacheWritable),
  parseJson(forUse),
]);

Response.clone() does this for you and returns a fresh Response.
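
As a brief sketch of the clone() route, the block below reads one response both as parsed JSON and as raw text. It assumes the Response constructor and its json() and text() methods are available as in the Fetch Standard; readTwice is a hypothetical helper name.

```javascript
// Read a single response two ways: parsed JSON and raw text.
async function readTwice(res) {
  // clone() must be called before either body has started to be consumed.
  const copy = res.clone();
  const [data, raw] = await Promise.all([res.json(), copy.text()]);
  return { data, raw };
}
```

Consuming both copies in parallel, as Promise.all does here, keeps either branch of the underlying tee from buffering the whole body while it waits for the other.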

Caveats

  • Synchronous-only producers. A pull() that performs blocking PHP I/O will block the script. Use the fetch transport hook for HTTP; for files, prefer reading into memory up front. setTimeout / setInterval are available (see Event loop), but the engine uses virtual time by default: a setTimeout(cb, 1000) inside a pull() does not sleep PHP; it advances the loop clock. Real-time pacing needs the host to drive tickEventLoop() against a wall clock.
