最新文章专题视频专题问答1问答10问答100问答1000问答2000关键字专题1关键字专题50关键字专题500关键字专题1500TAG最新视频文章推荐1 推荐3 推荐5 推荐7 推荐9 推荐11 推荐13 推荐15 推荐17 推荐19 推荐21 推荐23 推荐25 推荐27 推荐29 推荐31 推荐33 推荐35 推荐37视频文章20视频文章30视频文章40视频文章50视频文章60 视频文章70视频文章80视频文章90视频文章100视频文章120视频文章140 视频2关键字专题关键字专题tag2tag3文章专题文章专题2文章索引1文章索引2文章索引3文章索引4文章索引5123456789101112131415文章专题3
当前位置: 首页 - 科技 - 知识百科 - 正文

对于NodeJS如何操作消息队列RabbitMQ的分析

来源:动视网 责编:小采 时间:2020-11-27 19:33:44
文档

对于NodeJS如何操作消息队列RabbitMQ的分析

对于NodeJS如何操作消息队列RabbitMQ的分析:这篇文章主要介绍了关于对NodeJS如何操作消息队列RabbitMQ的分析,有着一定的参考价值,现在分享给大家,有需要的朋友可以参考一下一. 什么是消息队列?消息(Message)是指在应用间传送的数据。消息可以非常简单,比如只包含文本字符串,也可以更复杂,可能
推荐度:
导读对于NodeJS如何操作消息队列RabbitMQ的分析:这篇文章主要介绍了关于对NodeJS如何操作消息队列RabbitMQ的分析,有着一定的参考价值,现在分享给大家,有需要的朋友可以参考一下一. 什么是消息队列?消息(Message)是指在应用间传送的数据。消息可以非常简单,比如只包含文本字符串,也可以更复杂,可能
这篇文章主要介绍了关于对NodeJS如何操作消息队列RabbitMQ的分析,有着一定的参考价值,现在分享给大家,有需要的朋友可以参考一下

一. 什么是消息队列?

消息(Message)是指在应用间传送的数据。消息可以非常简单,比如只包含文本字符串,也可以更复杂,可能包含嵌入对象。

消息队列(Message Queue)是一种应用间的通信方式,消息发送后可以立即返回,由消息系统来确保消息的可靠传递。消息发布者只管把消息发布到 MQ 中而不用管谁来取,消息使用者只管从 MQ 中取消息而不管是谁发布的。这样发布者和使用者都不用知道对方的存在。

二. 常用的消息队列有哪些?

RabbitMQ、RocketMQ、ActiveMQ、Kafka、ZeroMQ、MetaMq。

甚至现在部分NoSQL也可做消息队列,如Redis。

三. 消息队列的使用场景?

  • 异步处理

  • 应用解耦

  • 流量削峰

  • 四. 使用案例

    上规模的公司都会有自己的日志分析系统,日志系统是怎么实现的呢?

    图解:用户在访问应用的时候,我们要记录下用户的操作记录和系统的异常日志,常规的做法是将系统产生的日志保存到服务器磁盘,在服务器中开启定时任务,定时将磁盘的日志信息传入mq中(生产者),也定时将mq中的消息取出并存到相应的数据库,如ElasticSearch或Hive中。

    五. 如何安装RabbitMQ?

    上面的案例介绍了MQ的一个使用场景,我这里是用RabbitMQ举例,现实项目中可能用到的是Kafka。

    首先安装brew(mac为例)

    /usr/bin/ruby -e "$(curl -fsSL https://raw.githubusercontent.com/Homebrew/install/master/install)"

    安装RabbitMQ

    brew install rabbitmq

    运行RabbitMQ

    进入到 /usr/local/Cellar/rabbitmq/3.7.7,执行

    sbin/rabbitmq-server

    启动插件

    进入到 /usr/local/Cellar/rabbitmq/3.7.7/sbin

    ./rabbitmq-plugins enable rabbitmq_management

    登陆管理界面

    打开浏览器输入:http://localhost:15672,RabbitMQ默认15672端口六. Nodejs操作RabbitMQ

    网上可以找到好几个相应的Node SDK,这里推荐amqplib

    1.生产者

    /**
     * 对RabbitMQ的封装
     */
    let amqp = require('amqplib');
    
    class RabbitMQ {
     constructor() {
     this.hosts = [];
     this.index = 0;
     this.length = this.hosts.length;
     this.open = amqp.connect(this.hosts[this.index]);
     }
     sendQueueMsg(queueName, msg, errCallBack) {
     let self = this;
    
     self.open
     .then(function (conn) {
     return conn.createChannel();
     })
     .then(function (channel) {
     return channel.assertQueue(queueName).then(function (ok) {
     return channel.sendToQueue(queueName, new Buffer(msg), {
     persistent: true
     });
     })
     .then(function (data) {
     if (data) {
     errCallBack && errCallBack("success");
     channel.close();
     }
     })
     .catch(function () {
     setTimeout(() => {
     if (channel) {
     channel.close();
     }
     }, 500)
     });
     })
     .catch(function () {
     let num = self.index++;
    
     if (num <= self.length - 1) {
     self.open = amqp.connect(self.hosts[num]);
     } else {
     self.index == 0;
     }
     });
     }
    }

    2. 消费者

    /**
     * 对RabbitMQ的封装
     */
    let amqp = require('amqplib');
    
    class RabbitMQ {
     constructor() {
     this.open = amqp.connect(this.hosts[this.index]);
     }
     receiveQueueMsg(queueName, receiveCallBack, errCallBack) {
     let self = this;
    
     self.open
     .then(function (conn) {
     return conn.createChannel();
     })
     .then(function (channel) {
     return channel.assertQueue(queueName)
     .then(function (ok) {
     return channel.consume(queueName, function (msg) {
     if (msg !== null) {
     let data = msg.content.toString();
     channel.ack(msg);
     receiveCallBack && receiveCallBack(data);
     }
     })
     .finally(function () {
     setTimeout(() => {
     if (channel) {
     channel.close();
     }
     }, 500)
     });
     })
     })
     .catch(function () {
     let num = self.index++;
     if (num <= self.length - 1) {
     self.open = amqp.connect(self.hosts[num]);
     } else {
     self.index = 0;
     self.open = amqp.connect(self.hosts[0]);
     }
     });
     }

    3. 通过生产者向MQ发送一个消息,并创建队列

    let mq = new RabbitMQ();
    mq.sendQueueMsg('testQueue', 'my first message', (error) => {
     console.log(error)
    })

    执行之后,我们打开管理平台,发现RabbbitMQ已经接受到了一条消息:

    并且RabbbitMQ新增了一个队列testQueue

    4. 获取指定队列的消息

    let mq = new RabbitMQ();
    mq.receiveQueueMsg('testQueue',(msg) => { 
     console.log(msg)
    })// 
    输出结果:my first message

    此时打开RabbitMQ管理平台,消息数量已经变为0

    综上:我们简单讲述了消息队列及RabbitMQ相关的一些知识,以及我们如何通过nodejs来生产与消费消息。

    文档

    对于NodeJS如何操作消息队列RabbitMQ的分析

    对于NodeJS如何操作消息队列RabbitMQ的分析:这篇文章主要介绍了关于对NodeJS如何操作消息队列RabbitMQ的分析,有着一定的参考价值,现在分享给大家,有需要的朋友可以参考一下一. 什么是消息队列?消息(Message)是指在应用间传送的数据。消息可以非常简单,比如只包含文本字符串,也可以更复杂,可能
    推荐度:
    • 热门焦点

    最新推荐

    猜你喜欢

    热门推荐

    专题
    Top