社区微信群开通啦,扫一扫抢先加入社区官方微信群
社区微信群
延迟队列其实是与时间相关的队列,普通队列只要队列有数据,客户端消费就能拿到这个数据,而延迟队列内的数据是与时间相关的,绑定了一个过期时间,只有到了过期时间,客户端才能消费到这个数据。
应用场景:
实现方式:
基于第一种不少组件都有实现方案,比如redis的sortset间接实现,kafka内部时间轮,rmq可安装插件实现。第一种实时性高,不过主观看会比较依赖组件本身,但自己实现就得考虑持久化、高可用等问题,建议直接使用组件本身;第二种方案可以基于组件去实现,通用性会高点,不过实时性不高,更适合用于重试业务场景。
本文基于第二种方案使用go去实现
先看简单的实现方式
问题:
优化方案:
type Iqueue interface {
// 订阅消息
// topic 为队列名,dealFn为消息处理逻辑方法,[]byte为队列消息,当err发生则放回消息,默认为程序退出
SubscribeMsg(topic string, dealFn func([]byte) (err error)) (err error)
// 生产消息
PublishMsg(topic string, msg []byte) (err error)
}
以redis实现为例 (其他实现:kafka )
func (dr *Dredis) SubscribeMsg(topic string, dealFn func([]byte)(err error)) (err error){
go func() {
var tk = time.NewTicker(5 * time.Second)
for ;; {
// 获取队列消息
if msg,err := dr.conn.RPop(topic).Bytes(); err == nil {
err = dealFn(msg)
if err != nil {
// 程序退出,消息重新放回
err = dr.conn.RPush(topic, msg).Err()
if err != nil {
// log
log.Print("消息重新放回失败:", err)
}
}
} else {
if err != redis.Nil {
// log
log.Print("消息出队失败:", err)
}
<-tk.C
}
}
}()
return
}
func (dr *Dredis) PublishMsg(topic string, msg []byte) (err error){
err = dr.conn.LPush(topic, msg).Err()
if err != nil {
// log
log.Print("消息入队失败:", err)
}
return
}
// 创建延迟消息
low := &go_delay_queue.DelayLevel{
TopicName: "low", // 队列名
Level: 0, // 延迟等级
RetryNums: 1, // 重试次数,处理失败可重新放回该队列,超过次数就入到下一等级
Ttl: 1 * time.Minute, // 消息延迟至少1min后可被使用
DealFn: dealMsg, // 客户端消息处理逻辑方法
}
medium := &go_delay_queue.DelayLevel{
TopicName: "medium",
Level: 1,
RetryNums: 1,
Ttl: 5 * time.Minute,
DealFn: dealMsg,
}
high := &go_delay_queue.DelayLevel{
TopicName: "high",
Level: 2,
RetryNums: 1,
Ttl: 1 * time.Hour,
DealFn: dealMsg,
}
func dealMsg(dtm go_delay_queue.DelayTopicMsg) (err error) {
fmt.Printf("开始处理消息:%#vn", dtm)
return
}
go dr.queuer.SubscribeMsg(item.TopicName, dr.dealMsg)
// 判断时间是否达到指定时间
var ttl = stru.ExpiredAt - dr.now
if dr.Debug {
fmt.Println(fmt.Sprintf("消息未到达指定时间,等待 %dsn", ttl))
}
select {
case <-time.After(time.Duration(ttl) * time.Second):
case <-dr.clientCtx.Done():
// 消息重回
err = errors.New("reload")
return err
}
}
// 可以开始消费
go func(delayMsg *DelayTopicMsg) {
err := dr.levelTopicMap[delayMsg.Level].DealFn(*delayMsg)
if dr.Debug {
fmt.Println("交付客户端处理:", err)
}
if err != nil {
// 入下一等级消息
dr.inQueue(true, delayMsg)
}
}(stru)
如果觉得我的文章对您有用,请随意打赏。你的支持将鼓励我继续创作!