Case analysis of Python multi-process communication

The operating system gives every process its own address space, and the address spaces of different processes are completely isolated from one another, so without extra mechanisms processes are unaware of each other's existence. So how do processes communicate? How are they related, and how is the communication implemented? This article uses Python to walk through inter-process communication. The underlying principles are the same in any language; the hope is that concrete examples make the essence easier to grasp.

The sections below introduce each kind of communication as simply as possible. For specific details, refer to the documentation.

1. Pipe

Let's start with the simplest and oldest form of IPC: the pipe. The term usually refers to an anonymous (unnamed) pipe. In essence it can be viewed as a kind of file that exists only in memory and is never saved to disk. Different processes read data from or write data to the pipe through the interface the system provides.
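
To make the "file in memory" idea concrete, here is a minimal sketch using os.pipe from the standard library. It is an illustration rather than part of the original example: both ends are used inside a single process purely for brevity, whereas in real use the two file descriptors would be held by different processes.

```python
import os

# os.pipe() returns two file descriptors: one for reading, one for writing.
read_fd, write_fd = os.pipe()

# Data written to the write end is buffered by the kernel, not stored on disk.
os.write(write_fd, b"hello through the pipe")
os.close(write_fd)

# The read end returns the bytes that were written.
data = os.read(read_fd, 1024)
os.close(read_fd)
print(data)
```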

In other words, the pipe is an intermediate medium through which processes communicate. The limitation of anonymous pipes is that they are generally used only between directly related processes, such as a parent and its child. Here is a simple example of how to use one.

from multiprocessing import Process, Pipe

def pstart(pname, conn):
    conn.send("Data@subprocess")
    print(conn.recv())          # Data@parentprocess

if __name__ == '__main__':
    conn1, conn2 = Pipe(True)
    sub_proc = Process(target=pstart, args=('subprocess', conn2,))
    sub_proc.start()
    print(conn1.recv())         # Data@subprocess
    conn1.send("Data@parentprocess")
    sub_proc.join()

Pipe communication in three steps:

  1. Create a Pipe and get two connection objects, conn1 and conn2.
  2. The parent process keeps conn1 and passes conn2 to the child process.
  3. The parent and child exchange data by calling send and recv on the connection objects they hold.

The example above creates a full-duplex pipe. A one-way pipe can be created as well; the official documentation describes the usage as follows:

Returns a pair (conn1, conn2) of Connection objects representing the ends of a pipe.

If duplex is True (the default) then the pipe is bidirectional. If duplex is False then the pipe is unidirectional: conn1 can only be used for receiving messages and conn2 can only be used for sending messages.
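
As a rough sketch of the one-way case, the following passes duplex=False so that the first connection can only receive and the second can only send. The function names worker and drain are made up for this illustration.

```python
from multiprocessing import Process, Pipe


def worker(send_end):
    """Child side: push a few messages into the write end, then close it."""
    for item in ("first", "second"):
        send_end.send(item)
    send_end.close()


def drain(recv_end):
    """Read until every write end is closed; recv() then raises EOFError."""
    received = []
    while True:
        try:
            received.append(recv_end.recv())
        except EOFError:
            break
    return received


if __name__ == "__main__":
    # duplex=False: recv_end can only receive, send_end can only send.
    recv_end, send_end = Pipe(duplex=False)
    child = Process(target=worker, args=(send_end,))
    child.start()
    send_end.close()   # the parent drops its copy so that EOF can be detected
    print(drain(recv_end))
    child.join()
```

Closing the parent's copy of the send end matters here: recv() only reports EOFError once no process holds the write end open.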

2. Named Pipe (FIFO)

The pipes described above are mainly for processes that are directly related, which is a significant limitation. Named pipes, by contrast, allow any two processes to communicate.

Because the os module on Windows has no mkfifo attribute, this example runs only on Linux (test environment: CentOS 7, Python 2.7.5). Note that under Python 3, os.write requires bytes and os.read returns bytes, so the strings below would have to be encoded and decoded.

#!/usr/bin/python
import os, time
from multiprocessing import Process

input_pipe = "./pipe.in"
output_pipe = "./pipe.out"

def consumer():
    if os.path.exists(input_pipe):
        os.remove(input_pipe)
    if os.path.exists(output_pipe):
        os.remove(output_pipe)

    os.mkfifo(output_pipe)
    os.mkfifo(input_pipe)
    in1 = os.open(input_pipe, os.O_RDONLY)        # read from pipe.in
    out1 = os.open(output_pipe, os.O_SYNC | os.O_CREAT | os.O_RDWR)
    while True:
        read_data = os.read(in1, 1024)
        print("received data from pipe.in: %s @consumer" % read_data)
        if len(read_data) == 0:
            time.sleep(1)
            continue

        if "exit" in read_data:
            break
        os.write(out1, read_data)
    os.close(in1)
    os.close(out1)

def producer():
    in2 = None
    out2 = os.open(input_pipe, os.O_SYNC | os.O_CREAT | os.O_RDWR)

    for i in range(1, 4):
        msg = "msg " + str(i)
        len_send = os.write(out2, msg)
        print("------product msg: %s by producer------" % msg)
        if in2 is None:
            in2 = os.open(output_pipe, os.O_RDONLY)        # read from pipe.out
        data = os.read(in2, 1024)
        if len(data) == 0:
            break
        print("received data from pipe.out: %s @producer" % data)
        time.sleep(1)

    os.write(out2, 'exit')
    os.close(in2)
    os.close(out2)

if __name__ == '__main__':
    pconsumer = Process(target=consumer, args=())
    pproducer = Process(target=producer, args=())
    pconsumer.start()
    time.sleep(0.5)
    pproducer.start()
    pconsumer.join()
    pproducer.join()

Each round proceeds as follows:

  1. The producer process writes a message to the pipe.in file.
  2. The consumer process reads the message from the pipe.in file.
  3. The consumer process writes an acknowledgement (here, the message itself) to the pipe.out file.
  4. The producer process reads the acknowledgement from the pipe.out file.

The results are as follows:

[shijun@localhost python]$ python main.py
------product msg: msg 1 by producer------
received data from pipe.in: msg 1 @consumer
received data from pipe.out: msg 1 @producer
------product msg: msg 2 by producer------
received data from pipe.in: msg 2 @consumer
received data from pipe.out: msg 2 @producer
------product msg: msg 3 by producer------
received data from pipe.in: msg 3 @consumer
received data from pipe.out: msg 3 @producer
received data from pipe.in: exit @consumer

Neither process is the parent or child of the other. Each process has one FIFO that it reads from and one that it writes to; as long as one process's read file is the other's write file, and vice versa, they can communicate.
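
For Python 3, a shorter one-way variant might look like the sketch below. It is not the author's code: it opens the FIFO with the built-in open() in text mode so that no manual encoding is needed, and the names send_lines, receive_lines and demo.fifo are chosen only for illustration. Like the example above, it needs a POSIX system.

```python
import os
import tempfile
from multiprocessing import Process


def send_lines(fifo_path, lines):
    """Writer side: open() blocks until some process opens the FIFO for reading."""
    with open(fifo_path, "w") as fifo:
        for line in lines:
            fifo.write(line + "\n")


def receive_lines(fifo_path):
    """Reader side: iterate until every writer has closed the FIFO (EOF)."""
    with open(fifo_path, "r") as fifo:
        return [line.rstrip("\n") for line in fifo]


if __name__ == "__main__":
    workdir = tempfile.mkdtemp()
    fifo_path = os.path.join(workdir, "demo.fifo")   # hypothetical file name
    os.mkfifo(fifo_path)                             # POSIX only
    writer = Process(target=send_lines, args=(fifo_path, ["msg 1", "msg 2", "exit"]))
    writer.start()
    print(receive_lines(fifo_path))
    writer.join()
    os.remove(fifo_path)
    os.rmdir(workdir)
```

The reader needs no sleep loop here: it simply sees end-of-file once the writer closes its end.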

3. Queue

Processes exchange message data by putting items on a queue and getting items from it. Here is a simple example.

from multiprocessing import Process, Queue
import time

def producer(que):
    for product in ('Orange', 'Apple', ''):
        print('put product: %s to queue' % product)
        que.put(product)
        time.sleep(0.5)
        res = que.get()
        print('consumer result: %s' % res)

def consumer(que):
    while True:
        product = que.get()
        print('get product:%s from queue' % product)
        que.put('suc!')
        time.sleep(0.5)
        if not product:
            break

if __name__ == '__main__':
    que = Queue(1)
    p = Process(target=producer, args=(que,))
    c = Process(target=consumer, args=(que,))
    p.start()
    c.start()
    p.join()
    c.join()

This example is fairly simple; for the full Queue API, refer to the official documentation.

Result:

put product: Orange to queue
consumer result: suc!
put product: Apple to queue
consumer result: suc!
put product:  to queue
consumer result: suc!
get product:Orange from queue
get product:Apple from queue
get product: from queue

A few points to note:

  1. A queue can be given a maximum capacity. When the queue is full, a blocking put waits for a free slot, while a non-blocking put (or one whose timeout expires) raises queue.Full.
  2. By default, put and get block the calling process.
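  3. Because both processes share a single queue, a process can get back the item it has just put if the other side has not taken it yet; the sleep calls in the example give the other process time to take it.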
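
One way to avoid a process reading back its own item is to use a separate queue for each direction. The sketch below is an alternative arrangement, not the original example: the queue names tasks and results and the reply format are assumptions made for illustration, and it needs no sleep calls.

```python
from multiprocessing import Process, Queue


def consumer(tasks, results):
    """Take products until the empty-string sentinel arrives, acknowledging each one."""
    while True:
        product = tasks.get()          # blocks until an item is available
        if not product:
            break
        results.put("suc: " + product)


def producer(tasks, results, products):
    """Send each product, wait for its acknowledgement, then send the sentinel."""
    acks = []
    for product in products:
        tasks.put(product)
        acks.append(results.get())     # can only have come from the consumer
    tasks.put("")                      # empty string tells the consumer to stop
    return acks


if __name__ == "__main__":
    tasks, results = Queue(), Queue()
    worker = Process(target=consumer, args=(tasks, results))
    worker.start()
    print(producer(tasks, results, ["Orange", "Apple"]))
    worker.join()
```

Because each queue carries traffic in one direction only, correctness no longer depends on timing.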

4. Shared memory

Shared memory is a common and efficient means of inter-process communication. To keep access to the shared memory orderly, the processes need additional synchronization.

The following example demonstrates how to use shared memory to communicate between processes in Python. It relies on the tagname argument of mmap.mmap, which is available only on Windows.

from multiprocessing import Process
import mmap
import contextlib
import time

def writer():
    with contextlib.closing(mmap.mmap(-1, 1024, tagname='cnblogs', access=mmap.ACCESS_WRITE)) as mem:
        for share_data in ("Hello", "Alpha_Panda"):
            mem.seek(0)
            print('Write data:== %s == to share memory!' % share_data)
            mem.write(str.encode(share_data))
            mem.flush()
            time.sleep(0.5)

def reader():
    while True:
        invalid_byte, empty_byte = str.encode('\x00'), str.encode('')
        with contextlib.closing(mmap.mmap(-1, 1024, tagname='cnblogs', access=mmap.ACCESS_READ)) as mem:
            share_data = mem.read(1024).replace(invalid_byte, empty_byte)
            if not share_data:
                # Stop reading once the shared memory holds no valid data.
                break
            print("Get data:== %s == from share memory!" % share_data.decode())
        time.sleep(0.5)


if __name__ == '__main__':
    p_reader = Process(target=reader, args=())
    p_writer = Process(target=writer, args=())
    p_writer.start()
    p_reader.start()
    p_writer.join()
    p_reader.join()

Execution results:

Write data:== Hello == to share memory!
Write data:== Alpha_Panda == to share memory!
Get data:== Hello == from share memory!
Get data:== Alpha_Panda == from share memory!
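
For a variant that is not tied to Windows, Python 3.8 and later ship multiprocessing.shared_memory. The following is a minimal sketch under that assumption, not a port of the example above; the helper names write_message and read_message are made up for illustration.

```python
from multiprocessing import Process, shared_memory


def write_message(name, payload):
    """Attach to an existing block by name and copy the payload into it."""
    block = shared_memory.SharedMemory(name=name)
    block.buf[:len(payload)] = payload
    block.close()


def read_message(block, length):
    """Copy the first `length` bytes out of the shared block."""
    return bytes(block.buf[:length])


if __name__ == "__main__":
    payload = b"Hello"
    block = shared_memory.SharedMemory(create=True, size=1024)
    try:
        child = Process(target=write_message, args=(block.name, payload))
        child.start()
        child.join()                   # wait until the child has written
        print(read_message(block, len(payload)))
    finally:
        block.close()
        block.unlink()                 # free the block once nobody needs it
```

Here join() serves as the only synchronization: the parent reads after the child has finished writing.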

The following is a brief explanation of the principle of shared memory.

The mapping from a process's virtual addresses to physical addresses is as follows:

The diagram above clearly shows the principle of shared memory.

On the left, in the normal case, the linear address spaces of different processes are mapped to different physical memory pages, so however one process modifies its memory, other processes are not affected.

With shared memory, part of the linear address space of each process is mapped to the same physical pages, so a modification made by one process is immediately visible to the other.

The price, of course, is that the processes must synchronize: access to the shared memory has to be mutually exclusive. This can be achieved with semaphores, for example.
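
As a small illustration of mutually exclusive access, the sketch below uses multiprocessing.Value, which places a C integer in shared memory and comes with its own lock. It uses that built-in lock rather than a raw semaphore; the function name add_many is an assumption for this example.

```python
from multiprocessing import Process, Value


def add_many(counter, times):
    """Increment the shared counter; the lock makes each read-modify-write atomic."""
    for _ in range(times):
        with counter.get_lock():
            counter.value += 1


if __name__ == "__main__":
    counter = Value("i", 0)            # a C int living in shared memory
    workers = [Process(target=add_many, args=(counter, 1000)) for _ in range(4)]
    for w in workers:
        w.start()
    for w in workers:
        w.join()
    print(counter.value)               # 4000, because every increment was locked
```

Without the lock, two processes could read the same old value and one increment would be lost.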

5. Socket communication

Finally, we introduce a form of inter-process communication that also works across hosts: sockets.

Anyone who has done network programming will be familiar with them. Sockets can be used not only between hosts but also between different processes on the same host.

The code for this part is fairly simple and common, so only a flow chart is used here to show the process of socket communication and the interfaces involved.

The chart above shows socket communication between a client process and a listening process on the server.
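
For readers who want to see the calls involved, here is a minimal sketch of two processes on the same host talking over a TCP socket. It is an illustration, not the article's code: the names serve_once and ask are hypothetical, the server handles a single client, and the reply is simply the request in upper case.

```python
import socket
from multiprocessing import Process


def serve_once(server_sock):
    """Server side: accept one client, reply with its message in upper case."""
    conn, _addr = server_sock.accept()
    with conn:
        data = conn.recv(1024)
        conn.sendall(data.upper())


def ask(port, message):
    """Client side: connect, send the message, and wait for the reply."""
    with socket.create_connection(("127.0.0.1", port)) as client:
        client.sendall(message)
        return client.recv(1024)


if __name__ == "__main__":
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as server:
        server.bind(("127.0.0.1", 0))      # port 0: let the OS pick a free port
        server.listen(1)
        port = server.getsockname()[1]
        child = Process(target=serve_once, args=(server,))
        child.start()
        print(ask(port, b"hello"))
        child.join()
```

The listening socket is bound before the child starts, so the client can connect immediately without waiting or retrying.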

Summary

This article briefly introduced some common concepts and examples of inter-process communication. I hope it gives you a deeper understanding of the topic.

Combined with the earlier articles introducing threads, processes, and thread synchronization, this should give you a simple and clear picture of the concepts of threads and processes.

Posted on Sat, 31 Aug 2019 06:47:08 -0700 by zypher11