rabbitmq delay queue demo

Engineering structure:

 

Define the version that the jar package depends on. The version is very important. rabbit depends on spring and must be consistent. Otherwise, an error is reported

<properties>
    <springframework.version>4.2.7.RELEASE</springframework.version>
    <spring-rabbit.version>1.6.1.RELEASE</spring-rabbit.version>
    <junit.version>4.12</junit.version>
</properties>

dependencies:

<dependencies>

    <!-- LOGGING begin -->
    <dependency>
        <groupId>org.slf4j</groupId>
        <artifactId>slf4j-api</artifactId>
        <version>1.7.5</version>
    </dependency>
    <dependency>
        <groupId>ch.qos.logback</groupId>
        <artifactId>logback-core</artifactId>
        <version>1.0.13</version>
    </dependency>
    <dependency>
        <groupId>ch.qos.logback</groupId>
        <artifactId>logback-classic</artifactId>
        <version>1.0.13</version>
    </dependency>
    <!-- Code calls directly common-logging Will be bridged to slf4j -->
    <dependency>
        <groupId>org.slf4j</groupId>
        <artifactId>jcl-over-slf4j</artifactId>
        <version>1.7.5</version>
    </dependency>
    <!-- LOGGING end -->

    <!--springframework-->
    <dependency>
        <groupId>org.springframework</groupId>
        <artifactId>spring-core</artifactId>
        <version>${springframework.version}</version>
    </dependency>
    <dependency>
        <groupId>org.springframework</groupId>
        <artifactId>spring-context</artifactId>
        <version>${springframework.version}</version>
    </dependency>

    <!-- rabbitmq spring rely on -->
    <dependency>
        <groupId>org.springframework.amqp</groupId>
        <artifactId>spring-rabbit</artifactId>
        <version>${spring-rabbit.version}</version>
    </dependency>

    <!--common utils-->
    <dependency>
        <groupId>com.alibaba</groupId>
        <artifactId>fastjson</artifactId>
        <version>1.2.6</version>
    </dependency>
    <dependency>
        <groupId>org.apache.commons</groupId>
        <artifactId>commons-lang3</artifactId>
        <version>3.3.2</version>
    </dependency>

    <!--test begin-->
    <dependency>
        <groupId>junit</groupId>
        <artifactId>junit</artifactId>
        <version>${junit.version}</version>
        <!--<scope>test</scope>-->
    </dependency>
    <dependency>
        <groupId>org.springframework</groupId>
        <artifactId>spring-test</artifactId>
        <version>${springframework.version}</version>
        <!--<scope>test</scope>-->
    </dependency>
    <!--test end-->
</dependencies>

 

spring-applicationContext:

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:context="http://www.springframework.org/schema/context"
       xmlns:rabbit="http://www.springframework.org/schema/rabbit"

       xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.2.xsd
       http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.2.xsd
       http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-3.1.xsd 
         http://www.springframework.org/schema/rabbit http://www.springframework.org/schema/rabbit/spring-rabbit.xsd">

    <bean class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer">
        <property name="fileEncoding" value="UTF-8"></property>
        <property name="locations">
            <list>
                <value>classpath:applicationContext.properties</value>
            </list>
        </property>
    </bean>

    <context:annotation-config/>

    <bean class="org.springframework.beans.factory.annotation.AutowiredAnnotationBeanPostProcessor"/>
    <!-- Configure scan path -->
    <context:component-scan base-package="demo"></context:component-scan>

    <!--rabbit server parameter -->
    <rabbit:connection-factory id="connectionFactory"
                               username="${paycenter.mq.user.username}"
                               password="${paycenter.mq.user.password}"
                               addresses="${paycenter.mq.user.host}"></rabbit:connection-factory>

    <import resource="classpath:mq-applicationContext-producer.xml"/>
    <import resource="classpath:mq-applicationContext-consumer.xml"/>
</beans>

 

mq-applicationContext-producer.xml:

 

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:rabbit="http://www.springframework.org/schema/rabbit"
       xsi:schemaLocation="http://www.springframework.org/schema/beans
     http://www.springframework.org/schema/beans/spring-beans-4.0.xsd
     http://www.springframework.org/schema/beans
     http://www.springframework.org/schema/beans/spring-beans-4.0.xsd
     http://www.springframework.org/schema/rabbit
     http://www.springframework.org/schema/rabbit/spring-rabbit-1.6.xsd">

    <!--By specifying the admin Information, current producer Medium exchange and queue Will be in rabbitmq Auto build on server -->
    <rabbit:admin connection-factory="connectionFactory"/>

    <!-- spring amqp The default is jackson A plug-in for,Purpose to convert data produced by producers into json Put into message queue -->
    <bean id="mqMessageConverter"
          class="org.springframework.amqp.support.converter.SimpleMessageConverter">
    </bean>

    <!--<bean id="publisherConfirmsReturns" class="com.emaxcard.mq.rabbit.PublisherConfirmsReturns"></bean>-->


    <!--========================Delay queue configuration begin =========================-->
    <rabbit:queue id="agentpayqueryQueue2" durable="true" auto-delete="true" exclusive="false"
                  name="agentpayqueryQueue2"/>
    <rabbit:direct-exchange id="agentpayqueryExchange2" durable="true" auto-delete="true" name="agentpayqueryExchange2">
        <rabbit:bindings>
            <rabbit:binding queue="agentpayqueryQueue2" key="delay"/>
        </rabbit:bindings>
    </rabbit:direct-exchange>


    <rabbit:queue id="agentpayqueryQueue1" durable="true" auto-delete="true" exclusive="false"
                  name="agentpayqueryQueue1">
        <rabbit:queue-arguments>
            <entry key="x-dead-letter-exchange" value="agentpayqueryExchange2"/>
            <entry key="x-message-ttl" value="10000" value-type="java.lang.Long"/>
        </rabbit:queue-arguments>
    </rabbit:queue>
    <rabbit:direct-exchange id="agentpayqueryExchange1" durable="true" auto-delete="true" name="agentpayqueryExchange1">
        <rabbit:bindings>
            <rabbit:binding queue="agentpayqueryQueue1" key="delay"/>
        </rabbit:bindings>
    </rabbit:direct-exchange>

    <!--Definition RabbitTemplate Example-->
    <!--confirm-callback="publisherConfirmsReturns" return-callback="publisherConfirmsReturns"-->
    <rabbit:template id="agentpayQueryMsgTemplate"
                     exchange="agentpayqueryExchange1" routing-key="delay"
                     connection-factory="connectionFactory" message-converter="mqMessageConverter"
                     mandatory="true"
    />
    <!--========================Delay queue configuration end =========================-->

</beans>

 

mq-applicationContext-consumer.xml:

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:rabbit="http://www.springframework.org/schema/rabbit"
       xsi:schemaLocation="http://www.springframework.org/schema/beans
     http://www.springframework.org/schema/beans/spring-beans-4.0.xsd
     http://www.springframework.org/schema/beans
     http://www.springframework.org/schema/beans/spring-beans-4.0.xsd
     http://www.springframework.org/schema/rabbit
     http://www.springframework.org/schema/rabbit/spring-rabbit-1.6.xsd">


    <bean id="agentpayQueryConsumer" class="demo.TestMQConsumer" />

    <!-- TODO Subsequent deletion
    receive-timeout:Timeout waiting for receive affects connection creation and destruction

    concurrency:Number of consumers
    max-concurrency:Number of largest consumers
    min-start-interval:Start successively to reduce concurrent environment(Or the sudden network delay of the tripartite system) Performance loss due to large number of connections
    min-stop-interval:Continuous destruction reduces the sudden quiet, resulting in a large number of available connections being destroyed
    min-consecutive-active: continuity N Time out when no reception occurs is considered as the need to create a consumer
    min-consecutive-idle: continuity N The consumer needs to be destroyed in case of receiving timeout

    prefetch:The number of pre read entries per consumer is set to 1 (the default is 5) because the performance bottleneck of asynchronous call is in the network and three-party system. Only one message is ACK Will receive the next message
    transaction-size:Will affect prefetch Quantity
    -->
    <!--  Monitor -->
    <!-- queue litener  Observe the listening mode. When a message arrives, it will notify the listener on the corresponding queue-->
    <rabbit:listener-container connection-factory="connectionFactory" acknowledge="auto"
                               max-concurrency="20"
                               concurrency="5"
                               prefetch="10">
        <rabbit:listener ref="agentpayQueryConsumer" queue-names="agentpayqueryQueue2" />
    </rabbit:listener-container>
</beans>

 

 

Producer class:
package demo;

import org.junit.Test;
import org.junit.runner.RunWith;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;

@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration(locations = "classpath:applicationContext.xml")
public class TestMQProducer {

    private static Logger logger = LoggerFactory.getLogger(TestMQProducer.class.getSimpleName());

    @Autowired
    private RabbitTemplate agentpayQueryMsgTemplate;

    @Test
    public void test() throws Exception {
        for (int i = 0; i <= 100; i++) {
            Object data = String.valueOf(i);
            agentpayQueryMsgTemplate.convertAndSend(data);
            logger.info("Join the team:{}", data);
        }
        Thread.sleep(12000);
    }
}

 

 

Consumer class:
package demo;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageListener;

public class TestMQConsumer implements MessageListener {

    private static Logger logger = LoggerFactory.getLogger(TestMQConsumer.class.getSimpleName());

    public void onMessage(Message message) {
        String data = new String(message.getBody());

        try {
            //Slow simulation processing
            Thread.sleep(1);

            logger.info("Team out:{}", data);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

    }

}

 

 

This is the end of the code.

 

Note: when defining the queue above, I set the auto delete property to true, so when the consumer finishes consuming and closes the connection, the queue will be deleted automatically. So does exchange. (from the mq console, it can be seen that agentpayqueryQueue2 and agentpayqueryExchange2 in chestnut disappear automatically after execution. agentpayqueryQueue1 and agentpayqueryExchange1 still exist.)

spring-rabbit-x.xml explanation of auto delete attribute of queue and exchange:

Flag indicating that an queue will be deleted when it is no longer in use, i.e. the connection that declared it is closed. Default is false.(rabbit:queue)

Flag indicating that an exchange will be deleted when no longer in use, i.e. the connection that declared it is closed. Default is false.(rabbit:exchange)

 

Description of the consumer's concurrency:

Similarly, see the explanation of spring-rabbit-x.xml:

The number of concurrent consumers to start for each listener initially.
See also 'max-concurrency'.

 

The value I set above is 5. From the mq console, see the following figure for the consumer of the queue:

From the outgoing log, it can be seen that there are 5 threads consuming these messages.

 

 

 

Tags: Java Spring xml Junit encoding

Posted on Sun, 01 Dec 2019 09:03:33 -0800 by goldberg