欢迎关注Hadoop、Spark、Flink、Hive、Hbase、Flume等大数据资料分享微信公共账号:iteblog_hadoop
  1. 文章总数:961
  2. 浏览总数:11,461,276
  3. 评论:3870
  4. 分类目录:103 个
  5. 注册用户数:5832
  6. 最后更新:2018年10月17日
过往记忆博客公众号iteblog_hadoop
欢迎关注微信公众号:
iteblog_hadoop
大数据技术博客公众号bigdata_ai
大数据猿:
bigdata_ai

Spark函数讲解:aggregateByKey

  该函数和aggregate类似,但操作的RDD是Pair类型的。Spark 1.1.0版本才正式引入该函数。官方文档定义:

Aggregate the values of each key, using given combine functions and a neutral "zero value". This function can return a different result type, U, than the type of the values in this RDD, V. Thus, we need one operation for merging a V into a U and one operation for merging two U's, as in scala.TraversableOnce. The former operation is used for merging values within a partition, and the latter is used for merging values between partitions. To avoid memory allocation, both of these functions are allowed to modify and return their first argument instead of creating a new U.

  aggregateByKey函数对PairRDD中相同Key的值进行聚合操作,在聚合过程中同样使用了一个中立的初始值。和aggregate函数类似,aggregateByKey返回值的类型不需要和RDD中value的类型一致。因为aggregateByKey是对相同Key中的值进行聚合操作,所以aggregateByKey函数最终返回的类型还是Pair RDD,对应的结果是Key和聚合好的值;而aggregate函数直接是返回非RDD的结果,这点需要注意。在实现过程中,定义了三个aggregateByKey函数原型,但最终调用的aggregateByKey函数都一致。

函数原型

def aggregateByKey[U: ClassTag](zeroValue: U, partitioner: Partitioner)
    (seqOp: (U, V) => U, combOp: (U, U) => U): RDD[(K, U)]
def aggregateByKey[U: ClassTag](zeroValue: U, numPartitions: Int)
    (seqOp: (U, V) => U, combOp: (U, U) => U): RDD[(K, U)]
def aggregateByKey[U: ClassTag](zeroValue: U)
    (seqOp: (U, V) => U, combOp: (U, U) => U): RDD[(K, U)]

  第一个aggregateByKey函数我们可以自定义Partitioner。除了这个参数之外,其函数声明和aggregate很类似;其他的aggregateByKey函数实现最终都是调用这个。
  第二个aggregateByKey函数可以设置分区的个数(numPartitions),最终用的是HashPartitioner。
  最后一个aggregateByKey实现先会判断当前RDD是否定义了分区函数,如果定义了则用当前RDD的分区;如果当前RDD并未定义分区 ,则使用HashPartitioner。

实例

/**
 * User: 过往记忆
 * Date: 15-03-02
 * Time: 上午06:30
 * bolg: https://www.iteblog.com
 * 本文地址:https://www.iteblog.com/archives/1261
 * 过往记忆博客,专注于hadoop、hive、spark、shark、flume的技术博客,大量的干货
 * 过往记忆博客微信公共帐号:iteblog_hadoop
 */

scala> var data = sc.parallelize(List((1,3),(1,2),(1, 4),(2,3)))
scala> def seq(a:Int, b:Int) : Int ={
     | println("seq: " + a + "\t " + b)
     | math.max(a,b)
     | }
seq: (a: Int, b: Int)Int

scala> def comb(a:Int, b:Int) : Int ={
     | println("comb: " + a + "\t " + b)
     | a + b
     | }
comb: (a: Int, b: Int)Int
scala> data.aggregateByKey(1)(seq, comb).collect
seq: 1	 3
seq: 1	 2
seq: 1	 4
seq: 1	 3
comb: 3	 2
comb: 5	 4
res62: Array[(Int, Int)] = Array((1,9), (2,3))

注意

  细心的读者肯定发现了aggregateByKey和aggregate结果有点不一样。如果用aggregate函数对含有3、2、4三个元素的RDD进行计算,初始值为1的时候,计算的结果应该是10,而这里是9,这是因为aggregate函数中的初始值需要和reduce函数以及combine函数结合计算,而aggregateByKey中的初始值只需要和reduce函数计算,不需要和combine函数结合计算,所以导致结果有点不一样。

本博客文章除特别声明,全部都是原创!
转载本文请加上:转载自过往记忆(https://www.iteblog.com/)
本文链接: 【Spark函数讲解:aggregateByKey】(https://www.iteblog.com/archives/1261.html)
喜欢 (28)
分享 (0)
发表我的评论
取消评论

表情
本博客评论系统带有自动识别垃圾评论功能,请写一些有意义的评论,谢谢!
(2)个小伙伴在吐槽
  1. 就改了这点var data = sc.parallelize(List((1,3),(1,2),(1, 4),(2,3)),2),不是要跨分区合并的么
    Reynold.C2017-06-10 16:44 回复
  2. 为什么我跑出来的是((1,7),(2,3))
    Reynold.C2017-06-10 16:34 回复