欢迎关注Hadoop、Spark、Flink、Hive、Hbase、Flume等大数据资料分享微信公共账号:iteblog_hadoop
  1. 文章总数:976
  2. 浏览总数:11,916,912
  3. 评论:3931
  4. 分类目录:106 个
  5. 注册用户数:6090
  6. 最后更新:2018年12月11日
过往记忆博客公众号iteblog_hadoop
欢迎关注微信公众号:
iteblog_hadoop
大数据技术博客公众号bigdata_ai
大数据猿:
bigdata_ai

影响到Spark输出RDD分区的操作函数

  下面的操作会影响到Spark输出RDD分区(partitioner)的:
  cogroup, groupWith, join, leftOuterJoin, rightOuterJoin, groupByKey, reduceByKey, combineByKey, partitionBy, sort, mapValues (如果父RDD存在partitioner), flatMapValues(如果父RDD存在partitioner), 和 filter (如果父RDD存在partitioner)。其他的transform操作不会影响到输出RDD的partitioner,一般来说是None,也就是没有partitioner。下面举个例子进行说明:

scala> val pairs = sc.parallelize(List((1, 1), (2, 2), (3, 3)))
pairs: org.apache.spark.rdd.RDD[(Int, Int)] = 
ParallelCollectionRDD[4] at parallelize at <console>:12

scala> val a = sc.parallelize(List(2,51,2,7,3))
a: org.apache.spark.rdd.RDD[Int] = 
ParallelCollectionRDD[5] at parallelize at <console>:12

scala> val a = sc.parallelize(List(2,51,2))
a: org.apache.spark.rdd.RDD[Int] = 
ParallelCollectionRDD[6] at parallelize at <console>:12

scala> val b = sc.parallelize(List(3,1,4))
b: org.apache.spark.rdd.RDD[Int] =
 ParallelCollectionRDD[7] at parallelize at <console>:12

scala> val c = a.zip(b)
c: org.apache.spark.rdd.RDD[(Int, Int)] = 
ZippedPartitionsRDD2[8] at zip at <console>:16

scala> val result = pairs.join(c)
result: org.apache.spark.rdd.RDD[(Int, (Int, Int))] = 
FlatMappedValuesRDD[11] at join at <console>:20

scala> result.partitioner
res6: Option[org.apache.spark.Partitioner] = Some(org.apache.spark.HashPartitioner@2)

  大家可以看到输出来的RDD result分区变成了HashPartitioner,因为join中的两个分区都没有设置分区,所以默认用到了HashPartitioner,可以看join的实现:

def join[W](other: RDD[(K, W)]): RDD[(K, (V, W))] = {
    join(other, defaultPartitioner(self, other))
}

def defaultPartitioner(rdd: RDD[_], others: RDD[_]*): Partitioner = {
    val bySize = (Seq(rdd) ++ others).sortBy(_.partitions.size).reverse
    for (r <- bySize if r.partitioner.isDefined) {
      return r.partitioner.get
    }
    if (rdd.context.conf.contains("spark.default.parallelism")) {
      new HashPartitioner(rdd.context.defaultParallelism)
    } else {
      new HashPartitioner(bySize.head.partitions.size)
    }
}

  defaultPartitioner函数就确定了结果RDD的分区。从上面的实现可以看到,
  1、join的两个RDD如果都没有partitioner,那么join结果RDD将使用HashPartitioner;
  2、如果两个RDD中其中有一个有partitioner,那么join结果RDD将使用那个父RDD的partitioner;
  3、如果两个RDD都有partitioner,那么join结果RDD就使用调用join的那个RDD的partitioner。

本博客文章除特别声明,全部都是原创!
转载本文请加上:转载自过往记忆(https://www.iteblog.com/)
本文链接: 【影响到Spark输出RDD分区的操作函数】(https://www.iteblog.com/archives/1242.html)
喜欢 (4)
分享 (0)
发表我的评论
取消评论

表情
本博客评论系统带有自动识别垃圾评论功能,请写一些有意义的评论,谢谢!