为了实现分布式系统可扩展、可伸缩性的关键组件,需要具有高吞吐量、高可用等特点。我们很多时候都会考虑将消息系统纳入我们的选择中;比如我一个登录事件,有可能我登录之后需要做很多东西,比如日志,比如发布消息,比如推送,再比如发送代金券等等;这些事件与登录息息相关,但是本质上它们与登录这个事件没有直接的关系,只是在登录事件后,系统按照需求需要去初始化一些东西,或者去记录一些东西等等;如果把所有的东西都纳入到登录这个事件中(同一个事物中),那登录的事件内处理的逻辑更多,会造成什么后果?登录时间很长,让用户无法忍受,另外,假如登录过程中出现了未发现异常,那是不是导致用户直接无法登录?为了解决这样的问题,我们引入了消息系统,比如我这台机登录过后,我将登录的一些信息,通过远程方式发送到另外一台机器上(或者同一台机),让它们去处理相应的后续逻辑实现;
目的是:1、用户登录更快,体验上更好,
2、只要保证登录部分完整,即便后续出错,并不影响用户正常使用,即容错性更强!
谈到消息系统,首先想到的第一个问题肯定会是:
消息的顺序性
本来很想说一下关于消息顺序性的一些问题,不过由于我也是借鉴了一些其他的帖子,以及官方的文档,所以这里就不会去赘述这些了,稍后我会分享一些很不错的链接,留给自己以后看,也希望可以给一些刚好要入门 rocketmq 的网友提供一些资料;
rocketmq 是阿里云的一套开源产品,功能什么的就不赘述了,请自行去网站了解:https://help.aliyun.com/document_detail/29532.html?spm=5176.doc34411.6.104.EvZr21
rocketmq 是一类二级消息分类的产品,一级为 topic,二级为 tag;
broker 按照收到生产者发送的消息体,分析其中的 topic,然后去找到相应的 topic 转发出去,在消费端,消费者根据收到的消息分析出 tag 的不同去做不同的逻辑处理;
那么在这个时候,我们就会好奇,为了保证消息的顺序执行的情况,RokectMQ 是如何选择 topic?为此,我们先看看 rokcetmq 的源代码:
// 官方例子如下:
public class Producer {
public static void main(String[] args) throws MQClientException, InterruptedException {
DefaultMQProducer producer = new DefaultMQProducer("ProducerGroupName");
producer.start();
for (int i = 0; i < 10000000; i++)
try {
{
Message msg = new Message("TopicTest",// topic
"TagA",// tag
"OrderID188",// key
("Hello MetaQ").getBytes(RemotingHelper.DEFAULT_CHARSET));// body
SendResult sendResult = producer.send(msg); //发送消息
System.out.println(sendResult);
}
} catch (Exception e) {
e.printStackTrace();
}
producer.shutdown();
}
}
// defalutMQProducer 类下封装的方法
@Override
public SendResult send(Message msg, MessageQueue mq) throws MQClientException, RemotingException,
MQBrokerException, InterruptedException {
return this.defaultMQProducerImpl.send(msg, mq);
}
// send 的实现方法
@Override
public SendResult send(Message message) {
this.checkONSProducerServiceState(this.defaultMQProducer.getDefaultMQProducerImpl());
com.alibaba.rocketmq.common.message.Message msgRMQ = ONSUtil.msgConvert(message);
try {
com.alibaba.rocketmq.client.producer.SendResult sendResultRMQ = this.defaultMQProducer.send(msgRMQ);
message.setMsgID(sendResultRMQ.getMsgId());
SendResult sendResult = new SendResult();
sendResult.setTopic(sendResultRMQ.getMessageQueue().getTopic());//如何选择 topic
sendResult.setMessageId(sendResultRMQ.getMsgId());
return sendResult;
} catch (Exception e) {
log.error(String.format("Send message Exception, %s", message), e);
throw checkProducerException(message.getTopic(), message.getMsgID(), e);
}
}
在官方例子中,我们可以看到,在发送消息的时候,我们并没有去了解细致的发送消息时,那么 MQ 到底是如何选择 topic 的?
但是可以从代码中看到,它确实有个 MessageQueueSelector 接口,这个接口负责是选择 topic,那么我们就来看看它到底为我们提供了那些实现方法吧(一般的消息都是轮询去寻找 topic 来实现负载均衡):
/**
* 如果 lastBrokerName 不为 null,则寻找与其不同的 MessageQueue(轮询负载均衡)
*/
public MessageQueue selectOneMessageQueue(final String lastBrokerName) {
if (lastBrokerName != null) {
int index = this.sendWhichQueue.getAndIncrement();
for (int i = 0; i < this.messageQueueList.size(); i++) {
int pos = Math.abs(index++) % this.messageQueueList.size();//轮询
MessageQueue mq = this.messageQueueList.get(pos);
if (!mq.getBrokerName().equals(lastBrokerName)) {
return mq;
}
}
return null;
}
else {
int index = this.sendWhichQueue.getAndIncrement();
int pos = Math.abs(index) % this.messageQueueList.size();
return this.messageQueueList.get(pos);
}
}
如果对于这种轮询方式的负载均衡不满意,并不能打达到我们的需求,那么我们又改如何去选择?
阿里云提供了三种方式来解决我们的需求,如果再不能满足,那么就知道修改源码算法部分来达到自己的要求了。
/**
* 使用哈希算法来选择队列,顺序消息通常都这样做<br>
*
* @author shijia.wxr<vintage.wang@gmail.com>
* @since 2013-6-27
*/
public class SelectMessageQueueByHash implements MessageQueueSelector {
@Override
public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
int value = arg.hashCode();
if (value < 0) {
value = Math.abs(value);
}
value = value % mqs.size();
return mqs.get(value);
}
}
/**
* 根据机房来选择发往哪个队列,支付宝逻辑机房使用
*
* @author shijia.wxr<vintage.wang@gmail.com>
* @since 2013-7-25
*/
public class SelectMessageQueueByMachineRoom implements MessageQueueSelector {
private Set<String> consumeridcs;
@Override
public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
// TODO Auto-generated method stub
return null;
}
public Set<String> getConsumeridcs() {
return consumeridcs;
}
public void setConsumeridcs(Set<String> consumeridcs) {
this.consumeridcs = consumeridcs;
}
}
/**
* 发送消息,随机选择队列
*
* @author shijia.wxr<vintage.wang@gmail.com>
* @since 2013-7-25
*/
public class SelectMessageQueueByRandoom implements MessageQueueSelector {
private Random random = new Random(System.currentTimeMillis());
@Override
public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
int value = random.nextInt();
if (value < 0) {
value = Math.abs(value);
}
value = value % mqs.size();
return mqs.get(value);
}
}
可以看到,rocketmq 为我们提供了三个选择(除去轮询方法),那么如果我们在非常关注消息顺序的时候,我们可以选择通过哈希算法求值的方式来实现
SelectMessageQueueByHash
我们每个传递进入的对象都会被哈希算法计算出 一个哈希值,比如我们传递的是订单号,那么无疑我们可以保证相同的订单号可以传递给相同的 topic 去处理,那么只要再保证是一致的 tag 就可以保证顺序的一致性啦;
目的是:生产者 — MQ 服务端 — 消费者 可以达到一一对应的关系
第二种是机房选择,算法是木有啦,应该是根据 ip 地址去区分,反正概念我不是很清晰,也没有去注意和了解;有了解的亲留个资料给我吧,链接就好,谢谢撒……
第三种是随机选择,也就是谁也不知道它到底会选择谁,这种效率其实很差,没有负载均衡,谁也不知道会不会堵塞起来,谁也不知道某个队列是否已经塞满。
有些问题,看起来很重要,但实际上我们可以通过合理的设计或者将问题分解来规避。如果硬要把时间花在解决它们身上,实际上是浪费的,效率低下的。从这个角度来看消息的顺序问题,我们可以得出两个结论:
1、不关注乱序的应用实际大量存在
2、队列无序并不意味着消息无序
参考链接如下: