0%

并发

为什么要有并发

现在多核心的处理器变得越来越常见,要如何使用多个核心呢?并发是一个很好的解决方案.

Java并发的实现

Thread

线程可以驱动任务,因此你需要一种描述任务的方式, 这可以由Runable接口来提供.将Runable对象转变为工作任务的传统方式就是把它提交给一个Thread构造器

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
public class BasicThread {
public static void main(String[] args) throws Exception {
final Flag flag = new Flag();
flag.setFlag(true);
Thread t = new Thread(
() -> {
while(flag) {
System.out.println("I AM Running");
}
}
);
t.start();
Thread.sleep(1000);
flag.setFlag(false);
}
}
class Flag {
private Boolean flag;

public void setFlag(Boolean flag) {
this.flag = flag;
}
public Boolean getFlag() {
return flag;
}
}

Join

简谈Java的Join()方法

join 是个啥

join() method suspends the execution of the calling thread until the object called finishes its execution.

t.join()方法阻塞调用此方法的线程(calling thread),直到线程t完成,此线程再继续

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
/**
* Waits at most <code>millis</code> milliseconds for this thread to
* die. A timeout of <code>0</code> means to wait forever.
*/
//此处A timeout of 0 means to wait forever 字面意思是永远等待,其实是等到t结束后。
public final synchronized void join(long millis) throws InterruptedException {
long base = System.currentTimeMillis();
long now = 0;

if (millis < 0) {
throw new IllegalArgumentException("timeout value is negative");
}

if (millis == 0) {
while (isAlive()) {
wait(0);
}
} else {
while (isAlive()) {
long delay = millis - now;
if (delay <= 0) {
break;
}
wait(delay);
now = System.currentTimeMillis() - base;
}
}
}

线程池

从Java 5 开始提供了Exectors 来管理Thread.

Exectors.newCachedThreadPool();

newCachedThread 会为每一个任务都创建一个线程.

1
2
3
4
5
6
7
8
9
10
public static void main(String[] args) {
ExectorService exectorService = Exectors.newCachedThreadPool();
for (int i = 0; i < 5; i++) {
exectorService.exectute(() -> {
do something;
});
}
//通过shutdown 阻止新的内容的提交
exectorService.shutdown();
}

Exectors.fixedThreadPool

可以一次性预先执行代价高昂的线程分配. 如果提交了超过预定数量的任务,那么这些任务会排队,每个任务都会在上一个任务结束之后执行.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class Main {
public static void main(String[] args) throws Exception {
ExecutorService executorService = Executors.newFixedThreadPool(1);
final Flag flag = new Flag();
flag.setFlag(true);
executorService.execute(() -> {
while (flag.getFlag()) {
System.out.println("I AM ONE");
}
});
Thread.sleep(50);
System.out.println("here here ONE" );
flag.setFlag(false);
System.out.println("here here TWO");
executorService.execute(() -> {
while (true) {
System.out.println("I AM TWo");
}
});
System.out.println("here here THREE");
System.exit(0);
}
}
class Flag {
private Boolean flag;

public void setFlag(Boolean flag) {
this.flag = flag;
}
public Boolean getFlag() {
return flag;
}
}
/**
输出结果
...
I AM ONE
here here ONE
here here TWO
I AM ONE
here here THREE
I AM TWo
I AM TWo
I AM TWo
...
*/

Exectors.newSingleThreadPool

对于数量为1 的fixThreadPool 可以直接使用newSingleThreadPool

Callable

runable 都是没有返回结果, 如果要使用有返回结果的要使用callable. callable 会返回Future 类型的对象. 可以通过带有超时的get() 方法 (在没有返回时候会阻塞当前线程, 如果超时抛出异常), 或者通过判断isDone();方法.

Thread.yalid() 和Thread.sleep()

yaild 让出控制权. 由就绪的线程们重新竞争时间片. Sleep 则是休眠 线程由运行状态变为阻塞状态.

共享受限资源

多线程的出现为了什么. 为了更快的完成一件事情. 将计算任务由一个CPU分配给多个CPU.那么当中肯定会涉及到资源的共享.

Java的内存模型

资源都是放到哪里的.对于JVM 来说 内存区域分为

  • pc寄存区

    每个线程都有自己的pc寄存器

  • Java虚拟机栈

    每个线程都有自己的私有java虚拟机栈, 里面会保存局部变量(对象的引用)和尚未计算好的结果(简单类型), 比方说从一个函数要跳转到另外一个函数的时候,PC的值会变成函数入口的地址, 同时函数的临时变量会被push到栈中, 当这个函数要执行完毕的时候 PC的值会指回原函数的运行到的地址,同时该函数的临时变量也会被从栈中pop出来. (所谓的现场保护和现场恢复)

  • Java堆

    各个线程共享的运行时区域, 平时创建的对象都会在这个地方.我们所谓的共享资源也在这个地方

  • 方法区

Jave的类加载机制

JVM 类加载机制详解

JVM 中类的加载要通过Loading, Linking(verification, prepare, resolve), Initialzation 这三个步骤

加载

加载是类加载过程中的一个阶段,这个阶段会在内存中生成一个代表这个类的java.lang.Class对象,作为方法区这个类的各种数据的入口。注意这里不一定非得要从一个Class文件获取,这里既可以从ZIP包中读取(比如从jar包和war包中读取),也可以在运行时计算生成(动态代理),也可以由其它文件生成(比如将JSP文件转换成对应的Class类)。

验证

这一阶段的主要目的是为了确保Class文件的字节流中包含的信息是否符合当前虚拟机的要求,并且不会危害虚拟机自身的安全。

  1. 准备

准备阶段是正式为类变量分配内存并设置类变量的初始值阶段,即在方法区中分配这些变量所使用的内存空间。注意这里所说的初始值概念,比如一个类变量定义为:

1
public static int v = 8080;

实际上变量v在准备阶段过后的初始值为0而不是8080,将v赋值为8080的putstatic指令是程序被编译后,存放于类构造器方法之中,这里我们后面会解释。
但是注意如果声明为:

1
public static final int v = 8080;

在编译阶段会为v生成ConstantValue属性,在准备阶段虚拟机会根据ConstantValue属性将v赋值为8080。

  1. 解析

解析阶段是指虚拟机将常量池中的符号引用替换为直接引用的过程。符号引用就是class文件中的:

  • CONSTANT_Class_info
  • CONSTANT_Field_info
  • CONSTANT_Method_info

等类型的常量。

下面我们解释一下符号引用和直接引用的概念:

  • 符号引用与虚拟机实现的布局无关,引用的目标并不一定要已经加载到内存中。各种虚拟机实现的内存布局可以各不相同,但是它们能接受的符号引用必须是一致的,因为符号引用的字面量形式明确定义在Java虚拟机规范的Class文件格式中。
  • 直接引用可以是指向目标的指针,相对偏移量或是一个能间接定位到目标的句柄。如果有了直接引用,那引用的目标必定已经在内存中存在。
  1. 初始化

初始化阶段是类加载最后一个阶段,前面的类加载阶段之后,除了在加载阶段可以自定义类加载器以外,其它操作都由JVM主导。到了初始阶段,才开始真正执行类中定义的Java程序代码。

初始化阶段是执行类构造器方法的过程。方法是由编译器自动收集类中的类变量的赋值操作和静态语句块中的语句合并而成的。虚拟机会保证方法执行之前,父类的方法已经执行完毕。p.s: 如果一个类中没有对静态变量赋值也没有静态语句块,那么编译器可以不为这个类生成()方法。

注意以下几种情况不会执行类初始化:

  • 通过子类引用父类的静态字段,只会触发父类的初始化,而不会触发子类的初始化。
  • 定义对象数组,不会触发该类的初始化。
  • 常量在编译期间会存入调用类的常量池中,本质上并没有直接引用定义常量的类,不会触发定义常量所在的类。
  • 通过类名获取Class对象,不会触发类的初始化。
  • 通过Class.forName加载指定类时,如果指定参数initialize为false时,也不会触发类初始化,其实这个参数是告诉虚拟机,是否要对类进行初始化。
  • 通过ClassLoader默认的loadClass方法,也不会触发初始化动作。

在第一步中load的信息就放在方法区内, 为什么要怎么做

  1. 安全. 类似于String的等的类一定先于程序的类被加载
  2. 减少内存的消耗
  • 运行时常量池

  • 本地方法栈

引用来自 Java内存模型

Cache和内存的交互

就会像我们知道的那样, CPU和内存的之间的速度相差若干个数量级. 需要被CPU处理的处理的数据会先从内存移到cache. 然后被CPU处理完成之后cache中的值会被写回到内存.Cache和内存有如下的交互动作

  • 主内存:所有的变量都存储在主内存(Main Memory,类比物理内存)中。

  • 工作内存:每条线程有自己的工作内存(Working Memory,类比处理器高速缓存),线程的工作内存中保存了被该线程使用到的变量的主内存副本拷贝,线程对变量的所有操作(读取、赋值等)都必须在工作内存中进行,而不能直接读写主内存中的变量。不同的线程之间也无法直接访问对方工作内存中的变量,线程间变量值的传递均需要通过主内存来完成

  • lock(锁定):作用于主内存的变量,它把一个变量标识为一条线程独占的状态。

  • unlock(解锁):作用于主内存的变量,它把一个处于锁定状态的变量释放出来,释放后的变量才可以被其他线程锁定。

  • read(读取):作用于主内存的变量,它把一个变量的值从主内存传输到线程的工作内存中,以便随后的load动作使用。

  • load(载入):作用于工作内存的变量,它把read操作从主内存中得到的变量值放入工作内存的变量副本中。

  • use(使用):作用于工作内存的变量,它把工作内存中一个变量的值传递给执行引擎,每当虚拟机遇到一个需要使用到变量的值的字节码指令时将会执行这个操作。

  • assign(赋值):作用于工作内存的变量,它把一个从执行引擎接收到的值赋给工作内存的变量,每当虚拟机遇到一个给变量赋值的字节码指令时执行这个操作。

  • store(存储):作用于工作内存的变量,它把工作内存中一个变量的值传送到主内存中,以便随后的write操作使用。

  • write(写入):作用于主内存的变量,它把store操作从工作内存中得到的变量的值放入主内存的变量中。

volatile 为什么能实现线程之间的透明

除了被volite修饰的变量.

1
instance = new Singleton(); // instance 是 volatile 变量

转变为汇编代码为 会增加lock 开头的一行代码. Lock前缀的指令在多核处理器瞎会发生两件事情

  • Lock前缀指令会引起处理器缓存写回到内存.

    它会锁定这块内存区域的缓存并回写到内存,并使用缓存一致性机制来确保修改的原子性

  • 一个处理器的缓存会写到内存会导致其他处理器的缓存无效

synchronized 和 monitor

当其他的线程从主线程读取数据进行操作之后会写到主线程之后可能会发生错误的覆盖.造成错误.基本上所有的并发模式在解决线程冲突的时候都是采用序列化访问共享资源的方案.通常是通过在代码前加上一条锁语句来实现的,这种机制常常被称为互斥量.

Java提供synchronized的方式,为防止资源冲突提供了内置支持.synchronized可以修饰方法

此时的互斥量是该对象本身

1
2
3
public synchronized void getSomething() {

}

或者修饰代码块 此时的互斥量是 显示制定的互斥量

1
2
3
4
5
public void getSomething() {
synchronized (互斥量) {

}
}

有如下的Java 经过 Java -c 反编译之后得到

1
2
3
4
5
6
7
public class Main {
public void doSomething() {
synchronized (this) {
System.out.println("this is a test");
}
}
}

Compiled from “Main.java”

public class Main {

public Main();

Code:

   0: aload_0

   1: invokespecial #1                  // Method java/lang/Object."<init>":()V

   4: return

public void doSomething();

Code:

   0: aload_0

   1: dup

   2: astore_1

   3: monitorenter

   4: getstatic     #2                  // Field java/lang/System.out:Ljava/io/PrintStream;

   7: ldc           #3                  // String this is a test

   9: invokevirtual #4                  // Method java/io/PrintStream.println:(Ljava/lang/String;)V

  12: aload_1

  13: monitorexit

  14: goto          22

  17: astore_2

  18: aload_1

  19: monitorexit

  20: aload_2

  21: athrow

  22: return

Exception table:

   from    to  target type

       4    14    17   any

      17    20    17   any

}

可以看到有 monitorenter 和 monitorexit

monitorenter 的描述为

Each object is associated with a monitor. A monitor is locked if and only if it has an owner. The thread that executes monitorenter attempts to gain ownership of the monitor associated with objectref, as follows: • If the entry count of the monitor associated with objectref is zero, the thread enters the monitor and sets its entry count to one. The thread is then the owner of the monitor. • If the thread already owns the monitor associated with objectref, it reenters the monitor, incrementing its entry count. • If another thread already owns the monitor associated with objectref, the thread blocks until the monitor’s entry count is zero, then tries again to gain ownership.

这段话的大概意思为:

每个对象有一个监视器锁(monitor)。当monitor被占用时就会处于锁定状态,线程执行monitorenter指令时尝试获取monitor的所有权,过程如下:

1、如果monitor的进入数为0,则该线程进入monitor,然后将进入数设置为1,该线程即为monitor的所有者。

2、如果线程已经占有该monitor,只是重新进入,则进入monitor的进入数加1.

3.如果其他线程已经占用了monitor,则该线程进入阻塞状态,直到monitor的进入数为0,再重新尝试获取monitor的所有权。

monitorexit:

The thread that executes monitorexit must be the owner of the monitor associated with the instance referenced by objectref. The thread decrements the entry count of the monitor associated with objectref. If as a result the value of the entry count is zero, the thread exits the monitor and is no longer its owner. Other threads that are blocking to enter the monitor are allowed to attempt to do so.

这段话的大概意思为:

执行monitorexit的线程必须是objectref所对应的monitor的所有者。

指令执行时,monitor的进入数减1,如果减1后进入数为0,那线程退出monitor,不再是这个monitor的所有者。其他被这个monitor阻塞的线程可以尝试去获取这个 monitor 的所有权。

其实wait/notify等方法也依赖于monitor对象,这就是为什么只有在同步的块或者方法中才能调用wait/notify等方法.

对于同步方法的编译

1
2
3
4
5
public class Main {
public synchronized void doSomething() {
System.out.println("this is a test");
}
}

Compiled from “Main.java”

public class Main {

public Main();

Code:

   0: aload_0

   1: invokespecial #1                  // Method java/lang/Object."<init>":()V

   4: return

public synchronized void doSomething();

Code:

   0: getstatic     #2                  // Field java/lang/System.out:Ljava/io/PrintStream;

   3: ldc           #3                  // String this is a test

   5: invokevirtual #4                  // Method java/io/PrintStream.println:(Ljava/lang/String;)V

   8: return

}

执行线程将先获取monitor,获取成功之后才能执行方法体,方法执行完后再释放monitor。在方法执行期间,其他任何线程都无法再获得同一个monitor对象。 其实本质上没有区别,只是方法的同步是一种隐式的方式来实现,无需通过字节码来完成。

Java并发编程:Synchronized及其实现原理

#锁的类型

为了减少对获得锁和释放锁带来的性能损耗, 引入了“偏向锁”, “轻量锁”. 无锁状态 -> 偏向锁 -> 轻量锁 -> 重量锁.

聊聊并发(二)Java SE1.6中的Synchronized

在运行期间Mark Word里存储的数据会随着锁标志位的变化而变化。Mark Word可能变化为存储以下4种数据:

锁的类型

偏向锁

偏向锁的设置:Hotspot的作者经过以往的研究发现大多数情况下锁不仅不存在多线程竞争,而且总是由同一线程多次获得,为了让线程获得锁的代价更低而引入了偏向锁。当一个线程访问同步块并获取锁时,会在对象头和栈帧中的锁记录里存储锁偏向的线程ID,以后该线程在进入和退出同步块时不需要花费CAS操作来加锁和解锁,而只需简单的测试一下对象头的Mark Word里是否存储着指向当前线程的偏向锁,如果测试成功,表示线程已经获得了锁,如果测试失败,则需要再测试下Mark Word中偏向锁的标识是否设置成1(表示当前是偏向锁),如果没有设置,则使用CAS竞争锁,如果设置了,则尝试使用CAS将对象头的偏向锁指向当前线程。

偏向锁的撤销:偏向锁使用了一种等到竞争出现才释放锁的机制,所以当其他线程尝试竞争偏向锁时,持有偏向锁的线程才会释放锁。偏向锁的撤销,需要等待全局安全点(在这个时间点上没有字节码正在执行),它会首先暂停拥有偏向锁的线程,然后检查持有偏向锁的线程是否活着,如果线程不处于活动状态,则将对象头设置成无锁状态,如果线程仍然活着,拥有偏向锁的栈会被执行,遍历偏向对象的锁记录,栈中的锁记录和对象头的Mark Word要么重新偏向于其他线程,要么恢复到无锁或者标记对象不适合作为偏向锁,最后唤醒暂停的线程。下图中的线程1演示了偏向锁初始化的流程,线程2演示了偏向锁撤销的流程。

偏向锁的撤销

轻量锁

轻量级锁加锁:线程在执行同步块之前,JVM会先在当前线程的栈桢中创建用于存储锁记录的空间,并将对象头中的Mark Word复制到锁记录中,官方称为Displaced Mark Word。然后线程尝试使用CAS将对象头中的Mark Word替换为指向锁记录的指针。如果成功,当前线程获得锁,如果失败,表示其他线程竞争锁,当前线程便尝试使用自旋来获取锁。

轻量级锁解锁:轻量级解锁时,会使用原子的CAS操作来将Displaced Mark Word替换回到对象头,如果成功,则表示没有竞争发生。如果失败,表示当前锁存在竞争,锁就会膨胀成重量级锁。下图是两个线程同时争夺锁,导致锁膨胀的流程图。

轻量锁

重量锁

因为自旋会消耗CPU,为了避免无用的自旋(比如获得锁的线程被阻塞住了),一旦锁升级成重量级锁,就不会再恢复到轻量级锁状态。当锁处于这个状态下,其他线程试图获取锁时,都会被阻塞住,当持有锁的线程释放锁之后会唤醒这些线程,被唤醒的线程就会进行新一轮的夺锁之争。

优点 优点 适用场景
偏向锁 加锁和解锁不需要额外的消耗,和执行非同步方法比仅存在纳秒级的差距。 如果线程间存在锁竞争,会带来额外的锁撤销的消耗。 适用于只有一个线程访问同步块场景。
轻量级锁 竞争的线程不会阻塞,提高了程序的响应速度。 如果始终得不到锁竞争的线程使用自旋会消耗CPU。 追求响应时间。同步块执行速度非常快。
重量级锁 线程竞争不使用自旋,不会消耗CPU。 线程阻塞,响应时间缓慢。 追求吞吐量。同步块执行速度较长。

#显式的使用锁

1
2
3
4
5
6
7
8
9
10
11
12
13
14
public class AttempLocking {
private Lock lock = new ReentrantLock();

public void untimed() {
boolean captured = lock.tryLock();
try {
System.out.println("tryLock()" + captured);
} finally {
if (! Objects.isNull(captured) && captured) {
lock.unlock();
}
}
}
}

公平锁和非公平锁

ReetrantLock

在ReentrantLock中,对于公平和非公平的定义是通过对同步器AbstractQueuedSynchronizer的扩展加以实现的,也就是在tryAcquire的实现上做了语义的控制。

非公平锁

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
 final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
if (compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
} else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
  • 如果当前状态为初始状态,那么尝试设置状态;
  • 如果状态设置成功后就返回;
  • 如果状态被设置,且获取锁的线程又是当前线程的时候,进行状态的自增;
  • 如果未设置成功状态且当前线程不是获取锁的线程,那么返回失败。

公平锁

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
protected final boolean tryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
if (!hasQueuedPredecessors() && compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
} else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}

上述逻辑相比较非公平的获取,仅加入了当前线程(Node)之前是否有前置节点在等待的判断。hasQueuedPredecessors()方法命名有些歧义,其实应该是currentThreadHasQueuedPredecessors()更为妥帖一些,也就是说当前面没有人排在该节点(Node)前面时候队且能够设置成功状态,才能够获取锁。

1
2
3
4
5
6
7
8
9
10
11
12
protected final boolean tryRelease(int releases) {
int c = getState() - releases;
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = false;
if (c == 0) {
free = true;
setExclusiveOwnerThread(null);
}
setState(c);
return free;
}

在非公平获取的过程中,“插队”现象非常严重,后续获取锁的线程根本不顾及sync队列中等待的线程,而是能获取就获取。反观公平获取的过程,锁的获取就类似线性化的,每次都由sync队列中等待最长的线程(链表的第一个,sync队列是由尾部结点添加,当前输出的sync队列是逆序输出)获取锁。一个 hasQueuedPredecessors方法能够获得公平性的特性,这点实际上是由AbstractQueuedSynchronizer来完成的,看一下acquire方法:

1
2
3
4
public final void acquire(int arg) {
if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}

可以看到,如果获取状态和在sync队列中排队是短路的判断,也就是说如果tryAcquire成功,那么是不会进入sync队列的,可以通过下图来深刻的认识公平性和AbstractQueuedSynchronizer的获取过程。 非公平的,或者说默认的获取方式如下图所示:

对于状态的获取,可以快速的通过tryAcquire的成功,也就是黄色的Fast路线,也可以由于tryAcquire的失败,构造节点,进入sync队列中排序后再次获取。因此可以理解为Fast就是一个快速通道,当例子中的线程释放锁之后,快速的通过Fast通道再次获取锁,就算当前sync队列中有排队等待的线程也会被忽略。这种模式,可以保证进入和退出锁的吞吐量,但是sync队列中过早排队的线程会一直处于阻塞状态,造成“饥饿”场景。 而公平性锁,就是在tryAcquire的调用中顾及当前sync队列中的等待节点(废弃了Fast通道),也就是任意请求都需要按照sync队列中既有的顺序进行,先到先得。这样很好的确保了公平性,但是可以从结果中看到,吞吐量就没有非公平的锁高了。

ReetrantLock 的不同锁的类型

Java中Lock,tryLock,lockInterruptibly有什么区别?

lock

public void lock()

获取锁。

如果该锁没有被另一个线程保持,则获取该锁并立即返回,将锁的保持计数设置为 1。

如果当前线程已经保持该锁,则将保持计数加 1,并且该方法立即返回。

如果该锁被另一个线程保持,则出于线程调度的目的,禁用当前线程,并且在获得锁之前,该线程将一 直处于休眠状态,此时锁保持计数被设置为 1。

lockInterruptibly

public void lockInterruptibly() throws InterruptedException

  1. 如果当前线程未被中断,则获取锁。
  2. 如果该锁没有被另一个线程保持,则获取该锁并立即返回,将锁的保持计数设置为 1。
  3. 如果当前线程已经保持此锁,则将保持计数加 1,并且该方法立即返回。
  4. 如果锁被另一个线程保持,则出于线程调度目的,禁用当前线程,并且在发生以下两种情况之一以 前,该线程将一直处于休眠状态:
    • 锁由当前线程获得;或者
    • 其他某个线程中断当前线程。
  5. 如果当前线程获得该锁,则将锁保持计数设置为 1。 如果当前线程:
    • 在进入此方法时已经设置了该线程的中断状态;或者
    • 在等待获取锁的同时被中断。 则抛出 InterruptedException,并且清除当前线程的已中断状态。
  6. 在此实现中,因为此方法是一个显式中断点,所以要优先考虑响应中断,而不是响应锁的普通获取或 重入获取。

tryLock

public boolean tryLock()

仅在调用时锁未被另一个线程保持的情况下,才获取该锁。

  1. 如果该锁没有被另一个线程保持,并且立即返回 true 值,则将锁的保持计数设置为 1。
    即使已将此锁设置为使用公平排序策略,但是调用 tryLock() 仍将 立即获取锁(如果有可用的),
    而不管其他线程当前是否正在等待该锁。在某些情况下,此“闯入”行为可能很有用,即使它会打破公
    平性也如此。如果希望遵守此锁的公平设置,则使用 tryLock(0, TimeUnit.SECONDS) ,它几乎是等效的(也检测中断)。
  2. 如果当前线程已经保持此锁,则将保持计数加 1,该方法将返回 true。
  3. 如果锁被另一个线程保持,则此方法将立即返回 false 值。

指定者:
接口 Lock 中的 tryLock
返回:
如果锁是自由的并且被当前线程获取,或者当前线程已经保持该锁,则返回 true;否则返回 false

关于中断又是一段很长的叙述,先不谈。

  • lock()

    拿不到lock就不罢休,不然线程就一直block。 比较无赖的做法。

  • tryLock()

    马上返回,拿到lock就返回true,不然返回false。 比较潇洒的做法。 带时间限制的tryLock(),拿不到lock,就等一段时间,超时返回false。比较聪明的做法。

  • lockInterruptibly

    先说说线程的打扰机制,每个线程都有一个 打扰 标志。这里分两种情况,

    • 线程在sleep或wait,join,此时如果别的进程调用此进程的 interrupt()方法,此线程会被唤醒并被要求处理InterruptedException;(thread在做IO操作时也可能有类似行为,见java thread api)

    • 此线程在运行中, 则不会收到提醒。但是 此线程的 “打扰标志”会被设置, 可以通过isInterrupted()查看并 作出处理。

      lockInterruptibly()和上面的第一种情况是一样的, 线程在请求lock并被阻塞时,如果被interrupt,则“此线程会被唤醒并被要求处理InterruptedException”。并且如果线程已经被interrupt,再使用lockInterruptibly的时候,此线程也会被要求处理interruptedException先看lock()方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

/**
*@author 作者 E-mail:
* @version 创建时间:2015-10-23 下午01:47:03 类说明
*/
public class TestLock
{
// @Test
public void test() throws Exception
{
final Lock lock = new ReentrantLock();
lock.lock();

Thread t1 = new Thread(new Runnable()
{
@Override
public void run()
{
lock.lock();
System.out.println(Thread.currentThread().getName() + " interrupted.");
}
},"child thread -1");

t1.start();
Thread.sleep(1000);

t1.interrupt();

Thread.sleep(1000000);
}

public static void main(String[] args) throws Exception {
new TestLock().test();
}
}

用eclipse对这个程序进行debug发现,即使子线程已经被打断,但是子线程仍然在run,可见lock()方法并不关心线程是否被打断,甚至说主线程已经运行完毕,子线程仍然在block()

而使用LockInterupptibly,则会响应中断

  • import java.util.concurrent.locks.Lock;
    import java.util.concurrent.locks.ReentrantLock;
    
    /**
    
    - @author 作者 E-mail:
    - @version 创建时间:2015-10-23 下午01:53:10 类说明
    */
    public class TestLockInterruptibly
    {
      // @Test
    public void test3() throws Exception
    {
        final Lock lock = new ReentrantLock();
        lock.lock();
    
    Thread t1 = new Thread(new Runnable()
    {
        @Override
        public void run()
        {
            try
            {
                lock.lockInterruptibly();
            }
            catch(InterruptedException e)
            {
                System.out.println(Thread.currentThread().getName() + " interrupted.");
            }
        }
    }, "child thread -1");
    
    t1.start();
    Thread.sleep(1000);
    
    t1.interrupt();
    
    Thread.sleep(1000000);
    }
    
    public static void main(String[] args) throws Exception
    {
        new TestLockInterruptibly().test3();
    }
    }
    try{
      Thread.sleep(2000);
      lock.lockInterruptibly();
       }catch(InterruptedException e){
       System.out.println(Thread.currentThread().getName()+" interrupted.");
    }
      t1.start();
      t1.interrupt();
      Thread.sleep(1000000);
    <!--code17-->
  1. Unsafe,是CAS的核心类,由于Java方法无法直接访问底层系统,需要通过本地(native)方法来访问,Unsafe相当于一个后门,基于该类可以直接操作特定内存的数据。
  2. 变量valueOffset,表示该变量值在内存中的偏移地址,因为Unsafe就是根据内存偏移地址获取数据的。
  3. 变量value用volatile修饰,保证了多线程之间的内存可见性。

看看AtomicInteger如何实现并发下的累加操作:

1
2
3
4
5
6
7
8
9
10
11
12
public final int getAndAdd(int delta) {    
return unsafe.getAndAddInt(this, valueOffset, delta);
}

//unsafe.getAndAddInt
public final int getAndAddInt(Object var1, long var2, int var4) {
int var5;
do {
var5 = this.getIntVolatile(var1, var2);
} while(!this.compareAndSwapInt(var1, var2, var5, var5 + var4));
return var5;
}
  1. AtomicInteger里面的value原始值为3,即主内存中AtomicInteger的value为3,根据Java内存模型,线程A和线程B各自持有一份value的副本,值为3。
  2. 线程A通过getIntVolatile(var1, var2)拿到value值3,这时线程A被挂起。
  3. 线程B也通过getIntVolatile(var1, var2)方法获取到value值3,运气好,线程B没有被挂起,并执行compareAndSwapInt方法比较内存值也为3,成功修改内存值为2。
  4. 这时线程A恢复,执行compareAndSwapInt方法比较,发现自己手里的值(3)和内存的值(2)不一致,说明该值已经被其它线程提前修改过了,那只能重新来一遍了。
  5. 重新获取value值,因为变量value被volatile修饰,所以其它线程对它的修改,线程A总是能够看到,线程A继续执行compareAndSwapInt进行比较替换,直到成功。

整个过程中,利用CAS保证了对于value的修改的并发安全,继续深入看看Unsafe类中的compareAndSwapInt方法实现。

1
public final native boolean compareAndSwapInt(Object paramObject, long paramLong, int paramInt1, int paramInt2);
1
2
3
4
5
6
UNSAFE_ENTRY(jboolean, Unsafe_CompareAndSwapInt(JNIEnv *env, jobject unsafe, jobject obj, jlong offset, jint e, jint x))
UnsafeWrapper("Unsafe_CompareAndSwapInt");
oop p = JNIHandles::resolve(obj);
jint* addr = (jint *) index_oop_from_field_offset_long(p, offset);
return (jint)(Atomic::cmpxchg(x, addr, e)) == e;
UNSAFE_END
  1. 先想办法拿到变量value在内存中的地址。
  2. 通过Atomic::cmpxchg实现比较替换,其中参数x是即将更新的值,参数e是原内存的值。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
inline jint Atomic::cmpxchg (jint exchange_value, volatile jint* dest, jint compare_value) {
int mp = os::isMP(); //判断是否是多处理器
_asm {
mov edx, dest
mov ecx, exchange_value
mov eax, compare_value
LOCK_IF_MP(mp)
cmpxchg dword ptr [edx], ecx
}
}

// Adding a lock prefix to an instruction on MP machine
// VC++ doesn't like the lock prefix to be on a single line
// so we can't insert a label after the lock prefix.
// By emitting a lock prefix, we can define a label after it.
#define LOCK_IF_MP(mp) __asm cmp mp, 0 \
__asm je L0 \
__asm _emit 0xF0 \
__asm L0:

LOCK_IF_MP根据当前系统是否为多核处理器决定是否为cmpxchg指令添加lock前缀。

  1. 如果是多处理器,为cmpxchg指令添加lock前缀。
  2. 反之,就省略lock前缀。(单处理器会不需要lock前缀提供的内存屏障效果)

intel手册对lock前缀的说明如下:

  1. 确保后续指令执行的原子性。
  2. 在Pentium及之前的处理器中,带有lock前缀的指令在执行期间会锁住总线,使得其它处理器暂时无法通过总线访问内存,很显然,这个开销很大。在新的处理器中,Intel使用缓存锁定来保证指令执行的原子性,缓存锁定将大大降低lock前缀指令的执行开销。
  3. 禁止该指令与前面和后面的读写指令重排序。
  4. 把写缓冲区的所有数据刷新到内存中。

上面的第2点和第3点所具有的内存屏障效果,保证了CAS同时具有volatile读和volatile写的内存语义

#线程本地存储

防止任务在共享资源上产生冲突的第二种方式是根处对变量的共享.

1
2
3
4
5
6
7
8
9
10
11
12
public class ThreadLocalHolder implement Runnable {
ThreadLocal<String> values;
public ThreadLocalHolder(ThreadLocal<String> values) {
this.values = values;
}

public void run() {
String value = values.get();
// do something
values.set(value);
}
}

ThreadLocal 对象通常当作静态域存储. 每个单独的线程都被分配了自己的存储.

ThreadLocal

ThreadLocal、ThreadLocalMap、Thread之间的关系

  ThreadLocalMap是ThreadLocal内部类,由ThreadLocal创建,Thread有ThreadLocal.ThreadLocalMap类型的属性。简单的说 就是Thread 中有个类似于HashMap(ThreadLocalMap) 的对象,然后我们用put/set 方法设置或者得到key对应的value的值

源码如下:

1
2
3
4
5
6
7
8
9
10
11
12
public class Thread implements Runnable {
/*...其他属性...*/

/* ThreadLocal values pertaining to this thread. This map is maintained
* by the ThreadLocal class. */
ThreadLocal.ThreadLocalMap threadLocals = null;

/*
* InheritableThreadLocal values pertaining to this thread. This map is
* maintained by the InheritableThreadLocal class.
*/
ThreadLocal.ThreadLocalMap inheritableThreadLocals = null;
1
2
3
public class ThreadLocal<T> {
static class ThreadLocalMap {

1
2
3
4
5
6
7
8
9
10
/**
* Create the map associated with a ThreadLocal. Overridden in
* InheritableThreadLocal.
*
* @param t the current thread
* @param firstValue value for the initial entry of the map
*/
void createMap(Thread t, T firstValue) {
t.threadLocals = new ThreadLocalMap(this, firstValue);
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
/**
* Returns the current thread's "initial value" for this
* thread-local variable. This method will be invoked the first
* time a thread accesses the variable with the {@link #get}
* method, unless the thread previously invoked the {@link #set}
* method, in which case the {@code initialValue} method will not
* be invoked for the thread. Normally, this method is invoked at
* most once per thread, but it may be invoked again in case of
* subsequent invocations of {@link #remove} followed by {@link #get}.
*
* <p>This implementation simply returns {@code null}; if the
* programmer desires thread-local variables to have an initial
* value other than {@code null}, {@code ThreadLocal} must be
* subclassed, and this method overridden. Typically, an
* anonymous inner class will be used.
*
* @return the initial value for this thread-local
*/
protected T initialValue() {
return null;
}

/**
* Creates a thread local variable. The initial value of the variable is
* determined by invoking the {@code get} method on the {@code Supplier}.
*
* @param <S> the type of the thread local's value
* @param supplier the supplier to be used to determine the initial value
* @return a new thread local variable
* @throws NullPointerException if the specified supplier is null
* @since 1.8
*/
public static <S> ThreadLocal<S> withInitial(Supplier<? extends S> supplier) {
return new SuppliedThreadLocal<>(supplier);
}

/**
* Creates a thread local variable.
* @see #withInitial(java.util.function.Supplier)
*/
public ThreadLocal() {
}

/**
* Returns the value in the current thread's copy of this
* thread-local variable. If the variable has no value for the
* current thread, it is first initialized to the value returned
* by an invocation of the {@link #initialValue} method.
*
* @return the current thread's value of this thread-local
*/
public T get() {
Thread t = Thread.currentThread();
ThreadLocalMap map = getMap(t);
if (map != null) {
ThreadLocalMap.Entry e = map.getEntry(this);
if (e != null) {
@SuppressWarnings("unchecked")
T result = (T)e.value;
return result;
}
}
return setInitialValue();
}

/**
* Variant of set() to establish initialValue. Used instead
* of set() in case user has overridden the set() method.
*
* @return the initial value
*/
private T setInitialValue() {
T value = initialValue();
Thread t = Thread.currentThread();
ThreadLocalMap map = getMap(t);
if (map != null)
map.set(this, value);
else
createMap(t, value);
return value;
}

/**
* Sets the current thread's copy of this thread-local variable
* to the specified value. Most subclasses will have no need to
* override this method, relying solely on the {@link #initialValue}
* method to set the values of thread-locals.
*
* @param value the value to be stored in the current thread's copy of
* this thread-local.
*/
public void set(T value) {
Thread t = Thread.currentThread();
ThreadLocalMap map = getMap(t);
if (map != null)
map.set(this, value);
else
createMap(t, value);
}

/**
* Removes the current thread's value for this thread-local
* variable. If this thread-local variable is subsequently
* {@linkplain #get read} by the current thread, its value will be
* reinitialized by invoking its {@link #initialValue} method,
* unless its value is {@linkplain #set set} by the current thread
* in the interim. This may result in multiple invocations of the
* {@code initialValue} method in the current thread.
*
* @since 1.5
*/
public void remove() {
ThreadLocalMap m = getMap(Thread.currentThread());
if (m != null)
m.remove(this);
}


中断

Thread类包含interrupt()方法.因此你可以终止被阻塞的任务.这个方法将设置线程的中断状态,如果一个线程已经被阻塞,或者试图执行一个阻塞操作,那么设置这个线程的中断状态将抛出InterruptedException.当抛出该异常时,中断状态将被复位.

为了调用interrupt(), 你必须持有Thread. 新的concurrent类库避免对Thread的直接使用,而是鼓励Exector来执行所有的操作.如果要中断任务池中所有的任务的时候可以使用shutdown(). 如果想对某个特定的线程执行中断操作.那么要使用submit方法而不是executor方法. 通过submit()将放回一个泛型.

中断是不能作用在等待I/O, 或者互斥量的线程上的.

中断的唯一发生机会是在任务要进入到阻塞操作中,或者已经在阻塞操作内部时. 如果你的任务始终不会进入阻塞状态的话, 可以调用interrupted()来检查中断状态.

#生产者和消费者

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
package tk.andrewchen;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/**
* @author dafuchen
* 2018/9/9
*/
class Table {
private int size;
public synchronized void producer() {
try {
while (size == 10) {
wait();
}
size ++;
notifyAll();
System.out.println("producer:" + size);
} catch(Exception e) {
e.printStackTrace();
}
}
public synchronized void consumer() {
try {
while(size == 0) {
wait();
}
size --;
notifyAll();
System.out.println("consumer:" + size);
} catch(Exception e) {
e.printStackTrace();
}
}
}

class Cooker implements Runnable {
private Table table;
public Cooker(Table t) {
table = t;
}

@Override
public void run(){
try {
while(true) {
table.producer();
}
} catch(Exception e) {
e.printStackTrace();
}
}
}

class Waiter implements Runnable {
private Table table;
public Waiter(Table t) {
table = t;
}

@Override
public void run() {
try {
while (true) {
table.consumer();
}
} catch(Exception e) {
e.printStackTrace();
}
}
}

public class Main {
public static void main(String[] args) throws Exception {
Table t = new Table();
ExecutorService executorService = Executors.newFixedThreadPool(2);
executorService.submit(new Cooker(t));
executorService.submit(new Waiter(t));
Thread.sleep(10000000);
}
}

#死锁

比方说有个场景 A和B要吃饭, 吃饭要有刀和叉,缺一不可.A到拿到dao,B拿到了叉,双方僵持不下,谁也不让步.这样就会造成死锁.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
public class Main {
public static void main(String[] args) {
Lock fork = new ReentrantLock();
Lock spoon = new ReentrantLock();
ExecutorService executorService = Executors.newFixedThreadPool(2);
executorService.submit(() -> {
while(true) {
fork.lock();
System.out.println(Thread.currentThread().getName() + " get fork");
Thread.sleep(1000);
spoon.lock();
System.out.println(Thread.currentThread().getName() + " get spoon");
Thread.sleep(1000);
spoon.unlock();
System.out.println(Thread.currentThread().getName() + "unlock spoon");
fork.unlock();
System.out.println(Thread.currentThread().getName() + "unlock fork");
}
});
executorService.submit(() -> {
while (true) {
spoon.lock();
System.out.println(Thread.currentThread().getName() + " get spoon");
Thread.sleep(1000);
fork.lock();
System.out.println(Thread.currentThread().getName() + " get fork");
Thread.sleep(1000);
fork.unlock();
System.out.println(Thread.currentThread().getName() + "unlock fork");
spoon.unlock();
System.out.println(Thread.currentThread().getName() + "unlock spoon");
}
});
}
}

输出的结果

pool-1-thread-1 get fork
pool-1-thread-2 get spoon

如下4个条件都发生的时候就会发生死锁

  1. 互斥条件

    任务使用的资源中至少有1个是不能共享的.在上述的例子中fork和spoon两个当中的一个其实是不能共享的.

  2. *至少有一个任务它必须持有一个资源且正在等待一个当前被其他的任务持有的资源. *

    拿着fork等spoon.

  3. *资源不能被抢占,任务必须把资源释放当作普通时间. *

    一个线程不会从另外一个线程里面把临界资源抢过来.

  4. 必须要有循环等待.

    一个任务等待其他任务持有的资源, 后者又在等待另一个任务持有的资源.周而复始,用不停息.

要解决这个问题只要破除当中的任意一个条件就好了.

新类库中的构件

CountDownLunch

它被用来同步一个或者多个任务, 强制他们等待由其他任务执行的一组操作完成

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
@Slf4j
class WaitingTask implements Runnable {
private CountDownLatch countDownLatch;

public WaitingTask(CountDownLatch countDownLatch) {
this.countDownLatch = countDownLatch;
}

@Override
public void run() {
try {
countDownLatch.awit();
} catch (Exception e) {
log.error("error occurred", e);
}
}
}

CycliBarrier

CyclicBarrier介绍

一个同步辅助类,它允许一组线程互相等待,直到到达某个公共屏障点 (common barrier point)。在涉及一组固定大小的线程的程序中,这些线程必须不时地互相等待,此时 CyclicBarrier 很有用。因为该 barrier 在释放等待线程后可以重用,所以称它为循环 的 barrier。CyclicBarrier 支持一个可选的 Runnable 命令,在一组线程中的最后一个线程到达之后(但在释放所有线程之前),该命令只在每个屏障点运行一次。若在继续所有参与线程之前更新共享状态,此屏障操作 很有用。

1
2
3
4
5
6
7
8
//设置parties、count及barrierCommand属性。  
CyclicBarrier(int):

//当await的数量到达了设定的数量后,首先执行该Runnable对象。
CyclicBarrier(int,Runnable):

//通知barrier已完成线程
await():

在Think in Java 中举的例子就是蛮妥当的

之前每个Hourse 跑一次之后会调用cyclicBarrier的await()方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
barrier = new CyclicBarrier(nHourse, () - > {
@Override
public void run() {
StringBuilder s = new StringBuilder();
for (Hourse hourse : hourses) {
System.out.println(hourse.tracks());
}
for (Hourse hourse : hourses) {
if (hourse.getStrides() >= FINISH_LINE) {
System.out.println(hourse + "WON!");
exec.shutdown();
}
}
}
});

DelayQueue

Queue

java中queue的使用

方法 说明
add 增加一个元索 如果队列已满,则抛出一个IIIegaISlabEepeplian异常
remove 移除并返回队列头部的元素 如果队列为空,则抛出一个NoSuchElementException异常
element 返回队列头部的元素 如果队列为空,则抛出一个NoSuchElementException异常
offer 添加一个元素并返回true 如果队列已满,则返回false
poll 移除并返问队列头部的元素 如果队列为空,则返回null
peek 返回队列头部的元素 如果队列为空,则返回null
put 添加一个元素 如果队列满,则阻塞
take 移除并返回队列头部的元素 如果队列为空,则阻塞

JDK 中提供的有

  • ArrayBlockingQueue
  • PriorityBlockingQueue
  • DelayQueue

DelayQueue

精巧好用的DelayQueue

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
import java.util.Random;
import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

@Slf4j
class DelayedTask implements Runnable, Delayed {
private static int counter = 0;
private final int id = count ++;
private final int delta;
private final long trigger;
protected static List<DelayedTask> sequence = new ArrayList<>();

/**
* delat 在多少时间后触发
* trigger 具体的出发时间
* @param delayInMillionSeconds
*/
public DelayedTask (int delayInMillionSeconds) {
delat = delayInMillionSeconds;
trigger = System.nanoTime() + TimeUnit.NANOSECONDS.convert(delta, MILLIONSECONDS);
sequence.add(this);
}
/**
* 距离到触发还有多少时间
*/
@Override
public long getDelay(TimeUnit timeUnit) {
return timeUnit.convert(trigger - System.nanoTime(), TimeUnit.NANOSECONDS);
}

@Override
public int compareTo(Delayed delayed) {
DelayedTask that = (DelayedTask) delayed;
if (tigger < that.trigger) {
return -1;
} else if (tigger > that.trigger) {
return 1;
} else {
return 0;
}
}
/**
* 具体要做的事情
* 在这里是简单的进行打印
*/
@Override
public void run() {
print(this);
}

@Override
public String toString() {
return String.format("[1$-4d]", delta) + " TASK " + id;
}

public String summery() {
return "(" + id + ":" + delta + ")";
}
/**
* 创建一个内部类
* 这个内部类生成的对象能停止所有的任务
*/
public static class EndSentinel extends DelayedTask {
private ExecutorService exec;

public EndSentinel(int delay, ExecutorService e) {
super(delay);
exec = e;
}
@Override
public void run() {
for (DelayedTask pt : sequence) {
System.out.println(pt.summery());
}
System.out.println(this + " CALLING for shutdownNow()");
exec.shutdown();
}
}
}

class DelayedTaskConsumer implements Runnable {
private DelayedQueue<DelayedTask> q;

public DelayedTaskConsumer(DelayedTaskConsumer<DelayedTask> q) {
this.q = q;
}

@Override
public void run() {
try {
while(! Thread.interrupted()) {
q.take().run();
}
} catch(Exception e) {
log.error("error occurred", e);
}
System.out.println("Finished DelayedTaskConsumer");
}
}

public class DelayedQueueDemo {
public static void main(String[] args) {
Random random = new Random(50);
ExecutorService exec = Executors.newCachedThreadPool();
DelayQueue<DelayedTask> queue = new DelayQueue<>();

for (int i = 0; i < 20; i++) {
queue.add(new DelayedTask(random.nextInt(5000)));
}

queue.add(new DelayedTaskConsumer.EndSentinel(queue));
exec.execute(new DelayedTaskConsumer(queue));
}
}

PriorityBlockingQueue

PriorityQueue

PriorityQueue

Java中PriorityQueue实现了Queue接口,不允许放入null元素;其通过堆实现,具体说是通过完全二叉树(complete binary tree)实现的小顶堆(任意一个非叶子节点的权值,都不大于其左右子节点的权值),也就意味着可以通过数组来作为PriorityQueue的底层实现。

父子节点和array之间的关系

堆排序

堆排序(Heapsort)之Java实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
package tk.andrewchen;

/**
* 将数据从小到大进行排序
* step1.
* 创建小根堆
* 注意 **小根堆** 和平衡二叉树不一样
* 不要求 左边的子节点的值 小于右边的子节点的值
* 只用父节点的值要比子节点的值小就可以了
* 在创建小根堆的时候从叶子节点开始 不停的把 小的数据向上推
* step2.
* 在创建小根堆完毕之后,根节点的值是最小值
* 把根节点的值和最后一个节点的值进行互换
* 此时根节点上 大根堆的结构被破坏,需要进行重建
* 看 根节点 需要和 左右 子节点的那个值进行互换,
* 如果进行了互换 那么 子节点作为父节点的堆结构被破坏, 需要进行重建 (由此 周而复始 进行递归)
* @author dafuchen
* 2018/9/17
*/
public class HeapSort<T extends Comparable<T>> {
private T[] heap;
private int size;

public HeapSort(T[] heap) {
this.heap = heap;
this.size = heap.length;

for (int i = size / 2; i >= 0; i --) {
minHeap(i);
}
}

private void minHeap(int cur) {
int rootPos = cur;
int lowest = cur;
int leftPos = cur * 2 + 1;
int rightPos = cur * 2 + 2;

if (leftPos < size && heap[leftPos].compareTo(heap[rootPos]) < 0) {
lowest = leftPos;
}
if (rightPos < size && heap[rightPos].compareTo(heap[lowest])< 0) {
lowest = rightPos;
}

if (lowest != cur) {
swap(heap, lowest, cur);
minHeap(lowest);
}
}

private void swap(T[] heap, int posA, int posB) {
T temp = heap[posB];
heap[posB] = heap[posA];
heap[posA] = temp;
}

public void sort() {
while(size > 0) {
System.out.println(heap[0]);
swap(heap, 0, --size);
minHeap(0);
}
}

public static void main(String[] args) {
Integer[] array = { 9, 8, 100, 6, 5, 4, 3, 2, 1, 0, -1, -2, -3 };
HeapSort<Integer> heapSort = new HeapSort<>(array);
heapSort.sort();
}
}
add() & offer()

add(E e)offer(E e)的语义相同,都是向优先队列中插入元素,只是Queue接口规定二者对插入失败时的处理不同,前者在插入失败时抛出异常,后则则会返回false。对于PriorityQueue这两个方法其实没什么差别。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
//offer(E e)
public boolean offer(E e) {
if (e == null)//不允许放入null元素
throw new NullPointerException();
modCount++;
int i = size;
if (i >= queue.length)
grow(i + 1);//自动扩容
size = i + 1;
if (i == 0)//队列原来为空,这是插入的第一个元素
queue[0] = e;
else
siftUp(i, e);//调整
return true;
}
1
2
3
4
5
6
7
8
9
10
11
12
//siftUp()
private void siftUp(int k, E x) {
while (k > 0) {
int parent = (k - 1) >>> 1;//parentNo = (nodeNo-1)/2
Object e = queue[parent];
if (comparator.compare(x, (E) e) >= 0)//调用比较器的比较方法
break;
queue[k] = e;
k = parent;
}
queue[k] = x;
}

暂时停在P726, 脑壳疼 😢

peek() & element()

1
2
3
4
5
6
//peek()
public E peek() {
if (size == 0)
return null;
return (E) queue[0];//0下标处的那个元素就是最小的那个
}
poll()& remove()

1
2
3
4
5
6
7
8
9
10
11
12
public E poll() {
if (size == 0)
return null;
int s = --size;
modCount++;
E result = (E) queue[0];//0下标处的那个元素就是最小的那个
E x = (E) queue[s];
queue[s] = null;
if (s != 0)
siftDown(0, x);//调整
return result;
}

ScheduledExecutor

通过scheduleAtFixedRate() 每隔规则的时间重复任务.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,
long initialDelay,
long period,
TimeUnit unit) {
if (command == null || unit == null)
throw new NullPointerException();
if (period <= 0)
throw new IllegalArgumentException();
ScheduledFutureTask<Void> sft =
new ScheduledFutureTask<Void>(command,
null,
triggerTime(initialDelay, unit),
unit.toNanos(period));
RunnableScheduledFuture<Void> t = decorateTask(command, sft);
sft.outerTask = t;
delayedExecute(t);
return t;
}

Semaphore ˈseməˌfôr

正常的锁在任何时候都只允许一个任务访问同一项资源, 而计数信号量允许n个任务同时访问这个资源.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
package tk.andrewchen;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Semaphore;

/**
* @author dafuchen
* 2018/9/23
*/
public class Pool<T> {
private int size;
private List<T> items = new ArrayList<>();
private volatile boolean[] checkedOut;
private Semaphore available;

public Pool(Class<T> classObject, int size) {
this.size = size;
checkedOut = new boolean[size];
available = new Semaphore(size, true);

for (int i = 0; i < size; i++) {
try {
items.add(classObject.newInstance());
} catch (Exception e) {
throw new RuntimeException(e);
}
}
}

public T checkout() throws InterruptedException {
available.acquire();
return getItem();
}
public void checkoutIn(T x) {
if (releaseItem(x)) {
available.release();
}
}
// 这部分 其实可以用blockedQueue 来代替
private synchronized T getItem() {
for (int i = 0; i < size; i++) {
if (! checkedOut[i]) {
checkedOut[i] = true;
}
items.get(i);
}
return null;
}
private synchronized boolean releaseItem(T item) {
int index = items.indexOf(item);
if (index == -1) {
return false;
}
if (checkedOut[index]) {
checkedOut[index] = false;
return true;
}
else {
return false;
}
}
}

Exchanger

Exchange是在两个任务之间交换对象的栅栏

当线程A调用Exchange对象的exchange()方法后,他会陷入阻塞状态,直到线程B也调用了exchange()方法,然后以线程安全的方式交换数据,之后线程A和B继续运行 .

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
package tk.andrewchen;

import java.util.Iterator;
import java.util.List;
import java.util.concurrent.Exchanger;

/**
* @author dafuchen
* 2018/9/23
*/
public class ExchangeProducer <T> implements Runnable {
private Generator<T> generator;
private Exchanger<List<T>> exchanger;
private List<T> holder;
private Class<T> classObject;

public ExchangeProducer(Exchanger<List<T>> exchanger, Generator<T> gen, List<T> holder, Class<T> classObject) {
this.exchanger = exchanger;
this.generator = gen;
this.holder = holder;
this.classObject = classObject;
}
@Override
public void run() {
try {
while(! Thread.interrupted()) {
for (int i = 0; i < SIZE; i++) {
holder.add(generator.next(classObject));
}
holder = exchanger.exchange(holder);
}
} catch (Exception e) {

}
}
}
class ExchangerConsumer<T> implements Runnable {
private Exchanger<List<T>> exchanger;
private List<T> holder;
private volatile T value;

public ExchangerConsumer(Exchanger<List<T>> ex, List<T> holder) {
exchanger = ex;
this.holder = holder;
}

@Override
public void run() {
try {
while (! Thread.interrupted()) {
holder = exchanger.exchange(holder);
Iterator<T> iterator = holder.iterator();
while (iterator.hasNext()) {
T item = iterator.next();
iterator.remove();
// do something
}
}
} catch (Exception e) {

}
}
}
class Generator<T> {
public T next(Class<T> classObject) {
try {
return classObject.newInstance();
} catch (Exception e) {
e.printStackTrace();
return null;
}
}
}