欢迎关注Hadoop、Spark、Flink、Hive、Hbase、Flume等大数据资料分享微信公共账号:iteblog_hadoop
  1. 文章总数:961
  2. 浏览总数:11,490,358
  3. 评论:3873
  4. 分类目录:103 个
  5. 注册用户数:5846
  6. 最后更新:2018年10月17日
过往记忆博客公众号iteblog_hadoop
欢迎关注微信公众号:
iteblog_hadoop
大数据技术博客公众号bigdata_ai
大数据猿:
bigdata_ai

Spark: SchemaRDD隐式转换

  SchemaRDD在Spark SQL中已经被我们使用到,这篇文章简单地介绍一下如果将标准的RDD(org.apache.spark.rdd.RDD)转换成SchemaRDD,并进行SQL相关的操作。

scala> val sqlContext = new org.apache.spark.sql.SQLContext(sc)
sqlContext: org.apache.spark.sql.SQLContext = org.apache.spark.sql.SQLContext@6edd421f

scala> case class Person(name: String, age:Int)
defined class Person

scala> var people = sc.textFile("/home/iteblog/person.txt")
        |  .map(_.split(",")).map(p => Person(p(0), p(1).trim.toInt))
people: org.apache.spark.rdd.RDD[Person] = MappedRDD[3] at map at <console>:14

scala> people.registerTempTable("people")
<console>:17: error: value registerTempTable is not a 
   member of org.apache.spark.rdd.RDD[Person]
              people.registerTempTable("people")
                     ^

  这是因为people是普通的RDD,而registerTempTable函数不属于RDD类,只有通过SchemaRDD的实例才可以调用,所以这么调用会出现错误,解决办法有两个:
  (1)registerTempTable函数是SQLContext类中的,所以我们可以将people转换成SchemaRDD,如下:

/**
 * User: 过往记忆
 * Date: 14-12-16
 * Time: 下午10:16
 * bolg: https://www.iteblog.com
 * 本文地址:https://www.iteblog.com/archives/1224
 * 过往记忆博客,专注于hadoop、hive、spark、shark、flume的技术博客,大量的干货
 * 过往记忆博客微信公共帐号:iteblog_hadoop
 */
scala> val peopleSchema = sqlContext.createSchemaRDD(people)
peopleSchema: org.apache.spark.sql.SchemaRDD = 
SchemaRDD[29] at RDD at SchemaRDD.scala:103
== Query Plan ==
== Physical Plan ==
ExistingRdd [name#4,age#5], MapPartitionsRDD[28] at
 mapPartitions at basicOperators.scala:217

scala> peopleSchema.registerTempTable("people")
warning: there were 1 deprecation warning(s); re-run with -deprecation for details

  这么调用就可以将people转成SchemaRDD。
  (2)、上面的方法是通过显示地调用sqlContext.createSchemaRDD将普通的RDD转成SchemaRDD。其实我们还可以通过Scala的隐式语法来进行转换。我们先来看看createSchemaRDD函数的定义

/**
* Creates a SchemaRDD from an RDD of case classes.
*
* @group userf
*/
implicit def createSchemaRDD[A <: Product: TypeTag](rdd: RDD[A]) = {
    SparkPlan.currentContext.set(self)
    new SchemaRDD(this, SparkLogicalPlan(ExistingRdd.fromProductRdd(rdd))(self))
}

  在定义createSchemaRDD的时候用到了implicit 关键字,所以我们在使用的时候可以通过下面语句使用

scala> import sqlContext.createSchemaRDD
import sqlContext.createSchemaRDD

scala> people.registerAsTable("people")
warning: there were 1 deprecation warning(s); re-run with -deprecation for details

  这样就隐身地将people转换成SchemaRDD了。这是因为Spark可以隐式地将包含case class的RDD转换成SchemaRDD。

  关于什么是SchemaRDD,官方文档将的很详细:
  An RDD of [[Row]] objects that has an associated schema. In addition to standard RDD functions, SchemaRDDs can be used in relational queries。也就是包含了Row对象以及模式的RDD。它继承自标准的RDD类,所以拥有标准RDD类的所有方法;并且可以用于关系性数据库的查询在中。
本博客文章除特别声明,全部都是原创!
转载本文请加上:转载自过往记忆(https://www.iteblog.com/)
本文链接: 【Spark: SchemaRDD隐式转换】(https://www.iteblog.com/archives/1224.html)
喜欢 (17)
分享 (0)
发表我的评论
取消评论

表情
本博客评论系统带有自动识别垃圾评论功能,请写一些有意义的评论,谢谢!