0%

2-Actor与并发

响应式四准则

  1. 灵敏性
    立刻进行响应
  2. 伸缩性
    方便进行扩展
  3. 容错性
    从容的对错误进行响应, 不会对系统的其他地方造成影响
  4. 事件驱动
    何时 何地 如何对请求作出响应. 允许对响应的组件进行路由和负载均衡.

    剖析Actor

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    package com.akkademy.pingpong;

    import akka.actor.AbstractActor;
    import akka.actor.ActorRef;
    import akka.actor.Status;
    import akka.japi.pf.ReceiveBuilder;

    /**
    * @author dafuchen
    * 2019-02-17
    */
    public class JavaPongActor extends AbstractActor {
    @Override
    public Receive createReceive() {
    return ReceiveBuilder.create()
    .matchEquals("Ping", s ->
    sender().tell("Pong", ActorRef.noSender()))
    .matchAny(x ->
    sender().tell(
    new Status.Failure(new Exception("unknown message")), self()))
    .build();
    }
  • AbstractActor
    这里使用了模版模式, 在Abstract 里面就已经定义好了一些方法, 我们要做的就是实现每一个Actor (行为) 要做的事情就好了.
  • match
    • match(class, function)
      match(String.class, s -> retrun dosomething)
    • match(class, predicate, function)
      match(String.class, s -> Objects.equals(s, “Ping”), s -> return something)
    • matchEquals(Object, function)
      match(“Ping”, s -> return something)
    • matchAny(function)
      此处最佳的实践应该是抛出错误

match函数的匹配模式是从上到下的方式进行模式匹配. 所以可以从特殊定义到一般

  • tell
    sender()函数回返回一个ActorRef. 在上面的代码中我们使用了 tell(). tell是最基本的单向消息传输模式.第一个参数是我们想要发送到对方信箱的信息, 第二个参数消息的发送者

    Actor的创建

    1
    ActorRef actor = actorSystem.actorOf(Props.create(JavaPongActor.class));
    我们创建了一个Actor 这个Actor 的Reference 是***

    Props

    1
    Props.create(PongActor.class, Object... objects);

    Promise、Future和事件驱动的编程模型

    阻塞与事件驱动API

    1
    2
    3
    4
    5
    6
    7
    8
    Connection connection = DriverManager.getConnection("1");
    PreparedStatement preparedStatement = connection.prepareStatement(sql);
    preparedStatement.setString(0, pre);

    ResultSet resultSet = preparedStatement.executeQuery();
    while (resultSet.next()) {
    Integer a = resultSet.getInt(columnName);
    }
    调用executeQuery 发起调用的线程必须等待数据库查询的完成. 否则这个线程回一直在阻塞的过程中.如果系统的并发很大的话,很快所有的线程因为这个原因都会处在等待IO的过程中
    如果使用事件模型的话
    1
    2
    CompletableFuture<String> userNameFuture = getUserNameFromDatabaseAsync(userId);
    userNameFuture.thenAccept(userName -> System.out.println(userName));
    从线程的角度来看, 代码首先会调用这个方法, 然后会进入这个方法内部, 然后立刻返回一个 Future/CompleteableFuture. 返回的就只是一个占位符,真正的值在未来的某个时候会返回到这个占位符内, 数据库的调用和生成是在另外的线程上执行的, 不影响主线程的执行, 试想一下本来我们在一个循环中有50个数据库连接动作要执行, 平均一次所要的时间是m 那个共计花的时间是50m, 如果用下面的方法去执行的话 那么可能需要的时候就是m, 花的时间就要大大的优化了
1
2
3
4
5
6
7
8
9
List<CompletableFuture<ContractAuditResponseDTO>> futures = contractAuditReportList.stream().map(auditReport ->
CompletableFuture.supplyAsync(() -> {
// create contractAuditResponseDTO
}).thenApplyAsync(contractAuditResponseDTO -> {
// file the other part
return contractAuditResponseDTO;
})
).collect(Collectors.toList());
List<ContractAuditResponseDTO> contractAuditResponseDTOS = Futures.sequence(futures).get();

流程

使用Future 进行响应的Actor

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
package akkademy.message;

import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import akka.actor.Props;
import com.akkademy.pingpong.JavaPongActor;
import org.junit.Test;
import scala.concurrent.Future;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.TimeUnit;

import static akka.pattern.Patterns.ask;
import static scala.compat.java8.FutureConverters.toJava;

/**
* @author dafuchen
* 2019-02-17
*/
public class JavaPongActorTest {
ActorSystem actorSystem = ActorSystem.create();
ActorRef actorRef = actorSystem.actorOf(Props.create(JavaPongActor.class));
@Test
public void shouldReplyToPingWithPong() throws Exception {
Future sFuture = ask(actorRef, "Ping", 1000);
final CompletionStage<String> cs = toJava(sFuture);
final CompletableFuture<String> jFuture = (CompletableFuture<String>) cs;
assert (jFuture.get(1000, TimeUnit.MILLISECONDS).equals("Pong"));
}
}
  1. 创建一个ActorSystem
  2. 向Actor询问其对于某个消息的响应.

理解Future 和Promise

现代化的Future隐式的处理了两种情况 失败延迟.

对返回的结果进行异步的转换

操作 语法
Transfer value thenApply
Transfer value async thenCompose
Return value if error exceptionally
return value async if error handle((t, x) -> )

对失败进行处理

在失败的情况下执行代码

在Java8 中没有面向用户的用于失败处理的方法,所以我们使用handle的方式进行处理

1
2
3
4
5
6
7
// aksPong 返回的是completeStage 的子类
askPong("cause error").handle((x, t) -> {
if(t != null){
System.out.println("Error: " + t);
}
return null;
});

从失败中恢复

1
2
3
4
CompletionStage<String> cs = askPong("cause error")
.exceptionally(t -> {
return "default";
});

异步的从失败中恢复

1
2
3
4
askPong("cause error").handle( (pong, ex) -> ex == null
? CompletableFuture.completedFuture(pong)
: askPong("Ping")
).thenCompose(x -> x);

链式操作

1
2
3
4
askPong("Ping")
.thenCombine(askPong("Ping"), (a,b) -> {
return a + b; //"PongPong"
});