0%

纵向扩张

使用传统的基于线程的抽象概念很难实现多核能力的使用.如果多个线程都能访问可变的共享状态, 很容易发生资源竞争.在并发的实现, 默认使用命令式编程是错误的 因为我们要关注临界资源, 要关注临界区.

Router

使用Actor 进行并行编程

1
2
3
4
5
6
7
8
9
10
public static void main(String[] args) {
ActorSystem actorSystem = ActorSystem.create();
ActorRef bunchingActorRefPool = actorSystem.actorOf(Props.create(BunchingActor.class)
.withRouter(new RoundRobinPool(8)));
// Or
// 我们可能之前已经创建了一些actor 然后要把这些actor 组织在一起
List<String> paths = Arrays.asList("/user/w1", "/user/w2", "/user/w3");

ActorRef bunchingActorRefGroup = actorSystem.actorOf(new RoundRobinGroup(paths).props());
}

一个是pool, 一个是group. 我感觉pool 的话是我们生成一些相同功能的actor, 放到一个池子中. 而group 是已经有一些actor, 我们把他们分类, 分成一组.

路由策略

路由策略 功能
Round robin 依次轮训
smallest mailbox 邮箱中东西最少的
scatter gather 全部都发, 接受第一个响应, 丢弃其余的
tail chopping 全部都发, 但是不是一次性发送, 每次发送之后都会等待一段时间
consisteng hashing 由router 提供一个key, router根据这个key生成哈希值, 每次都会发送到这个hash对应的actor 上.[一致性哈希算法]
balancing pool 多个actor 共享一个邮箱, 谁空闲了谁去处理邮箱中的东西

在group 中 监督可以用Actor自己的监督方式, 对于pool, 因为是pool 对pool中的actor 进行监督. 所以要自定义监督方式的话, 需要

pool的监督定义方式

1
2
3
ActorRef bunchingActorRefPool = actorSystem.actorOf(Props.create(BunchingActor.class)
.withRouter(new RoundRobinPool(8).withSupervisorStrategy(OneForOneStrategy.defaultStrategy())));

Dispatcher

Dispatcher 将如何执行任务和何时运行任务进行节藕.一般来说, Dispatch会包含一些线程, 这些线程会负责调度并运行任务.比如说处理Future事件.

Dispatcher基于Exector.

As all we know that exector 有两种

  1. ThreadPoolExector
  2. ForkJoinPoolExecutor

创建Dispatch

1
2
3
4
5
6
7
8
9
10
my-dispatcher {
type = "fork-join-executor"
executor = "fork-join-executor"
fork-join-executor {
parallelism-min = 2 #Min threads
parallelism-factor = 2 # max thread per core
parallelism-max = 10 # Max total threads
throughput = 100
}
}

在生成actor的时候使用执行的dispatcher(Executor)

1
2
3
4
ActorRef bunchingActorRefPool = actorSystem.actorOf(Props.create(BunchingActor.class)
.withRouter(new RoundRobinPool(8)
.withSupervisorStrategy(OneForOneStrategy.defaultStrategy())
.withDispatcher("my-dispatcher")));
类型 说明
Dispatcher 默认的类型.
PinnedDispatcher 每个Actor都分配自己独有的线程.为每一个Actor都创建一个ThreadPool Executor.
CallingThreadDispatcher 它没有Executor,而是在发起调用的线程上执行.主要用在测试,每个Actor都会获取一个锁,每次一个线程都只有一个actor在运行
balancingDispatcher 已经被废弃了, 暂时也看不懂 = =

决定何时使用那种Dispatcher

在默认的情况下, Actor完成的所有的工作都会在默认的Dispatcher中执行. 对于运行时间比较长的CPU密集任务和会阻塞IO的任务, 我们建议在创建这类Actor的group或者pool的时候指定其他的Dispatcher.

如果要修改默认的dispatcher的话

1
2
3
4
5
6
7
8
9
10
default-disptcher {
# 最小的线程数量
parallelism-min = 8
# 每个核心最大的线程数量
parallelism-factor = 3.0
# 最大的线程数量
parallelism-max = 64
# 每个actor 等待处理的消息数量 (吞吐量)
throughtput = 100
}

如果要对默认值进行修改的话 在config文件中对需要覆盖的部分进行覆盖就好了

在代码中默认CompletableFuture<?> 使用的是默认的executor, 如果有cup密集型的计算或者是有可能会发生IO阻塞的计算, 那么我们建议使用另外的线程池

1
2
3
Executor executor = context().system().dispatchers().lookup("blocking-io-dispatcher");
CompletableFuture<String> completableFuture = CompletableFuture
.supplyAsync(() -> "analyse something", executor);

分布式计算的8个误区

  • 网络是可靠的
  • 没有延时
  • 带宽是无限的
  • 网络是安全的
  • 网络拓扑不会改变
  • 只有一个管理员
  • 网络传输没有开销
  • 网络是同构的

    监督

    监督的层级结构

    akka使用Actor层级结构来描述监督.
  1. 当我们创建actor时,新建的Actor就是作为另一个Actor的子Actor.
  2. 然后是路径为/usr的守护Actor.使用actorSystem.actorOf(Props.create(a.class, parameters), name); 就是Actor的子Actor /usr/yourActor.
  3. 如果在一个Actor内部创建另外一个Actor, 那么可以通过调用context().actorOf(Props.create(a.class, “”)) / 得到新建的actor, 他的路径就是 /usr/yourActor/newChild.

    监督策略和醉酒的厨师

    酒店的层级结构
    假设厨师很喜欢喝酒. 但是喝了酒之后就经常给自己惹麻烦, 所以此时作为他的上级要采取一些措施
    • 继续 resume actor 继续处理下一条信息
    • 停止 stop 停止Actor 不做任何操作
    • 重启 restart 新建一个Actor 代替原来的Actor
    • 向上反映esacalte 将异常信息传递给下一个监督者

默认的监督策略.

  • 运行过程中抛出异常 restart
  • 运行中抛出错误 escalte()
  • 初始化中发生异常 stop()

    Actor 生命周期

    在Actor的生命周期中会调用这几个方法,我们在需要时可以重写这些方法.
  • prestart() 在构造函数之后调用
  • postStop() 在重启之前调用
  • reRestart(reson, message) 默认情况下会调用postStop()
  • postRestart() 默认情况下会调用preStart()

for example. 假设有一个聊天引用, 每个用户都用一个actor来表示, 而Actor 之间的消息传递就表示了用户之间的聊天. 一个用户进入聊天就启动了一个Actor, 并发送一个消息来更新聊天室中的用户名单. 当一个Actor停止的时候,发送另一条消息, 将用户从聊天室的活跃名单活跃名单中删除.
启动流程

定义重试次数

一旦某个消息抛出了异常,该消息就会被丢弃,不会被重新发送。监督者会执行监督策略。在继续执行(resume)或是重启(restart)的情况下,就会处理下一条消息了。如果有一个工作队列的话,我们可能会想要重试若干次,然后彻底返回失败。

1
new OneForOneStrategy(2,Duration.create("1 minute"), PartialFunction)

终止或者kill 一个 Actor

  • 调用 system.stop(ActorRef)
  • 调用context.stop(ActorRef)
  • 向Actor 发送一个PoisonPill 消息, 在那个Actor 的message中在接到这个消息之后把自己给close掉(system.stop(self()), context.stop(self())), 或者抛出一个ActorKilledException

    监督其他的Actor

    context.watch(actorRef)
    context.unwatch(actorRef)

    安全重启

    比方说我们有一个数据库的Akka
    1
    我们首先会创建这个actor 然后再通过tell 方法 把消息给到这个actor. 但是比方说因为网络问题 这个actor 创建没有成功 抛错. 进入监督模式的 resume/ restart 导致创建数据库连接的信息丢失, 不能正确的处理这个事情
    2
    override prestart 方法, 在里面描述连接的逻辑. 当这个akka 启动失败的话 默认的逻辑是stop 这个akka, 所以监督者需要重写监督方法使其能够再次创建
    3
    重写 prestart 方法. 那么出现错误的时候 会调用 preRestart -> postStop -> postRestart -> preStart

    状态

    actor 能够安全的存储状态, 允许我们使用无锁的方式进行并发.(所有的对象都是immutable)
  • 基于Actor 状态的条件语句
  • 热交换 become & unbecome
  • 有限自动机

基于Actor状态的条件语句

比方说数据端客户端离线了, 那么在重新上线之前,他都无法进行处理, 这时候我们可以将信息存储起来,直到客户端恢复连接之后再做处理.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
package com.akkademy.article;

import akka.actor.AbstractActorWithStash;
import akka.japi.pf.ReceiveBuilder;
import com.akkademy.messages.GetRequest;

/**
* @author dafuchen
* 2019-03-03
*/
public class MessageStachActor extends AbstractActorWithStash {
private Boolean online = Boolean.FALSE;
@Override
public Receive createReceive() {
ReceiveBuilder.create().match(GetRequest.class, x -> {
if (online) {

} else {
stash();
}
}).match(Connected.class, x -> {
online = true;
unstash();
}).match(Disconnect.class, x -> online = false)
.build();
return null;
}
}

热交换

become 这个方法receiver 块中定义的行为修改为一个新的PartialFunction.
unbecome 这个方法将Actor的修改回默认的行为

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
package com.akkademy.messages;

import akka.actor.AbstractActorWithStash;
import akka.actor.ActorRef;
import akka.japi.pf.ReceiveBuilder;
import com.akkademy.article.Connected;
import com.akkademy.article.Disconnect;

/**
* @author dafuchen
* 2019-03-03
*/
public class BecomeActor extends AbstractActorWithStash {
private final ActorRef actorRef;

public BecomeActor(String refLocation) {
actorRef = context().actorFor(refLocation);
}

@Override
public Receive createReceive() {

return ReceiveBuilder.create()
.match(GetRequest.class, x -> stash())
.match(Connected.class, x -> {
context().become(ReceiveBuilder.create()
.match(GetRequest.class, b -> actorRef.tell("", self())).build().onMessage());
unstash();
}).match(Disconnect.class, x -> context().unbecome())
.build();
}
}

stash 泄漏

如果在很长的时间之后才能将状态变回到unstash 需要的状态或者根本就不能恢复到那个状态,那么会造成内存耗尽或者邮箱开始丢弃信息.
所以我们要在构造函数或者preStart 中通过调度器来检查这个条件.避免泄漏的发生

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
package com.akkademy.messages;

import akka.actor.AbstractActorWithStash;
import akka.actor.ActorRef;
import akka.japi.pf.ReceiveBuilder;
import com.akkademy.article.Connected;
import com.akkademy.article.ConnectionTimeoutException;
import com.akkademy.article.Disconnect;
import com.akkademy.article.TestConnected;
import scala.concurrent.duration.Duration;

import java.sql.Time;
import java.util.concurrent.TimeUnit;

/**
* @author dafuchen
* 2019-03-03
*/
public class BecomeActor extends AbstractActorWithStash {
private final ActorRef actorRef;
private Boolean connected;

public BecomeActor(String refLocation) {
actorRef = context().actorFor(refLocation);
}

@Override
public Receive createReceive() {

return ReceiveBuilder.create()
.match(GetRequest.class, x -> stash())
.match(Connected.class, x -> {
context().become(ReceiveBuilder.create()
.match(GetRequest.class, b -> actorRef.tell("", self())).build().onMessage());
unstash();
}).match(Disconnect.class, x -> context().unbecome())
.match(TestConnected.class, x -> {
if (! connected) {
throw new ConnectionTimeoutException();
}
})
.build();
}


@Override
public void preStart() {
context().system()
.scheduler()

.scheduleOnce(
// 延迟执行的时间
Duration.create(1000, TimeUnit.MICROSECONDS),
self(),
new TestConnected(),
context().dispatcher(),
null
);
}
}

有限状态机

fsm中的类型有两个参数 状态和容器

首先 我们定义存在若干的状态

1
2
3
4
5
6
7
8
9
10
11
12
13
14
public enum Status {
/**
* 中断连接
*/
DISCONNECTED,
/**
* 连接
*/
CONNECTED,
/**
* 连接等待
*/
CONNECTED_PENDING
}

首先FSM 肯定存在有限的状态和状态的变迁.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
public class BunchingActor extends AbstractFSM<Status,
LinkedBlockingQueue<GetRequest>> {
public BunchingActor() {
startWith(Status.DISCONNECTED, null);
//在离线的情况下
// 如果有清空的请求 -> 保持原状
// 如果有发送消息的请求 -> 把这个请求放到队列中
// 如果连接上了网络
// 请求队列中有没有内容 状态变迁到连接中
// 有内容 变迁到填充中状态中
when(Status.DISCONNECTED,
matchEvent(FlushMsg.class, (msg, container) -> stay())
.event(SendRequest.class, (msg, container) -> {
container.add(msg);
return stay();
})
.event(Tcp.Connected.class, (msg, container) -> {
if (Objects.isNull(container.peek())) {
return goTo(Status.CONNECTED);
} else {
return goTo(Status.CONNECTED_PENDING);
}
})
);
//在在线的情况下
// 如果有清空的请求 -> 保持原状(因为队列是空的, 不能进行清空)
// 如果有发送消息的请求 -> 保存到队列 同时将状态变迁到填充中的状态
when(Status.CONNECTED,
matchEvent(FlushMsg.class, (msg, container) -> stay())
.event(SendRequest.class, (msg, container) -> {
container.add(msg);
return goTo(Status.CONNECTED_PENDING);
})
.event(String.class, (msg, container) -> {
System.out.println(msg);
return stay();
}));
//在 填充中的状态
// 清空请求 -> 进行清空, 同时将状态变迁到连接中(没有内容可以被清空了)
when(Status.CONNECTED_PENDING,
matchEvent(FlushMsg.class, (msg, container) -> {
container.clear();
return stay();
}).event(SendRequest.class, (msg, container) -> {
container.add(msg);
return goTo(Status.CONNECTED_PENDING);
}));
}
}

刚出炉的一套面试题(JAVA岗)

你们在关于微服务间数据一致性问题,是如何解决的?

微服务下的数据一致性的几种实现方式之概述

传统应用的事务管理

本地事务

事务的特征

ACID原子(原子, 一致, 隔离, 持久)

并发事务问题

  • 脏读 (读到了其他事务没有提交的修改)
  • 不可重复读 (读到了其他事务提交的修改)
  • 幻影读 (做count操作的时候在同一事务中获得结果不一致)

    事务隔离级别

  • read uncommit
    可以读到没有提交的修改
  • read commit
    可以读到已经提交的修改
  • repeatable read
    可重复读, 在一个事务中
  • serializable
    串行化, 只有在一个事务完成之后再执行下一个事务

    事务和Connection 在JDBC的事务操作中,必须操作的是同一个Connection连接吗?

    当JDBC程序向DriverManagement获得一个Connection对象时,默认情况下这个Connection对象会自动向数据库提交在它上面发送的SQL语句.
    statement.executeQuery(). 然后就直接提交给数据库了.若想关闭这种默认提交方式,让多条SQL在一个事务中执行,可使用下列语句:
    1
    2
    3
    Connection.setAutoCommit(false); //  相当于start transaction
    Connection.rollback(); //rollback
    Connection.commit(); //commit
    再扩展下,我们直到SQL语句到达MySql之后会有针对SQL语句的编译缓存,也就是执行计划缓存,由于对SQL语句解析和执行计划分析是比较耗时的,于是如果同样的SQL语句发送过来,并且是同一个Connection的,其实也就是同一个TCP链接的,就可以直接从缓存中读取,(因为事务的等级是repeatable read)当Connection断开也就是TCP链接断开,缓存也就失效了。

在MyBatis中的事务管理有两种模式

1
2
org.apache.ibatis.transaction.jdbc.JdbcTransaction
org.apache.ibatis.transaction.managed.ManagedTraction

其实前者org.apache.ibatis.transaction.jdbc.JdbcTransaction就是封装了java.sql.Connection类的事务操作而已,禁止了自动提交模式

也就是说在数据库自己的事务层次,一次Connection的提交是处理基本单元,即认为Connection的一次提交就是一次事务。所以在ORM层次的MyBatis,封装JDBC Connection 的事务逻辑中,始终都是围绕着Connection类的.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154

package org.apache.ibatis.transaction.jdbc;

import java.sql.Connection;
import java.sql.SQLException;

import javax.sql.DataSource;

import org.apache.ibatis.logging.Log;
import org.apache.ibatis.logging.LogFactory;
import org.apache.ibatis.session.TransactionIsolationLevel;
import org.apache.ibatis.transaction.Transaction;
import org.apache.ibatis.transaction.TransactionException;

/**
* {@link Transaction} that makes use of the JDBC commit and rollback facilities directly.
* It relies on the connection retrieved from the dataSource to manage the scope
* of the transaction.
* Delays connection retrieval until getConnection() is called.
* Ignores commit or rollback requests when autocommit is on.
*
* @see JdbcTransactionFactory
*/
/**
* Jdbc事务
* @author Clinton Begin
*/
public class JdbcTransaction implements Transaction {

private static final Log log = LogFactory.getLog(JdbcTransaction.class);

protected Connection connection; //数据库连接
protected DataSource dataSource; //数据源
protected TransactionIsolationLevel level;//事务隔离级别
protected boolean autoCommmit; //是否自动提交

public JdbcTransaction(DataSource ds, TransactionIsolationLevel desiredLevel, boolean desiredAutoCommit) {
dataSource = ds;
level = desiredLevel;
autoCommmit = desiredAutoCommit;
}

public JdbcTransaction(Connection connection) {
this.connection = connection;
}

@Override
public Connection getConnection() throws SQLException {
if (connection == null) {
openConnection();
}
return connection;
}

@Override
public void commit() throws SQLException {
if (connection != null && !connection.getAutoCommit()) { //连接非空且事务不是自动提交时
if (log.isDebugEnabled()) {
log.debug("Committing JDBC Connection [" + connection + "]");
}
connection.commit();
}
}

@Override
public void rollback() throws SQLException {
if (connection != null && !connection.getAutoCommit()) { //连接非空且事务不是自动提交时
if (log.isDebugEnabled()) {
log.debug("Rolling back JDBC Connection [" + connection + "]");
}
connection.rollback();
}
}

@Override
public void close() throws SQLException {
if (connection != null) {
resetAutoCommit(); //重置自动提交为true
if (log.isDebugEnabled()) {
log.debug("Closing JDBC Connection [" + connection + "]");
}
connection.close();
}
}

/**
* @description 设置是否自动提交事务
* @author xudj
* @date 2016年7月15日 下午5:37:37
* @param desiredAutoCommit
*/
protected void setDesiredAutoCommit(boolean desiredAutoCommit) {
try {
//和之前的不相等时进行设置
if (connection.getAutoCommit() != desiredAutoCommit) {
if (log.isDebugEnabled()) {
log.debug("Setting autocommit to " + desiredAutoCommit + " on JDBC Connection [" + connection + "]");
}
connection.setAutoCommit(desiredAutoCommit);
}
} catch (SQLException e) {
// Only a very poorly implemented driver would fail here,
// and there's not much we can do about that.
throw new TransactionException("Error configuring AutoCommit. "
+ "Your driver may not support getAutoCommit() or setAutoCommit(). "
+ "Requested setting: " + desiredAutoCommit + ". Cause: " + e, e);
}
}

/**
* @description 重置事务自动提交
* @author xudj
* @date 2016年7月15日 下午5:41:40
*/
protected void resetAutoCommit() {
try {
if (!connection.getAutoCommit()) {
// MyBatis does not call commit/rollback on a connection if just selects were performed.
// Some databases start transactions with select statements
// and they mandate a commit/rollback before closing the connection.
// A workaround is setting the autocommit to true before closing the connection.
// Sybase throws an exception here.
if (log.isDebugEnabled()) {
log.debug("Resetting autocommit to true on JDBC Connection [" + connection + "]");
}
connection.setAutoCommit(true);
}
} catch (SQLException e) {
if (log.isDebugEnabled()) {
log.debug("Error resetting autocommit to true "
+ "before closing the connection. Cause: " + e);
}
}
}

/**
* @description 打开数据库连接
* @author xudj
* @date 2016年7月15日 下午1:55:38
* @throws SQLException
*/
protected void openConnection() throws SQLException {
if (log.isDebugEnabled()) { //先判断是否是debug模式,其它地方作用相同
log.debug("Opening JDBC Connection");
}
connection = dataSource.getConnection(); //获取数据库连接
if (level != null) {
//设置事务隔离级别,通过枚举进行获取
connection.setTransactionIsolation(level.getLevel());
}
setDesiredAutoCommit(autoCommmit); //设置是否自动提交
}

}

本地事务的的实现

传统单机应用使用一个RDBMS作为数据源。应用开启事务,进行CRUD,提交或回滚事务,统统发生在本地事务中,由资源管理器(RM)直接提供事务支持。数据的一致性在一个本地事务中得到保证。
Spring中有7中的事务传播级别, 用@Transactional(propagation = value) 注解进行标注, 这个标注记号可以使用在类或者方法上.

  • REQUIRED(没有事务创建事务, 有事务使用当前事务, 默认, 被设计成这个级别时, 会为每一个被调用的方法创建一个逻辑事务域. 如果前面的方法已经创建了事务,后面的方法支持当前的事务,如果当前没有事务会重新建立事务)
  • SUPPORTS (支持当前事务, 当前没有事务,就以非事务方式执行)
  • MANDATORY(该方法必须运行在一个事务中.如果当前没有事务正在发生,将抛出一个异常)
  • REQUIRES_NEW(新建事务, 如果当前存在事务,把当前事务挂起)
  • NOT_SUPPORTED(在没有事务的方式执行操作, 如果当前存在事务, 把当前事务挂起)
  • NEVER (在没有事务的情况下执行.如果一个事务正在运行,则抛出一个异常)
  • NESTED(支持当前事务, 新增SavePoint, 外层事务失败的时候全部回滚,内层事务失败的时候并不会发生回滚)

分布式事务

该部分内容全部抄袭自分布式系统理论基础 - 一致性、2PC和3PC

一致性

假设一个具有N个节点的分布式系统,当其满足以下条件时,我们说这个系统满足一致性:
全认同(agreement): 所有N个节点都认同一个结果
值合法(validity): 该结果必须由N个节点中的节点提出
可结束(termination): 决议过程在一定时间内结束,不会无休止地进行下去

但就这样看似简单的事情,分布式系统实现起来并不轻松,因为它面临着这些问题:

  • 消息传递异步无序(asynchronous): 现实网络不是一个可靠的信道,存在消息延时、丢失,节点间消息传递做不到同步有序(synchronous)
  • 节点宕机(fail-stop): 节点持续宕机,不会恢复
  • 节点宕机恢复(fail-recover): 节点宕机一段时间后恢复,在分布式系统中最常见
  • 网络分化(network partition): 网络链路出现问题,将N个节点隔离成多个部分
  • 拜占庭将军问题(byzantine failure): 节点或宕机或逻辑失败,甚至不按套路出牌抛出干扰决议的### 两阶段提交(2PC)

对照现实中的问题

我: 老王,今晚7点老地方,搓够48圈不见不散!
……
(第二天凌晨3点) 隔壁老王: 没问题! // 消息延迟
我: ……
我: 小张,今晚7点老地方,搓够48圈不见不散!
小张: No ……
(两小时后……)
小张: No problem! // 宕机节点恢复
我: ……
我: 老李头,今晚7点老地方,搓够48圈不见不散!
老李: 必须的,大保健走起! // 拜占庭将军
(这是要打麻将呢?还是要大保健?还是一边打麻将一边大保健……)
我: 老李头们,今晚7点老地方,搓够48圈不见不散!
老李头A们: 联系不上 一致采纳A的意见去西门
老李头B们: 联系不上 一致采纳B的意见去东门

我们把以上所列的问题称为系统模型(system model),讨论分布式系统理论和工程实践的时候,必先划定模型。例如有以下两种模型:

异步环境(asynchronous)下,节点宕机(fail-stop)
异步环境(asynchronous)下,节点宕机恢复(fail-recover)、网络分化(network partition)
2比1多了节点恢复、网络分化的考量,因而对这两种模型的理论研究和工程解决方案必定是不同的,在还没有明晰所要解决的问题前谈解决方案都是一本正经地耍流氓。
一致性还具备两个属性,一个是强一致(safety),它要求所有节点状态一致、共进退;一个是可用(liveness),它要求分布式系统24*7无间断对外服务。FLP定理(FLP impossibility) 已经证明在一个收窄的模型中(异步环境并只存在节点宕机),不能同时满足 safety 和 liveness。

2 Pharse Commit

2PC(tow phase commit)两阶段提交顾名思义它分成两个阶段,先由一方进行提议(propose)并收集其他节点的反馈(vote),再根据反馈决定提交(commit)或中止(abort)事务。我们将提议的节点称为协调者(coordinator),其他参与决议节点称为参与者(participants, 或cohorts):
第一阶段提议并且收集信息
第二阶段提交或者终止事务
在异步环境(asynchronous)并且没有节点宕机(fail-stop)的模型下,2PC可以满足全认同、值合法、可结束,是解决一致性问题的一种协议。但如果再加上节点宕机(fail-recover)的考虑,2PC是否还能解决一致性问题呢?
coordinator如果在发起提议后宕机,那么participant将进入阻塞(block)状态、一直等待coordinator回应以完成该次决议。这时需要另一角色把系统从不可结束的状态中带出来,我们把新增的这一角色叫协调者备份(coordinator watchdog)。coordinator宕机一定时间后,watchdog接替原coordinator工作,通过问询(query) 各participant的状态,决定阶段2是提交还是中止。这也要求 coordinator/participant 记录(logging)历史状态,以备coordinator宕机后watchdog对participant查询、coordinator宕机恢复后重新找回状态。

从coordinator接收到一次事务请求、发起提议到事务完成,经过2PC协议后增加了2次RTT(propose+commit),带来的时延(latency)增加相对较少。

3 Pharse Commit

3PC(three phase commit)即三阶段提交,既然2PC可以在异步网络+节点宕机恢复的模型下实现一致性,那还需要3PC做什么,3PC是什么鬼?

在2PC中一个participant的状态只有它自己和coordinator知晓,假如coordinator提议后自身宕机,在watchdog启用前一个participant又宕机,其他participant就会进入既不能回滚、又不能强制commit的阻塞状态,直到participant宕机恢复。这引出两个疑问:

能不能去掉阻塞,使系统可以在commit/abort前回滚(rollback)到决议发起前的初始状态
当次决议中,participant间能不能相互知道对方的状态,又或者participant间根本不依赖对方的状态

相比2PC,3PC增加了一个准备提交(prepare to commit)阶段来解决以上问题:

enter description here

coordinator接收完participant的反馈(vote)之后,进入阶段2,给各个participant发送准备提交(prepare to commit)指令。participant接到准备提交指令后可以锁资源,但要求相关操作必须可回滚。coordinator接收完确认(ACK)后进入阶段3、进行commit/abort,3PC的阶段3与2PC的阶段2无异。协调者备份(coordinator watchdog)、状态记录(logging)同样应用在3PC。

participant如果在不同阶段宕机,我们来看看3PC如何应对:

  1. 阶段1: coordinator或watchdog未收到宕机participant的vote,直接中止事务;宕机的participant恢复后,读取logging发现未发出赞成vote,自行中止该次事务
  2. 阶段2: coordinator未收到宕机participant的precommit ACK,但因为之前已经收到了宕机participant的赞成反馈(不然也不会进入到阶段2),coordinator进行commit;watchdog可以通过问询其他participant获得这些信息,过程同理;宕机的participant恢复后发现收到precommit或已经发出赞成vote,则自行commit该次事务
  3. 阶段3: 即便coordinator或watchdog未收到宕机participant的commit ACK,也结束该次事务;宕机的participant恢复后发现收到commit或者precommit,也将自行commit该次事务
    因为有了准备提交(prepare to commit)阶段,3PC的事务处理延时也增加了1个RTT,变为3个RTT(propose+precommit+commit),但是它防止participant宕机后整个系统进入阻塞态,增强了系统的可用性,对一些现实业务场景是非常值得的。

    微服务事务管理

  4. 由于微服务间无法直接进行数据访问,微服务间互相调用通常通过RPC(dubbo)或Http API(SpringCloud)进行,所以已经无法使用TM统一管理微服务的RM。
  5. 不同的微服务使用的数据源类型可能完全不同,如果微服务使用了NoSQL之类不支持事务的数据库,则事务根本无从谈起。
  6. 即使微服务使用的数据源都支持事务,那么如果使用一个大事务将许多微服务的事务管理起来,这个大事务维持的时间,将比本地事务长几个数量级。如此长时间的事务及跨服务的事务,将为产生很多锁及数据不可用,严重影响系统性能。

BASE理论由eBay的架构师Dan Pritchett提出,BASE理论是对CAP理论的延伸,核心思想是即使无法做到强一致性,应用应该可以采用合适的方式达到最终一致性。BASE是指基本可用(Basically Available)、软状态( Soft State)、最终一致性( Eventual Consistency)。

BASE中的最终一致性是对于微服务下的事务管理的根本要求,既基于微服务的事务管理无法达到强一致性,但必须保证最重一致性。那么,有哪些方法可以保证微服务下的事务管理的最终一致性呢,按照实现原理分主要有两类,事件通知型和补偿型,其中事件通知型又可分为可靠事件通知模式及最大努力通知模式,而补偿型又可分为TCC模式、和业务补偿模式两种。这四种模式都可以达到微服务下的数据最终一致性。

理想化事情
错误的回滚1
错误的回滚2

本地事件服务

本地事件服务

  1. 当业务执行时,在同一个本地事务中将事件写入本地事件表,同时投递该事件,如果事件投递成功,则将该事件从事件表中删除。
  2. 如果投递失败,则使用事件服务定时地异步统一处理投递失败的事件,进行重新投递,直到事件被正确投递,并将事件从事件表中删除。这种方式最大可能地保证了事件投递的实效性,并且当第一次投递失败后,也能使用异步事件服务保证事件至少被投递一次。

    外部事件服务

要创建事件服务

业务服务在提交前,向事件服务发送事件,事件服务只记录事件,并不发送。业务服务在提交或回滚后通知事件服务,事件服务发送事件或者删除事件。不用担心业务系统在提交或者会滚后宕机而无法发送确认事件给事件服务,因为事件服务会定时获取所有仍未发送的事件并且向业务系统查询,根据业务系统的返回来决定发送或者删除该事件。

可靠事件模式需要注意的有两点,

  1. 事件的正确发送;
  2. 事件的重复消费。
    通过异步消息服务可以确保事件的正确发送,然而事件是有可能重复发送的,那么就需要消费端保证同一条事件不会重复被消费,简而言之就是保证事件消费的幂等性。

如果事件本身是具备幂等性的状态型事件,如订单状态的通知(已下单、已支付、已发货等),则需要判断事件的顺序。
一般通过时间戳来判断,既消费过了新的消息后,当接受到老的消息直接丢弃不予消费。如果无法提供全局时间戳,则应考虑使用全局统一的序列号。
对于不具备幂等性的事件,一般是动作行为事件,如扣款100,存款200,则应该将事件id及事件结果持久化,在消费事件前查询事件id,若已经消费则直接返回执行结果;若是新消息,则执行,并存储执行结果。

最大努力通知模式

最大努力通知型的特点是,业务服务在提交事务后,进行有限次数(设置最大次数限制)的消息发送,比如发送三次消息,若三次消息发送都失败,则不予继续发送。所以有可能导致消息的丢失。

业务补偿模式

补偿模式比起事件通知模式最大的不同是,补偿模式的上游服务依赖于下游服务的运行结果,而事件通知模式上游服务不依赖于下游服务的运行结果。首先介绍业务补偿模式,业务补偿模式是一种纯补偿模式,其设计理念为,业务在调用的时候正常提交,当一个服务失败的时候,所有其依赖的上游服务都进行业务补偿操作
小明从杭州出发,去往美国纽约出差,现在他需要定从杭州去往上海的火车票,以及从上海飞往纽约的飞机票。如果小明成功购买了火车票之后发现那天的飞机票已经售空了,那么与其在上海再多待一天,小明还不如取消去上海的火车票,选择飞往北京再转机纽约,所以小明就取消了去上海的火车票。这个例子中购买杭州到上海的火车票是服务a,购买上海到纽约的飞机票是服务b,业务补偿模式就是在服务b失败的时候,对服务a进行补偿操作,在例子中就是取消杭州到上海的火车票。

补偿模式要求每个服务都提供补偿借口,且这种补偿一般来说是不完全补偿,既即使进行了补偿操作,那条取消的火车票记录还是一直存在数据库中可以被追踪(一般是有相信的状态字段“已取消”作为标记),毕竟已经提交的线上数据一般是不能进行物理删除的。

补偿模式要求每个服务都提供补偿借口,且这种补偿一般来说是不完全补偿,既即使进行了补偿操作,那条取消的火车票记录还是一直存在数据库中可以被追踪(一般是有相信的状态字段“已取消”作为标记),毕竟已经提交的线上数据一般是不能进行物理删除的。

TCC/Try Confirm Cancel模式

TCC模式是一种优化了的业务补偿模式,它可以做到完全补偿,既进行补偿后不留下补偿的纪录,就好像什么事情都没有发生过一样。同时,TCC的软状态时间很短,原因是因为TCC是一种两阶段型模式(已经忘了两阶段概念的可以回顾一下1.2.1),只有在所有的服务的第一阶段(try)都成功的时候才进行第二阶段确认(Confirm)操作,否则进行补偿(Cancel)操作,而在try阶段是不会进行真正的业务处理的。
TCC业务逻辑
和2PC很像

  1. Try,业务服务完成所有的业务检查,预留必需的业务资源
  2. 如果Try在所有服务中都成功,那么执行Confirm操作,Confirm操作不做任何的业务检查(因为try中已经做过),只是用Try阶段预留的业务资源进行业务处理;否则进行Cancel操作,Cancel操作释放Try阶段预留的业务资源.

example:
服务a(小明从招行转出100元):
try: update cmb_account set balance=balance-100, freeze=freeze+100 where acc_id=1 and balance>100;
confirm: update cmb_account set freeze=freeze-100 where acc_id=1;
cancel: update cmb_account set balance=balance+100, freeze=freeze-100 where acc_id=1;
服务b(小明往广发银行汇入100元):
try: update cgb_account set freeze=freeze+100 where acc_id=1;
confirm: update cgb_account set balance=balance+100, freeze=freeze-100 where acc_id=1;
cancel: update cgb_account set freeze=freeze-100 where acc_id=1;
具体说明:
a的try阶段,服务做了两件事,1:业务检查,这里是检查小明的帐户里的钱是否多余100元;2:预留资源,将100元从余额中划入冻结资金。
a的confirm阶段,这里不再进行业务检查,因为try阶段已经做过了,同时由于转账已经成功,将冻结资金扣除。
a的cancel阶段,释放预留资源,既100元冻结资金,并恢复到余额。
b的try阶段进行,预留资源,将100元冻结。
b的confirm阶段,使用try阶段预留的资源,将100元冻结资金划入余额。
b的cancel阶段,释放try阶段的预留资源,将100元从冻结资金中减去。
从上面的简单例子可以看出,TCC模式比纯业务补偿模式更加复杂,所以在实现上每个服务都需要实现Cofirm和Cancel两个接口。

内容抄袭自
彻底理解Java的Future模式
深度学习Java Future (一)
深度学习Java Future (二)
抄完了就假装自己会了 = =… #爽文#

Runnable 和 Callable<V>

runnable 中的void run();方法和 Callable 中的V call() throws Exception; 描述了要执行的任务的逻辑. 这些逻辑要在其他的线程中执行. 这个两个的区别是是否会有返回值. FutureTask 相当于是结果的提货券.

1
2
3
4
5
6
7
// runnable
new Thread(() -> System.out.println("hello world")).start();

//callable
FutureTask<String> futureTask = new FutureTask<>(() -> "this is Future");
new Thread(futureTask).start();
String result = futureTask.get();

FutureTask

预备知识

Treiber Stack

wikipedia blogger

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
class ConcurrentStack<E> {
AtomicReference<Node<E>> top = new AtomicReference<>();

public void push(E item) {
Node<E> newHead = new Node<>(item);
Node<E> oldHead;

for(;;) {
oldHead = top.get();
newHead.next = oldHead;
if (top.compareAndSet(oldHead, newHead)) {
break;
}
}
}

public E pop() {
Node<E> oldHead = null;
Node<E> newHead = null;

for(;;) {
oldHead = top.get();
if (Objects.isNull(oldHead)) {
return null;
}
newHead = oldHead;
if (top.compareAndSet(oldHead, newHead)) {
break;
}
}
return oldHead.item;
}
private static class Node<E> {
public final E item;
public Node<E> next;

public Node(E item) {
this.item = item;
}
}
}

LockSupport

Java的LockSupport.park()实现分析

1
2
public native void unpark(Thread jthread);
public native void park(boolean isAbsolute, long time);

unpark函数为线程提供“许可(permit)”,线程调用park函数则等待“许可”。这个有点像信号量,但是这个“许可”是不能叠加的,“许可”是一次性的。
比如线程B连续调用了三次unpark函数,当线程A调用park函数就使用掉这个“许可”,如果线程A再次调用park,则进入等待状态。
注意,unpark函数可以先于park调用.比如线程B调用unpark函数,给线程A发了一个“许可”,那么当线程A调用park时,它发现已经有“许可”了,那么它会马上再继续运行。
实际上,park函数即使没有“许可”,有时也会无理由地返回,这点等下再解析。

每个java线程都有一个Parker实例,Parker类是这样定义的:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
class Parker : public os::PlatformParker {
private:
volatile int _counter ;
...
public:
void park(bool isAbsolute, jlong time);
void unpark();
...
}
class PlatformParker : public CHeapObj<mtInternal> {
protected:
pthread_mutex_t _mutex [1] ;
pthread_cond_t _cond [1] ;
...
}

在Parker类里的_counter字段,就是用来记录所谓的“许可”的。

当调用park时,先尝试直接能否直接拿到“许可”,即_counter>0时,如果成功,则把_counter设置为0,并返回:

1
2
3
4
5
6
7
8
9
10
11
void Parker::park(bool isAbsolute, jlong time) {
// Ideally we'd do something useful while spinning, such
// as calling unpackTime().


// Optional fast-path check:
// Return immediately if a permit is available.
// We depend on Atomic::xchg() having full barrier semantics
// since we are doing a lock-free update to _counter.
// 原子操作
if (Atomic::xchg(0, &_counter) > 0) return;

如果不成功,则构造一个ThreadBlockInVM,然后检查_counter是不是>0,如果是,则把_counter设置为0,unlock mutex并返回:

1
2
3
4
ThreadBlockInVM tbivm(jt);
if (_counter > 0) { // no wait needed
_counter = 0;
status = pthread_mutex_unlock(_mutex);

ThreadBlockInVM 从Java到JVM到OS线程睡眠
前面说到 ThreadBlockInVM 会检查当前线程用不用进入 safepoint,它主要的逻辑如下:

  • 首先设置 Java 线程状态,将状态加一,由 _thread_in_vm = 6 变为 _thread_in_vm_trans = 7,从“运行vm本身代码”到“相应的过度状态”。
  • os::is_MP() 用于判断计算机系统是否为多核系统,多核情况下需要做内存屏障处理,这是为了让每个线程都能实时同步状态。
  • 内存屏障有两种方式,一种是 rderAccess::fence() ,它的实现是直接通过CPU指令来实现,汇编指令为 __asm__ volatile (“lock; addl $0,0(%%rsp)” : : : “cc”, “memory”); ,这种方式代价比较大。而另外一种为 InterfaceSupport::serialize_memory ,由 JVM 模拟实现,效率高一点。
  • 调用 SafepointSynchronize::block 尝试在该安全点进行阻塞。
  • 设置 Java 线程状态为 _thread_blocked ,即阻塞。

如果此时的count 的值是0,当前这个线程已经被pack了, 那么判断等待的时间,然后再调用pthread_cond_wait函数等待,如果等待返回,则把_counter设置为0,unlock mutex并返回:

1
2
3
4
5
6
7
if (time == 0) {
status = pthread_cond_wait (_cond, _mutex) ;
}
_counter = 0 ;
status = pthread_mutex_unlock(_mutex) ;
assert_status(status == 0, status, "invariant") ;
OrderAccess::fence();

当unpark时,则简单多了,直接设置_counter为1,再unlock mutext返回。如果_counter之前的值是0,则还要调用pthread_cond_signal唤醒在park中等待的线程:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
void Parker::unpark() {
int s, status ;
status = pthread_mutex_lock(_mutex);
assert (status == 0, "invariant") ;
s = _counter;
_counter = 1;
if (s < 1) {
if (WorkAroundNPTLTimedWaitHang) {
status = pthread_cond_signal (_cond) ;
assert (status == 0, "invariant") ;
status = pthread_mutex_unlock(_mutex);
assert (status == 0, "invariant") ;
} else {
status = pthread_mutex_unlock(_mutex);
assert (status == 0, "invariant") ;
status = pthread_cond_signal (_cond) ;
assert (status == 0, "invariant") ;
}
} else {
pthread_mutex_unlock(_mutex);
assert (status == 0, "invariant") ;
}
}


FutureTask 代码

类图

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
   /**
* Possible state transitions:
* NEW -> COMPLETING -> NORMAL
* NEW -> COMPLETING -> EXCEPTIONAL
* NEW -> CANCELLED
* NEW -> INTERRUPTING -> INTERRUPTED
* /
private volatile int state;
private static final int NEW = 0;
private static final int COMPLETING = 1;
private static final int NORMAL = 2;
private static final int EXCEPTIONAL = 3;
private static final int CANCELLED = 4;
private static final int INTERRUPTING = 5;
private static final int INTERRUPTED = 6;

/** The underlying callable; nulled out after running */
private Callable<V> callable;
/** The result to return or exception to throw from get() */
private Object outcome; // non-volatile, protected by state reads/writes
/** The thread running the callable; CASed during run() */
private volatile Thread runner;
/** Treiber stack of waiting threads */
private volatile WaitNode waiters;

state 是运行的状态, callable 是要执行的逻辑, 它会在其run方法中被执行 outcome 是结果. waiters字段则代表阻塞在该Future上的线程链表.

1
2
3
4
5
static final class WaitNode {
volatile Thread thread;
volatile WaitNode next;
WaitNode() { thread = Thread.currentThread(); }
}

get()/get(timeout)在task处于非完成状态时是需要阻塞等待的,如果多个线程进行get操作,显然需要一个链表/队列来维护这些等待线程,这就是waiters的意义所在。

最核心的get方法, 如果当前的状态小于等于COMPLETING 则进入awaitDone方法来进行等待任务执行完成

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
public V get() throws InterruptedException, ExecutionException {
int s = state;
if (s <= COMPLETING)
s = awaitDone(false, 0L);
return report(s);
}

private int awaitDone(boolean timed, long nanos)
throws InterruptedException {
final long deadline = timed ? System.nanoTime() + nanos : 0L;
WaitNode q = null;
boolean queued = false;
for (;;) {
if (Thread.interrupted()) {
removeWaiter(q);
throw new InterruptedException();
}

int s = state;
if (s > COMPLETING) {
if (q != null)
q.thread = null;
return s;
}
else if (s == COMPLETING) // cannot time out yet
Thread.yield();
else if (q == null)
q = new WaitNode();
else if (!queued)
/**
* compareAndSwapObject(Object var1, long var2, Object var3, Object var4)
* var1 操作的对象
* var2 操作的对象属性
* var3 var2与var3比较,相等才更新
* var4 更新值
* waitersOffset = UNSAFE.objectFieldOffset
(k.getDeclaredField("waiters")
* 用了CAS 相当于 把当前的线程的信息写入到waiters中, 失败了就会到这个for中然后还是继续到这个if分支, 继续做CAS 直到成功.
*/
queued = UNSAFE.compareAndSwapObject(this, waitersOffset,
q.next = waiters, q);
else if (timed) {
nanos = deadline - System.nanoTime();
if (nanos <= 0L) {
removeWaiter(q);
return state;
}
LockSupport.parkNanos(this, nanos);
}
else
LockSupport.park(this);
}
}
  • 如果当前的线程被中断, 那么将所有等待该Future上的线程都从阻塞链表中删除
  • 如果当前的状态是确定的状态的时候 就将当前的状态返回
  • 如果当前的状态是 Complete 那就yalid 让出执行的权限
  • 如果q 是null 当然 最初的时候 q 就是null 的 要初始化
  • 把当前的线程放到waiters 中(在第一次执行的时候)
  • 如果配置了等待时间 要判断是否超时了

为什么要这样设计呢

FutureTask 中的Run方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
   public void run() {
if (state != NEW ||
!UNSAFE.compareAndSwapObject(this, runnerOffset,
null, Thread.currentThread()))
return;
try {
Callable<V> c = callable;
if (c != null && state == NEW) {
V result;
boolean ran;
try {
result = c.call();
ran = true;
} catch (Throwable ex) {
result = null;
ran = false;
setException(ex);
}
if (ran)
set(result);
}
} finally {
// runner must be non-null until state is settled to
// prevent concurrent calls to run()
runner = null;
// state must be re-read after nulling runner to prevent
// leaked interrupts
int s = state;
if (s >= INTERRUPTING)
handlePossibleCancellationInterrupt(s);
}
}

protected void set(V v) {
if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
outcome = v;
UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final state
finishCompletion();
}
}
// 通过CAS的方式唤醒每一个被pack的线程,将他们unpack
private void finishCompletion() {
// assert state > COMPLETING;
for (WaitNode q; (q = waiters) != null;) {
if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) {
for (;;) {
Thread t = q.thread;
if (t != null) {
q.thread = null;
LockSupport.unpark(t);
}
WaitNode next = q.next;
if (next == null)
break;
q.next = null; // unlink to help gc
q = next;
}
break;
}
}

done();

callable = null; // to reduce footprint
}

CompleteableFuture

首先 如何生成一个CompletableFuture

1
2
3
4
5
6
7
8
9
10
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier);

public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier,
Executor executor);

public static CompletableFuture<Void> runAsync(Runnable runnable);

public static CompletableFuture<Void> runAsync(Runnable runnable,
Executor executor)

之后就是CompletableFuture 的骚操作了, 非常函数式编程

  • thenApply
    将前续结果作为函数的入参
    • thenAccept
      消耗前续产生的结果
    • thenRun
      前序步骤完成之后 执行一个runnable 任务
    • thenCombine

响应式四准则

  1. 灵敏性
    立刻进行响应
  2. 伸缩性
    方便进行扩展
  3. 容错性
    从容的对错误进行响应, 不会对系统的其他地方造成影响
  4. 事件驱动
    何时 何地 如何对请求作出响应. 允许对响应的组件进行路由和负载均衡.

    剖析Actor

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    package com.akkademy.pingpong;

    import akka.actor.AbstractActor;
    import akka.actor.ActorRef;
    import akka.actor.Status;
    import akka.japi.pf.ReceiveBuilder;

    /**
    * @author dafuchen
    * 2019-02-17
    */
    public class JavaPongActor extends AbstractActor {
    @Override
    public Receive createReceive() {
    return ReceiveBuilder.create()
    .matchEquals("Ping", s ->
    sender().tell("Pong", ActorRef.noSender()))
    .matchAny(x ->
    sender().tell(
    new Status.Failure(new Exception("unknown message")), self()))
    .build();
    }
  • AbstractActor
    这里使用了模版模式, 在Abstract 里面就已经定义好了一些方法, 我们要做的就是实现每一个Actor (行为) 要做的事情就好了.
  • match
    • match(class, function)
      match(String.class, s -> retrun dosomething)
    • match(class, predicate, function)
      match(String.class, s -> Objects.equals(s, “Ping”), s -> return something)
    • matchEquals(Object, function)
      match(“Ping”, s -> return something)
    • matchAny(function)
      此处最佳的实践应该是抛出错误

match函数的匹配模式是从上到下的方式进行模式匹配. 所以可以从特殊定义到一般

  • tell
    sender()函数回返回一个ActorRef. 在上面的代码中我们使用了 tell(). tell是最基本的单向消息传输模式.第一个参数是我们想要发送到对方信箱的信息, 第二个参数消息的发送者

    Actor的创建

    1
    ActorRef actor = actorSystem.actorOf(Props.create(JavaPongActor.class));
    我们创建了一个Actor 这个Actor 的Reference 是***

    Props

    1
    Props.create(PongActor.class, Object... objects);

    Promise、Future和事件驱动的编程模型

    阻塞与事件驱动API

    1
    2
    3
    4
    5
    6
    7
    8
    Connection connection = DriverManager.getConnection("1");
    PreparedStatement preparedStatement = connection.prepareStatement(sql);
    preparedStatement.setString(0, pre);

    ResultSet resultSet = preparedStatement.executeQuery();
    while (resultSet.next()) {
    Integer a = resultSet.getInt(columnName);
    }
    调用executeQuery 发起调用的线程必须等待数据库查询的完成. 否则这个线程回一直在阻塞的过程中.如果系统的并发很大的话,很快所有的线程因为这个原因都会处在等待IO的过程中
    如果使用事件模型的话
    1
    2
    CompletableFuture<String> userNameFuture = getUserNameFromDatabaseAsync(userId);
    userNameFuture.thenAccept(userName -> System.out.println(userName));
    从线程的角度来看, 代码首先会调用这个方法, 然后会进入这个方法内部, 然后立刻返回一个 Future/CompleteableFuture. 返回的就只是一个占位符,真正的值在未来的某个时候会返回到这个占位符内, 数据库的调用和生成是在另外的线程上执行的, 不影响主线程的执行, 试想一下本来我们在一个循环中有50个数据库连接动作要执行, 平均一次所要的时间是m 那个共计花的时间是50m, 如果用下面的方法去执行的话 那么可能需要的时候就是m, 花的时间就要大大的优化了
1
2
3
4
5
6
7
8
9
List<CompletableFuture<ContractAuditResponseDTO>> futures = contractAuditReportList.stream().map(auditReport ->
CompletableFuture.supplyAsync(() -> {
// create contractAuditResponseDTO
}).thenApplyAsync(contractAuditResponseDTO -> {
// file the other part
return contractAuditResponseDTO;
})
).collect(Collectors.toList());
List<ContractAuditResponseDTO> contractAuditResponseDTOS = Futures.sequence(futures).get();

流程

使用Future 进行响应的Actor

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
package akkademy.message;

import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import akka.actor.Props;
import com.akkademy.pingpong.JavaPongActor;
import org.junit.Test;
import scala.concurrent.Future;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.TimeUnit;

import static akka.pattern.Patterns.ask;
import static scala.compat.java8.FutureConverters.toJava;

/**
* @author dafuchen
* 2019-02-17
*/
public class JavaPongActorTest {
ActorSystem actorSystem = ActorSystem.create();
ActorRef actorRef = actorSystem.actorOf(Props.create(JavaPongActor.class));
@Test
public void shouldReplyToPingWithPong() throws Exception {
Future sFuture = ask(actorRef, "Ping", 1000);
final CompletionStage<String> cs = toJava(sFuture);
final CompletableFuture<String> jFuture = (CompletableFuture<String>) cs;
assert (jFuture.get(1000, TimeUnit.MILLISECONDS).equals("Pong"));
}
}
  1. 创建一个ActorSystem
  2. 向Actor询问其对于某个消息的响应.

理解Future 和Promise

现代化的Future隐式的处理了两种情况 失败延迟.

对返回的结果进行异步的转换

操作 语法
Transfer value thenApply
Transfer value async thenCompose
Return value if error exceptionally
return value async if error handle((t, x) -> )

对失败进行处理

在失败的情况下执行代码

在Java8 中没有面向用户的用于失败处理的方法,所以我们使用handle的方式进行处理

1
2
3
4
5
6
7
// aksPong 返回的是completeStage 的子类
askPong("cause error").handle((x, t) -> {
if(t != null){
System.out.println("Error: " + t);
}
return null;
});

从失败中恢复

1
2
3
4
CompletionStage<String> cs = askPong("cause error")
.exceptionally(t -> {
return "default";
});

异步的从失败中恢复

1
2
3
4
askPong("cause error").handle( (pong, ex) -> ex == null
? CompletableFuture.completedFuture(pong)
: askPong("Ping")
).thenCompose(x -> x);

链式操作

1
2
3
4
askPong("Ping")
.thenCombine(askPong("Ping"), (a,b) -> {
return a + b; //"PongPong"
});

消息传递

  1. tell
    向Actor 发送一条消息。所有发送至sender()的响应都会返回给发送消息的Actor。
  2. ask
    向Actor 发送一条消息,返回一个Future。当Actor 返回响应时,会完成Future。不会向消息发送者的邮箱返回任何消息。
    toJava(ask(actorSelection, new SetRequest(“key”, “value”), 2000));
  3. forward
    将接收到的消息再发送给另一个Actor。所有发送至sender()的响应都
    会返回给原始消息的发送者。
  4. pipe
    用于将Future 的结果返回给sender()或另一个Actor。如果正在使用Ask或是处理一个Future,那么使用Pipe 可以正确地返回Future 的结果。

    消息是不可变的

    因此,无论什么时候,只要需要在线程之间共享数据,就应该首先考虑将数据定义为不可变。既然不可变就不用考虑保存临界值.

    Ask 模式

    在调用ask 向Actor 发起请求时,Akka 实际上会在Actor 系统中创建一个临时Actor。接收请求的Actor 在返回响应时使用的sender()引用就是这个临时Actor。当一个Actor接收到ask 请求发来的消息并返回响应时,这个临时Actor 会使用返回的响应来完成Future. 就是说这个Future和系统创建的actor其实是绑定的.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
public class AskDemoArticleParser extends AbstractActor {
private final ActorSelection cacheActor;
private final ActorSelection httpClientActor;
private final ActorSelection articleParseActor;
private final Timeout timeout;

public AskDemoArticleParser(String cacheActor,
String httpClientActor,
String articleParseActor,
Timeout timeout) {

this.cacheActor = context().actorSelection(cacheActor);
this.httpClientActor = context().actorSelection(httpClientActor);
this.articleParseActor = context().actorSelection(articleParseActor);
this.timeout = timeout;
}

@Override
public Receive createReceive() {
final ActorRef senderRef = sender();

return ReceiveBuilder.create().match(ParseArticle.class, msg -> {
toJava(ask(cacheActor, new GetRequest(msg.getUrl()), timeout)).handle((x, t) ->
! Objects.isNull(x)
? CompletableFuture.completedFuture(x)
: toJava(ask(httpClientActor, msg.getUrl(), timeout))
.thenCompose(rawArticle -> toJava(ask(articleParseActor, new ParseHtmlArticle(msg.getUrl(),
((HttpResponse) rawArticle).getBody()), timeout)))
).handle((x, t) -> {
if (Objects.isNull(x)) {
if (x instanceof ArticleBody) {
String body = ((ArticleBody) x).getBody();

cacheActor.tell(body, self());
senderRef.tell(body, self());
} else {
senderRef.tell(new Status.Failure((Throwable)t), self());
return null;
}
}
return null;
});
}).build();

}
}

在另一个执行上下文中执行回调函数

必须设置超时参数

超时错误的栈追踪信息并没有用

另一点需要注意的是:如果Actor 抛出了一个意料之外的异常,而没有返回错误,那么这个错误看上去会像是由于超时引起的,但是实际上却另有原因。
从这里总结出的经验就是:当代码中发生错误时,一定要返回失败消息。如果一个Actor 抛出了异常那么它是不会返回消息的在Actor 中,代码的编写者负责实现所有的消息处理逻辑:如果某个Actor 需要进行响应,Akka 是不会隐式地做任何响应的。当需要返回响应时,我们必须自己对收到的消息进行响应

Ask的额外性能开销

  1. 首先,ask 会导致Akka在/temp 路径下新建一个临时Actor。这个临时Actor 会等待从接收ask 消息的Actor 返回的响应。
  2. 其次,Future 也有额外的性能开销。Ask 会创建Future,由临时Actor 负责完成。

    Tell 模式

    Tell 是ActorRef/ActorSelection 类的一个方法
    actor.tell(message, self())
    表示消息从内部进行发送
    actor.tell(“message”, akka.actor.ActorRef.noSender());
    消息从外部进行发送

tell 使用的是fire-and-forget 模式 我把消息发送了 然后我就不管了. 对于结果我不关心.

而ask相当于是ticket模式.我这里有一个结果的取货单.

但是我们如果需要关注结果那要怎么办呢? 在内部建立建立一个Actor(姑且叫做innerActor), 由它做FSM(有限状态机器), 对收到的消息作出响应, 比方说innerActor给步骤1发送任务启动指令,完成后返回步骤1完成response,然后再对步骤2发送任务启动指令

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
package top.andrewchen1.chapter3;

import akka.actor.*;
import akka.japi.pf.ReceiveBuilder;
import akka.util.Timeout;
import top.andrewchen1.chapter2.db.GetRequest;
import top.andrewchen1.chapter2.db.SetRequest;

import java.util.concurrent.TimeoutException;

/**
* @author dafuchen
* 2019-03-17
*/
public class ArticleTellParserActor extends AbstractActor {
private final ActorSelection cacheActor;
private final ActorSelection httpClientActor;
private final ActorSelection articleParseActor;
private final Timeout timeout;

public ArticleTellParserActor (String cacheActor,
String httpClientActor,
String articleParseActor,
Timeout timeout) {
this.cacheActor = context().actorSelection(cacheActor);
this.httpClientActor = context().actorSelection(httpClientActor);
this.articleParseActor = context().actorSelection(articleParseActor);
this.timeout = timeout;
}
@Override
public Receive createReceive() {
return ReceiveBuilder.create()
.match(ParseArticle.class, message -> {
ActorRef extraActor = buildExtraActor(sender(), message.getUrl());
cacheActor.tell(new GetRequest(message.getUrl()), extraActor);
httpClientActor.tell(message.getUrl(), extraActor);
context().system().scheduler().scheduleOnce(
timeout.duration(),
extraActor,
"timeout",
context().system().dispatcher(),
ActorRef.noSender()
);
})
.build();
}
private ActorRef buildExtraActor(ActorRef senderRef, String uri) {
class MyActor extends AbstractActor {

@Override
public Receive createReceive() {
return ReceiveBuilder.create().matchEquals(String.class, x -> x.equals("timeout"), x -> {
senderRef.tell(new Status.Failure(new TimeoutException("timeout")), self());
context().stop(self());
})
.match(HttpResponse.class, httpResponse -> {
articleParseActor.tell(new ParseHtmlArticle(uri, httpResponse.getBody()), self());
}).match(String.class, body -> {
senderRef.tell(body, self());
context().stop(self());
}).match(ArticleBody.class, articleBody -> {
cacheActor.tell(new SetRequest(articleBody.getUri(), articleBody.getBody()), self());
context().stop(self());
}).matchAny(t ->
System.out.println("ignoring msg" + t.getClass()))
.build();
}
}
return context().actorOf(Props.create(MyActor.class, MyActor::new));
}
}

在上面的代码中 innerActor 需要作出

  1. 出现超时的话 停止自身 并且告诉消息发送者超时了
  2. 出现HttpResponse, 说明获取到了原文 需要进行解析
  3. 出现除了“timeout”的String值, 说明获取到了内容(无论是从cache还是从parse获得的)
  4. 出现Articlebody, 说明出现了解析完成的Article 需要进行cache

Forward

actor.forward(message, context());

forward 的就是将消息转发给一个另外一个actor. tell(message, self()) 也可以完成一样的任务. 但是forward的语意更加的清晰.

Pipe

pipe 和ask的语音相似

1
pipe(future, system.dispatcher()).to(sender());

年糕的爸爸在B乎上开了专栏前路迢迢,教麻瓜使用Akka和Clojure. 然后就上船了.但是 akka 和clojure 是个什么鬼.但是应该是永远也学会不会的吧. #从入门到放弃# #嘛哩嘛哩哄#

什么是Actor

在我的理解里面Actor 就是固定的一个流程(process)

Actor做什么

  • 发送

    在Receiver中可以向sender, 向任意的Actor (只要能找到其ActorRef) 发送消息. 消息是fire and forget的 除了收到消息的Actor 发送一个消息给发送方, 发送方是不知道响应的状态的.

    Actor 一次只能接受一个信息,如果多个Actor 同时修改修改一个信息, 那么最终的结果随着时间的交织而变化

  • 创建

    Actor 可以创建其他的Actor. 被创建的Actor被称为该Actor的子节点, 行为受其监督.

  • 改变

    状态机是保持是保证系统在特定的状态时执行特定功能的有力工具. 根据返回的不同的结果,匹配到不同的match函数内中

  • 监督

    父节点在子节点崩溃后根据崩溃的原因作出重试等不同的逻辑

Actor 和消息传递

  • Actor
    Actor 里面规定了它要处理怎么样的信息, 要怎么处理信息(match). 一个Actor在没有创建router的情况下在ActorSystem中只有一个实例. 就是一个逻辑处理.
  • 消息
    用于跨进程(多个Actor之间)通信的数据 主要通过ask(actorRef, message), sender().tell() 方法来
  • 消息传递
    一种软件开发范式, 通过消息来触发各种行为,而不是直接触发行为(就如同是Dispatcher invoke 各种的controller 一样)
  • 邮箱地址
    消息传输的目标地址 akka://myTestActorSystem@127.0.0.1:2552/user/nodeName
  • 邮箱
    可以认为是一个消息队列

Actor 和对象的不同之处是不能被直接读取、修改或者是调用. (调用也没有用 里面就是一堆消息处理的逻辑. )

监督和容错机制

在出错的时候,父亲节点会根据子节点错误的类型不同,对子节点采取不用的恢复策略.

创建一个项目

在之前的我们可能使用Typesafe的 activator 项目来创建一个新的项目.现在推荐使用直接从网上下载的方式获得起始的项目.

Akka 101

  1. 创建消息体, 通常我们让消息在传递的过程中保持Immutable(多线程设计的一种设计模式) 如此数据就不会发生变化, 保证安全.
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    package com.akkademy.messages;

    /**
    * @author dafuchen
    * 2019-02-17
    */
    public class SetRequest {
    private final String key;
    private final Object value;

    public SetRequest(String key, Object value) {
    this.key = key;
    this.value = value;
    }

    public String getKey() {
    return key;
    }

    public Object getValue() {
    return value;
    }
    }
  2. 创建处理的流程或者说行为(Actor)
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    package com.akkademy.messages;

    import akka.actor.AbstractActor;
    import akka.event.Logging;
    import akka.event.LoggingAdapter;
    import akka.japi.pf.ReceiveBuilder;

    import java.util.HashMap;
    import java.util.Map;

    /**
    * @author dafuchen
    * 2019-02-17
    */
    public class AkkademyDb extends AbstractActor {
    // 创建一个logger
    private final LoggingAdapter log = Logging.getLogger(context().system(), this);
    protected final Map<String, Object> map = new HashMap<>();

    @Override
    public Receive createReceive() {
    // 创建一个Receiver.
    // 如果发送过来的消息是*类型的话 进行怎么样的处理
    // 否则 怎么样
    return ReceiveBuilder.create()
    .match(SetRequest.class, message -> {
    log.info("received Set request {}", message);
    map.put(message.getKey(), message.getValue());
    }).matchAny( o -> {
    log.info("received unknown message {}", o);
    }).build();
    }

    public Map<String, Object> getMap() {
    return map;
    }
    }
  3. test case
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    package akkademy.message;

    import akka.actor.ActorRef;
    import akka.actor.ActorSystem;
    import akka.actor.Props;
    import akka.testkit.TestActorRef;
    import com.akkademy.messages.AkkademyDb;
    import com.akkademy.messages.SetRequest;
    import org.junit.Test;

    import static org.junit.Assert.assertEquals;

    /**
    * @author dafuchen
    * 2019-02-17
    */
    public class AkkademyDbTest {
    ActorSystem system = ActorSystem.create();

    @Test
    public void isShouldPlaceKeyValueFromSetMessageInfo() {
    TestActorRef<AkkademyDb> actorRef = TestActorRef.create(system, Props.create(AkkademyDb.class));
    actorRef.tell(new SetRequest("key", "value"), ActorRef.noSender());

    AkkademyDb akkademyDb = actorRef.underlyingActor();
    assertEquals(akkademyDb.getMap().get("key"), "value");
    }
    }

  • 创建Akka System

    1
    ActorSystem.create();
  • 创建Actor

    1
    TestActorRef.create(system, Props.create(AkkademyDb.class));

    在程序之中得到Ref 的方法一般是

    1
    context().system().actorOf(Props.create(SetRequest.class, "key", "values"));
  • 向Actor 的地址发送信息

    1
    actorRef.tell(new SetRequest("key", "value"), ActorRef.noSender());
    • 获得这个Ref 下指向的 Actor

      1
      AkkademyDb akkademyDb = actorRef.underlyingActor();
    • 做Assert 进行比较

google jib容器打包工具试用

Building Docker Image for a Spring Boot App With Jib

使用 Jib 生成 Java Docker 镜像

github GoogleContainerTools

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
<build>
<finalName>suidifu-discovery-eureka</finalName>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
<plugin>
<groupId>com.google.cloud.tools</groupId>
<artifactId>jib-maven-plugin</artifactId>
<version>0.9.11</version>
<configuration>
<from>
<image>registry.hub.docker.com/openjdk:8-jdk-alpine</image>
</from>
<to>
<image>registry.cn-hangzhou.aliyuncs.com/andrewchen/spring-boot-example</image>
<auth>
<username>chendufu</username>
<password>habwaD-wiske5-jecxyc</password>
</auth>
</to>
<container>
<jvmFlags>
<jvmFlag>-Xms256M</jvmFlag>
</jvmFlags>
<mainClass>com.suidifu.discovery.eureka.SuidifuDiscoveryEurekaApplication</mainClass>
<ports>
<port>8080</port>
</ports>
</container>
</configuration>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>dockerBuild</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>

枚举类型

创建 enmu时, 编译器会为你生成一个相关的类, 这个类继承自java.lang.Enum. 但是这个新生成的类是final的,不能为继承

如果在emum()实例上调用getDeclaringClass()方法,我们就能知道其所属的enum()类.

向enum 中添加新方法

如果打算定义自己的方法,那么必须在enum实例序列的最后添加一个分号.同时必须先定义enum实例.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
public enum Color {
RED(255, 0, 0);
Color(int r, int b, int g) {
this.r = r;
this.b = b;
this.g = g;
}
private int r;
private int b;
private int g;

public int getRed() {
return r;
}
//... etc

@Override
public String toString() {
return "the rbg is {" + r +", " + b + ", " + g + "}";
}
}

switch 语句中的enum

实际上,在编译器中会通过ordinal()方法取得其次序.

values()的神秘之处

通过反射我们会发现Enum类,它并没有values()方法.

values方法是由编译器添加的static方法.编译器还会为其添加了valueOf()方法.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
public enum Main {
RED(255, 0, 0);
Main(int r, int b, int g) {
this.r = r;
this.b = b;
this.g = g;
}
private int r;
private int b;
private int g;

public int getRed() {
return r;
}
//... etc

@Override
public String toString() {
return "the rbg is {" + r +", " + b + ", " + g + "}";
}
}

经过 javap Main 之后之后

1
2
3
4
5
6
7
8
public final class Main extends java.lang.Enum<Main> {
public static final Main RED;
public static Main[] values();
public static Main valueOf(java.lang.String);
public int getRed();
public java.lang.String toString();
static {};
}

由于擦除效应,反编译无法得到Enum的完整信息,所以它展示的Main的父亲类只是一个原始的Enum, 而并非事实上的Enum.

由于values()方法是由编译器插入到enum定义的中断饿static方法. 如果上转型Enum, 那么values()就不能访问了. 不过class类中有getEnumConstants()方法,可以得到所有enum实例.

使用EnumMap

EnumMap 是一种特殊的Map, 它要求其中的key必须来自于一个enum. 由于enum本身的限制,所以EnumMap在内部可以由数组实现. 比方说 Enum ordinal的值是0, 那么arr[0] 就是enumMap.get(Enum.something)对应的值.

常量相关的方法

Java 的enum有个非常有趣特性,他允许程序员为enum实例编写方法,从而为每个enum实例赋予各自不同的行为.

1
2
3
4
5
6
7
8
9
public enum ConstantSpecificMethod {
DATE_TIME {
String getInfo() {
return "I AM DATE_TIME";
}
};

abstract String getInfo();
}

606

注解

注解为我们在代码中添加信息提供了形式化的方法, 使我们可以在稍后某个时刻非常方便的使用这些元素.

Java支持的三种标准注解

  • @Override
  • @Deprecated
  • @Suppress Warnings

Java支持的四种元注解

  • @Targe

    表示该注解可以用于什么地方.可能的ElementType参数包括

    • CONSTRUCTOR 构造方法
    • FIELD 成员变量
    • LOCAL_VARIABLE 局部变量
    • METHOD 方法
    • PACKAGE 包
    • PAPAMETER 参数
    • TYPE 类, 接口, 枚举
  • @Retention riˈtenCHən

    • SOURCE 注解将被编译器丢弃
    • CLASS 注解在class文件中可用, 但是会被VM丢弃
    • RUNTIME VM 在运行期间也保留注解, 可以通过反射机制来获取注解的信息
  • @Documented

    将该注解包含在javadoc 中

  • @Inherited

    允许子类继承父类中的注解

注解元素

注解元素可以用的类型如下所示

  • 所有基本类型 int, long, float, double, short, byte, boolean, void
  • String
  • Class
  • enum
  • Annotation
  • 以上类型的数组

生成外部文件

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;

@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@interface DbTable {
public String name() default "";
}
@Target(ElementType.FIELD)
@Retention(RetentionPolicy.RUNTIME)
@interface ConStraints {
boolean primaryKey() default false;
boolean allowNull() default true;
boolean unqiue() default false;
}

@Target(ElementType.FIELD)
@Retention(RetentionPolicy.RUNTIME)
@interface SQLString {
int value() default 0;
String name() default "";
//
ConStraints conStraints() default @ConStraints;
}
@Target(ElementType.FIELD)
@Retention(RetentionPolicy.RUNTIME)
@interface SQLInter {
String name() default "";
ConStraints conStraints() default @ConStraints;
}

在使用的时候

1
2
3
4
5
6
7
8
9
10
11
12
@DbTable(name = "member")
@Date
public class Member {
@SQLString(30)
private String firstName;
@SQLString(50)
private String lastName;
@SQLInter
private Integer age;
@SQLString(value = 30, conStraints = @ConStraints(primaryKey = true))
private String handler;
}

框架是通过类似这样的方法来解析的

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
public class Main {
public static void main(String[] args) {
Class member = Member.class;
DbTable dbTable = member.getAnnotationsByType(DbTable.class);
if (! Objects.isNull(dbTable)) {
String tableName = dbTable.name();
if (! Objects.isNull(tableName) && tableName.length() > 0) {
tableName = tableName.toUpperCase();
} else {
tableName = member.getName().toUpperCase();
}
Field[] fields = member.getDeclaredFields();
StringBuilder[] fieldsDefs = new StringBuilder[fields.length];

for (int i = 0; i < fields.length; i ++) {
Field field = fields[i];
fieldsDef[i] = new StringBuilder();

Annotation[] annotations = field.getAnnotations();
if (annotations.length == 0) {
continue;
}
for (Annotation annotation : annotations) {
String columnName = "";
if (annotation instanceof SQLInter) {
SQLInter sqlInter = (SQLInter) annotation;
fieldsDef[i].append(" INT ").append( getConstrains(sqlInter.conStraints()));
}
}
// many many other type
}

StringBuilder createCommand = new StringBuilder();
createCommand.append("crate table ").append(tableName).append(" (");
for (StringBuilder stringBuilder : fieldsDefs ) {
createCommand.append(stringBuilder).append(" , ");
}
// 去掉最后一个逗号
createCommand = createCommand.subSequence(0, createCommand.length() - 2);
createCommand.append(" );");
}
}

private static String getConstrains(ConStraints con) {
StringBuilder constraints = new StringBuilder();
if (! con.allowNull()) {
constraints.append(" NOT NUll ");
}

if (con.primaryKey()) {
constraints.append(" PRIMARY KEY ");
}

if (con.unqiue()) {
constraints.append(" UNIQUE ");
}

return constraints.toString();
}
}