Version: 2.1.2

Flink SQL Kafka Connector

Description

With the Kafka connector, a Flink SQL job can read data from Kafka and write data to Kafka. Refer to the Kafka connector documentation for more details.

Usage

The following brief example shows how to use the connector end to end.

1. Prepare Kafka

Refer to the Kafka QuickStart to set up a Kafka environment, then start a console producer as follows:

$ bin/kafka-console-producer.sh --topic <topic-name> --bootstrap-server localhost:9092

The command enters interactive mode. Type the following messages, one per line, to send them to Kafka. The leading > is the producer's prompt and is not part of the message.

>{"id":1,"name":"abc"}
>{"id":2,"name":"def"}
>{"id":3,"name":"dfs"}
>{"id":4,"name":"eret"}
>{"id":5,"name":"yui"}

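2. Prepare the SeaTunnel configuration

Here is a simple SeaTunnel configuration that reads the topic as JSON and prints every row. Save it as config/kafka.sql.conf, the path passed to the start command.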

SET table.dml-sync = true;

CREATE TABLE events (
    id INT,
    name STRING
) WITH (
    'connector' = 'kafka',
    'topic' = '<topic-name>',
    'properties.bootstrap.servers' = 'localhost:9092',
    'properties.group.id' = 'testGroup',
    'scan.startup.mode' = 'earliest-offset',
    'format' = 'json'
);

CREATE TABLE print_table (
    id INT,
    name STRING
) WITH (
    'connector' = 'print',
    'sink.parallelism' = '1'
);

INSERT INTO print_table SELECT * FROM events;
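
The configuration above only reads from Kafka. Writing to Kafka follows the same pattern, with a Kafka table used as the target of the INSERT. The following is a minimal sketch, not part of the original example: the table name events_copy and the placeholder '<output-topic-name>' are hypothetical, and the topic is assumed to exist on (or be auto-created by) the same local broker.

```sql
-- Hypothetical sink table: writes each row to a second Kafka topic as JSON.
-- '<output-topic-name>' is a placeholder, just like '<topic-name>' above.
CREATE TABLE events_copy (
    id INT,
    name STRING
) WITH (
    'connector' = 'kafka',
    'topic' = '<output-topic-name>',
    'properties.bootstrap.servers' = 'localhost:9092',
    'format' = 'json'
);

-- Would take the place of the INSERT INTO print_table statement.
INSERT INTO events_copy SELECT id, name FROM events;
```

With this variant, the rows should appear as JSON messages in the output topic rather than in the TaskManager's log; Kafka's console consumer is one way to read them back.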

3. Start the Flink local cluster

$ ${FLINK_HOME}/bin/start-cluster.sh

4. Start the Flink SQL job

Run the following command from the SeaTunnel home directory to start the Flink SQL job.

$ bin/start-seatunnel-sql.sh -c config/kafka.sql.conf

5. Verify the result

After the job is submitted, the rows printed by the 'print' connector appear in the TaskManager's log.

+I[1, abc]
+I[2, def]
+I[3, dfs]
+I[4, eret]
+I[5, yui]
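
If fewer rows than expected show up, or the job fails, one possible cause is a message typed in step 1 that is not valid JSON. By default, Flink's JSON format fails on records it cannot parse. The following is a minimal sketch of a more tolerant source table, assuming the 'json.ignore-parse-errors' option of Flink's JSON format is available in the Flink version in use. The table name events_tolerant is hypothetical.

```sql
-- Hypothetical variant of the events table from step 2.
-- The last option asks the JSON format to skip records that fail to parse
-- instead of failing the job.
CREATE TABLE events_tolerant (
    id INT,
    name STRING
) WITH (
    'connector' = 'kafka',
    'topic' = '<topic-name>',
    'properties.bootstrap.servers' = 'localhost:9092',
    'properties.group.id' = 'testGroup',
    'scan.startup.mode' = 'earliest-offset',
    'format' = 'json',
    'json.ignore-parse-errors' = 'true'
);
```

Skipping bad records is convenient for a hand-typed demo. For production data it hides problems, so the default strict behavior may be the better choice there.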