• -------------------------------------------------------------
  • ====================================

消息中间件 RocketMQ 源码解析:Filtersrv

kafka dewbay 6年前 (2019-04-12) 2273次浏览 已收录 0个评论 扫描二维码

本文主要基于 RocketMQ 4.0.x 正式版

  1. 概述
  2. Filtersrv 注册到 Broker
  3. 过滤类
    3.1 Consumer 订阅时设置 过滤类代码
    3.2 Consumer 上传 过滤类代码
    3.3 Filter 编译 过滤类代码
  4. 过滤消息
    4.1 Consumer 从 Filtersrv 拉取消息
    4.2 Filtersrv 从 Broker 拉取消息
  5. Filtersrv 高可用

《Dubbo 实现原理与源码解析 —— 精品合集》
《Netty 实现原理与源码解析 —— 精品合集》
《Spring 实现原理与源码解析 —— 精品合集》
《MyBatis 实现原理与源码解析 —— 精品合集》
《Spring MVC 实现原理与源码解析 —— 精品合集》
《数据库实体设计合集》
《Java 面试题 —— 精品合集》
《Java 学习指南 —— 精品合集》

  1. 概述
    Filtersrv ,负责自定义规则过滤 Consumer 从 Broker 拉取的消息。

为什么 Broker 不提供过滤消息的功能呢?我们来看看官方的说法:

Broker 端消息过滤
在 Broker 中,按照 Consumer 的要求做过滤,优点是减少了对于 Consumer 无用消息的网络传输。 缺点是增加了 Broker 的负担,实现相对复杂。
(1). 淘宝 Notify 支持多种过滤方式,包含直接按照消息类型过滤,灵活的语法表达式过滤,几乎可以满足最苛刻的过滤需求。
(2). 淘宝 RocketMQ 支持按照简单的 Message Tag 过滤,也支持按照 Message Header、body 进行过滤。
(3). CORBA Notification 规范中也支持灵活的语法表达式过滤。
Consumer 端消息过滤
这种过滤方式可由应用完全自定义实现,但是缺点是很多无用的消息要传输到 Consumer 端。
就是在这种考虑下,Filtersrv 出现了。减少了 Broker 的负担,又减少了 Consumer 接收无用的消息。当然缺点也是有的,多了一层 Filtersrv 网络开销。

  1. Filtersrv 注册到 Broker
    🦅 一个 Filtersrv 只对应一个 Broker。
    🦅 一个 Broker 可以对应多个 Filtersrv。Filtersrv 的高可用通过启动多个 Filtersrv 实现。
    🦅 Filtersrv 注册失败时,主动退出关闭。
    核心代码如下: 1: // ⬇️⬇️⬇️【FiltersrvController.java】
    2: public boolean initialize() {
    3: // ….(省略代码)
    4:
    5: // 固定间隔注册到 Broker
    6: this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
    7:
    8: @Override
    9: public void run() {
    10: FiltersrvController.this.registerFilterServerToBroker();
    11: }
    12: }, 15, 10, TimeUnit.SECONDS); // TODO edit by 芋艿:initialDelay 时间太短,可能导致初始化失败。从 3=》15
    13:
    14: // ….(省略代码)
    15: }
    16:
    17: /**
    18: * 注册 Filtersrv 到 Broker
    19: * !!!如果注册失败,关闭 Filtersrv
    20: */
    21: public void registerFilterServerToBroker() {
    22: try {
    23: RegisterFilterServerResponseHeader responseHeader =
    24: this.filterServerOuterAPI.registerFilterServerToBroker(
    25: this.filtersrvConfig.getConnectWhichBroker(), this.localAddr());
    26: this.defaultMQPullConsumer.getDefaultMQPullConsumerImpl().getPullAPIWrapper()
    27: .setDefaultBrokerId(responseHeader.getBrokerId());
    28:
    29: if (null == this.brokerName) {
    30: this.brokerName = responseHeader.getBrokerName();
    31: }
    32:
    33: log.info(“register filter server<{}> to broker<{}> OK, Return: {} {}”,
    34: this.localAddr(),
    35: this.filtersrvConfig.getConnectWhichBroker(),
    36: responseHeader.getBrokerName(),
    37: responseHeader.getBrokerId());
    38: } catch (Exception e) {
    39: log.warn(“register filter server Exception”, e);
    40:
    41: log.warn(“access broker failed, kill oneself”);
    42: System.exit(-1); // 异常退出
    43: }
    44: }
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
  2. 过滤类

3.1 Consumer 订阅时设置 过滤类代码
🦅 Consumer 针对每个 Topic 可以订阅不同的 过滤类代码。
1: // ⬇️⬇️⬇️【DefaultMQPushConsumer.java】
2: @Override
3: public void subscribe(String topic, String fullClassName, String filterClassSource) throws MQClientException {
4: this.defaultMQPushConsumerImpl.subscribe(topic, fullClassName, filterClassSource);
5: }
1
2
3
4
5
3.2 Consumer 上传 过滤类代码
🦅 Consumer 心跳注册到 Broker 的同时,上传 过滤类代码 到 Broker 对应的所有 Filtersrv。
1: // ⬇️⬇️⬇️【MQClientInstance.java】
2: /**
3: * 发送心跳到 Broker,上传过滤类源码到 Filtersrv
4: / 5: public void sendHeartbeatToAllBrokerWithLock() { 6: if (this.lockHeartbeat.tryLock()) { 7: try { 8: this.sendHeartbeatToAllBroker(); 9: this.uploadFilterClassSource(); 10: } catch (final Exception e) { 11: log.error(“sendHeartbeatToAllBroker exception”, e); 12: } finally { 13: this.lockHeartbeat.unlock(); 14: } 15: } else { 16: log.warn(“lock heartBeat, but failed.”); 17: } 18: } 19: 20: /*
21: * 上传过滤类到 Filtersrv
22: */
23: private void uploadFilterClassSource() {
24: Iterator> it = this.consumerTable.entrySet().iterator();
25: while (it.hasNext()) {
26: Entry next = it.next();
27: MQConsumerInner consumer = next.getValue();
28: if (ConsumeType.CONSUME_PASSIVELY == consumer.consumeType()) {
29: Set subscriptions = consumer.subscriptions();
30: for (SubscriptionData sub : subscriptions) {
31: if (sub.isClassFilterMode() && sub.getFilterClassSource() != null) {
32: final String consumerGroup = consumer.groupName();
33: final String className = sub.getSubString();
34: final String topic = sub.getTopic();
35: final String filterClassSource = sub.getFilterClassSource();
36: try {
37: this.uploadFilterClassToAllFilterServer(consumerGroup, className, topic, filterClassSource);
38: } catch (Exception e) {
39: log.error(“uploadFilterClassToAllFilterServer Exception”, e);
40: }
41: }
42: }
43: }
44: }
45: }
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
3.3 Filter 编译 过滤类代码
🦅 Filtersrv 处理 Consumer 上传的 过滤类代码,并进行编译使用。
核心代码如下:

1: // ⬇️⬇️⬇️【FilterClassManager.java】
2: /**
3: * 注册过滤类
4: *
5: * @param consumerGroup 消费分组
6: * @param topic Topic
7: * @param className 过滤类名
8: * @param classCRC 过滤类源码 CRC
9: * @param filterSourceBinary 过滤类源码
10: * @return 是否注册成功
11: */
12: public boolean registerFilterClass(final String consumerGroup, final String topic,
13: final String className, final int classCRC, final byte[] filterSourceBinary) {
14: final String key = buildKey(consumerGroup, topic);
15: // 判断是否要注册新的过滤类
16: boolean registerNew = false;
17: FilterClassInfo filterClassInfoPrev = this.filterClassTable.get(key);
18: if (null == filterClassInfoPrev) {
19: registerNew = true;
20: } else {
21: if (this.filtersrvController.getFiltersrvConfig().isClientUploadFilterClassEnable()) {
22: if (filterClassInfoPrev.getClassCRC() != classCRC && classCRC != 0) { // 类有变化
23: registerNew = true;
24: }
25: }
26: }
27: // 注册新的过滤类
28: if (registerNew) {
29: synchronized (this.compileLock) {
30: filterClassInfoPrev = this.filterClassTable.get(key);
31: if (null != filterClassInfoPrev && filterClassInfoPrev.getClassCRC() == classCRC) {
32: return true;
33: }
34: try {
35: FilterClassInfo filterClassInfoNew = new FilterClassInfo();
36: filterClassInfoNew.setClassName(className);
37: filterClassInfoNew.setClassCRC(0);
38: filterClassInfoNew.setMessageFilter(null);
39:
40: if (this.filtersrvController.getFiltersrvConfig().isClientUploadFilterClassEnable()) {
41: String javaSource = new String(filterSourceBinary, MixAll.DEFAULT_CHARSET);
42: // 编译新的过滤类
43: Class newClass = DynaCode.compileAndLoadClass(className, javaSource);
44: // 创建新的过滤类对象
45: Object newInstance = newClass.newInstance();
46: filterClassInfoNew.setMessageFilter((MessageFilter) newInstance);
47: filterClassInfoNew.setClassCRC(classCRC);
48: }
49:
50: this.filterClassTable.put(key, filterClassInfoNew);
51: } catch (Throwable e) {
52: String info = String.format(“FilterServer, registerFilterClass Exception, consumerGroup: %s topic: %s className: %s”,
53: consumerGroup, topic, className);
54: log.error(info, e);
55: return false;
56: }
57: }
58: }
59:
60: return true;
61: }
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61

  1. 过滤消息

4.1 Consumer 从 Filtersrv 拉取消息
🦅 Consumer 拉取 使用过滤类方式订阅 的消费消息时,从 Broker 对应的 Filtersrv 列表随机选择一个拉取消息。如果选择不到 Filtersrv,则无法拉取消息。因此,Filtersrv 一定要做高可用。
1: // ⬇️⬇️⬇️【PullAPIWrapper.java】
2: /**
3: * 拉取消息核心方法
4: *
5: * @param mq 消息嘟列
6: * @param subExpression 订阅表达式
7: * @param subVersion 订阅版本号
8: * @param offset 拉取队列开始位置
9: * @param maxNums 批量拉 取消息数量
10: * @param sysFlag 拉取系统标识
11: * @param commitOffset 提交消费进度
12: * @param brokerSuspendMaxTimeMillis broker 挂起请求最大时间
13: * @param timeoutMillis 请求 broker 超时时间
14: * @param communicationMode 通讯模式
15: * @param pullCallback 拉取回调
16: * @return 拉取消息结果。只有通讯模式为同步时,才返回结果,否则返回 null。
17: * @throws MQClientException 当寻找不到 broker 时,或发生其他 client 异常
18: * @throws RemotingException 当远程调用发生异常时
19: * @throws MQBrokerException 当 broker 发生异常时。只有通讯模式为同步时才会发生该异常。
20: * @throws InterruptedException 当发生中断异常时
21: / 22: protected PullResult pullKernelImpl( 23: final MessageQueue mq, 24: final String subExpression, 25: final long subVersion, 26: final long offset, 27: final int maxNums, 28: final int sysFlag, 29: final long commitOffset, 30: final long brokerSuspendMaxTimeMillis, 31: final long timeoutMillis, 32: final CommunicationMode communicationMode, 33: final PullCallback pullCallback 34: ) throws MQClientException, RemotingException, MQBrokerException, InterruptedException { 35: // // ….(省略代码) 36: // 请求拉取消息 37: if (findBrokerResult != null) { 38: // ….(省略代码) 39: // 若订阅 topic 使用过滤类,使用 filtersrv 获取消息 40: String brokerAddr = findBrokerResult.getBrokerAddr(); 41: if (PullSysFlag.hasClassFilterFlag(sysFlagInner)) { 42: brokerAddr = computPullFromWhichFilterServer(mq.getTopic(), brokerAddr); 43: } 44: 45: PullResult pullResult = this.mQClientFactory.getMQClientAPIImpl().pullMessage( 46: brokerAddr, 47: requestHeader, 48: timeoutMillis, 49: communicationMode, 50: pullCallback); 51: 52: return pullResult; 53: } 54: 55: // Broker 信息不存在,则抛出异常 56: throw new MQClientException(“The broker[” + mq.getBrokerName() + “] not exist”, null); 57: } 58: 59: /*
60: * 计算 filtersrv 地址。如果有多个 filtersrv,随机选择一个。
61: *
62: * @param topic Topic
63: * @param brokerAddr broker 地址
64: * @return filtersrv 地址
65: * @throws MQClientException 当 filtersrv 不存在时
66: / 67: private String computPullFromWhichFilterServer(final String topic, final String brokerAddr) 68: throws MQClientException { 69: ConcurrentHashMap topicRouteTable = this.mQClientFactory.getTopicRouteTable(); 70: if (topicRouteTable != null) { 71: TopicRouteData topicRouteData = topicRouteTable.get(topic); 72: List list = topicRouteData.getFilterServerTable().get(brokerAddr); 73: if (list != null && !list.isEmpty()) { 74: return list.get(randomNum() % list.size()); 75: } 76: } 77: throw new MQClientException(“Find Filter Server Failed, Broker Addr: ” + brokerAddr + ” topic: ” 78: + topic, null); 79: } 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 4.2 Filtersrv 从 Broker 拉取消息 🦅 Filtersrv 拉取消息后,会建议 Consumer 向 Broker 主节点 拉取消息。 🦅 Filtersrv 可以理解成一个 Consumer,向 Broker 拉取消息时,实际使用的 DefaultMQPullConsumer.java 的方法和逻辑。 1: // ⬇️⬇️⬇️【DefaultRequestProcessor.java】 2: /*
3: * 拉取消息
4: *
5: * @param ctx 拉取消息 context
6: * @param request 拉取消息请求
7: * @return 响应
8: * @throws Exception 当发生异常时
9: */
10: private RemotingCommand pullMessageForward(final ChannelHandlerContext ctx, final RemotingCommand request) throws Exception {
11: final RemotingCommand response = RemotingCommand.createResponseCommand(PullMessageResponseHeader.class);
12: final PullMessageResponseHeader responseHeader = (PullMessageResponseHeader) response.readCustomHeader();
13: final PullMessageRequestHeader requestHeader =
14: (PullMessageRequestHeader) request.decodeCommandCustomHeader(PullMessageRequestHeader.class);
15:
16: final FilterContext filterContext = new FilterContext();
17: filterContext.setConsumerGroup(requestHeader.getConsumerGroup());
18:
19: response.setOpaque(request.getOpaque());
20:
21: DefaultMQPullConsumer pullConsumer = this.filtersrvController.getDefaultMQPullConsumer();
22:
23: // 校验 Topic 过滤类是否完整
24: final FilterClassInfo findFilterClass = this.filtersrvController.getFilterClassManager().findFilterClass(requestHeader.getConsumerGroup(), requestHeader.getTopic());
25: if (null == findFilterClass) {
26: response.setCode(ResponseCode.SYSTEM_ERROR);
27: response.setRemark(“Find Filter class failed, not registered”);
28: return response;
29: }
30: if (null == findFilterClass.getMessageFilter()) {
31: response.setCode(ResponseCode.SYSTEM_ERROR);
32: response.setRemark(“Find Filter class failed, registered but no class”);
33: return response;
34: }
35:
36: // 设置下次请求从 Broker 主节点。
37: responseHeader.setSuggestWhichBrokerId(MixAll.MASTER_ID);
38:
39: MessageQueue mq = new MessageQueue();
40: mq.setTopic(requestHeader.getTopic());
41: mq.setQueueId(requestHeader.getQueueId());
42: mq.setBrokerName(this.filtersrvController.getBrokerName());
43: long offset = requestHeader.getQueueOffset();
44: int maxNums = requestHeader.getMaxMsgNums();
45:
46: final PullCallback pullCallback = new PullCallback() {
47:
48: @Override
49: public void onSuccess(PullResult pullResult) {
50: responseHeader.setMaxOffset(pullResult.getMaxOffset());
51: responseHeader.setMinOffset(pullResult.getMinOffset());
52: responseHeader.setNextBeginOffset(pullResult.getNextBeginOffset());
53: response.setRemark(null);
54:
55: switch (pullResult.getPullStatus()) {
56: case FOUND:
57: response.setCode(ResponseCode.SUCCESS);
58:
59: List msgListOK = new ArrayList();
60: try {
61: for (MessageExt msg : pullResult.getMsgFoundList()) {
62: // 使用过滤类过滤消息
63: boolean match = findFilterClass.getMessageFilter().match(msg, filterContext);
64: if (match) {
65: msgListOK.add(msg);
66: }
67: }
68:
69: if (!msgListOK.isEmpty()) {
70: returnResponse(requestHeader.getConsumerGroup(), requestHeader.getTopic(), ctx, response, msgListOK);
71: return;
72: } else {
73: response.setCode(ResponseCode.PULL_RETRY_IMMEDIATELY);
74: }
75: } catch (Throwable e) {
76: final String error =
77: String.format(“do Message Filter Exception, ConsumerGroup: %s Topic: %s “,
78: requestHeader.getConsumerGroup(), requestHeader.getTopic());
79: log.error(error, e);
80:
81: response.setCode(ResponseCode.SYSTEM_ERROR);
82: response.setRemark(error + RemotingHelper.exceptionSimpleDesc(e));
83: returnResponse(requestHeader.getConsumerGroup(), requestHeader.getTopic(), ctx, response, null);
84: return;
85: }
86:
87: break;
88: case NO_MATCHED_MSG:
89: response.setCode(ResponseCode.PULL_RETRY_IMMEDIATELY);
90: break;
91: case NO_NEW_MSG:
92: response.setCode(ResponseCode.PULL_NOT_FOUND);
93: break;
94: case OFFSET_ILLEGAL:
95: response.setCode(ResponseCode.PULL_OFFSET_MOVED);
96: break;
97: default:
98: break;
99: }
100:
101: returnResponse(requestHeader.getConsumerGroup(), requestHeader.getTopic(), ctx, response, null);
102: }
103:
104: @Override
105: public void onException(Throwable e) {
106: response.setCode(ResponseCode.SYSTEM_ERROR);
107: response.setRemark(“Pull Callback Exception, ” + RemotingHelper.exceptionSimpleDesc(e));
108: returnResponse(requestHeader.getConsumerGroup(), requestHeader.getTopic(), ctx, response, null);
109: return;
110: }
111: };
112:
113: // 拉取消息
114: pullConsumer.pullBlockIfNotFound(mq, null, offset, maxNums, pullCallback);
115: return null;
116: }
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116

  1. Filtersrv 高可用
  2. 彩蛋

作者:公众号-芋道源码
来源:CSDN
原文:https://blog.csdn.net/github_38592071/article/details/72355786
版权声明:本文为博主原创文章,转载请附上博文链接!


露水湾 , 版权所有丨如未注明 , 均为原创丨本网站采用BY-NC-SA协议进行授权
转载请注明原文链接:消息中间件 RocketMQ 源码解析:Filtersrv
喜欢 (0)
[]
分享 (0)
关于作者:
发表我的评论
取消评论

表情 贴图 加粗 删除线 居中 斜体 签到

Hi,您需要填写昵称和邮箱!

  • 昵称 (必填)
  • 邮箱 (必填)
  • 网址