20180725 Shuffle of Hadoop and Spark

Hadoop Shuffle

map

  1. 每个 mapper 有一个环形缓存,当数据大小达到阈值时,溢写到磁盘(会运行combiner)
  2. 溢写到磁盘的数据根据 reducer 进行分区且文件内按照 key 进行排序
  3. 多次溢写生成多个文件,最后合并数据文件(文件数大于3运行combiner)和索引文件

reduce

  1. reducer 读取相应 mapper 的分区文件
  2. 读取文件太大或文件数多于阈值,会溢写到磁盘(会运行combiner)
  3. 合并所有读取的文件,归并排序中的merge

Spark Shuffle

HashShuffleManager

  • 每个 mapper 根据数据 hash 写入不同的分区文件
  • 每个 mapper task 产生下一步 task 个文件,共 Mappers * Reducers
  • 优化(consolidateFiles)后,每个 core 产生下一步 task 个文件, E * (C / T) * R(E: –num-executors, C: –executor-cores, T: spark.task.cpus; 一般 spark.task.cpus = 1,就变成 总cpu * Reducers)

SortShuffleManager

normal

  • 和 Hadoop 的 shuffle 过程很像,不过 Hadoop 是环形缓存区,Spark 是根据 reduce 算子不同选择不同结构,比如 reduceByKey 选择 Map,这点又和 combiner 原理相同;Hadoop 是在内存中数据大小超过阈值溢写,Spark是默认超过 10000 条
  • 最后每个 task 只产生一个数据文件和一个索引文件

bypass

  • reduce task 小于 spark.shuffle.sort.bypassMergeThreshold 且 非聚合算子
  • 不进行排序,一个 map task 中根据 hash 生成 reduce 个文件,最后进行合并;和 HashShuffleManager 类似

——————
参考:
https://0x0fff.com/hadoop-mapreduce-comprehensive-description/
https://tech.meituan.com/spark_tuning_pro.html
https://0x0fff.com/spark-architecture-shuffle/

20180719 Spark History Server 常用配置

一般配置在 $SPARK_HOME/conf/spark-defaults.conf 中,分为两类,history server 的配置和 spark client 的配置;基本原理就是 Spark 把 log 写在磁盘上,然后启动的 history server 会读取 log 文件,重现 Spark UI。

History Server 读取的配置

spark.history.ui.port 1234
spark.history.fs.logDirectory hdfs://xxx
spark.history.retainedApplications 200

Spark Client 读取的配置

spark.eventLog.enabled true
spark.eventLog.dir hdfs://xxx (和上面logDirectory对应)
spark.yarn.historyServer.address ip:port (和上面端口对应)
spark.eventLog.compress true

起停

$SPARK_HOME/sbin/start-history-server.sh
$SPARK_HOME/sbin/stop-history-server.sh


详细参考

https://spark.apache.org/docs/latest/running-on-yarn.html#spark-properties
https://spark.apache.org/docs/latest/monitoring.html#spark-configuration-options

注意

有时候为了方便切换集群,用 spark-submit 提交任务时增加参数 --properties-file some-spark-defaults.conf,会覆盖默认配置

20180706

  1. 在 Spark 中使用 collectAsMap 要特别注意,测试及平时运行时可能没问题,但当数据一变大,就容易造成 driver OOM,比较明显的现象是任务还没有失败,但是 Spark UI 打不开。所以尽量用 JavaPairRDD 替代
  2. 在使用多线程运行 Spark job 时,比如 ThreadPool,要特别注意传入的方法中没有使用 createOrReplaceTempView, 这样容易造成多个任务的表名相互覆盖,最后几个得到的结果完全相同

20180629 Scala Puzzlers-3

  1. I Can Has Padding? 当给 foreach 传递一个语句时,注意该语句不要返回一个 Function1 对象,比如

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    val sb = new StringBuilder("hello")
    1 to 6 foreach { sb += '*' }
    // 本意是添加6个星号,但是 StringBuilder 的 append 方法返回对象本身
    // 而 StringBuilder 继承了 Function1,导致最后只添加了1个星号,然后调用 apply 6次,抛出越界异常

    // 再比如
    case class Foo() extends (Any => Unit) {
    def apply(a: Any) = println(s"apply $a")
    def print(a:Any) = {println(a); this}
    }

    val foo = Foo()
    1 to 10 foreach (foo print "hello") // 打印1次 hello, 10次 apply
  2. Cast Away; 尽量让编译器执行类型转换,避免手动转换;当可能时,编译器会尽量在基本类型范围内执行操作,而不是包装类型

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    println(null.asInstanceOf[Int] == 0)
    println(null.asInstanceOf[Int] == null)
    println(null == 0)
    println(1 == 1)
    // Warning: comparing values of types Int and Null using `==' will always yield false
    // 第二行编译时警告总会返回 false,但实际返回是 true
    // 反编译结果,其中 unboxToInt(i) = return i == null ? 0 : ((java.lang.Integer)i).intValue();
    Predef$.MODULE$.println(BoxesRunTime.boxToBoolean(BoxesRunTime.unboxToInt(null) == 0));
    Predef$.MODULE$.println(BoxesRunTime.boxToBoolean(null == null));
    Predef$.MODULE$.println(BoxesRunTime.boxToBoolean(BoxesRunTime.boxToInteger(0) == null));
    Predef$.MODULE$.println(BoxesRunTime.boxToBoolean(true));

    // 第一行 Wrapper Int 与 0 比较,编译器尽量将 wrapper 转换为基本类型,导致 null 转换为 0,返回 true
    // 第二行 wrapper Int 与 null 比较,因为右边 null 不是 wrapper 类型,所以不转换

20180625 Parquet and null

  1. Spark 读取和保存 Parquet 数据,某一列可以全为 null,但其类型不能为 null,因为现在的 Parquet 格式不支持 NullType;读取的全为 null 的 Parquet 也可以保存,因为保持了类型信息,但经过 withColumn 等转换后,类型信息可能消失;如果数据经过了大量转换,可以在保存前打印 schema 信息便于排查

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    scala> case class Person(name:String, age:Int)
    scala> val ps = Seq(Person(null, 12), Person(null,23), Person(null,23))
    ps: Seq[Person] = List(Person(null,12), Person(null,23), Person(null,23))
    scala> val ds = ps.toDF
    ds: org.apache.spark.sql.DataFrame = [name: string, age: int] // 此时name保留了 string类型,保存parquet也不会出错

    scala> val ds1 = ds.withColumn("name", when('name.isNull, null:String))
    ds1: org.apache.spark.sql.DataFrame = [name: null, age: int] // 此时name类型为 null, 无法保存为parquet

    scala> val ds2 = ds.withColumn("name", when('name.isNull, null:String).cast("string"))
    ds1: org.apache.spark.sql.DataFrame = [name: string, age: int] // 使用cast强制转换类型
  2. Spark 中使用 functions.when 时,要记得使用 otherwise,否则和 SQL 中一样会默认为 null

20180624 Scala Puzzlers-2

  1. If at First You Don’t Succeed…; lazy value 如果在初始化跑出异常,下一次被使用时会重新初始化
  2. To Map, or Not to Map; 注意使用 SortedSet 的 map 后产生的集合也是 SortedSet,所以原集合中的元素顺序不一定和新集合中一致
  3. Self: See Self; 编译器会把两个字符串相加编译成 StringBuilder.append(s1).append(s2) 的形式,所以 val s:String = s + s 会输出 nullnull
  4. Return to Me! return 只能用在有名字的方法(def)或函数体内,所以在方法对象(val)内的 return 实际上是返回的外部函数

    1
    2
    3
    4
    5
    6
    def foo: Int = {
    val bar: Int => Int = return _
    2 + bar(42)
    }

    println(foo) // 42
  5. Count Me Now, Count Me Later; 匿名函数的初始化会推迟到第一次使用时,而partially applied function 的初始化在定义时,所以如果初始化有副作用的话两者不同;val foo = bar(_) 这种形式其实就等价于 val foo = x => bar(x),属于匿名函数;val foo = bar _ 则是 partially applied function

  6. One Bound, Two to Go; 当从一个 currying function 定义一个 partially applied function 时,生成的对象是 FunctionN 的实例,所以参数列表都变成了 v1, v2 …
  7. Implicitly Surprising; 在 eta expansion 过程中,原方法的 implicit 参数会被解析代入

    1
    2
    3
    4
    5
    implicit val a:Int = 42
    def foo(x:Int)(implicit y:Int) = x+y
    println(foo(42)) // 84
    def bar = foo _
    println(bar(42)) // 84
  8. Information Overload; 如果一个方法接受的参数为Unit,则传入其他值进去,编译器会利用 value discarding,在参数之后添加一个 Unit

    1
    2
    def foo(u:Unit) = "wow"
    foo(12) // 编译为 foo({12; Unit})
  9. What’s in a Name? 方法默认参数的实现,是通过给类添加默认只方法实现的,而子类覆盖父类的方法时,会覆盖这些默认值方法,且只和定义顺序有关;子类方法与父类方法参数顺序不同时,调用声明为父类的子类的该方法,会引起顺序问题。

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    class SimpleAdder {
    def add(x: Int = 1, y: Int = 2): Int = x + y
    }

    class AdderWithBonus extends SimpleAdder {
    override def add(b: Int = 3, a: Int = 4): Int =
    super.add(a, b) + 10
    }

    val adder: SimpleAdder = new AdderWithBonus
    println(adder add (y = 0)) // 13;编译器只知道 adder 是 SimpleAdder,所以将 0 传递到第二个参数位置,第一个参数位置的值由子类覆盖的默认值方法返回3
    println(adder add 0) // 14

    // scalac -print
    class Tool$SimpleAdder extends Object {
    def add(x: Int, y: Int): Int = x.+(y);
    <synthetic> def add$default$1(): Int = 1;
    <synthetic> def add$default$2(): Int = 2;
    def <init>(): com.Tool$SimpleAdder = {
    Tool$SimpleAdder.super.<init>();
    ()
    }
    };
    class Tool$AdderWithBonus extends com.Tool$SimpleAdder {
    override def add(b: Int, a: Int): Int = Tool$AdderWithBonus.super.add(a, b).+(10);
    override <synthetic> def add$default$1(): Int = 3;
    override <synthetic> def add$default$2(): Int = 4;
    def <init>(): com.Tool$AdderWithBonus = {
    Tool$AdderWithBonus.super.<init>();
    ()
    }
    };
    // 两次调用 add 方法:
    Tool.this.adder = new com.Tool$AdderWithBonus();
    scala.Predef.println({
    <artifact> val x$1: Int = 0; // y = 0
    <artifact> val x$2: Int = Tool.this.adder().add$default$1(); // 被子类覆盖,返回 3
    scala.Int.box(Tool.this.adder().add(x$2, x$1)) // y 是第二个参数
    });
    scala.Predef.println(scala.Int.box(Tool.this.adder().add(0, Tool.this.adder().add$default$2())));
  10. Irregular Expressions; regex findAllIn 返回了 MatchIterator,但此时内部的 matcher 需要初始化之后才能用;和 Java 中一样,定义了 matcher,但是没有使用 find 或其他方法而直接用 start 就会抛出异常

20180623 Scala Puzzlers-1

  1. Hi There! 花括号中的最后一个语句是返回语句,特别在map等函数中使用 _ 时要注意,如 List(1, 2).map{ println("Hi"); _ + 1 }
  2. UPSTAIRS downstairs; var (HOUR, MINUTE, SECOND) = (12, 0, 0); 多变量赋值是由模式匹配实现的,而在模式匹配中大写开头变量名是要匹配的对象,而不是要赋值的对象;应只在命名常量时大写
  3. Location, Location, Location; 类初始化顺序:evaluate args -> early definitions -> superclass constructor -> subclass default constructor
  4. Now You See Me, Now You Don’t; 子类覆盖父类的 val 时,val 也只能被初始化一次,所以在初始化过程中父类得到的值就是原始值
  5. The Missing List; def sumSizes(collections: Iterable[Iterable[_]]): Int = collections.map(_.size).sum; 集合使用 map 产生的是同类型的集合,注意 set.map;写参数是 Iterable 的方法时需注意这点,可通过转换成已知集合类型来减少未知的错误,如 toSeq,或使用其他方法代替 map
  6. Arg Arrgh! 同一个参数列表中,前面参数的类型不能用于推断后面参数的类型;应该用currying function
  7. Caught Up in Closures; 闭包捕获外部 var 时,捕获的是引用,所以外部 var 变化,闭包跟着变化
  8. Map Conprehension; for 语句中左边参数是模式匹配,相当于 withFilter,会舍去不符合的值
  9. Init You, Init Me; object和其中的val都是 lazy instantiated,一个 object 可以看作是一个 lazy value;而循环引用加上lazy instantiated就可能造成val值与定义顺序无关,与使用顺序有关;原因是JVM只会初始化一个对象一次
  10. A Case of Equality; case class 继承其他类A,且类A包含自定义的 hashcodeequals 方法时,case class 不会自动生成这些方法

http://scalapuzzlers.com/

20180620 pattern in for

  1. Scala 中的 for 前面是一个 pattern,而不是简单地参数,会编译成withFilter,比如
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
      val s = Seq(Seq(1, 2), Seq(3, 4), Seq(5, 6, 7))
    for (Seq(k, j) <- s) {
    println(k + j) // 3, 7
    }
    }

    // 大致编译成
    s.withFilter {
    case seq if seq.length == 2 => true
    case _ => false
    }.foreach{
    case seq if seq.length == 2 => println(seq.sum) // 这里确实又判断一次,可用 jad 反编译或 scalac -print 验证
    case _ => throw new MatchError()
    }

20180618 阿里Java开发手册

阿里Java开发手册学到的:

  1. 【强制】抽象类命名使用 Abstract 或 Base 开头(经常缩写成Abs或不写)
  2. 【强制】POJO 类中布尔类型的变量,都不要加 is 前缀,否则部分框架解析会引起序列化错误(比如jackson中没有域的getter会反序列化错误)
  3. 【强制】杜绝完全不规范的缩写,避免望文不知义(比如Abs)
  4. 【参考】枚举类名建议带上 Enum 后缀(有时不带)
  5. 【推荐】 类内方法定义的顺序依次是:公有方法或保护方法 > 私有方法 > getter/setter 方法(共有方法和依赖的私有方法在一起)
  6. 【推荐】集合初始化时,指定集合初始值大小(从不指定)
  7. 【推荐】除常用方法(如 getXxx/isXxx)等外,不要在条件判断中执行其它复杂的语句,将复杂逻辑判断的结果赋值给一个有意义的布尔变量名,以提高可读性(经常直接写)