年糕的爸爸在B乎上开了专栏前路迢迢,教麻瓜使用Akka和Clojure. 然后就上船了.但是 akka 和clojure 是个什么鬼.但是应该是永远也学会不会的吧. #从入门到放弃# #嘛哩嘛哩哄#
什么是Actor
在我的理解里面Actor 就是固定的一个流程(process)
Actor做什么
发送
在Receiver中可以向sender, 向任意的Actor (只要能找到其ActorRef) 发送消息. 消息是fire and forget的 除了收到消息的Actor 发送一个消息给发送方, 发送方是不知道响应的状态的.
Actor 一次只能接受一个信息,如果多个Actor 同时修改修改一个信息, 那么最终的结果随着时间的交织而变化
创建
Actor 可以创建其他的Actor. 被创建的Actor被称为该Actor的子节点, 行为受其监督.
改变
状态机是保持是保证系统在特定的状态时执行特定功能的有力工具. 根据返回的不同的结果,匹配到不同的match函数内中
监督
父节点在子节点崩溃后根据崩溃的原因作出重试等不同的逻辑
Actor 和消息传递
- Actor
Actor 里面规定了它要处理怎么样的信息, 要怎么处理信息(match). 一个Actor在没有创建router的情况下在ActorSystem中只有一个实例. 就是一个逻辑处理. - 消息
用于跨进程(多个Actor之间)通信的数据 主要通过ask(actorRef, message), sender().tell() 方法来 - 消息传递
一种软件开发范式, 通过消息来触发各种行为,而不是直接触发行为(就如同是Dispatcher invoke 各种的controller 一样) - 邮箱地址
消息传输的目标地址 akka://myTestActorSystem@127.0.0.1:2552/user/nodeName - 邮箱
可以认为是一个消息队列
Actor 和对象的不同之处是不能被直接读取、修改或者是调用. (调用也没有用 里面就是一堆消息处理的逻辑. )
监督和容错机制
在出错的时候,父亲节点会根据子节点错误的类型不同,对子节点采取不用的恢复策略.
创建一个项目
在之前的我们可能使用Typesafe的 activator 项目来创建一个新的项目.现在推荐使用直接从网上下载的方式获得起始的项目.
Akka 101
- 创建消息体, 通常我们让消息在传递的过程中保持Immutable(多线程设计的一种设计模式) 如此数据就不会发生变化, 保证安全.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23package com.akkademy.messages;
/**
* @author dafuchen
* 2019-02-17
*/
public class SetRequest {
private final String key;
private final Object value;
public SetRequest(String key, Object value) {
this.key = key;
this.value = value;
}
public String getKey() {
return key;
}
public Object getValue() {
return value;
}
} - 创建处理的流程或者说行为(Actor)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37package com.akkademy.messages;
import akka.actor.AbstractActor;
import akka.event.Logging;
import akka.event.LoggingAdapter;
import akka.japi.pf.ReceiveBuilder;
import java.util.HashMap;
import java.util.Map;
/**
* @author dafuchen
* 2019-02-17
*/
public class AkkademyDb extends AbstractActor {
// 创建一个logger
private final LoggingAdapter log = Logging.getLogger(context().system(), this);
protected final Map<String, Object> map = new HashMap<>();
public Receive createReceive() {
// 创建一个Receiver.
// 如果发送过来的消息是*类型的话 进行怎么样的处理
// 否则 怎么样
return ReceiveBuilder.create()
.match(SetRequest.class, message -> {
log.info("received Set request {}", message);
map.put(message.getKey(), message.getValue());
}).matchAny( o -> {
log.info("received unknown message {}", o);
}).build();
}
public Map<String, Object> getMap() {
return map;
}
} - test case
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29package akkademy.message;
import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import akka.actor.Props;
import akka.testkit.TestActorRef;
import com.akkademy.messages.AkkademyDb;
import com.akkademy.messages.SetRequest;
import org.junit.Test;
import static org.junit.Assert.assertEquals;
/**
* @author dafuchen
* 2019-02-17
*/
public class AkkademyDbTest {
ActorSystem system = ActorSystem.create();
public void isShouldPlaceKeyValueFromSetMessageInfo() {
TestActorRef<AkkademyDb> actorRef = TestActorRef.create(system, Props.create(AkkademyDb.class));
actorRef.tell(new SetRequest("key", "value"), ActorRef.noSender());
AkkademyDb akkademyDb = actorRef.underlyingActor();
assertEquals(akkademyDb.getMap().get("key"), "value");
}
}
创建Akka System
1
ActorSystem.create();
创建Actor
1
TestActorRef.create(system, Props.create(AkkademyDb.class));
在程序之中得到Ref 的方法一般是
1
context().system().actorOf(Props.create(SetRequest.class, "key", "values"));
向Actor 的地址发送信息
1
actorRef.tell(new SetRequest("key", "value"), ActorRef.noSender());
获得这个Ref 下指向的 Actor
1
AkkademyDb akkademyDb = actorRef.underlyingActor();
做Assert 进行比较