响应式四准则
- 灵敏性
立刻进行响应
- 伸缩性
方便进行扩展
- 容错性
从容的对错误进行响应, 不会对系统的其他地方造成影响
- 事件驱动
何时 何地 如何对请求作出响应. 允许对响应的组件进行路由和负载均衡.剖析Actor
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22
| package com.akkademy.pingpong;
import akka.actor.AbstractActor; import akka.actor.ActorRef; import akka.actor.Status; import akka.japi.pf.ReceiveBuilder;
public class JavaPongActor extends AbstractActor { @Override public Receive createReceive() { return ReceiveBuilder.create() .matchEquals("Ping", s -> sender().tell("Pong", ActorRef.noSender())) .matchAny(x -> sender().tell( new Status.Failure(new Exception("unknown message")), self())) .build(); }
|
- AbstractActor
这里使用了模版模式, 在Abstract 里面就已经定义好了一些方法, 我们要做的就是实现每一个Actor (行为) 要做的事情就好了.
- match
- match(class, function)
match(String.class, s -> retrun dosomething)
- match(class, predicate, function)
match(String.class, s -> Objects.equals(s, “Ping”), s -> return something)
- matchEquals(Object, function)
match(“Ping”, s -> return something)
- matchAny(function)
此处最佳的实践应该是抛出错误
match函数的匹配模式是从上到下的方式进行模式匹配. 所以可以从特殊定义到一般
- tell
sender()函数回返回一个ActorRef. 在上面的代码中我们使用了 tell(). tell是最基本的单向消息传输模式.第一个参数是我们想要发送到对方信箱的信息, 第二个参数消息的发送者Actor的创建
1
| ActorRef actor = actorSystem.actorOf(Props.create(JavaPongActor.class));
|
我们创建了一个Actor 这个Actor 的Reference 是***Props
1
| Props.create(PongActor.class, Object... objects);
|
Promise、Future和事件驱动的编程模型
阻塞与事件驱动API
1 2 3 4 5 6 7 8
| Connection connection = DriverManager.getConnection("1"); PreparedStatement preparedStatement = connection.prepareStatement(sql); preparedStatement.setString(0, pre);
ResultSet resultSet = preparedStatement.executeQuery(); while (resultSet.next()) { Integer a = resultSet.getInt(columnName); }
|
调用executeQuery 发起调用的线程必须等待数据库查询的完成. 否则这个线程回一直在阻塞的过程中.如果系统的并发很大的话,很快所有的线程因为这个原因都会处在等待IO的过程中
如果使用事件模型的话1 2
| CompletableFuture<String> userNameFuture = getUserNameFromDatabaseAsync(userId); userNameFuture.thenAccept(userName -> System.out.println(userName));
|
从线程的角度来看, 代码首先会调用这个方法, 然后会进入这个方法内部, 然后立刻返回一个 Future/CompleteableFuture. 返回的就只是一个占位符,真正的值在未来的某个时候会返回到这个占位符内, 数据库的调用和生成是在另外的线程上执行的, 不影响主线程的执行, 试想一下本来我们在一个循环中有50个数据库连接动作要执行, 平均一次所要的时间是m 那个共计花的时间是50m, 如果用下面的方法去执行的话 那么可能需要的时候就是m, 花的时间就要大大的优化了
1 2 3 4 5 6 7 8 9
| List<CompletableFuture<ContractAuditResponseDTO>> futures = contractAuditReportList.stream().map(auditReport -> CompletableFuture.supplyAsync(() -> { }).thenApplyAsync(contractAuditResponseDTO -> { return contractAuditResponseDTO; }) ).collect(Collectors.toList()); List<ContractAuditResponseDTO> contractAuditResponseDTOS = Futures.sequence(futures).get();
|

使用Future 进行响应的Actor
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31
| package akkademy.message;
import akka.actor.ActorRef; import akka.actor.ActorSystem; import akka.actor.Props; import com.akkademy.pingpong.JavaPongActor; import org.junit.Test; import scala.concurrent.Future;
import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionStage; import java.util.concurrent.TimeUnit;
import static akka.pattern.Patterns.ask; import static scala.compat.java8.FutureConverters.toJava;
public class JavaPongActorTest { ActorSystem actorSystem = ActorSystem.create(); ActorRef actorRef = actorSystem.actorOf(Props.create(JavaPongActor.class)); @Test public void shouldReplyToPingWithPong() throws Exception { Future sFuture = ask(actorRef, "Ping", 1000); final CompletionStage<String> cs = toJava(sFuture); final CompletableFuture<String> jFuture = (CompletableFuture<String>) cs; assert (jFuture.get(1000, TimeUnit.MILLISECONDS).equals("Pong")); } }
|
- 创建一个ActorSystem
- 向Actor询问其对于某个消息的响应.
理解Future 和Promise
现代化的Future隐式的处理了两种情况 失败 和 延迟.
对返回的结果进行异步的转换
操作 |
语法 |
Transfer value |
thenApply |
Transfer value async |
thenCompose |
Return value if error |
exceptionally |
return value async if error |
handle((t, x) -> ) |
对失败进行处理
在失败的情况下执行代码
在Java8 中没有面向用户的用于失败处理的方法,所以我们使用handle的方式进行处理
1 2 3 4 5 6 7
| askPong("cause error").handle((x, t) -> { if(t != null){ System.out.println("Error: " + t); } return null; });
|
从失败中恢复
1 2 3 4
| CompletionStage<String> cs = askPong("cause error") .exceptionally(t -> { return "default"; });
|
异步的从失败中恢复
1 2 3 4
| askPong("cause error").handle( (pong, ex) -> ex == null ? CompletableFuture.completedFuture(pong) : askPong("Ping") ).thenCompose(x -> x);
|
链式操作
1 2 3 4
| askPong("Ping") .thenCombine(askPong("Ping"), (a,b) -> { return a + b; });
|