1. 任务
任务是Filling数据摄取任务的主要抽象,允许用户配置处理数据的输入、流程、输出及其它运行时设定,追踪任务执行的状态和日志,并提供样例输入测试等附加功能。
1.1. 任务属性
1.1.1. 基本属性
- result_table_name:source或transfrom生成的目标表
- source_table_name: transfrom或sink的数据来源
- name:任务名称命名,任务标识, 也用于flink生成的jobgraph的名称
- parallelism: 定义任务正式运行时对资源的需求量,默认为1
- describe:任务描述信息
并行度需要根据集群剩余的资源数(task slot数)和任务复杂度、处理数据量进行设置。
1.1.2. 任务DAG
任务DAG是由组件、组件的属性配置及组件之间的数据流动关系定义构成的,是一个有向无环图。它定义了任务的数据处理逻辑:数据由一个或者多个数据源流入,经过多个转换器的处理后,流入一个或者多个输出从而落盘到文件、Kafka或者其他输出端。
任务DAG其内容的脚本化示例(以json格式为例)如下:
{
"env": {
"execution.parallelism": 1,
"job.name": "filling-01"
},
"source": [
{
"plugin_name": "KafkaTableStream",
"consumer.group.id": "filling15",
"topics": "batchSend",
"result_table_name": "KafkaTableStreamTable",
"format.type": "json",
"schema": "{\"host\":\"192.168.1.103\",\"source\":\"datasource\",\"MetricsName\":\"cpu\",\"value\":\"49\",\"_time\":1626571020000,\"trade_type\":\"未知\"}",
"format.allow-comments": "true",
"format.ignore-parse-errors": "true",
"offset.reset": "earliest",
"consumer.bootstrap.servers": "192.168.100.189:9092",
"parallelism": 10,
"name": "mykafka"
}
{
"plugin_name": "JdbcSource",
"driver": "com.mysql.jdbc.Driver",
"result_table_name": "JdbcSourceTable",
"url": "jdbc:mysql://10.10.14.17:3306/tmp",
"username": "aiops",
"password": "aiops",
"query": "select * from t_group"
}
],
"transform": [
{
"source_table_name": "KafkaTableStreamTable",
"result_table_name": "KafkaTableStreamTable_default",
"plugin_name": "DataJoin",
"join.source_table_name": [
"JdbcSourceTable"
],
"join.JdbcSourceTable.where": "workgroup = host",
"join.JdbcSourceTable.type": "left"
},
{
"source_table_name": "KafkaTableStreamTable_default",
"result_table_name": "DataSelector_default",
"plugin_name": "DataSelector",
"select.result_table_name": [
"DataSelector_unknown",
"DataSelector_other"
],
"select.DataSelector_unknown.where": " serial_no ='未知'",
"select.DataSelector_other.where": " serial_no !='未知'"
}
],
"sink": [
{
"source_table_name": "DataSelector_unknown",
"plugin_name": "Elasticsearch",
"hosts": [
"10.10.14.51:9200"
],
"index": "filling_unknown",
"es.bulk.flush.max.actions": 1000,
"es.bulk.flush.max.size.mb": 2,
"es.bulk.flush.interval.ms": 1000,
"es.bulk.flush.backoff.enable": true,
"es.bulk.flush.backoff.delay": 50,
"es.bulk.flush.backoff.retries": 8
},
{
"source_table_name": "DataSelector_other",
"plugin_name": "Elasticsearch",
"hosts": [
"10.10.14.51:9200"
],
"index": "filling_other",
"es.bulk.flush.max.actions": 1000,
"es.bulk.flush.max.size.mb": 2,
"es.bulk.flush.interval.ms": 1000,
"es.bulk.flush.backoff.enable": true,
"es.bulk.flush.backoff.delay": 50,
"es.bulk.flush.backoff.retries": 8
}
]
}
上面的DAG定义了从一个kafka数据源订阅数据, 和一个mysql数据源的数据,进行join后对数据进行分流,输出到Elasticsearch端的处理流程。其中,serial_no 字段等于'未知'的数据会写入到elasticsearch 的filling_unknown 索引, 相反, 其他不等于未知的数据会写到filling_other索引, (source)进提供输出数据表KafkaTableStreamTable和JdbcSource.(transfrom)算子 DataJoin 的输入表(source_table_name)为KafkaTableStreamTable, join.source_table_name参数为 JdbcSourceTable, 查询类型为left, 意味这表KafkaTableStreamTable 和表JdbcSourceTable 做join查询, 且join的条件为 workgroup = host 输出(sink)仅需求输入数据表,而(transform)既提供输出数据表,也需求输入数据表,而且可能不止一个。每个输出数据表必须要存在相同名称的输入数据流来满足,这也构成了组件之间的数据流动关系。
流程图:
注意,任务DAG,其组件与数据流构成的有向图必须是无环的。与可能存在迭代运算的批式处理场景不同,数据摄取是典型的流式处理场景,数据流不能以循环的方式进行流动,从而多次进入同一个处理逻辑。在DAG的配置脚本中,亦要求多个组件(主要是多个transfrom组件)的编写顺序是符合DAG的拓扑顺序的。
编辑任务DAG,从而定义任务处理逻辑,是配置任务的主体工作。目前只支持一种方式进行:
- 直接编辑任务文件:直接编写DAG配置脚本,支持以json的方式编写。编写好的文件可以直接通过Filling-core提交执行。参见配置文件部分。
1.2. 配置文件
Filling推荐使用图形化界面完成任务的配置和运行。然而,你可以从界面导出或自行编写你的任务配置文件。配置文件可以用于通过界面导入以建立对应的任务,也可以直接通过提交到flink cluster运行的Filling-core程序执行。
配置文件支持以json, 格式为例说明配置文件的内容:
{
"env": {
},
"source": [
],
"transform": [
],
"sink": [
]
}
配置文件中dag以配置为object(包含了source、transform和sink三个字段,为组件分类列表)还有额外的env全局参数,。其中:
env: flink相关的参数, 详见,
source: 数据来源, 详见,
transfer: 数据处理算子, 详见,
Sink: 数据存储位置, 详见,
,