redis publish and subscribe Java code implementation

In addition to being able to cache data, Redis also implements the pub / sub message communication mode: the sender (pub) sends messages, and the subscriber (sub) receives messages.

In order to implement the publish and subscribe mechanism of redis, first open the redis service; secondly, introduce the jar package required by redis, and add the following code in the pom.xml configuration file:

<dependency>
<groupId>redis.clients</groupId>
<artifactId>jedis</artifactId>
<version>2.1.0</version>
</dependency>

Because the subscription message channel needs to be triggered when tomcat is started, a listener listener needs to be created to implement redis subscription in the listener. The listener configured in web.xml is as follows:

<listener>
<listener-class>com.test.listener.InitListener</listener-class>
</listener>

I. subscription message (implemented by InitListener)

redis supports multi-channel subscription. A client can subscribe to multiple message channels at the same time. As shown in the following code, it has subscribed to 13 channels. Because the subscription mechanism is thread blocking, an additional thread needs to be opened for processing subscription messages and receiving messages.

public class InitListener implements ServletContextListener{
    private Logger logger = Logger.getLogger(InitListener.class);
    
    @Override
    public void contextInitialized(ServletContextEvent sce) {
        logger.info("start-up tomcat");// Connect redis
        Map<String, String> proMap = PropertyReader.getProperties();
        final String url = proMap.get("redis.host");
        final Integer port = Integer.parseInt(proMap.get("redis.port"));
        final ClassPathXmlApplicationContext classPathXmlApplicationContext = new ClassPathXmlApplicationContext("classpath*:applicationContext.xml");
        final RedisSubListener redisSubListener = (RedisSubListener) classPathXmlApplicationContext.getBean("redisSubListener");
        // To prevent blocking tomcat Start, start thread execution
        new Thread(new Runnable(){  
            public void run(){  
                // Connect redis,Build monitoring
                Jedis jedis = null;
                while(true){
                    //Decode resource update notification,Screen selection reply,Screen selection stop reply,Plan initiation,Plan stop,Wheel cut start,Wheel cut stop,Plan start reply,Plan stop reply,Turning start reply,Rotation stop Recovery,Monitoring screen split status notification,Picture status notification
                    String[] channels = new String[] { "decodeResourceUpdateNtf", "tvSplitPlayRsp","tvSplitPlayStopRsp",
                            "planStartStatusNtf", "planStopStatusNtf", "pollStartStatusNtf", "pollStopStatusNtf",
                            "planStartRsp","planStopRsp","pollStartRsp","pollStopRsp","tvSplitTypeNtf","tvSplitStatusNtf"};
                    try{
                        jedis = new Jedis(url,port);
                        logger.info("redis Pull subscription channel");
                        jedis.subscribe(redisSubListener,channels);
                        logger.info("redis Subscription end");
                    }catch(JedisConnectionException e){
                        logger.error("Jedis Connection exception,Abnormal information :" + e);
                    }catch(IllegalStateException e){
                         logger.error("Jedis abnormal,Abnormal information :" + e);
                    }
                    
                    try {
                        Thread.sleep(1000);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    if(jedis != null){
                        jedis = null;
                    }
                }
            }})
        .start();
    }

Finally, access the following configuration in the spring configuration file:

<!-- redis -->
     <bean id="redisMessageService" class="com.test.service.impl.RedisMessageServiceImpl" scope="singleton">
         <property name="webSocketService"><ref local="webSocketService" /></property>
         <property name="tvSplitStatusDao" ref="tvSplitStatusDao"></property>
     </bean>
     <bean id="redisSubListener" class="com.test.common.RedisSubListener" scope="singleton">
         <property name="redisMessageService"><ref local="redisMessageService" /></property>
     </bean>
RedisMessageServiceImpl is used to process received redis messages.

II. News release

public class RedisPublishUtil {
    private Logger logger = Logger.getLogger(RedisPublishUtil.class);
    public static Jedis pubJedis;
    private static Map<String, String> proMap = PropertyReader.getProperties();
    private static final String redisPort = proMap.get("redis.port");
    private static String url = proMap.get("redis.host");
    private static final int port = Integer.parseInt(redisPort);
    
    public void setPubJedis(Jedis jedis) {
        RedisPublishUtil.pubJedis = jedis;
    }
    
    public Jedis getPubJedis() {
        if (pubJedis == null) {
            createJedisConnect();
        }
        // Return object
        return pubJedis;
    }
    
    public Jedis createJedisConnect(){
        // Connect redis
        logger.info("===Create connection jedis=====");
        try {
            pubJedis = new Jedis(url, port);
        } catch (JedisConnectionException e) {
            logger.error("Jedis Connection exception,Abnormal information :" + e.getMessage());
            try {
                Thread.sleep(1000);
                logger.info("Initiate reconnection jedis");
                createJedisConnect();
            } catch (InterruptedException except) {
                except.printStackTrace();
            }
        }
        // Return object
        return pubJedis;
    }
    //Public publishing interface
    public void pubRedisMsg(String msgType,String msg){
        logger.info("redis Prepare to publish message content:" + msg);
        try {
            this.getPubJedis().publish(msgType, msg);

        } catch (JedisConnectionException e) {
            logger.error("redis Failed to publish message!", e);
            this.setPubJedis(null);
            logger.info("Republish the message, channel="+msgType);
            pubRedisMsg(msgType, msg);
        }
    }

}
public class PropertyReader {

     private static Logger logger = Logger.getLogger(PropertyReader.class);
    
    /*
     * Get profile for database link
     */
    public static Map<String,String> getProperties(){
        logger.info("read redis Profile start...");
        
         Properties prop = new Properties();     
        
         Map<String,String> proMap  = new HashMap<String,String>();
         
        try {
             //Read property file redis.properties
            InputStream in= PropertyReader.class.getClassLoader().getResourceAsStream("redis.properties");  
            
            prop.load(in);     ///Load property list
            Iterator<String> it=prop.stringPropertyNames().iterator();
            while(it.hasNext()){
                String key=it.next();
                proMap.put(key, prop.getProperty(key));
            }
            in.close();
            logger.info("read redis Profile success...");
        } catch (Exception e) {
            logger.error("read redis Configuration file exception!", e);
            e.printStackTrace();
        }
        return proMap;
    }
}

Tags: Java Redis Jedis xml Tomcat

Posted on Sun, 01 Dec 2019 14:47:53 -0800 by rortelli