文章目录
《Spark 2.0技术预览:更容易、更快速、更智能》文章中简单地介绍了Spark 2.0带来的新技术等。Spark 2.0是Apache Spark的下一个主要版本。此版本在架构抽象、API以及平台的类库方面带来了很大的变化,为该框架明年的发展奠定了方向,所以了解Spark 2.0的一些特性对我们能够使用它有着非常重要的作用。本博客将对Spark 2.0进行一序列的介绍(参见Spark 2.0分类),欢迎关注。
Catalog API
Spark中的DataSet和Dataframe API支持结构化分析。结构化分析的一个重要的方面是管理元数据。这些元数据可能是一些临时元数据(比如临时表)、SQLContext上注册的UDF以及持久化的元数据(比如Hivemeta store或者HCatalog)。
Spark的早期版本是没有标准的API来访问这些元数据的。用户通常使用查询语句(比如show tables)来查询这些元数据。这些查询通常需要操作原始的字符串,而且不同元数据类型的操作也是不一样的。
这种情况在Spark 2.0中得到改变。Spark 2.0中添加了标准的API(称为catalog)来访问Spark SQL中的元数据。这个API既可以操作Spark SQL,也可以操作Hive元数据。
这篇文章中我将介绍如何使用catalog API。
访问Catalog
Catalog可以通过SparkSession获取,下面代码展示如何获取Catalog:
/**
* User: 过往记忆
* Date: 2016年07月05日
* Time: 下午23:16
* bolg: https://www.iteblog.com
* 本文地址:https://www.iteblog.com/archives/1701.html
* 过往记忆博客,专注于hadoop、hive、spark、shark、flume的技术博客,大量的干货
* 过往记忆博客微信公共帐号:iteblog_hadoop
*/
scala> import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.SparkSession
scala> val sparkSession = SparkSession.builder.appName("spark session example").enableHiveSupport().getOrCreate()
sparkSession: org.apache.spark.sql.SparkSession = org.apache.spark.sql.SparkSession@5d50ea49
scala> val catalog = sparkSession.catalog
catalog: org.apache.spark.sql.catalog.Catalog = org.apache.spark.sql.internal.CatalogImpl@17308af1
Querying the databases
我们一旦创建好catalog对象之后,我们可以使用它来查询元数据中的数据库,catalog上的API返回的结果全部都是dataset。
scala> catalog.listDatabases().select("name").show(false)
+-----------------------+
|name |
+-----------------------+
|iteblog |
|default |
+-----------------------+
only showing top 20 rows
listDatabases返回元数据中所有的数据库。默认情况下,元数据仅仅只有名为default的数据库。如果是Hive元数据,那么它会从Hive元数据中获取所有的数据库。listDatabases返回的类型是dataset,所以我们可以使用Dataset上的所有操作来查询元数据。
使用createTempView注册Dataframe
在Spark的早期版本,我们使用registerTempTable来注册Dataframe。然而在Spark 2.0中,这个API已经被遗弃了。registerTempTable名字很让人误解,因为用户会认为这个函数会将Dataframe持久化并且保证这个临时表,但是实际上并不是这样的,所以社区才有意将它替换成createTempView。createTempView的使用方法如下:
df.createTempView("iteblog")
我们注册完一个view之后,然后就可以使用listTables函数来查询它。
查询表
正如我们可以展示出元数据中的所有数据库一样,我们也可以展示出元数据中某个数据库中的表。它会展示出Spark SQL中所有注册的临时表。同时可以展示出Hive中默认数据库(也就是default)中的表。如下:
/**
* User: 过往记忆
* Date: 2016年07月05日
* Time: 下午23:16
* bolg: https://www.iteblog.com
* 本文地址:https://www.iteblog.com/archives/1701.html
* 过往记忆博客,专注于hadoop、hive、spark、shark、flume的技术博客,大量的干货
* 过往记忆博客微信公共帐号:iteblog_hadoop
*/
scala> catalog.listTables().select("name").show(false)
+----------------------------------------+
|name |
+----------------------------------------+
|city_to_level |
|table2 |
|test |
|ticket_order |
|tmp1_result |
|iteblog |
+----------------------------------------+
上面的iteblog表就是使用df.createTempView("iteblog")注册的临时表。
判断某个表是否缓存
我们可以使用Catalog提供的API来检查某个表是否缓存。如下:
scala> println(catalog.isCached("iteblog"))
false
上面判断iteblog表是否缓存,结果输出false。默认情况下表是不会被缓存的,我们可以手动缓存某个表,如下:
scala> df.cache()
res4: df.type = [_c0: string, _c1: string ... 2 more fields]
scala> println(catalog.isCached("iteblog"))
true
现在iteblog表已经被缓存了,所有现在的输出结构是true。
删除view
我们可以使用catalog提供的API来删除view。如果是Spark SQL情况,那么它会删除事先注册好的view;如果是hive情况,那么它会从元数据中删除表。
/**
* User: 过往记忆
* Date: 2016年07月05日
* Time: 下午23:16
* bolg: https://www.iteblog.com
* 本文地址:https://www.iteblog.com/archives/1701.html
* 过往记忆博客,专注于hadoop、hive、spark、shark、flume的技术博客,大量的干货
* 过往记忆博客微信公共帐号:iteblog_hadoop
*/
scala> catalog.dropTempView("iteblog")
查询已经注册的函数
我们不仅可以使用Catalog API操作表,还可以用它操作UDF。下面代码片段展示SparkSession上所有已经注册号的函数,当然也包括了Spark内置的函数。
/**
* User: 过往记忆
* Date: 2016年07月05日
* Time: 下午23:16
* bolg: https://www.iteblog.com
* 本文地址:https://www.iteblog.com/archives/1701.html
* 过往记忆博客,专注于hadoop、hive、spark、shark、flume的技术博客,大量的干货
* 过往记忆博客微信公共帐号:iteblog_hadoop
*/
scala> catalog.listFunctions().select("name","className","isTemporary").show(100, false)
+---------------------+-----------------------------------------------------------------------+-----------+
|name |className |isTemporary|
+---------------------+-----------------------------------------------------------------------+-----------+
|! |org.apache.spark.sql.catalyst.expressions.Not |true |
|% |org.apache.spark.sql.catalyst.expressions.Remainder |true |
|& |org.apache.spark.sql.catalyst.expressions.BitwiseAnd |true |
|* |org.apache.spark.sql.catalyst.expressions.Multiply |true |
|+ |org.apache.spark.sql.catalyst.expressions.Add |true |
|- |org.apache.spark.sql.catalyst.expressions.Subtract |true |
|/ |org.apache.spark.sql.catalyst.expressions.Divide |true |
|< |org.apache.spark.sql.catalyst.expressions.LessThan |true |
|<= |org.apache.spark.sql.catalyst.expressions.LessThanOrEqual |true |
|<=> |org.apache.spark.sql.catalyst.expressions.EqualNullSafe |true |
|= |org.apache.spark.sql.catalyst.expressions.EqualTo |true |
|== |org.apache.spark.sql.catalyst.expressions.EqualTo |true |
|> |org.apache.spark.sql.catalyst.expressions.GreaterThan |true |
|>= |org.apache.spark.sql.catalyst.expressions.GreaterThanOrEqual |true |
|^ |org.apache.spark.sql.catalyst.expressions.BitwiseXor |true |
|abs |org.apache.spark.sql.catalyst.expressions.Abs |true |
|acos |org.apache.spark.sql.catalyst.expressions.Acos |true |
|add_months |org.apache.spark.sql.catalyst.expressions.AddMonths |true |
|and |org.apache.spark.sql.catalyst.expressions.And |true |
|approx_count_distinct|org.apache.spark.sql.catalyst.expressions.aggregate.HyperLogLogPlusPlus|true |
|array |org.apache.spark.sql.catalyst.expressions.CreateArray |true |
|array_contains |org.apache.spark.sql.catalyst.expressions.ArrayContains |true |
|ascii |org.apache.spark.sql.catalyst.expressions.Ascii |true |
|asin |org.apache.spark.sql.catalyst.expressions.Asin |true |
|assert_true |org.apache.spark.sql.catalyst.expressions.AssertTrue |true |
|atan |org.apache.spark.sql.catalyst.expressions.Atan |true |
|atan2 |org.apache.spark.sql.catalyst.expressions.Atan2 |true |
|avg |org.apache.spark.sql.catalyst.expressions.aggregate.Average |true |
|base64 |org.apache.spark.sql.catalyst.expressions.Base64 |true |
|bin |org.apache.spark.sql.catalyst.expressions.Bin |true |
|bround |org.apache.spark.sql.catalyst.expressions.BRound |true |
|cbrt |org.apache.spark.sql.catalyst.expressions.Cbrt |true |
|ceil |org.apache.spark.sql.catalyst.expressions.Ceil |true |
|ceiling |org.apache.spark.sql.catalyst.expressions.Ceil |true |
|coalesce |org.apache.spark.sql.catalyst.expressions.Coalesce |true |
|collect_list |org.apache.spark.sql.catalyst.expressions.aggregate.CollectList |true |
|collect_set |org.apache.spark.sql.catalyst.expressions.aggregate.CollectSet |true |
|concat |org.apache.spark.sql.catalyst.expressions.Concat |true |
|concat_ws |org.apache.spark.sql.catalyst.expressions.ConcatWs |true |
|conv |org.apache.spark.sql.catalyst.expressions.Conv |true |
|corr |org.apache.spark.sql.catalyst.expressions.aggregate.Corr |true |
|cos |org.apache.spark.sql.catalyst.expressions.Cos |true |
|cosh |org.apache.spark.sql.catalyst.expressions.Cosh |true |
|count |org.apache.spark.sql.catalyst.expressions.aggregate.Count |true |
|covar_pop |org.apache.spark.sql.catalyst.expressions.aggregate.CovPopulation |true |
|covar_samp |org.apache.spark.sql.catalyst.expressions.aggregate.CovSample |true |
|crc32 |org.apache.spark.sql.catalyst.expressions.Crc32 |true |
|cube |org.apache.spark.sql.catalyst.expressions.Cube |true |
|cume_dist |org.apache.spark.sql.catalyst.expressions.CumeDist |true |
|current_database |org.apache.spark.sql.catalyst.expressions.CurrentDatabase |true |
|current_date |org.apache.spark.sql.catalyst.expressions.CurrentDate |true |
|current_timestamp |org.apache.spark.sql.catalyst.expressions.CurrentTimestamp |true |
|date_add |org.apache.spark.sql.catalyst.expressions.DateAdd |true |
|date_format |org.apache.spark.sql.catalyst.expressions.DateFormatClass |true |
|date_sub |org.apache.spark.sql.catalyst.expressions.DateSub |true |
|datediff |org.apache.spark.sql.catalyst.expressions.DateDiff |true |
|day |org.apache.spark.sql.catalyst.expressions.DayOfMonth |true |
|dayofmonth |org.apache.spark.sql.catalyst.expressions.DayOfMonth |true |
|dayofyear |org.apache.spark.sql.catalyst.expressions.DayOfYear |true |
|decode |org.apache.spark.sql.catalyst.expressions.Decode |true |
|degrees |org.apache.spark.sql.catalyst.expressions.ToDegrees |true |
|dense_rank |org.apache.spark.sql.catalyst.expressions.DenseRank |true |
|e |org.apache.spark.sql.catalyst.expressions.EulerNumber |true |
|encode |org.apache.spark.sql.catalyst.expressions.Encode |true |
|exp |org.apache.spark.sql.catalyst.expressions.Exp |true |
|explode |org.apache.spark.sql.catalyst.expressions.Explode |true |
|expm1 |org.apache.spark.sql.catalyst.expressions.Expm1 |true |
|factorial |org.apache.spark.sql.catalyst.expressions.Factorial |true |
|find_in_set |org.apache.spark.sql.catalyst.expressions.FindInSet |true |
|first |org.apache.spark.sql.catalyst.expressions.aggregate.First |true |
|first_value |org.apache.spark.sql.catalyst.expressions.aggregate.First |true |
|floor |org.apache.spark.sql.catalyst.expressions.Floor |true |
|format_number |org.apache.spark.sql.catalyst.expressions.FormatNumber |true |
|format_string |org.apache.spark.sql.catalyst.expressions.FormatString |true |
|from_unixtime |org.apache.spark.sql.catalyst.expressions.FromUnixTime |true |
|from_utc_timestamp |org.apache.spark.sql.catalyst.expressions.FromUTCTimestamp |true |
|get_json_object |org.apache.spark.sql.catalyst.expressions.GetJsonObject |true |
|greatest |org.apache.spark.sql.catalyst.expressions.Greatest |true |
|grouping |org.apache.spark.sql.catalyst.expressions.Grouping |true |
|grouping_id |org.apache.spark.sql.catalyst.expressions.GroupingID |true |
|hash |org.apache.spark.sql.catalyst.expressions.Murmur3Hash |true |
|hex |org.apache.spark.sql.catalyst.expressions.Hex |true |
|hour |org.apache.spark.sql.catalyst.expressions.Hour |true |
|hypot |org.apache.spark.sql.catalyst.expressions.Hypot |true |
|if |org.apache.spark.sql.catalyst.expressions.If |true |
|ifnull |org.apache.spark.sql.catalyst.expressions.IfNull |true |
|in |org.apache.spark.sql.catalyst.expressions.In |true |
|initcap |org.apache.spark.sql.catalyst.expressions.InitCap |true |
|input_file_name |org.apache.spark.sql.catalyst.expressions.InputFileName |true |
|instr |org.apache.spark.sql.catalyst.expressions.StringInstr |true |
|isnan |org.apache.spark.sql.catalyst.expressions.IsNaN |true |
|isnotnull |org.apache.spark.sql.catalyst.expressions.IsNotNull |true |
|isnull |org.apache.spark.sql.catalyst.expressions.IsNull |true |
|json_tuple |org.apache.spark.sql.catalyst.expressions.JsonTuple |true |
|kurtosis |org.apache.spark.sql.catalyst.expressions.aggregate.Kurtosis |true |
|lag |org.apache.spark.sql.catalyst.expressions.Lag |true |
|last |org.apache.spark.sql.catalyst.expressions.aggregate.Last |true |
|last_day |org.apache.spark.sql.catalyst.expressions.LastDay |true |
|last_value |org.apache.spark.sql.catalyst.expressions.aggregate.Last |true |
|lcase |org.apache.spark.sql.catalyst.expressions.Lower |true |
+---------------------+-----------------------------------------------------------------------+-----------+
only showing top 100 rows
上面展示了100个函数及其实现类。
本博客文章除特别声明,全部都是原创!原创文章版权归过往记忆大数据(过往记忆)所有,未经许可不得转载。
本文链接: 【Spark 2.0介绍:Catalog API介绍和使用】(https://www.iteblog.com/archives/1701.html)


