经历3天的Vert.X学习心得

Vert.x简单介绍

Vert.x是一个基于Netty、支持多语言的、范式自由的、具有高度灵活性的响应式(Reactive)编程工具包。除了Java、Kotlin,也可以使用JavaScript、Groovy等语言编程。唯一让我感到不愉快的地方在于Vert.x 3.x版本就没有了Python支持,如果Python程序想和Vert.x交互,只能靠TCP bridge或者使用其他手段如HTTP、AMQP、gRPC等。

Vert.x包含core和其他的一系列模块,如web,数据库,响应式编程,微服务等模块,这些模块都不是必需的,只有vertx-core才是核心模块,jar包大概在1.2MB左右。

使用Vert.x可以实现异步编程,并解决js的回调地狱,同时也得益于它所提供的类Actor模型,可以写出具有良好的可扩展性、鲁棒性的分布式程序,使用上远比Akka要更加容易。下面介绍下它的设计理念。

理解Vert.x的参考材料

了解Java Swing

在介绍Vert.x的设计之前,可以先看看Java Swing的设计。Java Swing是单线程的,但是它通过一定的办法解决线程不安全的问题。一个Swing程序启动后会有一个主线程和一个Event Dispatch Thread,主线程即这个Java程序,而EDT主要是负责处理GUI事件的线程。当GUI上发生了变化,比如按下一个按钮,就会把事件塞入Event Queue中,唯一的EDT负责处理Event Queue中排队的事件。如果直接把阻塞方法交给EDT处理(直接把阻塞方法包含在GUI事件中),就会阻塞EDT,导致GUI卡死。因此,如果GUI事件涉及阻塞方法,应该交由SwingWorker处理,SwingWorker会用别的线程来处理阻塞任务,并在任务处理中或完成之后修改共享变量或返回结果:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
import java.util.concurrent.TimeUnit
import javax.swing.JButton
import javax.swing.JFrame
import javax.swing.JProgressBar
import javax.swing.SwingWorker
fun main() {
GenGui()
}
class GenGui{
val jb = JProgressBar(0, 2000).apply {
setBounds(40, 40, 160, 30)
value = 0
isStringPainted = true
}
val btn = JButton("Click Me").apply {
setBounds(40, 90, 100, 30)
addActionListener {
iter(0, 2000)
}
}
val frame = JFrame("progress bar demo").apply {
add(jb)
add(btn)
setSize(250, 200)
layout = null
isVisible = true
}
fun iter(start: Int, end: Int){
val swWorker = object : SwingWorker<Int, Unit>(){
override fun doInBackground(): Int {
var index = start
while (index <= end){
jb.value = index
index += 20
println(index)
TimeUnit.MILLISECONDS.sleep(100)
}
return index
}
override fun done() {
println("All done")
}
}
swWorker.execute()
}
}

上面的代码中,当btn被按下,就会往Event Queue里塞入一个事件,轮到被EDT处理时,就会触发iter函数。而iter函数里通过把进度条更新的任务交由swWorker处理,这个worker在工作的时候会直接修改进度条的value,从而实现不阻塞EDT进程。如果阻塞任务没有委托SwingWorker,就会阻塞EDT导致界面卡死。

如果想安全地修改GUI元素,应该通过Swing的SwingUtilities.invokeLater()或者invokeAndWait()把GUI修改事件放入事件队列中排队,并由EDT负责更新GUI。

了解Erlang

在Erlang中,一个Erlang“进程”就是一个Actor,在Erlang虚拟机中可以低成本地创建上万乃至十万的“进程”,“进程”间的交互只能通过消息传递实现,消息是不可变的。每个“进程”单独维护自己的内部状态,因为没有共享状态,就不需要任何锁,能大幅简化并发编程。当一个Actor接收到消息后,就会通过模式匹配找到对应消息的处理函数,处理完成可以通过消息传递把结果发回给原发送者,也可以根据需要发送给下一个Actor。

Erlang的并发模型是非常直观和容易理解的:

  • 每个Actor就好像一个独立的人,如果你想叫你的同事帮你收集些资料然后给你在会议中用来做演示,你就需要用消息传递告知TA,不管你用电话还是邮件发送信息都好(传输接口)。
  • 你发送了请求给TA,你就可以继续做别的事(异步),不需要停下手上的工作等待回复(阻塞)。
  • 如果TA嫌弃你(内部状态),你作为发送者是无法看到的,经过TA的思考(模式匹配),TA决定不帮你收集,就通过消息传递告知你“我很忙,你找别人吧”。TA可能还会通过消息传递告诉朋友说:被人骚扰了很烦。
  • 或者TA暗恋你(内部状态),就帮你收集,等收集完成后再消息传递,发送结果给你。由于你不用阻塞等待,在TA发送结果给你之前,你可以继续手上的开会准备。
  • 当你接收到资料后,就可以开会,展示结果(对外部对象的副作用)

但是这样是不够的,假设你在开会前需要把资料抄送给直属领导审核(回调),直属领导又要和其他领导开会讨论(回调),就会导致代码中的回调地狱。

Vert.x分析

Vert.x的设计也是跟Swing类似的。当一个vertx实例(定位类似Erlang的虚拟机)被创建的时候,它会根据用户配置或者CPU可用核数(含线程)创建含一定数量的Event Loop Thread(下文简称ELT)的pool,在默认情况下,这个线程数量是可用核数*2(比如我的6700HQ,4核8线程,默认会创建16个event loop的pool)。ELT的作用和Swing的EDT类似,主要作用是分发消息并立即返回,而耗时任务交由另一个线程池负责。

Vertx的两个核心元素分别是Verticle和EventBus。一个Verticle就相当于Erlang进程、actor。而EventBus则是Verticle之间沟通的核心。

Verticle

要实现一个Verticle只需要继承AbstractVerticle,并重写start方法,stop方法是可选的:

1
2
3
4
5
6
class CLU2 : AbstractVerticle(){
private val server = embeddedServer(Netty, 8030, module = Application::module2)
override fun start() {
server.start()
}
}

上面的Verticle维护了一个内部状态server(Ktor的服务器),start方法会在Verticle部署或者接收到消息的时候执行。vertx会确保start方法只会被同一个event loop处理,但是每个event loop在底层都有可能由不同的ELT负责执行。在start方法里绝对不能写while(true)之类的死循环,就跟Swing一样不应该把循环写进GUI事件中。

The Golden Rule - Don’t Block the Event Loop

利用Kotlin的when表达式,可以写出类似Erlang的模式匹配代码来处理Verticle所接收到的消息:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
class ConsumePrint : AbstractVerticle() {
private val server = embeddedServer(Netty, 8020, module = Application::module2)
private val worker = LoopPrint()
private var name: String = ""
private val stack = mutableListOf<String>()
override fun start() {
vertx.eventBus().consumer<String>("url1") { msg ->
when {
msg.body() == "launch" -> {
server.start()
Thread(worker).apply { start() }
vertx.eventBus().send("url3", "launch done")
}
msg.body() == "stop" -> {
println("Closing server")
server.stop(10, 10)
worker.close()
}
msg.body() == "simpleLoop" -> {
vertx.executeBlocking<String>({ future ->
run {
var d = 0.0
for (i in 0..10000000) d += (i * 10 / 23 * 10.02368899)
future.complete(d.toString())
}
}) { res -> println(" !!!!!!!! $res") }
}
msg.body() == "changeName" -> {
val context = vertx.orCreateContext
context.put("key", "This is value")
context.runOnContext {
println(context.get("key") as String)
}
}
msg.body() == "addStack" -> {
vertx.executeBlocking<String>({ promise ->
run {
Thread.sleep(10000)
promise.complete(if (stack.isNotEmpty()) stack.last() else "stack 1")
}
}) { res ->
run {
stack.add(res.result().toString())
vertx.eventBus().send("url3", "I received your msg!!")
println(stack)
}
}
}
msg.body() == "Stack" -> {
vertx.eventBus().send("url3", "stack is $stack")
}
else -> println("1 received message.body is ${msg.body()}")
}
}
}
}

上面的测试代码就是通过when来判断来自其他地址发送的消息体,然后执行不同的操作。

也可以对消息头进行判断:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
class Waste : AbstractVerticle(){
private val mapper = ObjectMapper().registerModule(KotlinModule())
override fun start() {
vertx.eventBus().localConsumer<String>("url2") { msg ->
val header = msg.headers()
when(header.get("msg")){
"waste" -> {
println("from url2: ${mapper.readValue<Person>(msg.body())}")
msg.reply("you are wasting")
}
else -> {
println("Waste wrong")
msg.fail(23, "Waste wrong reply")
}
}
}
}
}

如果需要执行耗时任务,可以部署worker verticle或者用executeBlocking:

一个worker verticle其实跟普通verticle差别不大,区别在于内部是由worker pool的线程来执行耗时任务,部署的时候也要设置:

1
2
3
4
5
6
7
8
9
10
11
12
13
val vertx = Vertx.vertx()
println("size is: ${VertxOptions().eventLoopPoolSize}")
......
vertx.deployVerticle(WorkerV::class.java.name, deploymentOptionsOf(worker = true)) { res ->
run {
if (res.succeeded()) {
println("Deployment id is: ${res.result()}")
ids.add(res.result())
} else {
println("Deployment failed!")
}
}
}

在最新的稳定版本中(3.9.2),可以直接在verticle内部用executeBlocking来执行阻塞代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
msg.body() == "addStack" -> {
vertx.executeBlocking<String>({ promise ->
run {
Thread.sleep(10000)
promise.complete(if (stack.isNotEmpty()) stack.last() else "stack 1")
}
}) { res ->
run {
stack.add(res.result().toString())
vertx.eventBus().send("url3", "I received your msg!!")
println(stack)
}
}
}

executeBlocking的泛型参数是用于标记返回类型,代码里第一个lambda是处理器,用于执行耗时任务,并通过complete方法设置返回值,第二个lambda是对结果的callback方法,即对处理器的返回值进行处理。但是如果处理器中的阻塞代码可能会阻塞超过10秒,就不建议用使用上述两种方法了。最好单独创建进程来执行,比如本小节最开头的Ktor服务器的embeddedServer,start方法只负责启动ktor服务器。如果需要与verticle的context交互,应该通过runOnContext或者event bus来介入event loop中:

1
2
3
4
5
6
7
msg.body() == "changeName" -> {
val context = vertx.orCreateContext
context.put("key", "This is value")
context.runOnContext {
println(context.get("key") as String)
}
}

上面通过put方法对verticle的上下文添加了一个键为key,值为this is value的内部共享状态。而runOnContext其实就跟Swing的invokeLater方法类似,都是把事件提交进队列中就返回,等待event loop的处理(异步任务)。

至于什么是context?context是为每个event loop分配的一个对象。

当 Vert.x 传递一个事件给处理器或者调用 Verticle 的 start 或 stop 方法时,它会关联一个 Context 对象来执行。通常来说这个 Context 会是一个 Event Loop Context,它绑定到了一个特定的 Event Loop 线程上。所以在该 Context 上执行的操作总是在同一个 Event Loop 线程中。对于运行内联的阻塞代码的 Worker Verticle 来说,会关联一个 Worker Context,并且所有的操作运都会运行在 Worker 线程池的线程上。

每个 Verticle 在部署的时候都会被分配一个 Context(根据配置不同,可以是Event Loop Context 或者 Worker Context),之后此 Verticle 上所有的普通代码都会在此 Context 上执行(即对应的 Event Loop 或Worker 线程)。一个 Context 对应一个 Event Loop 线程(或 Worker 线程),但一个 Event Loop (Thread) 可能对应多个 Context。

部署完verticle之后可以通过undeploy方法取消已经部署的verticle实例,但是经过测试,目前3.9.2中如果在关闭vertx对象前把全部verticle手动取消部署,最终在vertx对象的close方法执行时会报异常:java.lang.IllegalStateException: Unknown deployment

Event Bus

Event bus其实就是类似AMQP的消息队列,在Erlang中进程主要是靠进程的pid来作为地址发送和接收消息,而在Vertx中,只需要一个字符串就可以作为地址,字符串的格式并没有固定的格式要求,因此非常灵活。在Vertx的event bus中主要有以下几种消息传递方式:

  • publish:所有订阅该地址的verticle都会收到这个消息。
  • send:简单地发送一个消息到该地址,但是如果有多个verticle都订阅该地址,只有一个会接收到消息。
  • request:跟send类似,但是可以添加回复处理器,可以处理接收者返回的reply,类似RPC调用。

用request发送消息可以包含消息头(send不支持),可以这样设置:

1
2
3
4
5
6
7
8
9
10
11
12
13
data class Person(val name: String, val age: Int)
......
vertx.eventBus().request<String>(
"url2",
mapper.writeValueAsString(Person("css", 24)),
deliveryOptionsOf(headers = mapOf("msg" to "no waste"))
) { reply ->
if (reply.succeeded()) {
println("Main received reply: ${reply.result().body()}")
} else {
println("Main received failed reply: ${reply.cause().message}")
}
}

上面的代码中,url为url2。跟着的是一个使用Jackson序列化的data class转换得到的字符串,作为消息体。deliveryOptionsOf是发送用的配置信息,这里就设置了一个map作为headers。最后的lambda是回复处理器,用于处理来自url2的verticle返回的回复。

为了接收消息,可以用consumer或者localConsumer来处理:

1
2
3
4
5
6
7
8
9
10
11
12
13
vertx.eventBus().localConsumer<String>("url2") { msg ->
val header = msg.headers()
when(header.get("msg")){
"waste" -> {
println("from url2: ${mapper.readValue<Person>(msg.body())}")
msg.reply("you are wasting")
}
else -> {
println("Waste wrong")
msg.fail(23, "Waste wrong reply")
}
}
}
1
2
3
4
5
6
7
8
vertx.eventBus().consumer<String>("url1") { msg ->
when(msg.body()) {
"launch" -> {
server.start()
Thread(worker).apply { start() }
vertx.eventBus().send("url3", "launch done")
}
......

consumer比起local的,可以处理集群中的消息。consumer的msg可以用于回复发送者,比如reply和fail方法。

被发送的消息可以是普通的文本,或者用vertx提供的json dsl来创建JSON,vertx会自动处理序列化和反序列化的任务。但是我觉得目前3.9.2的json dsl并不好用,因此我选择直接用Jackson来对data class进行序列化用于发送。

Future

Future是vertx提供的用于解决回调地狱的工具,下面的代码演示了在3.9.2下如何正确地使用Future:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
import io.vertx.core.*
import java.util.*
import kotlin.system.exitProcess
class ComposeExample : AbstractVerticle() {
override fun start() {
val future = anAsyncAction()
future.compose { name: String -> anotherAsyncAction(name) }
.onComplete { ar: AsyncResult<String> ->
if (ar.failed()) {
println("Something bad happened")
ar.cause().printStackTrace()
} else {
println("Result: " + ar.result())
}
}
}
private fun anAsyncAction(): Future<String> {
val promise = Promise.promise<String>()
// mimic something that take times
vertx.setTimer(1000) { promise.complete("world") }
return promise.future()
}
private fun anotherAsyncAction(name: String): Future<String> {
val promise = Promise.promise<String>()
// mimic something that take times
vertx.setTimer(100) { promise.complete("hello $name") }
return promise.future()
}
}
fun main() {
val vert = Vertx.vertx()
vert.deployVerticle(ComposeExample::class.java.name)
println("enter to exit...")
Scanner(System.`in`).nextLine()
exitProcess(0)
}

可以使用compose来调度异步任务的顺序。在我刚接触Vertx的时候我曾被Future和executeBlocking这种用于执行阻塞任务的函数有什么相关关系所困惑。其实Future只是一个异步任务容器,一个封装了future的函数里可以包含executeBlocking这种阻塞任务,而Future只是一个用来解决回调地狱的工具。

高可用

Vertx的架构本身就适合于分布式应用,而它通过集群管理器设置为集群模式并启用高可用模式。我目前就尝试了vertx默认的Hazelcast,但实际上也可以使用Zookeeper等工具作为集群管理器。

想使用Hazelcast,只需要在build.gradle里添加相关依赖:

1
2
3
4
5
6
7
dependencies {
/* Useless in kotlin 1.4.0
implementation "org.jetbrains.kotlin:kotlin-stdlib-jdk8"*/
implementation "io.vertx:vertx-core:3.9.2"
implementation "io.vertx:vertx-hazelcast:3.9.2"
implementation "io.vertx:vertx-lang-kotlin:3.9.2"
}

然后就可以开始编写代码:

集群节点1的verticle:

1
2
3
4
5
6
7
8
9
10
11
12
class CLU1 : AbstractVerticle(){
private val server = embeddedServer(Netty, 8020, module = Application::module2)
override fun start() {
server.start()
}
}
fun main() {
Vertx.clusteredVertx(vertxOptionsOf(haEnabled = true)) { vertx ->
vertx.result().deployVerticle(CLU1::class.java.name, deploymentOptionsOf(ha = true))
}
}

集群节点2的verticle:

1
2
3
4
5
6
7
8
9
10
11
12
class CLU2 : AbstractVerticle(){
private val server = embeddedServer(Netty, 8030, module = Application::module2)
override fun start() {
server.start()
}
}
fun main() {
Vertx.clusteredVertx(vertxOptionsOf(haEnabled = true)) { vertx ->
vertx.result().deployVerticle(CLU2::class.java.name, deploymentOptionsOf(ha = true))
}
}

通过clusteredVertx可以创建集群模式的vertx实例,两个ktor实例都在本地运行,只是监听的端口不同。可以先启动CLU1看看会输出什么:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
八月 28, 2020 9:56:39 下午 com.hazelcast.instance.AddressPicker
信息: [LOCAL] [dev] [3.12.2] Prefer IPv4 stack is true, prefer IPv6 addresses is false
八月 28, 2020 9:56:39 下午 com.hazelcast.instance.AddressPicker
信息: [LOCAL] [dev] [3.12.2] Picked [192.168.233.1]:5701, using socket ServerSocket[addr=/0:0:0:0:0:0:0:0,localport=5701], bind any local is true
八月 28, 2020 9:56:39 下午 com.hazelcast.system
信息: [192.168.233.1]:5701 [dev] [3.12.2] Hazelcast 3.12.2 (20190802 - e34b163) starting at [192.168.233.1]:5701
八月 28, 2020 9:56:39 下午 com.hazelcast.system
信息: [192.168.233.1]:5701 [dev] [3.12.2] Copyright (c) 2008-2019, Hazelcast, Inc. All Rights Reserved.
八月 28, 2020 9:56:40 下午 com.hazelcast.spi.impl.operationservice.impl.BackpressureRegulator
信息: [192.168.233.1]:5701 [dev] [3.12.2] Backpressure is disabled
八月 28, 2020 9:56:41 下午 com.hazelcast.instance.Node
信息: [192.168.233.1]:5701 [dev] [3.12.2] Creating MulticastJoiner
八月 28, 2020 9:56:42 下午 com.hazelcast.spi.impl.operationexecutor.impl.OperationExecutorImpl
信息: [192.168.233.1]:5701 [dev] [3.12.2] Starting 8 partition threads and 5 generic threads (1 dedicated for priority tasks)
八月 28, 2020 9:56:42 下午 com.hazelcast.internal.diagnostics.Diagnostics
信息: [192.168.233.1]:5701 [dev] [3.12.2] Diagnostics disabled. To enable add -Dhazelcast.diagnostics.enabled=true to the JVM arguments.
八月 28, 2020 9:56:42 下午 com.hazelcast.core.LifecycleService
信息: [192.168.233.1]:5701 [dev] [3.12.2] [192.168.233.1]:5701 is STARTING
八月 28, 2020 9:56:44 下午 com.hazelcast.internal.cluster.ClusterService
信息: [192.168.233.1]:5701 [dev] [3.12.2]
Members {size:1, ver:1} [
Member [192.168.233.1]:5701 - d594a601-9adc-493a-afad-57c3377a4d70 this
]
八月 28, 2020 9:56:44 下午 com.hazelcast.core.LifecycleService
信息: [192.168.233.1]:5701 [dev] [3.12.2] [192.168.233.1]:5701 is STARTED
......
八月 28, 2020 9:56:45 下午 com.hazelcast.internal.partition.impl.PartitionStateManager
信息: [192.168.233.1]:5701 [dev] [3.12.2] Initializing cluster partition table arrangement...
八月 28, 2020 9:56:46 下午 io.vertx.core.impl.HAManager
信息: A quorum has been obtained. Any deploymentIDs waiting on a quorum will now be deployed
21:56:46.298 [vert.x-eventloop-thread-1] INFO ktor.application - No ktor.deployment.watch patterns specified, automatic reload is not active
21:56:46.474 [vert.x-eventloop-thread-1] INFO ktor.application - Responding at http://0.0.0.0:8020

可以看到目前的集群成员只有一个member,uid为d594a601-9adc-493a-afad-57c3377a4d70,并标记了this。Ktor服务器在localhost的8020端口监听。

此时再启动CLU2:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
八月 28, 2020 10:00:16 下午 com.hazelcast.instance.AddressPicker
信息: [LOCAL] [dev] [3.12.2] Prefer IPv4 stack is true, prefer IPv6 addresses is false
八月 28, 2020 10:00:16 下午 com.hazelcast.instance.AddressPicker
信息: [LOCAL] [dev] [3.12.2] Picked [192.168.233.1]:5702, using socket ServerSocket[addr=/0:0:0:0:0:0:0:0,localport=5702], bind any local is true
八月 28, 2020 10:00:16 下午 com.hazelcast.system
信息: [192.168.233.1]:5702 [dev] [3.12.2] Hazelcast 3.12.2 (20190802 - e34b163) starting at [192.168.233.1]:5702
八月 28, 2020 10:00:16 下午 com.hazelcast.system
信息: [192.168.233.1]:5702 [dev] [3.12.2] Copyright (c) 2008-2019, Hazelcast, Inc. All Rights Reserved.
八月 28, 2020 10:00:17 下午 com.hazelcast.spi.impl.operationservice.impl.BackpressureRegulator
信息: [192.168.233.1]:5702 [dev] [3.12.2] Backpressure is disabled
八月 28, 2020 10:00:18 下午 com.hazelcast.instance.Node
信息: [192.168.233.1]:5702 [dev] [3.12.2] Creating MulticastJoiner
八月 28, 2020 10:00:19 下午 com.hazelcast.spi.impl.operationexecutor.impl.OperationExecutorImpl
信息: [192.168.233.1]:5702 [dev] [3.12.2] Starting 8 partition threads and 5 generic threads (1 dedicated for priority tasks)
八月 28, 2020 10:00:19 下午 com.hazelcast.internal.diagnostics.Diagnostics
信息: [192.168.233.1]:5702 [dev] [3.12.2] Diagnostics disabled. To enable add -Dhazelcast.diagnostics.enabled=true to the JVM arguments.
八月 28, 2020 10:00:19 下午 com.hazelcast.core.LifecycleService
信息: [192.168.233.1]:5702 [dev] [3.12.2] [192.168.233.1]:5702 is STARTING
八月 28, 2020 10:00:19 下午 com.hazelcast.internal.cluster.impl.MulticastJoiner
信息: [192.168.233.1]:5702 [dev] [3.12.2] Trying to join to discovered node: [192.168.233.1]:5701
八月 28, 2020 10:00:19 下午 com.hazelcast.nio.tcp.TcpIpConnector
信息: [192.168.233.1]:5702 [dev] [3.12.2] Connecting to /192.168.233.1:5701, timeout: 10000, bind-any: true
八月 28, 2020 10:00:19 下午 com.hazelcast.nio.tcp.TcpIpConnection
信息: [192.168.233.1]:5702 [dev] [3.12.2] Initialized new cluster connection between /192.168.233.1:7582 and /192.168.233.1:5701
八月 28, 2020 10:00:25 下午 com.hazelcast.internal.cluster.ClusterService
信息: [192.168.233.1]:5702 [dev] [3.12.2]
Members {size:2, ver:2} [
Member [192.168.233.1]:5701 - d594a601-9adc-493a-afad-57c3377a4d70
Member [192.168.233.1]:5702 - b6af6227-f4d5-4730-a64d-f6fc11583760 this
]
八月 28, 2020 10:00:26 下午 com.hazelcast.core.LifecycleService
信息: [192.168.233.1]:5702 [dev] [3.12.2] [192.168.233.1]:5702 is STARTED
......
八月 28, 2020 10:00:27 下午 io.vertx.core.impl.HAManager
信息: A quorum has been obtained. Any deploymentIDs waiting on a quorum will now be deployed
22:00:27.787 [vert.x-eventloop-thread-1] INFO ktor.application - No ktor.deployment.watch patterns specified, automatic reload is not active
22:00:27.888 [vert.x-eventloop-thread-1] INFO ktor.application - Responding at http://0.0.0.0:8030

同时在CLU1的console输出了配对的信息:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
八月 28, 2020 10:00:19 下午 com.hazelcast.nio.tcp.TcpIpConnection
信息: [192.168.233.1]:5701 [dev] [3.12.2] Initialized new cluster connection between /192.168.233.1:5701 and /192.168.233.1:7582
八月 28, 2020 10:00:25 下午 com.hazelcast.internal.cluster.ClusterService
信息: [192.168.233.1]:5701 [dev] [3.12.2]
Members {size:2, ver:2} [
Member [192.168.233.1]:5701 - d594a601-9adc-493a-afad-57c3377a4d70 this
Member [192.168.233.1]:5702 - b6af6227-f4d5-4730-a64d-f6fc11583760
]
八月 28, 2020 10:00:25 下午 com.hazelcast.internal.partition.impl.MigrationManager
信息: [192.168.233.1]:5701 [dev] [3.12.2] Re-partitioning cluster data... Migration queue size: 271
八月 28, 2020 10:00:27 下午 com.hazelcast.internal.partition.impl.MigrationThread
信息: [192.168.233.1]:5701 [dev] [3.12.2] All migration tasks have been completed. (lastRepartitionTime=Fri Aug 28 22:00:25 CST 2020, completedMigrations=271, totalCompletedMigrations=271, elapsedMigrationTime=1052ms, totalElapsedMigrationTime=1052ms)

可以看到在我的本地环境已经运行了一个两个成员的集群,在不同的console里是会用this显示哪个uid才是目前的vertx实例。

由于两个CLU都启用了高可用的设置,如果现在随便关闭一个vertx集群成员,vertx就会自动把失效的成员中的verticle迁移到其他可用集群成员中。下面是我kill了CLU1后CLU2的输出:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
八月 28, 2020 10:02:27 下午 com.hazelcast.spi.impl.operationservice.impl.InvocationMonitor
信息: [192.168.233.1]:5702 [dev] [3.12.2] Invocations:1 timeouts:1 backup-timeouts:0
八月 28, 2020 10:05:11 下午 com.hazelcast.nio.tcp.TcpIpConnection
警告: [192.168.233.1]:5702 [dev] [3.12.2] Connection[id=1, /192.168.233.1:7582->/192.168.233.1:5701, qualifier=null, endpoint=[192.168.233.1]:5701, alive=false, type=MEMBER] closed. Reason: Exception in Connection[id=1, /192.168.233.1:7582->/192.168.233.1:5701, qualifier=null, endpoint=[192.168.233.1]:5701, alive=true, type=MEMBER], thread=hz._hzInstance_1_dev.IO.thread-in-0
java.io.IOException: 远程主机强迫关闭了一个现有的连接。
at sun.nio.ch.SocketDispatcher.read0(Native Method)
at sun.nio.ch.SocketDispatcher.read(SocketDispatcher.java:43)
at sun.nio.ch.IOUtil.readIntoNativeBuffer(IOUtil.java:223)
at sun.nio.ch.IOUtil.read(IOUtil.java:197)
at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:380)
at com.hazelcast.internal.networking.nio.NioInboundPipeline.process(NioInboundPipeline.java:113)
at com.hazelcast.internal.networking.nio.NioThread.processSelectionKey(NioThread.java:369)
at com.hazelcast.internal.networking.nio.NioThread.processSelectionKeys(NioThread.java:354)
at com.hazelcast.internal.networking.nio.NioThread.selectLoop(NioThread.java:280)
at com.hazelcast.internal.networking.nio.NioThread.run(NioThread.java:235)
八月 28, 2020 10:05:11 下午 com.hazelcast.nio.tcp.TcpIpConnector
信息: [192.168.233.1]:5702 [dev] [3.12.2] Connecting to /192.168.233.1:5701, timeout: 10000, bind-any: true
八月 28, 2020 10:05:13 下午 com.hazelcast.nio.tcp.TcpIpConnector
信息: [192.168.233.1]:5702 [dev] [3.12.2] Could not connect to: /192.168.233.1:5701. Reason: SocketException[Connection refused: no further information to address /192.168.233.1:5701]
八月 28, 2020 10:05:13 下午 com.hazelcast.nio.tcp.TcpIpConnector
信息: [192.168.233.1]:5702 [dev] [3.12.2] Connecting to /192.168.233.1:5701, timeout: 10000, bind-any: true
八月 28, 2020 10:05:15 下午 com.hazelcast.nio.tcp.TcpIpConnector
信息: [192.168.233.1]:5702 [dev] [3.12.2] Could not connect to: /192.168.233.1:5701. Reason: SocketException[Connection refused: no further information to address /192.168.233.1:5701]
八月 28, 2020 10:05:15 下午 com.hazelcast.nio.tcp.TcpIpConnector
信息: [192.168.233.1]:5702 [dev] [3.12.2] Connecting to /192.168.233.1:5701, timeout: 10000, bind-any: true
八月 28, 2020 10:05:17 下午 com.hazelcast.nio.tcp.TcpIpConnector
信息: [192.168.233.1]:5702 [dev] [3.12.2] Could not connect to: /192.168.233.1:5701. Reason: SocketException[Connection refused: no further information to address /192.168.233.1:5701]
八月 28, 2020 10:05:17 下午 com.hazelcast.nio.tcp.TcpIpConnectionErrorHandler
警告: [192.168.233.1]:5702 [dev] [3.12.2] Removing connection to endpoint [192.168.233.1]:5701 Cause => java.net.SocketException {Connection refused: no further information to address /192.168.233.1:5701}, Error-Count: 5
八月 28, 2020 10:05:17 下午 com.hazelcast.internal.cluster.impl.MembershipManager
警告: [192.168.233.1]:5702 [dev] [3.12.2] Member [192.168.233.1]:5701 - d594a601-9adc-493a-afad-57c3377a4d70 is suspected to be dead for reason: No connection
八月 28, 2020 10:05:17 下午 com.hazelcast.internal.cluster.impl.MembershipManager
信息: [192.168.233.1]:5702 [dev] [3.12.2] Starting mastership claim process...
八月 28, 2020 10:05:17 下午 com.hazelcast.internal.cluster.impl.MembershipManager
信息: [192.168.233.1]:5702 [dev] [3.12.2] Local MembersView{version=2, members=[MemberInfo{address=[192.168.233.1]:5701, uuid=d594a601-9adc-493a-afad-57c3377a4d70, liteMember=false, memberListJoinVersion=1}, MemberInfo{address=[192.168.233.1]:5702, uuid=b6af6227-f4d5-4730-a64d-f6fc11583760, liteMember=false, memberListJoinVersion=2}]} with suspected members: [[192.168.233.1]:5701] and initial addresses to ask: []
八月 28, 2020 10:05:17 下午 com.hazelcast.internal.cluster.ClusterService
信息: [192.168.233.1]:5702 [dev] [3.12.2]
Members {size:1, ver:3} [
Member [192.168.233.1]:5702 - b6af6227-f4d5-4730-a64d-f6fc11583760 this
]
八月 28, 2020 10:05:17 下午 com.hazelcast.internal.cluster.impl.MembershipManager
信息: [192.168.233.1]:5702 [dev] [3.12.2] Mastership is claimed with: MembersView{version=3, members=[MemberInfo{address=[192.168.233.1]:5702, uuid=b6af6227-f4d5-4730-a64d-f6fc11583760, liteMember=false, memberListJoinVersion=2}]}
八月 28, 2020 10:05:17 下午 com.hazelcast.internal.partition.InternalPartitionService
信息: [192.168.233.1]:5702 [dev] [3.12.2] Fetching most recent partition table! my version: 678
八月 28, 2020 10:05:17 下午 com.hazelcast.internal.partition.InternalPartitionService
信息: [192.168.233.1]:5702 [dev] [3.12.2] Most recent partition table version: 678
八月 28, 2020 10:05:17 下午 com.hazelcast.transaction.TransactionManagerService
信息: [192.168.233.1]:5702 [dev] [3.12.2] Committing/rolling-back live transactions of [192.168.233.1]:5701, UUID: d594a601-9adc-493a-afad-57c3377a4d70
八月 28, 2020 10:05:17 下午 com.hazelcast.internal.partition.impl.MigrationManager
信息: [192.168.233.1]:5702 [dev] [3.12.2] Partition balance is ok, no need to re-partition cluster data...
八月 28, 2020 10:05:17 下午 io.vertx.core.impl.HAManager
信息: nodeb6af6227-f4d5-4730-a64d-f6fc11583760 says: Node d594a601-9adc-493a-afad-57c3377a4d70 has failed. This node will deploy 1 deploymentIDs from that node.
22:05:17.377 [vert.x-eventloop-thread-3] INFO ktor.application - No ktor.deployment.watch patterns specified, automatic reload is not active
22:05:17.378 [vert.x-eventloop-thread-3] INFO ktor.application - Responding at http://0.0.0.0:8020
八月 28, 2020 10:05:17 下午 io.vertx.core.impl.HAManager
信息: Successfully redeployed verticle CLU1 after failover

从上面的输出信息可以看到集群成员只剩下b6af6227,然后自动把CLU1的监听8020端口的web服务器迁移到幸存的集群成员中并重新部署。当然如果集群所有成员死亡,那就无法再实现高可用了。

结语

上面所介绍的只是经过短暂三天内的学习研究,我的Vertx-core的使用心得,vertx实际上还包含一堆可用的扩展功能,我个人用vertx的主要原因是它是JVM上少数实现了类似Erlang的并发模型的工具包,而且比起Akka,它的文档更容易读懂。配合Kotlin的when表达式的Erlang味更浓。建议新入门的可以多看看vertx官方的github仓库的examples,展示了大多数使用方法可供参考。

Kotlin插件化应用实践

Kotlin插件化应用踩坑之路

最近我在写一个基于Kotlin+Swing的桌面软件KtMeta,考虑在某些地方添加插件支持,于是就调研了下JVM生态下的插件框架。发现好像也不多,就一些基于OSGi的框架如Apache Felix和pf4j比较有名,star数比较多。但是听闻OSGi极其复杂,说实话我一个桌面软件开发有必要搞那么复杂吗?所以就调研了下有1.2k左右stars的pf4j框架

坑爹的PF4J

由于我目前主要是使用Kotlin+Gradle,幸好pf4j也有Gradle的demo,所以我就深入分析了下这个demo

!!!这个demo使用的gradle版本较低,因此语法和我现在用的gradle 6存在差别。

简化的目录树如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
.
|-- api
| `-- src
| `-- main
| `-- java
| `-- org
| `-- pf4j
| `-- demo
| `-- api
|-- app
| `-- src
| `-- main
| |-- java
| | `-- org
| | `-- pf4j
| | `-- demo
| `-- resources
|-- gradle
| `-- wrapper
`-- plugins
|-- plugin1
| `-- src
| `-- main
| `-- java
| `-- org
| `-- pf4j
| `-- demo
| `-- welcome
|-- plugin2
| `-- src
| `-- main
| `-- java
| `-- org
| `-- pf4j
| `-- demo
| `-- hello
`-- plugin3
`-- src
`-- main
`-- kotlin
`-- org
`-- pf4j
`-- demo
`-- kotlin

这个demo是个Gradle多模块项目。分为三个模块,api,app和plugins,而plugins里又有3个单独的plugin模块。

首先我们分析根目录的build.gradle、gradle.properties和settings.gradle:

  • build.gradle
1
2
3
4
5
6
7
8
9
10
11
12
subprojects {
apply plugin: 'java'
repositories {
mavenLocal()
mavenCentral()
}
}
// plugin location
ext.pluginsDir = rootProject.buildDir.path + '/plugins'
task build(dependsOn: [':app:uberjar'])

这个文件很简单,定义了subprojects的插件,设置了repositories,定义了一个pluginsDir的变量,指向demo_gradle根目录下的plugins目录,标记gradle的java plugin的内建命令build依赖于app模块的任务(task)uberjar。uberjar简单地说就是fatjar,把依赖也搞进去了。

  • gradle.properties
1
2
# PF4J
pf4jVersion=3.1.0

这个文件也很简单,就是设定了一个可以被gradle使用的变量。

  • settings.gradle
1
2
3
4
5
6
7
8
include 'api'
include 'app'
include 'plugins'
include 'plugins:plugin1'
include 'plugins:plugin2'
include 'plugins:plugin3'

这个文件表示了这个多模块项目要包含哪些模块,通过”:”来表示路径分隔符。

分析完根目录,就分析api模块。

1
2
3
4
5
6
7
8
9
10
api
|-- build.gradle
`-- src
`-- main
`-- java
`-- org
`-- pf4j
`-- demo
`-- api
`-- Greeting.java

这个模块中只有一个build.gradle文件,内容如下:

1
2
3
4
5
6
dependencies {
compile group: 'org.pf4j', name: 'pf4j', version: "${pf4jVersion}"
compile group: 'org.apache.commons', name: 'commons-lang3', version: '3.5'
testCompile group: 'junit', name: 'junit', version: '4.+'
}

上面的文件只配置了api模块所依赖的库,并且标记为compile group。而pf4jVersion就是引用根目录下的gradle.properties的设置。

而api也是一个再简单不过的java代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
/*
* Copyright (C) 2012-present the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.pf4j.demo.api;
import org.pf4j.ExtensionPoint;
/**
* @author Decebal Suiu
*/
public interface Greeting extends ExtensionPoint {
String getGreeting();
}

这个代码没什么难的,就是实现了ExtensionPoint的一个接口而已。

分析完api模块,接着分析plugins:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
plugins
|-- build.gradle
|-- disabled.txt
|-- enabled.txt
|-- plugin1
| |-- build.gradle
| |-- gradle.properties
| `-- src
| `-- main
| `-- java
| `-- org
| `-- pf4j
| `-- demo
| `-- welcome
| `-- WelcomePlugin.java
|-- plugin2
| |-- build.gradle
| |-- gradle.properties
| `-- src
| `-- main
| `-- java
| `-- org
| `-- pf4j
| `-- demo
| `-- hello
| `-- HelloPlugin.java
`-- plugin3
|-- build.gradle
|-- gradle.properties
`-- src
`-- main
`-- kotlin
`-- org
`-- pf4j
`-- demo
`-- kotlin
`-- KotlinPlugin.kt

这是一个子模块嵌套子模块,存在三个插件,分别为WelcomePlugin、HelloPlugin和KotlinPlugin。

首先是plugins的enabled.txt和disabled.txt,用于为pf4j分析哪些插件是启用的,下面的是enabled.txt:

1
2
3
4
5
6
########################################
# - load only these plugins
# - add one plugin id on each line
# - put this file in plugins folder
########################################
#welcome-plugin

而它的build.gradle文件如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
subprojects {
jar {
manifest {
attributes 'Plugin-Class': "${pluginClass}",
'Plugin-Id': "${pluginId}",
'Plugin-Version': "${archiveVersion}",
'Plugin-Provider': "${pluginProvider}"
}
}
task plugin(type: Jar) {
archiveBaseName = "plugin-${pluginId}"
into('classes') {
with jar
}
into('lib') {
from configurations.compile
}
archiveExtension ='zip'
}
task assemblePlugin(type: Copy) {
from plugin
into pluginsDir
}
}
task assemblePlugins(type: Copy) {
dependsOn subprojects.assemblePlugin
}
build.dependsOn project.tasks.assemblePlugins

这个文件分为三大块,subprojects,task assemblePlugins和build。文件的执行顺序从上到下。

首先分析subprojects,jar是java plugin的内置任务,但这里打包是交由task plugin负责的,jar只负责写入manifest文件。jar的manifest块负责读取子项目中的gradle.properties文件的设置并写入最终打包的zip文件的classes/META-INF/MANIFEST.MF文件内,如plugin3的gradle.properties:

1
2
3
4
5
6
version=1.0.0
pluginId=KotlinPlugin
pluginClass=org.pf4j.demo.kotlin.KotlinPlugin
pluginProvider=Anindya Chatterjee
pluginDependencies=

生成的zip包中对应的classes/META-INF/MANIFEST.MF就是这样(这里的Plugin-Version出了点问题,但是最重要的是Plugin-Class和Id):

1
2
3
4
5
Manifest-Version: 1.0
Plugin-Id: KotlinPlugin
Plugin-Provider: Anindya Chatterjee
Plugin-Version: task ':plugins:plugin3:jar' property 'archiveVersion'
Plugin-Class: org.pf4j.demo.kotlin.KotlinPlugin

在完成了jar块之后,就会进入task plugin,这个task主要是负责把build目录下的classes里面的内容以jar的组织方式打包进目标zip包中,而编译时依赖就会进入zip包的lib目录中。

完成plugin任务后,就会执行assemblePlugin,这个任务的类型是copy,from plugin to pluginsDir意味着把上一步打包出来的zip包复制到demo_gradle/plugins中。然后执行assemblePlugins

最后是表示plugins这个子项目的build任务需要先完成上述的jar,plugin和assemblePlugin和assemblePlugins任务才能执行。

最终打包完成的每个插件(zip包)都是这样的组织方式:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
|-- META-INF
| `-- MANIFEST.MF
|-- classes
| |-- META-INF
| | |-- MANIFEST.MF
| | |-- extensions.idx
| | `-- plugin3.kotlin_module
| `-- org
| `-- pf4j
| `-- demo
| `-- kotlin
| |-- KotlinGreeting.class
| `-- KotlinPlugin.class
|-- lib
| |-- annotations-13.0.jar
| |-- commons-lang3-3.5.jar
| |-- kotlin-stdlib-1.3.50.jar
| |-- kotlin-stdlib-common-1.3.50.jar
| |-- kotlin-stdlib-jdk7-1.3.50.jar
| `-- kotlin-stdlib-jdk8-1.3.50.jar

最顶层的MANIFEST.MF只有Manifest-Version信息,而classes中的MANIFEST.MF则包含了插件项目的gradle.properties信息(Plugin-Id、Plugin-Provider、Plugin-Class和Plugin-Version)和Manifest-Version。

分析完plugins,就是app了:

1
2
3
4
5
6
7
8
9
10
11
12
app
|-- build.gradle
`-- src
`-- main
|-- java
| `-- org
| `-- pf4j
| `-- demo
| |-- Boot.java
| `-- WhazzupGreeting.java
`-- resources
`-- log4j.properties

app的build.gradle如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
apply plugin: 'application'
mainClassName = 'org.pf4j.demo.Boot'
run {
systemProperty 'pf4j.pluginsDir', '../build/plugins'
}
dependencies {
compile project(':api')
compile group: 'org.pf4j', name: 'pf4j', version: "${pf4jVersion}"
annotationProcessor(group: 'org.pf4j', name: 'pf4j', version: "${pf4jVersion}")
compile group: 'org.apache.commons', name: 'commons-lang3', version: '3.5'
compile group: 'org.slf4j', name: 'slf4j-simple', version: '1.7.25'
testCompile group: 'junit', name: 'junit', version: '4.+'
}
task uberjar(type: Jar, dependsOn: ['compileJava']) {
zip64 true
from configurations.runtimeClasspath.asFileTree.files.collect {
exclude "META-INF/*.SF"
exclude "META-INF/*.DSA"
exclude "META-INF/*.RSA"
zipTree(it)
}
from files(sourceSets.main.output.classesDirs)
from files(sourceSets.main.resources)
manifest {
attributes 'Main-Class': mainClassName
}
archiveBaseName = "${project.name}-plugin-demo"
archiveClassifier = "uberjar"
}

apply plugin ‘application’是gradle的内置插件,用于生成可执行程序,程序的主类在uberjar的manifest的’Main-Class’里设置 。说实话这个文件也没什么好讲的。总而言之,这个项目如果拆分成三个单独的gradle项目,首先要做的是把api打包成jar,然后分别放入app和plugin项目中作为本地jar的依赖,对于app,这个api需要用implementation(在gradle 6被用于中代替compile)标记作为运行时依赖。而对于plugin,主要是作为编译时依赖(gradle 6中用compileOnly来标记)。最终打包出来的plugin中是不含api的jar的,而app需要把api包含在内,运行时调用插件即可。

关于这个app是如何加载和使用插件的网上有例子,这里不再啰嗦,但是这框架的插件脑瘫用法我是忍不了了。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
public static void main(String[] args) {
...
// create the plugin manager
PluginManager pluginManager = new JarPluginManager(); // or "new ZipPluginManager() / new DefaultPluginManager()"
// start and load all plugins of application
pluginManager.loadPlugins();
pluginManager.startPlugins();
// retrieve all extensions for "Greeting" extension point
List<Greeting> greetings = pluginManager.getExtensions(Greeting.class);
for (Greeting greeting : greetings) {
System.out.println(">>> " + greeting.getGreeting());
}
// stop and unload all plugins
pluginManager.stopPlugins();
pluginManager.unloadPlugins();
...
}

我用过OpenStack的社区项目stevedore,一个python的插件框架,它的用法是动态加载完驱动就可以把驱动分配给某个变量,然后就可以通过这个变量访问驱动所实现的方法,不需要时就可以卸载驱动。但是你看看这pf4j的傻逼API,我想不懂作者是怎么设计的。

Kotlin的插件框架实现

目前我认为最好的Kotlin插件功能实现是直接使用Java反射来处理。处理方式和使用pf4j时差不多。首先建立一个api项目,提供一个接口并打包成jar。在app中要implementation这个jar,而在要开发的plugin项目中compileOnly这个jar。如果你的plugin依赖其他库,这些依赖需要使用implementation而不是api标记,使用api会导致依赖传递到app中。

我创建了一个测试api:

1
2
3
4
5
6
package io.github.hochikong.ktmeta.driver
interface AbstractDriver{
fun accessDrive(): String
fun exitDrive(): Boolean
}

接着创建了一个测试plugin,其build.gradle内容如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
plugins {
id 'java'
id 'org.jetbrains.kotlin.jvm' version '1.3.72'
}
group 'io.github.hochikong.ktmeta.driver.official'
version '1.0-SNAPSHOT'
repositories {
mavenCentral()
}
dependencies {
implementation "org.jetbrains.kotlin:kotlin-stdlib-jdk8"
compileOnly files("libs/ktmeta-driver-api-1.0-SNAPSHOT.jar")
testCompile group: 'junit', name: 'junit', version: '4.12'
}
compileKotlin {
kotlinOptions.jvmTarget = "1.8"
}
compileTestKotlin {
kotlinOptions.jvmTarget = "1.8"
}
jar {
manifest {
attributes 'Driver-Name0': 'io.github.hochikong.ktmeta.driver.official.FTPDriver'
attributes 'Driver-Name1': 'io.github.hochikong.ktmeta.driver.official.SSHDriver'
}
from {
configurations.runtime.collect {
it.isDirectory() ? it : zipTree(it)
}
}
}

上面的代码中,对于api使用compileOnly来表示依赖本地的api的jar包。在jar块中,我使用了带不同后缀的两个键Driver-Name来表示这个插件有两个驱动实现:SSHDriver和FTPDriver。这些信息会被写入plugin的MANIFEST.MF中,然后被app所检查以查找需要加载的类的路径。

我写的测试代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
import io.github.hochikong.ktmeta.driver.AbstractDriver
import java.lang.reflect.Method
import java.net.URL
import java.net.URLClassLoader
import java.nio.file.Files
import java.util.*
import java.util.jar.JarFile
import kotlin.system.exitProcess
/**
* Test only
* */
fun main() {
print("Enter your name: ")
val input = Scanner(System.`in`).nextLine()
if (input.trim() == "password") {
println("You can access the db.")
}
// path used for reading manifest
val path = "plugins/ktmeta-driver-1.0-SNAPSHOT.jar"
// absp used for load classes
val absp = "file:C:\\Users\\ckhoi\\IdeaProjects\\ktmeta\\plugins\\ktmeta-driver-1.0-SNAPSHOT.jar"
val m = JarFile(path).manifest
val ma = m.mainAttributes
val classNames = ma.keys.filter { it.toString().startsWith("Driver-Name") }.map { ma[it] }.toList()
println(classNames)
// val urlClassLoader = URLClassLoader(arrayOf(URL(absp)))
// val driver = urlClassLoader.loadClass(className)
// val instance = driver.newInstance()
// println(instance is AbstractDriver)
// val met: Method = driver.getMethod("accessDrive")
// println(met.invoke(instance))
// urlClassLoader.close()
print("Enter to exit...")
if (Scanner(System.`in`).nextLine() is String) {
exitProcess(0)
}
}

上面的测试代码充分利用了kotlin与java的无缝对接能力。使用JarFile读取jar包并解析manifest的内容,查找所有以Driver-Name开头的attributes并获取类的加载路径。

然后使用这些代码来加载和调用插件实现的函数:

1
2
3
4
5
6
7
val urlClassLoader = URLClassLoader(arrayOf(URL(absp)))
val driver = urlClassLoader.loadClass(className)
val instance = driver.newInstance()
println(instance is AbstractDriver)
val met: Method = driver.getMethod("accessDrive")
println(met.invoke(instance))
urlClassLoader.close()

最重要的就是约定jar包的manifest文件要包含何种字段,字段对应的值,然后约定app要扫描哪些内容。插件需要实现驱动接口并自包含依赖最后打包成jar提供给app扫描,并根据需要动态加载这些驱动并执行接口中约定的方法。

Python程序的最佳分发方式与实践

Python程序分发

Python是一个脚本语言,程序运行时需要依赖Python解释器,并且要安装相应的依赖库。对于我这种Python用户来说自然不是什么大问题,一个pip就完事了。但是如果是分发给其他windows用户,尤其是不熟悉Python的人来说,这样实在太麻烦。因此最好的办法是连同python解释器和python程序打包在一起,通过inno setup一次安装解决问题。

嵌入式Python处理

此嵌入式不是硬件的嵌入式,而是Python官方提供的免安装版解释器,可以从这里下载所需的版本:Python for Windows

注意要下载embeddable的版本,不然无法用于制作安装程序。

下载完免安装版本后解压即可,然后从这里下载pip的get-pip.py放入解压的目录中,用cmd执行:

1
python.exe get-pip.py

安装完会在根目录下生成Scripts子目录,用于存储安装后生成的exe:

Scripts目录

安装完pip就可以安装所需的依赖库和主程序了,以我的同人志爬虫框架为例子,安装完后会在Scripts生成djsc.exe(见上图)。这样程序就部署完毕了。

但是目前的python整合包尚不能用于分发,因为pip在安装这些会生成exe的程序时会把python解释器的路径写死在这些exe中,需要手动删除那些绝对路径的内容,只保留python.exe:

删除前

删除后

制作Windows安装程序

在删除解释器的绝对路径之后,就可以用inno setup创建安装包,我建议用inno setup的QuickStart Pack,会自动安装Inno Script Studio,体验很好。下面是我的脚本文件(djscf.iss):

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
; Script generated by the Inno Script Studio Wizard.
; SEE THE DOCUMENTATION FOR DETAILS ON CREATING INNO SETUP SCRIPT FILES!
#define MyAppName "DJSCF"
#define MyAppVersion "0.0.3.2"
#define MyAppPublisher "Hochikong"
#define MyAppURL "https://github.com/Hochikong/DoujinshiCollectorFramework"
#define MyAppExeName "Scripts\djsc.exe"
[Setup]
; NOTE: The value of AppId uniquely identifies this application.
; Do not use the same AppId value in installers for other applications.
; (To generate a new GUID, click Tools | Generate GUID inside the IDE.)
AppId={{53AC6536-1651-4A86-AB6B-69583B96FA78}
AppName={#MyAppName}
AppVersion={#MyAppVersion}
;AppVerName={#MyAppName} {#MyAppVersion}
AppPublisher={#MyAppPublisher}
AppPublisherURL={#MyAppURL}
AppSupportURL={#MyAppURL}
AppUpdatesURL={#MyAppURL}
DefaultDirName={pf}\{#MyAppName}
DefaultGroupName={#MyAppName}
AllowNoIcons=yes
LicenseFile=C:\Users\ckhoi\Desktop\DJSCFramework\LICENSE.txt
InfoAfterFile=C:\Users\ckhoi\Desktop\DJSCFramework\README.txt
OutputDir=C:\Users\ckhoi\Desktop
OutputBaseFilename=djscForWindows-v0.0.3.2-setup
Compression=lzma
SolidCompression=yes
[Code]
function NeedsAddPath(Param: string): boolean;
var
OrigPath: string;
begin
if not RegQueryStringValue(HKEY_LOCAL_MACHINE,
'SYSTEM\CurrentControlSet\Control\Session Manager\Environment',
'Path', OrigPath)
then begin
Result := True;
exit;
end;
{ look for the path with leading and trailing semicolon }
{ Pos() returns 0 if not found }
Result := Pos(';' + Param + ';', ';' + OrigPath + ';') = 0;
end;
[Languages]
Name: "english"; MessagesFile: "compiler:Default.isl"
Name: "chinesesimplified"; MessagesFile: "compiler:Languages\ChineseSimplified.isl"
[Tasks]
Name: "desktopicon"; Description: "{cm:CreateDesktopIcon}"; GroupDescription: "{cm:AdditionalIcons}"; Flags: unchecked
Name: "quicklaunchicon"; Description: "{cm:CreateQuickLaunchIcon}"; GroupDescription: "{cm:AdditionalIcons}"; Flags: unchecked; OnlyBelowVersion: 0,6.1
[Files]
Source: "C:\Users\ckhoi\Desktop\DJSCFramework\*"; DestDir: "{app}"; Flags: ignoreversion recursesubdirs createallsubdirs
; NOTE: Don't use "Flags: ignoreversion" on any shared system files
[Icons]
Name: "{group}\{#MyAppName}"; Filename: "{app}\{#MyAppExeName}"
Name: "{group}\{cm:ProgramOnTheWeb,{#MyAppName}}"; Filename: "{#MyAppURL}"
Name: "{group}\{cm:UninstallProgram,{#MyAppName}}"; Filename: "{uninstallexe}"
Name: "{group}\打开配置文件"; Filename: "{app}\Scripts\config.ini"
Name: "{commondesktop}\{#MyAppName}"; Filename: "{app}\{#MyAppExeName}"; Tasks: desktopicon
Name: "{userappdata}\Microsoft\Internet Explorer\Quick Launch\{#MyAppName}"; Filename: "{app}\{#MyAppExeName}"; Tasks: quicklaunchicon
[Setup]
AlwaysRestart = yes
[Run]
Filename: "{app}\{#MyAppExeName}"; Description: "{cm:LaunchProgram,{#StringChange(MyAppName, '&', '&&')}}"; Flags: nowait postinstall skipifsilent
[Registry]
Root: "HKLM";Subkey: "SYSTEM\CurrentControlSet\Control\Session Manager\Environment";ValueType: expandsz;ValueName: "Path";ValueData: "{olddata};{app}";Check: NeedsAddPath('{app}')

其中最重要的几点如下:

  • 1
    #define MyAppExeName "Scripts\djsc.exe"

    在用ISS的向导程序创建脚本时会让你填写主程序的路径,如下图:

    向导程序

    但是我们的exe是在嵌入式python目录的子目录Scripts中,如果在向导中直接设置主程序为djsc.exe并没有修改上面的脚本的话,就会导致封包出来的安装程序在安装完会把djsc.exe从Scripts目录中提取到根目录并为它生成开始菜单快捷方式。

    为了解决这个问题最好的办法是直接修改MyAppExeName,前面加上子目录。

  • 1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    [Code]
    function NeedsAddPath(Param: string): boolean;
    var
    OrigPath: string;
    begin
    if not RegQueryStringValue(HKEY_LOCAL_MACHINE,
    'SYSTEM\CurrentControlSet\Control\Session Manager\Environment',
    'Path', OrigPath)
    then begin
    Result := True;
    exit;
    end;
    { look for the path with leading and trailing semicolon }
    { Pos() returns 0 if not found }
    Result := Pos(';' + Param + ';', ';' + OrigPath + ';') = 0;
    end;
    [Registry]
    Root: "HKLM";Subkey: "SYSTEM\CurrentControlSet\Control\Session Manager\Environment";ValueType: expandsz;ValueName: "Path";ValueData: "{olddata};{app}";Check: NeedsAddPath('{app}')

    上面的代码是用于安装时在PATH中写入安装路径,安装路径即’{app}’。把安装路径写入Path就可以直接在cmd中调用python。但是用户在安装完并不能直接使用,还需要下面的代码。

  • 1
    2
    [Setup]
    AlwaysRestart = yes

    上面的代码可以让用户选择是否重启计算机,重启后就可以正常使用djsc.exe了。

  • 1
    Name: "{group}\打开配置文件"; Filename: "{app}\Scripts\config.ini"

    这个代码可以把配置文件的快捷方式一并加入开始菜单,免得用户手动打开目录编辑。

结语

通过上面的实践,就可以把python程序和解释器、依赖库一并打包分发给非专业用户,尤其是pyqt程序。如果依赖库使用了numpy等库,pyinstaller经常会打包失败,但是这样打包成安装包就提高了用户的使用体验。

ESP8266与Arduino协同工作

介绍

最近重新捡起了2016年买的Arduino套件开始玩,但是最近折腾ESP8266的时候遇到了一堆坑,非常难受,本文就从固件烧录开始介绍ESP如何与Arduino协作。

硬件介绍与开发环境介绍

Arduino使用的是国产UNO,便宜好用,可能需要安装CH340的驱动才能正常烧录。ESP8266我使用的是安信可的ESP-01S,flash容量为8Mbit,自带两个GPIO,供电电压为3.3v,自带四个焊好的引脚方便接线,出厂已经自带AT固件。另外我还买了一个专门提供给ESP-01和ESP-01S的下载器,如图:

开发环境使用的是VS Code+Platform IO,具体怎么用这里不会详细介绍,VS Code可以换为CLion,可以得到更好的编程体验。串口调试工具使用的是Realterm。TCP服务器使用的是网络调试助手。

硬件的初步测试

把ESP-01S插入下载器,如图:

然后插入电脑,打开Realterm,打开Port设置,把baud改为115200,这是出厂固件默认的设置,然后选择合适的端口,点击open和change,如下图:

然后切换到Send标签页,点击EOL的两个+CR和+LF,然后输入“AT”,点Send ASCII,会看到它返回OK,表示AT固件正常,如下图:

输入“AT+GMR”能查看固件信息:

如果上述测试通过的话,就可以进一步测试其他AT指令。

ESP8266固件更新

如果通过AT+GMR查询到的固件版本太旧,可以手动烧录固件。首先参考这里查询你的硬件信息,然后在这里下载ESP FLASH TOOL,然后去这里下载你所需的固件,我用的是“出厂默认 AT 固件”。

下载好工具和固件后解压,电脑插入安装好ESP-01S的下载器,打开烧录工具,选择“ESP8266 DownloadTool”,然后看到下图

直接点选“START”,会自动在DETECTED INFO那里显示硬件信息。然后参考下图设置烧录工具:

首先选择合适的固件,因为我的ESP-01S的flash是8Mbit,所以要选择包里对应的8Mbit固件,然后在后方的框填上“0x00000”(上图里填漏了一个0)。至于其他参数,就看图片里的设置。最后是烧录软件的port和baud都要设置好,一般的baud应该为115200。

设置好点击“START”就会自动烧录固件了,烧录完可以重复上一节的AT指令测试检查是否烧录成功。

与Arduino的协作

与Arduino协作前最好先用下载器在电脑上改掉ESP-01S的默认波特率(baud),可以输入指令”AT+UART_DEF=9600,8,1,0,0”修改。具体的指令描述和用法请去查乐鑫的AT指令手册。

ESP8266本身就是一个MCU,可以用专用的SDK编程,在这个实验里,我只拿它当一个独立工作的wifi模块使用。ESP-01S的引脚参考下图:

接线方式见下表:

Arduino ESP01s
GND GND
3.3v VCC
3.3v CH_PD
PIN 11(作为软RX) TX
PIN 10(作为软TX) RX

建议UNO再加一个9v电池供电,因为上面的接线情况下直接通过USB下载程序兼供电会导致ESP-01S快速升温到烫手的程度,加上电池供电后温度上升就较为平缓也不那么烫手。如果使用NANO或者Pro这类小板,最好加一个扩展版双供电或者给ESP单独一个3.3v的外部电源。

所依赖的库主要是SoftwareSerial,用platform.io的平台很容易下载,但是请注意不要下载了ESP版本的,ESP版本是为那些在ESP32或者8266上跑Arduino环境的人提供的,我手上的ESP-01S仅作为外部模块用AT指令控制,因此建议稍微注意一下这点。

第一个程序代码(文件格式为cpp而非ino)如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
#include <Arduino.h>
#include <SoftwareSerial.h>
SoftwareSerial esp8266(11, 10); // parameters -> receive, send
void setup() {
Serial.begin(9600);
Serial.println("Arduino...OK");
esp8266.begin(9600);
// esp8266.write("AT+UART_DEF=9600,8,1,0,0\r\n");
delay(2500);
// esp8266.begin(9600);
Serial.println("ESP8266...OK");
}
void loop() {
if (esp8266.available()) {
Serial.write(esp8266.read());
}
if (Serial.available()) {
esp8266.write(Serial.read());
}
}

这个程序首先创建了软串口esp8266,并设置arduino的11号引脚为软RX,因为它对接ESP8266的TX,而10号引脚为软TX。这个程序被注释的那条AT指令是把ESP-01S的baud设置为9600而非默认的115200,因为SoftwareSerial在过高的baud下无法正常工作。本人推荐在接线前先用下载器在电脑用上述指令”AT+UART_DEF=9600, 8, 1, 0, 0”预先把baud设置为9600并保存到flash里。

上面的程序启动后,可以用Arduino IDE的串口监视器或者Realterm测试。如果使用Arduino IDE,记得把换行符的NL和CR选上:

然后就可以输入AT指令测试程序和接线有没有问题。如果输入AT指令并成功在串口监视器上显示对应的结果,那就可以尝试第二个程序,需要用到网络调试助手:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
#include <Arduino.h>
#include <SoftwareSerial.h>
#define WIFI_SSID "REPLACE ME!!!" //WIFI SSID
#define WIFI_PASS "REPLACE ME!!!" // Wifi Password
#define SERVER_IP "REPLACE ME!!!" // IP of the target server.
#define TIMEOUT 5000 //Timeout for the serial communication
#define CONTINUE false
#define HALT true
#define NO_OF_PARAMETERS 1 //No of parameters sending to the server.
SoftwareSerial esp8266SerialPort(11, 10); // RX, TX, 这里的arduino 11是连接到ESP的TX,arduino 12连接到ESP的RX
boolean readResponseData(String keyword)
{ //Receive data from the serial port. Returns true if keyword matches.
String response;
long deadline = millis() + TIMEOUT;
while (long(millis()) < deadline)
{
if (esp8266SerialPort.available())
{
char ch = esp8266SerialPort.read(); // Read one character from serial port and append to a string.
response += ch;
if (keyword != "")
{
if (response.indexOf(keyword) > 0) // 找到keyword
{ //Searched keyword found.
Serial.println(response);
return true;
}
}
}
}
Serial.println(response);
return false;
}
void exception(String msg)
{ //Exception occured. Print msg and stops.
Serial.println(msg);
Serial.println("HALT");
while (true)
{
readResponseData("");
delay(60000);
}
}
boolean sendCommand(String command, String acknowledgement, boolean stopOnError)
{
esp8266SerialPort.println(command);
if (!readResponseData(acknowledgement))
{
if (stopOnError)
{
Serial.println(command + " Failed to execute.");
return false;
}
else
{
return false; // Let the caller handle it.
}
}
else
{
return true; // ack blank or ack found
}
}
boolean initializeESP8266Module()
{
esp8266SerialPort.begin(9600);
esp8266SerialPort.setTimeout(TIMEOUT); // 等待串口数据的时间
delay(2000);
//sendCommand("AT+RST", "ready", HALT); // Reset & test if the module is ready
sendCommand("AT+GMR", "OK", CONTINUE); // Retrieves the firmware ID (version number) of the module.
sendCommand("AT+CWMODE?", "OK", CONTINUE); // Get module access mode.
sendCommand("AT+CWMODE=1", "OK", HALT); // Station mode
// sendCommand("AT+CIPMUX=1", "OK", HALT); // Allow multiple connections (we'll only use the first).
String cmd = "AT+CWJAP=\""; // 连接wifi
cmd += WIFI_SSID;
cmd += "\",\"";
cmd += WIFI_PASS;
cmd += "\"";
for (int counter = 0; counter < 5; counter++)
{
if (sendCommand(cmd, "OK", CONTINUE))
{ // Join Access Point
Serial.println("Connected to WiFi.");
break;
}
else if (counter == 4)
exception("Connection to Wi-Fi failed. Please Check");
}
delay(5000);
sendCommand("AT+CWSAP=?", "OK", CONTINUE); // Test connection
sendCommand("AT+CIFSR", "OK", HALT); // Echo IP address. (Firmware bug - should return "OK".)
return 0;
}
void setup()
{
Serial.begin(9600);
Serial.println("ESP8266 Demo");
initializeESP8266Module();
Serial.println("Module is ready.");
}
void loop()
{
// 建立单连接TCP链接 AT+CIPSTART="TCP","192.XXX.XXX.XXX","50XX"
String cmd = "AT+CIPSTART=\"TCP\",\"";
cmd += SERVER_IP;
cmd += "\",5088"; //Start a TCP connection. to server SERVER_IP on port 80
if (!sendCommand(cmd, "OK", CONTINUE))
return;
delay(2000);
if (!sendCommand("AT+CIPSTATUS", "OK", CONTINUE)) // Check for TCP Connection status.
return;
// 进入发送模式
String content = "NMSL";
String request = "AT+CIPSEND=" + String(content.length());
if (!sendCommand(request, ">", CONTINUE))
{
// 当发送指令失败时
sendCommand("AT+CIPCLOSE", "", CONTINUE);
Serial.println("Connection timeout.");
return;
}
// send content
sendCommand(content, "OK", CONTINUE);
// close TCP
sendCommand("AT+CIPCLOSE", "OK", CONTINUE);
exception("ONCE ONLY");
}

烧录完成后,打开串口监视器和网络调试助手并启动服务,再启动Arduino。如果接线和其他一切正常,就会在串口打印一堆信息:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
ESP8266 Demo
AT+GMR
AT version:1.2.0.0(Jul 1 2016 20:04:45)
SDK version:1.5.4.1(39cb9a32)
Ai-Thinker TeogC .v51 75:
AT+CWMODE?
+CWMODE:1
OK
AT+CWMODE=1
OK
xxx
AT+CWJAP="xxx","xxx"
WIFI DISCONNECT
WIFI CONNECTED
WIFI GOT I
ATCWJAP"xxx","xxx"
busy p...
OK
Connected to WiFi.
AT+CWSAP=?
ERROR
AT+CIFSR
+CIFSR:STAIP,"192.168.2.173"
+CIFSR:STAMAC,"18:fe:34:d1:d6:bd"
OK
Module is ready.
AT+CIPSTART="TCP","192.168.2.166",5088
CONNECT
OK
AT+CIPSTATUS
STATUS:3
+CIPSTATUS:0,"TCP","192.168.2.166",5088,14009,0
OK
AT+CIPSEND=8
OK
>
busy s...
Recv 8 bytes
SEND OK
AT+CIPCLOSE
CLOSED
OK
ONCE ONLY
HALT

网络调试助手会输出相关信息:

至此,Arduino通过AT指令操作esp8266的实验就结束了。

其他

如果直接用下载器进入AT模式,连wifi掉线,可能是设置为AP模式或者没有启用DHCP,具体的AT指令可以参考乐鑫官网的AT指令手册和范例手册。

Recursive CTE解析

PostgreSQL递归CTE解析

虽然本文以PG为主,但是递归CTE这个特性在MySQL,MariaDB等RDBMS都提供了。在讲解递归CTE之前先简单介绍下什么是CTE:CTE即Common Table Expression,以With语句开头,功能是在当前SQL执行期间起到一个临时表的作用。如果规划器分析到查询代价很高,还会执行物化提高效率(与PG的物化视图行为类似),CTEs在整个SQL语句执行周期内只会执行一次,CTEs之间还可以有依赖关系。

CTE的例子

给定以下两表,查询同时修了X和Y两门课且成绩均大于等于40的学生信息:

grades表:

栏位 | 类型 | Collation | Nullable | Default
——-+———+———–+———-+———
class | text | | |
sid | integer | | |
grade | integer | | |

student表:

栏位 | 类型 | Collation | Nullable | Default
——+———+———–+———-+———
sid | integer | | |
name | text | | |

1
2
3
4
WITH ctex AS ( SELECT * FROM grades WHERE grade >= 40 AND class = 'X'),
ctey AS ( SELECT * FROM grades WHERE grade >= 40 AND class = 'Y')
SELECT ctex.sid, ctex.class, ctex.grade, ctey.class, ctey.grade
FROM ctex JOIN ctey ON ctex.sid = ctey.sid;

上面的查询我使用了两个CTE,分别查询课程为X和Y且成绩大于等于40的分数记录,然后基于SID做一个inner join,即求得同时修了X和Y的课的学生ID为1和4:

sid | class | grade | class | grade
—–+——-+——-+——-+——-
1 | X | 50 | Y | 70
4 | X | 60 | Y | 100
(2 行记录)

使用CTE能让代码可读性更高,另一方面能提高查询效率,毕竟滥用子查询伤眼效率还不一定高。

递归CTE

本文的重点在递归版本的CTE,虽然是recursive,但用迭代iterative来描述更适合。

想象数据库里有一个储存SQL查询结果的list(以python list为例),递归CTE由锚成员和迭代成员组成。步骤如下:

1.首先执行锚成员查询,将查询的结果存入list的第一个位置

2.然后执行迭代成员的查询,这个迭代成员的查询会使用到其index-1位置的SQL查询结果来计算出一个新的结果,并把结果存入list的第二个位置

3.如此类推,最后通过UNION或UNION ALL合并查询结果(行为类似pandas的concat函数)

下面是一个SQL版range的实现:

1
2
3
4
5
6
7
WITH RECURSIVE cte AS (
SELECT 1 as num
UNION
SELECT num+1 FROM cte
WHERE num < 10
)
SELECT * FROM cte;

其中,SELECT 1 AS num是锚成员,首先执行锚成员的查询,存入list,然后执行SELECT num+1 FROM cte,即调用锚成员的查询结果(num = 1),再加1;然后where num < 10是迭代条件;使用UNION合并所有行(合并list里的所有成员);上面的代码相当于python的range(1, 11),结果如下:

num

1
2
3
4
5
6
7
8
9
10
(10 行记录)

更复杂点的例子

首先执行下面的SQL语句创建表和导入数据:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
CREATE TABLE employees (
employee_id serial PRIMARY KEY,
full_name VARCHAR NOT NULL,
manager_id INT
);
INSERT INTO employees (
employee_id,
full_name,
manager_id
)
VALUES
(1, 'Michael North', NULL),
(2, 'Megan Berry', 1),
(3, 'Sarah Berry', 1),
(4, 'Zoe Black', 1),
(5, 'Tim James', 1),
(6, 'Bella Tucker', 2),
(7, 'Ryan Metcalfe', 2),
(8, 'Max Mills', 2),
(9, 'Benjamin Glover', 2),
(10, 'Carolyn Henderson', 3),
(11, 'Nicola Kelly', 3),
(12, 'Alexandra Climo', 3),
(13, 'Dominic King', 3),
(14, 'Leonard Gray', 4),
(15, 'Eric Rampling', 4),
(16, 'Piers Paige', 7),
(17, 'Ryan Henderson', 7),
(18, 'Frank Tucker', 8),
(19, 'Nathan Ferguson', 8),
(20, 'Kevin Rampling', 8);

然后想找出员工ID为2的雇员的全部下属(下属的下属也被包含在内)

可以使用下面的递归CTE:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
WITH RECURSIVE subordinates AS (
SELECT
employee_id,
manager_id,
full_name
FROM
employees
WHERE
employee_id = 2
UNION
SELECT
e.employee_id,
e.manager_id,
e.full_name
FROM
employees e
INNER JOIN subordinates s ON s.employee_id = e.manager_id
) SELECT
*
FROM
subordinates;

首先先选出了employee_id为2的雇员信息作为锚成员的查询结果(只有一条记录),然后在UNION后面的迭代成员从“锚结果 JOIN employees”的结果里查询出manger_id等于2的全部雇员信息,然后再重复迭代成员的查询,查找manager_id为2的雇员的所有下属。

上面的查询等价于下面的CTE:

1
2
3
4
5
6
WITH cte AS ( SELECT employee_id, manager_id, full_name FROM employees WHERE employee_id = 2),
cte1 AS ( SELECT e.employee_id, e.manager_id, e.full_name FROM employees e
JOIN cte s ON s.employee_id = e.manager_id),
cte2 AS ( SELECT e.employee_id, e.manager_id, e.full_name FROM employees e
JOIN cte1 s ON s.employee_id = e.manager_id)
SELECT * FROM cte UNION SELECT * FROM cte1 UNION SELECT * FROM cte2 ORDER BY employee_id;

为了达到同样的目的,使用嵌套CTE需要写3个CTE(还是已知employee_id为2的这个人的下属有多少层的前提下),而使用递归CTE可以简化代码,并实现普通查询无法实现的查询。

另一种解释收入效应与替代效应例子的途径

收入效应与替代效应

如果学过微观经济学,都应该会学到收入效应与替代效应这两个概念,这里引用MBA智库百科上其定义:

收入效应是指:

收入效应指由商品的价格变动所引起的实际收入水平变动,进而由实际收入水平变动所引起的商品需求量的变动。它表示消费者的效用水平发生变化。具体来说就是当你在购买一种商品时,如果该种商品的价格下降了,对于你来说,你的名义货币收入是固定不变的,但是价格下降后,你的实际购买力增强了,你就可以买得更多得该种商品.这种实际货币收入的提高,会改变消费者对商品的购买量,从而达到更高的效用水平,这就是收入效应

替代效应是指:

一种商品的名义价格nominal price)发生变化后,将同时对商品需求量发生两种影响:一种是因该种商品名义价格变化,而导致的消费者所购买的商品组合中,该商品与其他商品之间的替代,称为替代效应(substitution effect)。另一种是在名义收入不变的条件下,因一种商品名义价格变化,导致消费者实际收入变化,而导致的消费者所购商品总量的变化,称为收入效应income effect

然后在替代效应的页面里还有一段补充

替代效应是指当工资率上涨时,单位时间所获得的收入增加,劳动者为获得更多的收入,宁愿牺牲闲暇,增加劳动量以多获得工资收入。收入效应是指随着工资率的提高,劳动者可以用减少的工作时间获得同样的收入,因此当工资率提高时劳动者的供给反而减少。这两种效应的相对强弱就决定于劳动供给曲线的特殊形状,当替代效应大于收入效应时提高工资率会使劳动供给量增加,供给曲线上各点切线的斜率为正,向右上方延伸,当收入效应大于替代效应时,提高工资率反而会使劳动供给减少,所以劳动的供给曲线向后弯曲。

但是,如果只看定义,你会发现和上面的补充内容关系不大,至少你第一眼看不出来消费者收入水平、消费这些与工资、加班或休息有什么明确联系,那么怎么解释呢?

联系概念

在开始解释上面的补充内容之前,引入一个概念:机会成本。机会成本是决策时,面对行动集的时候,你所做的决策的机会成本就是那些被舍弃的行动的价值。举个例子:你选择中午叫外卖而不是自己煮,那么你的机会成本就是“自己煮”所带来的价值(比如:自己煮比叫外卖更健康,更放心,更便宜),如果你选择自己煮而不是叫外卖,那么此刻的机会成本就是“叫外卖”所带来的价值(比如:外卖能省时间、尝试新东西带来的刺激感)。与此相关的还有损失函数,有兴趣的读者可以去了解下,基于损失函数来做决策是比收益函数更好的选择。

解释

那么回到本文的主题,解释收入效益与替代效应的补充内容。把加班和休息看作行动集的两个成员,对应的机会成本分别是:用于休息、陪伴家人、娱乐的时间和精力;更高的收入。

对于补充内容中的替代效应,可以这么解释(别忘了联系其定义):商品的名义价格即行动集成员的机会成本,工资率上升,“休息”这个行为的价格就会上涨,从而使用机会成本较低的行动来代替,那么劳动者就会选择牺牲闲暇来增加劳动量,从而导致“休息”购买的减少。

再引入一个例子:假设黑人牙膏和高露洁两款牙膏里,黑人的价格上涨而高露洁不变,对于收入水平不变的消费者来说,就会选择相对较便宜的高露洁同时减少对黑人的消费。

然后是补充内容里的收入效应,可以这么解释(还是要联系其定义思考哦):工资率的上升,达到一定程度之后,“休息”的机会成本可以被“由工资率上升带来的同等工作时长的收入显著增加”来抵消,也就是说:“休息”的机会成本下降,商品的价格下降,对于收入水平固定的消费者来说,商品的价格下降实际上增加了他们的购买力,那么就会改变消费者的购买量,买更多这个降价品从而达到高效用。从而劳动者会因为工资率上升而减少加班,多休假(此时加班带来的薪酬增加的机会成本远大于把时间用于休息等事的机会成本)。

至于劳动者怎么平衡加班和休息,就只能通过图像分析来谈了,但这不是本文要讨论的内容,有兴趣的读者请阅读平狄克的微观经济学

参考资料

收入效应

替代效应

《贝叶斯统计》 茆诗松 汤银才等 著

R SUCKS!

恶心得一逼

本来我并没有什么计划去学R或者用R来做分析,但是统计学专业好几门课的老师都是以R为核心讲的,自己也用过R的线性回归之类的功能来做分析。但是今天做一个要用R的可视化的作业真的恶心死我。

为什么我讨厌R

本来我是写Python的,然后又学习了Kotlin,现在尤其注重代码规范和可读性,就算是写个小脚本也好,至少我要把注释写上去。

代码没规范,用户瞎几把写

先说代码规范方面的问题,Python的IDE支持非常强:Visual Studio、Eclipse、PyCharm,其中PyCharm我是从高中用到现在的IDE,代码提示、代码格式化各种支持都非常完善。R在开发工具方面拿什么来跟Python比?现在就只有一个R for Visual Studio能战,RStudio只能算是够用,R Gui连ipython都比不过,就是一个普通REPL。

然后是格式,有人笑写Python需要用游标卡尺,因为不同的缩进影响很大,有些人要素觉得Python不友好别怪Python,怪你自己不选择一个合适的IDE。但是来到R里面,和Java的某些不规范写法一样,if或者for循环的花括号居然能直接omit掉,然后赋值语法居然有“<-”和“=”,虽然一些正规的书里都不建议用后者,但是我觉得这是R在设计上的不一致,是引起潜在麻烦的源头。引用些别人文章的例子:

1
2
3
4
5
6
7
length(x = y <- 1:10)
c(a = 1, b <- 2) # 本来应该是c(a = 1, b = 2)
x>5
x<3
x<-3

第一行代码,本意是先赋值再传参,用一行代码完成两行代码的工作,但是这种语法本来就是增加潜在问题出现的可能性。

第三行,带来副作用,且不容易被察觉(赋值不会报错)。

最后一行,假如本来你要比较x与-3,但是忘了空格,就变成了把-3赋值给x,如果你的x是dataframe之类的东西,然后又没及时发现,后面带来的问题将引起非常恶心的问题。

虽然上述问题在Visual Studio里能稍微减轻(比如美化代码),但是本来用R的人里面会用Visual Studio来写的人就不多吧?连个类似PEP8的标准都没有的R,只靠那群不擅长编程、不懂得规范化的用户怎么能让R变得具有可读性?我已经见过些写得像屎一样的R代码:不同逻辑块的代码不空一行再写、符号后不空一格、写了等于没写的注释……我真的忍够了那些不规范的代码。

然后是Kotlin,Kotlin相比Java简洁了很多,合理使用语法糖和FP能让代码读起来像自然语言,同样,我在写Py的时候,也是尽量把代码写得直观易读,然而这种观念在很多R用户里并没有任何概念。

这里贴zen of python的部分内容,另有翻译版

Beautiful is better than ugly.
Explicit is better than implicit.
Simple is better than complex.
Complex is better than complicated.
Flat is better than nested.
Sparse is better than dense.

我就想问,有多少个写R甚至是写其他语言的人能做到上面的内容,然而很多人的代码都是implicit、nested、dense. Zen of Python 不仅仅适用于Python,还适用于其他任意一种语言——如果你想写出自己或别人能读得懂代码的话。

地雷般的第三方库

警告!不熟悉、非核心的第三方库别用。就算是核心库,能把文档写好、能看得懂的库也没几个,example?Read the FUCKING Manual!毕竟

R是自由软件,不带任何担保。

那么库也一样。

无论是Visual Studio或者RStudio都可以用help来查询函数对应的文档,但是你妈的,库的作者能写好点吗?参数描述不清不楚、例子无一丁点注释、甚至no fucking examples.

更牛逼的是,你拿某些地雷去google,你查不到一丁点有用的信息,甚至搜出来的是Matlab当中类似的东西。文档看不懂,自然要找examples,然而examples也没几个!

不优雅,look so shitty

说不出话,就是ugly

结论

优势是:R是由统计学家开发的;
劣势是:R是由统计学家开发的。

R的确在统计学中走得非常前,Python还是很难追得上,但是抛开高深的统计研究工作,只单纯做点分析,Python也不是做不了,但是我真的被R的缺点恶心死了,以后的分析基本只围绕Excel、Py和PostgreSQL,除非有些东西只能用R做,否则还是算了吧,R滚粗。

Hexo博客发布助手正式完工

本文是使用HexoPubAssistant生成并发布的

最近学了PyQt5,想着做个比较实用的软件,就选择写了个Hexo博客发布助手,不用每次都hexo new,然后找到生成的文档手动用Typora打开,编辑完成后又要打开cmder执行生成、预览和部署的命令,非常麻烦,平时真的懒得写任何文章,所以用PyQt写了这个玩意应该还是对我自己非常有利的。

杂谈

这次的开发工作,在技术上主要遇到的问题是如何处理hexo的交互和pyqtSignal的emit问题。首先hexo的预览是最麻烦的,使用os.system()来执行的话会阻塞,导致无法靠程序自身的逻辑结束预览,因此为了解决这个问题,我选择了subprocess.Popen()来执行命令,因为命令执行完毕之后会即刻返回不会阻塞,但是现在还有个问题是如何模拟用户输入CTRL-C,我暂时没找到合适的方案,选择直接用psutil扫描node.exe的进程然后用os.kill()配合signal.SIGINT来杀进程。一开始我是偷懒,直接把所有node.exe的进程全kill了,但是考虑用户体验还是做了个筛选,只kill用户开始预览和结束预览期间新增的node.exe进程。

然后是pyqtSignal,pyqtSignal必须在使用前声明并设置数据类型,数据类型仅支持python内置的基本类型,比如:

1
previewing = QtCore.pyqtSignal(str)

如果你想emit一个dict,那就需要用json.dumps()为str再emit,否则会报错。这里有篇文章可供参考

然后是打包为安装程序,不得不说Inno Setup是真的强,不过要注意的是那些用于在安装期间展示给用户的文件,最好选择rtf格式,不然显示非英文会乱码。然后安装程序的语言包,可以访问这里,把repo下载下载,然后把你想要的语言支持复制到你的Inno Setup安装目录里(如:C:\Program Files (x86)\Inno Setup 5\Languages)即可使用。

而这次这个软件的美术风格,布局上没有过多设计(我也不会高端的设计),配色选了Typora的某款主题的颜色,然后字体使用了源云明体。毕竟最近听传闻字体乱用会出事,轻则花钱重则官司,虽然我写的是开源软件(用MIT license),但是还是注意下字体的问题,一开始选择的是思源字体系列,但是在PyQt中渲染得很奇怪,有些字还不能对齐,然后搜免费字体就找到了源云明体系列的,感觉有点意思就选择了它。

最近都不想再写任何项目了…

如何在iOS和Android之间交换数据

Why?

学校的垃圾wifi,虽然是全校覆盖,但是速度只有500KB/s,虽然平时都是用OneDrive同步文件到iPad上,但是速度慢得要死,有时出去上课拷贝了课件什么的又要回宿舍再复制到电脑上再同步到iPad上麻烦死了。故我摸索出了一个比较方便可行的流程用于交换文件

How?

首先你需要在iOS上安装一个Readdle的Documents软件,手机开启热点,iOS设备连上手机热点之后,点开软件的设置,找到“WebDAV”,设置好用户名和密码后就可以开始WebDAV,然后就可以在手机上根据给出的IP地址访问,每次访问都要求输入一个码。连接上去之后就可以自由地往iOS设备上传输或者下载文件了。上传到Documents后,就可以使用拷贝用其他软件打开那些上传到iOS的文件了。最重要的是Documents这玩意免费,强烈推荐。

Then?

平时上课有三样东西必带:手机、iPad、U盘。U盘我额外买了个typec转USB-A的接口,可以otg连接到我的小米6上。有时去拷ppt或者实验室电脑里做的半成品都可以拷走,然后就可以通过前面所讲的方法,把ppt或者电子书通过WebDAV传到我的iPad上用来看,毕竟我的iPad基本上只拿来当电子书阅读器。

如何在Google Play上购买你的第一个付费app

前提准备

一台装了google play服务和google play商店的手机,一个google账号,一个美国或者你想要注册国家的vpn,我这里用的是美国的节点,所以以美服为例。上某宝买张礼品卡,美服的最低面额似乎是5刀,即30多人民币。

购买流程

首先fq,打开google play,兑换你购买的礼品卡,然后由于是首次购买,所以他会要求创建一个账号(与google账号不太一样,更多是和付费有关),然后要求填写姓名、邮编、手机号码。姓名直接填你的英文名,因为我用的是美国vpn,所以邮编选择美国的免税州的邮编,比如俄勒冈的97201,手机号码可以填国内号码,但是要记得+86,填完即可选择你想要购买的app付费了。


Powered by Hexo and Hexo-theme-hiker

Copyright © 2017 - 2020 HOCHIKONG's WAPORIZer All Rights Reserved.

访客数 : | 访问量 :