
How to quickly import data from HDFS into ClickHouse

· 6 min read

ClickHouse is a distributed columnar DBMS for OLAP. Our department stores all log data related to data analysis in ClickHouse, an excellent data warehouse, and the daily data volume has reached 30 billion rows.

The data processing and storage workflow introduced earlier is based on real-time data streams: the data sits in Kafka, and we use Java or Golang to read, parse, and clean it from Kafka and write it into ClickHouse, where it can be queried quickly. However, in many scenarios the data is not real-time, and it may be necessary to import data from HDFS or Hive into ClickHouse. Some people implement this import by writing Spark programs, but is there a simpler, more efficient way?

The open source community now offers a tool called Seatunnel (project address: https://github.com/apache/incubator-seatunnel) that can quickly import data from HDFS into ClickHouse.

HDFS To ClickHouse

Assuming our logs are stored in HDFS, we need to parse them, extract the fields we care about, and write those fields into the ClickHouse table.

Log Sample

The logs we store in HDFS have the following format, a very common Nginx log:

10.41.1.28 github.com 114.250.140.241 0.001s "127.0.0.1:80" [26/Oct/2018:03:09:32 +0800] "GET /Apache/Seatunnel HTTP/1.1" 200 0 "-" - "Dalvik/2.1.0 (Linux; U; Android 7.1.1; OPPO R11 Build/NMF26X)" "196" "-" "mainpage" "443" "-" "172.16.181.129"

ClickHouse Schema

Our ClickHouse table creation statement is as follows; the table is partitioned by day:

CREATE TABLE cms.cms_msg
(
    date Date,
    datetime DateTime,
    url String,
    request_time Float32,
    status String,
    hostname String,
    domain String,
    remote_addr String,
    data_size Int32,
    pool String
) ENGINE = MergeTree PARTITION BY date ORDER BY date SETTINGS index_granularity = 16384
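
Because the table is partitioned by day, queries that filter on the date column only have to scan the matching partitions. A minimal illustration (the date literal is just an example value):

-- Only the 2018-10-26 partition is read for this count
SELECT count()
FROM cms.cms_msg
WHERE date = '2018-10-26'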

Seatunnel with ClickHouse

Next, I will explain in detail how we can meet the above requirements with Seatunnel and write the data from HDFS into ClickHouse.

Seatunnel

Seatunnel is a very easy-to-use, high-performance, real-time data processing product that can handle massive amounts of data. It is built on top of Spark. Seatunnel has a very rich set of plugins that support reading data from Kafka, HDFS, and Kudu, performing various transformations, and writing the results to ClickHouse, Elasticsearch, or Kafka.

Prerequisites

First we need to install Seatunnel. The installation is very simple and does not require configuring any system environment variables:

  1. Prepare the Spark environment
  2. Install Seatunnel
  3. Configure Seatunnel

The following are the basic steps; for the full installation procedure, refer to the Quick Start guide.

cd /usr/local

wget https://archive.apache.org/dist/spark/spark-2.2.0/spark-2.2.0-bin-hadoop2.7.tgz
tar -xvf spark-2.2.0-bin-hadoop2.7.tgz

wget https://github.com/InterestingLab/seatunnel/releases/download/v1.1.1/seatunnel-1.1.1.zip

unzip seatunnel-1.1.1.zip

cd seatunnel-1.1.1
vim config/seatunnel-env.sh

# Specify the Spark installation path
SPARK_HOME=${SPARK_HOME:-/usr/local/spark-2.2.0-bin-hadoop2.7}

Seatunnel Pipeline

We only need to write a Seatunnel pipeline configuration file to complete the data import.

The configuration file consists of four parts: Spark, Input, Filter, and Output.

Spark

This part contains the Spark-related configuration, mainly the resources required for Spark execution.

spark {
  spark.app.name = "seatunnel"
  spark.executor.instances = 2
  spark.executor.cores = 1
  spark.executor.memory = "1g"
}

Input

This part defines the data source. The following is a configuration example for reading data in text format from HDFS files.

input {
  hdfs {
    path = "hdfs://nomanode:8020/rowlog/accesslog"
    table_name = "access_log"
    format = "text"
  }
}

Filter

In the Filter section we configure a series of transformations: regular-expression parsing to split the log line into fields, a date transformation to convert the HTTPDATE timestamp into a date format supported by ClickHouse, type conversion for the numeric fields, and field filtering through SQL.

filter {
  # Parse raw logs using regular expressions
  grok {
    source_field = "raw_message"
    pattern = '%{IP:ha_ip}\\s%{NOTSPACE:domain}\\s%{IP:remote_addr}\\s%{NUMBER:request_time}s\\s\"%{DATA:upstream_ip}\"\\s\\[%{HTTPDATE:timestamp}\\]\\s\"%{NOTSPACE:method}\\s%{DATA:url}\\s%{NOTSPACE:http_ver}\"\\s%{NUMBER:status}\\s%{NUMBER:body_bytes_send}\\s%{DATA:referer}\\s%{NOTSPACE:cookie_info}\\s\"%{DATA:user_agent}\"\\s%{DATA:uid}\\s%{DATA:session_id}\\s\"%{DATA:pool}\"\\s\"%{DATA:tag2}\"\\s%{DATA:tag3}\\s%{DATA:tag4}'
  }

  # Convert data in "dd/MMM/yyyy:HH:mm:ss Z" format to
  # data in "yyyy/MM/dd HH:mm:ss" format
  date {
    source_field = "timestamp"
    target_field = "datetime"
    source_time_format = "dd/MMM/yyyy:HH:mm:ss Z"
    target_time_format = "yyyy/MM/dd HH:mm:ss"
  }

  # Use SQL to filter the fields of interest and process the fields
  # You can even filter out data you don't care about by filter conditions
  sql {
    table_name = "access"
    sql = "select substring(date, 1, 10) as date, datetime, hostname, url, http_code, float(request_time), int(data_size), domain from access"
  }
}

Output

Finally, we write the processed, structured data to ClickHouse:

output {
  clickhouse {
    host = "your.clickhouse.host:8123"
    database = "seatunnel"
    table = "access_log"
    fields = ["date", "datetime", "hostname", "url", "http_code", "request_time", "data_size", "domain"]
    username = "username"
    password = "password"
  }
}

Running Seatunnel

We combine the above four-part configuration into our configuration file config/batch.conf.

vim config/batch.conf
spark {
  spark.app.name = "seatunnel"
  spark.executor.instances = 2
  spark.executor.cores = 1
  spark.executor.memory = "1g"
}

input {
  hdfs {
    path = "hdfs://nomanode:8020/rowlog/accesslog"
    table_name = "access_log"
    format = "text"
  }
}

filter {
  # Parse raw logs using regular expressions
  grok {
    source_field = "raw_message"
    pattern = '%{IP:ha_ip}\\s%{NOTSPACE:domain}\\s%{IP:remote_addr}\\s%{NUMBER:request_time}s\\s\"%{DATA:upstream_ip}\"\\s\\[%{HTTPDATE:timestamp}\\]\\s\"%{NOTSPACE:method}\\s%{DATA:url}\\s%{NOTSPACE:http_ver}\"\\s%{NUMBER:status}\\s%{NUMBER:body_bytes_send}\\s%{DATA:referer}\\s%{NOTSPACE:cookie_info}\\s\"%{DATA:user_agent}\"\\s%{DATA:uid}\\s%{DATA:session_id}\\s\"%{DATA:pool}\"\\s\"%{DATA:tag2}\"\\s%{DATA:tag3}\\s%{DATA:tag4}'
  }

  # Convert data in "dd/MMM/yyyy:HH:mm:ss Z" format to
  # data in "yyyy/MM/dd HH:mm:ss" format
  date {
    source_field = "timestamp"
    target_field = "datetime"
    source_time_format = "dd/MMM/yyyy:HH:mm:ss Z"
    target_time_format = "yyyy/MM/dd HH:mm:ss"
  }

  # Use SQL to filter the fields of interest and process the fields
  # You can even filter out data you don't care about by filter conditions
  sql {
    table_name = "access"
    sql = "select substring(date, 1, 10) as date, datetime, hostname, url, http_code, float(request_time), int(data_size), domain from access"
  }
}

output {
  clickhouse {
    host = "your.clickhouse.host:8123"
    database = "seatunnel"
    table = "access_log"
    fields = ["date", "datetime", "hostname", "url", "http_code", "request_time", "data_size", "domain"]
    username = "username"
    password = "password"
  }
}

Execute the following command, specifying the configuration file, to run Seatunnel and write the data to ClickHouse. Here we use local mode as an example.

./bin/start-seatunnel.sh --config config/batch.conf -e client -m 'local[2]'
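
Once the job finishes, a quick sanity check from clickhouse-client is to count what landed in the target table (a minimal sketch; substitute the database and table your output section actually points at, and a date that exists in your data):

-- Count rows written into one daily partition
SELECT count()
FROM seatunnel.access_log
WHERE date = '2018-10-26'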

Conclusion

In this post, we covered how to import Nginx log files from HDFS into ClickHouse using Seatunnel. The data can be imported quickly with a single configuration file and no code. In addition to HDFS sources, Seatunnel also supports reading data from Kafka in real time, processing it, and writing it to ClickHouse. Our next article will describe how to quickly import data from Hive into ClickHouse.

Of course, Seatunnel is not only a tool for writing data to ClickHouse; it also plays an important role in writing to sinks such as Elasticsearch and Kafka.

If you want to learn about more features and use cases of Seatunnel with ClickHouse, Elasticsearch, and Kafka, visit the official website: https://seatunnel.apache.org/

-- Powered by InterestingLab

How to quickly import data from Hive into ClickHouse

· 5 min read

ClickHouse is a distributed columnar DBMS for OLAP. Our department stores all log data related to data analysis in ClickHouse, an excellent data warehouse, and the daily data volume has reached 30 billion rows.

In the previous article, [How to quickly import data from HDFS into ClickHouse](2021-12-30-hdfs-to-clickhouse.md), we showed how Seatunnel (https://github.com/apache/incubator-seatunnel) can write data from HDFS to ClickHouse after some very simple processing. Data in HDFS is generally unstructured, so what should we do with structured data stored in Hive?

Hive to ClickHouse

Assuming our data is already stored in Hive, we need to read it from the Hive table, keep the fields we care about (or convert some of them), and finally write those fields into the ClickHouse table.

Hive Schema

The structure of the table we store in Hive is as follows; it holds common Nginx logs:

CREATE TABLE `nginx_msg_detail`(
    `hostname` string,
    `domain` string,
    `remote_addr` string,
    `request_time` float,
    `datetime` string,
    `url` string,
    `status` int,
    `data_size` int,
    `referer` string,
    `cookie_info` string,
    `user_agent` string,
    `minute` string)
PARTITIONED BY (
    `date` string,
    `hour` string)

ClickHouse Schema

Our ClickHouse table creation statement is as follows; the table is partitioned by day:

CREATE TABLE cms.cms_msg
(
    date Date,
    datetime DateTime,
    url String,
    request_time Float32,
    status String,
    hostname String,
    domain String,
    remote_addr String,
    data_size Int32
) ENGINE = MergeTree PARTITION BY date ORDER BY (date, hostname) SETTINGS index_granularity = 16384
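
Since this table is partitioned by day and ordered by (date, hostname), filtering on a specific day and host is cheap. A minimal illustration (the date and hostname literals are just example values):

-- Partition pruning on date, plus the primary key helps locate the hostname
SELECT count()
FROM cms.cms_msg
WHERE date = '2018-10-26' AND hostname = 'web01.example.com'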

Seatunnel with ClickHouse

Next, I will show how we can write data from Hive to ClickHouse through Seatunnel.

Seatunnel

Seatunnel is a very easy-to-use, high-performance, real-time data processing product that can handle massive amounts of data. It is built on top of Spark. Seatunnel has a very rich set of plugins that support reading data from Kafka, HDFS, and Kudu, performing various transformations, and writing the results to ClickHouse, Elasticsearch, or Kafka.

The environment preparation and installation steps for Seatunnel will not be repeated here. For the specific installation steps, please refer to the previous article or visit Seatunnel Docs.

Seatunnel Pipeline

We only need to write a Seatunnel pipeline configuration file to complete the data import.

The configuration file consists of four parts: Spark, Input, Filter, and Output.

Spark

This part contains the Spark-related configuration, mainly the resources required for Spark execution.

spark {
  // This configuration is required
  spark.sql.catalogImplementation = "hive"
  spark.app.name = "seatunnel"
  spark.executor.instances = 2
  spark.executor.cores = 1
  spark.executor.memory = "1g"
}

Input

This part defines the data source. The following is a configuration example for reading data from a Hive table.

input {
  hive {
    pre_sql = "select * from access.nginx_msg_detail"
    table_name = "access_log"
  }
}

As you can see, a very simple configuration reads data from Hive. pre_sql is the SQL used to read data from Hive, and table_name is the name under which the data is registered as a temporary table in Spark; it can be any name.

Note that the Hive metastore service must be running.

When running in cluster, client, or local mode, the hive-site.xml file must be placed in the $HADOOP_CONF directory of the node that submits the task.

Filter

In the Filter section we configure a series of transformations; here we discard the unneeded minute and hour fields. Of course, we could also avoid reading those fields at all by listing only the columns we want in pre_sql (see the sketch after the snippet below).

filter {
  remove {
    source_field = ["minute", "hour"]
  }
}
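
As an alternative to the remove filter, the same effect can be achieved at read time by enumerating only the needed columns in pre_sql. A sketch based on the Hive schema above (the column list is illustrative):

input {
  hive {
    # Read only the columns we intend to write to ClickHouse
    pre_sql = "select date, datetime, hostname, url, status, request_time, data_size, domain from access.nginx_msg_detail"
    table_name = "access_log"
  }
}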

Output

Finally, we write the processed, structured data to ClickHouse:

output {
  clickhouse {
    host = "your.clickhouse.host:8123"
    database = "seatunnel"
    table = "nginx_log"
    fields = ["date", "datetime", "hostname", "url", "status", "request_time", "data_size", "domain"]
    username = "username"
    password = "password"
  }
}

Running Seatunnel

We combine the above four-part configuration into our configuration file config/batch.conf.

vim config/batch.conf
spark {
  spark.app.name = "seatunnel"
  spark.executor.instances = 2
  spark.executor.cores = 1
  spark.executor.memory = "1g"
  // This configuration is required
  spark.sql.catalogImplementation = "hive"
}
input {
  hive {
    pre_sql = "select * from access.nginx_msg_detail"
    table_name = "access_log"
  }
}
filter {
  remove {
    source_field = ["minute", "hour"]
  }
}
output {
  clickhouse {
    host = "your.clickhouse.host:8123"
    database = "seatunnel"
    table = "nginx_log"
    fields = ["date", "datetime", "hostname", "url", "status", "request_time", "data_size", "domain"]
    username = "username"
    password = "password"
  }
}

Execute the following command, specifying the configuration file, to run Seatunnel and write the data to ClickHouse. Here we use local mode as an example.

./bin/start-seatunnel.sh --config config/batch.conf -e client -m 'local[2]'
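
After the run, one way to see how much data each daily partition received is to query system.parts (a minimal sketch; adjust the database and table to whatever your output section writes to):

-- Row counts per daily partition of the target table
SELECT partition, sum(rows) AS total_rows
FROM system.parts
WHERE database = 'seatunnel' AND table = 'nginx_log' AND active
GROUP BY partition
ORDER BY partition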

Conclusion

In this post, we covered how to import data from Hive into ClickHouse using Seatunnel. The import can be completed quickly with a single configuration file, without writing any code.

If you want to learn about more features and use cases of Seatunnel with ClickHouse, Elasticsearch, Kafka, and Hadoop, visit the official website: https://seatunnel.apache.org/

-- Powered by InterestingLab