经历3天的Vert.X学习心得

Vert.x简单介绍

Vert.x是一个基于Netty、支持多语言的、范式自由的、具有高度灵活性的响应式(Reactive)编程工具包。除了Java、Kotlin,也可以使用JavaScript、Groovy等语言编程。唯一让我感到不愉快的地方在于Vert.x 3.x版本就没有了Python支持,如果Python程序想和Vert.x交互,只能靠TCP bridge或者使用其他手段如HTTP、AMQP、gRPC等。

Vert.x包含core和其他的一系列模块,如web,数据库,响应式编程,微服务等模块,这些模块都不是必需的,只有vertx-core才是核心模块,jar包大概在1.2MB左右。

使用Vert.x可以实现异步编程,并解决js的回调地狱,同时也得益于它所提供的类Actor模型,可以写出具有良好的可扩展性、鲁棒性的分布式程序,使用上远比Akka要更加容易。下面介绍下它的设计理念。

理解Vert.x的参考材料

了解Java Swing

在介绍Vert.x的设计之前,可以先看看Java Swing的设计。Java Swing是单线程的,但是它通过一定的办法解决线程不安全的问题。一个Swing程序启动后会有一个主线程和一个Event Dispatch Thread,主线程即这个Java程序,而EDT主要是负责处理GUI事件的线程。当GUI上发生了变化,比如按下一个按钮,就会把事件塞入Event Queue中,唯一的EDT负责处理Event Queue中排队的事件。如果直接把阻塞方法交给EDT处理(直接把阻塞方法包含在GUI事件中),就会阻塞EDT,导致GUI卡死。因此,如果GUI事件涉及阻塞方法,应该交由SwingWorker处理,SwingWorker会用别的线程来处理阻塞任务,并在任务处理中或完成之后修改共享变量或返回结果:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
import java.util.concurrent.TimeUnit
import javax.swing.JButton
import javax.swing.JFrame
import javax.swing.JProgressBar
import javax.swing.SwingWorker
fun main() {
GenGui()
}
class GenGui{
val jb = JProgressBar(0, 2000).apply {
setBounds(40, 40, 160, 30)
value = 0
isStringPainted = true
}
val btn = JButton("Click Me").apply {
setBounds(40, 90, 100, 30)
addActionListener {
iter(0, 2000)
}
}
val frame = JFrame("progress bar demo").apply {
add(jb)
add(btn)
setSize(250, 200)
layout = null
isVisible = true
}
fun iter(start: Int, end: Int){
val swWorker = object : SwingWorker<Int, Unit>(){
override fun doInBackground(): Int {
var index = start
while (index <= end){
jb.value = index
index += 20
println(index)
TimeUnit.MILLISECONDS.sleep(100)
}
return index
}
override fun done() {
println("All done")
}
}
swWorker.execute()
}
}

上面的代码中,当btn被按下,就会往Event Queue里塞入一个事件,轮到被EDT处理时,就会触发iter函数。而iter函数里通过把进度条更新的任务交由swWorker处理,这个worker在工作的时候会直接修改进度条的value,从而实现不阻塞EDT进程。如果阻塞任务没有委托SwingWorker,就会阻塞EDT导致界面卡死。

如果想安全地修改GUI元素,应该通过Swing的SwingUtilities.invokeLater()或者invokeAndWait()把GUI修改事件放入事件队列中排队,并由EDT负责更新GUI。

了解Erlang

在Erlang中,一个Erlang“进程”就是一个Actor,在Erlang虚拟机中可以低成本地创建上万乃至十万的“进程”,“进程”间的交互只能通过消息传递实现,消息是不可变的。每个“进程”单独维护自己的内部状态,因为没有共享状态,就不需要任何锁,能大幅简化并发编程。当一个Actor接收到消息后,就会通过模式匹配找到对应消息的处理函数,处理完成可以通过消息传递把结果发回给原发送者,也可以根据需要发送给下一个Actor。

Erlang的并发模型是非常直观和容易理解的:

  • 每个Actor就好像一个独立的人,如果你想叫你的同事帮你收集些资料然后给你在会议中用来做演示,你就需要用消息传递告知TA,不管你用电话还是邮件发送信息都好(传输接口)。
  • 你发送了请求给TA,你就可以继续做别的事(异步),不需要停下手上的工作等待回复(阻塞)。
  • 如果TA嫌弃你(内部状态),你作为发送者是无法看到的,经过TA的思考(模式匹配),TA决定不帮你收集,就通过消息传递告知你“我很忙,你找别人吧”。TA可能还会通过消息传递告诉朋友说:被人骚扰了很烦。
  • 或者TA暗恋你(内部状态),就帮你收集,等收集完成后再消息传递,发送结果给你。由于你不用阻塞等待,在TA发送结果给你之前,你可以继续手上的开会准备。
  • 当你接收到资料后,就可以开会,展示结果(对外部对象的副作用)

但是这样是不够的,假设你在开会前需要把资料抄送给直属领导审核(回调),直属领导又要和其他领导开会讨论(回调),就会导致代码中的回调地狱。

Vert.x分析

Vert.x的设计也是跟Swing类似的。当一个vertx实例(定位类似Erlang的虚拟机)被创建的时候,它会根据用户配置或者CPU可用核数(含线程)创建含一定数量的Event Loop Thread(下文简称ELT)的pool,在默认情况下,这个线程数量是可用核数*2(比如我的6700HQ,4核8线程,默认会创建16个event loop的pool)。ELT的作用和Swing的EDT类似,主要作用是分发消息并立即返回,而耗时任务交由另一个线程池负责。

Vertx的两个核心元素分别是Verticle和EventBus。一个Verticle就相当于Erlang进程、actor。而EventBus则是Verticle之间沟通的核心。

Verticle

要实现一个Verticle只需要继承AbstractVerticle,并重写start方法,stop方法是可选的:

1
2
3
4
5
6
class CLU2 : AbstractVerticle(){
private val server = embeddedServer(Netty, 8030, module = Application::module2)
override fun start() {
server.start()
}
}

上面的Verticle维护了一个内部状态server(Ktor的服务器),start方法会在Verticle部署或者接收到消息的时候执行。vertx会确保start方法只会被同一个event loop处理,但是每个event loop在底层都有可能由不同的ELT负责执行。在start方法里绝对不能写while(true)之类的死循环,就跟Swing一样不应该把循环写进GUI事件中。

The Golden Rule - Don’t Block the Event Loop

利用Kotlin的when表达式,可以写出类似Erlang的模式匹配代码来处理Verticle所接收到的消息:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
class ConsumePrint : AbstractVerticle() {
private val server = embeddedServer(Netty, 8020, module = Application::module2)
private val worker = LoopPrint()
private var name: String = ""
private val stack = mutableListOf<String>()
override fun start() {
vertx.eventBus().consumer<String>("url1") { msg ->
when {
msg.body() == "launch" -> {
server.start()
Thread(worker).apply { start() }
vertx.eventBus().send("url3", "launch done")
}
msg.body() == "stop" -> {
println("Closing server")
server.stop(10, 10)
worker.close()
}
msg.body() == "simpleLoop" -> {
vertx.executeBlocking<String>({ future ->
run {
var d = 0.0
for (i in 0..10000000) d += (i * 10 / 23 * 10.02368899)
future.complete(d.toString())
}
}) { res -> println(" !!!!!!!! $res") }
}
msg.body() == "changeName" -> {
val context = vertx.orCreateContext
context.put("key", "This is value")
context.runOnContext {
println(context.get("key") as String)
}
}
msg.body() == "addStack" -> {
vertx.executeBlocking<String>({ promise ->
run {
Thread.sleep(10000)
promise.complete(if (stack.isNotEmpty()) stack.last() else "stack 1")
}
}) { res ->
run {
stack.add(res.result().toString())
vertx.eventBus().send("url3", "I received your msg!!")
println(stack)
}
}
}
msg.body() == "Stack" -> {
vertx.eventBus().send("url3", "stack is $stack")
}
else -> println("1 received message.body is ${msg.body()}")
}
}
}
}

上面的测试代码就是通过when来判断来自其他地址发送的消息体,然后执行不同的操作。

也可以对消息头进行判断:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
class Waste : AbstractVerticle(){
private val mapper = ObjectMapper().registerModule(KotlinModule())
override fun start() {
vertx.eventBus().localConsumer<String>("url2") { msg ->
val header = msg.headers()
when(header.get("msg")){
"waste" -> {
println("from url2: ${mapper.readValue<Person>(msg.body())}")
msg.reply("you are wasting")
}
else -> {
println("Waste wrong")
msg.fail(23, "Waste wrong reply")
}
}
}
}
}

如果需要执行耗时任务,可以部署worker verticle或者用executeBlocking:

一个worker verticle其实跟普通verticle差别不大,区别在于内部是由worker pool的线程来执行耗时任务,部署的时候也要设置:

1
2
3
4
5
6
7
8
9
10
11
12
13
val vertx = Vertx.vertx()
println("size is: ${VertxOptions().eventLoopPoolSize}")
......
vertx.deployVerticle(WorkerV::class.java.name, deploymentOptionsOf(worker = true)) { res ->
run {
if (res.succeeded()) {
println("Deployment id is: ${res.result()}")
ids.add(res.result())
} else {
println("Deployment failed!")
}
}
}

在最新的稳定版本中(3.9.2),可以直接在verticle内部用executeBlocking来执行阻塞代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
msg.body() == "addStack" -> {
vertx.executeBlocking<String>({ promise ->
run {
Thread.sleep(10000)
promise.complete(if (stack.isNotEmpty()) stack.last() else "stack 1")
}
}) { res ->
run {
stack.add(res.result().toString())
vertx.eventBus().send("url3", "I received your msg!!")
println(stack)
}
}
}

executeBlocking的泛型参数是用于标记返回类型,代码里第一个lambda是处理器,用于执行耗时任务,并通过complete方法设置返回值,第二个lambda是对结果的callback方法,即对处理器的返回值进行处理。但是如果处理器中的阻塞代码可能会阻塞超过10秒,就不建议用使用上述两种方法了。最好单独创建进程来执行,比如本小节最开头的Ktor服务器的embeddedServer,start方法只负责启动ktor服务器。如果需要与verticle的context交互,应该通过runOnContext或者event bus来介入event loop中:

1
2
3
4
5
6
7
msg.body() == "changeName" -> {
val context = vertx.orCreateContext
context.put("key", "This is value")
context.runOnContext {
println(context.get("key") as String)
}
}

上面通过put方法对verticle的上下文添加了一个键为key,值为this is value的内部共享状态。而runOnContext其实就跟Swing的invokeLater方法类似,都是把事件提交进队列中就返回,等待event loop的处理(异步任务)。

至于什么是context?context是为每个event loop分配的一个对象。

当 Vert.x 传递一个事件给处理器或者调用 Verticle 的 start 或 stop 方法时,它会关联一个 Context 对象来执行。通常来说这个 Context 会是一个 Event Loop Context,它绑定到了一个特定的 Event Loop 线程上。所以在该 Context 上执行的操作总是在同一个 Event Loop 线程中。对于运行内联的阻塞代码的 Worker Verticle 来说,会关联一个 Worker Context,并且所有的操作运都会运行在 Worker 线程池的线程上。

每个 Verticle 在部署的时候都会被分配一个 Context(根据配置不同,可以是Event Loop Context 或者 Worker Context),之后此 Verticle 上所有的普通代码都会在此 Context 上执行(即对应的 Event Loop 或Worker 线程)。一个 Context 对应一个 Event Loop 线程(或 Worker 线程),但一个 Event Loop (Thread) 可能对应多个 Context。

部署完verticle之后可以通过undeploy方法取消已经部署的verticle实例,但是经过测试,目前3.9.2中如果在关闭vertx对象前把全部verticle手动取消部署,最终在vertx对象的close方法执行时会报异常:java.lang.IllegalStateException: Unknown deployment

Event Bus

Event bus其实就是类似AMQP的消息队列,在Erlang中进程主要是靠进程的pid来作为地址发送和接收消息,而在Vertx中,只需要一个字符串就可以作为地址,字符串的格式并没有固定的格式要求,因此非常灵活。在Vertx的event bus中主要有以下几种消息传递方式:

  • publish:所有订阅该地址的verticle都会收到这个消息。
  • send:简单地发送一个消息到该地址,但是如果有多个verticle都订阅该地址,只有一个会接收到消息。
  • request:跟send类似,但是可以添加回复处理器,可以处理接收者返回的reply,类似RPC调用。

用request发送消息可以包含消息头(send不支持),可以这样设置:

1
2
3
4
5
6
7
8
9
10
11
12
13
data class Person(val name: String, val age: Int)
......
vertx.eventBus().request<String>(
"url2",
mapper.writeValueAsString(Person("css", 24)),
deliveryOptionsOf(headers = mapOf("msg" to "no waste"))
) { reply ->
if (reply.succeeded()) {
println("Main received reply: ${reply.result().body()}")
} else {
println("Main received failed reply: ${reply.cause().message}")
}
}

上面的代码中,url为url2。跟着的是一个使用Jackson序列化的data class转换得到的字符串,作为消息体。deliveryOptionsOf是发送用的配置信息,这里就设置了一个map作为headers。最后的lambda是回复处理器,用于处理来自url2的verticle返回的回复。

为了接收消息,可以用consumer或者localConsumer来处理:

1
2
3
4
5
6
7
8
9
10
11
12
13
vertx.eventBus().localConsumer<String>("url2") { msg ->
val header = msg.headers()
when(header.get("msg")){
"waste" -> {
println("from url2: ${mapper.readValue<Person>(msg.body())}")
msg.reply("you are wasting")
}
else -> {
println("Waste wrong")
msg.fail(23, "Waste wrong reply")
}
}
}
1
2
3
4
5
6
7
8
vertx.eventBus().consumer<String>("url1") { msg ->
when(msg.body()) {
"launch" -> {
server.start()
Thread(worker).apply { start() }
vertx.eventBus().send("url3", "launch done")
}
......

consumer比起local的,可以处理集群中的消息。consumer的msg可以用于回复发送者,比如reply和fail方法。

被发送的消息可以是普通的文本,或者用vertx提供的json dsl来创建JSON,vertx会自动处理序列化和反序列化的任务。但是我觉得目前3.9.2的json dsl并不好用,因此我选择直接用Jackson来对data class进行序列化用于发送。

Future

Future是vertx提供的用于解决回调地狱的工具,下面的代码演示了在3.9.2下如何正确地使用Future:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
import io.vertx.core.*
import java.util.*
import kotlin.system.exitProcess
class ComposeExample : AbstractVerticle() {
override fun start() {
val future = anAsyncAction()
future.compose { name: String -> anotherAsyncAction(name) }
.onComplete { ar: AsyncResult<String> ->
if (ar.failed()) {
println("Something bad happened")
ar.cause().printStackTrace()
} else {
println("Result: " + ar.result())
}
}
}
private fun anAsyncAction(): Future<String> {
val promise = Promise.promise<String>()
// mimic something that take times
vertx.setTimer(1000) { promise.complete("world") }
return promise.future()
}
private fun anotherAsyncAction(name: String): Future<String> {
val promise = Promise.promise<String>()
// mimic something that take times
vertx.setTimer(100) { promise.complete("hello $name") }
return promise.future()
}
}
fun main() {
val vert = Vertx.vertx()
vert.deployVerticle(ComposeExample::class.java.name)
println("enter to exit...")
Scanner(System.`in`).nextLine()
exitProcess(0)
}

可以使用compose来调度异步任务的顺序。在我刚接触Vertx的时候我曾被Future和executeBlocking这种用于执行阻塞任务的函数有什么相关关系所困惑。其实Future只是一个异步任务容器,一个封装了future的函数里可以包含executeBlocking这种阻塞任务,而Future只是一个用来解决回调地狱的工具。

高可用

Vertx的架构本身就适合于分布式应用,而它通过集群管理器设置为集群模式并启用高可用模式。我目前就尝试了vertx默认的Hazelcast,但实际上也可以使用Zookeeper等工具作为集群管理器。

想使用Hazelcast,只需要在build.gradle里添加相关依赖:

1
2
3
4
5
6
7
dependencies {
/* Useless in kotlin 1.4.0
implementation "org.jetbrains.kotlin:kotlin-stdlib-jdk8"*/
implementation "io.vertx:vertx-core:3.9.2"
implementation "io.vertx:vertx-hazelcast:3.9.2"
implementation "io.vertx:vertx-lang-kotlin:3.9.2"
}

然后就可以开始编写代码:

集群节点1的verticle:

1
2
3
4
5
6
7
8
9
10
11
12
class CLU1 : AbstractVerticle(){
private val server = embeddedServer(Netty, 8020, module = Application::module2)
override fun start() {
server.start()
}
}
fun main() {
Vertx.clusteredVertx(vertxOptionsOf(haEnabled = true)) { vertx ->
vertx.result().deployVerticle(CLU1::class.java.name, deploymentOptionsOf(ha = true))
}
}

集群节点2的verticle:

1
2
3
4
5
6
7
8
9
10
11
12
class CLU2 : AbstractVerticle(){
private val server = embeddedServer(Netty, 8030, module = Application::module2)
override fun start() {
server.start()
}
}
fun main() {
Vertx.clusteredVertx(vertxOptionsOf(haEnabled = true)) { vertx ->
vertx.result().deployVerticle(CLU2::class.java.name, deploymentOptionsOf(ha = true))
}
}

通过clusteredVertx可以创建集群模式的vertx实例,两个ktor实例都在本地运行,只是监听的端口不同。可以先启动CLU1看看会输出什么:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
八月 28, 2020 9:56:39 下午 com.hazelcast.instance.AddressPicker
信息: [LOCAL] [dev] [3.12.2] Prefer IPv4 stack is true, prefer IPv6 addresses is false
八月 28, 2020 9:56:39 下午 com.hazelcast.instance.AddressPicker
信息: [LOCAL] [dev] [3.12.2] Picked [192.168.233.1]:5701, using socket ServerSocket[addr=/0:0:0:0:0:0:0:0,localport=5701], bind any local is true
八月 28, 2020 9:56:39 下午 com.hazelcast.system
信息: [192.168.233.1]:5701 [dev] [3.12.2] Hazelcast 3.12.2 (20190802 - e34b163) starting at [192.168.233.1]:5701
八月 28, 2020 9:56:39 下午 com.hazelcast.system
信息: [192.168.233.1]:5701 [dev] [3.12.2] Copyright (c) 2008-2019, Hazelcast, Inc. All Rights Reserved.
八月 28, 2020 9:56:40 下午 com.hazelcast.spi.impl.operationservice.impl.BackpressureRegulator
信息: [192.168.233.1]:5701 [dev] [3.12.2] Backpressure is disabled
八月 28, 2020 9:56:41 下午 com.hazelcast.instance.Node
信息: [192.168.233.1]:5701 [dev] [3.12.2] Creating MulticastJoiner
八月 28, 2020 9:56:42 下午 com.hazelcast.spi.impl.operationexecutor.impl.OperationExecutorImpl
信息: [192.168.233.1]:5701 [dev] [3.12.2] Starting 8 partition threads and 5 generic threads (1 dedicated for priority tasks)
八月 28, 2020 9:56:42 下午 com.hazelcast.internal.diagnostics.Diagnostics
信息: [192.168.233.1]:5701 [dev] [3.12.2] Diagnostics disabled. To enable add -Dhazelcast.diagnostics.enabled=true to the JVM arguments.
八月 28, 2020 9:56:42 下午 com.hazelcast.core.LifecycleService
信息: [192.168.233.1]:5701 [dev] [3.12.2] [192.168.233.1]:5701 is STARTING
八月 28, 2020 9:56:44 下午 com.hazelcast.internal.cluster.ClusterService
信息: [192.168.233.1]:5701 [dev] [3.12.2]
Members {size:1, ver:1} [
Member [192.168.233.1]:5701 - d594a601-9adc-493a-afad-57c3377a4d70 this
]
八月 28, 2020 9:56:44 下午 com.hazelcast.core.LifecycleService
信息: [192.168.233.1]:5701 [dev] [3.12.2] [192.168.233.1]:5701 is STARTED
......
八月 28, 2020 9:56:45 下午 com.hazelcast.internal.partition.impl.PartitionStateManager
信息: [192.168.233.1]:5701 [dev] [3.12.2] Initializing cluster partition table arrangement...
八月 28, 2020 9:56:46 下午 io.vertx.core.impl.HAManager
信息: A quorum has been obtained. Any deploymentIDs waiting on a quorum will now be deployed
21:56:46.298 [vert.x-eventloop-thread-1] INFO ktor.application - No ktor.deployment.watch patterns specified, automatic reload is not active
21:56:46.474 [vert.x-eventloop-thread-1] INFO ktor.application - Responding at http://0.0.0.0:8020

可以看到目前的集群成员只有一个member,uid为d594a601-9adc-493a-afad-57c3377a4d70,并标记了this。Ktor服务器在localhost的8020端口监听。

此时再启动CLU2:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
八月 28, 2020 10:00:16 下午 com.hazelcast.instance.AddressPicker
信息: [LOCAL] [dev] [3.12.2] Prefer IPv4 stack is true, prefer IPv6 addresses is false
八月 28, 2020 10:00:16 下午 com.hazelcast.instance.AddressPicker
信息: [LOCAL] [dev] [3.12.2] Picked [192.168.233.1]:5702, using socket ServerSocket[addr=/0:0:0:0:0:0:0:0,localport=5702], bind any local is true
八月 28, 2020 10:00:16 下午 com.hazelcast.system
信息: [192.168.233.1]:5702 [dev] [3.12.2] Hazelcast 3.12.2 (20190802 - e34b163) starting at [192.168.233.1]:5702
八月 28, 2020 10:00:16 下午 com.hazelcast.system
信息: [192.168.233.1]:5702 [dev] [3.12.2] Copyright (c) 2008-2019, Hazelcast, Inc. All Rights Reserved.
八月 28, 2020 10:00:17 下午 com.hazelcast.spi.impl.operationservice.impl.BackpressureRegulator
信息: [192.168.233.1]:5702 [dev] [3.12.2] Backpressure is disabled
八月 28, 2020 10:00:18 下午 com.hazelcast.instance.Node
信息: [192.168.233.1]:5702 [dev] [3.12.2] Creating MulticastJoiner
八月 28, 2020 10:00:19 下午 com.hazelcast.spi.impl.operationexecutor.impl.OperationExecutorImpl
信息: [192.168.233.1]:5702 [dev] [3.12.2] Starting 8 partition threads and 5 generic threads (1 dedicated for priority tasks)
八月 28, 2020 10:00:19 下午 com.hazelcast.internal.diagnostics.Diagnostics
信息: [192.168.233.1]:5702 [dev] [3.12.2] Diagnostics disabled. To enable add -Dhazelcast.diagnostics.enabled=true to the JVM arguments.
八月 28, 2020 10:00:19 下午 com.hazelcast.core.LifecycleService
信息: [192.168.233.1]:5702 [dev] [3.12.2] [192.168.233.1]:5702 is STARTING
八月 28, 2020 10:00:19 下午 com.hazelcast.internal.cluster.impl.MulticastJoiner
信息: [192.168.233.1]:5702 [dev] [3.12.2] Trying to join to discovered node: [192.168.233.1]:5701
八月 28, 2020 10:00:19 下午 com.hazelcast.nio.tcp.TcpIpConnector
信息: [192.168.233.1]:5702 [dev] [3.12.2] Connecting to /192.168.233.1:5701, timeout: 10000, bind-any: true
八月 28, 2020 10:00:19 下午 com.hazelcast.nio.tcp.TcpIpConnection
信息: [192.168.233.1]:5702 [dev] [3.12.2] Initialized new cluster connection between /192.168.233.1:7582 and /192.168.233.1:5701
八月 28, 2020 10:00:25 下午 com.hazelcast.internal.cluster.ClusterService
信息: [192.168.233.1]:5702 [dev] [3.12.2]
Members {size:2, ver:2} [
Member [192.168.233.1]:5701 - d594a601-9adc-493a-afad-57c3377a4d70
Member [192.168.233.1]:5702 - b6af6227-f4d5-4730-a64d-f6fc11583760 this
]
八月 28, 2020 10:00:26 下午 com.hazelcast.core.LifecycleService
信息: [192.168.233.1]:5702 [dev] [3.12.2] [192.168.233.1]:5702 is STARTED
......
八月 28, 2020 10:00:27 下午 io.vertx.core.impl.HAManager
信息: A quorum has been obtained. Any deploymentIDs waiting on a quorum will now be deployed
22:00:27.787 [vert.x-eventloop-thread-1] INFO ktor.application - No ktor.deployment.watch patterns specified, automatic reload is not active
22:00:27.888 [vert.x-eventloop-thread-1] INFO ktor.application - Responding at http://0.0.0.0:8030

同时在CLU1的console输出了配对的信息:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
八月 28, 2020 10:00:19 下午 com.hazelcast.nio.tcp.TcpIpConnection
信息: [192.168.233.1]:5701 [dev] [3.12.2] Initialized new cluster connection between /192.168.233.1:5701 and /192.168.233.1:7582
八月 28, 2020 10:00:25 下午 com.hazelcast.internal.cluster.ClusterService
信息: [192.168.233.1]:5701 [dev] [3.12.2]
Members {size:2, ver:2} [
Member [192.168.233.1]:5701 - d594a601-9adc-493a-afad-57c3377a4d70 this
Member [192.168.233.1]:5702 - b6af6227-f4d5-4730-a64d-f6fc11583760
]
八月 28, 2020 10:00:25 下午 com.hazelcast.internal.partition.impl.MigrationManager
信息: [192.168.233.1]:5701 [dev] [3.12.2] Re-partitioning cluster data... Migration queue size: 271
八月 28, 2020 10:00:27 下午 com.hazelcast.internal.partition.impl.MigrationThread
信息: [192.168.233.1]:5701 [dev] [3.12.2] All migration tasks have been completed. (lastRepartitionTime=Fri Aug 28 22:00:25 CST 2020, completedMigrations=271, totalCompletedMigrations=271, elapsedMigrationTime=1052ms, totalElapsedMigrationTime=1052ms)

可以看到在我的本地环境已经运行了一个两个成员的集群,在不同的console里是会用this显示哪个uid才是目前的vertx实例。

由于两个CLU都启用了高可用的设置,如果现在随便关闭一个vertx集群成员,vertx就会自动把失效的成员中的verticle迁移到其他可用集群成员中。下面是我kill了CLU1后CLU2的输出:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
八月 28, 2020 10:02:27 下午 com.hazelcast.spi.impl.operationservice.impl.InvocationMonitor
信息: [192.168.233.1]:5702 [dev] [3.12.2] Invocations:1 timeouts:1 backup-timeouts:0
八月 28, 2020 10:05:11 下午 com.hazelcast.nio.tcp.TcpIpConnection
警告: [192.168.233.1]:5702 [dev] [3.12.2] Connection[id=1, /192.168.233.1:7582->/192.168.233.1:5701, qualifier=null, endpoint=[192.168.233.1]:5701, alive=false, type=MEMBER] closed. Reason: Exception in Connection[id=1, /192.168.233.1:7582->/192.168.233.1:5701, qualifier=null, endpoint=[192.168.233.1]:5701, alive=true, type=MEMBER], thread=hz._hzInstance_1_dev.IO.thread-in-0
java.io.IOException: 远程主机强迫关闭了一个现有的连接。
at sun.nio.ch.SocketDispatcher.read0(Native Method)
at sun.nio.ch.SocketDispatcher.read(SocketDispatcher.java:43)
at sun.nio.ch.IOUtil.readIntoNativeBuffer(IOUtil.java:223)
at sun.nio.ch.IOUtil.read(IOUtil.java:197)
at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:380)
at com.hazelcast.internal.networking.nio.NioInboundPipeline.process(NioInboundPipeline.java:113)
at com.hazelcast.internal.networking.nio.NioThread.processSelectionKey(NioThread.java:369)
at com.hazelcast.internal.networking.nio.NioThread.processSelectionKeys(NioThread.java:354)
at com.hazelcast.internal.networking.nio.NioThread.selectLoop(NioThread.java:280)
at com.hazelcast.internal.networking.nio.NioThread.run(NioThread.java:235)
八月 28, 2020 10:05:11 下午 com.hazelcast.nio.tcp.TcpIpConnector
信息: [192.168.233.1]:5702 [dev] [3.12.2] Connecting to /192.168.233.1:5701, timeout: 10000, bind-any: true
八月 28, 2020 10:05:13 下午 com.hazelcast.nio.tcp.TcpIpConnector
信息: [192.168.233.1]:5702 [dev] [3.12.2] Could not connect to: /192.168.233.1:5701. Reason: SocketException[Connection refused: no further information to address /192.168.233.1:5701]
八月 28, 2020 10:05:13 下午 com.hazelcast.nio.tcp.TcpIpConnector
信息: [192.168.233.1]:5702 [dev] [3.12.2] Connecting to /192.168.233.1:5701, timeout: 10000, bind-any: true
八月 28, 2020 10:05:15 下午 com.hazelcast.nio.tcp.TcpIpConnector
信息: [192.168.233.1]:5702 [dev] [3.12.2] Could not connect to: /192.168.233.1:5701. Reason: SocketException[Connection refused: no further information to address /192.168.233.1:5701]
八月 28, 2020 10:05:15 下午 com.hazelcast.nio.tcp.TcpIpConnector
信息: [192.168.233.1]:5702 [dev] [3.12.2] Connecting to /192.168.233.1:5701, timeout: 10000, bind-any: true
八月 28, 2020 10:05:17 下午 com.hazelcast.nio.tcp.TcpIpConnector
信息: [192.168.233.1]:5702 [dev] [3.12.2] Could not connect to: /192.168.233.1:5701. Reason: SocketException[Connection refused: no further information to address /192.168.233.1:5701]
八月 28, 2020 10:05:17 下午 com.hazelcast.nio.tcp.TcpIpConnectionErrorHandler
警告: [192.168.233.1]:5702 [dev] [3.12.2] Removing connection to endpoint [192.168.233.1]:5701 Cause => java.net.SocketException {Connection refused: no further information to address /192.168.233.1:5701}, Error-Count: 5
八月 28, 2020 10:05:17 下午 com.hazelcast.internal.cluster.impl.MembershipManager
警告: [192.168.233.1]:5702 [dev] [3.12.2] Member [192.168.233.1]:5701 - d594a601-9adc-493a-afad-57c3377a4d70 is suspected to be dead for reason: No connection
八月 28, 2020 10:05:17 下午 com.hazelcast.internal.cluster.impl.MembershipManager
信息: [192.168.233.1]:5702 [dev] [3.12.2] Starting mastership claim process...
八月 28, 2020 10:05:17 下午 com.hazelcast.internal.cluster.impl.MembershipManager
信息: [192.168.233.1]:5702 [dev] [3.12.2] Local MembersView{version=2, members=[MemberInfo{address=[192.168.233.1]:5701, uuid=d594a601-9adc-493a-afad-57c3377a4d70, liteMember=false, memberListJoinVersion=1}, MemberInfo{address=[192.168.233.1]:5702, uuid=b6af6227-f4d5-4730-a64d-f6fc11583760, liteMember=false, memberListJoinVersion=2}]} with suspected members: [[192.168.233.1]:5701] and initial addresses to ask: []
八月 28, 2020 10:05:17 下午 com.hazelcast.internal.cluster.ClusterService
信息: [192.168.233.1]:5702 [dev] [3.12.2]
Members {size:1, ver:3} [
Member [192.168.233.1]:5702 - b6af6227-f4d5-4730-a64d-f6fc11583760 this
]
八月 28, 2020 10:05:17 下午 com.hazelcast.internal.cluster.impl.MembershipManager
信息: [192.168.233.1]:5702 [dev] [3.12.2] Mastership is claimed with: MembersView{version=3, members=[MemberInfo{address=[192.168.233.1]:5702, uuid=b6af6227-f4d5-4730-a64d-f6fc11583760, liteMember=false, memberListJoinVersion=2}]}
八月 28, 2020 10:05:17 下午 com.hazelcast.internal.partition.InternalPartitionService
信息: [192.168.233.1]:5702 [dev] [3.12.2] Fetching most recent partition table! my version: 678
八月 28, 2020 10:05:17 下午 com.hazelcast.internal.partition.InternalPartitionService
信息: [192.168.233.1]:5702 [dev] [3.12.2] Most recent partition table version: 678
八月 28, 2020 10:05:17 下午 com.hazelcast.transaction.TransactionManagerService
信息: [192.168.233.1]:5702 [dev] [3.12.2] Committing/rolling-back live transactions of [192.168.233.1]:5701, UUID: d594a601-9adc-493a-afad-57c3377a4d70
八月 28, 2020 10:05:17 下午 com.hazelcast.internal.partition.impl.MigrationManager
信息: [192.168.233.1]:5702 [dev] [3.12.2] Partition balance is ok, no need to re-partition cluster data...
八月 28, 2020 10:05:17 下午 io.vertx.core.impl.HAManager
信息: nodeb6af6227-f4d5-4730-a64d-f6fc11583760 says: Node d594a601-9adc-493a-afad-57c3377a4d70 has failed. This node will deploy 1 deploymentIDs from that node.
22:05:17.377 [vert.x-eventloop-thread-3] INFO ktor.application - No ktor.deployment.watch patterns specified, automatic reload is not active
22:05:17.378 [vert.x-eventloop-thread-3] INFO ktor.application - Responding at http://0.0.0.0:8020
八月 28, 2020 10:05:17 下午 io.vertx.core.impl.HAManager
信息: Successfully redeployed verticle CLU1 after failover

从上面的输出信息可以看到集群成员只剩下b6af6227,然后自动把CLU1的监听8020端口的web服务器迁移到幸存的集群成员中并重新部署。当然如果集群所有成员死亡,那就无法再实现高可用了。

结语

上面所介绍的只是经过短暂三天内的学习研究,我的Vertx-core的使用心得,vertx实际上还包含一堆可用的扩展功能,我个人用vertx的主要原因是它是JVM上少数实现了类似Erlang的并发模型的工具包,而且比起Akka,它的文档更容易读懂。配合Kotlin的when表达式的Erlang味更浓。建议新入门的可以多看看vertx官方的github仓库的examples,展示了大多数使用方法可供参考。

Powered by Hexo and Hexo-theme-hiker

Copyright © 2017 - 2020 HOCHIKONG's WAPORIZer All Rights Reserved.

访客数 : | 访问量 :