Learning the Spark Python API: pyspark API (1)

Spark supports Scala, Java, and Python. This article walks through the pyspark API with simple, annotated examples.



pyspark version

# print Spark version
print("pyspark version:" + str(sc.version))

pyspark version:1.2.2

map

# map
# sc = spark context, parallelize creates an RDD from the passed object
x = sc.parallelize([1,2,3])
y = x.map(lambda x: (x,x**2))

# collect copies RDD elements to a list on the driver
print(x.collect()) 
print(y.collect())

[1, 2, 3]
[(1, 1), (2, 4), (3, 9)]

flatMap

# flatMap
x = sc.parallelize([1,2,3])
y = x.flatMap(lambda x: (x, 100*x, x**2))
print(x.collect())
print(y.collect())

[1, 2, 3]
[1, 100, 1, 2, 200, 4, 3, 300, 9]
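
The difference between map and flatMap is easiest to see on the same input: map produces exactly one output element per input element, while flatMap flattens whatever each element maps to. A minimal sketch of the contrast, assuming the same SparkContext sc used above:

# map keeps one output element per input; flatMap flattens the results
words = sc.parallelize(["hello world", "foo bar"])
print(words.map(lambda line: line.split(" ")).collect())
print(words.flatMap(lambda line: line.split(" ")).collect())

[['hello', 'world'], ['foo', 'bar']]
['hello', 'world', 'foo', 'bar']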

mapPartitions

# mapPartitions
x = sc.parallelize([1,2,3], 2)
def f(iterator): yield sum(iterator)
y = x.mapPartitions(f)
# glom() groups the elements of each partition into a list
print(x.glom().collect())  
print(y.glom().collect())

[[1], [2, 3]]
[[1], [5]]
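
mapPartitions is useful when you want to do work once per partition rather than once per element (setup, batching, aggregation). A small sketch in the same spirit, assuming the sc used above:

# emit one (count, sum) pair per partition
x = sc.parallelize([1,2,3,4], 2)
def count_and_sum(iterator):
    items = list(iterator)          # materialize this partition's elements
    yield (len(items), sum(items))
y = x.mapPartitions(count_and_sum)
print(x.glom().collect())
print(y.collect())

[[1, 2], [3, 4]]
[(2, 3), (2, 7)]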

mapPartitionsWithIndex

# mapPartitionsWithIndex
x = sc.parallelize([1,2,3], 2)
def f(partitionIndex, iterator): yield (partitionIndex,sum(iterator))
y = x.mapPartitionsWithIndex(f)

# glom() groups the elements of each partition into a list
print(x.glom().collect())  
print(y.glom().collect())

[[1], [2, 3]]
[[(0, 1)], [(1, 5)]]
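
A common practical use of the partition index is treating the first partition specially, for example skipping a header record. A rough sketch (the data here is made up), again assuming the sc from above:

# drop the first element of partition 0 only
x = sc.parallelize(['header', 'row1', 'row2'], 2)
def drop_header(idx, iterator):
    it = iter(iterator)
    if idx == 0:
        next(it, None)   # skip the header record in the first partition
    return it
print(x.mapPartitionsWithIndex(drop_header).collect())

['row1', 'row2']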

getNumPartitions

# getNumPartitions
x = sc.parallelize([1,2,3], 2)
y = x.getNumPartitions()
print(x.glom().collect())
print(y)

[[1], [2, 3]]
2
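
When no partition count is passed, parallelize falls back on sc.defaultParallelism, so the number of partitions depends on your local or cluster configuration. A quick check (the printed numbers will vary by setup):

# without an explicit count, the default parallelism decides the partitioning
x = sc.parallelize([1,2,3])
print(sc.defaultParallelism)
print(x.getNumPartitions())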

filter

# filter
x = sc.parallelize([1,2,3])
y = x.filter(lambda x: x%2 == 1)  # filters out even elements
print(x.collect())
print(y.collect())

[1, 2, 3]
[1, 3]

distinct

# distinct
x = sc.parallelize(['A','A','B'])
y = x.distinct()
print(x.collect())
print(y.collect())

['A', 'A', 'B']
['A', 'B']

sample

# sample
x = sc.parallelize(range(7))
# call 'sample' 5 times
ylist = [x.sample(withReplacement=False, fraction=0.5) for i in range(5)] 
print('x = ' + str(x.collect()))
for cnt,y in zip(range(len(ylist)), ylist):
    print('sample:' + str(cnt) + ' y = ' +  str(y.collect()))

x = [0, 1, 2, 3, 4, 5, 6]
sample:0 y = [0, 2, 5, 6]
sample:1 y = [2, 6]
sample:2 y = [0, 4, 5, 6]
sample:3 y = [0, 2, 6]
sample:4 y = [0, 3, 4]
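
Note that fraction is the expected proportion of elements, not an exact count, which is why the sample sizes above vary from call to call. sample also supports withReplacement=True and an optional seed for repeatable results; a small sketch, assuming the same sc:

# with a fixed seed, repeated calls return the same sample
x = sc.parallelize(range(7))
print(x.sample(withReplacement=True, fraction=0.5, seed=42).collect())
print(x.sample(withReplacement=True, fraction=0.5, seed=42).collect())  # identical to the line above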

takeSample

# takeSample
x = sc.parallelize(range(7))
# call 'takeSample' 5 times
ylist = [x.takeSample(withReplacement=False, num=3) for i in range(5)]  
print('x = ' + str(x.collect()))
for cnt,y in zip(range(len(ylist)), ylist):
    print('sample:' + str(cnt) + ' y = ' + str(y))  # y is already a list, so no collect is needed

x = [0, 1, 2, 3, 4, 5, 6]
sample:0 y = [0, 2, 6]
sample:1 y = [6, 4, 2]
sample:2 y = [2, 0, 4]
sample:3 y = [5, 4, 1]
sample:4 y = [3, 1, 4]
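
Unlike sample, takeSample is an action: it returns a plain Python list of exactly num elements to the driver, which is why no collect is needed above. It also accepts a seed; a minimal sketch:

# takeSample returns a list, not an RDD
x = sc.parallelize(range(7))
s = x.takeSample(withReplacement=False, num=3, seed=7)
print(isinstance(s, list))   # a plain list of exactly num elements
print(len(s))

True
3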

union

# union
x = sc.parallelize(['A','A','B'])
y = sc.parallelize(['D','C','A'])
z = x.union(y)
print(x.collect())
print(y.collect())
print(z.collect())

['A', 'A', 'B']
['D', 'C', 'A']
['A', 'A', 'B', 'D', 'C', 'A']
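
union simply concatenates the two RDDs and keeps duplicates, as the repeated 'A' above shows; chain distinct() after it if you want set-union semantics. A short sketch:

# union followed by distinct gives set semantics (sorted here for a stable output order)
x = sc.parallelize(['A','A','B'])
y = sc.parallelize(['D','C','A'])
print(sorted(x.union(y).distinct().collect()))

['A', 'B', 'C', 'D']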

intersection

# intersection
x = sc.parallelize(['A','A','B'])
y = sc.parallelize(['A','C','D'])
z = x.intersection(y)
print(x.collect())
print(y.collect())
print(z.collect())

['A', 'A', 'B']
['A', 'C', 'D']
['A']

sortByKey

# sortByKey
x = sc.parallelize([('B',1),('A',2),('C',3)])
y = x.sortByKey()
print(x.collect())
print(y.collect())

[('B', 1), ('A', 2), ('C', 3)]
[('A', 2), ('B', 1), ('C', 3)]
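
sortByKey also takes ascending and keyfunc (and numPartitions) arguments to control the ordering; a small sketch, assuming the same sc:

# descending order, and a case-insensitive sort via keyfunc
x = sc.parallelize([('b',1),('A',2),('c',3)])
print(x.sortByKey(ascending=False).collect())
print(x.sortByKey(keyfunc=lambda k: k.lower()).collect())

[('c', 3), ('b', 1), ('A', 2)]
[('A', 2), ('b', 1), ('c', 3)]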
Unless otherwise stated, all articles on this blog are original. Copyright belongs to 过往记忆大数据 (iteblog); reproduction without permission is prohibited.
Original article: Learning the Spark Python API: pyspark API (1) (https://www.iteblog.com/archives/1395.html)