最新文章专题视频专题问答1问答10问答100问答1000问答2000关键字专题1关键字专题50关键字专题500关键字专题1500TAG最新视频文章推荐1 推荐3 推荐5 推荐7 推荐9 推荐11 推荐13 推荐15 推荐17 推荐19 推荐21 推荐23 推荐25 推荐27 推荐29 推荐31 推荐33 推荐35 推荐37视频文章20视频文章30视频文章40视频文章50视频文章60 视频文章70视频文章80视频文章90视频文章100视频文章120视频文章140 视频2关键字专题关键字专题tag2tag3文章专题文章专题2文章索引1文章索引2文章索引3文章索引4文章索引5123456789101112131415文章专题3
当前位置: 首页 - 科技 - 知识百科 - 正文

Node.js设计模式使用流进行编码

来源:动视网 责编:小采 时间:2020-11-27 20:07:40
文档

Node.js设计模式使用流进行编码

Node.js设计模式使用流进行编码:本文主要和大家分享Node.js设计模式使用流进行编码,希望能帮助到大家。Streams是Node.js最重要的组件和模式之一。 社区中有一句格言Stream all the things(Steam就是所有的),仅此一点就足以描述流在Node.js中的地位。 Dominic Tar
推荐度:
导读Node.js设计模式使用流进行编码:本文主要和大家分享Node.js设计模式使用流进行编码,希望能帮助到大家。Streams是Node.js最重要的组件和模式之一。 社区中有一句格言Stream all the things(Steam就是所有的),仅此一点就足以描述流在Node.js中的地位。 Dominic Tar


正如我们所预料到的那样,使用Buffer来进行大文件的读取显然是错误的。

使用Streams进行压缩文件

我们必须修复我们的Gzip应用程序,并使其处理大文件的最简单方法是使用StreamsAPI。 让我们看看如何实现这一点。 让我们用下面的代码替换刚创建的模块的内容:

const fs = require('fs');
const zlib = require('zlib');
const file = process.argv[2];
fs.createReadStream(file)
 .pipe(zlib.createGzip())
 .pipe(fs.createWriteStream(file + '.gz'))
 .on('finish', () => console.log('File successfully compressed'));

“是吗?”你可能会问。是的;正如我们所说的,由于Streams的接口和可组合性,因此我们还能写出这样的更加简洁,优雅和精炼的代码。 我们稍后会详细地看到这一点,但是现在需要认识到的重要一点是,程序可以顺畅地运行在任何大小的文件上,理想情况是内存利用率不变。 尝试一下(但考虑压缩一个大文件可能需要一段时间)。

时间效率

现在让我们考虑一个压缩文件并将其上传到远程HTTP服务器的应用程序的例子,该远程HTTP服务器进而将其解压缩并保存到文件系统中。如果我们的客户端是使用BufferedAPI实现的,那么只有当整个文件被读取和压缩时,上传才会开始。 另一方面,只有在接收到所有数据的情况下,解压缩才会在服务器上启动。 实现相同结果的更好的解决方案涉及使用Streams。 在客户端机器上,Streams只要从文件系统中读取就可以压缩和发送数据块,而在服务器上,只要从远程对端接收到数据块,就可以解压每个数据块。 我们通过构建前面提到的应用程序来展示这一点,从服务器端开始。

我们创建一个叫做gzipReceive.js的模块,代码如下:

const http = require('http');
const fs = require('fs');
const zlib = require('zlib');

const server = http.createServer((req, res) => {
 const filename = req.headers.filename;
 console.log('File request received: ' + filename);
 req
 .pipe(zlib.createGunzip())
 .pipe(fs.createWriteStream(filename))
 .on('finish', () => {
 res.writeHead(201, {
 'Content-Type': 'text/plain'
 });
 res.end('That\'s it\n');
 console.log(`File saved: ${filename}`);
 });
});

server.listen(3000, () => console.log('Listening'));

服务器从网络接收数据块,将其解压缩,并在接收到数据块后立即保存,这要归功于Node.jsStreams

我们的应用程序的客户端将进入一个名为gzipSend.js的模块,如下所示:

在前面的代码中,我们再次使用Streams从文件中读取数据,然后在从文件系统中读取的同时压缩并发送每个数据块。

现在,运行这个应用程序,我们首先使用以下命令启动服务器:

node gzipReceive

然后,我们可以通过指定要发送的文件和服务器的地址(例如localhost)来启动客户端:

node gzipSend <path to file> localhost

如果我们选择一个足够大的文件,我们将更容易地看到数据如何从客户端流向服务器,但为什么这种模式下,我们使用Streams,比使用BufferedAPI更有效率? 下图应该给我们一个提示:

一个文件被处理的过程,它经过以下阶段:

  1. 客户端从文件系统中读取

  2. 客户端压缩数据

  3. 客户端将数据发送到服务器

  4. 服务端接收数据

  5. 服务端解压数据

  6. 服务端将数据写入磁盘

为了完成处理,我们必须按照流水线顺序那样经过每个阶段,直到最后。在上图中,我们可以看到,使用BufferedAPI,这个过程完全是顺序的。为了压缩数据,我们首先必须等待整个文件被读取完毕,然后,发送数据,我们必须等待整个文件被读取和压缩,依此类推。当我们使用Streams时,只要我们收到第一个数据块,流水线就会被启动,而不需要等待整个文件的读取。但更令人惊讶的是,当下一块数据可用时,不需要等待上一组任务完成;相反,另一条装配线是并行启动的。因为我们执行的每个任务都是异步的,这样显得很完美,所以可以通过Node.js来并行执行Streams的相关操作;唯一的限制就是每个阶段都必须保证数据块的到达顺序。

从前面的图可以看出,使用Streams的结果是整个过程花费的时间更少,因为我们不用等待所有数据被全部读取完毕和处理。

组合性

到目前为止,我们已经看到的代码已经告诉我们如何使用pipe()方法来组装Streams的数据块,Streams允许我们连接不同的处理单元,每个处理单元负责单一的职责(这是符合Node.js风格的)。这是可能的,因为Streams具有统一的接口,并且就API而言,不同Streams也可以很好的进行交互。唯一的先决条件是管道的下一个Streams必须支持上一个Streams生成的数据类型,可以是二进制,文本甚至是对象,我们将在后面的章节中看到。

为了证明Streams组合性的优势,我们可以尝试在我们先前构建的gzipReceive / gzipSend应用程序中添加加密功能。
为此,我们只需要通过向流水线添加另一个Streams来更新客户端。 确切地说,由crypto.createChipher()返回的流。 由此产生的代码应如下所示:

const fs = require('fs');
const zlib = require('zlib');
const crypto = require('crypto');
const http = require('http');
const path = require('path');

const file = process.argv[2];
const server = process.argv[3];

const options = {
 hostname: server,
 port: 3000,
 path: '/',
 method: 'PUT',
 headers: {
 filename: path.basename(file),
 'Content-Type': 'application/octet-stream',
 'Content-Encoding': 'gzip'
 }
};

const req = http.request(options, res => {
 console.log('Server response: ' + res.statusCode);
});

fs.createReadStream(file)
 .pipe(zlib.createGzip())
 .pipe(crypto.createCipher('aes192', 'a_shared_secret'))
 .pipe(req)
 .on('finish', () => {
 console.log('File successfully sent');
 });

使用相同的方式,我们更新服务端的代码,使得它可以在数据块进行解压之前先解密:

const http = require('http');
const fs = require('fs');
const zlib = require('zlib');
const crypto = require('crypto');

const server = http.createServer((req, res) => {
 const filename = req.headers.filename;
 console.log('File request received: ' + filename);
 req
 .pipe(crypto.createDecipher('aes192', 'a_shared_secret'))
 .pipe(zlib.createGunzip())
 .pipe(fs.createWriteStream(filename))
 .on('finish', () => {
 res.writeHead(201, {
 'Content-Type': 'text/plain'
 });
 res.end('That\'s it\n');
 console.log(`File saved: ${filename}`);
 });
});

server.listen(3000, () => console.log('Listening'));
crypto是Node.js的核心模块之一,提供了一系列加密算法。

只需几行代码,我们就在应用程序中添加了一个加密层。 我们只需要简单地通过把已经存在的Streams模块和加密层组合到一起,就可以。类似的,我们可以添加和合并其他Streams,如同在玩乐高积木一样。

显然,这种方法的主要优点是可重用性,但正如我们从目前为止所介绍的代码中可以看到的那样,Streams也可以实现更清晰,更模块化,更加简洁的代码。 出于这些原因,流通常不仅仅用于处理纯粹的I / O,而且它还是简化和模块化代码的手段。

开始使用Streams

在前面的章节中,我们了解了为什么Streams如此强大,而且它在Node.js中无处不在,甚至在Node.js的核心模块中也有其身影。 例如,我们已经看到,fs模块具有用于从文件读取的createReadStream()和用于写入文件的createWriteStream()HTTP请求和响应对象本质上是Streams,并且zlib模块允许我们使用StreamsAPI压缩和解压缩数据块。

现在我们知道为什么Streams是如此重要,让我们退后一步,开始更详细地探索它。

Streams的结构

Node.js中的每个Streams都是Streams核心模块中可用的四个基本抽象类之一的实现:

  • stream.Readable

  • stream.Writable

  • stream.Duplex

  • stream.Transform

  • 每个stream类也是EventEmitter的一个实例。实际上,Streams可以产生几种类型的事件,比如end事件会在一个可读的Streams完成读取,或者错误读取,或其过程中产生异常时触发。

    请注意,为简洁起见,在本章介绍的例子中,我们经常会忽略适当的错误处理。但是,在生产环境下中,总是建议为所有Stream注册错误事件侦听器。

    Streams之所以如此灵活的原因之一是它不仅能够处理二进制数据,而且几乎可以处理任何JavaScript值。实际上,Streams可以支持两种操作模式:

  • 二进制模式:以数据块形式(例如buffersstrings)流式传输数据

  • 对象模式:将流数据视为一系列离散对象(这使得我们几乎可以使用任何JavaScript值)

  • 这两种操作模式使我们不仅可以使用I / O流,而且还可以作为一种工具,以函数式的风格优雅地组合处理单元,我们将在本章后面看到。

    在本章中,我们将主要使用在Node.js 0.11中引入的Node.js流接口,也称为版本3。 有关与旧接口差异的更多详细信息,请参阅StrongLoop在https://strongloop.com/strong...。

    可读的Streams

    一个可读的Streams表示一个数据源,在Node.js中,它使用stream模块中的Readableabstract类实现。

    从Streams中读取信息

    从可读Streams接收数据有两种方式:non-flowing模式和flowing模式。 我们来更详细地分析这些模式。

    non-flowing模式(不流动模式)

    从可读的Streams中读取数据的默认模式是为其附加一个可读事件侦听器,用于指示要读取的新数据的可用性。然后,在一个循环中,我们读取所有的数据,直到内部buffer被清空。这可以使用read()方法完成,该方法同步从内部缓冲区中读取数据,并返回表示数据块的BufferString对象。read()方法以如下使用模式:

    readable.read([size]);

    使用这种方法,数据随时可以直接从Streams中按需提取。

    为了说明这是如何工作的,我们创建一个名为readStdin.js的新模块,它实现了一个简单的程序,它从标准输入(一个可读流)中读取数据,并将所有数据回送到标准输出:

    process.stdin
     .on('readable', () => {
     let chunk;
     console.log('New data available');
     while ((chunk = process.stdin.read()) !== null) {
     console.log(
     `Chunk read: (${chunk.length}) "${chunk.toString()}"`
     );
     }
     })
     .on('end', () => process.stdout.write('End of stream'));

    read()方法是一个同步操作,它从可读Streams的内部Buffers区中提取数据块。如果Streams在二进制模式下工作,返回的数据块默认为一个Buffer对象。

    在以二进制模式工作的可读的Stream中,我们可以通过在Stream上调用setEncoding(encoding)来读取字符串而不是Buffer对象,并提供有效的编码格式(例如utf8)。

    数据是从可读的侦听器中读取的,只要有新的数据,就会调用这个侦听器。当内部缓冲区中没有更多数据可用时,read()方法返回null;在这种情况下,我们不得不等待另一个可读的事件被触发,告诉我们可以再次读取或者等待表示Streams读取过程结束的end事件触发。当一个流以二进制模式工作时,我们也可以通过向read()方法传递一个size参数来指定我们想要读取的数据大小。这在实现网络协议或解析特定数据格式时特别有用。

    现在,我们准备运行readStdin模块并进行实验。让我们在控制台中键入一些字符,然后按Enter键查看回显到标准输出中的数据。要终止流并因此生成一个正常的结束事件,我们需要插入一个EOF(文件结束)字符(在Windows上使用Ctrl + Z或在Linux上使用Ctrl + D)。

    我们也可以尝试将我们的程序与其他程序连接起来;这可以使用管道运算符(|),它将程序的标准输出重定向到另一个程序的标准输入。例如,我们可以运行如下命令:

    cat <path to a file> | node readStdin

    这是流式范例是一个通用接口的一个很好的例子,它使得我们的程序能够进行通信,而不管它们是用什么语言写的。

    flowing模式(流动模式)

    Streams中读取的另一种方法是将侦听器附加到data事件;这会将Streams切换为flowing模式,其中数据不是使用read()函数来提取的,而是一旦有数据到达data监听器就被推送到监听器内。例如,我们之前创建的readStdin应用程序将使用流动模式:

    process.stdin
     .on('data', chunk => {
     console.log('New data available');
     console.log(
     `Chunk read: (${chunk.length}) "${chunk.toString()}"`
     );
     })
     .on('end', () => process.stdout.write('End of stream'));

    flowing模式是旧版Streams接口(也称为Streams1)的继承,其灵活性较低,API较少。随着Streams2接口的引入,flowing模式不是默认的工作模式,要启用它,需要将侦听器附加到data事件或显式调用resume()方法。 要暂时中断Streams触发data事件,我们可以调用pause()方法,导致任何传入数据缓存在内部buffer中。

    调用pause()不会导致Streams切换回non-flowing模式。

    实现可读的Streams

    现在我们知道如何从Streams中读取数据,下一步是学习如何实现一个新的Readable数据流。为此,有必要通过继承stream.Readable的原型来创建一个新的类。 具体流必须提供_read()方法的实现:

    readable._read(size)

    Readable类的内部将调用_read()方法,而该方法又将启动
    使用push()填充内部缓冲区:

    请注意,read()是Stream消费者调用的方法,而_read()是一个由Stream子类实现的方法,不能直接调用。下划线通常表示该方法为私有方法,不应该直接调用。

    为了演示如何实现新的可读Streams,我们可以尝试实现一个生成随机字符串的Streams。 我们来创建一个名为randomStream.js的新模块,它将包含我们的字符串的generator的代码:

    const stream = require('stream');
    const Chance = require('chance');
    
    const chance = new Chance();
    
    class RandomStream extends stream.Readable {
     constructor(options) {
     super(options);
     }
    
     _read(size) {
     const chunk = chance.string(); //[1]
     console.log(`Pushing chunk of size: ${chunk.length}`);
     this.push(chunk, 'utf8'); //[2]
     if (chance.bool({
     likelihood: 5
     })) { //[3]
     this.push(null);
     }
     }
    }
    
    module.exports = RandomStream;

    在文件顶部,我们将加载我们的依赖关系。除了我们正在加载一个chance的npm模块之外,没有什么特别之处,它是一个用于生成各种随机值的库,从数字到字符串到整个句子都能生成随机值。

    下一步是创建一个名为RandomStream的新类,并指定stream.Readable作为其父类。 在前面的代码中,我们调用父类的构造函数来初始化其内部状态,并将收到的options参数作为输入。通过options对象传递的可能参数包括以下内容:

  • 用于将Buffers转换为Stringsencoding参数(默认值为null

  • 是否启用对象模式(objectMode默认为false

  • 存储在内部buffer区中的数据的上限,一旦超过这个上限,则暂停从data source读取(highWaterMark默认为16KB

  • 好的,现在让我们来解释一下我们重写的stream.Readable类的_read()方法:

  • 该方法使用chance生成随机字符串。

  • 它将字符串push内部buffer。 请注意,由于我们push的是String,此外我们还指定了编码为utf8(如果数据块只是一个二进制Buffer,则不需要)。

  • 5%的概率随机中断stream的随机字符串产生,通过push null到内部Buffer来表示EOF,即stream的结束。

  • 我们还可以看到在_read()函数的输入中给出的size参数被忽略了,因为它是一个建议的参数。 我们可以简单地把所有可用的数据都push到内部的buffer中,但是如果在同一个调用中有多个推送,那么我们应该检查push()是否返回false,因为这意味着内部buffer已经达到了highWaterMark限制,我们应该停止添加更多的数据。

    文档

    Node.js设计模式使用流进行编码

    Node.js设计模式使用流进行编码:本文主要和大家分享Node.js设计模式使用流进行编码,希望能帮助到大家。Streams是Node.js最重要的组件和模式之一。 社区中有一句格言Stream all the things(Steam就是所有的),仅此一点就足以描述流在Node.js中的地位。 Dominic Tar
    推荐度:
    标签: 模式 使用 设计
    • 热门焦点

    最新推荐

    猜你喜欢

    热门推荐

    专题
    Top