Flink迷你集群及默认参数简单介绍:
Flink迷你集群是一个轻量级的本地集群,可用于在本地环境中快速开发和测试Flink应用程序。迷你集群不需要任何复杂的配置和管理,只需要提供一个简单的配置即可使用。在启动迷你集群时,必须指定Flink应用程序所需的资源,如TaskManager的数量、内存大小、并行度等。
迷你集群的默认参数通常由Flink框架的配置文件控制。以下是一些常见的默认值:
- singleRpcService: SHARED(默认情况下,所有的RPC服务都共享同一个线程池)
- numTaskManagers: 1(默认情况下使用一个TaskManager节点)
- commonBindAddress: null(默认情况下使用本地地址)
- taskmanager.memory.network.min: 64 mb(TaskManager网络内存的最小值)
- taskmanager.memory.network.max: 64 mb(TaskManager网络内存的最大值)
- taskmanager.memory.managed.size: 128 mb(TaskManager管理内存的默认大小)
- taskmanager.numberOfTaskSlots: 12(每个TaskManager节点的默认任务插槽数量)
- parallelism.default: 1(操作符默认并行度)
- execution.target: local(默认情况下在本地运行)
- execution.runtime-mode: AUTOMATIC(默认情况下自适应选择执行模式)
- taskmanager.cpu.cores: 1.7976931348623157E308(TaskManager可以使用的CPU核心数最大值)
- taskmanager.memory.task.heap.size: 9223372036854775807 bytes(TaskManager堆内存的最大大小)
- taskmanager.memory.task.off-heap.size: 9223372036854775807 bytes(TaskManager堆外内存的最大大小)
- rest.bind-port: 0(默认情况下,REST API将绑定到系统可用端口)
- rest.address: localhost(REST API的默认地址)。
这些参数可以在Flink框架的配置文件中进行修改。在使用迷你集群时,可以使用命令行选项或Java代码覆盖这些默认设置。其中,最常用的命令行选项是-D
和-yT
。-D
选项可以在JVM启动参数中设置要覆盖的配置参数,-yT
选项可以从外部文件中加载YAML格式的配置文件。
简单demo代码如下:
import org.apache.flink.api.common.RuntimeExecutionMode; import org.apache.flink.api.common.functions.FlatMapFunction; import org.apache.flink.api.common.functions.MapFunction; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.util.Collector; public class FlinkDemo { public static void main(String[] args) throws Exception { // 1.准备环境 StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); // 设置运行模式 env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC); // 2.加载数据源 DataStreamSource<String> elementsSource = env.fromElements("java,scala,php,c++", "java,scala,php", "java,scala", "java,c,c++,python,go"); // 3.数据转换 DataStream<String> flatMap = elementsSource.flatMap(new FlatMapFunction<String, String>() { @Override public void flatMap(String element, Collector<String> out) throws Exception { String[] wordArr = element.split(","); for (String word : wordArr) { out.collect(word); } } }); //DataStream 下边为DataStream子类 SingleOutputStreamOperator<String> source = flatMap.map(new MapFunction<String, String>() { @Override public String map(String value) throws Exception { return value.toUpperCase(); } }).returns(String.class); // 4.数据输出 source.print(); // 5.执行程序 env.execute("FlinkDemo"); } }