环境配置
- 修改/etc/profile 文件,增加如下环境变量,如果是docker容器,需要进容器内修改,或者映射进去
1
2
3
4export JAVA_HOME=/opt/java/openjdk
export PYTHON_LAUNCHER=/opt/soft/python2.7/bin/python
export DATAX_LAUNCHER=/opt/soft/datax/bin/datax.py
export PATH=$JAVA_HOME/bin:$PATH
因为dolphinscheduler执行datax脚本的方式为sudo运行,sudo运行时,会默认重置环境变量为安全的环境变量,也即,但前设置的变量都会失效,只有少数配置文件中指定的环境变量能保存下来。
不这么设置会报line 4: --jvm=-Xms1G -Xmx1G: command not found
的错误
- 将datax和python的包导入到对应的目录下,docker容器的话,需要导入到其映射目录下
使用记录
相同表数据同步
从mysql 表employees 同步到 postgres表employees 中 两个表字段相同
点击运行后,成功同步
字段映射
创建一个新表,字段名全部修改后做同步
使用as语法来做字段的映射,运行后成功
默认值填充
给postgresql 的employees表新增一个字段,同步时,使用默认值填充
同样使用as语法来做字段的映射,运行后成功
Transformer
在数据同步、传输过程中,存在用户对于数据传输进行特殊定制化的需求场景,包括裁剪列、转换列等工作,可以借助ETL的T过程实现(Transformer)。DataX包含了完整的E(Extract)、T(Transformer)、L(Load)支持。
目前datax 支持的transformer有6个:
- dx_substr 从字符串的指定位置(包含)截取指定长度的字符串
- dx_pad 字符串填充
- dx_replace 字符串替换
- dx_filter 记录过滤
- dx_digest 字符串hash
- dx_groovy 使用groovy code的形式自定义转换函数
Transformer 需要借助自定义模板实现,在模板中配置对应的json文件1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95{
"job": {
"setting": {
"speed": {
"channel": 1
}
},
"content": [
{
"reader": {
"name": "mysqlreader",
"parameter": {
"username": "root",
"password": "root",
"column": [
"emp_no",
"birth_date",
"first_name",
"last_name",
"gender",
"hire_date",
"0"
],
"connection": [
{
"table": [
"employees"
],
"jdbcUrl": [
"jdbc:mysql://192.168.80.130:3306/employees?useSSL=false"
]
}
]
}
},
"transformer": [
{
"name": "dx_substr",
"parameter": {
"columnIndex": 3,
"paras": [
"1",
"8"
]
}
},
{
"name": "dx_groovy",
"parameter": {
"code": "Column dataTimeColumn = record.getColumn(1);
Date birthData = dataTimeColumn.asDate();
if (birthData.getTime() < -315648000000) {
record.setColumn(6,new LongColumn(1));
} else {
record.setColumn(6,new LongColumn(0));
}
return record;"
},
"extraPackage": [
"import java.util.Date;",
"import java.text.SimpleDateFormat"
]
}
],
"writer": {
"name": "postgresqlwriter",
"parameter": {
"username": "root",
"password": "root",
"column": [
"emp_no1",
"birth_date1",
"first_name1",
"last_name1",
"gender1",
"hire_date1",
"old"
],
"preSql": [
"delete from employees1"
],
"connection": [
{
"jdbcUrl": "jdbc:postgresql://192.168.80.130:5432/dolphinscheduler",
"table": [
"employees1"
]
}
]
}
}
}
]
}
}
在配置文件中配置了两个transformer:
dx_substr
:将source表中第三列的字符串截取第1位到第8位dx_groovy
:自定义转换函数代码,判断出生日期与 -315648000000(1960-01-01 00:00:00)的大小,来赋值目的表中的第6列
执行成功:
Upsert
在自定义模板中使用writeMode来配置写入方式 "writeMode": "update (emp_no)"
表示根据emp_no列进行插入或更新。
注意:datax的官方版本不支持 postgres的update模式,需要修改一下源码重新打包datax实现 https://www.cnblogs.com/xmzpc/p/15429291.html
同样自定义模板,加入writeMode进行配置:
1 | { |
修改一下源端的一个值,看一下update是否实现
运行任务:!
查看目的端,可以看到对应的值也被更新且被我们的transformer函数所处理
增量更新
在配置文件中新增"where": "from_date >= '$[yyyy-MM-dd -1]' and from_date < '$[yyyy-MM-dd]'"
$[...]
格式为dolphinscheduler的基准变量 $[yyyyMMddHHmmss]
代表当前任务的调度时间,通过这种方式结合定时任务执行,可以每次获取到增量更新的数据进行同步;
$[yyyyMMddHHmmss]
是可以任意分解组合的,比如:$[yyyyMMdd], $[HHmmss], $[yyyy-MM-dd] 等
1 | { |
运行成功:
查看结果:
问题记录
–jvm=-Xms1G -Xmx1G: command not found
修改环境变量org.postgresql.util.PSQLException: The authentication type 10 is not supported. Check that you have configured the pg_hba.conf file to include the client’s IP address or subnet, and that it is using an authentication scheme supported by the driver.
datax默认的postgres jdbc驱动版本太低,自行下载一个高版本的驱动替换即可