Basic usage and principles of Netty (NIO framework)

NIO framework

  • Netty, Mina, Akka (Scala; highly available, scalable)

Netty is an asynchronous, event-driven network application framework and toolkit for rapidly developing high-performance, highly reliable network servers and clients. It is a client and server programming framework built on NIO, and it lets you develop network applications quickly and simply.
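
To make "built on NIO" and "event-driven" concrete, here is a minimal JDK-only sketch of the kind of selector loop that Netty wraps for you. It is not Netty code: the class name RawNioEcho and its methods are made up for illustration, and the server handles a single client and then stops.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.UncheckedIOException;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.nio.charset.StandardCharsets;
import java.util.Iterator;

/** JDK-only sketch of a selector event loop; names are illustrative, not Netty APIs. */
final class RawNioEcho implements AutoCloseable {
    private final Selector selector;
    private final ServerSocketChannel server;

    RawNioEcho(int port) throws IOException {
        selector = Selector.open();
        server = ServerSocketChannel.open();
        server.configureBlocking(false);
        server.bind(new InetSocketAddress("127.0.0.1", port));
        // Ask the selector to report new connections
        server.register(selector, SelectionKey.OP_ACCEPT);
    }

    int port() {
        return server.socket().getLocalPort();
    }

    /** Runs the event loop until one client has been echoed and closed. */
    void serveOneClient() throws IOException {
        boolean served = false;
        while (!served) {
            selector.select(); // block until at least one registered event is ready
            Iterator<SelectionKey> keys = selector.selectedKeys().iterator();
            while (keys.hasNext()) {
                SelectionKey key = keys.next();
                keys.remove();
                if (key.isAcceptable()) {
                    SocketChannel client = server.accept();
                    if (client != null) {
                        client.configureBlocking(false);
                        client.register(selector, SelectionKey.OP_READ);
                    }
                } else if (key.isReadable()) {
                    SocketChannel client = (SocketChannel) key.channel();
                    ByteBuffer buf = ByteBuffer.allocate(1024);
                    if (client.read(buf) > 0) {
                        buf.flip(); // switch the buffer from writing to reading
                        while (buf.hasRemaining()) {
                            client.write(buf); // echo back what was read
                        }
                    }
                    client.close();
                    served = true;
                }
            }
        }
    }

    @Override
    public void close() throws IOException {
        server.close();
        selector.close();
    }

    /** Starts the server on a free port, sends one message, returns the echoed reply. */
    static String roundTrip(String message) {
        try (RawNioEcho echo = new RawNioEcho(0)) {
            Thread loop = new Thread(() -> {
                try {
                    echo.serveOneClient();
                } catch (IOException e) {
                    throw new UncheckedIOException(e);
                }
            });
            loop.start();
            try (Socket socket = new Socket("127.0.0.1", echo.port())) {
                socket.setSoTimeout(5000); // do not hang forever if the server fails
                socket.getOutputStream().write(message.getBytes(StandardCharsets.UTF_8));
                socket.getOutputStream().flush();
                ByteArrayOutputStream reply = new ByteArrayOutputStream();
                InputStream in = socket.getInputStream();
                for (int b = in.read(); b != -1; b = in.read()) {
                    reply.write(b);
                }
                loop.join();
                return new String(reply.toByteArray(), StandardCharsets.UTF_8);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }
}
```

Calling RawNioEcho.roundTrip("hello") should return the same text. The select loop, key bookkeeping and buffer flipping shown here are the parts that Netty's event loop groups, pipeline and ByteBuf take over.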

Why use Netty

  • Simple API and a low barrier to entry (no need to master NIO)
  • Powerful: ships with a variety of codecs (handling half packets and sticky packets) and supports a variety of mainstream protocols
  • Highly customizable: you can write your own codecs to implement custom protocols
  • Works around JDK NIO bugs and the instability of raw NIO
  • Large user base and active community
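
The "half packet" and "sticky packet" problem in the list above comes from TCP being a byte stream: one read may deliver only part of a message, or several messages glued together. The following JDK-only sketch shows the idea behind a length-prefixed frame codec. It is an illustration, not Netty's implementation; the class and method names are invented here, and a 4-byte big-endian length header is an assumption. In Netty itself, the built-in LengthFieldBasedFrameDecoder and LengthFieldPrepender handlers cover this kind of framing.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

/** JDK-only sketch of length-prefixed framing; names are illustrative. */
final class LengthPrefixFraming {
    // Bytes received so far that do not yet form a complete frame
    private ByteBuffer pending = ByteBuffer.allocate(0);

    /** Encodes one message as a 4-byte length header followed by the payload. */
    static byte[] encode(String message) {
        byte[] payload = message.getBytes(StandardCharsets.UTF_8);
        return ByteBuffer.allocate(4 + payload.length)
                .putInt(payload.length)
                .put(payload)
                .array();
    }

    /** Feeds one arbitrary chunk of the stream; returns every message it completes. */
    List<String> feed(byte[] chunk) {
        ByteBuffer merged = ByteBuffer.allocate(pending.remaining() + chunk.length);
        merged.put(pending).put(chunk).flip();
        List<String> messages = new ArrayList<>();
        while (merged.remaining() >= 4) {
            merged.mark();
            int length = merged.getInt();
            if (merged.remaining() < length) {
                merged.reset(); // half packet: rewind and wait for more bytes
                break;
            }
            byte[] payload = new byte[length];
            merged.get(payload);
            messages.add(new String(payload, StandardCharsets.UTF_8));
        }
        pending = merged; // the unread tail is kept for the next chunk
        return messages;
    }
}
```

If "hello" and "world" are encoded and the stream is cut three bytes into the second frame, the first feed call returns only "hello" and the second call returns "world" once the rest arrives.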

Netty architecture design

Development dependency

<dependency>
    <groupId>io.netty</groupId>
    <artifactId>netty-all</artifactId>
    <version>5.0.0.Alpha2</version>
</dependency>

Server development

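import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;

//NettyServer is an illustrative class name
public class NettyServer {
    public static void main(String[] args) throws InterruptedException {
        //1. Create the server bootstrap
        ServerBootstrap sbt = new ServerBootstrap();
        //2. Create the event loop groups: boss accepts connections, worker handles I/O
        EventLoopGroup boss = new NioEventLoopGroup();
        EventLoopGroup worker = new NioEventLoopGroup();
        try {
            //3. Set the event loop groups
            sbt.group(boss, worker);
            //4. Set the server channel implementation
            sbt.channel(NioServerSocketChannel.class);
            //5. Initialize the pipeline of each accepted connection
            sbt.childHandler(new CustomServerChannelInitializer());
            //6. Bind the port and start the server
            ChannelFuture future = sbt.bind(9999).sync();
            System.out.println("Listening on port 9999...");
            //7. Block until the server channel is closed
            future.channel().closeFuture().sync();
        } finally {
            //8. Release resources, even if binding fails
            boss.shutdownGracefully();
            worker.shutdownGracefully();
        }
    }
}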

---
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.socket.SocketChannel;

public class CustomServerChannelInitializer extends ChannelInitializer<SocketChannel> {
    @Override
    protected void initChannel(SocketChannel ch) throws Exception {
        //Get the channel pipeline
        ChannelPipeline pipeline = ch.pipeline();

        //Add the final handler at the end of the pipeline
        pipeline.addLast(new CustomServerChannelHandlerAdapter());
    }
}
---
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerAdapter;
import io.netty.channel.ChannelHandlerContext;
import io.netty.util.CharsetUtil;

import java.util.Date;

//Note: ChannelHandlerAdapter exposes channelRead in Netty 5.0.0.Alpha2;
//on Netty 4.x, extend ChannelInboundHandlerAdapter instead.
public class CustomServerChannelHandlerAdapter extends ChannelHandlerAdapter {
    /**
     * Called when an exception is raised in the pipeline
     * @param ctx
     * @param cause
     * @throws Exception
     */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
        ctx.close();
    }

    /**
     * Called when a message is received
     * @param ctx
     * @param msg
     * @throws Exception
     */
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        ByteBuf in = (ByteBuf) msg;
        try {
            System.out.println("msg:" + in.toString(CharsetUtil.UTF_8));
        } finally {
            //Inbound buffers are reference counted; release to avoid a leak
            in.release();
        }
        ByteBuf buf = ctx.alloc().buffer();
        buf.writeBytes(new Date().toString().getBytes(CharsetUtil.UTF_8));
        //Send the reply
        ChannelFuture future = ctx.writeAndFlush(buf);
        //Close the SocketChannel once the write completes
        future.addListener(ChannelFutureListener.CLOSE);
    }
}

Client

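import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioSocketChannel;

//NettyClient is an illustrative class name
public class NettyClient {
    public static void main(String[] args) throws InterruptedException {
        //1. Create the client bootstrap
        Bootstrap bt = new Bootstrap();
        //2. Create the worker event loop group
        EventLoopGroup worker = new NioEventLoopGroup();
        try {
            //3. Set the event loop group
            bt.group(worker);
            //4. Set the client channel implementation
            bt.channel(NioSocketChannel.class);
            //5. Initialize the channel pipeline
            bt.handler(new CustomClientChannelInitializer());
            //6. Connect to the server
            ChannelFuture future = bt.connect("127.0.0.1", 9999).sync();
            //7. Block until the channel is closed
            future.channel().closeFuture().sync();
        } finally {
            //8. Release resources, even if the connection fails
            worker.shutdownGracefully();
        }
    }
}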
---
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.socket.SocketChannel;

public class CustomClientChannelInitializer extends ChannelInitializer<SocketChannel> {
    @Override
    protected void initChannel(SocketChannel ch) throws Exception {
        ChannelPipeline pipeline = ch.pipeline();

        //Add the final handler at the end of the pipeline
        pipeline.addLast(new CustomClientChannelHandlerAdapter());
    }
}
---
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerAdapter;
import io.netty.channel.ChannelHandlerContext;
import io.netty.util.CharsetUtil;

public class CustomClientChannelHandlerAdapter extends ChannelHandlerAdapter {
    /**
     * Called when an exception is raised in the pipeline
     * @param ctx
     * @param cause
     * @throws Exception
     */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
        ctx.close();
    }

    /**
     * Called once the connection to the server is established; sends the greeting
     * @param ctx
     * @throws Exception
     */
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        ByteBuf buf = ctx.alloc().buffer();
        buf.writeBytes("hello,I am the client".getBytes(CharsetUtil.UTF_8));

        ctx.writeAndFlush(buf);
    }

    /**
     * Called when a message is received
     * @param ctx
     * @param msg
     * @throws Exception
     */
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        ByteBuf buf = (ByteBuf) msg;
        try {
            System.out.println("Client received:" + buf.toString(CharsetUtil.UTF_8));
        } finally {
            //Inbound buffers are reference counted; release to avoid a leak
            buf.release();
        }
    }
}

ByteBuf

How it works

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;

//ByteBufDemo is an illustrative class name
public class ByteBufDemo {
    //Prints: readerIndex writerIndex readableBytes capacity
    private static void print(ByteBuf buf) {
        System.out.println(buf.readerIndex()+" "+buf.writerIndex()+" "+buf.readableBytes()+" "+buf.capacity());
    }

    public static void main(String[] args) {
        //Other ways to obtain a buffer:
        //new PooledByteBufAllocator().buffer();
        //new UnpooledByteBufAllocator(true).buffer();
        ByteBuf buf = Unpooled.buffer(3);
        print(buf); // 0 0 0 3

        buf.writeBytes("abc".getBytes()); //writing advances writerIndex
        print(buf); // 0 3 3 3

        byte[] values = new byte[1];
        buf.readBytes(values); //reading advances readerIndex
        print(buf); // 1 3 2 3

        buf.discardReadBytes(); //drops the byte already read and shifts the rest to index 0
        print(buf); // 0 2 2 3

        buf.clear(); //resets both indexes; the contents are not erased
        print(buf); // 0 0 0 3
    }
}
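
For contrast, the JDK's own java.nio.ByteBuffer keeps a single position instead of separate reader and writer indexes, so you have to call flip() when switching from writing to reading. The sketch below runs roughly the same steps on a ByteBuffer and records "position limit capacity" after each one. The class and method names are invented for this illustration, and compact() is used as the closest counterpart of discardReadBytes().

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

/** JDK-only comparison with ByteBuf; names are illustrative. */
final class ByteBufferFlipDemo {
    /** Formats the buffer state as "position limit capacity". */
    static String state(ByteBuffer buf) {
        return buf.position() + " " + buf.limit() + " " + buf.capacity();
    }

    /** Runs write, flip, read, compact and clear, recording the state after each step. */
    static List<String> trace() {
        List<String> states = new ArrayList<>();
        ByteBuffer buf = ByteBuffer.allocate(3);
        states.add(state(buf)); // 0 3 3

        buf.put("abc".getBytes(StandardCharsets.US_ASCII));
        states.add(state(buf)); // 3 3 3 (position is now the write cursor)

        buf.flip(); // required before reading: limit = position, position = 0
        states.add(state(buf)); // 0 3 3

        buf.get(); // read one byte
        states.add(state(buf)); // 1 3 3

        buf.compact(); // move the 2 unread bytes to the front, back to write mode
        states.add(state(buf)); // 2 3 3

        buf.clear(); // reset position and limit; the contents are not erased
        states.add(state(buf)); // 0 3 3
        return states;
    }

    public static void main(String[] args) {
        for (String s : trace()) {
            System.out.println(s);
        }
    }
}
```

With ByteBuf there is no flip step: writes move writerIndex, reads move readerIndex, and the readable region is always the bytes between the two.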

Transporting objects with Netty

How does Netty catch serialization exceptions?

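//future is the ChannelFuture returned by writeAndFlush
//Forward a serialization (encoding) failure to exceptionCaught in the pipeline
future.addListener(ChannelFutureListener.FIRE_EXCEPTION_ON_FAILURE);
//Automatically close the connection if the write fails
future.addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
---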
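import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.MessageToMessageDecoder;

import java.io.ByteArrayInputStream;
import java.io.ObjectInputStream;
import java.util.List;

public class UserMessageToMessageDecoder extends MessageToMessageDecoder<ByteBuf> {
    /**
     * Deserializes one frame of data into a Java object
     * @param ctx
     * @param msg : one complete frame of data to decode
     * @param out : list that receives the decoded object
     * @throws Exception
     */
    @Override
    protected void decode(ChannelHandlerContext ctx, ByteBuf msg, List<Object> out) throws Exception {
        //Copy the readable bytes of the frame into an array
        byte[] values = new byte[msg.readableBytes()];
        msg.readBytes(values);

        //Deserialize with JDK serialization; the stream is closed automatically
        try (ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(values))) {
            out.add(ois.readObject());
        }
    }
}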

---
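import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.MessageToMessageEncoder;

import java.io.ByteArrayOutputStream;
import java.io.ObjectOutputStream;
import java.util.List;

public class UserMessageToMessageEncoder extends MessageToMessageEncoder<Object> {
    /**
     * Serializes an outbound object into a ByteBuf
     * @param ctx
     * @param msg : the object to encode (must implement java.io.Serializable)
     * @param out : list that receives the encoded ByteBuf
     * @throws Exception
     */
    @Override
    protected void encode(ChannelHandlerContext ctx, Object msg, List<Object> out) throws Exception {
        //Serialize with JDK serialization; closing the stream also flushes it
        ByteArrayOutputStream baos = new ByteArrayOutputStream();
        try (ObjectOutputStream oos = new ObjectOutputStream(baos)) {
            oos.writeObject(msg);
        }

        //Wrap the serialized bytes without copying them again
        ByteBuf buf = Unpooled.wrappedBuffer(baos.toByteArray());
        out.add(buf);
    }
}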
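
The encoder and decoder above are thin wrappers around JDK serialization. The following JDK-only sketch shows the same byte round trip outside Netty, and where a serialization exception comes from: writing an object that does not implement Serializable throws NotSerializableException. The User class and the helper names are hypothetical and exist only for this illustration. In the pipeline, a failure like this inside encode should surface on the ChannelFuture returned by writeAndFlush, which is why the listeners are attached to it.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.NotSerializableException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;

/** JDK-only sketch of the serialization steps; names are illustrative. */
final class SerializationRoundTrip {
    /** Hypothetical message type; it must implement Serializable to be encoded. */
    static final class User implements Serializable {
        private static final long serialVersionUID = 1L;
        final String name;
        final int age;

        User(String name, int age) {
            this.name = name;
            this.age = age;
        }
    }

    /** Same steps as the encoder: object to bytes. */
    static byte[] toBytes(Object msg) {
        ByteArrayOutputStream baos = new ByteArrayOutputStream();
        try (ObjectOutputStream oos = new ObjectOutputStream(baos)) {
            oos.writeObject(msg);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return baos.toByteArray();
    }

    /** Same steps as the decoder: bytes back to an object. */
    static Object fromBytes(byte[] bytes) {
        try (ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(bytes))) {
            return ois.readObject();
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }

    /** Returns true when msg is rejected with NotSerializableException. */
    static boolean failsToSerialize(Object msg) {
        try {
            toBytes(msg);
            return false;
        } catch (UncheckedIOException e) {
            return e.getCause() instanceof NotSerializableException;
        }
    }
}
```

A User passed through toBytes and fromBytes comes back with the same name and age, while failsToSerialize(new Object()) returns true because java.lang.Object is not Serializable.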


Posted on Tue, 03 Dec 2019 22:36:48 -0800 by mabans