消息传递
- tell
向Actor 发送一条消息。所有发送至sender()的响应都会返回给发送消息的Actor。
- ask
向Actor 发送一条消息,返回一个Future。当Actor 返回响应时,会完成Future。不会向消息发送者的邮箱返回任何消息。
toJava(ask(actorSelection, new SetRequest(“key”, “value”), 2000));
- forward
将接收到的消息再发送给另一个Actor。所有发送至sender()的响应都
会返回给原始消息的发送者。
- pipe
用于将Future 的结果返回给sender()或另一个Actor。如果正在使用Ask或是处理一个Future,那么使用Pipe 可以正确地返回Future 的结果。消息是不可变的
因此,无论什么时候,只要需要在线程之间共享数据,就应该首先考虑将数据定义为不可变。既然不可变就不用考虑保存临界值.Ask 模式
在调用ask 向Actor 发起请求时,Akka 实际上会在Actor 系统中创建一个临时Actor。接收请求的Actor 在返回响应时使用的sender()引用就是这个临时Actor。当一个Actor接收到ask 请求发来的消息并返回响应时,这个临时Actor 会使用返回的响应来完成Future. 就是说这个Future和系统创建的actor其实是绑定的.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46
| public class AskDemoArticleParser extends AbstractActor { private final ActorSelection cacheActor; private final ActorSelection httpClientActor; private final ActorSelection articleParseActor; private final Timeout timeout;
public AskDemoArticleParser(String cacheActor, String httpClientActor, String articleParseActor, Timeout timeout) { this.cacheActor = context().actorSelection(cacheActor); this.httpClientActor = context().actorSelection(httpClientActor); this.articleParseActor = context().actorSelection(articleParseActor); this.timeout = timeout; }
@Override public Receive createReceive() { final ActorRef senderRef = sender(); return ReceiveBuilder.create().match(ParseArticle.class, msg -> { toJava(ask(cacheActor, new GetRequest(msg.getUrl()), timeout)).handle((x, t) -> ! Objects.isNull(x) ? CompletableFuture.completedFuture(x) : toJava(ask(httpClientActor, msg.getUrl(), timeout)) .thenCompose(rawArticle -> toJava(ask(articleParseActor, new ParseHtmlArticle(msg.getUrl(), ((HttpResponse) rawArticle).getBody()), timeout))) ).handle((x, t) -> { if (Objects.isNull(x)) { if (x instanceof ArticleBody) { String body = ((ArticleBody) x).getBody();
cacheActor.tell(body, self()); senderRef.tell(body, self()); } else { senderRef.tell(new Status.Failure((Throwable)t), self()); return null; } } return null; }); }).build();
} }
|
在另一个执行上下文中执行回调函数
必须设置超时参数
超时错误的栈追踪信息并没有用
另一点需要注意的是:如果Actor 抛出了一个意料之外的异常,而没有返回错误,那么这个错误看上去会像是由于超时引起的,但是实际上却另有原因。
从这里总结出的经验就是:当代码中发生错误时,一定要返回失败消息。如果一个Actor 抛出了异常那么它是不会返回消息的在Actor 中,代码的编写者负责实现所有的消息处理逻辑:如果某个Actor 需要进行响应,Akka 是不会隐式地做任何响应的。当需要返回响应时,我们必须自己对收到的消息进行响应
Ask的额外性能开销
- 首先,ask 会导致Akka在/temp 路径下新建一个临时Actor。这个临时Actor 会等待从接收ask 消息的Actor 返回的响应。
- 其次,Future 也有额外的性能开销。Ask 会创建Future,由临时Actor 负责完成。
Tell 模式
Tell 是ActorRef/ActorSelection 类的一个方法
actor.tell(message, self())
表示消息从内部进行发送
actor.tell(“message”, akka.actor.ActorRef.noSender());
消息从外部进行发送
tell 使用的是fire-and-forget 模式 我把消息发送了 然后我就不管了. 对于结果我不关心.
而ask相当于是ticket模式.我这里有一个结果的取货单.
但是我们如果需要关注结果那要怎么办呢? 在内部建立建立一个Actor(姑且叫做innerActor), 由它做FSM(有限状态机器), 对收到的消息作出响应, 比方说innerActor给步骤1发送任务启动指令,完成后返回步骤1完成response,然后再对步骤2发送任务启动指令
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72
| package top.andrewchen1.chapter3;
import akka.actor.*; import akka.japi.pf.ReceiveBuilder; import akka.util.Timeout; import top.andrewchen1.chapter2.db.GetRequest; import top.andrewchen1.chapter2.db.SetRequest;
import java.util.concurrent.TimeoutException;
public class ArticleTellParserActor extends AbstractActor { private final ActorSelection cacheActor; private final ActorSelection httpClientActor; private final ActorSelection articleParseActor; private final Timeout timeout;
public ArticleTellParserActor (String cacheActor, String httpClientActor, String articleParseActor, Timeout timeout) { this.cacheActor = context().actorSelection(cacheActor); this.httpClientActor = context().actorSelection(httpClientActor); this.articleParseActor = context().actorSelection(articleParseActor); this.timeout = timeout; } @Override public Receive createReceive() { return ReceiveBuilder.create() .match(ParseArticle.class, message -> { ActorRef extraActor = buildExtraActor(sender(), message.getUrl()); cacheActor.tell(new GetRequest(message.getUrl()), extraActor); httpClientActor.tell(message.getUrl(), extraActor); context().system().scheduler().scheduleOnce( timeout.duration(), extraActor, "timeout", context().system().dispatcher(), ActorRef.noSender() ); }) .build(); } private ActorRef buildExtraActor(ActorRef senderRef, String uri) { class MyActor extends AbstractActor {
@Override public Receive createReceive() { return ReceiveBuilder.create().matchEquals(String.class, x -> x.equals("timeout"), x -> { senderRef.tell(new Status.Failure(new TimeoutException("timeout")), self()); context().stop(self()); }) .match(HttpResponse.class, httpResponse -> { articleParseActor.tell(new ParseHtmlArticle(uri, httpResponse.getBody()), self()); }).match(String.class, body -> { senderRef.tell(body, self()); context().stop(self()); }).match(ArticleBody.class, articleBody -> { cacheActor.tell(new SetRequest(articleBody.getUri(), articleBody.getBody()), self()); context().stop(self()); }).matchAny(t -> System.out.println("ignoring msg" + t.getClass())) .build(); } } return context().actorOf(Props.create(MyActor.class, MyActor::new)); } }
|
在上面的代码中 innerActor 需要作出
- 出现超时的话 停止自身 并且告诉消息发送者超时了
- 出现HttpResponse, 说明获取到了原文 需要进行解析
- 出现除了“timeout”的String值, 说明获取到了内容(无论是从cache还是从parse获得的)
- 出现Articlebody, 说明出现了解析完成的Article 需要进行cache
Forward
actor.forward(message, context());
forward 的就是将消息转发给一个另外一个actor. tell(message, self()) 也可以完成一样的任务. 但是forward的语意更加的清晰.
Pipe
pipe 和ask的语音相似
1
| pipe(future, system.dispatcher()).to(sender());
|