flink on k8s native 再次实践

基于flink 1.13.2版本做的实践
本次主要实践flink on k8s native 的两种方式, 分别是sesion 和 application方式

第一步: k8s环境准备

  1, 创建一个namespace
      kubectl create namespace flink-session-cluster-test-1213
  2, 新建一个serviceaccount, 用来提交flink的任务
     kubectl create serviceaccount flink -n flink-session-cluster-test-1213
  3, 做好绑定
     kubectl create clusterrolebinding flink-role-binding-flink-session-cluster-test-1213_flink 
     --clusterrole=edit   --serviceaccount=flink-session-cluster-test-1213:flink     

第二步: 镜像准备

  使用hdfs作为flink的checkpoint存储,所以需要在flink的lib目录中放入hadoop的jar包
  创建Dockerfile文件,并添加如下内容:
vi   Dockerfile
FROM flink:1.13.2-scala_2.11-java8
COPY ./flink-shaded-hadoop-2-uber-2.7.5-10.0.jar $FLINK_HOME/lib/flink-shaded-hadoop-2-uber-2.7.5-10.0.jar

构建image
docker build -t native_realtime:1.0.3 .

后续的session与application均使用该镜像镜像实践

为了解决hosts映射以及用户自定义jar包等问题, 需要使用yaml模板

vi flink-template.yaml

apiVersion: v1
kind: Pod
metadata:
  name: flink-pod-template
spec:
  initContainers:
    - name: artifacts-fetcher
      image: native_realtime:1.0.3
      # 添加自定义运行的jar包以及各种配置文件
      command: ["/bin/sh","-c"]
      args: ["wget http://xxxxxx:8082/flinkhistory/1.13.2/tt.sql -O /opt/flink/usrHome/taa.sql ; wget http://xxxx:8082/flinkhistory/1.13.2/realtime-dw-service-1.0.1-SNAPSHOT.jar -O /opt/flink/usrHome/realtime-dw-service-1.0.1.jar"]
      volumeMounts:
        - mountPath: /opt/flink/usrHome
          name: flink-usr-home
  hostAliases:
  - ip: 10.1.1.103
    hostnames:
    - "cdh103"
  - ip: 10.1.1.104
    hostnames:
    - "cdh104"
  - ip: 10.1.1.105
    hostnames:
    - "cdh105"
  - ip: 10.1.1.106
    hostnames:
    - "cdh106"
  containers:
    # Do not change the main container name
    - name: flink-main-container
      resources:
        requests:
          ephemeral-storage: 2048Mi
        limits:
          ephemeral-storage: 2048Mi
      volumeMounts:
        - mountPath: /opt/flink/usrHome
          name: flink-usr-home
  volumes:
    - name: flink-usr-home
      hostPath:
        path: /tmp
        type: Directory

使用run application模式提交任务

/data/flink-1.13.0/bin/flink run-application 
    --target kubernetes-application 
	-Dresourcemanager.taskmanager-timeout=345600 
	-Dkubernetes.namespace=flink-session-cluster-test-1213 
	-Dkubernetes.service-account=flink 
    -Dkubernetes.cluster-id=flink-stream-reatime-dw11 
    -Dhigh-availability=org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory 
	-Dhigh-availability.storageDir=hdfs://cdh104:8020/flink/recovery 
    -Dkubernetes.container.image=native_realtime:1.0.3 
	-Dstate.checkpoints.dir=hdfs://cdh104:8020/flink/checkpoints/flink-stream-application-cluster-08 
    -Dstate.savepoints.dir=hdfs://cdh104:8020/flink/savepoints/flink-stream-application-cluster-08 
	-Dexecution.checkpointing.interval=2s 
	-Dexecution.checkpointing.mode=EXACTLY_ONCE 
	-Dstate.backend=filesystem 
	-Dkubernetes.rest-service.exposed.type=NodePort  
	-Drestart-strategy=failure-rate  
	-Drestart-strategy.failure-rate.delay=1s  
	-Drestart-strategy.failure-rate.failure-rate-interval=5s 
	-Drestart-strategy.failure-rate.max-failures-per-interval=1  
	-Dtaskmanager.memory.process.size=1096m 
    -Dkubernetes.taskmanager.cpu=1 
    -Dtaskmanager.numberOfTaskSlots=1 
	-Dkubernetes.pod-template-file=./flink-template.yaml 
	-c com.xxx.bigdata.rt.dw.service.runtime.RealtimeWarehouseMain 
    local:///opt/flink/usrHome/realtime-dw-service-1.0.1.jar 
	-cfc state.checkpoint.interval=60000  -cfp 1 -cfm no -cfn kafka_es -cfs /opt/flink/usrHome/taa.sql

使用session模式提交任务

-- 创建session
/data/flink-1.13.0/bin/kubernetes-session.sh 
  -Dkubernetes.cluster-id=stream-wordcount-application-cluster 
  -Dtaskmanager.memory.process.size=1096m 
  -Dkubernetes.taskmanager.cpu=1 
  -Dtaskmanager.numberOfTaskSlots=4 
  -Dkubernetes.container.image=native_realtime:1.0.3 
  -Dkubernetes.service.exposed.type=NodePort 
  -Dkubernetes.jobmanager.service-account=flink 
  -Dkubernetes.service-account=flink 
  -Dkubernetes.namespace=flink-session-cluster-test-1213
  -Dhigh-availability=org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory 
	-Dhigh-availability.storageDir=hdfs://cdh104:8020/flink/recovery 
    -Dkubernetes.container.image=native_realtime:1.0.3 
	-Dstate.checkpoints.dir=hdfs://cdh104:8020/flink/checkpoints/flink-stream-application-cluster-08 
    -Dstate.savepoints.dir=hdfs://cdh104:8020/flink/savepoints/flink-stream-application-cluster-08 
	-Dexecution.checkpointing.interval=2s 
	-Dexecution.checkpointing.mode=EXACTLY_ONCE 
	-Dstate.backend=filesystem 
	-Dkubernetes.rest-service.exposed.type=NodePort  
	-Drestart-strategy=failure-rate  
	-Drestart-strategy.failure-rate.delay=1s  
	-Drestart-strategy.failure-rate.failure-rate-interval=5s 
	-Drestart-strategy.failure-rate.max-failures-per-interval=1  
	-Dkubernetes.pod-template-file=./flink-template.yaml 

提交任务
/data/flink-1.13.0/bin/flink run -d -e kubernetes-session 
 -Dkubernetes.cluster-id=stream-wordcount-application-cluster 
 -Dkubernetes.namespace=flink-session-cluster-test-1213 
 -Dkubernetes.taskmanager.service-account=flink 
 -Dexecution.attached=true 
 -c com.xxx.bigdata.rt.dw.service.runtime.RealtimeWarehouseMain 
    ./realtime-dw-service-1.0.1-SNAPSHOT.jar 
	 -cfc state.checkpoint.interval=60000  -cfp 1 -cfm no -cfn kafka_es -cfs ./tt.sql

问题及解决:
1, flink任务的hosts问题?
可以通过flink 提供的yaml模板, 将hosts配置放在yaml中, 然后在命令使用-Dkubernetes.pod-template-file指定
2, 关于使用session 模式出现启动taskManager时, 获取configmap权限不够的问题?
可以使用 -Dkubernetes.jobmanager.service-account=flink -Dkubernetes.service-account=flink -Dkubernetes.taskmanager.service-account=flink 来解决
3, 如果使用application模式, 解决自定义jar包不想打入镜像的问题?
可以在yaml模板中, initContainers使用wget方式引入

session模式与application模式的相互比较
session模式, 先启动jobmanager, 再之后根据提交的任务, 来启动taskManager, 导致多个任务日志耦合在一起, 但是自定义的jar包不需要再构建镜像, 相对提交比较简单
application模式, 需要将自定义的jar包构建在镜像中, 或者使用yaml模板的initContainers的方式. 日志可以分开.
这两种模式均可以实现高可用, 不管是application还是session模式, 均可以自动以checkpoint自动重启

本次未实践日志的收集等相关

本图文内容来源于网友网络收集整理提供,作为学习参考使用,版权属于原作者。
THE END
分享
二维码
< <上一篇
下一篇>>