第11章-Java并发编程实践

本章主要介绍了生产者和消费者模式进行并发编程、线上问题排查手段和性能测试实战。

生产者和消费者模式

生产者和消费者模式是通过一个容器来解决生产者和消费者的强耦合问题。生产者和消费者彼此之间不直接通信,而是通过阻塞队列来进行通信。这个阻塞队列就是用来给生产者和消费者解耦的。

[案例]
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
/**
* @desc
* 阻塞队列生产消费者模式
* (1)生产:BlockingQueue.offer(val, timeout, timeout_unit)
* (2)消费:BlockingQueue.poll(timeout, timeout_unit)
* (3)开关:需要一个标识,循环等待
* @Author xw
* @Date 2019/8/23
*/
public class ProConsumer_BlockingQueueDemo {
public static void main(String[] args) {
MyResource myResource = new MyResource(new ArrayBlockingQueue<>(1));
new Thread(() -> {
try {
myResource.myProd();
} catch (InterruptedException e) {
e.printStackTrace();
}
}, "AA").start();
new Thread(() -> {
try {
myResource.myConsumer();
} catch (InterruptedException e) {
e.printStackTrace();
}
}, "BB").start();
try { TimeUnit.SECONDS.sleep(3); } catch (InterruptedException e) { e.printStackTrace(); }
myResource.stop();
System.out.println(Thread.currentThread().getName() + "\t 叫停");
}
}
class MyResource {
// 开关
private volatile boolean FLAG = true; // 默认开启,进行生产+消费者
private AtomicInteger atomicInteger = new AtomicInteger();
BlockingQueue<String> blockingQueue = null;
public MyResource(BlockingQueue<String> blockingQueue) {
this.blockingQueue = blockingQueue;
System.out.println(blockingQueue.getClass().getName());
}
public void myProd() throws InterruptedException {
String data = null;
boolean retValue;
while (FLAG) { // (1)判断
// (2)干活+通知
data = atomicInteger.incrementAndGet() + "";
retValue = blockingQueue.offer(data, 2L, TimeUnit.SECONDS);
if (retValue) {
System.out.println(Thread.currentThread().getName() + "\t 插入队列" + data + "成功");
} else {
System.out.println(Thread.currentThread().getName() + "\t 插入队列" + data + "失败");
}
try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { e.printStackTrace(); }
}
System.out.println(Thread.currentThread().getName() + "\t大老板叫停,表示FLAG=false,生产动作结束");
}
public void myConsumer() throws InterruptedException {
String result;
while (FLAG) {
result = blockingQueue.poll(2L, TimeUnit.SECONDS);
if (null == result || "".equals(result)) {
FLAG = false;
System.out.println(Thread.currentThread().getName() + "\t 超过2秒没有取到蛋糕,消费退出");
System.out.println();
System.out.println();
return;
}
System.out.println(Thread.currentThread().getName() + "\t 消费队列蛋糕" + result + "成功");
}
}
public void stop() {
FLAG = false;
}
}

线上问题定位

有时候,有很多问题只有在线上或者预发环境才能发现,而线上又不能调试代码,所以线上问题定位就只能看日志、系统状态和dump线程。

  1. 先用top命令找出CPU占比最高的
    Alt text

  2. ps -ef或者jps进一步定位进程

  3. 定位到具体线程或代码

    [案例]
    1
    2
    3
    4
    # -m 显示所有的线程
    # -p pid 进程使用cpu的时间
    # -o 该参数后是用户自定义格式
    $ ps -mp 进程 -o THREAD,tid,time、
  4. 将需要的线程ID转换为16进制格式(英文小写格式)

    [案例]
    1
    2
    3
    4
    dump出来的线程ID(nid)是十六进制的,而我们用TOP命令看到的线程ID是十进制的,
    所以要用printf命令转换一下进制。然后用十六进制的ID去dump里找到对应的线程。
    $ printf "%x\n" 31558
    输出:7b46。
  5. 定位异常代码块

    [案例]
    1
    2
    $ jstatck 进程ID | grep tid(16进制线程ID小写英文)-A60
    输出:异常代码块

性能测试

因为要支持某个业务,有同事向我们提出需求,希望系统的某个接口能够支持2万的QPS,因为我们的应用部署在多台机器上,要支持两万的QPS,我们必须先要知道该接口在单机上能支持多少QPS,如果单机能支持1千QPS,我们需要20台机器才能支持2万的QPS。

[常用命令]
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
1. 查询有多少台机器连接到这个端口上
$ netstat -nat | grep 12200 –c
$ 10
2. 查看已经使用了多少个数据库连接
$ netstat -nat | grep 3306 –c
$ 12
3. 通过ps命令查看下线程数是否增长了
$ ps -eLf | grep java -c
$ 1520
4. 其它命令
# 查看网络流量。
$ cat /proc/net/dev
# 查看系统平均负载。
$ cat /proc/loadavg
# 查看系统内存情况。
$ cat /proc/meminfo
# 查看CPU的利用率。