Table of Contents
Learning the Spark Python API Functions: pyspark API (1)
Learning the Spark Python API Functions: pyspark API (2)
Learning the Spark Python API Functions: pyspark API (3)
Learning the Spark Python API Functions: pyspark API (4)
Spark supports Scala, Java, and Python. This series works through the pyspark API with figures and short examples.


sortBy
# sortBy
x = sc.parallelize(['Cat','Apple','Bat'])
def keyGen(val):
    return val[0]
y = x.sortBy(keyGen)
print(y.collect())
['Apple', 'Bat', 'Cat']
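sortBy also takes an ascending flag (and an optional numPartitions); a minimal sketch of a descending sort on the same data, assuming the same SparkContext sc:
# sortBy, descending -- a sketch on the same data
x = sc.parallelize(['Cat','Apple','Bat'])
y = x.sortBy(lambda val: val[0], ascending=False)
print(y.collect())  # expected: ['Cat', 'Bat', 'Apple']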
glom
# glom
x = sc.parallelize(['C','B','A'], 2)
y = x.glom()
print(x.collect())
print(y.collect())
['C', 'B', 'A']
[['C'], ['B', 'A']]
cartesian
# cartesian
x = sc.parallelize(['A','B'])
y = sc.parallelize(['C','D'])
z = x.cartesian(y)
print(x.collect())
print(y.collect())
print(z.collect())
['A', 'B']
['C', 'D']
[('A', 'C'), ('A', 'D'), ('B', 'C'), ('B', 'D')]
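Be aware that cartesian materializes every pairing, so the result has len(x) * len(y) elements; filtering right after it is a common pattern. A minimal sketch, with a purely illustrative predicate:
# cartesian followed by filter -- the predicate is illustrative only
x = sc.parallelize([1, 2, 3])
y = sc.parallelize([2, 3, 4])
z = x.cartesian(y).filter(lambda pair: pair[0] == pair[1])
print(z.collect())  # expected: [(2, 2), (3, 3)]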
groupBy
# groupBy
x = sc.parallelize([1,2,3])
y = x.groupBy(lambda v: 'A' if (v % 2 == 1) else 'B')
print(x.collect())
# y is nested, this iterates through it
print([(j[0],[i for i in j[1]]) for j in y.collect()])
[1, 2, 3]
[('A', [1, 3]), ('B', [2])]
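The values groupBy produces are iterables (pyspark's ResultIterable), not plain lists, which is why the comprehension above is needed to print them. An equivalent sketch that converts them with mapValues:
# groupBy + mapValues(list) -- equivalent to the comprehension above
x = sc.parallelize([1, 2, 3])
y = x.groupBy(lambda v: 'A' if (v % 2 == 1) else 'B').mapValues(list)
print(y.collect())  # expected: [('A', [1, 3]), ('B', [2])] (key order may vary)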
pipe
# pipe
x = sc.parallelize(['A', 'Ba', 'C', 'AD'])
y = x.pipe('grep -i "A"') # calls out to grep, may fail under Windows
print(x.collect())
print(y.collect())
['A', 'Ba', 'C', 'AD']
['A', 'Ba', 'AD']
foreach
# foreach
from __future__ import print_function
x = sc.parallelize([1,2,3])
def f(el):
    '''Side effect: append the current RDD element to a file.'''
    with open("./foreachExample.txt", 'a+') as f1:
        print(el, file=f1)
# first clear the file contents
open('./foreachExample.txt', 'w').close()
y = x.foreach(f) # writes into foreachExample.txt
print(x.collect())
print(y) # foreach returns 'None'
# print the contents of foreachExample.txt
with open("./foreachExample.txt", "r") as foreachExample:
print (foreachExample.read())
[1, 2, 3]
None
3
1
2
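Since foreach runs on the executors, appending to a local file like this is only dependable in local mode, and the line order is not guaranteed. To collect a side effect back on the driver, an accumulator is the usual tool; a minimal sketch:
# foreach with an accumulator -- side effect visible on the driver
acc = sc.accumulator(0)
x = sc.parallelize([1, 2, 3])
x.foreach(lambda el: acc.add(el))
print(acc.value)  # expected: 6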
foreachPartition
# foreachPartition
from __future__ import print_function
x = sc.parallelize([1,2,3],5)
def f(partition):
    '''Side effect: append the current RDD partition contents to a file.'''
    with open("./foreachPartitionExample.txt", 'a+') as f1:
        print([el for el in partition], file=f1)
# first clear the file contents
open('./foreachPartitionExample.txt', 'w').close()
y = x.foreachPartition(f) # writes into foreachPartitionExample.txt
print(x.glom().collect())
print(y) # foreachPartition returns 'None'
# print the contents of foreachPartitionExample.txt
with open("./foreachPartitionExample.txt", "r") as foreachExample:
print (foreachExample.read())
[[], [1], [], [2], [3]]
None
[]
[]
[1]
[2]
[3]
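foreachPartition shines when there is per-partition setup cost to amortize, such as opening a database connection once per partition instead of once per element. A hedged sketch, where send_batch is a hypothetical sink, not a pyspark function:
# foreachPartition for batched writes -- send_batch is hypothetical
def send_batch(records):
    # stand-in for a real sink, e.g. one bulk insert per partition
    print('sending %d records' % len(records))

def write_partition(partition):
    batch = [el for el in partition]  # the iterator can be consumed only once
    if batch:                         # skip empty partitions
        send_batch(batch)

x = sc.parallelize(range(10), 3)
x.foreachPartition(write_partition)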
collect
# collect
x = sc.parallelize([1,2,3])
y = x.collect()
print(x)  # distributed
print(y)  # not distributed
ParallelCollectionRDD[87] at parallelize at PythonRDD.scala:382
[1, 2, 3]
reduce
# reduce
x = sc.parallelize([1,2,3])
y = x.reduce(lambda obj, accumulated: obj + accumulated)  # computes a cumulative sum
print(x.collect())
print(y)
[1, 2, 3]
6
fold
# fold
x = sc.parallelize([1,2,3])
neutral_zero_value = 0  # 0 for sum, 1 for multiplication
y = x.fold(neutral_zero_value, lambda obj, accumulated: accumulated + obj)  # computes a cumulative sum
print(x.collect())
print(y)
[1, 2, 3]
6
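The zero value is applied once per partition and once more when partition results are merged, so it must be the identity of the operation (0 for addition, 1 for multiplication); otherwise the result would depend on the number of partitions. A sketch of a cumulative product:
# fold with multiplication -- the neutral value must be 1 here
x = sc.parallelize([1, 2, 3])
y = x.fold(1, lambda obj, accumulated: accumulated * obj)
print(y)  # expected: 6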
aggregate
# aggregate
x = sc.parallelize([2,3,4])
neutral_zero_value = (0, 1)  # sum: x+0 = x, product: 1*x = x
seqOp = (lambda aggregated, el: (aggregated[0] + el, aggregated[1] * el))
combOp = (lambda aggregated, el: (aggregated[0] + el[0], aggregated[1] * el[1]))
y = x.aggregate(neutral_zero_value, seqOp, combOp)  # computes (cumulative sum, cumulative product)
print(x.collect())
print(y)
[2, 3, 4]
(9, 24)
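A classic use of aggregate is a one-pass mean, carrying a (sum, count) pair through seqOp and merging pairs in combOp; a minimal sketch:
# aggregate for a one-pass mean -- a sketch
x = sc.parallelize([2, 3, 4])
total, count = x.aggregate((0, 0),
                           lambda acc, el: (acc[0] + el, acc[1] + 1),
                           lambda a, b: (a[0] + b[0], a[1] + b[1]))
print(total / float(count))  # expected: 3.0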
max
# max
x = sc.parallelize([1,3,2])
y = x.max()
print(x.collect())
print(y)
[1, 3, 2]
3
min
# min
x = sc.parallelize([1,3,2])
y = x.min()
print(x.collect())
print(y)
[1, 3, 2]
1
sum
# sum
x = sc.parallelize([1,3,2])
y = x.sum()
print(x.collect())
print(y)
[1, 3, 2]
6
count
# count
x = sc.parallelize([1,3,2])
y = x.count()
print(x.collect())
print(y)
[1, 3, 2]
3

