Phase II, Practical Java High Concurrent Programming Patterns-7.nio, aio

I. outline

What is NIO? 
 Buffer
 Channel 
Network programming 
 AIO
 Why do we need to know about NIO and AIO?
  1. What is NIO?

     NIO is short for New I/O. As the name suggests, it is a new set of Java I/O APIs that complements the older stream-based I/O. It was added to the JDK in Java 1.4 and has the following features:
     - NIO is block-oriented: data is processed in blocks (buffers) rather than as a byte-by-byte stream
     - Provides Buffer classes for the primitive types (all of them except boolean)
     - Adds Channel objects as a new primitive I/O abstraction
     - Provides a file-access interface that supports file locking and memory-mapped files
     - Provides non-blocking, multiplexed network I/O based on Selector (often loosely described as asynchronous)
    
  2. Buffer && Channel

    	import java.io.FileInputStream;
    	import java.io.FileOutputStream;
    	import java.io.IOException;
    	import java.io.RandomAccessFile;
    	import java.nio.ByteBuffer;
    	import java.nio.CharBuffer;
    	import java.nio.MappedByteBuffer;
    	import java.nio.channels.FileChannel;
    	import java.nio.charset.Charset;
    
    	/**
    	 * description: NIO basics demo - channel-based file copy, ByteBuffer
    	 * position/limit/capacity bookkeeping, and memory-mapped file access.
    	 *
    	 * @author: dawn.he QQ:       905845006
    	 * @email: dawn.he@cloudwise.com
    	 * @email: 905845006@qq.com
    	 * @date: 2019/10/4    11:14 PM
    	 */
    	public class DelayMain {
    		/** File that {@link #test3()} and {@link #test4()} map into memory. */
    		private static final String MAPPED_FILE = "/Users/heliming/IdeaProjects/democloud/jvm/src/main/java/DelayMain.java";
    
    		/**
    		 * Runs the demos in order: file copy, buffer bookkeeping, charset-aware mapped read.
    		 *
    		 * @param args unused
    		 * @throws IOException if one of the hard-coded files cannot be read or written
    		 */
    		public static void main(String[] args) throws IOException {
    			// Copy DelayMain.class to DelayMains.class
    			nioCopyFile("/Users/heliming/IdeaProjects/democloud/jvm/target/classes/DelayMain.class","/Users/heliming/IdeaProjects/democloud/jvm/target/classes/DelayMains.class");
    			test2();
    			// test3() is the byte-by-byte variant of test4(); it is not run by default.
    			test4();
    		}
    
    		/**
    		 * Copies a file through NIO channels using a 1024-byte heap buffer.
    		 *
    		 * @param resource    path of the file to read
    		 * @param destination path of the file to create or overwrite
    		 * @throws IOException if either file cannot be opened, read or written
    		 */
    		public static void nioCopyFile(String resource, String destination) throws IOException {
    			// try-with-resources closes both streams (and with them their channels)
    			// even when opening the destination, a read, or a write fails.
    			try (FileInputStream fis = new FileInputStream(resource);
    					FileOutputStream fos = new FileOutputStream(destination)) {
    				FileChannel readChannel = fis.getChannel();
    				FileChannel writeChannel = fos.getChannel();
    				ByteBuffer buffer = ByteBuffer.allocate(1024);
    				// read() returns -1 once the end of the file has been reached
    				while (readChannel.read(buffer) != -1) {
    					buffer.flip(); // switch the buffer from being filled to being drained
    					// write() may consume only part of the buffer, so repeat until it is empty
    					while (buffer.hasRemaining()) {
    						writeChannel.write(buffer);
    					}
    					buffer.clear(); // make the whole buffer available for the next read
    				}
    			}
    		}
    
    		/**
    		 * Maps {@link #MAPPED_FILE} into memory, prints it decoded with the platform
    		 * default charset (so multi-byte text such as Chinese is printed correctly),
    		 * then overwrites the first byte of the file with {@code 'b'} (98).
    		 *
    		 * @throws IOException if the file cannot be opened or mapped
    		 */
    		public static void test4() throws IOException {
    			try (RandomAccessFile raf = new RandomAccessFile(MAPPED_FILE, "rw");
    					FileChannel fc = raf.getChannel()) {
    				//Mapping files to memory
    				MappedByteBuffer mbb = fc.map(FileChannel.MapMode.READ_WRITE, 0, raf.length());
    				CharBuffer charBuffer = Charset.defaultCharset().decode(mbb);
    				// Printing the buffer prints all of its remaining characters
    				System.out.print(charBuffer);
    				// "rw" mode creates a missing file with length 0; an empty mapping has no byte 0 to modify
    				if (mbb.capacity() > 0) {
    					mbb.put(0, (byte) 98); //Modify file
    				}
    			}
    		}
    
    		/**
    		 * Walks a 15-byte buffer through put / flip / get / flip and prints
    		 * limit, capacity and position after every step.
    		 *
    		 * @throws IOException never thrown by the current body; kept for signature compatibility
    		 */
    		public static void test2() throws IOException {
    			ByteBuffer b = ByteBuffer.allocate(15); //15 byte size buffer
    			printState(b); // limit=15 capacity=15 position=0
    			for (int i = 0; i < 10; i++) { //Store 10 bytes of data
    				b.put((byte) i);
    			}
    			printState(b); // position has advanced to 10
    			b.flip(); // limit = old position (10), position = 0
    			printState(b);
    			for (int i = 0; i < 5; i++) {
    				System.out.print(b.get());
    			}
    			System.out.println();
    			printState(b); // position has advanced to 5
    			b.flip(); // limit = 5, position = 0: only the bytes already read remain visible
    			printState(b);
    		}
    
    		/** Prints the three bookkeeping values of {@code b} on one line. */
    		private static void printState(ByteBuffer b) {
    			System.out.println("limit=" + b.limit() + " capacity=" + b.capacity() + " position=" + b.position());
    		}
    
    		/**
    		 * Maps {@link #MAPPED_FILE} into memory, prints it by casting every single byte
    		 * to a {@code char}, then overwrites the first byte of the file with {@code 'b'} (98).
    		 * Because bytes are converted one at a time, multi-byte characters are not
    		 * printed correctly; {@link #test4()} decodes through a charset instead.
    		 *
    		 * @throws IOException if the file cannot be opened or mapped
    		 */
    		public static void test3() throws IOException {
    			try (RandomAccessFile raf = new RandomAccessFile(MAPPED_FILE, "rw");
    					FileChannel fc = raf.getChannel()) {
    				//Mapping files to memory
    				MappedByteBuffer mbb = fc.map(FileChannel.MapMode.READ_WRITE, 0, raf.length());
    				while (mbb.hasRemaining()) {
    					System.out.print((char) mbb.get());
    				}
    				// "rw" mode creates a missing file with length 0; an empty mapping has no byte 0 to modify
    				if (mbb.capacity() > 0) {
    					mbb.put(0, (byte) 98); //Modify file
    				}
    			}
    		}
    
    	}
    

  3. Network programming

    Server side

    	import java.io.BufferedReader;
    	import java.io.IOException;
    	import java.io.InputStreamReader;
    	import java.io.PrintWriter;
    	import java.net.ServerSocket;
    	import java.net.Socket;
    	import java.util.concurrent.ExecutorService;
    	import java.util.concurrent.Executors;
    
    	/**
    	 * description: Server side - blocking echo server that hands every accepted
    	 * connection to a pooled thread, which echoes each received line back.
    	 *
    	 * @author: dawn.he QQ:       905845006
    	 * @email: dawn.he@cloudwise.com
    	 * @email: 905845006@qq.com
    	 * @date: 2019/10/5    6:12 PM
    	 */
    	public class EchoServer {
    		private static final ExecutorService tp = Executors.newCachedThreadPool();
    
    		/**
    		 * Listens on port 8000 and dispatches every accepted connection to the thread pool.
    		 * Returns after printing the error if the port cannot be bound.
    		 *
    		 * @param args unused
    		 */
    		public static void main(String args[]) {
    			try (ServerSocket echoServer = new ServerSocket(8000)) {
    				while (!echoServer.isClosed()) {
    					try {
    						Socket clientSocket = echoServer.accept();
    						System.out.println(clientSocket.getRemoteSocketAddress() + " connect!");
    						tp.execute(new HandleMsg(clientSocket));
    					} catch (IOException e) {
    						// A failed accept only affects that one connection attempt; keep serving.
    						System.out.println(e);
    					}
    				}
    			} catch (IOException e) {
    				// The port could not be bound (or the listener failed to close):
    				// there is no server socket to accept on, so stop instead of looping.
    				System.out.println(e);
    			}
    		}
    
    		/** Echoes every line received on one client socket, then closes the socket. */
    		static class HandleMsg implements Runnable {
    
    			private final Socket clientSocket;
    
    			/**
    			 * @param socket connected client socket; it is closed when {@link #run()} finishes
    			 */
    			public HandleMsg(Socket socket) {
    				this.clientSocket = socket;
    			}
    
    			/**
    			 * Reads lines until the client closes its side, writes each one back,
    			 * and prints how long the whole conversation took.
    			 */
    			@Override
    			public void run() {
    				// try-with-resources closes reader, writer and socket in every case, including
    				// when obtaining one of the streams fails (no null checks needed).
    				try (Socket socket = clientSocket;
    						BufferedReader is = new BufferedReader(new InputStreamReader(socket.getInputStream()));
    						PrintWriter os = new PrintWriter(socket.getOutputStream(), true)) {
    					long begin = System.currentTimeMillis();
    					String inputLine;
    					// Read data sent by client from InputStream; readLine() returns null at end of stream
    					while ((inputLine = is.readLine()) != null) {
    						os.println(inputLine); // auto-flush is on, so each line is sent immediately
    					}
    					long end = System.currentTimeMillis();
    					System.out.println("spend:" + (end - begin) + "ms");
    				} catch (IOException e) {
    					e.printStackTrace();
    				}
    			}
    		}
    
    	}
    

    Client

    	import java.io.BufferedReader;
    	import java.io.IOException;
    	import java.io.InputStreamReader;
    	import java.io.PrintWriter;
    	import java.net.InetSocketAddress;
    	import java.net.Socket;
    
    	/**
    	 * description: Client - sends one line to the echo server on localhost:8000
    	 * and prints the line that comes back.
    	 *
    	 * @author: dawn.he QQ:       905845006
    	 * @email: dawn.he@cloudwise.com
    	 * @email: 905845006@qq.com
    	 * @date: 2019/10/5    9:21 PM
    	 */
    	public class EchoServerclient {
    
    		/**
    		 * Connects, sends {@code "Hello!"} and prints the server's reply.
    		 * I/O failures (for example a refused connection) are reported on standard error.
    		 *
    		 * @param args unused
    		 * @throws IOException declared for signature compatibility; I/O errors are handled here
    		 */
    		public static void main(String[] args) throws IOException {
    			// The socket is closed by try-with-resources whether or not connecting succeeds.
    			try (Socket client = new Socket()) {
    				client.connect(new InetSocketAddress("localhost", 8000));
    				try (PrintWriter writer = new PrintWriter(client.getOutputStream(), true);
    						BufferedReader reader = new BufferedReader(new InputStreamReader(client.getInputStream()))) {
    					writer.println("Hello!");
    					writer.flush();
    					System.out.println("from server: " + reader.readLine());
    				}
    			} catch (IOException e) {
    				// Previously swallowed silently; report why the echo round trip failed.
    				System.err.println("Echo request failed: " + e);
    			}
    		}
    
    	}
    

    Network Programming - Simulate Low Efficiency Client

    Modify client code

    	import java.io.BufferedReader;
    	import java.io.IOException;
    	import java.io.InputStreamReader;
    	import java.io.PrintWriter;
    	import java.net.InetSocketAddress;
    	import java.net.Socket;
    	import java.util.concurrent.ExecutorService;
    	import java.util.concurrent.Executors;
    	import java.util.concurrent.locks.LockSupport;
    
    	/**
    	 * description: Client - starts ten deliberately slow clients against the echo
    	 * server on localhost:8000; each one pauses one second after every character.
    	 *
    	 * @author: dawn.he QQ:       905845006
    	 * @email: dawn.he@cloudwise.com
    	 * @email: 905845006@qq.com
    	 * @date: 2019/10/5    9:21 PM
    	 */
    	public class EchoServerclient {
    		/** Pause after each character: one second, expressed in nanoseconds. */
    		private static final long SLEEP_TIME_NANOS = 1000L * 1000 * 1000;
    		private static final ExecutorService tp = Executors.newCachedThreadPool();
    
    		/**
    		 * Submits ten {@link EchoClient} tasks and lets the pool shut down once they finish.
    		 *
    		 * @param args unused
    		 */
    		public static void main(String[] args) {
    			for (int i = 0; i < 10; i++) {
    				tp.execute(new EchoClient());
    			}
    			tp.shutdown();
    		}
    
    
    		/** One slow client: sends "Hello!" with a pause after every character, then prints the echo. */
    		public static class EchoClient implements Runnable {
    			@Override
    			public void run() {
    				// The socket is closed by try-with-resources whether or not connecting succeeds.
    				try (Socket client = new Socket()) {
    					client.connect(new InetSocketAddress("localhost", 8000));
    					try (PrintWriter writer = new PrintWriter(client.getOutputStream(), true);
    							BufferedReader reader = new BufferedReader(new InputStreamReader(client.getInputStream()))) {
    						// NOTE: PrintWriter's auto-flush only applies to println/printf/format, so the
    						// characters printed here stay in the writer's buffer during the pauses and
    						// are sent together when println() is called below.
    						for (char c : "Hello!".toCharArray()) {
    							writer.print(c);
    							LockSupport.parkNanos(SLEEP_TIME_NANOS);
    						}
    						writer.println();
    						writer.flush();
    						System.out.println("from server: " + reader.readLine());
    					}
    				} catch (IOException e) {
    					// Previously swallowed silently; report why this client failed.
    					System.err.println("Echo request failed: " + e);
    				}
    			}
    		}
    	}
    

    The server output is as follows:

     spend:6038ms
     spend:6038ms
     spend:6040ms
     spend:6041ms
     spend:6042ms
     spend:6043ms
     spend:6043ms
     spend:6043ms
     spend:6045ms
     spend:6046ms
    
  4. Network Programming-NIO

    The server-side code is replaced by:

    	/**
    	 * description:
    	 *
    	 * @author: dawn.he QQ:       905845006
    	 * @email: dawn.he@cloudwise.com
    	 * @email: 905845006@qq.com
    	 * @date: 2019/10/5    9:15 PM
    	 */
    
    	import java.io.IOException;
    	import java.net.InetAddress;
    	import java.net.InetSocketAddress;
    	import java.net.ServerSocket;
    	import java.net.Socket;
    	import java.nio.ByteBuffer;
    	import java.nio.channels.SelectionKey;
    	import java.nio.channels.Selector;
    	import java.nio.channels.ServerSocketChannel;
    	import java.nio.channels.SocketChannel;
    	import java.util.HashMap;
    	import java.util.Iterator;
    	import java.util.LinkedList;
    	import java.util.Map;
    	import java.util.Set;
    	import java.util.concurrent.ExecutorService;
    	import java.util.concurrent.Executors;
    
    	/**
    	 * Echo server driven by a single {@link Selector} thread.
    	 *
    	 * The selector thread accepts connections, reads incoming bytes and writes queued
    	 * replies. Each chunk that was read is handed to the thread pool, where
    	 * {@link HandleMsg} queues it for writing and registers write interest on the key.
    	 * The per-client output queue is therefore shared between pool threads and the
    	 * selector thread; both sides lock on the queue object while touching it.
    	 */
    	public class NIOEchoServer
    	{
    		private static final ExecutorService tp = Executors.newCachedThreadPool();
    
    		/**
    		 * Time of the first read-ready event per client socket, removed again when the
    		 * reply is written or the client disconnects. Within this class it is only
    		 * accessed from the thread that runs {@link #startServer()}.
    		 */
    		public static Map<Socket, Long> geym_time_stat = new HashMap<Socket, Long>();
    
    		private static ServerSocketChannel ssc = null;
    
    		private static Selector selector = null;
    
    		private static final int PORT = 8000;
    
    		/**
    		 * Binds port {@value #PORT} and runs the selector loop on the calling thread.
    		 * Does not return normally; the selector and server channel are closed if the
    		 * loop is left through an exception.
    		 *
    		 * @throws IOException if the channel or selector cannot be opened or bound, or if select fails
    		 */
    		public static void startServer() throws IOException
    		{
    			try (ServerSocketChannel server = ServerSocketChannel.open();
    					Selector sel = Selector.open()) {
    				ssc = server;
    				selector = sel;
    				ssc.configureBlocking(false);
    				ssc.socket().bind(new InetSocketAddress(PORT));
    				ssc.register(selector, SelectionKey.OP_ACCEPT);
    				while (true)
    				{
    					// Also returns (possibly with no ready keys) when HandleMsg calls wakeup()
    					selector.select();
    					final Iterator<SelectionKey> it = selector.selectedKeys().iterator();
    					while (it.hasNext())
    					{
    						final SelectionKey key = it.next();
    						// The selector never removes keys from the selected set itself
    						it.remove();
    						dispatch(key);
    					}
    				}
    			}
    		}
    
    		/** Handles one ready key: accept, read (starting the timer) or write (stopping it). */
    		private static void dispatch(SelectionKey key) {
    			if (!key.isValid()) {
    				// The readiness tests below throw CancelledKeyException on a cancelled key
    				return;
    			}
    			if (key.isAcceptable()) {
    				doAccept(key);
    				return;
    			}
    			final Socket socket = ((SocketChannel) key.channel()).socket();
    			if (key.isReadable()) {
    				if (!geym_time_stat.containsKey(socket)) {
    					geym_time_stat.put(socket, System.currentTimeMillis());
    				}
    				doRead(key);
    			} else if (key.isWritable()) {
    				doWrite(key);
    				// No entry exists if an earlier write already reported the time or the
    				// client was disconnected, so the start time must not be unboxed blindly.
    				final Long begin = geym_time_stat.remove(socket);
    				if (begin != null) {
    					System.out.println("spend:" + (System.currentTimeMillis() - begin) + "ms");
    				}
    			}
    		}
    
    		/**
    		 * Writes the oldest queued buffer of the key's client and drops write interest
    		 * once nothing is left to send. Disconnects the client if writing fails.
    		 */
    		private static void doWrite(SelectionKey key) {
    			SocketChannel channel = (SocketChannel) key.channel();
    			EchoClient echoClient = (EchoClient) key.attachment();
    			LinkedList<ByteBuffer> outq = echoClient.getOutputQueue();
    			// Same lock as HandleMsg.run(): a buffer queued concurrently is either seen here
    			// or re-enables OP_WRITE after this method has switched the key back to OP_READ.
    			synchronized (outq) {
    				try {
    					if (!outq.isEmpty()) {
    						// Buffers are added at the head, so the tail is the oldest one
    						ByteBuffer bb = outq.getLast();
    						channel.write(bb);
    						if (!bb.hasRemaining()) {
    							outq.removeLast();
    						}
    						// A partially written buffer stays queued for the next write-ready event
    					}
    					if (outq.isEmpty()) {
    						key.interestOps(SelectionKey.OP_READ);
    					}
    				} catch (IOException | IllegalStateException e) {
    					// IllegalStateException covers CancelledKeyException and NotYetConnectedException
    					disconnect(key);
    				}
    			}
    		}
    
    		/**
    		 * Reads up to 8192 bytes from the key's channel and submits them to the pool
    		 * for echoing. Disconnects the client at end of stream or on a read error.
    		 */
    		private static void doRead(SelectionKey key) {
    
    			SocketChannel channel = (SocketChannel) key.channel();
    			ByteBuffer bb = ByteBuffer.allocate(8192);
    			int len;
    			try {
    				len = channel.read(bb);
    			} catch (IOException | IllegalStateException e) {
    				disconnect(key);
    				return;
    			}
    			if (len < 0) {
    				// End of stream: the client closed the connection
    				disconnect(key);
    				return;
    			}
    			if (len == 0) {
    				// Nothing was read, so there is nothing to echo
    				return;
    			}
    			bb.flip();
    			tp.execute(new NIOEchoServer.HandleMsg(key, bb));
    		}
    
    		/** Forgets the client's timing entry and closes its channel, which also cancels its keys. */
    		private static void disconnect(SelectionKey sk) {
    			if (sk.channel() instanceof SocketChannel) {
    				// Otherwise the entry created on the first read would never be removed
    				geym_time_stat.remove(((SocketChannel) sk.channel()).socket());
    			}
    			try {
    				sk.channel().close();
    			} catch (IOException e) {
    				e.printStackTrace();
    			}
    		}
    
    		/** Pool task that queues one received buffer for echoing and requests write readiness. */
    		static class HandleMsg implements Runnable {
    			final SelectionKey sk;
    			final ByteBuffer bb;
    
    			/**
    			 * @param sk key of the client the data was read from; its attachment must be an EchoClient
    			 * @param bb received data, already flipped for reading
    			 */
    			public HandleMsg(SelectionKey sk, ByteBuffer bb) {
    				super();
    				this.sk = sk;
    				this.bb = bb;
    			}
    
    			@Override
    			public void run() {
    				EchoClient echoClient = (EchoClient) sk.attachment();
    				LinkedList<ByteBuffer> outq = echoClient.getOutputQueue();
    				try {
    					// Queueing and enabling OP_WRITE happen under the lock doWrite() uses
    					synchronized (outq) {
    						echoClient.enqueue(bb);
    						sk.interestOps(SelectionKey.OP_READ | SelectionKey.OP_WRITE);
    					}
    				} catch (java.nio.channels.CancelledKeyException e) {
    					// The client was disconnected in the meantime; there is nobody to echo to
    					return;
    				}
    				//Force selector to return immediately
    				selector.wakeup();
    			}
    
    		}
    
    		/**
    		 * Accepts a pending connection, switches it to non-blocking mode and registers
    		 * it for reading with a fresh EchoClient attached.
    		 */
    		private static void doAccept(SelectionKey key) {
    			SocketChannel clientChannel = null;
    			ServerSocketChannel server = (ServerSocketChannel) key.channel();
    			try {
    				clientChannel = server.accept();
    				if (clientChannel == null) {
    					// A non-blocking accept() returns null when no connection is pending
    					return;
    				}
    				clientChannel.configureBlocking(false);
    				// Attach the client state as part of the registration
    				clientChannel.register(selector, SelectionKey.OP_READ, new EchoClient());
    
    				InetAddress clientAddress = clientChannel.socket().getInetAddress();
    				System.out.println("Acception connection from "+ clientAddress.getHostAddress()+" ! ");
    
    			} catch (IOException e) {
    				System.out.println( " Failed to accept new client.");
    				e.printStackTrace();
    				if (clientChannel != null) {
    					// Do not leak a connection that was accepted but could not be registered
    					try {
    						clientChannel.close();
    					} catch (IOException closeFailure) {
    						closeFailure.printStackTrace();
    					}
    				}
    			}
    		}
    
    		/**
    		 * Starts the server.
    		 *
    		 * @param args unused
    		 * @throws IOException if the server cannot be started or the selector loop fails
    		 */
    		public static void main(String[] args) throws IOException
    		{
    			NIOEchoServer.startServer();
    		}
    	}
    	/**
    	 * Per-connection state: the buffers that still have to be written back to one client.
    	 * New buffers go in at the head of the list, so the tail holds the oldest one.
    	 */
    	class EchoClient {
    		private final LinkedList<ByteBuffer> outq = new LinkedList<ByteBuffer>();
    
    		/** Creates a client with an empty output queue. */
    		public EchoClient() {
    		}
    
    		/**
    		 * @return the live output queue itself (not a copy)
    		 */
    		public LinkedList<ByteBuffer> getOutputQueue() {
    			return this.outq;
    		}
    
    		/**
    		 * Adds a buffer at the head of the output queue.
    		 *
    		 * @param bb buffer to queue
    		 */
    		public void enqueue(ByteBuffer bb) {
    			// push() inserts at the front of the list, exactly like addFirst()
    			this.outq.push(bb);
    		}
    
    	}
    

    The server output is as follows:

     spend:7ms
     spend:2ms
     spend:2ms
     spend:4ms
     spend:1ms
     spend:1ms
     spend:1ms
     spend:0ms
     spend:0ms
     spend:0ms
    
  5. Network Programming AIO

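     AIO means "let me know when you have finished reading": the application is notified after the I/O operation has completed.
     AIO does not make the I/O itself any faster; it only delivers a notification once the data has been read.
     Business processing is done in a callback function (or via the returned Future).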
    

    The server

    	/**
    	 * description:
    	 *
    	 * @author: dawn.he QQ:       905845006
    	 * @email: dawn.he@cloudwise.com
    	 * @email: 905845006@qq.com
    	 * @date: 2019/10/5    11:43 PM
    	 */
    
    	import java.net.InetSocketAddress;
    	import java.nio.ByteBuffer;
    	import java.nio.channels.AsynchronousServerSocketChannel;
    	import java.nio.channels.AsynchronousSocketChannel;
    	import java.util.concurrent.Future;
    
    	/**
    	 * AIO demo server: accepts connections on localhost:8000 one at a time, sends the
    	 * fixed greeting "zhangphil" to each client and closes the connection.
    	 */
    	public class Server {
    		/**
    		 * Starts the server; the constructor call only returns by throwing.
    		 *
    		 * @param args unused
    		 */
    		public static void main(String[] args) {
    			try {
    				new Server();
    			} catch (Exception e) {
    				e.printStackTrace();
    			}
    		}
    
    		/**
    		 * Binds the listening channel and serves clients in an endless loop.
    		 *
    		 * @throws Exception if the channel cannot be opened or bound, if accepting a
    		 *                   connection fails, or if the serving thread is interrupted
    		 */
    		public Server() throws Exception {
    			// The listening channel is closed when the loop is left through an exception.
    			try (AsynchronousServerSocketChannel serverSocketChannel = AsynchronousServerSocketChannel.open()) {
    				InetSocketAddress inetSocketAddress = new InetSocketAddress("localhost", 8000);
    				serverSocketChannel.bind(inetSocketAddress);
    
    				while (true) {
    					// accept() will not block.
    					Future<AsynchronousSocketChannel> accept = serverSocketChannel.accept();
    
    					System.out.println("=================");
    					System.out.println("Server Waiting for Connection...");
    					AsynchronousSocketChannel accepted = accept.get();// The get() method will block.
    
    					// The client channel is closed even when talking to this client fails.
    					try (AsynchronousSocketChannel socketChannel = accepted) {
    						System.out.println("Server accepts connection");
    						System.out.println("Server and" + socketChannel.getRemoteAddress() + "Establish connection");
    
    						ByteBuffer buffer = ByteBuffer.wrap("zhangphil".getBytes(java.nio.charset.StandardCharsets.UTF_8));
    						// One write may send only part of the buffer. get() blocks until the
    						// write has finished and, unlike polling isDone(), reports a failed write.
    						while (buffer.hasRemaining()) {
    							Future<Integer> write = socketChannel.write(buffer);
    							write.get();
    						}
    
    						System.out.println("The server has finished sending data..");
    					} catch (java.io.IOException | java.util.concurrent.ExecutionException e) {
    						// A problem with one client must not stop the server.
    						System.err.println("Connection failed: " + e);
    					}
    				}
    			}
    		}
    	}
    

    Client

    	/**
    	 * description:
    	 *
    	 * @author: dawn.he QQ:       905845006
    	 * @email: dawn.he@cloudwise.com
    	 * @email: 905845006@qq.com
    	 * @date: 2019/10/4    8:07 PM
    	 */
    	import java.net.InetSocketAddress;
    	import java.nio.ByteBuffer;
    	import java.nio.channels.AsynchronousSocketChannel;
    	import java.util.concurrent.Future;
    
    	/**
    	 * AIO demo client: connects to localhost:8000, performs a single read and prints
    	 * whatever the server sent.
    	 */
    	public class Client {
    		/**
    		 * Runs one connect-and-read round trip; failures are printed as stack traces.
    		 *
    		 * @param args unused
    		 */
    		public static void main(String[] args) {
    			// The channel is closed on every path, including failures.
    			try (AsynchronousSocketChannel socketChannel = AsynchronousSocketChannel.open()) {
    				InetSocketAddress inetSocketAddress = new InetSocketAddress("localhost", 8000);
    				Future<Void> connect = socketChannel.connect(inetSocketAddress);
    				// get() blocks until the connection attempt has finished and throws
    				// ExecutionException if it failed, which polling isDone() did not reveal.
    				connect.get();
    
    				System.out.println("Establish connection" + socketChannel.getRemoteAddress());
    
    				ByteBuffer buffer = ByteBuffer.allocate(1024);
    				Future<Integer> read = socketChannel.read(buffer);
    				int received = read.get(); // blocks until the read has completed
    
    				if (received < 0) {
    					// -1 means the server closed the connection without sending anything
    					System.err.println("Server closed the connection before sending data");
    					return;
    				}
    				System.out.println("Receiving server data:"
    						+ new String(buffer.array(), 0, received, java.nio.charset.StandardCharsets.UTF_8));
    			} catch (InterruptedException e) {
    				// Keep the interrupt visible to whoever runs this thread
    				Thread.currentThread().interrupt();
    				e.printStackTrace();
    			} catch (Exception e) {
    				e.printStackTrace();
    			}
    		}
    	}
    
     Future interface
     public V get(long timeout, TimeUnit unit) (as implemented by FutureTask) calls
    
     private int awaitDone(boolean timed, long nanos), which calls
    
     the park and unpark methods of the LockSupport class, which in turn call
    
      the park and unpark methods of the UNSAFE class.
    
     The native method blocks the current thread.
    
     Blocking park Method and Unblocking unpark Method
    
     Unsafe (providing CAS operations)
     LockSupport (providing park/unpark operations)
     Both ultimately delegate to native, platform-level operations (such as the CAS instruction).
    

Tags: Programming Java socket network

Posted on Mon, 07 Oct 2019 02:35:52 -0700 by charmedp3