With CompletableFuture, asynchronous programming is less difficult!

Contents of this article:

  • Introduction to Business Requirements Scenarios
  • Reflections on Technical Design Scheme
  • Future Design Model
  • CompletableFuture Mode in Practice
  • CompletableFuture Production Recommendations
  • CompletableFuture Performance Test
  • CompletableFuture Extended Usage

1. Introduction to Business Requirements Scenarios

The only thing that never changes is change itself.

Presumably, in your spare time you often watch videos using a few apps such as Youku, iQiyi and Tencent.

These video apps are available not only on mobile phones but also on TVs.

The app for TV terminals is a separate version, different from the app on mobile phones.

When we pick a movie and click on it, we land on the album details page, and the player starts playing the video automatically. The album details page that the user sees on the phone differs in layout and style from the one seen on TV.

Let's look at the difference directly.

Video album details on mobile phones:

The screenshot shows the upper half of the page; below it come functions such as recommendations for you, starring actors, related recommendations and comments.

Correspondingly, the album details page is displayed differently on TV. Suppose the product manager asks for a redesign of the details page, with the following style requirements:

Comparing the styles of the two terminals, the TV album details page contains many plates (content sections), each of which displays several items horizontally.

The product design requires that some plate content comes from recommendation, some from search, and some from the CMS (Content Management System). Put simply, each plate has a different content source, and the content from the recommendation, search and similar interfaces must be requested in near real time.

2. Reflections on Technical Design Scheme

This requirement is not difficult to implement.

The data is divided into a static part and a dynamic part. Data that does not change frequently can be fetched through a static interface, and data that must be near real-time can be fetched through a dynamic interface.

Static interface design:

The attributes of the album itself and the video data under the album generally do not change very often.
The screenshot in the requirements scenario was taken from the movie channel. A TV series channel would show an episode list (all videos under the album, such as Episode 1, Episode 2...). Videos are usually not updated very frequently, so the episode list on the album details page can be obtained from the static interface.

Static interface data generation process:

The other part has to be served by dynamic interfaces that call third-party interfaces to obtain data such as recommendation and search results.
At the same time, content must not be duplicated across plates.

Dynamic interface design:

Option 1:

Serial invocation, that is, call the corresponding third-party interfaces one after another, following the display order of the plates.

Option 2:

Parallel invocation, that is, the interfaces of multiple plates are called in parallel to improve the overall response time of the interface.

In fact, both options have their own advantages and disadvantages.

The advantage of option 1 is that the development model is simple: the interfaces are called one after another, the content data is de-duplicated, and all the data is aggregated and returned to the client.

However, the response time of our interface then depends on the response times of all the third-party interfaces added together. Third-party interfaces are often unreliable, which can increase the overall response time, keep threads occupied for too long, and reduce the overall throughput of the interface.

The parallel invocation of option 2 can in theory reduce the overall response time of the interface. When multiple third-party interfaces are invoked simultaneously, the overall time depends on the slowest one.

When invoking in parallel, we need to consider pooling, that is, we cannot create an unlimited number of threads in the JVM process. We also need to de-duplicate the content between plates according to the display order in the product design.

For this scenario, option 2 is the more appropriate choice.

For option 2, we abstract a simple model as shown in the following figure:

T1, T2 and T3 represent the threads that fetch the content of different plates. T1's results are returned first; the results returned by T2 must not duplicate those of T1; and the results returned by T3 must not duplicate those of T1 and T2.
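The ordering rule in this model can be sketched in a few lines. This is only an illustration, not the article's implementation: the class name PlateDeduplicator and the use of plain strings as content IDs are assumptions made for the example. For each plate in display order, it keeps only the items that have not been shown before.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical helper: de-duplicates plate contents in display order.
final class PlateDeduplicator {

    /**
     * The outer list is assumed to be in display order (T1, T2, T3, ...).
     * An item is kept only the first time it is seen.
     */
    static List<List<String>> dedupeInDisplayOrder(List<List<String>> plates) {
        Set<String> seen = new HashSet<>();
        List<List<String>> result = new ArrayList<>();
        for (List<String> plate : plates) {
            List<String> kept = new ArrayList<>();
            for (String item : plate) {
                // Set.add returns false when the item was already present
                if (seen.add(item)) {
                    kept.add(item);
                }
            }
            result.add(kept);
        }
        return result;
    }

    public static void main(String[] args) {
        List<List<String>> plates = Arrays.asList(
                Arrays.asList("a", "b"),        // T1
                Arrays.asList("b", "c"),        // T2
                Arrays.asList("a", "c", "d"));  // T3
        System.out.println(dedupeInDisplayOrder(plates)); // prints [[a, b], [c], [d]]
    }
}
```

The de-duplication step itself is sequential, which is why T2 needs T1's result and T3 needs both before their contents can be finalized.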

On the implementation side, when calling multiple third-party interfaces in parallel, we need to get the return value of each call. The first thing that comes to mind is Future, which provides asynchronous access to task results.

In addition, JDK 8 provides CompletableFuture, an easy-to-use utility class for obtaining asynchronous results. It solves some of the pain points of Future, expresses composed asynchronous programming more elegantly, and fits the functional programming style.

3. Future Design Model

Future interface design:

It provides methods for obtaining a task's result, cancelling the task and checking the task's status. Calling the method that gets the result blocks the caller while the task is not yet complete.

The Future interface provides methods:
```
// Obtaining task results
V get() throws InterruptedException, ExecutionException;

// Supporting timeout to obtain task results
V get(long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException;

// Determine whether the task has been completed
boolean isDone();

// Determine whether the task has been cancelled
boolean isCancelled();

// Cancellation of tasks
boolean cancel(boolean mayInterruptIfRunning);
```
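As a rough illustration of how these methods fit together, the sketch below submits a slow task, waits for it with a timeout, and cancels it if the timeout expires. The class FutureTimeoutDemo, the method fetch and the sleep that stands in for a third-party call are all hypothetical.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

final class FutureTimeoutDemo {

    /** Runs a task that takes taskMillis and waits at most waitMillis for its result. */
    static String fetch(long taskMillis, long waitMillis) {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        Future<String> future = executor.submit(() -> {
            Thread.sleep(taskMillis); // stands in for a slow third-party call
            return "data";
        });
        try {
            // Blocks the caller, but only up to the timeout
            return future.get(waitMillis, TimeUnit.MILLISECONDS);
        } catch (TimeoutException e) {
            future.cancel(true); // interrupt the worker thread
            return "fallback (cancelled=" + future.isCancelled() + ")";
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e.getCause());
        } finally {
            executor.shutdownNow();
        }
    }

    public static void main(String[] args) {
        System.out.println(fetch(20, 2000)); // prints data
        System.out.println(fetch(2000, 20)); // prints fallback (cancelled=true)
    }
}
```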

Usually, when we use Future to obtain task results, we implement the requirement with ThreadPoolExecutor or FutureTask.

ThreadPoolExecutor, FutureTask and Future interface class diagram:

ThreadPoolExecutor provides three submit methods:

// 1. Submit a task that needs no return value; the run() method of Runnable returns nothing
public Future<?> submit(Runnable task) {
}

// 2. Submit a task that returns a value; the call() method of Callable returns it
public <T> Future<T> submit(Callable<T> task) {
}

// 3. Submit a Runnable together with a result object; Future.get() returns that second argument
public <T> Future<T> submit(Runnable task, T result) {
}

An example of the third submit method:

static String x = "Thoughts on Dongsheng";
public static void main(String[] args) throws Exception {
    ExecutorService executor = Executors.newFixedThreadPool(1);
    // Create Result object r
    Result r = new Result();
    r.setName(x);

    // Submit tasks
    Future<Result> future =
                    executor.submit(new Task(r), r);
    Result fr = future.get();

    // All three comparisons below print true
    System.out.println(fr == r);
    System.out.println(fr.getName() == x);
    System.out.println(fr.getNick() == x);

    // Shut the pool down so the JVM can exit
    executor.shutdown();
}

static class Result {
    private String name;
    private String nick;
    // ... getters and setters omitted
}

static class Task implements Runnable {
    Result r;

    // Pass in result by constructor
    Task(Result r) {
            this.r = r;
    }

    @Override
    public void run() {
            // The task can read and modify the shared result object
            String name = r.getName();
            r.setNick(name);
    }
}

All three comparisons print true.

FutureTask design and implementation:

FutureTask implements both the Runnable and Future interfaces. Because it implements Runnable, it can be submitted to a ThreadPoolExecutor as a task, or executed directly by a Thread. Because it implements Future, the result of the task can be obtained from it.
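A minimal sketch of these two roles, using a hypothetical class FutureTaskDemo: the same FutureTask object is handed to an executor as a Runnable and then queried as a Future.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.FutureTask;

final class FutureTaskDemo {

    static int compute() {
        Callable<Integer> work = () -> 6 * 7;
        FutureTask<Integer> task = new FutureTask<>(work);

        ExecutorService executor = Executors.newSingleThreadExecutor();
        try {
            executor.execute(task); // accepted because FutureTask is a Runnable
            return task.get();      // available because FutureTask is a Future
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e.getCause());
        } finally {
            executor.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(compute()); // prints 42
    }
}
```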

Following the product requirements, let's use FutureTask to simulate two threads.
The sample code and its comments show how it works:

public static void main(String[] args) throws Exception {
    // Create the FutureTask for task T1, which calls the recommendation interface to get data
    FutureTask<String> ft1 = new FutureTask<>(new T1Task());
    // Create the FutureTask for task T2, which calls the search interface to get data and depends on the result of task T1
    FutureTask<String> ft2  = new FutureTask<>(new T2Task(ft1));
    // Thread T1 executes task ft1
    Thread T1 = new Thread(ft1);
    T1.start();
    // Thread T2 executes task ft2
    Thread T2 = new Thread(ft2);
    T2.start();
    // Wait for the result of task ft2
    System.out.println(ft2.get());
}

// T1Task calls recommendation interface to get data
static class T1Task implements Callable<String> {
    @Override
    public String call() throws Exception {
            System.out.println("T1: Calling Recommendation Interface to Get Data...");
            TimeUnit.SECONDS.sleep(1);

            System.out.println("T1: Get recommended interface data...");
            TimeUnit.SECONDS.sleep(10);
            return " [T1 Plate data] ";
    }
}
        
// T2Task calls the search interface to get data; it also needs the recommendation data from T1
static class T2Task implements Callable<String> {
    FutureTask<String> ft1;

    // T2 needs T1's FutureTask so that it can de-duplicate against T1's result
    T2Task(FutureTask<String> ft1) {
         this.ft1 = ft1;
    }

    @Override
    public String call() throws Exception {
        System.out.println("T2: Call the search interface to get data...");
        TimeUnit.SECONDS.sleep(1);

        System.out.println("T2: Get the data of the search interface...");
        TimeUnit.SECONDS.sleep(5);
        // Block until T1's result is available
        System.out.println("T2: Call ft1.get() to get the recommendation data");
        String tf1 = ft1.get();
        System.out.println("T2: Get recommended interface data:" + tf1);

        System.out.println("T2: De-duplicate T1 and T2 plate data");
        return "[T1 and T2 Aggregation results of plate data]";
    }
}

The results are as follows:

> Task :FutureTaskTest.main()
T1: Calling Recommendation Interface to Get Data...
T2: Call the search interface to get data...
T1: Get recommended interface data...
T2: Get the data of the search interface...
T2: Call ft1.get() to get the recommendation data
T2: Get recommended interface data: [T1 Plate data]
T2: De-duplicate T1 and T2 plate data
[T1 and T2 Aggregation results of plate data]

Summary:

Future means "the future". The idea is to hand time-consuming tasks over to separate threads. The thread that submits the task can then carry on with other work instead of idly waiting for the result, and it only blocks at the point where it actually fetches the result.

4. CompletableFuture Mode in Practice

With the Future design pattern, submitting a task does not block, but the caller that wants the result may block until the task is completed.

This limitation has existed since Future was designed in JDK 1.5. It was only substantially improved when CompletableFuture was introduced in JDK 1.8.

In the meantime, Google's open source Guava library provided ListenableFuture, which supports callbacks when a task completes. Interested readers can look into it themselves.

In the business requirements scenario above, the plates have different data sources, and there are data dependencies between plates.

In other words, there is an ordering relationship between tasks, and the features provided by CompletableFuture suit this business scenario well.

CompletableFuture class diagram:

CompletableFuture implements two interfaces, Future and CompletionStage. The Future interface is concerned with when the asynchronous task finishes and with getting its result. The CompletionStage interface provides a rich set of functions for expressing serial, parallel and convergence relationships between tasks.

CompletableFuture core strengths:

1) There is no need to maintain threads manually; assigning threads to tasks does not require the developer's attention.

2) The semantics are clearer in use.

For example, t3 = t1.thenCombine(t2, (r1, r2) -> { /* doSomething... */ }) clearly states that task 3 will not start until task 1 and task 2 are both completed.

3) The code is more concise and supports chained calls, so you can focus more on business logic.

4) Exceptions are easy to handle.

Next, we use CompletableFuture to simulate the aggregation of multi-plate data for an album.

The code is as follows:

public static void main(String[] args) throws Exception {
    // Temporary data
    List<String> stashList = Lists.newArrayList();
    // Task 1: Call Recommendation Interface to Get Data
    CompletableFuture<String> t1 =
                    CompletableFuture.supplyAsync(() -> {
                            System.out.println("T1: Getting Recommended Interface Data...");
                            sleepSeconds(5);
                            stashList.add("[T1 Plate data]");
                            return "[T1 Plate data]";
                    });
    // Task 2: Call the search interface to get data
    CompletableFuture<String> t2 =
                    CompletableFuture.supplyAsync(() -> {
                            System.out.println("T2: Call the search interface to get data...");
                            sleepSeconds(3);
                            return " [T2 Plate data] ";
                    });
    // Task 3: runs after task 1 and task 2 are both complete and aggregates the results
    CompletableFuture<String> t3 =
                    t1.thenCombine(t2, (t1Result, t2Result) -> {
                            System.out.println(t1Result + " and " + t2Result + ": running de-duplication logic");
                            return "[T1 and T2 Aggregation results of plate data]";
                    });
    // Wait up to 6 seconds for the result of task 3
    System.out.println(t3.get(6, TimeUnit.SECONDS));
}

static void sleepSeconds(int timeout) {
    try {
            TimeUnit.SECONDS.sleep(timeout);
    } catch (InterruptedException e) {
            // Restore the interrupt flag so callers can still observe the interruption
            Thread.currentThread().interrupt();
    }
}

The results are as follows:

> Task :CompletableFutureTest.main()
T1: Getting Recommended Interface Data...
T2: Call the search interface to get data...
[T1 Plate data] and  [T2 Plate data] : running de-duplication logic
[T1 and T2 Aggregation results of plate data]

The example code above can be copied into a new class in IDEA and run as-is, provided Guava (used for Lists.newArrayList()) is on the classpath.

5. CompletableFuture Production Recommendations

Create a reasonable thread pool:

In production environments, it is not recommended to use the sample code above as-is, because it creates the CompletableFuture object with
CompletableFuture.supplyAsync(() -> {});
which is a static factory method whose default underlying thread pool may not meet business needs.

Take a look at the underlying source code:

// Use ForkJoinPool thread pool by default
private static final Executor asyncPool = useCommonPool ?
       ForkJoinPool.commonPool() : new ThreadPerTaskExecutor();

public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier) {
     return asyncSupplyStage(asyncPool, supplier);
}

Creating the ForkJoinPool thread pool:
The default pool size is Runtime.getRuntime().availableProcessors() - 1 (number of CPUs - 1), and it can be changed with the JVM parameter -Djava.util.concurrent.ForkJoinPool.common.parallelism.

The JVM parameter -Djava.util.concurrent.ForkJoinPool.common.threadFactory sets the thread factory class, and -Djava.util.concurrent.ForkJoinPool.common.exceptionHandler sets the exception handler class. The classes named by these two parameters are loaded through the system class loader.
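To see which value actually applies on a given machine, the common pool can be inspected at runtime. This is a small sketch; the class name CommonPoolInfo is an assumption, and the printed numbers depend on the machine and on any -D parameters in effect. Note that in the JDK 8 source, the useCommonPool flag is true only when this parallelism is greater than 1; otherwise each task gets a new thread.

```java
import java.util.concurrent.ForkJoinPool;

final class CommonPoolInfo {

    /** Target parallelism of the common pool used by CompletableFuture by default. */
    static int parallelism() {
        return ForkJoinPool.getCommonPoolParallelism();
    }

    public static void main(String[] args) {
        System.out.println("available processors:    " + Runtime.getRuntime().availableProcessors());
        System.out.println("common pool parallelism: " + parallelism());
    }
}
```

Running it with, for example, -Djava.util.concurrent.ForkJoinPool.common.parallelism=16 should change the second line accordingly.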

If every CompletableFuture uses the default thread pool, tasks that perform slow I/O can end up occupying all of its threads, so that other tasks have to wait and the overall performance of the system suffers.

Therefore, in production it is recommended to create separate thread pools for different business types so that they do not affect each other.

CompletableFuture also provides overloaded methods that accept a custom thread pool.

// The second parameter supports passing Executor custom thread pool
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier,
                                                       Executor executor) {
        return asyncSupplyStage(screenExecutor(executor), supplier);
}

For custom thread pools, refer to the "Java development manual": create the pool with ThreadPoolExecutor, use a bounded queue, and set the queue size according to actual business conditions.
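A sketch of what such a pool might look like when passed to supplyAsync. The class name RecommendPool, the thread-name prefix and the sizes (4 threads, queue of 100) are placeholders chosen for the example, not recommendations.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

final class RecommendPool {

    /** Builds a fixed-size pool with a bounded queue and named daemon threads. */
    static ThreadPoolExecutor newBoundedPool(String namePrefix, int threads, int queueCapacity) {
        AtomicInteger counter = new AtomicInteger();
        return new ThreadPoolExecutor(
                threads, threads,
                60, TimeUnit.SECONDS,
                new ArrayBlockingQueue<>(queueCapacity), // bounded: avoids unbounded memory growth
                r -> {
                    Thread t = new Thread(r, namePrefix + "-" + counter.incrementAndGet());
                    t.setDaemon(true);
                    return t;
                },
                // When the queue is full, the submitting thread runs the task itself
                new ThreadPoolExecutor.CallerRunsPolicy());
    }

    /** Runs one task on a dedicated pool and returns the name of the thread that ran it. */
    static String runOnPool() {
        ThreadPoolExecutor pool = newBoundedPool("recommend", 4, 100);
        try {
            return CompletableFuture
                    .supplyAsync(() -> Thread.currentThread().getName(), pool)
                    .join();
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(runOnPool()); // prints recommend-1
    }
}
```

Naming the threads per business type makes it easier to tell the pools apart in thread dumps.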

In the book "Java Concurrency in Practice", Brian Goetz offers a number of suggestions for sizing thread pools. If the pool is too large, threads compete for CPU and memory and a lot of time is spent on context switching. Conversely, if the pool is too small, the multiple CPU cores cannot be fully utilized.

The thread pool size needed for a target CPU utilization can be estimated using the following formula:
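Assuming the formula meant here is the one from the book, N_threads = N_cpu * U_cpu * (1 + W/C), where N_cpu is the number of CPUs, U_cpu is the target CPU utilization (between 0 and 1) and W/C is the ratio of wait time to compute time, it can be sketched as follows. The class and method names are hypothetical, and rounding up is a choice made for this example.

```java
final class PoolSizing {

    /**
     * Estimates a pool size as N_cpu * U_cpu * (1 + W/C).
     * waitTime and computeTime only need to be in the same unit.
     */
    static int estimateThreads(int cpus, double targetUtilization,
                               double waitTime, double computeTime) {
        double estimate = cpus * targetUtilization * (1 + waitTime / computeTime);
        return (int) Math.ceil(estimate);
    }

    public static void main(String[] args) {
        // 8 CPUs, full utilization, tasks that wait 90 ms for every 10 ms of computation
        System.out.println(estimateThreads(8, 1.0, 90, 10)); // prints 80
    }
}
```

For I/O-heavy tasks such as third-party interface calls, W/C is large, which is why the estimate can be much higher than the number of CPUs.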

Exception handling:

CompletableFuture makes exception handling very simple. The following methods all support chained calls.

// Similar to catch {} in try {} catch {}
public CompletionStage<T> exceptionally
        (Function<Throwable, ? extends T> fn);
                
// Similar to finally {} in try {} finally {}; cannot change the returned result
public CompletionStage<T> whenComplete
        (BiConsumer<? super T, ? super Throwable> action);
public CompletionStage<T> whenCompleteAsync
        (BiConsumer<? super T, ? super Throwable> action);
                
// Similar to finally {} in try {} finally {}; can return a result
public <U> CompletionStage<U> handle
        (BiFunction<? super T, Throwable, ? extends U> fn);
public <U> CompletionStage<U> handleAsync
        (BiFunction<? super T, Throwable, ? extends U> fn);
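The sketch below shows one way these three methods might be used. The class ExceptionHandlingDemo and its simulated search call are hypothetical. One detail worth knowing: the Throwable passed to a dependent stage is typically a CompletionException wrapping the original exception, so the example unwraps it.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;

final class ExceptionHandlingDemo {

    // Simulated third-party call that can be made to fail
    private static CompletableFuture<String> search(boolean fail) {
        return CompletableFuture.supplyAsync(() -> {
            if (fail) {
                throw new IllegalStateException("search interface down");
            }
            return "search data";
        });
    }

    /** exceptionally: supplies a fallback value only when the stage failed. */
    static String withExceptionally(boolean fail) {
        return search(fail).exceptionally(ex -> "fallback data").join();
    }

    /** handle: always runs and maps either the value or the exception to a new result. */
    static String withHandle(boolean fail) {
        return search(fail).handle((value, ex) -> {
            if (ex == null) {
                return "ok: " + value;
            }
            Throwable cause = (ex instanceof CompletionException && ex.getCause() != null)
                    ? ex.getCause() : ex;
            return "failed: " + cause.getMessage();
        }).join();
    }

    /** whenComplete: always runs for its side effect; the original result passes through. */
    static String withWhenComplete(List<String> log) {
        return search(false)
                .whenComplete((value, ex) -> log.add(ex == null ? "completed" : "failed"))
                .join();
    }

    public static void main(String[] args) {
        System.out.println(withExceptionally(true)); // prints fallback data
        System.out.println(withHandle(true));        // prints failed: search interface down
        List<String> log = new ArrayList<>();
        System.out.println(withWhenComplete(log) + " " + log); // prints search data [completed]
    }
}
```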

6. CompletableFuture Performance Test

The test loops over the task counts shown below. In each run, the results of the tasks are summed from 1 to jobNum and the elapsed time is recorded.
Compared dimensions: CompletableFuture with the default thread pool versus a custom thread pool.
Performance test code:

// Performance test code
Arrays.asList(-3, -1, 0, 1, 2, 4, 5, 10, 16, 17, 30, 50, 100, 150, 200, 300).forEach(offset -> {
                    int jobNum = PROCESSORS + offset;
                    System.out.println(
                                    String.format("When %s tasks => future default: %s, future custom: %s",
                                                    jobNum, testCompletableFutureDefaultExecutor(jobNum), testCompletableFutureCustomExecutor(jobNum)));
});

// CompletableFuture using the default ForkJoinPool thread pool
private static long testCompletableFutureDefaultExecutor(int jobCount) {
    List<CompletableFuture<Integer>> tasks = new ArrayList<>();
    IntStream.rangeClosed(1, jobCount).forEach(value -> tasks.add(CompletableFuture.supplyAsync(CompleteableFuturePerfTest::getJob)));

    long start = System.currentTimeMillis();
    int sum = tasks.stream().map(CompletableFuture::join).mapToInt(Integer::intValue).sum();
    checkSum(sum, jobCount);
    return System.currentTimeMillis() - start;
}

// CompletableFuture using a custom thread pool
private static long testCompletableFutureCustomExecutor(int jobCount) {
    ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(200, 200, 5, TimeUnit.MINUTES, new ArrayBlockingQueue<>(100000), new ThreadFactory() {
            @Override
            public Thread newThread(Runnable r) {
                    Thread thread = new Thread(r);
                    thread.setName("CUSTOM_DAEMON_COMPLETABLEFUTURE");
                    thread.setDaemon(true);
                    return thread;
            }
    }, new ThreadPoolExecutor.CallerRunsPolicy());

    List<CompletableFuture<Integer>> tasks = new ArrayList<>();
    IntStream.rangeClosed(1, jobCount).forEach(value -> tasks.add(CompletableFuture.supplyAsync(CompleteableFuturePerfTest::getJob, threadPoolExecutor)));

    long start = System.currentTimeMillis();
    int sum = tasks.stream().map(CompletableFuture::join).mapToInt(Integer::intValue).sum();
    checkSum(sum, jobCount);
    return System.currentTimeMillis() - start;
}

Test Machine Configuration: 8 Core CPU, 16G Memory

Performance test results:

According to the test results, the performance with the default thread pool gets worse as the number of test tasks increases.

7. CompletableFuture Extended Usage

Object creation:

In addition to the supplyAsync method mentioned earlier, CompletableFuture provides the following methods:

// Returns CompletableFuture<Void>: the task has no return value; uses the default thread pool
public static CompletableFuture<Void> runAsync(Runnable runnable) {
      return asyncRunStage(asyncPool, runnable);
}
// Returns CompletableFuture<Void>: the task has no return value; supports a custom thread pool
public static CompletableFuture<Void> runAsync(Runnable runnable,
                                                   Executor executor) {
        return asyncRunStage(screenExecutor(executor), runnable);
}
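A short sketch of both runAsync overloads, using a hypothetical class RunAsyncDemo. Since a Runnable produces no value, the work is visible only through its side effect, here a counter.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;

final class RunAsyncDemo {

    /** Runs one task on the default pool and one on a custom executor; returns the counter. */
    static int runBoth() {
        AtomicInteger counter = new AtomicInteger();

        // Default pool: join() returns null because there is no result value
        CompletableFuture<Void> first = CompletableFuture.runAsync(() -> counter.incrementAndGet());
        first.join();

        // Custom executor passed as the second argument
        ExecutorService executor = Executors.newSingleThreadExecutor();
        try {
            CompletableFuture.runAsync(() -> counter.incrementAndGet(), executor).join();
        } finally {
            executor.shutdown();
        }
        return counter.get();
    }

    public static void main(String[] args) {
        System.out.println(runBoth()); // prints 2
    }
}
```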

In section 4 we mentioned that CompletableFuture implements the CompletionStage interface, which provides a rich set of functions.

The CompletionStage interface supports serial relationships, AND convergence relationships and OR convergence relationships.
Below is a brief description of the methods for each relationship. Refer to the JDK API when you use them.
Each of these methods also has a corresponding xxxAsync() variant, which runs the task asynchronously.

Serial relationship:

CompletionStage describes the serial relationship with the thenApply, thenRun, thenAccept and thenCompose families of methods.

The source code is as follows:

// Takes a Function (U apply(T)): receives parameter T and returns a value U
public <U> CompletionStage<U> thenApply(Function<? super T,? extends U> fn);
public <U> CompletionStage<U> thenApplyAsync(Function<? super T,? extends U> fn);

// Takes a Runnable: receives no parameter and returns no value
public CompletionStage<Void> thenRun(Runnable action);
public CompletionStage<Void> thenRunAsync(Runnable action);

// Takes a Consumer: receives a parameter but returns no value
public CompletionStage<Void> thenAccept(Consumer<? super T> action);
public CompletionStage<Void> thenAcceptAsync(Consumer<? super T> action);

// Chains two dependent CompletableFuture objects
public <U> CompletionStage<U> thenCompose
        (Function<? super T, ? extends CompletionStage<U>> fn);
public <U> CompletionStage<U> thenComposeAsync
        (Function<? super T, ? extends CompletionStage<U>> fn);
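The following sketch chains the four serial methods. The class SerialStagesDemo, the album ID and the loadEpisodes helper are made up for illustration.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;

final class SerialStagesDemo {

    // Hypothetical second asynchronous call that depends on the first result
    private static CompletableFuture<String> loadEpisodes(String albumName) {
        return CompletableFuture.supplyAsync(() -> albumName + ":episodes");
    }

    /** thenApply transforms a value; thenCompose chains another asynchronous call. */
    static String describeAlbum(int albumId) {
        return CompletableFuture
                .supplyAsync(() -> albumId)
                .thenApply(id -> "album-" + id)           // Integer -> String
                .thenCompose(name -> loadEpisodes(name))  // String -> CompletableFuture<String>
                .join();
    }

    /** thenAccept consumes the value; thenRun runs afterwards without seeing any value. */
    static List<String> consume() {
        List<String> events = new CopyOnWriteArrayList<>();
        CompletableFuture
                .supplyAsync(() -> "data")
                .thenAccept(value -> events.add("accepted " + value))
                .thenRun(() -> events.add("finished"))
                .join();
        return events;
    }

    public static void main(String[] args) {
        System.out.println(describeAlbum(7)); // prints album-7:episodes
        System.out.println(consume());        // prints [accepted data, finished]
    }
}
```

Using thenApply with loadEpisodes instead of thenCompose would produce a nested CompletableFuture<CompletableFuture<String>>, which is the usual reason to reach for thenCompose.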

AND convergence relationship:

CompletionStage describes the AND convergence relationship with the thenCombine, thenAcceptBoth and runAfterBoth families of methods.

The source code is shown below (the Async variants are omitted):

// When both the current stage and the other CompletableFuture are complete, their two results are passed to fn, which returns a value
public <U,V> CompletionStage<V> thenCombine
        (CompletionStage<? extends U> other,
         BiFunction<? super T,? super U,? extends V> fn);

// When both the current stage and the other CompletableFuture are complete, their two results are passed to action, which returns no value
public <U> CompletionStage<Void> thenAcceptBoth
        (CompletionStage<? extends U> other,
         BiConsumer<? super T, ? super U> action);

// When both the current stage and the other CompletableFuture are complete, action is executed
public CompletionStage<Void> runAfterBoth(CompletionStage<?> other,
                                              Runnable action);
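A sketch of the three AND methods, with a hypothetical class BothStagesDemo and two made-up tasks standing in for the recommendation and search calls.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;

final class BothStagesDemo {

    /** thenCombine: waits for both results and merges them into a new value. */
    static String combine() {
        CompletableFuture<String> recommend = CompletableFuture.supplyAsync(() -> "recommend");
        CompletableFuture<String> search = CompletableFuture.supplyAsync(() -> "search");
        return recommend.thenCombine(search, (r, s) -> r + "+" + s).join();
    }

    /** thenAcceptBoth consumes both results; runAfterBoth only reacts to completion. */
    static List<String> sideEffects() {
        CompletableFuture<String> recommend = CompletableFuture.supplyAsync(() -> "recommend");
        CompletableFuture<String> search = CompletableFuture.supplyAsync(() -> "search");
        List<String> events = new CopyOnWriteArrayList<>();

        CompletableFuture<Void> accepted =
                recommend.thenAcceptBoth(search, (r, s) -> events.add(r + "&" + s));
        CompletableFuture<Void> ran =
                recommend.runAfterBoth(search, () -> events.add("both done"));

        // The two callbacks are independent, so their relative order is not guaranteed
        CompletableFuture.allOf(accepted, ran).join();
        return events;
    }

    public static void main(String[] args) {
        System.out.println(combine()); // prints recommend+search
        System.out.println(sideEffects());
    }
}
```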

OR convergence relationship:

CompletionStage describes the OR convergence relationship with the applyToEither, acceptEither and runAfterEither families of methods.

The source code is shown below (the Async variants are omitted):

// When either the current stage or the other CompletableFuture completes, its result is passed to fn, which returns a value
public <U> CompletionStage<U> applyToEither
        (CompletionStage<? extends T> other,
         Function<? super T, U> fn);

// When either the current stage or the other CompletableFuture completes, its result is passed to action, which returns no value
public CompletionStage<Void> acceptEither
        (CompletionStage<? extends T> other,
         Consumer<? super T> action);

// When either the current stage or the other CompletableFuture completes, action is executed
public CompletionStage<Void> runAfterEither(CompletionStage<?> other,
                                                Runnable action);
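A sketch of applyToEither, with a hypothetical class EitherStagesDemo. The two tasks simulate interfaces with different latencies; the delays are arbitrary and chosen far enough apart that the faster one reliably wins.

```java
import java.util.concurrent.CompletableFuture;

final class EitherStagesDemo {

    // Simulates a call that takes the given time and then returns a value
    private static String after(long millis, String value) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return value;
    }

    /** Returns a message built from whichever of the two tasks completes first. */
    static String firstResult(long recommendMillis, long searchMillis) {
        CompletableFuture<String> recommend =
                CompletableFuture.supplyAsync(() -> after(recommendMillis, "recommend"));
        CompletableFuture<String> search =
                CompletableFuture.supplyAsync(() -> after(searchMillis, "search"));
        return recommend.applyToEither(search, winner -> "first: " + winner).join();
    }

    public static void main(String[] args) {
        System.out.println(firstResult(1000, 20)); // prints first: search
    }
}
```

The slower task is not cancelled; it keeps running in the background and its result is simply ignored.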

With that, the main features of CompletableFuture have been covered.

Asynchronous programming is becoming more and more mature, and the Java language itself now officially supports the asynchronous programming model, so it is worth learning well.

Starting from a business requirement scenario, this article introduced the Future design pattern in practice, and then analyzed CompletableFuture in JDK 1.8: how to use it, its core strengths, a performance comparison, and extended usage.

I hope it helps!


Tags: Java Programming JDK Mobile jvm

Posted on Tue, 08 Oct 2019 07:48:43 -0700 by strangebeer