7. Invoker Call for Data Processing of Request or Response

In the previous section, we analyzed the parsing of the request body, and finally resolved it into a Request. The value held by the Request is an Invocation. Combined with the exposure of the service in Section 4, we know that dubbo stores an Exporter in the protocol of the exposed service.

//group / interface name: version: port - > Exporter
Map<String, Exporter<?>> exporterMap

Exporter holds invoker

The invoker was originally created at com. alibaba. dubbo. config. ServiceConfig # doExportUrls For1 Protocol, with the following code:

//(*1*)
Invoker<?> invoker = proxyFactory.getInvoker(ref, (Class) interfaceClass, registryURL.addParameterAndEncoded(Constants.EXPORT_KEY, url.toFullString()));
//This class maintains the relationship between invoker and ServiceConfig.
DelegateProviderMetaDataInvoker wrapperInvoker = new DelegateProviderMetaDataInvoker(invoker, this);,

//(*1*)
public <T> Invoker<T> getInvoker(T proxy, Class<T> type, URL url) {
    // TODO Wrapper cannot handle this scenario correctly: the classname contains '$'
    //This Wrapper's processing logic is no longer verbose, simply to provide method information, attribute information, and by passing in object and method name, method parameters directly call methods (not reflection calls).
    final Wrapper wrapper = Wrapper.getWrapper(proxy.getClass().getName().indexOf('$') < 0 ? proxy.getClass() : type);
    //Create Invoker objects
    return new AbstractProxyInvoker<T>(proxy, type, url) {
        @Override
        protected Object doInvoke(T proxy, String methodName,
                                  Class<?>[] parameterTypes,
                                  Object[] arguments) throws Throwable {
            return wrapper.invokeMethod(proxy, methodName, parameterTypes, arguments);
        }
    };
}

This Delegate Provider MetaData Invoker instance was decorated with a stack of filters when it was introduced to com. alibaba. dubbo. rpc. protocol. Protocol Filter Wrapper# export, forming a chain of responsibility.

private static <T> Invoker<T> com.alibaba.dubbo.rpc.protocol.ProtocolFilterWrapper#buildInvokerChain(final Invoker<T> invoker, String key, String group) {
    Invoker<T> last = invoker;
    //Filter for all composite conditions obtained by SPI
    List<Filter> filters = ExtensionLoader.getExtensionLoader(Filter.class).getActivateExtension(invoker.getUrl(), key, group);
    if (!filters.isEmpty()) {
        //Constructing the Chain of Responsibility
        for (int i = filters.size() - 1; i >= 0; i--) {
            final Filter filter = filters.get(i);
            final Invoker<T> next = last;
            last = new Invoker<T>() {

                @Override
                public Class<T> getInterface() {
                    return invoker.getInterface();
                }

                @Override
                public URL getUrl() {
                    return invoker.getUrl();
                }

                @Override
                public boolean isAvailable() {
                    return invoker.isAvailable();
                }

                @Override
                public Result invoke(Invocation invocation) throws RpcException {
                    return filter.invoke(next, invocation);
                }

                @Override
                public void destroy() {
                    invoker.destroy();
                }

                @Override
                public String toString() {
                    return invoker.toString();
                }
            };
        }
    }
    return last;
}

This chain is currently filtered like this
EchoFilter -> ClassLoaderFilter -> GenericFilter -> ContextFilter -> TraceFilter -> TimeoutFilter -> MonitorFilter -> ExceptionFilter

public Result EchoFilter#invoke(Invoker<?> invoker, Invocation inv) throws RpcException {
    //If the interface invoked is the $echo method of the EchoService interface, get the first parameter and return it directly
    //The effect is that the client enters anything and eventually returns the same content, which is generally used for echo testing.
    if (inv.getMethodName().equals(Constants.$ECHO) && inv.getArguments() != null && inv.getArguments().length == 1)
        return new RpcResult(inv.getArguments()[0]);
    return invoker.invoke(inv);
}
                                                    |
                                                    V
public Result ClassLoaderFilter#invoke(Invoker<?> invoker, Invocation invocation) throws RpcException {
    //Replace the class loading used by the current thread with the class loading loaded by invoker to avoid the mistake that the class loader of the current thread cannot load because invoker's class loading has classes that can only be loaded by it.
    ClassLoader ocl = Thread.currentThread().getContextClassLoader();
    Thread.currentThread().setContextClassLoader(invoker.getInterface().getClassLoader());
    try {
        return invoker.invoke(invocation);
    } finally {
        Thread.currentThread().setContextClassLoader(ocl);
    }
}
                                                    |
                                                    V
public Result GenericFilter#invoke(Invoker<?> invoker, Invocation inv) throws RpcException {
    //Check whether the current invocation is a generalized invocation
    if (inv.getMethodName().equals(Constants.$INVOKE)
            && inv.getArguments() != null
            && inv.getArguments().length == 3
            && !invoker.getInterface().equals(GenericService.class)) {
            //The first parameter represents the method name
        String name = ((String) inv.getArguments()[0]).trim();
        //Method parameter type array
        String[] types = (String[]) inv.getArguments()[1];
        //Array of method parameter values
        Object[] args = (Object[]) inv.getArguments()[2];
        try {
            //Getting the corresponding method signature from the current interface
            Method method = ReflectUtils.findMethodByMethodSignature(invoker.getInterface(), name, types);
            //Parameter type
            Class<?>[] params = method.getParameterTypes();
            //If no parameters are passed, an empty array of parameters is constructed
            if (args == null) {
                args = new Object[params.length];
            }
            //The generalization key is obtained from the additional parameters, which is used to specify what serialization methods the parameter uses in the key.
            String generic = inv.getAttachment(Constants.GENERIC_KEY);

            if (StringUtils.isBlank(generic)) {
                generic = RpcContext.getContext().getAttachment(Constants.GENERIC_KEY);
            }
            //If it's empty, try to convert value to type in a generic way
            if (StringUtils.isEmpty(generic)
                    || ProtocolUtils.isDefaultGenericSerialization(generic)) {
                args = PojoUtils.realize(args, params, method.getGenericParameterTypes());
                //Object Input Stream for java
            } else if (ProtocolUtils.isJavaGenericSerialization(generic)) {
                for (int i = 0; i < args.length; i++) {
                    if (byte[].class == args[i].getClass()) {
                        try {
                            UnsafeByteArrayInputStream is = new UnsafeByteArrayInputStream((byte[]) args[i]);
                            args[i] = ExtensionLoader.getExtensionLoader(Serialization.class)
                                    .getExtension(Constants.GENERIC_SERIALIZATION_NATIVE_JAVA)
                                    .deserialize(null, is).readObject();
                        } catch (Exception e) {
                            throw new RpcException("Deserialize argument [" + (i + 1) + "] failed.", e);
                        }
                    } else {
                        . . . . . . Eliminate some exception codes
                    }
                }
                //Using JavaBean serialization, this is not used
            } else if (ProtocolUtils.isBeanGenericSerialization(generic)) {
                for (int i = 0; i < args.length; i++) {
                    if (args[i] instanceof JavaBeanDescriptor) {
                        args[i] = JavaBeanSerializeUtil.deserialize((JavaBeanDescriptor) args[i]);
                    } else {
                       . . . . . . Eliminate some exception codes
            }
            //Call the interface to get the result
            Result result = invoker.invoke(new RpcInvocation(method, args, inv.getAttachments()));
            if (result.hasException()
                    && !(result.getException() instanceof GenericException)) {
                    //Constructing generalized call exception
                return new RpcResult(new GenericException(result.getException()));
            }
            //The results are serialized in the original serialization mode, and then returned
            if (ProtocolUtils.isJavaGenericSerialization(generic)) {
                try {
                    UnsafeByteArrayOutputStream os = new UnsafeByteArrayOutputStream(512);
                    ExtensionLoader.getExtensionLoader(Serialization.class)
                            .getExtension(Constants.GENERIC_SERIALIZATION_NATIVE_JAVA)
                            .serialize(null, os).writeObject(result.getValue());
                            /
                    return new RpcResult(os.toByteArray());
                } catch (IOException e) {
                    throw new RpcException("Serialize result failed.", e);
                }
            } else if (ProtocolUtils.isBeanGenericSerialization(generic)) {
                return new RpcResult(JavaBeanSerializeUtil.serialize(result.getValue(), JavaBeanAccessor.METHOD));
            } else {
                return new RpcResult(PojoUtils.generalize(result.getValue()));
            }
        } catch (NoSuchMethodException e) {
            throw new RpcException(e.getMessage(), e);
        } catch (ClassNotFoundException e) {
            throw new RpcException(e.getMessage(), e);
        }
    }
    //Ordinary call
    return invoker.invoke(inv);
}
                                                    |
                                                    V
//It's mainly about doing some operations that call context parameters.
public Result ContextFilter#invoke(Invoker<?> invoker, Invocation invocation) throws RpcException {
    Map<String, String> attachments = invocation.getAttachments();
    //Remove some additional parameters
    if (attachments != null) {
        attachments = new HashMap<String, String>(attachments);
        attachments.remove(Constants.PATH_KEY);
        attachments.remove(Constants.GROUP_KEY);
        attachments.remove(Constants.VERSION_KEY);
        attachments.remove(Constants.DUBBO_VERSION_KEY);
        attachments.remove(Constants.TOKEN_KEY);
        attachments.remove(Constants.TIMEOUT_KEY);
        attachments.remove(Constants.ASYNC_KEY);// Remove async property to avoid being passed to the following invoke chain.
    }
    //Record information into context
    RpcContext.getContext()
            .setInvoker(invoker)
            .setInvocation(invocation)
//                .setAttachments(attachments)  // merged from dubbox
            .setLocalAddress(invoker.getUrl().getHost(),
                    invoker.getUrl().getPort());

    // mreged from dubbox
    // we may already added some attachments into RpcContext before this filter (e.g. in rest protocol)
    if (attachments != null) {
        if (RpcContext.getContext().getAttachments() != null) {
            RpcContext.getContext().getAttachments().putAll(attachments);
        } else {
            RpcContext.getContext().setAttachments(attachments);
        }
    }

    if (invocation instanceof RpcInvocation) {
        ((RpcInvocation) invocation).setInvoker(invoker);
    }
    try {
        //Call the next invoker
        RpcResult result = (RpcResult) invoker.invoke(invocation);
        // pass attachments to result
        result.addAttachments(RpcContext.getServerContext().getAttachments());
        return result;
    } finally {
        RpcContext.removeContext();
        RpcContext.getServerContext().clearAttachments();
    }
}
                                                    |
                                                    V
//This filter is mainly used to send time-consuming information of interface calls to clients.
public Result TraceFilter#invoke(Invoker<?> invoker, Invocation invocation) throws RpcException {
    //current time
    long start = System.currentTimeMillis();
    Result result = invoker.invoke(invocation);
    //Time after calling interface
    long end = System.currentTimeMillis();
    if (tracers.size() > 0) {
        //Get the channel that needs to send the log
        String key = invoker.getInterface().getName() + "." + invocation.getMethodName();
        Set<Channel> channels = tracers.get(key);
        if (channels == null || channels.isEmpty()) {
            //Attempt to obtain by interface name
            key = invoker.getInterface().getName();
            channels = tracers.get(key);
        }
        if (channels != null && !channels.isEmpty()) {
            for (Channel channel : new ArrayList<Channel>(channels)) {
                //If the channel is still connected
                if (channel.isConnected()) {
                    try {
                        int max = 1;
                        //Maximum Limitation to Get Send Logs
                        Integer m = (Integer) channel.getAttribute(TRACE_MAX);
                        if (m != null) {
                            max = (int) m;
                        }
                        int count = 0;
                        //Log Send Count
                        AtomicInteger c = (AtomicInteger) channel.getAttribute(TRACE_COUNT);
                        if (c == null) {
                            c = new AtomicInteger();
                            channel.setAttribute(TRACE_COUNT, c);
                        }
                        count = c.getAndIncrement();
                        //If the count is not exceeded, then sending the interface call time-consuming log to the client
                        if (count < max) {
                            String prompt = channel.getUrl().getParameter(Constants.PROMPT_KEY, Constants.DEFAULT_PROMPT);
                            channel.send("\r\n" + RpcContext.getContext().getRemoteAddress() + " -> "
                                    + invoker.getInterface().getName()
                                    + "." + invocation.getMethodName()
                                    + "(" + JSON.toJSONString(invocation.getArguments()) + ")" + " -> " + JSON.toJSONString(result.getValue())
                                    + "\r\nelapsed: " + (end - start) + " ms."
                                    + "\r\n\r\n" + prompt);
                        }
                        //If the transmission limit is exceeded, GG can be used.
                        if (count >= max - 1) {
                            channels.remove(channel);
                        }
                    } catch (Throwable e) {
                        channels.remove(channel);
                        logger.warn(e.getMessage(), e);
                    }
                } else {
                    channels.remove(channel);
                }
            }
        }
    }
    return result;
}
                                                    |
                                                    V
public Result TimeoutFilter#invoke(Invoker<?> invoker, Invocation invocation) throws RpcException {
    //current time
    long start = System.currentTimeMillis();
    Result result = invoker.invoke(invocation);
    //Invoking interfaces is time-consuming
    long elapsed = System.currentTimeMillis() - start;
    //If the calling interface takes longer than the specified maximum time, print the warning log
    if (invoker.getUrl() != null
            && elapsed > invoker.getUrl().getMethodParameter(invocation.getMethodName(),
            "timeout", Integer.MAX_VALUE)) {
        if (logger.isWarnEnabled()) {
            logger.warn("invoke time out. method: " + invocation.getMethodName()
                    + " arguments: " + Arrays.toString(invocation.getArguments()) + " , url is "
                    + invoker.getUrl() + ", invoke elapsed " + elapsed + " ms.");
        }
    }
    return result;
}
                                                    |
                                                    V
public Result MonitorFilter#invoke(Invoker<?> invoker, Invocation invocation) throws RpcException {
    //If the monitoring center is found to be used in the previous service exposures, the monitor parameter is added to the url
    if (invoker.getUrl().hasParameter(Constants.MONITOR_KEY)) {
        RpcContext context = RpcContext.getContext(); // provider must fetch context before invoke() gets called
        //Get host
        String remoteHost = context.getRemoteHost();
        long start = System.currentTimeMillis(); // record start timestamp
        //count
        getConcurrent(invoker, invocation).incrementAndGet(); // count up
        try {
            //Calling interface
            Result result = invoker.invoke(invocation); // proceed invocation chain
            //Collect information, submit information to the monitoring center, and see for yourself if you are interested.
            collect(invoker, invocation, result, remoteHost, start, false);
            return result;
        } catch (RpcException e) {
            collect(invoker, invocation, null, remoteHost, start, true);
            throw e;
        } finally {
            getConcurrent(invoker, invocation).decrementAndGet(); // count down
        }
    } else {
        return invoker.invoke(invocation);
    }
}
                                                    |
                                                    V
//This filter mainly deals with uniform exception handling
public Result ExceptionFilter#invoke(Invoker<?> invoker, Invocation invocation) throws RpcException {
    try {
        Result result = invoker.invoke(invocation);
        //If there are exceptions, and the interface is not a generalized interface call
        if (result.hasException() && GenericService.class != invoker.getInterface()) {
            try {
                //Get exception
                Throwable exception = result.getException();

                // directly throw if it's checked exception
                //Direct throw check exception
                if (!(exception instanceof RuntimeException) && (exception instanceof Exception)) {
                    return result;
                }
                // directly throw if the exception appears in the signature
                //If the exception is defined on the interface, throw it directly
                try {
                    Method method = invoker.getInterface().getMethod(invocation.getMethodName(), invocation.getParameterTypes());
                    Class<?>[] exceptionClassses = method.getExceptionTypes();
                    for (Class<?> exceptionClass : exceptionClassses) {
                        if (exception.getClass().equals(exceptionClass)) {
                            return result;
                        }
                    }
                } catch (NoSuchMethodException e) {
                    return result;
                }
                //If there are other exceptions, then there are some unchecked exceptions, printing the error log
                // for the exception not found in method's signature, print ERROR message in server's log.
                logger.error("Got unchecked and undeclared exception which called by " + RpcContext.getContext().getRemoteHost()
                        + ". service: " + invoker.getInterface().getName() + ", method: " + invocation.getMethodName()
                        + ", exception: " + exception.getClass().getName() + ": " + exception.getMessage(), exception);

                // directly throw if exception class and interface class are in the same jar file.
                //If the exception comes from the same jar package, return it directly
                String serviceFile = ReflectUtils.getCodeBase(invoker.getInterface());
                String exceptionFile = ReflectUtils.getCodeBase(exception.getClass());
                if (serviceFile == null || exceptionFile == null || serviceFile.equals(exceptionFile)) {
                    return result;
                }
                // directly throw if it's JDK exception
                //Exceptions under the JDK package, returned directly
                String className = exception.getClass().getName();
                if (className.startsWith("java.") || className.startsWith("javax.")) {
                    return result;
                }
                // directly throw if it's dubbo exception
                if (exception instanceof RpcException) {
                    return result;
                }

                // otherwise, wrap with RuntimeException and throw back to the client
                //Other types are encapsulated as Runtime Exception
                return new RpcResult(new RuntimeException(StringUtils.toString(exception)));
            } catch (Throwable e) {
                logger.warn("Fail to ExceptionFilter when called by " + RpcContext.getContext().getRemoteHost()
                        + ". service: " + invoker.getInterface().getName() + ", method: " + invocation.getMethodName()
                        + ", exception: " + e.getClass().getName() + ": " + e.getMessage(), e);
                return result;
            }
        }
        return result;
    } catch (RuntimeException e) {
        logger.error("Got unchecked and undeclared exception which called by " + RpcContext.getContext().getRemoteHost()
                + ". service: " + invoker.getInterface().getName() + ", method: " + invocation.getMethodName()
                + ", exception: " + e.getClass().getName() + ": " + e.getMessage(), e);
        throw e;
    }
}

Then it calls the Delegate Provider MetaData Invoker created at com. alibaba. dubbo. config. Service Config # doExportUrlsFor1 Protocol, and continues to call the Delegate Provider MetaData Invoker.

public Result AbstractProxyInvoker#invoke(Invocation invocation) throws RpcException {
    try {
        return new RpcResult(doInvoke(proxy, invocation.getMethodName(), invocation.getParameterTypes(), invocation.getArguments()));
    } catch (InvocationTargetException e) {
        return new RpcResult(e.getTargetException());
    } catch (Throwable e) {
        throw new RpcException("Failed to invoke remote proxy method " + invocation.getMethodName() + " to " + getUrl() + ", cause: " + e.getMessage(), e);
    }
}
                                                    |
                                                    V
public <T> Invoker<T> JavassistProxyFactory#getInvoker(T proxy, Class<T> type, URL url) {
    // TODO Wrapper cannot handle this scenario correctly: the classname contains '$'
    final Wrapper wrapper = Wrapper.getWrapper(proxy.getClass().getName().indexOf('$') < 0 ? proxy.getClass() : type);
    return new AbstractProxyInvoker<T>(proxy, type, url) {
        @Override
        protected Object doInvoke(T proxy, String methodName,
                                  Class<?>[] parameterTypes,
                                  Object[] arguments) throws Throwable {
            return wrapper.invokeMethod(proxy, methodName, parameterTypes, arguments);
        }
    };
}

Wrapper's role is no longer redundant.
Ultimately, what we call is the implementation of our own interface.

public User getUser(long userId) {
	
    return new User("Xiao Ming", 18);
}

After the interface call is successful, dubbo needs to tell the client the result, which is the data response that needs to be analyzed in the next section.

Tags: Dubbo Java JSON JDK

Posted on Sun, 06 Oct 2019 06:33:23 -0700 by aa-true-vicious