RDD(Resilient Distributed Datasets)是 Spark 中最基本的数据结构,是适合做分布式计算的。那如何在分布式系统中,对数据 求和,计数,求均值 呢?
现在有数字
val list = 1 to 9
val numRdd = sc.parallelize(list, 4).cache()
// TODO 0. 打印分区
println("0. 分区情况为:")
println(numRdd.glom().collect().map(_.mkString("[", ",", "]")).mkString("\n") + "\n")
0. 分区情况为:
[1, 2]
[3, 4]
[5, 6]
[7, 8, 9]
求和
val sum = numRdd.reduce((a, b) => a + b)
利用 reduce
算子可以得到结果,它的工作原理是将数据 shuffle
到一个节点做加法:

但是,当数据量非常巨大时,就会收到磁盘 io 以及网络带宽的限制。如果,在将数据发送到下游节点之前,首先上游节点分别做求和,再将求和结果发送给下游节点做相加,那么,磁盘 io 以及网络带宽将不会是瓶颈了,而且也更充分的利用了分布式的优势

具体实现:
val sum = numRdd.aggregate(0)((sum, element) => sum + element, _ + _)
Tips
aggregate(z)(seqOp, combOp)
算子接收 3 个参数:
- z: 初始值
- seqOp: 分区内用于预聚合的操作函数
- combOp: 对分区结果进行合并的操作函数
求平方和
val sumOfSquares = numRdd.aggregate(0)((sum, element) => sum + element * element, _ + _)
计数
理解上面的求和之后,计数就更简单了:分区内计数后,再将分区间的计数结果相加即可
val count = numRdd.aggregate(0)((count, _) => count + 1, _ + _)

求平均
利用 aggregate
算子求分布式系统的均值也是顺理成章:
- 在分区内 求和 及 计数 ,传给下游节点
- 下游节点分别对上游的求和结果和计数结果进行累加

val (sum, count) = numRdd.
aggregate(0, 0)((x, y) => (x._1 + y, x._2 + 1), (x, y) => (x._1 + y._1, x._2 + y._2))
val avg = sum / count
上例中初始值为二元组,第一个值表示求和,第二个值表示计数
源代码
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import org.apache.spark.sql.SparkSession | |
/** | |
* 分布式计数、求和、平方和、均值 | |
* | |
*/ | |
object MathFunction { | |
def main(args: Array[String]): Unit = { | |
val spark = SparkSession.builder.appName("MathFunction").master("local[*]").getOrCreate() | |
val sc = spark.sparkContext | |
val list = 1 to 9 | |
val numRdd = sc.parallelize(list, 4).cache() | |
// TODO 0. 打印分区 | |
println("0. 分区情况为:") | |
println(numRdd.glom().collect().map(_.mkString("[", ",", "]")).mkString("\n") + "\n") | |
// TODO 1. 计数 | |
// val count = numRdd.count() | |
val count = numRdd.aggregate(0)((count, _) => count + 1, _ + _) | |
println("1. 计数:" + count + "\n") | |
// TODO 2. 求和 | |
// val sum = numRdd.reduce((a, b) => a + b) | |
// val sum = numRdd.reduce(_+_) | |
val sum = numRdd.aggregate(0)((sum, element) => sum + element, _ + _) | |
println("3. 总和:" + sum + "\n") | |
// TODO 3. 平方和 | |
val sumOfSquares = numRdd.aggregate(0)((sum, element) => sum + element * element, _ + _) | |
println("3. 平方和:" + sumOfSquares + "\n") | |
// TODO 4. 均值 | |
val (sum1, count1) = numRdd.aggregate(0, 0)((x, y) => (x._1 + y, x._2 + 1), (x, y) => (x._1 + y._1, x._2 + y._2)) | |
println("4. 均值:" + sum1 / count1) | |
println("总和:" + sum1 + ",计数:" + count1) | |
sc.stop() | |
} | |
} |