一、安装配置


主参考 参考1 参考2 参考3

1、前提条件

  • JDK 1.8
  • Hadoop 集群方式安装
  • Scala 2.11.x

(1)安装Scala

下载scala,并安装

su
wget https://downloads.lightbend.com/scala/2.11.12/scala-2.11.12.tgz
tar zxvf scala-2.11.12.tgz
mkdir /usr/scala
mv scala-2.11.12 /usr/scala/
cd /usr/scala/
ln -s /usr/scala/scala-2.11.12 default
vim /etc/profile
# Scala
export SCALA_HOME=/usr/scala/default
export PATH="$SCALA_HOME/bin:$PATH"

2、安装

http://spark.apache.org/downloads.html

选择下载没有hadoop包的版本

(1)下载解压

# 重要:su 到hadoop运行的用户
wget https://archive.apache.org/dist/spark/spark-2.4.0/spark-2.4.0-bin-without-hadoop.tgz
sudo mkdir /usr/spark
sudo chown xxx:xxx /usr/spark
tar zxvf spark-2.4.0-bin-without-hadoop.tgz
mv spark-2.4.0-bin-without-hadoop /usr/spark/
cd /usr/spark/
ln -s /usr/spark/spark-2.4.0-bin-without-hadoop default
cd default/conf

(2)配置

环境变量sudo vim /etc/profile

# Spark
export SPARK_HOME=/home/spark/default
export PATH=$SPARK_HOME/bin:$PATH

编辑Spark环境cp spark-env.sh.template spark-env.sh && vim spark-env.sh

# 由于ssh执行脚本环境变量会失效,所以需要在此重新加载环境变量
export PATH=/usr/local/bin:/usr/bin
source /etc/profile
export SPARK_DIST_CLASSPATH=$(hadoop classpath)
export JAVA_HOME=/usr/java/default   #Java环境变量
export SCALA_HOME=/usr/scala/default #SCALA环境变量
export SPARK_WORKING_MEMORY=512m  #每一个worker节点上可用的最大内存

export SPARK_MASTER_HOST=hadoop-master #master节点主机名
SPARK_MASTER_PORT=7077            # master 通信端口,worker和master通信端口
SPARK_MASTER_WEBUI_PORT=8080      # master SParkUI用的端口

export HADOOP_HOME=/usr/hadoop/default  #Hadoop路径
export HADOOP_CONF_DIR=$HADOOP_HOME/etc/hadoop #Hadoop配置目录
export SPARK_WORKER_CORES=1           #每个节点多少核
export SPARK_WORKER_INSTANCES=1     #每台机器上开启的worker节点的数目

编辑集群情况cp slaves.template slaves && vim slaves

hadoop-slave1
hadoop-slave2

验证

spark-shell

(3)分发到slave节点

使用scp工具

(4)启动关闭spark

sbin/start-all.sh
sbin/stop-all.sh

(5)验证

在各个节点输入jps命令。可以看到Master或者Worker输出

访问Master节点8080端口http://192.168.3.20:8080/

将看到两个worker

二、模型与机制

1、术语定义

用户程序相关

  • Application 指用户编写的程序,包含Driver和Executor
  • Dirver 一般指Application上的Main函数创建的SparkContext, SparkContext是:
    • Spark应用程序的运行环境
    • 负责与ClusterManager通讯,申请资源、任务的分配和监控
  • Executor 指运行在Worker节点上的一个进程,负责运行Task,并负责将数据存储在内存或磁盘上,每个Executor并行Task的数目默认取决于CPU数

集群管理相关

  • Cluster Manager 集群管理者
    • standalone
    • yarn
  • Worker 工作机器守护进行,负责启动管理Executor

运行时相关

  • Job 包含多个Task组成
  • Stage Job被拆分为多组的Task,每组任务被称为Stage,也可称TaskSet,一个作业分为多个阶段,阶段之间存在上下游关系
  • Task 被送到某个Executor上的工作任务

2、执行过程

  • 创建应用程序计算 RDD DAG(有向无环图)
  • 创建RDD DAG逻辑执行方案,将计算过程应用到Stage
  • 获取Executor来调度并执行各个Stage对应的ShuffleMapResult和ResultTask等任务,必须是一个Stage执行完成后下一个Stage才能执行

(1)Stage划分

划分Stage的方式是判断是算子之间是否产生宽依赖(宽依赖就是指父RDD的分区被多个子RDD的分区所依赖),Stage之间必然有Shuffle产生

(2)partition与并行度的划分

partition决定了Task的数目和每个分区文件的大小,partition越大,分区文件越小,Task数目越多,占用资源越高,额外开销越大,运行速度越快

三、参数与优化

1、自适应shuffle分区

参考 https://blog.csdn.net/u013332124/article/details/90677676

Adaptive Execution简称AE。原理是:

  • 在Shuffle过程中设定最大分区数目和分区文件期望大小。
  • 针对小分区,将对分区文件进行合并,使之达到期望文件大小的尺寸以减少task

开启方式

set spark.sql.adaptive.enabled=true;
set spark.sql.adaptive.join.enabled=true;

最大并行度(也是初始并行度)(最大分区数目)

set spark.sql.adaptive.maxNumPostShufflePartitions = 300
  • 要保证 targetPostShuffleInputSize * maxNumPostShufflePartitions * 1.2 > 最大stage的shuffle read size 否者会导致运行速度变慢(因为stage的分区太大)

Shuffle read从每个上游task拿到的文件尺寸的最大值

set spark.sql.adaptive.shuffle.targetPostShuffleInputSize;

执行过程

  • 上游stage的每个task进行Shuffle Write,分区数为 spark.sql.adaptive.maxNumPostShufflePartitions
  • dirver 会汇总每个上游stage中每个task的Shuffle Write的每个分区的编号和文件大小,计算出下游任务数和每个任务读取每个上游任务的那几个partition
    • 规则是,针对每个上游task从0号分区其获取连续的、总大小小于spark.sql.adaptive.shuffle.targetPostShuffleInputSize的连续分区作为下游一个任务的输入
    • 按照上述规则划分分区,最后得到的分区组的数目就是下游task的数目,每个分区需要读取的分区编号就是这个连续的范围
    • 创建这些task
  • 下游task并行获取分区数据

2、Cache Table

四、Spark原理与实现

1、总体介绍

https://spark-internals.books.yourtion.com/markdown/1-Overview.html

部署图

  • 上图的 Master Node 的 Master 为 Master Domain Process (Master 守护进程),每个集群一个,生命周期和集群一致
  • 上图的 Worker Node 存在 Worker Domain Process (Worker守护进程),每个集群多个,一般一台机器一个,生命周期和集群一致
  • 上图的 Driver 可以运行在 Master Node 或者 Worker Node,和Application一一对应,生命周期和Application一致
  • 上图的 Executor 存在于 每个Worker中,由 Worker Domain Process 创建,每个Worker可能存在多个
    • 每个 Executor 存在一个线程池,线程池的数目受 spark.executor.cores 参数影响
    • 每个 Executor 的每个线程会运行一个 task,同一个 Executor 执行同一个任务

其他参考文档

五、场景

1、MySQL binlog Dump

  • 搭建 Canal 接入MySQLBinLog
  • Canal 将数据发送到 Kafka
  • Kafka 同步到 hdfs
  • spark读取增量数据并做成虚表,进行merge

2、数据倾斜处理

http://www.jasongj.com/spark/skew/

六、疑问自答

1、什么是AE

全称 adaptive execution,有如下作用

  • 根据配置的ShufflePartition文件大小,自动配置每个Stage的Partition数
  • 每个Stage执行完成之后,根据统计信息动态改变物理执行计划,重新生成RDD代码
    • 决定是否使用boardcast
  • 打开AE,Spark History UI的Stage DAG图会出现很多跳过Stage
  • 打开AE后,Spark History UI 的 SQL DAG图会随运行过程中不断变化

2、Spark Application、Driver、Job、Stage、Partition、Task及如何划分

Spark Application:

  • 通过spark-submit提交的一个程序称之为Application
  • 一个spark-shell称之为一个Application

Driver:

  • 运行用户提交主代码的进程,负责
    • 解析用户输入
    • 构建执行图
    • 调度协调Task

Job:

  • 一个Application可以划分为1个或多个Job
  • 划分Job的依据是:调用了action算子(ResultTask),action算子:
    • 收集信息到Driver程序
  • 一般情况下
    • AE:一个SparkSQL的每个Stage就是一个Job(因为AE会动态调整执行计划,需要收集信息到Driver)
    • 非AE:一个SparkSQL(insert into)一般对应一个一个Stage(因为物理执行图一旦确定就不会变,直到action操作insert into)

Stage:

  • 一个Job包含一个或者多个Stage,以Shuffle为界

Partition

  • Partition 是一个数据分片,完整的数据会分为多个Partition

Task

  • 一个Stage会启动多个Task,这些Task封装了相同的计算流水线,只是处理的数据不同。
  • 一个任务task的个数和Stage最后一个RDD的partition数目相同。(参考 不管是 1:1 还是 N:1 …)
  • task执行会以流水线的方式向上追溯到读表或者ShuffleRead,然后依次处理

3、Random等不确定函数作为joinkey时的问题

当在使用Random函数作为joinkey处理数据倾斜问题时,在失败重试的情况会导致数据不一致。

Join下游Task某个shuffle fetch faild,就需要重跑上游Stage,此时Random函数式不确定函数,重跑导致前后数据不一致。从而造成数据不一致

解决办法: 使用其他确定字段代替random生成JoinKey

4、前缀优化

select
  t1.a, t1.b,
  count(1) as uv,
  sum(vv) as vv
from
(
  select
    t1.a, t1.b, t1.c,
    count(1) as vv
  from t1
  left join t2
  on t1.a = t2.a
  group by t1.a, t1.b, t1.c
) x
group by t1.a, t1.b

以上SQL如果没有优化的话会产生3个Shuffle,分别是

  • Shuffle1:
    • t1 join t2
    • shaffle key 为 t1.a
  • Shuffle2:
    • group by t1.a, t1.b, t1.c
    • shaffle key 为 t1.a, t1.b, t1.c
  • Shuffle3:
    • group by t1.a, t1.b
    • shaffle key 为 t1.a, t1.b

可以观察到,shuffle1 的 key 是 shuffle2 key 的前缀。所以 shuffle2 可以优化为窄依赖