dolphinscheduler数据集成datax使用

环境配置

  1. 修改/etc/profile 文件,增加如下环境变量,如果是docker容器,需要进容器内修改,或者映射进去
    1
    2
    3
    4
    export JAVA_HOME=/opt/java/openjdk                                                       
    export PYTHON_LAUNCHER=/opt/soft/python2.7/bin/python
    export DATAX_LAUNCHER=/opt/soft/datax/bin/datax.py
    export PATH=$JAVA_HOME/bin:$PATH

因为dolphinscheduler执行datax脚本的方式为sudo运行,sudo运行时,会默认重置环境变量为安全的环境变量,也即,但前设置的变量都会失效,只有少数配置文件中指定的环境变量能保存下来。
图片
不这么设置会报line 4: --jvm=-Xms1G -Xmx1G: command not found的错误

  1. 将datax和python的包导入到对应的目录下,docker容器的话,需要导入到其映射目录下
    图片

使用记录

相同表数据同步

从mysql 表employees 同步到 postgres表employees 中 两个表字段相同
图片
点击运行后,成功同步
图片

字段映射

创建一个新表,字段名全部修改后做同步
图片
使用as语法来做字段的映射,运行后成功
图片

默认值填充

给postgresql 的employees表新增一个字段,同步时,使用默认值填充
图片
同样使用as语法来做字段的映射,运行后成功
图片

Transformer

在数据同步、传输过程中,存在用户对于数据传输进行特殊定制化的需求场景,包括裁剪列、转换列等工作,可以借助ETL的T过程实现(Transformer)。DataX包含了完整的E(Extract)、T(Transformer)、L(Load)支持。
目前datax 支持的transformer有6个:

  1. dx_substr 从字符串的指定位置(包含)截取指定长度的字符串
  2. dx_pad 字符串填充
  3. dx_replace 字符串替换
  4. dx_filter 记录过滤
  5. dx_digest 字符串hash
  6. dx_groovy 使用groovy code的形式自定义转换函数
    Transformer 需要借助自定义模板实现,在模板中配置对应的json文件
    图片
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    61
    62
    63
    64
    65
    66
    67
    68
    69
    70
    71
    72
    73
    74
    75
    76
    77
    78
    79
    80
    81
    82
    83
    84
    85
    86
    87
    88
    89
    90
    91
    92
    93
    94
    95
    {
    "job": {
    "setting": {
    "speed": {
    "channel": 1
    }
    },
    "content": [
    {
    "reader": {
    "name": "mysqlreader",
    "parameter": {
    "username": "root",
    "password": "root",
    "column": [
    "emp_no",
    "birth_date",
    "first_name",
    "last_name",
    "gender",
    "hire_date",
    "0"
    ],
    "connection": [
    {
    "table": [
    "employees"
    ],
    "jdbcUrl": [
    "jdbc:mysql://192.168.80.130:3306/employees?useSSL=false"
    ]
    }
    ]
    }
    },
    "transformer": [
    {
    "name": "dx_substr",
    "parameter": {
    "columnIndex": 3,
    "paras": [
    "1",
    "8"
    ]
    }
    },
    {
    "name": "dx_groovy",
    "parameter": {
    "code": "Column dataTimeColumn = record.getColumn(1);
    Date birthData = dataTimeColumn.asDate();
    if (birthData.getTime() < -315648000000) {
    record.setColumn(6,new LongColumn(1));
    } else {
    record.setColumn(6,new LongColumn(0));
    }
    return record;"
    },
    "extraPackage": [
    "import java.util.Date;",
    "import java.text.SimpleDateFormat"
    ]
    }
    ],
    "writer": {
    "name": "postgresqlwriter",
    "parameter": {
    "username": "root",
    "password": "root",
    "column": [
    "emp_no1",
    "birth_date1",
    "first_name1",
    "last_name1",
    "gender1",
    "hire_date1",
    "old"
    ],
    "preSql": [
    "delete from employees1"
    ],
    "connection": [
    {
    "jdbcUrl": "jdbc:postgresql://192.168.80.130:5432/dolphinscheduler",
    "table": [
    "employees1"
    ]
    }
    ]
    }
    }
    }
    ]
    }
    }

在配置文件中配置了两个transformer:

  • dx_substr :将source表中第三列的字符串截取第1位到第8位
  • dx_groovy:自定义转换函数代码,判断出生日期与 -315648000000(1960-01-01 00:00:00)的大小,来赋值目的表中的第6列
    执行成功:
    图片

Upsert

在自定义模板中使用writeMode来配置写入方式 "writeMode": "update (emp_no)" 表示根据emp_no列进行插入或更新。

注意:datax的官方版本不支持 postgres的update模式,需要修改一下源码重新打包datax实现 https://www.cnblogs.com/xmzpc/p/15429291.html
同样自定义模板,加入writeMode进行配置:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
{
"job": {
"setting": {
"speed": {
"channel": 1
}
},
"content": [
{
"reader": {
"name": "mysqlreader",
"parameter": {
"username": "root",
"password": "root",
"column": [
"emp_no",
"birth_date",
"first_name",
"last_name",
"gender",
"hire_date",
"0"
],
"connection": [
{
"table": [
"employees"
],
"jdbcUrl": [
"jdbc:mysql://192.168.80.130:3306/employees?useSSL=false"
]
}
]
}
},
"transformer": [
{
"name": "dx_substr",
"parameter": {
"columnIndex": 3,
"paras": [
"1",
"8"
]
}
},
{
"name": "dx_groovy",
"parameter": {
"code": "Column dataTimeColumn = record.getColumn(1);
Date birthData = dataTimeColumn.asDate();
if (birthData.getTime() < -315648000000) {
record.setColumn(6,new LongColumn(1));
} else {
record.setColumn(6,new LongColumn(0));
}
return record;"
},
"extraPackage": [
"import java.util.Date;",
"import java.text.SimpleDateFormat"
]
}
],
"writer": {
"name": "postgresqlwriter",
"parameter": {
"writeMode": "update (emp_no1)",
"username": "root",
"password": "root",
"column": [
"emp_no1",
"birth_date1",
"first_name1",
"last_name1",
"gender1",
"hire_date1",
"old"
],
"connection": [
{
"jdbcUrl": "jdbc:postgresql://192.168.80.130:5432/dolphinscheduler",
"table": [
"employees1"
]
}
]
}
}
}
]
}
}

修改一下源端的一个值,看一下update是否实现
图片
运行任务:!
图片
查看目的端,可以看到对应的值也被更新且被我们的transformer函数所处理
图片

增量更新

在配置文件中新增"where": "from_date >= '$[yyyy-MM-dd -1]' and from_date < '$[yyyy-MM-dd]'"

$[...] 格式为dolphinscheduler的基准变量 $[yyyyMMddHHmmss]代表当前任务的调度时间,通过这种方式结合定时任务执行,可以每次获取到增量更新的数据进行同步;

$[yyyyMMddHHmmss] 是可以任意分解组合的,比如:$[yyyyMMdd], $[HHmmss], $[yyyy-MM-dd] 等

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
{
"job": {
"setting": {
"speed": {
"channel": 1
}
},
"content": [
{
"reader": {
"name": "mysqlreader",
"parameter": {
"username": "root",
"password": "root",
"column": [
"*"
],
"where": "from_date >= '$[yyyy-MM-dd -1]' and from_date < '$[yyyy-MM-dd]'"
"connection": [
{
"table": [
"salaries"
],
"jdbcUrl": [
"jdbc:mysql://192.168.80.130:3306/employees?useSSL=false"
]
}
]
}
},
"writer": {
"name": "postgresqlwriter",
"parameter": {
"username": "root",
"password": "root",
"column": [
"*"
],
"connection": [
{
"jdbcUrl": "jdbc:postgresql://192.168.80.130:5432/dolphinscheduler",
"table": [
"salaries"
]
}
]
}
}
}
]
}
}

运行成功:
图片
查看结果:
图片

问题记录

  1. –jvm=-Xms1G -Xmx1G: command not found
    修改环境变量

  2. org.postgresql.util.PSQLException: The authentication type 10 is not supported. Check that you have configured the pg_hba.conf file to include the client’s IP address or subnet, and that it is using an authentication scheme supported by the driver.
    datax默认的postgres jdbc驱动版本太低,自行下载一个高版本的驱动替换即可