一般来说,spark操作函数(如map或者reduce)中的变量都不支持分布式,每个节点上都是独立的副本。只有以下两种变量才是spark支持的共享变量:
广播变量允许在每台节点机器上缓存只读的变量,而不是每个task持有一份拷贝副本。Spark也尝试着利用有效的广播算法去分配广播变量,以减少通信的成本。
使用方式如下:
scala> val broadcastVar = sc.broadcast(Array(1, 2, 3))
broadcastVar: org.apache.spark.broadcast.Broadcast[Array[Int]] = Broadcast(0)
scala> broadcastVar.value
res0: Array[Int] = Array(1, 2, 3)
广播变量在创建之后,可以在集群的任意函数中使用,只是在广播之后,对应的变量是不能被修改的,因为修改的值不会被广播出去。
累加器是解决RDD并行操作实现count计数、sum求和等情况涉及“加”操作的变量。Spark已原生支持数字类型的累加器,自定义类型必须自己再实现。
使用方式:
scala> val accum = sc.accumulator(0, "My Accumulator")
accum: spark.Accumulator[Int] = 0
scala> sc.parallelize(Array(1, 2, 3, 4)).foreach(x => accum += x)
...
10/09/29 18:41:08 INFO SparkContext: Tasks finished in 0.317106 s
scala> accum.value
res2: Int = 10
自定义累加器必须继承于AccumulatorParam。该类有两个接口方法:
例子:
object VectorAccumulatorParam extends AccumulatorParam[Vector] {
def zero(initialValue: Vector): Vector = {
Vector.zeros(initialValue.size)
}
def addInPlace(v1: Vector, v2: Vector): Vector = {
v1 += v2
}
}
// Then, create an Accumulator of this type:
val vecAccum = sc.accumulator(new Vector(...))(VectorAccumulatorParam)
如果使用Scala写spark,还可以用Accumulable接口实现。也可以SparkContext.accumulableCollection累加scala中的基本集合类型。