支持中低频量化交易的单机数据平台

  1. 云栖社区>
  2. 博客>
  3. 正文

支持中低频量化交易的单机数据平台

灵圣 2019-11-15 09:54:24 浏览1284
展开阅读全文

量化交易中,数据系统主要需要支撑两个场景。第一个是策略回测,面对的是过去10年的历史数据,数据量在10TB级别。目标是当我们脑袋里面有一个自认为绝好的策略思路时,能够快速的进行验证其是不是有效。从技术实现的角度来说,回测就是一遍一遍的轮询大量的历史数据进行计算。这里面的历史数据是不会更改的,要求读取的速度特别快。

另一个是实时交易,面对的是每秒10k+的交易数据流入,能够即时通过原始数据计算出量化因子,做出交易决策。还有风险控制,在出现事先没有预期到的风险时,要能够迅速把持仓退出来。这都要求延迟尽可能地小,控制滑点成本。

金融时间序列数据的特点


  1. 数据量比较大。以目前A股的level1 tick数据为例,每支股票每3s就会生成一条数据,3k+支股票每天交易4个小时,总计生成接近1500万条原始记录,加入基于原始记录生成的各类因子,数据量要翻N倍。使用level2逐笔成交数据的话,数据量要更大。
  2. 数据是分块的。依赖于交易所的交易日界定,每个交易日都是独立的,所以可以将每支股票每天的数据作为一块互不相关的数据块。每个数据块大约1M大小。
  3. 全部是数值型,没有文本。对数据的压缩很有效。
  4. 数据稳定增长,不会出现访问峰值。这对于系统的承压能力要求相对较低。
  5. 一次写入多次读取,不会修改已经写入的数据,数据写入压力小。
  6. 不需要支持事务。
  7. 对时效性和准确性要求很高。如果出现比较大的延迟或者数据错误,那策略的表现变得不可控,无法执行。

数据库的选择


  1. MySQL:以上文所提到的A股level1 tick数据量,MySQL是无法支撑的。对于历史数据来说数据量太大了,MySQL的数据压缩效率不高,存储和效率都无法满足需求;对于实时交易来说延迟会比较大。

如果数据频率时分钟K线,那用MySQL是可以解决的。使用MyISAM存储引擎,因为MyISAM可以对数据压缩,节约存储空间,读性能也要比InnoDB要好。

  1. MongoDB:一个不错的选择,目前有很多量化团队在使用MongoDB作数据存储。对于中低频策略应该完全没问题。Mongoing中文社区 也有一系列相关的文章:

  2. InfluxDB:无论是面对历史回测或者实时交易的场景,InfluxDB都是很好的选择。具体在下文讨论。
  3. HDF5:非常高效的二进制文件,用来存储静态数据,特别是面对科学计算问题。具体在下文讨论。
  4. Kdb+:商用软件,性能很强大,但是q查询语言学习曲线很陡峭,而且license很贵。
  5. DolphinDB:比较新的时序数据库,也是商用软件, 官方宣称其性能可以替代kdb+。

InfluxDB


为什么选择InfluxDB

InfluxDB是目前最受欢迎的时序数据库,而且社区活跃度增长非常快。一图胜千言,我们看下面两个图就可以了解时序数据库的现状。

Ranking of Time Series DBMS (from DB-Engines)

Ranking_of_Time_Series_DBMS_from_DB_Engines_

Trend of InfluxDB Popularity (from DB-Engines)

Trend_of_InfluxDB_Popularity_from_DB_Engines_

与其它数据库对比
MongoDB vs InfluxDB | InfluxData Time Series Workloads
  • InfluxDB outperformed MongoDB by 2.4x when it came to data ingestion
  • InfluxDB outperformed MongoDB by delivering 20x better compression
  • InfluxDB outperformed MongoDB by delivering 5.7x better query performance
InfluxDB vs OpenTSDB | Time Series Database Comparison

InfluxDB和OpenTSDB是目前最受欢迎两个时序数据库。
易用性:

  • 在单机上,InfluxDB就是一个独立安装包,安装配置都很简单。
  • 在集群系统中,OpenTSDB使用HBase存储数据,比较成熟。InfluxDB的集群解决方案是商业化的。
    性能:
  • InfluxDB outperformed OpenTSDB by 9x when it came to data ingestion
  • InfluxDB outperformed OpenTSDB by delivering 8x better compression
  • InfluxDB outperformed OpenTSDB by delivering a minimum of 7x better query throughput
InfluxDB硬件配置建议
Load Field writes per second Moderate queries per second Unique series
Low < 5 thousand < 5 < 100 thousand
Moderate < 250 thousand < 25 < 1 million
High > 250 thousand > 25 > 1 million
Probably infeasible > 750 thousand > 100 > 10 million
  • Low - CPU: 2-4 cores, RAM: 2-4 GB, IOPS: 500
  • Moderate - CPU: 4-6 cores, RAM: 8-32 GB, IOPS: 500-1000
  • High - CPU: 8+ cores, RAM: 32+ GB, IOPS: 1000+
  • Probably infeasible load - cluster solution

根据上文的推算结果,这里的load介于Moderate与High之间,使用单机InfluxDB就够了。

目前很多量化团队用的都是单机架构,主要在提高单机性能。那为什么不用分布式系统,比如Hive/HBase?因为学习和维护成本高,对于中小团队不现实。另一个原因就是这里数据并不是高并发的场景,性能较好的单机就可以解决。

InfluxDB存储交易数据

InfluxDB使用细节不在这里展开。学习资料:

在我们的系统中,每支股票用一个独立的 measurement 存储,类似于MySQL里面的table。如上文所说,每支股票每天的交易tick被当作一个独立数据块,在InfluxDB里面存储为一个series,通过添加tag记录交易日来区分。还加入另一个tag来记录数据源,因为我们可能会有多个数据源,这个tag可用来做数据源可靠性分析。

数据(line protocol)示例,其中datesource就是数据的tag集:

000001,source=XYZ,date=20190103 Price=123.45,Volume=6789,Amount=10111213 1546480800000

检索示例,查询出某支股票一整天的交易数据,InfluxQL跟SQL使用基本一样:

SELECT * FROM "000001" WHERE date='20190103'
使用技巧
  • InfluxDB是不支持事务的,所以在读/写操作同时进行的场景中,有可能一条记录只有一半被写入,就被读出来了,这就是脏数据。为了判断读出来的是不是脏数据,需要对取出来的数据进行检查,如果某个不可能为空的字段是空值,那么求需要重新取一次。
  • 复制measurement:

    SELECT * INTO measurement_new FROM measurement_old GROUP BY *

HDF5


对于实时交易的场景,用InfluxDB提供数据管理系统,使用方便,也可以解耦合数据模块、计算模块和交易模块。

但是在面对历史数据回测的场景中,我们会预先通过原始数据计算出因子数据,在整个回测过程中只会对数据进行读取,不会做任何更新。如果这里依旧使用InfluxDB,就会在数据库连接和网络传输上产生额外的时间开销,这是没有必要的。这种情况下,本地文件存储就是一个很好的选择。高效而且简单易用的HDF5就是首选,可参考Python和HDF5大数据应用

HDF5中有一个dataset的概念,就是一个相关数据组成的一个数据集,在我们的问题里面,前文所说的数据块就很好的符合这个概念,每个股票每天的数据作为一个数据集存储。

API接口
使用技巧
  • 不建议用pandas中的Dataframe.to_hdf5直接存储,而是使用h5py存储Dataframe内部的numpy.ndarray,读取时再将其组装为Dataframe。因为pandas会存入很多冗余信息,存储大小是后者的5倍以上。
  • 使用压缩功能对数据进行压缩,节约存储空间。

    # save: ticks is an instance of pandas.Dataframe
    with h5py.File('data.h5', mode='w') as f:
        f.create_dataset('/20190101/000001', data=ticks, compression='gzip', compression_opts=6, chunks=ticks.shape)
        
    # load: read dataset and pack it to a pandas.Dataframe
    columns = ['Price', 'Volume', 'Amount']
    with h5py.File('data.h5') as f:
        dset = f.get('/20190101/000001')
        values = dset.value
    ticks = pandas.Dataframe(data=numpy.array(values), columns=columns)

附录


我们开源的量化交易工具

网友评论

登录后评论
0/500
评论