Python Full-Stack Advanced Programming Skills 9: Python Multitasking with Processes

1, Comparison between threads and processes

1. Code analysis

import os
import time

# Create a child process
pid = os.fork()
print('Hello World')

# Determine whether we are in the child process
if pid == 0:
    print('s_fork:{},f_fork:{}'.format(os.getpid(),os.getppid()))
else:
    print('f_fork:{}'.format(os.getpid()))

Further explanation:
When the main process calls os.fork(), a child process is created. In the parent process, pid is the child's process ID (non-zero), so the statement after else runs;
in the child process, the return value is 0, so the statement after if runs.
A detailed explanation of os.fork():
The os.fork() function creates a new process. It is only available on POSIX systems; on Windows, Python's os module does not define os.fork().
How os.fork() creates a process:
Each time a program runs, the operating system creates a process to execute its instructions. That process can itself call os.fork() to ask the operating system to create another process. The caller is the parent process, and the process it creates is the child process. Every process has a unique process ID (pid) that identifies it. The child is an almost exact copy of the parent: it inherits copies of many values, such as global variables and environment variables. The only difference between the two processes is the return value of fork(): the function is called once but returns twice, returning 0 in the child and the child's pid in the parent. A program can therefore tell whether it is running in the parent or the child simply by checking fork()'s return value.
For further explanation, please refer to https://www.jianshu.com/p/e8f5b828cce0.
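To make the copy semantics concrete, here is a minimal sketch (POSIX-only, like fork() itself) showing that the child's copy of a global variable is independent of the parent's:

```python
import os

counter = 100  # global variable; the child receives its own copy at fork time

pid = os.fork()
if pid == 0:
    # Child process: modifying the copy does not affect the parent
    counter += 1
    print('child counter:', counter)    # 101
    os._exit(0)
else:
    os.wait()                           # wait for the child to finish
    print('parent counter:', counter)   # still 100
```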

2. Thread and process comparison

  • Process:
    When p.start() is executed, the main process copies the code into the child process so that multiple tasks can run;
    can complete multiple tasks, e.g. a computer can run several QQ instances at the same time.
    Supplement:
    It is not necessary to make one full copy of the code per child process. As long as no process modifies the shared data, all processes can share a single copy of the code; this relies on copy-on-write.
  • Thread:
    When t.start() is executed, multiple threads execute the same code together;
    can also complete multiple tasks, e.g. multiple chat windows inside a single QQ instance.

Fundamental difference:
A process is the basic unit of resource allocation by the operating system, while a thread is the basic unit of task scheduling and execution.

Switching between processes is expensive; switching between threads is cheap.
A thread can be regarded as a lightweight process, and threads of the same process share code and data space.

2, Inter-process communication: Queue

Processes cannot communicate with each other directly. To let two processes communicate, you can introduce an intermediary, for example a file: one process writes and the other reads. However, this approach is inefficient, so a Queue is generally used instead.
A queue is first in, first out (FIFO).

from multiprocessing import Queue


# Create a queue to store up to 3 pieces of data
q = Queue(3)
# Store data
q.put(1)
q.put('Corley')
q.put([11,22])
print('finished')

Printing

finished

Try again

from multiprocessing import Queue


# Create a queue to store up to 3 pieces of data
q = Queue(3)
# Store data
q.put(1)
q.put('Corley')
q.put([11,22])
q.put({'name':'Corley'})
print('finished')

display

Clearly, once a fourth item is put into the full queue, q.put() blocks forever and the program has to be stopped manually.
Improvement Code:

from multiprocessing import Queue


# Create a queue to store up to 3 pieces of data
q = Queue(3)
# Store data
q.put(1)
q.put('Corley')
q.put([11,22])
# Raises queue.Full immediately instead of blocking
q.put_nowait({'name':'Corley'})
print('finished')

Printing

Traceback (most recent call last):
  File "xxx/demo.py", line 11, in <module>
    q.put_nowait({'name':'Corley'})
  File "xxxx\Python\Python37\lib\multiprocessing\queues.py", line 129, in put_nowait
    return self.put(obj, False)
  File "xxxx\Python\Python37\lib\multiprocessing\queues.py", line 83, in put
    raise Full
queue.Full

The output shows the queue is full and no more elements can be added.
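Between the blocking put() and the immediate put_nowait() there is a middle ground: put() accepts a timeout, waiting a bounded time for a free slot before raising queue.Full. A sketch:

```python
from multiprocessing import Queue
import queue  # the Full exception lives in the standard queue module

q = Queue(3)
for item in (1, 'Corley', [11, 22]):
    q.put(item)

try:
    # Wait at most 1 second for a free slot instead of blocking forever
    q.put({'name': 'Corley'}, timeout=1)
except queue.Full:
    print('queue is still full after 1 second')
```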
Data collection:

from multiprocessing import Queue


# Create a queue to store up to 3 pieces of data
q = Queue(3)
# Store data
q.put(1)
q.put('Corley')
q.put([11,22])
print(q.get())
print(q.get())
print(q.get())
print('finished')

Printing

1
Corley
[11, 22]
finished

Data is retrieved in first-in, first-out order.
Try again:

from multiprocessing import Queue


# Create a queue to store up to 3 pieces of data
q = Queue(3)
# Store data
q.put(1)
q.put('Corley')
q.put([11,22])
print(q.get())
print(q.get())
print(q.get())
# blocking
print(q.get())
print('finished')

Show:

Clearly, once the queue is drained, the fourth q.get() blocks forever.
Improvement Code:

from multiprocessing import Queue


# Create a queue to store up to 3 pieces of data
q = Queue(3)
# Store data
q.put(1)
q.put('Corley')
q.put([11,22])
print(q.get())
print(q.get())
print(q.get())
# Raises queue.Empty immediately instead of blocking
print(q.get_nowait())
print('finished')

Printing

Traceback (most recent call last):
1
Corley
[11, 22]
  File "xxx/demo.py", line 14, in <module>
    print(q.get_nowait())
  File "xxxx\Python\Python37\lib\multiprocessing\queues.py", line 126, in get_nowait
    return self.get(False)
  File "xxxx\Python\Python37\lib\multiprocessing\queues.py", line 107, in get
    raise Empty
_queue.Empty

The output shows the queue is empty.
Other methods:

from multiprocessing import Queue


# Create a queue to store up to 3 pieces of data
q = Queue(3)
# Store data
q.put(1)
q.put('Corley')
# Judge whether the queue is full
print(q.full())
q.put([11,22])
print(q.qsize())
print(q.get())
print(q.get())
print(q.get())
print(q.empty())
print('finished')

Printing

False
3
1
Corley
[11, 22]
True
finished

The principle of inter-process communication through a queue is as follows:

Downloading and processing the data are handled by separate processes, which decouples them to some extent.
An attempt at process communication through a Queue:

import multiprocessing


def download(q):
    '''Download data'''
    lis = [11, 22, 33]
    for item in lis:
        q.put(item)
    print('Download complete and save to queue')


def analyse(q):
    '''Analysis data'''
    analyse_data = list()
    while True:
        data = q.get()
        analyse_data.append(data)
        if q.empty():
            break
    print(analyse_data)


def main():
    # Create queue
    q = multiprocessing.Queue()
    p1 = multiprocessing.Process(target=download, args=(q,))
    p2 = multiprocessing.Process(target=analyse, args=(q,))
    p1.start()
    p2.start()


if __name__ == '__main__':
    main()

Printing

Download complete and save to queue
[11, 22, 33]
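Note that breaking on q.empty() is fragile: if analyse() happens to drain the queue between two of download()'s puts, the loop exits early and data is lost. A common alternative (a sketch, not the author's code) is to have the producer push a sentinel value such as None to mark the end of the data:

```python
import multiprocessing


def download(q):
    '''Producer: put the data, then a sentinel'''
    for item in [11, 22, 33]:
        q.put(item)
    q.put(None)            # sentinel: no more data will arrive


def analyse(q):
    '''Consumer: read until the sentinel appears'''
    analyse_data = []
    while True:
        data = q.get()     # blocks until an item is available
        if data is None:
            break
        analyse_data.append(data)
    print(analyse_data)


def main():
    q = multiprocessing.Queue()
    p1 = multiprocessing.Process(target=download, args=(q,))
    p2 = multiprocessing.Process(target=analyse, args=(q,))
    p1.start()
    p2.start()
    p1.join()
    p2.join()


if __name__ == '__main__':
    main()
```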

3, Multi process sharing global variables

Attempt to share variables between processes:

import multiprocessing


a = 1


def demo1():
    global a
    a += 1


def demo2():
    # If the printed value is 2, it is shared
    print(a)


if __name__ == '__main__':
    p1 = multiprocessing.Process(target=demo1)
    p2 = multiprocessing.Process(target=demo2)
    p1.start()
    p2.start()

Printing

1

The printed result is 1, which means that global variables are not shared between processes, unlike between threads.
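If a single value really must be shared between processes, the multiprocessing module provides shared-memory wrappers such as Value; a minimal sketch (not part of the original example):

```python
import multiprocessing


def bump(shared):
    # The synchronized Value wrapper carries its own lock
    with shared.get_lock():
        shared.value += 1


if __name__ == '__main__':
    a = multiprocessing.Value('i', 1)   # 'i' = C int, initial value 1
    p = multiprocessing.Process(target=bump, args=(a,))
    p.start()
    p.join()
    print(a.value)  # 2: the child's increment is visible in the parent
```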
Trying with an ordinary queue (queue.Queue):

import multiprocessing
from queue import Queue


def demo1(q):
    q.put('a')


def demo2(q):
    data = q.get()
    print(data)


if __name__ == '__main__':
    q = Queue()
    p1 = multiprocessing.Process(target=demo1, args=(q,))
    p2 = multiprocessing.Process(target=demo2, args=(q,))
    p1.start()
    p2.start()

Printing

Traceback (most recent call last):
  File "xxx/demo.py", line 93, in <module>
    p1.start()
  File "xxxx\Python\Python37\lib\multiprocessing\process.py", line 112, in start
    self._popen = self._Popen(self)
  File "xxxx\Python\Python37\lib\multiprocessing\context.py", line 223, in _Popen
    return _default_context.get_context().Process._Popen(process_obj)
  File "xxxx\Python\Python37\lib\multiprocessing\context.py", line 322, in _Popen
    return Popen(process_obj)
  File "xxxx\Python\Python37\lib\multiprocessing\popen_spawn_win32.py", line 89, in __init__
    reduction.dump(process_obj, to_child)
  File "xxxx\Python\Python37\lib\multiprocessing\reduction.py", line 60, in dump
    ForkingPickler(file, protocol).dump(obj)
TypeError: can't pickle _thread.lock objects

A TypeError is raised: an ordinary queue.Queue holds a thread lock internally, and a _thread.lock object cannot be pickled to be sent to a child process.
Workaround: call the run() method directly instead of start():

import multiprocessing
from queue import Queue


def demo1(q):
    q.put('a')


def demo2(q):
    data = q.get()
    print(data)


if __name__ == '__main__':
    q = Queue()
    p1 = multiprocessing.Process(target=demo1, args=(q,))
    p2 = multiprocessing.Process(target=demo2, args=(q,))
    p1.run()
    p2.run()

Printing

a

This now produces output, but it is no longer multiprocessing: calling run() directly simply executes the target function in the main process, and since start() is never called, no child process is created. So an ordinary queue.Queue cannot achieve real inter-process communication; multiprocessing.Queue is required for that.
Trying to catch a missing-argument error with exception handling:

import multiprocessing


def demo1(q):
    try:
        q.put('a')
    except Exception as e:
        print(e.args[0])


def demo2(q):
    try:
        data = q.get()
        print(data)
    except Exception as e:
        print(e.args[0])


if __name__ == '__main__':
    q = multiprocessing.Queue()
    try:
        p1 = multiprocessing.Process(target=demo1)
        p2 = multiprocessing.Process(target=demo2)
        p1.start()
        p2.start()
    except Exception as e:
        print(e.args[0])

Printing

Process Process-1:
Traceback (most recent call last):
  File "xxxx\Python\Python37\lib\multiprocessing\process.py", line 297, in _bootstrap
    self.run()
  File "xxxx\Python\Python37\lib\multiprocessing\process.py", line 99, in run
    self._target(*self._args, **self._kwargs)
TypeError: demo1() missing 1 required positional argument: 'q'
Process Process-2:
Traceback (most recent call last):
  File "xxxx\Python\Python37\lib\multiprocessing\process.py", line 297, in _bootstrap
    self.run()
  File "xxxx\Python\Python37\lib\multiprocessing\process.py", line 99, in run
    self._target(*self._args, **self._kwargs)
TypeError: demo2() missing 1 required positional argument: 'q'

Notice that the traceback does not point at our own code but at Python's lib\multiprocessing\process.py: the missing argument only surfaces inside the child process when the Process machinery invokes the target, which is why the try...except in the parent process catches nothing.

4, Process pool

1. Introduction to process pool

When only a few child processes are needed, you can use the Process class in multiprocessing directly to create them. But when there are hundreds or even thousands of tasks, creating every process by hand is far too much work; in that case the Pool class provided by the multiprocessing module can be used.
When a Pool is initialized, a maximum number of processes can be specified. When a new task is submitted to the Pool, if the pool is not yet full, a new process is created to run the task; if the number of processes in the pool has already reached the maximum, the task waits until some process in the pool finishes, and that process is then reused to run the new task.
The principle is as follows:

Try:

from multiprocessing import Pool
import os, time, random


def work(msg):
    t_start = time.time()
    print('%s starts executing, process id is %d' % (msg, os.getpid()))

    time.sleep(random.random() * 2)
    t_stop = time.time()
    print(msg, "execution completed, took %0.2f seconds" % (t_stop - t_start))


def demo():
    pass


if __name__ == '__main__':
    po = Pool(3)  # Define a process pool with 3 processes
    for i in range(0, 10):
        # Add task, 10 tasks
        po.apply_async(work, (i,))
    print("--start--")
    # Close the process pool and no longer receive new task requests
    po.close()
    # Wait for the execution of subprocess to finish, and then the main process will finish
    po.join()
    print("--end--")

Show:

It is easy to see that three tasks start first: 0, 1 and 2. Task 3 starts after 0 finishes, 4 after 1 finishes, and 5 after 2 finishes, and so on until all tasks are done.
Modify the pool size to 4 processes:

from multiprocessing import Pool
import os, time, random


def work(msg):
    t_start = time.time()
    print('%s starts executing, process id is %d' % (msg, os.getpid()))

    time.sleep(random.random() * 2)
    t_stop = time.time()
    print(msg, "execution completed, took %0.2f seconds" % (t_stop - t_start))


def demo():
    pass


if __name__ == '__main__':
    po = Pool(4)  # Define a process pool with 4 processes
    for i in range(0, 10):
        # Add task, 10 tasks
        po.apply_async(work, (i,))
    print("--start--")
    # Close the process pool and no longer receive new task requests
    po.close()
    # Wait for the execution of subprocess to finish, and then the main process will finish
    po.join()
    print("--end--")

Show:

Clearly the pool with four processes finishes sooner, because four tasks run at the same time.
After po.close() is called, no new tasks can be added:

from multiprocessing import Pool
import os, time, random


def work(msg):
    t_start = time.time()
    print('%s starts executing, process id is %d' % (msg, os.getpid()))

    time.sleep(random.random() * 2)
    t_stop = time.time()
    print(msg, "execution completed, took %0.2f seconds" % (t_stop - t_start))


def demo():
    pass


if __name__ == '__main__':
    po = Pool(4)  # Define a process pool with 4 processes
    for i in range(0, 10):
        # Add task, 10 tasks
        po.apply_async(work, (i,))
    print("--start--")
    # Close the process pool and no longer receive new task requests
    po.close()
    po.apply_async(demo)
    # Wait for the execution of subprocess to finish, and then the main process will finish
    po.join()
    print("--end--")

Printing

--start--
Traceback (most recent call last):
  File "xxx/demo.py", line 151, in <module>
    po.apply_async(demo)
  File "xxxx\Python\Python37\lib\multiprocessing\pool.py", line 362, in apply_async
    raise ValueError("Pool not running")
ValueError: Pool not running

Tasks must therefore be added before po.close() is called.
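apply_async() also returns an AsyncResult object whose get() method fetches the worker's return value (and re-raises any exception the worker hit). A small sketch with a hypothetical square() task:

```python
from multiprocessing import Pool


def square(x):
    return x * x


if __name__ == '__main__':
    po = Pool(3)
    # Keep the AsyncResult handles so the return values can be collected
    results = [po.apply_async(square, (i,)) for i in range(5)]
    po.close()
    po.join()
    print([r.get() for r in results])  # [0, 1, 4, 9, 16]
```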

2. Communication between processes in process pool

Try multiprocessing.Queue:

import multiprocessing

def demo1(q):
    q.put('a')


def demo2(q):
    data = q.get()
    print(data)


if __name__ == '__main__':
    q = multiprocessing.Queue()
    po = multiprocessing.Pool()
    po.apply_async(demo1,(q,))
    po.apply_async(demo2, (q,))
    po.close()
    po.join()

Printing

Nothing is printed, i.e. inter-process communication inside the process pool did not take place.
Test:

import multiprocessing

def demo1(q):
    print(1)
    q.put('a')


def demo2(q):
    print(2)
    data = q.get()
    print(data)


if __name__ == '__main__':
    q = multiprocessing.Queue()
    po = multiprocessing.Pool()
    po.apply_async(demo1,(q,))
    po.apply_async(demo2, (q,))
    po.close()
    po.join()

Printing

It is still empty; add exception handling to investigate further:

import multiprocessing

def demo1(q):
    try:
        q.put('a')
    except Exception as e:
        print(e.args[0])



def demo2(q):
    try:
        data = q.get()
        print(data)
    except Exception as e:
        print(e.args[0])


if __name__ == '__main__':
    q = multiprocessing.Queue()
    po = multiprocessing.Pool()
    po.apply_async(demo1,(q,))
    po.apply_async(demo2, (q,))
    po.close()
    po.join()

Printing

Still nothing: demo1() and demo2() are evidently never executed at all (the failure is swallowed silently by apply_async). So multiprocessing.Queue cannot be used for communication between processes in a pool; multiprocessing.Manager().Queue() is needed instead.
Try again:

import multiprocessing

def demo1(q):
    try:
        q.put('a')
    except Exception as e:
        print(e.args[0])



def demo2(q):
    try:
        data = q.get()
        print(data)
    except Exception as e:
        print(e.args[0])


if __name__ == '__main__':
    q = multiprocessing.Manager().Queue()
    po = multiprocessing.Pool()
    po.apply_async(demo1,(q,))
    po.apply_async(demo2, (q,))
    po.close()
    po.join()

Printing

a

Now it runs correctly.
If a worker in the process pool raises an exception, the pool does not raise it in the parent automatically, so when using a process pool for multitasking it is best to wrap worker code in try...except wherever possible.
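Instead of wrapping every worker body in try...except, apply_async() also accepts an error_callback (Python 3.2+), which is invoked in the parent process with the exception a worker raised. A sketch with a deliberately failing task:

```python
from multiprocessing import Pool


def fail(x):
    return 1 / x            # raises ZeroDivisionError when x == 0


def report(exc):
    # Runs in the parent process whenever a worker raises
    print('worker failed:', exc)


if __name__ == '__main__':
    po = Pool(2)
    po.apply_async(fail, (0,), error_callback=report)
    po.close()
    po.join()               # by now the callback has been invoked
```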
Conclusion:
There are three kinds of queues:
(1)queue.Queue:
cannot be used for communication between processes;
(2)multiprocessing.Queue:
communication between processes created with Process;
(3)multiprocessing.Manager().Queue():
communication between the processes of a process pool.

5, Application: multi-task folder copy

Implementation ideas:

  • Get the name of the folder the user wants to copy
  • Create a new folder
  • Get the names of all files in the folder to be copied
  • Create process pool
  • Add copy task

Preliminary code implementation:

import multiprocessing
import os


def copy_file(index,file_name,new_folder,old_folder):
    '''File copy'''
    print('%2d:File to Copy:%s' % (index + 1,file_name))
    # Read old file
    old_file = open(old_folder + '/' + file_name,'rb')
    content = old_file.read()
    old_file.close()
    # Save to a new folder
    # new_file = open(new_folder + '/' + file_name, 'wb')
    # new_file.write(content)
    # new_file.close()
    with open(new_folder + '/' + file_name, 'wb') as f:
        f.write(content)
    print('%2d:File %s Copy Finished' % (index + 1,file_name))


def main():
    # Get the name of the folder the user wants to copy
    old_folder = input('Input Directory Name:')
    # Create a new folder
    new_folder = old_folder + '--copy'
    if not os.path.exists(new_folder):
        os.mkdir(new_folder)
    # Get the names of all files in the folder to be copied
    file_list = os.listdir(old_folder)
    # Create process pool
    po = multiprocessing.Pool(5)
    # Add copy task
    for index,file_name in enumerate(file_list):
        po.apply_async(copy_file,args=(index,file_name,new_folder,old_folder))
    po.close()
    po.join()


if __name__ == '__main__':
    main()

Printing

Input Directory Name:test
 2:File to Copy:_collections_abc.py
 1:File to Copy:_bootlocale.py
 5:File to Copy:_dummy_thread.py
 3:File to Copy:_compat_pickle.py
 4:File to Copy:_compression.py
 2:File _collections_abc.py Copy Finished
 6:File to Copy:_markupbase.py
 5:File _dummy_thread.py Copy Finished
 1:File _bootlocale.py Copy Finished
 3:File _compat_pickle.py Copy Finished
 4:File _compression.py Copy Finished
 6:File _markupbase.py Copy Finished
 7:File to Copy:_osx_support.py
 8:File to Copy:_pydecimal.py
 7:File _osx_support.py Copy Finished
 9:File to Copy:_pyio.py
 8:File _pydecimal.py Copy Finished
10:File to Copy:_py_abc.py
11:File to Copy:__future__.py
 9:File _pyio.py Copy Finished
11:File __future__.py Copy Finished
12:File to Copy:__phello__.foo.py
10:File _py_abc.py Copy Finished
12:File __phello__.foo.py Copy Finished

Further extension: add a progress display

import multiprocessing
import os
import random
import time


def copy_file(index,file_name,new_folder,old_folder,q):
    '''File copy'''
    print('%2d:File to Copy:%s' % (index + 1,file_name))
    # Read old file
    old_file = open(old_folder + '/' + file_name,'rb')
    content = old_file.read()
    old_file.close()
    # Save to a new folder
    # new_file = open(new_folder + '/' + file_name, 'wb')
    # new_file.write(content)
    # new_file.close()
    with open(new_folder + '/' + file_name, 'wb') as f:
        f.write(content)
    q.put(file_name)
    print('%2d:File %s Copy Finished' % (index + 1,file_name))
    time.sleep(random.random() * 5)


def main():
    # Get the name of the folder the user wants to copy
    old_folder = input('Input Directory Name:')
    # Create a new folder
    new_folder = old_folder + '--copy'
    if not os.path.exists(new_folder):
        os.mkdir(new_folder)
    # Get the names of all files in the folder to be copied
    file_list = os.listdir(old_folder)
    # Create process pool
    po = multiprocessing.Pool(5)
    # Create queue
    q = multiprocessing.Manager().Queue()
    # Add copy task
    for index,file_name in enumerate(file_list):
        po.apply_async(copy_file,args=(index,file_name,new_folder,old_folder,q))
    po.close()
    # Add progress bar
    file_count = len(file_list)
    copy_count = 0
    while True:
        q.get()
        copy_count += 1
        print('Copy Progress: %2.2f%%' % (copy_count / file_count * 100.0))
        # Exit loop
        if copy_count >= file_count:
            break
    po.join()


if __name__ == '__main__':
    main()

display


Tags: Python Windows

Posted on Wed, 05 Feb 2020 02:01:28 -0800 by Tokunbo