From Batch-Importing Excel Data to the Principles of ForkJoin

Preface

Previously, we introduced EasyPOI, which makes Excel import and export simple and convenient. However, version 4.0.0 and earlier parse Excel data in a single thread, which is inefficient.
Today I will test importing 5000 rows of data with EasyPOI and with my own handwritten ForkJoin (multi-task) implementation, and compare their performance.

Preparation before testing

1. First create a test project

First we need a test project; I created a new SpringBoot project here.
Then add the EasyPOI dependency; this article uses version 4.0.0.

<!--easypoi-->
<dependency>
    <groupId>cn.afterturn</groupId>
    <artifactId>easypoi-spring-boot-starter</artifactId>
    <version>4.0.0</version>
</dependency>
<!--easypoi-->

2. Import in two ways

2.1: How to use EasyPOI

    @Override
    public String batchUploadStudent_easyPOI(MultipartFile file) throws Exception {
        long startTime = System.currentTimeMillis();
        List<Student> studentList = ExcelImportUtil.importExcel(file.getInputStream(), Student.class, new ImportParams());
        log.info("******** EasyPOI: total time spent reading the file = {} ms, rows read = {}", System.currentTimeMillis() - startTime, studentList.size());
        return null;
    }

Importing with EasyPOI is very simple: just call the importExcel method. I won't go into more detail.

2.2: Handwritten Fork-Join

Next, we parse the file with a handwritten Fork-Join implementation.

  1. This demo reads each cell directly, one by one; the same could also be done with annotations. The code is as follows:
    private List<Student> getData(Sheet sheet, int start, int end) {
        List<Student> studentList = new ArrayList<>();
        for (int i = start; i <= end; i++) {
            Student student;
            try {
                Row row = sheet.getRow(i);
                student = new Student();
                // Read the cells in the same order as the Excel columns
                student.setClassName(ExcelUtil.getKeyValue(row.getCell(0)));
                student.setStudentName(ExcelUtil.getKeyValue(row.getCell(1)));
                student.setStudentMobile(ExcelUtil.getKeyValue(row.getCell(2)));
                student.setIdCard(ExcelUtil.getKeyValue(row.getCell(3)));
                student.setStudentNo(ExcelUtil.getKeyValue(row.getCell(4)));
                // Note: idCard is set a second time here, so column 5 overwrites column 3
                student.setIdCard(ExcelUtil.getKeyValue(row.getCell(5)));
            } catch (Exception e) {
                // Skip rows that cannot be parsed (for example, when sheet.getRow returns null)
                log.error("*************** Exception while parsing row {}", i, e);
                continue;
            }
            studentList.add(student);
        }
        return studentList;
    }

This is also simple: it reads all rows between the start and end rows, reading each cell strictly in the order of the Excel columns.
  2. Define the RecursiveTask class.

class JoinTask extends RecursiveTask<List<Student>> {
    // First row to read
    private int start;
    // Last row to read (inclusive)
    private int end;
    // Sheet to read from
    private Sheet sheet;
    // Index of the last row in the sheet
    private int total;

    public JoinTask(int start, int end, Sheet sheet) {
        this.start = start;
        this.end = end;
        this.sheet = sheet;
        this.total = sheet.getLastRowNum();
    }

    @Override
    protected List<Student> compute() {
        // Invalid range
        if (start > end || total < end) {
            return new ArrayList<>(1);
        }
        // Ranges of about 200 rows or fewer are parsed directly
        if (end - start <= 200) {
            return getData(sheet, start, end).stream().filter(DistinctUtil.distinctByKey(Student::getStudentNo)).collect(Collectors.toList());
        } else {
            // Split the range into two halves
            int mid = (start + end) / 2;
            // Left half: rows start..mid
            JoinTask leftTask = new JoinTask(start, mid, sheet);
            // Right half: rows mid + 1..end
            JoinTask rightTask = new JoinTask(mid + 1, end, sheet);
            // Approach 1: fork the right half and compute the left half in the current thread
            rightTask.fork();
            List<Student> leftList = leftTask.compute();
            List<Student> rightList = rightTask.join();
            // Approach 2: let invokeAll schedule both halves
            //invokeAll(rightTask, leftTask);
            //List<Student> leftList = leftTask.join();
            //List<Student> rightList = rightTask.join();
            // Merge the two halves, keeping the rows in their original order
            leftList.addAll(rightList);
            return leftList;
        }
    }
}

The RecursiveTask class is the core class of the ForkJoin approach; its role is described later.
  3. Entry point

    public List<Student> importExcel(Workbook workbook) {
        ForkJoinPool forkJoinPool = new ForkJoinPool(4);
        try {
            Sheet sheet = workbook.getSheetAt(0);
            // Start the task, beginning at row 1
            JoinTask joinTask = new JoinTask(1, sheet.getLastRowNum(), sheet);
            List<Student> importVOList = forkJoinPool.invoke(joinTask);
            // Remove duplicates across the whole Excel file
            return importVOList.stream().filter(DistinctUtil.distinctByKey(Student::getStudentNo)).collect(Collectors.toList());
        } finally {
            // Release the pool's worker threads once the import is done
            forkJoinPool.shutdown();
        }
    }
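
The DistinctUtil.distinctByKey helper called in the code above is not shown in this post. A common way to write such a helper is sketched below. The class name DistinctUtilSketch and the firstPerLength method are hypothetical and only there for illustration; the real helper in the project may differ.

```java
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;
import java.util.function.Predicate;
import java.util.stream.Collectors;

// Hypothetical stand-in for the DistinctUtil class used in this post.
class DistinctUtilSketch {

    // Returns a stateful predicate that accepts an element only the first
    // time its key is seen. ConcurrentHashMap does not allow null keys, so
    // the key extractor must not return null.
    static <T> Predicate<T> distinctByKey(Function<? super T, ?> keyExtractor) {
        Set<Object> seen = ConcurrentHashMap.newKeySet();
        return t -> seen.add(keyExtractor.apply(t));
    }

    // Small usage example: keep the first word of each length.
    static List<String> firstPerLength(List<String> words) {
        return words.stream()
                .filter(distinctByKey(String::length))
                .collect(Collectors.toList());
    }
}
```

Each call to distinctByKey creates a fresh set of seen keys, which is why the import code can use it once per 200-row chunk and once more on the merged list.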
  4. Test class:
    @Override
    public String batchUploadStudent_forkjoin(Workbook workbook) {
        long startTime = System.currentTimeMillis();
        List<Student> studentList = studentExcelImportWrapper.importExcel(workbook);
        log.info("******** Fork-Join: total time spent reading the file = {} ms, rows read = {}", System.currentTimeMillis() - startTime, studentList.size());
        return null;
    }

3. Test results

Uploading the same Excel file containing 5000 rows of data gives the following test results:

From the test results above, we can clearly see that the performance difference is quite large, mainly because EasyPOI reads Excel in a single thread. The larger the amount of data, the more obvious the difference. Since ForkJoin is so useful, let's get to know it.

Getting to know ForkJoin

What is the ForkJoin framework

The ForkJoin framework was introduced in Java 7 for executing tasks in parallel. It divides a large task into several small tasks and finally aggregates the results of the small tasks into the result of the large task. Fork splits a large task into smaller tasks that can run concurrently. Join merges the results of all the small tasks. Its execution process is as follows: a large task is forked into subtasks, the subtasks run in parallel, and their results are joined.
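
As a minimal illustration of this split-and-merge flow, here is a sketch that sums an array with a RecursiveTask. The class name SumTask, the THRESHOLD value, and the pool size of 4 are assumptions chosen for the example, not something taken from the import code.

```java
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;

// Illustrative example only: sums a long[] by splitting it in half recursively.
class SumTask extends RecursiveTask<Long> {
    private static final int THRESHOLD = 1_000; // assumed cut-off for summing directly
    private final long[] data;
    private final int from; // inclusive
    private final int to;   // exclusive

    SumTask(long[] data, int from, int to) {
        this.data = data;
        this.from = from;
        this.to = to;
    }

    @Override
    protected Long compute() {
        // Small enough: do the work directly
        if (to - from <= THRESHOLD) {
            long sum = 0;
            for (int i = from; i < to; i++) {
                sum += data[i];
            }
            return sum;
        }
        int mid = (from + to) >>> 1;
        SumTask left = new SumTask(data, from, mid);
        SumTask right = new SumTask(data, mid, to);
        left.fork();                      // fork: schedule the left half asynchronously
        long rightSum = right.compute();  // compute the right half in the current thread
        return left.join() + rightSum;    // join: wait for the left half and merge
    }

    static long parallelSum(long[] data) {
        ForkJoinPool pool = new ForkJoinPool(4);
        try {
            return pool.invoke(new SumTask(data, 0, data.length));
        } finally {
            pool.shutdown();
        }
    }
}
```

Calling SumTask.parallelSum(array) from ordinary code runs the whole recursion inside the pool and returns the merged result.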

Task Split and Result Merge Description

ForkJoinTask is the most basic task type and provides the fork and join operations. To use the ForkJoin framework, we must first create a ForkJoin task. Usually we do not extend ForkJoinTask directly; we extend one of its subclasses instead. Two of them are commonly used:
RecursiveAction: for tasks that return no result.
RecursiveTask: for tasks that return a result.
The fork method schedules the task for asynchronous execution, and the join method waits for the task to finish and returns its result.
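
The import code earlier uses RecursiveTask because it returns a list. For comparison, here is a small sketch of a RecursiveAction, which returns nothing and instead modifies an array in place. The class name DoubleInPlace and its THRESHOLD are assumptions made up for this example.

```java
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveAction;

// Illustrative example only: doubles every element of an int[] in place.
class DoubleInPlace extends RecursiveAction {
    private static final int THRESHOLD = 500; // assumed cut-off for working directly
    private final int[] data;
    private final int from; // inclusive
    private final int to;   // exclusive

    DoubleInPlace(int[] data, int from, int to) {
        this.data = data;
        this.from = from;
        this.to = to;
    }

    @Override
    protected void compute() {
        if (to - from <= THRESHOLD) {
            for (int i = from; i < to; i++) {
                data[i] *= 2;
            }
            return;
        }
        int mid = (from + to) >>> 1;
        // invokeAll runs both halves and returns once both have finished
        invokeAll(new DoubleInPlace(data, from, mid), new DoubleInPlace(data, mid, to));
    }

    static void run(int[] data) {
        // The common pool is used here, so there is no pool to shut down
        ForkJoinPool.commonPool().invoke(new DoubleInPlace(data, 0, data.length));
    }
}
```

Because the two halves never touch the same index, no extra synchronization is needed inside compute().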


Exception handling

A ForkJoinTask may throw an exception while executing. The exception is raised in whichever thread runs the task, so the main thread cannot catch it at the point where the task is forked. ForkJoinTask therefore provides the isCompletedAbnormally() method to check whether a task threw an exception or was cancelled.
The getException() method returns the exception: the Throwable thrown by the task, a CancellationException if the task was cancelled, or null if the task completed normally or has not yet completed.

    if (rightTask.isCompletedAbnormally()) {
        System.out.println(rightTask.getException());
    }
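
To see these two methods in isolation, here is a small sketch with a task that always fails. FailingTaskDemo, FailingTask, and the "bad row" message are hypothetical names invented for the example. It waits with quietlyJoin(), which does not rethrow, and then inspects the task.

```java
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;

// Illustrative example only: inspect a failed task without letting join() throw.
class FailingTaskDemo {

    static class FailingTask extends RecursiveTask<Integer> {
        @Override
        protected Integer compute() {
            throw new IllegalStateException("bad row");
        }
    }

    // Runs the failing task and returns the exception recorded on it,
    // or null if the task completed normally.
    static Throwable runAndInspect() {
        ForkJoinPool pool = new ForkJoinPool(2);
        try {
            FailingTask task = new FailingTask();
            pool.execute(task);   // submit asynchronously
            task.quietlyJoin();   // wait for completion without rethrowing
            return task.isCompletedAbnormally() ? task.getException() : null;
        } finally {
            pool.shutdown();
        }
    }
}
```

The returned Throwable has the same type as the one thrown inside compute(), so the caller can log it or decide how to react.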

Implementation principle of ForkJoin framework

ForkJoinPool consists of a ForkJoinTask array and a ForkJoinWorkerThread array. The ForkJoinTask array holds the tasks that the program submits to the ForkJoinPool, and the ForkJoinWorkerThread array holds the threads that execute those tasks.

fork method

    public final ForkJoinTask<V> fork() {
        Thread t;
        //If the current thread is a ForkJoinWorkerThread
        if ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread)
            ((ForkJoinWorkerThread)t).workQueue.push(this);
        else
            ForkJoinPool.common.externalPush(this);
        return this;
    }

As the code above shows, when fork is called on a ForkJoinWorkerThread, the task is pushed onto that thread's work queue with workQueue.push so that it can be executed asynchronously, and fork returns immediately. Otherwise, ForkJoinPool.common.externalPush(this) submits the task to the common pool for asynchronous execution.
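
Both branches of fork() can be exercised from ordinary code. The sketch below is only an illustration with made-up names (ForkFromOutsideDemo, Answer): the first method forks from a thread that does not belong to any pool, and the second forks from inside a task that was handed to an explicit pool.

```java
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;

// Illustrative example only: fork() called outside and inside a pool.
class ForkFromOutsideDemo {

    static class Answer extends RecursiveTask<Integer> {
        @Override
        protected Integer compute() {
            return 6 * 7;
        }
    }

    // The calling thread is not a ForkJoinWorkerThread,
    // so fork() hands the task to the common pool.
    static int forkFromCaller() {
        Answer task = new Answer();
        task.fork();
        return task.join();
    }

    // Here fork() is called from within compute() of a task submitted to a pool.
    static int forkInsidePool() {
        ForkJoinPool pool = new ForkJoinPool(2);
        try {
            return pool.invoke(new RecursiveTask<Integer>() {
                @Override
                protected Integer compute() {
                    Answer child = new Answer();
                    child.fork();
                    return child.join() + 1;
                }
            });
        } finally {
            pool.shutdown();
        }
    }
}
```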

push method

The push method stores the current task in the work queue's ForkJoinTask array. Then, if the queue held at most one task before the push, it calls the ForkJoinPool's signalWork() method to wake up or create a worker thread to execute the task; if the array is full, growArray() enlarges it.

    final void push(ForkJoinTask<?> task) {
        ForkJoinTask<?>[] a; ForkJoinPool p;
        int b = base, s = top, n;
        if ((a = array) != null) {    // ignore if queue removed
            int m = a.length - 1;     // fenced write for task visibility
            //Store the current task in the ForkJoinTask array
            U.putOrderedObject(a, ((m & s) << ASHIFT) + ABASE, task);
            U.putOrderedInt(this, QTOP, s + 1);
            if ((n = s - b) <= 1) {
                if ((p = pool) != null)
                    //Call signalWork to wake up or create a worker thread to run the task
                    p.signalWork(p.workQueues, this);
            }
            else if (n >= m)
                growArray();
        }
    }
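
The index arithmetic in push is easier to follow in a stripped-down form. The sketch below is a simplified, single-threaded model that I wrote for illustration; ToyWorkQueue and all of its members are hypothetical, and it leaves out everything that makes the real queue thread-safe. It keeps the same ideas: a power-of-two array indexed with a mask, a base and a top index, a signal when the queue was nearly empty, and growth when the array is full. The pop and poll methods reflect how a work-stealing queue is normally used (the owner takes from the top, other threads take from the base).

```java
// Simplified, single-threaded model of the queue logic in push().
// Illustration only: no Unsafe, no memory fences, no concurrency.
class ToyWorkQueue {
    private Object[] array = new Object[4]; // capacity stays a power of two
    private int base = 0;    // index of the oldest task
    private int top = 0;     // index of the next free slot
    private int signals = 0; // how often a worker would have been signalled

    void push(Object task) {
        int m = array.length - 1;  // mask; works because the length is a power of two
        int n = top - base;        // number of tasks queued before this push
        array[top & m] = task;     // "& m" wraps the ever-growing index into the array
        top++;
        if (n <= 1) {
            signals++;             // stands in for pool.signalWork(...)
        } else if (n >= m) {
            grow();                // the array is now full, so double it
        }
    }

    // Takes the most recently pushed task (what the owning worker does).
    Object pop() {
        if (top == base) return null;
        int m = array.length - 1;
        Object task = array[--top & m];
        array[top & m] = null;
        return task;
    }

    // Takes the oldest task (what a stealing thread does).
    Object poll() {
        if (top == base) return null;
        int m = array.length - 1;
        Object task = array[base & m];
        array[base & m] = null;
        base++;
        return task;
    }

    private void grow() {
        Object[] old = array;
        Object[] bigger = new Object[old.length * 2];
        int oldMask = old.length - 1;
        int newMask = bigger.length - 1;
        for (int i = base; i != top; i++) {
            bigger[i & newMask] = old[i & oldMask];
        }
        array = bigger;
    }

    int size() { return top - base; }
    int capacity() { return array.length; }
    int signals() { return signals; }
}
```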

Having covered the fork() method, let's take a look at the join method.

join method

The main purpose of ForkJoinTask's join method is to block the current thread and wait for the execution result.

    public final V join() {
        int s;
        if ((s = doJoin() & DONE_MASK) != NORMAL)
            reportException(s);
        return getRawResult();
    }

    private void reportException(int s) {
        if (s == CANCELLED)
            throw new CancellationException();
        if (s == EXCEPTIONAL)
            rethrow(getThrowableException());
    }

As shown above, the join() method first calls doJoin() to obtain the status of the current task and uses it to decide what to return. A task has four states: completed (NORMAL), cancelled (CANCELLED), signalled (SIGNAL), and exceptional (EXCEPTIONAL).

  1. If the task did not complete normally, the reportException method is called, which:
    throws a CancellationException if the task was cancelled;
    rethrows the recorded exception if the task failed with an exception.

  2. If the task completed normally, the result is returned.

Next, let's look at the doJoin() method.
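
Before moving on, the three outcomes of join() listed above can be observed from calling code. This is a small sketch with hypothetical names (JoinOutcomesDemo, Work, describe); it uses the common pool and reports what join() did in each case.

```java
import java.util.concurrent.CancellationException;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;

// Illustrative example only: what join() does for each final task state.
class JoinOutcomesDemo {

    static class Work extends RecursiveTask<String> {
        private final boolean fail;

        Work(boolean fail) {
            this.fail = fail;
        }

        @Override
        protected String compute() {
            if (fail) {
                throw new IllegalArgumentException("boom");
            }
            return "ok";
        }
    }

    // Calls join() and turns its outcome into a short description.
    static String describe(Work task) {
        try {
            return "result: " + task.join();
        } catch (CancellationException e) {
            return "cancelled";
        } catch (RuntimeException e) {
            return "failed: " + e.getClass().getSimpleName();
        }
    }

    static String normal() {
        Work task = new Work(false);
        ForkJoinPool.commonPool().execute(task);
        return describe(task);
    }

    static String failed() {
        Work task = new Work(true);
        ForkJoinPool.commonPool().execute(task);
        return describe(task);
    }

    static String cancelled() {
        Work task = new Work(false);
        task.cancel(true); // cancelled before it ever runs
        return describe(task);
    }
}
```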

doJoin method

    private int doJoin() {
        int s; Thread t; ForkJoinWorkerThread wt; ForkJoinPool.WorkQueue w;
        return (s = status) < 0 ? s :
            ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) ?
            (w = (wt = (ForkJoinWorkerThread)t).workQueue).
            tryUnpush(this) && (s = doExec()) < 0 ? s :
            wt.pool.awaitJoin(w, this, 0L) :
            externalAwaitDone();
    }

In the doJoin() method, the status of the task is checked first. If the task has already completed, its status is returned directly. If not, and the current thread is a ForkJoinWorkerThread, the task is taken back out of that thread's task array (tryUnpush) and executed (doExec); if that is not possible, the thread waits in awaitJoin. A thread outside the pool waits in externalAwaitDone(). When the task completes successfully, its status is set to NORMAL; if an exception occurs, the exception is recorded and the status is set to EXCEPTIONAL.
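
The "take the task back and run it yourself" path can be made visible with a pool that has a single worker. The sketch below is an illustration under that assumption; JoinRunsLocallyDemo and its nested classes are hypothetical names. The parent forks a child, joins it, and returns the name of the thread that ran each of them.

```java
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;

// Illustrative example only: a worker that joins its own forked child.
class JoinRunsLocallyDemo {

    static class WhoRanMe extends RecursiveTask<String> {
        @Override
        protected String compute() {
            return Thread.currentThread().getName();
        }
    }

    static class Parent extends RecursiveTask<String[]> {
        @Override
        protected String[] compute() {
            WhoRanMe child = new WhoRanMe();
            child.fork();                       // the child goes onto this worker's queue
            String childThread = child.join();  // join() may pop it back and run it here
            return new String[] { Thread.currentThread().getName(), childThread };
        }
    }

    // Returns { thread that ran the parent, thread that ran the child }.
    static String[] run() {
        ForkJoinPool pool = new ForkJoinPool(1); // assumed: one worker, so nobody else steals
        try {
            return pool.invoke(new Parent());
        } finally {
            pool.shutdown();
        }
    }
}
```

With a single worker, the two names are expected to be the same, because the joining worker executes the child itself instead of blocking. With more workers, another thread may steal the child first and the names can differ.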

Reference material

JDK1.8 Source
http://ifeve.com/fork-join-5/

Source Address

https://github.com/XWxiaowei/JavaCode/tree/master/spring-poi-demo

Posted on Sat, 18 Apr 2020 18:15:45 -0700 by NoReason