Erlo

Java 并发编程 - 并发控制的哲学思想

2025-05-02 14:29:23 发布   55 浏览  
页面报错/反馈
收藏 点赞

Java 并发编程 - 并发难点及解决方法

并发难点

顺序性

因为多个CPU执行多个线程,所以谁先谁后就很重要,比如CPU1否则读取,CPUB负责计算,那么计算要保证在读取之后.

就好像你买菜,你媳妇做饭,那么做饭这件事要等待买菜完成,但是两件事是不同的两个人用不同的环境实现的.

现实世界的解决方案就是:你媳妇等你买菜,你买完菜了通知你媳妇可以开始做饭了.这就需要你媳妇等待做饭,你通知她做饭.此为等待通知机制

可见性

因为CPU读取数据的时候有缓存,写出数据的时候有缓冲区,所以CPU1读写完的数据并不一定能被CPU2看到结果.

就好像你媳妇给你和你老弟发短信去买白菜,如果你们俩不带电话,记在脑子里(缓存数据),那么你媳妇此时改变主意了,要买黄瓜了(其他线程修改数据),你就不知道.所以需要带上电话,你媳妇改变注意的时候,你俩就不知道,就会买错菜.

现实世界的解决方案就是,你们带上电话,你媳妇改变主意了给你们再发信息,此为广播通知,你们收到消息,知道改变主意了,在采购的时候重新查看一下短信,获取最新的购买需求,此为缓存一致.

原子性

因为多个CPU执行多个线程,所以无法保证一个数据的读写只能被一个CPU的一个线程执行,中间可能有其他CPU的其他线程干预.

就好像你媳妇做好的菜饭,需要放到橱柜,此时菜能吃,但是要防止你那个调皮的儿子来吐口水,如果吐了口水,则不能吃了.

现实世界的解决方案就是,给你儿子锁屋子里,但是锁太大了(总线锁),妨碍了你儿子玩你,所以锁了橱柜(缓存锁),你儿子仅仅不能碰那些大米饭.

解决方法

对某个资源加锁来保证原子性

多个操作者操作一个资源,需要对资源上锁,处理完再解锁.这样可以保证数据原子操作,就好像互联网常用的分布式锁,CPU也会有总线锁/缓存锁.

锁定的是公共资源

从现实世界来说,我们在使用公共资源的时候,通常都会独占某公共资源;但是你使用你的胳膊,嗓子,眼睛的时候,这是你的私有资源,不会影响别人,除非别人想摘掉你的胳膊,否则你的私有资源使用与别人无关.

注意锁的粒度

就好像你把你儿子关起来,还是把饭菜锁起来一样,锁需要有一个粒度,很明显的是,在入库的时候我们不会让整个系统其他所有入库操作都禁止,而是当前操作的那个仓库被锁住,想想商城里的公共卫生间不会因为有人进入卫生间,而锁住整个商场不让其他人进出.

同样,CPU在上锁的时候有两个选择①将整个CPU操作的总线锁定,②将会涉及到的缓存锁定.很明显锁定缓存让锁的粒度更小,其他CPU受到的影响也小.

因此我们要知道,锁的粒度会对系统性能有很大影响.

用比较和替换(CAS)来保证原子性

其实计算机中的数据只有读写两种操作,原子性操作就是保证你读取和写入之间没有别人写入.我们可以在写入的时候比较一下,如果是从当初你读取的值写入到当前值,则是中间没有操作入过,否则就是有操作写入过.这就是cas操作,比较后替换.

就好像你听见儿子说他饿了(读取儿子的状态),你去拿来奶喂它(写入充饥的数据),如果你去拿奶的时候你小舅子来照顾过你的儿子给他奶了,你再喂他,他可能就吃多了撑得慌,为了避免这种情况,你需要喂他之前再问问你儿子"小家伙,你还饿么?" 如果跟之前的信息(饿)不一致,则放弃喂奶.

用排队等待解决顺序性

多个操作的顺序乱序,可以让他们排队操作,此时需要等待通知来实现.首先要锁定这个资源,然后争抢使用这个资源的操作需要排队.

等待的一定是个公共资源

并且你只会在你所等待的公共资源的队列上排队;就好像你在烧烤店门口排队,那么你等待的资源就是烧烤店,而等待卫生间的人不会到烧烤店门口排队,所以你们俩之前不竞争,而是排队等待卫生间的那些人存在竞同一资源.换言之:这个世界上肯定是你占用了某资源才造成排队,比如公共电话亭,比如公共卫生间.

被动等待

就好像很多人去银行排队取钱,银行会给你一个号(排队),到你了用喇叭喊你(广播通知),你听见广播了(唤醒)就去柜台,在你拿到排队号的时候,你没得选,保安要求你必须排队等待.这是被动的等待,由Jvm给我们提供的shynchronized关键字.

主动等待

但是你在取钱的时候发现银行现在没钱了,那么你有两个选择,①放弃取钱,②等待总行来送钱.此时你会重新排队,但是队列不一样,是一个等待拿钱就走的队列,那么此时进入等待是你主动的,在运钞车来的时候,会通知等着钱的这个队列的人,给他们现金然后让他们走人.由jdk为我们提供的 Object.wait 和Object.notify 方法.

一定要获取锁才能主动等待

因为你一定要独占了这个资源,才有能力分辨出这个资源是否符合你操作的需要,否则别人也操作这个资源,你的判断可能就不准确.

就好像在卫生间门口排队,自然就是等待,而你进入卫生间才会发现里面没有纸,此时你已经获取了卫生间的使用权(关门,上锁)才能发现,那么你很可能让那些不需要纸就能上厕所的人先用,所以你出来了并重新排队(wait).所以你看,主动等待的时候会释放锁.

由此可见排队等待必须在某个资源上排队,也是唤醒排队某个资源的等待队列,所以synchronized关键字要对某个对象加锁,Object.wait和Object.notify也是针对某个对象上的队列.

让缓存失效保证可见性

每个操作之间有缓存,需要通过广播让缓存失效,失效之后要重新获取下最新数据.比如通过MQ发布一个主题消息,通知其他机器某个缓存失效了,订阅主题的机器收到消息之后将本地的缓存标记失效,这是分布式系统上的,从CPU微观来说,连接MQ的网线是CPU的总线,类似的机制实现就是CPU缓存一致性.

我们最好不用(也最好不要)每次失效都重新读取数据,因为如果在时间点1->2之间失效100次,那么则将重新获取100次数据,而这100次获取数据都不一定会被读取一次.就好像你10台机器上都有用户信息,但是不一定每台的用户信息都会被读取.(此为惰性缓存)

宏观来看

如果你不了解什么是微服务,不知道zookeeper,不知道分布式锁,不知道消息队列服务器,可以不看这些.

你完全可以将上面这些过程想象成很多个微服务实例下,拥有自己的本地缓存,在操作某个缓存数据的时候,会在zookeeper上加一个分布式锁,处理完成的时候会在mq上某个主题发布消息,通知缓存被修改,其他实例收到消息后,将本地缓存标记失效,在下次读取数据操作的时候,重新加载缓存.

这个时候你就发现,原本的CPU总线换成了网线传输数据,原本CPU的1,2,3级缓存变成了微服务的本地缓存,原本的缓存锁变成了分布式锁.

但是这还不够,有的时候用户拿到数据等了很久才提交,所以我们的数据上会有一个版本号,每次提交增加版本号,每次写回数据库的时候比较当前数据库版本和用户提交的版本,这就是cpu上的cas操作.

如何实现

比较替换的实现

在CPU层面为我们提供了 cmpxchg 指令来实现, 这是个IA-32的汇编指令.

缓存加锁的实现

在CPU层面为我们提供了 lock 指令前缀来实现, 这个指令会锁定总线/缓存,然后独享数据的去修改,并且修改完之后会将当前CPU缓冲区写会主存.这是个IA-32的汇编指令.

缓存失效的实现

在CPU层面为我们提供了总线广播监听机制,这个机制可以监听缓存的失效,并将其标记失效,下次看到失效的缓存则从主存读取,这个机制的学名为 MESI(缓存一致性协议).

线程等待的实现

等待有两种方式,一个是一直等待,一个是超时等待,一直等待直接将线程保存起来,等到有唤醒的时候再调度执行就行.超时等待则需要一个定时器.

定时器的具体实现是OS层面的东西,比较复杂,但是现在很成熟,同时有硬件层面的支持.硬件层面有一个固定时钟和计数器,每一个时钟周期将计数器递减,计数器为0的时候触发CPU中断,这看上去很消耗CPU,因为每次CPU的周期都需要检查.也就是说操作系统级别的等待是硬件级别的操作,所以消耗是很小很小的.

其实无论如何都离不开不断检查,从现实世界来说,如果我们想等待到某时间点,唯一的办法(不考虑闹钟等)就是不断的检查当前时间和任务列表.不可能有其他办法.考虑其实等待到某时间点和等待一段时间其实都是一样的,所以只需要一个定时检查的计时器就可以了.这就是为什么一个线程能等待一段时间.

  • 等到某时间点 = 当前时间 + 需要等待的一段时间
  • 等待一段时间 = 最终时间点 - 当前时

排队的实现思路

有了计时器,我们在加上排队,就可以实现排队-等待-通知,排队的实现需要一个队列,不管你是基于链表,还是数组,总之需要一个排队的地方,现实世界中有排队的地方,就有插队的人,不同的是看管理队列的人允不允许插队而已.

此时有两种实现思路,公平排队和非公平排队.

  1. 非公平排队就好像排队吃饭,门口拥堵了一大堆人,老板每次有空桌出来叫一个人,谁挤进去就是谁,这样挤得快的人马上进去吃饭,是不公平的.因为可能有人等了半天,就因为一不小心走神,就被你捷足先登了.
  2. 公平排队拿到号码之后老板按号叫人,到你了你就进去,这样一定公平,因为一定是有序的.谁先来谁先进,但是同时会牺牲排队的效率,因为你可能昨晚没睡好,叫你进去的时候你磨蹭半天没反应,这就浪费时间了,而且老板还要维护这样一堆的号码牌.

Java 层面的实现

Jvm级别

首先Java的线程跟OS线程是1:1映射关系,所以底层会有OS的系统调用库来实现线程创建,销毁,启动,等待操作,然后封装成 Thread 类操作. 提供synchronized实现对一个对象资源的锁定操作,提供volatile实现对CPU缓存的锁定和缓冲区flush的操作.

Jdk级别

然后在JDK层面,为我们提供了Object.wait 和Object.notify方法与synchronized配合,来实现排队-等待-通知机制.

CPU级别的CAS操作,jdk提供了UNSAFE类来操作,这个类里面有很多CAS操作的方法.

JUC级别

最后在JUC包内,为我们提供了LockSupport类,让我们可以直接操作一个线程进入等待,和唤醒线程;有了LockSupport类,便可以向OS操作线程一样去实现高级的排队-等待-通知-唤醒等操作,于是提供了AQS线程同步框架.

有了AQS线程同步框架,便可以利用将线程排队-等待这些操作,实现非常灵活的Lock类.

编程实现范式

普通等待通知

// 获得锁,判断条件是否满足,不满足,则等待
// 一个线程也能在没有被通知、中断或超时的情况下唤醒,也即所谓的“虚假唤醒”,虽然这点在实践中很少发生,应用应该检测导致线程唤醒的条件,并在条件不满足的情况下继续等待,以此来防止这一点。
           synchronized (obj) {
               while ()
                   obj.wait(timeout);
           }
// 摘录来自: jdk 源代码 wait方法注释

普通等待通知且支持超时

// 获得锁,计算结束时间,循环判断条件是否满足,等待剩余时间时间,被唤醒之后检查条件,并且重新计算等待时间
public synchronized Object get(long mills) throws InterruptedException {
       long future = System.currentTimeMillis() + mills;
       long remaining = mills;
       // 当超时大于0并且result返回值不满足要求
       while ((result == null) && remaining > 0) {
              wait(remaining);
              remaining = future - System.currentTimeMillis();
       } 
   return result;
}
// 摘录来自: 方腾飞,魏鹏,程晓明 著. “Java并发编程的艺术 (Java核心技术系列)。” Apple Books. 

具体的实现类

Object 类

Object 类提供了线程的基础等待通知实现方法,这些方法要求在synchronized关键字范围内的代码段执行,即:获取了jvm提供的锁.

之所以定义在Object方法中,一部分原因因为synchronized锁是在某个对象头加锁,换言之就是锁定某个对象资源.然后进行排队等待等调度,之所以这样设计,是因为必须要有一个资源上加锁之后才能进入等待队列,所以你需要锁定这个对象,然后调用这个对象上的wait-notify,来实现某个资源的锁定和等待通知机制.

  1. wait 方法需要先获取对象的锁,因为wait需要先释放持有的锁,然后进入该对象的等待队列。
  2. notify 方法需要先获取对象的锁,因为notify方法会在锁的对象对应的等待队列唤醒(一个或全部)线程。

Thread 类

  1. static currentThread 获取当前线程
  2. static yield 建议调度器让出CPU
  3. static sleep 让当前线程睡眠
  4. interrupt 设置线程的中断标志位为true
  5. static interrupted 判断线程中断标识位,并且清除标志位,换言之: 如果调用两次该方法,第二次一定返回false
  6. isInterrupted 判断线程中断标识位,但是不会影响标志位状态.
  7. isAlive 表示当前线程启动了,但是没有运行结束
Thread.join 实现原理

该方法是一个同步方法,使用一个this.wait的循环来实现当前线程的等待,while的条件是当前线程还存活,wait导致调用线程的等待,在join的线程结束的时候会调用this.notifyAll来唤醒当前等待的线程,这个方法是jvm负责调用.源代码中有一句话:不建议在程序中的线程实例中使用wait,notify或者notifyAll.

It is recommended that applications not use wait, notify, or notifyAll on Thread instances.

ThreadLocal 类

作用

ThreadLocal可以在当前线程存储一个变量.核心原理是在当前线程对象下创建一个map,然后将当前ThreadLocal的实例作为key存储变量.

关键点

  1. ThreadLocal跟Thrad类都属于java.lang,所以默认的成员变量可以互相访问.
  2. 在Thread类中存放一个threadLocals属性,该属性是ThreadLocal下的静态内部类.
  3. ThreadLocal在调用set,get方法的时候,会去初始化这个属性.
  4. ThreadLocal将自己的this作为key保存自己想要存放的变量到threadLocal属性.

安全隐患 如果线程没有被销毁,那么线程内部的threadLocals变量将会一直保持引用,无法回收,如果调用ThreadLocal#set方法设置的变量没有调用remove方法清理,则一直保持在ThreadLocalMap中,此时保持的值处于一个无法被回收的状态.

LockSupport 类

LockSupport类提供了park系列和unpark系列方,内部调用UNSAFE类实现,UNSAFE类由JVM在本地C++中实现,具体实现与平台有关,park方法在Linux下使用的是系统方法pthread_cond_wait实现;unpark方法在Linux下是使用pthread_cond_signal实现的.

在park(Object)的时候,dump线程会发现由wait的对象信息,是存储在Thread.parkBlocker字段上,通过UNSAFE.objectFieldOffset写到线程对象上的.

AQS 线程同步器

线程同步器实现了一个线程排队等待的框架,线程排队等待的实现离不开两点实现:①创建一个排队的线程,因为需要前后节节点的快速获取,此队列通常使用链表实现;②利用线程的等待通知机制将队列中的线程阻塞住.Java中的线程和OS的线程是1:1映射的,线程的等待利用的是OS的系统调用实现,封装成了Jdk中的LockSupport类; 这个机制保证了线程的有序性.

线程同步器还提供了一种原子性争抢资源的能力,他内部存储了一个状态标志,用volatile修饰,是一个数字类型,利用cas操作进行修改操作,如果操作成功,表示这个操作期间没有其他线程竞争,或者其他线程都竞争失败,此时其他线程发现自己竞争失败则进入等待队列,竞争成功的线程得以真正的执行,执行结束的时候再次修改这个状态表示,此时因为volatile的语义特性,最后这个修改操作会保证数据被(所有CPU缓冲区)写回主存,这个操作间接的保证了线程的原子性,可见性.

线程同步器提供了独占和共享的两组获取锁和释放锁的抽象方法

  1. tryAcquire 返回true 表示获取到了锁,false表示没有获取到锁,并且会进入自旋状态.
  2. tryAcquireShared 返回负数表示获取失败,0表示并发量耗尽(不能再获取了),正数表示获取成功,还能继续获取,(但是正数本身的值不做意义)
线程等待实现

实现线程等待在一个死循环中无限尝试获取锁,直到成功或者被中断,在执行过程中有可能会被 LockSupport.park 方法暂停掉,在首节点执行完毕的时候会唤醒排在其之后的第一个可运行节点,节点被唤醒之后马上重新在死循环中争抢锁.

总的来说,就是在死循环中处理等待通知机制.

独占锁的处理
  1. 线程首先尝试获取资源,如果获取失败,则使用cas操作加入等待队列;如果获取成功,则标记为首节点.
  2. 获取失败的线程进入资源申请的死循环,每次循环首先检查,如果前驱节点是首结点,则尝试CAS获取锁,如果不是,则调用LockSupport.park进入等待状态.
  3. 首节点的任务处理完毕之后会释放锁,在释放锁的时候唤醒下一个可执行的节点的线程.
  4. 唤醒的线程如果处于次(前驱是首节点)节点,此时线程应该处于一个自旋获取锁的状态,此时将快速的获取锁,否则处于一个等待状态,则恢复开始再次获取资源或进入等待.
共享锁的处理

初始一个令牌桶,桶中包含多个锁,锁的数量影响并发数量,,每次获取锁,则从桶中拿走一个(或者多个)令牌,每次释放,则从桶中放回锁,每次取出和释放的过程使用cas操作,保证原子性,则允许通过的线程是: 桶大小(锁总数)/桶粒度(每次获取锁数)

Condition 等待条件

Condition在自己内部又做了一个队列,在wait的时候将当前线程放入队列,在notify的时候从队列唤醒,因为condition要求在调用wait-notify的时候必须获取锁,所以内部在wait和notify的时候无需cas操作.

如何保证可见性

unlock操作会cas修改一个volatile变量,而volatile变量的修改会运行一条LOCK指令,该指令会锁缓存(或者总线)保证其他CPU不会并发访问数据,并且写完数据之后会立即将这颗CPU的将缓冲区数据刷入主存,同时

并发工具类

CountDownLatch

利用AQS实现,比如5个线程,初始设置state=5;每次结束一个线程(countDown)做减法: state=state-1;await等待方法则是从申请1个值,但是在tryAcquireShared方法中实现的是如果状态不是0,则不允许,如果是0,则允许.

Semaphore

利用AQS实现,类似CountDownLatch,初始化一个令牌桶,每次申请N个令牌桶,不够申请则等待.区别是Semaphore只要桶内数量足够申请就行,CountDownLatch要求必须只剩下0个.

CyclicBarrier

利用ReentrantLock 以及 ReentrantLock#newCondition ,内部使用一个int count 做计数,每次await一个线程的时候,加锁,做count--,如果count是0,则重新开始,重新开始的时候condition.notifyall;如果不是0,则condition.await(此时会释放锁);

原子操作类

大多数原子类,都是利用一个 Unsafe 类和一个volatile 变量包装的.执行原子更新的时候适用cas方式来执行更新.

Unsafe 类底层实现依赖于汇编指令 cmpxchg 实现原子性操作.

但是lazySet却是使用UNSAFE.putOrderedInt方法执行的,底层虚拟机调用的是C++方法的SET_FIELD_VOLATILE(obj, offset, jlong, x);仅仅是实现了volatile的语义操作,该操作导致修改后不是立即可见的.

计数器类

核心实现是创建一个base值,做基础累加,在cas操作base值失败之后,根据CPU数量创建多个Cell,将计算的逻辑分散到各个cell中,此时每个CPU最多操作自身的一个cell,几乎是资源独占的模式工作,所以累加效率会大有提高,但是读取值的时候需要将所有cell以及base值,读取速度可能变慢.

就好像是一个分桶排序的算法,因为一个桶放了太多数排序太慢,而分为多个桶来计算;或者说数据库表太大,则分成多个表.

其实我们的程序大多数都是写多读少的

并发编程锁

Java中锁的实现都是基于AQS实现的,具体一个锁的实现会选择实现AQS的独占锁或者共享锁的一种,在初始化的时候初始化一个state,独占锁通常为1,共享锁可能是大于1,获取锁则acquire(1),释放锁则release(1).

ReentrantLock

ReentrantLock 实现了初始state=0的一个AQS,每次acquire则增加1,release则减少1

重入实现

acquire的时候如果当前线程是获取到锁的线程,则会再次增加1,来实现重入获取,release在发现state=0的时候,则将当前获取锁的线程标记为null,来实现重入释放.state不是0,且当前线程没有持有锁,则不允许获取锁.

公平与非公平实现

公平锁在请求锁的时候会判断是不是有一个前驱节点在排队,如果有则放弃cas操作(tryAcquire 返回false),非公平锁则不判断直接进行cas操作.

判断前驱节点在排队的方法是查看前首节点(当前执行的节点)的下一节点不是当前线程,换言之:当前线程不是队列中排队第二的待执行线程.

ReentrantReadWriteLock

读写锁实现了AQS的共享锁和独占锁两种模式,原理是将一个int值,按位分割成高16位来标记读,和低16位来标记写.内部有一个 int exclusiveCount(int c) 用来计算独占锁的数量,一个 int sharedCount(int c) 用来计算共享锁的数量,

写锁的获取在有读锁已被获取的情况下进行等待,没有读的时候跟ReentrantLock 类似;读锁在没有写锁的情况下就可以用cas尝试更新state了,失败则放入等待队列,用死循环来等待.

写锁重入的实现跟ReentrantLock类似,只不过只能支持低16位最大数次重入,读锁的重入利用ThreadLocale来存放重入次数.

写锁获取

tryAcquire 方法注释说:

  1. 如果读状态不是0,或者写锁(独占锁)已经被获取,单不是当前线程,则直接失败()
  2. 如果写锁的重入次数太大,(16位存放不下)则抛出error(为什么是error,不是exception呢?)
  3. 其他情况下,这个线程将会执行类似ReentrantLock 的重入锁获取机制

读锁获取

tryAcquireShared 方法注释说:

  1. 如果独占锁数量不是0,并且获取的线程不是当前线程,则获取失败

  2. 其他情况下判断是否需要进入队列(此处用于处理公平不公平实现),如果不需要,则尝试CAS更新state

  3. 在上一步失败的情况下.进入一个循环重试的状态,死循环尝试获取锁,直到成功.

锁降级

根据读锁的获取流程可以看出,获取读锁的时候,如果当前线程已经获取了独占的写锁,也可以获取读锁,此被称为锁降级.但是写锁获取的时候,如果已经有读锁了,直接放弃,意味着无法锁升级.

锁降级是位了写完数据之后马上可以读取到数据,并且在独占的写锁中获取读锁,然后释放写锁,保证了在获取读锁的过程中不会有其他线程写入数据,保证自己写完之后看到的数据是最新的.

总结

我们讨论了Java并发编程的基础,从其面临的困难,以及如何解决这些困难的方案,到方案的具体实现.本文的基础章节很啰嗦,但是他很重要,那些高级的工具类,以及lock都是如何实现的并发包,但是基础章节却是为何要这么实现,以及如何思考到这些东西的思维演进.有了这些基础,我们可以很容易的实现线程池,阻塞队列,等高级工具.

  1. 并发编程的难点,这些难点在现实世界的样子
  2. 这些难点在现实世界以及计算机中的解决方案
  3. 这些解决方案的实现思路
  4. 这些思路的具体实现
  5. 这些具体实现的高级工具

登录查看全部

参与评论

评论留言

还没有评论留言,赶紧来抢楼吧~~

手机查看

返回顶部

给这篇文章打个标签吧~

棒极了 糟糕透顶 好文章 PHP JAVA JS 小程序 Python SEO MySql 确认