
3 posts tagged with "Spark"


· 6 min read

When it comes to writing data to Elasticsearch, the first tool that comes to mind is probably Logstash. Logstash is widely adopted because of its simplicity, scalability, and extensibility. Still, every tool has its limits, and there are scenarios Logstash does not handle well, such as:

  • Massive data ETL
  • Massive data aggregation
  • Multi-source data processing

To cover these scenarios, many teams choose Spark, use Spark operators to process the data, and finally write the results to Elasticsearch.

Our team used Spark to analyze Nginx logs and track access to our web services: we aggregated the Nginx logs every minute, wrote the results to Elasticsearch, and then built a real-time monitoring dashboard in Kibana. Both Elasticsearch and Kibana are convenient and practical, but as similar requirements kept coming in, how to write data to Elasticsearch through Spark quickly and easily became a real problem for us.

Today I would like to recommend a tool that makes this kind of fast data loading easy: Seatunnel (https://github.com/apache/incubator-seatunnel). It is a very easy-to-use, high-performance, real-time data processing product built on Spark that can handle massive amounts of data. It is flexible to configure and requires no development work.

Kafka to Elasticsearch

Like Logstash, Seatunnel supports multiple types of data input. Here we take Kafka, the most common input source, as an example to explain how to use Seatunnel to quickly write data to Elasticsearch.

Log Sample

The original log format is as follows:

127.0.0.1 elasticsearch.cn 114.250.140.241 0.001s "127.0.0.1:80" [26/Oct/2018:21:54:32 +0800] "GET /article HTTP/1.1" 200 123 "-" - "Dalvik/2.1.0 (Linux; U; Android 7.1.1; OPPO R11 Build/NMF26X)"

Elasticsearch Document

We want to count the visits of each domain name in one minute. The aggregated data has the following fields:

domain String
hostname String
status int
datetime String
count int

Seatunnel with Elasticsearch

Next, I will walk you through how we read the data from Kafka with Seatunnel, parse and aggregate it, and finally write the results to Elasticsearch.

Seatunnel

Seatunnel also provides a very rich set of plugins that support reading data from Kafka, HDFS, and Hive, performing various kinds of data processing, and writing the results to Elasticsearch, Kudu, or Kafka.

Prerequisites

First of all, we need to install Seatunnel. The installation is very simple and there is no need to configure system environment variables:

  1. Prepare the Spark environment
  2. Install Seatunnel
  3. Configure Seatunnel

The steps below are kept simple; for the full installation guide, refer to the Quick Start.

cd /usr/local
wget https://archive.apache.org/dist/spark/spark-2.2.0/spark-2.2.0-bin-hadoop2.7.tgz
tar -xvf spark-2.2.0-bin-hadoop2.7.tgz
wget https://github.com/InterestingLab/seatunnel/releases/download/v1.1.1/seatunnel-1.1.1.zip
unzip seatunnel-1.1.1.zip
cd seatunnel-1.1.1

vim config/seatunnel-env.sh
# Specify the Spark installation path
SPARK_HOME=${SPARK_HOME:-/usr/local/spark-2.2.0-bin-hadoop2.7}

Seatunnel Pipeline

As with Logstash, we only need to write a Seatunnel pipeline configuration file to complete the data import. If you know Logstash, you will be able to pick up the Seatunnel configuration very quickly.

The configuration file consists of four parts: Spark, Input, Filter, and Output.

Spark

This part holds the Spark-related settings, mainly the resources required to run the Spark job.

spark {
  spark.app.name = "seatunnel"
  spark.executor.instances = 2
  spark.executor.cores = 1
  spark.executor.memory = "1g"
  spark.streaming.batchDuration = 5
}

Input

This part defines the data source. The following is an example configuration for reading data from Kafka.

kafkaStream {
  topics = "seatunnel-es"
  consumer.bootstrap.servers = "localhost:9092"
  consumer.group.id = "seatunnel_es_group"
  consumer.rebalance.max.retries = 100
}

Filter

In the Filter section we configure a series of conversions: regular-expression parsing to split the log line into fields, time conversion to turn the HTTPDATE into a date format supported by Elasticsearch, type conversion for numeric fields, and data aggregation through SQL.

filter {
  # Parse the original log using regex
  # The initial data is in the raw_message field
  grok {
    source_field = "raw_message"
    pattern = '%{NOTSPACE:hostname}\\s%{NOTSPACE:domain}\\s%{IP:remote_addr}\\s%{NUMBER:request_time}s\\s\"%{DATA:upstream_ip}\"\\s\\[%{HTTPDATE:timestamp}\\]\\s\"%{NOTSPACE:method}\\s%{DATA:url}\\s%{NOTSPACE:http_ver}\"\\s%{NUMBER:status}\\s%{NUMBER:body_bytes_send}\\s%{DATA:referer}\\s%{NOTSPACE:cookie_info}\\s\"%{DATA:user_agent}'
  }
  # Convert data in "dd/MMM/yyyy:HH:mm:ss Z" format to
  # format supported in Elasticsearch
  date {
    source_field = "timestamp"
    target_field = "datetime"
    source_time_format = "dd/MMM/yyyy:HH:mm:ss Z"
    target_time_format = "yyyy-MM-dd'T'HH:mm:ss.SSS+08:00"
  }
  ## Aggregate data with SQL
  sql {
    table_name = "access_log"
    sql = "select domain, hostname, int(status), datetime, count(*) from access_log group by domain, hostname, status, datetime"
  }
}
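
To make the grok pattern concrete, matching it against the sample log line shown earlier should extract roughly the following fields (this is my reading of the pattern, not the output of an actual run):

# hostname        = 127.0.0.1
# domain          = elasticsearch.cn
# remote_addr     = 114.250.140.241
# request_time    = 0.001
# upstream_ip     = 127.0.0.1:80
# timestamp       = 26/Oct/2018:21:54:32 +0800
# method          = GET
# url             = /article
# http_ver        = HTTP/1.1
# status          = 200
# body_bytes_send = 123
# referer         = "-"
# cookie_info     = -
# user_agent      = Dalvik/2.1.0 (Linux; U; Android 7.1.1; OPPO R11 Build/NMF26X)

The date filter then turns the timestamp value above into something like 2018-10-26T21:54:32.000+08:00, which Elasticsearch can index as a date, and the sql filter produces one aggregated row per domain, hostname, status, and datetime.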

Output

Finally, we write the processed structured data to Elasticsearch.

output {
  elasticsearch {
    hosts = ["localhost:9200"]
    index = "seatunnel-${now}"
    es.batch.size.entries = 100000
    index_time_format = "yyyy.MM.dd"
  }
}
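
The index name "seatunnel-${now}" works together with index_time_format: the ${now} placeholder is rendered with that pattern, so the data above would land in a daily index such as seatunnel-2018.11.26. Assuming index_time_format accepts ordinary Java date patterns (the default yyyy.MM.dd value suggests so, but I have not verified other values), an hourly index would look like this:

output {
  elasticsearch {
    hosts = ["localhost:9200"]
    # hypothetical variant: roll over to a new index every hour instead of every day
    index = "seatunnel-${now}"
    index_time_format = "yyyy.MM.dd.HH"
  }
}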

Running Seatunnel

We combine the above four-part configuration into our configuration file config/batch.conf.

vim config/batch.conf
spark {
  spark.app.name = "seatunnel"
  spark.executor.instances = 2
  spark.executor.cores = 1
  spark.executor.memory = "1g"
  spark.streaming.batchDuration = 5
}

input {
  kafkaStream {
    topics = "seatunnel-es"
    consumer.bootstrap.servers = "localhost:9092"
    consumer.group.id = "seatunnel_es_group"
    consumer.rebalance.max.retries = 100
  }
}

filter {
  # Parse the original log using regex
  # The initial data is in the raw_message field
  grok {
    source_field = "raw_message"
    pattern = '%{IP:hostname}\\s%{NOTSPACE:domain}\\s%{IP:remote_addr}\\s%{NUMBER:request_time}s\\s\"%{DATA:upstream_ip}\"\\s\\[%{HTTPDATE:timestamp}\\]\\s\"%{NOTSPACE:method}\\s%{DATA:url}\\s%{NOTSPACE:http_ver}\"\\s%{NUMBER:status}\\s%{NUMBER:body_bytes_send}\\s%{DATA:referer}\\s%{NOTSPACE:cookie_info}\\s\"%{DATA:user_agent}'
  }
  # Convert data in "dd/MMM/yyyy:HH:mm:ss Z" format to
  # format supported in Elasticsearch
  date {
    source_field = "timestamp"
    target_field = "datetime"
    source_time_format = "dd/MMM/yyyy:HH:mm:ss Z"
    target_time_format = "yyyy-MM-dd'T'HH:mm:00.SSS+08:00"
  }
  ## Aggregate data with SQL
  sql {
    table_name = "access_log"
    sql = "select domain, hostname, status, datetime, count(*) from access_log group by domain, hostname, status, datetime"
  }
}

output {
  elasticsearch {
    hosts = ["localhost:9200"]
    index = "seatunnel-${now}"
    es.batch.size.entries = 100000
    index_time_format = "yyyy.MM.dd"
  }
}

Execute the command, specify the configuration file, and run Seatunnel to write data to Elasticsearch. Here we take the local mode as an example.

./bin/start-seatunnel.sh --config config/batch.conf -e client -m 'local[2]'

Finally, the data written into Elasticsearch is as follows, and with Kibana, real-time monitoring of web services can be realized ^_^.

"_source": {
"domain": "elasticsearch.cn",
"hostname": "localhost",
"status": "200",
"datetime": "2018-11-26T21:54:00.000+08:00",
"count": 26
}

Conclusion

In this post, we introduced how to write data from Kafka to Elasticsearch with Seatunnel. With a single configuration file you can quickly run a Spark application that does the processing and writing for you, without writing any code.

When you hit a scenario that Logstash cannot support, or Logstash's performance does not meet your expectations, give Seatunnel a try.

If you want to know more functions and cases of using Seatunnel in combination with Elasticsearch, Kafka and Hadoop, you can go directly to the official website https://seatunnel.apache.org/

We will publish another article "How to Use Spark and Elasticsearch for Interactive Data Analysis" in the near future, so stay tuned.

Contact us

  • Mailing list : dev@seatunnel.apache.org. Send anything to dev-subscribe@seatunnel.apache.org and subscribe to the mailing list according to the replies.
  • Slack: Send a Request to join SeaTunnel slack email to the mailing list (dev@seatunnel.apache.org), and we will invite you to join (please make sure you are registered with Slack before doing so).
  • bilibili B station video

· 7 min read

TiDB is a hybrid database product targeting both online transaction processing and online analytical processing, with distributed transactions, real-time OLAP, and other important features.

TiSpark is a product launched by PingCAP to solve the complex OLAP needs of users. It uses the Spark platform and integrates the advantages of TiKV distributed clusters.

Completing OLAP operations with TiSpark directly requires knowledge of Spark and some development work. So, are there some out-of-the-box tools that can help us use TiSpark to complete OLAP analysis on TiDB more quickly?

There is currently an open source tool, Seatunnel (project address: https://github.com/apache/incubator-seatunnel), that is built on Spark and can quickly read data from TiDB and run OLAP analysis on it through TiSpark.

Operating TiDB with Seatunnel

We have a real requirement in production: read the website access data of a given day from TiDB, count the number of visits for each domain name and the status codes returned by the service, and finally write the statistics to another table in TiDB. Let's see how Seatunnel implements such a job.

Seatunnel

Seatunnel is a very easy-to-use, high-performance, real-time data processing product built on Spark that can handle massive amounts of data. It has a very rich set of plugins that support reading data from TiDB, Kafka, HDFS, and Kudu, performing various kinds of data processing, and then writing the results to TiDB, ClickHouse, Elasticsearch, or Kafka.

Preparation

1. Introduction to the TiDB table structure

Input (table where access logs are stored)

CREATE TABLE access_log (
  domain VARCHAR(255),
  datetime VARCHAR(63),
  remote_addr VARCHAR(63),
  http_ver VARCHAR(15),
  body_bytes_send INT,
  status INT,
  request_time FLOAT,
  url TEXT
)

+-----------------+--------------+------+-----+---------+-------+
| Field           | Type         | Null | Key | Default | Extra |
+-----------------+--------------+------+-----+---------+-------+
| domain          | varchar(255) | YES  |     | NULL    |       |
| datetime        | varchar(63)  | YES  |     | NULL    |       |
| remote_addr     | varchar(63)  | YES  |     | NULL    |       |
| http_ver        | varchar(15)  | YES  |     | NULL    |       |
| body_bytes_send | int(11)      | YES  |     | NULL    |       |
| status          | int(11)      | YES  |     | NULL    |       |
| request_time    | float        | YES  |     | NULL    |       |
| url             | text         | YES  |     | NULL    |       |
+-----------------+--------------+------+-----+---------+-------+

Output (table where result data is stored)

CREATE TABLE access_collect (
  date VARCHAR(23),
  domain VARCHAR(63),
  status INT,
  hit INT
)

+--------+-------------+------+-----+---------+-------+
| Field  | Type        | Null | Key | Default | Extra |
+--------+-------------+------+-----+---------+-------+
| date   | varchar(23) | YES  |     | NULL    |       |
| domain | varchar(63) | YES  |     | NULL    |       |
| status | int(11)     | YES  |     | NULL    |       |
| hit    | int(11)     | YES  |     | NULL    |       |
+--------+-------------+------+-----+---------+-------+

2. Install Seatunnel

Now that the TiDB input and output tables are ready, we need to install Seatunnel. The installation is very simple and there is no need to configure system environment variables:

  1. Prepare the Spark environment
  2. Install Seatunnel
  3. Configure Seatunnel

The steps below are kept simple; for the full installation guide, refer to the Quick Start.

# Download and install Spark
cd /usr/local
wget https://archive.apache.org/dist/spark/spark-2.1.0/spark-2.1.0-bin-hadoop2.7.tgz
tar -xvf spark-2.1.0-bin-hadoop2.7.tgz
# Download and install Seatunnel
wget https://github.com/InterestingLab/seatunnel/releases/download/v1.2.0/seatunnel-1.2.0.zip
unzip seatunnel-1.2.0.zip
cd seatunnel-1.2.0

vim config/seatunnel-env.sh
# Specify the Spark installation path
SPARK_HOME=${SPARK_HOME:-/usr/local/spark-2.1.0-bin-hadoop2.7}

Implement the Seatunnel processing flow

We only need to write a Seatunnel configuration file to read, process, and write data.

The Seatunnel configuration file consists of four parts, Spark, Input, Filter and Output. The Input part is used to specify the input source of the data, the Filter part is used to define various data processing and aggregation, and the Output part is responsible for writing the processed data to the specified database or message queue.

The whole processing flow is Input -> Filter -> Output, which constitutes the processing flow (Pipeline) of Seatunnel.
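Schematically, a Seatunnel configuration file is nothing more than these blocks (a bare skeleton using the plugin names from this article; the real, runnable configuration follows below):

spark {
  # Spark resources and settings
}

input {
  tidb {
    # where the data comes from
  }
}

filter {
  sql {
    # how the data is processed and aggregated
  }
}

output {
  tidb {
    # where the results are written
  }
}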

The following is a specific configuration, which is derived from an online practical application, but simplified for demonstration.

Input (TiDB)

This part of the configuration defines the input source. The following reads data from a table in TiDB.

input {
  tidb {
    database = "nginx"
    pre_sql = "select * from nginx.access_log"
    table_name = "spark_nginx_input"
  }
}

Filter

In the Filter section we configure a series of transformations; most of the data analysis work is done here. Seatunnel provides plenty of plugins to cover all kinds of data analysis needs. Here we complete the aggregation with the SQL plugin.

filter {
  sql {
    table_name = "spark_nginx_log"
    sql = "select count(*) as hit, domain, status, substring(datetime, 1, 10) as date from spark_nginx_log where substring(datetime, 1, 10)='2019-01-20' group by domain, status, substring(datetime, 1, 10)"
  }
}

Output (TiDB)

Finally, we write the processed results to another table in TiDB. The TiDB output is implemented through JDBC.

output {
  tidb {
    url = "jdbc:mysql://127.0.0.1:4000/nginx?useUnicode=true&characterEncoding=utf8"
    table = "access_collect"
    user = "username"
    password = "password"
    save_mode = "append"
  }
}

Spark

This part contains the Spark-related configuration, mainly the resources required for execution plus other Spark settings.

Our TiDB input plugin is implemented on top of TiSpark, which relies on the TiKV cluster and the Placement Driver (PD). So we need to specify the PD node information and the TiSpark-related settings spark.tispark.pd.addresses and spark.sql.extensions.

spark {
  spark.app.name = "seatunnel-tidb"
  spark.executor.instances = 2
  spark.executor.cores = 1
  spark.executor.memory = "1g"
  # Set for TiSpark
  spark.tispark.pd.addresses = "localhost:2379"
  spark.sql.extensions = "org.apache.spark.sql.TiExtensions"
}

Run Seatunnel

We combine the four parts above into our final configuration file config/tidb.conf:

spark {
  spark.app.name = "seatunnel-tidb"
  spark.executor.instances = 2
  spark.executor.cores = 1
  spark.executor.memory = "1g"
  # Set for TiSpark
  spark.tispark.pd.addresses = "localhost:2379"
  spark.sql.extensions = "org.apache.spark.sql.TiExtensions"
}

input {
  tidb {
    database = "nginx"
    pre_sql = "select * from nginx.access_log"
    table_name = "spark_table"
  }
}

filter {
  sql {
    table_name = "spark_nginx_log"
    sql = "select count(*) as hit, domain, status, substring(datetime, 1, 10) as date from spark_nginx_log where substring(datetime, 1, 10)='2019-01-20' group by domain, status, substring(datetime, 1, 10)"
  }
}

output {
  tidb {
    url = "jdbc:mysql://127.0.0.1:4000/nginx?useUnicode=true&characterEncoding=utf8"
    table = "access_collect"
    user = "username"
    password = "password"
    save_mode = "append"
  }
}

Execute the command, specify the configuration file, and run Seatunnel to implement our data processing logic.

  • Local

./bin/start-seatunnel.sh --config config/tidb.conf --deploy-mode client --master 'local[2]'

  • yarn-client

./bin/start-seatunnel.sh --config config/tidb.conf --deploy-mode client --master yarn

  • yarn-cluster

./bin/start-seatunnel.sh --config config/tidb.conf --deploy-mode cluster --master yarn

For local testing and verifying the logic, you can use Local mode. In a production environment, the yarn-client or yarn-cluster mode is generally used.

Test Result

mysql> select * from access_collect;
+------------+--------+--------+------+
| date       | domain | status | hit  |
+------------+--------+--------+------+
| 2019-01-20 | b.com  | 200    | 63   |
| 2019-01-20 | a.com  | 200    | 85   |
+------------+--------+--------+------+
2 rows in set (0.21 sec)

Conclusion

In this article, we introduced how to use Seatunnel to read data from TiDB, do simple data processing and write it to another table in TiDB. Data can be imported quickly with only one configuration file without writing any code.

In addition to supporting TiDB data sources, Seatunnel also supports Elasticsearch, Kafka, Kudu, ClickHouse and other data sources.

At the same time, we are developing an important feature: using TiDB's transaction support in Seatunnel to implement streaming data processing from Kafka to TiDB with exactly-once end-to-end (Kafka to TiDB) consistency.

If you want to know more functions and cases of Seatunnel combined with TiDB, ClickHouse, Elasticsearch and Kafka, you can go directly to the official website https://seatunnel.apache.org/

Contact us

  • Mailing list : dev@seatunnel.apache.org. Send anything to dev-subscribe@seatunnel.apache.org and subscribe to the mailing list according to the replies.
  • Slack: Send a Request to join SeaTunnel slack email to the mailing list (dev@seatunnel.apache.org), and we will invite you to join (please make sure you are registered with Slack before doing so).
  • bilibili B station video

-- Powered by InterestingLab

· 8 min read

Foreword

StructuredStreaming is a module introduced in Spark 2.0. Compared with SparkStreaming, it has some notable advantages:

  • First, it can achieve lower latency;
  • Second, it can do real-time aggregation, such as computing each product's total sales for the day in real time;
  • Third, it can join streams with each other; for example, to calculate the click-through rate of an advertisement you need to join the ad's exposure records with its click records.

These things can be cumbersome or difficult to implement with SparkStreaming, but are much easier with StructuredStreaming.

How to use StructuredStreaming

Maybe you have not studied StructuredStreaming in depth but have realized that it fits your needs well. How do you put it to use quickly? There is a tool in the community, Seatunnel (project address: https://github.com/apache/incubator-seatunnel), that can help you use StructuredStreaming to get the job done efficiently and at low cost.

Seatunnel

Seatunnel is a very easy-to-use, high-performance, real-time data processing product built on Spark that can handle massive amounts of data. It has a very rich set of plugins that support reading data from Kafka, HDFS, and Kudu, performing various kinds of data processing, and writing the results to ClickHouse, Elasticsearch, or Kafka.

Preparation

First we need to install Seatunnel. The installation is very simple and there is no need to configure system environment variables:

  1. Prepare the Spark environment
  2. Install Seatunnel
  3. Configure Seatunnel

The steps below are kept simple; for the full installation guide, refer to the Quick Start.

cd /usr/local
wget https://archive.apache.org/dist/spark/spark-2.2.0/spark-2.2.0-bin-hadoop2.7.tgz
tar -xvf spark-2.2.0-bin-hadoop2.7.tgz
wget https://github.com/InterestingLab/seatunnel/releases/download/v1.3.0/seatunnel-1.3.0.zip
unzip seatunnel-1.3.0.zip
cd seatunnel-1.3.0

vim config/seatunnel-env.sh
# Specify the Spark installation path
SPARK_HOME=${SPARK_HOME:-/usr/local/spark-2.2.0-bin-hadoop2.7}

Seatunnel Pipeline

We only need to write a Seatunnel pipeline configuration file to complete the data import.

The configuration file consists of four parts: Spark, Input, Filter, and Output.

Spark

This part holds the Spark-related settings, mainly the resources required to run the Spark job.

spark {
  spark.app.name = "seatunnel"
  spark.executor.instances = 2
  spark.executor.cores = 1
  spark.executor.memory = "1g"
}

Input

Below is an example of reading data from Kafka.

kafkaStream {
  topics = "seatunnel"
  consumer.bootstrap.servers = "localhost:9092"
  schema = "{\"name\":\"string\",\"age\":\"integer\",\"addrs\":{\"country\":\"string\",\"city\":\"string\"}}"
}

With the configuration above, the data in Kafka can be read. topics is the Kafka topic to subscribe to; to subscribe to several topics at once, separate them with commas. consumer.bootstrap.servers is the list of Kafka brokers. schema is optional: the value StructuredStreaming reads from Kafka (a fixed set of fields defined by Spark) is of binary type (see http://spark.apache.org/docs/latest/structured-streaming-kafka-integration.html), but if you are sure the data in your Kafka topic is a JSON string, you can specify a schema and the input plugin will parse the value according to it.
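
For example, a variant of the input above that subscribes to two topics at once and leaves out the optional schema (the second topic name is made up for illustration); without a schema the Kafka value is simply not parsed as JSON:

kafkaStream {
  # several topics, separated by commas
  topics = "seatunnel,seatunnel_events"
  consumer.bootstrap.servers = "localhost:9092"
}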

Filter

Here is a simple filter example.

filter {
  sql {
    table_name = "student"
    sql = "select name,age from student"
  }
}

table_name registers the data as a temporary table under that name, so it can be referenced easily in the SQL that follows.
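
Filters run in the order they appear, so several sql plugins can be chained. A small sketch, assuming (as in the example above) that each sql filter registers its incoming data under its own table_name; the table and column names here are made up:

filter {
  sql {
    table_name = "student"
    sql = "select name, age from student where age >= 18"
  }
  # the result of the first sql becomes the input of the second,
  # registered here under a new temporary table name
  sql {
    table_name = "adult_student"
    sql = "select age, count(*) as cnt from adult_student group by age"
  }
}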

Output

The processed data is then written out; here we assume the output is also Kafka.

output {
  kafka {
    topic = "seatunnel"
    producer.bootstrap.servers = "localhost:9092"
    streaming_output_mode = "update"
    checkpointLocation = "/your/path"
  }
}

topic is the topic to write to, and producer.bootstrap.servers is the list of Kafka brokers. streaming_output_mode is the StructuredStreaming output mode; the three options are append, update, and complete, as described in the documentation: http://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#output-modes

checkpointLocation is the StructuredStreaming checkpoint path. If it is configured, this directory stores the program's running state; for example, if the program exits and restarts, it will resume consuming from the last offset.

Scenario Analysis

The above is a simple example. Next, we will introduce a slightly more complex business scenario.

Scenario 1: Real-time aggregation

Suppose there is a shop with 10 kinds of products, and we need to compute the daily sales of each product in real time, and even the number of distinct buyers of each product (approximately). The huge advantage here is that massive data can be aggregated during real-time processing; there is no need to land the data in a data warehouse first and then run offline scheduled aggregation jobs. It is very convenient.

The data in Kafka looks like this:

{"good_id":"abc","price":300,"user_id":123456,"time":1553216320}

So how do we use Seatunnel to fulfill this requirement? Of course, all we need to do is configure it.

# The Spark section is configured according to your business requirements
spark {
  spark.app.name = "seatunnel"
  spark.executor.instances = 2
  spark.executor.cores = 1
  spark.executor.memory = "1g"
}

# Configure the input
input {
  kafkaStream {
    topics = "good_topic"
    consumer.bootstrap.servers = "localhost:9092"
    schema = "{\"good_id\":\"string\",\"price\":\"integer\",\"user_id\":\"Long\",\"time\":\"Long\"}"
  }
}

# Configure the filters
filter {

  # While aggregating, the program keeps the aggregation state accumulated since startup in memory,
  # which can eventually lead to OOM. With a watermark set, state outside the watermark is cleaned up automatically.
  # Here we use the time field to set the watermark, with a limit of 1 day.

  Watermark {
    time_field = "time"
    time_type = "UNIX"           # UNIX means a 10-digit (second-precision) Unix timestamp; other types are described in the plugin documentation
    time_pattern = "yyyy-MM-dd"  # ts is truncated to the day because we want daily sales; for hourly sales use `yyyy-MM-dd HH`
    delay_threshold = "1 day"
    watermark_field = "ts"       # after the watermark is set, a new field is added; `ts` is the name of that field
  }

  # We group by ts so that the watermark takes effect; approx_count_distinct is an estimate, not an exact count distinct
  sql {
    table_name = "good_table_2"
    sql = "select good_id,sum(price) total, approx_count_distinct(user_id) person from good_table_2 group by ts,good_id"
  }
}

# Finally we choose to write the results to Kafka in real time
output {
  kafka {
    topic = "seatunnel"
    producer.bootstrap.servers = "localhost:9092"
    streaming_output_mode = "update"
    checkpointLocation = "/your/path"
  }
}

With the configuration above complete, start Seatunnel and you will get the results you want.

Scenario 2: Joining multiple streams

Suppose you run advertisements on some platform and need to calculate each advertisement's CTR (click-through rate) in real time. The data comes from two topics: one with the ad exposure logs and one with the ad click logs. We need to join the two streams for the calculation, and Seatunnel recently added support for this. Let's take a look at how to do it.

Click topic data format:

{"ad_id":"abc","click_time":1553216320,"user_id":12345}

Exposure topic data format:

{"ad_id":"abc","show_time":1553216220,"user_id":12345}

# The Spark section is configured according to your business requirements
spark {
  spark.app.name = "seatunnel"
  spark.executor.instances = 2
  spark.executor.cores = 1
  spark.executor.memory = "1g"
}

# Configure the inputs
input {

  kafkaStream {
    topics = "click_topic"
    consumer.bootstrap.servers = "localhost:9092"
    schema = "{\"ad_id\":\"string\",\"user_id\":\"Long\",\"click_time\":\"Long\"}"
    table_name = "click_table"
  }

  kafkaStream {
    topics = "show_topic"
    consumer.bootstrap.servers = "localhost:9092"
    schema = "{\"ad_id\":\"string\",\"user_id\":\"Long\",\"show_time\":\"Long\"}"
    table_name = "show_table"
  }
}

filter {

  # For a left outer join, the right table must have a watermark
  # For a right outer join, the left table must have a watermark
  # http://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#inner-joins-with-optional-watermarking
  Watermark {
    source_table_name = "click_table"  # specify which temporary table the watermark is added to; if omitted, the first input is used
    time_field = "click_time"
    time_type = "UNIX"
    delay_threshold = "3 hours"
    watermark_field = "ts"
    result_table_name = "click_table_watermark"  # after adding the watermark, the result is registered as a temporary table for later use in sql
  }

  Watermark {
    source_table_name = "show_table"
    time_field = "show_time"
    time_type = "UNIX"
    delay_threshold = "2 hours"
    watermark_field = "ts"
    result_table_name = "show_table_watermark"
  }

  sql {
    table_name = "show_table_watermark"
    sql = "select a.ad_id,count(b.user_id)/count(a.user_id) ctr from show_table_watermark as a left join click_table_watermark as b on a.ad_id = b.ad_id and a.user_id = b.user_id "
  }

}

# Finally we choose to write the results to Kafka in real time
output {
  kafka {
    topic = "seatunnel"
    producer.bootstrap.servers = "localhost:9092"
    streaming_output_mode = "append"  # stream joins only support append mode
    checkpointLocation = "/your/path"
  }
}

With just configuration, the stream join case is complete as well.

Conclusion

Through configuration, you can quickly use StructuredStreaming for real-time data processing, but you still need to understand some concepts of StructuredStreaming, such as the watermark mechanism, and the output mode of the program.

Finally, Seatunnel of course also supports Spark Streaming and Spark batch jobs. If you are interested in those as well, you can read our earlier articles "How to quickly import data from Hive into ClickHouse", "Excellent data engineer, how to use Spark to do OLAP analysis on TiDB" (2021-12-30-spark-execute-tidb.md), and "How to use Spark to quickly write data to Elasticsearch" (2021-12-30-spark-execute-elasticsearch.md).

If you want to know more functions and cases of Seatunnel combined with HBase, ClickHouse, Elasticsearch, Kafka, MySQL and other data sources, you can go directly to the official website https://seatunnel.apache.org/

Contact us

  • Mailing list : dev@seatunnel.apache.org. Send anything to dev-subscribe@seatunnel.apache.org and subscribe to the mailing list according to the replies.
  • Slack: Send a Request to join SeaTunnel slack email to the mailing list (dev@seatunnel.apache.org), and we will invite you to join (please make sure you are registered with Slack before doing so).
  • bilibili B station video