多进程服务器折腾、实现

前言

最近为了写个回测平台真的累成狗,项目地址TradeServer, 目前实现了下单买入卖出、撤单、查看未成交订单、查看用户信息和余额、查看完整操作记录,查看交易记录和收益统计等功能。主要结构为Flask实现的RESTful API和一个包含三个进程的服务器。Flask那个不难,就是设计好json请求结构然后根据情况返回合适的结果即可,但是另外一个含三个进程的服务器就有点麻烦了。

坑1

众所周知,Python的多线程是鸡肋,我之前的爬虫服务器用了rpyc做服务器主体,爬虫函数用了concurrent.futures的ThreadPoolExecutor,本来我是想用多进程实现的,但是发现rpyc做服务主体下你就再也没法用multiprocessing的Pool来执行任务了。现在我的交易服务器需要实现撮合订单或者等待机会撮合,需要多个进程协同操作。

首先是一个进程负责检查数据库某个键的值,如果设置为run,则开始撮合订单,如果设置为stop或其他不是run的值,就会sleep一定的时间然后再检查。那么假设我这个进程能查到这个数据库的键的值为run那么怎样才能让其他进程知道这个消息呢?一般来说最好是使用actor模型,基于消息传递,比如multiprocessing的Queue模块,但是这个交易服务器并不需要那么麻烦,只要一个共享内存就行了。在multiprocessing里可以用Array或者Value来在程序中设置一个进程间都能读写的变量,关于Value或者Array的文档可以看这里,17.2.2.6部分,实现上好像是基于ctypes的共享内存,还有个更高级的Manager,能提供更多数据类型。

这里给一段代码简单地展示下Array的用法pattern:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
from multiprocessing import Process
from multiprocessing.sharedctypes import Array
import time
def f():
while True:
time.sleep(2)
print(time.strftime("%Y-%m-%d %H:%M:%S", time.localtime()))
if __name__ == '__main__':
num = Array('c', b'hello world')
p = Process(target=f, args=())
p.start()
p.join()
print(bytes.decode(num.value))

上面的例子里,首先要从multiprocessing.sharedctypes导入相应的模块,然后创建一个共享变量num,第一个参数是数值类型。上面的例子是char,可以存一定长度的字符类型,即bytes,在python3里可以通过str的encode来把str转换为bytes,也可以用bytes的decode转换为str类型,然后我们就可以把经过encode的字符串放在Array的第二个参数里,如果其他进程想查看这个共享变量的值,直接用num.value就可以获取bytes的值,再转为字符串就不难了。

下面是Array的输入类型对照表:

1
2
3
4
5
6
'c': ctypes.c_char, 'u': ctypes.c_wchar,
'b': ctypes.c_byte, 'B': ctypes.c_ubyte,
'h': ctypes.c_short, 'H': ctypes.c_ushort,
'i': ctypes.c_int, 'I': ctypes.c_uint,
'l': ctypes.c_long, 'L': ctypes.c_ulong,
'f': ctypes.c_float, 'd': ctypes.c_double

总之记住普通的全局变量是无法在进程间共享的。

坑2

解决了共享内存的问题之后,然后是循环任务的问题。一个撮合服务器,应该每隔一段时间获取用户提交的交易请求,然后根据股票现价等信息进行判定是否能交易。我需要引入另外两个进程,一个用来检查订单然后看能不能交易,另一个用来进行损益统计。我相信很多人查multiprocessing的代码范例都是这种:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
from multiprocessing import Process
import os
# 子进程要执行的代码
def run_proc(name):
print 'Run child process %s (%s)...' % (name, os.getpid())
if __name__=='__main__':
print 'Parent process %s.' % os.getpid()
p = Process(target=run_proc, args=('test',))
print 'Process will start.'
p.start()
p.join()
print 'Process end.'

一看似乎没问题,然而这个程序就是执行一次的,既然要反复检查订单是否能成交就要while循环啊,那么这个while要放在哪?

放在main后面?不行的,如果使用Process来实现多进程,下面的代码pattern是不太好的:

1
2
3
4
5
if __name__ == "__main__":
while True:
p = Process(xxx)
p.start()
p.join()

这种pattern是不行,会提示你创建了多个进程引起冲突:

1
2
3
4
5
6
7
8
if __name__ == "__main__":
p = Process(xxx)
while True:
p.start()
p.join()
WARNING:AssertionError: cannot start a process twice

这个代码意味着你每次循环都创建Process对象然后执行,那么你干嘛不用rpyc写个rpc服务器然后写个普通的while循环的代码来发送请求?

我目前找到的比较ok的多进程服务器代码pattern还是前面的Array例子那个,把while循环放在函数里,main后面就创建Process对象,start()之后这个进程就会一直执行函数里的while循环来打印此刻的时间

下面的pattern在我的撮合服务器代码里也用到:

1
2
3
4
5
6
7
def func():
while True:
exec xxx
if __name__ == "__main__":
p = Process(target=func,args=(xxx,xxx))
p.start()
p.join

不过要注意的是,函数里的while循环最好加上time.sleep(),不然你的的CPU使用率很快就会急剧上升,至少不加sleep我的笔记本CPU风扇没多久就疯转。

有兴趣的可以深入阅读下我的TradeServer的omserver.py代码。

结语

并发、并行是个不简单的话题,不过找到合适的代码pattern就很容易写了。还有,还在用py2的,尽可能还是迁移到3.x吧,至少我现在是用python 3.6了。

参考

官方参考文档

多进程数据共享(好文)

共享内存实现原理

如何共享变量(很老的文章)

别人的学习笔记

多进程编程

Powered by Hexo and Hexo-theme-hiker

Copyright © 2017 - 2020 HOCHIKONG's WAPORIZer All Rights Reserved.

访客数 : | 访问量 :