16 Tornado - Understanding Asynchrony

1. Synchronization

We use two functions to simulate two client requests and process them in turn:

# coding:utf-8

def req_a():
    """Simulate request a"""
    print 'Start processing request req_a'
    print 'Complete processing request req_a'

def req_b():
    """Simulate request b"""
    print 'Start processing request req_b'
    print 'Complete processing request req_b'

def main():
    """Simulate the tornado framework handling two requests in turn"""
    req_a()
    req_b()

if __name__ == "__main__":
    main()

Execution result:

Start processing request req_a
Complete processing request req_a
Start processing request req_b
Complete processing request req_b

Synchronous code executes step by step, always in lockstep: a step cannot begin until the previous one has completed.

Now consider: if processing req_a involves a time-consuming task (such as IO), what does execution look like?

# coding:utf-8

import time

def long_io():
    """Simulate a time-consuming IO operation"""
    print "Start IO operation"
    time.sleep(5)
    print "Complete IO operation"
    return "io result"

def req_a():
    print "Start processing request req_a"
    ret = long_io()
    print "ret: %s" % ret
    print "Complete processing request req_a"

def req_b():
    print "Start processing request req_b"
    print "Complete processing request req_b"

def main():
    req_a()
    req_b()

if __name__ == "__main__":
    main()

Execution result:

Start processing request req_a
Start IO operation
Complete IO operation
Complete processing request req_a
Start processing request req_b
Complete processing request req_b

In the test above, the time-consuming operation blocks execution: req_b cannot run until req_a has finished.

2. Asynchrony

For a time-consuming operation, we hand it off to someone else (for example, another thread) and move on to the next step. When the time-consuming operation is done, its result is fed back to us. This is what we call asynchrony.

Here we use the easy-to-understand threading mechanism to implement asynchrony.

2.1 The Callback Style

# coding:utf-8

import time
import thread

def long_io(callback):
    """Delegate the time-consuming operation to another thread"""
    def fun(cb):  # the callback function is passed in as a parameter
        """Time-consuming operation"""
        print "Start IO operation"
        time.sleep(5)
        print "Complete IO operation, executing callback"
        cb("io result")  # execute the callback function
    thread.start_new_thread(fun, (callback,))  # run the time-consuming operation in a new thread

def on_finish(ret):
    """Callback function"""
    print "Start executing callback on_finish"
    print "ret: %s" % ret
    print "Complete executing callback on_finish"

def req_a():
    print "Start processing request req_a"
    long_io(on_finish)
    print "Leave processing request req_a"

def req_b():
    print "Start processing request req_b"
    time.sleep(2)  # sleep briefly to make the interleaving visible
    print "Complete processing request req_b"

def main():
    req_a()
    req_b()
    while 1:  # keep the program alive so the worker thread can finish
        pass

if __name__ == '__main__':
    main()

Execution result:

Start processing request req_a
Leave processing request req_a
Start processing request req_b
Start IO operation
Complete processing request req_b
Complete IO operation, executing callback
Start executing callback on_finish
ret: io result
Complete executing callback on_finish

Asynchrony is characterized by multiple execution paths in a program: code belonging to the same program may be running on different paths at the same time.

2.2 The Coroutine Style

When writing asynchronous programs with callbacks, code that belongs to a single logical flow (processing request a) has to be split into two functions, req_a and on_finish, which looks quite different from the synchronous version. Synchronous code makes the business logic easier to follow, so can we write asynchronous programs in a synchronous style?
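
The versions below lean on Python generator mechanics, so here is a minimal refresher (an illustrative sketch, not part of the example program): calling a generator function creates a generator without running its body; next() runs the body to the first yield and suspends it; send(value) resumes it with value becoming the result of the yield expression; StopIteration is raised when the body finishes.

# coding:utf-8
# Minimal generator refresher (illustration only)

def task():
    print "run up to the first yield"
    ret = yield  # suspend here; a later send() supplies ret
    print "resumed with: %s" % ret

g = task()  # calling a generator function only creates the generator; nothing runs yet
g.next()  # executes the body up to the yield, then suspends
try:
    g.send("io result")  # resumes the body; ret becomes "io result"
except StopIteration:  # raised when the generator body runs to completion
    pass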

Initial version

# coding:utf-8

import time
import thread

gen = None  # global generator object used by long_io

def long_io():
    def fun():
        print "Start IO operation"
        global gen
        time.sleep(5)
        try:
            print "Complete IO operation, sending result to wake the suspended generator"
            gen.send("io result")  # return the result via send and resume the generator
        except StopIteration:  # raised when the generator finishes; catch it so the thread exits cleanly
            pass
    thread.start_new_thread(fun, ())

def req_a():
    print "Start processing request req_a"
    ret = yield long_io()
    print "ret: %s" % ret
    print "Complete processing request req_a"

def req_b():
    print "Start processing request req_b"
    time.sleep(2)
    print "Complete processing request req_b"

def main():
    global gen
    gen = req_a()
    gen.next()  # start the generator: run req_a up to its yield
    req_b()
    while 1:
        pass

if __name__ == '__main__':
    main()

Execution result:

Start processing request req_a
Start processing request req_b
Start IO operation
Complete processing request req_b
Complete IO operation, sending result to wake the suspended generator
ret: io result
Complete processing request req_a

Upgraded version

Although req_a now reads like synchronous code, main cannot simply call it as an ordinary function; it has to treat it as a generator.

Now we modify the program so that both req_a and main read like synchronous code.

# coding:utf-8

import time
import thread

gen = None  # global generator object used by long_io

def gen_coroutine(f):
    def wrapper(*args, **kwargs):
        global gen
        gen = f(*args, **kwargs)  # forward arguments to the wrapped generator function
        gen.next()  # start the generator: run it up to its first yield
    return wrapper

def long_io():
    def fun():
        print "Start IO operation"
        global gen
        time.sleep(5)
        try:
            print "Complete IO operation, sending result to wake the suspended generator"
            gen.send("io result")  # return the result via send and resume the generator
        except StopIteration:  # raised when the generator finishes
            pass
    thread.start_new_thread(fun, ())

@gen_coroutine
def req_a():
    print "Start processing request req_a"
    ret = yield long_io()
    print "ret: %s" % ret
    print "Complete processing request req_a"

def req_b():
    print "Start processing request req_b"
    time.sleep(2)
    print "Complete processing request req_b"

def main():
    req_a()
    req_b()
    while 1:
        pass

if __name__ == '__main__':
    main()

Execution result:

Start processing request req_a
Start processing request req_b
Start IO operation
Complete processing request req_b
Complete IO operation, sending result to wake the suspended generator
ret: io result
Complete processing request req_a

Final version

This version is still not ideal, because long_io depends on the global variable gen. We now rewrite the program once more to eliminate it.

# coding:utf-8

import time
import thread

def gen_coroutine(f):
    def wrapper(*args, **kwargs):
        gen_f = f(*args, **kwargs)  # gen_f is the generator req_a()
        r = gen_f.next()  # run req_a up to its yield; r is the generator long_io()
        def fun(g):
            ret = g.next()  # run long_io up to its yield; ret is "io result"
            try:
                gen_f.send(ret)  # send the result back into req_a and resume it
            except StopIteration:  # raised when req_a finishes
                pass
        thread.start_new_thread(fun, (r,))
    return wrapper

def long_io():
    print "Start IO operation"
    time.sleep(5)
    print "Complete IO operation, yielding the result"
    yield "io result"

@gen_coroutine
def req_a():
    print "Start processing request req_a"
    ret = yield long_io()
    print "ret: %s" % ret
    print "Complete processing request req_a"

def req_b():
    print "Start processing request req_b"
    time.sleep(2)
    print "Complete processing request req_b"

def main():
    req_a()
    req_b()
    while 1:
        pass

if __name__ == '__main__':
    main()

Execution result:

Start processing request req_a
Start processing request req_b
Start IO operation
Complete processing request req_b
Complete IO operation, yielding the result
ret: io result
Complete processing request req_a

This final version is the simplest model for understanding Tornado's asynchronous programming. Note, however, that Tornado's asynchrony is built not on threads but on epoll: asynchronous operations are registered with epoll, which monitors them and triggers the callbacks.

It is important to note that our version is not strictly a coroutine, because the suspending and resuming happen across two threads, whereas Tornado achieves asynchrony with epoll: suspension and resumption always happen on a single thread, scheduled by Tornado itself, which is a coroutine in the true sense. Nevertheless, this does not prevent us from understanding the principles of Tornado's asynchronous programming.
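
For comparison, here is a hedged sketch of how the same logic looks with Tornado's own coroutine machinery (assuming Tornado 4.x; gen.sleep merely stands in for a real non-blocking IO operation). Suspension and resumption all happen on one thread, driven by the IOLoop:

# coding:utf-8
# A minimal sketch using Tornado's real coroutines (assumes Tornado 4.x).
# gen.sleep() stands in for a non-blocking IO operation.

from tornado import gen
from tornado.ioloop import IOLoop

@gen.coroutine
def long_io():
    yield gen.sleep(5)  # suspends this coroutine without blocking the thread
    raise gen.Return("io result")  # Python-2-compatible way to return a value

@gen.coroutine
def req_a():
    print "Start processing request req_a"
    ret = yield long_io()  # req_a is suspended; the IOLoop resumes it with the result
    print "ret: %s" % ret
    print "Complete processing request req_a"

if __name__ == "__main__":
    IOLoop.current().run_sync(req_a)  # run the coroutine to completion on the IOLoop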
