Java 使用Redis实现延时队列 - Go语言中文社区

Java 使用Redis实现延时队列


A:需求说明:

  1. 如果系统中需要用到定时执行计划的,又不想用到中间件,如果轮询数据库的话,会导致大量资源消耗,这样我们就可以使用Redis来实现类似功(需要使用rabbitMQ的请看这里:https://blog.csdn.net/u010096717/article/details/82148681
  2. 业务类型,如订单一些评论,如果48h用户未对商家评论,系统会自动产生一条默认评论,还有排队到时提醒等

B:实现思路:

  1. 将整个Redis当做消息池,以kv形式存储消息,key为id,value为具体的消息body
  2. 使用ZSET做优先队列,按照score维持优先级(用当前时间+需要延时的时间作为score)
  3. 轮询ZSET,拿出score比当前时间戳大的数据(已过期的)
  4. 根据id拿到消息池的具体消息进行消费
  5. 消费成功,删除改队列和消息
  6. 消费失败,让该消息重新回到队列

C:代码实现

  1. Message消息封装类
    @Data
    public class Message {
    
        /**
         * 消息id
         */
        private String id;
        /**
         * 消息延迟/毫秒
         */
        private long delay;
    
        /**
         * 消息存活时间
         */
        private int ttl;
        /**
         * 消息体,对应业务内容
         */
        private String body;
        /**
         * 创建时间,如果只有优先级没有延迟,可以设置创建时间为0
         * 用来消除时间的影响
         */
        private long createTime;
    
    }

 

2.基于redis的消息队列

@Component
public class RedisMQ {

    /**
     * 消息池前缀,以此前缀加上传递的消息id作为key,以消息{@link Message}
     * 的消息体body作为值存储
     */
    public static final String MSG_POOL = "Message:Pool:";
    /**
     * zset队列 名称 queue
     */
    public static final String QUEUE_NAME = "Message:Queue:";

    private static final int SEMIH = 30*60;



    @Autowired
    private RedisService redisService;

    /**
     * 存入消息池
     * @param message
     * @return
     */
    public boolean addMsgPool(Message message) {

        if (null != message) {
            return redisService.setExp(MSG_POOL + message.getId(), message.getBody(), Long.valueOf(message.getTtl() + SEMIH));
        }
        return false;
    }

    /**
     * 从消息池中删除消息
     * @param id
     * @return
     */
    public void deMsgPool(String id) {
        redisService.remove(MSG_POOL + id);
    }

    /**
     * 向队列中添加消息
     * @param key
     * @param score 优先级
     * @param val
     * @return 返回消息id
     */
    public void enMessage(String key, long score, String val) {
        redisService.zsset(key,val,score);
    }

    /**
     * 从队列删除消息
     * @param id
     * @return
     */
    public boolean deMessage(String key, String id) {
        return redisService.zdel(key, id);
    }
    
}

 

3Redis操作工具类,这个工具类比较多方法,就不贴在这里了(https://blog.csdn.net/u010096717/article/details/83783865)

4.编写消息发送(生产者)

@Component
public class MessageProvider {

    static Logger logger = LoggerFactory.getLogger(MessageProvider.class);


    private static int delay = 30;//30秒,可自己动态传入

    @Resource
    private RedisMQ redisMQ;

    SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");

    //改造成redis
    public void sendMessage(String messageContent) {
        try {
            if (messageContent != null){
                String seqId = UUID.randomUUID().toString();
                // 将有效信息放入消息队列和消息池中
                Message message = new Message();
                // 可以添加延迟配置
                message.setDelay(delay*1000);
                message.setCreateTime(System.currentTimeMillis());
                message.setBody(messageContent);
                message.setId(seqId);
                // 设置消息池ttl,防止长期占用
                message.setTtl(delay + 360);
                redisMQ.addMsgPool(message);
                //当前时间加上延时的时间,作为score
                Long delayTime = message.getCreateTime() + message.getDelay();
                String d = sdf.format(message.getCreateTime());
                System.out.println("当前时间:" + d+",消费的时间:" + sdf.format(delayTime));
                redisMQ.enMessage(RedisMQ.QUEUE_NAME,delayTime, message.getId());
            }else {
                logger.warn("消息内容为空!!!!!");
            }
        }catch (Exception e){
            e.printStackTrace();
        }
    }
}

5.消息消费者

@Component
public class RedisMQConsumer {

    @Resource
    private RedisMQ redisMQ;

    @Autowired
    private RedisService redisService;

    @Autowired
    private MessageProvider provider;

    SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");


    /**
     * 消息队列监听器<br>
     *
     */
    @Scheduled(cron = "*/1 * * * * *")
    public void monitor() {
        Set<String> set = redisService.rangeByScore(RedisMQ.QUEUE_NAME, 0, System.currentTimeMillis());
        if (null != set) {
            long current = System.currentTimeMillis();
            for (String id : set) {
                long  score = redisService.getScore(RedisMQ.QUEUE_NAME, id).longValue();
                if (current >= score) {
                    // 已超时的消息拿出来消费
                    String str = "";
                    try {
                        str = redisService.get(RedisMQ.MSG_POOL + id);
                        System.out.println("消费了:" + str+ ",消费的时间:" + sdf.format(System.currentTimeMillis()));
                    } catch (Exception e) {
                        e.printStackTrace();
                        //如果出了异常,则重新放回队列
                        System.out.println("消费异常,重新回到队列");
                        provider.sendMessage(str);
                    } finally {
                        redisMQ.deMessage(RedisMQ.QUEUE_NAME, id);
                        redisMQ.deMsgPool(id);
                    }
                }
            }
        }
    }
}

6.配置信息

<!--1依赖引入-->
<dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-data-redis</artifactId>
        </dependency>


2yml配置
spring:
  redis:
    database: 1
    host: 127.0.0.1
    port: 6379

以上代码已经实现了延迟消费功能,现在来测试一下,调用MessageProvider的sendMessage方法,我设定了30秒

可以看到结果

因为我们是用定时器去轮询的,会出现误差

版权声明:本文来源CSDN,感谢博主原创文章,遵循 CC 4.0 by-sa 版权协议,转载请附上原文出处链接和本声明。
原文链接:https://blog.csdn.net/u010096717/article/details/83783039
站方申明:本站部分内容来自社区用户分享,若涉及侵权,请联系站方删除。
  • 发表于 2019-09-04 14:15:18
  • 阅读 ( 1401 )
  • 分类:Redis

0 条评论

请先 登录 后评论

官方社群

GO教程

猜你喜欢