Spark支持读取很多格式的文件,其中包括了所有继承了Hadoop的InputFormat类的输入文件,以及平时我们常用的Text、Json、CSV (Comma Separated Values) 以及TSV (Tab Separated Values)文件。本文主要介绍如何通过Spark来读取Json文件。很多人会说,直接用Spark SQL模块的jsonFile方法不就可以读取解析Json文件吗?是的,没错,我们是可以通过那个读取Json数据,但是本文本着学习的心态,来自己解析Json数据。
本文通过play-json_2.10依赖包进行解析的,如果你用Java的话,可以使用Jackson(当然Scala也是可以使用Jackson解析Json数据的);如果你是用python,可以使用内置的Json解析函数。我们可以这样来使用:
import play.api.libs.json._ val input = sc.parallelize(List( """{"name":"过往记忆","website":"www.iteblog.com"}""", """{"other":"过往记忆"}""")) val parsed = input.map(Json.parse) parsed.collect output: {"name":"过往记忆","website":"www.iteblog.com"} {"other":"过往记忆"}
这样很就解析出Json了,但是我们只是简单将它转换成字符串了,相当于还是没解析。上面的Json数据每条Json格式不一样,如果你的Json数据格式都一样,比如每条Json最多只包含了name和website属性,那么可以这样解析:
import play.api.libs.json._ val input = sc.parallelize(List( """{"name":"过往记忆","website":"www.iteblog.com"}""", """{"name":"过往记忆"}""")) val parsed = input.map(Json.parse) case class Info(name: String, website: String) { override def toString: String = name + "\t" + website } implicit val personReads = Json.format[Info] val result = parsed.flatMap(record => personReads.reads(record).asOpt) result.collect output: 过往记忆 www.iteblog.com
也就是把Json数据解析出来了,并存储在Info类型中,这样就便于下面我们的处理。细心的同学可能会说,这个Json不是有两条数据吗?{"name":"过往记忆"}这条数据为什么没打出了?这是因为,我们不能保证输入的Json数据格式都是包含了name和website属性,如果不包含这两个属性的Json数据我们认为其是错误的数据,也就是要过滤掉。我们在程序中使用到asOpt和flatMap,它的功能是当解析失败的时候将那条失败的数据过滤掉。
如果我们需要将计算的结果保存成Json的数据可以如下操作:
val data = sc.parallelize(List(Info("过往记忆", "www.iteblog.com"))) data.map(Json.toJson(_)).collect.foreach(println)
结果是{"name":"过往记忆","website":"www.iteblog.com"}。
需要正常运行上面的程序,需要引入相关的依赖包:如果你是用Maven,请在你的pom.xml文件加入以下依赖:
<dependency> <groupId>com.typesafe.play</groupId> <artifactId>play-json_2.10</artifactId> <version>2.4.0-M1</version> </dependency>
如果你是用sbt,请在build.sbt文件加入以下依赖:
"com.typesafe.play" % "play-json_2.10" % "2.2.1"本博客文章除特别声明,全部都是原创!
原创文章版权归过往记忆大数据(过往记忆)所有,未经许可不得转载。
本文链接: 【Spark解析Json数据(非Sql方式)】(https://www.iteblog.com/archives/1246.html)
你好,我按照你的方式,报错,Caused by: java.io.NotSerializableException: play.api.libs.json.OFormat$$anon$1,在执行val result: RDD[Info] = paserd.flatMap(recode => pasedfReads.reads(recode).asOpt)这个的时候,是什么原因呢?
你的Info是咋定义的?
你好,info和你的是一样的,问题解决了,extends App就是可以了,不要写main方法就行。谢谢啊!对了,加入你的例子中name是和数组,怎么获取?
如果是数组的话,你解析之后使用Play json转化成数字即可。
如果数据中的是个对象,可以转换吗?像这样{"name":"tom","other":[{"sex":"male","age":20}]},假如有个对象是other数组中的,这个对象有两个属性,就是sex和age,这样我试了一下,转不过来,你有什么办法吗?
还有,就是对象里可以嵌套对象吗?
personReads 这是从哪里来的,这句话不能编译通过
personReads定义如下:
implicit val personReads = Json.format[Info]
本文主要介绍如果通过Spark来读取Json文件。此句有错别字
已经修正,谢谢提醒。