Introduction to Multithreading for Crawlers and the Use of Thread Pools

What Is a Thread

Python's _thread module is a low-level module. The threading module is a higher-level wrapper around it and is more convenient to use.
1. A thread is the basic unit of CPU scheduling and execution.
2. The order in which threads run is not deterministic.
3. Threads in the same process share that process's resources, so access to shared data must be synchronized (for example, with a mutex lock).
4. Threads make multitasking possible and are well suited to I/O-bound tasks.
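
As a small illustration of points 2 and 3, the sketch below starts three threads that all append to one shared list. The names worker and run_workers are made up for this example. The order of the entries may differ between runs, which is why the last line sorts them before printing.

```python
import threading

def worker(name, shared):
    # Every thread receives the same list object: memory is shared within a process.
    shared.append(name)

def run_workers():
    shared = []
    threads = [threading.Thread(target=worker, args=(f"t{i}", shared)) for i in range(3)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()  # wait for all threads before reading the list
    return shared

if __name__ == "__main__":
    print(sorted(run_workers()))
```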

Using the threading module

Single-threaded execution

import time

def saySorry():
    for i in range(5):
        print("Honey, I'm wrong. Can I eat?")
        time.sleep(1)

def do():
    for i in range(5):
        print("Honey, I'm wrong. I'll give you a massage.")
        time.sleep(1)

if __name__ == "__main__":
    # The two functions run one after the other
    saySorry()
    do()

Multithreaded execution

import threading
import time

def saySorry():
    for i in range(5):
        print("Honey, I'm wrong. Can I eat?")
        time.sleep(1)

def do():
    for i in range(5):
        print("Honey, I'm wrong. I'll give you a massage.")
        time.sleep(1)

if __name__ == "__main__":
    td1 = threading.Thread(target=saySorry)
    td1.start()  # Start the thread so that it begins executing
    td2 = threading.Thread(target=do)
    td2.start()  # Start the second thread; both now run concurrently

Parameters of threading.Thread

  • target: The function the thread executes

  • name: The thread name

  • args: Positional arguments passed to the target function, as a tuple

  • kwargs: Keyword arguments passed to the target function, as a dictionary

In addition, note the daemon parameter:

  • If a sub-thread's daemon attribute is False, the program checks whether that sub-thread has finished when the main thread reaches its end. If the sub-thread is still running, the program waits for it to complete before exiting.

  • If a sub-thread's daemon attribute is True, the program does not wait for it when the main thread ends and exits directly. All sub-threads whose daemon value is True end together with the main thread, whether or not they have finished their work.

  • The daemon attribute defaults to False for threads created from the main thread (a new thread inherits the value from the thread that creates it). To change it, set it before calling start().
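
The parameters above can be sketched together in one call. This is only an illustration: greet, demo, and the thread name "greeter-1" are invented for the example. It also shows that changing daemon after start() raises RuntimeError.

```python
import threading

def greet(results, greeting, who="world"):
    # threading.current_thread().name reports the name given to Thread(name=...).
    results.append(f"{greeting}, {who} from {threading.current_thread().name}")

def demo():
    results = []
    t = threading.Thread(
        target=greet,                # function the thread executes
        name="greeter-1",            # thread name
        args=(results, "Hello"),     # positional arguments, as a tuple
        kwargs={"who": "crawler"},   # keyword arguments, as a dict
        daemon=True,                 # must be decided before start()
    )
    t.start()
    try:
        t.daemon = False             # too late: the thread has already been started
        changed_after_start = True
    except RuntimeError:
        changed_after_start = False
    t.join()
    return results, changed_after_start

if __name__ == "__main__":
    print(demo())
```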

Notes

  1. Running the two functions in separate threads takes noticeably less time than running them one after the other, because the threads wait on time.sleep() concurrently.
  2. The thread is actually created and begins executing only when start() is called.

Method or attribute    Effect
start()                Starts the thread
join()                 Blocks the calling thread until this thread finishes
daemon = False         Foreground (non-daemon) thread: the program waits for it to finish after the main thread ends
daemon = True          Background (daemon) thread: it ends when the main thread ends
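
To make the timing claim concrete, here is a rough sketch that measures both approaches. The function names and the short sleep durations are assumptions chosen so the example finishes quickly. time.sleep() stands in for waiting on I/O.

```python
import threading
import time

def task(delay=0.05, repeats=3):
    for _ in range(repeats):
        time.sleep(delay)  # stands in for waiting on I/O

def run_sequential():
    start = time.perf_counter()
    task()
    task()
    return time.perf_counter() - start

def run_threaded():
    start = time.perf_counter()
    threads = [threading.Thread(target=task) for _ in range(2)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()  # block until each thread has finished
    return time.perf_counter() - start

if __name__ == "__main__":
    print(f"sequential: {run_sequential():.2f}s, threaded: {run_threaded():.2f}s")
```

Without the join() calls, the timer would stop as soon as the threads were started, not when they finished.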

Mutex

Synchronization control is required when multiple threads modify the same shared data at almost the same time.
Thread synchronization ensures that multiple threads access contended resources safely. The simplest synchronization mechanism is the mutex.
A mutex gives a resource one of two states: locked or unlocked.
When a thread wants to change shared data, it locks the mutex first. The resource is then "locked", and other threads cannot change it. Only after the thread releases the mutex and the resource returns to the "unlocked" state can another thread lock it. A mutex guarantees that only one thread writes at a time, which keeps the data correct under multithreading.
The threading module defines a Lock class that makes locking straightforward:

import threading

# Create the lock
mutex = threading.Lock()
# Acquire the lock
mutex.acquire()
# Release the lock
mutex.release()

Note:

  • If the lock is not currently held, acquire() returns immediately without blocking.
  • If the lock is already held by another thread, acquire() blocks until the lock is released.
Using a mutex so that two threads can each add 1 to the same global variable 1,000,000 times:
import threading

g_num = 0

def test1(num):
    global g_num
    for i in range(num):
        mutex.acquire()  # Lock
        g_num += 1
        mutex.release()  # Unlock
    print("---test1---g_num=%d" % g_num)

def test2(num):
    global g_num
    for i in range(num):
        mutex.acquire()  # Lock
        g_num += 1
        mutex.release()  # Unlock
    print("---test2---g_num=%d" % g_num)

# Create a mutex; it starts out unlocked
mutex = threading.Lock()

# Create two threads that each add 1 to g_num 1,000,000 times
p1 = threading.Thread(target=test1, args=(1000000,))
p1.start()
p2 = threading.Thread(target=test2, args=(1000000,))
p2.start()

# Wait for both threads to finish
p1.join()
p2.join()

print("After two threads operate on the same global variable, the final result is: %s" % g_num)

Output (last line):

After two threads operate on the same global variable, the final result is: 2000000

As the final result shows, the total matches the expected value once the mutex is added.

The locking and unlocking process

  • When a thread calls the lock's acquire() method and obtains the lock, the lock enters the "locked" state.
  • Only one thread can hold the lock at a time. If another thread tries to acquire it, that thread becomes "blocked" and stays blocked until the thread holding the lock calls release(), which returns the lock to the "unlocked" state.
  • The thread scheduler then selects one of the blocked threads to obtain the lock and resume running.
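
The locked and unlocked states can be observed without blocking forever by passing blocking=False or a timeout to acquire(). The sketch below uses a single thread for simplicity. A second thread that called acquire() while the lock was held would see the same results.

```python
import threading

lock = threading.Lock()

first = lock.acquire()                 # lock was free: returns True immediately
second = lock.acquire(blocking=False)  # already held: returns False instead of blocking
third = lock.acquire(timeout=0.1)      # waits up to 0.1 s, then gives up and returns False
lock.release()                         # back to "unlocked"
fourth = lock.acquire(blocking=False)  # free again: returns True
lock.release()

print(first, second, third, fourth)
```
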
Summary

Advantages of locks:

  • A critical section of code is guaranteed to be executed from start to finish by only one thread at a time.

Disadvantages of locks:

  • Locks prevent concurrent execution: code guarded by a lock effectively runs single-threaded, which reduces efficiency.
  • Because multiple locks can exist, threads that hold different locks can deadlock when each tries to acquire the lock held by the other.
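
One common way to avoid the deadlock described above is to make every thread acquire the locks in the same fixed order. The following is a sketch under that assumption, not the only remedy. The accounts, the per-account locks, and the transfer function are hypothetical.

```python
import threading

accounts = {"a": 100, "b": 100}
locks = {"a": threading.Lock(), "b": threading.Lock()}

def transfer(src, dst, amount):
    # Sort the account names so that every thread takes the locks in the same order.
    # If one thread locked a-then-b while another locked b-then-a, each could end up
    # holding one lock and waiting forever for the other.
    first, second = sorted((src, dst))
    with locks[first]:
        with locks[second]:
            accounts[src] -= amount
            accounts[dst] += amount

def run_transfers(n=1000):
    def many(src, dst):
        for _ in range(n):
            transfer(src, dst, 1)

    t1 = threading.Thread(target=many, args=("a", "b"))
    t2 = threading.Thread(target=many, args=("b", "a"))
    t1.start()
    t2.start()
    t1.join()
    t2.join()
    return dict(accounts)

if __name__ == "__main__":
    print(run_transfers())
```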

Thread pool

Import the module

from concurrent.futures import ThreadPoolExecutor

Create a thread pool and add tasks to it

# Create a thread pool with 10 worker threads
pool = ThreadPoolExecutor(10)
# Add tasks to the thread pool.
# get_page_data and done are functions you define yourself.
for pagenum in range(50):
    # submit() hands the task to the thread pool and returns a Future
    handler = pool.submit(get_page_data, pagenum)
    # A callback can be attached to the Future; it is called with the Future
    # once the task has finished
    handler.add_done_callback(done)
# Wait for all submitted tasks to finish, then release the pool's threads
pool.shutdown(wait=True)
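
The fragment above leaves get_page_data and done undefined. A self-contained sketch might look like the following. Here get_page_data is a stub that sleeps instead of downloading, and crawl is a name invented for the example. The with statement calls shutdown(wait=True) on exit.

```python
from concurrent.futures import ThreadPoolExecutor
import time

def get_page_data(pagenum):
    # Stub for a real download: sleeps briefly instead of using the network.
    time.sleep(0.01)
    return f"page {pagenum}"

def crawl(pages):
    collected = []

    def done(future):
        # Called with the finished Future; result() returns the task's return value.
        collected.append(future.result())

    with ThreadPoolExecutor(max_workers=10) as pool:
        for pagenum in range(pages):
            handler = pool.submit(get_page_data, pagenum)
            handler.add_done_callback(done)
    # Leaving the with block waits for all tasks, like pool.shutdown(wait=True).
    return sorted(collected)

if __name__ == "__main__":
    print(len(crawl(50)))
```

Callbacks run in whichever thread completes the task, so they arrive in no particular order. The sketch sorts the results for that reason.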

Example

from concurrent.futures import ThreadPoolExecutor
import requests
from lxml import etree

class CollegateRank(object):

    def get_page_data(self,url):
        response = self.send_request(url=url)
        if response:
            # print(response)
            with open('page.html','w',encoding='gbk') as file:
                file.write(response)
            self.parse_page_data(response)


    def parse_page_data(self,response):
        #Parsing data using xpath
        etree_xpath = etree.HTML(response)
        ranks = etree_xpath.xpath('//div[@class="scores_List"]/dl')
        # print(ranks)
        pool = ThreadPoolExecutor(10)
        for dl in ranks:
            school_info = {}
            school_info['url'] = self.extract_first(dl.xpath('./dt/a[1]/@href'))
            school_info['icon'] = self.extract_first(dl.xpath('./dt/a[1]/img/@src'))
            school_info['name'] = self.extract_first(dl.xpath('./dt/strong/a/text()'))
            school_info['address'] = self.extract_first(dl.xpath('./dd/ul/li[1]/text()'))
            school_info['tese'] = ','.join(dl.xpath('./dd/ul/li[2]/span/text()'))
            school_info['type'] = self.extract_first(dl.xpath('./dd/ul/li[3]/text()'))
            school_info['belong'] = self.extract_first(dl.xpath('./dd/ul/li[4]/text()'))
            school_info['level'] = self.extract_first(dl.xpath('./dd/ul/li[5]/text()'))
            school_info['weburl'] = self.extract_first(dl.xpath('./dd/ul/li[6]/text()'))

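            print(school_info['url'],school_info)
            # Skip entries without a detail URL; requests.get(None) would raise
            if school_info['url']:
                result = pool.submit(self.send_request,school_info['url'])
                result.add_done_callback(self.parse_school_detail)
        # Wait for the detail requests to finish and release the worker threads
        pool.shutdown(wait=True)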

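    # Callback invoked with the Future once send_request has finished
    def parse_school_detail(self,future):
        text = future.result()
        if text is None:
            # send_request returns None when the status code is not 200
            return
        print('Parsing data',len(text))

    def extract_first(self,data=None,default=None):
        # Return the first match, or the default when the xpath result is empty
        if data:
            return data[0]
        return default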


    def send_request(self, url, headers=None):
        headers = headers if headers else {
            'User-Agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_13_6) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/76.0.3809.100 Safari/537.36'}
        response = requests.get(url=url,headers=headers)
        if response.status_code == 200:
            return response.text

if __name__ == '__main__':
    url = 'http://college.gaokao.com/schlist/'
    obj = CollegateRank()
    obj.get_page_data(url)
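
In the example above, an exception raised inside send_request surfaces only when future.result() is called in the callback. One alternative is to collect the futures and handle results and errors in the calling thread with as_completed. The sketch below is hedged: fetch is a hypothetical stand-in for send_request that uses no network, and the URLs are placeholders.

```python
from concurrent.futures import ThreadPoolExecutor, as_completed

def fetch(url):
    # Hypothetical stand-in for send_request: fails for one URL, no network involved.
    if "bad" in url:
        raise ValueError(f"cannot fetch {url}")
    return f"<html>{url}</html>"

def fetch_all(urls):
    pages, errors = {}, {}
    with ThreadPoolExecutor(max_workers=4) as pool:
        future_to_url = {pool.submit(fetch, url): url for url in urls}
        for future in as_completed(future_to_url):
            url = future_to_url[future]
            try:
                pages[url] = future.result()  # re-raises any exception from fetch
            except Exception as exc:
                errors[url] = str(exc)
    return pages, errors

if __name__ == "__main__":
    ok, failed = fetch_all(["http://example.com/1", "http://example.com/bad"])
    print(len(ok), len(failed))
```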


Posted on Tue, 10 Sep 2019 04:23:22 -0700 by MidOhioIT