Spark的countByKey和reduceByKey

2025年3月13日 作者 unix2go

Spark的count并不是惰性执行的,而是立刻落地(action)。可以从如下案例看出。

首先读取一个文件到rdd对象。文件是每行一个水果名。

val rdd = sc.textFile("jobs/fruits.txt")

我们想统计每个水果出现多少次,最直观的想法是reduceByKey,如下所示。

scala> rdd.map( (_,1) ).reduceByKey(_+_).sortBy(-_._2).collect
res21: Array[(String, Int)] = Array((peach,1038), (banana,1034), (pear,1018), (grape,1015), (cherry,1008), (watermelon,1006), (orange,989), (strawberry,986), (apple,970), (mango,936))

但是,reduceByKey是惰性执行的,所以后面要加一个collect来作为action,输出结果。

如果我们使用countByKey,那就不需要collect了,直接落地。

scala> rdd.map( (_,1)).countByKey.toArray.sortBy(-_._2)
res23: Array[(String, Long)] = Array((peach,1038), (banana,1034), (pear,1018), (grape,1015), (cherry,1008), (watermelon,1006), (orange,989), (strawberry,986), (apple,970), (mango,936))

比较 Spark 中这两个操作的效率:

// 第一种方式
rdd.map((_,1)).reduceByKey(_+_).sortBy(-_._2).collect

// 第二种方式
rdd.map((_,1)).countByKey.toArray.sortBy(-_._2)

效率对比

通常第一种方式更适合大规模数据处理,而第二种方式适合小数据集。理由如下:

第一种方式(使用 reduceByKey)

  1. 分布式执行:整个操作链都是在分布式环境中执行的
  2. 惰性求值:所有转换操作都是惰性的,直到 collect 才触发执行
  3. 局部聚合reduceByKey 会先在每个分区内进行局部聚合,然后再在节点间合并结果
  4. 有效的数据移动:减少了 shuffle 过程中的数据传输量
  5. 可扩展性:能够处理非常大的数据集,因为排序也是分布式的

第二种方式(使用 countByKey)

  1. Driver 端操作countByKey 将所有结果收集到 driver 节点
  2. 立即执行countByKey 是一个 action 操作,会立即触发执行
  3. 全局聚合:没有本地预聚合优化
  4. 内存限制:结果必须放入 driver 节点的内存中,可能导致 OutOfMemoryError
  5. 本地排序toArray.sortBy 在 driver 节点的内存中完成

主要差异点

  1. 执行位置
    • 第一种:分布式执行,直到最后的 collect
    • 第二种:在 executor 上执行 map,然后收集到 driver 处理
  2. 内存使用
    • 第一种:每个阶段都可以分布在集群中处理
    • 第二种:结果必须全部放入 driver 节点内存
  3. 适用场景
    • 第一种:适合大规模数据集(GB、TB级别)
    • 第二种:仅适合小数据集(能放入单机内存)

具体性能影响因素

  1. 数据规模:随着数据规模增大,第一种方式的优势越明显
  2. 不同 key 的数量:key 越多,第二种方式消耗的内存越大
  3. 集群资源:第一种方式可以利用整个集群资源,第二种方式受限于 driver 节点

结论

  • 对于生产环境中的大数据处理任务,几乎总是应该使用第一种方式
  • 如果确定处理的是小数据集(key 数量有限),且需要更简洁的代码,可以考虑第二种方式
  • 如果不确定数据规模或 key 的数量,应该选择第一种方式以避免内存问题

在实际应用中,当数据集超过几百 MB 或者有上千个不同的 key 时,第一种方式会明显表现出性能优势。