
CompletableFuture

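Content borrowed from:
彻底理解Java的Future模式 (Thoroughly Understanding Java's Future Pattern)
深度学习Java Future (一) (An In-Depth Study of Java Future, Part 1)
深度学习Java Future (二) (An In-Depth Study of Java Future, Part 2)
Once it's all copied I just pretend I know it = =… #feel-good read#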

Runnable and Callable<V>

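The void run(); method of Runnable and the V call() throws Exception; method of Callable both describe the logic of a task that is meant to run on another thread. The difference between the two is whether a value is returned (call() may also throw a checked exception). FutureTask acts as a claim ticket for the result.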

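// Runnable: no return value
new Thread(() -> System.out.println("hello world")).start();

// Callable: wrap it in a FutureTask to get the result back
FutureTask<String> futureTask = new FutureTask<>(() -> "this is Future");
new Thread(futureTask).start();
String result = futureTask.get(); // blocks until done; throws InterruptedException, ExecutionException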

FutureTask

Prerequisites

Treiber Stack

wikipedia blogger

import java.util.concurrent.atomic.AtomicReference;

class ConcurrentStack<E> {
    private final AtomicReference<Node<E>> top = new AtomicReference<>();

    public void push(E item) {
        Node<E> newHead = new Node<>(item);
        Node<E> oldHead;

        for (;;) {
            oldHead = top.get();
            newHead.next = oldHead;
            // The CAS succeeds only if top is still oldHead; otherwise retry
            if (top.compareAndSet(oldHead, newHead)) {
                break;
            }
        }
    }

    public E pop() {
        Node<E> oldHead;
        Node<E> newHead;

        for (;;) {
            oldHead = top.get();
            if (oldHead == null) {
                return null; // empty stack
            }
            // The node below the current top becomes the new top
            newHead = oldHead.next;
            if (top.compareAndSet(oldHead, newHead)) {
                break;
            }
        }
        return oldHead.item;
    }

    private static class Node<E> {
        public final E item;
        public Node<E> next;

        public Node(E item) {
            this.item = item;
        }
    }
}

LockSupport

Java的LockSupport.park()实现分析 (An Analysis of the Implementation of Java's LockSupport.park())

// Declared in sun.misc.Unsafe, which LockSupport delegates to
public native void unpark(Object thread);
public native void park(boolean isAbsolute, long time);

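unpark grants a thread a "permit", and a thread that calls park waits for that permit. This resembles a semaphore, but permits do not accumulate: there is at most one, and it is consumed on use.
For example, if thread B calls unpark three times in a row, thread A's first call to park uses up the permit; if thread A then calls park again, it goes into the waiting state.
Note that unpark may be called before park. If thread B calls unpark and hands thread A a permit, then when thread A later calls park it finds the permit already there and continues running immediately.
In fact, park sometimes returns for no reason even when there is no permit; more on that later.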
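The permit rules above can be observed from plain Java. The following is a minimal sketch, not taken from the referenced article: the class name PermitDemo and the 200 ms timeout are made up for illustration, and the timings are only indicative because park is allowed to return spuriously.

```java
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.LockSupport;

class PermitDemo {
    /** Returns { nanos spent in the first park, nanos spent in the second park }. */
    static long[] measure() {
        Thread self = Thread.currentThread();
        // Three unpark calls in a row still leave only a single permit.
        LockSupport.unpark(self);
        LockSupport.unpark(self);
        LockSupport.unpark(self);

        long t0 = System.nanoTime();
        LockSupport.park();   // a permit is available: consumes it and returns at once
        long t1 = System.nanoTime();
        // No permit is left, so this normally waits for the full timeout
        // (a spurious return or an interrupt could end it earlier).
        LockSupport.parkNanos(TimeUnit.MILLISECONDS.toNanos(200));
        long t2 = System.nanoTime();
        return new long[] { t1 - t0, t2 - t1 };
    }

    public static void main(String[] args) {
        long[] elapsed = measure();
        System.out.println("first park:  " + elapsed[0] / 1_000_000 + " ms");
        System.out.println("second park: " + elapsed[1] / 1_000_000 + " ms");
    }
}
```

The first park should take close to zero time, while the second one should take roughly the timeout, which shows that the two extra unpark calls were not stored.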

Every Java thread has a Parker instance. The Parker class is defined as follows:

class Parker : public os::PlatformParker {
private:
volatile int _counter ;
...
public:
void park(bool isAbsolute, jlong time);
void unpark();
...
}
class PlatformParker : public CHeapObj<mtInternal> {
protected:
pthread_mutex_t _mutex [1] ;
pthread_cond_t _cond [1] ;
...
}

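The _counter field in the Parker class is what records the "permit".

When park is called, it first tries to take the permit directly, that is, it checks whether _counter > 0. If so, it sets _counter to 0 and returns: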

void Parker::park(bool isAbsolute, jlong time) {
// Ideally we'd do something useful while spinning, such
// as calling unpackTime().


// Optional fast-path check:
// Return immediately if a permit is available.
// We depend on Atomic::xchg() having full barrier semantics
// since we are doing a lock-free update to _counter.
// atomic operation
if (Atomic::xchg(0, &_counter) > 0) return;

If that fails, it constructs a ThreadBlockInVM and then checks whether _counter > 0. If it is, it sets _counter to 0, unlocks the mutex, and returns:

ThreadBlockInVM tbivm(jt);
if (_counter > 0) { // no wait needed
_counter = 0;
status = pthread_mutex_unlock(_mutex);

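ThreadBlockInVM (see 从Java到JVM到OS线程睡眠, "From Java to the JVM to OS Thread Sleep")
As mentioned above, ThreadBlockInVM checks whether the current thread needs to enter a safepoint. Its main logic is as follows:

  • First it sets the Java thread state, incrementing it by one from _thread_in_vm = 6 to _thread_in_vm_trans = 7, that is, from "running VM code" to the corresponding transition state.
  • os::is_MP() tells whether the machine is a multiprocessor system. On a multiprocessor a memory barrier is needed so that the state change becomes visible to the other threads promptly.
  • There are two ways to implement the barrier. One is OrderAccess::fence(), which is implemented directly with a CPU instruction, __asm__ volatile ("lock; addl $0,0(%%rsp)" : : : "cc", "memory");, and is relatively expensive. The other is InterfaceSupport::serialize_memory, which the JVM emulates and which is somewhat cheaper.
  • It calls SafepointSynchronize::block to try to block at this safepoint.
  • It sets the Java thread state to _thread_blocked, i.e. blocked.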

If _counter is 0 at this point, the thread has to park: it checks the wait time and then calls pthread_cond_wait to wait. When the wait returns, it sets _counter to 0, unlocks the mutex, and returns:

if (time == 0) {
status = pthread_cond_wait (_cond, _mutex) ;
}
_counter = 0 ;
status = pthread_mutex_unlock(_mutex) ;
assert_status(status == 0, status, "invariant") ;
OrderAccess::fence();

unpark is much simpler: it sets _counter to 1, unlocks the mutex, and returns. If the previous value of _counter was 0, it also calls pthread_cond_signal to wake up the thread waiting in park:

void Parker::unpark() {
int s, status ;
status = pthread_mutex_lock(_mutex);
assert (status == 0, "invariant") ;
s = _counter;
_counter = 1;
if (s < 1) {
if (WorkAroundNPTLTimedWaitHang) {
status = pthread_cond_signal (_cond) ;
assert (status == 0, "invariant") ;
status = pthread_mutex_unlock(_mutex);
assert (status == 0, "invariant") ;
} else {
status = pthread_mutex_unlock(_mutex);
assert (status == 0, "invariant") ;
status = pthread_cond_signal (_cond) ;
assert (status == 0, "invariant") ;
}
} else {
pthread_mutex_unlock(_mutex);
assert (status == 0, "invariant") ;
}
}
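To make the park/unpark logic above easier to follow, here is a rough Java model of it. This is only a sketch and not how HotSpot is written: the class SimpleParker is hypothetical, a ReentrantLock and a Condition stand in for _mutex and _cond, and the fast path, timeouts, safepoint handling, and interrupts are all left out. Unlike the real park, this model waits in a loop, so it never returns spuriously.

```java
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

class SimpleParker {
    private final ReentrantLock mutex = new ReentrantLock(); // stands in for _mutex
    private final Condition cond = mutex.newCondition();     // stands in for _cond
    private int counter = 0;                                 // stands in for _counter (0 or 1)

    void park() {
        mutex.lock();
        try {
            // No permit: wait until unpark() signals. A permit that is
            // already present lets the caller fall straight through.
            while (counter == 0) {
                cond.awaitUninterruptibly();
            }
            counter = 0; // consume the permit
        } finally {
            mutex.unlock();
        }
    }

    void unpark() {
        mutex.lock();
        try {
            int previous = counter;
            counter = 1;          // set, never incremented: permits do not accumulate
            if (previous < 1) {
                cond.signal();    // wake a thread that may be waiting in park()
            }
        } finally {
            mutex.unlock();
        }
    }

    /** Returns true if a second park() blocked until another unpark() arrived. */
    static boolean demo() {
        SimpleParker parker = new SimpleParker();
        parker.unpark();
        parker.unpark();
        parker.unpark();          // still a single permit
        parker.park();            // returns immediately and consumes it
        Thread waiter = new Thread(parker::park);
        waiter.start();
        try {
            waiter.join(200);                    // the waiter has no permit left
            boolean blocked = waiter.isAlive();
            parker.unpark();                     // hand over a permit
            waiter.join();
            return blocked;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }
}
```

Setting the counter to 1 rather than incrementing it is what makes the permit "one-shot", which matches the behavior described for _counter.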


FutureTask code

Class diagram

   /**
* Possible state transitions:
* NEW -> COMPLETING -> NORMAL
* NEW -> COMPLETING -> EXCEPTIONAL
* NEW -> CANCELLED
* NEW -> INTERRUPTING -> INTERRUPTED
*/
private volatile int state;
private static final int NEW = 0;
private static final int COMPLETING = 1;
private static final int NORMAL = 2;
private static final int EXCEPTIONAL = 3;
private static final int CANCELLED = 4;
private static final int INTERRUPTING = 5;
private static final int INTERRUPTED = 6;

/** The underlying callable; nulled out after running */
private Callable<V> callable;
/** The result to return or exception to throw from get() */
private Object outcome; // non-volatile, protected by state reads/writes
/** The thread running the callable; CASed during run() */
private volatile Thread runner;
/** Treiber stack of waiting threads */
private volatile WaitNode waiters;

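state is the run state; callable is the logic to execute, which is invoked inside the run method; outcome is the result. The waiters field is the linked list of threads blocked on this Future.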

static final class WaitNode {
volatile Thread thread;
volatile WaitNode next;
WaitNode() { thread = Thread.currentThread(); }
}

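get()/get(timeout) must block and wait while the task has not completed. If several threads call get, a linked list or queue is clearly needed to keep track of those waiting threads, and that is the purpose of waiters.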
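As a small illustration of several threads waiting on one task, the sketch below starts a few threads that all call get() on the same FutureTask before it has run. The class name MultiWaiterDemo and the 100 ms pause are assumptions made for the example; the pause only makes it likely, not certain, that the waiters are parked before run() is called.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.FutureTask;

class MultiWaiterDemo {
    /** Starts the given number of waiting threads and returns what each one saw. */
    static List<String> run(int waiters) {
        FutureTask<String> task = new FutureTask<>(() -> "done");
        List<String> seen = new CopyOnWriteArrayList<>();
        CountDownLatch finished = new CountDownLatch(waiters);

        for (int i = 0; i < waiters; i++) {
            new Thread(() -> {
                try {
                    seen.add(task.get());        // blocks until the task completes
                } catch (Exception e) {
                    seen.add("error: " + e);
                } finally {
                    finished.countDown();
                }
            }).start();
        }

        try {
            Thread.sleep(100);   // give the waiters time to block (not guaranteed)
            task.run();          // completes the task and wakes every waiter
            finished.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return seen;
    }

    public static void main(String[] args) {
        System.out.println(run(3));
    }
}
```

Every waiter receives the same result, because all of them are released once the task reaches a final state.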

The core method is get. If the current state is less than or equal to COMPLETING, it enters awaitDone to wait for the task to finish.

public V get() throws InterruptedException, ExecutionException {
int s = state;
if (s <= COMPLETING)
s = awaitDone(false, 0L);
return report(s);
}

private int awaitDone(boolean timed, long nanos)
throws InterruptedException {
final long deadline = timed ? System.nanoTime() + nanos : 0L;
WaitNode q = null;
boolean queued = false;
for (;;) {
if (Thread.interrupted()) {
removeWaiter(q);
throw new InterruptedException();
}

int s = state;
if (s > COMPLETING) {
if (q != null)
q.thread = null;
return s;
}
else if (s == COMPLETING) // cannot time out yet
Thread.yield();
else if (q == null)
q = new WaitNode();
else if (!queued)
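/**
* compareAndSwapObject(Object var1, long var2, Object var3, Object var4)
* var1: the object being operated on
* var2: the offset of the field within that object
* var3: the expected value; the field is updated only if it currently equals var3
* var4: the new value
* waitersOffset = UNSAFE.objectFieldOffset(k.getDeclaredField("waiters"))
* The CAS pushes the node holding the current thread onto waiters. If it fails, the for loop comes back to this same if branch and retries the CAS until it succeeds.
*/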
queued = UNSAFE.compareAndSwapObject(this, waitersOffset,
q.next = waiters, q);
else if (timed) {
nanos = deadline - System.nanoTime();
if (nanos <= 0L) {
removeWaiter(q);
return state;
}
LockSupport.parkNanos(this, nanos);
}
else
LockSupport.park(this);
}
}
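  • If the current thread has been interrupted, its own wait node is removed from the waiters list (removeWaiter(q)) and InterruptedException is thrown.
  • If the state is already final (greater than COMPLETING), that state is returned.
  • If the state is COMPLETING, Thread.yield() is called to give up the CPU.
  • If q is null (which it always is on the first pass), it is initialized.
  • The current thread's node is pushed onto waiters (only while queued is still false).
  • If a timeout was given, it checks whether the timeout has expired before parking.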
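Two of the branches listed above, the timeout and the interrupt check, are easy to trigger from the outside. The following is a minimal sketch; the class AwaitDoneDemo and the 50 ms timeout are made up for the example, and the task is deliberately never run so that get has to wait.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.FutureTask;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

class AwaitDoneDemo {
    /** Timed get on a task that is never run: the wait expires. */
    static String timedGet() {
        FutureTask<String> task = new FutureTask<>(() -> "never computed");
        try {
            return task.get(50, TimeUnit.MILLISECONDS);
        } catch (TimeoutException e) {
            return "timeout";
        } catch (InterruptedException | ExecutionException e) {
            return "unexpected: " + e;
        }
    }

    /** get() on a thread whose interrupt flag is already set. */
    static String interruptedGet() {
        FutureTask<String> task = new FutureTask<>(() -> "never computed");
        Thread.currentThread().interrupt();   // set the flag before waiting
        try {
            return task.get();
        } catch (InterruptedException e) {
            // The interrupt flag has been cleared by the time we get here.
            return "interrupted";
        } catch (ExecutionException e) {
            return "unexpected: " + e;
        }
    }

    public static void main(String[] args) {
        System.out.println(timedGet());
        System.out.println(interruptedGet());
    }
}
```

Note that only the calling thread gives up waiting in both cases; the task itself stays in the NEW state and could still be run later.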

Why is it designed this way?

The run method in FutureTask

   public void run() {
if (state != NEW ||
!UNSAFE.compareAndSwapObject(this, runnerOffset,
null, Thread.currentThread()))
return;
try {
Callable<V> c = callable;
if (c != null && state == NEW) {
V result;
boolean ran;
try {
result = c.call();
ran = true;
} catch (Throwable ex) {
result = null;
ran = false;
setException(ex);
}
if (ran)
set(result);
}
} finally {
// runner must be non-null until state is settled to
// prevent concurrent calls to run()
runner = null;
// state must be re-read after nulling runner to prevent
// leaked interrupts
int s = state;
if (s >= INTERRUPTING)
handlePossibleCancellationInterrupt(s);
}
}

protected void set(V v) {
if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
outcome = v;
UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final state
finishCompletion();
}
}
// Detach the waiters list with a CAS, then unpark every thread parked on it
private void finishCompletion() {
// assert state > COMPLETING;
for (WaitNode q; (q = waiters) != null;) {
if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) {
for (;;) {
Thread t = q.thread;
if (t != null) {
q.thread = null;
LockSupport.unpark(t);
}
WaitNode next = q.next;
if (next == null)
break;
q.next = null; // unlink to help gc
q = next;
}
break;
}
}

done();

callable = null; // to reduce footprint
}
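The effects of run, set/setException, and finishCompletion can be checked with a short experiment. This is a sketch under assumptions: the class RunOnceDemo and its counters are hypothetical, and done() is overridden only to count how often finishCompletion invokes it.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.FutureTask;
import java.util.concurrent.atomic.AtomicInteger;

class RunOnceDemo {
    /** Returns { times the callable ran, times done() was invoked }. */
    static int[] runTwice() {
        AtomicInteger calls = new AtomicInteger();
        AtomicInteger doneCalls = new AtomicInteger();
        FutureTask<Integer> task = new FutureTask<Integer>(calls::incrementAndGet) {
            @Override
            protected void done() {
                doneCalls.incrementAndGet();   // called from finishCompletion()
            }
        };
        task.run();
        task.run();   // state is no longer NEW, so this call returns immediately
        return new int[] { calls.get(), doneCalls.get() };
    }

    /** An exception thrown by the callable is stored and rethrown by get(). */
    static String failing() {
        FutureTask<String> task = new FutureTask<>(() -> {
            throw new IllegalStateException("boom");
        });
        task.run();   // does not throw; setException() records the failure
        try {
            return task.get();
        } catch (ExecutionException e) {
            return e.getCause().getMessage();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return "interrupted";
        }
    }

    public static void main(String[] args) {
        int[] counts = runTwice();
        System.out.println("callable ran " + counts[0] + " time(s), done() ran " + counts[1] + " time(s)");
        System.out.println("cause message: " + failing());
    }
}
```

The second run() is a no-op because of the state != NEW check at the top of run, and the exception surfaces in get() wrapped in an ExecutionException.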

CompletableFuture

First, how to create a CompletableFuture

public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier);

public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier,
Executor executor);

public static CompletableFuture<Void> runAsync(Runnable runnable);

public static CompletableFuture<Void> runAsync(Runnable runnable,
Executor executor);
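A minimal sketch of these four factory methods in use follows. The class name CreateDemo and the two-thread pool are assumptions for the example; the overloads without an Executor run on the default executor, which is normally ForkJoinPool.commonPool().

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class CreateDemo {
    static String create() {
        ExecutorService pool = Executors.newFixedThreadPool(2);
        try {
            // Supplier-based: the future completes with the supplied value.
            CompletableFuture<String> onDefault = CompletableFuture.supplyAsync(() -> "common");
            CompletableFuture<String> onPool = CompletableFuture.supplyAsync(() -> "custom", pool);

            // Runnable-based: no result, the future completes with null.
            CompletableFuture<Void> sideEffect =
                    CompletableFuture.runAsync(() -> System.out.println("side effect only"), pool);
            CompletableFuture<Void> sideEffectOnDefault =
                    CompletableFuture.runAsync(() -> { });

            sideEffect.join();            // join() waits without checked exceptions
            sideEffectOnDefault.join();
            return onDefault.join() + "," + onPool.join() + "," + sideEffect.isDone();
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(create());
    }
}
```

Passing an explicit Executor is useful when the task blocks or when the common pool should not be shared with unrelated work.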

After that come CompletableFuture's fancy tricks, very much in the functional-programming style

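  • thenApply
    Passes the preceding result as the argument to a function
  • thenAccept
    Consumes the result produced by the preceding step
  • thenRun
    Runs a Runnable task once the preceding step has completed
  • thenCombine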
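The methods listed above can be chained as in the sketch below. The class ChainDemo and the numbers are made up for illustration; thenCombine is used here in its standard form, which merges the results of two futures with a two-argument function.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;

class ChainDemo {
    static List<String> chain() {
        List<String> log = Collections.synchronizedList(new ArrayList<>());

        CompletableFuture<Integer> price = CompletableFuture.supplyAsync(() -> 20);

        // thenApply: transform the previous result into a new value.
        CompletableFuture<Integer> doubled = price.thenApply(p -> p * 2);

        // thenAccept: consume the previous result, producing nothing.
        CompletableFuture<Void> accepted = doubled.thenAccept(v -> log.add("accepted " + v));

        // thenRun: run an action after the previous stage, ignoring its result.
        CompletableFuture<Void> ran = accepted.thenRun(() -> log.add("ran"));
        ran.join();

        // thenCombine: wait for two futures and merge their results.
        CompletableFuture<Integer> quantity = CompletableFuture.supplyAsync(() -> 3);
        int total = doubled.thenCombine(quantity, (p, q) -> p * q).join();
        log.add("total " + total);
        return log;
    }

    public static void main(String[] args) {
        System.out.println(chain()); // [accepted 40, ran, total 120]
    }
}
```

Each then* call returns a new CompletableFuture, so the stages form a pipeline without any explicit blocking until join() is called at the end.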