This page looks best with JavaScript enabled

多线程、协程与高并发(1)

 ·  ☕ 13 min read · 👀... views

0x01 多线程

多线程是程序在同样的上下文中同时运行多条线程的能力。这些线程共享同一个进程的资源,可以在并发模式(单核处理器)或并行模式(多核处理器)下执行多个任务。

通常,多线程技术完全可以在多处理器上实现并行计算。但是Python的官方版本(CPython)有一个GIL(全局解释锁)限制,GIL会阻止多个线程同时运行Python的字节码,这就不是真正的并行了。假如你的系统有6个处理器,多线程可以把CPU跑到600%,然而,你能看到的只有100%,甚至更慢一点,这都是GIL造成的。正是因为GIL的存在,Python的多线程对于计算密集型(CPU-bound)任务性能提升不升反降(因为可利用的cpu资源不变,增加了线程创建、切换与销毁的开销),但是对于I/O密集型(I/O-bound)任务性能仍有不小的提升,因为一个线程在I/O等待时可以切换到别的线程进行运算。

CPython的GIL是有必要的,因为CPython的内存管理不是线程安全的。因此,为了让每个任务都按顺序进行,它需要确保运行过程中内存不被干扰。它可以更快的运行单线程程序,简化C语言扩展库的使用方法,因为它不需要考虑多线程问题。对于GIL问题的解决,通常有两个解决方案:1.多进程。2.用C写python的拓展。对于这两个解决方案均不在本节博客的讨论范围之内,本节博客主要讨论如何使用多线程、协程提升I/O密集型任务的执行效率。

多线程有以下几个优点:

  1. 持续响应:在单线程的程序中,执行一个长期运行的任务可能会导致程序的冻结。多线程可以把这个长期运行的任务放在一个线程中,在程序并发的运行任务时可以持续响应客户的需求
  2. 更快的执行速度:在多核处理器的操作系统上,多线程可以通过真正的并行提高程序的运行速度
  3. 较低的资源消耗:利用线程模式,程序可以利用一个进程内的资源响应多个请求
  4. 更简单的状态共享与进程间的通信机制:由于线程都共享同一资源和内存空间,因此线程之间的通比进程间通信简单
  5. 并行化:多处理器系统可以实现多线程的每个线程独立运行

但是多线程也有以下几个缺点:

  1. 线程同步:由于多个线程是在同一数据上运行的,所以需要引入一些机制预防竞态条件
  2. 问题线程导致集体崩溃:虽然多个线程可以独立运行,但一旦某个线程出现问题,也可能造成整个进程崩溃
  3. 死锁:这是线程操作的常见问题。通常,线程执行任务时会锁住正在使用的资源,当一个线程开始等待另一个线程资源释放,而另一个线程同时也要等待第一个线程释放资源时,就发生了死锁

0x02 用threading模块创建线程

threading模块是目前Python中处理线程普遍推荐的模块。相比_thead模块,threading这个模块提供了更完善和高级的接口,这里通过threading模块演示一个最简单的多线程累加数字的例子:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
import threading
import time
 
global_values = 0

def run(thread_name, lock):
    global global_values
    local_copy = global_values
    print("%s with value %s" % (thread_name, local_copy))
    global_values = local_copy + 1
 
for i in range(10):
    t = threading.Thread(target=run, args=("thread-(%s)" % str(i),))
    t.start()
 
time.sleep(3)
print("global_values:%s" % global_values)

运行结果:

thread-(0) with value 0
thread-(1) with value 0
thread-(2) with value 0
thread-(4) with value 0
thread-(6) with value 0
thread-(8) with value 0
thread-(7) with value 0
thread-(5) with value 0
thread-(3) with value 0
thread-(9) with value 1
global_values:2

看到这里,一定会有朋友纳闷了,这是什么鬼?为什么不是预期结果?造成这样的原因,都是因为多个线程操作同一变量或同一代码块导致有的线程不能读到最新的值,甚至是把旧值的运算结果赋给全部局变量。为了解决这个问题,将代码修改如下:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
import threading
import time
 
global_values = 0

def run(thread_name, lock):
    global global_values
    lock.acquire()
    local_copy = global_values
    print("%s with value %s" % (thread_name, local_copy))
    global_values = local_copy + 1
    lock.release()
 
lock = threading.Lock()
 
for i in range(10):
    t = threading.Thread(target=run, args=("thread-(%s)" % str(i), lock))
    t.start()
 
time.sleep(3)
print("global_values:%s" % global_values)

结果:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
thread-(0) with value 0
thread-(2) with value 1
thread-(4) with value 2
thread-(5) with value 3
thread-(3) with value 4
thread-(6) with value 5
thread-(1) with value 6
thread-(7) with value 7
thread-(8) with value 8
thread-(9) with value 9

可以看到,线程的执行顺序依旧是乱序的,但全局变量的值是逐个递增的。然后我们就需要来说说这个lock对象是什么,有什么用。这里的lock对象,是一个线程锁,从分类上来说,它是互斥锁。互斥锁是用来解决io密集型场景产生的计算错误,即目的是为了保护共享的数据,同一时间只能有一个线程来修改共享的数据。当一个线程执行lock.acquire()时,lock对象就会变成locked状态,当其他线程再运行到这一步是,会全部暂停,直到获取锁的那个线程执行lock.release()方法释放锁,lock对象重新变回unlocked状态,其他线程才会争抢锁,争到锁的线程继续执行代码,其余线程接着等待。通过这样一个锁机制,我们可以保证一些必要部分的代码在同一时间点内只能被一个线程所执行。比如写文件操作,如果多个线程同时对同一个文件进行写入,便会使文件内容错乱,这就需要使用到线程锁。

0x03 类方法实现多线程

对于更复杂的情况,如果要更好地封装线程的行为,我们可能需要创建自己的线程类,这里需要注意几点:

  1. 需要继承threading.Thread类
  2. 需要改写run方法,也可以使用__init__方法
  3. 如果改写初始化方法__init__,需要在一开始调用父类的初始化方法threading.Thread.init
  4. 当线程的run方法停止或抛出未处理的异常时,线程将停止,因此要提前设计好方法
  5. 可以用初始化方法的name参数名称命名你的线程
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
import threading
import time
 
 
class MyThread(threading.Thread):
 
    def __init__(self, count):
        threading.Thread.__init__(self)     #可加参数name设置线程名称
        self.total = count
 
    def run(self):
        for i in range(self.total):
            time.sleep(1)
            print("Thread:%s - %s" % (self.name, i))
 
 
t = MyThread(2)
t2 = MyThread(3)
t.start()
t2.start()
 
print("finish")

0x04 join()

通常的,当我们使用多线程执行任务时,主线程结束后,若其余线程仍在执行任务,主线程的结束不会影响其余线程继续进行任务,但是,在一些特定情况下主线程的结束会干扰其他线程的运行,这就需要我们用join()方法阻塞住线程的运行。比如这段代码:

1
2
3
4
5
6
f = open("content.txt", "w+")
t = MyThread(2, f)
t2 = MyThread(3, f)
t.start()
t2.start()
f.close()

我们假设在MyThread中会将打印的语句写入content.txt,但这段代码是会出问题的,因为在开启其他线程前,主线程可能会先关闭文件处理器,如果想避免这种情况,应该使用join方法,join方法会使得被调用的线程执行完毕后,在能返回原先的线程继续执行下去:

1
2
3
4
5
6
7
8
9
f = open("content.txt", "w+")
t = MyThread(2, f)
t2 = MyThread(3, f)
t.start()
t2.start()
t.join()
t2.join()
f.close()
print("finish")

join方法还支持一个可选参数:时限(浮点数或None),以秒为单位。但是join返回值是None。因此,要检查操作是否已超时,需要在join方法返回后查看线程的激活状态,如果线程的状态是激活的,操作就超时了。

0x05 run方法与start方法的区别

其实不仅仅是start,使用run也可以让子线程执行,但是start() 方法是启动一个子线程,线程名就是我们定义的name,run() 方法并不启动一个新线程,就是在主线程中调用了一个普通函数而已。因此,如果你想启动多线程,就必须使用start()方法

0x06 通过Event对象实现线程间通信

虽然线程通常是作为独立运行或并行的任务,但是有时也会出现线程间通信的需求,threading模块提供了事件(event)对象实现线程间通信,它包含一个内部标记,以及可以使用set()和clear()方法的调用线程。

Event类的接口很简单,它支持的方法如下:

  • is_set:如果事件设置了内部标记,就返回True
  • set:把内部标记设置为True。它可以唤醒等待被设置标记的所有线程,调用wait()方法的线程将不再被阻塞
  • clear:重置内部标记。调用wait方法的线程,在调用set()方法之前都将被阻塞
  • wait:在事件的内部标记被设置好之前,使用这个方法会一直阻塞线程调用,这个方法支持一个可选参数,作为等待时限(timeout)。如果等待时限非0,则线程会在时限内被一直阻塞

我们可以用线程事件对象来演示一个简单的线程通信示例,它们可以轮流打印字符串。两个线程共享同一个事件对象。在while循环中,每次循环时,一个线程设置标记,另一个线程重置标记。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
import threading,time

class ThreadA(threading.Thread):
    def __init__(self,event):
        super(ThreadA,self).__init__()
        #threading.Thread.__init__(self)
        self.event = event

    def run(self):
        count = 0
        while count < 6:
            time.sleep(1)
            if self.event.is_set() == True:
                print('A')
                self.event.clear()
            count += 1

class ThreadB(threading.Thread):
    def __init__(self,event):
        super().__init__()
        #threading.Thread.__init__(self)
        self.event = event

    def run(self):
        count = 0
        while count < 6:
            time.sleep(1)
            if not self.event.is_set() == True:
                print('B')
                self.event.set()
            count += 1

event = threading.Event()
ta = ThreadA(event)
tb = ThreadB(event)
tb.start()
ta.start()

可以看到,A和B并没有按照预期各输出6次,这是因为线程执行顺序的不确定性造成的,而且event的标记和Lock也不一样,它可以重复执行set()和clear()方法,因此就可能在执行时出现不可预期的错误。所以,个人不建议使用这种方法实现线程通信。

0x06Queue实现线程间通信

Queue是加了锁的安全消息队列,主要用于多生产者和消费者模式下的队列实现,特别适合多线程时的消息交换。通过使用队列,把生产者和消费者分解开来,作为其中的中间件,比如生产者产生一个数据,然后放到queue队列中,queue队列在把这个数据放到消费者线程中。使用单线程不必用队列,但是队列对于多线程来说是不可或缺的,它实现了常见的锁语法,临时阻塞线程,防止竞争,这有赖于Python对线程的支持。对于queue结构的基本用法在这里不再阐述,这里将着重介绍两个在多线程中非常常见的方法。

下面展示一个例子来简单介绍queue的基本使用方法:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
import threading,time,queue

def get():
    while q.empty() == False:
        q.get()
        #q.task_done()

q = queue.Queue()
list(map(q.put,range(10)))   #向队列中put10个元素

threading.Thread(target=get).start()
q.join()    #阻塞主线程
print("success!")

这个程序的运行结果是结束不了,因为join()会阻塞主线程,直到所有任务全部完成,因此只要将get()函数中的q.task_done()前的注释去掉,程序就能正常结束。这里或许比较难理解,接下来详细介绍这个两个函数。

join()
一直阻塞直到队列中的所有项目都已获取并处理完毕。
每当任务(示例:未爬取过的url)添加到队列时,未完成任务的计数就会增加。 每当消费者线程(示例:爬取网页内容的函数)调用task_done()以指示检索到该项目并且其上的所有工作都已完成时,计数就会下降。 当未完成任务的数量降至零时,join()取消阻塞。join()会一直阻塞直到队列中的所有项目都已获取并处理完毕。

task_done()
表明以前排队的任务(示例:使用一个url爬取网页内容完成)已完成。
由队列使用者线程使用。每次调用get()方法从队列中获取任务,如果任务处理完毕,则条用task_done()方法,告知等待的队列(queue.join()这里在等待)任务的处理已完成。
如果join()当前正在阻塞,则它将在所有项目都已处理后恢复(这意味着已为每个已放入队列的项目收到task_done()调用)。
如果调用的次数超过队列中放置的项目,则引发ValueError。

接下来看这段代码,可以帮助你对元素和任务的概念有更深的理解

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
import threading,time,queue

def get():
    while q.empty() == False:
        q.get()
       
    for _ in range(10):
        q.task_done()
    pass

q = queue.Queue()
list(map(q.put,range(10)))

threading.Thread(target=get).start()
q.join()
print("success!")

可以看到,在这段代码中,我将元素提取部分和任务完成信息发送部分分开写了,结果是,程序也成功结束了。由此观之,元素和任务甚至可以说是完全分开的两个概念,只是我们常常将其中的每个元素作为任务,因此将其写在一起,从而混淆了概念。

最后做个总结,在queue中,元素和任务是单独计数的,put()方法会给queue添加一个元素和一个任务,get()方法会从queue中提取一个元素,而task_done()方法会将queue的任务数减一,当任务数归零后join()方法的阻塞才会释放。

0x07 条件变量实现精确控制

条件变量,用于复杂的线程间同步。在一些对线程间通信要求比较精准的需求下,使用简单的lock加锁解锁已经没法实现需求,这个时候condition条件控制就派上用场了。

condition源代码中本质上还是调用lock/Rlock实现条件变量的控制, 他提供如下一些方法:

  1. acquire():上锁
  2. release():解锁
  3. wait(timeout=None):堵塞线程,知道接受到一个notify或者超时才能继续运行,需记住wait()必须在已经获得lock的前提下才能调用如果线程没有获取到锁就调用了此方法,那么将引发 RuntimeError 异常。本方法会释放隐性锁,然后阻塞直到被其他线程的调用此条件变量的 notify() 或 notify_all() 唤醒,或超时。一旦被唤醒或超时,该线程将立即重新获取锁并返回。如果隐性锁是一个 RLock 对象,因为调用它的 release() 方法未必能够释放该锁,所以本方法会使用 RLock 对象的一个内部接口,该接口可以立即释放多重迭代的 RLock 锁。并且在需要重新获取锁的时候,也会使用一个类似的内部接口来恢复多重的迭代级别 。本方法所阻塞的线程如果是被唤醒的,那么本方法会返回一个 True,如果是超时了,则返回 False。
  4. wait_for(predicate,timeout=None):等待直到某个条件的值为 True。predicate 是一个返回可布尔化值的可调用对象。本方法可能会多次调用 wait() 直到 predicate 为真,或超时。本方法的返回值为最后一次调用 predicate 的返回值,除非超时,超时的时候返回 False。
  5. notify(n=1):打通线程,堵塞的线程接收到notify后开始运行,需记住notify()必须在已经获得lock的前提下才能调用。本方法至多可唤醒所有正在等待本条件变量的线程中的 n 个。如果调用时没有线程处于等待操作,那么本方法的调用是一个空操作
  6. notifyAll():如果调用wait()堵塞的线程比较多,就打通所有的堵塞线程。

下面用一个典型的生产者与消费者例子来演示一下条件变量的用法:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
def produce():
    global q
    while 1:
        con.acquire()   # 必须在有锁的前提下才能使用条件变量
        q = str(random.randint(1,100))
        print('我生成了一个随机数字:'+q)
        con.notify()
        # 发起一个信号,释放一个被堵塞的线程
        con.wait()
        # 发起一个信号,堵塞当前线程,等待另一个notify出现的时候就执行下面的代码
        con.release()
        # 必须要解锁


def consume():
    global q
    while 1:
        con.acquire()
        # 必须在有锁的前提下才能使用条件变量
        print('我获取到你生成的随机数字:'+q)
        con.notify(n=1)
        # 发起一个信号,释放一个被堵塞的线程
        con.wait()
        # 堵塞当前线程
        con.release()

t1 = threading.Thread(target=produce)
t2 = threading.Thread(target=consume)
con = threading.Condition()
t1.start()
t2.start()

0x08信号量实现定量的线程同步

semaphore适用于控制进入数量的锁,好比文件的读写操作,写入的时候一般只用一个线程写,如果多个线程同时执行写入操作的时候,就会造成写入数据混乱。 但是读取的时候可以用多个线程来读取,可以看到写与写是互斥的,读与写不是互斥的,读与读不是互斥的。

文件读写只是个例子,在一些日常业务中比如爬虫读取网址的线程数量控制等。

BoundedSemaphore。这种锁允许一定数量的线程同时更改数据,它不是互斥锁。比如地铁安检,排队人很多,工作人员只允许一定数量的人进入安检区,其它的人继续排队。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
import time
import threading

def run(n, se):
    se.acquire()
    print("run the thread: %s" % n)
    time.sleep(1)
    se.release()    #可唤醒处于等待的一条线程,唤醒哪一条是随机的

# 设置允许5个线程同时运行
semaphore = threading.BoundedSemaphore(5)
for i in range(20):
    t = threading.Thread(target=run, args=(i,semaphore))
    t.start()

运行后,可以看到5个一批的线程被放行。他用来控制进入某段代码的线程数量。Semaphore和BoundedSemaphore在用法上还是有一定的区别的。Semaphore对象使用release()方法会使其value加1,它的value在某种意义上是没有上限的,而BoundedSemaphore执行release()方法也会使其value加1,但是当其value超过初始值(参数值)时就会抛出ValueError。

综上所述Semephore对象的release()执行次数可以超过其acquire()次数,可以通过这种方法临时增加并发量,而BoundedSemaphore的acquire()和release()必须一一对应。在实际运用中,BoundedSemaphore会用的更多一些。

下一节:多线程、协程与高并发(2)

Share on

Qfrost
WRITTEN BY
Qfrost
CTFer, Anti-Cheater, LLVM Committer