Flink使用local模式执行Flink程序,并且开启Flink的webUI

Flink使用local模式执行Flink程序,并且开启Flink的webUI

使用local模式执行Flink程序,并且开启Flink的webUI
1.在pom.xml中引入依赖

org.apache.flink
flink-runtime-web_

s

c

a

l

a

.

b

i

n

a

r

y

.

v

e

r

s

i

o

n

<

/

a

r

t

i

f

a

c

t

I

d

>

<

v

e

r

s

i

o

n

>

{scala.binary.version}</artifactId> <version>

scala.binary.version</artifactId><version>{flink.version}

2.调用StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(configuration);
3.如果使用的是createLocalEnvironmentWithWebUI(configuration),那么提交到集群中执行,必须改成.getExecutionEnvironment();

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStreamSink;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

/**
* 使用local模式执行Flink程序,并且开启Flink的webUI
*  1.在pom.xml中引入依赖
*      <dependency>
*             <groupId>org.apache.flink</groupId>
*             <artifactId>flink-runtime-web_${scala.binary.version}</artifactId>
*             <version>${flink.version}</version>
*         </dependency>
*  2.调用StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(configuration);
*  3.如果使用的是createLocalEnvironmentWithWebUI(configuration),那么提交到集群中执行,必须改成.getExecutionEnvironment();
*
*
*/
public class LocalWebUI {


    public static void main(String[] args) throws Exception{


        //StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();


        Configuration configuration = new Configuration();
        //创建一个带webUI的本地执行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(configuration);


        int parallelism = env.getParallelism();


        System.out.println("执行环境的并行度:" + parallelism);


        DataStreamSource<String> lines = env.socketTextStream("localhost", 8888);


        int parallelism1 = lines.getParallelism();


        System.out.println("socketTextStream创建的DataStreamSource的并行度:" + parallelism1);


        SingleOutputStreamOperator<String> uppered = lines.map(line -> line.toUpperCase());


        int parallelism2 = uppered.getParallelism();


        System.out.println("调用完map方法得到的DataStream的并行度:" + parallelism2);


        DataStreamSink<String> print = uppered.print();


        int parallelism3 = print.getTransformation().getParallelism();


        System.out.println("调用完print方法得到的DataStreamSink的并行度:" + parallelism3);


        env.execute();


    }
}

本图文内容来源于网友网络收集整理提供,作为学习参考使用,版权属于原作者。
THE END
分享
二维码
)">
< <上一篇
下一篇>>