说明
背景
使用 Seatunnel 执行数据同步. 部署参考 部署 Apache-Seatunnel 服务
问题
- 需要在任务报错或者其他关键事件发送钉钉消息通知
- SeaTunnel 本身不支持消息通知, 必须依赖 DolphinScheduler 或其他外部工具
方案
- 利用SeaTunnel提供的事件监听器功能
- 编写自定义插件, 捕捉报错事件, 发送消息通知
- 群机器人配置通过命令行提交
部署
- 如果不想写代码打包, 只需要报错通知, 跳过开发插件 步骤, 下载 jar包即可
- 如果需要自定义通知内容和其他事件处理, 自行调整代码
开发插件
可以从 https://github.com/ts7ming/SeatunnelExt 获取(或 https://gitee.com/ts7ming/SeatunnelExt)
项目结构
包名
com.ts7ming 自定义即可
│ pom.xml │ └─src └─main ├─java │ └─com │ └─ts7ming │ DingTalkEventListener.java │ └─resources └─META-INF └─services org.apache.seatunnel.api.event.EventHandler
pom.xml
- 这里用了2.3.13 版本, 根据实际情况调整
4.0.0 com.ts7ming SeatunnelExt 1.0-SNAPSHOT 8 8 UTF-8 2.3.13 org.apache.seatunnel seatunnel-api ${seatunnel.version} provided org.apache.seatunnel seatunnel-engine-common ${seatunnel.version} provided org.projectlombok lombok 1.18.30 provided junit junit 4.13.2 test
DingTalkEventListener.java
- SeaTunnel支持的事件如下

package com.ts7ming;
import lombok.extern.slf4j.Slf4j;
import org.apache.seatunnel.api.event.Event;
import org.apache.seatunnel.api.event.EventHandler;
import org.apache.seatunnel.api.event.EventType;
import org.apache.seatunnel.engine.common.job.JobStatus;
import org.apache.seatunnel.engine.common.job.JobStateEvent;
import org.apache.seatunnel.api.source.event.ReaderOpenEvent;
import org.apache.seatunnel.api.sink.event.WriterCloseEvent;
import org.apache.seatunnel.api.table.schema.event.AlterTableAddColumnEvent;
import org.apache.seatunnel.api.table.schema.event.AlterTableColumnsEvent;
import org.apache.seatunnel.api.table.schema.event.AlterTableDropColumnEvent;
import org.apache.seatunnel.api.table.schema.event.AlterTableModifyColumnEvent;
import javax.crypto.Mac;
import javax.crypto.spec.SecretKeySpec;
import java.io.OutputStream;
import java.net.HttpURLConnection;
import java.net.URL;
import java.net.URLEncoder;
import java.nio.charset.StandardCharsets;
import java.util.Base64;
@Slf4j
public class DingTalkEventListener implements EventHandler {
private static final String WEBHOOK_URL = System.getProperty("dingtalk.webhook.url", "https://oapi.dingtalk.com/robot/send?access_token=YOUR_ACCESS_TOKEN");
private static final String SECRET = System.getProperty("dingtalk.secret", "YOUR_SECRET");
@Override
public void handle(Event event) {
EventType eventType = event.getEventType();
if (eventType == EventType.JOB_STATUS) {
handleJobStateEvent((JobStateEvent) event);
}
// else if (eventType.name().equals("SCHEMA_CHANGE_ADD_COLUMN")) {
// handleAddColumnEvent((AlterTableAddColumnEvent) event);
// }
// else if (eventType.name().equals("SCHEMA_CHANGE_UPDATE_COLUMNS")) {
// handleUpdateColumnEvent((AlterTableColumnsEvent) event);
// }
// else if (eventType.name().equals("SCHEMA_CHANGE_DROP_COLUMN")) {
// handleDropColumnEvent((AlterTableDropColumnEvent) event);
// }
// else if (eventType.name().equals("SCHEMA_CHANGE_MODIFY_COLUMN")) {
// handleModifyColumnEvent((AlterTableModifyColumnEvent) event);
// }
else {
log.debug("忽略未处理的事件类型: {}", eventType);
}
}
private void handleJobStateEvent(JobStateEvent jobEvent) {
String jobId = jobEvent.getJobId();
String jobName = jobEvent.getJobName();
JobStatus status = jobEvent.getJobStatus();
long eventTime = jobEvent.getCreatedTime();
switch (status) {
case FAILED:
sendAlert("【任务失败】jobId: " + jobId + ", jobName: " + jobName);
break;
case FINISHED:
//sendAlert("任务完成: " + jobId + ", jobName: " + jobName);
break;
//其他需要处理的事件
// case READER_OPEN:
// break;
// case WRITER_CLOSE:
// break;
default:
log.debug("任务状态变更 | jobId: {}, 状态: {}, 时间: {}",jobId, status, eventTime);
// 不发送通知
}
}
private void handleAddColumnEvent(AlterTableAddColumnEvent event) {
String tableName = event.getTableIdentifier().getTableName();
String columnName = event.getColumn() != null ? event.getColumn().getName() : "未知列";
sendAlert("【表结构变更】表名: " + tableName + ", 新增列: " + columnName);
}
private void handleUpdateColumnEvent(AlterTableColumnsEvent event) {
String tableName = event.getTableIdentifier().getTableName();
sendAlert("【表结构变更】表名: " + tableName + ", 更新内容: " + event);
}
private void handleDropColumnEvent(AlterTableDropColumnEvent event) {
String tableName = event.getTableIdentifier().getTableName();
String columnName = event.getColumn() != null ? event.getColumn() : "未知列";
sendAlert("【表结构变更】表名: " + tableName + ", 删除列: " + columnName);
}
private void handleModifyColumnEvent(AlterTableModifyColumnEvent event) {
String tableName = event.getTableIdentifier().getTableName();
String columnName = event.getColumn() != null ? event.getColumn().getName() : "未知列";
sendAlert("【表结构变更】表名: " + tableName + ", 修改列: " + columnName);
}
private void sendAlert(String content) {
sendDingTalkMessage(content);
}
void sendDingTalkMessage(String message) {
try {
long timestamp = System.currentTimeMillis();
String sign = generateSign(timestamp, SECRET);
String fullUrl = WEBHOOK_URL + "×tamp=" + timestamp + "&sign=" + sign;
String escapedMessage = message.replace("\", "\\")
.replace("\"", "\\"")
.replace("\n", "\n")
.replace("\r", "\r")
.replace("\t", "\t");
String jsonPayload = String.format("{\"msgtype\":\"text\",\"text\":{\"content\":\"%s\"}}", escapedMessage);
URL url = new URL(fullUrl);
HttpURLConnection conn = (HttpURLConnection) url.openConnection();
conn.setRequestMethod("POST");
conn.setRequestProperty("Content-Type", "application/json");
conn.setDoOutput(true);
conn.setConnectTimeout(5000);
conn.setReadTimeout(5000);
try (OutputStream os = conn.getOutputStream()) {
os.write(jsonPayload.getBytes(StandardCharsets.UTF_8));
os.flush();
}
int responseCode = conn.getResponseCode();
if (responseCode == 200) {
log.info("钉钉消息发送成功: {}", message);
} else {
log.error("钉钉消息发送失败,响应码: {}, 消息: {}", responseCode, message);
}
} catch (Exception e) {
log.error("发送钉钉消息异常: {}", message, e);
}
}
private String generateSign(long timestamp, String secret) throws Exception {
String stringToSign = timestamp + "\n" + secret;
Mac mac = Mac.getInstance("HmacSHA256");
mac.init(new SecretKeySpec(secret.getBytes(StandardCharsets.UTF_8), "HmacSHA256"));
byte[] signData = mac.doFinal(stringToSign.getBytes(StandardCharsets.UTF_8));
return URLEncoder.encode(new String(Base64.getEncoder().encode(signData)), "UTF-8");
}
}
org.apache.seatunnel.api.event.EventHandler
com.ts7ming.DingTalkEventListener
打包
mvn clean package
部署插件
直接下载可用 jar (如果不想自己打包)
cd /opt/apache-seatunnel/lib wget https://github.com/ts7ming/SeatunnelExt/releases/download/v1/SeatunnelExt-1.0-SNAPSHOT.jar # 网络差的话用 gitee wget https://gitee.com/ts7ming/SeatunnelExt/releases/download/v1/SeatunnelExt-1.0-SNAPSHOT.jar # 如果用其他用户下载, 注意权限 chown -R seatunnel:seatunnel /opt/apache-seatunnel
上传
- Jar包上传到Seatunnel 根目录 lib 下 (例如: /opt/apache-seatunnel/lib/)
重启 Seatunnel 服务
systemctl stop seatunnel-master.service systemctl stop seatunnel-worker.service systemctl start seatunnel-master.service systemctl start seatunnel-worker.service
检查插件是否加载
grep "DingTalk" /opt/apache-seatunnel/logs/seatunnel-engine-master.log # 结果应该有如下信息 INFO [o.a.s.e.s.CoordinatorService ] [pool-4-thread-1] - [localhost]:5801 [seatunnel] [5.1] Loaded event handlers: [com.ts7ming.DingTalkEventListener@20eaeaed, org.apache.seatunnel.api.event.LoggingEventHandler@59c99cb9]
运行任务
注意: -D参数一定要在 --config前面 (SeaTunnel启动脚本解析参数的顺序不够灵活)
sh bin/seatunnel.sh --async -Ddingtalk.webhook.url="https://oapi.dingtalk.com/robot/send?access_token=钉钉群Token" -Ddingtalk.secret="钉钉群secret" --config 任务配置.conf -n "任务名称"
Done!
