20180908 Variance in Scala

T’ extends T Meaning Scala notation
covariant C[T’] is a subclass of C[T] [+T]
contravariant C[T] is a subclass of C[T’] [-T]
invariant C[T] and C[T’] are not related [T]

from Type & polymorphism basics

比如 Function1[-T1, +R] 可以这样理解:
一个类型为 Function1 的变量,可以指向任何一个方法,只要该方法接受 T1 作为输入,接受 R 作为输出。而此时该方法的实际输入类型可以是 T1 的父类,输出类型可以是 R 的子类,也符合情况。
相当于是宽进严出。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
object Solution {
def main(args: Array[String]): Unit = {
val f1: A2 => A2 = a => a
test(f1)
val f2: A2 => A3 = a => new A3
test(f2)
val f3: A1 => A3 = a => new A3
test(f3)
val f4: A3 => A1 = a => a
test(f4) // can't compile
}

def test(f: A2 => A2) = println(f)
}

class A1
class A2 extends A1
class A3 extends A2

It makes sense to use contravariance for the values an object consumes, and covariance for the values it produces.
Parameters are contravariant positions, and return types are covariant. Inside a function parameter, the variance flips.
from Scala for the Impatients

  • 这就对应了上面 Function1 的定义,T1 是消耗的,所以是 contravariance;R 是产出的,所以是 covariance。

  • Scala 编译器对 variance 的控制比较严,对应的 variance 类型只能放在对应的位置,否则无法通过编译,比如:case class Pair[+T](var first: T, var second: T),由于会生成 setter 方法,而在 setter 方法中 first 和 second 都是在 contravariance 位置,与定义相反,则假如能通过编译,下面的代码就会有类似 Java 数组运行时 ArrayStoreException 的问题:

    1
    2
    val p: Pair[Any] = Pair(1, 2)
    p.first = "hello"

20180906 Spark Serialization Problem Revisited

嵌套内部类可能会引起序列化问题

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
public class SparkLocalTest {
@Test
public void localTest() {
SparkSession spark = SparkSession.builder().master("local").getOrCreate();
Dataset<Row> ds = spark.range(1000).toDF();
ds.transform(flat2()).show(); // 正常
ds.transform(flat1()).show(); // 报错

}

private static Function1<Dataset<Row>, Dataset<Row>> flat1() {
return new AbstractFunction1<Dataset<Row>, Dataset<Row>>() {
@Override
public Dataset<Row> apply(Dataset<Row> v1) {
return v1.flatMap(new FlatMapFunction<Row, String>() {
@Override
public Iterator<String> call(Row row) throws Exception {
return Stream.of("" + row.getLong(0)).iterator();
}
}, Encoders.STRING()).toDF();
}

};
}
// 与上面的方法区别仅在于使用了 lambda 简化代码
private static Function1<Dataset<Row>, Dataset<Row>> flat2() {
return new AbstractFunction1<Dataset<Row>, Dataset<Row>>() {
@Override
public Dataset<Row> apply(Dataset<Row> v1) {
return v1.flatMap((FlatMapFunction<Row, String>) row -> Stream.of("" + row.getLong(0)).iterator(),
Encoders.STRING()).toDF();
}
};
}
}

虽然两个方法都是静态的,但是编译后会产生多个 class 文件:

1
2
3
4
SparkLocalTest$1.class
SparkLocalTest$1$1.class
SparkLocalTest$2.class
SparkLocalTest.class

  • transform 方法定义 def transform[U](t: Dataset[T] => Dataset[U]): Dataset[U] = t(this),所以是用 flat 方法在 driver 端产生了一个 Function 对象并仅在 driver 端使用,所以该 Function 对象可以不实现序列化
  • flat1 由于嵌套,产生了两个文件,而且内部类 SparkLocalTest$1$1 还引用了外部类 SparkLocalTest$1.class,由于外部类是 AbstractFunction1 的子类,并没有实现序列化接口,由于要发送到 executor 进行操作,所以使用时会报错 Task not serializable: java.io.NotSerializableException: SparkLocalTest$1
  • 而 flat2 中的 lambda 编译后是静态域,private static java.lang.Object $deserializeLambda$(java.lang.invoke.SerializedLambda); 就没有序列化问题
  • 另一种方案是创建一个实现序列化的类:abstract static class SerializableFunction1<T1, R> extends AbstractFunction1<T1, R> implements Serializable

Java Nested Classes: Behind the Scenes 学到的:

  • 内部类其实就是一种语法糖,对于 JVM 来说是透明的,JVM 看到的就是普通的类
  • 编译器在编译时将内部类提取出来,因为要访问包围类的域,所以内部类要保存包围类的引用
  • 当包围类的域是 private 的,包围类会产生一个静态访问方法供内部类使用;否则会直接通过引用访问

20180905 利用窗口函数给 group 数据打标签

问题:

Spark 中一张表有两列,id 和 query,要根据 query 的 pv 来给每一行增加新的一列 quality(if pv>=2 quality = high, else quality = low)

原有解决办法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21

+--------------+ +---------------+
| | | |
| id, query +---------> | query, pv |
| | | |
+--------------+ +---------------+
| |
| |
| +-------v-------+
| | |
| | query,quality |
| | |
| +---------------+
| |
+----------------------------+
|
+--------------v-------------+
| |
| id, query, quality |
| |
+----------------------------+

如图,先创建一个新的 Dataset(query,quality),然后跟原有的表进行 join on query。这样需要要注意的是假如某一个 query pv 特别大,join 的时候会全部分布到一个节点上,轻则数据倾斜,重则 OOM 等。这时的解决办法是根据 pv 将 query 分为两张表,一张记录 pv 较少的,比如 pv<500000,正常采用 SortMergeJoin 即可;另一张表记录 pv 较大的,比如 pv>500000,由于 pv 较大的 query 很少,所以采用 BroadcastHashJoin,这样就不会将所有相同 query 记录拉取到同一节点。

采用窗口函数的方法:

  • 在 SQL 中一般的函数是根据每一行,输出一个值;
  • 聚合函数是根据每一个 group,输出一个值;
  • 而窗口函数是根据每一个 group,给每一行分别输出一个值。
    在以上的问题中,实际是给每行增加一列 pv,然后再转换为 quality,这个 pv 列的获得,原有方法中是通过 join,而其实可以通过窗口函数:
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    object ScalaTest {
    case class Item(cuid: String, query: String)
    def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local").getOrCreate()
    import spark.implicits._
    val ds = Seq(
    Item("c1", "hello"),
    Item("c1", "hello"),
    Item("c2", "world"),
    Item("c3", "foo")
    ).toDF()
    ds.show()
    val window = Window.partitionBy("query")
    val result = ds.withColumn("pv", count('query).over(window))
    .withColumn("quality", when('pv >= 2, "high").otherwise("low"))
    result.show()
    }
    }

至于使用窗口函数是否会引起数据倾斜,由于实际场景中还有其他限制(比如需要根据一张表的 pv 来给另一张表打标签,所以还是暂时使用的第一种方法),还未验证。


另外窗口函数还有其它很多用处,比如根据每个 id 取出相应数量的记录,但是每个 id 的取出数量都不同,又有大量 id,如果根据每一个 id 进行 filter 再 union,实际上是串行操作,会耗费大量时间;另一种方法是将原表变成 (id,[query …])与 (id, count)进行 join,再取出相应 count 个数,这样利用了并行处理更快一些。更好的方法是用窗口函数,给原表每行数据加上 rowNumber,join (id,count)后直接选取 rowNumber < count 的条目即可。


参考

Introducing Window Functions in Spark SQL

20180827 Lambda & Effective Final

Inner classes can refer to mutable instance fields
if the inner-class expression occurs in an instance context,
but this is not the same thing.
A good way to think of this is that a reference inside an inner class to the field x
from the enclosing class is really shorthand for Outer.this.x,
where Outer.this is an implicitly declared final local variable.

Language designer’s notebook: First, do no harm

  1. 局部变量的的寿命和包围的 block 是相同的,但 lambda 可以被保存在变量中,进而可以在 block 之外运行。当被 lambda 捕获时,则扩大了局部变量的寿命,与局部变量原有的表现不同
  2. 局部变量不会有竞争出现,因为只用一个线程可以访问,如果允许捕获可变局部变量,将打破这一局面
  3. lambda 可以捕获 field,没有打破之前的规则
  4. 只能捕获 effective final 的一个好的副作用是,引导写出更加函数式的代码

20180821 Spark & Scala Lambda Serialization

Lambda serialization
这篇文章的思路是通过加上 @transient 来使得不用的 field 避免序列化,副作用就是假如用到了,就会出现 NPE,解决办法就是再加上 lazy


Hygienic Closures for Scala Function Serialization
其实思路和 Spark RDD Programming Guide 中的一样,都是先将需要对象中的 field 复制到 local variable 中,这样就避免了序列化整个对象
不过该文中实现了一个更通用的方法来包装整个过程,更方便使用


在 Spark 传递 functions 时,

  • Scala 建议使用 Anonymous function syntax 和 Object 中的 static method;当传递类实例中的方法时,就需要注意序列化问题;可以用 anonymous function 是因为其被编译后的代码实现了序列化接口
  • Java 建议时使用对应的 lambda 用来实现 Spark 提供的各种继承序列化接口的 Function 接口;或者写出继承类;这里需要注意,如果使用匿名类或内部类,由于其拥有外部对象的引用,所以外部类必须实现序列化,或者用 static 内部类;但 non-capturing lambda 是被编译成外部类的 static field,所以可以直接使用,外部类也不用序列化(在 Scala 2.12, 2.13 中所有的 lambda 都被编译成 static 方法)

    Compiler by default inserts constructor in the byte code of the
    Anonymous class with reference to Outer class object .
    The outer class object is used to access the instance variable
    The outer class is serialized and sent along with the serialized object of the inner anonymous class

understanding-spark-serialization

Non-capturing lambdas are simply desugared into a static method having exactly the same signature of the lambda expression and declared inside the same class where the lambda expression is used.

Java 8 Lambdas - A Peek Under the Hood

Note that lambda body methods are always static (in Scala 2.12, 2.13). If a closure captures the enclosing instance, the static body method has an additional $this parameter. The Java 8 compiler handles this differently, it generates instance methods if the lambda accesses state from the enclosing object.

Remaining issues for Spark on 2.12

  • Python 同样是 lambda,但多了 local defs 和 top-level functions

值得一读:
Understanding Spark Serialization

What Lambda Expressions are compiled to? What is their runtime behavior?

20180819 Login Shell & Variable

  1. Login shell 就是登录时的 shell, 用 echo $0 会前缀 ‘-‘,如 ‘-bash’;会读取 profile 类文件,如 ~/.bash_profile,.bash_profile 中一般有语句会读取 ~/.bashrc
  2. Non-login shell 是被调用的 shell 或者脚本,不会读取 profile 类文件,如果是 interactive 的,会读取 .bashrc 文件
  3. rc 文件都是 interactive shell 会读取的
  4. Environment variable 一般放在 profile 文件中,用 export a=b 配置,这样整个 session 只会读一次
  5. Local variable 放在 rc 文件中,用 a=b 配置,这个每个打开的 shell 都会读一次

参考:
Difference between Login shell and Non login shell
Bash startup scripts
Difference between Local Variables and Environment Variables
What’s the difference between .bashrc, .bash_profile, and .environment?

20180817 Spark Logging

以下为魔改版 Spark 的使用情况,并不一定适用于官方版本

  1. 最简单的方法,不配置其它日志选项,直接修改 conf/log4j.properties,配置 Appender 为 System.out 或 err,可以在 yarn web 的 Application Master 一栏的 logs 中看到 stdout 和 stderr 的日志记录,这个是 AM container 的日志
  2. 麻烦点可以在 spark-defaults.conf 或者 spark-submit 时传入 -Dlog4j.configuration=some-log4j.properties,不用在 spark-submit 配置 –files=path/some-log4j.properties
  3. 但是如果只配置 Appender 为 RollingFile,在以上是看不到日志的(因为是yarn收集的 stdout 和 stderr 的日志?)
  4. 在 SparkUI 中的 executors 的 logs 一栏中,stdout 是 executor 执行 println 代码的输出,网址为“/stdout?start=-4096”;而 stderr 因为网址有误为 “spark.log?start=-4096“ 导致没有东西,改为”/stderr?start=-4096“ 即可看到 container 的日志
  5. 在 client 模式下,可以使用 File Appender,用于保存日志
  6. 在 cluster 模式下,使用 console Appender 可在 yarn 中看到日志,但是本地也会有一些状态输出
spark-defaults.conf spark-submit jar-resources driver 效果 executor 效果
不设置-Dlog4j.configuration=log4j.properties 没有 –files 没有log4j.properties 使用conf下的配置 未生效
设置 spark.executor.extraJavaOptions -Dlog4j.configuration=log4j.properties 没有 –files 没有log4j.properties 使用conf下的配置 未生效
设置 spark.executor.extraJavaOptions -Dlog4j.configuration=log4j.properties 有 –files 没有log4j.properties 使用conf下的配置 未生效
不设置 spark.executor.extraJavaOptions 没有 –files 有log4j.properties 使用conf下的配置 未生效
不设置 spark.executor.extraJavaOptions 有 –files 没有log4j.properties client 使用conf下的配置, cluster 使用上传的配置 未生效

总之 driver 配置容易改,executor 配置不知道怎么改……
executor 一直读取 spark core 中 /org/apache/spark/log4j-defaults.properties 的配置

20180811

functions

  1. 在处理 Dataset 中的集合类型时,一些在 functions 中常用的方法: array_contains, size, collect_set, collect_list;如获取数组最后一个元素 col.getItem(size(prices).minus(1))
  2. 对于要判断 Map 是否包含 key,不使用 UDF 的话,暂时发现在 2.3 以前只能用 ‘col.getItem(key).isNotNull,但是这样会过滤掉 value == null 的情况,如果是需要 value 的情况下还是可用的;在 2.3 functions 新增了 map_keys,需要搭配 array_contains 使用进行判断,但这样似乎计算成本较高,需要和使用 UDF 进行对比
  3. 使用 explode 时,结果中自动过滤了集合为空的行,可以省一步过滤;对 functions 中方法使用后的列名没有发现可参考的地方,只能看源码,比如 explode 对 array 使用后列名为 col,对 map 使用后为 key, value
  4. substring, 1 based,pos 可以传入负值表示从后计数;在 Spark 的实现中 pos 从0开始和从1开始是一样的;但要注意 pos=0 在不同的数据库中效果不同,比如 SQL SERVER 中是当成空字符导致结果长度短 1 位,而 MySQL 则不出结果

Dataset

  1. 在 Java 中使用 Dataset.transform 时需要继承 AbstractFunction1,而不是 Function1,因为 Function1 使用了 @specialized,编译时会根据原始类型产生对应方法,假如实现这个接口,需要覆盖所有的方法

20180802 Maven Configuration for Spark Unit Test

  1. 由于平常开发所用的魔改版 Spark 无法在本地运行,在本地进行单测需要用原版 Spark,通过 Maven 的 profile 进行切换
  2. 在 IDEA 的 Maven 插件中可以自定义配置,通过修改 Lifecycle 中的 test 和 package 的配置,可以实现在 test 时使用 local profile,在 package 时使用 online profile,不用再手动选择 profile
  3. 由于 Maven 在 package 时会经过 test,而我们需要打包的魔改版 Spark 是无法运行 test 的,这时需要在 scalatest plugin 中增加 skipTests 配置,实现在打包时自动跳过
  4. 根据 Maven Profile Best Practices,也可以删掉 online profile,直接将其内容作为默认配置,只保留 local,这样在不添加 profile 选项的时候也可以打包成功(毕竟我们需要的打包就是 online 版本的,而且之前两种 profile 的配置情况下,test 与 package 也要分别使用,所以这样做相比之前没有损失)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
<properties>
<spark.version>online_version</spark.version>
<skipTests>true</skipTests>
</properties>
<dependencies>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.11</artifactId>
<version>${spark.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_2.11</artifactId>
<version>${spark.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-hive_2.11</artifactId>
<version>${spark.version}</version>
<scope>provided</scope>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.scalatest</groupId>
<artifactId>scalatest-maven-plugin</artifactId>
<version>1.0</version>
<configuration>
<reportsDirectory>${project.build.directory}/surefire-reports</reportsDirectory>
<junitxml>.</junitxml>
<filereports>WDF TestSuite.txt</filereports>
<skipTests>${skipTests}</skipTests>
</configuration>
<executions>
<execution>
<id>test</id>
<goals>
<goal>test</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
<profiles>
<profile>
<id>local</id>
<properties>
<spark.version>2.2.1</spark.version>
<skipTests>false</skipTests>
</properties>
</profile>
</profiles>

20180731 Avoid Null

  1. 方法返回集合时可以返回空集合代替 null
  2. 使用 Null Object Pattern; 可选什么都不做,或输出必要解释信息
  3. private 方法由 public 方法调用时,在 public 方法中检查 null 后可在 private 方法中省略检查,但最好注明其输入不会为 null
  4. 使用 Optional,将其作为 monad 使用,而不是调用 isPresent then get
  5. 使用 @NotNull @Nullable

参考:
https://stackoverflow.com/questions/271526/avoiding-null-statements
https://alvinalexander.com/scala/what-effects-effectful-mean-in-functional-programming