Concurrency-related concepts and code examples

Preface

Let's use a simple example to see what synchronous, asynchronous, blocking, and non-blocking mean. (Readers already familiar with these basic concepts can skip ahead.)

The example comes from Zhihu:

You call the bookstore owner to ask whether he has the book Distributed Systems. With a synchronous mechanism, the owner tells you on the spot (the result is returned immediately); with an asynchronous mechanism, the owner hangs up first (no result is returned yet) and calls you back once he has checked (a callback).

The same scenario also illustrates blocking and non-blocking. You call the owner to ask whether he has the book. If it is a blocking call, you stay on the line and do nothing else until the owner gives you the answer. If it is a non-blocking call, you go off and do other things, occasionally checking whether the owner has gotten back to you.

Synchronous and asynchronous describe the communication mechanism (the behavior of the message exchange); blocking and non-blocking describe the state of the program while it waits for the result of a call.
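In code, the difference shows up in how you wait for the answer. Below is a minimal sketch using the concurrent.futures module discussed later in this article; ask_owner and its timings are made up for illustration. The callback corresponds to the asynchronous "call you back", the polling loop to a non-blocking call, and result() to a blocking one.

import time
from concurrent.futures import ThreadPoolExecutor

def ask_owner():
    time.sleep(2)                      # the owner goes to look for the book
    return 'Yes, we have Distributed Systems'

with ThreadPoolExecutor() as pool:
    future = pool.submit(ask_owner)

    # Asynchronous: the owner "calls you back" when he is done (callback)
    future.add_done_callback(lambda f: print('callback:', f.result()))

    # Non-blocking: do other things and check in occasionally
    while not future.done():
        print('doing something else...')
        time.sleep(0.5)

    # Blocking: result() simply waits here until the answer arrives
    print('poll saw:', future.result())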

Multiprocess & Multithread

A process is the basic unit by which the operating system independently schedules tasks and allocates system resources (CPU, memory). A system can run many processes and execute multiple tasks at the same time (the number that truly run in parallel is bounded by the number of CPU cores). Before Python 3.2, multiprocess and multithreaded code was written directly with the multiprocessing and threading modules; Python 3.2 abstracted and encapsulated them in the concurrent.futures module, whose ProcessPoolExecutor and ThreadPoolExecutor are used to write multiprocess and multithreaded code.

Key concepts

Executor: an abstract class for executing tasks; it provides methods for asynchronous execution (submit, map)

Future: an object representing the asynchronous operation, returned by the Executor (e.g. by submit)
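A minimal sketch of how these two fit together, with a made-up work function: submit() returns a Future immediately, the Future exposes the state and (eventually) the result of the call, and map() runs a function over an iterable and returns results in input order.

import time
from concurrent.futures import ThreadPoolExecutor

def work(n):
    time.sleep(n)
    return n * n

with ThreadPoolExecutor(max_workers=2) as executor:
    future = executor.submit(work, 1)        # returns a Future right away
    print(future.done())                     # False: the call has not finished yet
    print(future.result())                   # blocks until the result (1) is ready
    print(list(executor.map(work, [1, 2])))  # [1, 4], in input order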

Process pool code implementation

import time
import multiprocessing
from concurrent.futures import ProcessPoolExecutor

def task(task_id):
    time.sleep(1)
    print(f'now is {task_id}')
    return task_id

if __name__ == '__main__':
    start = time.time()
    cpu_count = multiprocessing.cpu_count()   # submit one task per CPU core
    with ProcessPoolExecutor() as p:
        futures = [p.submit(task, i) for i in range(cpu_count)]

        # result() blocks until each task finishes
        print([future.result() for future in futures])
        print(time.time() - start)

# Result:
# now is 0
# now is 2
# now is 1
# now is 3
# [0, 1, 2, 3]
# 1.3288664817810059
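If you would rather handle each result as soon as its task finishes, instead of in submission order, concurrent.futures.as_completed can be used; a small variation on the example above:

import time
from concurrent.futures import ProcessPoolExecutor, as_completed

def task(task_id):
    time.sleep(1)
    return task_id

if __name__ == '__main__':
    with ProcessPoolExecutor() as p:
        futures = [p.submit(task, i) for i in range(4)]
        # as_completed yields each future as soon as it finishes,
        # not in the order the tasks were submitted
        for future in as_completed(futures):
            print(future.result())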

Thread pool code implementation

Threads are much cheaper to create than processes and share the same address space, so for this I/O-bound task the thread pool finishes slightly faster.

import time
from concurrent.futures import ThreadPoolExecutor

def task(task_id):
    time.sleep(1)
    print(f'now is {task_id}')
    return task_id

if __name__ == '__main__':
    start = time.time()
    with ThreadPoolExecutor() as p:
        futures = [p.submit(task, i) for i in range(5)]

        print([future.result() for future in futures])
        print(time.time() - start)
        
# Result:
# now is 1
# now is 0
# now is 4
# now is 3
# now is 2
# [0, 1, 2, 3, 4]
# 1.0025899410247803

GIL & Coroutines

Here is a quick note on the GIL, a long-standing Python topic.

GIL stands for Global Interpreter Lock. For thread safety, Python holds a global lock during multithreaded execution, which ensures that only one thread runs Python bytecode at a time; the interpreter switches between threads periodically (every 100 "ticks" in older versions). This has little impact on I/O-intensive programs, but it clearly limits the execution efficiency of CPU-intensive ones. So Python later introduced coroutines (Coroutine).
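To see the effect, compare a CPU-bound function run in a thread pool and in a process pool; with the GIL the threads gain almost nothing, while the processes can use multiple cores (a rough sketch — the exact numbers depend on the machine):

import time
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor

def count_down(n):
    # Pure CPU work: under the GIL only one thread runs this at a time
    while n > 0:
        n -= 1

if __name__ == '__main__':
    jobs = [10_000_000] * 4

    start = time.time()
    with ThreadPoolExecutor() as pool:
        list(pool.map(count_down, jobs))
    print('threads  :', time.time() - start)   # roughly single-core speed

    start = time.time()
    with ProcessPoolExecutor() as pool:
        list(pool.map(count_down, jobs))
    print('processes:', time.time() - start)   # can run on several cores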

A coroutine is a lightweight, user-space "thread": function A can be suspended at an await point so that function B runs, and execution then switches back to A, which makes asynchronous multitasking possible.

Code example:

import asyncio


# Define a coroutine (its execution can be suspended at await points and resumed later)
async def task(i):
    print(f'start {i} task')
    await asyncio.sleep(1)
    print(f'end {i} task')


loop = asyncio.get_event_loop()   # Get the event loop; it drives registered coroutines as their awaited events complete
tasks = [task(i) for i in range(3)]   # Coroutine objects (not yet scheduled)
loop.run_until_complete(asyncio.wait(tasks))  # Run the coroutines until they all complete

# Result:
# start 1 task
# start 2 task
# start 0 task
# end 1 task
# end 2 task
# end 0 task
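Note that passing bare coroutine objects to asyncio.wait() is deprecated in newer Python versions; on Python 3.7+ the same example is usually written with asyncio.run() and asyncio.gather(), roughly like this:

import asyncio

async def task(i):
    print(f'start {i} task')
    await asyncio.sleep(1)
    print(f'end {i} task')

async def main():
    # gather schedules all coroutines concurrently and waits for them to finish
    await asyncio.gather(*(task(i) for i in range(3)))

asyncio.run(main())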

Reference resources

https://cuiqingcai.com/6160.html

https://www.cnblogs.com/hucho...

https://www.zhihu.com/questio...
