Using SeaTunnel for Data Integration in DolphinScheduler

DolphinScheduler Environment Configuration

  1. Edit /etc/profile and add the following environment variables (reload the file afterwards; see the note after this list)

    export SEATUNNEL_HOME=/opt/soft/seatunnel
    export FLINK_HOME=/opt/soft/flink
  2. Place the SeaTunnel and Flink packages in the corresponding directories

    [root@flink3 soft]# ls 
    datax flink python2.7 seatunnel
    [root@flink3 soft]# pwd
    /opt/soft
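
After editing /etc/profile, reload it so the shell (and any DolphinScheduler worker started from it) picks up the new variables. A quick check, assuming the layout above:

source /etc/profile
echo $SEATUNNEL_HOME   # expect /opt/soft/seatunnel
echo $FLINK_HOME       # expect /opt/soft/flink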

SeaTunnel Environment Configuration

  1. Place the connectors you need in the connectors directory

    [root@flink3 connectors]# ls                            
    connector-console-2.3.6.jar connector-fake-2.3.6.jar connector-kafka-2.3.6.jar plugin-mapping.properties
    [root@flink3 connectors]# pwd
    /opt/soft/seatunnel/connectors
  2. Start Flink

    [root@flink3 flink]# cd /opt/soft/flink/
    [root@flink3 flink]# ./bin/start-cluster.sh
    Starting cluster.
    Starting standalonesession daemon on host flink3.
    Starting taskexecutor daemon on host flink3.
  3. Install the connector dependencies (see the version note after this list)

    [root@flink3 ~]# cd /opt/soft/seatunnel/                  
    [root@flink3 seatunnel]# ./bin/install-plugin.sh
    Install SeaTunnel connectors plugins, usage version is 2.3.6
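
install-plugin.sh downloads the connector plugins listed in config/plugin_config from the Maven repository. If the default version does not match your SeaTunnel release, the version can be passed explicitly; a sketch for 2.3.x, treat the exact invocation as an assumption to verify against your release:

cd /opt/soft/seatunnel
sh bin/install-plugin.sh 2.3.6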

Usage Notes

FakeSource2JdbcSink

Sync data from a FakeSource source into PostgreSQL.

env {
  parallelism = 1
  job.mode = "BATCH"
}

source {
  FakeSource {
    result_table_name = "fake"
    row.num = 16
    schema = {
      fields {
        name = "string"
        age = "int"
      }
    }
  }
}

sink {
  jdbc {
    url = "jdbc:postgresql://192.168.80.130:5432/dolphinscheduler"
    driver = "org.postgresql.Driver"
    user = root
    password = root
    query = "insert into test_table(name,age) values(?,?)"
  }
}
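
To run this job, the target table has to exist and the config is submitted through SeaTunnel's Flink starter script. The DDL, the config file name, and the exact starter script (it varies with the Flink version, e.g. the flink-13 vs. flink-15 variant in SeaTunnel 2.3.x) are illustrative assumptions:

# create the target table (connection details taken from the sink config above)
psql -h 192.168.80.130 -U root -d dolphinscheduler \
  -c "create table if not exists test_table(name varchar(255), age bigint);"

# submit the job to the local Flink cluster
cd /opt/soft/seatunnel
./bin/start-seatunnel-flink-15-connector-v2.sh --config ./config/fake_to_pg.conf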

Execution result:

Program execution finished
Job with JobID addcfed5d78c2088213b52f7febcc7f8 has finished.
Job Runtime: 4231 ms
Accumulator Results:
- SourceReceivedBytes (java.lang.Long): 144
- SourceReceivedCount (java.lang.Long): 16
- SinkWriteBytes (java.lang.Long): 144
- SinkWriteCount (java.lang.Long): 16
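
The accumulators report 16 rows written, which can be cross-checked on the PostgreSQL side (one way of doing it, assuming psql is available):

psql -h 192.168.80.130 -U root -d dolphinscheduler -c "select count(*) from test_table;"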

kafka2kafka (batch mode)

Source data format:

"{\n    \"CONS_ID\": 612559311320.0,\n    \"CONS_NAME\": null,\n    \"MP_NO\": \"812410734530050\",\n    \"METER_ID\": 4.031410087370826e+16,\n    \"T_FACTOR\": 1.0,\n    \"ORG_NO\": nul
l,\n \"MADE_NO\": null,\n \"CONS_NO\": \"98924870943176\",\n \"ACCOUNT_FLAG\": 1,\n \"RMP_ID\": 2.500800021241501e+16,\n \"RUN_CAP\": 8.0,\n \"RATED_CAP\": 26.4,\n \
"IS_HIGH\": \"0\"\n}"

The source data contains many extra newlines and escape characters, so it cannot be parsed as valid JSON. SeaTunnel is used to replace these characters and write the cleaned records to a new topic.

env {
  execution.parallelism = 2
  job.mode = "BATCH"
  checkpoint.interval = 10000
}

source {
  Kafka {
    result_table_name = "kafka1"
    schema = {
      fields {
        name = "string"
      }
    }
    format = text
    field_delimiter = "#"
    topic = "static"
    bootstrap.servers = "192.168.80.130:9092"
    kafka.config = {
      client.id = client_2
      max.poll.records = 500
      auto.offset.reset = "earliest"
      enable.auto.commit = "false"
    }
  }
}

transform {
  # remove "\n" sequences
  Replace {
    source_table_name = "kafka1"
    result_table_name = "kafka2"
    replace_field = "name"
    pattern = "\\n"
    replacement = ""
  }
  # remove spaces
  Replace {
    source_table_name = "kafka2"
    result_table_name = "kafka3"
    replace_field = "name"
    pattern = " "
    replacement = ""
  }
  # remove backslashes
  Replace {
    source_table_name = "kafka3"
    result_table_name = "kafka4"
    replace_field = "name"
    pattern = "\\"
    replacement = ""
  }
  # strip the quote before "{"
  Replace {
    source_table_name = "kafka4"
    result_table_name = "kafka5"
    replace_field = "name"
    pattern = "\"{"
    replacement = "{"
  }
  # strip the quote after "}"
  Replace {
    source_table_name = "kafka5"
    result_table_name = "kafka6"
    replace_field = "name"
    pattern = "}\""
    replacement = "}"
  }
}

sink {
  kafka {
    source_table_name = "kafka6"
    topic = "static_test"
    bootstrap.servers = "192.168.80.130:9092"
    format = text
    kafka.request.timeout.ms = 60000
    kafka.config = {
      acks = "all"
      request.timeout.ms = 60000
      buffer.memory = 33554432
    }
  }
}

After running, the data in the new topic looks like:

{"CONS_ID":7016732.0,"CONS_NAME":null,"MP_NO":"906405204505770","METER_ID":6.219086100432489e+16,"T_FACTOR":600.0,"ORG_NO":null,"MADE_NO":null,"CONS_NO":"12208475428490","ACCOUNT_FLAG":1
,"RMP_ID":11025246156425.0,"RUN_CAP":400.0,"RATED_CAP":2160.0,"IS_HIGH":"1"}

In batch mode, the Flink job finishes as soon as the current batch of data has been processed.

kafka2pg (stream mode)

Based on the topic processed in the previous step, parse the data and sync it to PostgreSQL.

env {
  execution.parallelism = 2
  job.mode = "STREAMING"
  checkpoint.interval = 10000
}

source {
  Kafka {
    schema = {
      fields {
        METER_ID = "string"
        CONS_NO = "string"
      }
    }
    topic = "static_test"
    bootstrap.servers = "192.168.80.130:9092"
    result_table_name = "kafka"
    consumer.group = "seatunnel_group121"
    kafka.config = {
      client.id = client_1
      max.poll.records = 500
      auto.offset.reset = "earliest"
      enable.auto.commit = "false"
    }
  }
}

transform {
  Sql {
    source_table_name = "kafka"
    result_table_name = "postgres"
    query = "select METER_ID, CAST(CONS_NO AS LONG) as CONS_NO from kafka"
  }
}

sink {
  jdbc {
    source_table_name = "postgres"
    url = "jdbc:postgresql://192.168.80.130:5432/dolphinscheduler"
    driver = "org.postgresql.Driver"
    user = root
    password = root
    query = "insert into test_table(name,age) values(?,?)"
  }
}

Run succeeded:

dolphinscheduler=> select count(0) from test_table;  
count
--------
323251
(1 row)

In streaming mode, Flink keeps listening continuously; stopping the workflow in DolphinScheduler has no effect on the Flink job.
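
To stop such a streaming job it therefore has to be cancelled on the Flink side, e.g. with the standard Flink CLI:

cd /opt/soft/flink
./bin/flink list            # find the JobID of the running SeaTunnel job
./bin/flink cancel <JobID>  # cancel it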

kafka2kafka + kafka2pg

Change kafka2kafka from batch mode to streaming, then try putting kafka2kafka and kafka2pg into a single workflow.
Click Run.
Because of the streaming mechanism, the first node's task never finishes, so the kafka2pg node can never start.
Remove the dependency between the two nodes and run the workflow again.
Both tasks then run successfully.

Custom transform

Use the DynamicCompile transform to run inline Java that appends a constant column aa to every row:

env {
  execution.parallelism = 2
  job.mode = "STREAMING"
  checkpoint.interval = 10000
}

source {
  Kafka {
    schema = {
      fields {
        METER_ID = "string"
        CONS_NO = "string"
      }
    }
    topic = "static_test"
    bootstrap.servers = "192.168.80.130:9092"
    result_table_name = "kafka"
    consumer.group = "seatunnel_group1121"
    kafka.config = {
      client.id = client_1
      max.poll.records = 500
      auto.offset.reset = "earliest"
      enable.auto.commit = "false"
    }
  }
}

transform {
  Sql {
    source_table_name = "kafka"
    result_table_name = "t1"
    query = "select METER_ID, CAST(CONS_NO AS LONG) as CONS_NO from kafka"
  }
  DynamicCompile {
    source_table_name = "t1"
    result_table_name = "postgres"
    compile_language = "JAVA"
    compile_pattern = "SOURCE_CODE"
    source_code = """
      import org.apache.seatunnel.api.table.catalog.Column;
      import org.apache.seatunnel.transform.common.SeaTunnelRowAccessor;
      import org.apache.seatunnel.api.table.catalog.*;
      import org.apache.seatunnel.api.table.type.*;
      import java.util.ArrayList;

      // declare one extra output column named "aa"
      public Column[] getInlineOutputColumns(CatalogTable inputCatalogTable) {
          PhysicalColumn destColumn =
              PhysicalColumn.of(
                  "aa",
                  BasicType.STRING_TYPE,
                  10,
                  true,
                  "",
                  "");
          return new Column[] {destColumn};
      }

      // fill the extra column with a constant value for every row
      public Object[] getInlineOutputFieldValues(SeaTunnelRowAccessor inputRow) {
          System.out.println(inputRow);
          Object[] fieldValues = new Object[1];
          fieldValues[0] = "AA";
          return fieldValues;
      }
      """
  }
}

sink {
  jdbc {
    source_table_name = "postgres"
    url = "jdbc:postgresql://192.168.80.130:5432/dolphinscheduler"
    driver = "org.postgresql.Driver"
    user = root
    password = root
    query = "insert into test_table(name,age,aa) values(?,?,?)"
  }
}
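
Because the sink now writes a third column, test_table needs a matching aa column before the job runs; assuming psql access, something along these lines:

psql -h 192.168.80.130 -U root -d dolphinscheduler \
  -c "alter table test_table add column if not exists aa varchar(10);"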

Run succeeded:

dolphinscheduler=> select * from test_table where aa is not null;
         name          |      age       | max | aa
-----------------------+----------------+-----+----
 6.2190861004324888E16 | 12208475428490 |     | AA
 3.0110215051031144E16 | 36992910882294 |     | AA
 7.0172856700350416E16 | 52092370249122 |     | AA
 4.0013183094105224E16 | 92287332561949 |     | AA
 8.6010138660437216E16 | 25099441592023 |     | AA

Multiple sources and sinks

Read two Kafka topics in one job, run a separate Replace chain on each, and write the results to two Kafka sinks:

env {
  execution.parallelism = 2
  job.mode = "STREAMING"
  checkpoint.interval = 10000
}

source {
  Kafka {
    result_table_name = "kafka1"
    schema = {
      fields {
        name = "string"
      }
    }
    format = text
    field_delimiter = "#"
    topic = "static"
    bootstrap.servers = "192.168.80.130:9092"
    consumer.group = "seatunnel_grouptest"
    kafka.config = {
      client.id = client_2
      max.poll.records = 500
      auto.offset.reset = "earliest"
      enable.auto.commit = "false"
    }
  }
  Kafka {
    result_table_name = "kafka_new"
    schema = {
      fields {
        name = "string"
      }
    }
    format = text
    field_delimiter = "#"
    consumer.group = "seatunnel_grouptest"
    topic = "static_test"
    bootstrap.servers = "192.168.80.130:9092"
    kafka.config = {
      client.id = client_2
      max.poll.records = 500
      auto.offset.reset = "earliest"
      enable.auto.commit = "false"
    }
  }
}

transform {
  Replace {
    source_table_name = "kafka1"
    result_table_name = "kafka2"
    replace_field = "name"
    pattern = "\\n"
    replacement = ""
  }
  Replace {
    source_table_name = "kafka1"
    result_table_name = "kafka3"
    replace_field = "name"
    pattern = " "
    replacement = ""
  }
  Replace {
    source_table_name = "kafka3"
    result_table_name = "kafka4"
    replace_field = "name"
    pattern = "\\"
    replacement = ""
  }
  Replace {
    source_table_name = "kafka4"
    result_table_name = "kafka5"
    replace_field = "name"
    pattern = "\"{"
    replacement = "{"
  }
  Replace {
    source_table_name = "kafka_new"
    result_table_name = "kafka6"
    replace_field = "name"
    pattern = "}\""
    replacement = "}"
  }
}

sink {
  kafka {
    source_table_name = "kafka5"
    topic = "static_test"
    bootstrap.servers = "192.168.80.130:9092"
    format = text
    kafka.request.timeout.ms = 60000
    kafka.config = {
      acks = "all"
      request.timeout.ms = 60000
      buffer.memory = 33554432
    }
  }
  kafka {
    source_table_name = "kafka6"
    topic = "kafka6"
    bootstrap.servers = "192.168.80.130:9092"
    format = text
    kafka.request.timeout.ms = 60000
    kafka.config = {
      acks = "all"
      request.timeout.ms = 60000
      buffer.memory = 33554432
    }
  }
}

Run result: (screenshot)
Limitation: join operations across the multiple sources are not directly possible.

Troubleshooting Notes

  1. Caused by: java.lang.IllegalStateException: Initialize flink context failed

    at org.apache.seatunnel.translation.flink.sink.FlinkSinkWriterContext.getStreamingRuntimeContextForV15(FlinkSinkWriterContext.java:80)
    

    Caused by an incompatible (too new) Flink version.

  2. Plugin PluginIdentifier{engineType='seatunnel', pluginType='sink', pluginName='jdbc'} not found
    connector-jdbc-2.3.6.jar was not imported; a fix sketch follows at the end of this list.

  3. ClassNotFoundException: org.postgresql.Driver
    Fixed by placing the PostgreSQL JDBC driver under plugins/jdbc/lib:
    [root@flink3 lib]# ls                             
    postgresql-42.3.3.jar
    [root@flink3 lib]# pwd
    /opt/soft/seatunnel/plugins/jdbc/lib
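
For problem 2, one fix is to drop the JDBC connector jar into the connectors directory (the Maven coordinates below are my assumption for the 2.3.6 release), or to add connector-jdbc to config/plugin_config and re-run install-plugin.sh:

cd /opt/soft/seatunnel/connectors
wget https://repo1.maven.org/maven2/org/apache/seatunnel/connector-jdbc/2.3.6/connector-jdbc-2.3.6.jar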