20190504 Spark ShuffleManager

Spark 版本: 2.4.2

  • ShuffleManager 负责注册 shuffle, 以及获取对应分区的 writer/reader
  • 每一个 ShuffleDependency 在初始化时会注册 shuffle 并获得一个 ShuffleHandle
  • ShuffleManager 现在只有一个实现: SortShuffleManager
  • 根据 ShuffleHandle 的不同, 获取不同的writer, 也就对应不同的 shuffle write 方式

    BypassMergeSortShuffleHandle -> BypassMergeSortShuffleWriter

    条件:
    dep.mapSideCombine == false
    && dep.partitioner.numPartitions <= SHUFFLE_SORT_BYPASS_MERGE_THRESHOLD(200)
    方式:
    根据下游分区, 每个分区的数据写入一个文件, 最后合并文件
    不进行 map side combine, 所以必须设定为 false
    同时打开文件流/serializer 不能太多, 所以设定了一个阈值

    SerializedShuffleHandle -> UnsafeShuffleWriter

    条件:
    dependency.serializer.supportsRelocationOfSerializedObjects
    && dependency.mapSideCombine == false
    && numPartitions <= MAX_SHUFFLE_OUTPUT_PARTITIONS_FOR_SERIALIZED_MODE(16777216)
    方式:
    不使用 map side combine, mapper 获取到一条数据后立刻序列化
    所有操作都是在序列化格式下, 比如排序/合并等, 所以需要序列化后的格式支持这些操作, 以避免重复序列化/反序列化
    减少了内存占用/GC

    BaseShuffleHandle -> SortShuffleWriter

    不满足以上条件时使用
    使用了 ExternalSorter 先对数据根据 partition 分区, 然后溢写到磁盘时内部排序( if dep.mapSideCombine == true)
    combine 时使用 PartitionedAppendOnlyMap, 否则使用 PartitionedPairBuffer

20190503 Spark Broadcast 原理

  • SparkContext 调用 broadcast(), 使用 BroadcastManager 中的 TorrentBroadcastFactory 创建 TorrentBroadcast, 实际是工厂模式
  • BroadcastManager 的另一个作用是缓存, TorrentBroadcast 会根据 broadcastId 读取和写入
  • 读取和写入都是根据 “spark.broadcast.blockSize” 分成小段处理
  • TorrentBroadcast 在初始化时会写入到本机, 也就是 driver 端, 所以要注意 driver 内存设定
  • TorrentBroadcast 的读取和写入都是依靠 BlockManager, 读取时会先看本地有没有, 如果没有则使用 BlockManager 远程拉取
  • 远程拉取时从 driver 获得 locationsAndStatus, 然后地址 shuffle 后根据 same host > same rack(topologyInfo) > other 排序拉取, 这也是使用 “Torrent” 的意思

20190130 Python Cyclic Import Problem

代码示例

1
2
3
4
5
6
7
8
9
# p1.py
import p2
v1 = 1
v2 = p2.v2

#p2.py
import p1
v2 = 2
v1 = p1.v1

假如运行 p1.py 会报错 AttributeError: module 'p2' has no attribute 'v2',
运行 p2.py 会报错 AttributeError: module 'p1' has no attribute 'v1'

import 是如何执行的

当遇到 import moduleA 时,

  1. 首先检查 sys.modules 中有没有 key=moduleA 且值不为 None 的条目
  2. 如果有则直接使用该条目对应的 module object, 直接执行下一行
  3. 如果没有, 则创建 key=moduleA 条目, 值为空的 module object, 并执行 moduleA 中的代码, 如果执行中遇到 import 语句, 转到步骤 1

对于上面的示例代码中, 运行 p1.py 时流程如下:

  1. 在 sys.modules 生成 key='__main__' 条目
  2. 执行 p1.py 的 import p2, 这时 sys.modules 中没有 key=='p2', 转到 p2.py
  3. 在 sys.modules 中生成 key='p2' 条目, 并执行 p2.py 的 import p1, 这时 sys.modules 中没有 key=='p1', 重新执行 p1.py
  4. 在 sys.modules 中生成 key='p1' 条目, 并执行 p1.py 的 import p2, 这时 key=='p2' 已经存在, 不用转到 p2.py 执行了, 接着执行 v1 = 1, 无异常; 执行 v2 = p2.v2, 此时 p2 内容为空(因为代码还没有执行到其他行, v2 还没有加入到 p2 的命名空间中), 无法解析 v2, 报错

究其原因是 import/class/def 等都是在 import 过程中直接执行的, 这时候遇到没有解析的对象找不到就会报错; 而方法体只有在调用的时候才会被执行

还有一种容易产生问题的写法, 就是循环引用又使用了 from moduleA import B, 且 B 是方法/类/变量时, 这就要求在执行到这一句的时候, moduleA 就已经解析了 B; 如下示例

1
2
3
4
5
6
7
8
9
# p1.py
from p2 import f2
def f1():
f2()

# p2.py
from p1 import f1
def f2():
f1()

如何解决

一般这种现象的原因都是模块设计不合理, 最好是直接将公共依赖提取出来

临时解决方案包括在方法中 import, 或将 import 语句放在文件末尾

避免使用 import 直接引用类/方法/变量, Google Python Style Guide 提到:

Use import statements for packages and modules only, not for individual classes or functions.

为什么 Java/Scala 中没有这种问题?

归根结底是加载方式不一样.

在 Java 中使用一个类需要经过 加载->验证->准备->解析->初始化, 如果 Java 文件没有问题, 且在方法区生成了相应类的描述, 此时该类所有的引用都还是符号引用, 只需在初始化之前解析为直接引用即可, 只有真正不存在这个符号引用的对象才会出错, 而不会像 Python 中执行到这一行一定要找到该对象, 即使确实存在也因为没有解析而报错.

另外 Java 中没有像 Python 中这样的全局变量/方法.


参考:

https://docs.python.org/3/reference/import.html
http://effbot.org/zone/import-confusion.htm
https://realpython.com/absolute-vs-relative-python-imports/
https://stackoverflow.com/questions/744373/circular-or-cyclic-imports-in-python
https://www.zhihu.com/question/19887316

20190107 Spark 任务优化方案

对于一系列 Spark 任务的优化, 需要有目的的优化, 将精力花费在瓶颈处, 避免过早优化和盲目优化.

所以需要收集足够的运行信息, 比如设计一个 Dashboard, 对整个任务流程的信息进行收集, 包括输入信息/运行时间等. 另外假如这些任务是多人维护的, 有一个详细的文档说明也很重要. 从这些信息中提取公用资源/常用资源进行缓存和提前计算, 以减少实际运算时的重复计算.

任务执行前

任务输入聚合分析

目的: 分析目前各项任务中哪些条件常用, 给数据缓存提供思路

需求: 这些任务描述可能已经保存在数据库中, 最好有一个直观的可视化界面, 或者能有定期脚本分析结果, 分任务对条件进行归类聚合,
比如本周 A任务输入条件中: 条件1 a次, 条件2 b次, 条件3 c次

根据常用输入, 可以分析出各种常见模式, 提前计算结果, 或显示在界面上, 用于引导或推荐

任务执行中

多个任务共用数据

目的: 可缓存不同任务共用的中间数据, 防止重复计算

难点: 各任务逻辑不同, 需熟悉所有执行流程

解决方案:

  1. 一个人读所有计算逻辑代码
  2. 每个人维护各自计算逻辑文档(初始读取的数据, 中间生成了什么数据, 假如有什么数据会简化计算), 然后根据文档分析

单个任务常用数据

从任务输入聚合分析的结果中找到常用数据
这些数据可以采用 Parquet 保存, 所以可以使用明文, 不需要转成 id

影响: 任务的数据加载部分需要修改
问题: 如何验证数据加载/提取/过滤这一块是瓶颈? (根据 Spark 运行记录)

优化计算资源占用(内存/CPU)

目的: 增大并行任务数

解决方案: 分析各任务 Spark 流程图, 看是否占用较多资源

任务执行后

任务时间统计

目的: 寻找整个系统瓶颈

需求: 输出各任务执行时间, 最好有甘特图等

20181209 Coursera: Mind Shift

Week1

  • Test, test, test yourself all the time on anything you really want to learn.
  • Follow your passion, 有时候只是意味着我们擅长这件事,所以才愿意去做,想去做。但事实是,有一些其他需要努力才能擅长的事,我们也能做到, 甚至是更加擅长, 只不过从来没有尝试过罢了

Week2

  • 番茄学习法的休息时间是用来做一些和工作时间完全不同的事, 从而放松大脑
  • 当感受到负面情绪时, 给其打上标签, 有助于从感性转到理性, 找到根本原因
    • All or nothing thinking, 没做成这件事, 我什么都不是, 是个彻底的失败
    • Magnification, 夸大了事情的重要性
    • Overgeneralization, 一方面的失败不代表其他所有方面
    • Mental filter, 只关注了失败的方面
    • Discounting the positive, 不要忽略自己成功的其他方面
    • Jumping to conclusions, 不要盲目预测结果, 未来不知会如何
    • Mind reading, 不要以为别人是这么想我的
    • Emotional reasoning, 自己感觉坏事要发生, 事实却不是这样; 比如感觉要坠机, 黑夜里有鬼
    • Shouldy thinking, 认为自己应该更努力, 应该做得更好
    • Pejoratives, 认为自己笨
    • Personalization and blame, 有时根本不是我的错或责任

Week3

  • 要把 passion 和这个世界的需要的结合起来, 避免把 passion 用在无用的地方
  • 除了主业, 要发展第二技能; 两者之间可能相辅相成, 也可以将 passion 用在第二技能上
  • 同一工作的六个月和六年没有太大区别, 技能的学习是先快后慢的 log 型的 (这一点符合二八原则, 20%的时间就掌握了80%的技能, 如果不是为了精通, 则不必另费时间, 应该学习新的技能; 回想自己的工作, 有多少是在重复之前六个月就学会的东西呢?)

Week4

  • In fact, it’s often better to avoid asking someone to be your mentor, even if you consider them to be a mentor to you.
  • Look for ways to make yourself somehow useful to your mentor
  • If you want to be the smartest person in the room, it’s pretty simple. Just read more than everyone else.

20181125 高效学习方法总结

关键词

主动

主动学习, 充分调动主观能动性, 用自己内心的渴望来激发学习的动力; 或者通过 练习 -> 成功 -> 获得自信 -> 觉得有趣 -> 更多练习 这种正向循环来使自己产生兴趣, 从而变得主动

目标

制定合理的目标, 要足够详细, 可以细化到每一天, 甚至更细, 这样才不至于在闲下来的时候, 虽然知道有事情要做, 但是不知道具体该干什么, 或者拖延着不想做 (这一点和 GTD 中的制定下一步行动原则很像)

专注/发散

大脑有两种模式, 专注和发散; 在专注的时候, 更像是在往大脑输入知识, 如果学习的时候不够专心, 则输入的速度就会很慢, 无法记住东西; 发散模式则是在放松的时候, 大脑会自动处理知识, 将各个不同领域的知识联系起来, 或者加深记忆, 或者产生新的想法

专注和发散都很重要; 这一点又和左脑右脑的分工很像, 在专注的时候, 就像左脑负责逻辑分析与计算, 发散则对应着右脑的大局观, 在专注的时候很容易走错路, 或者产生不合常理的结论, 这时需要右脑(发散)来通观大局.

快速准确反馈

通过不断地快速的反馈, 来纠正大脑中错误的印象, 或者加深正确的记忆;

比如在做题时可以做一道题看一个答案, 迅速纠正错误(这种说法待验证)

间隔

短时间多天的学习, 比长时间一天的学习效果好
这一点应该和重复记忆有关, 多次练习能够加深长期记忆, 比较像艾宾浩斯学习法; 而在学习的间隔中, 发散模式也许也参与了学习过程

转化

只有转化成自己的, 才是真正学会的; 比如用自己的话讲出来, 总结出来(30s 总结), 能够教给别人(费曼学习法)

chunk

许多领域的知识都能结合成一个区块, 比如整个高效学习的方法就是一个 chunk, 区块内部紧密相联, 而通过区块之间互相产生联系, 则可以加深记忆, 促进理解, 发挥创造力

比喻

通过比喻的方法, 将一个知识点化作另一个知识点, 比如电流和水流, 在 chunk 间建立联系, 加深理解; 有时候越荒诞的比喻越能加深记忆

感官

学习不只是只有看和做, 要充分利用各种感官, 将知识点想象成一幅图画, 或者加入想象中的听觉/触觉/嗅觉等, 在加上比喻等技巧

过程/结果

如果要克服拖延症, 可以专注于过程, 而不是结果; 比如不是说要专注于完成这个任务, 而是专注于”做”这个过程, 那么就不会那么痛苦而一直拖延下去, 同时在做的过程中会发现不知不觉任务就完成了

test/test/test

不断进行自我测试, 测试自己是否真正理解, 是否能总结要点, 是否能复述内容等等; 测试反过来能够促进自己更主动地学习(比如 30s 总结法)


方法

30s 总结法

每次上完课/听完讲座/看完一篇有用的文章, 用 30s 立刻总结一下刚才知识中的要点

这个方法最容易掌握, 也是一种检测自己是否认真听讲的好方法, 同时也会督促自己主动, 专心
这里面运用到了: 主动, 转化, 测试

富兰克林

将知识提炼成零散的关键字, 过一段时间看自己能否通过这些只言片语重现知识的全貌, 并与原来的内容进行比较

这里面运用到了: 总结, 间隔, 准确反馈

费曼学习法

假设自己要将这个知识教给一个完全不懂的人, 如何才能用自己的话将知识表述清楚?

这里面运用到了: 转化, 本身也是自我测试

SQ3R

综览 Survey -> 发问 Question -> 阅读 Read -> 背诵 Recite -> 复习 Review
读一本书不要从头到尾一字不落, 而是先要通观大局, 看每一部分的主要内容, 都表达了什么观点, 要带着问题/目标去读, 这样就更加主动, 而不是被动地接受

这里面运用到了: 主动

番茄工作法

每专心工作25分钟, 放松5分钟; 有效控制拖延症
也另有一种说法, 工作52分钟, 休息17分钟

Eat your frog first

把最不愿意做的事/学习的东西放到早上来做, 因为早上最有精力
如果早上都不愿意做, 到了晚上身心俱疲地时候就更不会做了


资料

书: 如何高效学习
在线课程: Learning how to learn
博客: https://www.scotthyoung.com/blog/

20181102 Airflow 中的时区问题

TL;DR:在北京时间,想要在 0:00~8:00 每日触发任务且代码中需要使用执行日期,则需使用 next_dstomorrow_ds 来避免日期偏差问题。


Airflow 支持时区设定,但内部处理时间的时候都是转换成 UTC 的,所以在按天进行调度的时候,可能会有显示时间不一致的问题。

比如我们希望每天早上 9:00 的时候运行昨天一天的任务,无论是按照文档中写的创建一个带时区的 start_date:

1
2
3
4
5
6
7
8
9
import pendulum

local_tz = pendulum.timezone("Asia/Shanghai")

with DAG(dag_id='test_timezone',
schedule_interval='0 9 * * *',
start_date=datetime(2018, 10, 20, tzinfo=local_tz),
catchup=True
) as dag:

还是直接转换成 UTC 的时间,即 1:00,对应 +8 区的 9:00

1
2
3
4
5
with DAG(dag_id='test_timezone',
schedule_interval='0 1 * * *',
start_date=datetime(2018, 10, 20),
catchup=True
) as dag:

这时任务执行中,使用 ds 都是没有问题的,比如处理 2018-10-31 的数据,ds 也会被渲染成 2018-10-31,因为在 2018-11-01 09:00:00+08:00 对应的 UTC 时间是 2018-11-01 01:00:00+00:00,此时两者的日期是一样的。

但假如我们想每天早上 7:00 执行任务,这时 2018-11-01 07:00:00+08:00 对应的 UTC 时间就是 2018-10-31 23:00:00+00:00,此时 ds 会被渲染成 2018-10-31,所以假如代码中依赖了执行日期,则需要使用 next_dstomorrow_ds 来避免日期偏差问题。

在内部,marcos 中的这些时间都是通过 execution_date 进行格式转换输出的,而 execution_date 是在 TaskInstance.__init__() 中被转换成 UTC 时间的,无论是设定了 start_date 的时区,还是直接修改了 Airflow 的默认时区。

20181014 Airflow tips

  1. DAG 中的参数 catchup,若设为 False,则启动后不会追溯之前的任务,如果 start time 在现在之前且符合至少一次任务,则会启动最近的一次任务;比如设定的时间为 "50 0 * * *",则是每天的 00:50 启动一次,
    • 假如现在是 2018-09-17 00:20:00,启动暂停的任务,会启动一个 Run id 为 scheduled__2018-09-15T00:50:00+00:00 的任务,因为在 2018-09-15 00:50:00 后过了一天,所以启动上一次的任务
    • 假如现在是 2018-09-17 00:51:00,则启动后会执行上一次,即 Run id 为 scheduled__2018-09-16T00:50:00+00:00 的任务
  2. 时间问题,可以设置默认时区,但是 templates 中返回的还是 UTC 时间,需要手动转换,所以还不如保持默认时区 UTC。当设置 start_time 的时区后,schedule_interval 的 cron 格式就是本地时区的时间了
  3. pyenv 与 Airflow 结合的问题。Airflow 中的 BashOperator 是通过 subprocess 实现的,其中传入的环境变量就是当前的运行时,所以无法和 pyenv 搭配使用,比如进到某个路径下自动用相应的 local env。如果调用的 Python 版本各不相同,暂时的解决方案是直接使用绝对路径的 Python
  4. if else 判断?BranchPythonOperator
  5. 如何传递 execution date?使用 templates,或 trigger 时设定 dag_run.conf
  6. 在 PythonOperator 中设 provide_context=True,则定义 callable 时加上 **kwargs,会自动填入 jinja templates 中的信息,如 ds 等,从而在代码中方便调用 kwargs['ds']
  7. 更新 DAG 后是否重新命名?建议 id 增加 v1,v2
  8. BashOperator 中如果直接指定的是可执行脚本文件,后面需要加一个空格, from Common Pitfalls
  9. 登录账号密码?Web Authentication,注意从一个数据库比如 MySQL 迁移到 PostgreSQL 而没有迁移数据时,需要重新设置账号密码
  10. 假如需要一个 dag 完成之后才进行下一时间的 dag,即同时只有一个 dag 在运行:
    • 可以试着看 wait_for_downstream 是否适用,
    • 或者用 ExternalTaskSensor. from this url;
    • 或者使用 xcom 保存 mutex 信息,和上一条原理相同,都是检测一个之前的 dag 正在运行的标志;from this url
    • 或者通过查询表 dag_run 中的相应条目
    • 或者更加直接地使用 max_active_run = 1
  11. how to kill task? 运行中任务点击 clear 是杀死任务,当前任务失败,并且整个 DAG 失败,后续 trigger rule 为 one_failed 的任务也不会触发;失败任务 clear 会导致重试
  12. 当需要一个 DAG 触发另一个 DAG,而不是将这两个连成一个大的 DAG 时,

    • 可以使用 TriggerDagRunOperator;即使被触发的 DAG 的时间小于设定的 start_date,也会执行;但如果被暂停了,就不会执行,会累计在队列中,此时开启暂停,会执行所有积累的任务
    • 传入 execution_date 时需要注意相同的 dag_id 的多个任务的 execution_date 不能相同,因为写入 table dag_run 时会报错,比如:sqlalchemy.exc.IntegrityError: (_mysql_exceptions.IntegrityError) (1062, “Duplicate entry ‘d9_subs-2018-09-28 07:44:00.000000’ for key ‘dag_id’”);这是因为 table dag_run 设定了 UNIQUE KEY dag_id (dag_id,execution_date), 所以同一 dag_id 的 execution_date 不能相同
    • 但不传入 execution_date 时,会导致被触发的 DAG 的实际执行时间是 timezone.utcnow(),而获取 execution_date 又很麻烦,比如需要继承并修改 template_fields 等。所以如果需要两个 DAG execution_date 相同,可以使用 PythonOperator,利用 provide_context 获取时间,然后直接调用 TriggerDagRunOperator 中使用的方法,如下:
      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      def trigger_external(**kwargs):
      trigger_dag(dag_id='external1',
      run_id='trig__' + timezone.utcnow().isoformat(),
      conf='null',
      execution_date=kwargs['execution_date'],
      replace_microseconds=False)

      trigger = PythonOperator(task_id='trigger',
      python_callable=trigger_external,
      provide_context=True)
  13. 可以在执行的 task 的 Web UI 中的 Rendered Template 中看到实际经过渲染的命令,可用于任务失败的排查,当然日志中也会打印执行的实际命令;也可以通过 CLI:airflow render dag_id task_id date,可用于执行前的命令验证

  14. Operator 的返回值(如果不是 None)会放在 XCom 中,底层是在数据库的 xcom table 中,所以不同的 task 之间传递数据,即使下游任务失败,重启下游后还能获得相同的值,不必重新执行上游;下游获取时默认 dag_id = self.dag_id, key = ‘return_value’ 和 current execution_date

jinja2 使用的 context 由 TaskInstance.get_template_context() 获得
需要关注的一些 variables:

  • dag 对应 class DAG
  • dag_run 对应 class DagRun
  • conf 对应 class AirflowConfigParser,是解析后的 airflow.cfg,可用 as_dict() 查看全部内容;注意这个不是 DagRun 的 conf
  • params = task.dag.params + task.params;每个 task 的 params 都是定义的 DAG + Operator 的 params,不能由上一个 task 设定 params 然后下一个 task 读取,即不能用 params 在 task 间传递值
  • ti 对应 TaskInstance

给 BashOperator 中的 bash_command 传值:

  • 可以使用 env,注意复制原有环境,然后再使用比如 $FOO;如果是调用脚本文件,且文件没有使用 Airflow 的 marcos,则只能用这种方法传入环境变量
  • 可以用 DAG user_defined_macros,利用 jinja templates;比 env 的方法好在 render 之后可以看到实际执行代码

以前用 bash 执行任务,因为是定时检测上游数据,希望一个日期的任务不要被多次触发,使用的方法是创建一个 mutex 文件标志当前日期文件正在运行中;在 Airflow 中,加入检测和执行是两个 DAG,也可以通过数据库,比如使用 xcom,在任务执行前检测是否已有任务正在执行并插入,执行完成后删除,但是这样假如任务失败就会使得 mutex 文件保留在数据库中,下一次执行的时候需要手动删除,这和在 bash 脚本执行的时候遇到的情况相同。
另一种方法就是利用 Airflow 中 dag_run 这张表,里面记录了 DAG 每次运行情况及状态,在触发 DAG 的时候可以设定其 run_id,将任务日期加在后面,然后每次检测 running 状态的 run_id 是否日期相同即可。这样就不用手动删除 mutex 记录。

而要直接操作数据库,就涉及到 Hook 和 Connection 了。直接在 Airflow 的 Web UI Connection 中设定相应的账号密码,则在代码中可以直接使用对应的 Hook conn_id 获取相应的数据库数据。

20181007 ETL 使用脚本处理存在的问题

之前遗留项目的 ETL 流程大量使用 bash 脚本,现在的项目则因为处理逻辑复杂,重新用 Python 编写。两者都是定时触发,长期维护发现存在以下问题:

  • 每个任务分散在各个不同的目录中,调试更改困难
  • 有时多个任务可并行减少时间,但是只能使用 &wait $PID 来进行
  • 任务失败无法快速找到断点,所以在接手之后根据整个流程写了一个检测脚本
  • 找到断点后,想要执行后面所有的任务,则需要创建一个主流程副本,删掉前面的已成功任务的代码再执行
  • 不知道任务执行到哪一步了,除非看 hadoop、spark history 和 hdfs 路径是否有文件生成或使用检测脚本
  • 新人学习 ETL 流程时,只能阅读脚本然后手动画出整个过程,才能有直观上的理解;但流程图的每一步没有与代码直接对应
  • 但如果采用手工绘制流程图,则更新代码后,流程图不能保证同步更新
  • 一个大任务的日志一般在一个文件中,导致每个小任务日志混杂,难以排查
  • 增加、暂停单个子任务需要直接更新主流程脚本
  • 本地使用临时文件作为任务锁,防止任务重复执行
  • bash 脚本时使用本地文件记录已执行日期,其实是当作简单的数据库;Python 中则使用 sqlite 进行简单记录
  • 统计任务时间困难,要不根据日志,要不在代码中加上统计时间代码

最近受到 Azkaban 等任务调度系统启发,经调研决定使用 Airflow 来进行任务调度,尝试解决上面各种问题。

20180916 Dataset.col(colName) 无法区分衍生表的列

问题:

若数据集1产生数据集2,则两者进行 join 然后使用 ds.col(colName) select 的时候结果中的列可能并非想选择的,例如想选择 left outer join 后右表的同名列:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
object ScalaTest {
case class Item(cuid: String, query: String)
def main(args: Array[String]): Unit = {
val spark = SparkSession.builder().master("local").getOrCreate()
import spark.implicits._
val ds1 = spark.range(2)
//+---+
//| id|
//+---+
//| 0|
//| 1|
//+---+
val ds2 = ds1.filter('id < 1)
//+---+
//| id|
//+---+
//| 0|
//+---+
val ds3 = ds1.join(ds2, ds1.col("id") === ds2.col("id"), "left_outer")
//+---+----+
//| id| id|
//+---+----+
//| 0| 0|
//| 1|null|
//+---+----+
val ds4 = ds3.select(ds1.col("id")) // 选取第一列
//+---+
//| id|
//+---+
//| 0|
//| 1|
//+---+
val ds5 = ds3.select(ds2.col("id")) // 原意是选取第二列,但结果还是第一列
//+---+
//| id|
//+---+
//| 0|
//| 1|
//+---+
}
}

ds2 由 ds1 衍生,其中 ds1.col("id") === ds2.col("id") 这一句会产生警告

1
WARN Column: Constructing trivially true equals predicate, 'id#0L = id#0L'. Perhaps you need to use aliases.

实际上这两个 column 的对象是同一个,在 Column 的 === 方法中会输出这一句警告。假如直接按照语义处理则会变成笛卡尔积的形式,这也是早期版本的处理方式,如 这个问题 中的情况,而后来 Dataset.join 中会特殊处理成 self equal join on key。

1
2
3
4
5
6
7
8
def join(right: Dataset[_], joinExprs: Column, joinType: String): DataFrame = {
// Note that in this function, we introduce a hack in the case of self-join to automatically
// resolve ambiguous join conditions into ones that might make sense [SPARK-6231].
// Consider this case: df.join(df, df("key") === df("key"))
// Since df("key") === df("key") is a trivially true condition, this actually becomes a
// cartesian join. However, most likely users expect to perform a self join using "key".
// With that assumption, this hack turns the trivially true condition into equality on join
// keys that are resolved to both sides.

所以上面的 ds3 是符合我们预期的,但 ds5 选择的也是 ds3 的第一列,因为 ds1.col("id")ds2.col("id") 是同一个对象,所以 ds5 结果与 ds4 相同;利用 explain 可看出这一点:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
== Physical Plan == // ds1
*Range (0, 2, step=1, splits=1)
== Physical Plan == // ds2
*Filter (id#0L < 1)
+- *Range (0, 2, step=1, splits=1)
== Physical Plan == // ds3
*BroadcastHashJoin [id#0L], [id#4L], LeftOuter, BuildRight
:- *Range (0, 2, step=1, splits=1)
+- BroadcastExchange HashedRelationBroadcastMode(List(input[0, bigint, false]))
+- *Filter (id#4L < 1)
+- *Range (0, 2, step=1, splits=1)
== Physical Plan == // ds4
*Project [id#0L]
+- *BroadcastHashJoin [id#0L], [id#4L], LeftOuter, BuildRight
:- *Range (0, 2, step=1, splits=1)
+- BroadcastExchange HashedRelationBroadcastMode(List(input[0, bigint, false]))
+- *Filter (id#4L < 1)
+- *Range (0, 2, step=1, splits=1)
== Physical Plan == // ds5
*Project [id#0L]
+- *BroadcastHashJoin [id#0L], [id#4L], LeftOuter, BuildRight
:- *Range (0, 2, step=1, splits=1)
+- BroadcastExchange HashedRelationBroadcastMode(List(input[0, bigint, false]))
+- *Filter (id#4L < 1)
+- *Range (0, 2, step=1, splits=1)


解决办法:

  1. 利用 withColumnRenamed 或 as 重命名,新列名可以与原来相同,只是借助重命名这个动作使其产生一个新的引用对象
    1
    val ds2 = ds1.filter('id < 1).withColumn("id",'id)

此时执行计划就发生了改变,可以看出这一次 ds4 输出的是列 [id#0L],而 ds5 是 [id#4L],正好分别是 ds3 中的两列。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
== Physical Plan == // ds1
*Range (0, 2, step=1, splits=1)
== Physical Plan == // ds2
*Filter (id#0L < 1)
+- *Range (0, 2, step=1, splits=1)
== Physical Plan == // ds3
*BroadcastHashJoin [id#0L], [id#4L], LeftOuter, BuildRight
:- *Range (0, 2, step=1, splits=1)
+- BroadcastExchange HashedRelationBroadcastMode(List(input[0, bigint, false]))
+- *Project [id#0L AS id#4L]
+- *Filter (id#0L < 1)
+- *Range (0, 2, step=1, splits=1)
== Physical Plan == // ds4
*Project [id#0L]
+- *BroadcastHashJoin [id#0L], [id#4L], LeftOuter, BuildRight
:- *Range (0, 2, step=1, splits=1)
+- BroadcastExchange HashedRelationBroadcastMode(List(input[0, bigint, false]))
+- *Project [id#0L AS id#4L]
+- *Filter (id#0L < 1)
+- *Range (0, 2, step=1, splits=1)
== Physical Plan == // ds5
*Project [id#4L]
+- *BroadcastHashJoin [id#0L], [id#4L], LeftOuter, BuildRight
:- *Range (0, 2, step=1, splits=1)
+- BroadcastExchange HashedRelationBroadcastMode(List(input[0, bigint, false]))
+- *Project [id#0L AS id#4L]
+- *Filter (id#0L < 1)
+- *Range (0, 2, step=1, splits=1)

  1. 使用 sql string,不会有任何问题;但是否应该在代码中使用大量的 sql 语句呢?一个可能的问题是维护困难,没有编译期检查,多行 sql 之间容易发生错误。
    1
    spark.sql(" SELECT ds2.id FROM ds1 LEFT OUTER JOIN ds2 ON ds1.id = ds2.id ")

另一个类似的问题