vnpy量化平台学习过程中的经验分享

头像
玉米棉花糖
帖子: 92
注册时间: 2018年 1月 11日 17:13

vnpy量化平台学习过程中的经验分享

#1

帖子 玉米棉花糖 » 2018年 1月 11日 17:31

17年通过学习vnpy量化平台,开发出了自己的程序化交易软件,实现了无人值守,在这里给家友们做个分享,有兴趣的可以一起交流学习,实现自己的程序化。

写在前面的话:
疯兄写给老论坛的原则中提到“怀疑一切,后果自负”,本贴的一切分享也适用这一原则。我半路出家学编程,没有经过系统的训练,只会一点点python,认真看过的开源项目也只有vnpy。能做出自己的交易软件,靠的是简单粗暴的实现方法,得益于开源软件的大环境和社区的热心帮助。如果各位朋友能从我的分享中有所收获自然是极好的,思路被我带跑偏,也只能后果自负了。

几个问题:

vnpy是什么?
官方介绍见http://www.vnpy.org/
简单说,我们可以把vnpy当作一个更高级的交易开拓者、文华、MC来开发策略。项目中的vn trader就是一个完成度很高的程序化软件,上手难度比交易开拓者等软件略高,但策略灵活度也大幅提升。交易开拓者、文华、MC等软件中,策略只能使用软件提供的函数和数据源,你绝不可能写出一个监控微博热点来选股的策略,或者将期货tick数据经过数量统计的工具计算后得出交易信号。而用vnpy,你的工具是整个python语言,只要你想得到、会写,没有什么做不到的。
也可以从vnpy中选取部分代码自行组装一个个性化的交易软件。vnpy对CTP等主流的交易接口做了很好的封装,提供了事件引擎、CTA引擎、回测引擎等模块。具备一定编程基础的人完全可以自由组合,并加上个性化的内容,形成自己的交易系统,并且省去很多基础的工作。

程序化交易好像很高端,我能学会吗?
以我个人来说,从刚开始学python到自己摸索开发出定制的程序化交易软件,花了一年半左右业余时间。如果你有编程基础,上手并不难。
基于vnpy开发的话,你需要
1、能看懂廖雪峰的这个python教程前半部分,即从开头到“图形界面”部分
https://www.liaoxuefeng.com/wiki/001431 ... a2e542c000
2、对ctp系统有基础的了解。能理解当你接收行情、查询、发出委托的时候,服务器和客户端之间大概发生了什么事,有过实盘交易经验的一般都没什么问题。
3、对于开发自己的策略、系统的渴望和一点好奇心。只要有动力,加上搜索引擎和社区的帮助,没有什么解决不了的问题。迈出第一步,解决一个个小问题,你的策略、系统就会慢慢浮现出来。

我的交易软件做到什么程度了?
实现以下功能:
行情:tick行情的订阅、推送
交易:基础的查询功能(资金、持仓、委托等)、报撤单
策略:实现了一个类似海龟的交易策略,就是我的土鳖交易法
回测:照抄了vnpy的回测模块
无人值守:写了一个守护进程,根据交易日列表定时自动开关软件
界面:
TIM图片20180112101210.jpg
TIM图片20180112101210.jpg (194.01 KiB) 查看 24110 次
有哪些参考资源?
1、vnpy的开源项目,所有源代码。你需要的这里都有
https://github.com/vnpy/vnpy/wiki
2、我改写的软件的部分功能,一个简单的示例,基于python3和ctp接口,适合要自行定制的朋友减少弯路。
https://github.com/vvipi/py3_demo_on_vnpy_ctp
上次由 玉米棉花糖 在 2018年 1月 12日 10:18,总共编辑 1 次。

头像
博弈
帖子: 241
注册时间: 2018年 1月 11日 15:35

Re: vnpy量化平台学习过程中的经验分享

#2

帖子 博弈 » 2018年 1月 11日 19:41

前排学习,期待分享!

形而上学
帖子: 1
注册时间: 2018年 1月 11日 19:56

Re: vnpy量化平台学习过程中的经验分享

#3

帖子 形而上学 » 2018年 1月 11日 19:59

必须学习,谢谢博弈!

Amoresvoce
帖子: 238
注册时间: 2018年 1月 11日 16:31

Re: vnpy量化平台学习过程中的经验分享

#4

帖子 Amoresvoce » 2018年 1月 11日 21:37

前排吃瓜。

头像
玉米棉花糖
帖子: 92
注册时间: 2018年 1月 11日 17:13

Re: vnpy量化平台学习过程中的经验分享

#5

帖子 玉米棉花糖 » 2018年 1月 12日 10:25

我的系统构建思路基本上是在何先生这个帖子的基础上模仿起来的,然后从vnpy中不断搬运有用的零件。
http://www.vnpie.com/forum.php?mod=view ... rom=portal
因此,我打算模仿这篇文章,从ctp接口开始写一篇分享。

先随便聊聊。陌生的东西总让人觉得神秘,程序化,那是啥?实际上可以从最简单的做起,不管是什么策略,你都得用期货公司的账号密码登录他们的服务器。然后
你得看行情,ctp直接推送的只有tick数据,一秒两次,这就是一切行情的基础。把tick数据不断累积起来,过一分钟,你就能画出一根一分钟k线了,依次类推就有了各周期的k线,计算出需要的指标。

你想知道账户的状态,于是就向服务器发出请求(req),服务器返回给你一个字典,你从里面读出账户资金之类的信息。

你想下单,这也是个请求,在请求中要写明你下单的合约、价格、方向、数量等信息。服务器又返回个字典,有时候是出错信息,告诉你有没有成交或者错误信息是什么。

好消息是,这些基础的功能,底层API基本上都封装了,我们不需要深究服务器和客户端之间是怎么实现这些细节的。要发单,就调用sendOrder函数,要订阅RB1805的行情,就subscribe('rb1805')。好像也没那么复杂。

有了行情和交易的接口,就可以在策略中调用接口的函数实现你那牛逼闪闪的策略了。

那么,我们开始吧,从安装环境讲起。

环境安装
1、下载anaconda,这是集成了常用模块的python版本,能省去很多麻烦。国内去清华镜像下载比较快。https://mirrors.tuna.tsinghua.edu.cn/an ... x86_64.exe
2、下载个编辑器/IDE,这里推荐vs code,免费、配置简单、调试方便。
3、去我的github下载ctp接口文件。至少需要用到thostmduserapi.dll、thosttraderapi.dll、vnctpmd.pyd、vnctptd.pyd四个文件。eventEngine.py和eventType.py两个文件我也强烈建议你下载下来,节约时间。
https://github.com/vvipi/py3_demo_on_vnpy_ctp
上次由 玉米棉花糖 在 2018年 1月 12日 20:55,总共编辑 5 次。

头像
玉米棉花糖
帖子: 92
注册时间: 2018年 1月 11日 17:13

Re: vnpy量化平台学习过程中的经验分享

#6

帖子 玉米棉花糖 » 2018年 1月 12日 10:25

导入ctp接口
正式开始写代码,先来试试ctp接口是否能够成功导入。我提供的ctp接口适用于win7-win10的64位操作系统,python3.6
在ctp接口的四个文件同一目录下,新建一个demo.py如下:

代码: 全选

from vnctpmd import MdApi
from vnctptd import TdApi
如果没有报错。恭喜你已经导入ctp的api!
如果出错,比如“1%不是有效的win32程序”或者找不到模块,先确保操作系统和python版本没错,然后尝试安装Visual C++ Redistributable Packages for VS2013,x86版本和x64版本都装上。
https://www.microsoft.com/zh-cn/downloa ... x?id=40784

连接服务器
首先去http://www.simnow.com.cn/申请一个simnow模拟账户,并通过找回密码重置一次密码,因为首次登录会要求修改密码。当然,如果你非要用实盘账号我也拦不住你。

代码: 全选

# encoding:utf-8
import os
from PyQt5.QtWidgets import QApplication

from vnctpmd import MdApi
from vnctptd import TdApi

from eventEngine import Event, EventEngine
from eventType import  *

class CtpMdApi(MdApi):
    """
    Demo中的行情API封装
    """
    def __init__(self, eventEngine):

        super(CtpMdApi, self).__init__()

        self.__eventEngine = eventEngine
        
        self.reqID = 0              # 操作请求编号
        
        self.connectionStatus = False       # 连接状态
        self.loginStatus = False            # 登录状态
        
        self.userID = ''          # 账号
        self.password = ''        # 密码
        self.brokerID = ''        # 经纪商代码
        self.address = ''         # 服务器地址

    def connect(self, userID, password, brokerID, address):
        """连接服务器"""
        self.userID = userID                # 账号
        self.password = password            # 密码
        self.brokerID = brokerID            # 经纪商代码
        self.address = address              # 服务器地址
        
        # 如果尚未建立服务器连接,则进行连接
        if not self.connectionStatus:
            # 创建C++环境中的API对象,这里传入的参数是需要用来保存.con文件的文件夹路径
            path = os.getcwd() + '/temp/'
            if not os.path.exists(path):
                os.makedirs(path)
            self.createFtdcMdApi(path)
            
            # 注册服务器地址
            self.registerFront(self.address)
            
            # 初始化连接,成功会调用onFrontConnected
            self.init()
            
            
        # 若已经连接但尚未登录,则进行登录
        else:
            if not self.loginStatus:
                self.login()

    #----------------------------------------------------------------------  
    def login(self):
        """登录"""
        # 如果填入了用户名密码等,则登录
        if self.userID and self.password and self.brokerID:
            req = {}
            req['UserID'] = self.userID
            req['Password'] = self.password
            req['BrokerID'] = self.brokerID
            self.reqID += 1
            self.reqUserLogin(req, self.reqID)    
    
    #----------------------------------------------------------------------
    def onFrontConnected(self):
        """服务器连接"""
        self.connectionStatus = True
        
        log = u'行情服务器连接成功'
        self.put_log_event(log)

        self.login()
    #----------------------------------------------------------------------  
    def onFrontDisconnected(self, n):
        """服务器断开"""
        self.connectionStatus = False
        self.loginStatus = False

        log = u'行情服务器连接断开'
        self.put_log_event(log)
        
    def put_log_event(self, log):  # log事件分发
        event = Event(type_=EVENT_LOG)
        event.dict_['log'] = log
        self.__eventEngine.put(event)

########################################################################
class CtpTdApi(TdApi):
    """CTP交易API实现"""
    
    #----------------------------------------------------------------------
    def __init__(self, eventEngine):
        """API对象的初始化函数"""
        super(CtpTdApi, self).__init__()
        
        self.__eventEngine = eventEngine

        self.reqID = 0              # 操作请求编号
        
        self.connectionStatus = False       # 连接状态
        self.loginStatus = False            # 登录状态
        
        self.userID = ''          # 账号
        self.password = ''        # 密码
        self.brokerID = ''        # 经纪商代码
        self.address = ''         # 服务器地址
        
        self.frontID = 0            # 前置机编号
        self.sessionID = 0          # 会话编号
        
    #----------------------------------------------------------------------
    def put_log_event(self, log):  # 投放log事件
        event = Event(type_=EVENT_LOG)
        event.dict_['log'] = log
        self.__eventEngine.put(event)
    #----------------------------------------------------------------------
    def onFrontConnected(self):
        """服务器连接"""
        self.connectionStatus = True
        log = u'交易服务器连接成功'
        self.put_log_event(log)
    
        self.login()

    #----------------------------------------------------------------------
    def onFrontDisconnected(self, n):
        """服务器断开"""
        self.connectionStatus = False
        self.loginStatus = False
    
        log = u'交易服务器连接断开'
        self.put_log_event(log)
        
    #----------------------------------------------------------------------
    def onRspUserLogin(self, data, error, n, last):
        """登陆回报"""
        # 如果登录成功,推送日志信息
        if error['ErrorID'] == 0:
            self.frontID = str(data['FrontID'])
            self.sessionID = str(data['SessionID'])
            self.loginStatus = True
            
            log = data['UserID'] + u'交易服务器登录完成'
            self.put_log_event(log)
            
            # 确认结算信息
            req = {}
            req['BrokerID'] = self.brokerID
            req['InvestorID'] = self.userID
            self.reqID += 1
            # self.reqSettlementInfoConfirm(req, self.reqID)
                
        # 否则,推送错误信息
        else:
            log = error['ErrorMsg']
            self.put_log_event(log)

    #----------------------------------------------------------------------
    def onRspUserLogout(self, data, error, n, last):
        """登出回报"""
        # 如果登出成功,推送日志信息
        if error['ErrorID'] == 0:
            self.loginStatus = False
            
            log = u'交易服务器登出完成'
            self.put_log_event(log)
                
        # 否则,推送错误信息
        else:
            log = error['ErrorMsg']
            self.put_log_event(log)
    #----------------------------------------------------------------------
    def connect(self, userID, password, brokerID, address):
        """初始化连接"""
        self.userID = userID                # 账号
        self.password = password            # 密码
        self.brokerID = brokerID            # 经纪商代码
        self.address = address              # 服务器地址
        
        # 如果尚未建立服务器连接,则进行连接
        if not self.connectionStatus:
            # 创建C++环境中的API对象,这里传入的参数是需要用来保存.con文件的文件夹路径
            path = os.getcwd() + '/temp/'
            if not os.path.exists(path):
                os.makedirs(path)
            self.createFtdcTraderApi(path)
            
            # 注册服务器地址
            self.registerFront(self.address)
            
            # 初始化连接,成功会调用onFrontConnected
            self.init()
            
        # 若已经连接但尚未登录,则进行登录
        else:
            if not self.loginStatus:
                self.login()    
    
    #----------------------------------------------------------------------
    def login(self):
        """连接服务器"""
        # 如果填入了用户名密码等,则登录
        if self.userID and self.password and self.brokerID:
            req = {}
            req['UserID'] = self.userID
            req['Password'] = self.password
            req['BrokerID'] = self.brokerID
            self.reqID += 1
            self.reqUserLogin(req, self.reqID)   
        
########################################################################
class MainEngine:
    """主引擎,负责对API的调度"""
    #----------------------------------------------------------------------
    def __init__(self):
        """Constructor"""
        self.ee = EventEngine()         # 创建事件驱动引擎
        self.md = CtpMdApi(self.ee)    # 创建行情API接口
        self.td = CtpTdApi(self.ee)      # 创建交易API接口
        self.ee.start()                 # 启动事件驱动引擎
        self.ee.register(EVENT_LOG, self.print_log) # 注册日志打印事件

        self.userID = ''          # 账号
        self.password = ''        # 密码
        self.brokerID = '9999'        # 经纪商代码
        self.MdIp = 'tcp://180.168.146.187:10011'         # 行情服务器地址
        self.TdIp = 'tcp://180.168.146.187:10001'         # 交易服务器地址

    #----------------------------------------------------------------------
    def login(self):
        """登陆"""
        self.md.connect(self.userID, self.password, self.brokerID, self.MdIp)
        self.td.connect(self.userID, self.password, self.brokerID, self.TdIp)

    def print_log(self, event):
        log = event.dict_['log']
        print(log)

# 直接运行脚本可以进行测试
if __name__ == '__main__':

    import sys
    app = QApplication(sys.argv)
    main = MainEngine()
    main.login()
    app.exec_()   
运行结果如下:

代码: 全选

交易服务器连接成功
行情服务器连接成功
100897交易服务器登录完成
TypeError: 'NoneType' object is not callable
TypeError: 'NoneType' object is not callable
因为api没写完整,似乎调用了某些没写的方法,所以有后面的错误,先忽略。

在上面的代码中,我们从封装的ctp接口继承了行情api和交易api,然后只改写了其中几个用于连接服务器的方法。然后创建一个主引擎,用它来创建一个行情api和交易api的实例,并且创建事件引擎的实例传给它们。这样他们就可以通过事件引擎来实现线程间的通信,同时事件引擎也会负责调度任务。

确认结算单、查询合约、查询行情截面数据
登录成功之后,还有几个要做的动作:
1、确认结算单,就相当于我们登录快期时弹出结算单我们点确认的过程,如果不确认,就无法进行后续操作。
2、查询一次合约信息,期货合约不断会到期更新,所有每次登录一般会查询一次合约信息,缓存到本地。其中的很多信息,也是后续要用到的。
3、查询一次详细合约信息,或者叫行情截面数据,里面包含各合约的价格方面的信息。
4、设置一个循环查询,一般每隔几秒会进行一下持仓和资金的查询,更新账户信息。
完成这几步,基本上就是个正常的连接状态了。后面就是根据你的输入或者策略来进行交互了。

ctp的接口中有两类常见的函数,一类是以req开头,是主动向服务器发出的请求,如reqUserLogin(登录请求)、reqQryTradingAccount(查询账户)、reqOrderInsert(发出委托)。
一类是以on开头,在特定情况下被动触发的回调函数,如onRspSettlementInfoConfirm(结算单确认)、onRspQryInstrument(查询合约回报)、onRtnTrade(成交回报)。
一个请求往往有一个回调函数与之对应,用来处理服务器响应请求发来的数据。所以我们在一个请求的回调函数中发出下一个请求,以实现顺序执行几个任务。
reqUserLogin==>onRspUserLogin(这里加reqSettlementInfoConfirm代码,确认结算单)==>onRspSettlementInfoConfirm(这里加reqQryInstrument代码,查询合约)==>onRspQryInstrument(这里加reqQryDepthMarketData代码,查询行情截面数据)==>onRspQryDepthMarketData(这里通过事件引擎触发下一个任务)
在tdapi中增加这些代码

代码: 全选

    def onRspSettlementInfoConfirm(self, data, error, n, last):
        """确认结算信息回报"""
        log = u'结算信息确认完成'
        self.put_log_event(log)
    
        # 查询合约代码
        self.reqID += 1
        self.reqQryInstrument({}, self.reqID)
    #----------------------------------------------------------------------
    def onRspQryInstrument(self, data, error, n, last):
        """
        合约查询回报
        由于该回报的推送速度极快,因此不适合全部存入队列中处理,
        选择先储存在一个本地字典中,全部收集完毕后再推送到队列中
        (由于耗时过长目前使用其他进程读取)
        """
        if error['ErrorID'] == 0:
            event = Event(type_=EVENT_INSTRUMENT)
            event.dict_['data'] = data
            event.dict_['last'] = last
            self.__eventEngine.put(event)
            
            if last:
                sleep(1)
                self.reqID += 1
                self.reqQryDepthMarketData({}, self.reqID)  # 查询合约截面数据
                
        else:
            log = '合约投资者回报,错误代码:' + str(error['ErrorID']) + ',   错误信息:' + str(error['ErrorMsg'])
            self.put_log_event(log)

    #----------------------------------------------------------------------
    def onRspQryDepthMarketData(self, data, error, n, last): 
        # 常规行情事件,查询合约截面数据的回报
        event = Event(type_=EVENT_MARKETDATA)
        event.dict_['data'] = data
        event.dict_['last'] = last
        self.__eventEngine.put(event)
        
    def qryAccount(self):
        """查询账户"""
        self.reqID += 1
        self.reqQryTradingAccount({}, self.reqID)
        
    #----------------------------------------------------------------------
    def qryPosition(self):
        """查询持仓"""
        self.reqID += 1
        req = {}
        req['BrokerID'] = self.brokerID
        req['InvestorID'] = self.userID
        self.reqQryInvestorPosition(req, self.reqID)
   
在mainEngine中增加保存合约数据和行情截面数据的函数,并在事件引擎中把他们分别注册到EVENT_INSTRUMENT和EVENT_MARKETDATA事件上。

代码: 全选

    def insertInstrument(self, event):
        """插入合约对象"""
        data = event.dict_['data']
        last = event.dict_['last']
        self.list_instrument.append(data)
        if last:#最后一条数据
            # 将查询完成的合约信息保存到本地文件,今日登录可直接使用不再查询
            event = Event(type_=EVENT_LOG)
            log = '合约信息查询完成'
            event.dict_['log'] = log
            self.ee.put(event)
            with open('instrument.json', 'w', encoding="utf-8") as f:
                jsonD = json.dumps(self.list_instrument,indent=4)
                f.write(jsonD)
            self.list_instrument = []
            event = Event(type_=EVENT_LOG)
            log = '合约信息已经保存'
            event.dict_['log'] = log
            self.ee.put(event)
    # ----------------------------------------------------------------------
    def insertMarketData(self, event):
        """插入合约截面数据"""
        data = event.dict_['data']
        last = event.dict_['last']
        self.list_marketdata.append(data)
        if last:
            #更新交易日
            self.md.TradingDay = data['TradingDay']
            # 将查询完成的合约信息保存到本地文件,今日登录可直接使用不再查询
            event = Event(type_=EVENT_LOG)
            log = '合约截面数据查询完成'
            event.dict_['log'] = log
            self.ee.put(event)
            with open('marketdata.json', 'w', encoding="utf-8") as f:
                jsonD = json.dumps(self.list_marketdata, indent=4)
                f.write(jsonD)
            self.list_marketdata = []
            event = Event(type_=EVENT_LOG)
            log = '合约截面数据已经保存'
            event.dict_['log'] = log
            self.ee.put(event)
执行结果如下,并在当前文件夹下生成instrument.json和marketdata.json两个文件.用文本编辑器打开看看,里面是合约信息和行情截面数据

代码: 全选

行情服务器连接成功
交易服务器连接成功
100897交易服务器登录完成
TypeError: 'NoneType' object is not callable
TypeError: 'NoneType' object is not callable
结算信息确认完成
合约信息查询完成
合约信息已经保存
合约截面数据查询完成
合约截面数据已经保存
查资金和查持仓

在tdapi中加入查持仓和查资金的回调函数

代码: 全选

    #----------------------------------------------------------------------
    def onRspQryInvestorPosition(self, data, error, n, last):
        """持仓查询回报"""
        if not data['InstrumentID']:
            return
        if error['ErrorID'] == 0:
            event = Event(type_=EVENT_POSITION)
            event.dict_['data'] = data
            event.dict_['last'] = last
            self.__eventEngine.put(event)
        else:
            log = ('持仓查询回报,错误代码:'  +str(error['ErrorID']) + ',   错误信息:' +str(error['ErrorMsg']))
            self.put_log_event(log)
    #----------------------------------------------------------------------
    def onRspQryTradingAccount(self, data, error, n, last):
        """资金账户查询回报"""
        if error['ErrorID'] == 0:
            event = Event(type_=EVENT_ACCOUNT)
            event.dict_['data'] = data
            self.__eventEngine.put(event)
        else:
            log = ('账户查询回报,错误代码:' +str(error['ErrorID']) + ',   错误信息:' +str(error['ErrorMsg']))
            self.put_log_event(log)
在mainEngine中增加定时循环查询资金和持仓的方法,并把持仓和资金的数据直接打印出来
__init__里面加上相关的变量,在事件引擎注册打印持仓和资金的函数

代码: 全选

        # 循环查询持仓和账户相关
        self.countGet = 0  # 查询延时计数
        self.lastGet = 'Position'  # 上次查询的性质,先查询账户
        # 注册持仓和账户、委托事件
        self.ee.register (EVENT_ACCOUNT, self.account)
        self.ee.register (EVENT_POSITION, self.position)
insertMarketData里面加上下面这行,用每秒一次的计时器来触发循环查询

代码: 全选

            self.ee.register(EVENT_TIMER, self.getAccountPosition)#定时器事件,循环查询
mainEngine加上

代码: 全选

    def getAccountPosition(self, event):
        """循环查询账户和持仓"""
        self.countGet += 1
        # 每n秒发一次查询
        if self.countGet > 5:
            self.countGet = 0  # 清空计数

            if self.lastGet == 'Account':
                self.getPosition()
                self.lastGet = 'Position'
            else:
                self.getAccount()
                self.lastGet = 'Account'

    def account(self,event):#处理账户事件数据
        var = event.dict_['data']
        print(var)

    def position(self, event):#处理持仓事件数据
        var = event.dict_['data']
        print(var)
        
    def getAccount(self):
        """查询账户"""
        self.td.qryAccount()
    # ----------------------------------------------------------------------
    def getPosition(self):
        """查询持仓"""
        self.td.qryPosition()    
运行结果除了之前的内容,增加了每个几秒一次的打印资金信息或打印持仓信息

代码: 全选

{'ReserveBalance': 0.0, 'Reserve': 0.0, 'SpecProductCommission': 0.0, 'FrozenMargin': 0.0, 'BrokerID': '9999', 'CashIn': 0.0, 'FundMortgageOut': 0.0, 'FrozenCommission': 0.0, 'SpecProductPositionProfitByAlg': 0.0, 'Commission': 0.0, 'SpecProductPositionProfit': 0.0, 'Deposit': 0.0, 'DeliveryMargin': 0.0, 'TradingDay': '20180115', 'CurrencyID': 'CNY', 'Interest': 0.0, 'PreDeposit': 932642.0599999999, 'Available': 928742.0599999999, 'SpecProductFrozenMargin': 0.0, 'AccountID': '100897', 'SpecProductMargin': 0.0, 'PreFundMortgageOut': 0.0, 'InterestBase': 0.0, 'SpecProductExchangeMargin': 0.0, 'PreBalance': 953078.5599999999, 'Balance': 949178.5599999999, 'MortgageableFund': 742993.648, 'Withdraw': 0.0, 'SpecProductFrozenCommission': 0.0, 'PreMortgage': 0.0, 'SpecProductCloseProfit': 0.0, 'WithdrawQuota': 742993.648, 'FundMortgageAvailable': 0.0, 'BizType': '\x00', 'PreCredit': 0.0, 'FrozenCash': 0.0, 'SettlementID': 1, 'CloseProfit': 0.0, 'ExchangeDeliveryMargin': 0.0, 'Mortgage': 0.0, 'Credit': 0.0, 'CurrMargin': 20436.5, 'FundMortgageIn': 0.0, 'ExchangeMargin': 20436.5, 'PreFundMortgageIn': 0.0, 'PositionProfit': -3900.0, 'PreMargin': 20436.5}
{'ShortFrozen': 0, 'FrozenMargin': 0.0, 'BrokerID': '9999', 'CashIn': 0.0, 'FrozenCommission': 0.0, 'UseMargin': 10945.0, 'MarginRateByVolume': 0.0, 'CloseProfitByDate': 0.0, 'InstrumentID': 'j1801', 'StrikeFrozen': 0, 'CombLongFrozen': 0, 'CloseProfitByTrade': 0.0, 'TodayPosition': 0, 'TradingDay': '20180115', 'CombShortFrozen': 0, 'YdStrikeFrozen': 0, 'PreSettlementPrice': 2189.0, 'OpenVolume': 0, 'CloseVolume': 0, 'SettlementPrice': 2150.0, 'OpenCost': 190650.0, 'HedgeFlag': '1', 'OpenAmount': 0.0, 'StrikeFrozenAmount': 0.0, 'InvestorID': '100897', 'PositionCost': 218900.0, 'LongFrozenAmount': 0.0, 'ExchangeID': '', 'PreMargin': 0.0, 'CloseProfit': 0.0, 'CloseAmount': 0.0, 'LongFrozen': 0, 'PosiDirection': '2', 'CombPosition': 0, 'YdPosition': 1, 'PositionDate': '1', 'AbandonFrozen': 0, 'ShortFrozenAmount': 0.0, 'FrozenCash': 0.0, 'SettlementID': 1, 'Position': 1, 'ExchangeMargin': 10945.0, 'MarginRateByMoney': 0.0, 'PositionProfit': -3900.0, 'Commission': 0.0}
{'ShortFrozen': 0, 'FrozenMargin': 0.0, 'BrokerID': '9999', 'CashIn': 0.0, 'FrozenCommission': 0.0, 'UseMargin': 4766.5, 'MarginRateByVolume': 0.0, 'CloseProfitByDate': 0.0, 'InstrumentID': 'pp1809', 'StrikeFrozen': 0, 'CombLongFrozen': 0, 'CloseProfitByTrade': 0.0, 'TodayPosition': 0, 'TradingDay': '20180115', 'CombShortFrozen': 0, 'YdStrikeFrozen': 0, 'PreSettlementPrice': 9533.0, 'OpenVolume': 0, 'CloseVolume': 0, 'SettlementPrice': 9533.0, 'OpenCost': 92470.0, 'HedgeFlag': '1', 'OpenAmount': 0.0, 'StrikeFrozenAmount': 0.0, 'InvestorID': '100897', 'PositionCost': 95330.0, 'LongFrozenAmount': 0.0, 'ExchangeID': '', 'PreMargin': 0.0, 'CloseProfit': 0.0, 'CloseAmount': 0.0, 'LongFrozen': 0, 'PosiDirection': '2', 'CombPosition': 0, 'YdPosition': 2, 'PositionDate': '1', 'AbandonFrozen': 0, 'ShortFrozenAmount': 0.0, 'FrozenCash': 0.0, 'SettlementID': 1, 'Position': 2, 'ExchangeMargin': 4766.5, 'MarginRateByMoney': 0.0, 'PositionProfit': 0.0, 'Commission': 0.0}
{'ShortFrozen': 0, 'FrozenMargin': 0.0, 'BrokerID': '9999', 'CashIn': 0.0, 'FrozenCommission': 0.0, 'UseMargin': 4725.0, 'MarginRateByVolume': 0.0, 'CloseProfitByDate': 0.0, 'InstrumentID': 'pp1801', 'StrikeFrozen': 0, 'CombLongFrozen': 0, 'CloseProfitByTrade': 0.0, 'TodayPosition': 0, 'TradingDay': '20180115', 'CombShortFrozen': 0, 'YdStrikeFrozen': 0, 'PreSettlementPrice': 9450.0, 'OpenVolume': 0, 'CloseVolume': 0, 'SettlementPrice': 9450.0, 'OpenCost': 86600.0, 'HedgeFlag': '1', 'OpenAmount': 0.0, 'StrikeFrozenAmount': 0.0, 'InvestorID': '100897', 'PositionCost': 94500.0, 'LongFrozenAmount': 0.0, 'ExchangeID': '', 'PreMargin': 0.0, 'CloseProfit': 0.0, 'CloseAmount': 0.0, 'LongFrozen': 0, 'PosiDirection': '3', 'CombPosition': 0, 'YdPosition': 2, 'PositionDate': '1', 'AbandonFrozen': 0, 'ShortFrozenAmount': 0.0, 'FrozenCash': 0.0, 'SettlementID': 1, 'Position': 2, 'ExchangeMargin': 4725.0, 'MarginRateByMoney': 0.0, 'PositionProfit': 0.0, 'Commission': 0.0}
目前为止的完整代码在附件里可以下载。如果你按照上面的代码自己加的话,可能已经发现出错了。因为依赖的模块增加了,对照附件和自己的出错提示改一下吧。
附件
demo.rar
(3.23 KiB) 下载 459 次
上次由 玉米棉花糖 在 2018年 1月 15日 13:56,总共编辑 8 次。

头像
玉米棉花糖
帖子: 92
注册时间: 2018年 1月 11日 17:13

Re: vnpy量化平台学习过程中的经验分享

#7

帖子 玉米棉花糖 » 2018年 1月 12日 10:25

事件引擎EventEngine

事件引擎是整个系统的核心,有必要单独说说。

有时候我们会让电脑简单的按照顺序执行一系列任务,这种程序很简单:

代码: 全选

print('我在执行第一个任务')
print('我在执行第二个任务')
print('我在执行第三个任务')
电脑会按照顺序一直执行直到完成所有任务。这种方式很有用,但是不够。

回想一下我们平时使用软件的经历,大部分软件不是这么简单的,我们总是在和软件进行交互。有时输入信息,有时按下按钮,软件在保持界面正常运行的前提下,会响应我们的输入,执行后给出结果。这就涉及到多线程,以及线程之间的通信。

在交易软件中,有行情引擎、交易引擎、策略引擎、图形界面,有时还有用户输入,他们之间如何协调沟通?
VNPY给出了一个很好的解决方案,用事件驱动的方式运行程序。事件引擎的作用就像机场取行李的传送带,各种函数可能是向上面放行李的人,也可能是站在旁边等行李的人。放行李的人要在行李上贴一个标签(eventType),取行李的人会先告诉事件引擎我在等贴着哪种标签的行李(register(eventType, handler)),事件引擎负责把每个行李分配给正确的人(handler处理event)
同一个事件引擎的实例可以作为参数传给函数或类,就实现了引擎间、线程间的沟通协调。

事件引擎的原理可以看这篇文档:
https://github.com/vnpy/vnpy/wiki/%E4%B ... 5%E6%93%8E
先说这么多,在使用过程中你会渐渐体会到事件引擎的妙处。

事件引擎使用很简单,分三步:
1、创建事件引擎的实例
ee = EventEngine()
之后就可以把ee这个实例作为参数传给需要沟通的对象

2、注册handler
ee.register('log', print_log)
代表把print_log这个函数注册到事件类型为'log'的事件上,每当有这样的事件,事件引擎就会调用print_log来处理一次这个事件

3、投放事件
event = Event()
event.type_ = 'log'
event.dict_['data'] = 'data'
ee.put(event)
创建一个事件的实例,注明它的类型,在字典里放你想放的数据,然后投放到事件引擎中。事件引擎会把它交给注册过的函数来处理

事件引擎的代码

代码: 全选

# encoding: UTF-8
'''
VNPY的事件引擎原有两版,一个用QTimer计时,一个用子线程计时
这里选择了子线程的版本
'''
# 系统模块
from queue import Queue, Empty
from threading import Thread
from collections import defaultdict
from time import sleep

EVENT_TIMER = 'eTimer'   

########################################################################
class EventEngine(object):
    """
    计时器使用python线程的事件驱动引擎        
    """

    #----------------------------------------------------------------------
    def __init__(self):
        """初始化事件引擎"""
        # 事件队列
        self.__queue = Queue()
        
        # 事件引擎开关
        self.__active = False
        
        # 事件处理线程
        self.__thread = Thread(target = self.__run)
        
        # 计时器,用于触发计时器事件
        self.__timer = Thread(target = self.__runTimer)
        self.__timerActive = False                      # 计时器工作状态
        self.__timerSleep = 1                           # 计时器触发间隔(默认1秒)        
        
        # 这里的__handlers是一个字典,用来保存对应的事件调用关系
        # 其中每个键对应的值是一个列表,列表中保存了对该事件进行监听的函数功能
        self.__handlers = defaultdict(list)
        
        # __generalHandlers是一个列表,用来保存通用回调函数(所有事件均调用)
        self.__generalHandlers = []        
        
    #----------------------------------------------------------------------
    def __run(self):
        """引擎运行"""
        while self.__active == True:
            try:
                event = self.__queue.get(block = True, timeout = 1)  # 获取事件的阻塞时间设为1秒
                self.__process(event)
            except Empty:
                pass
            
    #----------------------------------------------------------------------
    def __process(self, event):
        """处理事件"""
        # 检查是否存在对该事件进行监听的处理函数
        if event.type_ in self.__handlers:
            # 若存在,则按顺序将事件传递给处理函数执行
            [handler(event) for handler in self.__handlers[event.type_]]
            
            # 以上语句为Python列表解析方式的写法,对应的常规循环写法为:
            #for handler in self.__handlers[event.type_]:
                #handler(event) 
                
        # 调用通用处理函数进行处理
        if self.__generalHandlers:
            [handler(event) for handler in self.__generalHandlers]        
               
    #----------------------------------------------------------------------
    def __runTimer(self):
        """运行在计时器线程中的循环函数"""
        while self.__timerActive:
            # 创建计时器事件
            event = Event(type_=EVENT_TIMER)
        
            # 向队列中存入计时器事件
            self.put(event)    
            
            # 等待
            sleep(self.__timerSleep)

    #----------------------------------------------------------------------
    def start(self, timer=True):
        """
        引擎启动
        timer:是否要启动计时器
        """
        # 将引擎设为启动
        self.__active = True
        
        # 启动事件处理线程
        self.__thread.start()
        
        # 启动计时器,计时器事件间隔默认设定为1秒
        if timer:
            self.__timerActive = True
            self.__timer.start()
    
    #----------------------------------------------------------------------
    def stop(self):
        """停止引擎"""
        # 将引擎设为停止
        self.__active = False
        
        # 停止计时器
        self.__timerActive = False
        self.__timer.join()
        
        # 等待事件处理线程退出
        self.__thread.join()
            
    #----------------------------------------------------------------------
    def register(self, type_, handler):
        """注册事件处理函数监听"""
        # 尝试获取该事件类型对应的处理函数列表,若无defaultDict会自动创建新的list
        handlerList = self.__handlers[type_]
        
        # 若要注册的处理器不在该事件的处理器列表中,则注册该事件
        if handler not in handlerList:
            handlerList.append(handler)
            
    #----------------------------------------------------------------------
    def unregister(self, type_, handler):
        """注销事件处理函数监听"""
        # 尝试获取该事件类型对应的处理函数列表,若无则忽略该次注销请求   
        handlerList = self.__handlers[type_]
            
        # 如果该函数存在于列表中,则移除
        if handler in handlerList:
            handlerList.remove(handler)

        # 如果函数列表为空,则从引擎中移除该事件类型
        if not handlerList:
            del self.__handlers[type_]  
        
    #----------------------------------------------------------------------
    def put(self, event):
        """向事件队列中存入事件"""
        self.__queue.put(event)

    #----------------------------------------------------------------------
    def registerGeneralHandler(self, handler):
        """注册通用事件处理函数监听"""
        if handler not in self.__generalHandlers:
            self.__generalHandlers.append(handler)
            
    #----------------------------------------------------------------------
    def unregisterGeneralHandler(self, handler):
        """注销通用事件处理函数监听"""
        if handler in self.__generalHandlers:
            self.__generalHandlers.remove(handler)
########################################################################
class Event:
    """事件对象"""

    #----------------------------------------------------------------------
    def __init__(self, type_=None):
        """Constructor"""
        self.type_ = type_      # 事件类型
        self.dict_ = {}         # 字典用于保存具体的事件数据


#----------------------------------------------------------------------

    
# 直接运行脚本可以进行测试
if __name__ == '__main__':
    def test():
        """测试函数"""
        from datetime import datetime
        
        def simpletest(event):
            print(u'处理每秒触发的计时器事件:%s' % str(datetime.now()))
        
        
        ee = EventEngine()
        ee.register(EVENT_TIMER, simpletest)
        ee.start()
    test()
上次由 玉米棉花糖 在 2018年 1月 15日 13:55,总共编辑 5 次。

头像
博弈
帖子: 241
注册时间: 2018年 1月 11日 15:35

Re: vnpy量化平台学习过程中的经验分享

#8

帖子 博弈 » 2018年 1月 12日 17:46

玉米棉花糖 写了:
2018年 1月 11日 17:31
基于vnpy开发的话,你需要
1、能看懂廖雪峰的这个python教程前半部分,即从开头到“图形界面”部分
https://www.liaoxuefeng.com/wiki/001431 ... a2e542c000
https://github.com/vvipi/py3_demo_on_vnpy_ctp
我得从这开始...

头像
dapanji
帖子: 3032
注册时间: 2018年 1月 12日 16:41

Re: vnpy量化平台学习过程中的经验分享

#9

帖子 dapanji » 2018年 1月 12日 18:40

感谢分享 :D
看图出奇迹,看基本面穷三代

头像
玉米棉花糖
帖子: 92
注册时间: 2018年 1月 11日 17:13

Re: vnpy量化平台学习过程中的经验分享

#10

帖子 玉米棉花糖 » 2018年 1月 12日 22:13

行情订阅、tick数据合成

先从vnpy把tick数据的类直接抄过来

代码: 全选

class CtaTickData(object):
    """Tick数据"""
    #----------------------------------------------------------------------
    def __init__(self):
        """Constructor"""       
        self.vtSymbol = ''            # vt系统代码
        self.symbol = ''              # 合约代码
        self.exchange = ''            # 交易所代码

        # 成交数据
        self.lastPrice = 0.0            # 最新成交价
        self.volume = 0                 # 最新成交量
        self.openInterest = 0           # 持仓量
        
        self.upperLimit = 0.0           # 涨停价
        self.lowerLimit = 0.0           # 跌停价
        
        # tick的时间
        self.date = ''            # 日期
        self.time = ''            # 时间
        self.datetime = None                # python的datetime时间对象
        
        # 五档行情
        self.bidPrice1 = 0.0
        self.bidPrice2 = 0.0
        self.bidPrice3 = 0.0
        self.bidPrice4 = 0.0
        self.bidPrice5 = 0.0
        
        self.askPrice1 = 0.0
        self.askPrice2 = 0.0
        self.askPrice3 = 0.0
        self.askPrice4 = 0.0
        self.askPrice5 = 0.0        
        
        self.bidVolume1 = 0
        self.bidVolume2 = 0
        self.bidVolume3 = 0
        self.bidVolume4 = 0
        self.bidVolume5 = 0
        
        self.askVolume1 = 0
        self.askVolume2 = 0
        self.askVolume3 = 0
        self.askVolume4 = 0
        self.askVolume5 = 0    

在mdapi中增加tick数据合成、订阅退订方法

代码: 全选

    def onRtnDepthMarketData(self, data):
        """行情推送"""
        if not data['Volume']:
            return
        
        # 创建对象
        tick = CtaTickData()
        
        tick.symbol = data['InstrumentID']
        tick.exchange = data['ExchangeID']   #exchangeMapReverse.get(data['ExchangeID'], u'未知')
        tick.vtSymbol = tick.symbol #'.'.join([tick.symbol, EXCHANGE_UNKNOWN])
        
        tick.lastPrice = data['LastPrice']
        tick.volume = data['Volume']
        tick.openInterest = data['OpenInterest']
        

        tick.time = '.'.join([data['UpdateTime'], str(data['UpdateMillisec']/100)])
        
        # 这里由于交易所夜盘时段的交易日数据有误,所以选择本地获取
        tick.date = datetime.now().strftime('%Y%m%d')   
        
        tick.openPrice = data['OpenPrice']
        tick.highPrice = data['HighestPrice']
        tick.lowPrice = data['LowestPrice']
        tick.preClosePrice = data['PreClosePrice']
        
        tick.upperLimit = data['UpperLimitPrice']
        tick.lowerLimit = data['LowerLimitPrice']
        
        # CTP只有一档行情
        # 无报价时用涨跌停板价替换
        if data['BidPrice1'] > tick.upperLimit:
            tick.bidPrice1 = tick.lowerLimit
        else:
            tick.bidPrice1 = data['BidPrice1']
        if data['AskPrice1'] > tick.upperLimit:
            tick.askPrice1 = tick.upperLimit
        else:
            tick.askPrice1 = data['AskPrice1']
            
        tick.bidVolume1 = data['BidVolume1']
        tick.askVolume1 = data['AskVolume1']
        
        event1 = Event(type_=(EVENT_TICK + data['InstrumentID']))
        event1.dict_['data'] = tick
        self.__eventEngine.put(event1)
    #----------------------------------------------------------------------  
    def subscribe(self, symbol):
        """订阅合约"""
        # 这里的设计是,如果尚未登录就调用了订阅方法
        # 则先保存订阅请求,登录完成后会自动订阅
        if self.loginStatus:
            self.subscribeMarketData(str(symbol))
        self.subscribedSymbols.add(symbol)   
        
    #----------------------------------------------------------------------
    def unsubscribe(self, symbol):
        """退订合约"""
        self.unSubscribeMarketData(str(symbol))

在mainEngine中临时增加一个打印最新价的函数,在开始循环查询的位置增加订阅rb1805合约行情的代码

代码: 全选

    def print_last_price(self, event):
        tick = event.dict_['data']
        lastPrice = tick.lastPrice
        print(lastPrice)

代码: 全选

            self.ee.register(EVENT_TICK + 'rb1805', self.print_last_price) # 打印最新价的函数注册到特定合约的tick事件
            self.md.subscribe('rb1805')
然后把api里面用不到和还没用到的函数先都补上,全部pass,这样就不会再报TypeError的错了
持仓和资金查询,数据全打出来会刷屏,先改掉
运行结果:

代码: 全选

行情服务器连接成功
交易服务器连接成功
行情服务器登录完成
100897交易服务器登录完成
结算信息确认完成
合约信息查询完成
合约信息已经保存
合约截面数据查询完成
合约截面数据已经保存
最新价: 3787.0
收到查询资金的回报
收到查询持仓的回报
收到查询持仓的回报
收到查询持仓的回报
老规矩,完整代码在附件
附件
demo_tick.rar
(5.41 KiB) 下载 447 次
上次由 玉米棉花糖 在 2018年 1月 13日 20:28,总共编辑 5 次。

回复