• -------------------------------------------------------------
  • ====================================

rocket mq的工作原理

kafka dewbay 6年前 (2019-04-12) 3344次浏览 已收录 0个评论 扫描二维码

一、整体介绍 在 RocketMQ 里,有以下几个核心的模块:Producer,Consumer,Broker,NameSrv。他们之间的关系如下: 

先简单了解一下各个模块的功能,下面会有章节详细介绍各个模块的功能。

Producer 和 Consumer 很好理解,顾名思义就是生产者和消费者,生产者负责生产消息,消费者负责消费消息,这 2 块的逻辑都是由业务使用者定义的。

Broker 是 RocketMQ 的核心,Broker 实现了消息的存储、拉取等功能。Broker 通常以集群方式启动,并可配置主从,每个 Broker 上提供对指定 topic 的服务。理解了 Broker 的原理,以及和其他服务交互的方式就基本弄懂了整个消息中间件的原理。

NameSrv 是一个无状态的名称服务,可以集群部署。所有 Broker 启动的时候会向 NameSrv 注册自己的信息。Producer 会根据目标 topic 从 NameSrv 获取到达指定 Broker 的路由信息,Consumer 同理。

对于 Producer 端 RocketMQ 采用了轮询的方式保证了负载均衡,Consumer 端通常采用 cluster 集群方式消费消息,我们可以自己定义消息在消息端的分配方式。另外,MQ 还提供了顺序消息的特性,简单了解一下 MQ 提供的特性即可,具体实现后面章节会进行阐述。

源码目录结构介绍&Remoting 通信层
一:源码目录结构介绍 RocketMQ 源码分为以下几个 package:

rocketmq-broker:整个 mq 的核心,他能够接受 producer 和 consumer 的请求,并调用 store 层服务对消息进行处理。HA 服务的基本单元,支持同步双写,异步双写等模式。
rocketmq-client::mq 客户端实现,目前官方仅仅开源了 java 版本的 mq 客户端,c++,go 客户端有社区开源贡献。
rocketmq-common:一些模块间通用的功能类,比如一些配置文件、常量。
rocketmq-example:官方提供的例子,对典型的功能比如 order message,push consumer,pull consumer 的用法进行了示范。
rocketmq-filtersrv:消息过滤服务,相当于在 broker 和 consumer 中间加入了一个 filter 代理。
rocketmq-remoting:基于 netty 的底层通信实现,所有服务间的交互都基于此模块。
rocketmq-srvut:解析命令行的工具类。
rocketmq-store:存储层实现,同时包括了索引服务,高可用 HA 服务实现。
rocketmq-tools:mq 集群管理工具,提供了消息查询等功能。
底层基于 Netty 网络库驱动
producer 
producer 1.启动流程
 

Producer 如何感知要发送消息的 broker 即 brokerAddrTable 中的值是怎么获得的,

  1. 发送消息的时候指定会指定 topic,如果 producer 集合中没有会根据指定 topic 到 namesrv 获取 topic 发布信息 TopicPublishInfo,并放入本地集合
  2. 定时从 namesrv 更新 topic 路由信息,
    Producer 与 broker 间的心跳

Producer 定时发送心跳将 producer 信息(其实就是 procduer 的 group)定时发送到, brokerAddrTable 集合中列出的 broker 上去
Producer 发送消息只发送到 master 的 broker 机器,在通过 broker 的主从复制机制拷贝到 broker 的 slave 上去
producer 2.如何发送消息
Producer 轮询某 topic 下的所有队列的方式来实现发送方的负载均衡

1)  Topic 下的所有队列如何理解:

比如 broker1, broker2, borker3 三台 broker 机器都配置了 Topic_A

Broker1 的队列为 queue0 , queue1

Broker2 的队列为 queue0, queue2, queue3,

Broker3 的队列为 queue0

当然一般情况下的 broker 的配置都是一样的

以上当 broker 启动的时候注册到 namesrv 的 Topic_A 队列为共 6 个分别为:

broker1_queue0, broker1_queue1,

broker2_queue0, broker2_queue1, broker2_queue2,

broker3_queue0,

2)  Producer 如何实现轮询队列:

Producer 从 namesrv 获取的到 Topic_A 路由信息 TopicPublishInfo

       --List<MessageQueue>messageQueueList  //Topic_A 的所有的队列

       --AtomicIntegersendWhichQueue        //自增整型

       方法 selectOneMessageQueue 方法用来选择一个发送队列

                (++sendWitchQueue)% messageQueueList.size 为队列集合的下标

                每次获取 queue 都会通过 sendWhichQueue 加一来实现对所有 queue 的轮询

                        如果入参 lastBrokerName 不为空,代表上次选择的 queue 发送失败,这次选择应该避开同一个 queue

3)  Producer 发消息系统重试:

发送失败后,重试几次 retryTimesWhenSendFailed = 2

发送消息超时 sendMsgTimeout = 3000

Producer 通过 selectOneMessageQueue 方法获取一个 MessagQueue 对象

       --topic            //Topic_A

       --brokerName           //代表发送消息到达的 broker

       --queueId              //代表发送消息的在指定 broker 上指定 topic 下的队列编号

向指定 broker 的指定 topic 的指定 queue 发送消息

               发送失败(1)重试次数不到两次(2)发送此条消息花费时间还没有到 3000(毫秒), 换个队列继续发送。 

 producer 发送普通消息

producer 3.如何发送顺序消息

Rocketmq 能够保证消息严格顺序,但是 Rocketmq 需要 producer 保证顺序消息按顺序发送到同一个 queue 中,比如购买流程(1)下单(2)支付(3)支付成功,
这三个消息需要根据特定规则将这个三个消息按顺序发送到一个 queue

如何实现把顺序消息发送到同一个 queue:

       一般消息是通过轮询所有队列发送的,顺序消息可以根据业务比如说订单号 orderId 相同的消息发送到同一个队列, 或者同一用户 userId 发送到同一队列等等

messageQueueList [orderId%messageQueueList.size()]

messageQueueList [userId%messageQueueList.size()]

producer 4.如何发布分布式事务消息
先引入官方文档图:

分布式事物是基于二阶段提交的

1) 一阶段,向 broker 发送一条 prepared 的消息,返回消息的 offset 即消息地址 commitLog 中消息偏移量。Prepared 状态消息不被消费
发送消息 ok,执行本地事物分支, 本地事物方法需要实现 rocketmq 的回调接口

2) LocalTransactionExecuter,
处理本地事物逻辑返回处理的事物状态 LocalTransactionState

3) 二阶段,处理完本地事物中业务得到事物状态, 根据 offset 查找到 commitLog 中的 prepared 消息,设置消息状态 commitType 或者 rollbackType,
让后将信息添加到 commitLog 中, 其实二阶段生成了两条消息

事物消息发送

producer 5.消息在落地 broker 落地之普通消息
Broker 根据 producer 请求的 RequestCode.SEND_MESSAGE 选择对应的处理器 SendMessageProcessor

     根据请求消息内容构建消息内部结构 MessageExtBrokerInner

     调 DefaultMessageStore 加消息写入 commitlog

producer 6.消息在落地 broker 落地之事务消息

  1. 消息落地
    commitLog 针对事物消息的处理,消息的第 20 位开始的八位记录是的消息在逻辑队列中的 queueoffset,
    但是针对事物消息为 preparedType 和 rollbackType 的存储的是事物状态表的索引偏移量
  2. 分发事物消息:    分发消息位置信息到 ConsumeQueue: 事物状态为 preparedType 和 rollbackType 的消息不会将请求分发到 ConsumeQueue 中去,即不处理,所以不会被消息
    更新 transactionstable table:如果是 prepared 消息记,通过 TransactionStateService 服务将消息加到存储事务状态的表格 tranStateTable 的文件中;
    如果是 commitType 和 rollbackType 消息, 修改事物状态表格 tranStateTable 中的消息状态。
    记录 Transaction Redo Log 日志: 记录了 commitLogOffset, msgSize,preapredTransactionOffset, storeTimestamp。
  3. 事物状态表
    事物状态表是有 MapedFileQueue 将多个文件组成一个连续的队列,它的存储单元是定长为 24 个字节的数据, tranStateTableOffset 可以认为是事物状态消息的个数,索引偏移量, 它的值是 tranStateTable.getMaxOffset()/ TSStoreUnitSize  
  1. 事物回查

定时回查线程会定时扫描(默认每分钟)每个存储事务状态的表格文件,遍历存储事务状态的表格记录

如果是已经提交或者回滚的消息调过过,

如果是 prepared 状态的如果消息小于事务回查至少间隔时间(默认是一分钟)跳出终止遍历

调 transactionCheckExecuter.gotocheck 方法向 producer 回查事物状态,

     根据 group 随机选择一台 producer

     查询消息,根据 commitLogOffset 和 msgSize 到 commitlog 查找消息

     向 Producder 发起请求,请求 code 类型为 CHECK_TRANSACTION_STATE,producer 的 DefaultMQProducerImpl.checkTransactionState()方法
     来处理 broker 定时回调的请求,这里构建一个 Runnable 任务异步执行 producer 注册的回调接口,处理回调,在调 endTransactionOneway 向 broker
     发送请求更新事物消息的最终状态

无 Prepared 消息,且遍历完,则终止扫描这个文件的定时任务

  1. 事物消息的 load&recover

TransactionStateService.load ()事物状态服务加载, 加载只是建立文件映射
redoLog 队列恢复,加载本地 redoLog 文件
tranStateTable 事物状态表, 加载本地 tranStateTable 文件
 recover:

正常恢复:

     利用 tranRedoLog 文件的 recover

     利用 tranStateTable 文件重建事物状态表

异常恢复:

     先按照正常流程恢复 TranRedo Log

     commitLog 异常恢复,commitLog 根据 checkpoint 时间点重新生成 redolog,重新分发消息 DispatchRequest,

分发消息到位置信息到 ConsumeQueue

              更新 Transaction State Table

               记录 TransactionRedo Log

     删除事物状态表 tranStateTable

通过 RedoLog 全量恢复 StateTable

     重头扫描 RedoLog, 过滤出所有 prepared 状态的消息, 将 commit 或者 rollback 的消息对应的 prepared 消息删除

     重建 StateTable,  将上面过滤出的 prepared 消息,添加到事物状态表文件中

这个事物状态表 transstable 的作用是定期(1 分钟)将状态为 prepared 事物回查 producer 端 redolog 这个队列其实标记消费到哪了,
事物状态的恢复根本上是有 commitlog 来做的
 

 consumer 主要 2 中模式

pull  and push

作者:wwyh520
来源:CSDN
原文:https://blog.csdn.net/wanbf123/article/details/77991895
版权声明:本文为博主原创文章,转载请附上博文链接!


露水湾 , 版权所有丨如未注明 , 均为原创丨本网站采用BY-NC-SA协议进行授权
转载请注明原文链接:rocket mq的工作原理
喜欢 (0)
[]
分享 (0)
关于作者:
发表我的评论
取消评论

表情 贴图 加粗 删除线 居中 斜体 签到

Hi,您需要填写昵称和邮箱!

  • 昵称 (必填)
  • 邮箱 (必填)
  • 网址