0%

4-Actor 的生命周期 处理状态和错误

分布式计算的8个误区

  • 网络是可靠的
  • 没有延时
  • 带宽是无限的
  • 网络是安全的
  • 网络拓扑不会改变
  • 只有一个管理员
  • 网络传输没有开销
  • 网络是同构的

    监督

    监督的层级结构

    akka使用Actor层级结构来描述监督.
  1. 当我们创建actor时,新建的Actor就是作为另一个Actor的子Actor.
  2. 然后是路径为/usr的守护Actor.使用actorSystem.actorOf(Props.create(a.class, parameters), name); 就是Actor的子Actor /usr/yourActor.
  3. 如果在一个Actor内部创建另外一个Actor, 那么可以通过调用context().actorOf(Props.create(a.class, “”)) / 得到新建的actor, 他的路径就是 /usr/yourActor/newChild.

    监督策略和醉酒的厨师

    酒店的层级结构
    假设厨师很喜欢喝酒. 但是喝了酒之后就经常给自己惹麻烦, 所以此时作为他的上级要采取一些措施
    • 继续 resume actor 继续处理下一条信息
    • 停止 stop 停止Actor 不做任何操作
    • 重启 restart 新建一个Actor 代替原来的Actor
    • 向上反映esacalte 将异常信息传递给下一个监督者

默认的监督策略.

  • 运行过程中抛出异常 restart
  • 运行中抛出错误 escalte()
  • 初始化中发生异常 stop()

    Actor 生命周期

    在Actor的生命周期中会调用这几个方法,我们在需要时可以重写这些方法.
  • prestart() 在构造函数之后调用
  • postStop() 在重启之前调用
  • reRestart(reson, message) 默认情况下会调用postStop()
  • postRestart() 默认情况下会调用preStart()

for example. 假设有一个聊天引用, 每个用户都用一个actor来表示, 而Actor 之间的消息传递就表示了用户之间的聊天. 一个用户进入聊天就启动了一个Actor, 并发送一个消息来更新聊天室中的用户名单. 当一个Actor停止的时候,发送另一条消息, 将用户从聊天室的活跃名单活跃名单中删除.
启动流程

定义重试次数

一旦某个消息抛出了异常,该消息就会被丢弃,不会被重新发送。监督者会执行监督策略。在继续执行(resume)或是重启(restart)的情况下,就会处理下一条消息了。如果有一个工作队列的话,我们可能会想要重试若干次,然后彻底返回失败。

1
new OneForOneStrategy(2,Duration.create("1 minute"), PartialFunction)

终止或者kill 一个 Actor

  • 调用 system.stop(ActorRef)
  • 调用context.stop(ActorRef)
  • 向Actor 发送一个PoisonPill 消息, 在那个Actor 的message中在接到这个消息之后把自己给close掉(system.stop(self()), context.stop(self())), 或者抛出一个ActorKilledException

    监督其他的Actor

    context.watch(actorRef)
    context.unwatch(actorRef)

    安全重启

    比方说我们有一个数据库的Akka
    1
    我们首先会创建这个actor 然后再通过tell 方法 把消息给到这个actor. 但是比方说因为网络问题 这个actor 创建没有成功 抛错. 进入监督模式的 resume/ restart 导致创建数据库连接的信息丢失, 不能正确的处理这个事情
    2
    override prestart 方法, 在里面描述连接的逻辑. 当这个akka 启动失败的话 默认的逻辑是stop 这个akka, 所以监督者需要重写监督方法使其能够再次创建
    3
    重写 prestart 方法. 那么出现错误的时候 会调用 preRestart -> postStop -> postRestart -> preStart

    状态

    actor 能够安全的存储状态, 允许我们使用无锁的方式进行并发.(所有的对象都是immutable)
  • 基于Actor 状态的条件语句
  • 热交换 become & unbecome
  • 有限自动机

基于Actor状态的条件语句

比方说数据端客户端离线了, 那么在重新上线之前,他都无法进行处理, 这时候我们可以将信息存储起来,直到客户端恢复连接之后再做处理.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
package com.akkademy.article;

import akka.actor.AbstractActorWithStash;
import akka.japi.pf.ReceiveBuilder;
import com.akkademy.messages.GetRequest;

/**
* @author dafuchen
* 2019-03-03
*/
public class MessageStachActor extends AbstractActorWithStash {
private Boolean online = Boolean.FALSE;
@Override
public Receive createReceive() {
ReceiveBuilder.create().match(GetRequest.class, x -> {
if (online) {

} else {
stash();
}
}).match(Connected.class, x -> {
online = true;
unstash();
}).match(Disconnect.class, x -> online = false)
.build();
return null;
}
}

热交换

become 这个方法receiver 块中定义的行为修改为一个新的PartialFunction.
unbecome 这个方法将Actor的修改回默认的行为

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
package com.akkademy.messages;

import akka.actor.AbstractActorWithStash;
import akka.actor.ActorRef;
import akka.japi.pf.ReceiveBuilder;
import com.akkademy.article.Connected;
import com.akkademy.article.Disconnect;

/**
* @author dafuchen
* 2019-03-03
*/
public class BecomeActor extends AbstractActorWithStash {
private final ActorRef actorRef;

public BecomeActor(String refLocation) {
actorRef = context().actorFor(refLocation);
}

@Override
public Receive createReceive() {

return ReceiveBuilder.create()
.match(GetRequest.class, x -> stash())
.match(Connected.class, x -> {
context().become(ReceiveBuilder.create()
.match(GetRequest.class, b -> actorRef.tell("", self())).build().onMessage());
unstash();
}).match(Disconnect.class, x -> context().unbecome())
.build();
}
}

stash 泄漏

如果在很长的时间之后才能将状态变回到unstash 需要的状态或者根本就不能恢复到那个状态,那么会造成内存耗尽或者邮箱开始丢弃信息.
所以我们要在构造函数或者preStart 中通过调度器来检查这个条件.避免泄漏的发生

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
package com.akkademy.messages;

import akka.actor.AbstractActorWithStash;
import akka.actor.ActorRef;
import akka.japi.pf.ReceiveBuilder;
import com.akkademy.article.Connected;
import com.akkademy.article.ConnectionTimeoutException;
import com.akkademy.article.Disconnect;
import com.akkademy.article.TestConnected;
import scala.concurrent.duration.Duration;

import java.sql.Time;
import java.util.concurrent.TimeUnit;

/**
* @author dafuchen
* 2019-03-03
*/
public class BecomeActor extends AbstractActorWithStash {
private final ActorRef actorRef;
private Boolean connected;

public BecomeActor(String refLocation) {
actorRef = context().actorFor(refLocation);
}

@Override
public Receive createReceive() {

return ReceiveBuilder.create()
.match(GetRequest.class, x -> stash())
.match(Connected.class, x -> {
context().become(ReceiveBuilder.create()
.match(GetRequest.class, b -> actorRef.tell("", self())).build().onMessage());
unstash();
}).match(Disconnect.class, x -> context().unbecome())
.match(TestConnected.class, x -> {
if (! connected) {
throw new ConnectionTimeoutException();
}
})
.build();
}


@Override
public void preStart() {
context().system()
.scheduler()

.scheduleOnce(
// 延迟执行的时间
Duration.create(1000, TimeUnit.MICROSECONDS),
self(),
new TestConnected(),
context().dispatcher(),
null
);
}
}

有限状态机

fsm中的类型有两个参数 状态和容器

首先 我们定义存在若干的状态

1
2
3
4
5
6
7
8
9
10
11
12
13
14
public enum Status {
/**
* 中断连接
*/
DISCONNECTED,
/**
* 连接
*/
CONNECTED,
/**
* 连接等待
*/
CONNECTED_PENDING
}

首先FSM 肯定存在有限的状态和状态的变迁.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
public class BunchingActor extends AbstractFSM<Status,
LinkedBlockingQueue<GetRequest>> {
public BunchingActor() {
startWith(Status.DISCONNECTED, null);
//在离线的情况下
// 如果有清空的请求 -> 保持原状
// 如果有发送消息的请求 -> 把这个请求放到队列中
// 如果连接上了网络
// 请求队列中有没有内容 状态变迁到连接中
// 有内容 变迁到填充中状态中
when(Status.DISCONNECTED,
matchEvent(FlushMsg.class, (msg, container) -> stay())
.event(SendRequest.class, (msg, container) -> {
container.add(msg);
return stay();
})
.event(Tcp.Connected.class, (msg, container) -> {
if (Objects.isNull(container.peek())) {
return goTo(Status.CONNECTED);
} else {
return goTo(Status.CONNECTED_PENDING);
}
})
);
//在在线的情况下
// 如果有清空的请求 -> 保持原状(因为队列是空的, 不能进行清空)
// 如果有发送消息的请求 -> 保存到队列 同时将状态变迁到填充中的状态
when(Status.CONNECTED,
matchEvent(FlushMsg.class, (msg, container) -> stay())
.event(SendRequest.class, (msg, container) -> {
container.add(msg);
return goTo(Status.CONNECTED_PENDING);
})
.event(String.class, (msg, container) -> {
System.out.println(msg);
return stay();
}));
//在 填充中的状态
// 清空请求 -> 进行清空, 同时将状态变迁到连接中(没有内容可以被清空了)
when(Status.CONNECTED_PENDING,
matchEvent(FlushMsg.class, (msg, container) -> {
container.clear();
return stay();
}).event(SendRequest.class, (msg, container) -> {
container.add(msg);
return goTo(Status.CONNECTED_PENDING);
}));
}
}