目录
1. 阻塞队列
2. 阻塞队列的优点
(1) 实现服务器之间的"低耦合".
(2) 实现"削峰填谷"的功能.
3. 阻塞队列代码举例
4. 自己实现阻塞队列
1. 阻塞队列
我们知道, 标准库中原有的队列Queue及其子类, 都是线程不安全的, 所以java封装了一个名为"阻塞队列" (Blocking Queue) 的接口, "阻塞队列"在普通队列的基础上做了扩充, 它有如下特性:
(1) "阻塞队列"是线程安全的.
(2) 如果队列为空时, 进行出队操作, 此时就会出现阻塞, 一直阻塞到其他线程往队列里增加元素为止 (一直阻塞到队列不为空为止).
(3) 如果队列为满时, 进行入队操作, 此时就会出现阻塞, 一直阻塞到其他线程取走队列元素为止 (一直阻塞到队列不为满为止).
[注]: 阻塞队列主要应用于"生产者-消费者"模型.
2. 阻塞队列的优点
(1) 实现服务器之间的"低耦合".
如果说A, B两台服务器之间是直接调用的关系, 那么编写A的代码时, 就会出现很多B的代码, 编写B的代码时, 也会出现很多A的代码, 这样的话两台服务器之间的耦合程度就非常高. 如果一台服务器挂掉的话, 另一台服务器也会受到很大的影响.
如果我们引入阻塞队列 ( 在服务器领域, 我们给这样的阻塞队列起了一个新的名字: "消息队列" (Message Queue "MQ" ) ), 让A与阻塞队列建立直接的联系(A只与阻塞队列通信), B与阻塞队列建立直接的联系(B也只与阻塞队列通信). 那么此时A不知道B的存在, 不也不会知道A的存在, A, B之间的耦合程度就非常低了, 现在即使B挂掉了, 对A也几乎没有影响.
(2) 实现"削峰填谷"的功能.
如果某一时刻, 客户端的请求量突然激增, 那么如果没有消息队列, 下游的服务器 ("下游服务器指的是要对数据或请求作进一步处理的服务器, 它比上游服务器要做的操作更复杂, 消耗的资源更多")很可能被巨大的请求量冲垮, 导致系统崩溃. 但是如果有了这样一个消息队列, 那么在请求量激增的时候, 消息队列就会起到一个"缓冲"的作用, 把巨大的请求量"挡住", 仍然以原有的速度 (或者比原本速度快一点的速度) 把请求传给下游服务器, 这样下游服务器就不会被冲垮. 而到了请求量偏低的时候, 消息队列又会把积攒的请求传给下游服务器, 让它不空闲下来. 这样一个"削峰填谷"的作用类似于"三峡大坝". 在汛期蓄水, 控制流速; 在旱期防水, 不至于让水流干涸.
注意: 消息队列也有其缺点, 比如: (1) 需要更多的机器来部署这样的消息队列. (2) 由于增加了消息队列这样一个东西, A与B之间通信的延时会变长.
3. 阻塞队列代码举例
我们可以看到, BlockingQueue这个接口有很多实现类, 这里我们以ArrayBlockingQueue为例举例.
public class Demo26 {public static void main(String[] args) throws InterruptedException {BlockingQueue<String> queue = new ArrayBlockingQueue<>(3);// 创建一个阻塞队列对象, 并指定其最大容量为3.queue.put("111");System.out.println("put成功");queue.put("111");System.out.println("put成功");queue.put("111");System.out.println("put成功");queue.put("111");System.out.println("put成功");}
}
运行上面这个程序, 我们可以看到, 前三个"111"都put成功, 但是执行到第四个put的时候, 发现队列已经满了, 那么这时候该线程就会进入阻塞状态. 直到其他线程取走队列元素(队列不满时), 才继续再进行put操作.
public class Demo27 {public static void main(String[] args) throws InterruptedException {BlockingQueue<String> queue = new ArrayBlockingQueue<>(3);// 创建一个阻塞队列对象, 并指定其最大容量为3.queue.put("111");System.out.println("put成功");queue.put("111");System.out.println("put成功");queue.take();System.out.println("take成功");queue.take();System.out.println("take成功");queue.take();System.out.println("take成功");}
}
运行上面这个程序, 我们可以看到: 两个put成功, 两个take成功. 当执行到第三个take的时候, 发现此时队列为空, 那么此时线程就会进入阻塞状态, 直到其他线程往队列里新增元素(队列不空时), 才继续再进行take操作.
public class Demo28 {public static void main(String[] args) {BlockingQueue<Integer> queue = new ArrayBlockingQueue<>(1000);// 生产者线程Thread t1 = new Thread(() -> {int i = 1;while (true) {try {queue.put(i);System.out.println("生产元素 " + i);i++;// 给生产操作, 加上 sleep, 生产慢点, 消费快点Thread.sleep(1300);} catch (InterruptedException e) {throw new RuntimeException(e);}}});// 消费者线程Thread t2 = new Thread(() -> {while (true) {try {Integer i = queue.take();System.out.println("消费元素 " + i);} catch (InterruptedException e) {throw new RuntimeException(e);}}});t1.start();t2.start();}
}
如上述代码, 我们给生产者线程加了sleep操作. 让生产得慢一点, 那么队列中没有元素的时候, 消费者线程只能阻塞等待, 等到队列中新增元素元素的时候再take, 进行消费元素. 所以, 上述代码的执行效果如下:
4. 自己实现阻塞队列
class MyBlockingQueue {private String[] data = null;private volatile int head = 0;private volatile int tail = 0;private volatile int size = 0;// private Object locker = new Object();public MyBlockingQueue(int capacity) {data = new String[capacity];}public void put(String s) throws InterruptedException {// 加锁的对象, 可以单独定义一个, 也可以直接就地使用 this.synchronized (this) {if (size == data.length) {// 队列满了// return;this.wait();}data[tail] = s;tail++;if (tail >= data.length) {tail = 0;}size++;this.notify();}}public String take() throws InterruptedException{String ret = "";synchronized (this) {if (size == 0) {// 队列为空// return null;this.wait();}ret = data[head];head++;if (head >= data.length) {head = 0;}size--;this.notify();}return ret;}
}
这里对变量加上volatile, 对操作加上synchronized, 保证线程安全. put方法中,当队列满的时候, 进入wait()等待, take方法执行完take操作后会进行通知 notify() ; take方法中,当队列空的时候, 进入wait()等待, put方法执行完put操作后会进行通知 notify(); 相互通知, 保证了"阻塞"这个功能.
上述代码写成这个样子就比较完整了, 但是还存在一个小问题: 我们这里的wait()只有notify一种方式能唤醒吗? --> 显然不是, 那么, 比如说: 在 if (size == 0) 这里, 我们判定size == 0, 进入循环, 然后执行wait(), 进入等待状态. 如果此时wait()被提前唤醒了(eg: 被Interrupt唤醒), 而此时size还是0 (队列还没来得及新插入数据, wait()就被提前唤醒了), 那么唤醒之后继续往下执行程序必然会出现错误. 所以, 我们在这里可以做一个小小的改进: 将 if 替换成 wile . 这样做的目的就是为了能够"循环判定", 即使wait()被提前唤醒, 这里还会执行一次while的判定, 看 size 是否为0, 如果仍然是0, 那就继续wait(); 如果不为0了, 就执行后面的代码. 这样一来, 这个代码是不是就更加完善了呢? 不仅在这里, 日常开发中, 也有很多地方, 用 while 会比 if 好很多, 因为 while 是"循环判定", 而 if 只判定一次.
class MyBlockingQueue {private String[] data = null;private volatile int head = 0;private volatile int tail = 0;private volatile int size = 0;// private Object locker = new Object();public MyBlockingQueue(int capacity) {data = new String[capacity];}public void put(String s) throws InterruptedException {// 加锁的对象, 可以单独定义一个, 也可以直接就地使用 this.synchronized (this) {while (size == data.length) { //将if换成while// 队列满了// return;this.wait();}data[tail] = s;tail++;if (tail >= data.length) {tail = 0;}size++;this.notify();}}public String take() throws InterruptedException{String ret = "";synchronized (this) {while (size == 0) { //将if换成while// 队列为空// return null;this.wait();}ret = data[head];head++;if (head >= data.length) {head = 0;}size--;this.notify();}return ret;}
}
好了, 本篇文章就介绍到这里啦, 大家如果有疑问欢迎评论, 如果喜欢小编的文章, 记得点赞收藏~~