Java 多线程并发
线程的创建方式
继承Thread类
1
2
3
4
5
6Thread thread = new Thread() {
public void run() {
}
}实现Runnable接口
1
2
3
4
5
6
7Runnable run = new Runnable() {
public void run() {
//do something
}
};
Thread t = new Thread(run);ExecutorService Callable
, Future 1
2
3
4ExecutorSrevice threadPool = Executors.cachedThreadPool();
Future f = threadPool.submit(new Runnable() {
// 各种run任务的描述
});线程池
newCachedThreadPool
调用execute将重用之前构造的线程(如果线程可用) 如果现有的线程没有可用的,则创建一个新线程并添加到线程池中.终止并从缓存中移除那些已有60s未被使用的线程
newFixedThreadPool
newScheduledThreadPool
newSingleThreadExecutor
线程的结束
定义一个退出标志
1
2
3
4
5
6
7
8
9
10
11
12
13
14Thread t = new Thread() {
volatile Boolean stop;
{
stop = false;
}
public void run() {
while (! stop) {
}
}
}
interrupted
用interrupted 有两种方法
当线程处于blocked的状态的时候.我们调用该线程的interrupt()方法会抛出InterruptedException
在线程在处于非blocked状态的时候 去判断它的中断标示,这个方法会和方法1同时使用
1
2
3
4
5
6
7
8
9
10
11
12
13
14Thread t = new Thread() {
volatile Boolean stop;
{
stop = false;
}
public void run() {
while (! stop && ! isInterrupted()) {
// do something
}
}
}
sleep 和wait之间的区别
- sleep 是属于Thread 的方法. wait是属于object的方法.
- sleep 是让渡cpu时间. 而wait是让渡出临界资源的控制权
Java 锁
悲观锁
悲观锁可以叫做独占锁, 在得到这个资源之后到让渡出这个资源之前其他的事物都不能访问这个资源.
乐观锁
相对于悲观锁, 乐观锁认为在很大的程度下这个资源都不会发生变化 所以也没有必要进行独占(大家都可以访问), 在准备对这个资源进行修改的时候要去验证一下这个资源是不是发生变化了, 如果发生变化了就放弃修改, 如果是没有发生变化,那就修改它. (Compare And Swap)
自旋锁
- 自旋
1 | long offset = Unsafe.getUnsafe().objectFieldOffset(Stuednt.class.getDeclaredField("name")); |
- 如果持有锁的线程能在很短的时间内释放锁资源, 那么那些等待竞争锁的线程就不需要做内核态和用户态之间的切换进入阻塞挂起状态,只要等一等就好了.
Synchronized
作用范围
- 作用方法
- 作用静态方法
- sychronized(something)
核心组件
contention list
竞争队列 所有请求锁的线程首先放在这个竞争队列中
entry list
Contention list zhong 有资格称为候选资源的线程被移动到这个竞争队列中
wait set
调用wait的线程在这个队列中
OnDeck
任意时刻最多只能有一个线程正在竞争锁,该线程被称为OnDeck
Owner
获得锁的进程称为Owener
! Owener
释放锁的线程
ContentionList 虚拟队列
ContentionList 并不是一个真正的Queue,而只是一个虚拟队列,原因在于ContentionList是由Node及其next指 针逻辑构成,并不存在一个Queue的数据结构。ContentionList是一个后进先出(LIFO)的队列,每次新加入Node时都会在队头进行, 通过CAS改变第一个节点的的指针为新增节点,同时设置新增节点的next指向后续节点,而取得操作则发生在队尾。显然,该结构其实是个Lock- Free的队列。
因为只有Owner线程才能从队尾取元素,也即线程出列操作无争用,当然也就避免了CAS的ABA问题。
EntryList
EntryList与ContentionList逻辑上同属等待队列,ContentionList会被线程并发访问,为了降低对 ContentionList队尾的争用,而建立EntryList。Owner线程在unlock时会从ContentionList中迁移线程到 EntryList,并会指定EntryList中的某个线程(一般为Head)为Ready(OnDeck)线程。Owner线程并不是把锁传递给 OnDeck线程,只是把竞争锁的权利交给OnDeck,OnDeck线程需要重新竞争锁。这样做虽然牺牲了一定的公平性,但极大的提高了整体吞吐量,在 Hotspot中把OnDeck的选择行为称之为“竞争切换”。
OnDeck线程获得锁后即变为owner线程,无法获得锁则会依然留在EntryList中,考虑到公平性,在EntryList中的位置不 发生变化(依然在队头)。如果Owner线程被wait方法阻塞,则转移到WaitSet队列;如果在某个时刻被notify/notifyAll唤醒, 则再次转移到EntryList。
自旋锁
那些处于ContetionList、EntryList、WaitSet中的线程均处于阻塞状态,阻塞操作由操作系统完成(在Linxu下通 过pthread_mutex_lock函数)LockSupport.park() LockSupport.unPark()。线程被阻塞后便进入内核(Linux)调度状态,这个会导致系统在用户态与内核态之间来回切换,严重影响 锁的性能
缓解上述问题的办法便是自旋,其原理是:当发生争用时,若Owner线程能在很短的时间内释放锁,则那些正在争用线程可以稍微等一等(自旋), 在Owner线程释放锁后,争用线程可能会立即得到锁,从而避免了系统阻塞。但Owner运行的时间可能会超出了临界值,争用线程自旋一段时间后还是无法 获得锁,这时争用线程则会停止自旋进入阻塞状态(后退)。基本思路就是自旋,不成功再阻塞,尽量降低阻塞的可能性,这对那些执行时间很短的代码块来说有非 常重要的性能提高。自旋锁有个更贴切的名字:自旋-指数后退锁,也即复合锁。很显然,自旋在多处理器上才有意义。
还有个问题是,线程自旋时做些啥?其实啥都不做,可以执行几次for循环,可以执行几条空的汇编指令,目的是占着CPU不放,等待获取锁的机 会。所以说,自旋是把双刃剑,如果旋的时间过长会影响整体性能,时间过短又达不到延迟阻塞的目的。显然,自旋的周期选择显得非常重要,但这与操作系统、硬 件体系、系统的负载等诸多场景相关,很难选择,如果选择不当,不但性能得不到提高,可能还会下降,因此大家普遍认为自旋锁不具有扩展性。
自旋优化策略
对自旋锁周期的选择上,HotSpot认为最佳时间应是一个线程上下文切换的时间,但目前并没有做到。经过调查,目前只是通过汇编暂停了几个CPU周期,除了自旋周期选择,HotSpot还进行许多其他的自旋优化策略,具体如下:
如果平均负载小于CPUs则一直自旋
如果有超过(CPUs/2)个线程正在自旋,则后来线程直接阻塞
如果正在自旋的线程发现Owner发生了变化则延迟自旋时间(自旋计数)或进入阻塞
如果CPU处于节电模式则停止自旋
自旋时间的最坏情况是CPU的存储延迟(CPU A存储了一个数据,到CPU B得知这个数据直接的时间差)
自旋时会适当放弃线程优先级之间的差异
那synchronized实现何时使用了自旋锁?答案是在线程进入ContentionList时,也即第一步操作前。线程在进入等待队列时 首先进行自旋尝试获得锁,如果不成功再进入等待队列。这对那些已经在等待队列中的线程来说,稍微显得不公平。还有一个不公平的地方是自旋线程可能会抢占了 Ready线程的锁。自旋锁由每个监视对象维护,每个监视对象一个。
偏向锁
在JVM1.6中引入了偏向锁,偏向锁主要解决无竞争下的锁性能问题,首先我们看下无竞争下锁存在什么问题:
现在几乎所有的锁都是可重入的,也即已经获得锁的线程可以多次锁住/解锁监视对象,按照之前的HotSpot设计,每次加锁/解锁都会涉及到一些 CAS操 作(比如对等待队列的CAS操作),CAS操作会延迟本地调用,因此偏向锁的想法是一旦线程第一次获得了监视对象,之后让监视对象“偏向”这个 线程,之后的多次调用则可以避免CAS操作,说白了就是置个变量,如果发现为true则无需再走各种加锁/解锁流程。但还有很多概念需要解释、很多引入的 问题需要解决:
其意思是所有的CPU会共享一条系统总线(BUS),靠此总线连接主存。每个核都有自己的一级缓存,各核相对于BUS对称分布,因此这种结构称为“对称多处理器”。
而CAS的全称为Compare-And-Swap,是一条CPU的原子指令,其作用是让CPU比较后原子地更新某个位置的值,经过调查发现, 其实现方式是基于硬件平台的汇编指令,就是说CAS是靠硬件实现的,JVM只是封装了汇编调用,那些AtomicInteger类便是使用了这些封装后的 接口。
Core1和Core2可能会同时把主存中某个位置的值Load到自己的L1 Cache中,当Core1在自己的L1 Cache中修改这个位置的值时,会通过总线,使Core2中L1 Cache对应的值“失效”,而Core2一旦发现自己L1 Cache中的值失效(称为Cache命中缺失)则会通过总线从内存中加载该地址最新的值,大家通过总线的来回通信称为“Cache一致性流量”,因为总 线被设计为固定的“通信能力”,如果Cache一致性流量过大,总线将成为瓶颈。而当Core1和Core2中的值再次一致时,称为“Cache一致 性”,从这个层面来说,锁设计的终极目标便是减少Cache一致性流量。
而CAS恰好会导致Cache一致性流量,如果有很多线程都共享同一个对象,当某个Core CAS成功时必然会引起总线风暴,这就是所谓的本地延迟,本质上偏向锁就是为了消除CAS,降低Cache一致性流量。
上面提到Cache一致性,其实是有协议支持的,现在通用的协议是MESI(最早由Intel开始支持),具体参考:http://en.wikipedia.org/wiki/MESI_protocol,以后会仔细讲解这部分。
其实也不是所有的CAS都会导致总线风暴,这跟Cache一致性协议有关,具体参考:http://blogs.oracle.com/dave/entry/biased_locking_in_hotspot
NUMA(Non Uniform Memory Access Achitecture)架构:
与SMP对应还有非对称多处理器架构,现在主要应用在一些高端处理器上,主要特点是没有总线,没有公用主存,每个Core有自己的内存,针对这种结构此处不做讨论。
ReentrantLock
他是一种重入锁,除了能完成sychronized所能完成的所有工作之外,还提供了诸如可响应中断锁、可轮训锁、定时锁能避免多线程死锁的方法.
void lock()
如果锁处于空闲的状态,当前线程将获得锁
boolean tryLock()
如果锁可用,则获得锁,并且立即返回true,否则返回false.tryLock只是”试图”获取锁,如果锁不可用,不会导致这个线程处于wait状态
void unlock()
当前线程将释放持有的锁
Condition newCondition()
条件对象,获取等待通知组件. 该组件和当前的锁绑定.当前线程只有拥有了锁,才能调用该组件的await()方法.而调用了之后,当前线程会释放lock, 被加入到这个condition的等待队列中,这个线程不再会活动,until 其他线程操作这个conditional 调用signal()或者 signalAll(). 被await的线程会到lock的竞争队列中去竞争condition.
int getHoldCount()
查询当前线程保持此锁的次数, 重入的次数,也就是执行此线程执行lock方法的次数
int getQueueLength()
返回正等待获取此锁的线程数量
int getWaitQueueLength(Condition condition)
返回等待与此锁相关的给定条件线程估计数.比方说10个线程,用同一个condition对象,而且10对象都执行了await的方法.那么返回的值为10
bool hasWaiters(Condition condition)
某个Condition 是否有等待的Thread数量
boolean hasQueuedThread(Thread thread)
查询给定的线程是否拥有这个锁
boolean hasQueuedThreads()
是否有线程在等待这个lock
isFair()
是否是公平锁
isHeldByCurrentThread()
这个lock是不是被当前线程占有
isLock()
这个锁是否被其他线程占用
lockInterruptibly()
尝试获取锁,仅在调用时锁未被中断,获取锁
tryLock(Long timeout, TimeUnit unit)
在给定的时间内尝试获取锁,如果没有获取到,则放弃
公平锁和非公平锁
JVM 按随机、就近原则分配原则锁的机制则称为不公平锁,按照对锁提出获取请求的先后顺序分配到锁的机制被称为公平锁
ReetrantLock 与 Sychronized
reetrantLock通过lock()和unlock() 实现加锁和解锁. ReetrantLock 必须在finally控制块中进行解锁操作.
Semaphore
Semaphore是一种基于基数的型号量.它可以设定一个阈值.多个线程获取许可型号(acquire),做完自己的申请后归还(release)
共享锁和独占锁
独占锁模式下,每次只有一个线程能持有锁.共享锁允许多个线程同时获取锁,并发共享资源. 比方说ReadWriteLock.
READ | WRITE | |
---|---|---|
READ | ☑️ | ✖️ |
WRITE | ✖️ | ✖️ |
1 | public class MyReadWriteReetraintLock { |
Interrupt
中断一个线程,本意是给这个线程一个通知信号,会影响这个线程内部一个中断标识.
- 运行中的线程不能响应interrupt. 但是标识位会被设置.可以通过isInterrupted方法来知晓标识位的状态
- 在阻塞状态的进程被调用interrupt方法会抛出InterruptedException, 在抛出错误后会将标识位清除.
线程池的组成
- 线程池管理器
- 工作线程
- 任务接口
- 任务队列
构建一个线程池
1 | public static void main(String[] args) { |
拒绝策略
当线程池里面所有的线程都在运行中, 并且队列都满了情况下有若干的拒绝方式
AbortPolicy
直接抛出异常,阻止系统正常运行
CellerRunsPolicy
该策略直接在调用者线程中运行当前被丢弃的任务. 优点不会真的丢弃任务. 但是任务提交的性能可能会急剧下降
- DiscardOldestPolicy
抛弃队列中最老的线程
- DiscardPolicy
默默的丢弃将要处理的任务不做任何的处理
线程池的工作流程
当调用execute()方法添加一个任务的时候,线程池会做如下判断:
a. 如果正在运行的线程数量小于corePoolSize, 会马上创建线程运行这个任务
b. 如果运行的线程数量大于或者等于corePoolSize 会将这个任务放入队列
c. 如果队列满了, 线程数量小于maximumPoolSize, 那么还是要创建非核心线程来运行这个任务
f. 如果队列满了,而且现在正在运行的线程数量大于或者等于maximumPoolSize,那么线程池会抛出RejectExecutionExecution;
当一个线程无事可做,超过一定时间,线程池会判断,如果当前运行的线程数量大于corePoolSize,那么这个线程就会被停掉,最终线程池会被压缩到corePoolSize的大小
队列的方法
插入 | 移除 | 检查 | |
---|---|---|---|
会抛出异常 | add | remove | element |
不会抛出异常 | offer | poll | peek |
阻塞 | put | take |
ArrayBlockingQueue
数组形式实现的有界阻塞队列
LinkedBlockingQueue
对于消费者和生产者 采用不同的锁来控制数据同步 生产者不停的在队尾增加内容. 消费者不停的在队列头部生产内容
PriorityBlockingQueue
队列使用大根堆的方式进行组织的. 每次取堆的根节点.每次添加内容后会进行堆的rearrange.
DelayQueue
也是用小根堆的方式进行组织, 以延迟的时间作为权重值, 每次都从队列中取内容, 然后delay指定的时间
SynchronousQueue
每一个put必须等待一个take操作.可以看作是一个数据暂存的地方
LinkedTransferQueue
由一个链表组成的无界阻塞TransferQueue队列.相对于LinkedBlockingQueue多了transfer()方法
CountDownLunch
CyclicBarrier
Semaphore
Volatile
在 JMM(Java Memory Model)数据的执行主要分为 lock、unlock、read、load、use、assign、store、write。
- read(读取):作用于主内存变量,把一个变量值从主内存传输到线程的工作内存中,以便随后的load动作使用
- load(载入):作用于工作内存的变量,它把read操作从主内存中得到的变量值放入工作内存的变量副本中。
- use(使用):作用于工作内存的变量,把工作内存中的一个变量值传递给执行引擎,每当虚拟机遇到一个需要使用变量的值的字节码指令时将会执行这个操作。
- assign(赋值):作用于工作内存的变量,它把一个从执行引擎接收到的值赋值给工作内存的变量,每当虚拟机遇到一个给变量赋值的字节码指令时执行这个操作。
- store(存储):作用于工作内存的变量,把工作内存中的一个变量的值传送到主内存中,以便随后的write的操作。
- write(写入):作用于主内存的变量,它把store操作从工作内存中一个变量的值传送到主内存的变量中。
这些行为是不可分解的原子操作,在使用上相互依赖,read-load从主内存复制变量到当前工作内存,use-assign执行代码改变共享变量值,store-write用工作内存数据刷新主存相关内容。
要保证所谓的多线程的数据执行正确,就是要保证happen-before。A操作如果发生在B操作之前,那么我们能保证数据的正确。所以我们需要内存屏障的帮助, Java内存屏障主要分为四种,它保证在栅栏前初始化的load和store指令,能够严格有序的在栅栏后的load和store指令之前执行。
Load - Load
确保Load1所要读入的数据能够在被Load2和后续的load指令访问前读入。
Store-Store
确保Store1的数据在Store2以及后续Store指令操作相关数据之前对其它处理器可见(例如向主存刷新数据)
Load-Store
确保Load1的数据在Store2和后续Store指令被刷新之前读取。
Store-Load
确保Store1的数据在被Load2和后续的Load指令读取之前对其他处理器可见。StoreLoad屏障可以防止一个后续的load指令 不正确的使用了Store1的数据,而不是另一个处理器在相同内存位置写入一个新数据。正因为如此,所以在下面所讨论的处理器为了在屏障前读取同样内存位置存过的数据,必须使用一个StoreLoad屏障将存储指令和后续的加载指令分开。Storeload屏障在几乎所有的现代多处理器中都需要使用,但通常它的开销也是最昂贵的。它们昂贵的部分原因是它们必须关闭通常的略过缓存直接从写缓冲区读取数据的机制。这可能通过让一个缓冲区进行充分刷新(flush),以及其他延迟的方式来实现。
在下面讨论的所有处理器中,执行StoreLoad的指令也会同时获得其他三种屏障的效果。所以StoreLoad可以作为最通用的(但通常也是最耗性能)的一种Fence。(这是经验得出的结论,并不是必然)。反之不成立,为了达到StoreLoad的效果而组合使用其他屏障并不常见。
ThreadLocal 作用
这两篇说的很清楚了
每个Thread 对象内部都有ThreadMap<Entity, T>类型的属性,其中Entity extend WeakReference < ThreadLocal>的定义为.
1 | static class Entry extends WeakReference<ThreadLocal<?>> { |
WeakReference如字面意思,弱引用, 当一个对象仅仅被weak reference(弱引用)指向,而没有任何其他strong reference(强引用)指向的时候, 如果这时GC运行, 那么这个对象就会被回收。我们定义了Entity, 其中ThreadLocal 是弱引用,value 是强引用。 Entity中的ThreadLocal的值能被顺利的GC, 但是value中的值不能呗顺利的GC, 需要手动的设置,让JVM 让其GC。
每次ThreadLocal的set和get方法实质是在ThreadMap中放置或者读取值
ThreadLocal 内存泄露问题
在程序的上下文中
1 | void dosomething() { |
那么在结束之后, name会被gc, 因为entity的定义, 其对象也会被回收,但是ThreadLocalMap 中的table[i] 中还是放着value的值,导致内存泄漏 比较建议的做法是
1 | void dosomething() { |
ReetrantLock和Synchronized 之间的区别
- ReentrantLock 显示的获得锁和释放锁, Synchronized隐式的获得锁
- ReentrantLock可响应中断, Synchronized不可以
- ReetrantLock 是api级别 Sychronized 是JVM 级别
- ReetrantLock 可以实现公平锁
- ReetrantLock 通过condition可以绑定多个条件
- synchronized 是同步阻塞, lock是同步非阻塞
- synchronized发生异常的时候会自动释放锁,而Lock需要在finally中调用unlock()
- 通过Lock可以知道是否获得锁,而Synchronized不可以
ConcurrentHashMap
concurrentHashMap的效率要比HashTable效率高的原因是ConcurrentHashMap采用了分片的策略,默认分为16片。