Spark面经

Saprk定义

Apache Spark 是一个多语言引擎,用于在单节点机器或集群上执行数据工程、数据科学和机器学习。

## Spark组件

Spark三大抽象数据结构

RDD:弹性分布式数据集,是 Spark 中最基本的数据处理模型。代码中是一个抽象类,它代表一个弹性的、不可变、可分区、里面的元素可并行 计算的集合。

  1. 弹性
    • 存储的弹性:内存与磁盘的自动切换;
    • 容错的弹性:数据丢失可以自动恢复;
    • 计算的弹性:计算出错重试机制;
    • 分片的弹性:可根据需要重新分片。
  2. 分布式:数据存储在大数据集群不同节点上
  3. 数据集:RDD 封装了计算逻辑,并不保存数据
  4. 数据抽象:RDD 是一个抽象类,需要子类具体实现
  5. 不可变:RDD 封装了计算逻辑,是不可以改变的,想要改变,只能产生新的 RDD,在新的 RDD 里面封装计算逻辑
  6. 可分区、并行计算

累加器:累加器用来把 Executor 端变量信息聚合到 Driver 端。在 Driver 程序中定义的变量,在 Executor 端的每个 Task 都会得到这个变量的一份新的副本,每个 task 更新这些副本的值后, 传回 Driver 端进行 merge。

广播变量:广播变量用来高效分发较大的对象。向所有工作节点发送一个较大的只读值,以供一个 或多个 Spark 操作使用。比如,如果你的应用需要向所有节点发送一个较大的只读查询表, 广播变量用起来都很顺手。在多个并行操作中使用同一个变量,但是 Spark 会为每个任务 分别发送

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
package RDD1

import org.apache.spark.util.LongAccumulator
import org.apache.spark.{SparkConf, SparkContext}

object task1 {
  def main(args: Array[String]): Unit = {
    val sparkConf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("job3task1")
    val sc = new SparkContext(sparkConf)

    //通过文件生成RDD
    val data = sc.textFile("hdfs://hadoop1:8020/Data01.txt")

     println(data.map(
      x => {
        val student = x.split(",")(0)
        student
      }
    ).distinct().count())//学生数量
    println(data.map(_.split(",")(1)).distinct().count())//课程数量

    val value = data.map(
      x => {
        val student = x.split(",")(0)
        val grade = Integer.parseInt(x.split(",")(2))
        (student, grade)
      }).filter(_._1=="Tom")
      .groupByKey().map{
      y=>{
      y._2.sum/y._2.size.toDouble
    }}
    value.foreach(println) //Tom的课程平均分

    val studentCourse = data.map(
      x=>{
        val student = x.split(",")(0)

        (student,1)
      }
    ).groupByKey().mapValues(_.size)
    studentCourse.foreach(println)//学生选课数

    val database = data.map(
      x=>{
        val course = x.split(",")(1)
        (course,1)
      }
    ).filter(_._1=="DataBase").groupByKey().map(y=>y._2.size)
    database.foreach(println)//选择了DataBase课的学生

    val avgGrade = data.map(
      x=>{
        val course = x.split(",")(1)
        val grade = Integer.parseInt(x.split(",")(2))
        (course,grade)
      }
    ).groupByKey().mapValues(y=>{y.sum/y.size.toDouble})
    avgGrade.foreach(println)//每门课的平均分


    val dbCount:LongAccumulator =  sc.longAccumulator("dbCount")
    data.foreach { line =>
      val course = line.split(",")(1)
      if (course == "DataBase") {
        // 每当课程为"DataBase"时,累加器的值加1
        dbCount.add(1)
      }
    }
    println("DataBase count: " + dbCount.value)
    //累加器实现查找选择了DataBase课的学生

    sc.stop()
  }
}
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
package SprakReview

import org.apache.spark.broadcast.Broadcast
import org.apache.spark.{SparkConf, SparkContext}

object BroadCastTry {
  def main(args: Array[String]): Unit = {
    val sparkConf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("广播变量练习")
    val sc = new SparkContext(sparkConf)

    val v = Array(1,2,3,4,5,6)
    // 创建广播变量
    val broadV:Broadcast[Array[Int]] = sc.broadcast(v)

    //打印广播变量
    println(broadV.value.mkString("Array(", ", ", ")"))

    //销毁广播变量
    broadV.destroy()

    sc.stop()
  }
}

Spark行动算子

Spark转换算子

map mapPartitions flatMap 区别

  1. map 将处理的数据逐条进行映射转换,这里的转换可以是类型的转换,也可以是值的转换。
1
2
3
4
5
6
7
  /**
   * Return a new RDD by applying a function to all elements of this RDD.
   */
  def map[U: ClassTag](f: T => U): RDD[U] = withScope {
    val cleanF = sc.clean(f)
    new MapPartitionsRDD[U, T](this, (_, _, iter) => iter.map(cleanF))
  }
  1. mapPartitions 将待处理的数据以分区为单位发送到计算节点进行处理,这里的处理是指可以进行任意的处 理,哪怕是过滤数据。
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
  /**
   * Return a new RDD by applying a function to each partition of this RDD.
   *
   * `preservesPartitioning` indicates whether the input function preserves the partitioner, which
   * should be `false` unless this is a pair RDD and the input function doesn't modify the keys.
   */
  def mapPartitions[U: ClassTag](
      f: Iterator[T] => Iterator[U],
      preservesPartitioning: Boolean = false): RDD[U] = withScope {
    val cleanedF = sc.clean(f)
    new MapPartitionsRDD(
      this,
      (_: TaskContext, _: Int, iter: Iterator[T]) => cleanedF(iter),
      preservesPartitioning)
  }

map 和 mapPartitions 的区别?

  • 数据处理角度 Map 算子是分区内一个数据一个数据的执行,类似于串行操作。而 mapPartitions 算子是以分区为单位进行批处理操作。

  • 功能的角度 Map 算子主要目的将数据源中的数据进行转换和改变。但是不会减少或增多数据。 MapPartitions 算子需要传递一个迭代器,返回一个迭代器,没有要求的元素的个数保持不变, 所以可以增加或减少数据

  • 性能的角度 Map 算子因为类似于串行操作,所以性能比较低,而是 mapPartitions 算子类似于批处理,所以性能较高。但是 mapPartitions 算子会长时间占用内存,那么这样会导致内存可能不够用,出现内存溢出的错误。所以在内存有限的情况下,不推荐使用。

  1. flatMap 将处理的数据进行扁平化后再进行映射处理,所以算子也称之为扁平映射
1
2
3
4
5
6
7
8
  /**
   *  Return a new RDD by first applying a function to all elements of this
   *  RDD, and then flattening the results.
   */
  def flatMap[U: ClassTag](f: T => TraversableOnce[U]): RDD[U] = withScope {
    val cleanF = sc.clean(f)
    new MapPartitionsRDD[U, T](this, (_, _, iter) => iter.flatMap(cleanF))
  }

补充:mapPartitionsWithIndex 将待处理的数据以分区为单位发送到计算节点进行处理,这里的处理是指可以进行任意的处 理,哪怕是过滤数据,在处理时同时可以获取当前分区索引。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
  /**
   * Return a new RDD by applying a function to each partition of this RDD, while tracking the index
   * of the original partition.
   *
   * `preservesPartitioning` indicates whether the input function preserves the partitioner, which
   * should be `false` unless this is a pair RDD and the input function doesn't modify the keys.
   */
  def mapPartitionsWithIndex[U: ClassTag](
      f: (Int, Iterator[T]) => Iterator[U],
      preservesPartitioning: Boolean = false): RDD[U] = withScope {
    val cleanedF = sc.clean(f)
    new MapPartitionsRDD(
      this,
      (_: TaskContext, index: Int, iter: Iterator[T]) => cleanedF(index, iter),
      preservesPartitioning)
  }
页面浏览量Loading
网站总访客数:Loading
网站总访问量:Loading
使用 Hugo 构建
主题 StackJimmy 设计