
1. Requirement
A recent membership-system refactor requires synchronizing and cleaning the credit change log table: the production data was exported and imported into a test database, and from there it must be cleaned and synchronized into a table in another database. The credit change log table contains 10,107,934 rows. With that much data, how do you even query it efficiently?
https://mp.weixin.qq.com/s/zgVXP_iN5C54ZsRgo8cPPA
2. Approaches
2.1 Flink CDC
Reference: deploying Apache StreamPark 2.1.0 and running a Flink CDC job to sync a MySQL table into Elasticsearch:
https://mp.weixin.qq.com/s/i9oLHCK6pOgVnnawdeyDdg
2.2 Traditional LIMIT pagination + data sharding + MyBatis-Plus saveBatch + an async thread pool
This approach is slow in two places:
1) LIMIT deep pagination
The later the page, the slower the query; this needs optimization.
See the following article for optimization approaches:
https://blog.csdn.net/xxxxg_xg/article/details/126143246?ops_request_misc=%257B%2522request%255Fid%2522%253A%2522168983939916800188566879%2522%252C%2522scm%2522%253A%252220140713.130102334.pc%255Fall.%2522%257D&request_id=168983939916800188566879&biz_id=0&utm_medium=distribute.pc_search_result.none-task-blog-2~all~first_rank_ecpm_v1~rank_v31_ecpm-8-126143246-null-null.142
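One standard fix for deep pagination is keyset (seek) pagination: instead of `LIMIT offset, n`, remember the last seen primary key and filter `WHERE id > lastId ORDER BY id LIMIT n`, so every page is an index seek rather than a scan-and-discard. A minimal in-memory sketch of the idea (the names and the simulated table are illustrative, not from the original code):

```java
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.LongStream;

public class KeysetPaging {
    // Simulated table: ids 1..100, already ordered by primary key.
    static final List<Long> TABLE =
            LongStream.rangeClosed(1, 100).boxed().collect(Collectors.toList());

    // Equivalent of: SELECT id FROM t WHERE id > #{lastId} ORDER BY id LIMIT #{pageSize}
    static List<Long> nextPage(long lastId, int pageSize) {
        return TABLE.stream().filter(id -> id > lastId).limit(pageSize).collect(Collectors.toList());
    }

    public static void main(String[] args) {
        long lastId = 0;
        int pages = 0;
        List<Long> page;
        while (!(page = nextPage(lastId, 30)).isEmpty()) {
            lastId = page.get(page.size() - 1); // the seek key for the next page
            pages++;
        }
        System.out.println(pages + " pages, lastId=" + lastId);
    }
}
```

Each page's cost is independent of how deep it is, because the `id > lastId` predicate starts at the right index position instead of skipping `offset` rows.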
2) MyBatis-Plus's saveBatch is very, very slow: by default it sends single-row INSERT statements in a JDBC batch, and unless rewriteBatchedStatements=true is set on the MySQL connection they are executed one at a time.
2.3 Multiple data sources + MyBatis cursor query + MyBatis-Plus's InsertBatchSomeColumn SQL injector + data chunking + async thread-pool inserts
2.3.1 Preparation
1) Add an index on the uniacid column of the source table; without it the query does a full table scan and is slow.
2) In the target table, id is the primary key and must not be auto-increment, so the source id can be carried over as-is. Use IdType.INPUT:
@TableId(value = "id", type = IdType.INPUT)
private Integer id;
rather than IdType.AUTO:
@TableId(value = "id", type = IdType.AUTO)
private Long id;
3) All target-table columns except id should allow NULL (do not tick the NOT NULL constraint).
2.4 Multi-datasource configuration
Add the multi-datasource dependencies:
<dependency>
    <groupId>com.baomidou</groupId>
    <artifactId>dynamic-datasource-spring-boot-starter</artifactId>
    <version>2.4.2</version>
</dependency>
<dependency>
    <groupId>p6spy</groupId>
    <artifactId>p6spy</artifactId>
    <version>3.8.0</version>
</dependency>
YAML configuration on Nacos:
spring:
  application:
    name: xxxx
  datasource:
    p6spy: true
    dynamic:
      datasource:
        # Only one master (primary) datasource can be configured
        master:
          username: root
          password: xxxx
          driver-class-name: com.mysql.cj.jdbc.Driver
          url: jdbc:mysql://ip:3306/xxx_database_name?useUnicode=true&characterEncoding=utf-8&useSSL=false&serverTimezone=Asia/Shanghai&zeroDateTimeBehavior=convertToNull
        # Any number of replica datasources can be added; xxx_xxx is the name used to switch
        # to this datasource and can be anything, as long as each name is unique
        xxx_xxx:
          username: root
          password: xxxx
          driver-class-name: com.mysql.cj.jdbc.Driver
          url: jdbc:mysql://ip:3306/xxx_database_name?characterEncoding=utf8&zeroDateTimeBehavior=convertToNull&useSSL=false&allowMultiQueries=true&allowPublicKeyRetrieval=true&serverTimezone=Asia/Shanghai
        # xxx_xxxn ... replica n
https://mp.weixin.qq.com/s/Dtse2oMcChcE0xjVxXQk_Q
2.5 Cursor query
public interface McMembersMapper extends BaseMapper<McMembers> {
    /**
     * Streaming cursor query.
     * 1. fetchSize = Integer.MIN_VALUE is the only value that makes the MySQL driver stream
     *    rows instead of buffering the whole result set; any other setting is slower.
     * 2. A B-tree index on uniacid avoids the full table scan that would otherwise make this slow.
     * 3. Do not add LIMIT start,pageSize to the cursor query: a cursor needs no pagination at
     *    all, and adding LIMIT brings back exactly the deep-pagination slowness described above.
     * @return cursor over the source rows
     */
    @Options(resultSetType = ResultSetType.FORWARD_ONLY, fetchSize = Integer.MIN_VALUE)
    @Select("SELECT id,op_type,op_type_name,flow_type,is_del,uid,order_no,from_id,credit_num,is_exp FROM ims_testcar_credit_log WHERE uniacid = 8 AND credit_num IS NOT NULL AND uid IS NOT NULL ORDER BY id ASC")
    Cursor<TestcarCreditLogVO> getCursorData();
}
2.6 SQL injector configuration
The MySqlInjector class:
package xxxx;
import com.baomidou.mybatisplus.annotation.FieldFill;
import com.baomidou.mybatisplus.core.injector.AbstractMethod;
import com.baomidou.mybatisplus.core.injector.DefaultSqlInjector;
import com.baomidou.mybatisplus.core.metadata.TableInfo;
import com.baomidou.mybatisplus.extension.injector.methods.AlwaysUpdateSomeColumnById;
import com.baomidou.mybatisplus.extension.injector.methods.InsertBatchSomeColumn;
import com.baomidou.mybatisplus.extension.injector.methods.Upsert;
import java.util.List;

public class MySqlInjector extends DefaultSqlInjector {
    @Override
    public List<AbstractMethod> getMethodList(Class<?> mapperClass, TableInfo tableInfo) {
        List<AbstractMethod> methodList = super.getMethodList(mapperClass, tableInfo);
        methodList.add(new InsertBatchSomeColumn(i -> i.getFieldFill() != FieldFill.UPDATE)); // batch insert, skipping UPDATE-filled fields
        methodList.add(new AlwaysUpdateSomeColumnById(i -> i.getFieldFill() != FieldFill.INSERT)); // update by id (full update, nulls not ignored)
        methodList.add(new Upsert()); // insert or update a single row
        return methodList;
    }
}
The MybatisPlusConfig class registers MySqlInjector as a bean:
package xxxxx;
import com.baomidou.mybatisplus.annotation.DbType;
import com.baomidou.mybatisplus.extension.plugins.MybatisPlusInterceptor;
import com.baomidou.mybatisplus.extension.plugins.inner.PaginationInnerInterceptor;
import com.dy.member.sqlinjector.MySqlInjector;
import org.mybatis.spring.annotation.MapperScan;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Primary;

@Configuration
@MapperScan("xxx.xxxx.dao")
public class MybatisPlusConfig {
    @Bean
    public MybatisPlusInterceptor mybatisPlusInterceptor() {
        MybatisPlusInterceptor interceptor = new MybatisPlusInterceptor();
        interceptor.addInnerInterceptor(new PaginationInnerInterceptor(DbType.MYSQL));
        return interceptor;
    }

    @Bean
    @Primary // the injector that enables batch insert
    public MySqlInjector mySqlInjector() {
        return new MySqlInjector();
    }
}
Add the insertBatchSomeColumn method to the Mapper:
public interface CreditRecordMapper extends BaseMapper<CreditRecord> {
    /**
     * Batch insert (MySQL only).
     *
     * @param entityList list of entities
     * @return number of affected rows
     */
    Integer insertBatchSomeColumn(Collection<CreditRecord> entityList);
}
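InsertBatchSomeColumn renders the whole list into a single multi-row INSERT, so a very large list can exceed MySQL's max_allowed_packet; splitting into fixed-size chunks before calling it is safer. A minimal sketch of such a split (partition is a hypothetical helper, not part of MyBatis-Plus):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

public class BatchChunks {
    // Split a large entity list into fixed-size chunks so each generated
    // multi-row INSERT stays below MySQL's max_allowed_packet.
    static <T> List<List<T>> partition(List<T> source, int size) {
        List<List<T>> chunks = new ArrayList<>();
        for (int i = 0; i < source.size(); i += size) {
            chunks.add(source.subList(i, Math.min(i + size, source.size())));
        }
        return chunks;
    }

    public static void main(String[] args) {
        List<Integer> rows = IntStream.range(0, 2500).boxed().collect(Collectors.toList());
        List<List<Integer>> chunks = partition(rows, 1000);
        // In the real job each chunk would be passed to insertBatchSomeColumn(chunk)
        System.out.println(chunks.size() + " chunks, last=" + chunks.get(chunks.size() - 1).size());
    }
}
```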
2.7 Async thread-pool configuration
package xxxx;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import java.util.concurrent.ThreadPoolExecutor;

public class EventListenerExecutor {
    public static ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
    static {
        // core pool size
        executor.setCorePoolSize(Runtime.getRuntime().availableProcessors() * 2);
        // maximum pool size
        executor.setMaxPoolSize(Runtime.getRuntime().availableProcessors() * 4);
        // queue capacity
        executor.setQueueCapacity(10000);
        // idle-thread keep-alive time
        executor.setKeepAliveSeconds(15);
        executor.setThreadNamePrefix("event-listener");
        // Rejection policy: CallerRunsPolicy runs a rejected task directly on the thread that
        // called execute() when the pool is saturated; if the executor has already been shut
        // down, the task is discarded
        executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
        // wait for submitted tasks to finish before shutting the pool down
        executor.setWaitForTasksToCompleteOnShutdown(true);
        // upper bound on that wait, so unfinished tasks cannot block application shutdown forever
        executor.setAwaitTerminationSeconds(60);
        executor.initialize();
    }

    public static void execute(Runnable task) {
        executor.execute(task);
    }
}
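The CallerRunsPolicy choice gives natural backpressure: when the queue fills up, the insert runs on the cursor-reading thread itself, which slows the producer down instead of dropping a batch. A tiny standalone demo of the policy (pool and queue deliberately sized to 1 to force saturation):

```java
import java.util.List;
import java.util.concurrent.*;

public class CallerRunsDemo {
    public static void main(String[] args) throws InterruptedException {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 1, 15, TimeUnit.SECONDS,
                new ArrayBlockingQueue<>(1),              // tiny queue to force saturation
                new ThreadPoolExecutor.CallerRunsPolicy() // overflow runs on the submitter
        );
        List<String> threads = new CopyOnWriteArrayList<>();
        Runnable task = () -> {
            threads.add(Thread.currentThread().getName());
            try { Thread.sleep(200); } catch (InterruptedException ignored) { }
        };
        for (int i = 0; i < 3; i++) pool.execute(task); // 1 running, 1 queued, 1 rejected
        pool.shutdown();
        pool.awaitTermination(5, TimeUnit.SECONDS);
        System.out.println(threads.contains("main")); // the rejected task ran on the main thread
    }
}
```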
Hippo4j and DynamicTp dynamic thread pools: an introduction and pitfalls encountered in use:
https://mp.weixin.qq.com/s/yHfo8SSsi5k95Y9YLqPgng
2.8 Synchronization business code
The CreditRecordAsyncExecutor class:
@Component
@RequiredArgsConstructor
@Slf4j
public class CreditRecordAsyncExecutor {
    private final McMembersService mcMembersService;

    public void execute() {
        long start = System.currentTimeMillis();
        mcMembersService.dealData();
        long end = System.currentTimeMillis();
        log.info("Data processing took [{}] minutes", (end - start) / (1000 * 60));
    }
}
The dealData method of McMembersServiceImpl:
package xxxx.xxxx.impl;
import cn.hutool.core.collection.CollectionUtil;
import com.baomidou.dynamic.datasource.annotation.DS;
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
import lombok.extern.slf4j.Slf4j;
import org.apache.ibatis.cursor.Cursor;
import org.apache.ibatis.session.SqlSession;
import org.apache.ibatis.session.SqlSessionFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicInteger;

@Slf4j
@DS("xxx_xxx") // switch to the replica datasource; the name must match the one in the YAML config
@Service
public class McMembersServiceImpl extends ServiceImpl<McMembersMapper, McMembers> implements McMembersService {
    @Autowired
    private SqlSessionFactory sqlSessionFactory;
    @Autowired
    private CreditRecordService creditRecordService;

    @Override
    public void dealData() {
        // hand off one batch of 20,000 rows at a time
        int pageSize = 20000;
        AtomicInteger index = new AtomicInteger(0);
        AtomicInteger totalIndex = new AtomicInteger(0);
        try (SqlSession sqlSession = sqlSessionFactory.openSession();
             Cursor<TestcarCreditLogVO> pageCursor = sqlSession.getMapper(McMembersMapper.class).getCursorData()) {
            List<TestcarCreditLogVO> creditLogVOS = new ArrayList<>();
            for (TestcarCreditLogVO testcarCreditLogVO : pageCursor) {
                creditLogVOS.add(testcarCreditLogVO);
                totalIndex.getAndIncrement();
                log.info("total:{}", totalIndex.get());
                if (index.incrementAndGet() == pageSize) { // flush after exactly pageSize rows
                    // Copy into a CopyOnWriteArrayList before handing the batch to the async task:
                    // the creditLogVOS.clear() below would otherwise mutate the very list the
                    // worker thread is still reading (e.g. its creditLogVOS.size() logging), so a
                    // defensive copy is passed to the thread instead
                    CopyOnWriteArrayList<TestcarCreditLogVO> testcarCreditLogVOS = new CopyOnWriteArrayList<>(creditLogVOS);
                    log.info("testcarCreditLogVOS1.size:{}", testcarCreditLogVOS.size());
                    creditRecordService.dealData(testcarCreditLogVOS);
                    creditLogVOS.clear();
                    log.info("cleared list, size:{}", creditLogVOS.size());
                    index.set(0);
                }
            }
            // flush the final partial batch
            if (CollectionUtil.isNotEmpty(creditLogVOS)) {
                CopyOnWriteArrayList<TestcarCreditLogVO> testcarCreditLogVOS = new CopyOnWriteArrayList<>(creditLogVOS);
                log.info("testcarCreditLogVOS2.size:{}", testcarCreditLogVOS.size());
                creditRecordService.dealData(testcarCreditLogVOS);
            }
        } catch (IOException e) {
            log.error("Cursor query failed: {}", e.getMessage());
        }
        log.info("total:{}", totalIndex.get());
    }
}
The creditRecordService.dealData method:
package xxxx.xxxxx.impl;
import cn.hutool.core.collection.CollectionUtil;
import com.baomidou.mybatisplus.core.toolkit.StringUtils;
import com.baomidou.mybatisplus.core.toolkit.Wrappers;
import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;
import java.math.BigDecimal;
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneOffset;
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.CopyOnWriteArrayList;

@Slf4j
@Service
@RequiredArgsConstructor
public class CreditRecordServiceImpl extends ServiceImpl<CreditRecordMapper, CreditRecord> implements CreditRecordService {
    private final CreditRecordMapper creditRecordMapper;

    @Override
    public void dealData(CopyOnWriteArrayList<TestcarCreditLogVO> creditLogVOS) {
        if (CollectionUtil.isNotEmpty(creditLogVOS)) {
            log.info("dealData rows to insert: {}", creditLogVOS.size());
            EventListenerExecutor.execute(() -> doWork(creditLogVOS));
        }
    }

    private void doWork(CopyOnWriteArrayList<TestcarCreditLogVO> creditLogVOS) {
        List<CreditRecord> result = new ArrayList<>();
        for (TestcarCreditLogVO creditLogVO : creditLogVOS) {
            CreditRecord creditRecord = new CreditRecord();
            // carry the id over from the source row
            creditRecord.setId(creditLogVO.getId());
            creditRecord.setContent(creditLogVO.getOpTypeName());
            // Source columns can be null and must be null-checked: an unguarded
            // NullPointerException would fail the batch, roll back its transaction, and leave
            // the synchronized data incomplete
            if (Objects.nonNull(creditLogVO.getCreditNum())) {
                creditRecord.setNum(BigDecimal.valueOf(creditLogVO.getCreditNum()));
            } else {
                continue;
            }
            creditRecord.setOrderNo(creditLogVO.getOrderNo());
            creditRecord.setStatus(1);
            // same null handling as above
            if (Objects.nonNull(creditLogVO.getUid())) {
                creditRecord.setMemberId(Long.valueOf(creditLogVO.getUid()));
            } else {
                continue;
            }
            creditRecord.setDeleted(creditLogVO.getIsDel());
            Integer createtime = creditLogVO.getUpdateTime();
            // same null handling as above
            if (Objects.nonNull(createtime)) {
                // the source stores a unix timestamp in seconds (an Integer cannot even hold
                // current epoch milliseconds), so convert with ofEpochSecond, not ofEpochMilli
                LocalDateTime localDateTime = LocalDateTime.ofInstant(
                        Instant.ofEpochSecond(createtime), ZoneOffset.ofHours(8)
                );
                creditRecord.setCreateTime(localDateTime);
            }
            creditRecord.setFromId(creditLogVO.getFromId());
            creditRecord.setFlowType(creditLogVO.getFlowType());
            creditRecord.setIsExp(creditLogVO.getIsExp());
            creditRecord.setOpType(creditLogVO.getOpType());
            result.add(creditRecord);
        }
        log.info("rows to insert: {}", result.size());
        creditRecordMapper.insertBatchSomeColumn(result);
    }
}
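Worth stressing the timestamp conversion: the source column is an Integer unix timestamp, and an int can only hold epoch seconds, not milliseconds, so Instant.ofEpochMilli on it silently produces dates in January 1970. A small demo of the difference (the literal value is illustrative):

```java
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneOffset;

public class EpochConversion {
    public static void main(String[] args) {
        int createtime = 1_700_000_000; // unix timestamp in seconds, as stored in the source table
        // Correct: treat the Integer as seconds.
        LocalDateTime ok = LocalDateTime.ofInstant(Instant.ofEpochSecond(createtime), ZoneOffset.ofHours(8));
        // Wrong: treating it as milliseconds lands about 20 days after the 1970 epoch.
        LocalDateTime bad = LocalDateTime.ofInstant(Instant.ofEpochMilli(createtime), ZoneOffset.ofHours(8));
        System.out.println(ok.getYear() + " vs " + bad.getYear());
    }
}
```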