Netty Source Analysis - How Data Flows in Pipelines

In the previous article, we have learned about the role of pipeline in netty, such as a pipeline that controls the reading and writing of byte streams. In this article, we continue to dig deeper into the pipeline's role in event propagation.

Unsafe

As the name implies, unsafe means to tell you not to use Unsafe and its derivatives directly in your application.

netty's official explanation is as follows

Unsafe operations that should never be called from user-code. These methods are only provided to implement the actual transport, and must be invoked from an I/O thread

Unsafe is defined in Channel and belongs to the inner class of Channel, which indicates that Unsafe and Channel are closely related.

Here are all the methods of unsafe interface

interface Unsafe {
   RecvByteBufAllocator.Handle recvBufAllocHandle();
   
   SocketAddress localAddress();
   SocketAddress remoteAddress();

   void register(EventLoop eventLoop, ChannelPromise promise);
   void bind(SocketAddress localAddress, ChannelPromise promise);
   void connect(SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise);
   void disconnect(ChannelPromise promise);
   void close(ChannelPromise promise);
   void closeForcibly();
   void beginRead();
   void write(Object msg, ChannelPromise promise);
   void flush();
   
   ChannelPromise voidPromise();
   ChannelOutboundBuffer outboundBuffer();
}

According to the function, it can be divided into memory allocation, Socket quadruple information, registration event loop, binding network card port, Socket connection and closure, Socket read and write, it can be seen that these operations are related to the bottom of jdk.

Unsafe inheritance structure

 

 

NioUnsafe adds the following interfaces to Unsafe

public interface NioUnsafe extends Unsafe {
    SelectableChannel ch();
    void finishConnect();
    void read();
    void forceFlush();
}

In terms of the added interface and class name, NioUnsafe adds the ability to access the Selectable Channel of the underlying jdk, and defines the read method for reading data from the Selectable Channel.

Classification of Unsafe

From the above inheritance structure, we can summarize two types of Unsafe classification, one is NioByteUnsafe related to the byte data read and write of the connection, and the other is NioMessageUnsafe related to the operation of establishing a new connection.

Read in NioByteUnsafe: Delegate to the external class NioSocket Channel

protected int doReadBytes(ByteBuf byteBuf) throws Exception {
    final RecvByteBufAllocator.Handle allocHandle = unsafe().recvBufAllocHandle();
    allocHandle.attemptedBytesRead(byteBuf.writableBytes());
    return byteBuf.writeBytes(javaChannel(), allocHandle.attemptedBytesRead());
}

The last line is already related to the bottom of JDK and ByteBuf in netty, reading the byte data of jdk's Selectable Channel into netty's ByteBuf

Read in NioMessageUnsafe: Delegate to the external class NioSocketChannel

protected int doReadMessages(List<Object> buf) throws Exception {
    SocketChannel ch = javaChannel().accept();

    if (ch != null) {
        buf.add(new NioSocketChannel(this, ch));
        return 1;
    }
    return 0;
}

NioMessageUnsafe reads simply by calling jdk's accept() method to create a new connection

Write in NioByteUnsafe: Delegate to external class NioSocket Channel

@Override
protected int doWriteBytes(ByteBuf buf) throws Exception {
    final int expectedWrittenBytes = buf.readableBytes();
    return buf.readBytes(javaChannel(), expectedWrittenBytes);
}

The last line has been associated with the bottom of JDK and ByteBuf in netty, writing byte data from netty's ByteBuf to jdk's Selectable Channel

head in pipeline

NioEventLoop

private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {
     final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe();
     //Readable data for new or existing connections
     if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
         unsafe.read();
     }
}

NioByteUnsafe

@Override
public final void read() {
    final ChannelConfig config = config();
    final ChannelPipeline pipeline = pipeline();
    // Establish ByteBuf distributor
    final ByteBufAllocator allocator = config.getAllocator();
    final RecvByteBufAllocator.Handle allocHandle = recvBufAllocHandle();
    allocHandle.reset(config);

    ByteBuf byteBuf = null;
    do {
        // Allocate one ByteBuf
        byteBuf = allocHandle.allocate(allocator);
        // Read the data to the assigned ByteBuf Go in
        allocHandle.lastBytesRead(doReadBytes(byteBuf));
        if (allocHandle.lastBytesRead() <= 0) {
            byteBuf.release();
            byteBuf = null;
            close = allocHandle.lastBytesRead() < 0;
            break;
        }

        // Trigger event, will trigger pipeline The Spread of Reading Events
        pipeline.fireChannelRead(byteBuf);
        byteBuf = null;
    } while (allocHandle.continueReading());
    pipeline.fireChannelReadComplete();
}

Similarly, I pulled out the core code and cut out the details first. What NioByteUnsafe does can be simply divided into the following steps

  1. When you get the config of Channel, you get the ByteBuf distributor. You use the distributor to distribute a ByteBuf. ByteBuf is the byte data carrier in netty, and the data you read later is read into this object.
  2. Read data from Channel to ByteBuf
  3. After reading the data, call pipeline.fireChannelRead(byteBuf); propagate from the head node to the entire pipeline
  4. Finally, fireChannelReadComplete() is called.

Here, our focus is actually pipeline.fireChannelRead(byteBuf);

DefaultChannelPipeline

final AbstractChannelHandlerContext head;
//...
head = new HeadContext(this);

public final ChannelPipeline fireChannelRead(Object msg) {
    AbstractChannelHandlerContext.invokeChannelRead(head, msg);
    return this;
}

Combine this picture

 

 

As you can see, the data flows in from the head node, and before proceeding to the next step, let's go through the functions of the head node.

HeadContext

final class HeadContext extends AbstractChannelHandlerContext
        implements ChannelOutboundHandler, ChannelInboundHandler {

    private final Unsafe unsafe;

    HeadContext(DefaultChannelPipeline pipeline) {
        super(pipeline, null, HEAD_NAME, false, true);
        unsafe = pipeline.channel().unsafe();
        setAddComplete();
    }

    @Override
    public ChannelHandler handler() {
        return this;
    }

    @Override
    public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
        // NOOP
    }

    @Override
    public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
        // NOOP
    }

    @Override
    public void bind(
            ChannelHandlerContext ctx, SocketAddress localAddress, ChannelPromise promise)
            throws Exception {
        unsafe.bind(localAddress, promise);
    }

    @Override
    public void connect(
            ChannelHandlerContext ctx,
            SocketAddress remoteAddress, SocketAddress localAddress,
            ChannelPromise promise) throws Exception {
        unsafe.connect(remoteAddress, localAddress, promise);
    }

    @Override
    public void disconnect(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception {
        unsafe.disconnect(promise);
    }

    @Override
    public void close(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception {
        unsafe.close(promise);
    }

    @Override
    public void deregister(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception {
        unsafe.deregister(promise);
    }

    @Override
    public void read(ChannelHandlerContext ctx) {
        unsafe.beginRead();
    }

    @Override
    public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
        unsafe.write(msg, promise);
    }

    @Override
    public void flush(ChannelHandlerContext ctx) throws Exception {
        unsafe.flush();
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        ctx.fireExceptionCaught(cause);
    }

    @Override
    public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
        invokeHandlerAddedIfNeeded();
        ctx.fireChannelRegistered();
    }

    @Override
    public void channelUnregistered(ChannelHandlerContext ctx) throws Exception {
        ctx.fireChannelUnregistered();

        // Remove all handlers sequentially if channel is closed and unregistered.
        if (!channel.isOpen()) {
            destroy();
        }
    }

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        ctx.fireChannelActive();

        readIfIsAutoRead();
    }

    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        ctx.fireChannelInactive();
    }

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        ctx.fireChannelRead(msg);
    }

    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
        ctx.fireChannelReadComplete();

        readIfIsAutoRead();
    }

    private void readIfIsAutoRead() {
        if (channel.config().isAutoRead()) {
            channel.read();
        }
    }

    @Override
    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
        ctx.fireUserEventTriggered(evt);
    }

    @Override
    public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception {
        ctx.fireChannelWritabilityChanged();
    }
}

From the two interfaces inherited by the head node, TA is not only a ChannelHandlerContext, but also belongs to inBound and outBound Handler.

In spreading read and write events, the head function is simply to spread events, such as ctx.fireChannelRead(msg);

When a read-write operation is actually performed, such as calling writeAndFlush() and other methods, it will eventually be delegated to unsafe, and when a data read out, the channelReadComplete method will be invoked.

inBound event propagation in pipeline

Let's go on to AbstractChannelHandlerContext.invokeChannelRead(head, msg) above; in this static way, the parameters are passed into the head, and we know that the inbound data starts from the head to ensure that all the handler s in the future process the data stream by chance.

Let's see what's going on inside this static method:

static void invokeChannelRead(final AbstractChannelHandlerContext next, Object msg) {
    final Object m = next.pipeline.touch(ObjectUtil.checkNotNull(msg, "msg"), next);
    EventExecutor executor = next.executor();
    if (executor.inEventLoop()) {
        next.invokeChannelRead(m);
    } else {
        executor.execute(new Runnable() {
            public void run() {
                next.invokeChannelRead(m);
            }
        });
    }
}

Call the invokeChannelRead method of the Context (that is, head) and pass in the data. Let's look at the implementation of the invokeChannelRead method in headContext, which is actually in the parent class AbstractChannelHandlerContext of headContext:

AbstractChannelHandlerContext

private void invokeChannelRead(Object msg) {
    if (invokeHandler()) {
        try {
            ((ChannelInboundHandler) handler()).channelRead(this, msg);
        } catch (Throwable t) {
            notifyHandlerException(t);
        }
    } else {
        fireChannelRead(msg);
    }
}

public ChannelHandler handler() {
    return this;
}

The handler() above is the handler in the headContext, that is, the headContext itself, which calls the channelRead method of the head. So how does this approach work?

@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
    ctx.fireChannelRead(msg);
}

Nothing is done. Call Context's fire series methods and forward the request to the next node. Here's the fireChannelRead method. Note that the method names here are similar. It needs to be carefully distinguished. Let's look at fireChannelRead, a member method of Context:

AbstractChannelHandlerContext

@Override
public ChannelHandlerContext fireChannelRead(final Object msg) {
    invokeChannelRead(findContextInbound(), msg);
    return this;
}

This is the implementation of AbstractChannelHandlerContext, the abstract parent of head, which calls the static fire series methods again, but unlike last time, instead of putting the head parameter, it uses the return value of the findContextInbound method. As you can see from the name of this method, it is to find the handler of the inbound type. Let's look at the implementation of the method:

private AbstractChannelHandlerContext findContextInbound() {
    AbstractChannelHandlerContext ctx = this;
    do {
        ctx = ctx.next;
    } while (!ctx.inbound);
    return ctx;
}

This method is simple, find the next node of the current Context (inbound type) and return. This allows the request to be passed to the inbound handler that follows. Let's take a look at invokeChannelRead(findContextInbound(), msg);

static void invokeChannelRead(final AbstractChannelHandlerContext next, Object msg) {
    final Object m = next.pipeline.touch(ObjectUtil.checkNotNull(msg, "msg"), next);
    EventExecutor executor = next.executor();
    if (executor.inEventLoop()) {
        next.invokeChannelRead(m);
    } else {
        executor.execute(new Runnable() {
            public void run() {
                next.invokeChannelRead(m);
            }
        });
    }

}

Above, we find the next node (inbound type), and call next.invokeChannelRead(m) directly; if this next is our custom handler, then the parent class of our custom handler is AbstractChannelHandlerContext, then we go back to the invokeChannelRead implemented in AbstractChannelHandlerContext. The code is as follows:

AbstractChannelHandlerContext

private void invokeChannelRead(Object msg) {
    if (invokeHandler()) {
        try {
            ((ChannelInboundHandler) handler()).channelRead(this, msg);
        } catch (Throwable t) {
            notifyHandlerException(t);
        }
    } else {
        fireChannelRead(msg);
    }
}

public ChannelHandler handler() {
    return this;
}

At this point, the handler() is our custom handler, and then we call the channel Read (this, msg) in our custom handler.

When requests come in, pipeline will be delivered from the head node, and the perfect transmission of Context chain in pipeline can be achieved by means of fire series methods cooperating with invoker interface. Finally, it arrives at our custom handler.

Note: What if we want to continue to pass back? As we said earlier, you can call the fire series methods of Context, just like the channelRead method of head, call the fire series methods, and pass them back directly.

If all handlers call the fire series methods, they are passed to the last inbound type handler, the tail node, so let's look at the tail node.

tail in pipeline

final class TailContext extends AbstractChannelHandlerContext implements ChannelInboundHandler {

    TailContext(DefaultChannelPipeline pipeline) {
        super(pipeline, null, TAIL_NAME, true, false);
        setAddComplete();
    }

    @Override
    public ChannelHandler handler() {
        return this;
    }

    @Override
    public void channelRegistered(ChannelHandlerContext ctx) throws Exception { }

    @Override
    public void channelUnregistered(ChannelHandlerContext ctx) throws Exception { }

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception { }

    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception { }

    @Override
    public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception { }

    @Override
    public void handlerAdded(ChannelHandlerContext ctx) throws Exception { }

    @Override
    public void handlerRemoved(ChannelHandlerContext ctx) throws Exception { }

    @Override
    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
        // This may not be a configuration error and so don't log anything.
        // The event may be superfluous for the current pipeline configuration.
        ReferenceCountUtil.release(evt);
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        onUnhandledInboundException(cause);
    }

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        onUnhandledInboundMessage(msg);
    }

    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception { }
}

As we mentioned earlier, most of the role of tail nodes is to terminate the propagation of events (the method body is empty).

channelRead

protected void onUnhandledInboundMessage(Object msg) {
    try {
        logger.debug(
                "Discarded inbound message {} that reached at the tail of the pipeline. " +
                        "Please check your pipeline configuration.", msg);
    } finally {
        ReferenceCountUtil.release(msg);
    }
}

Tail node finds that the business object after ByteBuf or decoder is not consumed in the pipeline flow process and falls to tail node. Tail node will give you a warning that I have lost your unprocessed data.

To sum up, the role of tail nodes is to end event propagation and to do some kind reminders about important events.

outBound event propagation in pipeline

In the previous section, when we discussed the functions of tail node, we ignored the functions of its parent AbstractChannelHandlerContext. In this section, we look at how outBound events in pipeline are propagated outward with the most common writeAndFlush operation.

In a typical message push system, there is a code similar to the following

Channel channel = getChannel(userInfo);
channel.writeAndFlush(pushInfo);

The meaning of this code is to get the corresponding Channel according to the user's information, then push the message to the user and follow up the channel.writeAndFlush.

NioSocketChannel

public ChannelFuture writeAndFlush(Object msg) {
    return pipeline.writeAndFlush(msg);
}

From pipeline to Outside

public final ChannelFuture writeAndFlush(Object msg) {
    return tail.writeAndFlush(msg);
}

Most of the outBound events in Channel start from tail. The writeAndFlush() method is a method inherited from tail. Let's follow up.

AbstractChannelHandlerContext

public ChannelFuture writeAndFlush(Object msg) {
    return writeAndFlush(msg, newPromise());
}

public ChannelFuture writeAndFlush(Object msg, ChannelPromise promise) {
    write(msg, true, promise);

    return promise;
}

AbstractChannelHandlerContext

private void write(Object msg, boolean flush, ChannelPromise promise) {
    AbstractChannelHandlerContext next = findContextOutbound();
    final Object m = pipeline.touch(msg, next);
    EventExecutor executor = next.executor();
    if (executor.inEventLoop()) {
        if (flush) {
            next.invokeWriteAndFlush(m, promise);
        } else {
            next.invokeWrite(m, promise);
        }
    } else {
        AbstractWriteTask task;
        if (flush) {
            task = WriteAndFlushTask.newInstance(next, m, promise);
        }  else {
            task = WriteTask.newInstance(next, m, promise);
        }
        safeExecute(executor, task, promise, m);
    }
}

First call the findContextOutbound() method to find the next outBound() node

AbstractChannelHandlerContext

private AbstractChannelHandlerContext findContextOutbound() {
    AbstractChannelHandlerContext ctx = this;
    do {
        ctx = ctx.prev;
    } while (!ctx.outbound);
    return ctx;
}

The process of finding an outBound node is similar to finding an inBound node, traversing the bidirectional list in the pipeline in the opposite direction until the first outBound node next, and then calling next.invokeWriteAndFlush(m, promise)

AbstractChannelHandlerContext

private void invokeWriteAndFlush(Object msg, ChannelPromise promise) {
    if (invokeHandler()) {
        invokeWrite0(msg, promise);
        invokeFlush0();
    } else {
        writeAndFlush(msg, promise);
    }
}

Calling the write method of ChannelHandler on this node, we will ignore the flush method for the moment, and then we will talk about the complete process of writeAndFlush.

AbstractChannelHandlerContext

private void invokeWrite0(Object msg, ChannelPromise promise) {
    try {
        ((ChannelOutboundHandler) handler()).write(this, msg, promise);
    } catch (Throwable t) {
        notifyOutboundHandlerException(t, promise);
    }
}

As you can see, the data starts to flow backwards and forwards, contrary to the inbound direction. So where does it end up, of course, going to the head node, because the head node is the handler of the outbound type.

HeadContext

public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
    unsafe.write(msg, promise);
}

Called the underlying unsafe operation data, here, we deepen our understanding of the head node, that is, all data written through the head node

When the write method is executed, the method begins to unwind. Step back to unsafe's read method, go back to where you started, and then continue calling the pipeline.fireChannelReadComplete() method

summary

Summarize the flow of a request in pipeline:

  1. The fire series methods of pipeline are called. These methods are designed by invoker interface. pipeline implements all methods of invoker. inbound events flow in from head and outbound events flow out from tail.
  2. pipeline will submit the request to Context, and Context completes the data of each Context by abstracting the invoke series (static and non-static) of the parent AbstractChannelHandlerContext and combining the fire series of AbstractChannelHandlerContext with findContextInbound and findContextOutbound methods. The circulation.
  3. When the outbound method is called during the inbound process, the request does not go backwards. The latter processor will have no effect. If you want to continue the rendezvous transfer, call Context's fire series methods, and let Netty help you transfer data internally to the next node. If you want to pass through the whole channel, call the corresponding methods of channel or pipeline in handler, which will flow the data from beginning to end or from end to end.

Tags: Java Netty JDK socket network

Posted on Tue, 10 Sep 2019 01:45:26 -0700 by karnul