前言
最近为了写个回测平台真的累成狗,项目地址TradeServer, 目前实现了下单买入卖出、撤单、查看未成交订单、查看用户信息和余额、查看完整操作记录,查看交易记录和收益统计等功能。主要结构为Flask实现的RESTful API和一个包含三个进程的服务器。Flask那个不难,就是设计好json请求结构然后根据情况返回合适的结果即可,但是另外一个含三个进程的服务器就有点麻烦了。
坑1
众所周知,Python的多线程是鸡肋,我之前的爬虫服务器用了rpyc做服务器主体,爬虫函数用了concurrent.futures的ThreadPoolExecutor,本来我是想用多进程实现的,但是发现rpyc做服务主体下你就再也没法用multiprocessing的Pool来执行任务了。现在我的交易服务器需要实现撮合订单或者等待机会撮合,需要多个进程协同操作。
首先是一个进程负责检查数据库某个键的值,如果设置为run,则开始撮合订单,如果设置为stop或其他不是run的值,就会sleep一定的时间然后再检查。那么假设我这个进程能查到这个数据库的键的值为run那么怎样才能让其他进程知道这个消息呢?一般来说最好是使用actor模型,基于消息传递,比如multiprocessing的Queue模块,但是这个交易服务器并不需要那么麻烦,只要一个共享内存就行了。在multiprocessing里可以用Array或者Value来在程序中设置一个进程间都能读写的变量,关于Value或者Array的文档可以看这里,17.2.2.6部分,实现上好像是基于ctypes的共享内存,还有个更高级的Manager,能提供更多数据类型。
这里给一段代码简单地展示下Array的用法pattern:
|
|
上面的例子里,首先要从multiprocessing.sharedctypes导入相应的模块,然后创建一个共享变量num,第一个参数是数值类型。上面的例子是char,可以存一定长度的字符类型,即bytes,在python3里可以通过str的encode来把str转换为bytes,也可以用bytes的decode转换为str类型,然后我们就可以把经过encode的字符串放在Array的第二个参数里,如果其他进程想查看这个共享变量的值,直接用num.value就可以获取bytes的值,再转为字符串就不难了。
下面是Array的输入类型对照表:
|
|
总之记住普通的全局变量是无法在进程间共享的。
坑2
解决了共享内存的问题之后,然后是循环任务的问题。一个撮合服务器,应该每隔一段时间获取用户提交的交易请求,然后根据股票现价等信息进行判定是否能交易。我需要引入另外两个进程,一个用来检查订单然后看能不能交易,另一个用来进行损益统计。我相信很多人查multiprocessing的代码范例都是这种:
|
|
一看似乎没问题,然而这个程序就是执行一次的,既然要反复检查订单是否能成交就要while循环啊,那么这个while要放在哪?
放在main后面?不行的,如果使用Process来实现多进程,下面的代码pattern是不太好的:
|
|
这种pattern是不行,会提示你创建了多个进程引起冲突:
|
|
这个代码意味着你每次循环都创建Process对象然后执行,那么你干嘛不用rpyc写个rpc服务器然后写个普通的while循环的代码来发送请求?
我目前找到的比较ok的多进程服务器代码pattern还是前面的Array例子那个,把while循环放在函数里,main后面就创建Process对象,start()之后这个进程就会一直执行函数里的while循环来打印此刻的时间
下面的pattern在我的撮合服务器代码里也用到:
|
|
不过要注意的是,函数里的while循环最好加上time.sleep(),不然你的的CPU使用率很快就会急剧上升,至少不加sleep我的笔记本CPU风扇没多久就疯转。
有兴趣的可以深入阅读下我的TradeServer的omserver.py代码。
结语
并发、并行是个不简单的话题,不过找到合适的代码pattern就很容易写了。还有,还在用py2的,尽可能还是迁移到3.x吧,至少我现在是用python 3.6了。