Kafka
Description
Consumes data from Kafka. Supported Kafka versions: >= 0.10.0.
Supported engines and plugin names
- Spark: KafkaStream
- Flink: Kafka
Options
- Spark

name | type | required | default value |
---|---|---|---|
topics | string | yes | - |
consumer.group.id | string | yes | - |
consumer.bootstrap.servers | string | yes | - |
consumer.* | string | no | - |
common-options | string | yes | - |

- Flink

name | type | required | default value |
---|---|---|---|
topics | string | yes | - |
consumer.group.id | string | yes | - |
consumer.bootstrap.servers | string | yes | - |
schema | string | yes | - |
format.type | string | yes | - |
format.* | string | no | - |
consumer.* | string | no | - |
rowtime.field | string | no | - |
watermark | long | no | - |
offset.reset | string | no | - |
common-options | string | no | - |
topics [string]
Kafka topic name. To consume multiple topics, separate the names with a comma, for example: "tpc1,tpc2"
consumer.group.id [string]
Kafka consumer group id, used to distinguish different consumer groups
consumer.bootstrap.servers [string]
Kafka cluster address. Separate multiple addresses with a comma
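The options format.type, format.*, schema, rowtime.field, watermark, and offset.reset described below are available for Flink only.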
format.type [string]
Three formats are currently supported:
- json
- csv
- avro
format.* [string]
Format-specific settings. The csv format uses these parameters to set the delimiter and similar options. For example, to set the column delimiter to \t, use format.field-delimiter=\\t
schema [string]
- csv
  - The schema of csv is a jsonArray string, such as "[{\"field\":\"name\",\"type\":\"string\"},{\"field\":\"age\",\"type\":\"int\"}]".
- json
  - The schema parameter of json takes a json string of the original data, from which the schema is generated automatically. Provide the sample record with the most complete content; otherwise fields will be lost.
- avro
  - The schema parameter of avro takes a standard avro schema JSON string, such as {\"name\":\"test\",\"type\":\"record\",\"fields\":[{\"name\":\"name\",\"type\":\"string\"},{\"name\":\"age\",\"type\":\"long\"},{\"name\":\"addrs\",\"type\":{\"name\":\"addrs\",\"type\":\"record\",\"fields\":[{\"name\":\"province\",\"type\":\"string\"},{\"name\":\"city\",\"type\":\"string\"}]}}]}
  - To learn more about how the Avro schema JSON string should be defined, refer to: https://avro.apache.org/docs/current/spec.html
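As an illustration of the json case, the fragment below is a minimal sketch of a Flink source whose schema is a sample record. The topic name, group id, table name, and the sample record itself are hypothetical values chosen for this example; only the option names come from the tables above.

```hocon
KafkaTableStream {
  consumer.bootstrap.servers = "127.0.0.1:9092"
  consumer.group.id = "seatunnel_json_group"   # hypothetical group id
  topics = "events"                            # hypothetical topic
  result_table_name = "events"                 # hypothetical table name
  format.type = json
  # A sample record that contains every field expected in the data;
  # fields missing from this sample would be missing from the generated schema.
  schema = "{\"name\":\"alice\",\"age\":30}"
}
```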
rowtime.field [string]
The field from which the timestamp is extracted to generate the Flink event-time watermark
watermark [long]
Sets a built-in watermark strategy for rowtime.field attributes which are out-of-order by a bounded time interval. Emits watermarks which are the maximum observed timestamp minus the specified delay.
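A minimal sketch of how the two event-time options might be combined is shown here. The field name ts and the delay value are hypothetical, and the unit of watermark is assumed to be milliseconds, which this page does not state.

```hocon
KafkaTableStream {
  # Connection, format, and schema options are omitted from this fragment.

  # Hypothetical field name; presumably it has to be one of the fields in the configured schema.
  rowtime.field = "ts"

  # Allowed out-of-orderness (assumed unit: milliseconds).
  # If the largest timestamp observed so far is 1000000,
  # the emitted watermark is 1000000 - 5000 = 995000.
  watermark = 5000
}
```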
offset.reset [string]
The consumer's initial offset. It takes effect only for new consumers. There are three modes:
- latest
  - Start consumption from the latest offset
- earliest
  - Start consumption from the earliest offset
- specific
  - Start consumption from the specified offset. In this mode the start offset of each partition must also be given, using offset.reset.specific="{0:111,1:123}"
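The specific mode can be sketched as the following fragment. The offsets are the illustrative values from the description above, read here as partition-to-offset pairs.

```hocon
KafkaTableStream {
  # Connection, format, and schema options are omitted from this fragment.

  offset.reset = specific
  # Partition 0 starts at offset 111, partition 1 starts at offset 123.
  offset.reset.specific = "{0:111,1:123}"
}
```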
consumer.* [string]
In addition to the required parameters above, users can specify any optional Kafka consumer client parameter. All consumer parameters listed in the official Kafka documentation are supported.
To specify a parameter, add the prefix consumer. to the original parameter name. For example, auto.offset.reset is specified as: consumer.auto.offset.reset = latest. Optional parameters that are not specified use the default values given in the official Kafka documentation.
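The prefix rule could look like this in practice. This is a sketch only: max.poll.records and session.timeout.ms are standard Kafka consumer properties picked for illustration, and the values are arbitrary.

```hocon
kafkaStream {
  topics = "seatunnel"
  consumer.bootstrap.servers = "localhost:9092"
  consumer.group.id = "seatunnel_group"

  # Optional Kafka consumer properties, each written as consumer.<original name>
  consumer.auto.offset.reset = latest
  consumer.max.poll.records = 500
  consumer.session.timeout.ms = 30000
}
```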
common-options [string]
Source plugin common parameters. Refer to Source Plugin for details
Examples
- Spark

kafkaStream {
    topics = "seatunnel"
    consumer.bootstrap.servers = "localhost:9092"
    consumer.group.id = "seatunnel_group"
}

- Flink

KafkaTableStream {
    consumer.bootstrap.servers = "127.0.0.1:9092"
    consumer.group.id = "seatunnel5"
    topics = test
    result_table_name = test
    format.type = csv
    schema = "[{\"field\":\"name\",\"type\":\"string\"},{\"field\":\"age\",\"type\":\"int\"}]"
    format.field-delimiter = ";"
    format.allow-comments = "true"
    format.ignore-parse-errors = "true"
}