吐血总结——消息队列之RocketMQ知识梳理_.net 7 rocketmq-程序员宅基地

技术标签: 流量削峰  中间件  消息队列  分布式  RocketMQ  

个人公众号: : 可为编程
个人信条: 知足知不足 有为有不为 为与不为皆为可为
本篇简介: 本片详细说明了消息队列之RocketMQ知识梳理使用规则和注意要点,并给出具体操作实例,如有出入还望指正。

关注公众号【可为编程】回复【面试】领取最新面试题!!!

摘要

在说消息队列之前,我们要明白为啥需要消息队列,知乎上有一篇文章写的不错,链接: 为什么要使用消息队列?

消息队列主要解决应用耦合,异步消息,流量削锋等问题。实现高性能,高可用,可伸缩和最终一致性架构。是大型分布式系统不可缺少的中间件。 目前在生产环境,使用较多的消息队列有ActiveMQ,RabbitMQ,ZeroMQ,Kafka,MetaMQ,RocketMQ等。今天我就首先分析一下RocketMQ,目前公司用的也是这个,因此在进行一下梳理,加深一下印象。

RocketMQ概述

RocketMQ为分布式消息中间件,其高性能在于顺序写盘(CommitLog)、零拷贝和跳跃读(尽量命中PageCache),高可靠性在于刷盘机制和Master/Slave,另外NameServer如果全部挂掉都不会影响已经运行的Broker,Producer,Consumer。

RcoketMQ 是一款低延迟、高可靠、可伸缩、易于使用的消息中间件。具有以下特性:

1、支持发布/订阅(Pub/Sub)和点对点(P2P)消息模型
2、在一个队列中可靠的先进先出(FIFO)和严格的顺序传递
3、支持拉(pull)和推(push)两种消息模式
4、单一队列百万消息的堆积能力
5、支持多种消息协议,如 JMS、MQTT 等
6、分布式高可用的部署架构,满足至少一次消息传递语义
7、提供 docker 镜像用于隔离测试和云集群部署
8、提供配置、指标和监控等功能丰富的Dashboard

那么首先先明白NameServer、Broker、Producer、Consumer都是什么,其实后面当在分析Kafka的时候,就会发现两点很像,因为RocketMQ是阿里根据Kafka架构进行的自研开发,在一些功能结构上面保留了Kafka的一些特性。首先我们先看一下RocketMQ的架构图:

在这里插入图片描述
从图中就能够看出来这是一个双主双从的集群模式结构图,我们根据这个图进行一下分析:
关注公众号【可为编程】回复【面试】领取最新面试题!!!

NameServer

为producer 和 consumer 提供路由信息,记录broker与topic的关系,然后在这个基础上对Broker进行每十秒的监测,判断Broker是否依然存活。
其优点如下:

1、可集群部署
2、相互之间独立,没有通信,不必保障节点间的数据强一致性
3、其他角色同时向多个NameServer机器上报状态信息
4、本身是无状态的,NameServer中的BrokerTopic等状态信息不会持久存储,由各个角色定时上报并存储到内存中的(NameServer支持配置参数的持久化,一般用不到)
5、采用每30s心跳机制
6、长连接持续提供给ProducerConsumer Topic信息
7、存储当前集群所有的Broker信息、TopicBroker的对应关系(
1)broker的基本信息(ip port等)2)主题topic的地址信息3)broker集群信息4)存活的broker信息5)filter 过滤器
)
8、只做集群元数据存储和心跳工作,功能简单,稳定性高
9、多机热备,单台NameServer宕机不影响其他NameServer工作
10、每个NameServer注册的信息都是一样的,而且是当前系统中的所有broker的元数据信息

需要注意的是,即使整个NameServer集群宕机了,已经正常工作的Producer、Consumer、Broker仍然能正常工作,但新起的Producer、Consumer、Broker就无法工作。

Broker(Master):

MQ 中最核心的部分,是 MQ 的服务端,核心逻辑几乎全在这里,它为生产者和消费者提供 RPC 接口,负责消息的存储、备份和删除,以及消费关系的维护,同时用来消息存储和生产消费转发。

1、单个Broker跟所有Namesrv保持心跳请求,心跳间隔为302、心跳请求中包括当前Broker信息(IP+端口等)以及存储所有topic信息
3RocketMQ消息代理服务器主节点,负责接收Producer发送的消息、消息存储、Consumer拉取消息;
BrokerSlave):
RocketMQ消息代理服务器备份节点,主要是通过同步/异步的方式将主节点的消息同步过来进行备份,为RocketMQ集群的高可用性提供保障;
关注公众号【可为编程】回复【面试】领取最新面试题!!!
Producer:
1、负责生产消息,一般由业务系统负责生产消息。
2、一个消息生产者会把业务应用系统里产生的消息发送到broker服务器。
3RocketMQ提供多种发送方式,同步发送、异步发送、顺序发送、单向发送。
4、同步和异步方式均需要Broker返回确认信息,单向发送不需要
Consumer:
1、负责消费消息,一般是后台系统负责异步消费。
2、一个消息消费者会从Broker服务器拉取消息、并将其提供给应用程序。
3、从用户应用的角度而言提供了两种消费形式:拉取式消费、推动式消费(其实都是拉取,保持长连接)

在这里插入图片描述
关注公众号【可为编程】回复【面试】领取最新面试题!!!

Queue

主要用于支持点对点的消息传递模式,即生产者将消息发送至队列,队列存在于Broker中,消息者从该队列中取出消息。这种传递方式的特点是,一个队列可以被多个生产者或消费者共用,但是某条消息一旦被某消费者取出,它将不再存在于队列中。即一条消息只能传递给一个消费者。

Topic(主题):
1、主要用于发布/订阅的传递模式。
2、生产者可将消息发布到Topic中,该Topic可由多个消费者订阅,所有订阅该Topic的消费者都能收到生产者发布的消息。
3、所有订阅的消费者都接收消息后,消息才会从Topic中移除。即一条消息可以传递给多个消费者。
4、来代表一种数据的集合,Topic 并不具有真正的属性,它只是一类数据的集合,不同类型的数据我们应该放到不同的 Topic5Topic 会分布式的进行存储;

Topic与Broker的对应关系
在这里插入图片描述

注意: 当我们真正使用 MQ 时,第一步应该总是先创建一些 Topic,作为数据集合存放不同类型的消息,其实本质上来讲和使用数据库时总是先创建表结构是一样的。
关注公众号【可为编程】回复【面试】领取最新面试题!!!

什么是长轮询机制?

Consumer从消息队列获取消息的方式主要有两种:pull和push。两种都有一些问题,比如说pull的情况下,有时候可能导致消息在服务端堆积,消息处理延时较高,有时候又可能因为消息队列中没有消息而导致空拉取,造成资源浪费,而在push的情况下,可能导致超出客户端压力,造成客户端卡死甚至宕机。于是,把pull和push相结合,得到了长轮询。
长轮询的机制是由客户端发起pull请求,服务端接收到客户端的请求后,如果发现队列中没有消息,并不立即返回,而是持有该请求一段时间,在此期间,服务端不断轮询队列中是否有新消息,如果有,则用现有连接将消息返回给客户端,如果一段时间内还是没有新消息,则返回空。长轮询机制的好处在于,其本质还是pull,所以,消息处理的主动权还是在客户端手中,客户端就可以根据自己的能力去做消息处理。而服务端持有请求一段时间的机制又很大程序的避免了空拉取,减少了资源的浪费。但是,这种机制也有一定问题,当客户端数量过多时,服务端可能在时间段内需要持有过多的连接,这种请求下,也会对服务端造成压力。不过,一般来说,消息队列的承压能力还是比较可靠的,再加上集群的保障,基本不用担心这个问题。
在这里插入图片描述

Rocketmq的工作流程是怎样的?

1、先启动Namesrv,Namesrver启动后监听端口,等待Broker、Produer、Consumer连接上来,相当于一个路由控制中心。
2、Broker启动,跟所有的Namesrver保持长连接,每30s发送一次心跳包。心跳包中包含当前Broker信息(IP+端口等)以及存储所有topic信息。注册成功后,Namesrv集群中就有Topic跟Broker的映射关系。
3、收发消息前,先创建topic,创建topic时需要指定该topic要存储在哪些Broker上。也可以在发送消息时自动创建Topic。
关注公众号【可为编程】回复【面试】领取最新面试题!!!
4、Producer启动并且发送消息,启动时先跟Namesrv集群中的其中一台建立长连接,并从Namesrv中获取当前发送的Topic存在哪些Broker上,然后跟对应的Broker建长连接,直接向Broker发消息。
5、Consumer跟Producer类似,跟其中一台Namesrv建立长连接,获取当前订阅Topic存在哪些Broker,然后直接跟Broker建立连接通道,开始消费消息。

RocketMq使用哪种方式消费消息,pull还是push?

RocketMq提供两种方式:pull和push进行消息的消费 而RocketMq的push方式,本质上也是采用pull的方式进行实现的。也就是说这两种方式本质上都是采用consumer轮询从broker拉取消息的 push方式里,consumer把轮询过程封装了一层,并注册了MessageListener监听器。当轮询取到消息后,便唤醒MessageListener的consumeMessage()来消费,对用户而言,感觉好像消息是被推送过来的 其实想想,消息统一都发到了broker,而broker又不会主动去push消息,那么消息肯定都是需要消费者主动去拉的喽~

后面知识点太多了 还是自己看吧,有时间再总结!!!

RocketMQ中的延迟消息

RocketMQ-延时消息Demo及实现原理分析

RocketMQ的消费模式

细谈RocketMQ的消费模式

RocketMQ 的核心 NameServer

不要和陌生人说话,消息中间件之 Topic

还在纠结秒杀?看看 MQ 如何搞定

为什么要使用消息中间件?

《吃透MQ系列》核心基础全在这里了,一文啃透!

分布式消息队列

RocketMQ相关问题研究

Broker 主从同步机制

RocketMQ刷盘机制

RocketMQ练习:

如果以集群的形式来进行演示的话必定会使得我电脑不堪重负,所以我采用单机的模式来进行。
@rocketMQ配置
106.12.50.23 rocketmq-nameserver-1
106.12.50.23 rocketmq-master-1
18.191.180.186 rocketmq-nameserver-2
18.191.180.186 rocketmq-master-2

broker-a.properties

# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements.  See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License.  You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
brokerClusterName=DefaultCluster
brokerName=broker-a
brokerId=0
deleteWhen=04
fileReservedTime=48
brokerRole=ASYNC_MASTER
flushDiskType=ASYNC_FLUSH
启动命令
sed -i 's#${
    user.home}#/usr/local/mq/rocketmq#g' *.xml
启动nameserver
nohup sh mqnamesrv &
启动broker
nohup sh mqbroker -c  /usr/local/mq/rocketmq/conf/2m-2s-async/broker-a.properties > /dev/null 2>&1 &
jps
单机模式启动配置broker-a.properties
# 集群名称
brokerClusterName=rocketmq-cluster
# broker名字,注意此处不同的配置文件填写的不一样
brokerName=broker-a
# 0 表示Master,>0 表示Slave
brokerId=0
# nameServer地址,分号分割
namesrvAddr=rocketmq-nameserver-1:9876
# 在发送消息时,自动创建服务器不存在的Topic,默认创建的队列数
defaultTopicQueueNums=4
# 是否允许Broker 自动创建Topic, 建议线下开启, 线上关闭
autoCreateTopicEnable=true
# 是否允许Broker 自动创建订阅组, 建议线下开启, 线上关闭
autoCreateSubscriptionGroup=true
# Broker 对外服务的监听端口
listenPort=10911
# 删除文件时间点,默认是凌晨4点
deleteWhen=04
# 文件保留时间,默认是48小时
fileReservedTime=48
# commitLog每个文件的大小默认1G
mapedFileSizeCommitLog=1073741824
# ConsumeQueue每个文件默认存30w条, 根据业务情况调整
mapedFileSizeConsumeQueue=30000
# destroyMapedFileIntervalForcibly=12000
# redeleteHangedFileInterval=12000
# 检测物理文件磁盘空间
diskMaxUsedSpaceRatio=88
# 存储路径
storePathRootDir=/usr/local/mq/rocketmq/store
# commitLog存储路径
storePathCommitLog=/usr/local/mq/rocketmq/store/commitlog
# 消息队列储存路径
storePathConsumeQueue=/usr/local/mq/rocketmq/store/consumequeue
# 消息索引粗存路径
storePathIndex=/usr/local/mq/rocketmq/store/index
# checkpoint 文件储存路径
storeCheckpoint=/usr/local/mq/rocketmq/store/checkpoint
# abort 文件存储路径
abortFile=/usr/local/mq/rocketmq/store/abort
# 限制的消息大小
maxMessageSize=65536
# flushCommitLogLeastPages=4
# flushConsumeQueueLeastPages=2
# flushCommitLogThoroughInterval=10000
# flushConsumeQueueThoroughInterval=60000
# Broker的角色
# -ASYNC_MASTER 异步复制Master
# -SYNC_MASTER 同步双写Master
# -SLAVE
brokerRole=ASYNC_MASTER
# 刷盘方式
# - ASYNC_FLUSH 异步刷盘
# - SYNC_FLUSH 同步刷盘
flushDiskType=ASYNC_FLUSH
# checkTransactionMessageEnable=false
# 发消息线程池数量
# sendMessageTreadPoolNums=128
# 拉消息线程池数量
# pullMessageTreadPoolNums=128lushDiskType=ASYNC_FLUSHH
# 关注公众号【可为编程】回复【面试】领取最新面试题!!!

欢迎感兴趣的小伙伴一起探讨学习知识,以上是个人的一些总结分享,如有错误的地方望各位留言指出,十分感谢。觉得有用的话别忘点赞、收藏、关注,手留余香!

这里是一个真诚的***青年技术交流QQ群:761374713***,不管你是大学生、社畜、想学习变成的其他人员,欢迎大家加入我们,一起成长,一起进步,真诚的欢迎你,不管是技术,还是人生,还是学习方法。有道无术,术亦可求,有术无道,止于术。在这里插入图片描述

欢迎大家关注【可为编程】,成长,进步,编程,技术、掌握更多知识!
在这里插入图片描述

版权声明:本文为博主原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。
本文链接:https://blog.csdn.net/weixin_42103983/article/details/119269232

智能推荐

FX3/CX3 JLINK 调试_ezusbsuite_qsg.pdf-程序员宅基地

文章浏览阅读2.1k次。FX3 JLINK调试是一个有些麻烦的事情,经常有些莫名其妙的问题。 设置参见 c:\Program Files (x86)\Cypress\EZ-USB FX3 SDK\1.3\doc\firmware 下的 EzUsbSuite_UG.pdf 文档。 常见问题: 1.装了多个版本的jlink,使用了未注册或不适当的版本 选择一个正确的版本。JLinkARM_V408l,JLinkA_ezusbsuite_qsg.pdf

用openGL+QT简单实现二进制stl文件读取显示并通过鼠标旋转缩放_qopengl如何鼠标控制旋转-程序员宅基地

文章浏览阅读2.6k次。** 本文仅通过用openGL+QT简单实现二进制stl文件读取显示并通过鼠标旋转缩放, 是比较入门的级别,由于个人能力有限,新手级别,所以未能施加光影灯光等操作, 未能让显示的stl文件更加真实。****效果图:**1. main.cpp```cpp#include "widget.h"#include <QApplication>int main(int argc, char *argv[]){ QApplication a(argc, argv); _qopengl如何鼠标控制旋转

刘焕勇&王昊奋|ChatGPT对知识图谱的影响讨论实录-程序员宅基地

文章浏览阅读943次,点赞22次,收藏19次。以大规模预训练语言模型为基础的chatgpt成功出圈,在近几日已经给人工智能板块带来了多次涨停,这足够说明这一风口的到来。而作为曾经的风口“知识图谱”而言,如何找到其与chatgpt之间的区别,找好自身的定位显得尤为重要。形式化知识和参数化知识在表现形式上一直都是大家考虑的问题,两种技术都应该有自己的定位与价值所在。知识图谱构建往往是抽取式的,而且往往包含一系列知识冲突检测、消解过程,整个过程都能溯源。以这样的知识作为输入,能在相当程度上解决当前ChatGPT的事实谬误问题,并具有可解释性。

如何实现tomcat的热部署_tomcat热部署-程序员宅基地

文章浏览阅读1.3k次。最重要的一点,一定是degbug的方式启动,不然热部署不会生效,注意,注意!_tomcat热部署

用HTML5做一个个人网站,此文仅展示个人主页界面。内附源代码下载地址_个人主页源码-程序员宅基地

文章浏览阅读10w+次,点赞56次,收藏482次。html5 ,用css去修饰自己的个人主页代码如下:&lt;!DOCTYPE html PUBLIC "-//W3C//DTD XHTML 1.0 Transitional//EN" "http://www.w3.org/TR/xhtml1/DTD/xhtml1-transitional.dtd"&gt;&lt;html xmlns="http://www.w3.org/1999/xh..._个人主页源码

程序员公开上班摸鱼神器!有了它,老板都不好意思打扰你!-程序员宅基地

文章浏览阅读201次。开发者(KaiFaX)面向全栈工程师的开发者专注于前端、Java/Python/Go/PHP的技术社区来源:开源最前线链接:https://github.com/svenstaro/gen..._程序员怎么上班摸鱼

随便推点

UG\NX二次开发 改变Block UI界面的尺寸_ug二次开发 调整 对话框大小-程序员宅基地

文章浏览阅读1.3k次。改变Block UI界面的尺寸_ug二次开发 调整 对话框大小

基于深度学习的股票预测(完整版,有代码)_基于深度学习的股票操纵识别研究python代码-程序员宅基地

文章浏览阅读1.3w次,点赞18次,收藏291次。基于深度学习的股票预测数据获取数据转换LSTM模型搭建训练模型预测结果数据获取采用tushare的数据接口(不知道tushare的筒子们自行百度一下,简而言之其免费提供各类金融数据 , 助力智能投资与创新型投资。)python可以直接使用pip安装tushare!pip install tushareCollecting tushare Downloading https://files.pythonhosted.org/packages/17/76/dc6784a1c07ec040e74_基于深度学习的股票操纵识别研究python代码

中科网威工业级防火墙通过电力行业测评_电力行业防火墙有哪些-程序员宅基地

文章浏览阅读2k次。【IT168 厂商动态】 近日,北京中科网威(NETPOWER)工业级防火墙通过了中国电力工业电力设备及仪表质量检验测试中心(厂站自动化及远动)测试,并成为中国首家通过电力协议访问控制专业测评的工业级防火墙生产厂商。   北京中科网威(NETPOWER)工业级防火墙专为工业及恶劣环境下的网络安全需求而设计,它采用了非X86的高可靠嵌入式处理器并采用无风扇设计,整机功耗不到22W,具备极_电力行业防火墙有哪些

第十三周 ——项目二 “二叉树排序树中查找的路径”-程序员宅基地

文章浏览阅读206次。/*烟台大学计算机学院 作者:董玉祥 完成日期: 2017 12 3 问题描述:二叉树排序树中查找的路径 */#include #include #define MaxSize 100typedef int KeyType; //定义关键字类型typedef char InfoType;typedef struct node

C语言基础 -- scanf函数的返回值及其应用_c语言ignoring return value-程序员宅基地

文章浏览阅读775次。当时老师一定会告诉你,这个一个"warning"的报警,可以不用管它,也确实如此。不过,这条报警信息我们至少可以知道一点,就是scanf函数调用完之后是有一个返回值的,下面我们就要对scanf返回值进行详细的讨论。并给出在编程时利用scanf的返回值可以实现的一些功能。_c语言ignoring return value

数字医疗时代的数据安全如何保障?_数字医疗服务保障方案-程序员宅基地

文章浏览阅读9.6k次。十四五规划下,数据安全成为国家、社会发展面临的重要议题,《数据安全法》《个人信息保护法》《关键信息基础设施安全保护条例》已陆续施行。如何做好“数据安全建设”是数字时代的必答题。_数字医疗服务保障方案

推荐文章

热门文章

相关标签