Spark的countByKey和reduceByKey
2025年3月13日Spark的count并不是惰性执行的,而是立刻落地(action)。可以从如下案例看出。
首先读取一个文件到rdd对象。文件是每行一个水果名。
val rdd = sc.textFile("jobs/fruits.txt")
我们想统计每个水果出现多少次,最直观的想法是reduceByKey,如下所示。
scala> rdd.map( (_,1) ).reduceByKey(_+_).sortBy(-_._2).collect
res21: Array[(String, Int)] = Array((peach,1038), (banana,1034), (pear,1018), (grape,1015), (cherry,1008), (watermelon,1006), (orange,989), (strawberry,986), (apple,970), (mango,936))
但是,reduceByKey是惰性执行的,所以后面要加一个collect来作为action,输出结果。
如果我们使用countByKey,那就不需要collect了,直接落地。
scala> rdd.map( (_,1)).countByKey.toArray.sortBy(-_._2)
res23: Array[(String, Long)] = Array((peach,1038), (banana,1034), (pear,1018), (grape,1015), (cherry,1008), (watermelon,1006), (orange,989), (strawberry,986), (apple,970), (mango,936))
比较 Spark 中这两个操作的效率:
// 第一种方式
rdd.map((_,1)).reduceByKey(_+_).sortBy(-_._2).collect
// 第二种方式
rdd.map((_,1)).countByKey.toArray.sortBy(-_._2)
效率对比
通常第一种方式更适合大规模数据处理,而第二种方式适合小数据集。理由如下:
第一种方式(使用 reduceByKey)
- 分布式执行:整个操作链都是在分布式环境中执行的
- 惰性求值:所有转换操作都是惰性的,直到
collect
才触发执行 - 局部聚合:
reduceByKey
会先在每个分区内进行局部聚合,然后再在节点间合并结果 - 有效的数据移动:减少了 shuffle 过程中的数据传输量
- 可扩展性:能够处理非常大的数据集,因为排序也是分布式的
第二种方式(使用 countByKey)
- Driver 端操作:
countByKey
将所有结果收集到 driver 节点 - 立即执行:
countByKey
是一个 action 操作,会立即触发执行 - 全局聚合:没有本地预聚合优化
- 内存限制:结果必须放入 driver 节点的内存中,可能导致
OutOfMemoryError
- 本地排序:
toArray.sortBy
在 driver 节点的内存中完成
主要差异点
- 执行位置:
- 第一种:分布式执行,直到最后的
collect
- 第二种:在 executor 上执行 map,然后收集到 driver 处理
- 第一种:分布式执行,直到最后的
- 内存使用:
- 第一种:每个阶段都可以分布在集群中处理
- 第二种:结果必须全部放入 driver 节点内存
- 适用场景:
- 第一种:适合大规模数据集(GB、TB级别)
- 第二种:仅适合小数据集(能放入单机内存)
具体性能影响因素
- 数据规模:随着数据规模增大,第一种方式的优势越明显
- 不同 key 的数量:key 越多,第二种方式消耗的内存越大
- 集群资源:第一种方式可以利用整个集群资源,第二种方式受限于 driver 节点
结论
- 对于生产环境中的大数据处理任务,几乎总是应该使用第一种方式
- 如果确定处理的是小数据集(key 数量有限),且需要更简洁的代码,可以考虑第二种方式
- 如果不确定数据规模或 key 的数量,应该选择第一种方式以避免内存问题
在实际应用中,当数据集超过几百 MB 或者有上千个不同的 key 时,第一种方式会明显表现出性能优势。