Golang实现基于Redis的可靠延迟队列

目录

前言

原理详解

pending2ReadyScript

ready2UnackScript

unack2RetryScript

ack

consume

前言

在之前探讨延时队列的文章中我们提到了 redisson delayqueue 使用 redis 的有序集合结构实现延时队列,遗憾的是 go 语言社区中并无类似的库。不过问题不大,没有轮子我们自己造。

本文的完整代码实现在hdt3213/delayqueue,可以直接 go get 安装使用。

使用有序集合结构实现延时队列的方法已经广为人知,无非是将消息作为有序集合的 member, 投递时间戳作为 score 使用 zrangebyscore 命令搜索已到投递时间的消息然后将其发给消费者。

然而消息队列不是将消息发给消费者就万事大吉,它们还有一个重要职责是确保送达和消费。通常的实现方式是当消费者收到消息后向消息队列返回确认(ack),若消费者返回否定确认(nack)或超时未返回,消息队列则会按照预定规则重新发送,直到到达最大重试次数后停止。如何实现 ack 和重试机制是我们要重点考虑的问题。

我们的消息队列允许分布式地部署多个生产者和消费者,消费者实例定时执行 lua 脚本驱动消息在队列中的流转无需部署额外组件。由于 Redis 保证了 lua 脚本执行的原子性,整个流程无需加锁。

消费者采用拉模式获得消息,保证每条消息至少投递一次,消息队列会重试超时或者被否定确认的消息(nack) 直至到达最大重试次数。一条消息最多有一个消费者正在处理,减少了要考虑的并发问题。

请注意:若消费时间超过了 MaxConsumeDuration 消息队列会认为消费超时并重新投递,此时可能有多个消费者同时消费。

具体使用也非常简单,只需要注册处理消息的回调函数并调用 start() 即可:

package main import ( "github.com/go-redis/redis/v8" "github.com/hdt3213/delayqueue" "strconv" "time" ) func main() { redisCli := redis.NewClient(&redis.Options{ Addr: "127.0.0.1:6379", }) queue := delayqueue.NewQueue("example-queue", redisCli, func(payload string) bool { // 注册处理消息的回调函数 // 返回 true 表示已成功消费,返回 false 消息队列会重新投递次消息 return true }) // 发送延时消息 for i := 0; i < 10; i++ { err := queue.SendDelayMsg(strconv.Itoa(i), time.Hour, delayqueue.WithRetryCount(3)) if err != nil { panic(err) } } // start consume done := queue.StartConsume() <-done }

由于数据存储在 redis 中所以我们最多能保证在 redis 无故障且消息队列相关 key 未被外部篡改的情况下不会丢失消息。

原理详解

消息队列涉及几个关键的 redis 数据结构:

msgKey: 为了避免两条内容完全相同的消息造成意外的影响,我们将每条消息放到一个字符串类型的键中,并分配一个 UUID 作为它的唯一标识。其它数据结构中只存储 UUID 而不存储完整的消息内容。每个 msg 拥有一个独立的 key 而不是将所有消息放到一个哈希表是为了利用 TTL 机制避免

pendingKey: 有序集合类型,member 为消息 ID, score 为投递时间的 unix 时间戳。

readyKey: 列表类型,需要投递的消息 ID。

unAckKey: 有序集合类型,member 为消息 ID, score 为重试时间的 unix 时间戳。

retryKey: 列表类型,已到重试时间的消息 ID

garbageKey: 集合类型,用于暂存已达重试上线的消息 ID

retryCountKey: 哈希表类型,键为消息 ID, 值为剩余的重试次数

流程如下图所示:

由于我们允许分布式地部署多个消费者,每个消费者都在定时执行 lua 脚本,所以多个消费者可能处于上述流程中不同状态,我们无法预知(或控制)上图中五个操作发生的先后顺序,也无法控制有多少实例正在执行同一个操作。

因此我们需要保证上图中五个操作满足三个条件:

都是原子性的

不会重复处理同一条消息

操作前后消息队列始终处于正确的状态

只要满足这三个条件,我们就可以部署多个实例且不需要使用分布式锁等技术来进行状态同步。

是不是听起来有点吓人?其实简单的很,让我们一起来详细看看吧~

pending2ReadyScript

pending2ReadyScript 使用 zrangebyscore 扫描已到投递时间的消息ID并把它们移动到 ready 中:

-- keys: pendingKey, readyKey -- argv: currentTime local msgs = redis.call('ZRangeByScore', KEYS[1], '0', ARGV[1]) -- 从 pending key 中找出已到投递时间的消息 if (#msgs == 0) then return end local args2 = {'LPush', KEYS[2]} -- 将他们放入 ready key 中 for _,v in ipairs(msgs) do table.insert(args2, v) end redis.call(unpack(args2)) redis.call('ZRemRangeByScore', KEYS[1], '0', ARGV[1]) -- 从 pending key 中删除已投递的消息 ready2UnackScript

ready2UnackScript 从 ready 或者 retry 中取出一条消息发送给消费者并放入 unack 中,类似于 RPopLPush:

-- keys: readyKey/retryKey, unackKey -- argv: retryTime local msg = redis.call('RPop', KEYS[1]) if (not msg) then return end redis.call('ZAdd', KEYS[2], ARGV[1], msg) return msg unack2RetryScript

unack2RetryScript 从 retry 中找出所有已到重试时间的消息并把它们移动到 unack 中:

-- keys: unackKey, retryCountKey, retryKey, garbageKey -- argv: currentTime local msgs = redis.call('ZRangeByScore', KEYS[1], '0', ARGV[1]) -- 找到已到重试时间的消息 if (#msgs == 0) then return end local retryCounts = redis.call('HMGet', KEYS[2], unpack(msgs)) -- 查询剩余重试次数 for i,v in ipairs(retryCounts) do local k = msgs[i] if tonumber(v) > 0 then -- 剩余次数大于 0 redis.call("HIncrBy", KEYS[2], k, -1) -- 减少剩余重试次数 redis.call("LPush", KEYS[3], k) -- 添加到 retry key 中 else -- 剩余重试次数为 0 redis.call("HDel", KEYS[2], k) -- 删除重试次数记录 redis.call("SAdd", KEYS[4], k) -- 添加到垃圾桶,等待后续删除 end end redis.call('ZRemRangeByScore', KEYS[1], '0', ARGV[1]) -- 将已处理的消息从 unack key 中删除

因为 redis 要求 lua 脚本必须在执行前在 KEYS 参数中声明自己要访问的 key, 而我们将每个 msg 有一个独立的 key,我们在执行 unack2RetryScript 之前是不知道哪些 msg key 需要被删除。所以 lua 脚本只将需要删除的消息记在 garbage key 中,脚本执行完后再通过 del 命令将他们删除:

func (q *DelayQueue) garbageCollect() error { ctx := context.Background() msgIds, err := q.redisCli.SMembers(ctx, q.garbageKey).Result() if err != nil { return fmt.Errorf("smembers failed: %v", err) } if len(msgIds) == 0 { return nil } // allow concurrent clean msgKeys := make([]string, 0, len(msgIds)) for _, idStr := range msgIds { msgKeys = append(msgKeys, q.genMsgKey(idStr)) } err = q.redisCli.Del(ctx, msgKeys...).Err() if err != nil && err != redis.Nil { return fmt.Errorf("del msgs failed: %v", err) } err = q.redisCli.SRem(ctx, q.garbageKey, msgIds).Err() if err != nil && err != redis.Nil { return fmt.Errorf("remove from garbage key failed: %v", err) } return nil }

之前提到的 lua 脚本都是原子性执行的,不会有其它命令插入其中。 gc 函数由 3 条 redis 命令组成,在执行过程中可能会有其它命令插入执行过程中,不过考虑到一条消息进入垃圾回收流程之后不会复活所以不需要保证 3 条命令原子性。

ack

ack 只需要将消息彻底删除即可:

func (q *DelayQueue) ack(idStr string) error { ctx := context.Background() err := q.redisCli.ZRem(ctx, q.unAckKey, idStr).Err() if err != nil { return fmt.Errorf("remove from unack failed: %v", err) } // msg key has ttl, ignore result of delete _ = q.redisCli.Del(ctx, q.genMsgKey(idStr)).Err() q.redisCli.HDel(ctx, q.retryCountKey, idStr) return nil }

否定确认只需要将 unack key 中消息的重试时间改为现在,随后执行的 unack2RetryScript 会立即将它移动到 retry key

func (q *DelayQueue) nack(idStr string) error { ctx := context.Background() // update retry time as now, unack2Retry will move it to retry immediately err := q.redisCli.ZAdd(ctx, q.unAckKey, &redis.Z{ Member: idStr, Score: float64(time.Now().Unix()), }).Err() if err != nil { return fmt.Errorf("negative ack failed: %v", err) } return nil } consume

消息队列的核心逻辑是每秒执行一次的 consume 函数,它负责调用上述脚本将消息转移到正确的集合中并回调 consumer 来消费消息:

func (q *DelayQueue) consume() error { // 执行 pending2ready,将已到时间的消息转移到 ready err := q.pending2Ready() if err != nil { return err } // 循环调用 ready2Unack 拉取消息进行消费 var fetchCount uint for { idStr, err := q.ready2Unack() if err == redis.Nil { // consumed all break } if err != nil { return err } fetchCount++ ack, err := q.callback(idStr) if err != nil { return err } if ack { err = q.ack(idStr) } else { err = q.nack(idStr) } if err != nil { return err } if fetchCount >= q.fetchLimit { break } } // 将 nack 或超时的消息放入重试队列 err = q.unack2Retry() if err != nil { return err } // 清理已达到最大重试次数的消息 err = q.garbageCollect() if err != nil { return err } // 消费重试队列 fetchCount = 0 for { idStr, err := q.retry2Unack() if err == redis.Nil { // consumed all break } if err != nil { return err } fetchCount++ ack, err := q.callback(idStr) if err != nil { return err } if ack { err = q.ack(idStr) } else { err = q.nack(idStr) } if err != nil { return err } if fetchCount >= q.fetchLimit { break } } return nil }

至此一个简单可靠的延时队列就做好了,何不赶紧开始试用呢?

以上就是Golang实现基于Redis的可靠延迟队列的详细内容,更多关于Golang Redis可靠延迟队列的资料请关注易知道(ezd.cc)其它相关文章!

推荐阅读

    3500元超额值学生娱乐结构的优化配置

    3500元超额值学生娱乐结构的优化配置,,作为一个DIY的主流用户领域的学生,每个用户51学生攒机的高峰。因为学生用户没有稳定的收入来源,攒机

    探探语言设置|探探怎么设置语言

    探探语言设置|探探怎么设置语言,,1. 探探怎么设置语言打开探探软件,然后就有消息提示的红点,点开就行了!其实这些软件都是挺简单的操作的,都是

    git设置编码|git语言设置

    git设置编码|git语言设置,,git设置编码点击cap4j搜索从git直接链接上拉代码。git语言设置Git是一个开源的分布式版本控制系统,可以有效、高

    论竞技比赛结构中的十种常见误区

    论竞技比赛结构中的十种常见误区,,近年来,在中国电子竞技的发展得到了迅速的发展。虽然网络游戏市场正在进一步扩大,但我们也看到CS和魔兽争

    js用代码实现简单购物车

    js用代码实现简单购物车,,图: 选择所有按钮: 复制代码代码如下所示: 选择 笔记本电脑:3000元 笔记本电脑:3000元 笔记本电脑:3000元 笔记本电脑:3

    区域语言设置|区域语言设置工具

    区域语言设置|区域语言设置工具,,区域语言设置工具你好,大致的方法如下,可以参考:1、按下键盘的windows 图标,再开始菜单中单击“设置”;出现的

    c4d语言设置|c4d汉语设置

    c4d语言设置|c4d汉语设置,,1. c4d汉语设置mac版的C4D是这样的,中文字体是有的,但是是以拼音的形式存在,比如黑体就是ht。中文字体以拼音方式

    电脑宣传语|电脑宣传语言

    电脑宣传语|电脑宣传语言,,1. 电脑宣传语言1.我做好了与你过一辈子的打算,也做好了你随时要走的准备,2.每段青春都会苍老,但我希望记忆里的你

    office语言设置|微软office语言设置

    office语言设置|微软office语言设置,,微软office语言设置一、首先点击桌面左下角“WIN键”。二、弹出选项内点击“所有程序”。三、接着点