社区微信群开通啦,扫一扫抢先加入社区官方微信群
社区微信群
A:需求说明:
B:实现思路:
C:代码实现
Message消息封装类
@Data
public class Message {
/**
* 消息id
*/
private String id;
/**
* 消息延迟/毫秒
*/
private long delay;
/**
* 消息存活时间
*/
private int ttl;
/**
* 消息体,对应业务内容
*/
private String body;
/**
* 创建时间,如果只有优先级没有延迟,可以设置创建时间为0
* 用来消除时间的影响
*/
private long createTime;
}
2.基于redis的消息队列
@Component
public class RedisMQ {
/**
* 消息池前缀,以此前缀加上传递的消息id作为key,以消息{@link Message}
* 的消息体body作为值存储
*/
public static final String MSG_POOL = "Message:Pool:";
/**
* zset队列 名称 queue
*/
public static final String QUEUE_NAME = "Message:Queue:";
private static final int SEMIH = 30*60;
@Autowired
private RedisService redisService;
/**
* 存入消息池
* @param message
* @return
*/
public boolean addMsgPool(Message message) {
if (null != message) {
return redisService.setExp(MSG_POOL + message.getId(), message.getBody(), Long.valueOf(message.getTtl() + SEMIH));
}
return false;
}
/**
* 从消息池中删除消息
* @param id
* @return
*/
public void deMsgPool(String id) {
redisService.remove(MSG_POOL + id);
}
/**
* 向队列中添加消息
* @param key
* @param score 优先级
* @param val
* @return 返回消息id
*/
public void enMessage(String key, long score, String val) {
redisService.zsset(key,val,score);
}
/**
* 从队列删除消息
* @param id
* @return
*/
public boolean deMessage(String key, String id) {
return redisService.zdel(key, id);
}
}
3Redis操作工具类,这个工具类比较多方法,就不贴在这里了(https://blog.csdn.net/u010096717/article/details/83783865)
4.编写消息发送(生产者)
@Component
public class MessageProvider {
static Logger logger = LoggerFactory.getLogger(MessageProvider.class);
private static int delay = 30;//30秒,可自己动态传入
@Resource
private RedisMQ redisMQ;
SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
//改造成redis
public void sendMessage(String messageContent) {
try {
if (messageContent != null){
String seqId = UUID.randomUUID().toString();
// 将有效信息放入消息队列和消息池中
Message message = new Message();
// 可以添加延迟配置
message.setDelay(delay*1000);
message.setCreateTime(System.currentTimeMillis());
message.setBody(messageContent);
message.setId(seqId);
// 设置消息池ttl,防止长期占用
message.setTtl(delay + 360);
redisMQ.addMsgPool(message);
//当前时间加上延时的时间,作为score
Long delayTime = message.getCreateTime() + message.getDelay();
String d = sdf.format(message.getCreateTime());
System.out.println("当前时间:" + d+",消费的时间:" + sdf.format(delayTime));
redisMQ.enMessage(RedisMQ.QUEUE_NAME,delayTime, message.getId());
}else {
logger.warn("消息内容为空!!!!!");
}
}catch (Exception e){
e.printStackTrace();
}
}
}
5.消息消费者
@Component
public class RedisMQConsumer {
@Resource
private RedisMQ redisMQ;
@Autowired
private RedisService redisService;
@Autowired
private MessageProvider provider;
SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
/**
* 消息队列监听器<br>
*
*/
@Scheduled(cron = "*/1 * * * * *")
public void monitor() {
Set<String> set = redisService.rangeByScore(RedisMQ.QUEUE_NAME, 0, System.currentTimeMillis());
if (null != set) {
long current = System.currentTimeMillis();
for (String id : set) {
long score = redisService.getScore(RedisMQ.QUEUE_NAME, id).longValue();
if (current >= score) {
// 已超时的消息拿出来消费
String str = "";
try {
str = redisService.get(RedisMQ.MSG_POOL + id);
System.out.println("消费了:" + str+ ",消费的时间:" + sdf.format(System.currentTimeMillis()));
} catch (Exception e) {
e.printStackTrace();
//如果出了异常,则重新放回队列
System.out.println("消费异常,重新回到队列");
provider.sendMessage(str);
} finally {
redisMQ.deMessage(RedisMQ.QUEUE_NAME, id);
redisMQ.deMsgPool(id);
}
}
}
}
}
}
6.配置信息
<!--1依赖引入-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>
2yml配置
spring:
redis:
database: 1
host: 127.0.0.1
port: 6379
以上代码已经实现了延迟消费功能,现在来测试一下,调用MessageProvider的sendMessage方法,我设定了30秒
可以看到结果
因为我们是用定时器去轮询的,会出现误差
如果觉得我的文章对您有用,请随意打赏。你的支持将鼓励我继续创作!