[RPC] Implementation of RPC framework based on netty+zookeeper step by step

The previous article implemented rate limiting for services; this one implements a service fuse (circuit breaker).

First, here is the GitHub address of the code; if you want to read the code, you can download and run it directly: https://github.com/whiteBX/wrpc

In today's microservice architectures the number of services is large and call chains are long, so some services will occasionally misbehave and become unavailable, for example because of bugs introduced by a release or network problems in a data center. Without a protection mechanism, this can easily cause a service avalanche. A service fuse is introduced to prevent that, and this article implements a simple one.

First, let's define the state of the fuse, as follows:

The fuse has three states: closed, fully open, and half-open.
The transitions between them are as follows:

  1. The fuse is initially closed.
  2. When the ratio of failed requests reaches the preset threshold, it switches to the fully open state.
  3. After a set period of time, the fully open fuse becomes half-open.
  4. In the half-open state one request is let through. If it succeeds, the fuse switches to closed; if it fails, it switches back to fully open.
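
The four steps above can be read as a small state machine. The following is a minimal sketch of just the transitions, separate from the article's implementation; the names FuseState, FuseEvent and FuseTransitions are hypothetical and chosen only for illustration (the article's own enum is CircuitStatusEnum).

```java
// Hypothetical names for illustration only; not part of the wrpc code base.
enum FuseState { CLOSED, OPEN, HALF_OPEN }

enum FuseEvent { FAILURE_THRESHOLD_REACHED, WAIT_TIME_ELAPSED, PROBE_SUCCEEDED, PROBE_FAILED }

final class FuseTransitions {
    private FuseTransitions() {}

    /** Returns the next state, or the current state if the event does not apply to it. */
    static FuseState next(FuseState current, FuseEvent event) {
        switch (current) {
            case CLOSED:
                // Step 2: too many failures open the fuse.
                return event == FuseEvent.FAILURE_THRESHOLD_REACHED ? FuseState.OPEN : current;
            case OPEN:
                // Step 3: after the wait time the fuse becomes half-open.
                return event == FuseEvent.WAIT_TIME_ELAPSED ? FuseState.HALF_OPEN : current;
            case HALF_OPEN:
                // Step 4: the single probe request decides the next state.
                if (event == FuseEvent.PROBE_SUCCEEDED) return FuseState.CLOSED;
                if (event == FuseEvent.PROBE_FAILED) return FuseState.OPEN;
                return current;
            default:
                return current;
        }
    }

    public static void main(String[] args) {
        FuseState s = FuseState.CLOSED;                       // Step 1: initially closed
        s = next(s, FuseEvent.FAILURE_THRESHOLD_REACHED);
        s = next(s, FuseEvent.WAIT_TIME_ELAPSED);
        s = next(s, FuseEvent.PROBE_SUCCEEDED);
        System.out.println(s);
    }
}
```

Events that do not apply to the current state leave it unchanged, so a late success report while the fuse is fully open, for example, does not close it.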

Next, look at the code implementation:

public class CircuitUtil {

    // Minimum number of requests that must be recorded before the failure ratio is evaluated
    private static final int DEFAULT_FAIL_COUNT = 5;
    // Time to stay fully open before moving to half-open (milliseconds)
    private static final long DEFAULT_HALF_OPEN_TRANSFER_TIME = 10000;
    // Failure ratio above which the fuse opens
    private static final double DEFAULT_FAIL_RATE = 0.8D;
    // Per-method counters: the pair's key counts successes, its value counts failures
    private Map<String, Pair<AtomicInteger, AtomicInteger>> counter = new ConcurrentHashMap<>();
    // Current state of the fuse
    private volatile CircuitStatusEnum status = CircuitStatusEnum.CLOSE;
    // Time of the most recent switch to the fully open or half-open state
    private volatile long timestamp;
    // Guards the switch from fully open to half-open
    private final Semaphore semaphore = new Semaphore(1);

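    /**
     * Simple fuse flow:
     * 1. If the fuse is open, return the circuit-break response immediately.
     * 2. Otherwise execute the call and record the outcome with markSuccess or markFail.
     *
     * @param methodName name of the invoked method, used as the counter key
     * @param caller     performs the actual call
     * @param serverHost address of the target server
     * @param param      serialized request
     * @return the call result, or a JSON error response
     * @throws Throwable if the underlying call throws
     */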
    public String doCircuit(String methodName, Caller caller, String serverHost, String param) throws Throwable {
        if (isOpen()) {
            return "{\"code\":-1,\"message\":\"circuit break\"}";
        }
        String result;
        result = caller.call(serverHost, param);
        if ("exception".equals(result)) {
            markFail(methodName);
            return "{\"code\":-1,\"message\":\"exception request\"}";
        }
        markSuccess(methodName);
        return result;
    }

    /**
     * Decides whether the current request must be rejected.
     * Returns false when the fuse is closed, or when the fully open period has
     * elapsed and this request is let through as the half-open probe.
     *
     * @return true if the request must be rejected
     */
    private boolean isOpen() {
        boolean openFlag = true;
        // Closed: let the request through
        if (status.equals(CircuitStatusEnum.CLOSE)) {
            openFlag = false;
        }
        // Fully open
        if (status.equals(CircuitStatusEnum.OPEN)) {
            // The wait time has not elapsed yet: keep rejecting
            if (System.currentTimeMillis() - timestamp < DEFAULT_HALF_OPEN_TRANSFER_TIME) {
                return openFlag;
            }
            // The wait time has elapsed: switch to half-open and let this one request through
            if (semaphore.tryAcquire()) {
                status = CircuitStatusEnum.HALF_OPEN;
                timestamp = System.currentTimeMillis();
                openFlag = false;
                semaphore.release();
            }
        }
        // In the half-open state openFlag is still true here, so all other requests are rejected
        return openFlag;
    }

    /**
     * Records a success.
     * 1. In the half-open state, a single success closes the fuse and resets the counters.
     * 2. In any other state, increment the success count.
     *
     * @param operation name of the invoked method, used as the counter key
     */
    private void markSuccess(String operation) {
        if (status == CircuitStatusEnum.HALF_OPEN) {
            // Half-open: one success closes the fuse and starts counting from zero
            status = CircuitStatusEnum.CLOSE;
            counter.put(operation, new Pair<>(new AtomicInteger(), new AtomicInteger()));
        } else {
            // computeIfAbsent creates the counter pair atomically on first use
            counter.computeIfAbsent(operation,
                    key -> new Pair<>(new AtomicInteger(), new AtomicInteger()))
                    .getKey().incrementAndGet();
        }
    }

    /**
     * Records a failure.
     * 1. In the half-open state, a single failure switches back to fully open.
     * 2. In any other state, the failure ratio decides whether to open the fuse.
     *
     * @param operation name of the invoked method, used as the counter key
     */
    private void markFail(String operation) {
        if (status == CircuitStatusEnum.HALF_OPEN) {
            // Half-open: the probe failed, so reopen the fuse and restart the wait time
            status = CircuitStatusEnum.OPEN;
            timestamp = System.currentTimeMillis();
        } else {
            // computeIfAbsent creates the counter pair atomically on first use
            Pair<AtomicInteger, AtomicInteger> pair = counter.computeIfAbsent(operation,
                    key -> new Pair<>(new AtomicInteger(), new AtomicInteger()));
            int failCount = pair.getValue().incrementAndGet();
            int successCount = pair.getKey().get();
            int totalCount = failCount + successCount;
            double failRate = (double) failCount / (double) totalCount;
            // Open only after enough requests were seen and the failure ratio exceeds the threshold
            if (totalCount >= DEFAULT_FAIL_COUNT && failRate > DEFAULT_FAIL_RATE) {
                status = CircuitStatusEnum.OPEN;
                timestamp = System.currentTimeMillis();
            }
        }
    }
}
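
One detail worth noting in isOpen: the semaphore is released immediately after it is acquired, so two threads that both observed the fully open state can, in principle, each be let through as a probe. One possible way to admit exactly one probe is an atomic compare-and-set on the state. The sketch below shows that idea in isolation; ProbeGate and its methods are hypothetical names, not part of the wrpc code, and the current time is passed in as a parameter only to keep the example deterministic.

```java
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical helper for illustration; the article's CircuitUtil uses a Semaphore instead.
final class ProbeGate {
    enum State { CLOSE, OPEN, HALF_OPEN }

    private final AtomicReference<State> state = new AtomicReference<>(State.CLOSE);
    private final long waitMillis;
    private volatile long openedAt;

    ProbeGate(long waitMillis) {
        this.waitMillis = waitMillis;
    }

    /** Switches to OPEN and records when that happened. */
    void trip(long nowMillis) {
        openedAt = nowMillis; // write the timestamp first so readers that see OPEN also see it
        state.set(State.OPEN);
    }

    /** Returns true if the caller may send a request at time nowMillis. */
    boolean tryPass(long nowMillis) {
        State current = state.get();
        if (current == State.CLOSE) {
            return true;
        }
        if (current == State.OPEN && nowMillis - openedAt >= waitMillis) {
            // Only the thread that wins this compare-and-set becomes the probe.
            return state.compareAndSet(State.OPEN, State.HALF_OPEN);
        }
        return false; // still waiting, or a probe is already in flight
    }

    /** To be called by the probe request only: closes the gate again. */
    void probeSucceeded() {
        state.compareAndSet(State.HALF_OPEN, State.CLOSE);
    }

    /** To be called by the probe request only: reopens the gate and restarts the wait. */
    void probeFailed(long nowMillis) {
        openedAt = nowMillis;
        state.compareAndSet(State.HALF_OPEN, State.OPEN);
    }

    State currentState() {
        return state.get();
    }
}
```

Because compareAndSet succeeds for only one caller, every other thread that arrives while the probe is in flight sees HALF_OPEN and is rejected, which matches step 4 of the state description.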

Then we bring the fuse into our RPC framework by modifying the following method of the RPCConsumer class:

    public <T> T getBean(final Class<T> clazz, final String appCode) {
        return (T) Proxy.newProxyInstance(clazz.getClassLoader(), new Class[]{clazz}, new InvocationHandler() {
            public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
                // Get the server address
                String serverHost = getServer(appCode);
                Span span = SpanBuilder.buildNewSpan(SpanHolder.get(), method.getName(), serverHost, appCode);
                // TODO: 2018/10/25 start a thread that calls the remote link-tracking service over RPC to record the tracking log, instead of printing it here
                System.out.println("Link tracking, calling remote services:" + JSON.toJSONString(span));
                BaseRequestBO baseRequestBO = buildBaseBO(span, clazz.getName(), method, JSON.toJSONString(args[0]));
                String result = circuitUtil.doCircuit(method.getName(), remoteCaller, serverHost, JSON.toJSONString(baseRequestBO));
                return JSON.parseObject(result, method.getReturnType());
            }
        });
    }

The remoteCaller is the remote call, which we have extracted behind an abstraction as part of this change. The code is as follows:

public interface Caller {

    /**
     * Performs the call.
     *
     * @param serverHost address of the target server
     * @param param      serialized request
     * @return the response as a string
     */
    String call(String serverHost, String param);
}

public class RemoteCaller implements Caller {
    /**
     * netty Client
     */
    private NettyClient nettyClient = new NettyClient();

    /**
     * Remote Call
     *
     * @param serverHost
     * @param param
     * @return
     */
    @Override
    public String call(String serverHost, String param) {
        try {
            if (serverHost == null) {
                System.out.println("Remote Call Error:Currently there is no service provider");
                return "{\"code\":404,\"message\":\"no provider\"}";
            }
            // Connect netty, request and receive response
            RpcClientNettyHandler clientHandler = new RpcClientNettyHandler();
            clientHandler.setParam(param);
            nettyClient.initClient(serverHost, clientHandler);
            String result = clientHandler.process();
            System.out.println(MessageFormat.format("Call the server:{0},Request parameters:{1},Response parameters:{2}", serverHost, param, result));
            return result;
        } catch (Exception e) {
            System.out.println("Remote service invocation failed:" + e);
            // Return the marker that doCircuit checks for, so the fuse counts this failure
            return "exception";
        }
    }
}

Next, modify the implementation of HelloServiceImpl:

public class HelloServiceImpl implements HelloService {

    @Override
    public HelloResponse hello(HelloRequest request) {
        System.out.println("The server receives the request,serial number:" + request.getSeq());
        if (request.getSeq() < 0) {
            throw new RuntimeException("seq error");
        }
        HelloResponse response = new HelloResponse();
        response.setCode(200);
        response.setMessage("success:" + request.getSeq());
        return response;
    }
}

Here we add an exception that is thrown when the incoming seq is negative. Next, start the server and the client and make some calls:

When the client passes in -1, it returns: {"code":-1,"message":"exception request"}
After five consecutive such calls, it returns: {"code":-1,"message":"circuit break"}
While the fuse is open, even a positive value such as 1 returns: {"code":-1,"message":"circuit break"}
After 10 seconds, passing in 1 returns a normal response again, and the service is back to normal use.
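
The "five consecutive calls" follow from the two constants in CircuitUtil: the ratio is only evaluated once at least 5 requests have been counted, and it must be strictly greater than 0.8. Here is a small sketch of that check on its own; FailRateCheck and shouldOpen are hypothetical names, and the constants simply mirror DEFAULT_FAIL_COUNT and DEFAULT_FAIL_RATE.

```java
// Hypothetical helper for illustration; it mirrors the condition used in markFail.
final class FailRateCheck {
    static final int MIN_REQUESTS = 5;     // mirrors DEFAULT_FAIL_COUNT
    static final double FAIL_RATE = 0.8D;  // mirrors DEFAULT_FAIL_RATE

    private FailRateCheck() {}

    /** Returns true if the counted requests would open the fuse. */
    static boolean shouldOpen(int successCount, int failCount) {
        int total = successCount + failCount;
        double failRate = total == 0 ? 0.0 : (double) failCount / (double) total;
        return total >= MIN_REQUESTS && failRate > FAIL_RATE;
    }

    public static void main(String[] args) {
        System.out.println(shouldOpen(0, 4)); // false: only 4 requests counted so far
        System.out.println(shouldOpen(0, 5)); // true: 5 requests, ratio 1.0
        System.out.println(shouldOpen(1, 4)); // false: ratio is 0.8, not above it
        System.out.println(shouldOpen(1, 5)); // true: ratio is 5/6
    }
}
```

Since the comparison is strict, four failures out of five requests do not open the fuse; with one earlier success, it takes a fifth failure to push the ratio above the threshold.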

Once the fuse has opened, we can apply the corresponding degradation. Take a remote call as an example: when we find that the other party's service is timing out on a large share of requests, we can open the fuse for it and degrade to an alternative path to finish our own method. That way the failure does not make our own service unavailable, which serves the goal of high availability.
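
As a rough illustration of such degradation, the sketch below wraps a fuse-protected call and switches to an alternative when the fuse rejects the request. It assumes the rejection is signalled by the exact "circuit break" JSON string returned by doCircuit; FallbackCaller and callWithFallback are hypothetical names and not part of the wrpc code.

```java
import java.util.function.Supplier;

// Hypothetical helper for illustration only.
final class FallbackCaller {
    // The response doCircuit returns when the fuse rejects a request.
    static final String CIRCUIT_BREAK = "{\"code\":-1,\"message\":\"circuit break\"}";

    private FallbackCaller() {}

    /**
     * Runs the fuse-protected call and, if the fuse rejected it,
     * returns the result of the fallback instead.
     */
    static String callWithFallback(Supplier<String> guardedCall, Supplier<String> fallback) {
        String result = guardedCall.get();
        return CIRCUIT_BREAK.equals(result) ? fallback.get() : result;
    }

    public static void main(String[] args) {
        // The guarded call is simulated here; in the framework it would go through doCircuit.
        String response = callWithFallback(
                () -> CIRCUIT_BREAK,
                () -> "{\"code\":200,\"message\":\"served from fallback\"}");
        System.out.println(response);
    }
}
```

The fallback could be a cached value, a default response, or a call to a backup service, depending on what the business logic can tolerate.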

The code above is on GitHub; you can download it and start a ZooKeeper instance with Docker to run it.


Posted on Wed, 04 Sep 2019 23:44:14 -0700 by glossary