0%

5-纵向扩展

纵向扩张

使用传统的基于线程的抽象概念很难实现多核能力的使用.如果多个线程都能访问可变的共享状态, 很容易发生资源竞争.在并发的实现, 默认使用命令式编程是错误的 因为我们要关注临界资源, 要关注临界区.

Router

使用Actor 进行并行编程

1
2
3
4
5
6
7
8
9
10
public static void main(String[] args) {
ActorSystem actorSystem = ActorSystem.create();
ActorRef bunchingActorRefPool = actorSystem.actorOf(Props.create(BunchingActor.class)
.withRouter(new RoundRobinPool(8)));
// Or
// 我们可能之前已经创建了一些actor 然后要把这些actor 组织在一起
List<String> paths = Arrays.asList("/user/w1", "/user/w2", "/user/w3");

ActorRef bunchingActorRefGroup = actorSystem.actorOf(new RoundRobinGroup(paths).props());
}

一个是pool, 一个是group. 我感觉pool 的话是我们生成一些相同功能的actor, 放到一个池子中. 而group 是已经有一些actor, 我们把他们分类, 分成一组.

路由策略

路由策略 功能
Round robin 依次轮训
smallest mailbox 邮箱中东西最少的
scatter gather 全部都发, 接受第一个响应, 丢弃其余的
tail chopping 全部都发, 但是不是一次性发送, 每次发送之后都会等待一段时间
consisteng hashing 由router 提供一个key, router根据这个key生成哈希值, 每次都会发送到这个hash对应的actor 上.[一致性哈希算法]
balancing pool 多个actor 共享一个邮箱, 谁空闲了谁去处理邮箱中的东西

在group 中 监督可以用Actor自己的监督方式, 对于pool, 因为是pool 对pool中的actor 进行监督. 所以要自定义监督方式的话, 需要

pool的监督定义方式

1
2
3
ActorRef bunchingActorRefPool = actorSystem.actorOf(Props.create(BunchingActor.class)
.withRouter(new RoundRobinPool(8).withSupervisorStrategy(OneForOneStrategy.defaultStrategy())));

Dispatcher

Dispatcher 将如何执行任务和何时运行任务进行节藕.一般来说, Dispatch会包含一些线程, 这些线程会负责调度并运行任务.比如说处理Future事件.

Dispatcher基于Exector.

As all we know that exector 有两种

  1. ThreadPoolExector
  2. ForkJoinPoolExecutor

创建Dispatch

1
2
3
4
5
6
7
8
9
10
my-dispatcher {
type = "fork-join-executor"
executor = "fork-join-executor"
fork-join-executor {
parallelism-min = 2 #Min threads
parallelism-factor = 2 # max thread per core
parallelism-max = 10 # Max total threads
throughput = 100
}
}

在生成actor的时候使用执行的dispatcher(Executor)

1
2
3
4
ActorRef bunchingActorRefPool = actorSystem.actorOf(Props.create(BunchingActor.class)
.withRouter(new RoundRobinPool(8)
.withSupervisorStrategy(OneForOneStrategy.defaultStrategy())
.withDispatcher("my-dispatcher")));
类型 说明
Dispatcher 默认的类型.
PinnedDispatcher 每个Actor都分配自己独有的线程.为每一个Actor都创建一个ThreadPool Executor.
CallingThreadDispatcher 它没有Executor,而是在发起调用的线程上执行.主要用在测试,每个Actor都会获取一个锁,每次一个线程都只有一个actor在运行
balancingDispatcher 已经被废弃了, 暂时也看不懂 = =

决定何时使用那种Dispatcher

在默认的情况下, Actor完成的所有的工作都会在默认的Dispatcher中执行. 对于运行时间比较长的CPU密集任务和会阻塞IO的任务, 我们建议在创建这类Actor的group或者pool的时候指定其他的Dispatcher.

如果要修改默认的dispatcher的话

1
2
3
4
5
6
7
8
9
10
default-disptcher {
# 最小的线程数量
parallelism-min = 8
# 每个核心最大的线程数量
parallelism-factor = 3.0
# 最大的线程数量
parallelism-max = 64
# 每个actor 等待处理的消息数量 (吞吐量)
throughtput = 100
}

如果要对默认值进行修改的话 在config文件中对需要覆盖的部分进行覆盖就好了

在代码中默认CompletableFuture<?> 使用的是默认的executor, 如果有cup密集型的计算或者是有可能会发生IO阻塞的计算, 那么我们建议使用另外的线程池

1
2
3
Executor executor = context().system().dispatchers().lookup("blocking-io-dispatcher");
CompletableFuture<String> completableFuture = CompletableFuture
.supplyAsync(() -> "analyse something", executor);