mysql单表一千多万条数据同步6分钟处理完

1. 需求

      由于最近在搞会员重构,涉及到的积分变动日志表等数据需要同步清洗处理,把线上的积分变动日志表的数据导出,然后导入测试库,需要将这个测试库中的这个积分变动日志表的数据同步清洗到另外一个数据库的表中,这个积分变动日志表的数据有10107934条数据,这么大的数据量下怎么查得动?

mybatisPlus批量插入优化性能快的飞起

https://mp.weixin.qq.com/s/zgVXP_iN5C54ZsRgo8cPPA


2. 方案

2.1  使用flink-cdc

省略,由于时间有限,生产环境单独使用flink组件,flink的高可用环境部署有些麻烦,所以就没有用这种方式,这种方式是可以解决这个需求的,这里就使用写业务代码的方式来同步处理全量的数据,我往期的王章有关于flink-cdc将mysql表中的数据同步到es中的文章,使用flink-cdc的方式是可以全量、增量和实时都可以实现,下面写业务代码的方式实现的是全量的同步处理,还得写根据时间同步上线发布后每天的增量数据,这个也是比较麻烦的。

ApacheStreamPark2.1.0部署及执行flink-cdc任务同步mysql表的数据到es的实践

https://mp.weixin.qq.com/s/i9oLHCK6pOgVnnawdeyDdg


2.2  使用传统的Limit分页、数据分片、mybatisPlus的saveBanch和线程池异步

使用这种方式有两个点会很慢:

1).Limit的深度分页问题

后面的页会很慢,这个需要优化

优化参看如下文章

https://blog.csdn.net/xxxxg_xg/article/details/126143246?ops_request_misc=%257B%2522request%255Fid%2522%253A%2522168983939916800188566879%2522%252C%2522scm%2522%253A%252220140713.130102334.pc%255Fall.%2522%257D&request_id=168983939916800188566879&biz_id=0&utm_medium=distribute.pc_search_result.none-task-blog-2~all~first_rank_ecpm_v1~rank_v31_ecpm-8-126143246-null-null.142
这种优化方式赶脚没有多大的性能提升,还是慢的,还有一个思路是可以把id分段使用id>=xxx1 and id < xxx2这种计算好id的段来查,类似于分页,这个思路没有试过,有兴趣的可以试下。

2).mybatisPlus的saveBanch这个方法会很慢很慢

可以将这个方式换成mybatisPlus框架的InsertBatchSomeColumn这个sql注入器方式批量插入,我往期的王章里面有关于这个的sql注入器的讲解。
所以这种方式就不推荐了,很慢很慢,,,,,慢到你怀疑人生,所以不要使用这种方式。


2.3  使用多数据源、mybatis的游标查询、mybatisPlus框架的InsertBatchSomeColumn这个sql注入器、数据分片和线程池异步插入的方式

2.3.1  准备

1).在源数据表的uniacid字段加了一个索引,不然查询的时候全表扫描,就会很慢

2).数据插入的目标表id为主键,主键不设置自增

@TableId(value = "id", type = IdType.INPUT)
private Integer id;
目标表中的id设置位源数据表中的id,下面代码中有相关的代码,这里设置以后,等全量数据同步完成后设置表中的id自增,然后修改实体中的idz字段为:
@TableId(value = "id", type = IdType.AUTO)
private Long id;
然后将表的id自增起始值设置为数据同步后记录的max(id) +1开始自增,这种后面进入的数据的id生成策略就是自增的策略了。

3).插入目的表字段除id外设置可以为null,不勾选不是null的限制

如果目的表中有设置了不为null的字段,源数据表中对应的数据是null的会导致InsertBatchSomeColumn的sql注入器插入的那一批次的数据失败,导致最后同步到目的表中的数据条数跟源数据表中的数据有所减少,最终的数据条数不一致,还有一个原因是由于源数据表中的字段是null的数据,在业务代码中取出源数据做一些转换操作的时候,没有判断null出现空指针异常,从而导致异步线程插入一批数据由于业务代码空指针导致事务回滚,从而导致最终同步的数据条数出现缺少。


2.4  多数据源配置

引入多数据源依赖:

       <dependency>
           <groupId>com.baomidougroupId>
           <artifactId>dynamic-datasource-spring-boot-starterartifactId>
           <version>2.4.2version>
       dependency>
       <dependency>
           <groupId>p6spygroupId>
           <artifactId>p6spyartifactId>
           <version>3.8.0version>
       dependency>

nacos上的yaml配置:

spring:
application:
  name: xxxx
datasource:
  p6spy: true
  dynamic:
    datasource:
     # 主库只能配置一个
      master:
        username: root
        password: xxxx
        driver-class-name: com.mysql.cj.jdbc.Driver
        url: jdbc:mysql://ip:3306/xxx数据库名?useUnicode=true&characterEncoding=utf-8&useSSL=false&serverTimezone=Asia/Shanghai&zeroDateTimeBehavior=convertToNull
      # 从库可以配置多个xxx_xxx是从库的切换名字,这个xxx_xxx名字可以随便取,只要不重复即可
      xxx_xxx:
        username: root
        password: xxxx
        driver-class-name: com.mysql.cj.jdbc.Driver
        url: jdbc:mysql://ip:3306/xxx数据库名?characterEncoding=utf8&zeroDateTimeBehavior=convertToNull&useSSL=false&allowMultiQueries=true&allowPublicKeyRetrieval=true&serverTimezone=Asia/Shanghai
      # xxx_xxxn ,,,,,,,,从库n

MyBatisPlus多数据源加ES大宽表架构落地实践

https://mp.weixin.qq.com/s/Dtse2oMcChcE0xjVxXQk_Q

2.5 游标查询

public interface McMembersMapper extends BaseMapper<McMembers> {
   /**
    * 分页游标查询
    * 1. fetchSize = Integer.MIN_VALUE fetchSize只能设置为这个值,性能才是最佳的,如果设置成其它的值没有设置为这个值快,设置成其它值会比设置成这个值查询速度慢
    * 2.uniacid字段建一个btree的索引可以避免全表扫描而查询慢
    * 3.这里不在游标中使用Limit xxx1,xxx2这种Limit的查询方式本身就用深度分页的问题,所以这里不使用Limit分页,还去传start(从第几条开始取)和pageSize(每页多少条)这两个参数来分页,游标查询根本不需要分页,如果写了分页会跟传统的Limit一样的慢。
    * @return
    */
   @Options(resultSetType = ResultSetType.FORWARD_ONLY, fetchSize = Integer.MIN_VALUE)
   @Select("SELECT id,op_type,op_type_name,flow_type,op_type_name,is_del,uid,order_no ,from_id ,credit_num,is_exp FROM ims_testcar_credit_log WHERE uniacid = 8 AND credit_num is not null AND uid is not null ORDER BY id ASC")
   Cursor<TestcarCreditLogVO> getCursorData();

}


2.6 sql注入器配置

MySqlInjector类:

package xxxx;

import com.baomidou.mybatisplus.annotation.FieldFill;
import com.baomidou.mybatisplus.core.injector.AbstractMethod;
import com.baomidou.mybatisplus.core.injector.DefaultSqlInjector;
import com.baomidou.mybatisplus.core.metadata.TableInfo;
import com.baomidou.mybatisplus.extension.injector.methods.AlwaysUpdateSomeColumnById;
import com.baomidou.mybatisplus.extension.injector.methods.InsertBatchSomeColumn;
import com.baomidou.mybatisplus.extension.injector.methods.Upsert;

import java.util.List;

public class MySqlInjector extends DefaultSqlInjector {

   @Override
   public List<AbstractMethod> getMethodList(Class mapperClass, TableInfo tableInfo) {
       List<AbstractMethod> methodList = super.getMethodList(mapperClass, tableInfo);
       methodList.add(new InsertBatchSomeColumn(i -> i.getFieldFill() != FieldFill.UPDATE)); //批量更新
       methodList.add(new AlwaysUpdateSomeColumnById(i -> i.getFieldFill() != FieldFill.INSERT)); // 根据id更新(全量更新不忽略null)
       methodList.add(new Upsert());//添加或更新一条数据
       return methodList;
  }

}

MybatisPlusConfig类,配置注入MySqlInjector

package xxxxx;

import com.baomidou.mybatisplus.annotation.DbType;
import com.baomidou.mybatisplus.extension.plugins.MybatisPlusInterceptor;
import com.baomidou.mybatisplus.extension.plugins.inner.PaginationInnerInterceptor;
import com.dy.member.sqlinjector.MySqlInjector;
import org.mybatis.spring.annotation.MapperScan;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Primary;

@Configuration
@MapperScan("xxx.xxxx.dao")
public class MybatisPlusConfig {
   @Bean
   public MybatisPlusInterceptor mybatisPlusInterceptor() {
       MybatisPlusInterceptor interceptor = new MybatisPlusInterceptor();
       interceptor.addInnerInterceptor(new PaginationInnerInterceptor(DbType.MYSQL));
       return interceptor;
  }

   @Bean
   @Primary//批量插入配置
   public MySqlInjector mySqlInjector() {
       return new MySqlInjector();
  }

}

Mapper中加入insertBatchSomeColumn方法:

public interface CreditRecordMapper extends BaseMapper<CreditRecord> {

   /**
    * 批量插入 仅适用于mysql
    *
    * @param entityList 实体列表
    * @return 影响行数
    */
   Integer insertBatchSomeColumn(Collection<CreditRecord> entityList);

}


2.7 异步线程池配置

package xxxx;

import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;

import java.util.concurrent.ThreadPoolExecutor;

public class EventListenerExecutor {
   public static ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();

   static {
       executor.setCorePoolSize(Runtime.getRuntime().availableProcessors()*2);
       // 配置最大线程数
       executor.setMaxPoolSize(Runtime.getRuntime().availableProcessors()*4);
       // 配置缓存队列大小
       executor.setQueueCapacity(10000);
       // 空闲线程存活时间
       executor.setKeepAliveSeconds(15);
       executor.setThreadNamePrefix("event-listener");
       // 线程池对拒绝任务的处理策略:这里采用了CallerRunsPolicy策略,当线程池没有处理能力的时候,该策略会直接在execute方法的调用线程中运行被拒绝的任务;如果执行程序已关闭,则会丢弃该任务
       executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
       // 等待所有任务结束后再关闭线程池
       executor.setWaitForTasksToCompleteOnShutdown(true);
       // 设置线程池中任务的等待时间,如果超过这个时候还没有销毁就强制销毁,以确保应用最后能够被关闭,而不是被没有完成的任务阻塞
       executor.setAwaitTerminationSeconds(60);
       executor.initialize();
  }
   public static void execute(Runnable task) {
       executor.execute(task);
  }

}
以上参数根据自己的实际修改,这里也可以使用开源的动态数据源框架动态的来配置以上参数和拒绝策略,可以参考:

Hippo4j和DynamicTp动态线程池介绍和使用中遇到的坑

https://mp.weixin.qq.com/s/yHfo8SSsi5k95Y9YLqPgng


2.8 同步业务代码

CreditRecordAsyncExecutor类:

@Component
@RequiredArgsConstructor
@Slf4j
public class CreditRecordAsyncExecutor {

   private final McMembersService mcMembersService;

   public void execute() {
       long start = System.currentTimeMillis();
       mcMembersService.dealData();
       long end = System.currentTimeMillis();
       log.info("数据处理耗时[{}]分钟", (end - start) / (1000 * 60));
  }

}

McMembersService类的dealData方法:

package xxxx.xxxx.impl;

import cn.hutool.core.collection.CollectionUtil;
import com.baomidou.dynamic.datasource.annotation.DS;
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
import lombok.extern.slf4j.Slf4j;
import org.apache.ibatis.cursor.Cursor;
import org.apache.ibatis.session.SqlSession;
import org.apache.ibatis.session.SqlSessionFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicInteger;


@Slf4j
@DS("xxx_xxx") // 切换到yaml配置的从库,这里的名字跟yaml配置的名字一致
@Service
public class McMembersServiceImpl extends ServiceImpl<McMembersMapper, McMembers> implements McMembersService {

   @Autowired
   private SqlSessionFactory sqlSessionFactory;

   @Autowired
   private CreditRecordService creditRecordService;

   @Override
   public void dealData() {
       //每一批数据2万条
       int pageSize = 20000;
       AtomicInteger index = new AtomicInteger(0);
       AtomicInteger totalIndex = new AtomicInteger(0);
       try (SqlSession sqlSession = sqlSessionFactory.openSession();
            Cursor<TestcarCreditLogVO> pageCursor = sqlSession.getMapper(McMembersMapper.class).getCursorData()) {
           List<TestcarCreditLogVO> creditLogVOS = new ArrayList<>();
           for (TestcarCreditLogVO testcarCreditLogVO : pageCursor) {
               creditLogVOS.add(testcarCreditLogVO);
               totalIndex.getAndIncrement();
               log.info("total:{}", totalIndex.get());
               if (index.getAndIncrement() == pageSize) {
                   //使用CopyOnWriteArrayList拷贝ArrayList,不然下面的 creditLogVOS.clear()会报不可修改的错误,因为如果直接将creditLogVOS对象传入到 creditRecordService.dealData方法中,日志打印调用了creditLogVOS.size()会报这个不可修改的错,为了安全使用CopyOnWriteArrayList来拷贝一份数据后面传入到线程中使用
                   CopyOnWriteArrayList<TestcarCreditLogVO> testcarCreditLogVOS = new CopyOnWriteArrayList<>(creditLogVOS);
                   log.info("testcarCreditLogVOS1.size:{}", testcarCreditLogVOS.size());
                   creditRecordService.dealData(testcarCreditLogVOS);
                   creditLogVOS.clear();
                   log.info("清空list:{}", creditLogVOS.size());
                   index.set(0);
              }
          }
           if (CollectionUtil.isNotEmpty(creditLogVOS)) {
               CopyOnWriteArrayList<TestcarCreditLogVO> testcarCreditLogVOS = new CopyOnWriteArrayList<>(creditLogVOS);
               log.info("testcarCreditLogVOS2.size:{}", testcarCreditLogVOS.size());
               creditRecordService.dealData(testcarCreditLogVOS);
          }
      } catch (IOException e) {
           log.error("游标查询异常:{}", e.getMessage());
      }
       log.info("total:{}", totalIndex.get());

  }
}

creditRecordService.dealData方法如下:

package xxxx.xxxxx.impl;

import cn.hutool.core.collection.CollectionUtil;
import com.baomidou.mybatisplus.core.toolkit.StringUtils;
import com.baomidou.mybatisplus.core.toolkit.Wrappers;
import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;

import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;

import java.math.BigDecimal;
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneOffset;
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.CopyOnWriteArrayList;

@Slf4j
@Service
@RequiredArgsConstructor
public class CreditRecordServiceImpl extends ServiceImpl<CreditRecordMapper, CreditRecord> implements CreditRecordService {

   private final CreditRecordMapper creditRecordMapper;

   @Override
   public void dealData(CopyOnWriteArrayList<TestcarCreditLogVO> creditLogVOS) {
       if (CollectionUtil.isNotEmpty(creditLogVOS)) {
           log.info("dealData插入数据条数:{}",creditLogVOS.size());
           EventListenerExecutor.execute(() -> {
               doWork(creditLogVOS);
          });
      }
  }

   private void doWork(CopyOnWriteArrayList<TestcarCreditLogVO> creditLogVOS) {
       List<CreditRecord> result = new ArrayList<>();
       for (TestcarCreditLogVO creditLogVO : creditLogVOS) {
           CreditRecord creditRecord = new CreditRecord();
           //设置id为源数据表中的id值
           creditRecord.setId(creditLogVO.getId());
           creditRecord.setContent(creditLogVO.getOpTypeName());
           //源表字段的数据有null的需要判空处理,否则导致空指针异常会让该批次数据执行失败(异常事务回滚,最终导致同步后数据缺少)
           if(Objects.nonNull(creditLogVO.getCreditNum())){
               creditRecord.setNum(BigDecimal.valueOf(creditLogVO.getCreditNum()));
          }else {
               continue;
          }
           creditRecord.setOrderNo(creditLogVO.getOrderNo());
           creditRecord.setStatus(1);
           // 注释同上面的第一个注释
           if (Objects.nonNull(creditLogVO.getUid())) {
               creditRecord.setMemberId(Long.valueOf(creditLogVO.getUid()));
          }else {
               continue;
          }
           creditRecord.setDeleted(creditLogVO.getIsDel());
           Integer createtime = creditLogVO.getUpdateTime();
           // 注释同上面的第一个注释
           if (Objects.nonNull(createtime)) {
               LocalDateTime localDateTime = LocalDateTime.ofInstant(
                       Instant.ofEpochMilli(createtime), ZoneOffset.ofHours(8)
              );
               creditRecord.setCreateTime(localDateTime);
          }
           creditRecord.setFromId(creditLogVO.getFromId());
           creditRecord.setFlowType(creditLogVO.getFlowType());
           creditRecord.setIsExp(creditLogVO.getIsExp());
           creditRecord.setOpType(creditLogVO.getOpType());
           result.add(creditRecord);
      }
       log.info("插入数据条数:{}", result.size());
       creditRecordMapper.insertBatchSomeColumn(result);
  }

}


3. 总结

到此,我的分享就结束了,是一个总结、记录和分享,以后遇到直接信手拈来分分钟搞定,上面的一些坑已经说明,最终同步的数据条数是源数据表中的数据目的表中的数据条数是一致的,如果是多表同步不要关联查询,因为单表的数据量就很大,单表查询都很慢,关联多个表查就会更慢,需要把关联查询拆分为单表分步骤一个表一个表的去找到数据然后逐一同步即可,如果有目的表中的数据条数有缺少跟源表不一致,可以检查上面那个几个会导致数据少的地方,导致数据缺少的原因有四个:1.目的表的主键id是自增的(这个原因可以设置id主键为自增后验证下看看,然后把doWork方法中的 这个行creditRecord.setId(creditLogVO.getId())代码注释后同步验证下看看是否会导致同步后数据缺失),2.目的表出主键外的字段设置了非空校验策略(去除即可),否则会导致数据插入失败从而少数据,3.业务代码数据清晰转化代码中出现了空指针异常(或者其它异常)导致那事务回滚后那一批次数据没有保存成功而导致少数据,创作不易,请有一点版权意识,抄袭请放上原文连接,否则看到直接举报,希望我的分享对你有所帮助,请一键三连,么么哒!



请使用浏览器的分享功能分享到微信等