SparkStreaming读取Kafka数据源

SparkStreaming读取Kafka数据源

一、前提工作


  • 安装了zookeeper
  • 安装了Kafka
  • 实验环境:kafka_2.11-2.3.1.tgz + zookeeper-3.4.5.tar.gz + spark2.4.8
  • 实验流程
    在这里插入图片描述

二、实验内容


  • 实验要求:本实验实现的是wordcount程序
  • 启动zookeeper,可参考:zookeeper单机安装与配置
  • 启动Kafka
  • 创建Kafka主题,如test,可参考:Kafka的安装与基本操作
  • 启动Kafka控制台生产者,可参考:Kafka的安装与基本操作
  • 创建maven项目
  • 添加kafka依赖
    <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming-kafka-0-10_2.12</artifactId>
            <version>2.4.8</version>
        </dependency>
    
  • 编写程序,如下所示:
    package com.spark.streaming.kafka
    
    import org.apache.kafka.common.serialization.StringDeserializer
    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.dstream.DStream
    import org.apache.spark.streaming.{Seconds, StreamingContext}
    import org.apache.spark.streaming.kafka010._
    import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
    import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
    
    object KafkaDemo {
      def main(args: Array[String]): Unit = {
        val sparkConf = new SparkConf().setMaster("local[2]").setAppName("KafkaTest")
        val streamingContext = new StreamingContext(sparkConf, Seconds(2))
    
        val kafkaParams = Map[String, Object](
          "bootstrap.servers" -> "niit01:9092",
          "key.deserializer" -> classOf[StringDeserializer],
          "value.deserializer" -> classOf[StringDeserializer],
          "group.id" -> "use_a_separate_group_id_for_each_stream",
          "auto.offset.reset" -> "latest",
          "enable.auto.commit" -> (false: java.lang.Boolean)
        )
    
        val topics = Array("test", "t100")
        val stream = KafkaUtils.createDirectStream[String, String](
          streamingContext,
          PreferConsistent,
          Subscribe[String, String](topics, kafkaParams)
        )
    
        val mapDStream: DStream[(String, String)] = stream.map(record => (record.key, record.value))
    
        val resultRDD: DStream[(String, Int)] = mapDStream.flatMap(_._2.split(" ")).map((_, 1)).reduceByKey(_ + _)
    
        // 打印
        resultRDD.print()
    
        // 启动
        streamingContext.start()
    
        // 等待计算结束
        streamingContext.awaitTermination()
    
    
      }
    }
    
    
  • 运行程序
  • 往Kafka控制台生产者中写入数据,观察程序控制台输出
本图文内容来源于网友网络收集整理提供,作为学习参考使用,版权属于原作者。
THE END
分享
二维码
< <上一篇
下一篇>>