分布式计算的8个误区
- 网络是可靠的
- 没有延时
- 带宽是无限的
- 网络是安全的
- 网络拓扑不会改变
- 只有一个管理员
- 网络传输没有开销
- 网络是同构的
监督
监督的层级结构
akka使用Actor层级结构来描述监督.
- 当我们创建actor时,新建的Actor就是作为另一个Actor的子Actor.
- 然后是路径为/usr的守护Actor.使用actorSystem.actorOf(Props.create(a.class, parameters), name); 就是Actor的子Actor /usr/yourActor.
- 如果在一个Actor内部创建另外一个Actor, 那么可以通过调用context().actorOf(Props.create(a.class, “”)) / 得到新建的actor, 他的路径就是 /usr/yourActor/newChild.
监督策略和醉酒的厨师

假设厨师很喜欢喝酒. 但是喝了酒之后就经常给自己惹麻烦, 所以此时作为他的上级要采取一些措施
- 继续 resume actor 继续处理下一条信息
- 停止 stop 停止Actor 不做任何操作
- 重启 restart 新建一个Actor 代替原来的Actor
- 向上反映esacalte 将异常信息传递给下一个监督者
默认的监督策略.
- 运行过程中抛出异常 restart
- 运行中抛出错误 escalte()
- 初始化中发生异常 stop()
Actor 生命周期
在Actor的生命周期中会调用这几个方法,我们在需要时可以重写这些方法.
- prestart() 在构造函数之后调用
- postStop() 在重启之前调用
- reRestart(reson, message) 默认情况下会调用postStop()
- postRestart() 默认情况下会调用preStart()
for example. 假设有一个聊天引用, 每个用户都用一个actor来表示, 而Actor 之间的消息传递就表示了用户之间的聊天. 一个用户进入聊天就启动了一个Actor, 并发送一个消息来更新聊天室中的用户名单. 当一个Actor停止的时候,发送另一条消息, 将用户从聊天室的活跃名单活跃名单中删除.

定义重试次数
一旦某个消息抛出了异常,该消息就会被丢弃,不会被重新发送。监督者会执行监督策略。在继续执行(resume)或是重启(restart)的情况下,就会处理下一条消息了。如果有一个工作队列的话,我们可能会想要重试若干次,然后彻底返回失败。
1
| new OneForOneStrategy(2,Duration.create("1 minute"), PartialFunction)
|
终止或者kill 一个 Actor
- 调用 system.stop(ActorRef)
- 调用context.stop(ActorRef)
- 向Actor 发送一个PoisonPill 消息, 在那个Actor 的message中在接到这个消息之后把自己给close掉(system.stop(self()), context.stop(self())), 或者抛出一个ActorKilledException
监督其他的Actor
context.watch(actorRef)
context.unwatch(actorRef)安全重启
比方说我们有一个数据库的Akka
1
我们首先会创建这个actor 然后再通过tell 方法 把消息给到这个actor. 但是比方说因为网络问题 这个actor 创建没有成功 抛错. 进入监督模式的 resume/ restart 导致创建数据库连接的信息丢失, 不能正确的处理这个事情
2
override prestart 方法, 在里面描述连接的逻辑. 当这个akka 启动失败的话 默认的逻辑是stop 这个akka, 所以监督者需要重写监督方法使其能够再次创建
3
重写 prestart 方法. 那么出现错误的时候 会调用 preRestart -> postStop -> postRestart -> preStart 状态
actor 能够安全的存储状态, 允许我们使用无锁的方式进行并发.(所有的对象都是immutable)
- 基于Actor 状态的条件语句
- 热交换 become & unbecome
- 有限自动机
基于Actor状态的条件语句
比方说数据端客户端离线了, 那么在重新上线之前,他都无法进行处理, 这时候我们可以将信息存储起来,直到客户端恢复连接之后再做处理.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29
| package com.akkademy.article;
import akka.actor.AbstractActorWithStash; import akka.japi.pf.ReceiveBuilder; import com.akkademy.messages.GetRequest;
public class MessageStachActor extends AbstractActorWithStash { private Boolean online = Boolean.FALSE; @Override public Receive createReceive() { ReceiveBuilder.create().match(GetRequest.class, x -> { if (online) {
} else { stash(); } }).match(Connected.class, x -> { online = true; unstash(); }).match(Disconnect.class, x -> online = false) .build(); return null; } }
|
热交换
become 这个方法receiver 块中定义的行为修改为一个新的PartialFunction.
unbecome 这个方法将Actor的修改回默认的行为
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32
| package com.akkademy.messages;
import akka.actor.AbstractActorWithStash; import akka.actor.ActorRef; import akka.japi.pf.ReceiveBuilder; import com.akkademy.article.Connected; import com.akkademy.article.Disconnect;
public class BecomeActor extends AbstractActorWithStash { private final ActorRef actorRef;
public BecomeActor(String refLocation) { actorRef = context().actorFor(refLocation); }
@Override public Receive createReceive() {
return ReceiveBuilder.create() .match(GetRequest.class, x -> stash()) .match(Connected.class, x -> { context().become(ReceiveBuilder.create() .match(GetRequest.class, b -> actorRef.tell("", self())).build().onMessage()); unstash(); }).match(Disconnect.class, x -> context().unbecome()) .build(); } }
|
stash 泄漏
如果在很长的时间之后才能将状态变回到unstash 需要的状态或者根本就不能恢复到那个状态,那么会造成内存耗尽或者邮箱开始丢弃信息.
所以我们要在构造函数或者preStart 中通过调度器来检查这个条件.避免泄漏的发生
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60
| package com.akkademy.messages;
import akka.actor.AbstractActorWithStash; import akka.actor.ActorRef; import akka.japi.pf.ReceiveBuilder; import com.akkademy.article.Connected; import com.akkademy.article.ConnectionTimeoutException; import com.akkademy.article.Disconnect; import com.akkademy.article.TestConnected; import scala.concurrent.duration.Duration;
import java.sql.Time; import java.util.concurrent.TimeUnit;
public class BecomeActor extends AbstractActorWithStash { private final ActorRef actorRef; private Boolean connected;
public BecomeActor(String refLocation) { actorRef = context().actorFor(refLocation); }
@Override public Receive createReceive() {
return ReceiveBuilder.create() .match(GetRequest.class, x -> stash()) .match(Connected.class, x -> { context().become(ReceiveBuilder.create() .match(GetRequest.class, b -> actorRef.tell("", self())).build().onMessage()); unstash(); }).match(Disconnect.class, x -> context().unbecome()) .match(TestConnected.class, x -> { if (! connected) { throw new ConnectionTimeoutException(); } }) .build(); }
@Override public void preStart() { context().system() .scheduler() .scheduleOnce( Duration.create(1000, TimeUnit.MICROSECONDS), self(), new TestConnected(), context().dispatcher(), null ); } }
|
有限状态机
fsm中的类型有两个参数 状态和容器
首先 我们定义存在若干的状态
1 2 3 4 5 6 7 8 9 10 11 12 13 14
| public enum Status {
DISCONNECTED,
CONNECTED,
CONNECTED_PENDING }
|
首先FSM 肯定存在有限的状态和状态的变迁.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49
| public class BunchingActor extends AbstractFSM<Status, LinkedBlockingQueue<GetRequest>> { public BunchingActor() { startWith(Status.DISCONNECTED, null); when(Status.DISCONNECTED, matchEvent(FlushMsg.class, (msg, container) -> stay()) .event(SendRequest.class, (msg, container) -> { container.add(msg); return stay(); }) .event(Tcp.Connected.class, (msg, container) -> { if (Objects.isNull(container.peek())) { return goTo(Status.CONNECTED); } else { return goTo(Status.CONNECTED_PENDING); } }) ); when(Status.CONNECTED, matchEvent(FlushMsg.class, (msg, container) -> stay()) .event(SendRequest.class, (msg, container) -> { container.add(msg); return goTo(Status.CONNECTED_PENDING); }) .event(String.class, (msg, container) -> { System.out.println(msg); return stay(); })); when(Status.CONNECTED_PENDING, matchEvent(FlushMsg.class, (msg, container) -> { container.clear(); return stay(); }).event(SendRequest.class, (msg, container) -> { container.add(msg); return goTo(Status.CONNECTED_PENDING); })); } }
|