Posted by CY Blog on December 26, 2021

Spark:Cluster Computing With Working Sets

这篇论文是Spark系列论文的开篇之作,主要介绍了当时解决大规模数据的分布式框架存在的局限性,并针对这些问题提出了Spark的解决方案。

简介

目前大规模数据解决方案(主要指MapReduce)是用来处理acyclic data flow的,对于有时候我们想要在一个数据集上执行多次操作( reuse a working set of data across multiple parallel operations),即针对如下两类问题,则存在这局限性:

  • 迭代式作业:虽然每次迭代都可以表示成一个MR任务,但每次迭代必须从磁盘加载数据;
  • 交互式数据分析:SQL虽然也可以转换为MR任务,但每次MR任务都要从磁盘家加载数据;

磁盘IO的代价非常高,因此MR在解决上述问题的时候效率并不高。对于此,论文提出了一种新的大规模数据计算方案Spark,弹性分布式数据集(RDD)可以用来解决迭代式作业问题,而Spark是基于Scala构建的,Scala可以提供交互式的操作,可以很好地解决交互式的数据分析。

编程模型

开发人员需要编写驱动程序(Driver),控制应用程序的执行流程并在并行的环境中执行一系列的并行操作。Spark主要提供了两类抽象:RDD和并行算子(parallel operation)。此外Spark还提供了两种受限的共享变量。

弹性分布式数据集(RDD)

RDD的特点
  1. 跨计算机的可分区的只读对象集合
  2. 分区丢失后可以重建(因为RDD不需要物化在物理存储上,相反可以通过物理存储上的数据来构建RDD)
  3. 可以持久化RDD——cache() ,persist(),供后续计算使用。
如何创建RDD?
  1. 从HDFS这样的分布式文件系统创建;
  2. 通过并行的读取Scala集合来创建;
  3. 从另一个RDD转化而来;
  4. 改变现有RDD的持久性。
RDD默认是lazy且临时的,但是可以通过特定的操作来改变其持久性,如何改变?
  1. Cache action:将数据保存在内存中,以便后期重用时,可以快速的使用。
  2. Save action:将数据持久化到像HDFS这样的分布式文件存统上,这个被保存的版本也可以在后期的操作中重用。

数据集上的并行操作

  • 支持的操作
    • reduce:在driver产出一个结果
    • collect:在driver收集所有的数据
    • foreach:执行分布式的操作,会产生副作用
  • 不支持多个reduce,并发,比如执行按某个key的group时,只能聚合到driver一个节点。未来需要支持shuffle来支持这种操作。

共享变量

Spark提供了两种共享变量:

  1. 为什么需要:上面的map,filter,reduce都是传递/copy一个函数闭包到某个节点上执行,只能处理本节点自己创建的变量,我们可能需要一些共享的信息。
  2. 广播变量Broadcast variables:只读,传递一个大的查找表,这种变量只会被广播到每一个Worker一次;

If a large read-only piece of data (e.g., a lookup table) is used in multiple parallel op- erations, it is preferable to distribute it to the workers only once instead of packaging it with every closure. Spark lets the programmer create a “broadcast variable” object that wraps the value and ensures that it is only copied to each worker once.

  1. 累加器Accumulators:提供”add”语意(包括zero),任何节点可操作,只有driver可读可以在Worker节点间共享该变量,可以用来作为计数器

• Accumulators: These are variables that workers can only “add” to using an associative operation, and that only the driver can read. They can be used to im- plement counters as in MapReduce and to provide a more imperative syntax for parallel sums. Accumu- lators can be defined for any type that has an “add” operation and a “zero” value. Due to their “add-only” semantics, they are easy to make fault-tolerant.

实例

文本搜索(cache)

val file = spark.textFile("hdfs://...")
val errs = file.filter(_.contains("ERROR"))
val ones = errs.map(_ => 1)
val count = ones.reduce(_+_)

假设需要对存储在HDFS中的大型日志文件中包含的错误行进行统计。上面的代码示例使用Spark的方式实现了MapReduce操作。与MapReduce的操作不同的是,Spark可以保存中间数据。如果我们想保存errs数据,就可以使用如下方式创建一个缓存的RDD:

val cachedErrs = errs.cache()

这样如果后续我们需要读errs数据进行更多的操作,就会大大的提高执行效率了。

逻辑回归(accumulator)

LR是一种迭代算法,因此可将迭代数据缓存在内存中从而提高执行效率(迭代ITERATIONS次,每次points都是从cache到内存的数据来读)。将梯度设置成累加器变量,这样其就可以在并行的环境下进行累加了。

// 从文本文件中读取点数据,并缓存在内存中
val points = spark.textFile(...).map(parsePoint).cache()
// Initialize w to random D-dimensional vector
var w = Vector.random(D)
// Run multiple iterations to update w
for (i <- 1 to ITERATIONS) {
    //将梯度设置成累计,可以在所有的worker之间累加该数据
   val grad = spark.accumulator(new Vector(D))  
   //scala的for是语法糖,因此如下的代码会被转换成points.foreach来执行,是一个并行操作
   for (p <- points) { 
      // Runs in parallel
      val s = (1/(1+exp(-p.y*(w dot p.x)))-1)*p.y
      grad += s*p.x
   }
   w -= grad.value
}

最小二乘法(broadcast)

计算U和M时,都是通过并行化的方式进行计算的,而计算的过程中每一次循环,都需要数据集R,因此我们可以把数据集R设置成广播变量,在程序启动之后,数据集R只会被driver向所有参与计算的worker节点发送一次。

//每次计算的时候R都是被当作参数传进去,所以这里将数据集R设置成广播变量
val Rb = spark.broadcast(R)
for (i <- 1 to ITERATIONS) {
        U = spark.parallelize(0 until u)
                        .map(j => updateUser(j, Rb, M)).collect()
        M = spark.parallelize(0 until m)
                         .map(j => updateUser(j, Rb, U)).collect()
}

实现

RDD

  1. spark是建立在mesos之上的,mesos是一个『集群操作系统』,在上面协议跑各种分布式的框架,hadoop、mpi等,可以实现spark——可以用mesos的api启动task,共享机器资源

  2. 实现spark的核心是实现『RDD』

  3. 数据集在spark中表示成一串scala的object:他们表示了 the lineage of each RDD,且每个对象包括:

    ① 一个指向他parent的指针

    ② 如何从parent转换为这个RDD

    image-20201017231859789

  4. 所有RDD都实现了的一组相同的接口
    • getPartitions:获取所有分区ID的List
    • getIterator(partition):获得一个分区恩迭代器
    • getPreferredLocations(partition):用于数据的本地调度
  5. 当我们执行『并行操作』时,会:
    • 创建一个task来处理数据集的每个分区
    • 发送这个task到每个worker节点:会尽量发送到preferred locations。使用『delay sheduling算法』来获得本地调度。
    • 在worker上调用getIterator读取数据
  6. 不同类型的RDD,只是实现上面的接口不同
    • HdfsTTextFile
      • getPartitions:分区是block的id
      • getPreferredLocations:就是block的位置
      • getIterator:一个stream流
    • MappedDataset
      • Partitions、PreferredLocations:同上
      • Iterator:就是应用mapper func到上面的steam
    • CachedDataset
      • getIterator:查找本地内存有没有transformed partition的副本。
      • PreferredLocations:一开始就是parent的PreferredLocations。在cached之后会更新为内存存储的node
      • 如果存储fail,会重新读取parent的数据,再cache到其他node
  7. 发送task到worker上,就是发送closure(以及闭包bound内的变量),
    • 发送的内容具体而言包括:
      • 定义数据集的闭包:RDD?
      • 操作相关的闭包:如reduce
    • 使用java序列化scala/java的object来发送
      • 理论上比较简单直接发送对象就行
      • 实际上,scala的闭包实现有bug,会把闭包内没有引用obj打包进来,他们通过分析字节码把没有用的变量设为null

共享变量

  1. broadcast and accumulators 各自都使用了『自定义的序列化格式
  2. broadcast:以具有值v的广播变量b为例
    • v存储在一个『共享文件系统』中(初始版本用的是HDFS,正在开发高效的流式广播系统)
    • b的序列化方式就是一个文件路径,指向v
    • 当worker查询时,会读取共享文件的路径下的数据,并把数据缓存到本地内存
  3. Accumulators
    • 先在driver上序列化了Accumulator的数据结构,他含有:
      • 唯一的ID
      • 类型定义的初始『zero』值
    • 每个worker节点上为每个运行task的线程复制一个ThreadLocal的Accumulator变量,也是初始为zero
    • task运行结束后,发送Accumulator到driver,进行合并
      • 对某个Accumulator的操作的更新,每个partition只会执行一次。防止重新执行的task多次计算相同的值。

继承Scala解释器实现交互查询

  1. scala交互解释器实现原理:
    • 用户输入的一行都编译成一个class,这个类包含一个单例object,如var x = 5会编译成class Line1
    • println(x)会翻译成 println(Line1.getInstance().x)
  2. spark对他的修改
    • 每一个class定义都输出到一个共享文件系统中,目的是在其他机器上的节点可以使用(通过java classloader加载)
    • We changed the generated code so that the singleton object for each line references the singleton objects for previous lines directly, rather than going through the static getInstance methods. This allows clo-sures to capture the current state of the singletons they reference whenever they are serialized to be sent to a worker. If we had not done this, then updates to the singleton objects (e.g., a line setting x = 7 in the ex-ample above) would not propagate to the workers

参考

  1. Spark:Cluster Computing With Working Sets
  2. Spark:Cluster Computing With Working Sets阅读笔记

  3. Spark论文阅读