Spark(一):基础

Spark 概述

  1. 什么是Spark
    • Spark是一种基于内存的快速,通用,可扩展的大数据分析计算引擎
    • Spark 是一种由 Scala 语言开发的快速、通用、可扩展的大数据分析引擎
    • Spark Core 中提供了 Spark 最基础与最核心的功能
    • Spark SQL 是 Spark 用来操作结构化数据的组件。通过 Spark SQL,用户可以使用SQL 或者 Apache Hive 版本的 SQL 方言(HQL)来查询数据。
    • Spark Streaming 是 Spark 平台上针对实时数据进行流式计算的组件,提供了丰富的
      处理数据流的 API。
  2. Spark 与 Hadoop的对比:Hadoop 的 MR 框架和 Spark 框架都是数据处理框架,那么我们在使用时如何选择呢?
    • Hadoop MapReduce 由于其设计初衷并不是为了满足循环迭代式数据流处理,因此在多并行运行的数据可复用场景(如:机器学习、图挖掘算法、交互式数据挖掘算法)中存在诸多计算效率等问题。所以 Spark 应运而生,Spark 就是在传统的 MapReduce 计算框架的基础上,利用其计算过程的优化,从而大大加快了数据分析、挖掘的运行和读写速度,并将计算单元缩小到更适合并行计算和重复使用的 RDD 计算模型。
    • 机器学习中 ALS、凸优化梯度下降等。这些都需要基于数据集或者数据集的衍生数据
      反复查询反复操作。MR 这种模式不太合适,即使多 MR 串行处理,性能和时间也是一
      个问题。数据的共享依赖于磁盘。另外一种是交互式数据挖掘,MR 显然不擅长。而
      Spark 所基于的 scala 语言恰恰擅长函数的处理。
    • Spark 是一个分布式数据快速分析项目。它的核心技术是弹性分布式数据集(Resilient Distributed Datasets),提供了比 MapReduce 丰富的模型,可以快速在内存中对数据集进行多次迭代,来支持复杂的数据挖掘算法和图形计算算法。
    • Spark 和Hadoop 的根本差异是多个作业之间的数据通信问题 : Spark 多个作业之间数据
      通信是基于内存,而 Hadoop 是基于磁盘。
      在这里插入图片描述
    • Spark Task 的启动时间快。Spark 采用 fork 线程的方式,而 Hadoop 采用创建新的进程的方式。
    • Spark 只有在 shuffle 的时候将数据写入磁盘,而 Hadoop 中多个 MR 作业之间的数据交互都要依赖于磁盘交互
    • Spark 的缓存机制比 HDFS 的缓存机制高效。
    • 经过上面的比较,我们可以看出在绝大多数的数据计算场景中,Spark 确实会比 MapReduce
      更有优势。但是 Spark 是基于内存的,所以在实际的生产环境中,由于内存的限制,可能会
      由于内存资源不够导致 Job 执行失败,此时,MapReduce 其实是一个更好的选择,所以 Spark
      并不能完全替代 MR。
  3. Spark 核心模块
    在这里插入图片描述
    • Spark Core : Spark Core 中提供了 Spark 最基础与最核心的功能,Spark 其他的功能如:Spark SQL,Spark Streaming,GraphX, MLlib 都是在 Spark Core 的基础上进行扩展的
    • Spark SQL:Spark SQL 是 Spark 用来操作结构化数据的组件。通过 Spark SQL,用户可以使用 SQL或者 Apache Hive 版本的 SQL 方言(HQL)来查询数据。
    • Spark Streaming:Spark Streaming 是 Spark 平台上针对实时数据进行流式计算的组件,提供了丰富的处理数据流的 API。
    • Spark MLlib:MLlib 是 Spark 提供的一个机器学习算法库。MLlib 不仅提供了模型评估、数据导入等额外的功能,还提供了一些更底层的机器学习原语。
    • Spark GraphX:GraphX 是 Spark 面向图计算提供的框架与算法库。

Spark快速上手

创建Maven项目

  1. 第一步创建工程
    在这里插入图片描述
    在这里插入图片描述
  2. 创建好了之后,因为要学习多个模块,所以我们这个创建好的项目当作父类,所以将src删除
    在这里插入图片描述
  3. 然后点击父目录,新建模块,下一步后,因为首先先学习的是sparkcore,所以先创建sparkcore模块
    在这里插入图片描述
  4. 配置好scala,网上有这里就不多赘述,但是要在模块上添加scala框架支持,所以就要提娜姬添加在这里插入图片描述在这里插入图片描述
  5. 点开src目录,来到Java目录下创建目录,并创建scala文件,测试一下环境是否正确。
    在这里插入图片描述
    在这里插入图片描述
  6. 在项目的pom.xml下引入依赖
    <dependencies>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.12</artifactId>
            <version>3.0.0</version>
        </dependency>
    </dependencies>
    <build>
    <plugins>
    <!-- 该插件用于将 Scala 代码编译成 class 文件 -->
    <plugin>
    <groupId>net.alchim31.maven</groupId>
    <artifactId>scala-maven-plugin</artifactId>
    <version>3.2.2</version>
    <executions>
    <execution>
        <!-- 声明绑定到 maven 的 compile 阶段 -->
        <goals>
            <goal>testCompile</goal>
        </goals>
    </execution>
    </executions>
    </plugin>
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-assembly-plugin</artifactId>
            <version>3.1.0</version>
            <configuration>
                <descriptorRefs>
                    <descriptorRef>jar-with-dependencies</descriptorRef>
                </descriptorRefs>
            </configuration>
            <executions>
                <execution>
                    <id>make-assembly</id>
                    <phase>package</phase>
                    <goals>
                        <goal>single</goal>
                    </goals>
                </execution>
            </executions>
        </plugin>
    </plugins>
    </build>

WordCount案例

  • 通过获取文件夹下的文件,将文件中的数据进行整合,得到每个单词的数量
  • 流程图:
    在这里插入图片描述
  • 代码:
  • 第一种方式
object WordCount {
  def main(args: Array[String]): Unit = {

    //建立和spark框架的连接
    val sparkConf = new SparkConf().setMaster("local").setAppName("WordCount")
    val sc = new SparkContext(sparkConf)

    //执行业务操作
    //1. 读取文件,获取一行一行的数据
    val lines = sc.textFile("datas")

    //2. 将一行数据进行才分,形成一个一个的单词
    val words = lines.flatMap(_.split(" "))

    //3. 将数据根据单词进行分组
    val wordLists = words.groupBy(word => word)

    //4. 对分组后的数据进行准换
    val wordToCount = wordLists.map {
      case (word, list) => {
        (word, list.size)
      }
    }

    //5. 将转换结构打印
    val result = wordToCount.collect()
    result.foreach(println)

    //关闭连接
    sc.stop()
  }

}
  • 第二种方式
object WordCount {
  def main(args: Array[String]): Unit = {

    //建立和spark框架的连接
    val sparkConf = new SparkConf().setMaster("local").setAppName("WordCount")
    val sc = new SparkContext(sparkConf)

    //执行业务操作
    //1. 读取文件,获取一行一行的数据
    val lines = sc.textFile("datas")

    //2. 将一行数据进行才分,形成一个一个的单词
    val words = lines.flatMap(_.split(" "))

    //3. 将数据根据单词进行分组
    val wordToOne = words.map(
      word => (word, 1)
    )
    val wordLists = wordToOne.groupBy{
      t => t._1
    }

    //4. 对分组后的数据进行准换
    val wordToCount = wordLists.map {
      case (word, list) => {
        list.reduce(
          (t1,t2) => {
            (t1._1, t1._2 + t2._2)
          }
        )
      }
    }

    //5. 将转换结构打印
    val result = wordToCount.collect()
    result.foreach(println)

    //关闭连接
    sc.stop()
  }

}
  • 第三种方式:利用spark自带的方法
object WordCount {
  def main(args: Array[String]): Unit = {

    //建立和spark框架的连接
    val sparkConf = new SparkConf().setMaster("local").setAppName("WordCount")
    val sc = new SparkContext(sparkConf)

    //执行业务操作
    //1. 读取文件,获取一行一行的数据
    val lines = sc.textFile("datas")

    //2. 将一行数据进行才分,形成一个一个的单词
    val words = lines.flatMap(_.split(" "))

    //3. 将数据根据单词进行分组
    val wordToOne = words.map(
      word => (word, 1)
    )

    //4. spark方式对分组后的数据进行准换
    //reduceByKey:相同的KEY的数据,可以对value进行reduce聚合
    val wordToCount = wordToOne.reduceByKey(_ + _)

    //5. 将转换结构打印
    val result = wordToCount.collect()
    result.foreach(println)

    //关闭连接
    sc.stop()
  }

}
  • 教学版本:
// 创建 Spark 运行配置对象
val sparkConf = new SparkConf().setMaster("local[*]").setAppName("WordCount")
// 创建 Spark 上下文环境对象(连接对象)
val sc : SparkContext = new SparkContext(sparkConf)
// 读取文件数据
val fileRDD: RDD[String] = sc.textFile("input/word.txt")
// 将文件中的数据进行分词
val wordRDD: RDD[String] = fileRDD.flatMap( _.split(" ") )
// 转换数据结构 word => (word, 1)
val word2OneRDD: RDD[(String, Int)] = wordRDD.map((_,1))
// 将转换结构后的数据按照相同的单词进行分组聚合
val word2CountRDD: RDD[(String, Int)] = word2OneRDD.reduceByKey(_+_)
// 将数据聚合结果采集到内存中
val word2Count: Array[(String, Int)] = word2CountRDD.collect()
// 打印结果
word2Count.foreach(println)
//关闭 Spark 连接
sc.stop()
  • 管理日志信息,再在项目的 resources 目录中创建 log4j.properties 文件,并添加日志配置信息:
    在这里插入图片描述
log4j.rootCategory=ERROR, console
log4j.appender.console=org.apache.log4j.ConsoleAppender
log4j.appender.console.target=System.err
log4j.appender.console.layout=org.apache.log4j.PatternLayout
log4j.appender.console.layout.ConversionPattern=%d{yy/MM/dd 
HH:mm:ss} %p %c{1}: %m%n
# Set the default spark-shell log level to ERROR. When running the spark-shell, 
the
# log level for this class is used to overwrite the root logger's log level, so 
that
# the user can have different defaults for the shell and regular Spark apps.
log4j.logger.org.apache.spark.repl.Main=ERROR
# Settings to quiet third party logs that are too verbose
log4j.logger.org.spark_project.jetty=ERROR
log4j.logger.org.spark_project.jetty.util.component.AbstractLifeCycle=ERROR
log4j.logger.org.apache.spark.repl.SparkIMain$exprTyper=ERROR
log4j.logger.org.apache.spark.repl.SparkILoop$SparkILoopInterpreter=ERROR
log4j.logger.org.apache.parquet=ERROR
log4j.logger.parquet=ERROR
# SPARK-9183: Settings to avoid annoying messages when looking up nonexistent 
UDFs in SparkSQL with Hive support
log4j.logger.org.apache.hadoop.hive.metastore.RetryingHMSHandler=FATAL
log4j.logger.org.apache.hadoop.hive.ql.exec.FunctionRegistry=ERROR

本图文内容来源于网友网络收集整理提供,作为学习参考使用,版权属于原作者。
THE END
分享
二维码
< <上一篇
下一篇>>