欢迎关注大数据技术架构与案例微信公众号:过往记忆大数据
过往记忆博客公众号iteblog_hadoop
欢迎关注微信公众号:
过往记忆大数据

Spark函数讲解:aggregate

  我们先来看看aggregate函数的官方文档定义:

Aggregate the elements of each partition, and then the results for all the partitions, using given combine functions and a neutral "zero value". This function can return a different result type, U, than the type of this RDD, T. Thus, we need one operation for merging a T into an U and one operation for merging two U's, as in scala.TraversableOnce. Both of these functions are allowed to modify and return their first argument instead of creating a new U to avoid memory allocation.

  aggregate函数将每个分区里面的元素进行聚合,然后用combine函数将每个分区的结果和初始值(zeroValue)进行combine操作。这个函数最终返回的类型不需要和RDD中元素类型一致。

函数原型

def aggregate[U: ClassTag](zeroValue: U)(seqOp: (U, T) => U, combOp: (U, U) => U): U

实例

/**
 * User: 过往记忆
 * Date: 15-02-12
 * Time: 上午08:30
 * bolg: 
 * 本文地址:/archives/1268
 * 过往记忆博客,专注于hadoop、hive、spark、shark、flume的技术博客,大量的干货
 * 过往记忆博客微信公共帐号:iteblog_hadoop
 */

scala> def seqOP(a:Int, b:Int) : Int = {
     | println("seqOp: " + a + "\t" + b)
     | math.min(a,b)
     | }
seqOP: (a: Int, b: Int)Int

scala> def combOp(a:Int, b:Int): Int = {
     | println("combOp: " + a + "\t" + b)
     | a + b
     | }
combOp: (a: Int, b: Int)Int

scala> val z = sc. parallelize ( List (1 ,2 ,3 ,4 ,5 ,6) , 2)
scala> z. aggregate(3)(seqOP, combOp)
seqOp: 3	1
seqOp: 3	4
seqOp: 1	2
seqOp: 3	5
seqOp: 1	3
seqOp: 3	6
combOp: 3	1
combOp: 4	3

res20: Int = 7

scala> def seqOp(a:String, b:String) : String = {
     | println("seqOp: " + a + "\t" + b)
     | math.min(a.length , b.length ).toString
     | }
seqOp: (a: String, b: String)String

scala> def combOp(a:String, b:String) : String = {
     |  println("combOp: " + a + "\t" + b)
     | a + b
     | }
combOp: (a: String, b: String)String

scala> val z = sc. parallelize ( List ("12" ,"23" ,"345" ,"4567") ,2)
scala> z. aggregate ("")(seqOp, combOp)
seqOp: 	345
seqOp: 	12
seqOp: 0	4567
seqOp: 0	23
combOp: 	1
combOp: 1	1

res25: String = 11

scala> val z = sc. parallelize ( List ("12" ,"23" ,"345" ,"") ,2)
scala> z. aggregate ("")(seqOp, combOp)
seqOp: 	12
seqOp: 	345
seqOp: 0	23
seqOp: 0	
combOp: 	0
combOp: 0	1
res26: String = 01

注意

  1、reduce函数和combine函数必须满足交换律(commutative)和结合律(associative)
  2、从aggregate 函数的定义可知,combine函数的输出类型必须和输入的类型一致

本博客文章除特别声明,全部都是原创!
原创文章版权归过往记忆大数据(过往记忆)所有,未经许可不得转载。
本文链接: 【Spark函数讲解:aggregate】(https://www.iteblog.com/archives/1268.html)
喜欢 (23)
分享 (0)
发表我的评论
取消评论

表情
本博客评论系统带有自动识别垃圾评论功能,请写一些有意义的评论,谢谢!
(5)个小伙伴在吐槽
  1. 你好,请问“这里拿zeroValue和分区内的数比了”分区里的数是什么时候取出来的呢?是怎么取出来并加入到seqOp中和zeroValue进行比较的?

    很好啊V2016-12-29 21:15 回复
  2. 哪位能跟我讲讲第一个aggregate到底是在干啥啊。。。
    seqOp函数定义为两者取小,combOp是把最后结果合并,但是底下打印的结果为什么是那样的呢。。没看懂。。

    天涯2016-08-29 16:26 回复
    • seqOp函数对每个分区取最小值,注意:是和zeroValue比较之后的最小值。比如 分区为(4, 5, 6), zeroValue为3. 那么对该分区seqOp,就得到3。
      解释一下第一个例子,sc.parallelize(List(1, 2, 3, 4, 5, 6), 2) 获得分区(part_0,List(3, 2, 1)) 和 (part_1,List(6, 5, 4))
      对第一个分区seqOp, 则有
      seqOp : 3 1 //这里拿zeroValue和分区内的数比了,取小的1再和分区内剩下的数比较。
      seqOp : 1 2
      seqOp : 1 3 //结束,得到1
      对第二个分区seqOp, 则有
      seqOp : 3 4 //同理,取3
      seqOp : 3 5
      seqOp : 3 6 //结束,得到3
      最后 comOp
      comOp : 3 1 // 得4
      comOp : 4 3 // 得7

      zzsilence2016-11-03 14:15 回复
      • 你好,请问“这里拿zeroValue和分区内的数比了”分区里的数是什么时候取出来的呢?是怎么取出来并加入到seqOp中和zeroValue进行比较的?

        很好啊V2016-12-29 21:16 回复
  3. 很好,谢谢

    xygeng2015-02-28 11:27 回复