Redis集群监听key过期事件
By: Roy.Liu  Last updated: 2021-07-28
这篇文章是上一篇文章(Redis单机监听key过期事件)的一个补充,因为在生产环境下,通常使用的是redis cluster, 并非单机模式。但在redis cluster的情况下,如果再用单机模式的方式,就不能全面得到过期的key, 只能获取到部分。原因redis官方已经说明了。
官网说明地址: https://redis.io/topics/notifications/ 在介绍的最后面部分,介绍了redis cluster下,监听key失效等情况,为什么只能得到部分,而不是全部。

键空间通知(keyspace notification)虽然也是通过发布/订阅机制发送的,但它与普通的 Pub/Sub 消息不同:事件不会广播到集群的各个节点,每个节点只发布自己所负责的 key 的事件。而应用程序在启动时,只是连接到了集群节点中的一个而已。所以这个时候,你只能接收到存放在这个节点上的 key 的过期事件,其他节点上的过期事件是接收不到的。解决的方法就是自己实现逻辑,去监听所有节点。
前期准备
与上一篇文章(Redis单机监听key过期事件)一致。对所有 redis 集群节点,每一个节点都需要修改配置,开启 key 过期事件通知(例如在 redis.conf 中设置 notify-keyspace-events Ex),如下图所示:

然后重启redis 集群.
程序修改
增加类 RedisClusterMessageListenerFactory ,用来自己构造监听所有node节点的类:
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.BeanFactory;
import org.springframework.beans.factory.BeanFactoryAware;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.config.BeanDefinition;
import org.springframework.beans.factory.support.BeanDefinitionBuilder;
import org.springframework.beans.factory.support.DefaultListableBeanFactory;
import org.springframework.context.ApplicationListener;
import org.springframework.context.event.ContextRefreshedEvent;
import org.springframework.data.redis.connection.MessageListener;
import org.springframework.data.redis.connection.RedisClusterConnection;
import org.springframework.data.redis.connection.RedisClusterNode;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.connection.jedis.JedisConnectionFactory;
import org.springframework.data.redis.listener.PatternTopic;
import org.springframework.data.redis.listener.RedisMessageListenerContainer;
import redis.clients.jedis.JedisShardInfo;
public class RedisClusterMessageListenerFactory implements BeanFactoryAware, ApplicationListener<ContextRefreshedEvent> {
private static final Logger log = LoggerFactory.getLogger(RedisClusterMessageListenerFactory.class);
private DefaultListableBeanFactory beanFactory;
private RedisConnectionFactory redisConnectionFactory;
@Autowired
private MessageListener messageListener;
/**
 * Receives the owning bean factory and keeps it for later dynamic bean registration.
 *
 * @param beanFactory the factory supplied by the container; it is cast to
 *                    {@link DefaultListableBeanFactory}
 * @throws BeansException declared by {@link BeanFactoryAware}
 * @throws ClassCastException if the supplied factory is not a {@link DefaultListableBeanFactory}
 */
@Override
public void setBeanFactory(BeanFactory beanFactory) throws BeansException {
    // The listable implementation is required because bean definitions are registered at runtime.
    DefaultListableBeanFactory listableFactory = (DefaultListableBeanFactory) beanFactory;
    this.beanFactory = listableFactory;
}
/**
 * Sets the connection factory that is later used to obtain the cluster connection
 * when the application context is refreshed.
 *
 * @param redisConnectionFactory the Redis connection factory to use
 */
public void setRedisConnectionFactory(RedisConnectionFactory redisConnectionFactory) {
this.redisConnectionFactory = redisConnectionFactory;
}
/**
 * On context refresh, enumerates the cluster nodes and makes sure every master node has its
 * own {@link RedisMessageListenerContainer} subscribed to the expired-key event channel.
 *
 * <p>The cluster connection is only needed to read the topology and is closed afterwards.
 * Masters that already have a container bean (for example because the context was refreshed
 * before) are skipped individually, so the remaining masters are still processed.
 *
 * @param contextRefreshedEvent the refresh event that triggered this callback (not inspected)
 */
@Override
public void onApplicationEvent(ContextRefreshedEvent contextRefreshedEvent) {
    RedisClusterConnection redisClusterConnection = redisConnectionFactory.getClusterConnection();
    if (redisClusterConnection == null) {
        return;
    }
    try {
        for (RedisClusterNode node : redisClusterConnection.clusterGetNodes()) {
            if (node.isMaster()) {
                registerExpiredKeyListener(node);
            }
        }
    } finally {
        // Only the topology lookup needs this connection; release it once done.
        redisClusterConnection.close();
    }
}

/**
 * Registers and starts a listener container dedicated to a single master node.
 *
 * @param node the master node whose expired-key events should be delivered to the listener
 */
private void registerExpiredKeyListener(RedisClusterNode node) {
    log.info("{}:{} is master, hash code is: {}", node.getHost(), node.getPort(), node.hashCode());

    String containerBeanName = "messageContainer" + node.hashCode();
    if (beanFactory.containsBean(containerBeanName)) {
        // Already registered for this node: skip only this node, not the whole loop.
        return;
    }

    // Stand-alone connection factory pointing directly at this node.
    JedisConnectionFactory factory = new JedisConnectionFactory(
            new JedisShardInfo(node.getHost(), node.getPort()));
    // The factory is created outside the container, so its init callback must be invoked by hand.
    factory.afterPropertiesSet();

    BeanDefinitionBuilder containerBeanDefinitionBuilder = BeanDefinitionBuilder
            .genericBeanDefinition(RedisMessageListenerContainer.class);
    containerBeanDefinitionBuilder.addPropertyValue("connectionFactory", factory);
    containerBeanDefinitionBuilder.setScope(BeanDefinition.SCOPE_SINGLETON);
    containerBeanDefinitionBuilder.setLazyInit(false);
    beanFactory.registerBeanDefinition(containerBeanName,
            containerBeanDefinitionBuilder.getRawBeanDefinition());

    RedisMessageListenerContainer container = beanFactory
            .getBean(containerBeanName, RedisMessageListenerContainer.class);
    container.addMessageListener(messageListener, new PatternTopic("__keyevent@0__:expired"));
    // Started explicitly because the bean is registered after the context has been refreshed.
    container.start();
}
}

RedisClusterMessageListenerContainerConfig 类,用来加载上面自定义的监听 redis node 的类:
import org.springframework.beans.factory.BeanFactory;
import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.RedisConnectionFactory;
@ConditionalOnExpression("!'${spring.redis.cluster.nodes:}'.isEmpty()")
@Configuration
public class RedisClusterMessageListenerContainerConfig {
/**
 * Creates the {@link RedisClusterMessageListenerFactory} bean and hands it the collaborators
 * it needs: the bean factory and the Redis connection factory.
 *
 * @param beanFactory            the bean factory passed on to the listener factory
 * @param redisConnectionFactory the Redis connection factory passed on to the listener factory
 * @return the configured listener factory
 */
@Bean
public RedisClusterMessageListenerFactory redisMessageListenerFactory(BeanFactory beanFactory,
        RedisConnectionFactory redisConnectionFactory) {
    final RedisClusterMessageListenerFactory listenerFactory = new RedisClusterMessageListenerFactory();
    listenerFactory.setRedisConnectionFactory(redisConnectionFactory);
    listenerFactory.setBeanFactory(beanFactory);
    return listenerFactory;
}
}

监听类 KeyExpiredEventMessageListener,实际处理业务逻辑的地方:
import java.nio.charset.StandardCharsets;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.data.redis.connection.Message;
import org.springframework.data.redis.connection.MessageListener;
import org.springframework.stereotype.Service;
@Service
public class KeyExpiredEventMessageListener implements MessageListener {
private static final Logger log = LoggerFactory.getLogger(KeyExpiredEventMessageListener.class);
/**
 * Logs the body and channel of a received message.
 *
 * <p>Both byte arrays are decoded as UTF-8 rather than with the platform default charset,
 * so the logged text does not depend on the JVM's locale settings.
 *
 * @param message the received message; its body and channel are read
 * @param pattern the pattern that matched the channel (unused here)
 */
@Override
public void onMessage(Message message, byte[] pattern) {
    String topic = new String(message.getChannel(), StandardCharsets.UTF_8);
    String expireKey = new String(message.getBody(), StandardCharsets.UTF_8);
    // NOTE(review): "exporeKey" looks like a typo for "expireKey"; left unchanged in case
    // existing log searches depend on it.
    log.info("exporeKey: {}, topic: {}", expireKey, topic);
}
}

运行后效果图:

From:一号门
Previous:Redis单机监听key过期事件
Next:springboot 读取资源文件

COMMENTS