线程的深入理解

Dec 16, 2017


概要

  1. 线程的基本概念(线程的含义、线程状态图、以及jstack返回信息分析)
  2. 线程栈的结构
  3. synchronized实现原理
  4. Lock(ReentrantLock、ReentrantWriteReadLock)
  5. 线程之间的同步(wait和notify notifyAll,await signal signalAll)、如何实现阻塞队列、以及实现生产者和消费者集中方法(通过 wait notify实现线程之间同步、使用阻塞队列、管道 PipelineOutputStrean PipelineInputStream)
  6. 通过java的内存模型理解volatile
  7. CountDownLatch 和 CylicBarrier Semaphore如何使用以及区别
  8. 线程池的概念(线程池作用、状态转换、类图)

线程基本概念


1、线程和进程、程序的关系:

  • 程序是静态的,一个存放在磁盘中的指令和数据集合。
  • 进程是动态的,将磁盘中的程序加载到内存中就变成了进程。操作系统给进程分配相应的资源、然后等待分配到cpu资源
  • 线程是cpu调度的单元,一个进程中可以包含至少一个线程。线程之间共享进程的地址空间(共用堆内存)、其实线程有自己运行必备的空间(程序计数器、栈、一组寄存器)。线程之间可以并发、当线程被强占掉了cpu就会进行线程上下文的切换,由于线程共享进程的资源、保存上下文(保存当前线程被强占的位置)的信息代价也小,并发性也很高,相比进程之间的上下文切换要高效
  • 线程可以看作是cpu和指令之间协调者。要想在计算机上运行我们编写的程序,就必须要建立一个线程,最后通过线程的协助下,cpu才能根据指令输出我们想要的结果
  • 一个程序中的任务可看作是一场马拉松,在中途各个地方配置的供水部好比进程,而路途中进行跑部的人好比线程。

2、线程状态图

props

1、新建状态 线程通过 new Thread()进行创建

2、可运行状态 全部资源已经准备好了,就等到os调度分配cpu资源

3、运行 当线程获取到cpu时后执行程序代码

4、阻塞

当线程等某些资源而放弃cpu资源,停止运行,等到变成可运行状态时才会等到os分配cpu资源执行。其中阻塞分为 同步阻塞、等待阻塞、其他阻塞。

  • 同步阻塞 当线程获取锁资源时发现该锁已经被其他线程占用,则该线程处于Block然后进入锁池等待重新获取锁标识进入可运行状态

  • 等待阻塞 当在同步方法或同步块中调用 obj.wait()方法时则进入等待队列,释放锁或监视器资源,等待其他线程在同步方法或同步块中调用notify notifyAll方法唤醒等待的线程。然后此线程进入了锁池重新竞争锁标识,获取到了后进入可运行状态。

    为了wait和notify notifyAll必须在同步条件下执行?

    首先wait和notify notifyAll是Object对象的方法,java中的一切对象都继承了Object,而每个对象都会有一把对象锁,wait notify notifyAll方法实现就用到了锁的标识。线程处于等待时会记录相应的对象锁,同时notfiy时通知对象锁对应的阻塞线程。 如果不同步调用wait和notify方法,会导致,线程还没有进入等待队列时就已经调用了notify方法这是此线程会一直阻塞下去

  • 其他阻塞 I/O和sleep t.join都不会释放锁或监视器 I/O阻塞例如等待用户输入,用户输入完成时线程就进入了可运行状态 sleep(long time) 线程进入暂停阻塞状态,等待time时间后自动进入可运行状态 t.join() 当线程t执行结束后主线程进入可运行状态

3、工作中如何分析jstack中的日志

jstack pid可以将某个进程的某个时刻进程的状态快照打印出来

有一次线上出现cpu占用率很高的情况,进一步排查 top 查看占用cpu的进程,拿到pid 进一步找到该进程中使用cpu最多的线程的id。 top -Hp pid 然后将此线程id转换成16进制 0xaefc 然后thread dump两次 查看nid=0xaefc的线程的状态情况并且进行对比, jstack | grep 0xaefc > thread.dump

因等待获取对象监视器锁而进入Entry Set waiting for object entry

props

很明显:线程1获取到锁,处于RUNNABLE状态,线程2处于BLOCK状态

1、locked <0x000000076bf62208>说明线程1对地址为0x000000076bf62208对象进行了加锁;

2、waiting to lock <0x000000076bf62208> 说明线程2在等待地址为0x000000076bf62208对象上的锁;

3、waiting for monitor entry [0x000000001e21f000]说明线程1是通过synchronized关键字进入了监视器的临界区,并处于”Entry Set”队列,等待monitor,具体实现可以参考深入分析synchronized的JVM实现

因在同步块中调用了wait方法而进入等待队列 in Object.wait()

static class Task implements Runnable {
@Override
public void run() {
    synchronized (lock) {
        try {
            lock.wait();
            //TimeUnit.SECONDS.sleep(100000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
} } dump结果:

props

线程1和2都处于WAITING状态

1、线程1和2都是先locked <0x000000076bf62500>,再waiting on <0x000000076bf62500>,之所以先锁再等同一个对象,是因为wait方法需要先通过synchronized获得该地址对象的monitor;

2、waiting on <0x000000076bf62500>说明线程执行了wait方法之后,释放了monitor,进入到”Wait Set”队列,等待其它线程执行地址为0x000000076bf62500对象的notify方法,并唤醒自己,最后进入锁池中等待获取锁,然后进入Runnable状态 具体实现可以参考深入分析Object.wait/notify实现机制;

因sleep(time)或join导致线程阻塞此时不释放锁资源 TIME_WAITING

static class Task implements Runnable {
 @Override
public void run() {
    synchronized (lock) {
        try {
            Thread.sleep(100000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
  }
}

props

runnable:状态一般为RUNNABLE 就绪或者正在执行。

in Object.wait():等待区等待,状态为WAITING或TIMED_WAITING wait() 和 wait(timeout)

waiting for monitor entry:进入区等待,状态为BLOCKED 获取对象锁时

waiting on condition:等待某个条件,当条件(休眠的时间、网络资源、队列资源等)一满足就可以获得执行机会

Thread.sleep表示线程暂停一定时间后再运行,此时并不释放锁资源,其中parking是指挂起,当大量出现wait on condition表示正等待某个条件,很可能说名大部分线程都一直阻塞并等待网络上可读的数据,其中如果传统的NIO哪怕没有网络中没有可读数据时线程也会一直阻塞。此时应该确定是否网络堵塞、还是路由出问题了。 BlockqQueue是一个阻塞队列,当调用put方法时,如果队列已经满了就会一直阻塞,take,当队列没有数据可取时会一直阻塞直到队列有数据。

线程栈的结构


props

一个线程中可以包含多个栈帧(方法),而一个方法就相当于一个栈帧。当前被cpu执行的方法的栈帧成为当前栈帧,该帧对应的方法为当前方法、当前被调度获取到cpu的线程称为当前线程。

需要特别注意的是:
栈帧是线程本地私有的数据,不可能在一个栈帧之中引用另一条线程的栈帧。

1、栈帧的结构:

局部变量在栈内存所需大小和操作数栈的深度在编译的时候就确定了保存在方法区类字节码文件中,栈帧随方法调用而产生,方法无论正常还是异常结束后都会自动销毁,当不恰当使用递归会导致StackOverflowException异常。其中栈中还有一个异常表引用

  • 局部变量表

局部变量表中存放的就是slot[] 分配内存单位为变量槽slot(32为系统为32位、64位系统为64位),float、returnAddress(一般指向字节码指令地址) 其中boolean、short、char 都会转换成int都是1个slot。reference(可以查找堆中对象的起始地址索引和方法区中对象类型的数据)可能是占1个slot或2个slot。但是double long占用2个slot。 其中可以通过下标访问对应的变量。例如访问double变量时只需取出3号slot,然后+1就可以获取整个地址。如果该方法是非static的话默认slot[0] 为this指向当前对象在堆中起始地址索引。当变量超出了作用域名、slot变量槽可以被复用,当slot一直占用时会影响gc

  • 操作数栈

props

局部变量表是通过下标访问的,而操作数是通过进栈和出栈完成相应的操作。其中操作数可以是调用方法的参数或接受方法返回的结果

  • 整体栈活动图

props

注意:上图是贴过来的,有个小问题,栈帧上面的addTwoTypes为addAndPrint,最后面的addAndPrint执行结果改成addTwoTypes执行结果放在addAndPrint的操作数栈中。

  • 栈信息(动态连接、返回地址、其他附加信息)

返回地址:一般栈帧存放的返回地址就是PC程序计数器中指令的地址

一种是遇到返回字节码指令或方法正常结束完成,其中如果有返回值则压入被调用的方法栈帧的操作数栈中,返回地址为线程中PC中当前指向指令地址,修改PC中的值,继续在上层调用的方法栈帧中执行下一条指令

第二种是异常结束 在执行过程抛出了异常或者是遇到throw指令时,如果在方法的异常表中没有找到匹配的异常处理器时,则抛给向上层调用的方法处理。此时不会给上层调用产生任何返回值

其实无论异常退出还是正常退出,都要回到上层调用的方法,此时会恢复上层方法栈帧的局部变量表和操作数栈、如果有返回值则压入操作数栈、最后调整PC计数器值以指向方法调用指令后面的一条指令

动态链接

是指向常量池中地址引用,当需要执行常量池指令时可以通过这个地址进行访问

像c语言在编译成目标对象时,通过链接器就把各个目标对象拼成一个可执行文件,每个目标文件的符号引用替换成可执行文件的相对偏移地址(逻辑地址)。 而java的链接是在运行时动态完成的。 在字节码文件中像类中的类名、字段、方法的引用都看做是符号的引用,存放在该类的常量池中

  • 饥饿式 当字节码文件被加载时就完成符号引用解析过程(将所有符号引用转换成对应相对偏移的逻辑地址)
  • 懒汉式 当该符号第一次被使用的时候被解析

绑定:就是将类、字段、方法的符号引用转换成直接引用过程,只发生一次,一旦绑定符号引用会被完全替换。其中直接引用就是相当类或方法的一个偏移地址。如果使用的类的符号引用还没被解析则加载改类并完成绑定

栈帧组成图配详细说明

props

synchronized实现原理


1、synchronized对应的jvm字节码如何实现的

使用在静态方法或实例方法上,或实例块中。

	synchronized(o){
    	代码
	} 可以使用javap -c Test 查看synchronized转换成对应的class字节码指令为 

	monitorenter 
	指令 
	monitorexit 

其中 monitorenter和monitorexit指令调用了系统层面的Mutex Lock这是会导致用户态转换成内核态、其中转换代价很大,极大降低了程序的性能,下面会分析jdk1.6以后如何实现synchronized,并优化了对象锁。

其中在静态方法或实例方法上

public void synchronized a(){
    	代码
	}

使用 javap -c Test查看对应的class字节码指令为,在其a方法标识 ACC_SYNCHRONIZED=1 其实synchronized的同步方法也可以上面的monitorenter monitorexit这两个指令。当执行方法时首先判断ACC_SYNCHRONIZED是否为1,若为1则表示同步方法,在执行时或尝试获取对象锁(若实例方法则为实例对象,静态方法则为类对象)监视器锁,若获取成功则执行方法体否则阻塞,此时没有使用jvm指令获取锁而是隐式的通过操作系统进行操作。

2、jdk1.6是如何对锁进行优化的

synchronized是一种悲观锁的实现,就是一种只允许一个线程获取锁,其他线程阻塞。其中会导致用户态切换到内核态,代价非常大,可能在进入阻塞态不久就释放了锁,那么此时线程又要重新竞争锁,极大的影响了程序的性能。

CAS(compare and swap)是一种乐观锁的实现方式,线程并不会进行阻塞,使用死循环方式一只占用cpu,失败执行某个操作时会不断尝试直到获取成功。如果争用锁不严重的话并且方法执行时间短,则极大提高了性能。

CAS原理:其实CAS会导致两个问题:

  • 著名的ABA问题(例如:链表结构 A->B 线程1将链表替换成A->C->D,而线程2并不知道,然后还是比较链头部发现仍然是A则又进行替换成B,从而C和D就不能再被访问了,AtomicInteger就有ABA问题,为了避免此问题使用版本号解决AtomicStampReference就是这么解决的)

  • Cache一致性流量,也即是本地延迟问题。现在计算机一般都是多个cpu,其中每个cpu中都会有L1缓存,cpu和cpu之间通信,cpu和内存通信都是通过bus总线实现的,当某个cpu中的L1缓存某个数据改变了,此时需要更新缓存同时需要通知其他的cpu需要重新到内存获取该数据。最终达成了Cache一致性,CAS就会产生大量的bus通信,进而bus成为瓶颈,最后会导致本地延迟。CAS是一个cpu原子指令,当执行操作时某个cpu取出缓存值和传入的expect进行比较,若想等则设置新的值到内存中,然后通知其他cpu再次从内存读取新的数据,此时过程都是原子的,其他的线程更新时发现内存数据和except不一样了则更新失败。

为什么jdk1.6后会在synchronized中使用到CAS和偏向锁、适应性自旋锁呢?

其实就是为了降低线程进入阻塞,也就是从用户态进入操作系统层面的内核态的可能性。

锁的粗化:避免多次使用lock unlock从而对整个方法或块进行加锁

锁的消除;在运行时JIT编译器会对在单线程运行环境下还通过同步的方式执行方法进行对锁的消除。

轻量级锁:为了避免一下获取锁失败就进入了阻塞状态,在用户态增加了尝试的操作,使用CAS(cpu原子指令)完成对锁的竞争。

偏向锁:由于CAS相对于synchronized降低了不少的性能,但是CAS会存在本地延迟的现象,所以在无需锁的情况下,可以设置成偏向锁,让该锁偏向于某个线程

适应性自旋锁:当CAS仍然获取锁失败的情况下,首先会让线程忙等待,然后再进行尝试,尝试一定次数后仍然失败则进入阻塞状态

对象头:

props

对象头占用位数因不同的机器而不同 32位机器则对象头为32 64位的机器则对象头为64位。MarkWord分为两部分 一部分为LockWord和低3位是锁状态位。Klass ptr指向方法区中字节码的地址,Fields表示对象实例字段

props

monitor record:是线程的一种私有的数据结构,每个线程都有一个可用的monitor record列表和一个全局的可用列表

Owner:拥有改对象锁的唯一的线程标识

EntryQ:关联一个系统互斥锁(semaphore),阻塞所有试图锁住

monitor record失败的线程。

RfThis:表示blocked或waiting在该monitor record上的所有线程的个数

Nest: 初始为1 当调用同类的其他同步方法时Nest+1

HashCode:拷贝对象锁的头的HashCode值

Candidate:用来避免不必要的阻塞或等待线程唤醒,因为每一次只有一个线程能够成功拥有锁,如果每次前一个释放锁的线程唤醒所有正在阻塞或等待的线程,会引起不必要的上下文切换(从阻塞到就绪然后因为竞争锁失败又被阻塞)从而导致性能严重下降。Candidate只有两种可能的值0表示没有需要唤醒的线程1表示要唤醒一个继任线程来竞争锁

synchronized加锁的过程

1、当前对象的状态位是无锁状态,则各个线程会从monitor record空闲列表获取一个monitor record表,然后初始化Nest=1 Owner=自己线程id,当都初始化完成后,使用CAS将自己的monitor record的首地址安装到对象头的LockWord(MarkWord),此时对象已经膨胀了(对象头的地址变大了,为了保证对象头空间效率所以将monitor record表挂在外面,只需要attach这个表即可),安装成功的线程中的monitor reocrd作为全局的monitor record。其他调用CAS失败的线程会到monitorenter重新开始获取锁的过程

2、对象膨胀并且Owner就是为该线程id,这种情况属于锁的重入,此时并不需要原子操作,只需要Nest+1,效率非常高。

3、对象膨胀,Owner=NULL,当其等待或阻塞的线程尝试获取锁的同时上个锁的拥有者释放锁,此时会使用CAS 线程竞争把自己的id设置到monitor record中的Owner上,首先获取内存中的值赋给 a = nid,然后判断如果再次从cpu缓存中取出Owner中的值和变量a比较,若相等则设置新的线程id,因此获取到该锁,那么其他线程只有等待了。那么进入 step 4

4、对象膨胀,Owner不为NULL(已经锁住了),此时线程会忙等待后再尝试,尝试一定次数后仍然获取失败,则进入阻塞状态,此时rfThis会加1,然后再判断对象是否仍然和这个monitor record关联(有可能被其他线程所改变),若改变了,则又会从monitorenter开始使用CAS重新竞争锁,如果还是失败了则进入阻塞状态

释放锁的过程

1、首先判断monitor record表中的Owner释放和当前线程的id是否相等,不等则抛出异常

2、先判断Nest 若Nest>1则需要进行减1直到Nest=1后进入 step 3

3、判断rfThis 若rfThis>1 将Owner设置为NULL 则进行唤醒其他线程获取锁,若为0则进入step 4

4、缩减(deflate)对象头中MarkWord设置为原来的HashCode,和monitor record解除关联,归还monitor record到可用列表中

Lock实现原理


Lock类于接口的关系

props

	public interface Lock {
	    void lock();
	    void lockInterruptibility() throws InterruptedException;  // 可以响应中断
	    boolean tryLock();
	    boolean tryLock(long time, TimeUnit unit) throws InterruptedException;  // 可以响应中断
	    void unlock();
	    Condition newCondition(); //和线程之间同步有关系
	}

lock() 线程获取锁,未获取到则进行等待

tryLock() 如果获取了锁则会返回true否则立即返回false

tryLock(time,TimeUnit)throws InterruptException 线程等待一定时间,若还是没获取到则返回false,否则返回true,其间可以发生响应中断

lockInterruptibility() throws InterruptException 线程如果没有获取到锁会进行阻塞,但是可以响应中断请求

unlock() 释放锁资源,在finally调用

	public interface ReadWriteLock {
	    /**
	     * Returns the lock used for reading.
	     *
	     * @return the lock used for reading.
	     */
	    Lock readLock();
	
	    /**
	     * Returns the lock used for writing.
	     *
	     * @return the lock used for writing.
	     */
	    Lock writeLock();
	} ###synchronized和Lock的区别?为什么要引入Lock?
  • Lock可以响应中断,也就是对等待获取锁的线程可以中断,而使用synchronized时进入锁池中等待的线程不能中断,除非是wait()进入等待队列或sleep()的线程才可以中断
  • ReentrantReadWriteLock中返回 WriteLock ReadLock 实现仅有一个线程进行写操作,但是可以多个线程并发读

不过要注意的是,如果有一个线程已经占用了读锁,则此时其他线程如果要申请写锁,则申请写锁的线程会一直等待释放读锁。如果有一个线程已经占用了写锁,则此时其他线程如果申请写锁或者读锁,则申请的线程也会一直等待释放写锁。

  • Lock可以清楚知道线程有没有获取到锁

###如何使用Lock 由于lock必须手动去释放锁,而synchronized不需要,一抛出异常则自动释放锁

	void a1() throws InterruptException {
		lock.lockInterruptibility()
		try{
		//代码
		}catch(InterruptException e){
		}finally{
		lock.unlock();
		}
		}
		和下面进行对比
	void a2(){
		try{
		lock.lockInterruptibility()
		}catch(InterruptException e){
		}finally{
		lock.unlock();
		}
		}

其中如果线程2执行方法2时 thread2正在等待锁资源时发送了中断请求, thread2.intrrupt()那么虽然捕获了InterruptException异常,但是又会在lock.unlock发生IllegalMonitorStateException异常,此时没有捕获则终止程序运行。由于thread2并没有获取到对象lock锁,而执行unlock前提条件是获取lock,因此就会抛出异常。应该像a1方法那样将异常抛出去。

Lock锁的特性

  • 可重入性

锁的概念是针对线程的而不是针对方法的 下面就是锁的可重入性,如果锁不能重入,那么当执行a方法时获取了对象o的锁,当执行b仍然需要获取对象o的锁,那么此时就会导致死锁。因此对于同个线程可以调用任意一个同步方法,不需要重新获取锁 Lock可以synchronized都具有可重入性 public synchronized a(){ b(); } public synchronized b(){ }

  • 中断性

Lock锁可支持对等待锁资源的线程进行中断,但是synchronized 线程会一直等待获取锁资源不能响应中断

  • 公平性

Lock可以设置成公平锁或非公平锁,公平锁就会给先来的线程优先分配锁资源,而非公平的锁就是随机分配,无法保证按照线程请求锁的顺序分配,这样会导致某个线程永远获取不到锁资源。synchronized就是非公平锁

Lock实现原理

props

ReentrantLock实现了Lock接口,内部有三个内部类,Sync、NonfairSync、FairSync,Sync是一个抽象类型,它继承AbstractQueuedSynchronizer,这个AbstractQueuedSynchronizer是一个模板类,它实现了许多和锁相关的功能,并提供了钩子方法供用户实现,比如tryAcquire,tryRelease等。Sync实现了AbstractQueuedSynchronizer的tryRelease方法。NonfairSync和FairSync两个类继承自Sync,实现了lock方法,然后分别公平抢占和非公平抢占针对tryAcquire有不同的实现。

AbstractQueuedSynchronizer

AQS即是AbstractQueuedSynchronizer,一个用来构建锁和同步工具的框架,包括常用的ReentrantLock、CountDownLatch、Semaphore等。

AQS没有锁之类的概念,它有个state变量,是个int类型,在不同场合有着不同含义。本文研究的是锁,为了好理解,姑且先把state当成锁。

AQS围绕state提供两种基本操作“获取”和“释放”,有条双向队列存放阻塞的等待线程,并提供一系列判断和处理方法,简单说几点:

volatile int state state是独占的,还是共享的; state被获取后,其他线程需要等待; state被释放后,唤醒等待线程; 线程等不及时,如何退出等待。 至于线程是否可以获得state,如何释放state,就不是AQS关心的了,要由子类具体实现。

其AbstractQueuedSynchronizer继承了AbstractOwnableSynchronizer,在AbstractOwnableSynchronizer类有个exclusiveOwnerThread表示独占锁的线程,AbstractQueuedSynchronizer使用CLH锁队列来将并发执行变成串行执行。整个队列是一个双向链表。每个CLH锁队列的节点,会保存前一个节点和后一个节点的引用,当前节点对应的线程,以及一个状态。这个状态用来表明该线程是否应该block。当节点的前一个节点被释放的时候,当前节点就被唤醒,成为头部。新加入的节点会放在队列尾部。

NonFairLock中的lock方法活动图

props

lock详细描述

设计到的方法有: AbstractQueuedSynchronizer类 acquire() acquireQueued(addWaiter()) selfInterrupt() NonFairSync类 tryAcquire()

一、尝试获取锁

1、在初始化ReentrantLock的时候,如果我们不传参数是否公平,那么默认使用非公平锁,也就是NonfairSync。

2、当我们调用ReentrantLock的lock方法的时候,实际上是调用了NonfairSync的lock方法,这个方法先用CAS操作,去尝试抢占该锁。如果成功,就把当前线程设置在这个锁上,表示抢占成功。如果失败,则调用AbstractQueuedSynchronizer 模板类的acquire方法,等待抢占。代码如下:

	static final class NonfairSync extends Sync {
	        final void lock() {
	            if (compareAndSetState(0, 1))
	                setExclusiveOwnerThread(Thread.currentThread());
	            else
	                acquire(1);
	        }
	 
	        protected final boolean tryAcquire(int acquires) {
	            return nonfairTryAcquire(acquires);
	        }
	}

3、调用acquire(1)实际上使用的是AbstractQueuedSynchronizer的acquire方法,它是一套锁抢占的模板,总体原理是先去尝试获取锁,如果没有获取成功,就在CLH队列中增加一个当前线程的节点,表示等待抢占。然后进入CLH队列的抢占模式,进入的时候也会去执行一次获取锁的操作,如果还是获取不到,就调用LockSupport.park将当前线程挂起。那么当前线程什么时候会被唤醒呢?当持有锁的那个线程调用unlock的时候,会将CLH队列的头节点的下一个节点上的线程唤醒,调用的是LockSupport.unpark方法。acquire代码比较简单,具体如下:

		public final void acquire(int arg) {
		        if (!tryAcquire(arg) &&
		acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
		            selfInterrupt();
		}

注:在因获取不到锁时,阻塞起线程之前,会多次调用tryAcquire()重新获取锁,会有可能在线程进入阻塞前不久刚好释放锁,那么就避免了线程因进入阻塞而唤醒重新竞争锁,提高了效率

4、acquire方法内部先使用tryAcquire这个钩子方法去尝试再次获取锁,这个方法在NonfairSync这个类中其实就是使用了nonfairTryAcquire,具体实现原理是先比较当前锁的状态是否是0,如果是0,则尝试去原子抢占这个锁(设置状态为1,然后把当前线程设置成独占线程),如果当前锁的状态不是0,就去比较当前线程和占用锁的线程是不是一个线程,如果是,会去增加状态变量的值,从这里看出可重入锁之所以可重入,就是同一个线程可以反复使用它占用的锁。如果以上两种情况都不通过,则返回失败false。代码如下:

	final boolean nonfairTryAcquire(int acquires) {
	            final Thread current = Thread.currentThread();
	            int c = getState();
	            if (c == 0) {
	                if (compareAndSetState(0, acquires)) {
	                    setExclusiveOwnerThread(current);
	                    return true;
	                }
	            }
	            else if (current == getExclusiveOwnerThread()) {
	                int nextc = c + acquires;
	                if (nextc < 0) // overflow
	                    throw new Error("Maximum lock count exceeded");
	                setState(nextc);
	                return true;
	            }
	            return false;
	        }

tryAcquire一旦返回false,就会则进入acquireQueued流程,也就是基于CLH队列的抢占模式:

二、线程进入等待队列

5、首先,在CLH锁队列尾部增加一个等待节点,这个节点保存了当前线程,通过调用addWaiter实现,这里需要考虑初始化的情况,在第一个等待节点进入的时候,需要初始化一个头节点然后把当前节点加入到尾部,后续则直接在尾部加入节点就行了。

	private Node addWaiter(Node mode) {
			// 初始化一个节点,这个节点保存当前线程
	        Node node = new Node(Thread.currentThread(), mode);
	        // 当CLH队列不为空的视乎,直接在队列尾部插入一个节点
	        Node pred = tail;
	        if (pred != null) {
	            node.prev = pred;
	            if (compareAndSetTail(pred, node)) {
	                pred.next = node;
	                return node;
	            }
	        }
			// 当CLH队列为空的时候,调用enq方法初始化队列
	        enq(node);
	        return node;
	}
	
	
	

	 
	private Node enq(final Node node) {
	        for (;;) {
	            Node t = tail;
	            if (t == null) { // 初始化节点,头尾都指向一个空节点
	                if (compareAndSetHead(new Node()))
	                    tail = head;
	            } else {// 考虑并发初始化
	                node.prev = t;
	                if (compareAndSetTail(t, node)) {
	                    t.next = node;
	                    return t;
	                }
	            }
	        }
	}

enq是个死循环,保证Node一定能插入队列。注意到,当队列为空时,会先为头节点创建一个空的Node,因为头节点代表获取了锁的线程,现在还没有,所以先空着。

三、阻塞等待线程

6、标记1是线程唤醒后尝试获取锁的过程。如果前一个节点正好是head,表示自己排在第一位,可以马上调用tryAcquire尝试。如果获取成功就简单了,直接修改自己为head。这步是实现公平锁的核心,保证释放锁时,由下个排队线程获取锁。至于上一个节点,它的next变量被设置为null,在下次GC的时候会清理掉。 如果本次循环没有获取到锁,就进入线程挂起阶段,也就是shouldParkAfterFailedAcquire这个方法。 代码如下

	final boolean acquireQueued(final Node node, int arg) {
	    boolean failed = true;
	    try {
	        boolean interrupted = false;
	        for (;;) {
	            //1
	            final Node p = node.predecessor();
	            if (p == head && tryAcquire(arg)) {
	                setHead(node);
	                p.next = null; // help GC
	                failed = false;
	                return interrupted;
	            }
	            //2
	            if (shouldParkAfterFailedAcquire(p, node) &&
	                parkAndCheckInterrupt())
	                interrupted = true;
	        }
	    } finally {
	        if (failed)
	            cancelAcquire(node);
	    }
	}

7、如果尝试获取锁失败,就会进入shouldParkAfterFailedAcquire方法,会判断当前线程是否挂起,如果前一个节点已经是SIGNAL状态,则当前线程需要挂起。如果前一个节点是取消状态,则需要将取消节点从队列移除。如果前一个节点状态是其他状态,则尝试设置成SIGNAL状态,并返回不需要挂起,从而进行第二次抢占。完成上面的事后进入挂起阶段。

Node变量waitState描述了线程的等待状态,一共四种情况:

	static final int CANCELLED =  1;   //取消
	static final int SIGNAL    = -1;     //下个节点需要被唤醒
	static final int CONDITION = -2;  //线程在等待条件触发
	static final int PROPAGATE = -3; //(共享锁)状态需要向后传播
 

private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
        int ws = pred.waitStatus;
        if (ws == Node.SIGNAL)
            //
            return true;
        if (ws > 0) {
            //
            do {
                node.prev = pred = pred.prev;
            } while (pred.waitStatus > 0);
            pred.next = node;
        } else {
            //
            compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
        }
        return false;
    }

当进入挂起阶段,会进入parkAndCheckInterrupt方法,则会调用LockSupport.park(this)将当前线程挂起。

	private final boolean parkAndCheckInterrupt() {
	        LockSupport.park(this); //线程被阻塞,一直等待当前获取锁的线程释放锁后,唤醒该线程,重新竞争锁
	        return Thread.interrupted();
	} parkAndCheckInterrupt使用了LockSupport,和cas一样,最终使用UNSAFE调用Native方法实现线程阻塞(park和unpark类似于wait 和 notify)。最后返回唤醒后的线程中断位,其中lock()和lockInterruptibitlity针对这个返回的状态位有不一样的实现。

总结一下获取锁的过程:线程去竞争一个锁,可能成功也可能失败。成功就直接持有资源,不需要进入队列;失败的话进入队列阻塞,等待唤醒后再尝试竞争锁

四、释放锁

NonFairLock的unlock活动图

props

unlock方法描述 通过上面详细的获取锁过程分析,释放锁过程大概可以猜到:头节点是获取锁的线程,先移出队列,再通知后面的节点获取锁。

	public void unlock() {
	    sync.release(1);//sync = new NonFairSync();
	}

RentrantLock的unlock调用了 AQS的release方法

public final boolean release(int arg) {
    if (tryRelease(arg)) {
        Node h = head;
        if (h != null && h.waitStatus != 0)
            unparkSuccessor(h);
        return true;
    }
    return false;
}  tryRelease方法调用的是Sync类中的方法

protected final boolean tryRelease(int releases) {
    int c = getState() - releases;
    if (Thread.currentThread() != getExclusiveOwnerThread())
        throw new IllegalMonitorStateException();
    boolean free = false;
    if (c == 0) {
        free = true;
        setExclusiveOwnerThread(null);//将AbstractOwnableSynchronizer的exclusiveOwnerThread设置为null表示当前锁没有线程被占用
    }
    setState(c);
    return free;
}

因为锁是可以重入的,所以每次lock会让state加1,对应地每次unlock要让state减1,直到为0时将独占线程变量设置为空,返回标记是否彻底释放锁

最后,调用unparkSuccessor将头节点的下个节点唤醒:

private void unparkSuccessor(Node node) {
    int ws = node.waitStatus;
    if (ws < 0)
        compareAndSetWaitStatus(node, ws, 0);

    Node s = node.next;
    if (s == null || s.waitStatus > 0) {
        s = null;
        for (Node t = tail; t != null && t != node; t = t.prev)
            if (t.waitStatus <= 0)
                s = t;
    }
    if (s != null)
        LockSupport.unpark(s.thread);
} 如果头部节点的下个节点为null或waitStatus>0则 从队列尾向前查询的,找到线程后调用LockSupport的unpark方法唤醒线程。否则直接唤醒head的下个节点。被唤醒的线程会重新执行acquireQueued里的for循环重新竞争锁。

中断锁

就是对正在等待获取锁资源时的线程发起中断请求,现场响应中断,则就不会继续等待锁资源了。但对正在运行的线程,中断请求不起作用。

	static void selfInterrupt() {
	    Thread.currentThread().interrupt();
	}

为什么要这样操作呢?因为LockSupport.park阻塞线程后,有两种可能被唤醒。

第一种情况,前节点是头节点,释放锁后,会调用LockSupport.unpark唤醒当前线程。整个过程没有涉及到中断,最终acquireQueued返回false时,不需要调用selfInterrupt。

第二种情况,LockSupport.park支持响应中断请求,能够被其他线程通过interrupt()唤醒。但这种唤醒并没有用,因为线程前面可能还有等待线程,在acquireQueued的循环里,线程会再次被阻塞。parkAndCheckInterrupt返回的是Thread.interrupted(),不仅返回中断状态,还会清除中断状态,保证阻塞线程忽略中断。最终acquireQueued返回true时,真正的中断状态已经被清除,需要调用selfInterrupt维持中断状态。

因此普通的lock方法并不能被其他线程中断,ReentrantLock是可以支持中断,需要使用lockInterruptibly。

两者的逻辑基本一样,不同之处是parkAndCheckInterrupt返回true时,lockInterruptibly在 acquireQueued方法中

	if (shouldParkAfterFailedAcquire(p, node) &&
	                    parkAndCheckInterrupt())
	throw new InterruptedException();

公平锁和非公平锁

其中主要区别是: 非公平锁 只要一有机会线程就会去竞争锁,而公平锁 遵循FIFO原则,后来的线程乖乖的排队去。 其中非公平锁可能会导致某个线程一直都获取不到线程。 而ReentrantLock默认使用非公平锁,其中效率会比公平锁高,由于公平锁为了保证规规矩矩排队,增加了阻塞和唤醒时间开销。 如果直接插队获取非公平锁,(使用CAS避免了不必要的线程阻塞)跳过了耗时的阻塞和唤醒步骤,速度会更快

lock方法的比较:

	非公平锁:
	
	final void lock() {
	    if (compareAndSetState(0, 1))
	        setExclusiveOwnerThread(Thread.currentThread());
	    else
	        acquire(1);
	}
	公平锁:
	final void lock() {
	acquire(1);
	}
	
	非公平锁:
	
	final boolean nonfairTryAcquire(int acquires) {
	  final Thread current = Thread.currentThread();
	  int c = getState();
	  if (c == 0) {
	      if (compareAndSetState(0, acquires)) {
	          setExclusiveOwnerThread(current);
	          return true;
	      }
	  }
	  else if (current == getExclusiveOwnerThread()) {
	      int nextc = c + acquires;
	      if (nextc < 0) // overflow
	          throw new Error("Maximum lock count exceeded");
	      setState(nextc);
	      return true;
	  }
	  return false;
	}
	
   公平锁:
   
   protected final boolean tryAcquire(int acquires) {
        final Thread current = Thread.currentThread();
        int c = getState();
        if (c == 0) {
            if (!hasQueuedPredecessors() && //和非公平锁相比多了hashQueuePredecessors判断
                compareAndSetState(0, acquires)) {
                setExclusiveOwnerThread(current);
                return true;
            }
        }
        else if (current == getExclusiveOwnerThread()) {
            int nextc = c + acquires;
            if (nextc < 0)
                throw new Error("Maximum lock count exceeded");
            setState(nextc);
            return true;
        }
        return false;
    }
}

这个方法用于判断CHL队列中是否有节点,对于公平锁,如果CHL队列有节点,则新进入竞争的线程一定要在CHL上排队,而非公平锁则是无视CHL队列中的节点,直接进行竞争抢占,这就有可能导致CHL队列上的节点永远获取不到锁,这就是非公平锁之所以不公平的原因。

总结

线程使用ReentrantLock获取锁分为两个阶段,第一个阶段是初次竞争,第二个阶段是基于CHL队列的竞争。在初次竞争的时候是否考虑队列节点直接区分出了公平锁和非公平锁。在基于CHL队列的锁竞争中,依靠CAS操作保证原子操作,依靠LockSupport来做线程的挂起和唤醒,使用队列来保证并发执行变成了串行执行,从而消除了并发所带来的问题。总体来说,ReentrantLock是一个比较轻量级的锁,而且使用面向对象的思想去实现了锁的功能,比原来的synchronized关键字更加好理解

线程之间同步的方式


(wait和notify notifyAll,await signal signalAll)、如何实现阻塞队列、以及实现生产者和消费者集中方法(通过 wait notify实现线程之间同步、使用阻塞队列、管道 PipelineOutputStrean PipelineInputStream)

jvm内存模型


JMM

Java内存模型(即Java Memory Model,简称JMM)本身是一种抽象的概念,并不真实存在,由于JVM运行程序的实体是线程,而每个线程创建时JVM都会为其创建一个工作内存(有些地方称为栈空间),用于存储线程私有的数据,而Java内存模型中规定所有实例变量或者静态变量都存储在主内存,主内存是共享内存区域,所有线程都可以访问,但线程对变量的操作(读取赋值等)必须在工作内存中进行,首先要将变量从主内存拷贝的自己的工作内存空间,然后对变量进行操作,操作完成后再将变量写回主内存,不能直接操作主内存中的变量,工作内存中存储着主内存中的变量副本拷贝,前面说过,工作内存是每个线程的私有数据区域,因此不同的线程间无法访问对方的工作内存,线程间的通信(传值)必须通过主内存来完成,其简要访问过程如下图

props

根据虚拟机规范,对于一个实例对象中的成员方法而言,如果方法中包含本地变量是基本数据类型,将直接存储在工作内存的帧栈结构中,但倘若本地变量是引用类型,那么该变量的引用会存储在功能内存的帧栈中,而对象实例将存储在主内存(共享数据区域,堆)中。但对于实例对象的成员变量,不管它是基本数据类型还是引用类型,都会被存储到堆区。至于static变量以及类本身相关信息将会存储在主内存中。需要注意的是,在主内存中的实例对象可以被多线程共享,倘若两个线程同时调用了同一个对象的同一个方法,那么两条线程会将要操作的数据拷贝一份到自己的工作内存中,例如对实例变量 a在线程1和线程2都会有一份拷贝,执行完成操作后才刷新到主内存,简单示意图如下所示:

props

硬件内存的架构

props

寄存器: cpu会有一组寄存器,是存放cpu操作数据的临时空间。一般cpu都会从内存取出数据到寄存器里,进行处理数据,由于内存处理速度远远低于cpu,导致cpu处理指令时会等待内存做好准备,运行cpu执行效率,所以就会有cpu的一级缓存或二级缓存

缓存:由于对内存存取的速度远远低于cpu的执行效率,所以在寄存器和内存之间加个缓存,缓存很小但是速度比内存快很多,首先cpu会读取一部分数据到缓存中(如果缓存中有,直接读取),进而将缓存中数据读入到寄存器中。当cpu写数据到主存中,首先会刷新寄存器数据到缓存中,然后把缓存中数据刷新到内存中。

多核:一枚处理器(CPU)中集成两个或多个完整的计算引擎(内核),这样就可以支持多任务并行执行,从多线程的调度来说,每个线程都会映射到各个CPU核心中并行运行。

Java线程与处理器

在Window系统和Linux系统上,Java线程的实现是基于一对一的线程模型,所谓的一对一模型,实际上就是通过语言级别层面程序去间接调用系统内核的线程模型,即我们在使用Java线程时,Java虚拟机内部是转而调用当前操作系统的内核线程来完成当前任务。内核线程(Kernel-Level Thread,KLT),它是由操作系统内核(Kernel)支持的线程,这种线程是由操作系统内核来完成线程切换,内核通过操作调度器进而对线程执行调度,并将线程的任务映射到各个处理器上。每个内核线程可以视为内核的一个分身,这也就是操作系统可以同时处理多任务的原因。由于我们编写的多线程程序属于语言层面的,程序一般不会直接去调用内核线程,取而代之的是一种轻量级的进程(Light Weight Process),也是通常意义上的线程,由于每个轻量级进程都会映射到一个内核线程,因此我们可以通过轻量级进程调用内核线程,进而由操作系统内核将任务映射到各个处理器,这种轻量级进程与内核线程间1对1的关系就称为一对一的线程模型。如下图

props

如图所示,每个线程最终都会映射到CPU中进行处理,如果CPU存在多核,那么一个CPU将可以并行执行多个线程任务。

JMM存在的必要性

由于JVM运行程序的实体是线程,而每个线程创建时JVM都会为其创建一个工作内存(有些地方称为栈空间),用于存储线程私有的数据,线程与主内存中的变量操作必须通过工作内存间接完成,主要过程是将变量从主内存拷贝的每个线程各自的工作内存空间,然后对变量进行操作,操作完成后再将变量写回主内存,如果存在两个线程同时对一个主内存中的实例对象的变量进行操作就有可能诱发线程安全问题。 如下图,主内存中存在一个共享变量x,现在有A和B两条线程分别对该变量x=1进行操作,A/B线程各自的工作内存中存在共享变量副本x。假设现在A线程想要修改x的值为2,而B线程却想要读取x的值,那么B线程读取到的值是A线程更新后的值2还是更新前的值1呢?答案是,不确定,即B线程有可能读取到A线程更新前的值1,也有可能读取到A线程更新后的值2,这是因为工作内存是每个线程私有的数据区域,而线程A变量x时,首先是将变量从主内存拷贝到A线程的工作内存中,然后对变量进行操作,操作完成后再将变量x写回主内,而对于B线程的也是类似的,这样就有可能造成主内存与工作内存间数据存在一致性问题,假如A线程修改完后正在将数据写回主内存,而B线程此时正在读取主内存,即将x=1拷贝到自己的工作内存中,这样B线程读取到的值就是x=1,但如果A线程已将x=2写回主内存后,B线程才开始读取的话,那么此时B线程读取到的就是x=2,但到底是哪种情况先发生呢?这是不确定的,这也就是所谓的线程安全问题。

props

为了解决类似上述的问题,JVM定义了一组规则,通过这组规则来决定一个线程对共享变量的写入何时对另一个线程可见,这组规则也称为Java内存模型(即JMM),JMM是围绕着程序执行的原子性、有序性、可见性展开的,下面我们看看这三个特性。

JMM中的特性

  • 原子性

原子性指的是一个操作是不可中断的,即使是在多线程环境下,一个操作一旦开始就不会被其他线程影响。比如对于一个静态变量int x,两条线程同时对他赋值,线程A赋值为1,而线程B赋值为2,不管线程如何运行,最终x的值要么是1,要么是2,线程A和线程B间的操作是没有干扰的,这就是原子性操作,不可被中断的特点。 例如在32为操作系统中 对long和double(byte、boolean、char、short、int、float读写都是原子性的)不是原子性的,因为对于32位虚拟机来说,每次原子读写是32位的,而long和double则是64位的存储单元,这样会导致一个线程在写时,操作完前32位的原子操作后,轮到B线程读取时,恰好只读取到了后32位的数据,这样可能会读取到一个既非原值又不是线程修改值的变量,它可能是“半个变量”的数值,即64位数据被两个线程分成了两次读取。但是现在的jvm虚拟机都实现了对64位数据的读写操作作为原子性操作。

指令重排 计算机在执行程序时,为了提高性能,编译器和处理器的常常会对指令做重排,一般分以下3种

1、编译器优化的重排

编译器在不改变单线程程序语义的前提下,可以重新安排语句的执行顺序。

2、指令并行的重排

现代处理器采用了指令级并行技术来将多条指令重叠执行。如果不存在数据依赖性(即后一个执行的语句无需依赖前面执行的语句的结果),处理器可以改变语句对应的机器指令的执行顺序

3、内存系统的重排

由于处理器使用缓存和读写缓存冲区,这使得加载(load)和存储(store)操作看上去可能是在乱序执行,因为三级缓存的存在,导致内存与缓存的数据同步存在时间差。

其中编译器优化的重排属于编译期重排,指令并行的重排和内存系统的重排属于处理器重排,在多线程环境中,这些重排优化可能会导致程序出现内存可见性问题。

	class MixedOrder{
	    int a = 0;
	    boolean flag = false;
	    public void writer(){
	        a = 1;
	        flag = true;
	    }
	 
	    public void read(){
	        if(flag){
	            int i = a + 1;
	        }
	    }
	}

如上述代码,同时存在线程A和线程B对该实例对象进行操作,其中A线程调用写入方法,而B线程调用读取方法,由于指令重排等原因,可能导致程序执行顺序变为如下:

     线程A                     线程B
	 writer:                 read:
	 1:flag = true;           1:flag = true;
	 2:a = 1;                 2: a = 0 ; //误读
	                          3: i = 1 ; 由于指令重排的原因,线程A的flag置为true被提前执行了,而a赋值为1的程序还未执行完,此时线程B,恰好读取flag的值为true,直接获取a的值(此时B线程并不知道a为0)并执行i赋值操作,结果i的值为1,而不是预期的2,这就是多线程环境下,指令重排导致的程序乱序执行的结果。因此,请记住,指令重排只会保证单线程中串行语义的执行的一致性,但并不会关心多线程间的语义一致性。 修复这个问题的方式很简单,要么给writer()方法和read()方法添加同步手段,如synchronized或者给变量flag添加volatile关键字,确保线程A修改的值对线程B总是可见。
  • 可见性

理解了指令重排现象后,可见性容易了,可见性指的是当一个线程修改了某个共享变量的值,其他线程是否能够马上得知这个修改的值。对于串行程序来说,可见性是不存在的,因为我们在任何一个操作中修改了某个变量的值,后续的操作中都能读取这个变量值,并且是修改过的新值。但在多线程环境中可就不一定了,前面我们分析过,由于线程对共享变量的操作都是线程拷贝到各自的工作内存进行操作后才写回到主内存中的,这就可能存在一个线程A修改了共享变量x的值,还未写回主内存时,另外一个线程B又对主内存中同一个共享变量x进行操作,但此时A线程工作内存中共享变量x对线程B来说并不可见,这种工作内存与主内存同步延迟现象就造成了可见性问题,另外指令重排以及编译器优化也可能导致可见性问题,通过前面的分析,我们知道无论是编译器优化还是处理器优化的重排现象,在多线程环境下,确实会导致程序乱序执行的问题,从而也就导致可见性问题。例如线程A对变量a的赋予1的值对线程B时可见的。

  • 有序性

因为程序编译成机器码指令后可能会出现指令重排现象,重排后的指令与原指令的顺序未必一致

###JMM提供的解决多线程下的原子性、可见性、有序性

如原子性问题,除了JVM自身提供的对基本数据类型读写操作的原子性外,对于方法级别或者代码块级别的原子性操作,可以使用synchronized关键字或者重入锁(ReentrantLock)保证程序执行的原子性。而工作内存与主内存同步延迟现象导致的可见性问题,可以使用synchronized关键字或者volatile关键字解决,它们都可以使一个线程修改后的变量立即对其他线程可见。对于指令重排导致的可见性问题和有序性问题,则可以利用volatile关键字解决,因为volatile的另外一个作用就是禁止重排序优化,关于volatile稍后会进一步分析。除了靠sychronized和volatile关键字来保证原子性、可见性以及有序性外,JMM内部还定义一套happens-before 原则来保证多线程环境下两个操作间的原子性、可见性以及有序性。

理解JMM中的happens-before 原则 倘若在程序开发中,仅靠sychronized和volatile关键字来保证原子性、可见性以及有序性,那么编写并发程序可能会显得十分麻烦,幸运的是,在Java内存模型中,还提供了happens-before 原则来辅助保证程序执行的原子性、可见性以及有序性的问题,它是判断数据是否存在竞争、线程是否安全的依据,happens-before 原则内容如下

  • 程序顺序原则,即在一个线程内必须保证语义串行性,也就是说按照代码顺序执行。

  • 锁规则 解锁(unlock)操作必然发生在后续的同一个锁的加锁(lock)之前,也就是说,如果对于一个锁解锁后,再加锁,那么加锁的动作必须在解锁动作之后(同一个锁)。

  • volatile规则 volatile变量的写,先发生于读,这保证了volatile变量的可见性,简单的理解就是,volatile变量在每次被线程访问时,都强迫从主内存中读该变量的值,而当该变量发生变化时,又会强迫将最新的值刷新到主内存,任何时刻,不同的线程总是能够看到该变量的最新值。

  • 线程启动规则 线程的start()方法先于它的每一个动作,即如果线程A在执行线程B的start方法之前修改了共享变量的值,那么当线程B执行start方法时,线程A对共享变量的修改对线程B可见

  • 传递性 A先于B ,B先于C 那么A必然先于C

  • 线程终止规则 线程的所有操作先于线程的终结,Thread.join()方法的作用是等待当前执行的线程终止。假设在线程B终止之前,修改了共享变量,线程A从线程B的join方法成功返回后,线程B对共享变量的修改将对线程A可见。

  • 线程中断规则 对线程 interrupt()方法的调用先行发生于被中断线程的代码检测到中断事件的发生,可以通过Thread.interrupted()方法检测线程是否中断。

  • 对象终结规则 对象的构造函数执行,结束先于finalize()方法

###理解volatile

volatile有两个作用:

  • 保证被volatile修饰的共享变量对所有线程总是可见的,也就是当一个线程修改了一个被volatile修饰共享变量的值,新值总数可以被其他线程立即得知。

  • 禁止指令重排序优化。

可见性 关于volatile的可见性作用,我们必须意识到被volatile修饰的变量对所有线程总数立即可见的,对volatile变量的所有写操作总是能立刻反应到其他线程中,但是对于volatile变量运算操作在多线程环境并不保证安全性,如下:

	public class VolatileVisibility {
	    public static volatile int i =0;
	 
	    public static void increase(){
	        i++;
	    }
	}

正如上述代码所示,i变量的任何改变都会立马反应到其他线程中,但是如此存在多条线程同时调用increase()方法的话,就会出现线程安全问题,毕竟i++;操作并不具备原子性,该操作是先读取值,然后写回一个新值,相当于原来的值加上1,分两步完成,如果第二个线程在第一个线程读取旧值和写回新值期间读取i的域值,那么第二个线程就会与第一个线程一起看到同一个值,并执行相同值的加1操作,这也就造成了线程安全失败,因此对于increase方法必须使用synchronized修饰,以便保证线程安全,需要注意的是一旦使用synchronized修饰方法后,由于synchronized本身也具备与volatile相同的特性,即可见性,因此在这样种情况下就完全可以省去volatile修饰变量。 现在来看另外一种场景,可以使用volatile修饰变量达到线程安全的目的,如下:

	public class VolatileSafe {
	 
	    volatile boolean close;
	 
	    public void close(){
	        close=true;
	    }
	 
	    public void doWork(){
	        while (!close){
	            System.out.println("safe....");
	        }
	    }
	}

由于对于boolean变量close值的修改属于原子性操作,因此可以通过使用volatile修饰变量close,使用该变量对其他线程立即可见,从而达到线程安全的目的。那么JMM是如何实现让volatile变量对其他线程立即可见的呢?实际上,当写一个volatile变量时,JMM会把该线程对应的工作内存中的共享变量值刷新到主内存中,当读取一个volatile变量时,JMM会把该线程对应的工作内存置为无效,那么该线程将只能从主内存中重新读取共享变量。volatile变量正是通过这种写-读方式实现对其他线程可见(但其内存语义实现则是通过内存屏障,稍后会说明)。

禁止重排优化:

volatile关键字另一个作用就是禁止指令重排优化,从而避免多线程环境下程序出现乱序执行的现象,关于指令重排优化前面已详细分析过,这里主要简单说明一下volatile是如何实现禁止指令重排优化的。先了解一个概念,内存屏障(Memory Barrier)。

内存屏障:

  memory barrier
  volatile int a
  memory barrier

又称内存栅栏,是一个CPU指令,它的作用有两个,一是保证特定操作的执行顺序,二是保证某些变量的内存可见性(利用该特性实现volatile的内存可见性)。由于编译器和处理器都能执行指令重排优化。如果在指令间插入一条Memory Barrier则会告诉编译器和CPU,不管什么指令都不能和这条Memory Barrier指令重排序,也就是说通过插入内存屏障禁止在内存屏障前后的指令执行重排序优化。Memory Barrier的另外一个作用是当有指令间有内存栅栏指(cpu指令)时读区的数据会从主存重新获取,原来工作线程变量失效,同时更新时也会同步到主存中。因此任何CPU上的线程都能读取到这些数据的最新版本。总之,volatile变量正是通过内存屏障实现其在内存中的语义,即可见性和禁止重排优化。下面看一个非常典型的禁止重排优化的例子DCL,如下:

	public class DoubleCheckLock {
	 
	    private static DoubleCheckLock instance;
	 
	    private DoubleCheckLock(){}
	 
	    public static DoubleCheckLock getInstance(){
	 
	        //第一次检测
	        if (instance==null){
	            //同步
	            synchronized (DoubleCheckLock.class){
	                if (instance == null){
	                    //多线程环境下可能会出现问题的地方
	                    instance = new DoubleCheckLock();
	                }
	            }
	        }
	        return instance;
	    }
	}

上述代码一个经典的单例的双重检测的代码,这段代码在单线程环境下并没有什么问题,但如果在多线程环境下就可以出现线程安全问题。原因在于某一个线程执行到第一次检测,读取到的instance不为null时,instance的引用对象可能没有完成初始化。因为instance = new DoubleCheckLock();可以分为以下3步完成(伪代码)

	memory = allocate(); //1.分配对象内存空间
	instance(memory);    //2.初始化对象
	instance = memory;   //3.设置instance指向刚分配的内存地址,此时instance!=null

由于步骤1和步骤2间可能会重排序,如下:

	memory = allocate(); //1.分配对象内存空间
	instance = memory;   //3.设置instance指向刚分配的内存地址,此时instance!=null,但是对象还没有初始化完成!
	instance(memory);    //2.初始化对象

由于步骤2和步骤3不存在数据依赖关系,而且无论重排前还是重排后程序的执行结果在单线程中并没有改变,因此这种重排优化是允许的。但是指令重排只会保证串行语义的执行的一致性(单线程),但并不会关心多线程间的语义一致性。所以当一条线程访问instance不为null时,由于instance实例未必已初始化完成,也就造成了线程安全问题。那么该如何解决呢,很简单,我们使用volatile禁止instance变量被执行指令重排优化即可。

	  //禁止指令重排优化
	  private volatile static DoubleCheckLock instance;

JMM就是一组规则,这组规则意在解决在并发编程可能出现的线程安全问题,并提供了内置解决方案(happen-before原则)及其外部可使用的同步手段(synchronized/volatile等),确保了程序执行在多线程环境中的应有的原子性,可视性及其有序性

CountDownLatch 和 CylicBarrier Semaphore如何使用以及区别


1、CountDownLatch: 一个线程一直等待直到其他线程完成某个任务时才会执行

public CountDownLatch(int count) //count为计数值 countDown() 使得 count-1 await() 线程一直阻塞直到 count=0 await(long timeout, TimeUnit unit) 线程阻塞超时时间

在其他线程使用 countDownLath.countDown() 而在主线程(等待的线程)使用 countDownLatch.await()

public class Test {
     public static void main(String[] args) {   
         final CountDownLatch latch = new CountDownLatch(2);
      
     new Thread(){
         public void run() {
             try {
                 System.out.println("子线程"+Thread.currentThread().getName()+"正在执行");
                Thread.sleep(3000);
                System.out.println("子线程"+Thread.currentThread().getName()+"执行完毕");
                latch.countDown();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
         };
     }.start();
      
     new Thread(){
         public void run() {
             try {
                 System.out.println("子线程"+Thread.currentThread().getName()+"正在执行");
                 Thread.sleep(3000);
                 System.out.println("子线程"+Thread.currentThread().getName()+"执行完毕");
                 latch.countDown();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
         };
     }.start();
      
     try {
         System.out.println("等待2个子线程执行完毕...");
        latch.await();
        System.out.println("2个子线程已经执行完毕");
        System.out.println("继续执行主线程");
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
 } }

2、CyclicBarrier: N个线程相互等待,直到所有线程都到一个状态(barrier状态),这个N个线程才会执行下面的程序

CyclicBarrier类位于java.util.concurrent包下,CyclicBarrier提供2个构造器:

	public CyclicBarrier(int parties, Runnable barrierAction) { //当所有线程都达到某个状态时,会选择任意一个线程执行 barrierAction中run方法
	}
	 
	public CyclicBarrier(int parties) {
	}

await() 线程达到barrier状态

await(time) 如果该线程等待了一定时间其他线程还没有达到这个状态时则继续运行下面的程序

public class Test {
    public static void main(String[] args) {
        int N = 4;
        CyclicBarrier barrier  = new CyclicBarrier(N,new Runnable() {
            @Override
            public void run() {
                System.out.println("当前线程"+Thread.currentThread().getName());   
            }
        });
     
    for(int i=0;i<N;i++)
        new Writer(barrier).start();
}
static class Writer extends Thread{
    private CyclicBarrier cyclicBarrier;
    public Writer(CyclicBarrier cyclicBarrier) {
        this.cyclicBarrier = cyclicBarrier;
    }
 
    @Override
    public void run() {
        System.out.println("线程"+Thread.currentThread().getName()+"正在写入数据...");
        try {
            Thread.sleep(5000);      //以睡眠来模拟写入数据操作
            System.out.println("线程"+Thread.currentThread().getName()+"写入数据完毕,等待其他线程写入完毕");
            cyclicBarrier.await();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }catch(BrokenBarrierException e){
            e.printStackTrace();
        }
        System.out.println("所有线程写入完毕,继续处理其他任务...");
    }
} }

3、Semaphore 用来控制对一组资源访问

注意:Semaphore实现原理和上诉ReentrantLock实现方式类似,都使用了NonFairSync和FairSync

acquire() 获取一个许可

release()释放一个许可

	public Semaphore(int permits) {          //参数permits表示许可数目,即同时可以允许多少线程进行访问,其中底层使用是一种变量volatile int state来维护同步关系
	    sync = new NonfairSync(permits);
	}
	public Semaphore(int permits, boolean fair) {    //这个多了一个参数fair表示是否是公平的,即等待时间越久的越先获取许可
	    sync = (fair)? new FairSync(permits) : new NonfairSync(permits);
	}

下面说一下Semaphore类中比较重要的几个方法,首先是acquire()、release()方法:

		public void acquire() throws InterruptedException {  }     //获取一个许可
		public void acquire(int permits) throws InterruptedException { }    //获取permits个许可
		public void release() { }          //释放一个许可
		public void release(int permits) { }    //释放permits个许可

acquire()用来获取一个许可,若无许可能够获得,则会一直等待,直到获得许可。   release()用来释放许可。注意,在释放许可之前,必须先获获得许可。    这4个方法都会被阻塞,如果想立即得到执行结果,可以使用下面几个方法:

		 public boolean tryAcquire() { };    //尝试获取一个许可,若获取成功,则立即返回true,若获取失败,则立即返回false
        public boolean tryAcquire(long timeout, TimeUnit unit) throws InterruptedException { };  //尝试获取一个许可,若在指定的时间内获取成功,则立即返回true,否则则立即返回false
        public boolean tryAcquire(int permits) { }; //尝试获取permits个许可,若获取成功,则立即返回true,若获取失败,则立即返回false
        public boolean tryAcquire(int permits, long timeout, TimeUnit unit) throws InterruptedException { }; //尝试获取permits个许可,若在指定的时间内获取成功,则立返回true,否则超过规定时间为获取到返回false,中途被其他线程interrupt()则抛出异常
        
        
   public class Test {
    public static void main(String[] args) {
        int N = 8;            //工人数
        Semaphore semaphore = new Semaphore(5); //机器数目
        for(int i=0;i<N;i++)
            new Worker(i,semaphore).start();
    }
     
    static class Worker extends Thread{
        private int num;
        private Semaphore semaphore;
        public Worker(int num,Semaphore semaphore){
            this.num = num;
            this.semaphore = semaphore;
        }
         
        @Override
        public void run() {
            try {
                semaphore.acquire();
                System.out.println("工人"+this.num+"占用一个机器在生产...");
                Thread.sleep(2000);
                System.out.println("工人"+this.num+"释放出机器");
                semaphore.release();           
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}

使用区别:

1、CountDownLatch和CyclicBarrier都能够实现线程之间的等待,只不过它们侧重点不同;CountDownLatch一般用于某个线程A等待若干个其他线程执行完任务之后,它才执行;而CyclicBarrier一般用于一组线程互相等待至某个状态,然后这一组线程再同时执行;另外,CountDownLatch是不能够重用的,而CyclicBarrier是可以重用的。

  2、Semaphore其实和锁有点类似,它一般用于控制对某组资源的访问权限。

线程池的概念


线程池的概念

props

这里面的实现类涉及到三个: ForkJoinPool:一个类似于Map/Reduce模型的框架,线程级的,详细可有去看我之前写的文章Fork/Join-Java并行计算框架。 ThreadPoolExecutor:这是Java线程池的实现,也是本文的主角,Executors提供的几种线程池主要使用该类。 ScheduledThreadPoolExecutor:继承自ThreadPoolExecutor,添加了调度功能。 ThreadPoolExecutor参数 int corePoolSize  线程池基本大小 int maximumPoolSize  线程池最大大小 long keepAliveTime  保持活动时间 TimeUnit unit  保持活动时间单位 BlockingQueue workQueue  工作队列 ThreadFactory threadFactory  线程工厂 RejectedExecutionHandler handler  驳回回调 这些参数这样描述起来很空洞,下面结合执行任务的流程来看一下 ThreadPoolExecutor执行任务流程 当我们调用execute方法时,这个流程就开始了,请看下图:

props

当线程池大小 >= corePoolSize 且 队列未满时,这时线程池使用者与线程池之间构成了一个生产者-消费者模型。线程池使用者生产任务,线程池消费任务,任务存储在BlockingQueue中,注意这里入队使用的是offer,当队列满的时候,直接返回false,而不会等待。

props

keepAliveTime

当线程处于空闲状态时,线程池需要对它们进行回收,避免浪费资源。但空闲多长时间回收呢,keepAliveTime就是用来设置这个时间的。默认情况下,最终会保留corePoolSize个线程避免回收,即使它们是空闲的,以备不时之需。但我们也可以改变这种行为,通过设置allowCoreThreadTimeOut(true)。

RejectedExecutionHandler

当队列满 且 线程池大小 >= maximumPoolSize时会触发驳回,因为这时线程池已经不能响应新提交的任务,驳回时就会回调这个接口rejectedExecution方法,JDK默认提供了4种驳回策略,代码比较简单,直接上代码分析,具体使用何种策略,应该根据业务场景来选择,线程池的默认策略是AbortPolicy。

ThreadPoolExecutor.AbortPolicy
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
    // 直接抛出运行时异常
    throw new RejectedExecutionException("Task " + r.toString() +
                                         " rejected from " +
                                         e.toString());
}
 ThreadPoolExecutor.CallerRunsPolicy
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
    if (!e.isShutdown()) {
        // 转成同步调用
        r.run();
    }
}
 ThreadPoolExecutor.DiscardPolicy
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
    // 空实现,意味着直接丢弃了
}
 ThreadPoolExecutor.DiscardOldestPolicy
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
    if (!e.isShutdown()) {
        // 取出队首,丢弃
        e.getQueue().poll();
        // 重新提交
        e.execute(r);
    }
}

Hook methods

ThreadPoolExecutor预留了以下三个方法,我们可以通过继承该类来做一些扩展,比如监控、日志等等。

	protected void beforeExecute(Thread t, Runnable r) { 
	
	}
	
	protected void afterExecute(Thread t, Runnable r) { 
	
	}
	
	protected void terminated() {//当workCount=0 线程池状态位terminated时回调
	
	}

ThreadPoolExecutor状态 线程池的工作流程我们应该大致清楚了,其内部同时维护了一个状态,现在来看一下每种状态对于任务会造成什么影响以及状态之间的流转。 RUNNING 初始状态,接受新任务并且处理已经在队列中的任务。 SHUTDOWN 不接受新任务,但处理队列中的任务。 STOP 不接受新任务,不处理排队的任务,并中断正在进行的任务。 TIDYING 所有任务已终止,workerCount为零,线程转换到状态TIDYING,这时回调terminate()方法。 TERMINATED 终态,terminated()执行完成。

props

上图是这5种状态间的流转,可以看到它们是单向的、不可逆的。

线程池的类型

FixedThreadPool - 线程池大小固定,任务队列无界

下面是 Executors 类 newFixedThreadPool 方法的源码:

public static ExecutorService newFixedThreadPool(int nThreads) {
    return new ThreadPoolExecutor(nThreads, nThreads,
                                  0L, TimeUnit.MILLISECONDS,
                                  new LinkedBlockingQueue<Runnable>());
}

可以看到 corePoolSize 和 maximumPoolSize 设置成了相同的值,此时不存在线程数量大于核心线程数量的情况,所以KeepAlive时间设置不会生效。任务队列使用的是不限制大小的 LinkedBlockingQueue ,由于是无界队列所以容纳的任务数量没有上限。 因此,FixedThreadPool的行为如下: 1、从线程池中获取可用线程执行任务,如果没有可用线程则使用ThreadFactory创建新的线程,直到线程数达到nThreads 线程池线程数达到nThreads以后,新的任务将被放入队列 2、FixedThreadPool的优点是能够保证所有的任务都被执行,永远不会拒绝新的任务;同时缺点是队列数量没有限制,在任务执行时间无限延长的这种极端情况下会造成内存问题。

SingleThreadExecutor - 线程池大小固定为1,任务队列无界

public static ExecutorService newSingleThreadExecutor() {
    return new FinalizableDelegatedExecutorService
        (new ThreadPoolExecutor(1, 1,
                                0L, TimeUnit.MILLISECONDS,
                                new LinkedBlockingQueue<Runnable>()));
}

这个工厂方法中使用无界LinkedBlockingQueue,并的将线程数设置成1,除此以外还使用FinalizableDelegatedExecutorService类进行了包装。这个包装类的主要目的是为了屏蔽ThreadPoolExecutor中动态修改线程数量的功能,仅保留ExecutorService中提供的方法。虽然是单线程处理,一旦线程因为处理异常等原因终止的时候,ThreadPoolExecutor会自动创建一个新的线程继续进行工作。 SingleThreadExecutor 适用于在逻辑上需要单线程处理任务的场景,同时无界的LinkedBlockingQueue保证新任务都能够放入队列,不会被拒绝;缺点和FixedThreadPool相同,当处理任务无限等待的时候会造成内存问题

CachedThreadPool - 线程池无限大(MAX INT),等待队列长度为1

public static ExecutorService newCachedThreadPool() {
    return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                  60L, TimeUnit.SECONDS,
                                  new SynchronousQueue<Runnable>());
}

SynchronousQueue是一个只有1个元素的队列,入队的任务需要一直等待直到队列中的元素被移出。核心线程数是0,意味着所有任务会先入队列;例如线程A和线程B,例如线程A先从cachedThreadPool线程池获取线程执行,使用SynchronousQueued队列可以保证线程A在线程B执行之前。最大线程数是Integer.MAX_VALUE,可以认为线程数量是没有限制的。KeepAlive时间被设置成60秒,意味着在没有任务的时候线程等待60秒以后退出。CachedThreadPool对任务的处理策略是提交的任务会立即分配一个线程进行执行,线程池中线程数量会随着任务数的变化自动扩张和缩减,在任务执行时间无限延长的极端情况下会创建过多的线程。

总结一点: 当任务数大于正在执行的核心线程数则将任务放在队列中,其中当队列满了时就会创建新的线程知道超过了最大线程数,因此任务会执行驳回逻辑(自己实现的RejectedExecutionHandler逻辑)

线程池相关的问题

1、为什么newFixedThreadPool中要将corePoolSize和maximumPoolSize设置成一样?

因为newFixedThreadPool中用的是LinkedBlockingQueue(是无界队列),只要当前线程大于等于corePoolSize来的任务就直接加入到无界队列中,所以线程数不会超过corePoolSize,这样maximumPoolSize没有用

2、为什么newFixedThreadPool中队列使用LinkedBlockingQueue?

设置的corePoolSize 和 maximumPoolSize相同,则创建的线程池是大小固定的,要保证线程池大小固定则需要LinkedBlockingQueue(无界队列)来保证来的任务能够放到任务队列中,不至于触发拒绝策略

3、为什么newFixedThreadPool中keepAliveTime会设置成0?

因为corePoolSize和maximumPoolSize一样大,KeepAliveTime设置的时间会失效,所以设置为0

4、为什么newCachedThreadPool中要将corePoolSize设置成0?

因为队列使用SynchronousQueue,队列中只能存放一个任务,保证所有任务会先入队列,用于那些互相依赖的线程,比如线程A必须在线程B之前先执行

5、为什么newCachedThreadPool中要将corePoolSize设置成0?

因为队列使用SynchronousQueue,队列中只能存放一个任务,保证所有任务会先入队列,用于那些互相依赖的线程,比如线程A必须在线程B之前先执行

6、为什么newSingleThreadExecutor中使用DelegatedExecutorService去包装ThreadPoolExecutor?

SingleThreadExecutor是单线程化线程池,用DelegatedExecutorService包装为了屏蔽ThreadPoolExecutor动态修改线程数量的功能,仅保留Executor中的方法