NIO Transport Protocol of ActiveMQ

Note: Before changing the transport protocol, you must ensure that ActiveMQ is not running!  

Brief introduction

The NIO transport is very similar to the regular TCP transport. The difference is that it is implemented using the NIO API, which can help improve performance and scalability. NIO is a server-side transport option only: trying to use it on the client side instantiates the regular TCP transport.

To switch from TCP to NIO, you only need to change the scheme part of the URI. This is an example defined in the broker's XML configuration file:

<broker>
  ...
  <transportConnectors>
    <transportConnector name="nio" uri="nio://0.0.0.0:61616"/>  
  </transportConnectors>
  ...
</broker>

Note: If the I/O model of ActiveMQ's listening ports is not specified explicitly, those ports (OpenWire, STOMP, AMQP, ...) use the blocking I/O (BIO) model. To improve the network throughput of a single node, you therefore need to specify the I/O model explicitly. A URI whose scheme starts with "nio", as in nio://0.0.0.0:61616, indicates that the port uses the NIO I/O model on top of the TCP protocol.

Scenarios where the NIO transport is a good fit:

  1. A large number of clients may connect to the broker. The number of connections is generally limited by the threads the operating system can provide, and the NIO implementation needs fewer threads than the TCP implementation, so NIO is recommended.
  2. Network transmission to the broker may be very slow, in which case NIO provides better performance than TCP.
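The thread argument in the first scenario can be illustrated with plain java.nio, independent of ActiveMQ. The following is a minimal sketch, not ActiveMQ's transport code: the class name SelectorEchoSketch, the echo behaviour, and the loopback address are all assumptions made for the illustration. It shows one server thread using a Selector to serve many connections that are open at the same time, where a blocking design would typically dedicate a thread to each connection.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.Iterator;

// Illustration only: plain java.nio, not ActiveMQ's transport implementation.
final class SelectorEchoSketch {

    // Echoes bytes back to every client from ONE thread; returns after
    // expectedClients connections have been closed by their peers.
    static void serve(ServerSocketChannel server, int expectedClients) throws IOException {
        try (Selector selector = Selector.open()) {
            server.configureBlocking(false);
            server.register(selector, SelectionKey.OP_ACCEPT);
            ByteBuffer buffer = ByteBuffer.allocate(256);
            int closed = 0;
            while (closed < expectedClients) {
                selector.select(); // wait until at least one channel is ready
                Iterator<SelectionKey> keys = selector.selectedKeys().iterator();
                while (keys.hasNext()) {
                    SelectionKey key = keys.next();
                    keys.remove();
                    if (key.isAcceptable()) {
                        SocketChannel client = server.accept();
                        if (client != null) {
                            client.configureBlocking(false);
                            client.register(selector, SelectionKey.OP_READ);
                        }
                    } else if (key.isReadable()) {
                        SocketChannel client = (SocketChannel) key.channel();
                        buffer.clear();
                        if (client.read(buffer) < 0) {
                            client.close(); // peer disconnected
                            closed++;
                        } else {
                            buffer.flip();
                            while (buffer.hasRemaining()) {
                                client.write(buffer); // acceptable for tiny payloads
                            }
                        }
                    }
                }
            }
        }
    }

    // Opens `clients` connections at once and returns how many got their message echoed.
    static int demo(int clients) {
        try (ServerSocketChannel server = ServerSocketChannel.open()) {
            server.bind(new InetSocketAddress("127.0.0.1", 0)); // ephemeral port
            InetSocketAddress address = (InetSocketAddress) server.getLocalAddress();
            Thread serverThread = new Thread(() -> {
                try {
                    serve(server, clients);
                } catch (IOException e) {
                    throw new UncheckedIOException(e);
                }
            });
            serverThread.start();

            SocketChannel[] sockets = new SocketChannel[clients];
            for (int i = 0; i < clients; i++) {
                sockets[i] = SocketChannel.open(address); // all connections stay open together
            }
            int echoed = 0;
            for (int i = 0; i < clients; i++) {
                byte[] payload = ("hello " + i).getBytes(StandardCharsets.UTF_8);
                sockets[i].write(ByteBuffer.wrap(payload));
                ByteBuffer reply = ByteBuffer.allocate(payload.length);
                while (reply.hasRemaining() && sockets[i].read(reply) >= 0) {
                    // keep reading until the whole echo has arrived
                }
                if (Arrays.equals(payload, reply.array())) {
                    echoed++;
                }
            }
            for (SocketChannel socket : sockets) {
                socket.close();
            }
            serverThread.join();
            return echoed;
        } catch (IOException | InterruptedException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println("echoed " + demo(20) + " messages with one server thread");
    }
}
```

The server side never starts more than one thread, regardless of how many clients are passed to demo. A production transport would also have to handle partial writes and per-connection state, which this sketch leaves out.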

Configuration syntax

nio://hostname:port?key=value

The configuration options are the same as for the TCP transport.
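To make the nio://hostname:port?key=value shape concrete, here is a small sketch that splits such a URI into its parts with java.net.URI. It only illustrates the syntax: the class and method names are hypothetical, and ActiveMQ parses its transport URIs with its own code. The trace=true option is the one used in the example later in this post.

```java
import java.net.URI;
import java.util.LinkedHashMap;
import java.util.Map;

// Illustration of the URI shape only; this is not how ActiveMQ itself parses transport URIs.
final class NioUriOptionsSketch {

    // Splits the query part (key=value&key=value) of a transport URI into an ordered map.
    static Map<String, String> options(String uri) {
        Map<String, String> result = new LinkedHashMap<>();
        String query = URI.create(uri).getQuery();
        if (query == null || query.isEmpty()) {
            return result; // no options given
        }
        for (String pair : query.split("&")) {
            int eq = pair.indexOf('=');
            if (eq < 0) {
                result.put(pair, ""); // key without a value
            } else {
                result.put(pair.substring(0, eq), pair.substring(eq + 1));
            }
        }
        return result;
    }

    public static void main(String[] args) {
        String text = "nio://localhost:61618?trace=true";
        URI uri = URI.create(text);
        System.out.println("scheme  = " + uri.getScheme());
        System.out.println("host    = " + uri.getHost());
        System.out.println("port    = " + uri.getPort());
        System.out.println("options = " + options(text));
    }
}
```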

Note: The original NIO transport is a replacement for the tcp transport that uses the OpenWire protocol. Other network protocols, such as AMQP, MQTT and STOMP, also have their own NIO transport implementations. They are usually configured by adding the "+nio" suffix to the protocol prefix, for example:

mqtt+nio://localhost:1883

All protocol-specific configuration options should also apply to the NIO version of the transport.
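The naming rule above can be written down as a tiny helper. This is a sketch under stated assumptions: the class NioSchemeSketch and the method toNio are hypothetical, and the helper only knows the protocols named in this post (tcp for OpenWire, plus amqp, mqtt and stomp). Secure variants and any other schemes are deliberately rejected rather than guessed.

```java
import java.net.URI;
import java.util.Locale;

// Illustration only; limited to the protocols named in the text.
final class NioSchemeSketch {

    // Rewrites a connector URI so that it uses the NIO variant of its transport.
    static String toNio(String uri) {
        String scheme = URI.create(uri).getScheme();
        if (scheme == null) {
            throw new IllegalArgumentException("URI has no scheme: " + uri);
        }
        String rest = uri.substring(scheme.length()); // everything from "://" onwards
        String lower = scheme.toLowerCase(Locale.ROOT);
        switch (lower) {
            case "tcp":
                return "nio" + rest; // OpenWire over TCP becomes plain nio
            case "amqp":
            case "mqtt":
            case "stomp":
                return lower + "+nio" + rest; // protocol prefix plus the "+nio" suffix
            default:
                if (lower.equals("nio") || lower.endsWith("+nio")) {
                    return uri; // already an NIO scheme
                }
                throw new IllegalArgumentException("Scheme not covered by this sketch: " + scheme);
        }
    }

    public static void main(String[] args) {
        System.out.println(toNio("tcp://0.0.0.0:61616"));
        System.out.println(toNio("mqtt://localhost:1883"));
    }
}
```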

Example (publish/subscribe mode, using a Topic)

1. Edit the activemq.xml configuration file: add the following connector inside the <transportConnectors> element and comment out the TCP connector:

<transportConnector name="nio" uri="nio://localhost:61618?trace=true"/>

2. Configure the Spring file: change the connection URL to use the nio scheme, and note that the port number changes to 61618.

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:context="http://www.springframework.org/schema/context"
       xmlns:jms="http://www.springframework.org/schema/jms"
       xsi:schemaLocation="http://www.springframework.org/schema/beans
        http://www.springframework.org/schema/beans/spring-beans.xsd
        http://www.springframework.org/schema/context
        http://www.springframework.org/schema/context/spring-context.xsd
        http://www.springframework.org/schema/jms
        http://www.springframework.org/schema/jms/spring-jms.xsd
">

    <!-- Scan this package for annotated components -->
    <context:component-scan base-package="com.hern.avtivemq"/>

    <!-- Enable annotation-based configuration -->
    <context:annotation-config></context:annotation-config>

    <!-- Pooled connection factory -->
    <bean id="pooledConnectionFactory" class="org.apache.activemq.pool.PooledConnectionFactory" destroy-method="stop">
        <!-- ActiveMQ connection factory -->
        <property name="connectionFactory">
            <!-- The ConnectionFactory that actually creates the connections, supplied by the JMS service provider -->
            <bean class="org.apache.activemq.ActiveMQConnectionFactory">
                <!-- URL used to connect to the ActiveMQ broker -->
                <property name="brokerURL" value="nio://127.0.0.1:61618"/>
            </bean>
        </property>
        <!-- Maximum number of pooled connections -->
        <property name="maxConnections" value="100"/>
    </bean>

    <!-- Queue destination -->
    <bean id="activeMQQueue" class="org.apache.activemq.command.ActiveMQQueue">
        <constructor-arg index="0" value="spring-active-queue"/>
    </bean>

    <!-- Topic destination -->
    <bean id="activeMQTopic" class="org.apache.activemq.command.ActiveMQTopic">
        <constructor-arg index="0" value="spring-active-topic"/>
    </bean>

    <!-- JMS helper class provided by Spring, used to send and receive messages -->
    <bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
        <property name="connectionFactory" ref="pooledConnectionFactory"/>
        <property name="defaultDestination" ref="activeMQTopic"/>
        <!--<property name="defaultDestination" ref="activeMQQueue"/>-->
        <property name="messageConverter">
            <bean class="org.springframework.jms.support.converter.SimpleMessageConverter"/>
        </property>
    </bean>

    <!-- Message listener container -->
    <bean id="defaultMessageListenerContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
        <property name="connectionFactory" ref="pooledConnectionFactory"/>
        <property name="destination" ref="activeMQTopic"/>
        <!-- Reference to your own listener bean -->
        <property name="messageListener" ref="myMessageListener"/>
        <!-- Acknowledge each message individually (4 = ActiveMQ's INDIVIDUAL_ACKNOWLEDGE) -->
        <property name="sessionAcknowledgeMode" value="4"/>
    </bean>
</beans>

3. Producer

package com.hern.avtivemq;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.ApplicationContext;
// Use Spring's own context class instead of the xbean variant
import org.springframework.context.support.ClassPathXmlApplicationContext;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.stereotype.Service;

import javax.jms.TextMessage;

/*
* Message producer
* */
@Service
public class SpringMQ_Produce {

    @Autowired
    private JmsTemplate jmsTemplate;

    public static void main(String[] args) {
        ApplicationContext applicationContext = new ClassPathXmlApplicationContext("application-configure.xml");

        SpringMQ_Produce springMQ_produce = (SpringMQ_Produce) applicationContext.getBean("springMQ_Produce");

        //send message
        springMQ_produce.jmsTemplate.send(session -> {
            TextMessage textMessage = session.createTextMessage("---Spring integration ActiveMQ---");
            System.out.println("The message sent is:" + textMessage.getText());
            return textMessage;
        });

        System.out.println("Send End");
    }
}

4. Consumer

package com.hern.avtivemq;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.ApplicationContext;
// Use Spring's own context class instead of the xbean variant
import org.springframework.context.support.ClassPathXmlApplicationContext;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.stereotype.Service;

/*
* Message consumer
* */
@Service
public class SpringMQ_Consume {

    @Autowired
    private JmsTemplate jmsTemplate;

    public static void main(String[] args) {
        ApplicationContext applicationContext = new ClassPathXmlApplicationContext("application-configure.xml");

        SpringMQ_Consume springMQ_consume = (SpringMQ_Consume) applicationContext.getBean("springMQ_Consume");

        //receive messages
        String result = (String) springMQ_consume.jmsTemplate.receiveAndConvert();

        System.out.println("The received message is:" + result);
    }
}

5. Listener

package com.hern.avtivemq;

import org.springframework.stereotype.Component;

import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.TextMessage;

@Component("myMessageListener")
public class MyMessageListener implements MessageListener {
    @Override
    public void onMessage(Message message) {
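        // instanceof is false for null, so no separate null check is needed
        if (message instanceof TextMessage) {
            TextMessage textMessage = (TextMessage) message;
            try {
                System.out.println("The listener received: " + textMessage.getText());
                // The container is configured with acknowledge mode 4 (individual
                // acknowledgement), so acknowledge this message explicitly.
                textMessage.acknowledge();
            } catch (JMSException e) {
                e.printStackTrace();
            }
        }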
    }
}


Posted on Sat, 10 Aug 2019 03:42:24 -0700 by kester