Spark+Kafka构建Dashboard实训+踩坑笔记

项目简介

项目通过spark和kafka构建实时分析平台,设计消息预处理、消息队列发送、接收消息、数据实时处理、数据实时推送和实时展示等数据处理全流程

涉及技术

linux:操作系统
Spark:专为大规模数据处理而设计的快速通用搜索引擎,由scala语言编写 Kafka:是一种高吞吐量的分布式订阅消息分发系统,由scala和Java编写,处理大量的用户访问流记录
Flask.socketIO:是一个消息双工即使模块
Flask:python编写的轻量级web框架,主要包括两个核心库:werkzeug和jinja2,功能分别是处理业务和负责安全
Highchart.js:是一个由javascript编写的图标库
Pycharm:python编译器

实现思路

部署hadoop–>安装spark–>编写scala独立应用程序–>安装kafka–>安装python–>python操作kafka数据预处理–>编写sparksteaming程序和producerconsumer程序处理数据

项目体系

spark+kafka流程图

<项目实现>

1、部署spark

1.1、部署hadoop

//创建用户并设置权限

~$: sudo useradd -m hadoop -s /bin/bash
~$: sudo passwd hadoop
~$: sudo adduser hadoop sudo     //授权

//更新apt

~$: sudo apt-get updata

//配置SSH免密登录
ubuntu默认有client,这里还需要安装ssh server

~$: sudo apt-get install openssh-server
~$: ssh localhost    //看完后记得exit一下
~$: cd ~/.ssh/
~$: ssh-keygen -t rsa  //-t指秘钥类型 rsa指SSH2,,提示时,直接回车
~$: cat ./id_rsa.pub >> ./authroized_keys

ssh免密登录示例图

1.2、部署java环境

官网链接: https://www.oracle.com/java/technologies/downloads/
jdk
//下载好对应的压缩包后解压到相应目录并设置环境变量

~$: sudo tar -zxvf ./jdk-8u162-linux-x64.tar.gz -C /usr/lib/jvm
~$: cd ~            //cd /进入跟目录    cd ~进入用户家目录
~$: cat ~/.bashrc
export JAVA_HOME=/usr/lib/jvm/jdk1.8.0_162
export JRE_HOME=${JAVA_HOME}/jre
export CLASSPATH=.:${JAVA_HOME}/lib:${JRE_HOME}/lib
export PATH=${JAVA_HOME}/bin:$PATH
~$: source ~/.bashrc    //环境变量生效
~$: java -version

Java

1.3、安装hadoop2

hadoop2下载地址: http://mirrors.cnnic.cn/apache/hadoop/common/
//有的同学下载包时会遇到两个都感觉可以下载的情况,一脸懵逼不知下载哪个????这里解释一下,binary是编译好的可以直接使用,source是源码,需要编译之后使用。
chown 修改文件和文件夹的用户和用户组属性
chmod 修改文件和文件夹读写执行属性

~$: sudo tar -zxvf /hadoop-2.6.0.tar.gz -C /usr/local  //解压到local
~$: cd /usr/local
~$: sudo mv ./hadoop-2.6.0/ ./hadoop   //为了方便后期使用,改个短一点的命
~$: sudo chown -R hadoop ./hadoop  //chown 修改文件和文件夹的用户和用户组属性  chmod 修改文件和文件夹读写执行属性
~$: cd /usr/local/hadoop/bin   //bin目录下存放二进制可执行文件
~$: ./hadoop version

1.4、hadoop伪分布式

//修改配置文件,文件位于hadoop包的/etc/hadoop下
(注意:configuration 标签原文已有,不可重复)
core-site.xml

<configuration>
    <property>
        <name>hadoop.tmp.dir</name>
        <value>file:/usr/local/hadoop/tmp</value>
        <description>Abase for other temporary directories.</description>
    </property>
    <property>
        <name>fs.defaultFS</name>
        <value>hdfs://localhost:9000</value>
    </property>
</configuration>

hdfs-site.xml

<configuration>
    <property>
        <name>dfs.replication</name>
        <value>1</value>
    </property>
    <property>
        <name>dfs.namenode.name.dir</name>
        <value>file:/usr/local/hadoop/tmp/dfs/name</value>
    </property>
    <property>
        <name>dfs.datanode.data.dir</name>
        <value>file:/usr/local/hadoop/tmp/dfs/data</value>
    </property>
</configuration>

//到hadoop的bin 目录下执行namenode格式化

~$: ./hdfs namenode -format

namenode格式化
格式化过程中出现y/n询问,输y回车即可

// 开启namenode和datanode。
NameNode:是Master节点,有点类似Linux里的根目录。管理数据块映射;处理客户端的读写请求;配置副本策略;管理HDFS的名称空间;
DataNode:负责存储client发来的数据块block;执行数据块的读写操作。是NameNode的小弟。

~$: cd /usr/local/hadoop
~$: ./sbin/start-dfs.sh
~$: ./sbin/stop-dfs.sh

下面图片有很多进程,是我做实验开着后没关的,启动start-dfs.sh只要有
jps + secondarynamenode + namenode + datanode即可
在这里插入图片描述

访问浏览器页面出现下图即可
(拓展:localhost:9000后面跟着一个词active,这括号后面有两种状态,一种是active状态,一种是standby状态,active状态意味着提供服务,standby状态意味着处于休眠状态,只进行数据同步,时刻准备着提供服务,两者可以切换。)
在这里插入图片描述
至此,我们的铺垫,有就是hadoop就部署完成了,接下来就开始部署spark
+++++++++++++++++++++++++++++++++++++++++++++++++++++++

2.部署spark

2.1、安装与修改配置

spark官网: http://spark.apache.org/downloads.html
choose a package type选择如图所示,这个是属于hadoop free版本,下载到的saprk可以用到任何一个hadoop版本
spark四种安装模式:
1.local模式(单机模式)(本次使用的模式)
2.standlone模式(使用spark自带的集群管理器)
3.yarn模式(使用yarn作为集群管理器)
4.mesos模式(使用mesos作为集群管理器)
在这里插入图片描述
// 接着走一遍与安装hadoop2一样的步骤
(解压到指定目录 + cd 到解压目录 + 改名 + 授权)

~$: sudo tar -zxvf spark-2.1.0-bin-without-hadoop.tgz -C /usr/local/
~$: cd /usr/local
~$: sudo mv ./spark-2.1.0-bin-without-hadoop/ ./spark
~$: sudo chown -R hadoop:hadoop ./spark 

// 修改spark配置文件spark-env.sh.template
配置作用:配置后Spark就可以把数据存储到Hadoop分布式文件系统HDFS中,也可以从HDFS中读取数据

~$: cd /usr/local/spark
~$: cat ./conf/spark-env.sh
export SPARK_DIST_CLASSPATH=$(/usr/local/hadoop/bin/hadoop classpath)

// 验证spark是否安装成功----/spark目录下

~$: bin/run-example SparkPi 2>&1 | grep "Pi is"

Pi is

2.2、运行spark程序

需要退出时输入quit即可

~$: bin/spark-shell

在这里插入图片描述
+++++++++++++++++++++++++++++++++++++++++++++++++++++++++

3、scala独立应用程序编写

3.1、安装sbt

首先先安装sbt,sbt是scala的编译工具
sbt下载链接: http://www/scala-sbt.org

~$: sudo mkdir /usr/local/sbt    //创建安装目录
~$: sudo tar -zxvf sbt-1.3.8.tgz -C /usr/local
~$: cd /usr/local/sbt
~$: sudo chown -R hadoop /usr/local/sbt
~$: cp ./bin/sbt-launch.jar ./    //将bin下的launch.jar复制到当下目录
~$: vi /usr/local/sbt/sbt     //创建shell脚本用于启动sbt
#!/bin/bash
SBT_OPTS="-Xms512M -Xmx1536M -Xss1M -XX:+CMSClassUnloadingEnabled -XX:MaxPermSize=256M"
java $SBT_OPTS -jar `dirname $0`/sbt-launch.jar "$@"

// 为shell脚本授予权限

~$: chmod u+x /usr/lcoal/sbt/sbt     //u指所有者,+x值赋予可执行权限

3.2、编写并打包scala应用程序

~$: cd ~
~$: mkdir ./sparkapp   //创建应用程序根目录
~$: mkdir -p ./sparkapp/src/main/scala   //-p值递归创建,也就是同时创建层层嵌套文件夹
~$: cd ./sparkapp/src/main/scala
~$: vi SimpleApp.scala
 /* SimpleApp.scala */
    import org.apache.spark.SparkContext
    import org.apache.spark.SparkContext._
    import org.apache.spark.SparkConf
 
    object SimpleApp {
        def main(args: Array[String]) {
            val logFile = "file:///usr/local/spark/README.md" // Should be some file on your system
            val conf = new SparkConf().setAppName("Simple Application")
            val sc = new SparkContext(conf)
            val logData = sc.textFile(logFile, 2).cache()
            val numAs = logData.filter(line => line.contains("a")).count()
            val numBs = logData.filter(line => line.contains("b")).count()
            println("Lines with a: %s, Lines with b: %s".format(numAs, numBs))
        }
    }

// 打包scala程序
注意:每次编写脚本时都要注意版本差异

~$: vi ./sparkapp/simple.sbt
name := "Simple Project"
version := "1.0"
scalaVersion := "2.11.8"
libraryDependencies += "org.apache.spark" %% "spark-core" % "2.1.0"

// 首次运行需要下载依赖包

~$: /usr/local/sbt/sbt package

在这里插入图片描述
// 通过submit运行程序

~$: /usr/local/spark/bin/spark-submit --class "SimpleApp" ~/sparkapp/target/scala-2.11/simple-project_2.11-1.0.jar 2>&1 | grep "Lines with a:"

在这里插入图片描述

4、部署kafka

4.1、安装kafka

kafka官网: https://kafka.apache.org/downloads
在这里插入图片描述
核心概念

  1. Broker
    Kafka集群包含一个或多个服务器,这种服务器被称为broker
  2. Topic
    每条发布到Kafka集群的消息都有一个类别,这个类别被称为Topic。(物理上不同Topic的消息分开存储,逻辑上一个Topic的消息虽然保存于一个或多个broker上但用户只需指定消息的Topic即可生产或消费数据而不必关心数据存于何处)
  3. Partition
    Partition是物理上的概念,每个Topic包含一个或多个Partition.
  4. Producer
    负责发布消息到Kafka broker
  5. Consumer
    消息消费者,向Kafka broker读取消息的客户端。
  6. Consumer Group
    每个Consumer属于一个特定的Consumer Group(可为每个Consumer指定group name,若不指定group name则属于默认的group)
    // 解压安装包
~$: sudo tar -zxvf kafka_2.11-0.10.1.0.tgz -C /usr/local
~$: cd /usr/local
~$: sudo mv afka_2.11-0.10.1.0/ ./kafka
~$: sudo chown -R hadoop ./kafka
~$: cd /usr/lcoal/kafka
~$: bin/zookeeper-server-start.sh config/zookeeper.properties //运行实例

4.2、测试

在这里插入图片描述
注意:启动完后会保持上图状态,一定不要关闭他,启动新的终端(终端2)

~$: cd /usr/lcoal/kafka
~$: bin/kafka-server-start.sh config/server.properties

在这里插入图片描述
// 上图已经完成kafka服务端的启动了,不要关闭他,启动终端3输入命令如下

~$: cd /usr/local/kafka
~$: bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic dblab

用list列出所有创建的topics,来查看刚才创建的主题是否存在。

~$: bin/kafka-topics.sh --list --zookeeper localhost:2181
~$: bin/kafka-console-producer.sh --broker-list localhost:9092 --topic dblab

CTRL+C退出,然后使用consumer来接收数据,输入如下命令:

~$: bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic dblab --from-beginning 

在这里插入图片描述
至此,kafka就安装成功了!
+++++++++++++++++++++++++++++++++++++++++++++++++++++++

4、安装python

4.1、python下载

案例用到的python第三方库,flask和socketio,pip管理第三方库
注意自己下载到哪个路径下

~$: sudo apt-get install python3-pip
~$: pip3 install flask    //安装需要的库flask和socketio
~$: pip3 install flask-socketio
~$: pip3 install kafka-python

pycharm的安装
pycharm官网: https://www.jetbrains.com/pycharm/download/#section=linux
接下来就是万年不变的解压安装步骤

~$: cd 
~$: sudo tar -zxvf pycharm-community-2021.2.3.tar.gz -C /usr/local  
~$: cd /usr/local
~$: sudo mv pycharm-community-2021.2.3 pycharm 
~$: sudo chown -R hadoop ./pycharm 
~$: cd /usr/local/pycharm
~$: ./bin/pycharm.sh    //启动pycharm

4.2、更换python版本(可选)

ubuntu16.04自带有python2.7和python3.5,直接使用python3.5即可
若需要更换python 版本,例如3.7,则操作如下:

~$: sudo apt-get install zlib1g-dev libbz2-dev libssl-dev libncurses5-dev libsqlite3-dev libreadline-dev tk-dev libgdbm-dev libdb-dev libpcap-dev xz-utils libexpat1-dev liblzma-dev libffi-dev libc6-dev
~$: wget https://www.python.org/ftp/python/3.7.5/Python-3.7.5.tgz
~$: tar -xzvf Python-3.7.5.tgz
~$: cd Python3.7.5
~$: sudo mkdir -p /usr/local/python3
~$: ./configure --prefix=/usr/local/python3  --enable-optimizations
~$: make
~$: sudo make install
~$: ln -s /usr/local/python3/bin/python3.7 /usr/bin/python
~$: ln -s /usr/local/python3/bin/pip3.7 /usr/bin/pip
~$: python -V

4.3、pycharm目录结构:

在这里插入图片描述

目录介绍
data目录存放的是用户日志数据;
scripts目录存放的是Kafka生产者和消费者;
static/js目录存放的是前端所需要的js框架;
templates目录存放的是html页面;
app.py为web服务器,接收Spark Streaming处理后的结果,并推送实时数据给浏览器;
External Libraries是本项目所依赖的Python库,是PyCharm自动生成。

至此,开发环境就部署完成了
++++++++++++++++++++++++++++++++++++++++++++++++++++++++

5、python 操作kafka

数据集: https://pan.baidu.com/s/1cs02Nc
该数据集压缩包是淘宝2015年双11前6个月(包含双11)的交易数据(交易数据有偏移,但是不影响实验的结果),里面包含3个文件,分别是用户行为日志文件user_log.csv 、回头客训练集train.csv 、回头客测试集test.csv. 在这个案例中只是用user_log.csv

5.1、创建工程文件

pycharm中按照上面工程项目图定位新建一个文件名为producer.py

# coding: utf-8
import csv
import time
from kafka import KafkaProducer
 
# 实例化一个KafkaProducer示例,用于向Kafka投递消息
producer = KafkaProducer(bootstrap_servers='localhost:9092')
# 打开数据文件
csvfile = open("../data/user_log.csv","r")
# 生成一个可用于读取csv文件的reader
reader = csv.reader(csvfile)
 
for line in reader:
    gender = line[9] # 性别在每行日志代码的第9个元素
    if gender == 'gender':
        continue # 去除第一行表头
    time.sleep(0.1) # 每隔0.1秒发送一行数据
    # 发送数据,topic为'sex'
    producer.send('sex',line[9].encode('utf8'))

pycharm中按照上面工程项目图定位新建一个文件名为consumer.py

from kafka import KafkaConsumer
 
consumer = KafkaConsumer('sex')
for msg in consumer:
    print((msg.value).decode('utf8'))

运行上面两个文件前先开启kafka

~$: cd /usr/local/kafka
~$: bin/zookeeper-server-start.sh config/zookeeper.properties &
~$: bin/kafka-server-start.sh config/server.properties

5.2、测试

kafka开启后直接在pycharm中右键运行producer.py和consumer.py,运行出来是数据流则正确
在这里插入图片描述

6、配置spark开发kafka环境

6.1、下载spark连接kafka的代码库

将下载到的代码库放到/usr/local/spark/jars目录下

~$: sudo mv ~/下载/spark-streaming-kafka-0-8_2.11-2.1.0.jar /usr/local/spark/jars
~$: cd /usr/local/spark/jars
~$: mkdir kafka
~$: cd kafka
~$: cp /usr/local/kafka/libs/* .

OK,spark开发kafka环境完成

6.2、创建scala工程项目

在/usr/local/spark/mycode下新建kafka项目目录,接着在kafka目录下新建src/main/scala两个文件:StreamingExamples.scala和KafkaTest.scala
代码如下

ackage org.apache.spark.examples.streaming
import org.apache.spark.internal.Logging
import org.apache.log4j.{Level, Logger}
/** Utility functions for Spark Streaming examples. */
object StreamingExamples extends Logging {
  /** Set reasonable logging levels for streaming if the user has not configured log4j. */
  def setStreamingLogLevels() {
    val log4jInitialized = Logger.getRootLogger.getAllAppenders.hasMoreElements
    if (!log4jInitialized) {
      // We first log something to initialize Spark's default logging, then we override the
      // logging level.
      logInfo("Setting log level to [WARN] for streaming example." +
        " To override add a custom log4j.properties to the classpath.")
      Logger.getRootLogger.setLevel(Level.WARN)
    }
  }
}
package org.apache.spark.examples.streaming
 
import java.util.HashMap
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}
import org.json4s._
import org.json4s.jackson.Serialization
import org.json4s.jackson.Serialization.write
import org.apache.spark.SparkConf
import org.apache.spark.streaming._
import org.apache.spark.streaming.Interval
import org.apache.spark.streaming.kafka._
 
object KafkaWordCount {
  implicit val formats = DefaultFormats//数据格式化时需要
  def main(args: Array[String]): Unit={
    if (args.length < 4) {
      System.err.println("Usage: KafkaWordCount <zkQuorum> <group> <topics> <numThreads>")
      System.exit(1)
    }
    StreamingExamples.setStreamingLogLevels()
    /* 输入的四个参数分别代表着
    * 1. zkQuorum 为zookeeper地址
    * 2. group为消费者所在的组
    * 3. topics该消费者所消费的topics
    * 4. numThreads开启消费topic线程的个数
    */
    val Array(zkQuorum, group, topics, numThreads) = args
    val sparkConf = new SparkConf().setAppName("KafkaWordCount")
    val ssc = new StreamingContext(sparkConf, Seconds(1))
    ssc.checkpoint(".")  //这里表示把检查点文件写入分布式文件系统HDFS,所以要启动Hadoop
    // 将topics转换成topic-->numThreads的哈稀表
    val topicMap = topics.split(",").map((_, numThreads.toInt)).toMap
    // 创建连接Kafka的消费者链接
    val lines = KafkaUtils.createStream(ssc, zkQuorum, group, topicMap).map(_._2)
    val words = lines.flatMap(_.split(" "))//将输入的每行用空格分割成一个个word
    // 对每一秒的输入数据进行reduce,然后将reduce后的数据发送给Kafka
    val wordCounts = words.map(x => (x, 1L))
      .reduceByKeyAndWindow(_+_,_-_, Seconds(1), Seconds(1), 1).foreachRDD(rdd => {
          if(rdd.count !=0 ){
               val props = new HashMap[String, Object]()
               props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092")
               props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
               "org.apache.kafka.common.serialization.StringSerializer")
               props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
               "org.apache.kafka.common.serialization.StringSerializer")
               // 实例化一个Kafka生产者
               val producer = new KafkaProducer[String, String](props)
               // rdd.colect即将rdd中数据转化为数组,然后write函数将rdd内容转化为json格式
               val str = write(rdd.collect)
               // 封装成Kafka消息,topic为"result"
               val message = new ProducerRecord[String, String]("result", null, str)
               // 给Kafka发送消息
               producer.send(message)
          }
      })
    ssc.start()
    ssc.awaitTermination()
  }
}

注意:如果没有启动hdfs就运行的话,会报错“拒绝连接”,所以运行前先启动hdfs,命令:cd /usr/local/hadoop
./sbin/start-dfs.sh(前面有一点)

6.3、打包以及编译

// 在/usr/local/spark/mycode/kafka目录下新建simple.sbt打包,代码如下

name := "Simple Project"
version := "1.0"
scalaVersion := "2.11.8"
libraryDependencies += "org.apache.spark" %% "spark-core" % "2.1.0"
libraryDependencies += "org.apache.spark" % "spark-streaming_2.11" % "2.1.0"
libraryDependencies += "org.apache.spark" % "spark-streaming-kafka-0-8_2.11" % "2.1.0"
libraryDependencies += "org.json4s" %% "json4s-jackson" % "3.2.11"

// 编译打包程序:/usr/local/sbt/sbt packag
接着在在/usr/local/spark/mycode/kafka目录下新建startup.sh文件,代码如下

 /usr/local/spark/bin/spark-submit --driver-class-path /usr/local/spark/jars/*:/usr/local/spark/jars/kafka/* --class "org.apache.spark.examples.streaming.KafkaWordCount" /usr/local/spark/mycode/kafka/target/scala-2.11/simple-project_2.11-1.0.jar 127.0.0.1:2181 1 sex 1

6.4、测试

运行:sh startup.sh

Consumer中接收的topic改为result
运行producer.py和consumer.py
在这里插入图片描述

7、步骤展示(可视化技术)

利用Flask-SocketIO实时推送数据
socket.io.js实时获取数据
highlights.js展示数据

7.1、创建工程文件

创建app.py文件

import json
from flask import Flask, render_template
from flask_socketio import SocketIO
from kafka import KafkaConsumer
#因为第一步骤安装好了flask,所以这里可以引用
 
app = Flask(__name__)
app.config['SECRET_KEY'] = 'secret!'
socketio = SocketIO(app)
thread = None
# 实例化一个consumer,接收topic为result的消息
consumer = KafkaConsumer('result')
 
# 一个后台线程,持续接收Kafka消息,并发送给客户端浏览器
def background_thread():
    girl = 0
    boy = 0
    for msg in consumer:
        data_json = msg.value.decode('utf8')
        data_list = json.loads(data_json)
        for data in data_list:
            if '0' in data.keys():
                girl = data['0']
            elif '1' in data.keys():
                boy = data['1']
            else:
                continue
        result = str(girl) + ',' + str(boy)
        print(result)
        socketio.emit('test_message',{'data':result})
 
# 客户端发送connect事件时的处理函数
@socketio.on('test_connect')
def connect(message):
    print(message)
    global thread
    if thread is None:
        # 单独开启一个线程给客户端发送数据
        thread = socketio.start_background_task(target=background_thread)
    socketio.emit('connected', {'data': 'Connected'})
 
# 通过访问http://127.0.0.1:5000/访问index.html
@app.route("/")
def handle_mes():
    return render_template("index.html")
 
# main函数
if __name__ == '__main__':
    socketio.run(app,debug=True)

// 创建index页面

<!DOCTYPE html>
<html lang="en">
<head>
    <meta charset="UTF-8">
    <title>DashBoard</title>
    <script src="static/js/socket.io.js"></script>
    <script src="static/js/jquery-3.1.1.min.js"></script>
    <script src="static/js/highcharts.js"></script>
    <script src="static/js/exporting.js"></script>
    <script type="text/javascript" charset="utf-8">
    var socket = io.connect('http://' + document.domain + ':' + location.port);
    socket.on('connect', function() {
        socket.emit('test_connect', {data: 'I'm connected!'});
    });
 
    socket.on('test_message',function(message){
        console.log(message);
        var obj = eval(message);
        var result = obj["data"].split(",");
        $('#girl').html(result[0]);
        $('#boy').html(result[1]);
    });
 
    socket.on('connected',function(){
        console.log('connected');
    });
 
    socket.on('disconnect', function () {
        console.log('disconnect');
    });
    </script>
</head>
<body>
<div>
    <b>Girl: </b><b id="girl"></b>
    <b>Boy: </b><b id="boy"></b>
</div>
<div id="container" style="width: 600px;height:400px;"></div>
 
<script type="text/javascript">
    $(document).ready(function () {
    Highcharts.setOptions({
        global: {
            useUTC: false
        }
    });
 
    Highcharts.chart('container', {
        chart: {
            type: 'spline',
            animation: Highcharts.svg, // don't animate in old IE
            marginRight: 10,
            events: {
                load: function () {
 
                    // set up the updating of the chart each second
                    var series1 = this.series[0];
                    var series2 = this.series[1];
                    setInterval(function () {
                        var x = (new Date()).getTime(), // current time
                        count1 = $('#girl').text();
                        y = parseInt(count1);
                        series1.addPoint([x, y], true, true);
 
                        count2 = $('#boy').text();
                        z = parseInt(count2);
                        series2.addPoint([x, z], true, true);
                    }, 1000);
                }
            }
        },
        title: {
            text: '男女生购物人数实时分析'
        },
        xAxis: {
            type: 'datetime',
            tickPixelInterval: 50
        },
        yAxis: {
            title: {
                text: '数量'
            },
            plotLines: [{
                value: 0,
                width: 1,
                color: '#808080'
            }]
        },
        tooltip: {
            formatter: function () {
                return '<b>' + this.series.name + '</b><br/>' +
                    Highcharts.dateFormat('%Y-%m-%d %H:%M:%S', this.x) + '<br/>' +
                    Highcharts.numberFormat(this.y, 2);
            }
        },
        legend: {
            enabled: true
        },
        exporting: {
            enabled: true
        },
        series: [{
            name: '女生购物人数',
            data: (function () {
                // generate an array of random data
                var data = [],
                    time = (new Date()).getTime(),
                    i;
 
                for (i = -19; i <= 0; i += 1) {
                    data.push({
                        x: time + i * 1000,
                        y: Math.random()
                    });
                }
                return data;
            }())
        },
        {
            name: '男生购物人数',
            data: (function () {
                // generate an array of random data
                var data = [],
                    time = (new Date()).getTime(),
                    i;
 
                for (i = -19; i <= 0; i += 1) {
                    data.push({
                        x: time + i * 1000,
                        y: Math.random()
                    });
                }
                return data;
            }())
        }]
    });
});
</script>
</body>
</html>

7.2、按照工程项目图创建下列文件:

socket.io.js链接: https://cdnjs.cloudflare.com/ajax/libs/socket.io/2.0.4/socket.io.js

socket.io.js.map链接: https://cdnjs.cloudflare.com/ajax/libs/socket.io/2.0.4/socket.io.js.map

highchart.js链接: https://www.highcharts.com/download

exporting.js:Highcharts-6.0.7.zip解压缩后,可以看到里面有个code子目录,在code子目录下面就可以找到一个js子目录,在js子目录下可以找到一个modules子目录,在modules子目录中就可以找到库文件exporting.js。

jquery.js链接: http://jquery.com/download/

7.3、测试结果数据

// 运行app.py,打开浏览器127.0.0.1:5000即可展示数据
在这里插入图片描述

8、入坑笔记

部署spark+kafka构建实时分析dashboard入坑笔记

ubuntu系统
1.问题:在使用xshell连接ubuntu时一直连接不上?

解决:连接不上是因为ubuntu系统并没有ssh协议,此时通过以下命令解决
rpm -qa | grep ssh //查看是否安装了ssh
sudo apt-get install ssh //如无安装执行
ps -ef | grep ssh //启动
service sshd start //或者也可以重启

============================================================================

2.问题:在sudo apt-get update时出现无法获得锁/var/lib/dpkg/lock-open(11:资源暂时不可用)报错

解决:ps -A | grep apt:查看哪些进程占用着apt
发现进程后,sudo kill -9 xx(进程号):杀死进程
再删除锁文件:sudo rm /var/lib/pdkg/lock

============================================================================

3.问题:昨晚使用ubuntu虚拟机时,屏幕真的好小,我用着简直难受死,一时搞tools又搞不了!!

解决:点击”虚拟机“–”安装vm tools“–然后在iso文件处会被替换成vm tools–点击他–有个压缩包–复制到桌面上,然后解压他,然后cd 到解压后的文件中,执行sudo ./vmware-install.pl,一路回车或者按照提示完成。

============================================================================

4.问题:安装java环境时报错–格式执行错误

解决:调试了两节课才发现,我jdk的64位的,系统是32位的,不兼容,真是吐血,将jdk更换为32位后一次就成功了。

============================================================================

5.问题:xshell上传文件一直失败

解决:将aisen账户换成hadoop账户,搞定。

============================================================================
6.问题:在安装sbt的时候,出现错误,sbtVersion一直失败

解决:重装了64位系统配置了64位jdk后就能正常安装了。

============================================================================

7.问题:在执行python文件时,在from spark.xx import sprk.yy处报错

解决:在设置处将原本是python2.7换成python3.5编译器就可以正常编译了。

============================================================================

8.问题:在执行最后的pycharm的两个文件时无法正常接收sparksteaming数据,运行不出数据

解决:是需要同时执行sparksteaming和producer和consumer,也就是执行sparksteaming时终端会处于执行中状态,不可中断执行,让它保持执行状态,然后运行producer后再运行consumer即可正常接收sparksteaming数据。

============================================================================

9.问题:在ubuntu16中下载pycharm后,我用的是python3.5的编译器,后来报错:python 3.5 has reached its end-of-life date and it is no longer support in pycharm

解决:报错翻译为:Python3.5已经到了生命周期的尽头,pycharm不再支持它。原因是pycharm已不再兼容python3.5。通过如下命令安装python3.9:
cd /usr/local/share
wget https://www.python.org/ftp/python/3.9.5/Python-3.9.5.tgz
tar zxvf Python-3.9.5.tgz
cd Python-3.9.5
./configure --with-ssl --enable-optimizations --enable-shared --enable-unicode=ucs4
make
make install
mv /usr/bin/python3 /usr/bin/python3.bak
ln -s /usr/local/bin/python3 /usr/bin/python3

============================================================================

10.问题:ubuntu系统安装python3版本

解决:1.准备
在安装之前,请使用以下命令安装Python的先决条件。

     sudo apt-get install build-essential checkinstall
     sudo apt-get install libreadline-gplv2-dev libncursesw5-dev libssl-dev 
     libsqlite3-dev tk-dev libgdbm-dev libc6-dev libbz2-dev
     2.安装
     使用python官方站点的以下命令下载Python。您也可以下载最新版本代替下面指定的版本。

     cd /usr/src
     sudo wget https://www.python.org/ftp/python/3.7.0/Python-3.7.0.tgz

     sudo tar xzf Python-3.7.0.tgz

     3. 编译
     使用下面的命令集来使用altinstall在您的系统上编译python源代码。

     cd Python-3.7.0
     sudo ./configure --enable-optimizations
     sudo make altinstall
     make altinstall用于防止替换默认的python二进制文件/ usr / bin / python。

     4.检查Python版本

     python3.7 -V

============================================================================

11.问题:将终端输入python显示版本的指令有原本的指向2.7更改为指向3.7操作

解决:输入lias python=python3.7 即可

============================================================================

12.问题:pycharm中使用python3.7版本出现{from flask-socketio import socketio}报错,无法导包

解决:在终端执行 pip install eventlet
pip install flask-socketio

============================================================================

13.问题:python3.7运行编写的生产者和消费者时报错(SimpleProducer。。。。。。)

解决:报错是因为python3.7新增了关键字async,await;kafka-python用到关键字async,由此带来了兼容问题,使用python3.6版本即可正常运行

============================================================================

注:项目来源于厦门大学林子雨实验室!
项目原文链接: http://dblab.xmu.edu.cn/post/8274/

本图文内容来源于网友网络收集整理提供,作为学习参考使用,版权属于原作者。
THE END
分享
二维码
< <上一篇
下一篇>>