普通消息
普通消息也叫做无序消息,简单来说就是没有顺序的消息,producer 只管发送消息,consumer 只管接收消息,至于消息和消息之间的顺序并没有保证,可能先发送的消息先消费,也可能先发送的消息后消费。
举个简单例子,producer 依次发送 order id 为 1、2、3 的消息到 broker,consumer 接到的消息顺序有可能是 1、2、3,也有可能是 2、1、3 等情况,这就是普通消息。
因为不需要保证消息的顺序,所以消息可以大规模并发地发送和消费,吞吐量很高,适合大部分场景。
代码示例:
- 生产者
public class Producer {
public static void main(String[] args) throws MQClientException, InterruptedException {
//声明并初始化一个 producer
//需要一个 producer group 名字作为构造方法的参数,这里为 concurrent_producer
DefaultMQProducer producer = new DefaultMQProducer("concurrent_producer");
//设置 NameServer 地址,此处应改为实际 NameServer 地址,多个地址之间用;分隔
//NameServer 的地址必须有,但是也可以通过环境变量的方式设置,不一定非得写死在代码里
producer.setNamesrvAddr("10.1.54.121:9876;10.1.54.122:9876");
//调用 start()方法启动一个 producer 实例
producer.start();
//发送 10 条消息到 Topic 为 TopicTest,tag 为 TagA,消息内容为“Hello RocketMQ”拼接上 i 的值
for (int i = 0; i < 10; i++) {
try {
Message msg = new Message("TopicTestConcurrent",// topic
"TagA",// tag
("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET)// body
);
//调用 producer 的 send()方法发送消息
//这里调用的是同步的方式,所以会有返回结果,同时默认发送的也是普通消息
SendResult sendResult = producer.send(msg);
//打印返回结果,可以看到消息发送的状态以及一些相关信息
System.out.println(sendResult);
} catch (Exception e) {
e.printStackTrace();
Thread.sleep(1000);
}
}
//发送完消息之后,调用 shutdown()方法关闭 producer
producer.shutdown();
}
}
- 消费者
public class Consumer {
public static void main(String[] args) throws InterruptedException, MQClientException {
//声明并初始化一个 consumer
//需要一个 consumer group 名字作为构造方法的参数,这里为 concurrent_consumer
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("concurrent_consumer");
//同样也要设置 NameServer 地址
consumer.setNamesrvAddr("10.1.54.121:9876;10.1.54.122:9876");
//这里设置的是一个 consumer 的消费策略
//CONSUME_FROM_LAST_OFFSET 默认策略,从该队列最尾开始消费,即跳过历史消息
//CONSUME_FROM_FIRST_OFFSET 从队列最开始开始消费,即历史消息(还储存在 broker 的)全部消费一遍
//CONSUME_FROM_TIMESTAMP 从某个时间点开始消费,和 setConsumeTimestamp()配合使用,默认是半个小时以前
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
//设置 consumer 所订阅的 Topic 和 Tag,*代表全部的 Tag
consumer.subscribe("TopicTestConcurrent", "*");
//设置一个 Listener,主要进行消息的逻辑处理
//注意这里使用的是 MessageListenerConcurrently 这个接口
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
ConsumeConcurrentlyContext context) {
System.out.println(Thread.currentThread().getName() + " Receive New Messages: " + msgs);
//返回消费状态
//CONSUME_SUCCESS 消费成功
//RECONSUME_LATER 消费失败,需要稍后重新消费
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
//调用 start()方法启动 consumer
consumer.start();
System.out.println("Consumer Started.");
}
}
有序消息
有序消息就是按照一定的先后顺序的消息类型。
举个例子来说,producer 依次发送 order id 为 1、2、3 的消息到 broker,consumer 接到的消息顺序也就是 1、2、3 ,而不会出现普通消息那样的 2、1、3 等情况。
那么有序消息是如何保证的呢?我们都知道消息首先由 producer 到 broker,再从 broker 到 consumer,分这两步走。那么要保证消息的有序,势必这两步都是要保证有序的,即要保证消息是按有序发送到 broker,broker 也是有序将消息投递给 consumer,两个条件必须同时满足,缺一不可。
进一步还可以将有序消息分成
- 全局有序消息
- 局部有序消息
之前我们讲过,topic 只是消息的逻辑分类,内部实现其实是由 queue 组成。当 producer 把消息发送到某个 topic 时,默认是会消息发送到具体的 queue 上。
全局有序
举个例子,producer 发送 order id 为 1、2、3、4 的四条消息到 topicA 上,假设 topicA 的 queue 数为 3 个(queue0、queue1、queue2),那么消息的分布可能就是这种情况,id 为 1 的在 queue0,id 为 2 的在 queue1,id 为 3 的在 queue2,id 为 4 的在 queue0。同样的,consumer 消费时也是按 queue 去消费,这时候就可能出现先消费 1、4,再消费 2、3,和我们的预期不符。那么我们如何实现 1、2、3、4 的消费顺序呢?道理其实很简单,只需要把订单 topic 的 queue 数改为 1,如此一来,只要 producer 按照 1、2、3、4 的顺序去发送消息,那么 consumer 自然也就按照 1、2、3、4 的顺序去消费,这就是全局有序消息。
由于一个 topic 只有一个 queue ,即使我们有多个 producer 实例和 consumer 实例也很难提高消息吞吐量。就好比过独木桥,大家只能一个挨着一个过去,效率低下。
那么有没有吞吐量和有序之间折中的方案呢?其实是有的,就是局部有序消息。
局部有序
我们知道订单消息可以再细分为订单创建、订单付款、订单完成等消息,这些消息都有相同的 order id。同时,也只有按照订单创建、订单付款、订单完成的顺序去消费才符合业务逻辑。但是不同 order id 的消息是可以并行的,不会影响到业务。这时候就常见做法就是将 order id 进行处理,将 order id 相同的消息发送到 topicB 的同一个 queue,假设我们 topicB 有 2 个 queue,那么我们可以简单的对 id 取余,奇数的发往 queue0,偶数的发往 queue1,消费者按照 queue 去消费时,就能保证 queue0 里面的消息有序消费,queue1 里面的消息有序消费。
由于一个 topic 可以有多个 queue,所以在性能比全局有序高得多。假设 queue 数是 n,理论上性能就是全局有序的 n 倍,当然 consumer 也要跟着增加才行。在实际情况中,这种局部有序消息是会比全局有序消息用的更多。
示例代码:
- 生产者
public class Producer {
public static void main(String[] args) throws UnsupportedEncodingException {
try {
// 声明并初始化一个 producer
// 需要一个 producer group 名字作为构造方法的参数,这里为 ordered_producer
DefaultMQProducer orderedProducer = new DefaultMQProducer("ordered_producer");
// 设置 NameServer 地址,此处应改为实际 NameServer 地址,多个地址之间用;分隔
//NameServer 的地址必须有,但是也可以通过环境变量的方式设置,不一定非得写死在代码里
orderedProducer.setNamesrvAddr("10.1.54.121:9876;10.1.54.122:9876");
// 调用 start()方法启动一个 producer 实例
orderedProducer.start();
// 自定义一个 tag 数组
String[] tags = new String[]{"TagA", "TagB", "TagC", "TagD", "TagE"};
// 发送 10 条消息到 Topic 为 TopicTestOrdered,tag 为 tags 数组按顺序取值,
// key 值为“KEY”拼接上 i 的值,消息内容为“Hello RocketMQ”拼接上 i 的值
for (int i = 0; i < 10; i++) {
int orderId = i % 10;
Message msg =
new Message("TopicTestOrdered", tags[i % tags.length], "KEY" + i,
("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET));
SendResult sendResult = orderedProducer.send(msg, new MessageQueueSelector() {
// 选择发送消息的队列
@Override
public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
// arg 的值其实就是 orderId
Integer id = (Integer) arg;
// mqs 是队列集合,也就是 topic 所对应的所有队列
int index = id % mqs.size();
// 这里根据前面的 id 对队列集合大小求余来返回所对应的队列
return mqs.get(index);
}
}, orderId);
System.out.println(sendResult);
}
orderedProducer.shutdown();
} catch (MQClientException e) {
e.printStackTrace();
} catch (RemotingException e) {
e.printStackTrace();
} catch (MQBrokerException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
至于是要实现全局有序,还是局部有序,在此示例代码中,就取决于 TopicTestOrdered 这个 Topic 的队列数了。
- 消费者
public class Consumer {
public static void main(String[] args) throws MQClientException {
//声明并初始化一个 consumer
//需要一个 consumer group 名字作为构造方法的参数,这里为 concurrent_consumer
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("ordered_consumer");
//同样也要设置 NameServer 地址
consumer.setNamesrvAddr("10.1.54.121:9876;10.1.54.122:9876");
//这里设置的是一个 consumer 的消费策略
//CONSUME_FROM_LAST_OFFSET 默认策略,从该队列最尾开始消费,即跳过历史消息
//CONSUME_FROM_FIRST_OFFSET 从队列最开始开始消费,即历史消息(还储存在 broker 的)全部消费一遍
//CONSUME_FROM_TIMESTAMP 从某个时间点开始消费,和 setConsumeTimestamp()配合使用,默认是半个小时以前
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
//设置 consumer 所订阅的 Topic 和 Tag
consumer.subscribe("TopicTestOrdered", "TagA || TagC || TagD");
//设置一个 Listener,主要进行消息的逻辑处理
//注意这里使用的是 MessageListenerOrderly 这个接口
consumer.registerMessageListener(new MessageListenerOrderly() {
@Override
public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {
System.out.println(Thread.currentThread().getName() + " Receive New Messages: " + msgs);
//返回消费状态
//SUCCESS 消费成功
//SUSPEND_CURRENT_QUEUE_A_MOMENT 消费失败,暂停当前队列的消费
return ConsumeOrderlyStatus.SUCCESS;
}
});
//调用 start()方法启动 consumer
consumer.start();
System.out.println("Consumer Started.");
}
}
延时消息
延时消息,简单来说就是当 producer 将消息发送到 broker 后,会延时一定时间后才投递给 consumer 进行消费。
RcoketMQ 的延时等级为:1s,5s,10s,30s,1m,2m,3m,4m,5m,6m,7m,8m,9m,10m,20m,30m,1h,2h。level=0,表示不延时。level=1,表示 1 级延时,对应延时 1s。level=2 表示 2 级延时,对应 5s,以此类推。
这种消息一般适用于消息生产和消费之间有时间窗口要求的场景。比如说我们网购时,下单之后是有一个支付时间,超过这个时间未支付,系统就应该自动关闭该笔订单。那么在订单创建的时候就会就需要发送一条延时消息(延时 15 分钟)后投递给 consumer,consumer 接收消息后再对订单的支付状态进行判断是否关闭订单。
设置延时非常简单,只需要在 Message 设置对应的延时级别即可:
Message msg = new Message("TopicTest",// topic
"TagA",// tag
("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET)// body
);
// 这里设置需要延时的等级即可
msg.setDelayTimeLevel(3);
SendResult sendResult = producer.send(msg);
更多技术干货
作者:冯先生的笔记
链接:https://www.jianshu.com/p/11e875074a8f
来源:简书
简书著作权归作者所有,任何形式的转载都请联系作者获得授权并注明出处。