RDD编程基础

一、创建RDD

两种方式:

1.从文件系统中加载数据创建RDD

Spark采用textFile()方法来从文件系统中加载数据创建RDD,该方法把文件的URI作为参数,这个URI可以是:

  • 本地文件系统的地址
  • 或者是分布式文件系统HDFS的地址
  • 或者是Amazon S3的地址等等

2. 通过并行集合(列表)创建RDD

可以调用SparkContext的parallelize方法,在Driver中一个已经存在的集合 (列表)上创建。

举个栗子:

第一种:
lines = sc.textFile("file:///usr/local/spark/mycode/rdd/word.txt") or sc.textFile("hdfs://localhost:9000/user/hadoop/word.txt")
>>> lines.foreach(print)
Hadoop is good
Spark is fast
Spark is bette

第二种:
>>> array = [1,2,3,4,5]
>>> rdd = sc.parallelize(array)
>>> rdd.foreach(print)

二、转换函数

1.filter()

.filter(func):筛选出满足函数func的元素,并返回一个新的数据集
>>>lines = sc.textFile("file:/l/usr/local/spark/mycode/rdd/word.txt")
>>>linesWithSpark = lines.filter(lambda line: "Spark" in line)
>>> linesWithSpark.foreach(print)
Spark is fast
Spark is better

2.map()

.map(func):将每个元素传递到函数func中,并将结果返回为一个新的数据集,一个纯粹的转换操作

第一个栗子:
>>>data=[1,2,3,4,5]
>>> rdd1=sc.parallelize(data)
>>> rdd2=rdd1.map(lambda x:x+1)
>>> rdd2.foreach(print)

第二个栗子:

>>>lines =sc.textFile("file:lllusr/local/spark/mycode/rdd/word.txt"')
>>>words = lines.map(lambda line:line.split(" "))
>>>words.foreach(print)
['Hadoop', ' is' , 'good']['Spark', 'is', ' fast]['Spark', 'is', ' better']

3.flatMap()

.flatMap(func):与map相似,但每个输入元素都可以映射到0或多个输出结果;先执行Map在执行flat拍扁其中的每个元素

第一个栗子:
>>>lines = sc.textFile("file:/llusr/localspark/mycode/rdd/word.txt")
>>>words = lines.flatMap(lambda line:line.split(" "))

第二个栗子
>>>words = sc.parallelize([("Hadoop",1),("is",1).("good",1),.... ("Spark",1).("is",1),("fast",1),("Spark",1),("is",1),("better",1)])
>>> words1 = words.groupByKey
>>> words1.foreach(print)
('Hadoop',<pyspark.resultiterable.ResultIterable object at 0x7fb210552c88>)
('better',<pyspark.resultiterable.ResultIterable object at 0x7fb210552e80>)
('fast',<pyspark.resultiterable.ResultIterable object at 0x7fb210552c88>)
('good',<pyspark.resultiterable.ResultIterable object at 0x7fb210552c88>)
('Spark',<pyspark.resultiterable.ResultIterable object at 0x7fb210552f98>)
('is',<pyspark.resultiterable.ResultIterable object at Ox7fb210552e10>)

4.reduceByKey()

.reduceByKey(func):应用于(K,V)键值对的数据集时,返回一个新的(K,V)形式的数据集,其中每个值是将每个key传递到函数func中进行聚合后的结果
>>>words = sc.parallelize([("Hadoop",1),("is",1),("good",1),("Spark",1), !.... ("is",1),("fast"",1),("Spark",1),("is",1),("better",1)])
>>> words1 = words.reduceByKey(lambda a,b:a+b)
>>> words1.foreach(print)
('good', 1)
('Hadoop', 1)('better', 1)('Spark', 2)('fast', 1)('is',3)

tips:

  • groupByKey也是对每个key进行操作,但只生成一个sequence, groupByKey本身不能自定义函数,需要先用groupByKey生成RDD,然后才能对此RDD通过map进行自定义函数操作.
  • reduceByKey用于对每个key对应的多个value进行merge操作,最重要的是它能够在本地先进行merge操作,并且merge操作可以通过函数自定义.

5.keys()

.keys():返回键值
>>> list=[("Hadoop",1),("Spark",1),("Hive",1),("Spark",1)]
>>>pairRDD= sc.parallelize(list)
>>> pairRDD.keys().foreach(print)
Hadoop
Spark
Hive
Spark

6.values()

.values():返回值
>>> list =[("Hadoop",1),("Spark",1),("Hive",1),("Spark",1)]
>>> pairRDD = sc.parallelize(list)
>>>pairRDD.values(.foreach(print)
1111

7.sortByKey()

.sortByKey():返回一个根据键排序的RDD,默认升序排序,降序sortByKey(False)

第一个栗子:
>>>list=[("Hadoop",1).("Spark",1),("Hive",1),(""Spark",1)]
>>>pairRDD= sc.parallelize(list)
>>> pairRDD.foreach(print)
('Hadoop', 1)
('Spark', 1)
('Hive', 1)
('Spark', 1)

第二个栗子:

>>>pairRDD.sortByKey.foreach(print)
('Hadoop', 1)
('Hive', 1)('Spark', 1)('Spark', 1)
>>>d1.reduceByKey(lambda a,b:a+b).sortByKey(False).collect()
[('g',21), ('f,29), ('e',17), ('d', 9), ('c',27),('b',38), ('a',42)]

8.sortBy()

.sortBy(func):按func自定义排序
>>>d1.reduceByKey(lambda a,b:a+b).sortBy(lambda x: x[0],False).collect(('g',21), ('f,29), ('e',17), ('d', 9), ('c',27), ('b',38), ('a', 42))
>>>d1.reduceByKey(lambda a,b:a+b).sortBy(lambda x: x[1],False).collect()
[('a',42), ('b',38), ('f,29), ('c',27),('g', 21), ('e',17), ('d', 9)]

9.mapValues()

.mapValues(func):对键值对RDD中的每个value都应用—个函数,key不会发生变化
>>> list=[("Hadoop",1),("Spark",1),("Hive",1),("Spark",1)]
>>>pairRDD= sc.parallelize(list)
>>>pairRDD1 = pairRDD.mapValues(lambda x;x+1)
>>>pairRDD1.foreach(print)
('Hadoop',2)
('Spark',2)('Hive',2)('Spark', 2)

10.join()

.join(): join就表示内连接。对于内连接,对于给定的两个输入数据集(K,V1)和(K,V2),只有在两个数据集中都存在的key才会被输出,最终得到一个:(K,(V1,V2))类型的数据集。
>>> pairRDD1 = sc. parallelize([("spark",1),("spark",2),("hadoop",3),("hadoop",5)])
>>>pairRDD2= sc.parallelize([("spark","fast")))
>>>pairRDD3 = pairRDD1.join(pairRDD2)
>>>pairRDD3.foreach(print)
('spark', (1, 'fast'))
('spark', (2, 'fast'))

11.distinct()

.distinct():去除重复值,一般用于在数据读入时执行该操作
>>> RDD = sc. parallelize([("spark",1),("spark",1),("spark",2),("hadoop",3),("hadoop",5)]).map(lambda x:s.strip()).distinct()
>>>RDD.foreach(print)
("spark",1)
("spark",2)
("hadoop",3)
("hadoop",5)


二、常见的行动操作:

  • count(返回数据集中的元素个数
  • collect0以数组的形式返回数据集中的所有元素
  • first()返回数据集中的第一个元素
  • take(n)以数组的形式返回数据集中的前n个元素
  • reduce(func)通过函数func(输入两个参数并返回一个值)聚合数据集中的元素
  •  foreach(func)将数据集中的每个元素传递到函数func中运行

>>>rdd = sc.parallelize([1,2,3,4,5)
>>> rdd.countO
5
>>> rdd.first()
1
>>>rdd.take(3)
[1,2,3]
>>>rdd.reduce(lambda a,b:a+b)
15
>>> rdd.collect()
[1,2,3,4,5]
>>>rdd.foreach(lambda elem:print(elem))
12345

三、题外话:

A.持久化

在Spark中,RDD采用惰性求值的机制,每次遇到行动操作,都会从头开始执 行计算。每次调用行动操作,都会触发一次从头开始的计算。这对于迭代计 算而言,代价是很大的,迭代计算经常需要多次重复使用同一组数据,通过持久化(缓存)机制避免这种重复计算的开销。

.persist():标记为持久化,在第一次行动操作时执行----->.unpersist():手动地把持久化的RDD从缓存中移除

>>> list =["Hadoop","Spark","Hive"]
>>> rdd = sc.parallelize(list)
>>>rdd.cache) #会调用persist(MEMORY_ONLY),但是,语句执行到这里,并不会缓存rdd,因为这时rdd还没有被计算生成
>>> print(rdd.count() #第一次行动操作,触发一次真正从头到尾的计算,这时上面的rdd.cache()才会被执行,把这个rdd放到缓存中
3
>>>print(','.join(rdd.collectO)) #第二次行动,不需要触发从头到尾的计算,只需要重复使用上面缓存中的rdd
Hadoop,Spark,Hive

B.分区

RDD是弹性分布式数据集,通常RDD很大,会被分成很多 个分区,分别保存在不同的节点上。分区的作用主要是:增加并行度;减少通信开销。

分区原则:RDD分区的一个原则是使得分区的个数尽量等于集群中的CPU核心 (core)数目

分区个数:

(1)创建RDD时手动指定分区个数

        sc.textFile(path, partitionNum) 其中,path参数用于指定要加载的文件的地址,partitionNum参数用于 指定分区个数。例如:

(2)使用reparititon方法重新设置分区个数

        通过转换操作得到新 RDD 时,直接调用 repartition 方法即可。例如:

四、如何开始编写一个RDD

启动Hadoop,打开pycharm新建文件就不说了,部分同学不知道sc是啥,我理解的是sparkContext相当于一个指挥官负责统筹调度计算RDD之间的依赖关系构建DAG(有向无环图),再有DAGScheduler负责将DAG图分解成多个阶段,每个阶段包含多个任务,每个任务又会被TaskScheduler分发给各个WorkerNode上的Executor去执行,再逐层返回最后得到结果,基本的思想还是MapReduce只是基于内存速度更快,不像Hadoop频繁的IO读写会有很大延迟,举个栗子:

from pyspark import SparkContext,SparkConf

def fun1(x):
    arr = x.split()
    id = arr[0]
    name = arr[1:]
    return (id,name)

def fun2(x):
    if x[0] == '2019110401':
        return False
    else:
        return True

def fun3(x):
    key = x[0]
    value = int(int(x[1][1])/10)
    return (value,key)

conf = SparkConf().setAppName('class 1').setMaster('local')
sc = SparkContext(conf=conf)

# alist=[1,2,3,4,5]
# rdd0 = sc.parallelize(alist)#创建第一个RDD
# print(rdd0)

#path = 'hdfs://master:9000/test.txt'
path = 'file:///home/mls/abc/test.txt'
rdd0 = sc.textFile(path)
print(rdd0)
#
rdd1 = rdd0.map(lambda x:x.strip()).distinct()#去空格后去重
print(rdd1.collect())#此时得到的是字符串

rdd2 = rdd1.map(lambda x:fun1(x))#map转换
print(rdd2.count())
print(rdd2.collect())
print(rdd2.take(4))

rddttest = rdd2.map(lambda x: fun3(x))
print(rddttest.collect())

rdd3 = rdd2.filter(fun2)#过滤 rdd2.filter(lambda x: fun2(x))
print(rdd3.count())
print(rdd3.collect())
print(rdd3.take(4))
print(rdd3.collect()[0][1][1])#打印第一个元素的第二维里的第二个元素

# rdd4 = rdd3.map(lambda x: fun3(x))
# print(rdd4.collect())
rdd5 = rdd3.groupByKey()#按照学号分组
print(rdd5.collect())
rddttest5 = rddttest.groupByKey()#按照成绩分组
print(rddttest5.collect())

rdd6 = rdd5.mapValues(lambda x:list(x))#len(list(x))只针对值
print(rdd6.collect())
rddttest6 = rddttest5.mapValues(lambda x:list(x))
print(rddttest6.collect())

print(rdd4.collect())

sc.stop()

本图文内容来源于网友网络收集整理提供,作为学习参考使用,版权属于原作者。
THE END
分享
二维码
< <上一篇
下一篇>>