This page looks best with JavaScript enabled

多线程、协程与高并发(3)

 ·  ☕ 10 min read · 👀... views

0x20 协程

之前我们讲了,Python中存在GIL(全局解释锁),正因为这个设定(bug),导致多线程一直不是很好用,相形之下,对于高并发的实现,协程的优势就更加突出了。

协程,即单线程下的并发。就是只有一个主线程,并且可利用的cpu只有一个情况下实现并发,要了解如何用协程实现高并发,我们需要先回顾下并发的本质:切换+保存状态

当cpu正在运行一个任务时,会在两种情况下切走去执行其他的任务(切换由操作系统强制控制)。一种情况是该任务发生了阻塞,另外一种情况是该任务计算的时间过长。

其中第二种情况并不能提升效率,只是为了让cpu能够雨露均沾,实现看起来大家都被执行的效果,如果多个程序都是纯计算任务,这种切换反而会降低效率(在前面讲过,切换和线程的创建与销毁都是不小的开销)。因此我们是否可以在自己的程序中(即用户程序级别,而非操作系统级别)控制单线程下多个任务能遇到io就切换,这样就保证了该线程能够最大限度地处于就绪态(随时都可以被cpu执行的状态)?答案当然是肯定的,这个实现相当于我们在用户程序级别将自己的io操作最大限度地隐藏起来,对于操作系统来说:这哥们(该线程)好像是一直处于计算过程的,io比较少。而要实现这个功能,就需要我们解决高并发的切换与状态保存问题。
相信各位如果python基础不是太差的话就一定知道,yield正好同时具备切换与状态保存的功能,因此我们可以利用这一特点设计生成器实现协程解决高并发问题。

因为协程的本质就是在单线程下,由用户自己控制一个任务遇到io阻塞了就切换另外一个任务去执行,以此来提升效率。那么我们需要找寻一种可以同时满足以下条件的解决方案:

  1. 可以控制多个任务之间的切换,切换之前将任务的状态保存下来(重新运行时,可以基于暂停的位置继续)
  2. 作为1的补充:可以检测io操作,在遇到io操作的情况下才发生切换

幸运的是,已经有人实现了这些功能并写好了轮子,这就是我们下面要着重介绍的Gevent模块。

0x21 Gevent

gevent是python的一个并发框架,以微线程greenlet为核心,使用了epoll事件监听机制以及诸多其他优化而变得高效,而且其中有个monkey类(会在后面介绍),可将现有基于Python线程直接转化为greenlet(类似于打patch)。

Gevent的基本思想是:当遇到IO操作时,会自动写换到其他gevent,再在适当的时间切回来继续执行。这样就减少了IO操作时的等待耗时,从而能够提高硬件资源的利用率。其性能是非常非常高的,和线程框架性能比高大概4倍(看下图,是gevent和paste的对比)

0x22 狂暴的gevent monkey_patch (猴子补丁)

猴子补丁monkey_patch,可以将标准库中大部分的阻塞式调用替换成非阻塞的方式,包括socket、ssl、threading、select、httplib等。因为标准库中的很多阻塞gevent是不能识别的,通过monkey.patch_xxx()来将那些函数替换成可被识别的函数或类(打补丁)。按照gevent文档中的建议,应该将猴子补丁的代码尽可能早的被调用(但要在所有module都import了之后),这样可以避免一些不可预料的异常。
gevent实现的是协程的创建、切换和调度,本身是同步的,而猴子补丁可以将gevent调用的阻塞库变成非阻塞的,两者配合实现了高性能的协程。至于为什么说狂暴呢,我通过gevent+requests库对我的博客进行爬取,在加载完monkey_patch后并发量达到1000+Request/s,(然后直接被判定为DOS攻击),这种量级的并发对于客户端而言可能过犹不及,但是在一些服务端程序的设计上是非常有用的。

0x23 gevent基本用法

g1=gevent.spawn(func1,1,2,3,x=4,y=5) 创建一个协程对象g1,spawn括号内第一个参数是函数名,如eat,后面可以有多个参数,可以是位置实参或关键字实参,都是传给函数eat的

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
g1=gevent.spawn(func1,1,2,3,x=4,y=5)
g2=gevent.spawn(func2)
#可用这种写法批量运行函数
#jobs = [gevent.spawn(func,i)] for i in range(100) 
#gevent.joinall(jobs)


g1.join() #等待g1结束
g2.join() #等待g2结束
#或者上述两步合作一步:gevent.joinall([g1,g2])

g1.value#拿到func1的返回值
g1.exception#拿到func1的错误信息。在协程内部,所有的错误都不会被抛出(会被打印),以免影响到其他协程的运行
gevent.sleep(5)  #阻塞5秒,会马上触发切换

from gevent.pool import Pool
x = Pool(40)      #构建协程池,并发上限为40

下面看一个例子以理解它的使用(摘自互联网,稍作修改):

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
import gevent,time
from gevent import monkey;monkey.patch_all()
def eat(name):
    print('%s eat 1' %name)
    time.sleep(2)  #我们用等待的时间模拟IO阻塞
    ''' 导入from gevent import monkey;monkey.patch_all()后
        效果等同于gevent.sleep(2),可直接切换,否则会阻塞线程
    '''
    # gevent.sleep(2)
    print('%s eat 2' %name)
    return 'eat'
def play(name):
    print('%s play 1' %name)
    time.sleep(3)
    # gevent.sleep(3)
    print('%s play 2' %name)
    return 'paly'  #当有返回值的时候,gevent模块也提供了返回结果的操作

start = time.time()
g1 = gevent.spawn(eat,'egon')  #执行任务
g2 = gevent.spawn(play,'egon')  #g1和g2的参数可以不一样
# g1.join()  #等待g1
# g2.join()  #等待g2
#上面等待的两句也可以这样写
gevent.joinall([g1,g2])
print('主',time.time()-start) #3.001171588897705

print(g1.value)
print(g2.value)

0x24 锁操作

虽然是协程,但是在里面添加锁增加对共享资源的互斥访问也是非常重要的,此外锁本身的添加也是很简单的, 用法几乎与线程锁完全相同

1
2
3
from gevent.lock import Semaphore,BoundedSemaphore
se = Semaphore()
se = BoundedSemaphore()

0x25 gevent性能优化

在 gevent 的使用上,很多人一直有一个误区,就是使用并发的 gevent 一定比平时线性的操作速度更快,其实不是这样。timeit函数可以很方便的测试某句代码运行所需时间,并且其number参数可以设置运行次数,我们可以用该函数做一个小测试,看看gevent究竟在什么情况下能对性能有优化,能优化多少。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
import gevent,timeit,time

def task():
    #gevent.sleep(0.01)
    pass

def async_():
    jobs = [gevent.spawn(task) for _ in range(100)]


def sync_():
    [task() for _ in range(100)]

print('同步运行:',timeit.timeit(stmt="sync_()",setup="from __main__ import sync_,async_",number=10))
print('异步运行:',timeit.timeit(stmt="async_()",setup="from __main__ import sync_,async_",number=10))

#结果:
#同步运行: 0.0003161709999999207
#异步运行: 0.11802512400000009

可以看到,每运行一次sync/async函数都会以串行/异步的方式运行100次task(),再乘number,也就是1000次task(),异步竟然比线性满了三百多倍…. 由此我们可以得出一个结论,也许在并不是在网络请求或者需要等待切换的情况下,使用 gevent 也许不是一个很好的解决方案。 那么在什么情况下用gevent能使性能有一个较大的提升呢?只需要将task函数中的sleep前的#去掉即可。

1
2
同步运行 12.531099614999999
异步运行 0.5007869070000002

可以看出来,这次异步运行就远远快于线性了。因为在线性的情况下,我们每次都会在 task1 任务运行的时候阻塞 0.01s, 但是 gevent 使得 async 函数几乎不受等待影响。非常快速的解决了这个问题。其实这个环境在我们进行网络 io 的时候非常常见。比如我们向某个地址下载图片,如果我们线性下载图片,我们需要等待第一张图片下载完成之后才能进行第二张图片的下载,但是我们使用 gevent 并发下载图片,我们可以先开始下载图片,然后在等待的时候切换到别的任务继续进行下载。当下载完毕之后我们会切换回来完成下载。不用等待任何一个任务下载完成,大大的提高了效率。

在实际运用中我们常常需要用gevent.pool构建协程池控制并发数量,不然很可能会被判定为恶意攻击。那么,是不是说协程开的越多就越快呢?我们也可以来做一个测试:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
import gevent,timeit,time
from gevent.pool import Pool

def task():
    gevent.sleep(0.01)
    pass

def async1():
    jobs = [gevent.spawn(task) for _ in range(100)]
    gevent.joinall(jobs)

x = Pool(10)
def async2():
    [x.spawn(task) for _ in range(100)]
    x.join()

y = Pool(40)
def async3():
    [y.spawn(task) for _ in range(100)]
    y.join()

z = Pool(80)
def async4():
    [z.spawn(task) for _ in range(100)]
    z.join()

s = Pool(200)
def async5():
    [s.spawn(task) for _ in range(100)]
    s.join()

h = Pool(2000)
def async6():
    [h.spawn(task) for _ in range(100)]
    h.join()

p = Pool(10000)
def async7():
    [p.spawn(task) for _ in range(100)]
    p.join()

print('异步运行:',timeit.timeit(stmt="async1()",setup="from __main__ import async1",number=10))
print('协程池(10)运行:',timeit.timeit(stmt="async2()",setup="from __main__ import async2",number=10))
print('协程池(40)运行:',timeit.timeit(stmt="async3()",setup="from __main__ import async3",number=10))
print('协程池(80)运行:',timeit.timeit(stmt="async4()",setup="from __main__ import async4",number=10))
print('协程池(200)运行:',timeit.timeit(stmt="async5()",setup="from __main__ import async5",number=10))
print('协程池(2000)运行:',timeit.timeit(stmt="async6()",setup="from __main__ import async6",number=10))
print('协程池(10000)运行:',timeit.timeit(stmt="async7()",setup="from __main__ import async7",number=10))


#结果:
#异步运行: 0.294998806
#协程池(10)运行: 1.298477964
#协程池(40)运行: 0.4890425079999998
#协程池(80)运行: 0.3522436280000001
#协程池(200)运行: 0.2933392300000004
#协程池(2000)运行: 0.31260324399999995
#协程池(10000)运行: 0.30099836999999985

这个例子可以很清晰的看到,当任务中存在阻塞的情况下,刚开始提高greenlet能极大的提高效率,但当我们超过阀值之后,开更多的实例已经没有任何意义了,而且有可能还造成一些性能上的浪费,所以这个案例告诉我们,需要选择一个合适的实例数量。

0x26 gevent再提速

在gevent中,还有两个速度更快的函数,风别是imap()和 ima_unordered(),但是imap_unordered()出的结果是乱序的,若是对结果顺序有要求,那这个函数不适用。下面同样做个测试,看看用这两个函数比普通的spawn能快多少。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
import gevent,timeit,time
from gevent.pool import Pool

def task():
    gevent.sleep(0.01)
    pass

def async1():
    jobs = [gevent.spawn(task) for _ in range(1000)]
    gevent.joinall(jobs)

x = Pool(200)
def async2():
    [x.spawn(task) for _ in range(1000)]
    x.join()

y = Pool(200)
def async3():
    [y.imap(task) for _ in range(1000)]
    y.join()

z = Pool(200)
def async4():
    [z.imap_unordered(task) for _ in range(1000)]
    z.join()

print('异步运行:',timeit.timeit(stmt="async1()",setup="from __main__ import async1",number=10))
print('协程池(spawn)运行:',timeit.timeit(stmt="async2()",setup="from __main__ import async2",number=10))
print('协程池(imap)运行:',timeit.timeit(stmt="async3()",setup="from __main__ import async3",number=10))
print('协程池(imap_unordered)运行:',timeit.timeit(stmt="async4()",setup="from __main__ import async4",number=10))


#结果:
#异步运行: 5.280276387
#协程池(spawn)运行: 2.6753745889999996
#协程池(imap)运行: 0.6472693149999991
#协程池(imap_unordered)运行: 0.5721896490000002

可以看到,imap比spawn快了数倍,imap_unordered比imap也略快一点,相反,不开线程池的协程并发反而是最慢的,因此,当我们在做并发的时候,用何种方式并发一定要慎重选择。

Share on

Qfrost
WRITTEN BY
Qfrost
CTFer, Anti-Cheater, LLVM Committer