sparkstreaming+flume+kafka实时流式处理完整流程

目录

sparkstreaming+flume+kafka实时流式处理完整流程

一、前期准备

二、实现步骤

1.引入依赖

2.日志收集服务器

3.日志接收服务器

4、spark集群处理接收数据并写入数据库

5、测试结果


sparkstreaming+flume+kafka实时流式处理完整流程

一、前期准备

1、环境准备,四台测试服务器

spark集群三台,hadoop02,hadoop03,hadoop04

kafka集群三台,hadoop02,hadoop03,hadoop04

zookeeper集群三台,hadoop02,hadoop03,hadoop04

日志接收服务器, hadoop02

日志收集流程:

日志收集服务器->日志接收服务器->kafka集群->spark集群处理

说明: 日志收集服务器,在实际生产中很有可能是应用系统服务器,日志接收服务器为大数据服务器中一台,日志通过网络传输到日志接收服务器,再入集群处理。

因为,生产环境中,往往网络只是单向开放给某台服务器的某个端口访问的。

Flume版本:apache-flume-1.9.0-bin.tar.gz,该版本已经较好地集成了对kafka的支持

2.1 实时计算
跟实时系统类似(能在严格的时间限制内响应请求的系统),例如在股票交易中,市场数据瞬息万变,决策通常需要秒级甚至毫秒级。通俗来说,就是一个任务需要在非常短的单位时间内计算出来,这个计算通常是多次的。

2.2 流式计算
通常指源源不断的数据流过系统,系统能够不停地连续计算。这里对时间上可能没什么特别限制,数据流入系统到产生结果,可能经过很长时间。比如系统中的日志数据、电商中的每日用户访问浏览数据等。

2.3 实时流式计算
将实时计算和流式数据结合起来,就是实时流式计算,也就是大数据中通常说的实时流处理。数据源源不断的产生的同时,计算时间上也有了严格的限制。比如,目前电商中的商品推荐,往往在你点了某个商品之后,推荐的商品都是变化的,也就是实时的计算出来推荐给你的。再比如你的手机号,在你话费或者流量快用完时,实时的给你推荐流量包套餐等。
 

二、实现步骤

1.引入依赖

!注意:需要与自己安装对应版本一致

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
         http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>org.example</groupId>
    <artifactId>ScalaProject</artifactId>
    <version>1.0-SNAPSHOT</version>

    <packaging>jar</packaging>

    <properties>
        <scala.version>2.11.8</scala.version>
        <spark.version>2.4.8</spark.version>
        <spark.artifact.version>2.11</spark.artifact.version>
        <hadoop.version>2.7.3</hadoop.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>commons-io</groupId>
            <artifactId>commons-io</artifactId>
            <version>2.5</version>
        </dependency>

        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_${spark.artifact.version}</artifactId>
            <version>${spark.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>${hadoop.version}</version>
        </dependency>
<!--         使用scala2.11.8进行编译和打包 -->
        <dependency>
            <groupId>org.scala-lang</groupId>
            <artifactId>scala-library</artifactId>
            <version>${scala.version}</version>
        </dependency>
<!--         引入MySQL驱动 -->
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>5.1.6</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql_2.11</artifactId>
            <version>2.4.8</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-hive_2.11</artifactId>
            <version>${spark.version}</version>
            <!--<scope>provided</scope>-->
        </dependency>

        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming_2.11</artifactId>
            <version>2.4.8</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming-kafka-0-10_2.11</artifactId>
            <version>2.4.8</version>
        </dependency>
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>0.10.0.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flume</groupId>
            <artifactId>flume-ng-core</artifactId>
            <version>1.9.0</version>
        </dependency>

    </dependencies>

    <build>
        <!-- 指定scala源代码所在的目录 -->
        <sourceDirectory>src/main/scala</sourceDirectory>
        <testSourceDirectory>src/main/scala</testSourceDirectory>

        <plugins>
            <!--对src/main/java下的后缀名为.java的文件进行编译 -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.8.1</version>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                </configuration>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-assembly-plugin</artifactId>
                <configuration>
                    <descriptorRefs>
                        <descriptorRef>jar-with-dependencies</descriptorRef>
                    </descriptorRefs>
                </configuration>
            </plugin>
            <!-- scala的打包插件 -->
            <plugin>
                <groupId>net.alchim31.maven</groupId>
                <artifactId>scala-maven-plugin</artifactId>
                <version>4.5.4</version>
                <executions>
                    <execution>
                        <goals>
                            <goal>compile</goal>
                            <goal>testCompile</goal>
                        </goals>
                    </execution>
                </executions>
                <configuration>
                    <scalaVersion>${scala.version}</scalaVersion>
                </configuration>
            </plugin>
        </plugins>
    </build>

</project>


2.日志收集服务器

配置flume动态收集特定的日志,collect.conf  配置如下:

# Name the components on this agent
a1.sources = tailsource-1
a1.sinks = remotesink
a1.channels = memoryChnanel-1
 
# Describe/configure the source
a1.sources.tailsource-1.type = exec
a1.sources.tailsource-1.command = tail -F /training/data/logs/1.log
 
a1.sources.tailsource-1.channels = memoryChnanel-1
 
# Describe the sink
a1.sinks.k1.type = logger
 
# Use a channel which buffers events in memory
a1.channels.memoryChnanel-1.type = memory
a1.channels.memoryChnanel-1.keep-alive = 10
a1.channels.memoryChnanel-1.capacity = 100000
a1.channels.memoryChnanel-1.transactionCapacity = 100000
 
# Bind the source and sink to the channel
a1.sinks.remotesink.type = avro
a1.sinks.remotesink.hostname = hadoo02
a1.sinks.remotesink.port = 6666
a1.sinks.remotesink.channel = memoryChnanel-1

注意:需创建/training/data/logs/1.log

启动日志收集端脚本:

bin/flume-ng agent --conf conf --conf-file conf/collect.conf --name a1 -Dflume.root.logger=INFO,console

3.日志接收服务器

配置flume实时接收日志,kafka-flume.conf  配置如下:

#agent section  
producer.sources = s  
producer.channels = c  
producer.sinks = r  
  
#source section  
producer.sources.s.type = avro
producer.sources.s.bind = hadoop02
producer.sources.s.port = 6666
 
producer.sources.s.channels = c  
  
# Each sink's type must be defined  
producer.sinks.r.type = org.apache.flume.sink.kafka.KafkaSink
producer.sinks.r.topic = flumetopic
producer.sinks.r.brokerList = hadoop02:9092,hadoop03:9092,hadoop04:9092
producer.sinks.r.requiredAcks = 1
producer.sinks.r.batchSize = 20
producer.sinks.r.channel = c1
 
#Specify the channel the sink should use  
producer.sinks.r.channel = c  
  
# Each channel's type is defined.  
producer.channels.c.type   = org.apache.flume.channel.kafka.KafkaChannel
producer.channels.c.capacity = 10000
producer.channels.c.transactionCapacity = 1000
producer.channels.c.brokerList=hadoop02:9092,hadoop03:9092,hadoop04:9092
producer.channels.c.topic=channel1
producer.channels.c.zookeeperConnect=hadoop02:2181,hadoop03:2181,hadoop04:2181

启动接收端脚本:

bin/flume-ng agent --conf conf --conf-file conf/kafka-flume.conf --name producer -Dflume.root.logger=INFO,console

4、spark集群处理接收数据并写入数据库

package sparkclass.sparkstreaming

import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.streaming.kafka010.KafkaUtils
import org.apache.spark.streaming.Seconds
import org.apache.spark.streaming.StreamingContext
import org.apache.log4j.Level
import org.apache.log4j.Logger
import org.apache.spark.streaming.dstream.DStream
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent

import java.sql.{Connection, DriverManager, PreparedStatement}

/**
 * @author Administrator
 */
object KafkaDataTest1 {
  def main(args: Array[String]): Unit = {

    Logger.getLogger("org.apache.spark").setLevel(Level.WARN);
    Logger.getLogger("org.eclipse.jetty.server").setLevel(Level.ERROR);

    val conf = new SparkConf().setAppName("stocker").setMaster("local[2]")
    val sc = new SparkContext(conf)
    val ssc = new StreamingContext(sc, Seconds(3))

    val sc1 = ssc.sparkContext
    ssc.checkpoint("hdfs://hadoop02:9000/spark/checkpoint")

    // Kafka configurations

    val topics = Set("flumetopic")

    val brokers = "hadoop02:9092,hadoop03:9092,hadoop04:9092"

    val kafkaParams = Map[String,Object](
          "bootstrap.servers" -> "hadoop02:9092", //从那些broker消费数据
          "key.deserializer" -> classOf[StringDeserializer], //发序列化的参数,因为写入kafka的数据经过序列化
          "value.deserializer" -> classOf[StringDeserializer],
          "group.id" -> "test-consumer-group", //指定group.id
          "auto.offset.reset" -> "latest",//指定消费的offset从哪里开始:① earliest:从头开始  ;② latest从消费者启动之后开始
          "enable.auto.commit" -> (false: java.lang.Boolean)
          //是否自动提交偏移量 offset 。默认值就是true【5秒钟更新一次】,
          // true 消费者定期会更新偏移量 groupid,topic,parition -> offset ;
          // "false" 不让kafka自动维护偏移量     手动维护偏移
        )
              
    val kafkaStream = KafkaUtils.createDirectStream[String, String] 
    (ssc,PreferConsistent,Subscribe[String,String](topics, kafkaParams))

    val mapDStream: DStream[(String, String)] = kafkaStream.map(record =>             
    (record.key,record.value))
    val urlClickLogPairsDStream: DStream[(String, Int)] = 
    mapDStream.flatMap(_._2.split(" ")).map((_, 1))

    val urlClickCountDaysDStream = urlClickLogPairsDStream.reduceByKeyAndWindow(
      (v1: Int, v2: Int) => {
        v1 + v2
      },
      Seconds(30),
      Seconds(3)
    );

    //写入数据库
    //定义函数用于累计每个单词出现的次数
    val addWordFunction = (currentValues:Seq[Int],previousValueState:Option[Int])=>{
      //通过spark内部的reduceByKey按key规约,然后这里传入某key当前批次的Seq/List,再计算当前批次的总和
      val currentCount = currentValues.sum
      //已经进行累加的值
      val previousCount = previousValueState.getOrElse(0)
      //返回累加后的结果,是一个Option[Int]类型
      Some(currentCount+previousCount)
    }

    val result = urlClickLogPairsDStream.updateStateByKey(addWordFunction)


    //将DStream中的数据存储到mysql数据库中
    result.foreachRDD(
      rdd=>{
        val url = "jdbc:mysql://localhost:3306/hadoop?useUnicode=true&characterEncoding=UTF-8"
        val user = "root"
        val password = "LIUJIANG1313"
        Class.forName("com.mysql.jdbc.Driver").newInstance()
        //截断数据表,将数据表原有的数据进行删除
        var conn1: Connection = DriverManager.getConnection(url,user,password)
        //此句是清空数据表
        val sql1 = "truncate table word"
        var stmt1 : PreparedStatement = conn1.prepareStatement(sql1)
        stmt1.executeUpdate()
        conn1.close()
        rdd.foreach(
          data=>{
            //将数据库数据更新为最新的RDD中的数据集
            var conn2: Connection = DriverManager.getConnection(url,user,password)
            val sql2 = "insert into word(wordName,count) values(?,?)"
            var stmt2 : PreparedStatement = conn2.prepareStatement(sql2)
            stmt2.setString(1,data._1.toString)
            stmt2.setString(2,data._2.toString)
            stmt2.executeUpdate()
            conn2.close()
          })
      })

    urlClickCountDaysDStream.print();

    ssc.start()
    ssc.awaitTermination()
  }
}

5、测试结果

往日志中依次追加三次日志 

idea运行结果

查看数据库

本图文内容来源于网友网络收集整理提供,作为学习参考使用,版权属于原作者。
THE END
分享
二维码
< <上一篇
下一篇>>