1. Kafka数据源

Kafka数据源(KafkaSource)从指定Kafka集群的一个或多个指定主题中消费数据。

注: 请在生产环境下, 务必在任务参数一栏填写"execution.checkpoint.interval": 1000参数, 详情: 消费位点提交


1.1.1. Options

name type required default value
topics array/string yes -
group.id string no -
bootstrap.servers string yes -
schema string yes -
format.type string yes json
format.field_delimiter String no ,
consumer.* string no -
offset.reset string no -
common-options string no -

1.2. 属性

topics (array[string])

topic名称。

group_id (string) (optinal)

订阅组id。如果不指定,每次运行时都会根据组件id自动生成一个。如果组件id没有显示指定而是自动生成的话,则不能保证每次执行时都会分配到相同的订阅组,因此建议指定。

consumer.bootstrap.servers (string)

Kafka集群地址,多个地址用,隔开。

schema (String)

用于生成source表的schema, 如果数据和schema字段不一致, 则会出现字段为空的问题

format.type (string)

输入格式,支持Json,csv和与text格式

format.field_delimiter (optinal)

仅在format.type为csv时生效, 指定分隔符, 且只能是一个char字符

offset.reset

以何种方式消费kafka数据, 分别是:

earliest: 尽可能从最早消费数据

latest: 从最新处消费数据

fromTimestamp: 指定时间戳消费, // TODO

fromGroupOffsets: 从当前的offset消费, 若果不存在offset, 则和latest一致
其他配置 consumer.* (optinal)

除了以上必备的kafka consumer客户端必须指定的参数外,用户还可以指定多个consumer客户端非必须参数,覆盖了kafka官方文档指定的所有consumer参数.


1.3. 配置示例

{
      "plugin_name": "KafkaTableStream",
      "consumer.group.id": "waterdrop15",
      "topics": "batchSend",
      "result_table_name": "KafkaTableStreamTable",
      "format.type": "json",
      "schema": "{\"host\":\"192.168.1.103\",\"source\":\"datasource\",\"MetricsName\":\"cpu\",\"value\":\"49\",\"_time\":1626571020000}",
      "offset.reset": "earliest",
      "consumer.bootstrap.servers": "192.168.100.189:9092",
      "parallelism": 1,
      "name": "mykafka"
    }

results matching ""

    No results matching ""