Young's blog Young's blog
首页
Spring
  • 前端文章1

    • JavaScript
  • 学习笔记

    • 《JavaScript教程》
    • 《JavaScript高级程序设计》
    • 《ES6 教程》
    • 《Vue》
    • 《React》
    • 《TypeScript 从零实现 axios》
    • 《Git》
    • TypeScript
    • JS设计模式总结
  • HTML
  • CSS
  • 技术文档
  • GitHub技巧
  • Nodejs
  • 博客搭建
  • 学习
  • 面试
  • 心情杂货
  • 实用技巧
  • 友情链接
关于
收藏
  • 分类
  • 标签
  • 归档
GitHub (opens new window)

Young

首页
Spring
  • 前端文章1

    • JavaScript
  • 学习笔记

    • 《JavaScript教程》
    • 《JavaScript高级程序设计》
    • 《ES6 教程》
    • 《Vue》
    • 《React》
    • 《TypeScript 从零实现 axios》
    • 《Git》
    • TypeScript
    • JS设计模式总结
  • HTML
  • CSS
  • 技术文档
  • GitHub技巧
  • Nodejs
  • 博客搭建
  • 学习
  • 面试
  • 心情杂货
  • 实用技巧
  • 友情链接
关于
收藏
  • 分类
  • 标签
  • 归档
GitHub (opens new window)
  • Hadoop

  • kafka

  • Flume

  • hive

  • scala

  • spark

    • spark core

      • Spark 入 门
      • Spark 运行架构
      • Spark 核心编程之 RDD 算子
      • Spark 核心编程之 RDD 算子(2)
      • Spark 核心编程之 RDD 累加器与广播变量
        • 1.1 实现原理
        • 1.2 基础编程
          • 1.2.1 系统累加器
          • 1.2.2 自定义累加器
        • 2.1 实现原理
        • 2.2 基础编程
    • spark sql

  • 大数据
  • spark
  • spark core
andanyang
2023-12-05
目录

Spark 核心编程之 RDD 累加器与广播变量

# 1. 累加器

# 1.1 实现原理

累加器用来把 Executor 端变量信息聚合到 Driver 端。在 Driver 程序中定义的变量,在 Executor 端的每个 Task 都会得到这个变量的一份新的副本,每个 task 更新这些副本的值后, 传回 Driver 端进行 merge。

# 1.2 基础编程

# 1.2.1 系统累加器

val rdd = sc.makeRDD(List(1,2,3,4,5))
// 声明累加器
var sum = sc.longAccumulator("sum");
rdd.foreach(
 num => {
 // 使用累加器
 sum.add(num)
 }
)
// 获取累加器的值
println("sum = " + sum.value)
1
2
3
4
5
6
7
8
9
10
11

# 1.2.2 自定义累加器

// 自定义累加器
// 1. 继承 AccumulatorV2,并设定泛型
// 2. 重写累加器的抽象方法
class WordCountAccumulator extends AccumulatorV2[String, mutable.Map[String, 
Long]]{
var map : mutable.Map[String, Long] = mutable.Map()
// 累加器是否为初始状态
override def isZero: Boolean = {
 map.isEmpty
}
// 复制累加器
override def copy(): AccumulatorV2[String, mutable.Map[String, Long]] = {
 new WordCountAccumulator
}
// 重置累加器
override def reset(): Unit = {
 map.clear()
}
// 向累加器中增加数据 (In)
override def add(word: String): Unit = {
 // 查询 map 中是否存在相同的单词
 // 如果有相同的单词,那么单词的数量加 1
 // 如果没有相同的单词,那么在 map 中增加这个单词
 map(word) = map.getOrElse(word, 0L) + 1L
}

// 合并累加器
override def merge(other: AccumulatorV2[String, mutable.Map[String, Long]]): 
Unit = {
 val map1 = map
 val map2 = other.value
 // 两个 Map 的合并
 map = map1.foldLeft(map2)(
 ( innerMap, kv ) => {
 innerMap(kv._1) = innerMap.getOrElse(kv._1, 0L) + kv._2
 innerMap
 }
 )
}
// 返回累加器的结果 (Out)
override def value: mutable.Map[String, Long] = map
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42

# 2. 广播变量

# 2.1 实现原理

广播变量用来高效分发较大的对象。向所有工作节点发送一个较大的只读值,以供一个 或多个 Spark 操作使用。比如,如果你的应用需要向所有节点发送一个较大的只读查询表, 广播变量用起来都很顺手。在多个并行操作中使用同一个变量,但是 Spark 会为每个任务分别发送。

# 2.2 基础编程

val rdd1 = sc.makeRDD(List( ("a",1), ("b", 2), ("c", 3), ("d", 4) ),4)
val list = List( ("a",4), ("b", 5), ("c", 6), ("d", 7) )
// 声明广播变量
val broadcast: Broadcast[List[(String, Int)]] = sc.broadcast(list)
val resultRDD: RDD[(String, (Int, Int))] = rdd1.map {
 case (key, num) => {
 var num2 = 0
 // 使用广播变量
 for ((k, v) <- broadcast.value) {
 if (k == key) {
 num2 = v
 }
 }
 (key, (num, num2))
 }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
编辑 (opens new window)
上次更新: 2024/04/19, 08:52:45
Spark 核心编程之 RDD 算子(2)
Spark 核心编程之 RDD 累加器与广播变量

← Spark 核心编程之 RDD 算子(2) Spark 核心编程之 RDD 累加器与广播变量→

最近更新
01
idea 热部署插件 JRebel 安装及破解,不生效问题解决
04-10
02
spark中代码的执行位置(Driver or Executer)
12-12
03
大数据技术之 SparkStreaming
12-12
更多文章>
Theme by Vdoing | Copyright © 2019-2024 Young | MIT License
浙ICP备20002744号
  • 跟随系统
  • 浅色模式
  • 深色模式
  • 阅读模式