
1.前言
https://mp.weixin.qq.com/s/s_cjrXHvBjIQBF8RqQTvcQ
https://blog.csdn.net/qq_34905631/article/details/135068301?spm=1001.2014.3001.5501
CentOS 7.x 安装部署 SeaTunnel Web 遇到的坑
https://mp.weixin.qq.com/s/1FcCB1TjfEs22iGiCoKL5g
https://blog.csdn.net/qq_34905631/article/details/135074860?spm=1001.2014.3001.5501
2.编译
2.1版本说明
<version>2.3.5-SNAPSHOT</version>

2.2 seatunnel2.3.4-release分支配置


编译打包:
mvn clean package -pl seatunnel-dist -am '-Dmaven.test.skip=true' -T 8C
2.3maven调优配置
https://blog.csdn.net/qq_34905631/article/details/135074860?spm=1001.2014.3001.5501
-Dfile.encoding=GBK -DarchetypeCatalog=local -Xmx1024m -XX:MetaspaceSize=1024m -XX:MaxMetaspaceSize=1024m -Xss2m -Dmaven.test.skip=true -Dmaven.compile.fork=true
-Xmx1024m
Thread Count 8 -T option
3.web1.0.0适配
3.1配置文件修改和新增文件

3.2手动拷贝jar修改依赖

<!-- seatunnel-common built locally from the seatunnel 2.3.5-SNAPSHOT tree and copied by hand
     into seatunnel-web/lib, referenced with system scope.
     NOTE: the absolute systemPath below is specific to the author's machine; adjust it
     (e.g. relative to ${project.basedir}) when building elsewhere. -->
<dependency>
    <groupId>org.apache.seatunnel</groupId>
    <artifactId>seatunnel-common</artifactId>
    <version>${seatunnel-framework.version}</version>
    <scope>system</scope>
    <systemPath>D:/other-workspace/seatunnel-web/lib/seatunnel-common-2.3.5-SNAPSHOT-2.12.15.jar</systemPath>
</dependency>
这里由于不可以在顶级的父pom里面修改,因为顶级父pom里面的依赖是在

3.3修改web不兼容的代码
public static class SeaTunnelDataTypeConvertor
implements DataTypeConvertor<SeaTunnelDataType>> {
@Override
public SeaTunnelDataType> toSeaTunnelType(String s, String s1) {
return DATA_TYPE_MAP.get(s.toLowerCase(Locale.ROOT)).getRawType();
}
@Override
public SeaTunnelDataType> toSeaTunnelType(
String s, SeaTunnelDataType> seaTunnelDataType, Map<String, Object> map) {
return seaTunnelDataType;
}
@Override
public SeaTunnelDataType> toConnectorType(
String s, SeaTunnelDataType> seaTunnelDataType, Map<String, Object> map) {
return seaTunnelDataType;
}
@Override
public String getIdentity() {
return "EngineDataTypeConvertor";
}
}
executeJobBySeaTunnel方法:
原来:
// Original call (before the adaptation): two-argument createExecutionContext returning a
// JobExecutionEnvironment; the adapted code passes a SeaTunnelConfig as a third argument.
JobExecutionEnvironment jobExecutionEnv =
seaTunnelClient.createExecutionContext(filePath, jobConfig);
现在:
// Adapted call: load the SeaTunnel engine config via ConfigProvider and pass it to
// createExecutionContext, which now returns a ClientJobExecutionEnvironment.
SeaTunnelConfig seaTunnelConfig = ConfigProvider.locateAndGetSeaTunnelConfig();
ClientJobExecutionEnvironment jobExecutionEnv =
seaTunnelClient.createExecutionContext(filePath, jobConfig, seaTunnelConfig);
public void complete(
@NonNull Integer userId, @NonNull Long jobInstanceId, @NonNull String jobEngineId)方法:
原来:
// Original status mapping (before the adaptation). It checks "CANCELLING" / JobStatus.CANCELLING;
// the adapted version checks "CANCELING" / JobStatus.CANCELING instead.
if (statusList.size() == 1 && statusList.contains("FINISHED")) {
jobStatus = JobStatus.FINISHED.name();
} else if (statusList.contains("FAILED")) {
jobStatus = JobStatus.FAILED.name();
} else if (statusList.contains("CANCELED")) {
jobStatus = JobStatus.CANCELED.name();
} else if (statusList.contains("CANCELLING")) {
jobStatus = JobStatus.CANCELLING.name();
} else {
jobStatus = JobStatus.RUNNING.name();
}
现在:
if (statusList.size() == 1 && statusList.contains("FINISHED")) {
jobStatus = JobStatus.FINISHED.name();
} else if (statusList.contains("FAILED")) {
jobStatus = JobStatus.FAILED.name();
} else if (statusList.contains("CANCELED")) {
jobStatus = JobStatus.CANCELED.name();
} else if (statusList.contains("CANCELING")) {
jobStatus = JobStatus.CANCELING.name();
} else {
jobStatus = JobStatus.RUNNING.name();
}
原来:
/**
 * Original version (before the adaptation): collects per-source-connector features.
 *
 * <p>Loads all factories from the jars in {@code SeaTunnelSinkPluginDiscovery#getPluginDir()}.
 * If that directory does not exist, it uses the thread context class loader instead. Each
 * {@link TableSourceFactory} is then mapped to a {@link ConnectorFeature} recording whether its
 * source class implements {@link SupportColumnProjection}.
 *
 * @param pluginType must be {@link PluginType#SOURCE}
 * @return feature map keyed by {@code PluginIdentifier.of("seatunnel", SOURCE type, factory id)}
 * @throws UnsupportedOperationException if {@code pluginType} is not SOURCE
 */
public static Map<PluginIdentifier, ConnectorFeature> getConnectorFeatures(
PluginType pluginType) throws IOException {
Common.setStarter(true);
if (!pluginType.equals(PluginType.SOURCE)) {
throw new UnsupportedOperationException("ONLY support plugin type source");
}
// The adapted version no longer uses getPluginDir()/FileUtils.searchJarFiles.
Path path = new SeaTunnelSinkPluginDiscovery().getPluginDir();
List<Factory> factories;
if (path.toFile().exists()) {
List<URL> files = FileUtils.searchJarFiles(path);
// NOTE(review): this URLClassLoader is never closed.
factories =
FactoryUtil.discoverFactories(new URLClassLoader(files.toArray(new URL[0])));
} else {
factories =
FactoryUtil.discoverFactories(Thread.currentThread().getContextClassLoader());
}
Map<PluginIdentifier, ConnectorFeature> featureMap = new ConcurrentHashMap<>();
factories.forEach(
plugin -> {
if (TableSourceFactory.class.isAssignableFrom(plugin.getClass())) {
TableSourceFactory tableSourceFactory = (TableSourceFactory) plugin;
PluginIdentifier info =
PluginIdentifier.of(
"seatunnel",
PluginType.SOURCE.getType(),
plugin.factoryIdentifier());
featureMap.put(
info,
new ConnectorFeature(
SupportColumnProjection.class.isAssignableFrom(
tableSourceFactory.getSourceClass())));
}
});
return featureMap;
}
现在:
/**
 * Collects per-source-connector features (currently: column-projection support).
 *
 * <p>The jars of every supported plugin of {@code pluginType} are resolved through
 * {@link SeaTunnelSinkPluginDiscovery} and loaded through ONE class loader. If no jar is found,
 * the thread context class loader is used instead. Each {@link TableSourceFactory} discovered
 * is mapped to a {@link ConnectorFeature} recording whether its source class implements
 * {@link SupportColumnProjection}.
 *
 * @param pluginType must be {@link PluginType#SOURCE}
 * @return feature map keyed by {@code PluginIdentifier.of("seatunnel", SOURCE type, factory id)}
 * @throws UnsupportedOperationException if {@code pluginType} is not SOURCE
 */
public static Map<PluginIdentifier, ConnectorFeature> getConnectorFeatures(
        PluginType pluginType) throws IOException {
    Common.setStarter(true);
    if (!pluginType.equals(PluginType.SOURCE)) {
        throw new UnsupportedOperationException("ONLY support plugin type source");
    }
    SeaTunnelSinkPluginDiscovery seaTunnelSinkPluginDiscovery =
            new SeaTunnelSinkPluginDiscovery();
    Map<PluginIdentifier, String> allSupportedPlugins =
            seaTunnelSinkPluginDiscovery.getAllSupportedPlugins(pluginType);
    // Resolve the jars of ALL plugins in a single call. Discovering factories once per plugin
    // inside a loop overwrote the factory list on every iteration, so only the last plugin's
    // factories ever reached the feature map.
    List<URL> files =
            seaTunnelSinkPluginDiscovery.getPluginJarPaths(
                    new ArrayList<>(allSupportedPlugins.keySet()));
    List<Factory> factories;
    if (CollectionUtils.isNotEmpty(files)) {
        // NOTE(review): this URLClassLoader is never closed (same as the original code).
        factories =
                FactoryUtil.discoverFactories(new URLClassLoader(files.toArray(new URL[0])));
    } else {
        factories =
                FactoryUtil.discoverFactories(Thread.currentThread().getContextClassLoader());
    }
    Map<PluginIdentifier, ConnectorFeature> featureMap = new ConcurrentHashMap<>();
    if (CollectionUtils.isNotEmpty(factories)) {
        factories.forEach(
                plugin -> {
                    if (TableSourceFactory.class.isAssignableFrom(plugin.getClass())) {
                        TableSourceFactory tableSourceFactory = (TableSourceFactory) plugin;
                        PluginIdentifier info =
                                PluginIdentifier.of(
                                        "seatunnel",
                                        PluginType.SOURCE.getType(),
                                        plugin.factoryIdentifier());
                        featureMap.put(
                                info,
                                new ConnectorFeature(
                                        SupportColumnProjection.class.isAssignableFrom(
                                                tableSourceFactory.getSourceClass())));
                    }
                });
    }
    return featureMap;
}
原来:
// Original (before the adaptation): TableFactoryContext; the adapted code uses
// TableTransformFactoryContext with the same three arguments.
TableFactoryContext context =
new TableFactoryContext(
Collections.singletonList(table),
ReadonlyConfig.fromMap(config),
Thread.currentThread().getContextClassLoader());
现在:
// Build the transform factory context from the single input table, the user's option map
// (wrapped as a ReadonlyConfig) and the current thread's context class loader.
ReadonlyConfig transformOptions = ReadonlyConfig.fromMap(config);
ClassLoader transformClassLoader = Thread.currentThread().getContextClassLoader();
TableTransformFactoryContext context =
        new TableTransformFactoryContext(
                Collections.singletonList(table), transformOptions, transformClassLoader);
原来:
// Original (before the adaptation): restoreExecutionContext without a SeaTunnelConfig argument.
seaTunnelClient.restoreExecutionContext(filePath, jobConfig, jobEngineId).execute();
现在:
// Adapted: load the engine config via ConfigProvider and pass it as the new third argument
// before restoring and executing the job identified by jobEngineId.
SeaTunnelConfig seaTunnelConfig = ConfigProvider.locateAndGetSeaTunnelConfig();
seaTunnelClient
.restoreExecutionContext(filePath, jobConfig, seaTunnelConfig, jobEngineId)
.execute();
原来:
/**
 * Original constructor (before the adaptation).
 *
 * <p>If the sink plugin directory exists, builds a {@code DataTypeConvertorFactory} over every
 * jar in that directory plus every jar under {@code Common.pluginRootDir()}. Otherwise it uses
 * the no-arg {@code DataTypeConvertorFactory}.
 */
public TableSchemaServiceImpl() throws IOException {
Common.setStarter(true);
Path path = new SeaTunnelSinkPluginDiscovery().getPluginDir();
if (path.toFile().exists()) {
List<URL> files = FileUtils.searchJarFiles(path);
files.addAll(FileUtils.searchJarFiles(Common.pluginRootDir()));
factory = new DataTypeConvertorFactory(new URLClassLoader(files.toArray(new URL[0])));
} else {
factory = new DataTypeConvertorFactory();
}
}
现在:
/**
 * Builds the {@code DataTypeConvertorFactory} used by this service.
 *
 * <p>The jars of every supported SINK plugin are resolved through
 * {@link SeaTunnelSinkPluginDiscovery} and loaded through a single {@link URLClassLoader}. If no
 * jar is found, the no-arg {@code DataTypeConvertorFactory} is used.
 *
 * <p>{@code factory} is assigned exactly once on every path. The previous per-plugin loop
 * overwrote it on each iteration, so only the last plugin's jars were used. It also left it
 * unassigned when no plugin was found, and cannot compile if {@code factory} is a final field.
 */
public TableSchemaServiceImpl() throws IOException {
    Common.setStarter(true);
    SeaTunnelSinkPluginDiscovery seaTunnelSinkPluginDiscovery =
            new SeaTunnelSinkPluginDiscovery();
    Map<PluginIdentifier, String> allSupportedPlugins =
            seaTunnelSinkPluginDiscovery.getAllSupportedPlugins(PluginType.SINK);
    List<URL> files =
            seaTunnelSinkPluginDiscovery.getPluginJarPaths(
                    new ArrayList<>(allSupportedPlugins.keySet()));
    if (CollectionUtils.isNotEmpty(files)) {
        factory = new DataTypeConvertorFactory(new URLClassLoader(files.toArray(new URL[0])));
    } else {
        factory = new DataTypeConvertorFactory();
    }
}
原来:
SeaTunnelDataType> dataType = convertor.toSeaTunnelType(field.getType());
现在:
SeaTunnelDataType> dataType = convertor.toSeaTunnelType(field.getType(), null);
3.4 web编译打包
mvn clean package -pl seatunnel-web-dist -am '-Dmaven.test.skip=true' -T 8C
4.运行mysql-cdc示例
4.1配置运行seatunnel
-DSEATUNNEL_HOME=D:\other-workspace\seatunnel\seatunnel-dist\target\apache-seatunnel-2.3.5-SNAPSHOT

4.2配置运行web
-DSEATUNNEL_HOME=D:\other-workspace\seatunnel\seatunnel-dist\target\apache-seatunnel-2.3.5-SNAPSHOT
ST_WEB_BASEDIR_PATH=D:\other-workspace\seatunnel-web\seatunnel-web-dist\target\apache-seatunnel-web-1.0.0-SNAPSHOT
4.3 拷贝jar到seatunnel家目录的lib和web家目录的libs下


mysql-connector-java-8.0.33.jar
datasource-jdbc-mysql-1.0.0-SNAPSHOT.jar
connector-jdbc-2.3.5-SNAPSHOT-2.12.15.jar
connector-cdc-mysql-2.3.5-SNAPSHOT-2.12.15.jar
4.4 ui编译运行注意事项
[ERROR] Failed to execute goal com.github.eirslett:frontend-maven-plugin:1.11.3:npm (build) on project seatunnel-web-dist: Failed to
run task: 'npm run build:prod' failed. org.apache.commons.exec.ExecuteException: Process exited with an error: 1 (Exit value: 1) ->
4.5mysql-cdc的示例
添加数据源

创建cdc任务

任务执行现象和结论:
# Streaming job used in this MySQL-CDC -> Jdbc example (source MySQL-CDC, sink Jdbc).
# Credentials and hosts are redacted as "xxxx" in this write-up.
env {
"job.mode"=STREAMING
"job.name"="SeaTunnel_Job"
}
source {
MySQL-CDC {
format=DEFAULT
"snapshot.split.size"=8096
"snapshot.fetch.size"=1024
"incremental.parallelism"=1
"connect.timeout.ms"=30000
"connect.max-retries"=3
"connection.pool.size"=20
"chunk-key.even-distribution.factor.lower-bound"=0.05
"chunk-key.even-distribution.factor.upper-bound"=100
"sample-sharding.threshold"=1000
"inverse-sampling.rate"=1000
"startup.mode"=INITIAL
"exactly_once"="true"
"stop.mode"=NEVER
parallelism=1
# The sink below consumes this table via the same name in "source_table_name".
"result_table_name"=Table13434473575488
"dag-parsing.mode"=MULTIPLEX
catalog {
factory=Mysql
}
database-names=[
"xxxxx"
]
table-names=[
"xxxx.xxxx_order"
]
password="xxxx"
username=root
base-url="jdbc:mysql://xxxx:3306/seatunnel?useSSL=false&useUnicode=true&characterEncoding=utf-8&allowMultiQueries=true&allowPublicKeyRetrieval=true"
server-time-zone=UTC
}
}
transform {
}
sink {
Jdbc {
"schema_save_mode"="CREATE_SCHEMA_WHEN_NOT_EXIST"
"data_save_mode"="APPEND_DATA"
"connection_check_timeout_sec"=30
"batch_size"=1000
"is_exactly_once"="true"
# NOTE(review): with is_exactly_once=true this option is expected to name an XA DataSource class
# (for MySQL typically com.mysql.cj.jdbc.MysqlXADataSource). "test-cdc1" looks like a datasource
# name filled in from the web UI — verify.
"xa_data_source_class_name"=test-cdc1
"max_commit_attempts"=3
"transaction_timeout_sec"=-1
"auto_commit"="true"
"support_upsert_by_query_primary_key_exist"="true"
"multi_table_sink_replica"=1
"source_table_name"=Table13434473575488
"generate_sink_sql"=true
catalog {
factory=MySQL
username=root
password="xxx"
base-url="jdbc:mysql://xxxx:3306/seatunnel?useSSL=false&useUnicode=true&characterEncoding=utf-8&allowMultiQueries=true&allowPublicKeyRetrieval=true"
}
database="xxxxx"
url="jdbc:mysql://xxxxx:3306/seatunnel?useSSL=false&useUnicode=true&characterEncoding=utf-8&allowMultiQueries=true&allowPublicKeyRetrieval=true"
driver="com.mysql.cj.jdbc.Driver"
password="xxxx"
user=root
}
}
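Caused by: java.lang.ClassCastException: cannot assign instance of java.lang.Long to field org.apache.seatunnel.api.table.catalog.Column.columnLength of type java.lang.Integer in instance of org.apache.seatunnel.api.table.catalog.PhysicalColumn
说明:该异常是在反序列化 Column 对象时抛出的。序列化数据中 columnLength 是 Long,而接收方加载的 Column 类中该字段仍是 Integer。这通常说明 seatunnel-web 与 SeaTunnel 引擎两侧使用了不同版本的 seatunnel-api 相关 jar,需要确认两边 lib 目录中的 seatunnel jar 版本一致。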
同步任务实例:

5.总结
source + transform + sink