深入浅出消息队列----【核心之消息的发送】
- 普通消息
- 同步消息
- 异步消息
- 单向消息
- 顺序消息
- 延迟消息
- 批量消息
本文仅是文章笔记,整理了原文章中重要的知识点、记录了个人的看法
文章来源:编程导航-鱼皮【yes哥深入浅出消息队列专栏】
根据 RocketMQ 官方,消息的类型可分为 5 大类,分别是:
- 普通消息
- 顺序消息
- 延迟消息
- 批量消息
- 事务消息
普通消息
普通消息,就是普普通通的消息…
同步消息
同步消息指的是:生产者发送一条消息给 Broker,需要等待 Broker 返回响应(类似返回我接收到啦的消息确认消息),然后才会继续发送后续的消息。
producer.send(msg)
这样就保证了发送的消息一定被成功接收之后,才继续处理后面的业务。
从图上看,消息 B 的发送,需要 Broker 反馈消息 A 已经收到了,不然生产者就会一直等着。
假设 Broker 没反馈消息 A 咋办?因为网络是不稳定的,很有可能 Broker 反馈了,但是生产者没收到。
此时,生产者就会进行重试。
默认重试三次,如果三次后还是失败就会抛出异常,这时候我们需要捕获这个异常,进行日志记录或其他兜底操作。
通过这个方式,就可以确保发送成功的消息一定是被接收的了,这样一来一回就类似 TCP 的三次握手,一次请求过去,一个 ack 回来。
重试机制会有一个弊端:消息的重复发送。
其实 Broker 已经存储了消息 A,并且通知生产者收到消息了,但是因为网络的不稳定,生产者没有收到这个响应,然后超时后重新发送消息 A,这样一来 Broker 就存储了两条消息 A,后面消费者消费时也会拿到两条消息 A。
虽然这样看起来,拿到两条相同的消息无关紧要,但是带入真实场景,假设消息 A 是银行卡扣钱的消息,那我们卡里的钱是不是可能会被扣 2 次了?这不损失大了!
所以要对这方面做好消息的防重,或者幂等。
面试题:如何保证消息一定发送成功?
答:Broker 接收成功后,返回 ACK (类似接收到了的响应)给生产者,没接收到 ACK 则认为消息发送失败,进行重试。
异步消息
与同步消息对应的就是异步消息了。
发送异步消息时,生产者不需要阻塞等待着上一条消息的返回,它可以紧接着发送后续的消息。
那么问题来了,假设前面的消息发送失败怎么办?
发送异步消息其实需要提供一个方法,方法里面定义了 onSuccess、onException 两个方法。
会有另外的线程来处理 Broker 的响应,如果接受到成功的响应就会执行 onSuccess 的逻辑。
如果发送失败,会执行 onException 的逻辑,我们可以在这个逻辑里面实现失败的记录数据,然后进行后续的人工处理或定时处理或报警等等。
当然,异步消息也可以设置重试的次数,有个参数 RetryTimesWhenSendAsyncFailed,调整这个参数我们可以定义异步发送失败重试的次数。
它跟同步消息的区别主要在于场景的应用,同步消息需要等到前一条消息的响应才能继续发后面的消息,而异步消息不需要等待。
因此,在对响应时间敏感的场景下,异步消息比较合适,因为生产者不需要等待消息的响应可以直接处理后续的消息发送。
还有,因为异步消息也可以设置重试,因此也会出现和同步消息一样的消息重复问题。
单向消息
前面无论是同步消息还是异步消息,我们都会关注发送完消息 Broker 的响应,但是有些场景压根就不关心这个结果。
即生产者只管发送消息,至于 Broker 有没有收到消息,生产者不关心,不需要等待响应。
比如日志的收集,日志的量级很大,但可靠性的要求不高(丢几条日志没关系),因此单向消息在这个场景就非常合适。
因为不需要等待响应,发完就完事,发送的耗时会很短,且不需要异步线程来等待 Broker,这样一来系统能同时承载更多的消息发送,性能会比较好。
缺点就是 Broker 不一定会收到消息(会丢失消息)。
因此适用于对消息可靠性要求不高的场景。
顺序消息
顺序消息,顾名思义就是按顺序发布消息,并且按发布消息的顺序来消费消息。
一个很常见的例子就是订单场景:
- 创建订单
- 支付
- 发货
- 完结订单
我们肯定需要先创建订单,才能支付订单,且支付完才能发货,最后收到货完结订单。
这看起来天经地义,但是我们已经知晓消费队列的实际实现:一个 Topic 是分多个队列的,每个队列都有消费者并行消费。
假设订单相关的 Topic 叫 Topic-Order,那么创建订单消息发送到了队列1,紧接着客户支付了,支付消息发送到了队列2,商家立马发货,发货消息发到了队列3。
从图上来看,消息的顺序确实是有先后关系的,但是每个消费者消费的速度是不一样的,我们保证不了他们的消费速度!
很可能消费者-2消费了支付的消息,而消费者-1还没有消费完创建订单的消息,这样一来业务的顺序就错乱了,处理就报错了!
订单都没生成支付啥呀?
所以普通消息不能保证先发送的消息一定被先消费,分析可知,本质原因是因为多队列的实现。
如何解决呢?
把这几个消息都发送到一个队列不就完事了吗?
这就是顺序消息。
如果报创建订单、支付、发货、完结订单,这几类消息全都发往一个队列,这叫全局顺序消息。
如果把同一笔订单的创建、支付、发货、完结发往一个队列,不同的订单可以发往不同队列,这叫分区顺序消息。
理论上分区顺序消息够用了,并且分区顺序消息的并发度更好,从上图来看,消费者1、2能同时处理订单-1和订单-2,如果是全局顺序消息,那么只有消费者-1一个人在干活。
然后具体将消息发送到哪个队列是生产者指定的。
假设要实现全局顺序消息,那么生产者将这几条消息都指定往队列1发送,即可实现全局顺序消息。
如果是分区顺序消息,生产者只需要依靠一个叫 sharding key 的东西来分区即可,比如订单的场景可以将订单号作为 sharding key。
那么我们仅需在发送消息的时候,根据 sharding key(订单号:orderId)来选择队列即可,我用伪代码来实现下:
int queueIndex = orderId % queue.size();
producer.send(msg, queueIndex);
这样我们就能保证一笔订单相关的消息都发往一个分区!默认普通消息是轮询选择队列,比如上一次消息发送的是队列1,后面就是队列2这样的轮询。
所以在顺序要求的场景下,我们需要采用顺序消息来实现,并且最好是分区顺序消息,这样能提供并发度,加快消息的消费速率。
延迟消息
生产者发送了消息,但是并不想立马被消费者消费,希望延迟一段时间后才能被消费。
比如订单取消场景,一般我们下单后,如果 15 分支没有支付,这笔订单就需要被取消。
这个场景下我们就可以在下单时同时发送一个订单取消的延迟消息,时间是 15分支,这样 15分支后消费者就能收到这个消息,然后看看此时的订单有没有被值,如果没有被支付, 那么就执行订单取消的逻辑。
RocketMQ 中具体是如何实现延迟消息的你?
很容易想到的一个方式是将消息正常发送给 Broker,然后消费者消费的时候来判断是不是延迟消息,看看是否已经到时间了,到了就消费,没到就不消费。
实际上并没有这么简单,还记得我们之前文章提到的消息点位?消费者每消费完一条消息,需要更新消息点位来。
如当前消费的点位是 100,第 101 条消息消费后,点位 +1,即 101。
但延迟消息没到时间无法被消费,那不就使得正常点位卡着了,加不上去了,这样一来排在后面的正常消息不就不能被消费了?
所以延迟消息不能这样实现,那 RocketMQ 是怎么做的呢?
实际上延迟消息一开始不放在正常的 Topic 中,RocketMQ 专门搞了个 Topic 叫 SCHEDULE_TOPIC_XXXX,将所有延迟消息都放在这个 Topic 下。
然后有个定时任务来扫描遍历消息的延迟时间到了没,如果到了,那么再把延迟消息发往它本身的 Topic 队列中。
这样就保证了延迟消息到时间之前,消费者不会消费到这个消息(因为消费者根本就没有订阅 SCHEDULE_TOPIC_XXXX),然后一到时间,消息就被投递到原来的 Topic 上,这样消费者就能消费到了。
这样的设计就复用了本身关于 Topic、队列还有消费者消费消息的逻辑。
对了,在 RocketMQ 中,延迟的时间是无法自定义的,是有固定的阶梯型限制,我们在发送消息的时候,只能设定投递等级,不同等级固定对应一个延迟时间:
在商业版,如阿里云上的服务时支持自定义时间的。
批量消息
批量消息就是一次性打包发送多条消息,在对吞吐量敏感的场景,批量消息非常合适。
正常消息是一条一条的发送,然后一条一条的等待响应。
而批量消息是一批一批的发送,比如 100 条消息,本来需要调用 100 次发生接口,且需要等待 100 次响应。
现在将这 100 条消息打包成 1 条消息发送,这样是不是仅需要调用 1 次发生接口,且等待一次响应?
这样处理的效率(吞吐量)肯定是变高了。
当然,如果其中一条数据出错,可能需要一批重来了,处理起来也会比较麻烦。
关于批量消息使用起来也很简单:
这样传入一个 list,RocketMQ 自然就知道这是一个批量消息了,它内部会有一个 batch 操作来打包这个列表: