How the Selector of Java NIO Is Implemented


Let's go through the source code. By default on Windows, Selector.open() creates an instance of WindowsSelectorImpl and sets up a Pipe that is used to wake the selector.
To create a Selector object:

Selector selector = Selector.open();
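
As a quick check from the public API side, the following minimal sketch opens a selector and closes it again. The class name SelectorOpenDemo and its method are made up for illustration; only Selector.open(), isOpen(), keys() and close() come from the JDK.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.channels.Selector;

class SelectorOpenDemo {
    // Opens a selector through the default provider and reports whether it
    // is open and has no registered keys yet. The selector is closed again
    // by try-with-resources.
    public static boolean opensEmptySelector() {
        try (Selector selector = Selector.open()) {
            return selector.isOpen() && selector.keys().isEmpty();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println("fresh selector is open and empty: " + opensEmptySelector());
    }
}
```

Everything described in the rest of this article happens inside that single Selector.open() call.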

Implementation principle of Selector:
Instances of SocketChannel, ServerSocketChannel and Selector are all created through the SelectorProvider class, so the first step is to obtain the SelectorProvider:

public static SelectorProvider provider() {
    synchronized (lock) {
        // The provider has already been initialized, so return it directly
        if (provider != null)
            return provider;
        return AccessController.doPrivileged(
            new PrivilegedAction<SelectorProvider>() {
                public SelectorProvider run() {
                    // 1. Instantiate, by reflection, the class named by the system
                    //    property -Djava.nio.channels.spi.SelectorProvider=<class>
                    if (loadProviderFromProperty())
                        return provider;
                    // 2. Otherwise take the first provider class listed in a
                    //    META-INF/services/java.nio.channels.spi.SelectorProvider
                    //    configuration file found in a jar, and instantiate it
                    if (loadProviderAsService())
                        return provider;
                    // 3. Otherwise fall back to the platform default
                    provider = sun.nio.ch.DefaultSelectorProvider.create();
                    return provider;
                }
            });
    }
}

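AccessController.doPrivileged runs the given action as privileged code: the permission check considers only the privileges of the code that calls doPrivileged (here, the JDK class itself), not those of whichever caller further up the stack triggered it. The resources touched by the action (file read/write permissions and so on) are therefore not checked against the original caller.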
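
To make the calling pattern concrete, here is a small sketch that wraps a system property read in doPrivileged. The class DoPrivilegedDemo and the method readProperty are hypothetical names. Note that AccessController has been deprecated for removal since Java 17, so this is only meant to illustrate the older code shown in this article, not to recommend the API for new code.

```java
import java.security.AccessController;
import java.security.PrivilegedAction;

class DoPrivilegedDemo {
    // Runs the property lookup as a privileged action. doPrivileged calls
    // run() on the action and returns whatever run() returns.
    @SuppressWarnings("removal") // AccessController is deprecated for removal in newer JDKs
    public static String readProperty(String key) {
        return AccessController.doPrivileged(
            (PrivilegedAction<String>) () -> System.getProperty(key));
    }

    public static void main(String[] args) {
        System.out.println("java.version = " + readProperty("java.version"));
    }
}
```

Without a security manager installed, the result is the same as calling System.getProperty directly; the wrapper only matters when permission checks are active.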

The SelectorProvider has a different implementation on each operating system, and sun.nio.ch.DefaultSelectorProvider.create() returns the implementation class that matches the current platform. This article follows the Windows implementation, where the call returns a WindowsSelectorProvider:

package sun.nio.ch;

import java.io.IOException;
import java.nio.channels.spi.AbstractSelector;

public class WindowsSelectorProvider extends SelectorProviderImpl {
    public WindowsSelectorProvider() {
    }

    public AbstractSelector openSelector() throws IOException {
        return new WindowsSelectorImpl(this);
    }
}
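
Which provider and selector classes are actually chosen depends on the operating system and the JDK version, so it can be worth checking on your own machine. The sketch below does that with the public API; ProviderInspector and its method names are made up for this example, and no particular output is assumed.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.channels.Selector;
import java.nio.channels.spi.SelectorProvider;

class ProviderInspector {
    // Name of the system-wide default provider's implementation class.
    public static String providerClassName() {
        return SelectorProvider.provider().getClass().getName();
    }

    // Name of the selector implementation that the provider creates.
    public static String selectorClassName() {
        try (Selector selector = SelectorProvider.provider().openSelector()) {
            return selector.getClass().getName();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // provider() caches its result, so repeated calls return the same object.
    public static boolean providerIsCached() {
        return SelectorProvider.provider() == SelectorProvider.provider();
    }

    public static void main(String[] args) {
        System.out.println("provider: " + providerClassName());
        System.out.println("selector: " + selectorClassName());
        System.out.println("cached:   " + providerIsCached());
    }
}
```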

In the default case, openSelector() therefore creates a WindowsSelectorImpl.

The source of sun.nio.ch.DefaultSelectorProvider in the Windows JDK:

	package sun.nio.ch;
	import java.nio.channels.spi.SelectorProvider;
	public class DefaultSelectorProvider
	{
	  public static SelectorProvider create()
	  {
	    return new WindowsSelectorProvider();
	  }
	}

Here a WindowsSelectorProvider is returned directly; that is, without any configuration, WindowsSelectorProvider is used. Its constructor is empty and does nothing. After obtaining the SelectorProvider, we call its openSelector() method to obtain the corresponding Selector, which runs the WindowsSelectorImpl constructor:

private final Pipe wakeupPipe = Pipe.open();
 
WindowsSelectorImpl(SelectorProvider var1) throws IOException {
    super(var1);
    this.wakeupSourceFd = ((SelChImpl)this.wakeupPipe.source()).getFDVal();
    SinkChannelImpl var2 = (SinkChannelImpl)this.wakeupPipe.sink();
    var2.sc.socket().setTcpNoDelay(true);
    this.wakeupSinkFd = var2.getFDVal();
    this.pollWrapper.addWakeupSocket(this.wakeupSourceFd, 0);
}
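
The wake-up pipe prepared in this constructor exists so that another thread can break a blocked select(). How wakeup() uses the pipe internally is not shown in this article, so the sketch below only demonstrates the observable behaviour through the public API. WakeupDemo and selectUntilWokenUp are hypothetical names, and the 100 ms delay is an arbitrary choice.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.channels.Selector;

class WakeupDemo {
    // Blocks in select() with no channels registered and relies on a second
    // thread calling wakeup() to make select() return.
    public static int selectUntilWokenUp() {
        try (Selector selector = Selector.open()) {
            Thread waker = new Thread(() -> {
                try {
                    Thread.sleep(100); // give the caller time to block in select()
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
                selector.wakeup();
            });
            waker.start();
            // No keys are registered, so only wakeup() can end this call.
            int readyKeys = selector.select();
            waker.join();
            return readyKeys;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println("ready keys after wakeup: " + selectUntilWokenUp());
    }
}
```

Even if wakeup() happened to run before select() was entered, the next select() would return immediately, so the sketch cannot hang on that ordering.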

Looking at the constructor, the first thing it uses is wakeupPipe. This member is initialized by calling Pipe.open():

public static Pipe open() throws IOException {
    return SelectorProvider.provider().openPipe();
}

Pipe.open() directly calls the openPipe() method of the SelectorProvider:

public Pipe openPipe() throws IOException {
    return new PipeImpl(this);
}
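
From the caller's point of view, the object returned here is an ordinary java.nio.channels.Pipe with a sink to write to and a source to read from. The following sketch sends a short message through one; PipeRoundTrip and roundTrip are names invented for this example, and it assumes the message is small enough to fit in the pipe's buffer so that a single thread can write and then read.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.nio.channels.Pipe;
import java.nio.charset.StandardCharsets;

class PipeRoundTrip {
    // Writes the message into the sink and reads it back from the source.
    public static String roundTrip(String message) {
        try {
            Pipe pipe = Pipe.open();
            try (Pipe.SinkChannel sink = pipe.sink();
                 Pipe.SourceChannel source = pipe.source()) {
                ByteBuffer out = ByteBuffer.wrap(message.getBytes(StandardCharsets.UTF_8));
                while (out.hasRemaining()) {
                    sink.write(out);
                }
                ByteBuffer in = ByteBuffer.allocate(out.capacity());
                while (in.hasRemaining()) {
                    if (source.read(in) < 0) {
                        break; // end of stream before everything arrived
                    }
                }
                in.flip();
                return StandardCharsets.UTF_8.decode(in).toString();
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(roundTrip("wake"));
    }
}
```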

The openPipe() method is implemented in SelectorProviderImpl, the parent class of WindowsSelectorProvider. It simply calls the constructor of PipeImpl:

static {
    Util.load();
    byte[] var0 = new byte[8];
    boolean var1 = IOUtil.randomBytes(var0);
    if(var1) {
        rnd = new Random(ByteBuffer.wrap(var0).getLong());
    } else {
        rnd = new Random();
    }
 
}
 
PipeImpl(SelectorProvider var1) throws IOException {
    try {
        AccessController.doPrivileged(new PipeImpl.Initializer(var1));
    } catch (PrivilegedActionException var3) {
        throw (IOException)var3.getCause();
    }
}

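Here, the static block of PipeImpl creates a random number generator (rnd), seeded from IOUtil.randomBytes when that succeeds, and stores it in a static field. (The LoopbackConnector listing further down refers to this generator as RANDOM_NUMBER_GENERATOR; the two listings appear to come from different JDK versions.) The constructor then calls AccessController.doPrivileged with a new instance of Initializer, an inner class of PipeImpl. doPrivileged invokes the run() method of the object passed in and executes it with the privileges of the JDK code itself. Next, look at that run() method: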

     public Void run() throws IOException {
         PipeImpl.Initializer.LoopbackConnector var1 = new PipeImpl.Initializer.LoopbackConnector();
         var1.run();
         if(this.ioe instanceof ClosedByInterruptException) {
             this.ioe = null;
             Thread var2 = new Thread(var1) {
                 public void interrupt() {
                 }
             };
             var2.start();

             while(true) {
                 try {
                     var2.join();
                     break;
                 } catch (InterruptedException var4) {
                     ;
                 }
             }

             Thread.currentThread().interrupt();
         }

         if(this.ioe != null) {
             throw new IOException("Unable to establish loopback connection", this.ioe);
         } else {
             return null;
         }
     }

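This method initializes the read and write ends of the Pipe. It first runs a LoopbackConnector, an inner class of Initializer, directly on the current thread. If that attempt failed with a ClosedByInterruptException (the current thread was interrupted), it clears the error, runs the same connector again on a new thread whose interrupt() is a no-op, waits for that thread to finish, and then restores the current thread's interrupt status. Any remaining error is rethrown as an IOException. The LoopbackConnector and its run() method look like this: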

            private LoopbackConnector() {
            }

            public void run() {
                // Server channel that accepts the loopback connection
                ServerSocketChannel var1 = null;
                // The two connected channels that will become the ends of the pipe
                SocketChannel var2 = null;
                SocketChannel var3 = null;

                try {
                    // Two 16-byte buffers: one for the bytes sent, one for the bytes received
                    ByteBuffer var4 = ByteBuffer.allocate(16);
                    ByteBuffer var5 = ByteBuffer.allocate(16);
                    InetAddress var6 = InetAddress.getByName("127.0.0.1");

                    assert var6.isLoopbackAddress();
                    // Address (IP and port) that the server channel is bound to
                    InetSocketAddress var7 = null;
                    // Retry until both ends of the pipe have been initialized successfully
                    while(true) {
                    
                        // (Re)create the ServerSocketChannel if there is none or it has been closed
                        if (var1 == null || !var1.isOpen()) {
                            var1 = ServerSocketChannel.open();
                            var1.socket().bind(new InetSocketAddress(var6, 0));
                            var7 = new InetSocketAddress(var6, var1.socket().getLocalPort());
                        }
                        // Connect to the server's address; this channel writes the test bytes
                        // now and later becomes the source (read end) of the pipe
                        var2 = SocketChannel.open(var7);
                        PipeImpl.RANDOM_NUMBER_GENERATOR.nextBytes(var4.array());

                        do {
                            // Keep writing until the whole buffer has been sent
                            var2.write(var4);
                        } while(var4.hasRemaining());

                        var4.rewind();
                        // Accept the connection on the server side; the accepted channel
                        // reads back the bytes written in the previous step
                        var3 = var1.accept();

                        do {
                            // Read the received bytes into the second buffer
                            var3.read(var5);
                        } while(var5.hasRemaining());

                        var5.rewind();
                        // If the bytes read match the bytes written, the accepted connection is
                        // the one we opened, so use the two channels as the ends of the pipe
                        if (var5.equals(var4)) {
                            // Read end (source) of the pipe
                            PipeImpl.this.source = new SourceChannelImpl(Initializer.this.sp, var2);

                            // Write end (sink) of the pipe
                            PipeImpl.this.sink = new SinkChannelImpl(Initializer.this.sp, var3);
                            break;
                        }

                        var3.close();
                        var2.close();
                    }
                } catch (IOException var18) {
                    try {
                        if (var2 != null) {
                            var2.close();
                        }

                        if (var3 != null) {
                            var3.close();
                        }
                    } catch (IOException var17) {
                        ;
                    }

                    Initializer.this.ioe = var18;
                } finally {
                    try {
                        if (var1 != null) {
                            var1.close();
                        }
                    } catch (IOException var16) {
                        ;
                    }

                }

            }
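
The handshake in this listing can be approximated with public API only. The sketch below is not the JDK's code: LoopbackHandshake and verifyLoopbackPair are hypothetical names, it uses InetAddress.getLoopbackAddress() and SecureRandom instead of the JDK-internal fields, and it makes a single attempt rather than retrying in a loop.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.security.SecureRandom;

class LoopbackHandshake {
    // Connects a client to a loopback server, sends 16 random bytes and
    // checks that the accepted channel receives exactly those bytes.
    public static boolean verifyLoopbackPair() {
        InetAddress loopback = InetAddress.getLoopbackAddress();
        try (ServerSocketChannel server = ServerSocketChannel.open()) {
            server.bind(new InetSocketAddress(loopback, 0)); // port 0: let the OS pick
            InetSocketAddress address = (InetSocketAddress) server.getLocalAddress();

            ByteBuffer sent = ByteBuffer.allocate(16);
            ByteBuffer received = ByteBuffer.allocate(16);
            new SecureRandom().nextBytes(sent.array());

            try (SocketChannel client = SocketChannel.open(address);
                 SocketChannel accepted = server.accept()) {
                while (sent.hasRemaining()) {
                    client.write(sent);
                }
                sent.rewind();
                while (received.hasRemaining()) {
                    if (accepted.read(received) < 0) {
                        return false; // peer closed before all bytes arrived
                    }
                }
                received.rewind();
                return received.equals(sent);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println("loopback pair verified: " + verifyLoopbackPair());
    }
}
```

Comparing random bytes guards against some other local process connecting to the freshly bound port before the intended client does.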

var1 is created by ServerSocketChannel.open(), which ends up in openServerSocketChannel() of SelectorProviderImpl:

    public ServerSocketChannel openServerSocketChannel() throws IOException {
        return new ServerSocketChannelImpl(this);
    }

This returns a ServerSocketChannelImpl, whose constructor is:

    ServerSocketChannelImpl(SelectorProvider var1) throws IOException {
        super(var1);
        this.fd = Net.serverSocket(true);
        this.fdVal = IOUtil.fdVal(this.fd);
        this.state = 0;
    }

Back in the constructor of WindowsSelectorImpl: the fdVal of the source (SourceChannelImpl) is saved in wakeupSourceFd and the fdVal of the sink (SinkChannelImpl) in wakeupSinkFd, the Nagle algorithm is disabled on the sink's socket, and wakeupSourceFd is added to the pollWrapper member (a PollArrayWrapper) at index 0 with the POLLIN event. At that point the selector has been created.

    WindowsSelectorImpl(SelectorProvider var1) throws IOException {
        super(var1);
        this.wakeupSourceFd = ((SelChImpl)this.wakeupPipe.source()).getFDVal();
        SinkChannelImpl var2 = (SinkChannelImpl)this.wakeupPipe.sink();
        var2.sc.socket().setTcpNoDelay(true);
        this.wakeupSinkFd = var2.getFDVal();
        this.pollWrapper.addWakeupSocket(this.wakeupSourceFd, 0);
    }

    void addWakeupSocket(int var1, int var2) {
        this.putDescriptor(var2, var1);
        this.putEventOps(var2, Net.POLLIN);
    }    
   
    void putDescriptor(int var1, int var2) {
        this.pollArray.putInt(SIZE_POLLFD * var1 + 0, var2);
    }
 
    void putEventOps(int var1, int var2) {
        this.pollArray.putShort(SIZE_POLLFD * var1 + 4, (short)var2);
    }
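
The two put methods above write into a flat array of fixed-size poll entries: the descriptor at offset 0 of an entry and the requested events at offset 4. The sketch below models that layout with a heap ByteBuffer. It is only an illustration: PollArraySketch is a made-up class, the entry size of 8 bytes and the POLLIN value are assumptions, and the real pollArray is backed by native memory rather than a ByteBuffer.

```java
import java.nio.ByteBuffer;
import java.nio.ByteOrder;

class PollArraySketch {
    static final int SIZE_POLLFD = 8;   // assumed entry size in bytes
    static final int FD_OFFSET = 0;     // int descriptor at the start of an entry
    static final int EVENT_OFFSET = 4;  // short event mask after the descriptor
    static final short POLLIN = 0x0001; // placeholder value, not taken from the JDK

    private final ByteBuffer pollArray;

    public PollArraySketch(int entries) {
        pollArray = ByteBuffer.allocate(entries * SIZE_POLLFD).order(ByteOrder.nativeOrder());
    }

    public void putDescriptor(int index, int fd) {
        pollArray.putInt(SIZE_POLLFD * index + FD_OFFSET, fd);
    }

    public void putEventOps(int index, int events) {
        pollArray.putShort(SIZE_POLLFD * index + EVENT_OFFSET, (short) events);
    }

    public int getDescriptor(int index) {
        return pollArray.getInt(SIZE_POLLFD * index + FD_OFFSET);
    }

    public short getEventOps(int index) {
        return pollArray.getShort(SIZE_POLLFD * index + EVENT_OFFSET);
    }

    // Same argument order as the listing: descriptor first, then the entry index.
    public void addWakeupSocket(int fd, int index) {
        putDescriptor(index, fd);
        putEventOps(index, POLLIN);
    }
}
```

With this layout, addWakeupSocket(wakeupSourceFd, 0) reserves entry 0 for the wake-up source, so regular channels occupy the entries after it.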

Tags: Java socket Windows JDK

Posted on Wed, 15 Jan 2020 04:45:00 -0800 by bubatalazi