Python multiprocessing: Pipe, Event, Semaphore, and process pools

Pipe: a two-way (full-duplex) channel through which two processes communicate with each other

from multiprocessing import Process, Pipe

def f1(conn):
    from_zjc_msg = conn.recv()
    print("I'm a subprocess")
    print('Messages from the main process:', from_zjc_msg)

if __name__ == '__main__':
    conn1, conn2 = Pipe()  # Create a pipe object; it is full duplex and returns both ends of the pipe.
    # A message sent on one end can only be received on the other end, never on the sending end itself.
    # Either end (or both) can be passed to other processes, so multiple processes can communicate through the pipe.
    p1 = Process(target=f1, args=(conn2, ))
    p1.start()
    conn1.send('I feel a little sleepy')

    print('I am the main process')

Event usage:

from multiprocessing import Event

event = Event()  # create an event object; the initial flag is False

event.set()  # set the flag to True

event.clear()  # set the flag to False

event.wait()  # block until the flag is True, then continue executing

from multiprocessing import Process, Event

e = Event()  # Create an event object; its initial state is False
print('The state of e is', e.is_set())

print('The program has reached this point')

e.set()  # Change the state of e to True
print('The state of e is', e.is_set())
# e.clear()  # Change the state of e to False

e.wait()  # If the event's flag is False, block here
print('The program has passed wait')

Event-based process communication:

import time
from multiprocessing import Process, Event

def f1(e):
    time.sleep(2)
    n = 100
    print('The result of subprocess calculation is:', n)
    e.set()

if __name__ == '__main__':
    e = Event()
    p = Process(target=f1, args=(e, ))
    p.start()

    print('Main process waiting.....')
    e.wait()
    print('The result is ready; you can fetch the value')

Semaphore: used to limit how many processes run a section of code concurrently

Required import:

from multiprocessing import Process, Semaphore

The two important methods are semaphore.acquire() and semaphore.release().
import time
import random
from multiprocessing import Process, Semaphore

def f1(i, s):
    s.acquire()

    print('Guest %s has entered' % i)
    time.sleep(random.randint(1, 3))
    print('Guest %s is leaving' % i)
    s.release()

if __name__ == '__main__':
    s = Semaphore(2)  # Counter starts at 2; acquire() decrements it, release() increments it; at 0, further acquire() calls block
    for i in range(5):
        p = Process(target=f1, args=(i, s))
        p.start()

Process pool: define a pool holding a fixed number of processes. When a task arrives, a process from the pool handles it; after finishing, the process is not closed but is returned to the pool to wait for the next task.
If there are more tasks than processes in the pool, a task waits until a previous process finishes and returns, then runs on the freed process. In other words, the number of processes in the pool is fixed,
and at most that many processes run at the same time.
# Compare execution time: process pool vs. one process per task
import time
from multiprocessing import Process, Pool

def f(n):
    for i in range(5):
        n += i
if __name__ == '__main__':
    # Time the process pool executing 100 tasks
    s_time = time.time()
    pool = Pool(4)  # Number of processes in the pool; 4 means 4 processes. If omitted, it defaults to the number of CPUs.
    pool.map(f, range(100))  # The second argument must be iterable; map submits the tasks asynchronously and joins for you
    e_time = time.time()
    dif_time = e_time - s_time

    # Time 100 processes performing 100 tasks
    p_s_t = time.time()  # multiprocess start time
    p_list = []
    for i in range(100):
        p = Process(target=f, args=(i,))
        p.start()
        p_list.append(p)
    [pp.join() for pp in p_list]
    p_e_t = time.time()
    p_dif_t = p_e_t - p_s_t
    print('Execution time of process pool:', dif_time)
    print('Execution time of multi process:', p_dif_t)

Process pool synchronization method:

import time
from multiprocessing import Process, Pool

def f1(n):
    # print(n)
    time.sleep(2)
    return n * 2

if __name__ == '__main__':
    pool = Pool(2)

    for i in range(5):
        print('xxxxx')
        res = pool.apply(f1, args=(i, ))
        print(res)

Result: apply is synchronous. Each iteration prints 'xxxxx', blocks for 2 s inside f1, then prints the return value (0, 2, 4, ...) before the next iteration starts, so the loop takes about 10 s in total.
Process pool asynchronous method:
import time
from multiprocessing import Process, Pool

def f1(n):
    time.sleep(2)
    return n * 2

if __name__ == '__main__':
    pool = Pool(2)
    res_list = []
    for i in range(5):
        print('xxxxx')
        res = pool.apply_async(f1, args=(i,))
        res_list.append(res)
    for i in res_list:
        print(i.get())  # get() blocks until this task's result is ready

Callback function for the process pool (note: the callback runs in the main process, not in a pool worker):

import os
from multiprocessing import Pool, Process

def f1(n):
    print('pid of the pool worker:', os.getpid())
    print(n)
    return 2 * n

def f2(n):
    print('pid inside the callback:', os.getpid())
    print(n)

if __name__ == '__main__':
    pool = Pool(4)
    res = pool.apply_async(f1, args=(5,), callback=f2)
    pool.close()
    pool.join()
    print('pid of the main process:', os.getpid())

Tags: Python

Posted on Tue, 03 Dec 2019 00:02:27 -0800 by steelaz