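Easy-Retry: A Distributed Retry Service Platform


1. Introduction

Before introducing this open-source product, let me first introduce an open-source organization: aizuda (爱组搭).

1.1 aizuda official website

http://aizuda.com/

Easy-Retry is one of aizuda's open-source projects.


1.2 Overview

Distributed systems are now mainstream, and keeping system data accurate and correct is a major challenge. Following the CAP theorem, it has become common practice to use flexible (BASE-style) transactions that preserve availability while guaranteeing eventual consistency. To protect the availability, fault tolerance and data consistency of distributed services, and to cope with network failures in calls between services, and in the spirit of Murphy's law (anything that can go wrong will go wrong), adding retries to core flows and reconciling data afterwards have become common techniques for making systems more robust.

Features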

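  • Ease of use: low integration cost for business code; retry stability does not depend on each developer's skill level

  • Flexibility: configuration can be changed dynamically, tasks can be started or stopped, and in-flight retry data can be terminated

  • Simple to operate: get started in minutes; retry data supports CRUD operations in the web UI

  • Dashboard: real-time monitoring and control of retry data

  • Multiple backoff strategies: Cron, fixed interval, level-based delay, random delay

  • Containerized deployment: the server can be deployed with Docker

  • High-performance scheduling platform: server nodes can be scaled out and in dynamically

  • Multiple retry types: ONLY_LOCAL, ONLY_REMOTE and LOCAL_REMOTE

  • Retry data management: retry data is never lost and can be replayed with one click

  • Multiple alert channels: email, WeCom (企业微信), DingTalk, Feishu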
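The backoff strategies listed above decide when the next retry fires. The following is a minimal sketch, assuming that "level-based" means a fixed table of increasing delays indexed by the retry count; the class name, method names and delay values are hypothetical and are not Easy-Retry's API. Cron-based backoff is left out because it needs a cron parser.

```java
import java.time.Duration;
import java.time.LocalDateTime;
import java.util.concurrent.ThreadLocalRandom;

// Illustrative backoff calculations; BackoffSketch and its delay table are hypothetical.
final class BackoffSketch {

    // Hypothetical level table: the n-th retry waits DELAY_LEVELS[n - 1].
    static final Duration[] DELAY_LEVELS = {
        Duration.ofSeconds(10), Duration.ofSeconds(15), Duration.ofSeconds(30),
        Duration.ofMinutes(1), Duration.ofMinutes(2), Duration.ofMinutes(5)
    };

    // Fixed interval: every retry waits the same amount of time.
    static LocalDateTime fixed(LocalDateTime last, Duration interval) {
        return last.plus(interval);
    }

    // Level-based: retryCount starts at 1; counts past the end of the table reuse the last level.
    static LocalDateTime level(LocalDateTime last, int retryCount) {
        int index = Math.min(Math.max(retryCount, 1), DELAY_LEVELS.length) - 1;
        return last.plus(DELAY_LEVELS[index]);
    }

    // Random: wait a uniformly random number of seconds in [minSeconds, maxSeconds].
    static LocalDateTime random(LocalDateTime last, long minSeconds, long maxSeconds) {
        long delay = ThreadLocalRandom.current().nextLong(minSeconds, maxSeconds + 1);
        return last.plusSeconds(delay);
    }
}
```

A level table gives short delays for transient glitches and longer ones for persistent failures, while random delays spread retries out so that many failed tasks do not hit the downstream service at the same instant.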


1.3 Related links

Easy-Retry official documentation

https://www.easyretry.com/

Project repository

https://toscode.mulanos.cn/aizuda/easy-retry

GitHub

https://github.com/aizuda/easy-retry

ByteDance: How to retry gracefully

https://juejin.cn/post/6914091859463634951

Graceful retries in Java with spring-retry

https://mp.weixin.qq.com/s/vqmON5EOT17YDVLo-1JLNQ


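2. Architecture

2.1 System architecture diagram


2.2 Client-server data interaction diagram


3. Comparison with mature retry components in the industry


4. Quick start

4.1 Server deployment

4.1.0 Initialization script

doc/sql/easy_retry.sql

The figure shows where this SQL script is located in the project.

Prepare the easy_retry database by running the initialization script above: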


4.1.1 源码部署

  • Download the source code

     https://gitee.com/aizuda/easy-retry.git

     https://github.com/aizuda/easy-retry.git
  • Build with Maven

mvn clean install
  • Edit the configuration

/easy-retry-server/src/main/resources/application.yml

Changes to the configuration file:

spring:
  datasource:
    name: easy_retry
    url: jdbc:mysql://localhost:3306/easy_retry?useSSL=false&characterEncoding=utf8&useUnicode=true
    username: root
    password: root
    # ... other settings ...
easy-retry:
  lastDays: 30 # how many days back to pull retry data
  retryPullPageSize: 100 # batch size when pulling retry data
  nettyPort: 1788 # server-side Netty port
  totalPartition: 32 # total number of partitions for the retry and dead-letter tables
  • Start the server

java -jar easy-retry-server.jar


4.1.2 Docker deployment

  • Pull the image

docker pull byteblogs/easy-retry:1.5.0
  • Create and run the container

# To customize MySQL and other settings, pass "-e PARAMS" in the format PARAMS="--key1=value1 --key2=value2";
# the available options are listed in /easy-retry-server/src/main/resources/application.yml.
# To customize JVM options such as memory, pass "-e JAVA_OPTS", e.g. JAVA_OPTS="-Xmx512m".
docker run \
-e PARAMS="--spring.datasource.username=root --spring.datasource.password=root --spring.datasource.url=jdbc:mysql://IP:3306/easy_retry?useUnicode=true&characterEncoding=UTF-8&autoReconnect=true&serverTimezone=Asia/Shanghai " \
-p 8080:8080 \
-p 1788:1788 \
--name easy-retry-server-1 \
-d byteblogs/easy-retry:1.5.0

Once the server has started successfully, open the following address to access the admin console:

http://localhost:8080

Online demo

URL: http://preview.easyretry.com/
Username: admin  Password: admin


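4.2 Client integration

4.2.1 Add the dependency

Add the dependency to your project's pom.xml:

<dependency>
  <groupId>com.aizuda</groupId>
  <artifactId>easy-retry-client-starter</artifactId>
  <version>1.5.0</version>
</dependency>

4.2.2 Configuration

Add the annotation to the application's main class to enable Easy-Retry:

@SpringBootApplication
@EnableEasyRetry(group = "example_group")
public class ExampleApplication {
    public static void main(String[] args) {
        SpringApplication.run(ExampleApplication.class, args);
    }
}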

Configure the server address:

easy-retry:
  server:
    host: 127.0.0.1 # server address; a domain name is recommended
    port: 1788 # the server's Netty port


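4.2.3 Retrying with the @Retryable annotation

Add the retry annotation to the method that should be retried:

@Retryable(scene = "errorMethodForLocalAndRemote", localTimes = 3, retryStrategy = RetryType.LOCAL_REMOTE)
public String errorMethodForLocalAndRemote(String name) {
    // Integer division by zero always throws ArithmeticException, which triggers the retry
    double i = 1 / 0;
    return "This is a simple method that throws an exception";
}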
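To make the difference between the retry types concrete, here is a simplified model, assuming that ONLY_LOCAL retries in-process localTimes times, ONLY_REMOTE hands the failure to the server straight away, and LOCAL_REMOTE retries locally first and reports only if every local attempt fails. RetryTypeSketch, invoke and reportedToServer are hypothetical names; the real client reports over Netty and applies the scene's backoff, which this sketch omits.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Supplier;

// Hypothetical model of the three retry types; not Easy-Retry's API.
final class RetryTypeSketch {

    enum RetryType { ONLY_LOCAL, ONLY_REMOTE, LOCAL_REMOTE }

    // Stands in for "report the task to the server"; a real client would send it over the network.
    static final List<String> reportedToServer = new ArrayList<>();

    static <T> T invoke(String scene, RetryType type, int localTimes, Supplier<T> method) {
        try {
            return method.get();                         // the original call
        } catch (RuntimeException first) {
            RuntimeException last = first;
            if (type != RetryType.ONLY_REMOTE) {
                for (int i = 0; i < localTimes; i++) {   // in-process retries
                    try {
                        return method.get();
                    } catch (RuntimeException e) {
                        last = e;
                    }
                }
            }
            if (type != RetryType.ONLY_LOCAL) {
                reportedToServer.add(scene);             // hand the task over to the server
            }
            throw last;                                  // the caller still sees the failure
        }
    }
}
```

With localTimes = 3 and LOCAL_REMOTE, a permanently failing method runs four times in-process (the original call plus three retries) before the scene is handed to the server.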


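4.2.4 @Retryable in detail


4.2.5 Creating retry tasks manually

Note: generating a retry task means the task is created on the client and reported to the server; the server then schedules it and notifies the client to carry out the retry.

@ExecutorMethodRegister in detail

Create a custom task executor: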

// A custom task executor
@ExecutorMethodRegister(scene = CustomSyncCreateTask.SCENE, async = false, timeout = 10000, unit = TimeUnit.MILLISECONDS, forceReport = true)
@Slf4j
public class CustomSyncCreateTask implements ExecutorMethod {

    public static final String SCENE = "customSyncCreateTask";

    @Override
    public Object doExecute(Object obj) {
        return "test succeeded";
    }

}

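Trigger the retry task in code:

public void generateSyncTaskTest() throws InterruptedException {

    Zoo zoo = new Zoo();
    zoo.setNow(LocalDateTime.now());
    // Build a retry task for the custom executor defined above
    EasyRetryTemplate retryTemplate = RetryTaskTemplateBuilder.newBuilder()
        .withExecutorMethod(CustomSyncCreateTask.class)
        .withParam(zoo)
        .withScene(CustomSyncCreateTask.SCENE)
        .build();

    retryTemplate.executeRetry();

    // Keep the test thread alive while the task is reported, scheduled and executed
    Thread.sleep(90000);
}

@ExecutorMethodRegister is also an annotation; judging from the example, it registers the executor used for retry tasks that are created manually in code.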


5. Source code walkthrough

5.1 Client auto-configuration entry point

package com.aizuda.easy.retry.client.starter;

import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.context.annotation.ComponentScan;
import org.springframework.context.annotation.Configuration;

@Configuration
@ComponentScan("com.aizuda.easy.retry.client.core")
@ConditionalOnProperty(prefix = "easy-retry", name = "enabled", havingValue = "true")
public class EasyRetryClientAutoConfiguration {

}

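This auto-configuration class hands every component in the com.aizuda.easy.retry.client.core package over to Spring Boot for injection and management. Because of @ConditionalOnProperty, it only takes effect when easy-retry.enabled=true.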


5.2 Netty client


5.3 Client-side scanning and registration of @Retryable and @ExecutorMethodRegister

The parsed results of both annotations end up in a table inside the RetryerInfoCache class:

import com.google.common.collect.HashBasedTable;
import com.google.common.collect.Table;

public class RetryerInfoCache {

    // Guava Table: row key = scene, column key = executor class name
    private static Table<String, String, RetryerInfo> RETRY_HANDLER_REPOSITORY = HashBasedTable.create();

    public static RetryerInfo put(RetryerInfo retryerInfo) {
        return RETRY_HANDLER_REPOSITORY.put(retryerInfo.getScene(), retryerInfo.getExecutorClassName(), retryerInfo);
    }

    public static RetryerInfo get(String sceneName, String executorClassName) {
        return RETRY_HANDLER_REPOSITORY.get(sceneName, executorClassName);
    }

}

So the scanned registration information is kept only in client memory; it is not reported to the server.
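RetryerInfoCache is essentially a two-key lookup. Here is a JDK-only sketch of the same idea: Guava's HashBasedTable is backed by a map of maps, so this sketch nests one ConcurrentHashMap inside another. SceneRegistrySketch is a hypothetical name; note that, unlike a HashBasedTable, ConcurrentHashMap rejects null keys and values.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical stand-in for RetryerInfoCache: row key = scene, column key = executor class name.
final class SceneRegistrySketch<V> {

    private final Map<String, Map<String, V>> table = new ConcurrentHashMap<>();

    // Returns the previous value for (scene, executorClassName), or null, like Table.put.
    V put(String scene, String executorClassName, V value) {
        return table.computeIfAbsent(scene, s -> new ConcurrentHashMap<>()).put(executorClassName, value);
    }

    // Returns null when either key is unknown, like Table.get.
    V get(String scene, String executorClassName) {
        Map<String, V> row = table.get(scene);
        return row == null ? null : row.get(executorClassName);
    }
}
```

Keying on both the scene and the executor class lets the same scene name be reused by different classes without collisions.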


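5.4 Client retry entry point

As the figure above shows, retries are triggered by an AspectJ-style AOP proxy around methods annotated with @Retryable: the call to the target method is intercepted before it runs. The idea of AOP is to proxy the target object and enhance its behavior.

@Aspect marks a class as an aspect; objects built from aspect classes extend the functionality of target objects or control their execution.

@Pointcut annotates a method in an aspect and defines a pointcut, i.e. where the aspect applies.

@Around annotates an aspect method and makes it an around advice.

ProceedingJoinPoint is a join-point type; an instance wraps information about the target method to be executed. It is typically the parameter of a method annotated with @Around.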

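Advice types: following the AspectJ standard, Spring defines five kinds of advice:

Around advice (@Around): surrounds a join point. It is the most powerful type: it can run custom behavior before and after the method call, and it decides whether to proceed to the join point or to short-circuit by returning its own value or throwing an exception.

Before advice (@Before): runs before a join point, but cannot prevent execution from proceeding to the join point (unless it throws an exception).

After (finally) advice (@After): runs when the join point exits, whether it returned normally or threw an exception.

After returning advice (@AfterReturning): runs after the join point returns normally; it does not run if an exception is thrown. In Spring versions before 5.2.7 it runs after any @After advice on the same join point; since 5.2.7, @After runs after it.

After throwing advice (@AfterThrowing): runs when the target method exits by throwing an exception.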

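Advice execution order

If all of these advice types are declared in a single aspect, their execution order is shown in the figure:

When the target method is called, control first enters the around advice (@Around). When the around advice calls the join point's proceed() method, the before advice (@Before) runs, followed by the target method. After the target method finishes, the code after proceed() in the around advice runs. Once the around advice has completed, the after advice (@After) runs, followed by the after-returning advice (@AfterReturning) if the target method returned normally, or the after-throwing advice (@AfterThrowing) if it threw an exception. This is the order commonly reported for Spring versions before 5.2.7; since 5.2.7, Spring orders advice in the same @Aspect by type (@Around, @Before, @After, @AfterReturning, @AfterThrowing), and an @After method effectively runs after any @AfterReturning or @AfterThrowing method. Note: if an around advice is present, it must call proceed(); otherwise the target method is skipped and the around advice returns directly.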
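Spring AOP implements this interception with proxies (JDK dynamic proxies or CGLIB subclasses, depending on configuration). The sketch below uses a plain JDK proxy to show what an around advice that retries boils down to; it is not Spring's or Easy-Retry's code, and AroundProxySketch, Greeter and trace are hypothetical names. Here method.invoke plays the role of point.proceed().

```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Proxy;
import java.util.ArrayList;
import java.util.List;

// Minimal JDK-dynamic-proxy sketch of an "around" interceptor that retries; not Spring AOP itself.
final class AroundProxySketch {

    interface Greeter {
        String greet(String name);
    }

    // Records what the "around" logic did, so the order can be inspected.
    static final List<String> trace = new ArrayList<>();

    @SuppressWarnings("unchecked")
    static <T> T withRetry(T target, Class<T> iface, int maxAttempts) {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be >= 1");
        }
        InvocationHandler handler = (proxy, method, args) -> {
            Throwable last = null;
            for (int attempt = 1; attempt <= maxAttempts; attempt++) {
                trace.add("before proceed #" + attempt);            // code before proceed()
                try {
                    Object result = method.invoke(target, args);    // like point.proceed()
                    trace.add("after proceed #" + attempt);         // code after proceed()
                    return result;
                } catch (InvocationTargetException e) {
                    last = e.getTargetException();                  // unwrap the target's real exception
                }
            }
            throw last;                                             // all attempts failed
        };
        return (T) Proxy.newProxyInstance(iface.getClassLoader(), new Class<?>[] {iface}, handler);
    }
}
```

The caller only ever sees the proxy, which is why the retry logic can be added without touching the target class.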

The source of the around advice (RetryAspect) is as follows:

package com.aizuda.easy.retry.client.core.intercepter;

import cn.hutool.core.util.IdUtil;
import cn.hutool.core.util.StrUtil;
import com.aizuda.easy.retry.client.core.cache.GroupVersionCache;
import com.aizuda.easy.retry.client.core.config.EasyRetryProperties;
import com.aizuda.easy.retry.client.core.exception.EasyRetryClientException;
import com.aizuda.easy.retry.client.core.intercepter.RetrySiteSnapshot.EnumStage;
import com.aizuda.easy.retry.client.core.strategy.RetryStrategy;
import com.aizuda.easy.retry.client.core.annotation.Retryable;
import com.aizuda.easy.retry.client.core.retryer.RetryerResultContext;
import com.aizuda.easy.retry.common.core.alarm.Alarm;
import com.aizuda.easy.retry.common.core.alarm.AlarmContext;
import com.aizuda.easy.retry.common.core.alarm.AltinAlarmFactory;
import com.aizuda.easy.retry.common.core.constant.SystemConstants;
import com.aizuda.easy.retry.common.core.enums.NotifySceneEnum;
import com.aizuda.easy.retry.common.core.enums.RetryResultStatusEnum;
import com.aizuda.easy.retry.common.core.log.LogUtils;
import com.aizuda.easy.retry.common.core.model.EasyRetryHeaders;
import com.aizuda.easy.retry.common.core.util.EnvironmentUtils;
import com.aizuda.easy.retry.common.core.util.JsonUtil;
import com.aizuda.easy.retry.server.model.dto.ConfigDTO;
import lombok.extern.slf4j.Slf4j;
import org.aspectj.lang.ProceedingJoinPoint;
import org.aspectj.lang.annotation.Around;
import org.aspectj.lang.annotation.Aspect;
import org.aspectj.lang.reflect.MethodSignature;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.core.Ordered;
import org.springframework.core.env.StandardEnvironment;
import org.springframework.stereotype.Component;

import java.lang.reflect.Method;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.Objects;
import java.util.UUID;

/**
 * @author: www.byteblogs.com
 * @date : 2022-03-03 11:41
 */
@Aspect
@Component
@Slf4j
public class RetryAspect implements Ordered {

    private static final DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
    private static String retryErrorMoreThresholdTextMessageFormatter =
        "{} environment: retry component error \r\n" +
        "> Name: {} \r\n" +
        "> Time: {} \r\n" +
        "> Exception: {} \n";

    @Autowired
    @Qualifier("localRetryStrategies")
    private RetryStrategy retryStrategy;
    @Autowired
    private AltinAlarmFactory altinAlarmFactory;
    @Autowired
    private StandardEnvironment standardEnvironment;

    @Around("@annotation(com.aizuda.easy.retry.client.core.annotation.Retryable)")
    public Object around(ProceedingJoinPoint point) throws Throwable {
        String traceId = UUID.randomUUID().toString();

        LogUtils.debug(log, "Start entering the around method traceId:[{}]", traceId);
        Retryable retryable = getAnnotationParameter(point);
        String executorClassName = point.getTarget().getClass().getName();
        String methodEntrance = getMethodEntrance(retryable, executorClassName);
        if (StrUtil.isBlank(RetrySiteSnapshot.getMethodEntrance())) {
            RetrySiteSnapshot.setMethodEntrance(methodEntrance);
        }

        Throwable throwable = null;
        Object result = null;
        RetryerResultContext retryerResultContext;
        try {
            result = point.proceed();
        } catch (Throwable t) {
            throwable = t;
        } finally {
            LogUtils.debug(log, "Start retrying. traceId:[{}] scene:[{}] executorClassName:[{}]", traceId, retryable.scene(), executorClassName);
            // Only the method entrance starts handling retries
            retryerResultContext = doHandlerRetry(point, traceId, retryable, executorClassName, methodEntrance, throwable);
        }

        LogUtils.debug(log, "Method return value is [{}]. traceId:[{}]", result, traceId, throwable);

        // If a retry was performed, decide whether to return the data produced by the retry
        if (Objects.nonNull(retryerResultContext)) {
            // Return the result if the retry succeeded; if the annotation sets isThrowException=false, do not throw
            if (retryerResultContext.getRetryResultStatusEnum().getStatus().equals(RetryResultStatusEnum.SUCCESS.getStatus())
                || !retryable.isThrowException()) {
                return retryerResultContext.getResult();
            }
        }

        if (throwable != null) {
            throw throwable;
        } else {
            return result;
        }
    }

    private RetryerResultContext doHandlerRetry(ProceedingJoinPoint point, String traceId, Retryable retryable,
                                                String executorClassName, String methodEntrance, Throwable throwable) {

        if (!RetrySiteSnapshot.isMethodEntrance(methodEntrance)
            || RetrySiteSnapshot.isRunning()
            || Objects.isNull(throwable)
            // Retry traffic does not start another retry
            || RetrySiteSnapshot.isRetryFlow()
            // The downstream responded with a "do not retry" code: no retry
            || RetrySiteSnapshot.isRetryForStatusCode()
        ) {
            if (!RetrySiteSnapshot.isMethodEntrance(methodEntrance)) {
                LogUtils.debug(log, "Non-method entry does not enable local retries. traceId:[{}] [{}]", traceId, RetrySiteSnapshot.getMethodEntrance());
            } else if (RetrySiteSnapshot.isRunning()) {
                LogUtils.debug(log, "Existing running retry tasks do not enable local retries. traceId:[{}] [{}]", traceId, EnumStage.valueOfStage(RetrySiteSnapshot.getStage()));
            } else if (Objects.isNull(throwable)) {
                LogUtils.debug(log, "No exception, no local retries. traceId:[{}]", traceId);
            } else if (RetrySiteSnapshot.isRetryFlow()) {
                LogUtils.debug(log, "Retry traffic does not enable local retries. traceId:[{}] [{}]", traceId, RetrySiteSnapshot.getRetryHeader());
            } else if (RetrySiteSnapshot.isRetryForStatusCode()) {
                LogUtils.debug(log, "Existing exception retry codes do not enable local retries. traceId:[{}]", traceId);
            } else {
                LogUtils.debug(log, "Unknown situations do not enable local retry scenarios. traceId:[{}]", traceId);
            }
            return null;
        }

        return openRetry(point, traceId, retryable, executorClassName, throwable);
    }

    private RetryerResultContext openRetry(ProceedingJoinPoint point, String traceId, Retryable retryable,
                                           String executorClassName, Throwable throwable) {
        try {
            // Mark this call as retry traffic
            initHeaders(retryable);

            RetryerResultContext context = retryStrategy.openRetry(retryable.scene(), executorClassName, point.getArgs());
            LogUtils.info(log, "local retry result. traceId:[{}] message:[{}]", traceId, context);
            if (RetryResultStatusEnum.SUCCESS.getStatus().equals(context.getRetryResultStatusEnum().getStatus())) {
                LogUtils.debug(log, "local retry successful. traceId:[{}] result:[{}]", traceId, context.getResult());
            }

            return context;
        } catch (Exception e) {
            LogUtils.error(log, "retry component handling exception,traceId:[{}]", traceId, e);

            // Send an alert
            sendMessage(e);
        } finally {
            RetrySiteSnapshot.removeAll();
        }

        return null;
    }

    private void initHeaders(final Retryable retryable) {
        EasyRetryHeaders easyRetryHeaders = new EasyRetryHeaders();
        easyRetryHeaders.setEasyRetry(Boolean.TRUE);
        easyRetryHeaders.setEasyRetryId(IdUtil.getSnowflakeNextIdStr());
        easyRetryHeaders.setDdl(GroupVersionCache.getDdl(retryable.scene()));
        RetrySiteSnapshot.setRetryHeader(easyRetryHeaders);
    }

    private void sendMessage(Exception e) {
        try {
            ConfigDTO.Notify notifyAttribute = GroupVersionCache.getNotifyAttribute(NotifySceneEnum.CLIENT_COMPONENT_ERROR.getNotifyScene());
            if (Objects.nonNull(notifyAttribute)) {
                AlarmContext context = AlarmContext.build()
                    .text(retryErrorMoreThresholdTextMessageFormatter,
                        EnvironmentUtils.getActiveProfile(),
                        EasyRetryProperties.getGroup(),
                        LocalDateTime.now().format(formatter),
                        e.getMessage())
                    .title("retry component handling exception:[{}]", EasyRetryProperties.getGroup())
                    .notifyAttribute(notifyAttribute.getNotifyAttribute());

                Alarm<AlarmContext> alarmType = altinAlarmFactory.getAlarmType(notifyAttribute.getNotifyType());
                alarmType.asyncSendMessage(context);
            }
        } catch (Exception e1) {
            LogUtils.error(log, "Client failed to send component exception alert.", e1);
        }
    }

    public String getMethodEntrance(Retryable retryable, String executorClassName) {
        if (Objects.isNull(retryable)) {
            return StrUtil.EMPTY;
        }
        return retryable.scene().concat("_").concat(executorClassName);
    }

    private Retryable getAnnotationParameter(ProceedingJoinPoint point) {
        String methodName = point.getSignature().getName();
        Class<?> classTarget = point.getTarget().getClass();
        Class<?>[] par = ((MethodSignature) point.getSignature()).getParameterTypes();
        Method objMethod;
        try {
            objMethod = classTarget.getMethod(methodName, par);
        } catch (NoSuchMethodException e) {
            throw new EasyRetryClientException("Annotation configuration error: [{}]", methodName);
        }
        return objMethod.getAnnotation(Retryable.class);
    }

    @Override
    public int getOrder() {
        String order = standardEnvironment
            .getProperty("easy-retry.aop.order", String.valueOf(Ordered.HIGHEST_PRECEDENCE));
        return Integer.parseInt(order);
    }
}
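One subtle part of RetryAspect is the "method entrance" check in doHandlerRetry: only the outermost @Retryable call on a thread starts a local retry, so nested annotated methods do not multiply the number of attempts. A simplified ThreadLocal sketch of that rule follows; EntranceGuardSketch is hypothetical, and it models only the entrance check, not the running, retry-flow or status-code checks.

```java
import java.util.function.Supplier;

// Hypothetical sketch of the "only the method entrance retries" rule; not Easy-Retry's code.
final class EntranceGuardSketch {

    // Per-thread method entrance, playing the role of RetrySiteSnapshot's entrance slot.
    private static final ThreadLocal<String> ENTRANCE = new ThreadLocal<>();

    static <T> T around(String scene, String executorClassName, int localRetries, Supplier<T> body) {
        String entrance = scene + "_" + executorClassName;   // same shape as getMethodEntrance()
        boolean outermost = ENTRANCE.get() == null;
        if (outermost) {
            ENTRANCE.set(entrance);
        }
        try {
            return body.get();
        } catch (RuntimeException first) {
            if (!entrance.equals(ENTRANCE.get())) {
                throw first;                                 // nested call: leave retrying to the entrance
            }
            RuntimeException last = first;
            for (int i = 0; i < localRetries; i++) {
                try {
                    return body.get();
                } catch (RuntimeException e) {
                    last = e;
                }
            }
            throw last;
        } finally {
            if (outermost) {
                ENTRANCE.remove();                           // like RetrySiteSnapshot.removeAll()
            }
        }
    }
}
```

Without this guard, an outer method with 2 retries calling an inner method with 5 retries could run the inner body 18 times; with it, the inner body runs once per outer attempt.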

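5.5 Client retry types


5.6 Client retry executor: GuavaRetryExecutor

A retry ultimately ends up in GuavaRetryExecutor's call method. In essence, Easy-Retry's retry execution is a deep wrapper around guava-retrying, with visualization and alerting added on top.


5.7 How the client reports retry tasks

There are two reporting modes:

Asynchronous reporting: borrowing the sliding-window idea from Sentinel, the client collects RetryTaskDTO objects in a window, listens on it, and then makes the reporting call.

Synchronous reporting: the client reports the retry task to the server directly through the Netty client: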

        NettyResult result = client.reportRetryInfo(Collections.singletonList(retryTaskDTO));
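The asynchronous mode batches reports instead of sending one request per task. The sketch below is a simplified stand-in: a tumbling-window batcher that flushes when the batch is full or the window has elapsed. It is neither Sentinel's sliding window nor Easy-Retry's implementation, and WindowedReporterSketch, maxBatch and windowMillis are hypothetical; time is passed in explicitly so the behavior is deterministic.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical batching reporter; a simplified stand-in for windowed async reporting.
final class WindowedReporterSketch<T> {

    private final int maxBatch;
    private final long windowMillis;
    private final Consumer<List<T>> sender;   // e.g. one network call that reports the whole batch
    private final List<T> buffer = new ArrayList<>();
    private long windowStart;

    WindowedReporterSketch(int maxBatch, long windowMillis, long nowMillis, Consumer<List<T>> sender) {
        this.maxBatch = maxBatch;
        this.windowMillis = windowMillis;
        this.windowStart = nowMillis;
        this.sender = sender;
    }

    // Buffer one task; flush if the batch is full or the current window has expired.
    synchronized void add(T task, long nowMillis) {
        buffer.add(task);
        if (buffer.size() >= maxBatch || nowMillis - windowStart >= windowMillis) {
            flush(nowMillis);
        }
    }

    // Send whatever is buffered and start a new window; a timer thread could also call this.
    synchronized void flush(long nowMillis) {
        if (!buffer.isEmpty()) {
            sender.accept(new ArrayList<>(buffer));
            buffer.clear();
        }
        windowStart = nowMillis;
    }
}
```

Batching trades a little reporting latency for far fewer round trips, which is why it suits the asynchronous path; the synchronous call above reports a single-element list immediately.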

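5.8 Netty server


5.8.1 Handlers

Handlers fall roughly into two categories: those that handle GET requests and those that handle POST requests. Client heartbeats, version checks and task reports all arrive as either GET or POST requests.

5.9 The server dispatching retries to clients

The highlight of the server is its use of Akka, a toolkit and runtime for building highly concurrent, distributed, fault-tolerant, event-driven applications on the JVM, which makes such applications much easier to build. Before dispatching, the server scans out the retry data that clients have reported to it:

doScan in AbstractScanGroup: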

protected void doScan(final ScanTaskDTO scanTaskDTO) {

    LocalDateTime defLastAt = LocalDateTime.now().minusDays(systemProperties.getLastDays());

    String groupName = scanTaskDTO.getGroupName();
    LocalDateTime lastAt = Optional.ofNullable(getLastAt(groupName)).orElse(defLastAt);

    // Scan the current group for data waiting to be retried
    List<RetryTask> list = retryTaskAccessProcessor.listAvailableTasks(groupName, lastAt, systemProperties.getRetryPullPageSize(), getTaskType());

    if (!CollectionUtils.isEmpty(list)) {

        // Record the largest creation time pulled so far
        putLastAt(scanTaskDTO.getGroupName(), list.get(list.size() - 1).getCreateDt());

        for (RetryTask retryTask : list) {

            // Increment the retry count
            retryCountIncrement(retryTask);

            RetryContext retryContext = builderRetryContext(groupName, retryTask);
            RetryExecutor executor = builderResultRetryExecutor(retryContext);

            if (!executor.filter()) {
                continue;
            }

            productExecUnitActor(executor);
        }
    } else {
        // No data: sleep for 5s
        try {
            Thread.sleep((DispatchService.PERIOD / 2) * 1000);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }

        putLastAt(groupName, defLastAt);
    }
}
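doScan pages through tasks with a per-group time cursor: it starts from now minus lastDays, pulls at most retryPullPageSize tasks newer than the cursor, moves the cursor to the newest createDt pulled, and resets it when a pass finds nothing. Below is a single-group, in-memory model of that loop, assuming listAvailableTasks returns tasks ordered by createDt and uses a strict "newer than" comparison (neither is shown above); CursorScanSketch is hypothetical. With a strict comparison, tasks that share the exact boundary timestamp could be skipped, which is worth checking against the real query.

```java
import java.time.LocalDateTime;
import java.util.Comparator;
import java.util.List;
import java.util.stream.Collectors;

// Hypothetical single-group model of doScan's cursor-based paging.
final class CursorScanSketch {

    static final class Task {
        final long id;
        final LocalDateTime createDt;

        Task(long id, LocalDateTime createDt) {
            this.id = id;
            this.createDt = createDt;
        }
    }

    private LocalDateTime lastAt;   // the group's cursor; null means "not set yet"

    List<Task> scanOnce(List<Task> table, LocalDateTime now, int lastDays, int pageSize) {
        LocalDateTime defLastAt = now.minusDays(lastDays);
        LocalDateTime from = lastAt != null ? lastAt : defLastAt;
        List<Task> page = table.stream()
            .filter(t -> t.createDt.isAfter(from))
            .sorted(Comparator.comparing((Task t) -> t.createDt))
            .limit(pageSize)
            .collect(Collectors.toList());
        // Advance the cursor to the newest createDt pulled, or reset it when nothing is left
        lastAt = page.isEmpty() ? defLastAt : page.get(page.size() - 1).createDt;
        return page;
    }
}
```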
AbstractScanGroup has two subclasses: ScanCallbackGroupActor and ScanGroupActor. Each task that passes the filter is handed to productExecUnitActor:

productExecUnitActor(executor);

// Method defined in the abstract parent class
private void productExecUnitActor(RetryExecutor retryExecutor) {
    String groupIdHash = retryExecutor.getRetryContext().getRetryTask().getGroupName();
    Long retryId = retryExecutor.getRetryContext().getRetryTask().getId();
    idempotentStrategy.set(groupIdHash, retryId.intValue());

    // Hand the executor to the actor supplied by the subclass
    ActorRef actorRef = getActorRef();
    actorRef.tell(retryExecutor, actorRef);
}

// Both subclasses implement getActorRef(); this version returns the ExecUnitActor
@Override
protected ActorRef getActorRef() {
    return ActorGenerator.execUnitActor();
}

getActorRef() delegates to ActorGenerator, which creates the actor from Props supplied by a Spring extension, so each ExecUnitActor instance is obtained from the Spring container (it is a prototype-scoped bean):

getDispatchExecUnitActorSystem().actorOf(getSpringExtension().props(ExecUnitActor.BEAN_NAME));
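The idempotentStrategy.set call above, paired with idempotentStrategy.clear in ExecUnitActor, appears to mark a task as in flight while it is being dispatched. Assuming the bitSetIdempotentStrategyHandler bean is BitSet-based, as its name suggests, a per-group BitSet could look like the sketch below; BitSetIdempotentSketch and trySet are hypothetical. Because ids are passed as retryId.intValue(), such a scheme only works while ids fit in a non-negative int: BitSet rejects negative indexes.

```java
import java.util.BitSet;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical per-group "in flight" marker; not the real bitSetIdempotentStrategyHandler.
final class BitSetIdempotentSketch {

    private final Map<String, BitSet> bitsByGroup = new ConcurrentHashMap<>();

    // Marks (group, id) as in flight; returns false if it was already marked.
    boolean trySet(String group, int id) {
        BitSet bits = bitsByGroup.computeIfAbsent(group, g -> new BitSet());
        synchronized (bits) {            // BitSet itself is not thread-safe
            if (bits.get(id)) {
                return false;
            }
            bits.set(id);
            return true;
        }
    }

    boolean isSet(String group, int id) {
        BitSet bits = bitsByGroup.get(group);
        if (bits == null) {
            return false;
        }
        synchronized (bits) {
            return bits.get(id);
        }
    }

    // Called once the dispatch finishes, like idempotentStrategy.clear(...) in ExecUnitActor.
    void clear(String group, int id) {
        BitSet bits = bitsByGroup.get(group);
        if (bits != null) {
            synchronized (bits) {
                bits.clear(id);
            }
        }
    }
}
```

A BitSet costs one bit per id, so even millions of task ids per group take only a few hundred kilobytes.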

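Akka is built on the actor model: a sender asynchronously tells an actor a message, the message lands in that actor's mailbox, and the actor processes its messages one at a time. Senders never call the receiving logic directly, which decouples producers from consumers.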
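To illustrate the mailbox model without pulling in Akka, here is a minimal actor-style sketch: tell enqueues and returns immediately, and one worker thread handles messages in order. MiniActorSketch is a hypothetical name; real Akka actors share a dispatcher thread pool instead of owning a thread each, and add supervision and much more.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

// Minimal actor-style sketch (not Akka): a mailbox drained by a single worker thread.
final class MiniActorSketch<M> implements AutoCloseable {

    private final BlockingQueue<M> mailbox = new LinkedBlockingQueue<>();
    private final Thread worker;
    private volatile boolean running = true;

    MiniActorSketch(Consumer<M> behavior) {
        worker = new Thread(() -> {
            // Process one message at a time until closed and the mailbox has been drained
            while (running || !mailbox.isEmpty()) {
                try {
                    M message = mailbox.poll(50, TimeUnit.MILLISECONDS);
                    if (message != null) {
                        behavior.accept(message);
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return;
                }
            }
        });
        worker.start();
    }

    // Fire-and-forget: enqueue and return immediately, like ActorRef.tell.
    void tell(M message) {
        mailbox.add(message);
    }

    // Stop accepting work and wait until every queued message has been handled.
    @Override
    public void close() {
        running = false;
        try {
            worker.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}
```

Because only the worker thread touches the actor's state, the behavior needs no locking, which is the main appeal of the model for a dispatcher like ExecUnitActor.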

The createReceive() method of ExecUnitActor is what actually sends the retry request to the client:

package com.aizuda.easy.retry.server.support.dispatch.actor.exec;

import akka.actor.AbstractActor;
import cn.hutool.core.lang.Assert;
import com.aizuda.easy.retry.client.model.DispatchRetryDTO;
import com.aizuda.easy.retry.client.model.DispatchRetryResultDTO;
import com.aizuda.easy.retry.common.core.constant.SystemConstants;
import com.aizuda.easy.retry.common.core.log.LogUtils;
import com.aizuda.easy.retry.common.core.model.Result;
import com.aizuda.easy.retry.common.core.model.EasyRetryHeaders;
import com.aizuda.easy.retry.common.core.util.JsonUtil;
import com.aizuda.easy.retry.server.exception.EasyRetryServerException;
import com.aizuda.easy.retry.server.persistence.mybatis.mapper.RetryTaskLogMapper;
import com.aizuda.easy.retry.server.persistence.mybatis.po.RetryTask;
import com.aizuda.easy.retry.server.persistence.mybatis.po.RetryTaskLog;
import com.aizuda.easy.retry.server.persistence.mybatis.po.ServerNode;
import com.aizuda.easy.retry.server.service.convert.RetryTaskLogConverter;
import com.aizuda.easy.retry.server.support.IdempotentStrategy;
import com.aizuda.easy.retry.server.support.context.MaxAttemptsPersistenceRetryContext;
import com.aizuda.easy.retry.server.support.retry.RetryExecutor;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang.StringUtils;
import org.springframework.beans.BeanUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.beans.factory.config.ConfigurableBeanFactory;
import org.springframework.context.annotation.Scope;
import org.springframework.http.HttpEntity;
import org.springframework.http.HttpHeaders;
import org.springframework.stereotype.Component;
import org.springframework.web.client.RestTemplate;

import java.text.MessageFormat;
import java.time.LocalDateTime;
import java.util.Objects;
import java.util.concurrent.Callable;

/**
 * Retry result executor
 *
 * @author www.byteblogs.com
 * @date 2021-10-30
 * @since 2.0
 */
@Component("ExecUnitActor")
@Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)
@Slf4j
public class ExecUnitActor extends AbstractActor {

    public static final String BEAN_NAME = "ExecUnitActor";
    public static final String URL = "http://{0}:{1}/{2}/retry/dispatch/v1";

    @Autowired
    @Qualifier("bitSetIdempotentStrategyHandler")
    private IdempotentStrategy<String, Integer> idempotentStrategy;
    @Autowired
    private RetryTaskLogMapper retryTaskLogMapper;
    @Autowired
    private RestTemplate restTemplate;

    @Override
    public Receive createReceive() {
        return receiveBuilder().match(RetryExecutor.class, retryExecutor -> {

            MaxAttemptsPersistenceRetryContext context = (MaxAttemptsPersistenceRetryContext) retryExecutor.getRetryContext();
            RetryTask retryTask = context.getRetryTask();
            ServerNode serverNode = context.getServerNode();

            RetryTaskLog retryTaskLog = RetryTaskLogConverter.INSTANCE.toRetryTask(retryTask);
            retryTaskLog.setErrorMessage(StringUtils.EMPTY);

            try {
                if (Objects.nonNull(serverNode)) {
                    retryExecutor.call((Callable<Result<DispatchRetryResultDTO>>) () -> callClient(retryTask, retryTaskLog, serverNode));
                    if (context.hasException()) {
                        retryTaskLog.setErrorMessage(context.getException().getMessage());
                    }
                } else {
                    retryTaskLog.setErrorMessage("No client POD available");
                }
            } catch (Exception e) {
                LogUtils.error(log, "Failed to call the client retryTask:[{}]", JsonUtil.toJsonString(retryTask), e);
                retryTaskLog.setErrorMessage(StringUtils.isBlank(e.getMessage()) ? StringUtils.EMPTY : e.getMessage());
            } finally {
                // Clear the idempotency flag
                idempotentStrategy.clear(retryTask.getGroupName(), retryTask.getId().intValue());
                getContext().stop(getSelf());

                // Record the retry log
                retryTaskLog.setCreateDt(LocalDateTime.now());
                retryTaskLog.setId(null);
                Assert.isTrue(1 == retryTaskLogMapper.insert(retryTaskLog),
                    () -> new EasyRetryServerException("Failed to insert retry log"));
            }

        }).build();
    }

    /**
     * Call the client
     *
     * @param retryTask {@link RetryTask} the data to retry
     * @return the retry result
     */
    private Result<DispatchRetryResultDTO> callClient(RetryTask retryTask, RetryTaskLog retryTaskLog, ServerNode serverNode) {

        DispatchRetryDTO dispatchRetryDTO = new DispatchRetryDTO();
        dispatchRetryDTO.setIdempotentId(retryTask.getIdempotentId());
        dispatchRetryDTO.setScene(retryTask.getSceneName());
        dispatchRetryDTO.setExecutorName(retryTask.getExecutorName());
        dispatchRetryDTO.setArgsStr(retryTask.getArgsStr());
        dispatchRetryDTO.setUniqueId(retryTask.getUniqueId());
        dispatchRetryDTO.setRetryCount(retryTask.getRetryCount());

        // Set the headers
        HttpHeaders requestHeaders = new HttpHeaders();
        EasyRetryHeaders easyRetryHeaders = new EasyRetryHeaders();
        easyRetryHeaders.setEasyRetry(Boolean.TRUE);
        easyRetryHeaders.setEasyRetryId(retryTask.getUniqueId());
        requestHeaders.add(SystemConstants.EASY_RETRY_HEAD_KEY, JsonUtil.toJsonString(easyRetryHeaders));

        HttpEntity<DispatchRetryDTO> requestEntity = new HttpEntity<>(dispatchRetryDTO, requestHeaders);

        String format = MessageFormat.format(URL, serverNode.getHostIp(), serverNode.getHostPort().toString(), serverNode.getContextPath());
        Result<DispatchRetryResultDTO> result = restTemplate.postForObject(format, requestEntity, Result.class);

        if (1 != result.getStatus() && StringUtils.isNotBlank(result.getMessage())) {
            retryTaskLog.setErrorMessage(result.getMessage());
        } else {
            DispatchRetryResultDTO data = JsonUtil.parseObject(JsonUtil.toJsonString(result.getData()), DispatchRetryResultDTO.class);
            result.setData(data);
            if (Objects.nonNull(data) && StringUtils.isNotBlank(data.getExceptionMsg())) {
                retryTaskLog.setErrorMessage(data.getExceptionMsg());
            }
        }

        LogUtils.info(log, "Client response:[{}] ", JsonUtil.toJsonString(result));
        return result;
    }
}
So the server sends retry requests to the client over HTTP, using RestTemplate.
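The dispatch URL is built with java.text.MessageFormat, and the source passes serverNode.getHostPort().toString() rather than the raw number. That matters: for a Number argument without an explicit format, MessageFormat applies the locale's number format, which can insert grouping separators (8080 becomes "8,080" in an English locale). A small sketch of the same construction, with the hypothetical helper name dispatchUrl:

```java
import java.text.MessageFormat;

// Sketch of building the client dispatch URL with the same pattern as ExecUnitActor.URL.
final class DispatchUrlSketch {

    static final String URL = "http://{0}:{1}/{2}/retry/dispatch/v1";

    static String dispatchUrl(String hostIp, int port, String contextPath) {
        // Pass the port as a String so MessageFormat does not apply locale number formatting
        return MessageFormat.format(URL, hostIp, String.valueOf(port), contextPath);
    }
}
```

For example, dispatchUrl("127.0.0.1", 8080, "demo") yields http://127.0.0.1:8080/demo/retry/dispatch/v1.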


5.10 Manually issuing retries from the server

@PostMapping("/generate/idempotent-id")
public Result<String> idempotentIdGenerate(@RequestBody @Validated GenerateRetryIdempotentIdVO generateRetryIdempotentIdVO) {
    return new Result<>(retryTaskService.idempotentIdGenerate(generateRetryIdempotentIdVO));
}

The idempotentIdGenerate() method of RetryTaskServiceImpl:

@Override
public String idempotentIdGenerate(final GenerateRetryIdempotentIdVO generateRetryIdempotentIdVO) {
    ServerNode serverNode = clientNodeAllocateHandler.getServerNode(generateRetryIdempotentIdVO.getGroupName());
    Assert.notNull(serverNode, () -> new EasyRetryServerException("Failed to generate idempotentId: no active client node"));

    // Delegate idempotentId generation to the client
    String url = MessageFormat
        .format(URL, serverNode.getHostIp(), serverNode.getHostPort().toString(), serverNode.getContextPath());

    GenerateRetryIdempotentIdDTO generateRetryIdempotentIdDTO = new GenerateRetryIdempotentIdDTO();
    generateRetryIdempotentIdDTO.setGroup(generateRetryIdempotentIdVO.getGroupName());
    generateRetryIdempotentIdDTO.setScene(generateRetryIdempotentIdVO.getSceneName());
    generateRetryIdempotentIdDTO.setArgsStr(generateRetryIdempotentIdVO.getArgsStr());
    generateRetryIdempotentIdDTO.setExecutorName(generateRetryIdempotentIdVO.getExecutorName());

    HttpEntity<GenerateRetryIdempotentIdDTO> requestEntity = new HttpEntity<>(generateRetryIdempotentIdDTO);
    Result result = restTemplate.postForObject(url, requestEntity, Result.class);

    Assert.notNull(result, () -> new EasyRetryServerException("Failed to generate idempotentId"));
    Assert.isTrue(1 == result.getStatus(), () -> new EasyRetryServerException("Failed to generate idempotentId: make sure the arguments and executor name are correct"));

    return (String) result.getData();
}

The key code is:

ServerNode serverNode = clientNodeAllocateHandler.getServerNode(generateRetryIdempotentIdVO.getGroupName());

/**
 * Get the allocated node. Despite its name, the server_node table also records client nodes
 * (here they are looked up by groupName); the group's load-balancing strategy picks one
 * client node to handle the request.
 */
public ServerNode getServerNode(String groupName) {

    GroupConfig groupConfig = configAccess.getGroupConfigByGroupName(groupName);
    List<ServerNode> serverNodes = serverNodeMapper.selectList(new LambdaQueryWrapper<ServerNode>().eq(ServerNode::getGroupName, groupName));

    if (CollectionUtils.isEmpty(serverNodes)) {
        return null;
    }

    ClientLoadBalance clientLoadBalanceRandom = ClientLoadBalanceManager.getClientLoadBalance(groupConfig.getRouteKey());

    String hostIp = clientLoadBalanceRandom.route(groupName, new TreeSet<>(serverNodes.stream().map(ServerNode::getHostIp).collect(Collectors.toSet())));
    return serverNodes.stream().filter(s -> s.getHostIp().equals(hostIp)).findFirst().get();
}

ClientLoadBalanceManager supports the following algorithms for choosing a client node:

CONSISTENT_HASH(1, new ClientLoadBalanceConsistentHash(100)), // consistent hashing
RANDOM(2, new ClientLoadBalanceRandom()), // random
LRU(3, new ClientLoadBalanceLRU(100)), // LRU
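CONSISTENT_HASH routes a group to the same client node as long as the node set is stable, and only the keys owned by a removed node move elsewhere. A generic consistent-hashing sketch with virtual nodes follows; the hash function (first 8 bytes of MD5) and the host#VN naming are assumptions, not taken from ClientLoadBalanceConsistentHash, and the ring is rebuilt on every call for brevity where a real implementation would cache it.

```java
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.SortedMap;
import java.util.SortedSet;
import java.util.TreeMap;

// Generic consistent hashing with virtual nodes; ConsistentHashSketch is a hypothetical name.
final class ConsistentHashSketch {

    private final int virtualNodes;

    ConsistentHashSketch(int virtualNodes) {
        this.virtualNodes = virtualNodes;
    }

    // Same shape as route(groupName, TreeSet<String> hostIps) in the source above.
    String route(String key, SortedSet<String> hosts) {
        if (hosts.isEmpty()) {
            throw new IllegalArgumentException("no hosts to route to");
        }
        TreeMap<Long, String> ring = new TreeMap<>();
        for (String host : hosts) {
            for (int i = 0; i < virtualNodes; i++) {
                ring.put(hash(host + "#VN" + i), host);   // each host owns many points on the ring
            }
        }
        // Walk clockwise to the first virtual node at or after the key's hash, wrapping around
        SortedMap<Long, String> tail = ring.tailMap(hash(key));
        return tail.isEmpty() ? ring.firstEntry().getValue() : tail.get(tail.firstKey());
    }

    // 64-bit hash from the first 8 bytes of an MD5 digest (an arbitrary choice for the sketch).
    private static long hash(String s) {
        try {
            byte[] digest = MessageDigest.getInstance("MD5").digest(s.getBytes(StandardCharsets.UTF_8));
            long h = 0;
            for (int i = 0; i < 8; i++) {
                h = (h << 8) | (digest[i] & 0xFF);
            }
            return h;
        } catch (NoSuchAlgorithmException e) {
            throw new IllegalStateException(e);
        }
    }
}
```

Virtual nodes spread each host across the ring, so load stays roughly even even with only a handful of client nodes.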


5.11 RetryEndPoint: the client endpoint that receives retries dispatched by the server

package com.aizuda.easy.retry.client.core.client;

import cn.hutool.core.lang.Assert;
import com.aizuda.easy.retry.client.core.IdempotentIdGenerate;
import com.aizuda.easy.retry.client.core.RetryArgSerializer;
import com.aizuda.easy.retry.client.core.cache.GroupVersionCache;
import com.aizuda.easy.retry.client.core.cache.RetryerInfoCache;
import com.aizuda.easy.retry.client.core.callback.RetryCompleteCallback;
import com.aizuda.easy.retry.client.core.exception.EasyRetryClientException;
import com.aizuda.easy.retry.client.core.intercepter.RetrySiteSnapshot;
import com.aizuda.easy.retry.client.core.retryer.RetryerInfo;
import com.aizuda.easy.retry.client.core.retryer.RetryerResultContext;
import com.aizuda.easy.retry.client.core.serializer.JacksonSerializer;
import com.aizuda.easy.retry.client.core.strategy.RetryStrategy;
import com.aizuda.easy.retry.client.model.GenerateRetryIdempotentIdDTO;
import com.aizuda.easy.retry.common.core.context.SpringContext;
import com.aizuda.easy.retry.common.core.enums.RetryResultStatusEnum;
import com.aizuda.easy.retry.common.core.enums.RetryStatusEnum;
import com.aizuda.easy.retry.common.core.log.LogUtils;
import com.aizuda.easy.retry.common.core.model.Result;
import com.aizuda.easy.retry.common.core.util.JsonUtil;
import com.aizuda.easy.retry.server.model.dto.ConfigDTO;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.aizuda.easy.retry.client.model.DispatchRetryDTO;
import com.aizuda.easy.retry.client.model.DispatchRetryResultDTO;
import com.aizuda.easy.retry.client.model.RetryCallbackDTO;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.util.ReflectionUtils;
import org.springframework.validation.annotation.Validated;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import org.springframework.web.context.request.RequestContextHolder;
import org.springframework.web.context.request.ServletRequestAttributes;

import javax.servlet.http.HttpServletRequest;
import java.lang.reflect.Method;
import java.util.Objects;

/**
 * Endpoint through which the server calls the client for retry dispatch, configuration change notifications and similar operations
 *
 * @author: www.byteblogs.com
 * @date : 2022-03-09 16:33
 */
@RestController
@RequestMapping("/retry")
@Slf4j
public class RetryEndPoint {

    @Autowired
    @Qualifier("remoteRetryStrategies")
    private RetryStrategy retryStrategy;

    /**
     * Entry point for retries scheduled by the server
     */
    @PostMapping("/dispatch/v1")
    public Result dispatch(@RequestBody DispatchRetryDTO executeReqDto) {

        RetryerInfo retryerInfo = RetryerInfoCache.get(executeReqDto.getScene(), executeReqDto.getExecutorName());
        if (Objects.isNull(retryerInfo)) {
            throw new EasyRetryClientException("Scene:[{}] configuration does not exist", executeReqDto.getScene());
        }

        RetryArgSerializer retryArgSerializer = new JacksonSerializer();

        Object[] deSerialize = null;
        try {
            deSerialize = (Object[]) retryArgSerializer.deSerialize(executeReqDto.getArgsStr(), retryerInfo.getExecutor().getClass(), retryerInfo.getMethod());
        } catch (JsonProcessingException e) {
            throw new EasyRetryClientException("Failed to parse arguments", e);
        }

        DispatchRetryResultDTO executeRespDto = new DispatchRetryResultDTO();

        try {
            ServletRequestAttributes attributes = (ServletRequestAttributes) RequestContextHolder.getRequestAttributes();
            HttpServletRequest request = Objects.requireNonNull(attributes).getRequest();
            request.setAttribute("attemptNumber", executeReqDto.getRetryCount());

            RetryerResultContext retryerResultContext = retryStrategy.openRetry(executeReqDto.getScene(), executeReqDto.getExecutorName(), deSerialize);

            if (RetrySiteSnapshot.isRetryForStatusCode()) {
                executeRespDto.setStatusCode(RetryResultStatusEnum.STOP.getStatus());

                // TODO record which system marked the task as not retryable
                executeRespDto.setExceptionMsg("Downstream marked as not retryable");
            } else {
                executeRespDto.setStatusCode(retryerResultContext.getRetryResultStatusEnum().getStatus());
                executeRespDto.setExceptionMsg(retryerResultContext.getMessage());
            }

            executeRespDto.setIdempotentId(executeReqDto.getIdempotentId());
            executeRespDto.setUniqueId(executeReqDto.getUniqueId());
            if (Objects.nonNull(retryerResultContext.getResult())) {
                executeRespDto.setResultJson(JsonUtil.toJsonString(retryerResultContext.getResult()));
            }

        } finally {
            RetrySiteSnapshot.removeAll();
        }

        return new Result<>(executeRespDto);
    }

    /**
     * Sync the configuration version
     */
    @PostMapping("/sync/version/v1")
    public Result syncVersion(@RequestBody ConfigDTO configDTO) {
        GroupVersionCache.configDTO = configDTO;
        return new Result();
    }

    @PostMapping("/callback/v1")
    public Result callback(@RequestBody RetryCallbackDTO callbackDTO) {
        RetryerInfo retryerInfo = RetryerInfoCache.get(callbackDTO.getScene(), callbackDTO.getExecutorName());
        if (Objects.isNull(retryerInfo)) {
            throw new EasyRetryClientException("Scene:[{}] configuration does not exist", callbackDTO.getScene());
        }

        RetryArgSerializer retryArgSerializer = new JacksonSerializer();

        Object[] deSerialize = null;
        try {
            deSerialize = (Object[]) retryArgSerializer.deSerialize(callbackDTO.getArgsStr(), retryerInfo.getExecutor().getClass(), retryerInfo.getMethod());
        } catch (JsonProcessingException e) {
            throw new EasyRetryClientException("Failed to parse arguments", e);
        }

        Class<? extends RetryCompleteCallback> retryCompleteCallbackClazz = retryerInfo.getRetryCompleteCallback();
        RetryCompleteCallback retryCompleteCallback = SpringContext.getBeanByType(retryCompleteCallbackClazz);

        if (RetryStatusEnum.FINISH.getStatus().equals(callbackDTO.getRetryStatus())) {
            retryCompleteCallback.doSuccessCallback(retryerInfo.getScene(), retryerInfo.getExecutorClassName(), deSerialize);
        }

        if (RetryStatusEnum.MAX_RETRY_COUNT.getStatus().equals(callbackDTO.getRetryStatus())) {
            retryCompleteCallback.doMaxRetryCallback(retryerInfo.getScene(), retryerInfo.getExecutorClassName(), deSerialize);
        }

        return new Result();
    }

    /**
     * Manually added retry data: generate the idempotentId on the server's behalf
     *
     * @param generateRetryIdempotentIdDTO model for generating the idempotentId
     * @return idempotentId
     */
    @PostMapping("/generate/idempotent-id/v1")
    public Result<String> idempotentIdGenerate(@RequestBody @Validated GenerateRetryIdempotentIdDTO generateRetryIdempotentIdDTO) {

        String scene = generateRetryIdempotentIdDTO.getScene();
        String executorName = generateRetryIdempotentIdDTO.getExecutorName();
        String argsStr = generateRetryIdempotentIdDTO.getArgsStr();

        RetryerInfo retryerInfo = RetryerInfoCache.get(scene, executorName);
        Assert.notNull(retryerInfo, () -> new EasyRetryClientException("Retry info not found scene:[{}] executorName:[{}]", scene, executorName));

        Method executorMethod = retryerInfo.getMethod();

        RetryArgSerializer retryArgSerializer = new JacksonSerializer();

        Object[] deSerialize = null;
        try {
            deSerialize = (Object[]) retryArgSerializer.deSerialize(argsStr, retryerInfo.getExecutor().getClass(), retryerInfo.getMethod());
        } catch (JsonProcessingException e) {
            throw new EasyRetryClientException("Failed to parse arguments", e);
        }

        String idempotentId;
        try {
            Class<? extends IdempotentIdGenerate> idempotentIdGenerate = retryerInfo.getIdempotentIdGenerate();
            IdempotentIdGenerate generate = idempotentIdGenerate.newInstance();
            Method method = idempotentIdGenerate.getMethod("idGenerate", Object[].class);
            // Typed as Object so the array is passed as a single argument, not spread as varargs
            Object p = new Object[]{scene, executorName, deSerialize, executorMethod.getName()};
            idempotentId = (String) ReflectionUtils.invokeMethod(method, generate, p);
        } catch (Exception exception) {
            LogUtils.error(log, "Idempotent id generation failed: {},{}", scene, argsStr, exception);
            throw new EasyRetryClientException("idempotentId generation failed: {},{}", scene, argsStr);
        }

        return new Result<>(idempotentId);
    }
}

As you can see, only server-dispatched retries are reported again; manual retries are not:

RetryerResultContext retryerResultContext = retryStrategy.openRetry(executeReqDto.getScene(), executeReqDto.getExecutorName(), deSerialize);


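5.12 Server scheduled tasks

The scheduled tasks use the @SchedulerLock annotation (from ShedLock) together with an additional database table, shedlock.

With this in place, when the server is deployed as a cluster, only one node executes a given scheduled task at a time.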
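Conceptually, ShedLock's JDBC provider keeps one row per task name in the shedlock table and lets a node run the task only if it can atomically push lock_until into the future while the stored value has already passed, roughly UPDATE shedlock SET lock_until = ?, locked_at = ?, locked_by = ? WHERE name = ? AND lock_until <= ?. The in-memory sketch below mimics that compare-and-set with ConcurrentHashMap.compute; ShedLockSketch and the task name used with it are hypothetical, and lockAtLeastFor is ignored.

```java
import java.time.Duration;
import java.time.Instant;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// In-memory sketch of the lock_until compare-and-set behind a ShedLock-style table; not ShedLock's code.
final class ShedLockSketch {

    // One "row": when the lock expires and which node holds it.
    static final class Row {
        final Instant lockUntil;
        final String lockedBy;

        Row(Instant lockUntil, String lockedBy) {
            this.lockUntil = lockUntil;
            this.lockedBy = lockedBy;
        }
    }

    private final Map<String, Row> table = new ConcurrentHashMap<>();

    // Atomically take the lock if it is free or expired (lock_until <= now).
    boolean tryLock(String taskName, String node, Instant now, Duration lockAtMostFor) {
        boolean[] acquired = {false};
        table.compute(taskName, (name, current) -> {
            if (current == null || !current.lockUntil.isAfter(now)) {
                acquired[0] = true;
                return new Row(now.plus(lockAtMostFor), node);
            }
            return current;                      // another node still holds the lock
        });
        return acquired[0];
    }

    // Release early by moving lock_until back to "now", but only for the current holder.
    void unlock(String taskName, String node, Instant now) {
        table.computeIfPresent(taskName, (name, current) ->
            node.equals(current.lockedBy) ? new Row(now, current.lockedBy) : current);
    }
}
```

lockAtMostFor is the safety net: if the holder crashes without unlocking, the lock simply expires and another node can take over.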

@SchedulerLock explained:

https://blog.csdn.net/qq_45498460/article/details/119454759


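6. Cluster architecture


7. Summary

That concludes this look at Easy-Retry, the distributed open-source retry framework. Frameworks of this kind tend to follow the same pattern: Netty handles heartbeats, data collection, monitoring and reporting between clients and the server, and each framework simply targets a different business pain point (xxl-job, the distributed job scheduler, is another example). For simple scenarios, spring-retry is enough. I hope this write-up helps you; if it did, please like, favorite and share!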

