golang 实现延迟消息原理与方法 - Go语言中文社区

golang 实现延迟消息原理与方法


原文:https://www.cnblogs.com/jkko123/p/7239420.html

实现延迟消息最主要的两个结构:

环形队列:通过golang中的数组实现,分成3600个slot。

任务集合:通过map[key]*Task,每个slot一个map,map的值就是我们要执行的任务。

原理图如下:
在这里插入图片描述

package main

import (
	"fmt"
	"errors"
	"time"
)

type DelayMessage struct {
	curIndex int;
	slots [3600]map[string]*Task;
	closed chan bool;
	taskClose chan bool;
	timeClose chan bool;
	startTime time.Time;
}


type TaskFunc func(args ... interface{})

type Task struct {
	cycleNum int;
	exec TaskFunc;
	params []interface{};
}

func NewDelayMessage() *DelayMessage{
	dm := &DelayMessage{
		curIndex: 0,
		closed:make(chan bool),
		taskClose: make(chan bool),
		timeClose: make(chan bool),
		startTime: time.Now(),
	}

	for i := 0; i < 3600; i++{
		dm.slots[i] = make(map[string]*Task);
	}

	return dm;
}

func (dm *DelayMessage)Close(){
	dm.closed <- true;
}

func (dm *DelayMessage)taskLoop(){
	defer func() {
		fmt.Println("taskLoop exit")
	}();

	for {
		select{
		case <- dm.taskClose:
			{
				return
			}
		default:
			{
				tasks := dm.slots[dm.curIndex];
				if len(tasks) > 0{
					for k, v := range tasks{
						if v.cycleNum == 0{
							go v.exec(v.params...);
							delete(tasks, k);
						}else{
							v.cycleNum--;
						}
					}
				}
			}
		}
	}
}
//启动延迟消息
func (dm *DelayMessage) Start() {
	go dm.taskLoop();
	go dm.timeLoop();
	select {
	case <-dm.closed:
		{
			dm.taskClose <- true;
			dm.timeClose <- true;
			break;
		}
	};
}
//处理每1秒移动下标
func (dm *DelayMessage) timeLoop() {
	defer func() {
		fmt.Println("timeLoop exit");
	}();
	tick := time.NewTicker(time.Second);
	for {
		select {
		case <-dm.timeClose:
			{
				return;
			}
		case <-tick.C:
			{
				fmt.Println(time.Now().Format("2006-01-02 15:04:05"));
				//判断当前下标,如果等于3599则重置为0,否则加1
				if dm.curIndex == 3599 {
					dm.curIndex = 0;
				} else {
					dm.curIndex++;
				}
			}
		}
	}
}

//添加任务
func (dm *DelayMessage) AddTask(t time.Time, key string, exec TaskFunc, params []interface{}) error {
	if dm.startTime.After(t) {
		return errors.New("时间错误");
	}
	//当前时间与指定时间相差秒数
	subSecond := t.Unix() - dm.startTime.Unix();
	//计算循环次数
	cycleNum := int(subSecond / 3600);
	//计算任务所在的slots的下标
	ix := subSecond % 3600;
	//把任务加入tasks中
	tasks := dm.slots[ix];
	if _, ok := tasks[key]; ok {
		return errors.New("该slots中已存在key为" + key + "的任务");
	}
	tasks[key] = &Task{
		cycleNum: cycleNum,
		exec:     exec,
		params:   params,
	};
	return nil;
}

func main() {
	//创建延迟消息
	dm := NewDelayMessage();
	//添加任务
	dm.AddTask(time.Now().Add(time.Second*10), "test1", func(args ...interface{}) {
		fmt.Println(args...);
	}, []interface{}{1, 2, 3});
	dm.AddTask(time.Now().Add(time.Second*10), "test2", func(args ...interface{}) {
		fmt.Println(args...);
	}, []interface{}{4, 5, 6});
	dm.AddTask(time.Now().Add(time.Second*20), "test3", func(args ...interface{}) {
		fmt.Println(args...);
	}, []interface{}{"hello", "world", "test"});
	dm.AddTask(time.Now().Add(time.Second*30), "test4", func(args ...interface{}) {
		sum := 0;
		for arg := range args {
			sum += arg;
		}
		fmt.Println("sum : ", sum);
	}, []interface{}{1, 2, 3});

	//40秒后关闭
	time.AfterFunc(time.Second*40, func() {
		dm.Close();
	});
	dm.Start();
}
版权声明:本文来源CSDN,感谢博主原创文章,遵循 CC 4.0 by-sa 版权协议,转载请附上原文出处链接和本声明。
原文链接:https://blog.csdn.net/zhizhengguan/article/details/90576511
站方申明:本站部分内容来自社区用户分享,若涉及侵权,请联系站方删除。
  • 发表于 2019-09-04 14:15:54
  • 阅读 ( 1359 )
  • 分类:Go

0 条评论

请先 登录 后评论

官方社群

GO教程

猜你喜欢