Redis实现异步消息队列,延时队列 - Go语言中文社区

Redis实现异步消息队列,延时队列


异步消息队列

        Redis中的 list(列表)实现异步消息队列,使用rpush / lpush 操作插入队列消息,使用 lpop 和  rpop 来出队消息。

        

队列空了怎么办?

       如果队列空了,客户端就会陷入pop的死循环,不停地pop,没有数据,接着pop,有没有数据。这样的空轮询拉高了客户端的CPU,redis的QPS也会被拉高,Redis的慢查询可能会显著增多。

       解决方案:使用命令 blpop、brpop,b(blocking,阻塞)。

       阻塞读在队列没有数据的时候,会立即进入休眠状态,一旦数据到来,则立即醒来,消费的延迟几乎为零。用这两个命令代替 lpop、rpop可以解决上面的问题。

延迟队列的实现

       延迟队列可以使用 zset(有序列表)实现,我们将消息序列化成一个字符串作为列表的value,这个消息的到期处理时间作为score,然后多个线程轮询zset 获取到期的任务进行执行,多线程保证了可靠性,因为多个线程,需要考虑并发执行的问题,一个任务不能被多次执行。

代码如下:

package list;

 

import java.lang.reflect.Type;

import java.util.Set;

import java.util.UUID;

 

import com.alibaba.fastjson.JSON;

import com.alibaba.fastjson.TypeReference;

 

import redis.clients.jedis.Jedis;

 

/**

 * 延时异步消息队列的实现

 */

public class RedisDelayingQueue<T> {

 

    static class TaskItem<T> {

        public String id;

        public T msg;

    }

 

    // fastjson 序列化对象中存在 generic 类型时,需要使用 TypeReference

    private Type TaskType = new TypeReference<TaskItem<T>>() {

    }.getType();

 

    private Jedis jedis;

    private String queueKey;

 

    public RedisDelayingQueue(Jedis jedis, String queueKey) {

        this.jedis = jedis;

        this.queueKey = queueKey;

    }

 

    public void delay(T msg) {

        TaskItem<T> task = new TaskItem<T>();

        task.id = UUID.randomUUID().toString(); // 分配唯一的 uuid

        task.msg = msg;

        String s = JSON.toJSONString(task); // fastjson 序列化

        jedis.zadd(queueKey, System.currentTimeMillis() + 5000, s); // 塞入延时队列 ,5s 后再试

    }

 

    public void loop() {

        while (!Thread.interrupted()) {

            // 只取一条

            Set<String> values = jedis.zrangeByScore(queueKey, 0, System.currentTimeMillis(), 0, 1);

            if (values.isEmpty()) {

                try {

                    Thread.sleep(500); // 歇会继续

                } catch (InterruptedException e) {

                    break;

                }

                continue;

            }

            String s = values.iterator().next();

            if (jedis.zrem(queueKey, s) > 0) { // 抢到了

                TaskItem<T> task = JSON.parseObject(s, TaskType); // fastjson 反序列化

                this.handleMsg(task.msg);

            }

        }

    }

 

    public void handleMsg(T msg) {

        System.out.println(msg);

    }

 

    public static void main(String[] args) {

        Jedis jedis = new Jedis();

        final RedisDelayingQueue<String> queue = new RedisDelayingQueue<>(jedis, "q-demo");

        Thread producer = new Thread() {

 

            public void run() {

                for (int i = 0; i < 10; i++) {

                    queue.delay("codehole" + i);

                }

            }

        };

        Thread consumer = new Thread() {

            public void run() {

                queue.loop();

            }

        };

        producer.start();

        consumer.start();

        try {

            producer.join();

            Thread.sleep(6000);

            consumer.interrupt();

            consumer.join();

        } catch (InterruptedException e) {

        }

    }

}

 

 

优化代码如下:待续

版权声明:本文来源CSDN,感谢博主原创文章,遵循 CC 4.0 by-sa 版权协议,转载请附上原文出处链接和本声明。
原文链接:https://blog.csdn.net/ruanhao1203/article/details/88679900
站方申明:本站部分内容来自社区用户分享,若涉及侵权,请联系站方删除。
  • 发表于 2019-09-04 14:15:23
  • 阅读 ( 1664 )
  • 分类:Redis

0 条评论

请先 登录 后评论

官方社群

GO教程

猜你喜欢