Learning the Spark Python API: pyspark API (2)

Spark supports Scala, Java, and Python. This article walks through the pyspark API using figures and simple examples.


sortBy

[Figure: spark sortBy]
# sortBy
x = sc.parallelize(['Cat','Apple','Bat'])
def keyGen(val): return val[0]
y = x.sortBy(keyGen)
print(y.collect())

['Apple', 'Bat', 'Cat']

glom

[Figure: spark glom]
# glom
x = sc.parallelize(['C','B','A'], 2)
y = x.glom()
print(x.collect()) 
print(y.collect())

['C', 'B', 'A']
[['C'], ['B', 'A']]

cartesian

[Figure: spark cartesian]
# cartesian
x = sc.parallelize(['A','B'])
y = sc.parallelize(['C','D'])
z = x.cartesian(y)
print(x.collect())
print(y.collect())
print(z.collect())

['A', 'B']
['C', 'D']
[('A', 'C'), ('A', 'D'), ('B', 'C'), ('B', 'D')]

groupBy

[Figure: spark groupBy]
# groupBy
x = sc.parallelize([1,2,3])
y = x.groupBy(lambda x: 'A' if (x%2 == 1) else 'B' )
print(x.collect())
# y is nested, this iterates through it
print([(j[0],[i for i in j[1]]) for j in y.collect()]) 

[1, 2, 3]
[('A', [1, 3]), ('B', [2])]
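
To make the grouping logic concrete without a cluster, here is a minimal pure-Python sketch of what groupBy computes. It is not Spark's implementation: `local_group_by` is a hypothetical helper name, and the sketch ignores partitioning and shuffling entirely.

```python
from collections import defaultdict

def local_group_by(elements, key_func):
    """Hypothetical local stand-in for RDD.groupBy: returns {key: [elements]}."""
    groups = defaultdict(list)
    for el in elements:
        # Every element lands in the bucket named by its key
        groups[key_func(el)].append(el)
    return dict(groups)

result = local_group_by([1, 2, 3], lambda v: 'A' if v % 2 == 1 else 'B')
print(sorted(result.items()))  # [('A', [1, 3]), ('B', [2])]
```

In Spark the values of each group come back as an iterable rather than a list, which is why the example above unpacks `j[1]` with a list comprehension.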

pipe

[Figure: spark pipe]
# pipe
x = sc.parallelize(['A', 'Ba', 'C', 'AD'])
y = x.pipe('grep -i "A"') # calls out to grep, may fail under Windows 
print(x.collect())
print(y.collect())

['A', 'Ba', 'C', 'AD']
['A', 'Ba', 'AD']

foreach

[Figure: spark foreach]
# foreach
from __future__ import print_function
x = sc.parallelize([1,2,3])
def f(el):
    '''side effect: append the current RDD element to a file'''
    # the with-block closes the file after each write
    with open("./foreachExample.txt", 'a') as f1:
        print(el, file=f1)

# first clear the file contents
open('./foreachExample.txt', 'w').close()  

y = x.foreach(f) # writes into foreachExample.txt

print(x.collect())
print(y) # foreach returns 'None'
# print the contents of foreachExample.txt
with open("./foreachExample.txt", "r") as foreachExample:
    print (foreachExample.read())
    
[1, 2, 3]
None
3
1
2

foreachPartition

[Figure: spark foreachPartition]
# foreachPartition
from __future__ import print_function
x = sc.parallelize([1,2,3],5)
def f(partition):
    '''side effect: append the current RDD partition contents to a file'''
    # the with-block closes the file after each write
    with open("./foreachPartitionExample.txt", 'a') as f1:
        print([el for el in partition], file=f1)

# first clear the file contents
open('./foreachPartitionExample.txt', 'w').close()  

y = x.foreachPartition(f) # writes into foreachPartitionExample.txt

print(x.glom().collect())
print(y)  # foreachPartition returns 'None'
# print the contents of foreachPartitionExample.txt
with open("./foreachPartitionExample.txt", "r") as foreachExample:
    print (foreachExample.read())

[[], [1], [], [2], [3]]
None
[]
[]
[1]
[2]
[3]

collect

[Figure: spark collect]
# collect
x = sc.parallelize([1,2,3])
y = x.collect()
print(x)  # distributed
print(y)  # not distributed

ParallelCollectionRDD[87] at parallelize at PythonRDD.scala:382
[1, 2, 3]

reduce

[Figure: spark reduce]
# reduce
x = sc.parallelize([1,2,3])
y = x.reduce(lambda accumulated, obj: accumulated + obj)  # computes the sum of all elements
print(x.collect())
print(y)

[1, 2, 3]
6
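
reduce expects a commutative and associative operator, because each partition is reduced on its own before the partial results are merged. The following is a pure-Python sketch of that two-stage behaviour, not Spark's actual code; `simulated_reduce` and the hand-written partition lists are assumptions made for illustration.

```python
from functools import reduce

def simulated_reduce(partitions, op):
    """Hypothetical local model of RDD.reduce: reduce each non-empty
    partition, then reduce the per-partition results."""
    partials = [reduce(op, part) for part in partitions if part]
    return reduce(op, partials)

add = lambda a, b: a + b
sub = lambda a, b: a - b

# Addition is commutative and associative: the partitioning does not matter.
print(simulated_reduce([[1, 2, 3]], add))    # 6
print(simulated_reduce([[1], [2, 3]], add))  # 6

# Subtraction is neither: the result changes with the partitioning.
print(simulated_reduce([[1, 2, 3]], sub))    # (1 - 2) - 3 = -4
print(simulated_reduce([[1], [2, 3]], sub))  # 1 - (2 - 3) = 2
```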

fold

[Figure: spark fold]
# fold
x = sc.parallelize([1,2,3])
neutral_zero_value = 0  # 0 for sum, 1 for multiplication
y = x.fold(neutral_zero_value, lambda accumulated, obj: accumulated + obj) # computes the sum of all elements
print(x.collect())
print(y)

[1, 2, 3]
6
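
Why the zero value has to be neutral: fold applies it once inside every partition and once more when the partial results are merged. Below is a pure-Python sketch of that behaviour under the assumption that partitions can be modelled as plain lists; `simulated_fold` is a hypothetical name and not part of pyspark.

```python
from functools import reduce

def simulated_fold(partitions, zero, op):
    """Hypothetical local model of RDD.fold: fold each partition starting
    from `zero`, then fold the partial results starting from `zero` again."""
    partials = [reduce(op, part, zero) for part in partitions]
    return reduce(op, partials, zero)

add = lambda acc, el: acc + el

# With the neutral value 0 the partitioning is irrelevant.
print(simulated_fold([[1, 2, 3]], 0, add))       # 6
print(simulated_fold([[1], [2], [3]], 0, add))   # 6

# A non-neutral value is counted once per partition plus once in the
# final merge, so the result depends on the number of partitions.
print(simulated_fold([[1, 2, 3]], 10, add))      # 6 + 10 * 2 = 26
print(simulated_fold([[1], [2], [3]], 10, add))  # 6 + 10 * 4 = 46
```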

aggregate

[Figure: spark aggregate]
# aggregate
x = sc.parallelize([2,3,4])
neutral_zero_value = (0,1) # sum: x+0 = x, product: 1*x = x
seqOp = (lambda aggregated, el: (aggregated[0] + el, aggregated[1] * el)) 
combOp = (lambda aggregated, other: (aggregated[0] + other[0], aggregated[1] * other[1]))  # merges two partial (sum, product) tuples
y = x.aggregate(neutral_zero_value,seqOp,combOp)  # computes (sum, product) of all elements
print(x.collect())
print(y)

[2, 3, 4]
(9, 24)
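
The split between seqOp and combOp is easier to follow when the per-partition results are visible. Here is a pure-Python sketch that reuses the same two functions on hand-written partitions; `simulated_aggregate` and the partition layout `[[2], [3, 4]]` are assumptions for illustration, and the real layout depends on how Spark slices the data.

```python
from functools import reduce

def simulated_aggregate(partitions, zero, seq_op, comb_op):
    """Hypothetical local model of RDD.aggregate: seq_op folds elements
    into a per-partition accumulator, comb_op merges the accumulators."""
    partials = [reduce(seq_op, part, zero) for part in partitions]
    return partials, reduce(comb_op, partials, zero)

zero = (0, 1)  # (neutral for sum, neutral for product)
seq_op = lambda acc, el: (acc[0] + el, acc[1] * el)
comb_op = lambda acc, other: (acc[0] + other[0], acc[1] * other[1])

partials, total = simulated_aggregate([[2], [3, 4]], zero, seq_op, comb_op)
print(partials)  # [(2, 2), (7, 12)]
print(total)     # (9, 24)
```

seqOp takes an accumulator and a single element, while combOp takes two accumulators, which is why the two functions index their second argument differently.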

max

[Figure: spark max]
# max
x = sc.parallelize([1,3,2])
y = x.max()
print(x.collect())
print(y)

[1, 3, 2]
3

min

[Figure: spark min]
# min
x = sc.parallelize([1,3,2])
y = x.min()
print(x.collect())
print(y)

[1, 3, 2]
1

sum

[Figure: spark sum]
# sum
x = sc.parallelize([1,3,2])
y = x.sum()
print(x.collect())
print(y)

[1, 3, 2]
6

count

[Figure: spark count]
# count
x = sc.parallelize([1,3,2])
y = x.count()
print(x.collect())
print(y)

[1, 3, 2]
3
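
Unless otherwise stated, all articles on this blog are original.
Copyright of original articles belongs to 过往记忆大数据 (过往记忆); do not reproduce them without permission.
Link to this article: "Learning the Spark Python API: pyspark API (2)" (https://www.iteblog.com/archives/1396.html)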