DataxWeb安装部署及使用--真香警告

1.Datax简介

DataX

https://github.com/alibaba/DataX


1.1 Datax是什么?

DataX 是阿里巴巴使用 Java 和 Python 开发的一个异构数据源离线同步工具
异构数据源:不同存储结构的数据源致力于实现包括关系型数据库 MySQL、Oracle、SqlServer、Postgre、HDFS、Hive、ADS、HBase、TableStore(OTS)、MaxCompute(ODPS)、DRDS等各种异结构数据源之间稳定高效的数据同步功能
- Sqoop 是用于在 HDFS 与 RDBMS 之间数据迁移工具
- DataX 是阿里开源的一个异构数据源离线同步工具(任意两种数据源之间)


1.2 Datax的架构


1.3 设计理念

为了解决异构数据源同步问题,DataX将复杂的网状的同步链路变成了星型数据链路,DataX作为中间传输载体负责连接各种数据源。当需要接入一个新的数据源的时候,只需要将此数据源对接到DataX,便能跟已有的数据源做到无缝数据同步。


1.4 DataX3.0框架设计

DataX本身作为离线数据同步框架,采用Framework + plugin架构构建。将数据源读取和写入抽象成为Reader/Writer插件,纳入到整个同步框架中。
  • Reader:Reader�为数据采集模块,负责采集数据源的数据,将数据发送给Framework。

  • Writer:Writer为数据写入模块,负责不断向Framework取数据,并将数据写入到目的端。

  • Framework:Framework用于连接reader和writer,作为两者的数据传输通道,并处理缓冲,流控,并发,数据转换等核心技术问题。


1.5 DataX3.0插件体系

经过几年积累,DataX目前已经有了比较全面的插件体系,主流的RDBMS数据库、NOSQL、大数据计算系统都已经接入。DataX目前支持数据如下:
DataX Framework提供了简单的接口与插件交互,提供简单的插件接入机制,只需要任意加上一种插件,就能无缝对接其他数据源。详情请看:DataX数据源指南


1.6 DataX3.0核心架构

DataX 3.0 开源版本支持单机多线程模式完成同步作业运行,本小节按一个DataX作业生命周期的时序图,从整体架构设计非常简要说明DataX各个模块相互关系。

1.6.1 核心模块介绍

  1. DataX完成单个数据同步的作业,我们称之为Job,DataX接受到一个Job之后,将启动一个进程来完成整个作业同步过程。DataX Job模块是单个作业的中枢管理节点,承担了数据清理、子任务切分(将单一作业计算转化为多个子Task)、TaskGroup管理等功能。

  2. DataXJob启动后,会根据不同的源端切分策略,将Job切分成多个小的Task(子任务),以便于并发执行。Task便是DataX作业的最小单元,每一个Task都会负责一部分数据的同步工作。

  3. 切分多个Task之后,DataX Job会调用Scheduler模块,根据配置的并发数据量,将拆分成的Task重新组合,组装成TaskGroup(任务组)。每一个TaskGroup负责以一定的并发运行完毕分配好的所有Task,默认单个任务组的并发数量为5。

  4. 每一个Task都由TaskGroup负责启动,Task启动后,会固定启动Reader—>Channel—>Writer的线程来完成任务同步工作。

  5. DataX作业运行起来之后, Job监控并等待多个TaskGroup模块任务完成,等待所有TaskGroup任务完成后Job成功退出。否则,异常退出,进程退出值非0


1.6.2 DataX调度流程

举例来说,用户提交了一个DataX作业,并且配置了20个并发,目的是将一个100张分表的mysql数据同步到odps里面。DataX的调度决策思路是:
  1. DataXJob根据分库分表切分成了100个Task。

  2. 根据20个并发,DataX计算共需要分配4个TaskGroup。

  3. 4个TaskGroup平分切分好的100个Task,每一个TaskGroup负责以5个并发共计运行25个Task。


2.DataxWeb简介

DataxWeb

https://github.com/WeiYe-Jing/datax-web
https://gitee.com/WeiYe-Jing/datax-web


2.1 DataxWeb是什么?

      DataX Web是在DataX之上开发的分布式数据同步工具,提供简单易用的 操作界面,降低用户使用DataX的学习成本,缩短任务配置时间,避免配置过程中出错。用户可通过页面选择数据源即可创建数据同步任务,支持RDBMS、Hive、HBase、ClickHouse、MongoDB等数据源,RDBMS数据源可批量创建数据同步任务,支持实时查看数据同步进度及日志并提供终止同步功能,集成并二次开发xxl-job可根据时间、自增主键增量同步数据。

任务"执行器"支持集群部署,支持执行器多节点路由策略选择,支持超时控制、失败重试、失败告警、任务依赖,执行器CPU.内存.负载的监控等等。后续还将提供更多的数据源支持、数据转换UDF、表结构同步、数据同步血缘等更为复杂的业务场景。


2.2 DataxWeb架构


3.DataxWeb安装部署

3.1 创建数据库表用户及授权

create database datax_web_db default character set utf8mb4 collate utf8mb4_general_ci;create user 'datax_web'@'%' identified with mysql_native_password by 'zlf123456';grant all privileges on datax_web_db.* to 'datax_web'@'%';flush privileges;


3.2 sql脚本执行

/* Navicat Premium Data Transfer  Source Server         : localhost Source Server Type    : MySQL Source Server Version : 50725 Source Host           : localhost:3306 Source Schema         : datax_web  Target Server Type    : MySQL Target Server Version : 50725 File Encoding         : 65001  Date: 15/12/2019 22:27:10*/ SET NAMES utf8mb4;SET FOREIGN_KEY_CHECKS = 0; -- ------------------------------ Table structure for job_group-- ----------------------------DROP TABLE IF EXISTS `job_group`;CREATE TABLE `job_group`  (  `id` int(11) NOT NULL AUTO_INCREMENT,  `app_name` varchar(64) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NOT NULL COMMENT '执行器AppName',  `title` varchar(12) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NOT NULL COMMENT '执行器名称',  `order` int(11) NOT NULL DEFAULT 0 COMMENT '排序',  `address_type` tinyint(4) NOT NULL DEFAULT 0 COMMENT '执行器地址类型:0=自动注册、1=手动录入',  `address_list` varchar(512) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NULL DEFAULT NULL COMMENT '执行器地址列表,多地址逗号分隔',  PRIMARY KEY (`id`) USING BTREE) ENGINE = InnoDB AUTO_INCREMENT = 2 CHARACTER SET = utf8mb4 COLLATE = utf8mb4_general_ci ROW_FORMAT = Dynamic; -- ------------------------------ Records of job_group-- ----------------------------INSERT INTO `job_group` VALUES (1, 'datax-executor', 'datax执行器', 1, 0, NULL); -- ------------------------------ Table structure for job_info-- ----------------------------DROP TABLE IF EXISTS `job_info`;CREATE TABLE `job_info`  (  `id` int(11) NOT NULL AUTO_INCREMENT,  `job_group` int(11) NOT NULL COMMENT '执行器主键ID',  `job_cron` varchar(128) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NOT NULL COMMENT '任务执行CRON',  `job_desc` varchar(255) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NOT NULL,  `add_time` datetime(0) NULL DEFAULT NULL,  `update_time` datetime(0) NULL DEFAULT NULL,  `author` varchar(64) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NULL DEFAULT NULL COMMENT '作者',  `alarm_email` varchar(255) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NULL DEFAULT NULL COMMENT '报警邮件',  `executor_route_strategy` varchar(50) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NULL DEFAULT NULL COMMENT '执行器路由策略',  `executor_handler` varchar(255) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NULL DEFAULT NULL COMMENT '执行器任务handler',  `executor_param` varchar(512) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NULL DEFAULT NULL COMMENT '执行器任务参数',  `executor_block_strategy` varchar(50) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NULL DEFAULT NULL COMMENT '阻塞处理策略',  `executor_timeout` int(11) NOT NULL DEFAULT 0 COMMENT '任务执行超时时间,单位秒',  `executor_fail_retry_count` int(11) NOT NULL DEFAULT 0 COMMENT '失败重试次数',  `glue_type` varchar(50) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NOT NULL COMMENT 'GLUE类型',  `glue_source` mediumtext CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NULL COMMENT 'GLUE源代码',  `glue_remark` varchar(128) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NULL DEFAULT NULL COMMENT 'GLUE备注',  `glue_updatetime` datetime(0) NULL DEFAULT NULL COMMENT 'GLUE更新时间',  `child_jobid` varchar(255) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NULL DEFAULT NULL COMMENT '子任务ID,多个逗号分隔',  `trigger_status` tinyint(4) NOT NULL DEFAULT 0 COMMENT '调度状态:0-停止,1-运行',  `trigger_last_time` bigint(13) NOT NULL DEFAULT 0 COMMENT '上次调度时间',  `trigger_next_time` bigint(13) NOT NULL DEFAULT 0 COMMENT '下次调度时间',  `job_json` text CHARACTER SET utf8 COLLATE utf8_general_ci NULL COMMENT 'datax运行脚本',  PRIMARY KEY (`id`) USING BTREE) ENGINE = InnoDB AUTO_INCREMENT = 7 CHARACTER SET = utf8mb4 COLLATE = utf8mb4_general_ci ROW_FORMAT = Dynamic;  -- ------------------------------ Table structure for job_jdbc_datasource-- ----------------------------DROP TABLE IF EXISTS `job_jdbc_datasource`;CREATE TABLE `job_jdbc_datasource`  (  `id` bigint(20) NOT NULL AUTO_INCREMENT COMMENT '自增主键',  `datasource_name` varchar(200) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NOT NULL COMMENT '数据源名称',  `datasource_group` varchar(200) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NULL DEFAULT 'Default' COMMENT '数据源分组',  `jdbc_username` varchar(100) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NOT NULL COMMENT '用户名',  `jdbc_password` varchar(200) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NOT NULL COMMENT '密码',  `jdbc_url` varchar(500) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NOT NULL COMMENT 'jdbc url',  `jdbc_driver_class` varchar(200) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NULL DEFAULT NULL COMMENT 'jdbc驱动类',  `status` tinyint(1) NOT NULL DEFAULT 1 COMMENT '状态:0删除 1启用 2禁用',  `create_by` varchar(20) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NULL DEFAULT NULL COMMENT '创建人',  `create_date` datetime(0) NULL DEFAULT CURRENT_TIMESTAMP(0) COMMENT '创建时间',  `update_by` varchar(20) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NULL DEFAULT NULL COMMENT '更新人',  `update_date` datetime(0) NULL DEFAULT NULL COMMENT '更新时间',  `comments` varchar(1000) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NULL DEFAULT NULL COMMENT '备注',  PRIMARY KEY (`id`) USING BTREE) ENGINE = InnoDB AUTO_INCREMENT = 6 CHARACTER SET = utf8mb4 COLLATE = utf8mb4_general_ci COMMENT = 'jdbc数据源配置' ROW_FORMAT = Dynamic;  -- ------------------------------ Table structure for job_lock-- ----------------------------DROP TABLE IF EXISTS `job_lock`;CREATE TABLE `job_lock`  (  `lock_name` varchar(50) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NOT NULL COMMENT '锁名称',  PRIMARY KEY (`lock_name`) USING BTREE) ENGINE = InnoDB CHARACTER SET = utf8mb4 COLLATE = utf8mb4_general_ci ROW_FORMAT = Dynamic; -- ------------------------------ Records of job_lock-- ----------------------------INSERT INTO `job_lock` VALUES ('schedule_lock'); -- ------------------------------ Table structure for job_log-- ----------------------------DROP TABLE IF EXISTS `job_log`;CREATE TABLE `job_log`  (  `id` bigint(20) NOT NULL AUTO_INCREMENT,  `job_group` int(11) NOT NULL COMMENT '执行器主键ID',  `job_id` int(11) NOT NULL COMMENT '任务,主键ID',  `job_desc` varchar(255) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NULL DEFAULT NULL,  `executor_address` varchar(255) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NULL DEFAULT NULL COMMENT '执行器地址,本次执行的地址',  `executor_handler` varchar(255) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NULL DEFAULT NULL COMMENT '执行器任务handler',  `executor_param` varchar(512) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NULL DEFAULT NULL COMMENT '执行器任务参数',  `executor_sharding_param` varchar(20) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NULL DEFAULT NULL COMMENT '执行器任务分片参数,格式如 1/2',  `executor_fail_retry_count` int(11) NULL DEFAULT 0 COMMENT '失败重试次数',  `trigger_time` datetime(0) NULL DEFAULT NULL COMMENT '调度-时间',  `trigger_code` int(11) NOT NULL COMMENT '调度-结果',  `trigger_msg` text CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NULL COMMENT '调度-日志',  `handle_time` datetime(0) NULL DEFAULT NULL COMMENT '执行-时间',  `handle_code` int(11) NOT NULL COMMENT '执行-状态',  `handle_msg` text CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NULL COMMENT '执行-日志',  `alarm_status` tinyint(4) NOT NULL DEFAULT 0 COMMENT '告警状态:0-默认、1-无需告警、2-告警成功、3-告警失败',  `process_id` varchar(20) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NULL DEFAULT NULL COMMENT 'datax进程Id',  `max_id` bigint(20) NULL DEFAULT NULL COMMENT '增量表max id',  PRIMARY KEY (`id`) USING BTREE,  INDEX `I_trigger_time`(`trigger_time`) USING BTREE,  INDEX `I_handle_code`(`handle_code`) USING BTREE) ENGINE = InnoDB AUTO_INCREMENT = 0 CHARACTER SET = utf8mb4 COLLATE = utf8mb4_general_ci ROW_FORMAT = Dynamic; -- ------------------------------ Table structure for job_log_report-- ----------------------------DROP TABLE IF EXISTS `job_log_report`;CREATE TABLE `job_log_report`  (  `id` int(11) NOT NULL AUTO_INCREMENT,  `trigger_day` datetime(0) NULL DEFAULT NULL COMMENT '调度-时间',  `running_count` int(11) NOT NULL DEFAULT 0 COMMENT '运行中-日志数量',  `suc_count` int(11) NOT NULL DEFAULT 0 COMMENT '执行成功-日志数量',  `fail_count` int(11) NOT NULL DEFAULT 0 COMMENT '执行失败-日志数量',  PRIMARY KEY (`id`) USING BTREE,  UNIQUE INDEX `i_trigger_day`(`trigger_day`) USING BTREE) ENGINE = InnoDB AUTO_INCREMENT = 28 CHARACTER SET = utf8mb4 COLLATE = utf8mb4_general_ci ROW_FORMAT = Dynamic; -- ------------------------------ Records of job_log_report-- ----------------------------INSERT INTO `job_log_report` VALUES (20, '2019-12-07 00:00:00', 0, 0, 0);INSERT INTO `job_log_report` VALUES (21, '2019-12-10 00:00:00', 77, 52, 23);INSERT INTO `job_log_report` VALUES (22, '2019-12-11 00:00:00', 9, 2, 11);INSERT INTO `job_log_report` VALUES (23, '2019-12-13 00:00:00', 9, 48, 74);INSERT INTO `job_log_report` VALUES (24, '2019-12-12 00:00:00', 10, 8, 30);INSERT INTO `job_log_report` VALUES (25, '2019-12-14 00:00:00', 78, 45, 66);INSERT INTO `job_log_report` VALUES (26, '2019-12-15 00:00:00', 24, 76, 9);INSERT INTO `job_log_report` VALUES (27, '2019-12-16 00:00:00', 23, 85, 10); -- ------------------------------ Table structure for job_logglue-- ----------------------------DROP TABLE IF EXISTS `job_logglue`;CREATE TABLE `job_logglue`  (  `id` int(11) NOT NULL AUTO_INCREMENT,  `job_id` int(11) NOT NULL COMMENT '任务,主键ID',  `glue_type` varchar(50) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NULL DEFAULT NULL COMMENT 'GLUE类型',  `glue_source` mediumtext CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NULL COMMENT 'GLUE源代码',  `glue_remark` varchar(128) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NOT NULL COMMENT 'GLUE备注',  `add_time` datetime(0) NULL DEFAULT NULL,  `update_time` datetime(0) NULL DEFAULT NULL,  PRIMARY KEY (`id`) USING BTREE) ENGINE = InnoDB AUTO_INCREMENT = 1 CHARACTER SET = utf8mb4 COLLATE = utf8mb4_general_ci ROW_FORMAT = Dynamic; -- ------------------------------ Table structure for job_registry-- ----------------------------DROP TABLE IF EXISTS `job_registry`;CREATE TABLE `job_registry`  (  `id` int(11) NOT NULL AUTO_INCREMENT,  `registry_group` varchar(50) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NOT NULL,  `registry_key` varchar(191) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NOT NULL,  `registry_value` varchar(191) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NOT NULL,  `update_time` datetime(0) NULL DEFAULT NULL,  PRIMARY KEY (`id`) USING BTREE,  INDEX `i_g_k_v`(`registry_group`, `registry_key`, `registry_value`) USING BTREE) ENGINE = InnoDB AUTO_INCREMENT = 26 CHARACTER SET = utf8mb4 COLLATE = utf8mb4_general_ci ROW_FORMAT = Dynamic;   -- ------------------------------ Table structure for job_user-- ----------------------------DROP TABLE IF EXISTS `job_user`;CREATE TABLE `job_user`  (  `id` int(11) NOT NULL AUTO_INCREMENT,  `username` varchar(50) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NOT NULL COMMENT '账号',  `password` varchar(100) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NOT NULL COMMENT '密码',  `role` varchar(50) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NULL DEFAULT NULL COMMENT '角色:0-普通用户、1-管理员',  `permission` varchar(255) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NULL DEFAULT NULL COMMENT '权限:执行器ID列表,多个逗号分割',  PRIMARY KEY (`id`) USING BTREE,  UNIQUE INDEX `i_username`(`username`) USING BTREE) ENGINE = InnoDB AUTO_INCREMENT = 10 CHARACTER SET = utf8mb4 COLLATE = utf8mb4_general_ci ROW_FORMAT = Dynamic; -- ------------------------------ Records of job_user-- ----------------------------INSERT INTO `job_user` VALUES (1, 'admin', '$2a$10$2KCqRbra0Yn2TwvkZxtfLuWuUP5KyCWsljO/ci5pLD27pqR3TV1vy', 'ROLE_ADMIN', NULL);   /**v2.1.1脚本更新*/ALTER TABLE `job_info`ADD COLUMN `replace_param` VARCHAR(100) NULL DEFAULT NULL COMMENT '动态参数' AFTER `job_json`,ADD COLUMN `jvm_param` VARCHAR(200) NULL DEFAULT NULL COMMENT 'jvm参数' AFTER `replace_param`,ADD COLUMN `time_offset` INT(11) NULL DEFAULT '0'COMMENT '时间偏移量'  AFTER `jvm_param`;/**增量改版脚本更新 */ALTER TABLE `job_info` DROP COLUMN `time_offset`;ALTER TABLE `job_info`ADD COLUMN `inc_start_time` DATETIME NULL DEFAULT NULL COMMENT '增量初始时间' AFTER `jvm_param`; -- ------------------------------ Table structure for job_template-- ----------------------------DROP TABLE IF EXISTS `job_template`;CREATE TABLE `job_template`  (  `id` int(11) NOT NULL AUTO_INCREMENT,  `job_group` int(11) NOT NULL COMMENT '执行器主键ID',  `job_cron` varchar(128) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NOT NULL COMMENT '任务执行CRON',  `job_desc` varchar(255) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NOT NULL,  `add_time` datetime(0) NULL DEFAULT NULL,  `update_time` datetime(0) NULL DEFAULT NULL,  `user_id` int(11) NOT NULL COMMENT '修改用户',  `alarm_email` varchar(255) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NULL DEFAULT NULL COMMENT '报警邮件',  `executor_route_strategy` varchar(50) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NULL DEFAULT NULL COMMENT '执行器路由策略',  `executor_handler` varchar(255) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NULL DEFAULT NULL COMMENT '执行器任务handler',  `executor_param` varchar(512) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NULL DEFAULT NULL COMMENT '执行器参数',  `executor_block_strategy` varchar(50) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NULL DEFAULT NULL COMMENT '阻塞处理策略',  `executor_timeout` int(11) NOT NULL DEFAULT 0 COMMENT '任务执行超时时间,单位秒',  `executor_fail_retry_count` int(11) NOT NULL DEFAULT 0 COMMENT '失败重试次数',  `glue_type` varchar(50) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NOT NULL COMMENT 'GLUE类型',  `glue_source` mediumtext CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NULL COMMENT 'GLUE源代码',  `glue_remark` varchar(128) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NULL DEFAULT NULL COMMENT 'GLUE备注',  `glue_updatetime` datetime(0) NULL DEFAULT NULL COMMENT 'GLUE更新时间',  `child_jobid` varchar(255) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NULL DEFAULT NULL COMMENT '子任务ID,多个逗号分隔',  `trigger_last_time` bigint(13) NOT NULL DEFAULT 0 COMMENT '上次调度时间',  `trigger_next_time` bigint(13) NOT NULL DEFAULT 0 COMMENT '下次调度时间',  `job_json` text CHARACTER SET utf8 COLLATE utf8_general_ci NULL COMMENT 'datax运行脚本',  `jvm_param` varchar(200) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NULL DEFAULT NULL COMMENT 'jvm参数',  `project_id` int(11) NULL DEFAULT NULL COMMENT '所属项目Id',  PRIMARY KEY (`id`) USING BTREE) ENGINE = InnoDB AUTO_INCREMENT = 22 CHARACTER SET = utf8mb4 COLLATE = utf8mb4_general_ci ROW_FORMAT = Dynamic; /**添加数据源字段 */ALTER TABLE `job_jdbc_datasource`ADD COLUMN `datasource` VARCHAR(45) NOT NULL COMMENT '数据源' AFTER `datasource_name`; /**添加分区字段 */ALTER TABLE `job_info`ADD COLUMN `partition_info` VARCHAR(100) NULL DEFAULT NULL COMMENT '分区信息' AFTER `inc_start_time`; /**2.1.1版本新增---------------------------------------------------------------------------------------------- *//**最近一次执行状态 */ALTER TABLE `job_info`ADD COLUMN `last_handle_code` INT(11) NULL DEFAULT '0' COMMENT '最近一次执行状态' AFTER `partition_info`; /**zookeeper地址 */ALTER TABLE `job_jdbc_datasource`ADD COLUMN `zk_adress` VARCHAR(200) NULL DEFAULT NULL AFTER `jdbc_driver_class`; ALTER TABLE `job_info`CHANGE COLUMN `executor_timeout` `executor_timeout` INT(11) NOT NULL DEFAULT '0' COMMENT '任务执行超时时间,单位分钟' ; /**用户名密码改为非必填 */ALTER TABLE `job_jdbc_datasource`CHANGE COLUMN `jdbc_username` `jdbc_username` VARCHAR(100) CHARACTER SET 'utf8mb4' NULL DEFAULT NULL COMMENT '用户名' ,CHANGE COLUMN `jdbc_password` `jdbc_password` VARCHAR(200) CHARACTER SET 'utf8mb4' NULL DEFAULT NULL COMMENT '密码' ;/**添加mongodb数据库名字段 */ALTER TABLE `job_jdbc_datasource`ADD COLUMN `database_name` VARCHAR(45) NULL DEFAULT NULL COMMENT '数据库名' AFTER `datasource_group`;/**添加执行器资源字段 */ALTER TABLE `job_registry`ADD COLUMN `cpu_usage` DOUBLE NULL AFTER `registry_value`,ADD COLUMN `memory_usage` DOUBLE NULL AFTER `cpu_usage`,ADD COLUMN `load_average` DOUBLE NULL AFTER `memory_usage`; -- ------------------------------ Table structure for job_permission-- ----------------------------DROP TABLE IF EXISTS `job_permission`;CREATE TABLE `job_permission`  (  `id` int(11) NOT NULL AUTO_INCREMENT COMMENT '主键',  `name` varchar(50) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NOT NULL COMMENT '权限名',  `description` varchar(11) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NULL DEFAULT NULL COMMENT '权限描述',  `url` varchar(255) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NULL DEFAULT NULL,  `pid` int(11) NULL DEFAULT NULL,  PRIMARY KEY (`id`) USING BTREE) ENGINE = InnoDB AUTO_INCREMENT = 3 CHARACTER SET = utf8mb4 COLLATE = utf8mb4_general_ci ROW_FORMAT = Dynamic;   ALTER TABLE `job_info`ADD COLUMN `replace_param_type` varchar(255) NULL COMMENT '增量时间格式' AFTER `last_handle_code`;  ALTER TABLE `job_info`ADD COLUMN `project_id` int(11) NULL COMMENT '所属项目id' AFTER `job_desc`; ALTER TABLE `job_info`ADD COLUMN `reader_table` VARCHAR(255) NULL COMMENT 'reader表名称' AFTER `replace_param_type`,ADD COLUMN `primary_key` VARCHAR(50) NULL COMMENT '增量表主键' AFTER `reader_table`,ADD COLUMN `inc_start_id` VARCHAR(20) NULL COMMENT '增量初始id' AFTER `primary_key`,ADD COLUMN `increment_type` TINYINT(4) NULL COMMENT '增量类型' AFTER `inc_start_id`,ADD COLUMN `datasource_id` BIGINT(11) NULL COMMENT '数据源id' AFTER `increment_type`; CREATE TABLE `job_project`  (  `id` int(11) NOT NULL AUTO_INCREMENT COMMENT 'key',  `name` varchar(100) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NULL DEFAULT NULL COMMENT 'project name',  `description` varchar(200) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NULL DEFAULT NULL,  `user_id` int(11) NULL DEFAULT NULL COMMENT 'creator id',  `flag` tinyint(4) NULL DEFAULT 1 COMMENT '0 not available, 1 available',  `create_time` datetime(0) NULL DEFAULT CURRENT_TIMESTAMP(0) COMMENT 'create time',  `update_time` datetime(0) NULL DEFAULT CURRENT_TIMESTAMP(0) COMMENT 'update time',  PRIMARY KEY (`id`) USING BTREE) ENGINE = InnoDB AUTO_INCREMENT = 1 CHARACTER SET = utf8mb4 COLLATE = utf8mb4_general_ci ROW_FORMAT = Dynamic;  ALTER TABLE `job_info`CHANGE COLUMN `author` `user_id` INT(11) NOT NULL COMMENT '修改用户' ; ALTER TABLE `job_info`CHANGE COLUMN `increment_type` `increment_type` TINYINT(4) NULL DEFAULT 0 COMMENT '增量类型' ;


3.3 bootstrap配置文件挂载位置

如果是在Linux上在宿主机创建/home/datax/datax-admin/conf目录,并将bootstrap.properties拷贝到/home/datax/datax-admin/conf目录下
如果是在Windows10专业版上在D:\datax\datax-admin\conf\下新建bootstrap.properties文件
#Database 
DB_HOST=xx.xx.xx.xx
DB_PORT=3306
DB_USERNAME=datax_web
DB_PASSWORD=xxxxxx
DB_DATABASE=datax_web_db


3.4 docker命令启动

#linux上的执行命令如下:
docker run -d --name datax_web -p 9527:9527 -v /home/datax/datax-admin/conf/bootstrap.properties:/home/datax/datax-web-2.1.2/modules/datax-admin/conf/bootstrap.properties linshellfeng/datax_web:3.0.1

# windows10专业版上执行命令如下:
docker run -d --name datax_web -p 9527:9527 -v "D:\datax\datax-admin\conf\bootstrap.properties":/home/datax/datax-web-2.1.2/modules/datax-admin/conf/bootstrap.properties linshellfeng/datax_web:3.0.1


3.5 修改core.json 配置修改

docker exec -it b9b /bin/bash
vim /home/datax/datax/conf/core.json
byte字段 由原来的的-1改为:2000000


3.6 访问首页

账号密码:admin/123456http://ip:9527/index.html


4.全量同步

这个比较简单,省略该步骤
这里做一个简单的说明:本篇文章中实践的是mysql8.0数据库的一个数据库test1中的test1表(源数据),然后需要同步到test1数据库的test2(目标数据),都是同一个数据库,test1表和test2表的结构是一样的,这个是简单的数据库(同库或异库)表对表的数据同步,还可以写表与表的关联查询,然后将关联数据同步到目标库的目标表中,这个本文没有搞,有兴趣的可以去探索尝试下它的一些新玩法和新姿势。


5.增量同步

时间段增量同步和id段增量同步
下面的是错误的例子:这种写的博客有好多坑的博客都是这种写的,这种写不报错,就是执行跑不出你想要的增量数据:
错误demo:
{  "job": {    "setting": {      "speed": {        "channel": 3,        "byte": 1048576      },      "errorLimit": {        "record": 0,        "percentage": 0.02      }    },    "content": [      {        "reader": {          "name": "mysqlreader",          "parameter": {            "username": "==",            "password": "==",            "splitPk": "Id",            "connection": [              {                //这种方式是错误的,没有效果的                "querySql": [                  "select Id,Name,Addrress,CreateTime,UpdateTime from products where UpdateTime >= ${lastTime} and UpdateTime < ${currentTime} "                ],                "jdbcUrl": [                  "jdbc:mysql://192.168.31.132:3306/demo"                ]              }            ]          }        },        "writer": {          "name": "mysqlwriter",          "parameter": {            "username": "==",            "password": "==",            "writeMode": "update",            "column": [              "`Id`",              "`Name`",              "`Addrress`",              "`CreateTime`",              "`UpdateTime`"            ],            "connection": [              {                "table": [                  "products2"                ],                "jdbcUrl": "jdbc:mysql://192.168.31.132:3306/demo"              }            ]          }        }      }    ]  }}

正确的是在where条件当中:
 "where": " create_time >= ${lastTime} and create_time < ${currentTime}",
id段也是这种搞的,id的这个我没有试过,但是我相信是可以的,时间段增量的都可以的,id的也是没有啥问题的,放在where条件当中
正确demo:
{  "job": {    "setting": {      "speed": {        "channel": 3,        "byte": 1048576      },      "errorLimit": {        "record": 0,        "percentage": 0.02      }    },    "content": [      {        "reader": {          "name": "mysqlreader",          "parameter": {            "username": "xxxxxx",            "password": "xxxxxx",            "column": [              "`id`",              "`order_id`",              "`create_time`",              "`update_time`",              "`remark`"              ,,,,,,,,,,,,            ],            "where": " create_time >= ${lastTime} and create_time < ${currentTime}",            "splitPk": "",            "connection": [              {                "table": [                  "bc_order"                ],                "jdbcUrl": [                  "jdbc:mysql://ip:3306/test"                ]              }            ]          }        },        "writer": {          "name": "mysqlwriter",          "parameter": {            "username": "xxxxxx",            "password": "xxxx",            "column": [              "`id`",              "`order_id`",              "`create_time`",              "`update_time`",              "`remark`"              ,,,,,,,,,,,,            ],            "connection": [              {                "table": [                  "bc_order_copy1"                ],                "jdbcUrl": "jdbc:mysql://ip:3306/test"              }            ]          }        }      }    ]  }}
这个json不需要手写的,由dataxWeb给我们自动生成的,也很方便。


5.1时间段增量同步

页面任务配置:

打开菜单任务管理页面,选择添加任务

按下图中步骤进行配置

说明:

  • 1.任务类型选DataX任务

  • 2.辅助参数选择时间自增

  • 3.增量开始时间选择,即sql中查询时间的开始时间,用户使用此选项方便第一次的全量同步。第一次同步完成后,该时间被更新为上一次的任务触发时间,任务失败不更新。

  • 4.增量时间字段,-DlastTime='%s' -DcurrentTime='%s' 先来解析下这段字符串

1.-D是DataX参数的标识符,必配
2.-D后面的lastTime和currentTime是DataX json中where条件的时间字段标识符,必须和json中的变量名称保持一致

3.='%s'是项目用来去替换时间的占位符,比配并且格式要完全一致

4.注意-DlastTime='%s'和-DcurrentTime='%s'中间有一个空格,空格必须保留并且是一个空格

5.时间格式,可以选择自己数据库中时间的格式,也可以通过json中配置sql时间转换函数来处理

demo如下:

{  "job": {    "setting": {      "speed": {        "channel": 3,        "byte": -1      },      "errorLimit": {        "record": 0,        "percentage": 0.02      }    },    "content": [      {        "reader": {          "name": "mysqlreader",          "parameter": {            "username": "7aAw6fAFXgqP2weyjjwIAw==",            "password": "1Sh8F0VGrkzgnRXsNXowUAxSS1xnCyE8TrEgzQ7ZE40=",            "column": [              "ID",              "CREATE_TIME",              "USER_ID",              "UPDATE_TIME",              "LAST_MODIFY_USER_ID"            ],            //如果选择的是时间戳需要用FROM_UNIXTIME这个函数进行转换下的,下面有说明            "where": " CREATE_TIME >= FROM_UNIXTIME(${lastTime}) and CREATE_TIME < FROM_UNIXTIME(${currentTime})",            "splitPk": "ID",            "connection": [              {                "table": [                  "t_test"                ],                "jdbcUrl": [                  "jdbc:mysql://127.0.0.1:3306/test"                ]              }            ]          }        },        "writer": {          "name": "clickhousewriter",          "parameter": {            "username": "OhlJ4g2KfCRznayQNh0eng==",            "password": "ONwWYPUDMPXDIREymhWAMQ==",            "column": [              "ID",              "CREATE_TIME",              "USER_ID",              "UPDATE_TIME",              "LAST_MODIFY_USER_ID"            ],            "connection": [              {                "table": [                  "tb"                ],                "jdbcUrl": "jdbc:clickhouse://localhost:8123/local"              }            ]          }        }      }    ]  }}

说明:1.此处的关键点在${lastTime}${currentTime}${}是DataX动态参数的固定格式,lastTime,currentTime就是我们页面配置中 -DlastTime='%s' -DcurrentTime='%s'中的lastTime,currentTime,注意字段一定要一致。2.如果任务配置页面,时间类型选择为时间戳但是数据库时间格式不是时间戳,例如是:2019-11-26 11:40:57 此时可以用FROM_UNIXTIME(${lastTime})进行转换。select * from test_list where operationDate >= FROM_UNIXTIME(${lastTime}) and operationDate < FROM_UNIXTIME(${currentTime})

5.2id段增量同步

页面任务配置

打开菜单任务管理页面,选择添加任务

按下图中步骤进行配置

说明:

  • 1.任务类型选DataX任务

  • 2.辅助参数选择主键自增

  • 3.增量主键开始ID选择,即sql中查询ID的开始ID,用户使用此选项方便第一次的全量同步。第一次同步完成后,该ID被更新为上一次的任务触发时最大的ID,任务失败不更新。

  • 4.增量时间字段,-DstartId='%s' -DendId='%s' 先来解析下这段字符串

1.-D是DataX参数的标识符,必配

2.-D后面的startId和endId是DataX json中where条件的id字段标识符,必须和json中的变量名称保持一致

3.='%s'是项目用来去替换时间的占位符,比配并且格式要完全一致

4.注意-DstartId='%s'和-DendId='%s' 中间有一个空格,空格必须保留并且是一个空格

5.reader数据源,选择任务同步的读数据源

6.配置reader数据源中需要同步数据的表名及该表的主键
此处的关键点在${startId},${endId},${}是DataX动态参数的固定格式,startId,endId就是我们页面配置中 -DstartId='%s' -DendId='%s'中的startId,endId,注意字段一定要一致。

demo如下:

{  "job": {    "setting": {      "speed": {        "channel": 3,        "byte": -1      },      "errorLimit": {        "record": 0,        "percentage": 0.02      }    },    "content": [      {        "reader": {          "name": "mysqlreader",          "parameter": {            "username": "7aAw6fAFXgqP2weyjjwIAw==",            "password": "1Sh8F0VGrkzgnRXsNXowUAxSS1xnCyE8TrEgzQ7ZE40=",            "column": [              "ID",              "CREATE_TIME",              "USER_ID",              "UPDATE_TIME",              "LAST_MODIFY_USER_ID"            ],            "where": " ID >= ${startId} and ID < ${endId} ",            "splitPk": "ID",            "connection": [              {                "table": [                  "t_test"                ],                "jdbcUrl": [                  "jdbc:mysql://127.0.0.1:3306/test"                ]              }            ]          }        },        "writer": {          "name": "clickhousewriter",          "parameter": {            "username": "OhlJ4g2KfCRznayQNh0eng==",            "password": "ONwWYPUDMPXDIREymhWAMQ==",            "column": [              "ID",              "CREATE_TIME",              "USER_ID",              "UPDATE_TIME",              "LAST_MODIFY_USER_ID"            ],            "connection": [              {                "table": [                  "tb"                ],                "jdbcUrl": "jdbc:clickhouse://localhost:8123/local"              }            ]          }        }      }    ]  }}

6.遇到的问题及决绝办法

在做这个实践的时候,我用的是之前本地flink-cdc的实践所安装的mysql5.7.1的数据库,使用的是windows10操作系统的docker环境,本文也是使用windows10操作系统的docker环境,然后在执行的任务的时候就会报一个加载mysql-connector-java-8.0.30.jar异常的错误,后面我在docker容器文件界面找到了dataxWeb的lib所在的路径下把这个8.0的驱动包删除,换了一个5.7x的版本的jar包,你后面启动,然后执行报了一个如下错误:
readlag fail, logFile not exists
我还以为是我之前改动了上面那个jar包导致镜像文件改变了,后面我把之前的启动的容器和下载的镜像全部删除,重新执行docker命令,重新下载镜像,拉起容器,然后继续尝试,结果还是报这个错误,后面我进入到datax下面的bin目录下,执行了datax的自检,一直是自检有问题,看日志是jar的依赖冲突导致,关于datax的启动自检(datax、dataxWeb使用可执行包安装就有这个步骤了,这个源码暗转比较复杂,可以参考网上的教程,本文使用docker镜像的方式简单方便快捷的就可以使用体验上datax和dataxWeb,这个镜像都包含这个两个,都是开箱即可使用,只需要安装上面的步骤配置下即可快速使用),网上也有教程,这里就不过多的讲解,否则对大家带来误解,后面左搞右搞还是这个错误,我就怀疑是不是mysql5.7x的数据库这个dataxWeb的镜像不支持,后面使用了一个mysql8.x的数据库进行了再一次尝试,结果发现没有这个问题了,这个也是很坑的一个问题。要解决这个问题有两个方法:
1.上网找dataxWeb镜像支持mysql数据库5.7x的镜像
2.把dataxWeb的源码拉到本地修改pom的mysql依赖改成5.7.1重新打包构建,然后重新构建一个dataxWeb的镜像
这个两个只是一个思路提供给大家,有兴趣的可以去尝试下。
数据库为mysql8.0时添加数据源需要注意:

7.总结

本次分享到此结束,会使用datax同步数据,在异构数据源的情况下,如果不会这个工具,那只能写crud的方式写一大堆业务代码来完成数据的同步,很容易出问题,一个装B的写法一个不小心就会写出bug导致翻车,造成一些问题和事故,所以能不写代码实现就不写代码,不一定要写代码才可以实现,只会写代码实现就是一种定式思维和惯性思维,条条大路通罗马,没有必要一上来就写代码,保持好奇心,每天学习研究点新东西,不至于天天月月年年在crud,希望我的分享对你有所帮助,请一键三连,么么么哒!



请使用浏览器的分享功能分享到微信等