点击数:59
前言
实现方案
1. 基于外部存储实现的方案
这里讨论的外部存储指的是在 MQ 本身自带的存储以外又引入的其他的存储系统。

基于 数据库(如MySQL)
CREATE TABLE `delay_msg` (
`id` bigint unsigned NOT NULL AUTO_INCREMENT,
`delivery_time` DATETIME NOT NULL COMMENT '投递时间',
`payloads` blob COMMENT '消息内容',
PRIMARY KEY (`id`),
KEY `time_index` (`delivery_time`)
)
通过定时线程定时扫描到期的消息,然后进行投递。定时线程的扫描间隔理论上就是你延时消息的最小时间精度。 优点:
-
实现简单;
-
B+Tree索引不适合消息场景的大量写入;
基于 RocksDB

-
RocksDB LSM 树很适合消息场景的大量写入;
-
实现方案较重,如果你采用这个方案,需要自己实现 RocksDB 的数据容灾逻辑;
基于 Redis
本方案来源于:基于Redis实现延时队列服务

-
Messages Pool 所有的延时消息存放,结构为KV结构,key为消息ID,value为一个具体的message(这里选择Redis Hash结构主要是因为hash结构能存储较大的数据量,数据较多时候会进行渐进式rehash扩容,并且对于HSET和HGET命令来说时间复杂度都是O(1)) -
Delayed Queue是16个有序队列(队列支持水平扩展),结构为ZSET,value 为 messages pool中消息ID,score为过期时间**(分为多个队列是为了提高扫描的速度)** -
Worker 代表处理线程,通过定时任务扫描 Delayed Queue 中到期的消息
-
Redis ZSET 很适合实现延时队列 -
性能问题,虽然 ZSET 插入是一个 O(logn) 的操作,但是Redis 基于内存操作,并且内部做了很多性能方面的优化。
定时线程检查的缺陷与改进
2. 开源 MQ 中的实现方案
RocketMQ
RocketMQ 开源版本支持延时消息,但是只支持 18 个 Level 的延时,并不支持任意时间。只不过这个 Level 在 RocketMQ 中可以自定义的,所幸来说对普通业务算是够用的。默认值为“1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h”,18个level。
通俗的讲,设定了延时 Level 的消息会被暂存在名为SCHEDULE_TOPIC_XXXX
的topic中,并根据 level 存入特定的queue,queueId = delayTimeLevel – 1,**即一个queue只存相同延时的消息,保证具有相同发送延时的消息能够顺序消费。**broker会调度地消费SCHEDULE_TOPIC_XXXX,将消息写入真实的topic。
下面是整个实现方案的示意图,红色代表投递延时消息,紫色代表定时调度到期的延时消息:

-
Level 数固定,每个 Level 有自己的定时器,开销不大 -
将 Level 相同的消息放入到同一个 Queue 中,保证了同一 Level 消息的顺序性;不同 Level 放到不同的 Queue 中,保证了投递的时间准确性; -
通过只支持固定的Level,将不同延时消息的排序变成了固定Level Topic 的追加写操作
-
Level 配置的修改代价太大,固定 Level 不灵活 -
CommitLog 会因为延时消息的存在变得很大
Pulsar

-
内存开销: 维护延时消息索引的队列是放在堆外内存中的,并且这个队列是以订阅组(Kafka中的消费组)为维度的,比如你这个 Topic 有 N 个订阅组,那么如果你这个 Topic 使用了延时消息,就会创建 N 个 队列;并且随着延时消息的增多,时间跨度的增加,每个队列的内存占用也会上升。(是的,在这个方案下,支持任意的延时消息反而有可能让这个缺陷更严重) -
故障转移之后延时消息索引队列的重建时间开销: 对于跨度时间长的大规模延时消息,重建时间可能会到小时级别。(摘自 Pulsar 官方公众号文章) -
存储开销:延时消息的时间跨度会影响到 Pulsar 中已经消费的消息数据的空间回收。打个比方,你的 Topic 如果业务上要求支持一个月跨度的延时消息,然后你发了一个延时一个月的消息,那么你这个 Topic 中底层的存储就会保留整整一个月的消息数据,即使这一个月中99%的正常消息都已经消费了。

QMQ
如果对时间轮不熟悉的可以阅读笔者的这篇文章 从 Kafka 看时间轮算法设计

-
时间轮算法适合延时/定时消息的场景,省去延时消息的排序,插入删除操作都是 O(1) 的时间复杂度; -
通过多级时间轮设计,支持了超大时间跨度的延时消息; -
通过延时加载,内存中只会有最近要消费的消息,更久的延时消息会被存储在磁盘中,对内存友好; -
延时消息单独存储(schedule log),不会影响到正常消息的空间回收;
总结
参考
-
blog.itpub.net/31555607/viewspace-2672190
-
www.cnblogs.com/hzmark/p/mq-delay-msg.html
-
mp.weixin.qq.com/s/_wnwBgZgQhjLP14APlQTkA
-
github.com/qunarcorp/qmq/blob/master/docs/cn/arch.md
-
github.com/apache/rocketmq
如喜欢本文,请点击右上角,把文章分享到朋友圈
如有想了解学习的技术点,请留言给若飞安排分享
·END·
相关阅读:
一张图看懂微服务架构路线 基于Spring Cloud的微服务架构分析 微服务等于Spring Cloud?了解微服务架构和框架 如何构建基于 DDD 领域驱动的微服务? 微服务架构实施原理详解 微服务的简介和技术栈 设计一个容错的微服务架构
作者:Richard_Yi
来源:juejin.cn/post/7052894117105238053
版权申明:内容来源网络,仅供分享学习,版权归原创者所有。除非无法确认,我们都会标明作者及出处,如有侵权烦请告知,我们会立即删除并表示歉意。谢谢!
我们都是架构师!
关注架构师(JiaGouX),添加“星标”
获取每天技术干货,一起成为牛逼架构师
技术群请加若飞:1321113940 进架构师群
投稿、合作、版权等邮箱:[email protected]
本篇文章来源于微信公众号: 架构师