Netty application example: WebSocket application

1. Server implementation

Source code implementation:

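import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.http.HttpObjectAggregator;
import io.netty.handler.codec.http.HttpServerCodec;
import io.netty.handler.stream.ChunkedWriteHandler;

public void run(int port){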
   EventLoopGroup bossGroup = new NioEventLoopGroup();
   EventLoopGroup workGroup = new NioEventLoopGroup();
   try{
      ServerBootstrap b = new ServerBootstrap();
      b.group(bossGroup, workGroup)
      .channel(NioServerSocketChannel.class)
      .childHandler(new ChannelInitializer<SocketChannel>() {

         @Override
         protected void initChannel(SocketChannel sc) throws Exception {
            ChannelPipeline pipeline = sc.pipeline();
            // Server-side HTTP codec: decodes requests and encodes responses
            pipeline.addLast("http-codec", new HttpServerCodec());
            // Aggregator: combines the parts of an HTTP message into a single FullHttpRequest or FullHttpResponse (body limited to 65536 bytes)
            pipeline.addLast("aggregator", new HttpObjectAggregator(65536));
            // Chunked write handler: writes large data streams in chunks
            pipeline.addLast("http-chunked", new ChunkedWriteHandler());
            // Custom handler containing the WebSocket logic
            pipeline.addLast("handler", new WebSocketServerHandler());
            
         }
      });
      
      ChannelFuture cf = b.bind(port).sync();
      
      System.out.println("Web socket server started at port " + port + '.');
      System.out.println("Open your browser and navigate to http://localhost:" + port + '/');
      
      cf.channel().closeFuture().sync();
      
   }catch(Exception e){
      e.printStackTrace();
   }finally{
      bossGroup.shutdownGracefully();
      workGroup.shutdownGracefully();
      
   }
}
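  • HttpServerCodec: decodes incoming bytes into HTTP request messages and encodes HTTP response messages back into bytes
  • HttpObjectAggregator: combines the multiple parts of an HTTP message into one complete HTTP message (here with a maximum content length of 65536 bytes)
  • ChunkedWriteHandler: writes large data streams in chunks; it is used here to send HTML5 files to clients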
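
To make the aggregator's role more concrete, the following is a minimal, library-free sketch of the idea: body chunks are collected until the last one arrives, and a body larger than the configured limit is rejected. The class name ChunkAggregator and its offer method are hypothetical and only illustrate the concept; this is not how Netty implements HttpObjectAggregator.

```java
import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;

/** Conceptual sketch of message aggregation (hypothetical class, not Netty code). */
class ChunkAggregator {
    private final int maxContentLength;
    private final ByteArrayOutputStream body = new ByteArrayOutputStream();

    ChunkAggregator(int maxContentLength) {
        this.maxContentLength = maxContentLength;
    }

    /**
     * Appends one body chunk. Returns the complete body once the last chunk
     * has arrived, or null while more chunks are still expected.
     */
    byte[] offer(byte[] chunk, boolean last) {
        if (body.size() + chunk.length > maxContentLength) {
            throw new IllegalStateException("content length exceeded " + maxContentLength + " bytes");
        }
        body.write(chunk, 0, chunk.length);
        if (!last) {
            return null;
        }
        byte[] full = body.toByteArray();
        body.reset(); // ready for the next message
        return full;
    }

    public static void main(String[] args) {
        // Same limit as the value passed to HttpObjectAggregator in the server code
        ChunkAggregator agg = new ChunkAggregator(65536);
        agg.offer("Hello, ".getBytes(StandardCharsets.UTF_8), false);
        byte[] full = agg.offer("WebSocket".getBytes(StandardCharsets.UTF_8), true);
        System.out.println(new String(full, StandardCharsets.UTF_8)); // prints "Hello, WebSocket"
    }
}
```

Because of this aggregation step, the custom handler can treat every HTTP request as a single FullHttpRequest instead of dealing with partial messages.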

2. Implementation of WebSocketServerHandler

Source code implementation:

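import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.*;
import io.netty.handler.codec.http.*;
import io.netty.handler.codec.http.websocketx.*;
import io.netty.util.CharsetUtil;
import java.util.logging.Level;
import java.util.logging.Logger;

public class WebSocketServerHandler extends SimpleChannelInboundHandler<Object> {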
   

   private static final Logger logger = Logger.getLogger(WebSocketServerHandler.class.getName());
   
   private WebSocketServerHandshaker handshaker;

   // messageReceived is the Netty 5.0 alpha callback; on Netty 4.x, override channelRead0 instead
   @Override
   protected void messageReceived(ChannelHandlerContext ctx, Object obj)
         throws Exception {
      // Traditional HTTP access
      if(obj instanceof FullHttpRequest){
         handleHttpRequest(ctx,(FullHttpRequest)obj);
      }
      // WebSocket access
      else if(obj instanceof WebSocketFrame){
         handleWebSocketFrame(ctx,(WebSocketFrame)obj);
      }
   }
   
   private void handleHttpRequest(ChannelHandlerContext ctx, FullHttpRequest req) throws Exception{
      System.out.println("handleHttpRequest");
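      // Return HTTP 400 if decoding failed or the request is not a WebSocket upgrade
      // (the "websocket" token is compared case-insensitively)
      if(!req.decoderResult().isSuccess() || (!"websocket".equalsIgnoreCase(String.valueOf(req.headers().get("Upgrade"))))){
         sendHttpResponse(ctx,req,new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.BAD_REQUEST));
         // Stop here; otherwise the handshake below would still run for a rejected request
         return;
      }
      
      // Build the handshake response; the URL is hard-coded for local testing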
      WebSocketServerHandshakerFactory wsFactory = new WebSocketServerHandshakerFactory("ws://localhost:8080/websocket", null, false);
      handshaker =wsFactory.newHandshaker(req);
      if(handshaker==null){
         WebSocketServerHandshakerFactory.sendUnsupportedVersionResponse(ctx.channel());
      }else{
         handshaker.handshake(ctx.channel(), req);
      }
   }
   
   private void handleWebSocketFrame(ChannelHandlerContext ctx, WebSocketFrame frame){
      // Close frame: complete the closing handshake and close the connection
      if(frame instanceof CloseWebSocketFrame){
         handshaker.close(ctx.channel(), (CloseWebSocketFrame)frame.retain());
         return;
      }
      
      // Determine whether it is a Ping message
      if(frame instanceof PingWebSocketFrame){
         ctx.channel().write(new PongWebSocketFrame(frame.content().retain()));
         return;
      }
      
      // This routine only supports text messages, not binary messages
      if(!(frame instanceof TextWebSocketFrame)){
          throw new UnsupportedOperationException(String.format("%s frame types not supported", frame.getClass().getName()));
      }
      
      // Return reply message
      String request = ((TextWebSocketFrame)frame).text();
      if(logger.isLoggable(Level.FINE)){
          logger.fine(String.format("%s received %s", ctx.channel(), request));
      }
      
      ctx.channel().write(new TextWebSocketFrame(request+ " , Welcome to use Netty WebSocket Service, now:"+ new java.util.Date().toString()));
      
      
   }
   
   private static void sendHttpResponse(ChannelHandlerContext ctx, FullHttpRequest req, FullHttpResponse res){
      // For non-200 responses, use the status line as the body and set Content-Length
      if(res.status().code() !=200){
         ByteBuf buf = Unpooled.copiedBuffer(res.status().toString(), CharsetUtil.UTF_8);
         res.content().writeBytes(buf);
         buf.release();
         res.headers().set(HttpHeaderNames.CONTENT_LENGTH, String.valueOf(res.content().readableBytes()));
      }
      
      ChannelFuture cf = ctx.channel().writeAndFlush(res);

      // Close the connection if the client sent "Connection: close" or the response is an error.
      // Simplified check that assumes an HTTP/1.1 client, where keep-alive is the default.
      boolean keepAlive = !"close".equalsIgnoreCase(String.valueOf(req.headers().get(HttpHeaderNames.CONNECTION)));
      if(!keepAlive || res.status().code() != 200){
         cf.addListener(ChannelFutureListener.CLOSE);
      }
   }
   
   
   @Override
   public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
      ctx.flush();
   }

   @Override
   public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)
         throws Exception {
      cause.printStackTrace();
      ctx.close();
   }
}
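
The rule used in sendHttpResponse (close the connection unless it is keep-alive and the status is 200) depends on HTTP's persistence defaults. The sketch below spells those defaults out in plain Java. The class KeepAliveSketch and its methods are hypothetical, and it only handles a Connection header that carries a single token, so treat it as an illustration rather than a replacement for the library's own helper.

```java
/** Illustrative keep-alive decision (hypothetical helper, single-token Connection header only). */
class KeepAliveSketch {
    /**
     * @param httpVersion      "HTTP/1.0" or "HTTP/1.1"
     * @param connectionHeader value of the Connection header, or null if absent
     */
    static boolean isKeepAlive(String httpVersion, String connectionHeader) {
        if (connectionHeader != null) {
            String token = connectionHeader.trim();
            if (token.equalsIgnoreCase("close")) {
                return false;
            }
            if (token.equalsIgnoreCase("keep-alive")) {
                return true;
            }
        }
        // No explicit hint: HTTP/1.1 connections are persistent by default, HTTP/1.0 ones are not
        return "HTTP/1.1".equals(httpVersion);
    }

    /** Same condition as in sendHttpResponse: close on non-keep-alive or non-200. */
    static boolean shouldClose(String httpVersion, String connectionHeader, int status) {
        return !isKeepAlive(httpVersion, connectionHeader) || status != 200;
    }

    public static void main(String[] args) {
        System.out.println(shouldClose("HTTP/1.1", null, 200));    // prints "false"
        System.out.println(shouldClose("HTTP/1.1", null, 400));    // prints "true"
        System.out.println(shouldClose("HTTP/1.1", "close", 200)); // prints "true"
    }
}
```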

Before the connection is established:

  • The first handshake request is carried over HTTP, so it arrives as an HTTP message. Whether it is a WebSocket request is determined by checking that the "Upgrade" header has the value "websocket".
  • After this check passes, a WebSocketServerHandshaker is constructed and returns the handshake response to the client. At the same time, the WebSocket encoder and decoder are dynamically added to the ChannelPipeline.
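
The core of the handshake response is the Sec-WebSocket-Accept header, which RFC 6455 defines as the Base64-encoded SHA-1 hash of the client's Sec-WebSocket-Key concatenated with a fixed GUID. In the handler this is done internally by WebSocketServerHandshaker; the following standard-library sketch only illustrates the computation. The class HandshakeSketch and its method names are hypothetical, and the header map is assumed to use lower-cased header names.

```java
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.Base64;
import java.util.Map;

/** Illustrates the opening handshake checks (hypothetical class, not Netty code). */
class HandshakeSketch {
    // Fixed GUID defined by RFC 6455
    private static final String WS_GUID = "258EAFA5-E914-47DA-95CA-C5AB0DC85B11";

    /** True if the headers (keys assumed lower-cased) ask for a WebSocket upgrade. */
    static boolean isUpgradeRequest(Map<String, String> headers) {
        String upgrade = headers.get("upgrade");
        return upgrade != null && upgrade.trim().equalsIgnoreCase("websocket");
    }

    /** Sec-WebSocket-Accept = Base64(SHA-1(key + GUID)). */
    static String acceptKey(String secWebSocketKey) {
        try {
            MessageDigest sha1 = MessageDigest.getInstance("SHA-1");
            byte[] digest = sha1.digest((secWebSocketKey + WS_GUID).getBytes(StandardCharsets.US_ASCII));
            return Base64.getEncoder().encodeToString(digest);
        } catch (NoSuchAlgorithmException e) {
            throw new IllegalStateException("SHA-1 is not available", e);
        }
    }

    public static void main(String[] args) {
        System.out.println(isUpgradeRequest(Map.of("upgrade", "WebSocket"))); // prints "true"
        // Sample key taken from RFC 6455
        System.out.println(acceptKey("dGhlIHNhbXBsZSBub25jZQ==")); // prints "s3pPLMBiTxaQ9kYGzzhZRbK+xOo="
    }
}
```

The client compares the returned value with its own computation, which is how it confirms that the server actually understood the WebSocket upgrade.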

After the connection is established:

  • The client submits a request to the server through a text box, and the handler receives it as an already decoded WebSocketFrame.
  • If it is a close frame, the connection is closed.
  • If it is a ping frame sent to keep the connection alive, a pong frame is returned.
  • Otherwise, for a text frame, the reply message is returned.
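
The frame types in the list above correspond to opcodes in the WebSocket wire format. The sketch below decodes a short frame by hand and maps its opcode to the same three branches as the handler. It is only an illustration under stated assumptions: the class FrameSketch is hypothetical, payloads longer than 125 bytes and fragmented messages are not handled, and in the real application Netty's WebSocket decoder does this work.

```java
import java.nio.charset.StandardCharsets;

/** Hand-rolled decoding of short WebSocket frames (hypothetical class, not Netty code). */
class FrameSketch {
    static final int OP_TEXT = 0x1, OP_CLOSE = 0x8, OP_PING = 0x9;

    /** The opcode is stored in the low 4 bits of the first byte. */
    static int opcode(byte[] frame) {
        return frame[0] & 0x0F;
    }

    /** Extracts the payload of a short frame, unmasking it if the mask bit is set. */
    static byte[] payload(byte[] frame) {
        boolean masked = (frame[1] & 0x80) != 0;
        int len = frame[1] & 0x7F;
        if (len > 125) {
            throw new UnsupportedOperationException("extended payload lengths are not handled in this sketch");
        }
        int offset = masked ? 6 : 2; // 2 header bytes, plus 4 masking-key bytes if masked
        byte[] out = new byte[len];
        for (int i = 0; i < len; i++) {
            byte b = frame[offset + i];
            out[i] = masked ? (byte) (b ^ frame[2 + (i % 4)]) : b;
        }
        return out;
    }

    /** Mirrors the handler's branches: close, ping answered by pong, text answered by a reply. */
    static String action(byte[] frame) {
        switch (opcode(frame)) {
            case OP_CLOSE:
                return "close";
            case OP_PING:
                return "pong";
            case OP_TEXT:
                return "reply:" + new String(payload(frame), StandardCharsets.UTF_8);
            default:
                throw new UnsupportedOperationException("opcode " + opcode(frame) + " not supported");
        }
    }

    public static void main(String[] args) {
        // Masked single-frame text message from RFC 6455 containing "Hello"
        byte[] hello = {(byte) 0x81, (byte) 0x85, 0x37, (byte) 0xfa, 0x21, 0x3d,
                        0x7f, (byte) 0x9f, 0x4d, 0x51, 0x58};
        System.out.println(action(hello)); // prints "reply:Hello"
    }
}
```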

3. Test results

WebSocket test tool:

Open http://coolaf.com/tool/chattest in the browser, and you can see the following interface:

Test tool.png

The output of the server is as follows:

Server output.png

The data sent by the client is as follows:

Client sends data.png

Source code: https://github.com/zhaozhou11/netty-demo.git

Posted on Tue, 03 Dec 2019 16:39:40 -0800 by madhukar_garg