How to design a compact and extensible RPC framework? (with implementation code)

Brief Introduction

If you know a little about RPC, you have probably heard of well-known RPC frameworks such as Dubbo and gRPC. But most people don't really understand the principles behind their implementation.

A good way to learn how a framework works is to write a simplified version of it yourself. For example, if you want to understand the idea behind Spring IoC, the best way is to implement a small IoC container and get a feel for it step by step.

So this article walks you through designing a small RPC framework, while keeping some extension points in it.

By reading this article, you will:

  • Understand the core concepts of an RPC framework

  • Learn how to keep a framework extensible when designing it

This article relies on a few components that are needed to implement an RPC framework, and it tries to keep the background knowledge they require to a minimum. Still, readers should ideally have the following basics:

  • A basic introduction to ZooKeeper

  • A basic introduction to Netty

What should the RPC framework look like?

Let's start with: What is an RPC framework?

Our most intuitive impression is:

After integrating the RPC framework, we configure the address of a registry. One application (the service provider) exposes an interface, and another application (the service consumer) references that interface and calls it. Remarkably, a method call ends up being executed in another application.

It feels just like calling a local method, even though the two applications are not in the same JVM and may not even be on the same machine.

So how did they do it?

In fact, when the service consumer invokes a method of an RPC interface, the call first goes through a dynamic proxy, is then sent over the network to the service provider's machine, and the corresponding method is executed there.

The result of the method is then sent back over the network to the service consumer, which receives it as the return value.

The whole process is: consumer call, dynamic proxy, network request, method execution on the provider, network response, result returned to the consumer.
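A minimal sketch of the dynamic proxy step, using only the JDK. The names GreetingService, GreetingServiceImpl and ProxySketch are hypothetical, and the network hop is replaced by a direct in-process call, so this only illustrates how a method call on an interface can be intercepted and forwarded:

```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Proxy;

// Hypothetical service interface used only for this sketch.
interface GreetingService {
    String greet(String name);
}

// Provider-side implementation; in a real framework it lives in another JVM.
class GreetingServiceImpl implements GreetingService {
    @Override
    public String greet(String name) {
        return "Hello, " + name;
    }
}

class ProxySketch {
    // Builds a consumer-side proxy for the given interface.
    @SuppressWarnings("unchecked")
    static <T> T refer(Class<T> type, Object remoteTarget) {
        InvocationHandler handler = (proxy, method, args) -> {
            // Assumption: a real framework would serialize the method identifier
            // and arguments here, send them over the network, and wait for the
            // response. This sketch calls the target directly instead.
            return method.invoke(remoteTarget, args);
        };
        return (T) Proxy.newProxyInstance(type.getClassLoader(), new Class<?>[]{type}, handler);
    }

    public static void main(String[] args) {
        GreetingService service = refer(GreetingService.class, new GreetingServiceImpl());
        System.out.println(service.greet("RPC")); // prints Hello, RPC
    }
}
```

The consumer only ever sees the interface; everything behind the handler can change without touching the calling code.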


At this point, someone may ask: how does the service consumer know which machine and port the service provider is on?

This is where the "registry" comes in:

  • When a service provider starts, it submits the information of the machine it runs on to the registry.

  • When a service consumer starts, it retrieves from the registry the information of the machines that provide the interfaces it consumes.

In this way, the service consumer has a list of the machines where the service provider is located.
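The two registry operations can be sketched with an in-memory map. This is only an illustration under assumed names (InMemoryRegistrySketch, register, lookup); the article's real registry is backed by ZooKeeper:

```java
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;

// In-memory stand-in for a registry; all names here are hypothetical.
class InMemoryRegistrySketch {
    // interface name -> provider addresses in "host:port" form
    private final Map<String, List<String>> providers = new ConcurrentHashMap<>();

    // Called by a service provider at startup.
    void register(String interfaceName, String address) {
        providers.computeIfAbsent(interfaceName, k -> new CopyOnWriteArrayList<>()).add(address);
    }

    // Called by a service consumer at startup.
    List<String> lookup(String interfaceName) {
        return providers.getOrDefault(interfaceName, Collections.emptyList());
    }

    public static void main(String[] args) {
        InMemoryRegistrySketch registry = new InMemoryRegistrySketch();
        registry.register("com.study.rpc.test.producer.HelloService", "peer1:8080");
        registry.register("com.study.rpc.test.producer.HelloService", "peer2:8081");
        // prints [peer1:8080, peer2:8081]
        System.out.println(registry.lookup("com.study.rpc.test.producer.HelloService"));
    }
}
```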


"Service consumers" get the list of "service providers" and can initiate requests through network requests.

Which network client should we adopt? There are several options:

  • JDK native BIO (the ServerSocket family). Blocking IO cannot support high concurrency.

  • JDK native NIO (the Selector and SelectionKey family). Non-blocking IO can support high concurrency, but it is complex to implement and you have to deal with various network problems yourself.

  • Netty, a well-known NIO framework that supports high concurrency out of the box, is well encapsulated, and has an easy-to-use API.

As aspiring programmers, we want a framework that supports high concurrency and is simple and fast to develop, so we choose Netty. A few very basic Netty APIs are enough to meet our needs.

Network Protocol Definition

Since we need to transmit data over the network, we first need to define a network protocol.

You may ask: what is a network protocol?

Put simply, a network protocol defines what the data sent by the client looks like, so that the server can parse it and know what to do. Enough talk, let's look at the code:

Suppose the service provider has the following two classes:

// com.study.rpc.test.producer.HelloService
public interface HelloService {
 String sayHello(TestBean testBean);
}
// com.study.rpc.test.producer.TestBean
public class TestBean {
 private String name;
 private Integer age;
 public TestBean(String name, Integer age) {
 this.name = name;
 this.age = age;
 }
 // getter setter
}

Now I want to call the method HelloService.sayHello(TestBean testBean).

As a "service consumer", how should we define the request so that the server knows which method is being called?

This requires a unique identifier for the interface method: it records the interface name, the method name, and the parameter types.

We then package this information and send it to the server. Here I serialize it as a JSON string for transmission.

For example, the data transmitted for the interface above looks roughly like this:

{
	"interfaces": "interface=com.study.rpc.test.producer.HelloService&method=sayHello&
	parameter=com.study.rpc.test.producer.TestBean",
	"requestId": "3",
	"parameter": {
		"com.study.rpc.test.producer.TestBean": {
			"age": 20,
			"name": "Zhang San"
		}
	}
}

Here I use JSON to describe the call: interfaces holds the unique identifier of the interface method, requestId identifies the request, and parameter holds the actual arguments, where each key is the fully qualified class name of a parameter and each value is the JSON representation of that object.

At this point you may object: data does not have to be transmitted as JSON, and JSON is not necessarily the best-performing choice.

You could also use JDK Serializable together with Netty's ObjectDecoder. This is, of course, an extension point: we should offer several serialization methods for users to choose from.

JSON is chosen here because it is more intuitive and better suited to an article.
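One way to keep serialization open as an extension point is to code the framework against a small interface. The sketch below is an assumption, not part of the article's code: the Serializer interface and the JdkSerializer class are hypothetical names, and the implementation uses plain JDK object serialization:

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.UncheckedIOException;

// Hypothetical extension point: the framework would depend on this interface only.
interface Serializer {
    byte[] serialize(Object value);

    <T> T deserialize(byte[] bytes, Class<T> type);
}

// One possible implementation based on JDK serialization.
class JdkSerializer implements Serializer {
    @Override
    public byte[] serialize(Object value) {
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(buffer)) {
            out.writeObject(value);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return buffer.toByteArray();
    }

    @Override
    public <T> T deserialize(byte[] bytes, Class<T> type) {
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(bytes))) {
            return type.cast(in.readObject());
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } catch (ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }
}

class SerializerSketch {
    public static void main(String[] args) {
        Serializer serializer = new JdkSerializer();
        byte[] bytes = serializer.serialize("Zhang San");
        System.out.println(serializer.deserialize(bytes, String.class)); // prints Zhang San
    }
}
```

A JSON-based implementation of the same interface could then be swapped in without changing the rest of the framework.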

Developing service providers

Now that we have settled on the network protocol, we can start developing the "service provider". Because we are writing a simplified RPC framework, we want to keep it concise.

We won't introduce a container framework like Spring, so we need to define a configuration class for the service provider. It specifies which interface the provider exposes and which instance implements it:

public class ServiceConfig<T> {
    // The interface being exposed
    public Class<T> type;
    // The instance that implements the interface
    public T instance;

    public ServiceConfig(Class<T> type, T instance) {
        this.type = type;
        this.instance = instance;
    }

    public Class<T> getType() {
        return type;
    }

    public void setType(Class<T> type) {
        this.type = type;
    }

    public T getInstance() {
        return instance;
    }

    public void setInstance(T instance) {
        this.instance = instance;
    }
}

With this, we know which interfaces we need to expose.

To give the framework a unified entry point, I defined a class called ApplicationContext, which can be thought of as the application context. Its constructor takes two parameters. The code is as follows:

public ApplicationContext(String registryUrl, List<ServiceConfig> serviceConfigs) throws Exception {
 //step 1: Save the interface configuration that needs to be exposed
 this.serviceConfigs = serviceConfigs == null ? new ArrayList<>() : serviceConfigs;
 //step 2: Instantiate the registry
 initRegistry(registryUrl);
 //step 3: Register the interface to the registry, get the interface from the registry, and initialize the list of service interfaces
 RegistryInfo registryInfo = null;
 InetAddress addr = InetAddress.getLocalHost();
 String hostname = addr.getHostName();
 String hostAddress = addr.getHostAddress();
 registryInfo = new RegistryInfo(hostname, hostAddress, port);
 doRegistry(registryInfo);
 
 
 //step 4: Initialize the Netty server, receive the request, and call directly into the service method of the service provider
 if (!this.serviceConfigs.isEmpty()) {
 //Exposure of interfaces is required
 nettyServer = new NettyServer(this.serviceConfigs, interfaceMethods);
 nettyServer.init(port);
 }
}

Registry Design

The constructor performs several steps. First the interface configuration is saved, and then the registry is initialized. Because users may be offered a choice of registries, we define a Registry interface here:

public interface Registry {
 /**
 * Register the producer interface to the registry
 *
 * @param clazz class
 * @param registryInfo Local registration information
 */
 void register(Class clazz, RegistryInfo registryInfo) throws Exception;
}

Here we provide a register method, whose semantics are to register the interface corresponding to clazz in the registry. It takes two parameters: the Class object of the interface and the registration information.

The registration information contains some basic information about the local machine:

public class RegistryInfo {
 private String hostname;
 private String ip;
 private Integer port;
 public RegistryInfo(String hostname, String ip, Integer port) {
 this.hostname = hostname;
 this.ip = ip;
 this.port = port;
 }
 // getter setter
}

Now that the registry is defined, let's go back to instantiating it. The code is as follows:

/**
 * Registry
 */
private Registry registry;
private void initRegistry(String registryUrl) {
 if (registryUrl.startsWith("zookeeper://")) {
 registryUrl = registryUrl.substring(12);
 registry = new ZookeeperRegistry(registryUrl);
 } else if (registryUrl.startsWith("multicast://")) {
 registry = new MulticastRegistry(registryUrl);
 }
}

The logic here is simple: the scheme of the URL determines which registry to use.

The registry has two implementation classes, one using ZooKeeper and one using multicast.

The multicast registry is only here for demonstration and is not actually implemented. We mainly implement the ZooKeeper registry.

(Of course, if you're interested, you can offer more registries for users to choose from, such as Redis. The point here is to keep the "extension point".)
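To add registries without editing an if/else chain, the scheme lookup could be driven by a table of creators. This is a sketch under assumptions: NamedRegistry and RegistryFactorySketch are hypothetical names, and the "registries" here only describe themselves rather than connect to anything:

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;

// Hypothetical minimal registry abstraction used only in this sketch.
interface NamedRegistry {
    String describe();
}

class RegistryFactorySketch {
    // scheme -> function that builds a registry from the address part of the URL
    private static final Map<String, Function<String, NamedRegistry>> CREATORS = new ConcurrentHashMap<>();

    static void registerScheme(String scheme, Function<String, NamedRegistry> creator) {
        CREATORS.put(scheme, creator);
    }

    static NamedRegistry create(String registryUrl) {
        int idx = registryUrl.indexOf("://");
        if (idx < 0) {
            throw new IllegalArgumentException("Missing scheme in registry URL: " + registryUrl);
        }
        String scheme = registryUrl.substring(0, idx);
        String address = registryUrl.substring(idx + 3);
        Function<String, NamedRegistry> creator = CREATORS.get(scheme);
        if (creator == null) {
            // Unknown schemes fail fast instead of leaving the registry null.
            throw new IllegalArgumentException("Unsupported registry scheme: " + scheme);
        }
        return creator.apply(address);
    }

    public static void main(String[] args) {
        registerScheme("zookeeper", address -> () -> "zookeeper registry at " + address);
        registerScheme("redis", address -> () -> "redis registry at " + address);
        // prints zookeeper registry at localhost:2181
        System.out.println(create("zookeeper://localhost:2181").describe());
    }
}
```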

After instantiating the registry, go back to the code above:

Registering the Service Provider

//step 3: Register the interface to the registry, get the interface from the registry, and initialize the list of service interfaces
RegistryInfo registryInfo = null;
InetAddress addr = InetAddress.getLocalHost();
String hostname = addr.getHostName();
String hostAddress = addr.getHostAddress();
registryInfo = new RegistryInfo(hostname, hostAddress, port);
doRegistry(registryInfo);

The logic here is simple: get the basic information of the local machine, construct a RegistryInfo, and then call the doRegistry method:

/**
 * Interface methods correspond to method objects
 */
private Map<String, Method> interfaceMethods = new ConcurrentHashMap<>();
private void doRegistry(RegistryInfo registryInfo) throws Exception {
 for (ServiceConfig config : serviceConfigs) {
 Class type = config.getType();
 registry.register(type, registryInfo);
 Method[] declaredMethods = type.getDeclaredMethods();
 for (Method method : declaredMethods) {
 String identify = InvokeUtils.buildInterfaceMethodIdentify(type, method);
 interfaceMethods.put(identify, method);
 }
 }
}

Two things happen here:

  • The interface is registered in the registry

  • For each method of each interface, a unique identifier is generated and stored in the interfaceMethods map.

Let's analyze these two things separately. The first is the register method.

Because we use ZooKeeper, we bring in Curator, a ZooKeeper client framework, for convenience.

<dependency>
 <groupId>org.apache.curator</groupId>
 <artifactId>curator-recipes</artifactId>
 <version>2.3.0</version>
</dependency>

Then look at the code:

public class ZookeeperRegistry implements Registry {
 private CuratorFramework client;
 public ZookeeperRegistry(String connectString) {
 RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
 client = CuratorFrameworkFactory.newClient(connectString, retryPolicy);
 client.start();
 try {
 Stat myRPC = client.checkExists().forPath("/myRPC");
 if (myRPC == null) {
 client.create()
 .creatingParentsIfNeeded()
 .forPath("/myRPC");
 }
 System.out.println("Zookeeper Client Initialization completed......");
 } catch (Exception e) {
 e.printStackTrace();
 }
 }
 @Override
 public void register(Class clazz, RegistryInfo registryInfo) throws Exception {
 //1. When registering, get data from zk first
 //2. Add your own server address to the registry
 //Register a temporary node for each method of each interface, then key is the unique identifier for the interface method, and data is the list of service addresses
 Method[] declaredMethods = clazz.getDeclaredMethods();
 for (Method method : declaredMethods) {
 String key = InvokeUtils.buildInterfaceMethodIdentify(clazz, method);
 String path = "/myRPC/" + key;
 Stat stat = client.checkExists().forPath(path);
 List<RegistryInfo> registryInfos;
 if (stat != null) {
 //If the interface has already been registered, take the data back and save your own information.
 byte[] bytes = client.getData().forPath(path);
 String data = new String(bytes, StandardCharsets.UTF_8);
 registryInfos = JSONArray.parseArray(data, RegistryInfo.class);
 if (registryInfos.contains(registryInfo)) {
 //Normally, the temporary node of zk disappears directly after disconnection, but restart often finds that there are nodes, so there is such code
 System.out.println("The address list already contains the local machine[" + key + "],No registration");
 } else {
 registryInfos.add(registryInfo);
 client.setData().forPath(path, JSONArray.toJSONString(registryInfos).getBytes());
 System.out.println("Register to the registry with the following paths:[" + path + "] The information is:" + registryInfo);
 }
 } else {
 registryInfos = new ArrayList<>();
 registryInfos.add(registryInfo);
 client.create()
 .creatingParentsIfNeeded()
 //Temporary nodes, disconnect and close
 .withMode(CreateMode.EPHEMERAL)
 .forPath(path, JSONArray.toJSONString(registryInfos).getBytes());
 System.out.println("Register to the registry with the following paths:[" + path + "] The information is:" + registryInfo);
 }
 }
 }
}

The zookeeper registry establishes a connection when it is initialized. Then, when registering, a unique identifier is generated for each method of the clazz interface.

The InvokeUtils.buildInterfaceMethodIdentify method is used here:

public static String buildInterfaceMethodIdentify(Class clazz, Method method) {
 Map<String, String> map = new LinkedHashMap<>();
 map.put("interface", clazz.getName());
 map.put("method", method.getName());
 Parameter[] parameters = method.getParameters();
 if (parameters.length > 0) {
 StringBuilder param = new StringBuilder();
 for (int i = 0; i < parameters.length; i++) {
 Parameter p = parameters[i];
 param.append(p.getType().getName());
 if (i < parameters.length - 1) {
 param.append(",");
 }
 }
 map.put("parameter", param.toString());
 }
 return map2String(map);
}
public static String map2String(Map<String, String> map) {
 StringBuilder sb = new StringBuilder();
 Iterator<Map.Entry<String, String>> iterator = map.entrySet().iterator();
 while (iterator.hasNext()) {
 Map.Entry<String, String> entry = iterator.next();
 sb.append(entry.getKey() + "=" + entry.getValue());
 if (iterator.hasNext()) {
 sb.append("&");
 }
 }
 return sb.toString();
}

In other words, the fully qualified interface name, the method name and the parameter types together form a unique identifier. For example, HelloService#sayHello(TestBean) produces roughly the following:

interface=com.study.rpc.test.producer.HelloService&method=sayHello&
parameter=com.study.rpc.test.producer.TestBean

The rest of the logic is simple. An ephemeral node is created under the /myRPC path in ZooKeeper. The node name is the unique identifier of the interface method described above, and the node data is the machine information.

The reason for using ephemeral nodes is that if the machine goes down and the connection is lost, consumers can detect it through ZooKeeper's watcher mechanism.

It looks roughly like this:

 /myRPC/interface=com.study.rpc.test.producer.HelloService&method=sayHello&
 parameter=com.study.rpc.test.producer.TestBean
 [
     {
         "hostname": "peer1",
         "port": 8080
     },
     {
         "hostname": "peer2",
         "port": 8081
     }
 ]

In this way, the consumer can obtain the registration information and learn which machine and port to call.

Now that the registry part is done, let's go back to the second thing the doRegistry method does: it puts every method, keyed by its unique identifier, into a map:

/**
 * Interface methods correspond to method objects
 */
private Map<String, Method> interfaceMethods = new ConcurrentHashMap<>();

The reason for this is that when we receive a network request, we need to call the method object by reflection, so we save it.
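The lookup-then-invoke idea can be sketched in isolation. The names below (EchoService, EchoServiceImpl, ReflectiveDispatchSketch) are hypothetical, and the identifier is simplified to "interface#method"; the article's identifier also encodes the parameter types, which is what lets it distinguish overloaded methods:

```java
import java.lang.reflect.Method;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical service used only in this sketch.
interface EchoService {
    String echo(String text);
}

class EchoServiceImpl implements EchoService {
    @Override
    public String echo(String text) {
        return "echo:" + text;
    }
}

class ReflectiveDispatchSketch {
    // identifier -> Method object, filled once at startup
    private final Map<String, Method> methods = new ConcurrentHashMap<>();
    // interface name -> implementing instance
    private final Map<String, Object> instances = new ConcurrentHashMap<>();

    void export(Class<?> type, Object instance) {
        for (Method method : type.getDeclaredMethods()) {
            // Simplified identifier; overloads would collide here.
            methods.put(type.getName() + "#" + method.getName(), method);
        }
        instances.put(type.getName(), instance);
    }

    Object dispatch(String interfaceName, String methodName, Object... args) {
        Method method = methods.get(interfaceName + "#" + methodName);
        if (method == null) {
            throw new IllegalArgumentException("No such method: " + interfaceName + "#" + methodName);
        }
        try {
            return method.invoke(instances.get(interfaceName), args);
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        ReflectiveDispatchSketch dispatcher = new ReflectiveDispatchSketch();
        dispatcher.export(EchoService.class, new EchoServiceImpl());
        System.out.println(dispatcher.dispatch(EchoService.class.getName(), "echo", "hi")); // prints echo:hi
    }
}
```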

Start the network server to accept requests

Next we can see the fourth step:

//step 4: Initialize the Netty server, receive the request, and call directly into the service method of the service provider
if (!this.serviceConfigs.isEmpty()) {
 //Exposure of interfaces is required
 nettyServer = new NettyServer(this.serviceConfigs, interfaceMethods);
 nettyServer.init(port);
}

Because Netty is used here, Netty dependencies need to be introduced:

<dependency>
 <groupId>io.netty</groupId>
 <artifactId>netty-all</artifactId>
 <version>4.1.30.Final</version>
</dependency>

Next, let's look at the server code:

public class NettyServer {
    /**
     * Handler responsible for invoking service methods
     */
    private RpcInvokeHandler rpcInvokeHandler;

    public NettyServer(List<ServiceConfig> serverConfigs, Map<String, Method> interfaceMethods) throws InterruptedException {
        this.rpcInvokeHandler = new RpcInvokeHandler(serverConfigs, interfaceMethods);
    }

    public int init(int port) throws Exception {
        EventLoopGroup bossGroup = new NioEventLoopGroup();
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        ServerBootstrap b = new ServerBootstrap();
        b.group(bossGroup, workerGroup)
                .channel(NioServerSocketChannel.class)
                .option(ChannelOption.SO_BACKLOG, 1024)
                .childHandler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    protected void initChannel(SocketChannel ch) throws Exception {
                        ByteBuf delimiter = Unpooled.copiedBuffer("$$".getBytes(StandardCharsets.UTF_8));
                        // Split messages on the delimiter "$$" and limit a single message to 1MB
                        ch.pipeline().addLast(new DelimiterBasedFrameDecoder(1024 * 1024, delimiter));
                        ch.pipeline().addLast(new StringDecoder());
                        ch.pipeline().addLast(rpcInvokeHandler);
                    }
                });
        ChannelFuture sync = b.bind(port).sync();
        System.out.println("start-up NettyService,The port is:" + port);
        return port;
    }
}

This part mostly uses the Netty API, so we won't go into much detail. Briefly:

  • We use "$$" as the delimiter between two messages, and a single message may be at most 1MB long.

  • All the logic is in RpcInvokeHandler, which receives the configured service interface instances and the map from each interface method's unique identifier to its Method object.
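To make the delimiter framing concrete, here is a JDK-only sketch of splitting a stream on "$$". It is not how Netty's DelimiterBasedFrameDecoder is implemented (that class works on bytes and enforces the maximum frame length); the class name DelimiterFramingSketch is hypothetical and the sketch works on strings for readability:

```java
import java.util.ArrayList;
import java.util.List;

class DelimiterFramingSketch {
    private static final String DELIMITER = "$$";
    // Holds data received so far that does not yet form a complete message.
    private final StringBuilder buffer = new StringBuilder();

    // Feed a chunk as it arrives from the socket; returns the messages completed by it.
    List<String> feed(String chunk) {
        buffer.append(chunk);
        List<String> frames = new ArrayList<>();
        int end;
        while ((end = buffer.indexOf(DELIMITER)) >= 0) {
            frames.add(buffer.substring(0, end));
            buffer.delete(0, end + DELIMITER.length());
        }
        return frames;
    }

    public static void main(String[] args) {
        DelimiterFramingSketch decoder = new DelimiterFramingSketch();
        // One full message plus the beginning of a second one
        System.out.println(decoder.feed("{\"requestId\":\"1\"}$${\"requ")); // prints [{"requestId":"1"}]
        // The rest of the second message
        System.out.println(decoder.feed("estId\":\"2\"}$$"));               // prints [{"requestId":"2"}]
    }
}
```

Note that a payload which itself contains "$$" would be split incorrectly, which is one reason production protocols often use a length prefix instead of a delimiter.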

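// Sharable: the same handler instance is added to the pipeline of every channel
@ChannelHandler.Sharable
public class RpcInvokeHandler extends ChannelInboundHandlerAdapter {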
 /**
 * Interface method uniquely identifies the corresponding Method object
 */
 private Map<String, Method> interfaceMethods;
 /**
 * Implementation class corresponding to interface
 */
 private Map<Class, Object> interfaceToInstance;
 /**
 * Thread pool; the parameters are arbitrary, so don't nitpick.
 */
 private ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(10,
 50, 60, TimeUnit.SECONDS, new LinkedBlockingQueue<>(100),
 new ThreadFactory() {
 AtomicInteger m = new AtomicInteger(0);
 @Override
 public Thread newThread(Runnable r) {
 return new Thread(r, "IO-thread-" + m.incrementAndGet());
 }
 });
 public RpcInvokeHandler(List<ServiceConfig> serviceConfigList,
 Map<String, Method> interfaceMethods) {
 this.interfaceToInstance = new ConcurrentHashMap<>();
 this.interfaceMethods = interfaceMethods;
 for (ServiceConfig config : serviceConfigList) {
 interfaceToInstance.put(config.getType(), config.getInstance());
 }
 }
 @Override
 public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
 try {
 String message = (String) msg;
 //What you get here is a bunch of JSON data, which is parsed into Request objects.
 //In fact, when parsing network data here, you can use serialization to specify an interface to serialize JSON format or other serialization.
 //But the demo version is all right.
 System.out.println("Received message:" + msg);
 RpcRequest request = RpcRequest.parse(message, ctx);
 threadPoolExecutor.execute(new RpcInvokeTask(request));
 } finally {
 ReferenceCountUtil.release(msg);
 }
 }
 @Override
 public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
 ctx.flush();
 }
 @Override
 public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
 System.out.println("An anomaly has occurred...." + cause);
 cause.printStackTrace();
 ctx.close();
 }
 public class RpcInvokeTask implements Runnable {
 private RpcRequest rpcRequest;
 RpcInvokeTask(RpcRequest rpcRequest) {
 this.rpcRequest = rpcRequest;
 }
 @Override
 public void run() {
 try {
 /*
 * The data is probably like this.
 * {"interfaces":"interface=com.study.rpc.test.producer.HelloService&method=sayHello&parameter=com
 * .study.rpc.test.producer.TestBean","requestId":"3","parameter":{"com.study.rpc.test.producer
 * .TestBean":{"age":20,"name":"Zhang San"}}}
 */
 //Here you want to get a specific declaration for each interface of each service object
 String interfaceIdentity = rpcRequest.getInterfaceIdentity();
 Method method = interfaceMethods.get(interfaceIdentity);
 Map<String, String> map = string2Map(interfaceIdentity);
 String interfaceName = map.get("interface");
 Class interfaceClass = Class.forName(interfaceName);
 Object o = interfaceToInstance.get(interfaceClass);
 String parameterString = map.get("parameter");
 Object result;
 if (parameterString != null) {
 String[] parameterTypeClass = parameterString.split(",");
 Map<String, Object> parameterMap = rpcRequest.getParameterMap();
 Object[] parameterInstance = new Object[parameterTypeClass.length];
 for (int i = 0; i < parameterTypeClass.length; i++) {
 String parameterClazz = parameterTypeClass[i];
 parameterInstance[i] = parameterMap.get(parameterClazz);
 }
 result = method.invoke(o, parameterInstance);
 } else {
 result = method.invoke(o);
 }
 //Write back the response
 ChannelHandlerContext ctx = rpcRequest.getCtx();
 String requestId = rpcRequest.getRequestId();
 RpcResponse response = RpcResponse.create(JSONObject.toJSONString(result), interfaceIdentity,
 requestId);
 String s = JSONObject.toJSONString(response) + "$$";
 ByteBuf byteBuf = Unpooled.copiedBuffer(s.getBytes());
 ctx.writeAndFlush(byteBuf);
 System.out.println("Response to client:" + s);
 } catch (Exception e) {
 e.printStackTrace();
 }
 }
 
 // Instance method: before Java 16, a non-static inner class cannot declare static methods
 private Map<String, String> string2Map(String str) {
 String[] split = str.split("&");
 Map<String, String> map = new HashMap<>(16);
 for (String s : split) {
 String[] split1 = s.split("=");
 map.put(split1[0], split1[1]);
 }
 return map;
 }
 }
}

Let's walk through the logic above:

The channelRead method receives messages, which are the JSON data we analyzed earlier, and we parse each message into an RpcRequest.

public class RpcRequest {
 private String interfaceIdentity;
 private Map<String, Object> parameterMap = new HashMap<>();
 private ChannelHandlerContext ctx;
 private String requestId;
 public static RpcRequest parse(String message, ChannelHandlerContext ctx) throws ClassNotFoundException {
 /*
 * {
 * "interfaces":"interface=com.study.rpc.test.producer.HelloService&method=sayHello2&parameter=java.lang
 * .String,com.study.rpc.test.producer.TestBean",
 * "parameter":{
 * "java.lang.String":"haha",
 * "com.study.rpc.test.producer.TestBean":{
 * "name":"Xiao Wang",
 * "age":20
 * }
 * }
 * }
 */
 JSONObject jsonObject = JSONObject.parseObject(message);
 String interfaces = jsonObject.getString("interfaces");
 JSONObject parameter = jsonObject.getJSONObject("parameter");
 Set<String> strings = parameter.keySet();
 RpcRequest request = new RpcRequest();
 request.setInterfaceIdentity(interfaces);
 Map<String, Object> parameterMap = new HashMap<>(16);
 String requestId = jsonObject.getString("requestId");
 for (String key : strings) {
 if (key.equals("java.lang.String")) {
 parameterMap.put(key, parameter.getString(key));
 } else {
 Class clazz = Class.forName(key);
 Object object = parameter.getObject(key, clazz);
 parameterMap.put(key, object);
 }
 }
 request.setParameterMap(parameterMap);
 request.setCtx(ctx);
 request.setRequestId(requestId);
 return request;
 }
}

Then we parse out the interface method to be invoked from the request and invoke it through reflection. Once we have the result, we wrap it in an RpcResponse and write it back to the client:

public class RpcResponse {
 private String result;
 private String interfaceMethodIdentify;
 private String requestId;
 public String getResult() {
 return result;
 }
 public void setResult(String result) {
 this.result = result;
 }
 // getters and setters for interfaceMethodIdentify and requestId omitted
 public static RpcResponse create(String result, String interfaceMethodIdentify, String requestId) {
 RpcResponse response = new RpcResponse();
 response.setResult(result);
 response.setInterfaceMethodIdentify(interfaceMethodIdentify);
 response.setRequestId(requestId);
 return response;
 }
}

It contains the result of the call as a JSON string, the unique identifier of the interface method, and the request ID. The data looks roughly like this:

{"interfaceMethodIdentify":"interface=com.study.rpc.test.producer.HelloService&method=sayHello&
parameter=com.study.rpc.test.producer.TestBean","requestId":"3",
"result":"\"Fucking great,I got the message: TestBean{name='Zhang San', age=20}\""}

With this information, the client can parse the response results.
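One plausible use of the requestId on the consumer side is to match each response to the call that is waiting for it. The sketch below is an assumption about how that could be done with JDK classes only; PendingRequestsSketch and its method names are hypothetical and are not the article's consumer code:

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;

class PendingRequestsSketch {
    private final AtomicLong nextId = new AtomicLong();
    // requestId -> future that the calling thread waits on
    private final Map<String, CompletableFuture<String>> pending = new ConcurrentHashMap<>();

    // Allocates a requestId to send along with the request.
    String nextRequestId() {
        return String.valueOf(nextId.incrementAndGet());
    }

    // Called before sending: remembers that a response for this id is expected.
    CompletableFuture<String> await(String requestId) {
        CompletableFuture<String> future = new CompletableFuture<>();
        pending.put(requestId, future);
        return future;
    }

    // Called when a response arrives; returns false if nobody is waiting for this id.
    boolean complete(String requestId, String result) {
        CompletableFuture<String> future = pending.remove(requestId);
        return future != null && future.complete(result);
    }

    public static void main(String[] args) {
        PendingRequestsSketch requests = new PendingRequestsSketch();
        String id = requests.nextRequestId();
        CompletableFuture<String> future = requests.await(id);
        // In a real consumer this call would come from the network thread.
        requests.complete(id, "\"hello\"");
        System.out.println(future.join()); // prints "hello"
    }
}
```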

Test Service Provider

Now that we have finished the code, we need to test it:

First, we write an implementation class of HelloService.

public class HelloServiceImpl implements HelloService {
 @Override
 public String sayHello(TestBean testBean) {
 return "Fucking great,I got the message:" + testBean;
 }
}

Then write the service provider code:

public class TestProducer {
 public static void main(String[] args) throws Exception {
 String connectionString = "zookeeper://localhost1:2181,localhost2:2181,localhost3:2181";
 HelloService service = new HelloServiceImpl();
 ServiceConfig<HelloService> config = new ServiceConfig<>(HelloService.class, service);
 List<ServiceConfig> serviceConfigList = new ArrayList<>();
 serviceConfigList.add(config);
 ApplicationContext ctx = new ApplicationContext(connectionString, serviceConfigList,
 null, 50071);
 }
}

Then start up and see the log:

Zookeeper Client Initialization completed......
Register to the registry with the following paths:[/myRPC/interface=com.study.rpc.test.producer.HelloService&
method=sayHello&parameter=com.study.rpc.test.producer.TestBean]
 The information is:RegistryInfo{hostname='localhost', ip='192.168.16.7', port=50071}
start-up NettyService,The port is:50071

At this point, we expect to be able to send the following request with a Netty client:

{
	"interfaces": "interface=com.study.rpc.test.producer.HelloService&
	method=sayHello&parameter=com.study.rpc.test.producer.TestBean",
	"requestId": "3",
	"parameter": {
		"com.study.rpc.test.producer.TestBean": {
			"age": 20,
			"name": "Zhang San"
		}
	}
}

The response should be:

{"interfaceMethodIdentify":"interface=com.study.rpc.test.producer.HelloService&method=sayHello&
parameter=com.study.rpc.test.producer.TestBean","requestId":"3",
"result":"\"Fucking great,I got the message: TestBean{name='Zhang San', age=20}\""}

Then you can write a test program (it is only used for intermediate testing, so readers do not need to understand it in detail):

public class NettyClient {
    public static void main(String[] args) {
        EventLoopGroup group = new NioEventLoopGroup();
        try {
            Bootstrap b = new Bootstrap();
            b.group(group)
                    .channel(NioSocketChannel.class)
                    .option(ChannelOption.TCP_NODELAY, true)
                    .handler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel ch) throws Exception {
                            ch.pipeline().addLast(new StringDecoder());
                            ch.pipeline().addLast(new NettyClientHandler());
                        }
                    });
            ChannelFuture sync = b.connect("127.0.0.1", 50071).sync();
            sync.channel().closeFuture().sync();
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            group.shutdownGracefully();
        }
    }

    private static class NettyClientHandler extends ChannelInboundHandlerAdapter {
        @Override
        public void channelActive(ChannelHandlerContext ctx) throws Exception {
            // Build the request JSON as soon as the connection is established
            JSONObject jsonObject = new JSONObject();
            jsonObject.put("interfaces", "interface=com.study.rpc.test.producer" +
                    ".HelloService&method=sayHello&parameter=com.study.rpc.test.producer.TestBean");
            JSONObject param = new JSONObject();
            JSONObject bean = new JSONObject();
            bean.put("age", 20);
            bean.put("name", "Zhang San");
            param.put("com.study.rpc.test.producer.TestBean", bean);
            jsonObject.put("parameter", param);
            jsonObject.put("requestId", 3);
            System.out.println("Send to server JSON for:" + jsonObject.toJSONString());
            // Append the "$$" delimiter so the server can split messages
            String msg = jsonObject.toJSONString() + "$$";
            ByteBuf byteBuf = Unpooled.buffer(msg.getBytes().length);
            byteBuf.writeBytes(msg.getBytes());
            ctx.writeAndFlush(byteBuf);
        }

        @Override
        public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
            System.out.println("Received message:" + msg);
        }
    }
}

After startup, you see the console output:

Send to server JSON for:{"interfaces":"interface=com.study.rpc.test.producer.HelloService&method=sayHello&
parameter=com.study.rpc.test.producer.TestBean","requestId":3,
"parameter":{"com.study.rpc.test.producer.TestBean":{"name":"Zhang San","age":20}}}
Received message:{"interfaceMethodIdentify":"interface=com.study.rpc.test.producer.HelloService&
method=sayHello&parameter=com.study.rpc.test.producer.TestBean","requestId":"3",
"result":"\"Fucking great,I got the message: TestBean{name='Zhang San', age=20}\""}

Bingo, the RPC service provider works perfectly. Next we just need to implement the service consumer.

Developing Service Consumers

Service consumers are handled in the same way. We also need to define a consumer configuration:

public class ReferenceConfig {
    // The interface the consumer wants to reference
    private Class<?> type;

    public ReferenceConfig(Class<?> type) {
        this.type = type;
    }

    public Class<?> getType() {
        return type;
    }

    public void setType(Class<?> type) {
        this.type = type;
    }
}

Then, in the unified entry point, we modify the code in ApplicationContext:

public ApplicationContext(String registryUrl, List<ServiceConfig> serviceConfigs,
 List<ReferenceConfig> referenceConfigs, int port) throws Exception {
 //step 1: Preserving service providers and consumers
 this.serviceConfigs = serviceConfigs == null ? new ArrayList<>() : serviceConfigs;
 this.referenceConfigs = referenceConfigs == null ? new ArrayList<>() : referenceConfigs;
 // ....
 
}
private void doRegistry(RegistryInfo registryInfo) throws Exception {
 for (ServiceConfig config : serviceConfigs) {
 Class type = config.getType();
 registry.register(type, registryInfo);
 Method[] declaredMethods = type.getDeclaredMethods();
 for (Method method : declaredMethods) {
 String identify = InvokeUtils.buildInterfaceMethodIdentify(type, method);
 interfaceMethods.put(identify, method);
 }
 }
 for (ReferenceConfig config : referenceConfigs) {
 List<RegistryInfo> registryInfos = registry.fetchRegistry(config.getType());
 if (registryInfos != null) {
 interfacesMethodRegistryList.put(config.getType(), registryInfos);
 initChannel(registryInfos);
 }
 }
}

During registration, we also need to fetch from the registry the providers of the interfaces we consume, so the Registry interface needs one more method:

public interface Registry {
 /**
 * Register the producer interface to the registry
 *
 * @param clazz class
 * @param registryInfo Local registration information
 */
 void register(Class clazz, RegistryInfo registryInfo) throws Exception;
 /**
 * Grab the registry for the service provider
 *
 * @param clazz class
 * @return List of machines where service providers are located
 */
 List<RegistryInfo> fetchRegistry(Class clazz) throws Exception;
}
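
To see the contract of these two methods without running ZooKeeper, here is a minimal in-memory sketch. It is only an illustration: Endpoint is a hypothetical stand-in for RegistryInfo, SimpleRegistry is an assumed simplification of Registry (no checked exceptions), and unlike the ZooKeeper version it returns an empty list instead of null when nothing is registered.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;

// Hypothetical stand-in for the article's RegistryInfo (only ip and port are kept).
final class Endpoint {
    final String ip;
    final int port;

    Endpoint(String ip, int port) {
        this.ip = ip;
        this.port = port;
    }
}

// Assumed simplification of the Registry contract: the same two operations.
interface SimpleRegistry {
    void register(Class<?> clazz, Endpoint endpoint);

    List<Endpoint> fetchRegistry(Class<?> clazz);
}

// Keeps registrations in a map instead of ZooKeeper nodes; meant for local experiments only.
final class InMemoryRegistry implements SimpleRegistry {
    private final Map<String, List<Endpoint>> providers = new ConcurrentHashMap<>();

    @Override
    public void register(Class<?> clazz, Endpoint endpoint) {
        providers.computeIfAbsent(clazz.getName(), k -> new CopyOnWriteArrayList<>()).add(endpoint);
    }

    @Override
    public List<Endpoint> fetchRegistry(Class<?> clazz) {
        List<Endpoint> found = providers.get(clazz.getName());
        // Return a snapshot copy so callers cannot mutate the registry's state.
        return found == null ? Collections.<Endpoint>emptyList() : new ArrayList<>(found);
    }
}
```

Because the rest of the framework only talks to the interface, swapping a registry like this in for the ZooKeeper one is the kind of expansion point the article is aiming for.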

Get a list of machines for service providers

The implementation in Zookeeper is as follows:

@Override
public List<RegistryInfo> fetchRegistry(Class clazz) throws Exception {
 Method[] declaredMethods = clazz.getDeclaredMethods();
 List<RegistryInfo> registryInfos = null;
 for (Method method : declaredMethods) {
 String key = InvokeUtils.buildInterfaceMethodIdentify(clazz, method);
 String path = "/myRPC/" + key;
 Stat stat = client.checkExists()
 .forPath(path);
 if (stat == null) {
 //A watcher could be added here to monitor changes; it is omitted to keep things simple.
 System.out.println("Warning: Service interface could not be found:" + path);
 continue;
 }
 if (registryInfos == null) {
 byte[] bytes = client.getData().forPath(path);
 String data = new String(bytes, StandardCharsets.UTF_8);
 registryInfos = JSONArray.parseArray(data, RegistryInfo.class);
 }
 }
 return registryInfos;
}

In short, we read the node data from ZooKeeper to learn which machines host the interface. Once the registration information has been obtained, the following code is called:

if (registryInfos != null) {
 //Save interface and service address
 interfacesMethodRegistryList.put(config.getType(), registryInfos);
 //Initialization of network connections
 initChannel(registryInfos);
}

private void initChannel(List<RegistryInfo> registryInfos) throws InterruptedException {
 for (RegistryInfo info : registryInfos) {
 if (!channels.containsKey(info)) {
 System.out.println("Start establishing connections:" + info.getIp() + ", " + info.getPort());
 NettyClient client = new NettyClient(info.getIp(), info.getPort());
 client.setMessageCallback(message -> {
 //Queue the message returned by the server first
 RpcResponse response = JSONObject.parseObject(message, RpcResponse.class);
 responses.offer(response);
 synchronized (ApplicationContext.this) {
 ApplicationContext.this.notifyAll();
 }
 });
 //Waiting for the connection to be established
 ChannelHandlerContext ctx = client.getCtx();
 channels.put(info, ctx);
 }
 }
}

We will establish a connection for each unique RegistryInfo, and then have a piece of code like this:

client.setMessageCallback(message -> {
 //Queue the message returned by the server first
 RpcResponse response = JSONObject.parseObject(message, RpcResponse.class);
 responses.offer(response);
 synchronized (ApplicationContext.this) {
 ApplicationContext.this.notifyAll();
 }
});

This sets up a callback that is invoked whenever a message is received. We will analyze it later.

Then client.getCtx() blocks synchronously until the connection is established, and returns once it is. The code of NettyClient is as follows:

public class NettyClient {
 private ChannelHandlerContext ctx;
 private MessageCallback messageCallback;
 public NettyClient(String ip, Integer port) {
 EventLoopGroup group = new NioEventLoopGroup();
 try {
 Bootstrap b = new Bootstrap();
 b.group(group)
 .channel(NioSocketChannel.class)
 .option(ChannelOption.TCP_NODELAY, true)
 .handler(new ChannelInitializer<SocketChannel>() {
 @Override
 protected void initChannel(SocketChannel ch) throws Exception {
 ByteBuf delimiter = Unpooled.copiedBuffer("$$".getBytes());
 //Split messages on the delimiter "$$" and limit a single message to 1MB
 ch.pipeline().addLast(new DelimiterBasedFrameDecoder(1024 * 1024, delimiter));
 ch.pipeline().addLast(new StringDecoder());
 ch.pipeline().addLast(new NettyClientHandler());
 }
 });
 ChannelFuture sync = b.connect(ip, port).sync();
 } catch (Exception e) {
 e.printStackTrace();
 }
 }
 public void setMessageCallback(MessageCallback callback) {
 this.messageCallback = callback;
 }
 public ChannelHandlerContext getCtx() throws InterruptedException {
 System.out.println("Waiting for the connection to succeed...");
 synchronized (this) {
 //Re-check inside the lock so a notification sent before wait() is not missed
 while (ctx == null) {
 wait();
 }
 return ctx;
 }
 }
 private class NettyClientHandler extends ChannelInboundHandlerAdapter {
 @Override
 public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
 try {
 String message = (String) msg;
 if (messageCallback != null) {
 messageCallback.onMessage(message);
 }
 } finally {
 ReferenceCountUtil.release(msg);
 }
 }
 @Override
 public void channelActive(ChannelHandlerContext ctx) throws Exception {
 System.out.println("Successful connection:" + ctx);
 synchronized (NettyClient.this) {
 //Publish ctx under the same lock that getCtx() waits on
 NettyClient.this.ctx = ctx;
 NettyClient.this.notifyAll();
 }
 }
 @Override
 public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
 ctx.flush();
 }
 @Override
 public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
 cause.printStackTrace();
 }
 }
 public interface MessageCallback {
 void onMessage(String message);
 }
}
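
The "$$" delimiter in the pipeline above is what turns a continuous TCP byte stream back into separate JSON messages. The following plain-Java sketch illustrates the idea only; it is not Netty's implementation, it works on strings rather than bytes, and DelimiterFramer is a hypothetical name.

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java illustration of delimiter-based framing; not how Netty implements it internally.
final class DelimiterFramer {
    private final String delimiter;
    private final StringBuilder pending = new StringBuilder();

    DelimiterFramer(String delimiter) {
        this.delimiter = delimiter;
    }

    // Feed one chunk as it arrives from the socket; returns the messages completed by this chunk.
    List<String> feed(String chunk) {
        pending.append(chunk);
        List<String> messages = new ArrayList<>();
        int end;
        while ((end = pending.indexOf(delimiter)) >= 0) {
            messages.add(pending.substring(0, end));
            // Drop the message and its delimiter; anything after it stays buffered.
            pending.delete(0, end + delimiter.length());
        }
        return messages;
    }
}
```

A chunk may contain one and a half messages, or only half a delimiter, which is why the remainder stays buffered until the next chunk arrives. One consequence of this design is that a payload which itself contains "$$" would be cut in two, so a delimiter protocol only works if the delimiter never appears in the data.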

Here we mainly use wait() and notifyAll() to block synchronously until the connection is established.
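
The wait()/notifyAll() handoff can be tried in isolation, without Netty. Below is a minimal sketch of the pattern, assuming a hypothetical ConnectionGate class that plays the role of NettyClient's ctx field. To keep it short, an interruption is rethrown as an unchecked exception, which is a choice made for this sketch rather than something the framework does.

```java
// Hypothetical holder that mimics the ctx handoff between the I/O thread and the caller.
final class ConnectionGate<C> {
    private C ctx; // guarded by "this"

    // Called by the I/O thread once the connection is active.
    synchronized void connected(C ctx) {
        this.ctx = ctx;
        notifyAll();
    }

    // Blocks the caller until connected() has run; returns immediately if it already has.
    synchronized C await() {
        try {
            // The loop covers both spurious wakeups and a notification that came before wait().
            while (ctx == null) {
                wait();
            }
            return ctx;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for the connection", e);
        }
    }
}
```

The important detail is that the condition is checked inside the synchronized block and in a loop. If the check happens outside the lock, the connection can become active between the check and the wait() call, and the caller then sleeps forever.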

After establishing the connection, we save it in the collection:

//Waiting for the connection to be established
ChannelHandlerContext ctx = client.getCtx();
channels.put(info, ctx);

Send requests

At this point we have established a network connection for every interface that needs to be consumed. Next, we need to give the user a method for obtaining an instance of the service:

I put this method in ApplicationContext:

/**
 * Counter responsible for generating requestId
 */
private LongAdder requestIdWorker = new LongAdder();
/**
 * Get the invocation service
 */
@SuppressWarnings("unchecked")
public <T> T getService(Class<T> clazz) {
 return (T) Proxy.newProxyInstance(getClass().getClassLoader(), new Class[]{clazz}, new InvocationHandler() {
 @Override
 public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
 String methodName = method.getName();
 if ("equals".equals(methodName) || "hashCode".equals(methodName)) {
 throw new IllegalAccessException("Method " + methodName + " is not accessible");
 }
 if ("toString".equals(methodName)) {
 return clazz.getName() + "#" + methodName;
 }
 //step 1: Get a list of service addresses
 List<RegistryInfo> registryInfos = interfacesMethodRegistryList.get(clazz);
 if (registryInfos == null) {
 throw new RuntimeException("Unable to find service provider");
 }
 //step 2: Load balancing
 RegistryInfo registryInfo = loadBalancer.choose(registryInfos);
 ChannelHandlerContext ctx = channels.get(registryInfo);
 String identify = InvokeUtils.buildInterfaceMethodIdentify(clazz, method);
 String requestId;
 synchronized (ApplicationContext.this) {
 requestIdWorker.increment();
 requestId = String.valueOf(requestIdWorker.longValue());
 }
 Invoker invoker = new DefaultInvoker(method.getReturnType(), ctx, requestId, identify);
 inProgressInvoker.put(identify + "#" + requestId, invoker);
 return invoker.invoke(args);
 }
 });
}
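
The proxy mechanics used by getService can also be seen on their own. In this small sketch, GreetingService is a hypothetical interface standing in for HelloService, and the handler fakes the remote call by returning a string instead of going over the network.

```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Proxy;
import java.util.Arrays;

// Hypothetical service interface, standing in for the article's HelloService.
interface GreetingService {
    String greet(String name);
}

final class ProxySketch {
    // Builds a proxy whose calls are all routed to the handler instead of a local implementation.
    @SuppressWarnings("unchecked")
    static <T> T createProxy(Class<T> type, InvocationHandler handler) {
        return (T) Proxy.newProxyInstance(type.getClassLoader(), new Class<?>[]{type}, handler);
    }

    static GreetingService demo() {
        // A real consumer would serialize the method and arguments and send them to a provider here.
        return createProxy(GreetingService.class, (proxy, method, args) ->
                "remote:" + method.getName() + Arrays.toString(args));
    }
}
```

Calling ProxySketch.demo().greet("Zhang San") returns "remote:greet[Zhang San]", even though no class implements GreetingService. Note that hashCode, equals and toString on the proxy are routed to the handler as well, which is why getService handles those three method names explicitly.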

This is mainly implemented with a dynamic proxy. First, the list of machines for the class is looked up, and then one machine is selected by loadBalancer. LoadBalancer is an interface:

public interface LoadBalancer {
 /**
 * Choose a producer
 *
 * @param registryInfos Producer List
 * @return Selected producers
 */
 RegistryInfo choose(List<RegistryInfo> registryInfos);
}

Different implementations can be chosen when the ApplicationContext is initialized. Here I only implement a simple random algorithm (other implementations, such as round robin, can be added in the same way):

public class RandomLoadbalancer implements LoadBalancer {
 @Override
 public RegistryInfo choose(List<RegistryInfo> registryInfos) {
 Random random = new Random();
 int index = random.nextInt(registryInfos.size());
 return registryInfos.get(index);
 }
}
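
As an example of the round robin extension mentioned above, here is one possible sketch. It is not part of the original framework: Chooser is a generic stand-in for LoadBalancer so that the code compiles without RegistryInfo, and RoundRobinChooser is a hypothetical name.

```java
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

// Generic stand-in for the article's LoadBalancer, so the sketch does not depend on RegistryInfo.
interface Chooser<T> {
    T choose(List<T> candidates);
}

// Cycles through the candidates in order; safe to call from many threads at once.
final class RoundRobinChooser<T> implements Chooser<T> {
    private final AtomicInteger next = new AtomicInteger();

    @Override
    public T choose(List<T> candidates) {
        if (candidates == null || candidates.isEmpty()) {
            throw new IllegalArgumentException("no providers available");
        }
        // floorMod keeps the index non-negative even after the counter overflows.
        int index = Math.floorMod(next.getAndIncrement(), candidates.size());
        return candidates.get(index);
    }
}
```

With three providers a, b and c, five consecutive calls pick a, b, c, a, b. Compared with the random version, the load is spread evenly even over a small number of calls.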

Then we construct the unique identifier of the interface method, identify, and a requestId.

Why do you need a requestId?

This is because when a response arrives, we need to find out which request it belongs to, and identify alone is not enough.

Multiple threads in the same application may call the same method of the same interface at the same time, and in that case their identify values are identical.

So we use identify + requestId to tell them apart. The requestId comes from an auto-incrementing LongAdder, and the server sends it back in its response.
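
The matching of responses to requests can be sketched independently of the network code. The version below swaps in different tools from the ones used in this article: a CompletableFuture takes the place of Invoker, and an AtomicLong takes the place of the LongAdder. PendingCalls and its method names are hypothetical.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;

// Sketch of request/response correlation keyed by identify + "#" + requestId.
final class PendingCalls {
    private final AtomicLong ids = new AtomicLong();
    private final Map<String, CompletableFuture<String>> inProgress = new ConcurrentHashMap<>();

    // incrementAndGet() increments and reads in one atomic step, so no extra lock is needed.
    String nextRequestId() {
        return String.valueOf(ids.incrementAndGet());
    }

    // Register the call before sending it, so that even a very fast response finds its entry.
    CompletableFuture<String> register(String identify, String requestId) {
        CompletableFuture<String> future = new CompletableFuture<>();
        inProgress.put(identify + "#" + requestId, future);
        return future;
    }

    // Hands a response to the caller that issued the matching request; false if nobody is waiting.
    boolean complete(String identify, String requestId, String result) {
        CompletableFuture<String> future = inProgress.remove(identify + "#" + requestId);
        return future != null && future.complete(result);
    }
}
```

If two threads call sayHello at the same time, they get requestId 1 and 2. Even if the response for 2 arrives first, each caller still receives its own result, because the key contains the requestId and not just the method identifier.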

Then we construct an Invoker, put it into the inProgressInvoker map, and call its invoke method:

Invoker invoker = new DefaultInvoker(method.getReturnType(), ctx, requestId, identify);
inProgressInvoker.put(identify + "#" + requestId, invoker);
//Blocking waiting for results
return invoker.invoke(args);

public class DefaultInvoker<T> implements Invoker<T> {
 private ChannelHandlerContext ctx;
 private String requestId;
 private String identify;
 private Class<T> returnType;
 private T result;
 DefaultInvoker(Class<T> returnType, ChannelHandlerContext ctx, String requestId, String identify) {
 this.returnType = returnType;
 this.ctx = ctx;
 this.requestId = requestId;
 this.identify = identify;
 }
 @SuppressWarnings("unchecked")
 @Override
 public T invoke(Object[] args) {
 JSONObject jsonObject = new JSONObject();
 jsonObject.put("interfaces", identify);
 JSONObject param = new JSONObject();
 if (args != null) {
 for (Object obj : args) {
 param.put(obj.getClass().getName(), obj);
 }
 }
 jsonObject.put("parameter", param);
 jsonObject.put("requestId", requestId);
 System.out.println("Send to server JSON For:" + jsonObject.toJSONString());
 String msg = jsonObject.toJSONString() + "$$";
 ByteBuf byteBuf = Unpooled.buffer(msg.getBytes().length);
 byteBuf.writeBytes(msg.getBytes());
 ctx.writeAndFlush(byteBuf);
 waitForResult();
 return result;
 }
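 //Set once the response has arrived; guarded by synchronized (this)
 private boolean done;
 @Override
 public void setResult(String result) {
 synchronized (this) {
 this.result = JSONObject.parseObject(result, returnType);
 done = true;
 notifyAll();
 }
 }
 private void waitForResult() {
 synchronized (this) {
 //Loop on the flag so an early response or a spurious wakeup cannot leave us stuck
 while (!done) {
 try {
 wait();
 } catch (InterruptedException e) {
 Thread.currentThread().interrupt();
 return;
 }
 }
 }
 }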
}

We can see that after Invoker's invoke method sends the request over the network, it reaches waitForResult() and blocks there.

This is because the response to a network request does not come back synchronously. The client may have many requests in flight at the same time, so we cannot simply read the reply off the connection right after sending; the calling thread has to wait here until its own response is delivered.

Receiving responses

So on the consumer side, the request has been sent and the caller is blocked. When the server finishes processing, it sends a message back to the client. The entry point for that message is NettyClient's channelRead:

@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
 try {
 String message = (String) msg;
 if (messageCallback != null) {
 messageCallback.onMessage(message);
 }
 } finally {
 ReferenceCountUtil.release(msg);
 }
}

A callback is used here. Remember the callback we set when we initialized NettyClient?

/**
 * Response queue
 */
private ConcurrentLinkedQueue<RpcResponse> responses = new ConcurrentLinkedQueue<>();

client.setMessageCallback(message -> {
 //Queue the message returned by the server first
 RpcResponse response = JSONObject.parseObject(message, RpcResponse.class);
 responses.offer(response);
 synchronized (ApplicationContext.this) {
 ApplicationContext.this.notifyAll();
 }
});

After receiving the message here, we parse it into an RpcResponse object and add it to the responses queue, so all responses end up in this queue.

But then, how do we return the response result to the calling place?

We can do this: start one or more background threads that take responses off the queue, find the corresponding Invoker in the inProgressInvoker map we saved earlier, and hand it the result.

public ApplicationContext(....){
 
 //.....
 
 //step 5: Start the processor that processes the response
 initProcessor();
 
}
private void initProcessor() {
 //The number of processors could be read from a configuration file instead
 int num = 3;
 processors = new ResponseProcessor[num];
 for (int i = 0; i < num; i++) {
 processors[i] = createProcessor(i);
 }
}
/**
 * Threads that process responses
 */
private class ResponseProcessor extends Thread {
 @Override
 public void run() {
 System.out.println("Start response processing threads:" + getName());
 while (true) {
 //Multiple threads poll here; only one of them gets each response
 RpcResponse response = responses.poll();
 if (response == null) {
 try {
 synchronized (ApplicationContext.this) {
 //Re-check under the lock so a response queued just after poll() is not missed
 if (responses.isEmpty()) {
 ApplicationContext.this.wait();
 }
 }
 } catch (InterruptedException e) {
 e.printStackTrace();
 }
 } else {
 System.out.println("A response was received:" + response);
 String interfaceMethodIdentify = response.getInterfaceMethodIdentify();
 String requestId = response.getRequestId();
 String key = interfaceMethodIdentify + "#" + requestId;
 Invoker invoker = inProgressInvoker.remove(key);
 //Skip responses whose invoker is unknown instead of failing with a NullPointerException
 if (invoker != null) {
 invoker.setResult(response.getResult());
 }
 }
 }
 }
}

If the data is not available from the queue, the wait() method is called to wait.

Note that when we receive a response in the callback, we call notifyAll() to wake up the threads waiting here:

responses.offer(response);
synchronized (ApplicationContext.this) {
 ApplicationContext.this.notifyAll();
}

After waking up, multiple threads compete for that response. Because the queue is thread-safe, each response is handed to exactly one of them.
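
The same "several workers, each response handled once" behavior can also be obtained from a BlockingQueue, whose take() method blocks until an element is available. This is an alternative to the wait()/notifyAll() approach used in this article, not what the framework does. ResponseWorkers is a hypothetical class and responses are plain strings to keep the sketch self-contained.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.function.Consumer;

// Sketch: worker threads draining a BlockingQueue, so no explicit wait()/notifyAll() is needed.
final class ResponseWorkers {
    private final BlockingQueue<String> responses = new LinkedBlockingQueue<>();
    private final Thread[] workers;

    ResponseWorkers(int count, Consumer<String> handler) {
        workers = new Thread[count];
        for (int i = 0; i < count; i++) {
            workers[i] = new Thread(() -> {
                try {
                    while (true) {
                        // take() blocks while the queue is empty; each element goes to one worker only.
                        handler.accept(responses.take());
                    }
                } catch (InterruptedException e) {
                    // Interrupted by shutdown(): restore the flag and let the thread end.
                    Thread.currentThread().interrupt();
                }
            }, "Response-processor-" + i);
            workers[i].setDaemon(true);
            workers[i].start();
        }
    }

    // Called from the network callback when a response has been parsed.
    void offer(String response) {
        responses.offer(response);
    }

    void shutdown() {
        for (Thread worker : workers) {
            worker.interrupt();
        }
    }
}
```

The trade-off is mostly readability: the queue takes care of sleeping and waking the workers, so there is no window in which a notification can be missed.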

After a response is obtained, the unique request key is built from identify + requestId, the corresponding invoker is taken from inProgressInvoker, and the result is passed in through setResult:

String key = interfaceMethodIdentify + "#" + requestId;
Invoker invoker = inProgressInvoker.remove(key);
invoker.setResult(response.getResult());

@Override
public void setResult(String result) {
 synchronized (this) {
 this.result = JSONObject.parseObject(result, returnType);
 notifyAll();
 }
}

Inside setResult, the JSON is deserialized into the type the user expects, and then notifyAll() is called to wake up the thread blocked in the invoke method:

 @SuppressWarnings("unchecked")
 @Override
 public T invoke(Object[] args) {
 JSONObject jsonObject = new JSONObject();
 jsonObject.put("interfaces", identify);
 JSONObject param = new JSONObject();
 if (args != null) {
 for (Object obj : args) {
 param.put(obj.getClass().getName(), obj);
 }
 }
 jsonObject.put("parameter", param);
 jsonObject.put("requestId", requestId);
 System.out.println("Send to server JSON For:" + jsonObject.toJSONString());
 String msg = jsonObject.toJSONString() + NettyServer.DELIMITER;
 ByteBuf byteBuf = Unpooled.buffer(msg.getBytes().length);
 byteBuf.writeBytes(msg.getBytes());
 ctx.writeAndFlush(byteBuf);
 //Block here until setResult() wakes this thread up
 waitForResult();
 return result;
 }

The result can then be returned to the user.

Overall testing

Here we have finished the code for both producers and consumers. Let's test it as a whole. The producer's code is consistent with the previous code:

public class TestProducer {
 public static void main(String[] args) throws Exception {
 String connectionString = "zookeeper://localhost1:2181,localhost2:2182,localhost3:2181";
 HelloService service = new HelloServiceImpl();
 ServiceConfig config = new ServiceConfig<>(HelloService.class, service);
 List<ServiceConfig> serviceConfigList = new ArrayList<>();
 serviceConfigList.add(config);
 ApplicationContext ctx = new ApplicationContext(connectionString, serviceConfigList, null, 50071);
 }
}

Consumer test code:

public class TestConsumer {
 public static void main(String[] args) throws Exception {
 String connectionString = "zookeeper://localhost1:2181,localhost2:2182,localhost3:2181";
 ReferenceConfig<HelloService> config = new ReferenceConfig<>(HelloService.class);
 ApplicationContext ctx = new ApplicationContext(connectionString, null, Collections.singletonList(config),
 50070);
 HelloService helloService = ctx.getService(HelloService.class);
 System.out.println("sayHello(TestBean) results: " + helloService.sayHello(new TestBean("Zhang San", 20)));
 }
}

Then start the producer, and then start the consumer:

The logs obtained by the producers are as follows:

Zookeeper Client is initialized...
Register to the registry with the following path: [myRPC/interface=com.study.rpc.test.producer.HelloService&
method=sayHello&parameter=com.study.rpc.test.producer.TestBean]
Information: RegistryInfo{hostname='localhost', ip='192.168.16.7', port=50071}
Start NettyService, port: 50071
Start Response Processing Thread: Response-processor-0
Start Response Processing Thread: Response-processor-2
Start Response Processing Thread: Response-processor-1
Received message: {"interfaces":"interface=com.study.rpc.test.producer.HelloService&
method=sayHello&parameter=com.study.rpc.test.producer.TestBean","requestId":"1",
"parameter":{"com.study.rpc.test.producer.TestBean":{"age":20,"name":"Zhang San"}}
Response to client: {"interfaceMethodIdentify":"interface=com.study.rpc.test.producer.HelloService&
method=sayHello&parameter=com.study.rpc.test.producer.TestBean","requestId":"1",
"result":"\"Niubi, I received a message: TestBean{name='Zhang San', age=20}\""}

The consumer's log is as follows:

Zookeeper Client initialization completed...
Start building connections: 192.168.16.7, 50071
Waiting for the connection to succeed...
Start Response Processing Thread: Response-processor-1
Start Response Processing Thread: Response-processor-0
Start Response Processing Thread: Response-processor-2
Connection success: ChannelHandlerContext(NettyClient$NettyClientHandler#0,
[id: 0xb7a59701, L:/192.168.16.7:58354 - R:/192.168.16.7:50071])
The JSON sent to the server is: {"interfaces":"interface=com.study.rpc.test.producer.HelloService&
method=sayHello&parameter=com.study.rpc.test.producer.TestBean","requestId":"1",
"parameter":{"com.study.rpc.test.producer.TestBean":{"age":20,"name":"Zhang San"}}
Received a response: RpcResponse{result='"Niubi, I received a message: TestBean{name='Zhang San', age=20}',
interfaceMethodIdentify='interface=com.study.rpc.test.producer.HelloService&
method=sayHello&parameter=com.study.rpc.test.producer.TestBean', requestId='1'}
sayHello(TestBean) results: Niubi, I received a message: TestBean{name='Zhang San', age=20}

Summary

By completing this RPC framework, you should now have a general, intuitive understanding of how RPC is implemented. Let's summarize its features:

  • Supports multiple registries and is configurable (only ZooKeeper is implemented, but adding another one is simple)

  • Supports load balancing

Of course, there are many shortcomings, which is undeniable. A framework written casually like this is a long way from an industrial-grade one.

Here are some of the imperfections:

  • No pluggable serialization: users cannot choose among multiple serialization frameworks

  • Error handling for network requests is very simple, so robustness is poor

  • The registry does not support failure detection or automatic recovery

  • No call monitoring or performance metrics



Posted on Sun, 08 Sep 2019 06:12:29 -0700 by venom999