MyBatisPlus多数据源加ES大宽表架构落地实践

1.背景

      在微服务大行其道的今天,我们在业务系统的开发中难免会遇到一些问题,由于微服务的微的特点,将之前单体的应用划分为许多的微服务的模块,数据库也从原来的一整个库划分为许多业务库,从而就让各个微服务模块之前的交互变得不方便,从而引入了一些远程调用服务的rpc框架的出现,比如fegin、dubbo、grpc、http客户端工具等,让操作业务库的数据变得都要走一次网络远程接口调用,这样就产生了网络开销,哪有没有什么好的方案来解决这个问题呢?业务场景如下图所示,互联网公司有各种业务子系统,子系统都有有支付的功能,所以搞了如图所示的聚合支付服务,提供调用各个支付平台的支付相关的接口,各个业务系统有自己的业务订单数据库,聚合支付也有自己的交易订单数据库,互联网公司的财务会需要一个财务对账系统,需求是要去拉取第三方支付平台的支付单跟支付聚合服务库的交易订单对账,然后又要从支付聚合服务库的交易订单跟各个业务系统的业务订单数据对账,那么这两个财务的需求你应该如何设计这个对账系统呢?

2.方案

需求:

支付聚合服务交易订单和第三方支付平台的支付单对账---资金对账单

支付聚合服务交易订单需要和各个业务子系统的业务订单对账---业务对账单

需要建立资金对账表和N个业务对账表

2.1使用传统的mysql数据库+fegin伪rpc调用

      由于mysql数据是关系型数据库(一张二维表),如果业务系统过于复杂,那么建立资金对账表和N个业务对账表里面的字段就会相当的多,每次如果财务需要一些信息,那就得加字段,然后重新跑批更新之前已经生成的数据填充后面新增的字段,这样就会很麻烦,同时每次跑批(使用xxl-job)mysql的压力也会增大,每次去获取各个业务系统都需要使用fegin调用,使用fegin调用如果由于网络抖动(网络不稳定)会导致调用失败,从而让对账失败,所以就会产生巨大的网络开销,不仅会让网络堵塞还让服务器和数据库的压力增大,这种传统的方式会让人很痛苦。

2.2使用mybatisPlus多数据源+ES大宽表架构

       es启动器

       easy-es官网:https://www.easy-es.cn/

       elasticsearch杀手神器,让es操作更简单

       由于ES的特性可以将原先的mysql表主要的字段设计在一个索引字段中,可以设计一个json的字段存储对账两方的订单的原始数据,这种就方便查看全部的详细数据,不至于每次都需要去库里面查数据看业务对账的代码逻辑是怎样的来确定生成的该条对账数据是正确的还是错误的(差错数据对账自动标识出来的,就根据关联的id和两边的金额是否一致判断),同时使用mybatisPlus的多数据源的功能直接在财务对账系统中使用它嫁接在各个业务系统的数据库上,每次需要业务数据就只要通过切换数据源去各自的业务数据库中把数据搂过来,事先设计好索引,比如资金账单索引的设计,它的索引的id可以使用支付聚合服务交易订单的id作为索引的id这样每次重新执行对账,索引中的每一条数据会被更新覆盖,索引的各个字段会被覆盖更新,使用es的批量更新操作需要控制好并发,最好是单行程操作索引数据,还需要写接口去拉取第三方支付系统的账单数据,比如微信的是只能查询当前日期30天内的账单数据还是3个月的数据,所以每次对账数据都只能拉取到几个月内的数据,所以批处理任务每次执行的是前几天的数据,需要将其数据做一个备份,可以上传到OSS等文件存储做一个备份,这个是为了后期如果有数据对账数据是有误的可以去查看这个备份文件做一个排查和修复数据时使用,然后两种对账数据从es对用的索引中查数据万级别以下的数据分页查询也很轻松,如果数据量非常大的情况下,使用es分页查询会有深度分页的问题,解决思路参考如下连接,由于基本都是历史数据使用ES的大宽表将字段平铺在ES的索引中就可以轻松的解决上面两个需求,同时ES的近实时的查询,最多有1s的延迟,也提供了多维度的数据检索聚合的能力,单索引的存储检索能力达到PB级别,实际上索引分片一个在30G-50G最佳,这也是ES亿级文档的强大的检索聚合能。

# ElasticSearch深度分页解决方案:
https://juejin.cn/post/7091619223659053069

mybaitsPlus多数据源教程:

pom依赖:

        
       <dependency>
           <groupId>com.baomidougroupId>
           <artifactId>dynamic-datasource-spring-boot-starterartifactId>
           <version>2.4.2version>
       dependency>
       <dependency>
           <groupId>p6spygroupId>
           <artifactId>p6spyartifactId>
           <version>3.8.0version>
       dependency>

yml配置:

spring:
datasource:
  p6spy: true
  dynamic:
    datasource:
      # 主库
      master:
        url: jdbc:mysql://xxxxx:3306/xxxx?characterEncoding=utf8&zeroDateTimeBehavior=convertToNull&useSSL=false&allowMultiQueries=true&allowPublicKeyRetrieval=true&serverTimezone=Asia/Shanghai
        username: xxx
        password: xxxx
        driver-class-name: com.mysql.cj.jdbc.Driver
       # 业务订单001库
      order_001:
        url: jdbc:mysql://xxxx:3306/xxxxx?characterEncoding=utf8&zeroDateTimeBehavior=convertToNull&useSSL=false&allowMultiQueries=true&allowPublicKeyRetrieval=true&serverTimezone=Asia/Shanghai
        username: xxxx
        password: xxxx
        driver-class-name: com.mysql.cj.jdbc.Driver  

启动类上排除DataSourceAutoConfiguration:

@EnableFeignClients
@EnableDiscoveryClient
@MapperScan("com.xx.xxx.dao")
@SpringBootApplication(scanBasePackages = {"com.xxx.xxxx.*"}, exclude = {
       DataSourceAutoConfiguration.class
})
public class xxxxxServiceApplication {

   public static void main(String[] args) {
       SpringApplication.run(xxxxxxServiceApplication.class, args);
  }

}

      数据源切换在类或方法上加上@DS("order_001")注解,order_001表示切换使用order_001库,然后调用相应的dao层方法即可取到相应库的数据了,使用mybatisPlus的多数据源切换异步更新、保存或者删除数据操作,需要使用如下姿势,不让会报错:

@Slf4j
@Service
@DS("order_001")
public class OrderServiceImpl extends ServiceImpl<OrderDao,OrderEntity> implements IOrderService {
   
   @Autowired
   private  OrderDao orderDao;
   
   @Override
   public List<OrderEntity> getOrdersByUserId(Long userId) {
       QueryWrapper<OrderEntity> queryWrapper = new QueryWrapper<>();
       queryWrapper.eq("user_id", userId).eq("is_delete", 0);
       List<OrderEntity> orderEntities = orderDao.selectList(queryWrapper);
       String dataSourceLookupKey = DynamicDataSourceContextHolder.getDataSourceLookupKey();
       CompletableFuture<Void> vcf1 = CompletableFuture.runAsync(() -> {
           Optional.ofNullable(orderEntities).ifPresent(oe -> {
             DynamicDataSourceContextHolder.setDataSourceLookupKey(dataSourceLookupKey);
               oe.stream().forEach(o -> {
                   orderDao.deleteById(o.getId());
              });
          });
      });
       try {
           vcf1.get();
      } catch (Exception e) {
           e.printStackTrace();
      }
       return orderEntities;
  }
}

       DynamicDataSourceContextHolder也是我在遇到报错的时候去看了mybatisPlus的源码看到了这个类,先在主线程中的dataSourceLookupKey值取出来,然后在异步线程中重新设置进去,这样子线程就知道主线程使用的是哪个数据源了。

curl调用ES接口和索引设计示例:

首先使用curl获取集群中可用的Elasticsearch索引列表:
curl -XGET 'http://localhost:9200/_cat/indices?v&pretty'
curl -XGET 'http://localhost:9200/fund_rec/_count?pretty'
删除索引
curl -i -XDELETE http://127.0.0.1:9200/bis_bc_rec
kibana清除某个索引的数据
POST bis_wd_rec/_delete_by_query
{
 "query": {"match_all": {}}
}
curl -XPOST 'http://127.0.0.1:9200/fund_rec/_delete_by_query?refresh&slices=5&pretty' -H 'Content-Type: application/json' -d'
{
 "query": {
   "match_all": {}
}
}'
curl -XGET 'http://localhost:9200/fund_rec/_doc/_count?pretty'
curl -i -XDELETE http://127.0.0.1:9200/fund_rec

1.fund_rec
curl -XPUT 'localhost:9200/fund_rec' -H 'content-Type:application/json' -d '
{
  "settings":{
       "number_of_shards":1,
       "number_of_replicas":2
  },
   "mappings":{
       "properties":{
           "id":{
               "type":"long"
          },
           "orderNo":{
               "type":"long"
          },
           "channelNo":{
                "type": "keyword"
          },
           "channelCode":{
                "type": "keyword"
          },
           "tradeState":{
               "type" : "short"
          },
           "orderAmount":{
               "type": "scaled_float",
               "scaling_factor": 100
          },
           "realAmount":{
               "type": "scaled_float",
               "scaling_factor": 100
          },
           "createTime":{
               "type":"date",
               "format":"yyyy-MM-dd HH:mm:ss"
          },
           "updateTime":{
               "type":"date",
               "format":"yyyy-MM-dd HH:mm:ss"
          },
           "tradeTime":{
               "type":"date",
               "format":"yyyy-MM-dd HH:mm:ss"
          },
           "refundTime":{
               "type":"date",
               "format":"yyyy-MM-dd HH:mm:ss"
          },
           "recState":{
               "type" : "short"
          },
           "recType":{
               "type" : "short"
          },
           "billType":{
               "type" : "short"
          },
           "refundAmount":{
               "type": "scaled_float",
               "scaling_factor": 100
          },
           "centOrders":{
               "type" : "object"
          },
           "threeBills":{
               "type" : "object"
          },
           "desc":{
               "type":"text"
          },
           "billTime":{
               "type":"date",
               "format":"yyyy-MM-dd"
          },
           "isDel":{
               "type" : "short"
          },
"historyData":{
                "type":"text"
          }
      }
  }
}'

2.bis_bc_rec
curl -XPUT 'localhost:9200/bis_bc_rec' -H 'content-Type:application/json' -d '
{
 "settings":{
       "number_of_shards":1,
       "number_of_replicas":2
},
 "mappings": {
   "properties": {
     "id": {
       "type": "long"
    },
     "recType": {
       "type": "short"
    },
     "orderNo": {
       "type": "long"
    },
     "yyOrderId": {
       "type": "long"
    },
     "cxOrderId": {
       "type": "keyword"
    },
 "originalOrderId":{
               "type": "keyword"
    },
     "orderType": {
       "type": "long"
    },
     "channelNo": {
       "type": "keyword"
    },
     "channelCode": {
       "type": "keyword"
    },
     "tradeState": {
       "type": "long"
    },
     "orderAmount": {
       "type": "float"
    },
     "realAmount": {
       "type": "float"
    },
     "centRealAmount": {
       "type": "scaled_float",
       "scaling_factor": 100
    },
     "isDiscount": {
       "type": "short"
    },
     "discountName": {
       "type": "keyword"
    },
     "disAmount": {
       "type": "scaled_float",
       "scaling_factor": 100
    },
     "bisType": {
       "type": "short"
    },
     "createTime": {
       "type": "date",
       "format": "yyyy-MM-dd HH:mm:ss"
    },
     "updateTime": {
       "type": "date",
       "format": "yyyy-MM-dd HH:mm:ss"
    },
     "tradeTime": {
       "type": "date",
       "format": "yyyy-MM-dd HH:mm:ss"
    },
     "recState": {
       "type": "short"
    },
     "centRefundAmount": {
       "type": "scaled_float",
       "scaling_factor": 100
    },
     "refundTotalAmount": {
       "type": "scaled_float",
       "scaling_factor": 100
    },
     "bisRefundAmount": {
       "type": "scaled_float",
       "scaling_factor": 100
    },
     "isRefund": {
       "type": "short"
    },
     "refundNumb": {
       "type": "short"
    },
     "refundType": {
       "type": "short"
    },
     "refundTime": {
       "type": "date",
       "format": "yyyy-MM-dd HH:mm:ss"
    },
     "centOrders": {
   "type" : "object"
    },
     "bisData": {
       "type" : "object"
    },
     "desc": {
       "type": "text"
    },
     "billTime": {
       "type": "date",
       "format": "yyyy-MM-dd"
    },
     "isDel": {
       "type": "short"
    },
     "ownLine": {
       "type": "keyword"
    },
     "isChange": {
       "type": "short"
    },
     "isUpdateOrder": {
       "type": "short"
    },
     "costDetails": {
       "type": "text"
    },
     "businessId": {
       "type": "keyword"
    },
     "useIntegral": {
       "type": "integer"
    },
     "isAddressOrder": {
       "type": "short"
    },
     "isChangeOrder": {
       "type": "keyword"
    },
     "changeAddrDifference": {
       "type": "scaled_float",
       "scaling_factor": 100
    },
     "changeDifference": {
       "type": "scaled_float",
       "scaling_factor": 100
    },
     "isPartRefund": {
       "type": "short"
    },
     "refundMoney": {
       "type": "scaled_float",
       "scaling_factor": 100
    },
     "orderSource": {
       "type": "short"
    },
     "userId": {
       "type": "long"
    },
     "mobile": {
       "type": "keyword"
    },
     "status": {
       "type": "short"
    },
     "tradeNo": {
       "type": "long"
    },
     "refundNo": {
       "type": "long"
    },
     "cxOrderMoney": {
       "type": "scaled_float",
       "scaling_factor": 100
    },
     "orderMoney": {
       "type": "scaled_float",
       "scaling_factor": 100
    },
     "discountMoney": {
       "type": "scaled_float",
       "scaling_factor": 100
    },
     "cashDiscount": {
       "type": "scaled_float",
       "scaling_factor": 100
    },
     "integralNum": {
       "type": "integer"
    },
     "realPayMoney": {
       "type": "scaled_float",
       "scaling_factor": 100
    },
     "channel": {
       "type": "keyword"
    },
     "payTime": {
       "type": "date",
       "format": "yyyy-MM-dd HH:mm:ss"
    },
     "lineId": {
       "type": "integer"
    },
     "lineName": {
       "type": "keyword"
    },
     "riderMumber": {
       "type": "integer"
    },
     "startRunTime": {
       "type": "date",
       "format": "yyyy-MM-dd HH:mm:ss"
    },
     "startCity": {
       "type": "keyword"
    },
     "endCity": {
       "type": "keyword"
    },
     "startAddress": {
       "type": "keyword"
    },
     "endAddress": {
       "type": "keyword"
    },
     "intercityMoney": {
       "type": "scaled_float",
       "scaling_factor": 100
    },
     "departurePickupMoney": {
       "type": "scaled_float",
       "scaling_factor": 100
    },
     "payOnArrival": {
       "type": "scaled_float",
       "scaling_factor": 100
    },
     "lineSurchargeMoney": {
       "type": "scaled_float",
       "scaling_factor": 100
    },
     "endLineSurchargeMoney": {
       "type": "scaled_float",
       "scaling_factor": 100
    },
     "commission": {
       "type": "scaled_float",
       "scaling_factor": 100
    },
     "carParkId": {
       "type": "integer"
    },
     "carParkName": {
       "type": "keyword"
    },
     "pickUpTime": {
       "type": "date",
       "format": "yyyy-MM-dd HH:mm:ss"
    },
     "pickUpType": {
       "type": "short"
    },
     "driverMobile": {
       "type": "keyword"
    },
     "driverName": {
       "type": "keyword"
    },
     "carLicensePlateNum": {
       "type": "keyword"
    },
     "kmDriverMobile": {
       "type": "keyword"
    },
     "kmCarLicensePlateNum": {
       "type": "keyword"
    },
     "kmDriverName": {
       "type": "keyword"
    },
      "historyData":{
           "type":"text"
      }
  }
}
}'

3.总结

      项目的技术选型至关重要,如果选型选的很LOW,那么后期就会很痛苦和很蛋疼,要结合实际的业务场景和需求还有来选择适合的技术方案,一个mysql打天下已经不能满足我们日益增长、复杂和多变的业务需求了,所以我们的与时俱进,从单一走向混合,从传统走向微服务云原生大数据人工智能,未来对架构的要求和能力也是越来越高的,不在是简单的CRUD和一个mysql打天下,而是技能和业务都很牛逼才不会被淘汰的,所以这个行业也越来越卷。

请使用浏览器的分享功能分享到微信等