spring boot-使用redis的Keyspace Notifications实现定时任务队列 - Go语言中文社区

spring boot-使用redis的Keyspace Notifications实现定时任务队列


前言:

最近项目中有一个需求:需要将执行失败的指令存起来,隔5分钟之后,再取出来执行一次,看到这个需求描述,我第一时间想到了Redis的Keyspace Notifications机制。当然也可以使用jdk自带的DelayQueue来实现,或者更进一步,使用ScheduledThreadPoolExecutor池来实现。由于系统是分布式的,所以考虑使用Redis来实现。

关于Redis的Keyspace Notifications机制,请参考:http://redisdoc.com/topic/notification.htm

实现原理:在Redis2.8之后的版本中,当我们将<key, value>对使用Redis缓存起来并设置缓存失效时间的时候,会触发Redis的键事件通知,客户端订阅这个通知事件,服务端会将对应的通知事件发送给每个订阅的客户端,然后客户端根据收到的通知,做相应的后续处理(例如:键过期时间通知对应的topic为:"__keyevent@0__:expired")。Redis支持的通知类型如下:

因为开启键空间通知功能需要消耗一些 CPU ,所以在默认配置下,该功能处于关闭状态。

可以通过修改 redis.conf 文件,或者直接使用CONFIGSET 命令来开启或关闭键空间通知功能:

配置文件修改方式如下:

notify-keyspace-events Ex  // 打开此配置,其中Ex表示键事件通知里面的key过期事件,每当有过期键被删除时,会发送通知

  • notify-keyspace-events 选项的参数为空字符串时,功能关闭。
  • 另一方面,当参数不是空字符串时,功能开启。

notify-keyspace-events 的参数可以是以下字符的任意组合,它指定了服务器该发送哪些类型的通知:

字符 发送的通知
K 键空间通知,所有通知以 __keyspace@<db>__ 为前缀
E 键事件通知,所有通知以 __keyevent@<db>__ 为前缀
g DEL EXPIRERENAME 等类型无关的通用命令的通知
$ 字符串命令的通知
l 列表命令的通知
s 集合命令的通知
h 哈希命令的通知
z 有序集合命令的通知
x 过期事件:每当有过期键被删除时发送
e 驱逐(evict)事件:每当有键因为 maxmemory 政策而被删除时发送
A 参数 g$lshzxe 的别名

输入的参数中至少要有一个 K 或者E ,否则的话,不管其余的参数是什么,都不会有任何通知被分发。

举个例子,如果只想订阅键空间中和列表相关的通知,那么参数就应该设为 Kl ,诸如此类。

将参数设为字符串 "AKE" 表示发送所有类型的通知。

1、编写监听器

package com.chhliu.springboot.redis.config;


import org.springframework.data.redis.connection.Message;
import org.springframework.data.redis.connection.MessageListener;
import org.springframework.stereotype.Component;

@Component
public class TopicMessageListener implements MessageListener {

	@Override
	public void onMessage(Message message, byte[] pattern) {// 客户端监听订阅的topic,当有消息的时候,会触发该方法
        	byte[] body = message.getBody();// 请使用valueSerializer
        	byte[] channel = message.getChannel();
        	String topic = new String(channel);
        	String itemValue = new String(body);
        	// 请参考配置文件,本例中key,value的序列化方式均为string。
        	System.out.println("topic:"+topic);
        	System.out.println("itemValue:"+itemValue);
	}
}

2、配置RedisMessageListenerContainer监听容器

package com.chhliu.springboot.redis.config;

import java.util.concurrent.Executor;
import java.util.concurrent.ThreadPoolExecutor;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.listener.ChannelTopic;
import org.springframework.data.redis.listener.RedisMessageListenerContainer;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;

@Configuration
public class RedisMessageListenerContainerConfig {
	
	@Autowired
	private RedisTemplate<Object,Object> redisTemplate;
	
	@Autowired
	private TopicMessageListener messageListener;
	
	@Autowired
	private TaskThreadPoolConfig config;
	
	@Value("spring.redis.topic")
	private String topic;
	
	@Bean
	public RedisMessageListenerContainer configRedisMessageListenerContainer(Executor executor){
		RedisMessageListenerContainer container = new RedisMessageListenerContainer();
		// 设置Redis的连接工厂
		container.setConnectionFactory(redisTemplate.getConnectionFactory());
		// 设置监听使用的线程池
		container.setTaskExecutor(executor);
		// 设置监听的Topic
		ChannelTopic channelTopic = new ChannelTopic("__keyevent@0__:expired");
		// 设置监听器
		container.addMessageListener(messageListener, channelTopic);
		return container;
	}
	
	@Bean // 配置线程池
	public Executor myTaskAsyncPool() {
		ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
		executor.setCorePoolSize(config.getCorePoolSize());
		executor.setMaxPoolSize(config.getMaxPoolSize());
		executor.setQueueCapacity(config.getQueueCapacity());
		executor.setKeepAliveSeconds(config.getKeepAliveSeconds());
		executor.setThreadNamePrefix(config.getThreadNamePrefix());

		// rejection-policy:当pool已经达到max size的时候,如何处理新任务
		// CALLER_RUNS:不在新线程中执行任务,而是由调用者所在的线程来执行
		executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
		executor.initialize();
		return executor;
	}
}
注:具体的线程池配置,可以参考我的另一篇博客:http://blog.csdn.net/liuchuanhong1/article/details/64132520

注意:”_keyevent@0_:expired“中的"_",并不是英文下的下划线,建议从Redis客户端直接拷贝,否则会匹配不上订阅的topic


3、配置文件

server.port=9999
########################################################
###REDIS (RedisProperties) redis基本配置;
########################################################
# database name
spring.redis.database=0
# server host1 单机使用,对应服务器ip
#spring.redis.host=127.0.0.1  
# server password 密码,如果没有设置可不配
#spring.redis.password=
#connection port  单机使用,对应端口号
#spring.redis.port=6379
# pool settings ...池配置
spring.redis.pool.max-idle=8
spring.redis.pool.min-idle=0
spring.redis.pool.max-active=8
spring.redis.pool.max-wait=-1
# name of Redis server  哨兵监听的Redis server的名称
spring.redis.sentinel.master=mymaster
# comma-separated list of host:port pairs  哨兵的配置列表
spring.redis.sentinel.nodes=192.168.1.108:26379,192.168.1.108:26479,192.168.1.108:26579

spring.task.pool.corePoolSize=10
spring.task.pool.maxPoolSize=20
spring.task.pool.keepAliveSeconds=60
spring.task.pool.queueCapacity=100
spring.task.pool.threadNamePrefix=myThreadPool

spring.redis.topic=__keyevent@0__:expired

4、启动程序

package com.chhliu.springboot.redis;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.context.properties.EnableConfigurationProperties;

import com.chhliu.springboot.redis.config.TaskThreadPoolConfig;

@SpringBootApplication
@EnableConfigurationProperties({TaskThreadPoolConfig.class} ) // 开启配置属性支持
public class SpringbootRedisApplication {
	
	public static void main(String[] args) {
		SpringApplication.run(SpringbootRedisApplication.class, args);
	}
}

5、在客户端输入命令

set myname chhliu
expire myname 2
或者如下:
set myname chhliu px 2000
设置键值,并指定超时时间为2s,大概2s后,就会看到console端如下的输出了:

2017-04-12 20:23:16.367  INFO 12464 --- [  myThreadPool2] c.c.s.redis.config.TopicMessageListener  : 是否获取到锁:true
topic:__keyevent@0__:expired
itemValue:myname
2017-04-12 20:23:16.369  INFO 12464 --- [  myThreadPool2] c.c.s.redis.config.TopicMessageListener  : 任务结束,释放锁!
注意:

1、上面的_keyevent@0_:expired是key过期事件对应的topic,服务端会将过期事件推送到该topic中,然后客户端监听这个topic。

2、key过期事件推送到topic中的内容只有key,而无value,因为一旦过期,value就不存在了。

注意事项

Redis 使用以下两种方式删除过期的键:

  • 当一个键被访问时,程序会对这个键进行检查,如果键已经过期,那么该键将被删除。
  • 底层系统会在后台渐进地查找并删除那些过期的键,从而处理那些已经过期、但是不会被访问到的键。

当过期键被以上两个程序的任意一个发现、并且将键从数据库中删除时,Redis 会产生一个 expired 通知。

Redis 并不保证生存时间(TTL)变为 0 的键会立即被删除:如果程序没有访问这个过期键,或者带有生存时间的键非常多的话,那么在键的生存时间变为0 ,直到键真正被删除这中间,可能会有一段比较显著的时间间隔。

因此,Redis 产生 expired 通知的时间为过期键被删除的时候,而不是键的生存时间变为0 的时候。如果业务无法容忍从过期到删除中间的时间间隔,那么就只有用其他的方式了。
版权声明:本文来源CSDN,感谢博主原创文章,遵循 CC 4.0 by-sa 版权协议,转载请附上原文出处链接和本声明。
原文链接:https://blog.csdn.net/liuchuanhong1/article/details/70147149
站方申明:本站部分内容来自社区用户分享,若涉及侵权,请联系站方删除。

0 条评论

请先 登录 后评论

官方社群

GO教程

猜你喜欢