—— 24.10.8
一、问题提出
目前队列存在的问题
1.很多场景要求分离生产者、消费者两个角色、它们需要由不同的线程来担当,而之前的实现根本没有考虑线程安全问题
2.poll方法,队列为空,那么在之前的实现里会返回null,如果就是硬要拿到一个元素呢?以现在的实现只能不断循环尝试
3.offer方法,队列为满,那么在之前的实现里会返回false,如果就是硬要塞入一个元素呢?以现在的实现只能不断循环尝试
4.指令交错,多个线程会造成混乱效果
二、解决方法
为解决线程不安全问题,需要给线程加锁,使线程局部阻塞
用条件变量让 poll 或 offer 线程进入等待 状态,而不是不断循环尝试,让CPU空转
三、单锁实现
Java中两种锁的选择
① synchronized:关键字,功能少
② ReentrantLock:可重入锁,功能丰富
lock() 加锁
unlock() 解锁
lockInterruptibly() 加锁(可在阻塞时打断,提前唤醒)
offer方法实现
if判断
问题
从 tailWaits 中唤醒的线程,会与新来的 offer 的线程争抢锁,谁能抢到是不一定的,如果后者先抢到,就会导致条件又发生变化
这种情况称之为虚假唤醒,唤醒后应该重新检查条件,看是不是得重新进入等待
public void offer(String e) throws InterruptedException {// 加锁,可重入锁阻塞时可打断方法(可被强制唤醒)lock.lockInterruptibly();try {// 判断是否为满if (isFull()){// 队列满时,使offer线程阻塞,直到poll线程取走后,有位置时再恢复运行// tail.signal() 唤醒线程tailWaits.await();}array[tail] = e;if (++tail == array.length){tail = 0;}size++;}finally {// 解锁lock.unlock();}}
import java.util.Arrays;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;public class TestThreadUnsafe {private final String[] array = new String[10];// 尾指针private int tail = 0;// 元素个数private int size = 0;// 创建一个可重入锁对象ReentrantLock lock = new ReentrantLock();// 条件变量对象(集合线程)Condition tailWaits = lock.newCondition();public void offer(String e) throws InterruptedException {// 加锁,可重入锁阻塞时可打断方法(可被强制唤醒)lock.lockInterruptibly();try {// 判断是否为满if (isFull()){// 队列满时,使offer线程阻塞,直到poll线程取走后,有位置时再恢复运行// tail.signal() 唤醒线程tailWaits.await();}array[tail] = e;if (++tail == array.length){tail = 0;}size++;}finally {// 解锁lock.unlock();}}public void poll(String e) throws InterruptedException {}@Overridepublic String toString() {return Arrays.toString(array);}private boolean isFull(){return size == array.length;}private boolean isEmpty(){return size == 0;}public static void main(String[] args) throws InterruptedException{TestThreadUnsafe queue = new TestThreadUnsafe();for (int i = 0; i < 10; i++) {queue.offer("e"+i);}new Thread(()->{try {System.out.println(Thread.currentThread().getName()+"添加元素之前");queue.offer("e10");System.out.println(Thread.currentThread().getName()+"添加元素成功");} catch (InterruptedException e) {throw new RuntimeException(e);}},"t1").start();new Thread(()->{System.out.println("开始唤醒");try{queue.lock.lockInterruptibly();queue.tailWaits.signal();} catch (InterruptedException e) {throw new RuntimeException(e);}finally {queue.lock.unlock();}},"t2").start();}
}
while判断
解决了虚假唤醒的问题
@Overridepublic void offer(E e) throws InterruptedException { // poll 等待队列非空lock.lockInterruptibly();try{while (isFull()){// 放在条件变量等待tailWaits.await();}array[tail] = e;if (++tail == array.length){tail = 0;}size++;// 唤醒等待线程headWaits.signal();}finally {lock.unlock();}}
import java.util.Arrays;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;public class BlockingQueue1<E> implements BlockingQueue<E> {private final E[] array;private int head;private int tail;private int size;// 根据容量创造一个数组public BlockingQueue1(int capacity) {array = (E[]) new Object[capacity];}// 加可重入锁private ReentrantLock lock = new ReentrantLock();// 配合poll方法条件变量,在队列头部删除private Condition headWaits = lock.newCondition();// 配合offer方法条件变量,在队列尾部加入private Condition tailWaits = lock.newCondition();// 判空private boolean isEmpty(){return head == tail;}// 判满private boolean isFull(){return size == array.length;}@Overridepublic String toString() {return "array=" + Arrays.toString(array);}@Overridepublic void offer(E e) throws InterruptedException { // poll 等待队列非空lock.lockInterruptibly();try{while (isFull()){// 放在条件变量等待tailWaits.await();}array[tail] = e;if (++tail == array.length){tail = 0;}size++;// 唤醒等待线程headWaits.signal();}finally {lock.unlock();}}@Overridepublic boolean offer(E e, long timeout) throws InterruptedException {lock.lockInterruptibly();try{// 将毫秒时间转换为纳秒时间long t = TimeUnit.MILLISECONDS.toNanos(timeout);while (isFull()){if (t<=0){return false;}// 最多等待多少纳秒tailWaits.awaitNanos(t);}array[tail] = e;if (++tail == array.length){tail = 0;}size++;// 唤醒等待线程headWaits.signal();return true;}finally {lock.unlock();}}@Overridepublic E poll() throws InterruptedException {lock.lockInterruptibly();try {while (isEmpty()){headWaits.await();}E e = array[head];array[head] = null;if (++head == array.length){head = 0;}size--;tailWaits.signal();return e;}finally {lock.unlock();}}
}
main函数
public class TestBlockingQueue1 {public static void main(String[] args) throws InterruptedException {BlockingQueue1<String> queue = new BlockingQueue1<>(3);Thread t1 = new Thread(() -> {try {System.out.println(System.currentTimeMillis()+" begin ");queue.offer("任务1");System.out.println(queue);queue.offer("任务2");System.out.println(queue);queue.offer("任务3");System.out.println(queue);queue.offer("任务4",5000);System.out.println(queue);System.out.println(System.currentTimeMillis() + " end ");} catch (InterruptedException e) {throw new RuntimeException(e);}},"生产者");t1.start();Thread.sleep(2000);queue.poll();}
}
四、双锁实现
单锁问题:
单锁实现的缺陷:两个线程用了同一把锁,一个执行时,另一个就需阻塞,而offer方法添加元素和poll方法取走元素使用了同一把锁,这样两个线程不能同时执行,两方法相互阻塞
解决方法:
offer方法主要操作尾指针,poll方法主要操作头指针,将offer方法和poll方法分别添加一个锁,用两把锁分别保护头指针和尾指针,从而分别保护offer和poll两个方法
代码实现
import java.util.Arrays;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;public class BlockingQueue2<E> implements BlockingQueue<E> {private final E[] array;private int head;private int tail;private int size;// tailLock给offer方法入队用,Condition分别创建两个等待的条件变量private ReentrantLock tailLock = new ReentrantLock();private Condition notEmpty = tailLock.newCondition();// headLock给poll方法出队用private ReentrantLock headLock = new ReentrantLock();private Condition notFull = headLock.newCondition();public BlockingQueue2(int capacity) {this.array = (E[]) new Object[capacity];}// 判空private boolean isEmpty(){return size == 0;}// 判满private boolean isFull(){return size == array.length;}@Overridepublic String toString() {return "array=" + Arrays.toString(array);}@Overridepublic void offer(E e) throws InterruptedException {// 加锁tailLock.lockInterruptibly();try{while (isFull()) {notEmpty.await();}array[tail] = e;if (++tail == array.length) {tail = 0;}size++;}finally {tailLock.unlock();}}@Overridepublic boolean offer(E e, long timeout) throws InterruptedException {return false;}@Overridepublic E poll() throws InterruptedException {// 加锁headLock.lockInterruptibly();try{while (isEmpty()){notEmpty.await();}E e = array[head];array[head] = null;if (++head == array.length) {head = 0;}size--;return e;}finally {headLock.unlock();}}public static void main(String[] args) throws InterruptedException {BlockingQueue2<String> queue = new BlockingQueue2<>(3);queue.offer("任务1");new Thread(()->{try {queue.offer("任务2");} catch (InterruptedException e) {throw new RuntimeException(e);}},"offer").start();new Thread(()->{try {System.out.println(queue.poll());} catch (InterruptedException e) {throw new RuntimeException(e);}},"poll").start();}
}
size自增/自减问题
size的自增自减不能保障安全,size自增自减在多个线程同时执行时可能遇到冲突
解决方法
用原子变量AtomicInteger类型保证安全
getAndIncrement 自增方法,能保证线程安全
getAndDecrement 自减方法,能保证线程安全