Redis集群监听key过期事件

摘要: redis cluster中event 事件并不是我们常用的发布,订阅,它并没有广播到各个集群节点。而应用程序在启动时,只是连接到了集群节点中的一个而已。所以这个时候,你只能接收到key值存放在这个节点过期的事件。其他节点过期的事件,你是接收不到的。那么解决的方法就是监听所有节点。自己去实现方法。

这篇文章是上一篇文章(Redis单机监听key过期事件)的一个补充,因为在生产环境下,通常使用的是redis cluster, 并非单机模式。但在redis cluster的情况下,如果再用单机模式的方式,就不能全面得到过期的key, 只能获取到部分。原因redis官方已经说明了。

官网说明地址: https://redis.io/topics/notifications/ 在介绍的最后面部分,介绍了redis cluster下,监听key失效等情况,为什么只能得到部分,而不是全部。

event 事件并不是我们常用的发布,订阅,它并没有广播到各个集群节点。而应用程序在启动时,只是连接到了集群节点中的一个而已。所以这个时候,你只能接收到key值存放在这个节点过期的事件。其他节点过期的事件,你是接收不到的。那么解决的方法就是监听所有节点。自己去实现方法。


前期准备

与上一篇文章(Redis单机监听key过期事件)一致。多所有redis 集群节点,每一个节点都需要修改,如下图所示:

然后重启redis 集群.


程序修改

增加类 RedisClusterMessageListenerFactory ,用来自己构造监听所有node节点的类

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.BeanFactory;
import org.springframework.beans.factory.BeanFactoryAware;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.config.BeanDefinition;
import org.springframework.beans.factory.support.BeanDefinitionBuilder;
import org.springframework.beans.factory.support.DefaultListableBeanFactory;
import org.springframework.context.ApplicationListener;
import org.springframework.context.event.ContextRefreshedEvent;
import org.springframework.data.redis.connection.MessageListener;
import org.springframework.data.redis.connection.RedisClusterConnection;
import org.springframework.data.redis.connection.RedisClusterNode;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.connection.jedis.JedisConnectionFactory;
import org.springframework.data.redis.listener.PatternTopic;
import org.springframework.data.redis.listener.RedisMessageListenerContainer;
import redis.clients.jedis.JedisShardInfo;

public class RedisClusterMessageListenerFactory implements BeanFactoryAware, ApplicationListener<ContextRefreshedEvent> {

    private static final Logger log = LoggerFactory.getLogger(RedisClusterMessageListenerFactory.class);

    private DefaultListableBeanFactory beanFactory;

    private RedisConnectionFactory redisConnectionFactory;

    @Autowired
    private MessageListener messageListener;

    @Override
    public void setBeanFactory(BeanFactory beanFactory) throws BeansException {
        this.beanFactory = (DefaultListableBeanFactory) beanFactory;
    }

    public void setRedisConnectionFactory(RedisConnectionFactory redisConnectionFactory) {
        this.redisConnectionFactory = redisConnectionFactory;
    }

    @Override
    public void onApplicationEvent(ContextRefreshedEvent contextRefreshedEvent) {
        RedisClusterConnection redisClusterConnection = redisConnectionFactory.getClusterConnection();
        if (redisClusterConnection != null) {
            Iterable<RedisClusterNode> nodes = redisClusterConnection.clusterGetNodes();
            for (RedisClusterNode node : nodes) {
                if (node.isMaster()) {
                    log.info(node.getHost() + ":" + node.getPort() + " is master, hash code is: " + node.hashCode());
                    String containerBeanName = "messageContainer" + node.hashCode();
                    if (beanFactory.containsBean(containerBeanName)) {
                        return;
                    }
                    JedisConnectionFactory factory = new JedisConnectionFactory(
                            new JedisShardInfo(node.getHost(), node.getPort()));
                    BeanDefinitionBuilder containerBeanDefinitionBuilder = BeanDefinitionBuilder
                            .genericBeanDefinition(RedisMessageListenerContainer.class);
                    containerBeanDefinitionBuilder.addPropertyValue("connectionFactory", factory);
                    containerBeanDefinitionBuilder.setScope(BeanDefinition.SCOPE_SINGLETON);
                    containerBeanDefinitionBuilder.setLazyInit(false);
                    beanFactory.registerBeanDefinition(containerBeanName,
                            containerBeanDefinitionBuilder.getRawBeanDefinition());

                    RedisMessageListenerContainer container = beanFactory
                            .getBean(containerBeanName, RedisMessageListenerContainer.class);
                    String listenerBeanName = "messageListener" + node.hashCode();
                    if (beanFactory.containsBean(listenerBeanName)) {
                        return;
                    }
                    container.addMessageListener(messageListener, new PatternTopic("__keyevent@0__:expired"));
                    container.start();
                }
            }
        }
    }

}
RedisClusterMessageListenerContainerConfig 类,用来加载上面自定义的监听redis node的类
import org.springframework.beans.factory.BeanFactory;
import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.RedisConnectionFactory;

@ConditionalOnExpression("!'${spring.redis.cluster.nodes:}'.isEmpty()")
@Configuration
public class RedisClusterMessageListenerContainerConfig {

    @Bean
    public RedisClusterMessageListenerFactory redisMessageListenerFactory(BeanFactory beanFactory,
                                                                          RedisConnectionFactory redisConnectionFactory) {
        RedisClusterMessageListenerFactory beans = new RedisClusterMessageListenerFactory();
        beans.setBeanFactory(beanFactory);
        beans.setRedisConnectionFactory(redisConnectionFactory);
        return beans;
    }

}


监听类 KeyExpiredEventMessageListener 实际可能处理业务的地方:

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.data.redis.connection.Message;
import org.springframework.data.redis.connection.MessageListener;
import org.springframework.stereotype.Service;

@Service
public class KeyExpiredEventMessageListener implements MessageListener {

    private static final Logger log = LoggerFactory.getLogger(KeyExpiredEventMessageListener.class);

    @Override
    public void onMessage(Message message, byte[] pattern) {

        byte[] body = message.getBody();
        byte[] channel = message.getChannel();
        String topic = new String(channel);
        String expireKey = new String(body);

        log.info("exporeKey: {}, topic: {}", expireKey, topic);
    }

}

运行后效果图:

上一篇: Redis单机监听key过期事件
下一篇: springboot 读取资源文件
 评论 ( What Do You Think )
名称
邮箱
网址
评论
验证
   
 

 


  • 微信公众号

  • 我的微信

站点声明:

1、一号门博客CMS,由Python, MySQL, Nginx, Wsgi 强力驱动

2、部分文章或者资源来源于互联网, 有时候很难判断是否侵权, 若有侵权, 请联系邮箱:summer@yihaomen.com, 同时欢迎大家注册用户,主动发布无版权争议的 文章/资源.

3、鄂ICP备14001754号-3, 鄂公网安备 42280202422812号