
Getting Started with Apache Spark: Basic Concepts and Examples (2)

5. Resilient Distributed Datasets (RDD)

  The Resilient Distributed Dataset (RDD) is the core abstraction of Apache Spark (since Spark 1.3 a higher-level DataFrame API has also been available on top of it). An RDD is an immutable, distributed collection of data, on which two kinds of operations are performed: transformations and actions. Transformations, such as filter(), map() or union(), produce a new RDD from an existing one, while actions such as count(), first(), take(n) or collect() trigger a computation and return a value to the master or write it to a stable storage system. Transformations are lazy: they are not executed until an action runs. The Spark master/driver keeps track of the transformations applied to an RDD, so if a partition is lost (for example, when a slave goes down) it can be quickly and conveniently recomputed on a surviving machine in the cluster. That is the "resilience" in RDD.

  The following figure illustrates the laziness of transformations:
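As a minimal sketch of this laziness (not from the original article; it assumes the SparkContext sc that spark-shell provides), note that no job runs until the action on the last line:

val lines = sc.textFile("~/temp/gutenburg.txt")      // transformation: nothing is read yet
val errorLines = lines.filter(_.contains("error"))   // transformation: only recorded in the lineage
val nErrorLines = errorLines.count()                 // action: triggers the actual computation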

  We can understand these concepts with the following example: finding the 5 most frequently used words in a text. The figure below shows one possible solution:

  Here is a simple example in the Spark Scala REPL shell:

scala> val hamlet = sc.textFile("~/temp/gutenburg.txt")
hamlet: org.apache.spark.rdd.RDD[String] = MappedRDD[1] at textFile at <console>:12

  In the code above we read the file and created an RDD of Strings, where each String represents one line of the file.

scala> val topWordCount = hamlet.flatMap(str=>str.split(" "))
.filter(!_.isEmpty).map(word=>(word,1)).reduceByKey(_+_)
.map{case (word, count) => (count, word)}.sortByKey(false)

topWordCount: org.apache.spark.rdd.RDD[(Int, String)] = MapPartitionsRDD[10] at sortByKey at <console>:14

  1. As the command above shows, this is remarkably simple: transformations and actions are chained together through a concise Scala API.
  2. Some words may be separated by more than one space, which produces empty-string words, so filter(!_.isEmpty) is used to remove them.
  3. Each word is mapped to a key/value pair: map(word=>(word,1)).
  4. To aggregate the counts, a reduce step is needed: reduceByKey(_+_). The _+_ shorthand sums the values for each key.
  5. We now have the words and their counts; the next step is to sort by count. In Apache Spark an RDD can only be sorted by key, not by value, so map{case (word, count) => (count, word)} is used to swap each (word, count) pair into (count, word).
  6. Since we want the 5 most frequent words, sortByKey(false) sorts the counts in descending order.

scala> topWordCount.take(5).foreach(x=>println(x))
(1044,the)
(730,and)
(679,of)
(648,to)
(511,I)

  The command above uses take(5) (an action, which triggers the computation) and prints the 5 most frequently used words in ~/temp/gutenburg.txt. The same can be done from the Python shell.
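As a side note (not in the original article), the swap-and-sortByKey step can also be replaced by the top() action, which returns the largest elements according to the implicit tuple ordering. A minimal sketch, assuming the same hamlet RDD:

scala> hamlet.flatMap(_.split(" ")).filter(!_.isEmpty).map(word => (word, 1)).reduceByKey(_ + _).map{case (word, count) => (count, word)}.top(5)
// should yield the same five pairs as above: (1044,the), (730,and), (679,of), (648,to), (511,I)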

  The RDD lineage can be traced with toDebugString (an operation worth remembering):

scala> topWordCount.toDebugString
res8: String = MapPartitionsRDD[19] at sortByKey at <console>:14 
        ShuffledRDD[18] at sortByKey at <console>:14
        MappedRDD[17] at map at <console>:14
            MapPartitionsRDD[16] at reduceByKey at <console>:14
                ShuffledRDD[15] at reduceByKey at <console>:14 
                    MapPartitionsRDD[14] at reduceByKey at <console>:14
                        MappedRDD[13] at map at <console>:14 
                            FilteredRDD[12] at filter at <console>:14
                                FlatMappedRDD[11] at flatMap at <console>:14 
                                    MappedRDD[1] at textFile at <console>:12
                                        HadoopRDD[0] at textFile at <console>:12

Commonly used transformations

Transformation & Purpose | Example & Result
filter(func)
Purpose: new RDD by selecting those data elements on which func returns true
scala> val rdd = sc.parallelize(List("ABC","BCD","DEF")) 
scala> val filtered = rdd.filter(_.contains("C"))
scala> filtered.collect() 

Result:

Array[String] = Array(ABC, BCD)

map(func)
Purpose: return new RDD by applying func on each data element
scala> val rdd=sc.parallelize(List(1,2,3,4,5))
scala> val times2 = rdd.map(_*2)
scala> times2.collect()

Result:

Array[Int] = Array(2, 4, 6, 8, 10)

flatMap(func)
Purpose: Similar to map, but func returns a Seq instead of a single value. For example, mapping a sentence into a Seq of words
scala> val rdd=sc.parallelize(List("Spark is awesome","It is fun"))
scala> val fm=rdd.flatMap(str=>str.split(" "))
scala> fm.collect() 

Result:

Array[String] = Array(Spark, is, awesome, It, is, fun)

reduceByKey(func,[numTasks])
Purpose: To aggregate the values of a key using a function. "numTasks" is an optional parameter to specify the number of reduce tasks
scala> val word1=fm.map(word=>(word,1)) 
scala> val wrdCnt=word1.reduceByKey(_+_)
scala> wrdCnt.collect() 

Result:

Array[(String, Int)] = Array((is,2), (It,1), (awesome,1), (Spark,1), (fun,1))

groupByKey([numTasks])
Purpose: To convert (K,V) to (K,Iterable<V>)
scala> val cntWrd = wrdCnt.map{case (word, count) => (count, word)}
scala> cntWrd.groupByKey().collect() 

Result:

Array[(Int, Iterable[String])] = Array((1,ArrayBuffer(It, awesome, Spark,
fun)), (2,ArrayBuffer(is)))

distinct([numTasks])
Purpose: Eliminate duplicates from
RDD
scala> fm.distinct().collect() 

Result:

Array[String] = Array(is, It, awesome, Spark, fun)

Commonly used set operations:

Transformation & Purpose | Example & Result
union()

Purpose: new RDD containing all elements from the source RDD and the argument.
scala> val rdd1=sc.parallelize(List('A','B'))
scala> val rdd2=sc.parallelize(List('B','C'))
scala> rdd1.union(rdd2).collect()

Result:

Array[Char] = Array(A, B, B, C)

intersection()

Purpose: new RDD containing only the elements common to the source RDD and the argument.
scala> rdd1.intersection(rdd2).collect()

Result:

Array[Char] = Array(B)

cartesian()

Purpose: new RDD with the cross product of all elements from the source RDD and the argument
scala> rdd1.cartesian(rdd2).collect()

Result:

Array[(Char, Char)] = Array((A,B), (A,C), (B,B), (B,C))

subtract()

Purpose: new RDD created by removing the data elements of the source RDD that are also in the argument
scala> rdd1.subtract(rdd2).collect() 

Result:

Array[Char] = Array(A)

join(RDD,[numTasks])

Purpose: When invoked on (K,V) and (K,W), this operation creates a new RDD of (K, (V,W))
scala> val personFruit = sc.parallelize(Seq(("Andy", "Apple"), ("Bob",
           "Banana"), ("Charlie", "Cherry"), ("Andy","Apricot")))
scala> val personSE = sc.parallelize(Seq(("Andy", "Google"), ("Bob",
         "Bing"), ("Charlie", "Yahoo"), ("Bob","AltaVista")))
scala> personFruit.join(personSE).collect()

Result:

Array[(String, (String, String))] = Array((Andy,(Apple,Google)), (Andy,(Apricot,Google)),
(Charlie,(Cherry,Yahoo)), (Bob,(Banana,Bing)), (Bob,(Banana,AltaVista)))

cogroup(RDD,[numTasks])

Purpose: When invoked on (K,V) and (K,W), creates a new RDD of (K, (Iterable<V>, Iterable<W>))
scala> personFruit.cogroup(personSE).collect()

Result:

Array[(String, (Iterable[String], Iterable[String]))] = Array((Andy,(ArrayBuffer(Apple,
Apricot),ArrayBuffer(Google))), (Charlie,(ArrayBuffer(Cherry),ArrayBuffer(Yahoo))),
(Bob,(ArrayBuffer(Banana),ArrayBuffer(Bing, AltaVista))))

Commonly used actions

Action & Purpose | Example & Result
count() Purpose: get the number of data elements in the RDD
scala> val rdd = sc.parallelize(List('A','B','c'))
scala> rdd.count()

Result:

Long = 3

collect() Purpose: get all the data elements of an RDD as an array
scala> val rdd = sc.parallelize(List('A','B','c'))
scala> rdd.collect()

Result:

Array[Char] = Array(A, B, c)

reduce(func) Purpose: Aggregate the data elements of an RDD using this function, which takes two arguments and returns one
scala> val rdd = sc.parallelize(List(1,2,3,4))
scala> rdd.reduce(_+_) 

Result:

Int = 10

take(n) Purpose: fetch the first n data elements of an RDD; computed by the driver program
scala> val rdd = sc.parallelize(List(1,2,3,4))
scala> rdd.take(2)

Result:

Array[Int] = Array(1, 2)

foreach(func) Purpose: execute the function for each data element of the RDD; usually used to update an accumulator (discussed later) or to interact with external systems
scala> val rdd = sc.parallelize(List(1,2,3,4))
scala> rdd.foreach(x=>println("%s*10=%s".format(x,x*10)))

Result:

1*10=10 4*10=40 3*10=30 2*10=20

first() Purpose: retrieves the first data element of the RDD; similar to take(1)
scala> val rdd = sc.parallelize(List(1,2,3,4))
scala> rdd.first()

Result:

Int = 1

saveAsTextFile(path) Purpose: Writes the content of the RDD to a text file, or a set of text files, on the local file system or HDFS
scala> val hamlet = sc.textFile("~/temp/gutenburg.txt")
scala> hamlet.filter(_.contains("Shakespeare")).saveAsTextFile("~/temp/filtered") 

Result:

akuntamukkala@localhost~/temp/filtered$ ls _SUCCESS part-00000 part-00001

6. RDD Persistence

  One of the key capabilities of Apache Spark is persisting/caching an RDD in cluster memory, which significantly speeds up interactive and iterative access. The table below shows the options Spark offers:

Storage Level | Purpose
MEMORY_ONLY (default level): Stores the RDD in available cluster memory as deserialized Java objects. Some partitions may not be cached if there is not enough memory; they are recomputed on the fly when needed.
MEMORY_AND_DISK: Stores the RDD as deserialized Java objects. Partitions that do not fit in cluster memory are stored on disk and read from there when needed.
MEMORY_ONLY_SER: Stores the RDD as serialized Java objects (one byte array per partition). This is more CPU intensive but more space efficient. Partitions that are not cached are recomputed on the fly when needed.
MEMORY_AND_DISK_SER: Same as above, except that disk is used when memory is not sufficient.
DISK_ONLY: Stores the RDD partitions only on disk.
MEMORY_ONLY_2, MEMORY_AND_DISK_2, etc.: Same as the corresponding levels above, but each partition is replicated on two cluster nodes.

  The storage levels above can be set through the persist() operation on an RDD; the cache() operation is simply a convenient way of specifying the MEMORY_ONLY option. For more on persistence levels see http://spark.apache.org/docs/latest/programming-guide.html#rdd-persistence.

  Spark uses a Least Recently Used (LRU) algorithm to evict old, rarely used RDDs from the cache and free up memory. An unpersist() operation is also provided to forcibly remove a cached/persisted RDD.
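As a minimal sketch (not from the original article; it assumes the hamlet RDD created earlier), an explicit storage level can be chosen as follows:

import org.apache.spark.storage.StorageLevel

val words = hamlet.flatMap(_.split(" "))
words.persist(StorageLevel.MEMORY_AND_DISK)  // cache() is equivalent to persist(StorageLevel.MEMORY_ONLY)
words.count()                                // the first action materializes the cache
words.filter(_ == "Hamlet").count()          // subsequent actions reuse the cached partitions
words.unpersist()                            // remove it from the cache when no longer needed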

7. Shared Variables

  

Accumulators

  Spark provides a very convenient way to avoid the problems of mutable counters and counter synchronization: accumulators. An accumulator is initialized in a Spark context with a default value. It is available to the slave nodes, but the slaves can only add to it, not read it: their atomic updates are forwarded to the master, which is the only node that can read and aggregate all of the updates. For example:

test@localhost~/temp$ cat output.log
error
warning
info
trace
error
info
info
scala> val nErrors=sc.accumulator(0.0)
scala> val logs = sc.textFile("/Users/akuntamukkala/temp/output.log")
scala> logs.filter(_.contains("error")).foreach(x=>nErrors+=1)
scala> nErrors.value
Result: Double = 2.0

  

Broadcast Variables

  In practice it is very common to combine data in RDDs by a specified key. This can mean shipping large data sets to the slave nodes that host the data to be joined, which creates a serious performance bottleneck, since network I/O is roughly 100 times slower than memory access. To solve this problem Spark provides broadcast variables, which, as the name suggests, are broadcast to the slave nodes so that RDD operations running on those nodes can access the value quickly. For example, suppose we want to compute the shipping cost of every item in a file, and a look-up table specifies the cost of each shipping type; that look-up table can be made a broadcast variable.

test@localhost~/temp$ cat packagesToShip.txt
ground
express
media
priority
priority
ground
express
media
scala> val map = sc.parallelize(Seq(("ground",1),("media",2),("priority",5),("express",10))).collect().toMap
map: scala.collection.immutable.Map[String,Int] = Map(ground -> 1, media -> 2, priority -> 5, express -> 10)
scala> val bcMailRates = sc.broadcast(map)

With the command above we created a broadcast variable from the map of cost per shipping type.

scala> val pts = sc.textFile("~/temp/packagesToShip.txt")
scala> pts.map(shipType=>(shipType,1)).reduceByKey(_+_).map{case (shipType,nPackages)=>(shipType,nPackages*bcMailRates.value(shipType))}.collect()

With the command above we computed the shipping cost using the mailing rates stored in the broadcast variable.

Array[(String, Int)] = Array((priority,10), (express,20), (media,4), (ground,2))
scala> val shippingCost=sc.accumulator(0.0)
scala> pts.map(x=>(x,1)).reduceByKey(_+_).map{case (x,y)=>(x,y*bcMailRates.value(x))}.foreach(v=>shippingCost+=v._2)
scala> shippingCost.value
Result: Double = 36.0

  With the commands above we used an accumulator to add up the cost of all shipments. More details are available in this PDF: http://ampcamp.berkeley.edu/wp-content/uploads/2012/06/matei-zaharia-amp-camp-2012-advanced-spark.pdf.

8. Spark SQL

  Spark SQL provides a convenient way to run interactive analysis on the Spark engine, using a kind of RDD called a SchemaRDD. A SchemaRDD can be created from an existing RDD, from external data formats such as Parquet files or JSON data, or by running HiveQL against data in Hive. A SchemaRDD is very similar to a table in an RDBMS. Once the data is in a SchemaRDD, the Spark engine can process it in batch or streaming fashion. Spark SQL provides two kinds of contexts, SQLContext and HiveContext, which extend the functionality of SparkContext.

  SQLContext gives access to a simple SQL parser, whereas HiveContext gives access to the HiveQL parser. HiveContext allows enterprises to leverage their existing Hive infrastructure.
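As a hedged sketch (not from the original article; it assumes a Spark 1.1+ build with Hive support, the SparkContext sc from the shell, and a Hive table named src), a HiveContext is created and queried like this:

val hiveContext = new org.apache.spark.sql.hive.HiveContext(sc)
hiveContext.sql("CREATE TABLE IF NOT EXISTS src (key INT, value STRING)")        // HiveQL against the Hive metastore
hiveContext.sql("SELECT key, value FROM src LIMIT 10").collect().foreach(println)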

  Here is a simple SQLContext example. The user data in the text below is delimited by "|":

John Smith|38|M|201 East Heading Way #2203,Irving, TX,75063
Liana Dole|22|F|1023 West Feeder Rd, Plano,TX,75093
Craig Wolf|34|M|75942 Border Trail,Fort Worth,TX,75108
John Ledger|28|M|203 Galaxy Way,Paris, TX,75461
Joe Graham|40|M|5023 Silicon Rd,London,TX,76854

  Define a Scala case class to represent each row:

case class Customer(name:String,age:Int,gender:String,address: String)

  The following code snippet shows how to create a SQLContext from a SparkContext, read the input file, convert each line into a Customer record, and query for male customers under 30 with a simple SQL statement.

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext

val sparkConf = new SparkConf().setAppName("Customers")
val sc = new SparkContext(sparkConf)
val sqlContext = new SQLContext(sc)
import sqlContext.createSchemaRDD   // implicitly converts an RDD of case classes into a SchemaRDD (Spark 1.x)
val r = sc.textFile("/Users/akuntamukkala/temp/customers.txt")
val records = r.map(_.split('|'))
val c = records.map(r => Customer(r(0), r(1).trim.toInt, r(2), r(3)))
c.registerAsTable("customers")
sqlContext.sql("select * from customers where gender='M' and age < 30").collect().foreach(println)

Result:[John Ledger,28,M,203 Galaxy Way,Paris, TX,75461]

  For more examples using SQL and HiveQL see https://spark.apache.org/docs/latest/sql-programming-guide.html and https://databricks-training.s3.amazonaws.com/data-exploration-using-spark-sql.html.

9. Spark Streaming

  Spark Streaming provides a scalable, fault-tolerant and efficient way to process streaming data while reusing Spark's simple programming model. It turns streaming data into micro batches, which lets Spark's batch programming model be applied to streaming use cases. This unified programming model makes it easy to combine batch processing with interactive streaming analysis. The figure below shows how Spark Streaming can read data from many different sources for analysis.

  The core abstraction in Spark Streaming is the Discretized Stream (DStream). A DStream is a sequence of RDDs, each holding the data that arrived during a configurable interval. Figure 12 illustrates how Spark Streaming turns the incoming data into a series of RDDs that form a DStream; in that example each RDD holds two seconds (the configured interval) of data. The batch interval in Spark Streaming can be as small as 0.5 seconds, so the processing latency can be brought below one second.
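A minimal sketch of choosing the batch interval (not from the original article; it assumes an existing SparkConf named conf):

import org.apache.spark.streaming.{Seconds, Milliseconds, StreamingContext}

// each RDD of the DStream will hold 2 seconds of data; Milliseconds(500) would give sub-second batches
val ssc = new StreamingContext(conf, Seconds(2))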

  Spark Streaming also provides window operators, which help compute efficiently over a group of RDDs (a rolling window of time). In addition, the DStream API exposes transformations and output operators that let users work with the underlying RDDs directly. Let's look at a simple example included in the Spark Streaming download, which finds trending hashtags in a Twitter stream; see the code below:

spark-1.0.1/examples/src/main/scala/org/apache/spark/examples/streaming/TwitterPopularTags.scala
val sparkConf = new SparkConf().setAppName("TwitterPopularTags")
val ssc = new StreamingContext(sparkConf, Seconds(2))
val stream = TwitterUtils.createStream(ssc, None, filters)

The code above creates the Spark Streaming context. Spark Streaming will place the tweets arriving every 2 seconds into one RDD of the DStream.

val hashTags = stream.flatMap(status => status.getText.split(" ").filter(_.startsWith("#")))

The snippet above splits each tweet into a set of words and filters out everything that does not start with #.

val topCounts60 = hashTags.map((_, 1)).reduceByKeyAndWindow(_ + _, Seconds(60)).map{case (topic, count) => (count, topic)}.transform(_.sortByKey(false))

The code above shows how to aggregate the total number of times each hashtag was seen over the last 60 seconds.

topCounts60.foreachRDD(rdd => {
  val topList = rdd.take(10)
  println("\nPopular topics in last 60 seconds (%s total):".format(rdd.count()))
  topList.foreach{case (count, tag) => println("%s (%s tweets)".format(tag, count))}
})

The code above finds the top 10 trending hashtags and prints them.

ssc.start()

The line above tells the Spark Streaming context to start retrieving tweets. Now let's go over some commonly used operations, assuming we are reading streaming text from a socket:

val lines = ssc.socketTextStream("localhost", 9999, StorageLevel.MEMORY_AND_DISK_SER)
Transformation & Purpose | Example & Result
map(func)
Purpose: Create a new DStream by applying this function to all constituent RDDs of the DStream
lines.map(x=>x.toInt*10).print()

prompt>nc -lk 9999
12
34
Output:
120
340
flatMap(func)
Purpose: This is the same as map, but the mapping function can output 0 or more items

prompt>nc -lk 9999
Spark is fun
Output:
Spark is fun
count()
Purpose: create a DStream of RDDs containing count of number of data elements
lines.flatMap(_.split(" ")).count().print()

prompt>nc -lk 9999
say
hello
to
spark
Output:
4
reduce(func)
Purpose: Same as count but instead of count, the value is derived by applying the function
lines.map(x=>x.toInt).reduce(_+_).print()

prompt>nc -lk 9999
1
3
5
7
Output:
16
countByValue()
Purpose: Returns a new DStream of (value, count) pairs, counting the occurrences of each distinct element in every RDD of the source DStream
lines.flatMap(_.split(" ")).countByValue().print()

prompt>nc -lk 9999
spark
spark
is
fun
fun
Output:
(is,1)
(spark,2)
(fun,2)
reduceByKey(func,[numTasks])
Purpose: Aggregates the values of each key within every RDD of the DStream, same as reduceByKey on RDDs
val words = lines.flatMap(_.split(" "))
val wordCounts = words.map(x => (x, 1)).reduceByKey(_+_)
wordCounts.print()

prompt>nc -lk 9999
spark is fun
fun
fun
Output:
(is,1)
(spark,1)
(fun,3)
The following example shows how Apache Spark combines Spark batch with Spark Streaming. This is a powerful capability for an all-in-one technology stack. In this example, we read a file containing brand names and filter those streaming data sets that contain any of the brand names in the file.
transform(func)
Purpose: Creates a new DStream by applying an RDD-to-RDD transformation to all RDDs of the DStream.

brandNames.txt
coke
nike
sprite
reebok

val sparkConf = new SparkConf().setAppName("NetworkWordCount")
val sc = new SparkContext(sparkConf)
val ssc = new StreamingContext(sc, Seconds(10))
val lines = ssc.socketTextStream("localhost", 9999, StorageLevel.MEMORY_AND_DISK_SER)
val brands = sc.textFile("/Users/akuntamukkala/temp/brandNames.txt")
lines.transform(rdd => {
  rdd.intersection(brands)
}).print()

prompt>nc -lk 9999
msft
apple
nike
sprite
ibm
Output:
sprite
nike
updateStateByKey(func)
Purpose: Creates a new DStream where the value of each key is updated by applying the given function to the new values and the previous state of that key.

This helps compute a running aggregate, for example the total number of times each word has occurred; see the StatefulNetworkWordCount example shipped with Spark Streaming, and the minimal sketch below.
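A minimal sketch of updateStateByKey (adapted from the Spark Streaming programming guide rather than the original article; it assumes the ssc and lines defined above, and /tmp/spark-checkpoint is just a placeholder checkpoint directory):

ssc.checkpoint("/tmp/spark-checkpoint")   // stateful operations require a checkpoint directory

// add the counts from the current batch to the running count kept as state
def updateCount(newValues: Seq[Int], runningCount: Option[Int]): Option[Int] =
  Some(newValues.sum + runningCount.getOrElse(0))

val runningWordCounts = lines.flatMap(_.split(" "))
  .map(word => (word, 1))
  .updateStateByKey[Int](updateCount _)
runningWordCounts.print()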

Commonly used window operations

Transformation & Purpose | Example & Result
window(windowLength, slideInterval)
Purpose: Returns a new DStream computed from windowed batches of source DStream
val win = lines.window(Seconds(30), Seconds(10))
win.foreachRDD(rdd => {
  rdd.foreach(x => println(x + " "))
})

prompt>nc -lk 9999
10 (0th second)
20 (10 seconds later)
30 (20 seconds later)
40 (30 seconds later)
Output:
10
10 20
20 10 30
20 30 40 (drops 10)
countByWindow(windowLength, slideInterval)
Purpose: Returns a new sliding window count of the elements in a stream
lines.countByWindow(Seconds(30),Seconds(10)).print()

prompt>nc -lk 9999
10 (0th second)
20 (10 seconds later)
30 (20 seconds later)
40 (30 seconds later)
Output:
1
2
3
3

  For more operators see http://spark.apache.org/docs/latest/streaming-programming-guide.html#transformations.

  Spark Streaming also offers a number of powerful output operators, such as the foreachRDD() shown above; see http://spark.apache.org/docs/latest/streaming-programming-guide.html#output-operations to learn more.
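As a hedged sketch of two common output operators (not from the original article; it assumes a DStream of (word, count) pairs named wordCounts like the one defined earlier, and /tmp/wordcounts is just a placeholder prefix):

wordCounts.saveAsTextFiles("/tmp/wordcounts", "txt")   // writes one directory of text files per batch interval
wordCounts.foreachRDD { rdd =>
  rdd.take(10).foreach(println)   // push each batch to an external system; here we simply print a sample
}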

10. Additional Learning Resources

Wikipedia article (good): http://en.wikipedia.org/wiki/Apache_Spark
Launching a Spark cluster on EC2: http://ampcamp.berkeley.edu/exercises-strata-conf-2013/launching-a-cluster.html
Quick start: https://spark.apache.org/docs/1.0.1/quick-start.html
The Spark platform provides MLlib (machine learning) and GraphX (graph algorithms). The following links provide more information: https://spark.apache.org/docs/latest/mllib-guide.html, https://spark.apache.org/docs/1.0.1/graphx-programming-guide.html, https://dzone.com/refcardz/apache-spark

  Original English article: https://dzone.com/refcardz/apache-spark
  This article was adapted from http://www.csdn.net/article/2015-07-10/2825184, with some of its errors corrected; if your English is good enough, you are encouraged to read the original.