第8章-Java中的并发工具类

本章主要介绍了使用JDK并发包里的CountDownLatch、CyclicBarrier和Semaphore工具类来控制并发流程以及使用Exchanger工具类在线程间交换数据。

CountDownLatch

CountDownLatch允许一个或多个线程等待其他线程完成操作。

[案例一]
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
public class JoinCountDownLatchTest {
public static void main(String[] args) throws InterruptedException {
Thread parser1 = new Thread(new Runnable() {
@Override
public void run() {
}
});
Thread parser2 = new Thread(new Runnable() {
@Override
public void run() {
System.out.println("parser2 finish");
}
});
parser1.start();
parser2.start();
parser1.join();
parser2.join();
System.out.println("all parser finish");
}
}
[案例二]
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
/**
* @desc 倒计时
* 5个人都走了,才关门?
* @Author xw
* @Date 2019/8/9
*/
public class CountDownLatchDemo {
public static void main(String[] args) throws InterruptedException {
CountDownLatch countDownLatch = new CountDownLatch(5);
for(int i = 1; i <= 5; i++) { // 5个同学
new Thread(() -> {
try {
System.out.println(Thread.currentThread().getName() + "离开");
TimeUnit.SECONDS.sleep(1);
countDownLatch.countDown();
} catch (InterruptedException e) {
e.printStackTrace();
}
}, "同学[" + i + "]").start();
}
countDownLatch.await();
System.out.println("班长关门");
}
}

CyclicBarrier

CyclicBarrier的字面意思是可循环使用(Cyclic)的屏障(Barrier)。它要做的事情是,让一组线程到达一个屏障(也可以叫同步点)时被阻塞,直到最后一个线程到达屏障时,屏障才会开门,所有被屏障拦截的线程才会继续运行。

[案例]
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
/**
* @desc 计数器
* 5个人都到齐了才出发
* @Author xw
* @Date 2019/8/9
*/
public class CyclicBarrierDemo {
public static void main(String[] args) {
CyclicBarrier cyclicBarrier = new CyclicBarrier(5);
for (int i = 1; i <= 5; i++) {
new Thread(() -> {
try {
System.out.println(Thread.currentThread().getName() + "同学到了");
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
cyclicBarrier.await();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (BrokenBarrierException e) {
e.printStackTrace();
}
}, "T" + i).start();
}
while ((cyclicBarrier.getNumberWaiting()+1) != 5) {
}
System.out.println("人数到齐准备出发!");
}
}

Semaphore

Semaphore(信号量)是用来控制同时访问特定资源的线程数量,它通过协调各个线程,以保证合理的使用公共资源。

[案例]
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
/**
* @desc 信号量
* 停车场(5个车位,20人抢)
* @Author xw
* @Date 2019/8/9
*/
public class SemaphoreDemo {
public static void main(String[] args) {
Semaphore semaphore = new Semaphore(5); // 停车场只有5个车位
for (int i = 1; i <= 20; i++) { // 20个人抢车位
new Thread(() -> {
try {
semaphore.acquire();
System.out.println(Thread.currentThread().getName() + "》》》》抢到车位");
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName() + "离开了=======");
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
semaphore.release();
}
}, "同学" + i).start();
}
}
}

Exchanger

Exchanger(交换者)是一个用于线程间协作的工具类。Exchanger用于进行线程间的数据交换。

[案例]
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
public class ExchangerTest {
private static final Exchanger<String>exgr = new Exchanger<String>();
private static ExecutorServicethreadPool = Executors.newFixedThreadPool(2);
public static void main(String[] args) {
threadPool.execute(new Runnable() {
@Override
public void run() {
try {
String A = "银行流水A";    // A录入银行流水数据
exgr.exchange(A);
} catch (InterruptedException e) {
}
}
});
threadPool.execute(new Runnable() {
@Override
public void run() {
try {
String B = "银行流水B";    // B录入银行流水数据
String A = exgr.exchange("B");
System.out.println("A和B数据是否一致:" + A.equals(B) + ",A录入的是:"
+ A + ",B录入是:" + B);
} catch (InterruptedException e) {
}
}
});
threadPool.shutdown();
}
}