跳到主要内容
版本:2.3.2

Set Up with Kubernetes

This section provides a quick guide to using SeaTunnel with Kubernetes.

Prerequisites

We assume that you have a local installations of the following:

So that the kubectl and helm commands are available on your local system.

For kubernetes minikube is our choice, at the time of writing this we are using version v1.23.3. You can start a cluster with the following command:

minikube start --kubernetes-version=v1.23.3

Installation

SeaTunnel docker image

To run the image with SeaTunnel, first create a Dockerfile:

FROM flink:1.13

ENV SEATUNNEL_VERSION="2.3.2"
ENV SEATUNNEL_HOME="/opt/seatunnel"

RUN mkdir -p $SEATUNNEL_HOME

RUN wget https://archive.apache.org/dist/seatunnel/${SEATUNNEL_VERSION}/apache-seatunnel-${SEATUNNEL_VERSION}-bin.tar.gz
RUN tar -xzvf apache-seatunnel-${SEATUNNEL_VERSION}-bin.tar.gz

RUN cp -r apache-seatunnel-${SEATUNNEL_VERSION}/* $SEATUNNEL_HOME/
RUN rm -rf apache-seatunnel-${SEATUNNEL_VERSION}*

Then run the following commands to build the image:

docker build -t seatunnel:2.3.2-flink-1.13 -f Dockerfile .

Image seatunnel:2.3.2-flink-1.13 need to be present in the host (minikube) so that the deployment can take place.

Load image to minikube via:

minikube image load seatunnel:2.3.2-flink-1.13

Deploying the operator

The steps below provide a quick walk-through on setting up the Flink Kubernetes Operator.
You can refer to Flink Kubernetes Operator - Quick Start for more details.

Notice: All the Kubernetes resources bellow are created in default namespace.

Install the certificate manager on your Kubernetes cluster to enable adding the webhook component (only needed once per Kubernetes cluster):

kubectl create -f https://github.com/jetstack/cert-manager/releases/download/v1.8.2/cert-manager.yaml

Now you can deploy the latest stable Flink Kubernetes Operator version using the included Helm chart:

helm repo add flink-operator-repo https://downloads.apache.org/flink/flink-kubernetes-operator-1.3.1/

helm install flink-kubernetes-operator flink-operator-repo/flink-kubernetes-operator \
--set image.repository=apache/flink-kubernetes-operator

You may verify your installation via kubectl:

kubectl get pods
NAME READY STATUS RESTARTS AGE
flink-kubernetes-operator-5f466b8549-mgchb 1/1 Running 3 (23h ago) 16d

Run SeaTunnel Application

Run Application:: SeaTunnel already providers out-of-the-box configurations.

In this guide we are going to use seatunnel.streaming.conf:

env {
execution.parallelism = 1
job.mode = "STREAMING"
checkpoint.interval = 2000
}

source {
FakeSource {
result_table_name = "fake"
row.num = 160000
schema = {
fields {
name = "string"
age = "int"
}
}
}
}

transform {
FieldMapper {
source_table_name = "fake"
result_table_name = "fake1"
field_mapper = {
age = age
name = new_name
}
}
}

sink {
Console {
source_table_name = "fake1"
}
}

Generate a configmap named seatunnel-config in Kubernetes for the seatunnel.streaming.conf so that we can mount the config content in pod.

kubectl create cm seatunnel-config \
--from-file=seatunnel.streaming.conf=seatunnel.streaming.conf

Once the Flink Kubernetes Operator is running as seen in the previous steps you are ready to submit a Flink (SeaTunnel) job:

  • Create seatunnel-flink.yaml FlinkDeployment manifest:
apiVersion: flink.apache.org/v1beta1
kind: FlinkDeployment
metadata:
name: seatunnel-flink-streaming-example
spec:
image: seatunnel:2.3.2-flink-1.13
flinkVersion: v1_13
flinkConfiguration:
taskmanager.numberOfTaskSlots: "2"
serviceAccount: flink
jobManager:
replicas: 1
resource:
memory: "1024m"
cpu: 1
taskManager:
resource:
memory: "1024m"
cpu: 1
podTemplate:
spec:
containers:
- name: flink-main-container
volumeMounts:
- name: seatunnel-config
mountPath: /data/seatunnel.streaming.conf
subPath: seatunnel.streaming.conf
volumes:
- name: seatunnel-config
configMap:
name: seatunnel-config
items:
- key: seatunnel.streaming.conf
path: seatunnel.streaming.conf
job:
jarURI: local:///opt/seatunnel/starter/seatunnel-flink-starter.jar
entryClass: org.apache.seatunnel.core.starter.flink.SeatunnelFlink
args: ["--config", "/data/seatunnel.streaming.conf"]
parallelism: 2
upgradeMode: stateless
  • Run the example application:
kubectl apply -f seatunnel-flink.yaml

See The Output

You may follow the logs of your job, after a successful startup (which can take on the order of a minute in a fresh environment, seconds afterwards) you can:

kubectl logs -f deploy/seatunnel-flink-streaming-example

looks like the below:

...
2023-01-31 12:13:54,349 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Source: SeaTunnel FakeSource -> Sink Writer: Console (1/1) (1665d2d011b2f6cf6525c0e5e75ec251) switched from SCHEDULED to DEPLOYING.
2023-01-31 12:13:56,684 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Deploying Source: SeaTunnel FakeSource -> Sink Writer: Console (1/1) (attempt #0) with attempt id 1665d2d011b2f6cf6525c0e5e75ec251 to seatunnel-flink-streaming-example-taskmanager-1-1 @ 100.103.244.106 (dataPort=39137) with allocation id fbe162650c4126649afcdaff00e46875
2023-01-31 12:13:57,794 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Source: SeaTunnel FakeSource -> Sink Writer: Console (1/1) (1665d2d011b2f6cf6525c0e5e75ec251) switched from DEPLOYING to INITIALIZING.
2023-01-31 12:13:58,203 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Source: SeaTunnel FakeSource -> Sink Writer: Console (1/1) (1665d2d011b2f6cf6525c0e5e75ec251) switched from INITIALIZING to RUNNING.

If OOM error accur in the log, you can decrease the row.num value in seatunnel.streaming.conf

To expose the Flink Dashboard you may add a port-forward rule:

kubectl port-forward svc/seatunnel-flink-streaming-example-rest 8081

Now the Flink Dashboard is accessible at localhost:8081.

Or launch minikube dashboard for a web-based Kubernetes user interface.

The content printed in the TaskManager Stdout log:

kubectl logs \
-l 'app in (seatunnel-flink-streaming-example), component in (taskmanager)' \
--tail=-1 \
-f

looks like the below (your content may be different since we use FakeSource to automatically generate random stream data):

...
subtaskIndex=0: row=159991 : VVgpp, 978840000
subtaskIndex=0: row=159992 : JxrOC, 1493825495
subtaskIndex=0: row=159993 : YmCZR, 654146216
subtaskIndex=0: row=159994 : LdmUn, 643140261
subtaskIndex=0: row=159995 : tURkE, 837012821
subtaskIndex=0: row=159996 : uPDfd, 2021489045
subtaskIndex=0: row=159997 : mjrdG, 2074957853
subtaskIndex=0: row=159998 : xbeUi, 864518418
subtaskIndex=0: row=159999 : sSWLb, 1924451911
subtaskIndex=0: row=160000 : AuPlM, 1255017876

To stop your job and delete your FlinkDeployment you can simply:

kubectl delete -f seatunnel-flink.yaml

Happy SeaTunneling!

What's More

For now, you are already taking a quick look at SeaTunnel, you could see connector to find all source and sink SeaTunnel supported. Or see deployment if you want to submit your application in another kind of your engine cluster.