A Detailed, Tested Guide to Setting Up DingTalk Notifications in Apache SeaTunnel

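Overview

Background

SeaTunnel is used here to run data synchronization jobs. For deployment, see "Deploying the Apache SeaTunnel service".

Problem

  • A DingTalk message should be sent when a job fails or when other key events occur
  • SeaTunnel has no built-in message notification, so it normally has to rely on DolphinScheduler or another external tool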

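Solution

  • Use the event listener mechanism that SeaTunnel provides
  • Write a custom plugin that catches failure events and sends the notification
  • Pass the group robot configuration on the command line when submitting a job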

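Deployment

  • If you only need failure notifications and would rather not write and package code, skip the "Developing the plugin" step and simply download the jar
  • If you need custom notification content or want to handle other events, adjust the code yourself

Developing the plugin

The source is available at https://github.com/ts7ming/SeatunnelExt (or https://gitee.com/ts7ming/SeatunnelExt)

Project structure

The package name com.ts7ming is arbitrary; choose your own.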

│  pom.xml
│
└─src
    └─main
        ├─java
        │  └─com
        │      └─ts7ming
        │              DingTalkEventListener.java
        │
        └─resources
            └─META-INF
                └─services
                        org.apache.seatunnel.api.event.EventHandler

pom.xml

  • Version 2.3.13 is used here; adjust it to match your deployment


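<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>com.ts7ming</groupId>
    <artifactId>SeatunnelExt</artifactId>
    <version>1.0-SNAPSHOT</version>
    <properties>
        <maven.compiler.source>8</maven.compiler.source>
        <maven.compiler.target>8</maven.compiler.target>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <seatunnel.version>2.3.13</seatunnel.version>
    </properties>
    <dependencies>
        <dependency>
            <groupId>org.apache.seatunnel</groupId>
            <artifactId>seatunnel-api</artifactId>
            <version>${seatunnel.version}</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.seatunnel</groupId>
            <artifactId>seatunnel-engine-common</artifactId>
            <version>${seatunnel.version}</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <version>1.18.30</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>4.13.2</version>
            <scope>test</scope>
        </dependency>
    </dependencies>
</project>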

DingTalkEventListener.java

  • The events SeaTunnel supports appear in the imports and handlers below

package com.ts7ming;
import lombok.extern.slf4j.Slf4j;
import org.apache.seatunnel.api.event.Event;
import org.apache.seatunnel.api.event.EventHandler;
import org.apache.seatunnel.api.event.EventType;
import org.apache.seatunnel.engine.common.job.JobStatus;
import org.apache.seatunnel.engine.common.job.JobStateEvent;
import org.apache.seatunnel.api.source.event.ReaderOpenEvent;
import org.apache.seatunnel.api.sink.event.WriterCloseEvent;
import org.apache.seatunnel.api.table.schema.event.AlterTableAddColumnEvent;
import org.apache.seatunnel.api.table.schema.event.AlterTableColumnsEvent;
import org.apache.seatunnel.api.table.schema.event.AlterTableDropColumnEvent;
import org.apache.seatunnel.api.table.schema.event.AlterTableModifyColumnEvent;
import javax.crypto.Mac;
import javax.crypto.spec.SecretKeySpec;
import java.io.OutputStream;
import java.net.HttpURLConnection;
import java.net.URL;
import java.net.URLEncoder;
import java.nio.charset.StandardCharsets;
import java.util.Base64;
@Slf4j
public class DingTalkEventListener implements EventHandler {
    private static final String WEBHOOK_URL = System.getProperty("dingtalk.webhook.url", "https://oapi.dingtalk.com/robot/send?access_token=YOUR_ACCESS_TOKEN");
    private static final String SECRET = System.getProperty("dingtalk.secret", "YOUR_SECRET");
    @Override
    public void handle(Event event) {
        EventType eventType = event.getEventType();
        if (eventType == EventType.JOB_STATUS) {
            handleJobStateEvent((JobStateEvent) event);
        } 
//        else if (eventType.name().equals("SCHEMA_CHANGE_ADD_COLUMN")) {
//            handleAddColumnEvent((AlterTableAddColumnEvent) event);
//        }
//        else if (eventType.name().equals("SCHEMA_CHANGE_UPDATE_COLUMNS")) {
//            handleUpdateColumnEvent((AlterTableColumnsEvent) event);
//        }
//        else if (eventType.name().equals("SCHEMA_CHANGE_DROP_COLUMN")) {
//            handleDropColumnEvent((AlterTableDropColumnEvent) event);
//        }
//        else if (eventType.name().equals("SCHEMA_CHANGE_MODIFY_COLUMN")) {
//            handleModifyColumnEvent((AlterTableModifyColumnEvent) event);
//        }
        else {
            log.debug("Ignoring unhandled event type: {}", eventType);
        }
    }
    private void handleJobStateEvent(JobStateEvent jobEvent) {
        String jobId = jobEvent.getJobId();
        String jobName = jobEvent.getJobName();
        JobStatus status = jobEvent.getJobStatus();
        long eventTime = jobEvent.getCreatedTime();
        switch (status) {
            case FAILED:
                sendAlert("[Job failed] jobId: " + jobId + ", jobName: " + jobName);
                break;
            case FINISHED:
                //sendAlert("Job finished: " + jobId + ", jobName: " + jobName);
                break;
            // Other events you may want to handle
            // case READER_OPEN:
            //     break;
            // case WRITER_CLOSE:
            //     break;
            default:
                log.debug("Job status changed | jobId: {}, status: {}, time: {}", jobId, status, eventTime);
                // No notification is sent
        }
    }
    private void handleAddColumnEvent(AlterTableAddColumnEvent event) {
        String tableName = event.getTableIdentifier().getTableName();
        String columnName = event.getColumn() != null ? event.getColumn().getName() : "unknown column";
        sendAlert("[Schema change] table: " + tableName + ", added column: " + columnName);
    }
    private void handleUpdateColumnEvent(AlterTableColumnsEvent event) {
        String tableName = event.getTableIdentifier().getTableName();
        sendAlert("[Schema change] table: " + tableName + ", update: " + event);
    }
    private void handleDropColumnEvent(AlterTableDropColumnEvent event) {
        String tableName = event.getTableIdentifier().getTableName();
        String columnName = event.getColumn() != null ? event.getColumn() : "unknown column";
        sendAlert("[Schema change] table: " + tableName + ", dropped column: " + columnName);
    }
    private void handleModifyColumnEvent(AlterTableModifyColumnEvent event) {
        String tableName = event.getTableIdentifier().getTableName();
        String columnName = event.getColumn() != null ? event.getColumn().getName() : "unknown column";
        sendAlert("[Schema change] table: " + tableName + ", modified column: " + columnName);
    }
    private void sendAlert(String content) {
          sendDingTalkMessage(content);
    }
    void sendDingTalkMessage(String message) {
        try {
            long timestamp = System.currentTimeMillis();
            String sign = generateSign(timestamp, SECRET);
            String fullUrl = WEBHOOK_URL + "&timestamp=" + timestamp + "&sign=" + sign;
            // Escape characters that would otherwise break the JSON string literal
            String escapedMessage = message.replace("\\", "\\\\")
                                          .replace("\"", "\\\"")
                                          .replace("\n", "\\n")
                                          .replace("\r", "\\r")
                                          .replace("\t", "\\t");
            String jsonPayload = String.format("{\"msgtype\":\"text\",\"text\":{\"content\":\"%s\"}}", escapedMessage);
            URL url = new URL(fullUrl);
            HttpURLConnection conn = (HttpURLConnection) url.openConnection();
            conn.setRequestMethod("POST");
            conn.setRequestProperty("Content-Type", "application/json");
            conn.setDoOutput(true);
            conn.setConnectTimeout(5000);
            conn.setReadTimeout(5000);
            try (OutputStream os = conn.getOutputStream()) {
                os.write(jsonPayload.getBytes(StandardCharsets.UTF_8));
                os.flush();
            }
            
            int responseCode = conn.getResponseCode();
            if (responseCode == 200) {
                log.info("DingTalk message sent: {}", message);
            } else {
                log.error("Failed to send DingTalk message, response code: {}, message: {}", responseCode, message);
            }
        } catch (Exception e) {
            log.error("Exception while sending DingTalk message: {}", message, e);
        }
    }
    private String generateSign(long timestamp, String secret) throws Exception {
        String stringToSign = timestamp + "\n" + secret;
        Mac mac = Mac.getInstance("HmacSHA256");
        mac.init(new SecretKeySpec(secret.getBytes(StandardCharsets.UTF_8), "HmacSHA256"));
        byte[] signData = mac.doFinal(stringToSign.getBytes(StandardCharsets.UTF_8));
        return URLEncoder.encode(new String(Base64.getEncoder().encode(signData)), "UTF-8");
    }
}
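
The request-building steps inside sendDingTalkMessage (JSON escaping, HMAC-SHA256 signing, and appending timestamp and sign to the webhook URL) can be isolated so they are easier to test. The following is a minimal sketch, not the author's code: the class DingTalkRequestSketch and its method names are hypothetical, and it assumes the same signing scheme as generateSign above (sign "timestamp\nsecret" with the secret as the key, then Base64 and URL-encode the result).

```java
import javax.crypto.Mac;
import javax.crypto.spec.SecretKeySpec;
import java.net.URLEncoder;
import java.nio.charset.StandardCharsets;
import java.util.Base64;

// Hypothetical helper; not part of SeaTunnel or any DingTalk SDK.
class DingTalkRequestSketch {

    // Escape the characters that would break a JSON string literal.
    static String escapeJson(String s) {
        StringBuilder sb = new StringBuilder(s.length() + 16);
        for (int i = 0; i < s.length(); i++) {
            char c = s.charAt(i);
            switch (c) {
                case '\\': sb.append("\\\\"); break;
                case '"':  sb.append("\\\""); break;
                case '\n': sb.append("\\n");  break;
                case '\r': sb.append("\\r");  break;
                case '\t': sb.append("\\t");  break;
                default:
                    if (c < 0x20) {
                        // Remaining control characters use the 4-digit hex escape form.
                        sb.append(String.format("\\u%04x", (int) c));
                    } else {
                        sb.append(c);
                    }
            }
        }
        return sb.toString();
    }

    // Build the body of a plain text message.
    static String textPayload(String content) {
        return "{\"msgtype\":\"text\",\"text\":{\"content\":\"" + escapeJson(content) + "\"}}";
    }

    // HMAC-SHA256 over "timestamp\nsecret", keyed with the secret, Base64 then URL-encoded.
    static String sign(long timestamp, String secret) throws Exception {
        String stringToSign = timestamp + "\n" + secret;
        Mac mac = Mac.getInstance("HmacSHA256");
        mac.init(new SecretKeySpec(secret.getBytes(StandardCharsets.UTF_8), "HmacSHA256"));
        byte[] digest = mac.doFinal(stringToSign.getBytes(StandardCharsets.UTF_8));
        return URLEncoder.encode(Base64.getEncoder().encodeToString(digest), "UTF-8");
    }

    // The webhook URL already carries access_token, so further parameters are joined with '&'.
    static String signedUrl(String webhookUrl, long timestamp, String secret) throws Exception {
        return webhookUrl + "&timestamp=" + timestamp + "&sign=" + sign(timestamp, secret);
    }
}
```

For example, DingTalkRequestSketch.textPayload("line1\nline2") yields a single-line JSON body in which the newline appears as the two characters \ and n, which is what the receiving JSON parser expects.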

org.apache.seatunnel.api.event.EventHandler

com.ts7ming.DingTalkEventListener
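
The file above follows the standard Java service-provider convention: a file under META-INF/services named after an interface, with each line naming an implementation class that java.util.ServiceLoader can instantiate. The sketch below illustrates that mechanism in isolation. The Handler interface and DingHandler class are hypothetical stand-ins, and it is an assumption (suggested by the file layout) that SeaTunnel discovers EventHandler implementations this way.

```java
import java.io.IOException;
import java.net.URL;
import java.net.URLClassLoader;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.List;
import java.util.ServiceLoader;

// Hypothetical demo of the META-INF/services lookup; not SeaTunnel code.
class SpiDiscoverySketch {

    // Stand-in for an interface such as EventHandler.
    public interface Handler {
        String name();
    }

    // Stand-in for an implementation such as DingTalkEventListener.
    // ServiceLoader needs a public class with a public no-arg constructor.
    public static class DingHandler implements Handler {
        public DingHandler() {}
        @Override
        public String name() {
            return "ding";
        }
    }

    static List<String> discover() throws IOException {
        // Create a temporary classpath root holding only the registration file.
        Path root = Files.createTempDirectory("spi-sketch");
        Path services = Files.createDirectories(root.resolve("META-INF").resolve("services"));
        // File name = interface binary name, file content = implementation binary name.
        Files.write(services.resolve(Handler.class.getName()),
                DingHandler.class.getName().getBytes(StandardCharsets.UTF_8));

        // The extra loader contributes the registration file; classes come from its parent.
        try (URLClassLoader loader = new URLClassLoader(
                new URL[] {root.toUri().toURL()}, SpiDiscoverySketch.class.getClassLoader())) {
            List<String> names = new ArrayList<>();
            for (Handler handler : ServiceLoader.load(Handler.class, loader)) {
                names.add(handler.name());
            }
            return names;
        }
    }
}
```

In the plugin jar the registration file is simply packaged from src/main/resources, so no temporary directory or extra class loader is involved; those exist here only to make the sketch self-contained.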

Packaging

mvn clean package

Deploying the plugin

Download the prebuilt jar directly (if you would rather not build it yourself)

cd /opt/apache-seatunnel/lib
wget https://github.com/ts7ming/SeatunnelExt/releases/download/v1/SeatunnelExt-1.0-SNAPSHOT.jar
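# Alternative mirror on gitee if GitHub is slow (run only one of the two wget commands)
wget https://gitee.com/ts7ming/SeatunnelExt/releases/download/v1/SeatunnelExt-1.0-SNAPSHOT.jar
# If you downloaded as a different user, fix the ownership
chown -R seatunnel:seatunnel /opt/apache-seatunnel

Upload

  • If you built the jar yourself, upload it to the lib directory under the SeaTunnel root (for example: /opt/apache-seatunnel/lib/)

Restart the SeaTunnel services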

systemctl stop seatunnel-master.service
systemctl stop seatunnel-worker.service
systemctl start seatunnel-master.service
systemctl start seatunnel-worker.service

Check that the plugin was loaded

grep "DingTalk" /opt/apache-seatunnel/logs/seatunnel-engine-master.log
# The output should include a line like the following
INFO  [o.a.s.e.s.CoordinatorService  ] [pool-4-thread-1] - [localhost]:5801 [seatunnel] [5.1] Loaded event handlers: [com.ts7ming.DingTalkEventListener@20eaeaed, org.apache.seatunnel.api.event.LoggingEventHandler@59c99cb9]

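Running a job

Note: the -D options must come before --config (the SeaTunnel launch script is not flexible about the order in which it parses arguments)

sh bin/seatunnel.sh --async -Ddingtalk.webhook.url="https://oapi.dingtalk.com/robot/send?access_token=<group-robot-token>" -Ddingtalk.secret="<group-robot-secret>" --config job-config.conf -n "job-name"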
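
The listener reads both -D values with System.getProperty and falls back to placeholder defaults, so a forgotten option only shows up later as a failed HTTP call. A fail-fast check is one way to surface that earlier. The sketch below is a hypothetical addition, not part of the original plugin: the class NotifierConfigSketch and its methods are invented for illustration, while the property keys and placeholder strings match the ones used in DingTalkEventListener.

```java
// Hypothetical guard around the two system properties the listener expects.
class NotifierConfigSketch {

    // Same placeholders as the defaults in DingTalkEventListener.
    static final String TOKEN_PLACEHOLDER = "YOUR_ACCESS_TOKEN";
    static final String SECRET_PLACEHOLDER = "YOUR_SECRET";

    // Return the property value, or fail if it is missing, empty, or still a placeholder.
    static String require(String key, String placeholder) {
        String value = System.getProperty(key, placeholder);
        if (value.isEmpty() || value.contains(placeholder)) {
            throw new IllegalStateException(
                    "Missing -D" + key + "; pass it before --config on the command line");
        }
        return value;
    }

    static String webhookUrl() {
        return require("dingtalk.webhook.url", TOKEN_PLACEHOLDER);
    }

    static String secret() {
        return require("dingtalk.secret", SECRET_PLACEHOLDER);
    }
}
```

Calling NotifierConfigSketch.webhookUrl() before the first send would then raise an IllegalStateException with the name of the missing option instead of posting to an invalid URL.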

Done!
