[Java] Using Java AIO

Welcome to the nullobject public account.
This article was first published on the personal blog https://www.nullobject.cn and is updated in sync on the nullobject public account.

This article mainly introduces Java AIO network programming.

1. What is AIO

AIO in this article refers specifically to AIO in the Java environment. AIO is one of the I/O models available in Java. As an improvement and enhancement of NIO, AIO was added to the JDK's NIO packages (java.nio.channels) in JDK 1.7, so it is also called NIO 2.0. Unlike traditional BIO (blocking I/O, the synchronous blocking model that existed before JDK 1.4) and NIO (introduced in JDK 1.4), AIO provides fully asynchronous operations, from establishing a connection to reading and writing. AIO can be used for asynchronous file reading and writing and for network communication. This article shows how to use AIO to implement simple network communication and introduces some of AIO's key APIs.
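Since the network examples that follow do not cover the file side, here is a minimal sketch of an asynchronous file read with AsynchronousFileChannel. The class name AsyncFileReadSketch and the helper readStart are made up for illustration, and the sketch assumes the file content is UTF-8 text that fits in the buffer:

```java
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousFileChannel;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;

// Illustrative class name; not part of the original article.
class AsyncFileReadSketch {

    // Reads up to 'capacity' bytes from the start of the file and decodes them as UTF-8.
    static String readStart(Path file, int capacity)
            throws IOException, InterruptedException, ExecutionException {
        try (AsynchronousFileChannel channel =
                     AsynchronousFileChannel.open(file, StandardOpenOption.READ)) {
            ByteBuffer buffer = ByteBuffer.allocate(capacity);
            // read() returns immediately; the I/O proceeds in the background.
            Future<Integer> pending = channel.read(buffer, 0L);
            // get() blocks until the read finishes; -1 means end of file.
            int bytesRead = pending.get();
            if (bytesRead <= 0) {
                return "";
            }
            // Switch the buffer from writing to reading before decoding it.
            buffer.flip();
            return StandardCharsets.UTF_8.decode(buffer).toString();
        }
    }

    public static void main(String[] args) throws Exception {
        Path file = Files.createTempFile("aio-sketch", ".txt");
        try {
            Files.write(file, "hello aio".getBytes(StandardCharsets.UTF_8));
            System.out.println(readStart(file, 64));
        } finally {
            Files.deleteIfExists(file);
        }
    }
}
```

Between the read() call and get(), the calling thread is free to do other work; the sketch calls get() right away only to keep the example short.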

2. Simple Use

Taking the server side first: create an AsynchronousServerSocketChannel instance, bind it to the listening port, and then start listening for client connections:

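import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousServerSocketChannel;
import java.nio.channels.AsynchronousSocketChannel;
import java.nio.channels.CompletionHandler;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;

public class SimpleAIOServer {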

    public static void main(String[] args) {
        try {
            final int port = 5555;
            // First, open a server socket channel to get an AsynchronousServerSocketChannel instance:
            AsynchronousServerSocketChannel serverSocketChannel = AsynchronousServerSocketChannel.open();
            // Bind the server socket channel to the port to listen on:
            serverSocketChannel.bind(new InetSocketAddress(port));
            // Implement the CompletionHandler callback interface.
            // The handler processes each connection request, starts listening for the next connection, sends and receives data, and handles communication errors.
            CompletionHandler<AsynchronousSocketChannel, Object> handler = new CompletionHandler<AsynchronousSocketChannel,
                    Object>() {
                @Override
                public void completed(final AsynchronousSocketChannel result, final Object attachment) {
                    // Continue listening for the next connection request  
                    serverSocketChannel.accept(attachment, this);
                    try {
                        System.out.println("Accepted a connection:" + result.getRemoteAddress()
                                                              .toString());
                        // Send data to the client and wait for the write to complete
                        result.write(ByteBuffer.wrap("From Server:Hello i am server".getBytes()))
                              .get();
                        ByteBuffer readBuffer = ByteBuffer.allocate(128);
                        // Block until data from the client has been read
                        result.read(readBuffer)
                              .get();
                        // Decode only the bytes actually read, not the whole 128-byte array
                        System.out.println(new String(readBuffer.array(), 0, readBuffer.position()));

                    } catch (IOException | InterruptedException | ExecutionException e) {
                        e.printStackTrace();
                    }
                }

                @Override
                public void failed(final Throwable exc, final Object attachment) {
                    System.out.println("Wrong:" + exc.getMessage());
                }
            };
            serverSocketChannel.accept(null, handler);
            // serverSocketChannel.accept(null, handler) is asynchronous, so the call returns immediately.
            // To give the background threads time to accept and serve client connections,
            // the main thread sleeps for a long time so that the application does not exit right away (real applications usually do not do this).
            TimeUnit.MINUTES.sleep(Integer.MAX_VALUE);
        } catch (IOException | InterruptedException e) {
            e.printStackTrace();
        }
    }
}

In the completed callback, result is the channel for the client connection that was just accepted; all communication with that client goes through it.

Client side:

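import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousSocketChannel;
import java.nio.channels.CompletionHandler;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;

public class SimpleAIOClient {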

    public static void main(String[] args) {
        try {
            // Open a socket channel to get an AsynchronousSocketChannel instance
            AsynchronousSocketChannel client = AsynchronousSocketChannel.open();
            // Connect to the server and handle the connection result
            client.connect(new InetSocketAddress("127.0.0.1", 5555), null, new CompletionHandler<Void, Void>() {
                @Override
                public void completed(final Void result, final Void attachment) {
                    System.out.println("Successful connection to server!");
                    try {
                        // Send a message to the server and wait for the write to complete
                        client.write(ByteBuffer.wrap("From client:Hello i am client".getBytes()))
                              .get();
                        ByteBuffer readBuffer = ByteBuffer.allocate(128);
                        // Block until data from the server has been read
                        client.read(readBuffer)
                              .get();
                        // Decode only the bytes actually read, not the whole 128-byte array
                        System.out.println(new String(readBuffer.array(), 0, readBuffer.position()));
                    } catch (InterruptedException | ExecutionException e) {
                        e.printStackTrace();
                    }
                }

                @Override
                public void failed(final Throwable exc, final Void attachment) {
                    exc.printStackTrace();
                }
            });
            TimeUnit.MINUTES.sleep(Integer.MAX_VALUE);
        } catch (IOException | InterruptedException e) {
            e.printStackTrace();
        }
    }
}

3. AIO's main APIs in detail

As the examples in section 2 show, implementing the simplest AIO socket server and client mainly requires these classes and interfaces:

  • AsynchronousServerSocketChannel

    The server socket channel class, responsible for creating the server socket and listening for connections;

  • AsynchronousSocketChannel

    The client socket channel class, responsible for reading and writing messages on a connection;

  • CompletionHandler<V,A>

    The callback interface that consumes the result of an asynchronous I/O operation.

  • ByteBuffer

    Responsible for carrying the messages that need to be read and written in the process of communication.

In addition, there is the optional AsynchronousChannelGroup class for sharing resources among asynchronous channels. The main interfaces and usage of these classes are introduced next.

3.1.1 AsynchronousServerSocketChannel

AsynchronousServerSocketChannel is an asynchronous channel for stream-oriented listening sockets.

Using AsynchronousServerSocketChannel involves three steps: creating/opening the channel, binding the address and port, and listening for client connection requests.

1. Create/open the channel: the simplest way to create an AsynchronousServerSocketChannel instance is to call its static method open():

try {
  AsynchronousServerSocketChannel serverSocketChannel = AsynchronousServerSocketChannel.open();
} catch (IOException e) {
  e.printStackTrace();
}

If the channel fails to open, an IOException is thrown. AsynchronousServerSocketChannel also supports channel groups, which let the channels in a group share resources. Call the open(AsynchronousChannelGroup) overload to create a channel in a specified group:

try {
  ExecutorService pool = Executors.newCachedThreadPool();
  AsynchronousChannelGroup group = AsynchronousChannelGroup.withCachedThreadPool(pool, 10);
  AsynchronousServerSocketChannel serverSocketChannel = AsynchronousServerSocketChannel.open(group);
} catch (IOException e) {
  e.printStackTrace();
}

AsynchronousChannelGroup encapsulates the mechanics needed to handle the completion of I/O operations initiated by the asynchronous channels bound to the group. Each AsynchronousChannelGroup has an associated thread pool; tasks submitted to it handle I/O events and dispatch to the completion handlers that consume the results of asynchronous operations performed on channels in the group. Besides handling I/O events, the pooled threads may also run other tasks needed to support asynchronous I/O operations. As the example above shows, opening the AsynchronousServerSocketChannel with a specified AsynchronousChannelGroup lets you choose the thread pool that serves the server channel. The official Javadoc describes AsynchronousChannelGroup in detail. If no AsynchronousChannelGroup is specified, the AsynchronousServerSocketChannel is bound to a default group.
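A group also has a lifecycle of its own, which the paragraph above does not cover. The following is a minimal sketch, assuming a fixed-size pool (withFixedThreadPool rather than the withCachedThreadPool variant used above) and a loopback bind on port 0; the class and method names are made up for illustration:

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.channels.AsynchronousChannelGroup;
import java.nio.channels.AsynchronousServerSocketChannel;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Illustrative class name; not part of the original article.
class ChannelGroupSketch {

    // Opens a server channel in its own group, then shuts the group down.
    // Returns true if the group terminated within the timeout.
    static boolean openAndShutdown(int threads) throws IOException, InterruptedException {
        AsynchronousChannelGroup group = AsynchronousChannelGroup.withFixedThreadPool(
                threads, Executors.defaultThreadFactory());
        AsynchronousServerSocketChannel server = AsynchronousServerSocketChannel.open(group);
        // Port 0 asks the operating system to pick a free port.
        server.bind(new InetSocketAddress("127.0.0.1", 0));

        // ... accept and serve connections here ...

        server.close();
        // After shutdown() no new channel can be bound to the group; the group
        // terminates once all of its channels are closed.
        group.shutdown();
        return group.awaitTermination(5, TimeUnit.SECONDS);
    }

    public static void main(String[] args) throws Exception {
        System.out.println("terminated: " + openAndShutdown(2));
    }
}
```

Shutting the group down explicitly matters mostly when the pool uses non-daemon threads, which would otherwise keep the JVM alive.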

2. Bind the address and port: call AsynchronousServerSocketChannel.bind(SocketAddress) to bind the address and port to listen on:

// Build an InetSocketAddress that specifies the address and port to listen on. To bind to a specific IP, use the InetSocketAddress(ip, port) constructor instead.
serverSocketChannel.bind(new InetSocketAddress(port));

3. Listen for and accept client connection requests:

Client connection requests are accepted by calling AsynchronousServerSocketChannel.accept(), which has two overloads:

public abstract <A> void accept(A attachment, CompletionHandler<AsynchronousSocketChannel,? super A> handler);
public abstract Future<AsynchronousSocketChannel> accept();

Both overloads start the same accept operation; they differ only in how the result is delivered. Many asynchronous AIO APIs come in this pair: one overload takes a CompletionHandler callback, and the other returns a Future<T>. With the Future form, you can call Future.get() to block until the result is available. Taking the first overload as an example, when a new client connection is accepted, or when the accept operation fails, the outcome is passed to the CompletionHandler for processing:
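For comparison, the Future-returning overload can be sketched as follows. This is a self-contained loopback illustration, not the article's server: the class name FutureAcceptSketch, the port 0 bind, and the 5-second timeouts are all assumptions made for the example:

```java
import java.net.InetSocketAddress;
import java.nio.channels.AsynchronousServerSocketChannel;
import java.nio.channels.AsynchronousSocketChannel;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

// Illustrative class name; not part of the original article.
class FutureAcceptSketch {

    // Accepts one loopback connection using the Future-returning accept().
    // Returns true if the accepted channel is open.
    static boolean acceptOnce() throws Exception {
        try (AsynchronousServerSocketChannel server = AsynchronousServerSocketChannel.open()) {
            server.bind(new InetSocketAddress("127.0.0.1", 0));
            InetSocketAddress address = (InetSocketAddress) server.getLocalAddress();

            // accept() returns at once; the Future completes when a client connects.
            Future<AsynchronousSocketChannel> pendingAccept = server.accept();

            try (AsynchronousSocketChannel client = AsynchronousSocketChannel.open()) {
                client.connect(address).get(5, TimeUnit.SECONDS);
                // A bounded wait, so that a failure cannot block this thread forever.
                try (AsynchronousSocketChannel accepted = pendingAccept.get(5, TimeUnit.SECONDS)) {
                    return accepted.isOpen();
                }
            }
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println("accepted: " + acceptOnce());
    }
}
```

The Future form reads like sequential code, at the cost of tying up the waiting thread; the CompletionHandler form keeps the caller free and is usually the better fit for a server that handles many connections.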

serverSocketChannel
.accept(serverSocketChannel, new CompletionHandler<AsynchronousSocketChannel,
        AsynchronousServerSocketChannel>() {
          @Override
          public void completed(final AsynchronousSocketChannel result,
                                final AsynchronousServerSocketChannel attachment) {
            // Callback when a new client connection is received
            // result is the connection session with the client
            // At this point, you can interact with the client through result
          }

          @Override
          public void failed(final Throwable exc, final AsynchronousServerSocketChannel attachment) {
            // Callback when accept fails
          }
        });

Note that AsynchronousServerSocketChannel is thread-safe, but at most one accept operation may be outstanding at any time; starting another accept before the previous one completes throws an AcceptPendingException. Therefore, wait for the previous accept to complete before starting the next one:

serverSocketChannel
.accept(serverSocketChannel, new CompletionHandler<AsynchronousSocketChannel,
        AsynchronousServerSocketChannel>() {
          @Override
          public void completed(final AsynchronousSocketChannel result,
                                final AsynchronousServerSocketChannel attachment) {
            // Receiving a new client connection, this accept is complete
            // Continue listening for the next client connection
            serverSocketChannel.accept(serverSocketChannel,this);
            // result is the connection session with the client
            // At this point, you can interact with the client through result
          }
          ...
        });
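The one-accept-at-a-time rule is easy to observe directly. In this small sketch (class and method names are illustrative), no client ever connects, so the first accept stays pending and the second call is rejected:

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.channels.AcceptPendingException;
import java.nio.channels.AsynchronousServerSocketChannel;

// Illustrative class name; not part of the original article.
class AcceptPendingSketch {

    // Returns true if a second accept() is rejected while the first is still pending.
    static boolean secondAcceptRejected() throws IOException {
        try (AsynchronousServerSocketChannel server = AsynchronousServerSocketChannel.open()) {
            server.bind(new InetSocketAddress("127.0.0.1", 0));
            // No client connects in this sketch, so this accept remains outstanding.
            server.accept();
            try {
                server.accept();
                return false;
            } catch (AcceptPendingException expected) {
                return true;
            }
        }
    }

    public static void main(String[] args) throws IOException {
        System.out.println("second accept rejected: " + secondAcceptRejected());
    }
}
```

This is why the handler above calls accept again from inside completed(): by then the previous accept has finished, so the new one is legal.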

In addition, the socket options of AsynchronousServerSocketChannel can be set and read with the following methods:

// Set a socket option (per the Javadoc, server channels support SO_RCVBUF and SO_REUSEADDR; SO_KEEPALIVE applies to connected sockets)
serverSocketChannel.setOption(StandardSocketOptions.SO_REUSEADDR, true);
// Read a socket option back
boolean reuseAddress = serverSocketChannel.getOption(StandardSocketOptions.SO_REUSEADDR);

The StandardSocketOptions class encapsulates commonly used socket settings.
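Which options a channel accepts depends on its type, and setting an unsupported one throws UnsupportedOperationException. A channel reports its own set through supportedOptions(), so a quick check like the following sketch (illustrative class and method names) can be used before relying on a particular option:

```java
import java.io.IOException;
import java.net.SocketOption;
import java.nio.channels.AsynchronousServerSocketChannel;
import java.nio.channels.AsynchronousSocketChannel;
import java.util.Set;

// Illustrative class name; not part of the original article.
class SupportedOptionsSketch {

    // Options supported by a listening (server) channel on this platform.
    static Set<SocketOption<?>> serverOptions() throws IOException {
        try (AsynchronousServerSocketChannel server = AsynchronousServerSocketChannel.open()) {
            return server.supportedOptions();
        }
    }

    // Options supported by a connecting (client) channel on this platform.
    static Set<SocketOption<?>> clientOptions() throws IOException {
        try (AsynchronousSocketChannel client = AsynchronousSocketChannel.open()) {
            return client.supportedOptions();
        }
    }

    public static void main(String[] args) throws IOException {
        System.out.println("server: " + serverOptions());
        System.out.println("client: " + clientOptions());
    }
}
```

The exact sets vary by JDK and operating system, so the output is not shown here.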

Get the local address:

InetSocketAddress address = (InetSocketAddress) serverSocketChannel.getLocalAddress();

3.1.2 AsynchronousSocketChannel

AsynchronousSocketChannel is an asynchronous channel for stream-oriented connecting sockets.

AsynchronousSocketChannel represents the connection between the server and a client. On the client side, it is created by calling the static method AsynchronousSocketChannel.open(); on the server side, AIO creates it internally and delivers it through AsynchronousServerSocketChannel.accept() when a connection is accepted. The client side is used below to introduce AsynchronousSocketChannel.

1. Create an AsynchronousSocketChannel and connect to the server: open an AsynchronousSocketChannel instance with open(), then call its connect() method to connect to the server. After that, you can exchange data with the server:

// Open a socket channel
AsynchronousSocketChannel socketChannel = AsynchronousSocketChannel.open();
// Block until the connection is established
socketChannel.connect(new InetSocketAddress(ip,port)).get();
// The connection is established; reading and writing can start.

Like AsynchronousServerSocketChannel, AsynchronousSocketChannel provides an open(AsynchronousChannelGroup) method for specifying the channel group and, through it, the thread pool. socketChannel.connect() also has two overloads: one takes a CompletionHandler callback and the other returns a Future. The example above uses the Future overload and calls get() to block until the connection is established.

2. Send messages:

Build a ByteBuffer and call socketChannel.write() to send the message asynchronously; the result is delivered through the CompletionHandler callback:

ByteBuffer writeBuf = ByteBuffer.wrap("From socketChannel:Hello i am socketChannel".getBytes());
socketChannel.write(writeBuf, null, new CompletionHandler<Integer, Object>() {
  @Override
  public void completed(final Integer result, final Object attachment) {
    // Write completed. result is the number of bytes written, which may be fewer than the buffer held; check writeBuf.hasRemaining()
  }

  @Override
  public void failed(final Throwable exc, final Object attachment) {
    // Send failed
  }
});

3. Read the message:

Allocate a ByteBuffer of a chosen capacity to receive the data, call socketChannel.read() to read the message, and handle the result in a CompletionHandler:

ByteBuffer readBuffer = ByteBuffer.allocate(128);
socketChannel.read(readBuffer, null, new CompletionHandler<Integer, Object>() {
  @Override
  public void completed(final Integer result, final Object attachment) {
    // Read completed. result is the number of bytes actually read, or -1 if the channel has reached end-of-stream (the peer closed the connection).
  }

  @Override
  public void failed(final Throwable exc, final Object attachment) {
    // Read failed
  }
});
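What typically goes inside completed() is: flip the buffer, consume the bytes, clear the buffer, and issue the next read until -1 arrives. The following loopback sketch illustrates that loop under some assumptions: the names ReadLoopSketch and roundTrip are made up, the message is UTF-8 text, the buffer is passed as the attachment, and a CompletableFuture is used only to hand the final text back to the calling thread:

```java
import java.io.ByteArrayOutputStream;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousServerSocketChannel;
import java.nio.channels.AsynchronousSocketChannel;
import java.nio.channels.CompletionHandler;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

// Illustrative class name; not part of the original article.
class ReadLoopSketch {

    // Sends 'message' over a loopback connection and returns the text the receiving side decoded.
    static String roundTrip(String message) throws Exception {
        try (AsynchronousServerSocketChannel server = AsynchronousServerSocketChannel.open()) {
            server.bind(new InetSocketAddress("127.0.0.1", 0));
            Future<AsynchronousSocketChannel> pendingAccept = server.accept();

            try (AsynchronousSocketChannel client = AsynchronousSocketChannel.open()) {
                client.connect(server.getLocalAddress()).get(5, TimeUnit.SECONDS);
                ByteBuffer out = ByteBuffer.wrap(message.getBytes(StandardCharsets.UTF_8));
                while (out.hasRemaining()) {
                    // A single write may send only part of the buffer.
                    client.write(out).get(5, TimeUnit.SECONDS);
                }
                // Half-close: the reader will see -1 once all data has been consumed.
                client.shutdownOutput();

                try (AsynchronousSocketChannel accepted = pendingAccept.get(5, TimeUnit.SECONDS)) {
                    CompletableFuture<String> received = new CompletableFuture<>();
                    ByteArrayOutputStream collected = new ByteArrayOutputStream();
                    ByteBuffer in = ByteBuffer.allocate(128);
                    accepted.read(in, in, new CompletionHandler<Integer, ByteBuffer>() {
                        @Override
                        public void completed(Integer bytesRead, ByteBuffer buffer) {
                            if (bytesRead == -1) {
                                // End-of-stream: everything has arrived.
                                received.complete(
                                        new String(collected.toByteArray(), StandardCharsets.UTF_8));
                                return;
                            }
                            buffer.flip();   // switch from filling to draining
                            collected.write(buffer.array(), 0, buffer.limit());
                            buffer.clear();  // make room for the next read
                            accepted.read(buffer, buffer, this);
                        }

                        @Override
                        public void failed(Throwable exc, ByteBuffer buffer) {
                            received.completeExceptionally(exc);
                        }
                    });
                    return received.get(5, TimeUnit.SECONDS);
                }
            }
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println(roundTrip("From socketChannel:Hello i am socketChannel"));
    }
}
```

Bytes are collected first and decoded once at the end, so a multi-byte UTF-8 character split across two reads is not corrupted.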

In addition, AsynchronousSocketChannel provides methods for setting and reading socket options:

// Setting socket options
socketChannel.setOption(StandardSocketOptions.SO_KEEPALIVE,true);
// Get socket option settings
boolean keepAlive = socketChannel.getOption(StandardSocketOptions.SO_KEEPALIVE);

3.1.3 CompletionHandler

CompletionHandler is a handler that consumes the result of an asynchronous I/O operation.

The asynchronous channels defined in AIO let you specify a CompletionHandler to consume the result of an asynchronous operation. As shown above, most asynchronous I/O methods in AIO have an overload that takes a CompletionHandler parameter, which makes the results of asynchronous I/O operations easy to handle. CompletionHandler is an interface with two generic type parameters, and it declares two methods:

public interface CompletionHandler<V,A> {
    void completed(V result, A attachment);
    void failed(Throwable exc, A attachment);
}

The type parameter V is the result type of the I/O operation; the result is passed to completed() as a parameter of this type. The type parameter A is the type of the object attached to the I/O operation; the attachment is how you pass any state that the CompletionHandler implementation needs. Accordingly, most asynchronous I/O operations in AIO have an overload of roughly this shape:

<V,A> void ioOperate(params,A attachment,CompletionHandler<V,A> handler);

For example, the AsynchronousServerSocketChannel.accept() method:

public abstract <A> void accept(A attachment,CompletionHandler<AsynchronousSocketChannel,? super A> handler);

and the AsynchronousSocketChannel.write() method:

public final <A> void write(ByteBuffer src,A attachment,CompletionHandler<Integer,? super A> handler);

When the I/O operation completes successfully, the completed method is called back; when it fails, the failed method is called back. Note that a CompletionHandler implementation should process the result promptly and return, so that it does not tie up the invoking thread and prevent it from dispatching to other CompletionHandlers.
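One way to keep callbacks short when the processing itself is slow is to forward the result to a separate worker pool. The wrapper below is a hypothetical helper, not a JDK class; it assumes you supply your own Executor and the handler that does the real work:

```java
import java.nio.channels.CompletionHandler;
import java.util.concurrent.Executor;

// Hypothetical helper (not a JDK class): forwards each callback to a worker
// executor so that the thread invoking the handler is released immediately.
final class OffloadingHandler<V, A> implements CompletionHandler<V, A> {
    private final Executor worker;
    private final CompletionHandler<V, A> delegate;

    OffloadingHandler(Executor worker, CompletionHandler<V, A> delegate) {
        this.worker = worker;
        this.delegate = delegate;
    }

    @Override
    public void completed(V result, A attachment) {
        // Only the hand-off runs on the calling thread; the real work runs on the worker.
        worker.execute(() -> delegate.completed(result, attachment));
    }

    @Override
    public void failed(Throwable exc, A attachment) {
        worker.execute(() -> delegate.failed(exc, attachment));
    }
}
```

With a worker pool named workerPool and a slow handler named slowHandler (both placeholders), a read could then be issued as socketChannel.read(readBuffer, null, new OffloadingHandler<Integer, Object>(workerPool, slowHandler)). The trade-off is an extra thread hop per callback, so this is worth it only when the handler does blocking or CPU-heavy work.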

4 The End :)

Tags: Java socket JDK Session

Posted on Wed, 11 Sep 2019 04:23:10 -0700 by radstorm