0%

3-传递消息

消息传递

  1. tell
    向Actor 发送一条消息。所有发送至sender()的响应都会返回给发送消息的Actor。
  2. ask
    向Actor 发送一条消息,返回一个Future。当Actor 返回响应时,会完成Future。不会向消息发送者的邮箱返回任何消息。
    toJava(ask(actorSelection, new SetRequest(“key”, “value”), 2000));
  3. forward
    将接收到的消息再发送给另一个Actor。所有发送至sender()的响应都
    会返回给原始消息的发送者。
  4. pipe
    用于将Future 的结果返回给sender()或另一个Actor。如果正在使用Ask或是处理一个Future,那么使用Pipe 可以正确地返回Future 的结果。

    消息是不可变的

    因此,无论什么时候,只要需要在线程之间共享数据,就应该首先考虑将数据定义为不可变。既然不可变就不用考虑保存临界值.

    Ask 模式

    在调用ask 向Actor 发起请求时,Akka 实际上会在Actor 系统中创建一个临时Actor。接收请求的Actor 在返回响应时使用的sender()引用就是这个临时Actor。当一个Actor接收到ask 请求发来的消息并返回响应时,这个临时Actor 会使用返回的响应来完成Future. 就是说这个Future和系统创建的actor其实是绑定的.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
public class AskDemoArticleParser extends AbstractActor {
private final ActorSelection cacheActor;
private final ActorSelection httpClientActor;
private final ActorSelection articleParseActor;
private final Timeout timeout;

public AskDemoArticleParser(String cacheActor,
String httpClientActor,
String articleParseActor,
Timeout timeout) {

this.cacheActor = context().actorSelection(cacheActor);
this.httpClientActor = context().actorSelection(httpClientActor);
this.articleParseActor = context().actorSelection(articleParseActor);
this.timeout = timeout;
}

@Override
public Receive createReceive() {
final ActorRef senderRef = sender();

return ReceiveBuilder.create().match(ParseArticle.class, msg -> {
toJava(ask(cacheActor, new GetRequest(msg.getUrl()), timeout)).handle((x, t) ->
! Objects.isNull(x)
? CompletableFuture.completedFuture(x)
: toJava(ask(httpClientActor, msg.getUrl(), timeout))
.thenCompose(rawArticle -> toJava(ask(articleParseActor, new ParseHtmlArticle(msg.getUrl(),
((HttpResponse) rawArticle).getBody()), timeout)))
).handle((x, t) -> {
if (Objects.isNull(x)) {
if (x instanceof ArticleBody) {
String body = ((ArticleBody) x).getBody();

cacheActor.tell(body, self());
senderRef.tell(body, self());
} else {
senderRef.tell(new Status.Failure((Throwable)t), self());
return null;
}
}
return null;
});
}).build();

}
}

在另一个执行上下文中执行回调函数

必须设置超时参数

超时错误的栈追踪信息并没有用

另一点需要注意的是:如果Actor 抛出了一个意料之外的异常,而没有返回错误,那么这个错误看上去会像是由于超时引起的,但是实际上却另有原因。
从这里总结出的经验就是:当代码中发生错误时,一定要返回失败消息。如果一个Actor 抛出了异常那么它是不会返回消息的在Actor 中,代码的编写者负责实现所有的消息处理逻辑:如果某个Actor 需要进行响应,Akka 是不会隐式地做任何响应的。当需要返回响应时,我们必须自己对收到的消息进行响应

Ask的额外性能开销

  1. 首先,ask 会导致Akka在/temp 路径下新建一个临时Actor。这个临时Actor 会等待从接收ask 消息的Actor 返回的响应。
  2. 其次,Future 也有额外的性能开销。Ask 会创建Future,由临时Actor 负责完成。

    Tell 模式

    Tell 是ActorRef/ActorSelection 类的一个方法
    actor.tell(message, self())
    表示消息从内部进行发送
    actor.tell(“message”, akka.actor.ActorRef.noSender());
    消息从外部进行发送

tell 使用的是fire-and-forget 模式 我把消息发送了 然后我就不管了. 对于结果我不关心.

而ask相当于是ticket模式.我这里有一个结果的取货单.

但是我们如果需要关注结果那要怎么办呢? 在内部建立建立一个Actor(姑且叫做innerActor), 由它做FSM(有限状态机器), 对收到的消息作出响应, 比方说innerActor给步骤1发送任务启动指令,完成后返回步骤1完成response,然后再对步骤2发送任务启动指令

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
package top.andrewchen1.chapter3;

import akka.actor.*;
import akka.japi.pf.ReceiveBuilder;
import akka.util.Timeout;
import top.andrewchen1.chapter2.db.GetRequest;
import top.andrewchen1.chapter2.db.SetRequest;

import java.util.concurrent.TimeoutException;

/**
* @author dafuchen
* 2019-03-17
*/
public class ArticleTellParserActor extends AbstractActor {
private final ActorSelection cacheActor;
private final ActorSelection httpClientActor;
private final ActorSelection articleParseActor;
private final Timeout timeout;

public ArticleTellParserActor (String cacheActor,
String httpClientActor,
String articleParseActor,
Timeout timeout) {
this.cacheActor = context().actorSelection(cacheActor);
this.httpClientActor = context().actorSelection(httpClientActor);
this.articleParseActor = context().actorSelection(articleParseActor);
this.timeout = timeout;
}
@Override
public Receive createReceive() {
return ReceiveBuilder.create()
.match(ParseArticle.class, message -> {
ActorRef extraActor = buildExtraActor(sender(), message.getUrl());
cacheActor.tell(new GetRequest(message.getUrl()), extraActor);
httpClientActor.tell(message.getUrl(), extraActor);
context().system().scheduler().scheduleOnce(
timeout.duration(),
extraActor,
"timeout",
context().system().dispatcher(),
ActorRef.noSender()
);
})
.build();
}
private ActorRef buildExtraActor(ActorRef senderRef, String uri) {
class MyActor extends AbstractActor {

@Override
public Receive createReceive() {
return ReceiveBuilder.create().matchEquals(String.class, x -> x.equals("timeout"), x -> {
senderRef.tell(new Status.Failure(new TimeoutException("timeout")), self());
context().stop(self());
})
.match(HttpResponse.class, httpResponse -> {
articleParseActor.tell(new ParseHtmlArticle(uri, httpResponse.getBody()), self());
}).match(String.class, body -> {
senderRef.tell(body, self());
context().stop(self());
}).match(ArticleBody.class, articleBody -> {
cacheActor.tell(new SetRequest(articleBody.getUri(), articleBody.getBody()), self());
context().stop(self());
}).matchAny(t ->
System.out.println("ignoring msg" + t.getClass()))
.build();
}
}
return context().actorOf(Props.create(MyActor.class, MyActor::new));
}
}

在上面的代码中 innerActor 需要作出

  1. 出现超时的话 停止自身 并且告诉消息发送者超时了
  2. 出现HttpResponse, 说明获取到了原文 需要进行解析
  3. 出现除了“timeout”的String值, 说明获取到了内容(无论是从cache还是从parse获得的)
  4. 出现Articlebody, 说明出现了解析完成的Article 需要进行cache

Forward

actor.forward(message, context());

forward 的就是将消息转发给一个另外一个actor. tell(message, self()) 也可以完成一样的任务. 但是forward的语意更加的清晰.

Pipe

pipe 和ask的语音相似

1
pipe(future, system.dispatcher()).to(sender());