RDD转化为DataFrame的两种方法


 目录

前言

一、DataFrame的创建

从数据源创建

二、RDD->DataFrame

1.利用反射机制推断RDD模式

2. 使用编程方式定义RDD模式


前言

为什么要推出DataFrame?

DataFrame的推出,让Spark具备了处理大规模结构化数据的能力,不仅比原有的RDD转化方式更加简单易用,而且获得了更高的计算性能Spark能够轻松实现从MySQL到DataFrame的转化,并且支持SQL查询,DataFrame实际上就是对RDD的封装。


一、DataFrame的创建

从数据源创建

TIPS:代码中的spark指的是sparkSession!!!

import org.apache.spark.sql.SparkSession

val spark=SparkSession.builder().getOrCreate()

//使支持RDDs转换为DataFrames及后续sql操作
import spark.implicits._

//常用的文件有json,csv,txt,orc等(本文案例使用json文件)
val df = spark.read.json("文件路径")

展示结果:

+-----+------+-----+
|users| month|times|
+-----+------+-----+
|  u02|2017/1|   12|
|  u04|2017/2|   20|
|  u04|2017/1|    3|
|  u03|2017/1|    8|
|  u01|2017/2|   12|
|  u03|2017/3|   10|
|  u01|2017/1|   11|
+-----+------+-----+

二、RDD->DataFrame

1.利用反射机制推断RDD模式


在利用反射机制推断RDD模式时,需要首先定义一个case class,因为,只有case class才能被Spark隐式地转换为DataFrame

//导包
import spark.implicits._ 
import org.apache.spark.sql.Encoder
import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder

//定义一个样例类
case class Person(name: String, age: Long) 

// TODO 读取文本文件 使用RDD内在的.toDF()方法
val peopleDF = spark.sparkContext.textFile("file:///usr/local/spark/examples/src/main/resources/people.txt")
.map(_.split(","))
.map(attributes => Person(attributes(0), attributes(1).trim.toInt)).toDF()





//TODO 读取json文件,转化为RDD
val df = sp.read.json("file:///usr/local/spark/mycode/sparksql/people.json")
//注册为临时表供下面的查询使用 
peopleDF.createOrReplaceTempView(“people”) 
//使用sql语句查询
val personsRDD = spark.sql("select name,age from people where age > 20")

personsRDD.map(t => “Name:”+t(0)+“,”+“Age:”+t(1)).show() 



展示结果
+------------------+
| value|
+------------------+
|Name:Michael,Age:29|
| Name:Andy,Age:30|
+------------------+

2. 使用编程方式定义RDD模式


当无法提前定义case class时,就需要采用编程方式定义RDD模式

//导包
import org.apache.spark.sql.types._
import org.apache.spark.sql.Row 

//生成 RDD
scala> val peopleRDD = spark.sparkContext.textFile("file:///usr/local/spark/examples/src/main/resources/people.txt")

//定义一个模式字符串
scala> val schemaString = "name age"

//根据模式字符串生成模式
scala> val fields = schemaString.split(" ").map(fieldName => StructField(fieldName, StringType, nullable = true))

val schema = StructType(fields)

//对peopleRDD 这个RDD中的每一行元素都进行解析 
val rowRDD = peopleRDD.map(_.split(",")).map(attributes => Row(attributes(0),attributes(1).trim))

val peopleDF = spark.createDataFrame(rowRDD, schema)

//创建临时表进行查询
peopleDF.createOrReplaceTempView("people")
val results = spark.sql("SELECT name,age FROM people")

results.map(attributes => "name: " + attributes(0)+","+"age:"+attributes(1)).show()

查看结果 

+--------------------+
| value|
+--------------------+
|name: Michael,age:29|
| name: Andy,age:30|
| name: Justin,age:19|
+--------------------+

本图文内容来源于网友网络收集整理提供,作为学习参考使用,版权属于原作者。
THE END
分享
二维码

)">
< <上一篇
下一篇>>