Spark Streaming: Reading a Kafka Data Source
1. Prerequisites
- ZooKeeper installed
- Kafka installed
- Lab environment: kafka_2.11-2.3.1.tgz + zookeeper-3.4.5.tar.gz + Spark 2.4.8
- Lab workflow: start ZooKeeper → start Kafka → create a topic → write and run the Spark Streaming program → produce test data and observe the output
2. Lab Content
- Requirement: this lab implements a word-count program over messages read from Kafka.
- Start ZooKeeper; for details see: ZooKeeper standalone installation and configuration.
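A minimal sketch, assuming a standalone zookeeper-3.4.5 installation and that the commands are run from the ZooKeeper home directory:

```bash
# Start the standalone ZooKeeper server (reads conf/zoo.cfg)
bin/zkServer.sh start
# Confirm the server is up
bin/zkServer.sh status
```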
- Start Kafka.
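For example, from the Kafka home directory, assuming config/server.properties points zookeeper.connect at the instance started above:

```bash
# Start the Kafka broker as a background daemon
bin/kafka-server-start.sh -daemon config/server.properties
```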
- Create a Kafka topic, e.g. test; see: Kafka installation and basic operations.
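A sketch for Kafka 2.3.1; the host niit01 comes from the bootstrap.servers value used in the program below, and it is assumed here that ZooKeeper listens on niit01:2181:

```bash
# Create a single-partition topic named "test"
bin/kafka-topics.sh --create --zookeeper niit01:2181 \
  --replication-factor 1 --partitions 1 --topic test
```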
- Start a Kafka console producer; see: Kafka installation and basic operations.
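Again assuming the broker address used in the program below:

```bash
# Each line typed into this producer is sent to the "test" topic as one message
bin/kafka-console-producer.sh --broker-list niit01:9092 --topic test
```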
- Create a Maven project.
- Add the Kafka connector dependency:
```xml
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming-kafka-0-10_2.12</artifactId>
    <version>2.4.8</version>
</dependency>
```
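Note that the _2.12 suffix must match the Scala version your Maven project compiles against; Spark 2.4.8 is also published for Scala 2.11 (spark-streaming-kafka-0-10_2.11). The connector alone is not enough to compile the program below; the core streaming module is needed as well:

```xml
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming_2.12</artifactId>
    <version>2.4.8</version>
</dependency>
```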
- Write the program, as shown below:
```scala
package com.spark.streaming.kafka

import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.DStream
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka010._
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe

object KafkaDemo {
  def main(args: Array[String]): Unit = {
    // Local mode with 2 threads: one to receive data, one to process it
    val sparkConf = new SparkConf().setMaster("local[2]").setAppName("KafkaTest")
    // 2-second batch interval
    val streamingContext = new StreamingContext(sparkConf, Seconds(2))

    // Kafka consumer configuration
    val kafkaParams = Map[String, Object](
      "bootstrap.servers" -> "niit01:9092",
      "key.deserializer" -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer],
      "group.id" -> "use_a_separate_group_id_for_each_stream",
      "auto.offset.reset" -> "latest",
      "enable.auto.commit" -> (false: java.lang.Boolean)
    )

    // Topics to subscribe to (only "test" is created in the steps above;
    // "t100" will be auto-created if the broker allows it)
    val topics = Array("test", "t100")
    val stream = KafkaUtils.createDirectStream[String, String](
      streamingContext,
      PreferConsistent,
      Subscribe[String, String](topics, kafkaParams)
    )

    // Extract (key, value) pairs; console-producer messages have null keys
    val mapDStream: DStream[(String, String)] = stream.map(record => (record.key, record.value))
    // Word count within each batch: split values into words, then count
    val resultDStream: DStream[(String, Int)] =
      mapDStream.flatMap(_._2.split(" ")).map((_, 1)).reduceByKey(_ + _)

    // Print the first results of each batch
    resultDStream.print()

    // Start the computation
    streamingContext.start()
    // Wait for the computation to terminate
    streamingContext.awaitTermination()
  }
}
```
- Run the program.
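The program can be started directly from the IDE, since the master is hard-coded to local[2]. To launch it with spark-submit instead, a sketch along these lines should work (the jar name here is hypothetical):

```bash
spark-submit \
  --class com.spark.streaming.kafka.KafkaDemo \
  --master local[2] \
  --packages org.apache.spark:spark-streaming-kafka-0-10_2.12:2.4.8 \
  kafka-demo.jar
```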
- Type data into the Kafka console producer and watch the program's console output.
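Illustrative only: if the two lines below are typed into the producer within the same 2-second batch, the job's console should print something like the following (the batch timestamp will differ):

```
hello spark
hello kafka
```

Expected output from resultDStream.print():

```
-------------------------------------------
Time: <batch time> ms
-------------------------------------------
(hello,2)
(spark,1)
(kafka,1)
```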