go延迟消息队列 - Go语言中文社区

go延迟消息队列


go-delay-queue

github项目位置

基于消息组件实现延迟队列

延迟队列其实是与时间相关的队列,普通队列只要队列有数据,客户端消费就能拿到这个数据,而延迟队列内的数据是与时间相关的,绑定了一个过期时间,只有到了过期时间,客户端才能消费到这个数据。

应用场景:

  1. 定时任务,比如任务A和任务B是同条流水线上的,当任务A完成了,一个小时后执行任务B
  2. 重试业务,比如业务A需要调用其它服务,而服务出现问题,这时候就需要做业务重试

实现方式:

  1. 基于消息的延迟:指为每条消息设置不同的延迟时间,那么每当队列中有新消息进入的时候就会重新根据延迟时间排序,或者定义时间轮,新消息落在指定位置;
  2. 基于队列的延迟: 设置不同延迟级别的队列,比如5s、1min、30mins、1h等,每个队列中消息的延迟时间都是相同的。

基于第一种不少组件都有实现方案,比如redis的sortset间接实现,kafka内部时间轮,rmq可安装插件实现。第一种实时性高,不过主观看会比较依赖组件本身,但自己实现就得考虑持久化、高可用等问题,建议直接使用组件本身;第二种方案可以基于组件去实现,通用性会高点,不过实时性不高,更适合用于重试业务场景。

本文基于第二种方案使用go去实现
先看简单的实现方式
在这里插入图片描述

  1. 生产者将消息格式化后按延迟等级发布到不同生产队列
  2. 延迟服务订阅所有队列,拉出消息,判断是否到期
  3. 到期后投递到不同消费队列
  4. 消费者订阅所有消费队列,拉出消息,进行对应到业务逻辑
  5. 业务操作失败,需重试,格式化数据投递到高等级队列

问题:

  1. 客户端要自己实现订阅不同队列组件(redis/ kafka/ rmq/ mysql);
  2. 队列数量多;
  3. 客户端要自己格式化消息以符合延迟服务数据标准;
  4. 失败重试需要自己开发往哪个等级发布

优化方案:
在这里插入图片描述

  1. 对服务进行抽象,对外提供生产和消费消息接口;
  2. 客户端不需要自己订阅队列,队列数减少一半;
  3. 服务内部对数据进行格式化流转;
  4. 服务通过回调对失败消息往高等级队列投递

代码设计思路

github项目位置

  1. 从上图看出,有四个主体:生产者、消费者、服务提供方、消息组件。先看消息组件,该方案基于组件实现,需要能够接入各个组件,那么可以就定义接口由各个组件库实现,有两个方法,也是消息队列组件最基本的功能,订阅、生产消息。
type Iqueue interface {
    // 订阅消息
    // topic 为队列名,dealFn为消息处理逻辑方法,[]byte为队列消息,当err发生则放回消息,默认为程序退出
	SubscribeMsg(topic string, dealFn func([]byte) (err error)) (err error)
	// 生产消息
	PublishMsg(topic string, msg []byte) (err error)
}

以redis实现为例 (其他实现:kafka

func (dr *Dredis) SubscribeMsg(topic string, dealFn func([]byte)(err error)) (err error){
	go func() {
		var tk = time.NewTicker(5 * time.Second)
		for ;; {
			// 获取队列消息
			if msg,err := dr.conn.RPop(topic).Bytes(); err == nil {
				err = dealFn(msg)
				if err != nil {
					// 程序退出,消息重新放回
					err = dr.conn.RPush(topic, msg).Err()
					if err != nil {
						// log
						log.Print("消息重新放回失败:", err)
					}
				}
			} else {
				if err != redis.Nil {
					// log
					log.Print("消息出队失败:", err)
				}
				<-tk.C
			}
		}
	}()
	return 
}

func  (dr *Dredis) PublishMsg(topic string, msg []byte) (err error){
	err = dr.conn.LPush(topic, msg).Err()
	if err != nil {
		// log
		log.Print("消息入队失败:", err)
	}
	return 
}
  1. 生产者和消费者实质都为服务调用方,即客户端,那客户端和服务端之间的纽带就是消息,所以我们要构建消息队列,不同的消息队列代表了不同的延迟等级(1min,5min,1h等等)、处理逻辑等, 下面demo定义了三个等级(低中高)示例前往:
// 创建延迟消息
low := &go_delay_queue.DelayLevel{
	TopicName: "low", // 队列名
	Level:     0, // 延迟等级
	RetryNums: 1, // 重试次数,处理失败可重新放回该队列,超过次数就入到下一等级
	Ttl:       1 * time.Minute, // 消息延迟至少1min后可被使用
	DealFn: dealMsg, // 客户端消息处理逻辑方法
}
medium := &go_delay_queue.DelayLevel{
	TopicName: "medium",
	Level:     1,
	RetryNums: 1,
	Ttl:       5 * time.Minute,
	DealFn: dealMsg,
}
high := &go_delay_queue.DelayLevel{
	TopicName: "high",
	Level:     2,
	RetryNums: 1,
	Ttl:       1 * time.Hour,
	DealFn: dealMsg,
}

func dealMsg(dtm go_delay_queue.DelayTopicMsg) (err error) {
	fmt.Printf("开始处理消息:%#vn", dtm)
	return
}
  1. 有了消息就可以把服务方和消息组件关联起来,延迟服务本身做的几个关键事
  • 把延迟消息队列名交给对应消息接口实现,等待延迟消息
go dr.queuer.SubscribeMsg(item.TopicName, dr.dealMsg)
  • 计算延迟消息过期时长,生效则将延迟消息传递给客户端处理
// 判断时间是否达到指定时间
var ttl = stru.ExpiredAt - dr.now
if dr.Debug {
	fmt.Println(fmt.Sprintf("消息未到达指定时间,等待 %dsn", ttl))
}
select {
	case <-time.After(time.Duration(ttl) * time.Second):
	case <-dr.clientCtx.Done():
		// 消息重回
		err = errors.New("reload")
		return err
	}
}
  • 根据客户端处理结果,成功则丢弃消息,失败则重入消息
// 可以开始消费
go func(delayMsg *DelayTopicMsg) {
    err := dr.levelTopicMap[delayMsg.Level].DealFn(*delayMsg)
    if dr.Debug {
        fmt.Println("交付客户端处理:", err)
    }
    if err != nil {
        // 入下一等级消息
        dr.inQueue(true, delayMsg)
    }
}(stru)
版权声明:本文来源CSDN,感谢博主原创文章,遵循 CC 4.0 by-sa 版权协议,转载请附上原文出处链接和本声明。
原文链接:https://blog.csdn.net/xuandaijian/article/details/107925860
站方申明:本站部分内容来自社区用户分享,若涉及侵权,请联系站方删除。
  • 发表于 2021-05-16 12:03:11
  • 阅读 ( 1274 )
  • 分类:Go

0 条评论

请先 登录 后评论

官方社群

GO教程

猜你喜欢