Java learning notes 17 netty practice and optimization

Try to do it yourself

Short connection: after the request / response, close the established TCP connection, and establish another connection next time.

Long connection: after the request / response, do not close the TCP connection, multiple requests, and reuse the same connection.

In order to avoid the performance loss caused by frequent connection creation / release and the real-time message acquisition, the long connection is adopted.

Packet sticking: Nagle algorithm - clients accumulate a certain amount or buffer for a period of time before transmission. Server buffer heap. Causes multiple request data to stick together.

Unpacking: the data sent is larger than the sending buffer, so it is transmitted in pieces. The buffer of the server is piled up, resulting in incomplete request data read by the server.

Using WebSocket

WebSocket protocol is a new network protocol based on TCP.

Its appearance realizes the full duplex communication between browser and server: it allows the server to send information to the client actively.

Multi client, multi language and multi server support: browser, php, Java, ruby, nginx, python, Tomcat, erlang,. net, etc

Connection process:

  1. Client request connection -- get / chat http / 1.1 -- > > server
  2. Client < http / 1.1 101 XXX -- server returns response
  3. Client open < -- push -- server
  4. Client -- send -- > > server

WebSocket test code

WebSocketServer server

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.http.HttpObjectAggregator;
import io.netty.handler.codec.http.HttpServerCodec;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;

public class WebSocketServer {
    static int port = 20480;

    public static void main(String[] args) throws Exception {
        EventLoopGroup mainGroup = new NioEventLoopGroup(1);
        EventLoopGroup subGroup = new NioEventLoopGroup();
        try {
            ServerBootstrap b = new ServerBootstrap();
            b.group(mainGroup, subGroup).channel(NioServerSocketChannel.class)
                    .handler(new LoggingHandler(LogLevel.DEBUG))
                    .option(ChannelOption.SO_REUSEADDR, true)
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel ch) throws Exception {
                            ChannelPipeline pipeline = ch.pipeline();
                            pipeline.addLast(new HttpServerCodec());
                            pipeline.addLast(new HttpObjectAggregator(65536));
                            pipeline.addLast(new WebSocketServerHandler());
                        }
                    })
                    .childOption(ChannelOption.SO_REUSEADDR, true);

            ChannelFuture f = b.bind(port).addListener(new ChannelFutureListener() {
                @Override
                public void operationComplete(ChannelFuture future) throws Exception {
                    System.out.println("Port binding completed:" + future.channel().localAddress());
                }
            }).sync();

            // Get keyboard input
            BufferedReader input = new BufferedReader(new InputStreamReader(System.in, "UTF-8"));
            new Thread(() -> {
                String msg;
                // Receive data from keyboard
                try {
                    System.out.print("Please enter information:");
                    while ((msg = input.readLine()) != null) {
                        WebSocketSession.pushMsg(msg);
                        if (msg.length() == 0) {
                            break;
                        }
                    }
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }).start();

            f.channel().closeFuture().sync();
        } finally {
            mainGroup.shutdownGracefully();
            subGroup.shutdownGracefully();
        }
    }
}

WebSocketServerHandler server Handler

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.handler.codec.http.*;
import io.netty.handler.codec.http.websocketx.*;
import io.netty.util.AttributeKey;
import io.netty.util.CharsetUtil;

import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.LongAdder;

import static io.netty.handler.codec.http.HttpMethod.GET;
import static io.netty.handler.codec.http.HttpResponseStatus.BAD_REQUEST;
import static io.netty.handler.codec.http.HttpVersion.HTTP_1_1;

public class WebSocketServerHandler extends SimpleChannelInboundHandler<Object> {
    private static final String WEBSOCKET_PATH = "/websocket";
    private WebSocketServerHandshaker handshaker;

    public static final LongAdder counter = new LongAdder();

    @Override
    public void channelRead0(ChannelHandlerContext ctx, Object msg) {
        counter.add(1);
        if (msg instanceof FullHttpRequest) {
            System.out.println("Http request");
            handleHttpRequest(ctx, (FullHttpRequest) msg);
        } else if (msg instanceof WebSocketFrame) {
            System.out.println("WebSocket request");
            handleWebSocketFrame(ctx, (WebSocketFrame) msg);
        }
    }

    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) {
        ctx.flush();
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        cause.printStackTrace();
        ctx.close();
    }

    private void handleHttpRequest(ChannelHandlerContext ctx, FullHttpRequest req) {
        // If http decoding fails, an http exception is returned, and whether the header contains the Upgrade field (protocol Upgrade) is judged
        if (!req.decoderResult().isSuccess() || req.method() != GET || (!"websocket".equals(req.headers().get("Upgrade")))) {
            System.out.println("Http abnormal");
            sendHttpResponse(ctx, req, new DefaultFullHttpResponse(HTTP_1_1, BAD_REQUEST));
            return;
        }

        // Construct handshake response return
        WebSocketServerHandshakerFactory wsFactory = new WebSocketServerHandshakerFactory(
                getWebSocketLocation(req), null, true, 5 * 1024 * 1024);
        handshaker = wsFactory.newHandshaker(req);
        if (handshaker == null) {
            // Version not supported
            WebSocketServerHandshakerFactory.sendUnsupportedVersionResponse(ctx.channel());
        } else {
            System.out.println("WebSocket Handshake response");
            handshaker.handshake(ctx.channel(), req);

            // Parse request, judge token
            Map<String, List<String>> parameters = new QueryStringDecoder(req.uri()).parameters();
            // Connection limit, token needs to be verified, strange connection is rejected
            String token = parameters.get("token").get(0);
            // Save token
            ctx.channel().attr(AttributeKey.valueOf("token")).getAndSet(token);
            // Save session
            WebSocketSession.saveSession(token, ctx.channel());

            // End
        }
    }

    private void handleWebSocketFrame(ChannelHandlerContext ctx, WebSocketFrame frame) {
        // Close
        if (frame instanceof CloseWebSocketFrame) {
            Object token = ctx.channel().attr(AttributeKey.valueOf("token")).get();
            WebSocketSession.removeSession(token.toString());
            handshaker.close(ctx.channel(), (CloseWebSocketFrame) frame.retain());
            return;
        }
        // ping/pong as heartbeat
        if (frame instanceof PingWebSocketFrame) {
            System.out.println("ping: " + frame);
            ctx.write(new PongWebSocketFrame(frame.content().retain()));
            return;
        }
        // text
        if (frame instanceof TextWebSocketFrame) {
            String text = ((TextWebSocketFrame) frame).text();
            String outStr = text
                    + ", Welcome to use Netty WebSocket Service, now:"
                    + new java.util.Date().toString();
            System.out.println("Received:" + text);
            System.out.println("Send out:" + outStr);
            //Send to client websocket
            ctx.channel().write(new TextWebSocketFrame(outStr));
            return;
        }
        // Do not process binary messages
        if (frame instanceof BinaryWebSocketFrame) {
            // Echo the frame
            ctx.write(frame.retain());
        }
    }

    private static void sendHttpResponse(ChannelHandlerContext ctx, FullHttpRequest req, FullHttpResponse res) {
        // Generate an error page if response getStatus code is not OK (200).
        if (res.status().code() != 200) {
            ByteBuf buf = Unpooled.copiedBuffer(res.status().toString(), CharsetUtil.UTF_8);
            res.content().writeBytes(buf);
            buf.release();
            HttpUtil.setContentLength(res, res.content().readableBytes());
        }

        // Send the response and close the connection if necessary.
        ChannelFuture f = ctx.channel().writeAndFlush(res);
        if (!HttpUtil.isKeepAlive(req) || res.status().code() != 200) {
            f.addListener(ChannelFutureListener.CLOSE);
        }
    }

    private static String getWebSocketLocation(FullHttpRequest req) {
        String location = req.headers().get(HttpHeaderNames.HOST) + WEBSOCKET_PATH;
        return "ws://" + location;
    }
}

WebSocketSession server session connection pool

import io.netty.channel.Channel;
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;

import java.util.Random;
import java.util.concurrent.ConcurrentHashMap;

public class WebSocketSession {
    /**
     * Session connection pool
     */
    static ConcurrentHashMap<String, Channel> sessionMap = new ConcurrentHashMap<>();

    /**
     * Save session
     *
     * @param sessionID
     * @param channel
     */
    public static void saveSession(String sessionID, Channel channel) {
        sessionMap.put(sessionID, channel);
    }

    /**
     * Remove session
     *
     * @param sessionID
     */
    public static void removeSession(String sessionID) {
        sessionMap.remove(sessionID);
    }

    /**
     * Push message
     * @param msg
     */
    public static void pushMsg(String msg) {
        try {
            if (sessionMap.isEmpty()) {
                return;
            }

            int size = sessionMap.size();
            ConcurrentHashMap.KeySetView<String, Channel> keySetView = sessionMap.keySet();
            String[] keys = keySetView.toArray(new String[]{});
            System.out.println(WebSocketServerHandler.counter.sum() + " : Number of current users" + keys.length);

            for (int i = 0; i < size; i++) {
                // Submit task to it for execution
                String key = keys[new Random().nextInt(size)];
                Channel channel = sessionMap.get(key);
                if (channel == null) {
                    continue;
                }
                if (!channel.isActive()) {
                    sessionMap.remove(key);
                    continue;
                }
                channel.eventLoop().execute(() -> {
                    System.out.println("Push:" + msg);
                    channel.writeAndFlush(new TextWebSocketFrame(msg));
                });
            }
        } catch (Exception ex) {
            ex.printStackTrace();
        }
    }
}

WebSocketClient client

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.http.HttpClientCodec;
import io.netty.handler.codec.http.HttpObjectAggregator;
import io.netty.handler.codec.http.websocketx.extensions.compression.WebSocketClientCompressionHandler;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;

public class WebSocketClient {
    static String host = "127.0.0.1";
    static int port = 20480;

    public static void main(String[] args) throws Exception {
        EventLoopGroup group = new NioEventLoopGroup();
        try {
            Bootstrap b = new Bootstrap();
            b.group(group).channel(NioSocketChannel.class)
                    .handler(new LoggingHandler(LogLevel.DEBUG))
                    .option(ChannelOption.SO_REUSEADDR, true)
                    .handler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel ch) {
                            ChannelPipeline p = ch.pipeline();
                            p.addLast(new HttpClientCodec());
                            p.addLast(new HttpObjectAggregator(8192));
                            p.addLast(WebSocketClientCompressionHandler.INSTANCE);
                            p.addLast(new WebSocketClientHandler());
                        }
                    });

            ChannelFuture f = b.connect(host, port).sync();
            f.channel().closeFuture().sync();
        } finally {
            group.shutdownGracefully();
        }
    }
}

WebSocketClientHandler client Handler

import io.netty.channel.*;
import io.netty.handler.codec.http.DefaultHttpHeaders;
import io.netty.handler.codec.http.FullHttpResponse;
import io.netty.handler.codec.http.websocketx.*;
import io.netty.util.CharsetUtil;

import java.net.InetSocketAddress;
import java.net.URI;
import java.util.concurrent.atomic.AtomicInteger;

public class WebSocketClientHandler extends SimpleChannelInboundHandler<Object> {
    private static final String WEBSOCKET_PATH = "/websocket";
    private WebSocketClientHandshaker handshaker;
    private ChannelPromise handshakeFuture;

    static AtomicInteger counter = new AtomicInteger(0);

    public ChannelFuture handshakeFuture() {
        return handshakeFuture;
    }

    @Override
    public void handlerAdded(ChannelHandlerContext ctx) {
        handshakeFuture = ctx.newPromise();
    }

    @Override
    public void channelActive(ChannelHandlerContext ctx) {
        if (handshaker == null) {
            InetSocketAddress address = (InetSocketAddress) ctx.channel().remoteAddress();
            URI uri = null;
            try {
                uri = new URI("ws://" + address.getHostString() + ":" + address.getPort()
                        + WEBSOCKET_PATH + "?token=" + counter.incrementAndGet());
            } catch (Exception e) {
                e.printStackTrace();
            }
            handshaker = WebSocketClientHandshakerFactory.newHandshaker(
                    uri, WebSocketVersion.V13, null, true, new DefaultHttpHeaders());
        }
        handshaker.handshake(ctx.channel());
    }

    @Override
    public void channelInactive(ChannelHandlerContext ctx) {
        System.out.println("Client disconnected");
    }

    @Override
    public void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception {
        Channel ch = ctx.channel();
        if (!handshaker.isHandshakeComplete()) {
            try {
                handshaker.finishHandshake(ch, (FullHttpResponse) msg);
                System.out.println("Client connected");
                handshakeFuture.setSuccess();
            } catch (WebSocketHandshakeException e) {
                System.out.println("Client connection failed");
                handshakeFuture.setFailure(e);
            }
            return;
        }

        if (msg instanceof FullHttpResponse) {
            FullHttpResponse response = (FullHttpResponse) msg;
            throw new IllegalStateException(
                    "Unexpected FullHttpResponse (getStatus=" + response.status() +
                            ", content=" + response.content().toString(CharsetUtil.UTF_8) + ')');
        }

        WebSocketFrame frame = (WebSocketFrame) msg;
        if (frame instanceof TextWebSocketFrame) {
            TextWebSocketFrame textFrame = (TextWebSocketFrame) frame;
            System.out.println("Have received:" + textFrame.text());
        } else if (frame instanceof PongWebSocketFrame) {
            System.out.println("Heartbeat");
        } else if (frame instanceof CloseWebSocketFrame) {
            System.out.println("Receive closed");
            ch.close();
        }
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        cause.printStackTrace();
        if (!handshakeFuture.isDone()) {
            handshakeFuture.setFailure(cause);
        }
        ctx.close();
    }
}
Linux server optimization

Network knowledge: distinguish different connection modes, TCP connection quadruple

Server IP + server PORT + client IP + client PORT

Maximum number of server connections: in theory, it is infinite, but it will be affected by the maximum number of file connections and memory in Linux. At the beginning of TCP and UDP protocols, there will be 16 bits to store the source port number and the target port number respectively, 2 ^ 16-1 = 65535, so the transport layer protocol limits the maximum number of ports to 65535, but in practice, the number of connections can be increased by reusing ports.

# Process level maximum file connections
vi /etc/security/limits.conf  
* soft nofile 1000000
* hard nofile 1000000

vi /etc/sysctl.conf
fs.file-max=1000000 #System level maximum file connections
net.ipv4.ip_local_port_range=1024 65535 #New connection local port range
net.ipv4.tcp_tw_reuse=1 #Turn on Reuse
net.ipv4.tcp_tw_recycle=1 #Turn on accelerated recovery

sysctl -p
Netty optimization

1. Handler object reuse

@ChannelHandler.Sharable is identified as shareable

One Channel per connection, one Pipeline exclusive

Pipeline shares the handler object, no longer new handler()

Note: prevent shared variables in the handler from causing thread safety problems.

2. Time consuming operation introduced into business thread pool

In the process of processing, the time-consuming business logic will occupy the I/O thread and cause blocking, which should be separately handed over to the specified thread pool for processing

When adding a handler, you can specify the EventLoopGroup business exclusive thread pool

pipeline.addLast(businessGroup, businessHandller);

3. Response content must go through Netty I/O

Looking at the source code, we can see that the write in the business handler will eventually be sent through Netty I/O

Too much data in a single write will cause I/O threads to be occupied by a connection for a long time, resulting in poor user experience

Adjustable operating system TCP buffer size

In the business handler, a single write with too much data is performed many times in batches to speed up the operation speed and improve the user experience.

4. ByteBuf multiplexing mechanism

NIOEventLoop receives the Selector notification, then enters the read phase, applies for a ByteBuf object as the data carrier, and finally transfers it to the handler for business processing. ByteBuf uses the reference count, and the release can be recycled after release. The ByteBuf object can be released through ctx.write(msg) to ReferenceCountUtil.release(), or (ByteBuf) MSG. Release() Recycle ByteBuf.

Optimize 2-increase request / push throughput
  1. Business operations are submitted to a separate thread for execution.
  2. Adjust the size of TCP buffer to improve network throughput.
  3. The optimization of business code logic when developing based on Netty framework.
  4. Combined with the characteristics of Netty framework, the object is reused to improve the performance.

Summary

  1. The number of concurrent connections mainly depends on the optimization of operating system parameters;
  2. The improvement of throughput mainly depends on the code processing ability;
  3. Sometimes network and disk will become bottleneck;
  4. Horizontal expansion, cluster mode is the final solution;
  5. The operation mechanism of Netty framework is very important;

Tags: Programming Netty codec Java Session

Posted on Sun, 01 Dec 2019 10:55:42 -0800 by solee