
1.问题
2.解决办法
2.1)设置合理的线程池参数
package xxxxx;
import lombok.extern.slf4j.Slf4j;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import java.util.concurrent.Callable;
import java.util.concurrent.Future;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;
@Slf4j
public class EventListenerExecutor {
public static ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
static {
executor.setCorePoolSize(4);
// 配置最大线程数
executor.setMaxPoolSize(8);
// 配置缓存队列大小
executor.setQueueCapacity(10000);
// 空闲线程存活时间
executor.setKeepAliveSeconds(15);
executor.setThreadNamePrefix("event-listener");
// 线程池拒绝任务的处理策略:这里采用了CallerRunsPolicy策略,当线程池没有处理能力的时候,该策略会直接在execute方法的调用线程中运行被拒绝的任务;如果执行程序已关闭,则会丢弃该任务
// executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
//自定义数据策略
executor.setRejectedExecutionHandler((r, executor) -> {
try {
executor.getQueue().put(r);
} catch (InterruptedException e) {
log.error("线程处理拒绝策略失败:{}",e.getMessage());
e.printStackTrace();
}
});
// 等待所有任务结束后再关闭线程池
executor.setWaitForTasksToCompleteOnShutdown(true);
// 设置线程池中任务的等待时间,如果超过这个时候还没有销毁就强制销毁,以确保应用最后能够被关闭,而不是被没有完成的任务阻塞
executor.setAwaitTerminationSeconds(60);
executor.initialize();
}
/**
* 直接执行 不给返回值
* @param task
*/
public static void execute(Runnable task) {
executor.execute(task);
}
/**
* 执行一哈 给返回值
* @param task 定时处理
* @return
* @param
*/
public static <T> Future<T> submit(Callable<T> task){
return executor.submit(task);
}
}
2.2)设置url连接参数
mysql的驱动连接的url需要加:rewriteBatchedStatements=true参数
关于rewriteBatchedStatements这个参数介绍:
MySQL的JDBC连接的url中要加rewriteBatchedStatements参数,并保证5.1.13以上版本的驱动,才能实现高性能的批量插入。MySQL JDBC驱动在默认情况下会无视executeBatch()语句,把我们期望批量执行的一组sql语句拆散,一条一条地发给MySQL数据库,批量插入实际上是单条插入,直接造成较低的性能。只有把rewriteBatchedStatements参数置为true, 驱动才会帮你批量执行SQL另外这个选项对INSERT/UPDATE/DELETE都有效
spring:
datasource:
p6spy: true
dynamic:
datasource:
master:
username: xxx
password: xxxxx
driver-class-name: com.mysql.cj.jdbc.Driver
url: jdbc:mysql://xxxx:3306/xxxxxx?useUnicode=true&characterEncoding=utf-8&useSSL=false&serverTimezone=Asia/Shanghai&zeroDateTimeBehavior=convertToNull&rewriteBatchedStatements=true
old_db:
url: jdbc:mysql://xxxxxx:3306/xxxxx?characterEncoding=utf8&zeroDateTimeBehavior=convertToNull&useSSL=false&allowMultiQueries=true&allowPublicKeyRetrieval=true&serverTimezone=Asia/Shanghai
username: xxxxxxx
password: xxxx
driver-class-name: com.mysql.cj.jdbc.Driver
2.3) 优化msql的系统参数
set global max_allowed_packet=1024 *1024 * 512; # 单个packet可以允许的最大值
set global max_connections = 60000; # 并发连接请求量比较大,建议调高此值,以增加并行连接数量
set global innodb_lock_wait_timeout=16 * 1024; # 事务锁超时时间,默认50s,超过就报错
set global bulk_insert_buffer_size=512 * 1024 * 1024; # 加快insert插入效率
set global wsrep_max_ws_size=1024*1024*1024*4; # 避免事务大小超过限制,最大4G
2.4)使用CountDownLatch减法计数器和数据插入的公共方法新开一个事务
@Transactional(propagation = Propagation.REQUIRES_NEW)
public void dealData(CopyOnWriteArrayList<CreditsRecordVO> creditsRecordVOS, CountDownLatch countDownLatch) {
if (CollectionUtil.isNotEmpty(creditsRecordVOS)) {
EventListenerExecutor.execute(() -> {
doWork(creditsRecordVOS);
countDownLatch.countDown();
});
}
}
2.5)sql批量注入器执行成功后,当前线程sleep(1000)睡1s
Integer cr = creditRecordMapper.insertBatchSomeColumn(result);
if (cr > 0) {
log.info("插入积分记录ok");
Thread.sleep(1000);
}
3.业务代码套路如下
McMembersServiceImpl类如下:
package xxxxx;
import cn.hutool.core.collection.CollectionUtil;
import lombok.extern.slf4j.Slf4j;
import org.apache.ibatis.cursor.Cursor;
import org.apache.ibatis.session.SqlSession;
import org.apache.ibatis.session.SqlSessionFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;
@Slf4j
@DS("old_db")
@Service
public class McMembersServiceImpl extends ServiceImpl<McMembersMapper, McMembers> implements McMembersService {
@Autowired
private SqlSessionFactory sqlSessionFactory;
@Autowired
private CreditRecordService creditRecordService;
@Override
public void dealData() {
int pageSize = 20000;
AtomicInteger index = new AtomicInteger(1);
AtomicInteger totalIndex = new AtomicInteger(0);
Long total2 = this.baseMapper.getTotal2();
log.info("total2:{}",total2);
Long pageCount2 = (total2 + pageSize - 1) / pageSize; //推荐写法
log.info("pageCount2:{}", pageCount2);
CountDownLatch countDownLatch = new CountDownLatch(Math.toIntExact(pageCount2));
try (SqlSession sqlSession = sqlSessionFactory.openSession();
Cursor<CreditsRecordVO> pageCursor = sqlSession.getMapper(McMembersMapper.class).getCursorData()) {
List<CreditsRecordVO> creditsRecordVOS = new ArrayList<>();
for (CreditsRecordVO creditsRecordVO : pageCursor) {
creditsRecordVOS.add(creditsRecordVO);
totalIndex.getAndIncrement();
log.info("total:{}", totalIndex.get());
if (index.getAndIncrement() == pageSize) {
CopyOnWriteArrayList<CreditsRecordVO> creditsRecordVO2 = new CopyOnWriteArrayList<>(creditsRecordVOS);
log.info("creditsRecordVO2.size:{}", creditsRecordVO2.size());
creditRecordService.dealData(creditsRecordVO2, countDownLatch);
creditsRecordVOS.clear();
log.info("清空list:{}", creditsRecordVOS.size());
index.set(0);
}
}
if (CollectionUtil.isNotEmpty(creditsRecordVOS)) {
CopyOnWriteArrayList<CreditsRecordVO> creditsRecordVO3 = new CopyOnWriteArrayList<>(creditsRecordVOS);
log.info("creditsRecordVO3.size:{}", creditsRecordVO3.size());
creditRecordService.dealData(creditsRecordVO3, countDownLatch);
creditsRecordVOS.clear();
log.info("清空list2:{}", creditsRecordVOS.size());
index.set(0);
}
countDownLatch.await();
} catch (Exception e) {
log.error("游标查询异常:{}", e.getMessage());
}
log.info("total:{}", totalIndex.get());
}
@Override
public Long getTotal2() {
return baseMapper.getTotal2();
}
}
CreditRecordServiceImpl类如下:
package xxxxxxx;
import cn.hutool.core.collection.CollectionUtil;
import com.alibaba.fastjson.JSON;
import com.alibaba.nacos.common.utils.MD5Utils;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
import com.baomidou.mybatisplus.core.toolkit.StringUtils;
import com.baomidou.mybatisplus.core.toolkit.Wrappers;
import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.exception.ExceptionUtils;
import org.redisson.api.RLock;
import org.redisson.api.RedissonClient;
import org.springframework.beans.BeanUtils;
import org.springframework.jdbc.datasource.DataSourceTransactionManager;
import org.springframework.stereotype.Service;
import org.springframework.transaction.TransactionDefinition;
import org.springframework.transaction.TransactionStatus;
import org.springframework.transaction.annotation.Propagation;
import org.springframework.transaction.annotation.Transactional;
import org.springframework.util.CollectionUtils;
import java.math.BigDecimal;
import java.time.Instant;
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;
import java.time.temporal.TemporalAdjusters;
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
@Slf4j
@Service
@RequiredArgsConstructor
public class CreditRecordServiceImpl extends ServiceImpl<CreditRecordMapper, CreditRecord> implements CreditRecordService {
private final TransactionDefinition transactionDefinition;
private final DataSourceTransactionManager transactionManager;
private final CreditRecordMapper creditRecordMapper;
private final MemberMapper memberMapper;
@Override
@Transactional(propagation = Propagation.REQUIRES_NEW)
public void dealData(CopyOnWriteArrayList<CreditsRecordVO> creditsRecordVOS, CountDownLatch countDownLatch) {
if (CollectionUtil.isNotEmpty(creditsRecordVOS)) {
EventListenerExecutor.execute(() -> {
doWork(creditsRecordVOS);
countDownLatch.countDown();
});
}
}
private void doWork(CopyOnWriteArrayList<CreditsRecordVO> creditLogVOS) {
try {
List<CreditRecord> result = new ArrayList<>();
for (CreditsRecordVO creditLogVO : creditLogVOS) {
CreditRecord creditRecord = new CreditRecord();
creditRecord.setId(Long.valueOf(creditLogVO.getId()));
creditRecord.setContent(creditLogVO.getRemark() != null ? creditLogVO.getRemark() : "");
if (Objects.nonNull(creditLogVO.getNum())) {
creditRecord.setNum(BigDecimal.valueOf(creditLogVO.getNum()));
if (creditLogVO.getNum() > 0) {
creditRecord.setFlowType(1);
} else if (creditLogVO.getNum() < 0) {
creditRecord.setFlowType(1);
}
} else {
continue;
}
creditRecord.setCreditType(creditLogVO.getCredittype());
creditRecord.setStatus(1);
if (Objects.nonNull(creditLogVO.getUid())) {
creditRecord.setMemberId(Long.valueOf(creditLogVO.getUid()));
} else {
continue;
}
Integer createtime = creditLogVO.getCreatetime();
if (Objects.nonNull(createtime)) {
LocalDateTime localDateTime = LocalDateTime.ofInstant(
Instant.ofEpochSecond(createtime), ZoneOffset.ofHours(8)
);
creditRecord.setCreateTime(localDateTime);
}
creditRecord.setBalanceSnapshot(BigDecimal.valueOf(-1));
creditRecord.setDeleted(0);
creditRecord.setIsExp(0);
creditRecord.setFromId(0L);
creditRecord.setOpType(0);
creditRecord.setOrderNo("");
creditRecord.setUpdateTime(LocalDateTime.now());
result.add(creditRecord);
}
log.info("插入数据条数:{}", result.size());
Integer cr = creditRecordMapper.insertBatchSomeColumn(result);
if (cr > 0) {
log.info("插入积分记录ok");
Thread.sleep(1000);
}
} catch (Exception e) {
e.printStackTrace();
log.error("插入积分记录异常:{}", ExceptionUtils.getMessage(e));
}
}
}
4.总结
