Web Streams en todas partes (y Fetch para Node.js) | Programar Plus

El defensor de los desarrolladores de Chrome, Jake Archibald, llamó a 2016 “el año de las transmisiones web”. Claramente, su predicción fue algo prematura. Streams Standard se anunció en 2014. Ha llevado un tiempo, pero ahora hay una API de transmisión consistente implementada en los navegadores modernos (todavía esperando en Firefox…) y en Node (y Deno).

¿Qué son las corrientes?

La transmisión consiste en dividir un recurso en partes más pequeñas llamadas fragmentos y procesar cada fragmento de uno en uno. En lugar de tener que esperar para completar la descarga de todos los datos, con las transmisiones puede procesar los datos de forma progresiva tan pronto como esté disponible el primer fragmento.

Hay tres tipos de secuencias: secuencias legibles, secuencias de escritura y secuencias de transformación. Flujos legibles son de donde provienen los fragmentos de datos. Las fuentes de datos subyacentes podrían ser un archivo o una conexión HTTP, por ejemplo. Los datos pueden entonces (opcionalmente) ser modificados por un transformar corriente. Los fragmentos de datos se pueden canalizar a un secuencia de escritura.

Transmisiones web en todas partes

Node siempre ha tenido su propio tipo de transmisiones. Generalmente se consideran difícil trabajar con. El estándar web del Grupo de Trabajo de Tecnología de Aplicación de Hipertexto Web (WHATWG) para transmisiones llegó más tarde y se considera en gran medida una mejora. Los documentos de Node los llaman “transmisiones web”, lo que suena un poco menos engorroso. Las transmisiones de nodo originales no se están desaprobando ni eliminando, pero ahora coexistirán con la API de transmisión estándar web. Esto facilita la escritura de código multiplataforma y significa que los desarrolladores solo necesitan aprender una forma de hacer las cosas.

Deno, otro intento de JavaScript del lado del servidor por parte del creador original de Node, siempre se ha alineado estrechamente con las API del navegador y tiene soporte completo para transmisiones web. Los trabajadores de Cloudflare (que son un poco como los trabajadores de servicios pero que se ejecutan en ubicaciones de borde de CDN) y Deno Deploy (una oferta sin servidor de Deno) también admiten transmisiones.

fetch() respuesta como una secuencia legible

Hay varias formas de crear una transmisión legible, pero llamando fetch() está destinado a ser el más común. El cuerpo de respuesta de fetch() es una secuencia legible.

fetch('data.txt')
.then(response => console.log(response.body));

Si observa el registro de la consola, puede ver que una secuencia legible tiene varios métodos útiles. Como dice la especificación, una secuencia legible se puede canalizar directamente a una secuencia de escritura, utilizando su pipeTo() método, o se puede canalizar a través de uno o más flujos de transformación primero, utilizando su pipeThrough() método.

A diferencia de los navegadores, Node core actualmente no implementa fetch. node-fetch, una dependencia popular que intenta coincidir con la API del estándar del navegador, devuelve una secuencia de nodo, no una secuencia de WHATWG. Undici, un cliente HTTP / 1.1 mejorado del equipo de Node.js, es una alternativa moderna al núcleo de Node.js http.request (sobre el que se construyen cosas como node-fetch y Axios). Undici ha implementado fetch – y response.body devuelve una transmisión web. 🎉

Undici podría terminar eventualmente en el núcleo de Node.js, y parece que se convertirá en la forma recomendada de manejar solicitudes HTTP en Node. Una vez tú npm install undici e import fetch, funciona igual que en el navegador. En el siguiente ejemplo, canalizamos la secuencia a través de una secuencia de transformación. Cada trozo de la corriente es un Uint8Array. El núcleo del nodo proporciona un TextDecoderStream para decodificar datos binarios.

import { fetch } from 'undici';
import { TextDecoderStream } from 'node:stream/web';

async function fetchStream() {
  const response = await fetch('https://example.com')
  const stream = response.body;
  const textStream = stream.pipeThrough(new TextDecoderStream());
}

response.body es sincrónico, por lo que no es necesario await eso. En el navegador, fetch y TextDecoderStream están disponibles en el objeto global, por lo que no incluiría ninguna declaración de importación. Aparte de eso, el código es exactamente el mismo para los navegadores web y de nodo. Deno también tiene soporte integrado para fetch y TextDecoderStream.

Iteración asincrónica

El bucle for-await-of es una versión asincrónica del bucle for-of. Se utiliza un bucle for-of regular para recorrer matrices y otros iterables. Se puede usar un bucle for-await-of para iterar sobre una matriz de promesas, por ejemplo.

const promiseArray = [Promise.resolve("thing 1"), Promise.resolve("thing 2")];
for await (const thing of promiseArray) { console.log(thing); }

Es importante para nosotros que esto también se pueda utilizar para iterar transmisiones.

async function fetchAndLogStream() {
  const response = await fetch('https://example.com')
  const stream = response.body;
  const textStream = stream.pipeThrough(new TextDecoderStream());

  for await (const chunk of textStream) {
    console.log(chunk);
  }
}

fetchAndLogStream();

La iteración asíncrona de flujos funciona en Node y Deno. Todos los navegadores modernos han enviado bucles de espera, pero todavía no funcionan en transmisiones.

Algunas otras formas de obtener una transmisión legible

La búsqueda será una de las formas más comunes de hacerse con una transmisión, pero hay otras formas. Blob y File ambos tienen un .stream() método que devuelve una secuencia legible. El siguiente código funciona en navegadores modernos, así como en Node y en Deno, aunque, en Node, necesitará import { Blob } from 'buffer'; antes de poder usarlo:

const blobStream = new Blob(['Lorem ipsum'], { type: 'text/plain' }).stream();

A continuación, se muestra un ejemplo basado en un navegador de interfaz de usuario: si tiene un <input type="file"> en su marcado, es fácil obtener el archivo seleccionado por el usuario como una secuencia.

const fileStream = document.querySelector('input').files[0].stream();

Envío en el nodo 17, el objeto FileHandle devuelto por fs / promises open() la función tiene un .readableWebStream() método.

import {
  open,
} from 'node:fs/promises';

const file = await open('./some/file/to/read');

for await (const chunk of file.readableWebStream())
  console.log(chunk);

await file.close();

Las transmisiones funcionan bien con las promesas

Si necesita hacer algo después de que se haya completado la transmisión, puede usar promesas.

someReadableStream
.pipeTo(someWritableStream)
.then(() => console.log("all data successfully written"))
.catch(error => console.error("something went wrong", error))

O, opcionalmente, puede esperar el resultado:

await someReadableStream.pipeTo(someWritableStream)

Creando su propio flujo de transformación

Ya vimos TextDecoderStream (también hay un TextEncoderStream). También puede crear su propio flujo de transformación desde cero. El TransformStream el constructor puede aceptar un objeto. Puede especificar tres métodos en el objeto: start, transform y flush. Todos son opcionales, pero transform es lo que realmente hace la transformación.

Como ejemplo, imaginemos que TextDecoderStream() no existe e implementa la misma funcionalidad (asegúrese de usar TextDecoderStream en producción, aunque el siguiente es un ejemplo demasiado simplificado):

const decoder = new TextDecoder();
const decodeStream = new TransformStream({
  transform(chunk, controller) {
    controller.enqueue(decoder.decode(chunk, {stream: true}));
  }
});

Cada fragmento recibido es modificado y luego reenviado por el controlador. En el ejemplo anterior, cada fragmento es un texto codificado que se decodifica y luego se reenvía. Echemos un vistazo rápido a los otros dos métodos:

const transformStream = new TransformStream({
  start(controller) {
    // Called immediately when the TransformStream is created
  },

  flush(controller) {
    // Called when chunks are no longer being forwarded to the transformer
  }
});

Una secuencia de transformación es una secuencia legible y una secuencia de escritura que trabajan juntas, generalmente para transformar algunos datos. Cada objeto hecho con new TransformStream() tiene una propiedad llamada readable, el cual es un ReadableStream, y una propiedad llamada writable, que es una secuencia de escritura. Vocación someReadableStream.pipeThrough() escribe los datos de someReadableStream a transformStream.writable, posiblemente transforma los datos, luego empuja los datos a transformStream.readable.

A algunas personas les resulta útil crear un flujo de transformación que en realidad no transforme los datos. Esto se conoce como un “flujo de transformación de identidad”, que se crea llamando new TransformStream() sin pasar ningún argumento de objeto, o omitiendo el método de transformación. Reenvía todos los fragmentos escritos en su lado de escritura a su lado legible, sin ningún cambio. Como ejemplo simple del concepto, “hola” se registra mediante el siguiente código:

const {readable, writable} = new TransformStream();
writable.getWriter().write('hello');
readable.getReader().read().then(({value, done}) => console.log(value))

Creando su propia secuencia legible

Es posible crear una transmisión personalizada y completarla con sus propios fragmentos. El new ReadableStream() constructor toma un objeto que puede contener un start función, una pull función, y una cancel función. Esta función se invoca inmediatamente cuando el ReadableStream es creado. Dentro de start función, uso controller.enqueue para agregar fragmentos a la secuencia.

A continuación, se muestra un ejemplo básico de “hola mundo”:

import { ReadableStream } from "node:stream/web";
const readable = new ReadableStream({
  start(controller) {
    controller.enqueue("hello");
    controller.enqueue("world");
    controller.close();
  },
});

const allChunks = [];
for await (const chunk of readable) {
  allChunks.push(chunk);
}
console.log(allChunks.join(" "));

Aquí hay un ejemplo más del mundo real tomado de la especificación de transmisiones que convierte un conector web en una transmisión legible:

function makeReadableWebSocketStream(url, protocols) {
  let websocket = new WebSocket(url, protocols);
  websocket.binaryType = "arraybuffer";

  return new ReadableStream({
    start(controller) {
      websocket.onmessage = event => controller.enqueue(event.data);
      websocket.onclose = () => controller.close();
      websocket.onerror = () => controller.error(new Error("The WebSocket errored"));
    }
  });
}

Interoperabilidad de flujos de nodo

En Node, la antigua forma específica de trabajar con secuencias de Node no se está eliminando. La antigua API de secuencias de nodos y la API de secuencias web coexistirán. Por lo tanto, a veces puede ser necesario convertir una secuencia de nodo en una secuencia web, y viceversa, usando .fromWeb() y .toWeb() métodos, que se están agregando en el Nodo 17.

import {Readable} from 'node:stream';
import {fetch} from 'undici';

const response = await fetch(url);
const readableNodeStream = Readable.fromWeb(response.body);

Conclusión

Módulos ES, EventTarget, AbortController, Analizador de URL, Web Crypto, Blob, TextEncoder/Decoder: cada vez más API de navegador terminan en Node.js. Los conocimientos y habilidades son transferibles. La búsqueda y los flujos son una parte importante de esa convergencia.

Domenic Denicola, coautor de la especificación de transmisiones, ha escrito que el objetivo de la API de transmisiones es proporcionar una abstracción eficiente y una primitiva unificadora para la E / S, como se han convertido las promesas para la asincronicidad. Para ser realmente útil en la interfaz, es necesario que más API sean compatibles con las transmisiones. Por el momento, un MediaStream, a pesar de su nombre, no es un flujo legible. Si está trabajando con video o audio (al menos por el momento), no se puede asignar una transmisión legible a srcObject. O digamos que desea obtener una imagen y pasarla a través de un flujo de transformación, luego insertarla en la página. En el momento de escribir este artículo, el código para usar una secuencia como src de un elemento de imagen es algo detallado:

const response = await fetch('cute-cat.png');
const bodyStream = response.body;
const newResponse = new Response(bodyStream);
const blob = await newResponse.blob();
const url = URL.createObjectURL(blob);
document.querySelector('img').src = url;    

Sin embargo, con el tiempo, más API tanto en el navegador como en Node (y Deno) harán uso de las transmisiones, por lo que vale la pena conocerlas. Ya existe una API de transmisión para trabajar con Web Sockets en Deno y Chrome, por ejemplo. Chrome ha implementado transmisiones de solicitudes de Fetch. Node y Chrome han implementado flujos transferibles para canalizar datos hacia y desde un trabajador para procesar los fragmentos en un hilo separado. La gente ya está utilizando transmisiones para hacer cosas interesantes para productos en el mundo real: los creadores de la aplicación web para compartir archivos Wormhole tienen un código de fuente abierta para cifrar una transmisión, por ejemplo.

Quizás 2022 sea el año de las transmisiones web …