上一篇我们讲解了两个线程间的通信,只有一个线程对 EventQueue 进行 offer 操作,也只有一个线程对 EventQueue 进行 take 操作,如果多个线程同时进行 take 或者 offer,那么上面的程序就会出现问题。在篇中,我们将介绍如何在多线程的情况下进行通信,并且还会讲解 wait set 线程休息室的相关内容。生产者消费notifyAll方法
多线程间通信需要用到 Object 的 notifyAll 方法,该方法与 notify 比较类似,都可以唤醒由于调用了 wait 方法而阻塞的线程,但是 notify 方法只能唤醒其中的一个线程,而 notifyAll 方法则可以同时唤醒全部的阻塞线程,同样被唤醒的线程仍需要继续争抢 monitor 的锁。
生产者消费者
之前我们曾定义了一个 EventQueue,该队列在多个线程同时并发的情况下会出现数据不一致的情况,读者可以自行增加 EventClient 中的线程数量进行测试,在笔者的测试中出现了数据不一致的情况,大致可分为两类:
其一是 LinkedList 中没有元素的时候仍旧调用了 removeFirst 方法其二是当 LinkedList 中的元素超过10个的时候仍旧执行了 addLast 方法,下面通过图示的方法分别对其进行分析:LinkedList 为空时执行 removeFirst 方法
也许有同学会有疑问,EventQueue 中的方法都增加了 synchronized 数据同步,为何还会存在数据不一致的情况?假设 EventQueue 中的元素为空,两个线程在执行 take 方法时分别调用 wait 方法进入了阻塞之中,另外一个 offer 线程执行 addLast 方法之后唤醒了其中一个阻塞的 take 线程,该线程顺利消费了一个元素之后恰巧再次唤醒了一个 take 线程,这时就会导致执行空 LinkedList 的 removeFirst 方法,执行过程如图所示:
LinkedList 元素为10时执行 addLast 方法
假设某个时刻 EventQueue 中存在10个 Event 数据,其中两个线程在执行 offer 方法的时候分别因为调用了 wait 方法而进入阻塞中,另外的一个线程执行 take 方法消费了 event 元素并且唤醒了一个 offer 线程,而该 offer 线程执行了 addLast 方法之后,queue 中的元素为10,并且再次执行唤醒方法,恰巧另外一个 offer 线程也被唤醒,因此可以绕开阀值检查 eventQueue().size>=max,致使 EventQueue 中的元素超过10个,执行过程如图所示:
改进
在分析完多线程情况下出现的问题之后,我们将对其进行改进。实际上在真实的开发中,绝大多数时候遇到的都是多线程间通信的情况,其中生产者消费者的例子就是最好的模型,示例如下:
public void offer(Event event){ synchronized (eventQueue) { while (eventQueue.size() >= max) { try { console(” the queue is full.”); eventQueue.wait(); } catch (InterruptedException e) { e.printStackTrace(); } } console(” the new event is submitted”); eventQueue.addLast(event); eventQueue.notifyAll(); }}public Event take(){ synchronized (eventQueue) { while (eventQueue.isEmpty()) { try { console(” the queue is empty.”); eventQueue.wait(); } catch (InterruptedException e) { e.printStackTrace(); } } Event event = eventQueue.removeFirst(); this.eventQueue.notifyAll(); console(” the event ” event ” is handled.”); return event; }}
只需要将临界值的判断 if 更改为 while,将 notify 更改为 notifyAll 即可。
线程休息室wait set
在虚拟机规范中存在一个 wait set(wait set 又被称为线程休息室)的概念,至于该 wait set 是怎样的数据结构,JDK 官方并没有给出明确的定义,不同厂家的 JDK 有着不同的实现方式,甚至相同的 JDK 厂家不同的版本也存在着差异,但是不管怎样,线程调用了某个对象的 wait 方法之后都会被加入与该对象 monitor 关联的 wait set 中,并且释放 monitor 的所有权。
下图是若干个线程调用了 wait 方法之后被加入与 monitor 关联的 wait set 中,待另外一个线程调用该 monitor 的 notify 方法之后,其中一个线程会从 wait set 中弹出,至于是随机弹出还是以先进先出的方式弹出,虚拟机规范同样也没有给出强制的要求:
而执行 notifyAll 则不需要考虑哪个线程会被弹出,因为 wait set 中的所有 wait 线程都将被弹出,如图所示:
自定义显示锁
接下来,我们将利用前面所掌握的知识,构建一个自定义的显式锁,类似于 Java utils 包下的 Lock 接口,并且分析 synchronized 关键字的缺陷。
synchronized关键的缺陷
synchronized 关键字提供了一种排他式的数据同步机制,某个线程在获取 monitor lock 的时候可能会被阻塞,而这种阻塞有两个很明显的缺陷:
第一,无法控制阻塞时长。第二,阻塞不可被中断。
下面通过示例来进行分析,如下:
public class SynchronizedDefect{ public synchronized void syncMethod(){ try { TimeUnit.HOURS.sleep(1); } catch (InterruptedException e) { e.printStackTrace(); } } public static void main(String[] args) throws InterruptedException{ SynchronizedDefect defect = new SynchronizedDefect(); Thread t1 = new Thread(defect::syncMethod, “T1”); //make sure the t1 started. t1.start(); TimeUnit.MILLISECONDS.sleep(2); Thread t2 = new Thread(defect::syncMethod, “T2”); t2.start(); }}
上面的代码中有一个同步方法 syncMethod,这里启动了两个线程分别调用该方法,在该方法中线程会休眠1小时的时间,为了确保 T1 线程能够最先进入同步方法,在 T1 线程启动后主线程休眠了2毫秒的时间。T2 线程启动执行 syncMethod 方法时会进入阻塞,T2 什么时候能够获得 syncMethod 的执行完全取决于 T1 何时对其释放,如果 T2 计划最多1分钟获得执行权,否则就放弃,很显然这种方式是做不到的,这也就是前面所说的阻塞时长无法控制:
第二个缺陷是 T2 若因争抢某个 monitor 的锁而进入阻塞状态,那么它是无法中断的,虽然可以设置 T2 线程的 interrupt 标识,但是 synchronized 阻塞不像 sleep 和 wait 方法一样能够捕获得到中断信号。下面将 main 方法的代码稍作修改,试图打断 T2 线程,来看看会怎样。
public static void main(String[] args) throws InterruptedException{ SynchronizedDefect defect = new SynchronizedDefect(); Thread t1 = new Thread(defect::syncMethod, “T1”); //make sure the t1 started. t1.start(); TimeUnit.MILLISECONDS.sleep(2); Thread t2 = new Thread(defect::syncMethod, “T2”); t2.start(); //make sure the t2 started. TimeUnit.MILLISECONDS.sleep(2); t2.interrupt(); System.out.println(t2.isInterrupted()); System.out.println(t2.getState());}
程序的输出一定会让你感到失望,但是同时也证明了被 synchronized 同步的线程不可被中断。
显示锁BooleanLock
接下来,我们将利用前面所学的知识,构造一个显式的 BooleanLock,使其在具备 synchronized 关键字所有功能的同时又具备可中断和 lock 超时的功能。
定义Lock接口
示例代码如下:
public interface Lock{ void lock() throws InterruptedException; void lock(long mills) throws InterruptedException, TimeoutException; void unlock(); List<Thread> getBlockedThreads();}
在上述代码中:
lock()方法永远阻塞,除非获取到了锁,这一点和 synchronized 非常类似,但是该方法是可以被中断的,中断时会抛出 InterruptedException 异常。lock(long mills)方法除了可以被中断以外,还增加了对应的超时功能。unlock()方法可用来进行锁的释放。getBlockedThreads()用于获取当前有哪些线程被阻塞。
实现 BooleanLock
BooleanLock 是 Lock 的一个 Boolean 实现,通过控制一个 Boolean 变量的开关来决定是否允许当前的线程获取该锁,首先定义三个非常重要的成员变量,如下所示:
public class BooleanLock implements Lock{ private Thread currentThread; private boolean locked = false; private final List<Thread> blockedList = new ArrayList<>();}
其中 currentThread 代表当前拥有锁的线程,locked 是一个 boolean 开关,false 代表当前该锁没有被任何线程获得或者已经释放,true 代表该锁已经被某个线程获得,该线程就是 currentThread;blockedList 用来存储哪些线程在获取当前线程时进入了阻塞状态。下面继续实现 lock()方法,代码如下:
@Overridepublic void lock() throws InterruptedException{ synchronized (this)//① { while (locked) //② { blockedList.add(currentThread()); this.wait(); } blockedList.remove(currentThread());//③ this.locked = true;//④ this.currentThread = currentThread();//⑤ }}
在上述代码中:
①Lock()方法使用同步代码块的方式进行方法同步。②如果当前锁已经被某个线程获得,则该线程将加入阻塞队列,并且使当前线程 wait 释放对 this monitor 的所有权。③如果当前锁没有被其他线程获得,则该线程将尝试从阻塞队列中删除自己(注意:如果当前线程从未进入过阻塞队列,删除方法不会有任何影响;如果当前线程是从 wait set 中被唤醒的,则需要从阻塞队列中将自己删除)。④locked 开关被指定为 true。⑤记录获取锁的线程。
继续实现带有超时功能的 lock(long mills)方法,代码如下:
@Overridepublic void lock(long mills) throws InterruptedException, TimeoutException{ synchronized (this) { if (mills <= 0) ① { this.lock(); } else { long remainingMills = mills; long endMills = currentTimeMillis() remainingMills; while (locked) { if (remainingMills <= 0) //② throw new TimeoutException(“can not get the lock during ” mills if (!blockedList.contains(currentThread())) blockedList.add(currentThread()); this.wait(remainingMills); //③ remainingMills = endMills – currentTimeMillis(); //④ } blockedList.remove(currentThread()); //⑤ this.locked = true; this.currentThread = currentThread(); } }}
相比较 lock()方法,该方法稍微有些复杂,我们逐步解释。在上述代码中:
①如果 mills 不合法,则默认调用 lock()方法,当然也可以抛出参数非法的异常,一般来说,抛出异常是一种比较好的做法。②如果 remainingMills 小于等于0,则意味着当前线程被其他线程唤醒或者在指定的 wait 时间到了之后还没有获得锁,这种情况下会抛出超时的异常。③等待 remainingMills 的毫秒数,该值最开始是由其他线程传入的,但在多次 wait 的过程中会重新计算。④重新计算 remainingMills 时间。⑤获得该锁,并且从 block 列表中删除当前线程,将 locked 的状态修改为 true 并且指定获得锁的线程就是当前线程。
unlock()方法需要做的仅仅是将 locked 状态修改为 false,并且唤醒 wait set 中的其他线程,再次争抢锁资源。但是需要注意的一点是,哪个线程加的锁只能由该线程来解锁:
@Overridepublic void unlock(){ synchronized (this) { if (currentThread == currentThread())//① { this.locked = false; // ② this.notifyAll(); // ③ } }}
在上述代码中:
①判断当前线程是否为获取锁的那个线程,只有加了锁的线程才有资格进行解锁。②将锁的 locked 状态修改为 false。③通知其他在 wait set 中的线程,你们可以再次尝试抢锁了,这里使用 notify 和 notifyAll 都可以。
下面是 BooleanLock 的完整代码:
public class BooleanLock implements Lock{ private Thread currentThread; private boolean locked = false; private final List<Thread> blockedList = new ArrayList<>(); @Override public void lock() throws InterruptedException{ synchronized (this) { while (locked) { if (!blockedList.contains(currentThread())) blockedList.add(currentThread()); this.wait(); } blockedList.remove(currentThread()); this.locked = true; this.currentThread = currentThread(); } } @Override public void lock(long mills) throws InterruptedException, TimeoutException{ synchronized (this) { if (mills <= 0) { this.lock(); } else { long remainingMills = mills; long endMills = currentTimeMillis() remainingMills; while (locked) { if (remainingMills <= 0) throw new TimeoutException(“can not get the lock during ” \ mills ” ms.”); if (!blockedList.contains(currentThread())) blockedList.add(currentThread()); this.wait(remainingMills); remainingMills = endMills – currentTimeMillis(); } blockedList.remove(currentThread()); this.locked = true; this.currentThread = currentThread(); } } } @Override public void unlock(){ synchronized (this) { if (currentThread == currentThread()) { this.locked = false; Optional.of(currentThread().getName() ” release the lock.”) .ifPresent(System.out::println); this.notifyAll(); } } } @Override public List<Thread> getBlockedThreads(){ return Collections.unmodifiableList(blockedList); }}
至此 BooleanLock 的基本功能已经完成,当然读者也可以对其进行扩充,比如增加 tryLock 的功能,也就是说能获得锁便获得,获得不了就退出,压根不会阻塞。
使用 BooleanLock
上面我们充分利用了 wait、notify 方法的功能,实现了一个显式锁,可中断被阻塞的线程。在使用该显式锁的时候,务必借助于 try finally 语句来确保每次获取到锁之后都可以正常的释放,本节将列举一些简单的例子来展示如何使用 BooleanLock。
多个线程通过 lock()方法争抢锁public class BooleanLockTest{ //定义 BooleanLock private final Lock lock = new BooleanLock(); //使用 try..finally 语句块确保 lock 每次都能被正确释放 public void syncMethod(){ //加锁 lock.lock(); try { int randomInt = current().nextInt(10); System.out.println(currentThread() ” get the lock.”); TimeUnit.SECONDS.sleep(randomInt); } catch (InterruptedException e) { e.printStackTrace(); } finally { //释放锁 lock.unlock(); } } public static void main(String[] args){ BooleanLockTest blt = new BooleanLockTest(); //定义一个线程并且启动 IntStream.range(0, 10) .mapToObj(i -> new Thread(blt::syncMethod)) .forEach(Thread::start); }}
上面的代码比较简单,代码注释已给出相应的说明,运行上面代码,查看执行结果,输出如下:
Thread[Thread-0,5,main] get the lock.Thread-0 release the lock monitor.Thread[Thread-9,5,main] get the lock.Thread-9 release the lock monitor.Thread[Thread-1,5,main] get the lock.Thread-1 release the lock monitor.Thread[Thread-6,5,main] get the lock.Thread-6 release the lock monitor.Thread[Thread-2,5,main] get the lock.Thread-2 release the lock monitor.Thread[Thread-5,5,main] get the lock.Thread-5 release the lock monitor.Thread[Thread-3,5,main] get the lock.Thread-3 release the lock monitor.Thread[Thread-7,5,main] get the lock.Thread-7 release the lock monitor.Thread[Thread-4,5,main] get the lock.Thread-4 release the lock monitor.Thread[Thread-8,5,main] get the lock.Thread-8 release the lock monitor.
根据控制台输出可以看到,每次都会确保只有一个线程能够获得锁的执行权限,这一点已经与 synchronized 同步非常类似。接下来再讲解 BooleanLock 的可中断特性。
可中断被阻塞的线程
将 main 函数中的代码稍作修改,启动 T1 线程以确保能够获得锁,紧接着启动另外一个 T2,main 线程将稍作休眠然后打断 T2 线程,代码如下:
BooleanLockTest blt = new BooleanLockTest();new Thread(blt::syncMethod, “T1”).start();TimeUnit.MILLISECONDS.sleep(2);Thread t2 = new Thread(blt::syncMethod, “T2″);t2.start();TimeUnit.MILLISECONDS.sleep(10);t2.interrupt();
运行上面的程序,在 T2 线程启动10毫秒以后,主动将其中断,T2 线程会收到中断信号,也就是 InterruptedException 异常,这样也就弥补了 Synchronized 同步方式不可被中断的缺陷。上述程序的运行结果如下:
Thread[T1,5,main] get the lock.java.lang.InterruptedException at java.lang.Object.wait(Native Method) at java.lang.Object.wait(Object.java:502) at com.wangwenjun.concurrent.chapter05.BooleanLock.lock(BooleanLock.java:31) at com.wangwenjun.concurrent.chapter05.BooleanLockTest.syncMethod(BooleanLock-Test.java:18) at com.wangwenjun.concurrent.chapter05.BooleanLockTest$$Lambda$2/834600351.run (Unknown Source) at java.lang.Thread.run(Thread.java:745)T1 release the lock monitor.
但是,BooleanLock 还存在一个问题,如果某个线程被中断,那么它将有可能还存在于 blockList 中,该问题的修复也非常简单,可以对 BooleanLock 的 lock 方法进行进一步的增强加以修复,另外一个 lock 重载方法的实现思路与之类似,如下代码所示:
@Override public void lock() throws InterruptedException{ synchronized (this) { while (locked) { //暂存当前线程 final Thread tempThread = currentThread(); try { if (!blockedList.contains(tempThread)) blockedList.add(tempThread); this.wait(); } catch(InterruptedException e) { //如果当前线程在 wait 时被中断,则从 blockedList 中将其删除,避免内存泄漏 blockedList.remove(tempThread); //继续抛出中断异常 throw e; } } blockedList.remove(currentThread()); this.locked = true; this.currentThread = currentThread(); } }阻塞的线程可超时
最后再写一个具有超时功能 lock 的使用示例,同样定义两个线程 T1 和 T2,确保 T1 先执行能够最先获得锁,T2 稍后启动,在 1000ms 以内未获得锁则会抛出超时异常,代码如下:
public void syncMethodTimeoutable(){ try { lock.lock(1000); System.out.println(currentThread() ” get the lock.”); int randomInt = current().nextInt(10); TimeUnit.SECONDS.sleep(randomInt); } catch (InterruptedException | TimeoutException e) { e.printStackTrace(); } finally { lock.unlock(); }}public static void main(String[] args) throws InterruptedException{ BooleanLockTest blt = new BooleanLockTest(); new Thread(blt::syncMethod, “T1”).start(); TimeUnit.MILLISECONDS.sleep(2); Thread t2 = new Thread(blt::syncMethodTimeoutable, “T2”); t2.start(); TimeUnit.MILLISECONDS.sleep(10);}
这里的代码比较简单,不再赘述其实现细节,程序的输出结果如下所示:
Thread[T1,5,main] get the lock.java.util.concurrent.TimeoutException: can not get the lock during 1000 ms. at com.wangwenjun.concurrent.chapter05.BooleanLock.lock(BooleanLock.java:55) at com.wangwenjun.concurrent.chapter05.BooleanLockTest.syncMethodTimeoutable(BooleanLockTest.java:35) at com.wangwenjun.concurrent.chapter05.BooleanLockTest$$Lambda$2/834600351.run(Unknown Source) at java.lang.Thread.run(Thread.java:745)T1 release the lock monitor.