欢迎关注大数据技术架构与案例微信公众号:过往记忆大数据
过往记忆博客公众号iteblog_hadoop
欢迎关注微信公众号:
过往记忆大数据

Spark 2.0介绍:Catalog API介绍和使用

《Spark 2.0技术预览:更容易、更快速、更智能》文章中简单地介绍了Spark 2.0带来的新技术等。Spark 2.0是Apache Spark的下一个主要版本。此版本在架构抽象、API以及平台的类库方面带来了很大的变化,为该框架明年的发展奠定了方向,所以了解Spark 2.0的一些特性对我们能够使用它有着非常重要的作用。本博客将对Spark 2.0进行一序列的介绍(参见Spark 2.0分类),欢迎关注。

Catalog API

  Spark中的DataSet和Dataframe API支持结构化分析。结构化分析的一个重要的方面是管理元数据。这些元数据可能是一些临时元数据(比如临时表)、SQLContext上注册的UDF以及持久化的元数据(比如Hivemeta store或者HCatalog)。

  Spark的早期版本是没有标准的API来访问这些元数据的。用户通常使用查询语句(比如show tables)来查询这些元数据。这些查询通常需要操作原始的字符串,而且不同元数据类型的操作也是不一样的。

  这种情况在Spark 2.0中得到改变。Spark 2.0中添加了标准的API(称为catalog)来访问Spark SQL中的元数据。这个API既可以操作Spark SQL,也可以操作Hive元数据。

  这篇文章中我将介绍如何使用catalog API。

访问Catalog

  Catalog可以通过SparkSession获取,下面代码展示如何获取Catalog:

/**
 * User: 过往记忆
 * Date: 2016年07月05日
 * Time: 下午23:16
 * bolg: https://www.iteblog.com
 * 本文地址:https://www.iteblog.com/archives/1701.html
 * 过往记忆博客,专注于hadoop、hive、spark、shark、flume的技术博客,大量的干货
 * 过往记忆博客微信公共帐号:iteblog_hadoop
 */

scala> import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.SparkSession
scala> val sparkSession = SparkSession.builder.appName("spark session example").enableHiveSupport().getOrCreate()
sparkSession: org.apache.spark.sql.SparkSession = org.apache.spark.sql.SparkSession@5d50ea49

scala> val catalog = sparkSession.catalog
catalog: org.apache.spark.sql.catalog.Catalog = org.apache.spark.sql.internal.CatalogImpl@17308af1

Querying the databases

  我们一旦创建好catalog对象之后,我们可以使用它来查询元数据中的数据库,catalog上的API返回的结果全部都是dataset。

scala> catalog.listDatabases().select("name").show(false)
+-----------------------+
|name                   |
+-----------------------+
|iteblog                |
|default                |
+-----------------------+
only showing top 20 rows

listDatabases返回元数据中所有的数据库。默认情况下,元数据仅仅只有名为default的数据库。如果是Hive元数据,那么它会从Hive元数据中获取所有的数据库。listDatabases返回的类型是dataset,所以我们可以使用Dataset上的所有操作来查询元数据。

使用createTempView注册Dataframe

  在Spark的早期版本,我们使用registerTempTable来注册Dataframe。然而在Spark 2.0中,这个API已经被遗弃了。registerTempTable名字很让人误解,因为用户会认为这个函数会将Dataframe持久化并且保证这个临时表,但是实际上并不是这样的,所以社区才有意将它替换成createTempViewcreateTempView的使用方法如下:

df.createTempView("iteblog")

我们注册完一个view之后,然后就可以使用listTables函数来查询它。

查询表

  正如我们可以展示出元数据中的所有数据库一样,我们也可以展示出元数据中某个数据库中的表。它会展示出Spark SQL中所有注册的临时表。同时可以展示出Hive中默认数据库(也就是default)中的表。如下:

/**
 * User: 过往记忆
 * Date: 2016年07月05日
 * Time: 下午23:16
 * bolg: https://www.iteblog.com
 * 本文地址:https://www.iteblog.com/archives/1701.html
 * 过往记忆博客,专注于hadoop、hive、spark、shark、flume的技术博客,大量的干货
 * 过往记忆博客微信公共帐号:iteblog_hadoop
 */
scala> catalog.listTables().select("name").show(false)
+----------------------------------------+
|name                                    |
+----------------------------------------+
|city_to_level                           |
|table2                                  |
|test                                    |
|ticket_order                            |
|tmp1_result                             |
|iteblog                                 |
+----------------------------------------+

上面的iteblog表就是使用df.createTempView("iteblog")注册的临时表。

判断某个表是否缓存

  我们可以使用Catalog提供的API来检查某个表是否缓存。如下:

scala> println(catalog.isCached("iteblog"))
false

上面判断iteblog表是否缓存,结果输出false。默认情况下表是不会被缓存的,我们可以手动缓存某个表,如下:

scala>  df.cache()
res4: df.type = [_c0: string, _c1: string ... 2 more fields]

scala> println(catalog.isCached("iteblog"))
true

现在iteblog表已经被缓存了,所有现在的输出结构是true。

删除view

  我们可以使用catalog提供的API来删除view。如果是Spark SQL情况,那么它会删除事先注册好的view;如果是hive情况,那么它会从元数据中删除表

/**
 * User: 过往记忆
 * Date: 2016年07月05日
 * Time: 下午23:16
 * bolg: https://www.iteblog.com
 * 本文地址:https://www.iteblog.com/archives/1701.html
 * 过往记忆博客,专注于hadoop、hive、spark、shark、flume的技术博客,大量的干货
 * 过往记忆博客微信公共帐号:iteblog_hadoop
 */

scala> catalog.dropTempView("iteblog")

查询已经注册的函数

  我们不仅可以使用Catalog API操作表,还可以用它操作UDF。下面代码片段展示SparkSession上所有已经注册号的函数,当然也包括了Spark内置的函数。

/**
 * User: 过往记忆
 * Date: 2016年07月05日
 * Time: 下午23:16
 * bolg: https://www.iteblog.com
 * 本文地址:https://www.iteblog.com/archives/1701.html
 * 过往记忆博客,专注于hadoop、hive、spark、shark、flume的技术博客,大量的干货
 * 过往记忆博客微信公共帐号:iteblog_hadoop
 */

scala> catalog.listFunctions().select("name","className","isTemporary").show(100, false)
+---------------------+-----------------------------------------------------------------------+-----------+
|name                 |className                                                              |isTemporary|
+---------------------+-----------------------------------------------------------------------+-----------+
|!                    |org.apache.spark.sql.catalyst.expressions.Not                          |true       |
|%                    |org.apache.spark.sql.catalyst.expressions.Remainder                    |true       |
|&                    |org.apache.spark.sql.catalyst.expressions.BitwiseAnd                   |true       |
|*                    |org.apache.spark.sql.catalyst.expressions.Multiply                     |true       |
|+                    |org.apache.spark.sql.catalyst.expressions.Add                          |true       |
|-                    |org.apache.spark.sql.catalyst.expressions.Subtract                     |true       |
|/                    |org.apache.spark.sql.catalyst.expressions.Divide                       |true       |
|<                    |org.apache.spark.sql.catalyst.expressions.LessThan                     |true       |
|<=                   |org.apache.spark.sql.catalyst.expressions.LessThanOrEqual              |true       |
|<=>                  |org.apache.spark.sql.catalyst.expressions.EqualNullSafe                |true       |
|=                    |org.apache.spark.sql.catalyst.expressions.EqualTo                      |true       |
|==                   |org.apache.spark.sql.catalyst.expressions.EqualTo                      |true       |
|>                    |org.apache.spark.sql.catalyst.expressions.GreaterThan                  |true       |
|>=                   |org.apache.spark.sql.catalyst.expressions.GreaterThanOrEqual           |true       |
|^                    |org.apache.spark.sql.catalyst.expressions.BitwiseXor                   |true       |
|abs                  |org.apache.spark.sql.catalyst.expressions.Abs                          |true       |
|acos                 |org.apache.spark.sql.catalyst.expressions.Acos                         |true       |
|add_months           |org.apache.spark.sql.catalyst.expressions.AddMonths                    |true       |
|and                  |org.apache.spark.sql.catalyst.expressions.And                          |true       |
|approx_count_distinct|org.apache.spark.sql.catalyst.expressions.aggregate.HyperLogLogPlusPlus|true       |
|array                |org.apache.spark.sql.catalyst.expressions.CreateArray                  |true       |
|array_contains       |org.apache.spark.sql.catalyst.expressions.ArrayContains                |true       |
|ascii                |org.apache.spark.sql.catalyst.expressions.Ascii                        |true       |
|asin                 |org.apache.spark.sql.catalyst.expressions.Asin                         |true       |
|assert_true          |org.apache.spark.sql.catalyst.expressions.AssertTrue                   |true       |
|atan                 |org.apache.spark.sql.catalyst.expressions.Atan                         |true       |
|atan2                |org.apache.spark.sql.catalyst.expressions.Atan2                        |true       |
|avg                  |org.apache.spark.sql.catalyst.expressions.aggregate.Average            |true       |
|base64               |org.apache.spark.sql.catalyst.expressions.Base64                       |true       |
|bin                  |org.apache.spark.sql.catalyst.expressions.Bin                          |true       |
|bround               |org.apache.spark.sql.catalyst.expressions.BRound                       |true       |
|cbrt                 |org.apache.spark.sql.catalyst.expressions.Cbrt                         |true       |
|ceil                 |org.apache.spark.sql.catalyst.expressions.Ceil                         |true       |
|ceiling              |org.apache.spark.sql.catalyst.expressions.Ceil                         |true       |
|coalesce             |org.apache.spark.sql.catalyst.expressions.Coalesce                     |true       |
|collect_list         |org.apache.spark.sql.catalyst.expressions.aggregate.CollectList        |true       |
|collect_set          |org.apache.spark.sql.catalyst.expressions.aggregate.CollectSet         |true       |
|concat               |org.apache.spark.sql.catalyst.expressions.Concat                       |true       |
|concat_ws            |org.apache.spark.sql.catalyst.expressions.ConcatWs                     |true       |
|conv                 |org.apache.spark.sql.catalyst.expressions.Conv                         |true       |
|corr                 |org.apache.spark.sql.catalyst.expressions.aggregate.Corr               |true       |
|cos                  |org.apache.spark.sql.catalyst.expressions.Cos                          |true       |
|cosh                 |org.apache.spark.sql.catalyst.expressions.Cosh                         |true       |
|count                |org.apache.spark.sql.catalyst.expressions.aggregate.Count              |true       |
|covar_pop            |org.apache.spark.sql.catalyst.expressions.aggregate.CovPopulation      |true       |
|covar_samp           |org.apache.spark.sql.catalyst.expressions.aggregate.CovSample          |true       |
|crc32                |org.apache.spark.sql.catalyst.expressions.Crc32                        |true       |
|cube                 |org.apache.spark.sql.catalyst.expressions.Cube                         |true       |
|cume_dist            |org.apache.spark.sql.catalyst.expressions.CumeDist                     |true       |
|current_database     |org.apache.spark.sql.catalyst.expressions.CurrentDatabase              |true       |
|current_date         |org.apache.spark.sql.catalyst.expressions.CurrentDate                  |true       |
|current_timestamp    |org.apache.spark.sql.catalyst.expressions.CurrentTimestamp             |true       |
|date_add             |org.apache.spark.sql.catalyst.expressions.DateAdd                      |true       |
|date_format          |org.apache.spark.sql.catalyst.expressions.DateFormatClass              |true       |
|date_sub             |org.apache.spark.sql.catalyst.expressions.DateSub                      |true       |
|datediff             |org.apache.spark.sql.catalyst.expressions.DateDiff                     |true       |
|day                  |org.apache.spark.sql.catalyst.expressions.DayOfMonth                   |true       |
|dayofmonth           |org.apache.spark.sql.catalyst.expressions.DayOfMonth                   |true       |
|dayofyear            |org.apache.spark.sql.catalyst.expressions.DayOfYear                    |true       |
|decode               |org.apache.spark.sql.catalyst.expressions.Decode                       |true       |
|degrees              |org.apache.spark.sql.catalyst.expressions.ToDegrees                    |true       |
|dense_rank           |org.apache.spark.sql.catalyst.expressions.DenseRank                    |true       |
|e                    |org.apache.spark.sql.catalyst.expressions.EulerNumber                  |true       |
|encode               |org.apache.spark.sql.catalyst.expressions.Encode                       |true       |
|exp                  |org.apache.spark.sql.catalyst.expressions.Exp                          |true       |
|explode              |org.apache.spark.sql.catalyst.expressions.Explode                      |true       |
|expm1                |org.apache.spark.sql.catalyst.expressions.Expm1                        |true       |
|factorial            |org.apache.spark.sql.catalyst.expressions.Factorial                    |true       |
|find_in_set          |org.apache.spark.sql.catalyst.expressions.FindInSet                    |true       |
|first                |org.apache.spark.sql.catalyst.expressions.aggregate.First              |true       |
|first_value          |org.apache.spark.sql.catalyst.expressions.aggregate.First              |true       |
|floor                |org.apache.spark.sql.catalyst.expressions.Floor                        |true       |
|format_number        |org.apache.spark.sql.catalyst.expressions.FormatNumber                 |true       |
|format_string        |org.apache.spark.sql.catalyst.expressions.FormatString                 |true       |
|from_unixtime        |org.apache.spark.sql.catalyst.expressions.FromUnixTime                 |true       |
|from_utc_timestamp   |org.apache.spark.sql.catalyst.expressions.FromUTCTimestamp             |true       |
|get_json_object      |org.apache.spark.sql.catalyst.expressions.GetJsonObject                |true       |
|greatest             |org.apache.spark.sql.catalyst.expressions.Greatest                     |true       |
|grouping             |org.apache.spark.sql.catalyst.expressions.Grouping                     |true       |
|grouping_id          |org.apache.spark.sql.catalyst.expressions.GroupingID                   |true       |
|hash                 |org.apache.spark.sql.catalyst.expressions.Murmur3Hash                  |true       |
|hex                  |org.apache.spark.sql.catalyst.expressions.Hex                          |true       |
|hour                 |org.apache.spark.sql.catalyst.expressions.Hour                         |true       |
|hypot                |org.apache.spark.sql.catalyst.expressions.Hypot                        |true       |
|if                   |org.apache.spark.sql.catalyst.expressions.If                           |true       |
|ifnull               |org.apache.spark.sql.catalyst.expressions.IfNull                       |true       |
|in                   |org.apache.spark.sql.catalyst.expressions.In                           |true       |
|initcap              |org.apache.spark.sql.catalyst.expressions.InitCap                      |true       |
|input_file_name      |org.apache.spark.sql.catalyst.expressions.InputFileName                |true       |
|instr                |org.apache.spark.sql.catalyst.expressions.StringInstr                  |true       |
|isnan                |org.apache.spark.sql.catalyst.expressions.IsNaN                        |true       |
|isnotnull            |org.apache.spark.sql.catalyst.expressions.IsNotNull                    |true       |
|isnull               |org.apache.spark.sql.catalyst.expressions.IsNull                       |true       |
|json_tuple           |org.apache.spark.sql.catalyst.expressions.JsonTuple                    |true       |
|kurtosis             |org.apache.spark.sql.catalyst.expressions.aggregate.Kurtosis           |true       |
|lag                  |org.apache.spark.sql.catalyst.expressions.Lag                          |true       |
|last                 |org.apache.spark.sql.catalyst.expressions.aggregate.Last               |true       |
|last_day             |org.apache.spark.sql.catalyst.expressions.LastDay                      |true       |
|last_value           |org.apache.spark.sql.catalyst.expressions.aggregate.Last               |true       |
|lcase                |org.apache.spark.sql.catalyst.expressions.Lower                        |true       |
+---------------------+-----------------------------------------------------------------------+-----------+
only showing top 100 rows

上面展示了100个函数及其实现类。

本博客文章除特别声明,全部都是原创!
原创文章版权归过往记忆大数据(过往记忆)所有,未经许可不得转载。
本文链接: 【Spark 2.0介绍:Catalog API介绍和使用】(https://www.iteblog.com/archives/1701.html)
喜欢 (11)
分享 (0)
发表我的评论
取消评论

表情
本博客评论系统带有自动识别垃圾评论功能,请写一些有意义的评论,谢谢!