Structured Streaming: Getting Started (Part 1)

1. Preparation

Add the Maven dependencies (for Structured Streaming with Kafka, the essential artifacts are spark-sql and spark-sql-kafka-0-10; the others are used elsewhere in the project):

        <dependency>
            <groupId>org.scala-lang</groupId>
            <artifactId>scala-library</artifactId>
            <version>2.11.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming_2.11</artifactId>
            <version>2.3.2</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming-flume_2.11</artifactId>
            <version>2.3.2</version>
        </dependency>

        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.76</version>
        </dependency>

        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.11</artifactId>
            <version>2.3.2</version>
        </dependency>
        
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming-kafka-0-10_2.11</artifactId>
            <version>2.3.2</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.apache.spark/spark-sql -->
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql_2.11</artifactId>
            <version>2.3.2</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql-kafka-0-10_2.11</artifactId>
            <version>2.3.2</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/redis.clients/jedis -->
        <dependency>
            <groupId>redis.clients</groupId>
            <artifactId>jedis</artifactId>
            <version>2.9.0</version>
        </dependency>

        <dependency>
            <groupId>org.apache.commons</groupId>
            <artifactId>commons-lang3</artifactId>
            <version>3.3.2</version>
        </dependency>
        
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka_2.11</artifactId>
            <version>2.3.0</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka-clients -->
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>2.1.0</version>
        </dependency>

2. Reading data from Kafka

Notes:

1. Structured Streaming does not require you to set a group id when reading from Kafka; the source manages its own consumer group.
2. In df.selectExpr() you can also select the Kafka key and the metadata columns such as topic, partition and offset (see the extra snippet after the code below).

The code is as follows:

    import org.apache.spark.SparkConf
    import org.apache.spark.sql.SparkSession

    val conf = new SparkConf().setMaster("local[*]")
    val spark = SparkSession
      .builder()
      .config(conf)
      .appName(getClass.getName)
      .getOrCreate()
    import spark.implicits._

    val df = spark
      .readStream
      .format("kafka")
      // the broker list is read from a properties file via a project utility class
      .option("kafka.bootstrap.servers", PropertiesUtils.loadProperties("kafka.broker.list"))
      .option("subscribe", "cache_tmp")        // topic to subscribe to
      .option("startingOffsets", "earliest")   // start from the earliest available offsets
      .load()

    // Kafka delivers the value as binary, so cast it to a string.
    // Alias it as "value" so the Kafka sink in the next section can find the column.
    val re = df.selectExpr("CAST(value AS STRING) AS value")
      .as[String]
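
As note 2 above points out, the read is not limited to the value. A minimal sketch that also pulls the key and the metadata columns exposed by the Kafka source (topic, partition, offset, timestamp) could look like this:

    // key and value arrive as binary, so cast them; the metadata columns can be selected as-is
    val withMeta = df.selectExpr(
      "CAST(key AS STRING) AS key",
      "CAST(value AS STRING) AS value",
      "topic",
      "partition",
      "offset",
      "timestamp")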

3. Writing to Kafka

Notes:

1. Writing to Kafka requires a checkpoint location; the checkpoint stores the consumed offsets and other progress information.
2. If the destination Kafka topic does not exist, it will be created automatically (assuming the broker allows automatic topic creation).

The code is as follows:

    val query = re
      .writeStream
      .format("kafka")
      .option("kafka.bootstrap.servers", PropertiesUtils.loadProperties("kafka.broker.list"))
      .option("topic", "tmp_t")                      // destination topic
      .option("checkpointLocation", "E:/t_check_2")  // in production this should live on HDFS
      .start()
    query.awaitTermination()

The checkpoint directory here stores the consumed offsets and other progress information for the query.
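
For reference, on Spark 2.x the checkpoint directory typically ends up with a layout along these lines (exact contents vary by version and by whether the query is stateful):

    E:/t_check_2/
        metadata     <- unique id of the streaming query
        offsets/     <- one file per micro-batch with the Kafka offsets to process
        commits/     <- one file per micro-batch that completed successfully
        sources/     <- initial metadata for the Kafka source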
