为VNPY 2版本加入聚宽数据源

之前写过一篇文章,是为VNPY1.9.2 版本加入聚宽的数据源,这个算是后续版本,为VNPY2版本加入聚宽数据源。


VNPY论坛里面有个篇文章也是讲加入聚宽数据源的,不过他是相当于集成到VNPY库代码中去,和VNPY自带的米筐数据源一样。我这篇其实也是借鉴他的,当然他说他借鉴之前我的1.92那篇。反正互相借鉴。


之前VNPY 1版本中,我的个人代码很多是直接在VNPY库代码直接修改或者增加的。每次VNPY升级就是非常头疼,要做代码对比,在一些可能被更新覆盖的地方再次维护测试。而且因为更新的地方很乱,造成后面生产版本一致停留在VNPY1.92。


还是要慢慢迁移到VNPY2版本,毕竟很多吸引的特性。这次准备不在VNPY的库文件代码上修改,而是像引用NUMPY或者Pandas这样,采用引用继承的方式,把自己的代码和VNPY的库代码隔离;这样即使VNPY升级,个人代码不用太担心,只要简单测试,保证继承引用VNPY的类或方法正常工作就可以了。


这里只要安装好了VNPY2.1以后版本,只要在任意地方拷贝下面代码,维护聚宽登录信息运行就可以下载聚宽数据。

其中config.json是保存聚宽登录账户,和需要下载的品种。品种信息是按照VNPY的品种和交易所信息组合字符串,如Symbol.Exchang。

而JQDataload.py是运行下载的,类JQDataService初始化是连接聚宽信息源,方法to_jq_symbol是把VNPY品种名称转换为聚宽名称,方法query_history是按照时间段下载分钟线要指定凭证,方法downloadAllMinuteBar是按照指定最近天数,下载config.json所以的品种数据。

最好去我的Github下载,比较方便

https://github.com/BillyZhangGuoping/VNPY2_BILLY

代码如下:

# encoding: UTF-8
import json
import time
from datetime import datetime, timedelta
from typing import List
import jqdatasdk as jq
from vnpy.trader.constant import Exchange, Interval
from vnpy.trader.database import database_manager
from vnpy.trader.object import (
   BarData
)
class JQDataService:
   """
   Service for download market data from Joinquant
   """
   def __init__(self):
      # 加载配置
      config = open('config.json')
      self.setting = json.load(config)
      USERNAME = self.setting['jqdata.Username']
      PASSWORD = self.setting['jqdata.Password']
      try:
         jq.auth(USERNAME, PASSWORD)
      except Exception as ex:
         print("jq auth fail:" + repr(ex))
   def to_jq_symbol(self, symbol: str, exchange: Exchange):
      """
      CZCE product of RQData has symbol like "TA1905" while
      vt symbol is "TA905.CZCE" so need to add "1" in symbol.
      """
      if exchange in [Exchange.SSE, Exchange.SZSE]:
         if exchange == Exchange.SSE:
            jq_symbol = f"{symbol}.XSHG"  # 上海证券交易所
         else:
            jq_symbol = f"{symbol}.XSHE"  # 深圳证券交易所
      elif exchange == Exchange.SHFE:
         jq_symbol = f"{symbol}.XSGE"  # 上期所
      elif exchange == Exchange.CFFEX:
         jq_symbol = f"{symbol}.CCFX"  # 中金所
      elif exchange == Exchange.DCE:
         jq_symbol = f"{symbol}.XDCE"  # 大商所
      elif exchange == Exchange.INE:
         jq_symbol = f"{symbol}.XINE"  # 上海国际能源期货交易所
      elif exchange == Exchange.CZCE:
         # 郑商所 的合约代码年份只有三位 需要特殊处理
         for count, word in enumerate(symbol):
            if word.isdigit():
               break
         # Check for index symbol
         time_str = symbol[count:]
         if time_str in ["88", "888", "99", "8888"]:
            return f"{symbol}.XZCE"
         # noinspection PyUnboundLocalVariable
         product = symbol[:count]
         year = symbol[count]
         month = symbol[count + 1:]
         if year == "9":
            year = "1" + year
         else:
            year = "2" + year
         jq_symbol = f"{product}{year}{month}.XZCE"
      return jq_symbol.upper()
   def query_history(self, symbol, exchange, start, end, interval='1m'):
      """
      Query history bar data from JQData and update Database.
      """
      jq_symbol = self.to_jq_symbol(symbol, exchange)
      # if jq_symbol not in self.symbols:
      #     return None
      # For querying night trading period data
      # end += timedelta(1)
      now = datetime.now()
      if end >= now:
         end = now
      elif end.year == now.year and end.month == now.month and end.day == now.day:
         end = now
      df = jq.get_price(
         jq_symbol,
         frequency=interval,
         fields=["open", "high", "low", "close", "volume"],
         start_date=start,
         end_date=end,
         skip_paused=True
      )
      data: List[BarData] = []
      if df is not None:
         for ix, row in df.iterrows():
            bar = BarData(
               symbol=symbol,
               exchange=exchange,
               interval=Interval.MINUTE,
               datetime=row.name.to_pydatetime() - timedelta(minutes=1),
               open_price=row["open"],
               high_price=row["high"],
               low_price=row["low"],
               close_price=row["close"],
               volume=row["volume"],
               gateway_name="JQ"
            )
            data.append(bar)
      database_manager.save_bar_data(data)
      return data
   def downloadAllMinuteBar(self, days=1):
      """下载所有配置中的合约的分钟线数据"""
      if days != 0:
         startDt = datetime.today() - days * timedelta(1)
         enddt = datetime.today()
      else:
         startDt = datetime.today() - 10 * timedelta(1)
         enddt = datetime.today()
      print('-' * 50)
      print(u'开始下载合约分钟线数据')
      print('-' * 50)
      if 'Bar.Min' in self.setting:
         l = self.setting["Bar.Min"]
         for VNSymbol in l:
            dt0 = time.process_time()
            symbol = VNSymbol.split(".")[0]
            exchange = Exchange(VNSymbol.split(".")[1])
            self.query_history(symbol, exchange, startDt, enddt, interval='1m')
            cost = (time.process_time() - dt0)
            print(u'合约%s的分钟K线数据下载完成%s - %s,耗时%s秒' % (symbol, startDt, enddt, cost))
            print(jq.get_query_count())
         print('-' * 50)
         print
         u'合约分钟线数据下载完成'
         print('-' * 50)
      return None
if __name__ == '__main__':
   JQdata = JQDataService()
   JQdata.downloadAllMinuteBar(days=30)

config.json

{
   "jqdata.Username": "",
   "jqdata.Password": "",
   "Bar.Min":
   [
      "MA8888.CZCE",
      "MA009.CZCE",
      "SR8888.CZCE",
      "RM8888.CZCE",
    ]
}


请使用浏览器的分享功能分享到微信等