RocketMQ source code analysis: RPC communication (Part II)

The first part introduced RocketMQ's protocol format, message encoding/decoding, communication modes (synchronous / asynchronous / one-way), message sending/receiving, and asynchronous callbacks. This part focuses on the Netty multithreading model used by the RPC communication layer of the RocketMQ message queue.

1, Why use Netty as a high-performance communication library?

When reading the RPC communication part of RocketMQ, many readers may wonder: why does RocketMQ choose Netty instead of programming directly against the JDK's NIO API? A brief introduction to Netty is in order. Netty is a high-performance, open-source network communication framework that encapsulates the JDK's NIO library. It provides an asynchronous, event-driven network application framework and tools for the rapid development of high-performance, highly reliable network servers and clients. The following lists the main reasons why general-purpose systems choose Netty as the underlying communication library for their RPC modules (the author believes RocketMQ's RPC module chose Netty for the same reasons):

(1) Netty's API is simple to use and has a low development threshold; programmers do not need to master too many NIO programming models and concepts;

(2) Netty supports customized development for specific business requirements; the communication framework can be flexibly extended through Netty's ChannelHandler mechanism;

(3) Netty has built-in mechanisms for framing TCP streams (handling packet sticking and splitting), exception detection, and more, freeing programmers from the tedious details of Java NIO so they only need to care about business logic;

(4) Netty works around the notorious JDK NIO epoll bug, in which Selector.select() spuriously returns even though no channel is ready (an empty poll), eventually spinning the CPU to 100%;

(5) Netty optimizes threads and Selectors in many details, and its well-designed Reactor multithreading model achieves very efficient concurrent processing;

(6) Netty has been thoroughly battle-tested in many open-source projects (for example, Avro, the RPC/serialization framework used in the Hadoop ecosystem, uses Netty as its communication framework), so its robustness and reliability are well proven.

2, Netty multithreading model of RPC communication in RocketMQ

The RPC communication part of RocketMQ adopts a "1+N+M1+M2" Reactor multithreading model, extending and optimizing the standard model to some extent. This section looks at the specific design and implementation of this part.

2.1. A brief introduction to the design ideas of Netty's Reactor multithreading model

It is necessary to briefly introduce Netty's Reactor multithreading model first. Its design idea is "divide and conquer + event driven".

(1) Divide and conquer: generally, the full processing of a network request can be broken into a sequence of steps: accept, read, decode/encode, process, and send. The Reactor model maps each step to a task; the smallest logical unit executed by a server thread is no longer a complete network request but a task, and tasks are executed in a non-blocking way.

(2) Event driven: each task corresponds to a specific network event. When a task is ready, the Reactor receives the corresponding network event notification and dispatches the task to the handler bound to that event. The minimal sketch below illustrates both ideas.
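To make these two ideas concrete, here is a minimal single-Reactor echo server written directly against Java NIO. This is illustrative code only, not RocketMQ or Netty source (the port 9876 is arbitrary): a single Selector waits for network events, and each ready event is dispatched to the logic bound to it. Netty's multi-Reactor model splits exactly these responsibilities across the Boss, Selector, and Worker thread groups described in the next section.

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;

//A deliberately simplified single-thread Reactor: one Selector waits for
//events (accept/read), and each ready event is dispatched to a small handler.
public class MiniReactor {
    public static void main(String[] args) throws IOException {
        Selector selector = Selector.open();
        ServerSocketChannel server = ServerSocketChannel.open();
        server.bind(new InetSocketAddress(9876));
        server.configureBlocking(false);
        server.register(selector, SelectionKey.OP_ACCEPT);

        while (true) {
            selector.select();                        //block until some event is ready
            Iterator<SelectionKey> it = selector.selectedKeys().iterator();
            while (it.hasNext()) {
                SelectionKey key = it.next();
                it.remove();
                if (key.isAcceptable()) {             //the "accept" task
                    SocketChannel ch = server.accept();
                    ch.configureBlocking(false);
                    ch.register(selector, SelectionKey.OP_READ);
                } else if (key.isReadable()) {        //the "read + process + send" task
                    SocketChannel ch = (SocketChannel) key.channel();
                    ByteBuffer buf = ByteBuffer.allocate(1024);
                    if (ch.read(buf) == -1) {
                        ch.close();                   //peer closed the connection
                    } else {
                        buf.flip();
                        ch.write(buf);                //echo back (a full server would handle partial writes)
                    }
                }
            }
        }
    }
}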

2.2. Design and implementation of the "1+N+M1+M2" Reactor multithreading model for RPC communication in RocketMQ

(1) The Reactor multithreading design of RPC communication in RocketMQ.

RocketMQ's RPC communication uses Netty as the underlying communication library and likewise follows the Reactor multithreading model, with some extension and optimization on top of it. The framework diagram below gives a general view of the multithreaded, separated design of RocketMQ's RPC communication layer.

[Figure: Netty multithreading model of the NettyRemotingServer in RocketMQ's RPC communication layer]

From the block diagram, the Reactor multithreading model of the NettyRemotingServer can be understood roughly as follows. A Reactor main thread (eventLoopGroupBoss, the "1" above) listens for TCP connection requests. Once a connection is established, the socket is handed to the Reactor selector thread pool (eventLoopGroupSelector, the "N" above; the default in the source code is 3), which registers the established socket with a selector (the RocketMQ source automatically chooses between NIO and epoll according to the OS type; this can also be set via a configuration parameter) and then listens for actual network data. Incoming network data is handed to the Worker thread pool (defaultEventExecutorGroup, the "M1" above; the default in the source code is 8).

To process RPC network requests more efficiently, the Worker thread pool is dedicated to Netty network communication work: encoding/decoding, idle-link management, network connection management, and network request dispatch. Business operations are executed in separate business thread pools (this was also mentioned in part I of this series): based on the business request code of the RemotingCommand, the corresponding processor is looked up in the processorTable local cache variable, wrapped into a task, and submitted to the corresponding business processor thread pool (for example sendMessageExecutor, the "M2" above); a sketch of this dispatch step follows the table below.

The following table summarizes the "1+N+M1+M2" Reactor multithreading model described above.

Number of threads   Thread name                       Thread details
1                   NettyBoss_%d                      Reactor main thread; listens for and accepts TCP connection requests
N                   NettyServerEPOLLSelector_%d_%d    Reactor selector thread pool; registers established sockets with the selector and listens for actual network data
M1                  NettyServerCodecThread_%d         Worker thread pool; encoding/decoding, idle-link management, connection management and request dispatch
M2                  RemotingExecutorThread_%d         Business processor thread pools; execute the actual business logic
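To make the dispatch step concrete, below is a minimal, self-contained sketch of the idea. This is illustrative code only, not RocketMQ source: the Pair class, the request code value 10, and the console output are stand-ins (in the real source, processorTable maps a request code to a NettyRequestProcessor plus its dedicated ExecutorService, and the response is written back to the Netty channel).

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

//Illustrative sketch: each business request code maps to a (processor, thread pool)
//pair; the worker thread only looks up the pair and hands the request off, so the
//business logic runs in the dedicated pool (the "M2" threads).
public class ProcessorDispatchSketch {

    interface RequestProcessor {                    //stands in for NettyRequestProcessor
        String processRequest(String request) throws Exception;
    }

    static class Pair {
        final RequestProcessor processor;
        final ExecutorService executor;
        Pair(RequestProcessor processor, ExecutorService executor) {
            this.processor = processor;
            this.executor = executor;
        }
    }

    private final Map<Integer, Pair> processorTable = new ConcurrentHashMap<Integer, Pair>();

    void registerProcessor(int requestCode, RequestProcessor processor, ExecutorService executor) {
        processorTable.put(requestCode, new Pair(processor, executor));
    }

    //Called from an I/O / worker thread; returns immediately after the hand-off.
    void dispatch(final int requestCode, final String request) {
        final Pair pair = processorTable.get(requestCode);
        if (pair == null) {
            System.err.println("request code not supported: " + requestCode);
            return;
        }
        pair.executor.submit(new Runnable() {
            @Override
            public void run() {
                try {
                    String response = pair.processor.processRequest(request);
                    System.out.println("response: " + response); //the real code writes back to the channel
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        });
    }

    public static void main(String[] args) throws InterruptedException {
        ProcessorDispatchSketch remoting = new ProcessorDispatchSketch();
        ExecutorService sendMessageExecutor = Executors.newFixedThreadPool(4); //a dedicated business pool, "M2"
        remoting.registerProcessor(10, new RequestProcessor() {                //10 is an illustrative request code
            @Override
            public String processRequest(String request) {
                return "stored: " + request;
            }
        }, sendMessageExecutor);
        remoting.dispatch(10, "hello");
        sendMessageExecutor.shutdown();
        sendMessageExecutor.awaitTermination(5, TimeUnit.SECONDS);
    }
}

The point of the hand-off is isolation: a slow business processor can only exhaust its own pool (the "M2" threads) and cannot stall the shared Netty Worker threads (the "M1" threads).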

(2) The concrete implementation of the Reactor multithreaded code for RPC communication in RocketMQ. Having covered the overall design and flow of the Reactor multithreading model, you should now have a fairly complete picture of the Netty part of RocketMQ's RPC communication. Next, let's look at some details from the source code perspective (reading this code requires an understanding of the relevant concepts of Java NIO and Netty). When a NettyRemotingServer instance is initialized, the related variables are set up: the serverBootstrap, the nettyServerConfig parameters, and the channelEventListener listener. The two Netty EventLoopGroup thread pools, eventLoopGroupBoss and eventLoopGroupSelector, are also created. (Note: on Linux with native epoll enabled, EpollEventLoopGroup, implemented in C and accessed via JNI, is used; otherwise the Java NIO based NioEventLoopGroup is used.) The specific code is as follows:

public NettyRemotingServer(final NettyServerConfig nettyServerConfig,
        final ChannelEventListener channelEventListener) {
        super(nettyServerConfig.getServerOnewaySemaphoreValue(), nettyServerConfig.getServerAsyncSemaphoreValue());
        this.serverBootstrap = new ServerBootstrap();
        this.nettyServerConfig = nettyServerConfig;
        this.channelEventListener = channelEventListener;
      //Omit some codes
      //nThreads is set to 1: the Remoting server uses a single boss thread for connection management and request dispatch, i.e. to accept TCP connections from clients
        this.eventLoopGroupBoss = new NioEventLoopGroup(1, new ThreadFactory() {
            private AtomicInteger threadIndex = new AtomicInteger(0);

            @Override
            public Thread newThread(Runnable r) {
                return new Thread(r, String.format("NettyBoss_%d", this.threadIndex.incrementAndGet()));
            }
        });

        /**
         * Set NIO or epoll for the Selector thread pool according to the configuration.
         * On Linux with native epoll enabled, EpollEventLoopGroup (implemented in C and accessed via JNI) is used; otherwise Java NIO's NioEventLoopGroup is used.
         *
         */
        if (useEpoll()) {
            this.eventLoopGroupSelector = new EpollEventLoopGroup(nettyServerConfig.getServerSelectorThreads(), new ThreadFactory() {
                private AtomicInteger threadIndex = new AtomicInteger(0);
                private int threadTotal = nettyServerConfig.getServerSelectorThreads();

                @Override
                public Thread newThread(Runnable r) {
                    return new Thread(r, String.format("NettyServerEPOLLSelector_%d_%d", threadTotal, this.threadIndex.incrementAndGet()));
                }
            });
        } else {
            this.eventLoopGroupSelector = new NioEventLoopGroup(nettyServerConfig.getServerSelectorThreads(), new ThreadFactory() {
                private AtomicInteger threadIndex = new AtomicInteger(0);
                private int threadTotal = nettyServerConfig.getServerSelectorThreads();

                @Override
                public Thread newThread(Runnable r) {
                    return new Thread(r, String.format("NettyServerNIOSelector_%d_%d", threadTotal, this.threadIndex.incrementAndGet()));
                }
            });
        }
        //Omit some codes
    }
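The useEpoll() decision referenced above boils down to three checks. The following is paraphrased from the source (exact helper names may vary across RocketMQ versions): the platform must be Linux, the useEpollNativeSelector configuration switch must be on, and Netty's native epoll transport must actually be available.

    private boolean useEpoll() {
        return RemotingUtil.isLinuxPlatform()                 //OS type check: native epoll exists only on Linux
            && nettyServerConfig.isUseEpollNativeSelector()   //configuration switch
            && Epoll.isAvailable();                           //io.netty.channel.epoll.Epoll: the native transport loaded successfully
    }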

After the NettyRemotingServer instance is initialized, it is started. During the startup phase, the server binds the 1 acceptor thread (eventLoopGroupBoss), N I/O threads (eventLoopGroupSelector), and M1 Worker threads (defaultEventExecutorGroup) instantiated earlier; the role of each thread pool was described in the previous section. Note that after a Worker thread receives network data, the data flows through Netty's channel pipeline (which follows the chain-of-responsibility design pattern) from head to tail. The handlers in this pipeline are registered in the ChannelInitializer passed to the serverBootstrap during startup, as shown in the code below. NettyEncoder and NettyDecoder handle encoding and decoding between the network wire format and RemotingCommand. Once NettyServerHandler receives a decoded RemotingCommand, it uses RemotingCommand.type to determine whether it is a request or a response and processes it accordingly; requests are wrapped into tasks according to their business request code and submitted to the corresponding business processor thread pool.

@Override
   public void start() {
       //The default event executor group: the Worker (M1) threads that run the logic of the Netty handlers added to the pipeline below

       this.defaultEventExecutorGroup = new DefaultEventExecutorGroup(
               nettyServerConfig.getServerWorkerThreads(),
               new ThreadFactory() {

                   private AtomicInteger threadIndex = new AtomicInteger(0);

                   @Override
                   public Thread newThread(Runnable r) {
                       return new Thread(r, "NettyServerCodecThread_" + this.threadIndex.incrementAndGet());
                   }
               });
       /**
        * Recap of the Reactor thread model of RocketMQ's NettyServer:
        * a Reactor main thread is responsible for listening for TCP connection requests;
        * established connections are handed to the Reactor selector thread pool, which registers
        * each socket with a selector (NIO or epoll, configurable) and listens for actual network data;
        * incoming data is then handed to the Worker thread pool.
        */
       //RocketMQ -> Java NIO's "1+N+M1" model: 1 acceptor thread, N I/O threads, M1 worker threads.
       ServerBootstrap childHandler =
               this.serverBootstrap.group(this.eventLoopGroupBoss, this.eventLoopGroupSelector)
                       .channel(useEpoll() ? EpollServerSocketChannel.class : NioServerSocketChannel.class)
                       .option(ChannelOption.SO_BACKLOG, 1024)
                        //SO_BACKLOG: connection requests that cannot be accepted immediately wait in a queue; this parameter sets the maximum length of that queue
                        .option(ChannelOption.SO_REUSEADDR, true)//Allow the local address and port to be reused
                        .option(ChannelOption.SO_KEEPALIVE, false)//TCP keepalive is disabled here; when enabled, TCP sends a probe packet after a long idle period (typically two hours) to detect dead peers
                        .childOption(ChannelOption.TCP_NODELAY, true)//Disable the Nagle algorithm so that small packets are transmitted immediately
                        .childOption(ChannelOption.SO_SNDBUF, nettyServerConfig.getServerSocketSndBufSize())//These two parameters set the sizes of the send and receive buffers
                        .childOption(ChannelOption.SO_RCVBUF, nettyServerConfig.getServerSocketRcvBufSize())
                       .localAddress(new InetSocketAddress(this.nettyServerConfig.getListenPort()))
                       .childHandler(new ChannelInitializer<SocketChannel>() {
                           @Override
                           public void initChannel(SocketChannel ch) throws Exception {

                               ch.pipeline()
                                       .addLast(defaultEventExecutorGroup, HANDSHAKE_HANDLER_NAME,
                                               new HandshakeHandler(TlsSystemConfig.tlsMode))
                                       .addLast(defaultEventExecutorGroup,
                                                new NettyEncoder(),//RocketMQ encoder: serializes a RemotingCommand into bytes for the wire
                                                new NettyDecoder(),//RocketMQ decoder: parses bytes from the wire back into a RemotingCommand
                                                new IdleStateHandler(0, 0, nettyServerConfig.getServerChannelMaxIdleTimeSeconds()),//Netty's built-in idle-state (heartbeat) handler
                                                new NettyConnectManageHandler(),//Connection manager: captures connect, disconnect, exception and idle events and dispatches them to the nettyEventExecutor for processing
                                                new NettyServerHandler()//After decoding, a message is dispatched to channelRead0 here and routed according to its type
                                       );
                           }
                       });

       if (nettyServerConfig.isServerPooledByteBufAllocatorEnable()) {
           childHandler.childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);
       }

       try {
           ChannelFuture sync = this.serverBootstrap.bind().sync();
           InetSocketAddress addr = (InetSocketAddress) sync.channel().localAddress();
           this.port = addr.getPort();
       } catch (InterruptedException e1) {
           throw new RuntimeException("this.serverBootstrap.bind().sync() InterruptedException", e1);
       }

       if (this.channelEventListener != null) {
           this.nettyEventExecutor.start();
       }

       //Periodically scan the responseTable to collect returned results and handle timed-out requests
       this.timer.scheduleAtFixedRate(new TimerTask() {

           @Override
           public void run() {
               try {
                   NettyRemotingServer.this.scanResponseTable();
               } catch (Throwable e) {
                   log.error("scanResponseTable exception", e);
               }
           }
       }, 1000 * 3, 1000);
   }
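The request/response branching described earlier happens once a decoded RemotingCommand reaches NettyServerHandler. The core of that dispatch looks approximately like this (paraphrased from NettyServerHandler and NettyRemotingAbstract; surrounding class structure and error handling omitted):

    //After NettyDecoder has produced a RemotingCommand, channelRead0 dispatches it by its type.
    class NettyServerHandler extends SimpleChannelInboundHandler<RemotingCommand> {
        @Override
        protected void channelRead0(ChannelHandlerContext ctx, RemotingCommand msg) throws Exception {
            processMessageReceived(ctx, msg);
        }
    }

    public void processMessageReceived(ChannelHandlerContext ctx, RemotingCommand msg) throws Exception {
        final RemotingCommand cmd = msg;
        if (cmd != null) {
            switch (cmd.getType()) {
                case REQUEST_COMMAND:
                    processRequestCommand(ctx, cmd);  //look up the processor by request code, submit to its business thread pool
                    break;
                case RESPONSE_COMMAND:
                    processResponseCommand(ctx, cmd); //match the response to the pending entry in responseTable
                    break;
                default:
                    break;
            }
        }
    }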

From the above description, we can summarize the Reactor thread pool model of RocketMQ's RPC communication part.

[Figure: Reactor thread pool model of RocketMQ's RPC communication part]

Overall, with the help of Netty's multithreading model, RocketMQ's RPC communication separates the server's listening thread from the I/O threads, and further separates the business logic of the RPC communication layer from the threads that execute the concrete business operations. Simple operations with controllable latency are completed directly in the RPC communication part, while complex operations with uncontrollable latency are submitted to back-end business thread pools. This separation improves communication efficiency and the overall performance of the MQ. (Note: NioEventLoop abstracts a thread that continuously executes processing tasks in a loop; each NioEventLoop owns a selector, which listens on the socket channels bound to it.)

3, Summary

Reading the RocketMQ source code carefully has taught me a lot about network communication design. For readers who have just started with the open-source version of RocketMQ, mastering the technical points of the RPC communication part requires debugging in a local environment and reading the source code repeatedly. Given the author's limited knowledge, some points in this article may be unclear or imprecise; if so, please leave a comment to discuss. Articles on other RocketMQ modules (Client, Broker, NameServer, etc.) will follow, so stay tuned. Finally, a small plug: interested readers are welcome to follow my personal official account, "a unique blog", where articles on Java concurrency, Spring, databases, and message queues are published.

 
