Finalizamos el capítulo de streams en Node.js. En esta entrada del blog, en primer lugar hacemos un breve resumen de todo lo aprendido hasta ahora, repasando el concepto que representan las diferentes clases de Stream en Node.js, Readable, Writable, Duplex y Transform, en segundo lugar veremos ejemplos prácticos de clases incluidas en Node.js que heredan de la clase Stream, crearemos un cliente y un servidor TCP y transferiremos un archivo desde el cliente al servidor, para ello, en el cliente leeremos el archivo, lo comprimiremos y lo cifraremos y a continuación lo enviaremos al servidor, en donde lo descifraremos, descomprimiremos y finalmente lo guardaremos en disco. Para los ejemplos prácticos usaremos las APIs que proporcionan los módulos net
, fs
, crypto
y zlib
de Node.js.
Puedes consultar las entradas anteriores:
Contenido.
- Resumen Streams
- Tipos.
- Eventos
- Modos de lectura.
- Consumir datos de los Stream
- Ejemplos
- Servidor TCP y Cliente TCP ejemplo stream duplex
- Servidor Gunzip
- Cliente Gzip
- Cliente Gzip Servidor Gunzip con cifrado descifrado
Resumen Streams
Tipos.
Un flujo de lectura, Stream Readable, es una abstracción de una fuente desde la que se pueden consumir datos. Los datos leídos se pueden almacenar en un búfer en el caso de que la aplicación que los consuma sea más lenta que el sistema operativo que lee de la fuente. Un ejemplo de esto es el método
fs.createReadStream
.Un flujo de escritura Stream Writable es una abstracción de un destino en el que se pueden escribir datos. Para evitar la pérdida de datos o la saturación del destino, en caso de que el destino sea más lento que la aplicación de escritura, los datos pueden almacenarse en un búfer. Un ejemplo de eso es el método
fs.createWriteStream
.Un flujo dúplex, Stream Duplex, sirve tanto para escribir en un destino, como para leer de una fuente, es decir, contiene una parte de lectura y otra de escritura que son independientes y tienen sus propios búferes. Un ejemplo de esto sería un socket TCP.
Un flujo de transformación, Stream Transform, es básicamente un flujo dúplex que se puede utilizar para modificar o transformar los datos a medida que se escriben y leen. Un ejemplo de esto es el
zlib.createGzip
para comprimir los datos usando gzip
Eventos
Todas los Streams son instancias de EventEmitter por lo que emiten eventos que pueden usarse para leer y escribir datos.
Los eventos más importantes en un stream readable son:
· El evento ‘data’
, que se emite cada vez que la transmisión pasa una gran cantidad de datos al consumidor.
· El evento ‘end’
, que se emite cuando no hay más datos para consumir de la transmisión.
Los eventos más importantes en un writable stream son:
· El evento ‘drain’
, que es una señal de que el flujo de escritura puede recibir más datos.
· El evento ‘finish’
, que se emite cuando todos los datos se han vaciado al sistema subyacente.
Modos de lectura.
Los streams readable tienen dos modos de lectura principales, que afectan la forma en que podemos consumirlos:
Modos de lectura de los flujos de lectura.
- El modo fluido (flowing), llamado también de empuje, en el que los datos fluyen continuamente por lo que es necesario un consumidor de datos, necesitamos un controlador para el evento
‘data’
. - El modo pausado (paused), llamado también de tracción, en donde los datos deben solicitarse explícitamente para ser consumidos.
Esto modos se pueden alternar usando los métodos resume()
y pause()
.
Consumir datos de los Stream
Todas los Stream son instancias de EventEmitter por lo que emiten eventos que pueden usarse para leer y escribir datos, es decir, para consumir los datos, no obstante podemos consumir los datos de los flujos de una manera más simple usando el método pipe
. En general, para consumir los datos se recomienda o utilizar el método pipe
o usar eventos, pero es preferible evitar mezclarlos. Usaremos eventos cuando se necesite personalizar el consumo de datos.
Pipe.
Mediante el método pipe
de los flujos de lectura, canalizamos la salida de un flujo o secuencia de lectura, la fuente de datos, como la entrada de un flujo o secuencia de escritura, el destino. Por lo tanto, la fuente debe ser un flujo de lectura y el destino debe ser un flujo de escritura, evidentemente, ambos también pueden ser flujos dúplex/transformación.
Contrapresión.
Los flujos de datos de lectura y escritura pueden almacenar datos internamente en un búfer, no obstante, la cantidad de datos que pueden almacenar en búfer está limitada por la memoria total disponible del sistema, por lo que el búfer de los flujos tiene un umbral llamado highWaterMark (HWM), este umbral es usado para determinar si la velocidad con la que se están pasado los datos al flujo es más alta que la velocidad con la que los datos se eliminan o consumen del flujo.
El sistema de contrapresión permite que cuando canalizamos datos desde un flujo de lectura a un flujo de escritura, el flujo de escritura pueda pedirle al flujo de lectura que detenga el trasiego de datos si el búfer de escritura se llena más allá de su highWatermark.
En este sentido el método pipe
administra automáticamente la contrapresión en los casos en los que un flujo de datos es más lento o más rápida que el otro, además también maneja de manera automática otras cuestiones como los errores o finalización de archivos.
Ejemplos
En los siguientes ejemplos vamos a usar streams que ya están implementados en Node.js, clases que extienden de los diferentes tipos de stream que hemos visto, en concreto usaremos clases que pertenecen a los módulos:
net
: que permite crear servidores y clientes TCPfs
: que permite interactuar con el sistema de archivos.crypto
: que proporciona funcionalidades criptográficas basadas en OpenSSLzlib
: que proporciona funcionalidades de compresión implementadas mediante Gzip.
El objetivo de los siguientes ejemplos no es entrar en detalle en las propiedades y métodos de las diferentes clases que usaremos, pues ya lo haremos más adelante cuando se aborden los diferentes APIs de los módulos anteriormente mencionados, el objetivo es mostrar el comportamiento que tienen como clases que heredan de Stream y principalmente destacar la potencialidad que tienen los streams en Node.js a la hora de componer transmisiones de datos usando el método pipe()
, el cual nos permite conectar las diferentes unidades de procesamiento, cada una responsable de una sola funcionalidad, debido precisamente a la interfaz uniforme y a que pueden entenderse entre sí en términos de API.
Servidor TCP y Cliente TCP ejemplo stream duplex
En los siguientes ejemplos vamos a usar la clase net.Socket
, (perteneciente al módulo net
), que extiende de stream.Duplex
y es una abstracción de un socket TCP, un objeto de esta clase puede usarse directamente por un cliente para interactuar con un servidor o puede ser devuelto por un servidor TCP, cuando se recibe una conexión de un cliente.
Para crear un servidor usaremos el método net.createServer([options][, connectionListener])
que devuelve un objeto de la clase net.Server
, la función “connectionListener
” es la que se ejecutará cuando el servidor reciba una conexión, evento ‘connection’
, recibiendo un objeto de conexión net.socket
. Para vincular nuestro servidor con un puerto y host usamos el método server.listen()
, cuando el servidor se vincula con un puerto se emite el evento ‘listening’
agregamos un oyente para ese evento. El ejemplo lo tenemos en el archivo servidorTCP_A.js
const net = require("net");
/**
*
* ServidorTCP
* @param {net.Socket} socket
*/
servidor = net.createServer((socket) => {
/*escribimos en el socket para que el cliente los consuma*/
socket.write("Hola desde el servidor!!!\n");
/*leemos los datos proporcionados desde el extremo cliente*/
socket.on("data", (chunk) => {
console.log(chunk.toString());
});
/*canalizamos los datos leidos del cliente al propio socket
para que el cliente los consuma*/
socket.pipe(socket);
});
//El evento listening es emitido después de vincularse al puerto.
servidor.on("listening", () => {
let nombre = servidor.address();
console.log(`Servidor TCP escuchando en: ${nombre.address}:${nombre.port}`);
});
//vinculamos puerto y host
servidor.listen(8082, "192.168.1.217");
Analizando la función que maneja las conexiones recibidas, nuestra connectionListener
, podemos observar como el objeto socket que representa la conexión funciona como un flujo Duplex por un lado podemos escribir en nuestro socket para comunicarnos con el cliente, por otro funciona como un flujo de lectura podemos leer del socket los datos que nos proporciona el cliente escuchando el evento ‘data’
y además como es un flujo Duplex podemos canalizar los datos que leemos del socket, datos que nos suministra al cliente, al flujo de escritura del socket devolviendo los datos al cliente, haciendo así la función de un servidor eco.
(socket) => {
/*escribimos en el socket para que el cliente los consuma*/
socket.write("Hola desde el servidor!!!\n");
/*leemos los datos proporcionados desde el extremo cliente*/
socket.on("data", (chunk) => {
console.log(chunk.toString());
});
/*canalizamos los datos leidos del cliente al propio socket
para que el cliente los consuma*/
socket.pipe(socket);
}
Si ejecutamos el anterior código obtenemos la siguiente salida
Nos podemos conectar con nuestro servidor TCP usando netcat e interactuar con el servidor, podemos observar como recibimos la bienvenida y al escribir datos como se produce el efecto eco
En el servidor podemos observar como leemos los datos que suministra nuestro cliente.
De una manera fácil podemos hacer un cliente para nuestro anterior servidor TCP, archivo clienteTCP_A.js, básicamente creamos un socket y le pasamos los parámetros para conectarse con nuestro servidor, podemos comprobar como nuestro socket trabaja como un stream Duplex, podemos escribir en el socket, socketTcp.write
, para que el servidor lea esos datos, podemos leer del socket los datos que suministra el servidor escuchando el evento ‘data’
.
const net=require('net');
//Cliente TCP
const socketTcp = new net.Socket();
const options={
port: 8082,
host: '192.168.1.217',
}
socketTcp.connect(options);//Conectamos con servidor
socketTcp.write('Hola desde el cliente')
socketTcp.on('data',(chunk)=>{
console.log('Dato leido del socket: '+chunk)
})
Si ejecutamos los archivos correspondientes al servidor y al cliente TCP obtenemos las siguientes salidas:
En el servidor:
En el cliente:
Servidor Gunzip
En el siguiente ejemplo vamos a crear un servidor TCP que recibe un archivo comprimido en formato gzip, los descomprime y guarda en disco, este ejemplo combina un flujo Dúplex, socket TCP, un flujo de Transformación zlib.createGunzip
y un flujo de escritura fs.createWriteStream
.
La secuencia sería la siguiente:
- [Servidor] Recibe datos del cliente.
- [Servidor] Descomprime los datos.
- [Servidor] Escribe los datos en el disco.
El ejemplo lo podemos encontrar en el archivo servidorGzip.js.
Mediante el método fs.createWriteStream()
creamos una instancia de la clase fs.WriteStream
que extiende de los flujos de escritura stream.Writable
y nos permiten escribir en un archivo.
Mediante el método zlib.createGunzip
obtenemos un objeto zlib.Gunzip
que permite descomprimir un flujo o secuencia que previamente ha sido comprimida en Gzip. La clase zlib.Gunzip
hereda de la clase zlib.ZlibBase
que es la clase base de las clases que permiten tanto la compresión como descompresión, esta clase hereda de los flujos de transformación “stream.Transform
” por lo que permite que sus objetos utilicen canalizaciones.
Los datos que leemos del socket, que nos proporciona un cliente, los vamos canalizando en la parte de escritura de nuestro flujo de transformación Gunzip, se aplica la descompresión y son puestos a disposición a la parte de lectura donde los canalizamos a un archivo en nuestro servidor, escuchamos el evento ‘finish’
que nos indica que ya no habrá más datos que escribir.
const net=require('net');
const zlib = require('zlib');
const fs = require('fs');
const fileServer = './/copia.png'//nombre del archivo en servidor
//Creamos un flujo de escritura
const writeStream = fs.createWriteStream(fileServer);
//Servidor TCP
const servidor=net.createServer((socket)=> {
socket//lectura
.pipe(zlib.createGunzip())//transform
.pipe(writeStream)//escritura
.on('finish', () => {
console.log('Fichero salvado en servidor')
})
})
servidor.on('listening', () => {//después de vincularse al puerto.
let nombre = servidor.address();
console.log(`Servidor TCP escuchando en: ${nombre.address}:${nombre.port}`);
});
servidor.listen(8082, '192.168.1.217');
Para comprobar la funcionalidad de nuestro servidor desde un puesto cliente podemos proceder de la siguiente manera:
- Comprimir un archivo, por ejemplo el archivo happy.png: gzip happy.png
- Mediante la utilidad netcat podemos conectarnos con nuestro servidor TCP para transferir el archivo: nc -N 192.168.1.217 8082 < happy.png.gz
Podemos comprobar la salida que nos muestra la terminal en nuestro servidor y como en nuestro servidor se ha creado el archivo copia.png
Cliente Gzip
Podemos crear un cliente para nuestro anterior servidor, este cliente leerá un archivo, comprimirá los datos del archivo a Gzip y enviará los datos al servidor.
- [Cliente] Leer del sistema de archivos
- [Cliente] Comprime los datos
- [Cliente] Envíalo al servidor
El ejemplo lo podemos encontrar en el archivo clienteGzip.js.
Mediante el método fs.createReadStream()
creamos una instancia de la clase fs.ReadStream
que extiende de los flujos de escritura stream.Readable
y nos permiten leer los datos de un archivo origen.
Mediante el método zlib.createGzip
obtenemos un objeto zlib.Gzip
que permite comprimir una secuencia de datos en Gzip. La clase zlib.Gzip
hereda de la clase zlib.ZlibBase
que es la clase base de las clases que permiten tanto la compresión como descompresión, esta clase hereda de los flujos de transformación “stream.Transform
” por lo que permite que sus objetos utilicen canalizaciones.
Los datos que leemos de nuestro archivo local, los vamos canalizando en la parte de escritura de nuestro flujo de transformación Gzip, se aplica la compresión y son puestos a disposición a la parte de lectura donde los canalizamos a la parte de escritura de nuestro socket para ponerlos a disposición del servidor, escuchamos el evento ‘finish’
que nos indica que ya no habrá más datos que escribir.
const net=require('net');
const zlib = require('zlib');
const fs = require('fs');
const fileCliente='.//happy.png'
//Cliente TCP
const socketTcp = new net.Socket();
const options={
port: 8082,
host: '192.168.1.217',
}
socketTcp.connect(options);//Conectamos con servidor
fs.createReadStream(fileCliente)
.pipe(zlib.createGzip())
.pipe(socketTcp)
.on('finish', () => {
console.log('\nFichero enviado')
})
Cliente Gzip Servidor Gunzip con cifrado descifrado
Aprovechando nuestro servidor y cliente TCP, vamos a agregar una capa de cifrado a nuestro sistema de transferencia de archivo, el código lo tenemos en el archivo serverclientFileTCP.js en donde juntamos el servidor y el cliente.
Necesitamos usar algunas funciones de utilidad desde del módulo crypto
. (1)
randomBytes()
permite generar una secuencia aleatoria de bytes. Lo usaremos para generar el vector (iv
) de inicialización para el cifrado del archivo (2)
y para generar una clave de cifrado aleatoria (key
) (3)
.
createDecipheriv
(4)
Crea y devuelve un objeto de la clase Decipher, con un algoritmo de cifrado un clave (key
) y vector de inicialización ( iv
) dados. Las instancias de la clase Decipher se utilizan para descifrar datos y extienden de stream.Transform
por lo que son flujos de transformación.
createCipheriv
(5)
Crea y devuelve un objeto de la clase Cipher, con un algoritmo de cifrado un clave (key
) y vector de inicialización ( iv
) dados. Las instancias de la clase Cipher se utilizan para cifrar datos y extienden de stream.Transform
por lo que son flujos de transformación.
En el código del servidor (A)
podemos observar que el primer paso de nuestra canalización ahora es el responsable de descifrar los datos entrantes leídos del socket para ello usamos un flujo de transformación createDecipheriv
(4)
En el código de nuestro cliente (B)
, que está envuelto dentro de un timeOut
para que se ejecute una vez este corriendo nuestro servidor, encriptamos los datos que queremos enviar a nuestro servidor, justo después de la fase Gzip utilizando el flujo de transformación createCipheriv
(5)
.
Finalmente nuestro sistema realizará estas operaciones:
- [Cliente] Leer del sistema de archivos
- [Cliente] Comprime los datos
- [Cliente] Cifra los datos
- [Cliente] Envíalo al servidor
- [Servidor] Recibe del cliente
- [Servidor] Descifra los datos.
- [Servidor] Descomprime los datos
- [Servidor] Escribe los datos en el disco
const net=require('net');
const crypto = require('crypto');//(1)
const zlib = require('zlib');
const fs = require('fs');
const fileServer = './/registro.txt'
const fileCliente='.//hola.txt'
const writeStream = fs.createWriteStream(fileServer);
const iv = crypto.randomBytes(16)//(2)
const key = crypto.randomBytes(24);//(3)
//Servidor TCP (A)
net.createServer((socket)=> {
socket
.pipe(crypto.createDecipheriv('aes192', key, iv))//(4)
.pipe(zlib.createGunzip())
.pipe(writeStream)
.on('finish', () => {
console.log('Fichero salvado en servidor')
})
}).listen(8082);
//Cliente TCP (B)
setTimeout(()=>{
const socketTcp = new net.Socket();
const options={
port: 8082,
host: '192.168.1.217',
}
socketTcp.connect(options);//Conectamos con servidor
fs.createReadStream(fileCliente)
.pipe(zlib.createGzip())
.pipe(crypto.createCipheriv('aes192', key, iv))//(5)
.pipe(socketTcp)
.on('finish', () => {
console.log('\nFichero enviado')
})
},1000);