第七章-阻塞队列知道吗

队列和阻塞队列

image-20191107105850367

为什么用?有什么好处?

在多线程领域:所谓阻塞,在某些情况下会挂起线程(即阻塞),一旦条件满足,被挂起的线程又会自动被唤醒

为什么需要BlockingQueue:
好处是我们不需要关心什么时候需要阻塞线程,什么时候需要唤醒线程,因为这一切BlockingQueue都给你一手包办了在concurrent包发布以前,在多线程环境下,我们每个程序员都必须去自己控制这些细节,尤其还要兼顾效率和线程安全,而这会给我们的程序带来不小的复杂度。

BlockingQueue核心方法

image-20191107110031908

架构梳理+种类分析

image-20191107110054498

用在哪里

生产者消费者模式

传统版

image-20191107110218547

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
/**
* @desc
* 题目:一个初始值为零的变量,两个线程对其交替操作,一个加1一个减1,来5轮
* 线程 操作(方法) 资源类
* 判断 干活 通知
* 防止虚假唤醒:多线程判断一定要用while,防止虚假唤醒!!!
*
* @Author xw
* @Date 2019/8/23
*/
public class ProConsumer_TaditionDemo {
public static void main(String[] args) {
ShareData shareData = new ShareData();
new Thread(() -> {
for (int i = 0; i < 5; i++) {
try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { e.printStackTrace(); }
shareData.increment();
}
}, "AA").start();
new Thread(() -> {
for (int i = 0; i < 5; i++) {
shareData.decrease();
}
}, "BB").start();
}
}

class ShareData {
volatile int number;
private Lock lock = new ReentrantLock();
private Condition condition = lock.newCondition();

// 加1
public void increment() {
lock.lock();
try {
// (1)判断:多线程判断一定要用while,防止虚假唤醒!!!
while (number != 0) {
// 等待,不能生产
condition.await();
}
// (2)干活
number++;
System.out.println(Thread.currentThread().getName() + "\t" + number);
// (3)通知
condition.signalAll();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}
// 减少
public void decrease() {
lock.lock();
try {
// (1)判断:多线程判断一定要用while,防止虚假唤醒!!!
while (number != 1) {
// 等待,不能生产
condition.await();
}
// (2)干活
number--;
System.out.println(Thread.currentThread().getName() + "\t" + number);
// (3)通知
condition.signalAll();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}
}

阻塞队列版

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
/**
* @desc
* 阻塞队列生产消费者模式
* (1)生产:BlockingQueue.offer(val, timeout, timeout_unit)
* (2)消费:BlockingQueue.poll(timeout, timeout_unit)
* (3)开关:需要一个标识,循环等待
* @Author xw
* @Date 2019/8/23
*/
public class ProConsumer_BlockingQueueDemo {
public static void main(String[] args) {
MyResource myResource = new MyResource(new ArrayBlockingQueue<>(1));
new Thread(() -> {
try {
myResource.myProd();
} catch (InterruptedException e) {
e.printStackTrace();
}
}, "AA").start();
new Thread(() -> {
try {
myResource.myConsumer();
} catch (InterruptedException e) {
e.printStackTrace();
}
}, "BB").start();

try { TimeUnit.SECONDS.sleep(3); } catch (InterruptedException e) { e.printStackTrace(); }
myResource.stop();
System.out.println(Thread.currentThread().getName() + "\t 叫停");
}
}

class MyResource {
// 开关
private volatile boolean FLAG = true; // 默认开启,进行生产+消费者
private AtomicInteger atomicInteger = new AtomicInteger();
BlockingQueue<String> blockingQueue = null;
public MyResource(BlockingQueue<String> blockingQueue) {
this.blockingQueue = blockingQueue;
System.out.println(blockingQueue.getClass().getName());
}

public void myProd() throws InterruptedException {
String data = null;
boolean retValue;
while (FLAG) { // (1)判断
// (2)干活+通知
data = atomicInteger.incrementAndGet() + "";
retValue = blockingQueue.offer(data, 2L, TimeUnit.SECONDS);
if (retValue) {
System.out.println(Thread.currentThread().getName() + "\t 插入队列" + data + "成功");
} else {
System.out.println(Thread.currentThread().getName() + "\t 插入队列" + data + "失败");
}
try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { e.printStackTrace(); }
}
System.out.println(Thread.currentThread().getName() + "\t大老板叫停,表示FLAG=false,生产动作结束");
}

public void myConsumer() throws InterruptedException {
String result;

while (FLAG) {
result = blockingQueue.poll(2L, TimeUnit.SECONDS);
if (null == result || "".equals(result)) {
FLAG = false;
System.out.println(Thread.currentThread().getName() + "\t 超过2秒没有取到蛋糕,消费退出");
System.out.println();
System.out.println();
return;
}
System.out.println(Thread.currentThread().getName() + "\t 消费队列蛋糕" + result + "成功");
}
}

public void stop() {
FLAG = false;
}
}

线程池

消息中间件