SparkStreaming动态读取kafka生产者生产的数据并将它存入MySQL数据库

SparkStreaming动态读取kafka生产者生产的数据并将它存入MySQL数据库

关于使用sparkstreaming读取kafka生产者生产的数据,并且将每一次输入的数据进行词频统计,然后将结果存储到MySQL数据库中。学习记录~

一、环境准备

  • zookeeper
  • kafka

二、编写程序

在idea中编写SparkStreaming代码:

import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.dstream.DStream
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka010._
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe

case class Word(
               wordName: String,
               count:Int
               )
object KafkaDemo {
  def main(args: Array[String]): Unit = {

    val conf = new SparkConf().setMaster("local[2]").setAppName("KafkaDemo")
    val streamingContext = new StreamingContext(conf,Seconds(5))
    val sc = streamingContext.sparkContext
    
    val kafkaParams = Map[String,Object](
      "bootstrap.servers" -> "ethan002:9092", //从那些broker消费数据
      "key.deserializer" -> classOf[StringDeserializer], //发序列化的参数,因为写入kafka的数据经过序列化
      "value.deserializer" -> classOf[StringDeserializer],
      "group.id" -> "use_a_separate_group_id_for_each_stream", //指定group.id
      "auto.offset.reset" -> "latest",//指定消费的offset从哪里开始:① earliest:从头开始  ;② latest从消费者启动之后开始
      "enable.auto.commit" -> (false: java.lang.Boolean)
      //是否自动提交偏移量 offset 。默认值就是true【5秒钟更新一次】,
      // true 消费者定期会更新偏移量 groupid,topic,parition -> offset ;
     // "false" 不让kafka自动维护偏移量     手动维护偏移
    )
    //    数组中存放的是在kafka中创建的topic
    val topics = Array("first", "t100")
    val stream = KafkaUtils.createDirectStream[String, String](
      streamingContext,
      PreferConsistent,
      Subscribe[String, String](topics, kafkaParams) //订阅主题
    )
    val mapDStream: DStream[(String, String)] = stream.map(record => (record.key, record.value)) //转换格式

    //对从kafka生产的一次消息进行词频统计
    val resultRDD: DStream[(String, Int)] = mapDStream.flatMap(_._2.split(" ")).map((_, 1)).reduceByKey(_ + _)

    //将DStream中的数据存储到mysql数据库中
    resultRDD.foreachRDD(
      rdd=>{
      rdd.foreach(
        data=>{
          val url = "jdbc:mysql://localhost:3306/hadoop?useUnicode=true&characterEncoding=UTF-8"
          val user = "root"
          val password = "123456"
          Class.forName("com.mysql.jdbc.Driver").newInstance()
          var conn: Connection = DriverManager.getConnection(url,user,password)
          val sql = "insert into word(wordName,count) values(?,?)"
          var stmt : PreparedStatement = conn.prepareStatement(sql)
          stmt.setString(1,data._1.toString)
          stmt.setString(2,data._2.toString)
          stmt.executeUpdate()
          conn.close()
      })
    })
    // 打印
    resultRDD.print()

    // 启动
    streamingContext.start()

    // 等待计算采集器的执行
    streamingContext.awaitTermination()
  }
}


三、进行测试

  • 启动zookeeper
    bin/zkServer.sh start

  • 启动kafka
    bin/kafka-server-start.sh ./config/server.properties
    在这里插入图片描述

  • 启动kafka的producer进程
    kafka-console-producer.sh --broker-list ethan002:9092 --topic first
    在这里插入图片描述

  • 运行SparkStreaming程序

  • 在kafka的producer进程输入数据

    [root@ethan002 ~]# kafka-console-producer.sh --broker-list ethan002:9092 --topic first
    >hello world
    >hello world
    >world hello
    >hello world
    
  • 查看结果

    1. idea控制台中查看结果
      在这里插入图片描述

    2. 在mysql中查看
      在这里插入图片描述


目前仅仅是对每一次读取kafka的数据分别进行一个词频统计,我将继续努力实现对词频的连续累计统计的程序。

本图文内容来源于网友网络收集整理提供,作为学习参考使用,版权属于原作者。
THE END
分享
二维码
< <上一篇

)">
下一篇>>