Java 工具类之线程相关

ThreadUtil

下面是一个非常扎实的具备生产能力的线程工具类。在高并发场景下,直接使用 Executors 快捷创建线程池是极其危险的(容易导致 OOM),这个类使用分类治理、资源隔离、优雅关闭的核心架构思想来规范线程的使用。

在实际的环境中,可能存在各种各样的不同任务。这个类根据任务的性质(CPU 密集 vs IO 密集)进行了科学的隔离,这是防止系统由于某个耗时操作导致全盘瘫痪的关键。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.LockSupport;

/**
* @author KJ
* @description 线程工具类
*/
public class ThreadUtil {

/**
* CPU核数
**/
private static final int CPU_COUNT = Runtime.getRuntime().availableProcessors();

/**
* 空闲保活时限,单位秒
*/
private static final int KEEP_ALIVE_SECONDS = 30;

/**
* 有界队列size
*/
private static final int QUEUE_SIZE = 10000;

/**
* 最大线程数
*/
private static final int CPU_MAX_POOL_SIZE = CPU_COUNT;

/**
* IO线程池最大线程数
*/
private static final int IO_MAX_POOL_SIZE = Math.max(2, CPU_COUNT * 2);

/**
* 混合线程池
*/
private static final int MIXED_MAX_POOL_SIZE = 128; // 最大线程数
private static final String MIXED_THREAD_AMOUNT = "mixed.thread.amount";


/**
* 定制的线程工厂:把每一个新生的线程,都变成一个 “有身份、有规矩” 的系统公民。
*/
public static class CustomThreadFactory implements ThreadFactory {
// 线程池的数量
private static final AtomicInteger poolNumber = new AtomicInteger(1);
private final ThreadGroup group;
// 线程数量
private final AtomicInteger threadNumber = new AtomicInteger(1);
private final String threadTag;

CustomThreadFactory(String threadTag) {
// 获取当前线程组,保证所有由该工厂创建的线程都在同一个逻辑分组下,便于安全管理和整体监控。
this.group = Thread.currentThread().getThreadGroup();
this.threadTag = "appPool-" + poolNumber.getAndIncrement() + "-" + threadTag + "-";
}

@Override
public Thread newThread(Runnable target) {
// 线程名称:appPool-{线程池编号}-{线程Tag}-{线程编号}
Thread t = new Thread(group, target, threadTag + threadNumber.getAndIncrement(), 0);
// 强制将线程设为非守护线程。JVM 只有在所有非守护线程都结束时才会退出。服务器系统中哪怕主线程停了,只要线程池里还有消息在转发,JVM 就不能强行关闭。
if (t.isDaemon()) {
t.setDaemon(false);
}
// 重置优先级为普通(5)。防止新线程继承了父线程(可能是某个高优先级或低优先级的特殊线程)的优先级,导致系统 CPU 分配不均。
if (t.getPriority() != Thread.NORM_PRIORITY) {
t.setPriority(Thread.NORM_PRIORITY);
}
return t;
}
}

/**
* Shutdown 通用钩子线程
*/
static class ShutdownHookThread extends Thread {
private volatile boolean hasShutdown = false;
private final Callable<?> callback;

public ShutdownHookThread(String name, Callable<?> callback) {
super("JVM退出钩子(" + name + ")");
this.callback = callback;
}

@Override
public void run() {
synchronized (this) {
System.out.println(getName() + " starting.... ");
if (!this.hasShutdown) {
this.hasShutdown = true;
long beginTime = System.currentTimeMillis();
try {
this.callback.call();
} catch (Exception e) {
System.out.println(getName() + " error: " + e.getMessage());
}
long consumingTimeTotal = System.currentTimeMillis() - beginTime;
System.out.println(getName() + " 耗时(ms): " + consumingTimeTotal);
}
}
}
}

/**
* 懒汉式单例创建线程池:用于CPU密集型任务
*
* 虽然 EXECUTOR 被声明为 static final,但因为它被包裹在静态内部类中,而JVM加载ThreadUtil类时不会加载 CpuThreadPoolInstanceHolder。只有当你第一次真正调用
* ThreadUtil.getCpuThreadPoolInstance() 时,代码会访问 CpuThreadPoolInstanceHolder.EXECUTOR。这时JVM才发现需要加载这个内部类,线程池才会被创建。
* 这种写法在设计模式中被称为 Holder 模式,它比双重检查锁(DCL)更简洁、更安全。传统的懒汉式需要 synchronized 来保证多线程下只创建一个实例,这会带来性能开销。
* 而此处的计利用了JVM的类加载锁,JVM保证了一个类在加载过程中是线程安全的。你不需要写任何锁代码,就能获得高性能的单例。
*/
private static class CpuThreadPoolInstanceHolder {
// 线程池:用于CPU密集型任务
private static final ThreadPoolExecutor EXECUTOR = new ThreadPoolExecutor(
CPU_MAX_POOL_SIZE,
CPU_MAX_POOL_SIZE,
KEEP_ALIVE_SECONDS,
TimeUnit.SECONDS,
new LinkedBlockingQueue<>(QUEUE_SIZE),
new CustomThreadFactory("cpu"));

static {
// 默认状态非核心线程如果闲置时间超过 keepAliveTime 会被回收,而核心线程永远驻留。开启这个参数之后核心线程也受 keepAliveTime 参数的限制,如果长时间没有任务进来,线程池最终会收缩到0个线程。
// 如果是互联网大厂的核心服务(流量每秒几万),那么通常不开启,为了追求极致的响应速度,宁愿浪费点内存也要让线程在那待命。
EXECUTOR.allowCoreThreadTimeOut(true);
// JVM关闭时的钩子函数
Runtime.getRuntime().addShutdownHook(new ShutdownHookThread("CPU密集型任务线程池", () -> {
// 优雅关闭线程池
shutdownThreadPoolGracefully(EXECUTOR);
return null;
}));
}
}

/**
* 懒汉式单例创建线程池:用于IO密集型任务
*/
private static class IoThreadPoolInstanceHolder {
// 线程池:用于IO密集型任务
private static final ThreadPoolExecutor EXECUTOR = new ThreadPoolExecutor(
IO_MAX_POOL_SIZE,
IO_MAX_POOL_SIZE,
KEEP_ALIVE_SECONDS,
TimeUnit.SECONDS,
new LinkedBlockingQueue<>(QUEUE_SIZE),
new CustomThreadFactory("io"));

static {
EXECUTOR.allowCoreThreadTimeOut(true);
// JVM关闭时的钩子函数
Runtime.getRuntime().addShutdownHook(
new ShutdownHookThread("IO密集型任务线程池", () -> {
shutdownThreadPoolGracefully(EXECUTOR);
return null;
}));
}
}

/**
* 懒汉式单例创建线程池:用于混合型任务
*/
private static class MixedThreadPoolInstanceHolder {
// 首先从环境变量 mixed.thread.amount 中获取预先配置的线程数
// 如果没有对 mixed.thread.amount 做配置,则使用常量 MIXED_MAX 作为线程数
private static final int max = (null != System.getProperty(MIXED_THREAD_AMOUNT)) ?
Integer.parseInt(System.getProperty(MIXED_THREAD_AMOUNT)) : MIXED_MAX_POOL_SIZE;
// 线程池:用于混合型任务
private static final ThreadPoolExecutor EXECUTOR = new ThreadPoolExecutor(
max,
max,
KEEP_ALIVE_SECONDS,
TimeUnit.SECONDS,
new LinkedBlockingQueue<>(QUEUE_SIZE),
new CustomThreadFactory("mixed"));

static {
EXECUTOR.allowCoreThreadTimeOut(true);
// JVM关闭时的钩子函数
Runtime.getRuntime().addShutdownHook(new ShutdownHookThread("混合型任务线程池", () -> {
shutdownThreadPoolGracefully(EXECUTOR);
return null;
}));
}
}

/**
* 懒汉式单例创建线程池:用于定时任务、顺序排队执行任务
*/
static class SeqOrScheduledTargetThreadPoolLazyHolder {
// 线程池:用于定时任务、顺序排队执行任务
static final ScheduledThreadPoolExecutor EXECUTOR = new ScheduledThreadPoolExecutor(
1,
new CustomThreadFactory("seq"));

static {
Runtime.getRuntime().addShutdownHook(
new ShutdownHookThread("定时和顺序任务线程池", (Callable<Void>) () -> {
shutdownThreadPoolGracefully(EXECUTOR);
return null;
}));
}
}

/**
* 获取执行CPU密集型任务的线程池
*/
public static ThreadPoolExecutor getCpuThreadPoolInstance() {
return CpuThreadPoolInstanceHolder.EXECUTOR;
}

/**
* 获取执行IO密集型任务的线程池
*/
public static ThreadPoolExecutor getIoThreadPoolInstance() {
return IoThreadPoolInstanceHolder.EXECUTOR;
}

/**
* 获取执行混合型任务的线程池
*/
public static ThreadPoolExecutor getMixedThreadPoolInstance() {
return MixedThreadPoolInstanceHolder.EXECUTOR;
}

/**
* 获取可调度线程池(包含提交延迟、定时、周期性、顺序性执行的任务)
*/
public static ScheduledThreadPoolExecutor getSeqOrScheduledExecutorService() {
return SeqOrScheduledTargetThreadPoolLazyHolder.EXECUTOR;
}

/**
* 顺序排队执行
*/
public static void seqExecute(Runnable command) {
getSeqOrScheduledExecutorService().execute(command);
}

/**
* 延迟执行
*/
public static void delayRun(Runnable command, int i, TimeUnit unit) {
getSeqOrScheduledExecutorService().schedule(command, i, unit);
}

/**
* 固定频率执行
*/
public static void scheduleAtFixedRate(Runnable command, int i, TimeUnit unit) {
getSeqOrScheduledExecutorService().scheduleAtFixedRate(command, i, i, unit);
}

/**
* 优雅线程池关闭
*/
public static void shutdownThreadPoolGracefully(ExecutorService threadPool) {
if (threadPool == null || threadPool.isTerminated()) {
return;
}
try {
threadPool.shutdown(); // 拒绝接受新任务
} catch (SecurityException | NullPointerException e) {
return;
}
try {
// 等待 60 s,等待线程池中的任务完成执行
if (!threadPool.awaitTermination(60, TimeUnit.SECONDS)) {
// 调用 shutdownNow 取消正在执行的任务
threadPool.shutdownNow();
// 再次等待 60 s,如果还未结束,可以再次尝试,或则直接放弃
if (!threadPool.awaitTermination(60, TimeUnit.SECONDS)) {
System.err.println("线程池任务未正常执行结束");
}
}
} catch (InterruptedException ie) {
// 捕获异常,重新调用 shutdownNow
threadPool.shutdownNow();
}
// 仍然没有关闭,循环关闭1000次,每次等待10毫秒
if (!threadPool.isTerminated()) {
try {
for (int i = 0; i < 1000; i++) {
if (threadPool.awaitTermination(10, TimeUnit.MILLISECONDS)) {
break;
}
threadPool.shutdownNow();
}
} catch (Throwable e) {
System.err.println(e.getMessage());
}
}
}

/**
* 线程睡眠
*/
public static void sleepSeconds(int second) {
LockSupport.parkNanos(second * 1000L * 1000L * 1000L);
}

/**
* 线程睡眠
*/
public static void sleepMilliSeconds(int millisecond) {
LockSupport.parkNanos(millisecond * 1000L * 1000L);
}

/**
* 获取当前线程名称
*/
public static String getCurThreadName() {
return Thread.currentThread().getName();
}

/**
* 获取当前线程ID
*/
public static long getCurThreadId() {
return Thread.currentThread().getId();
}

/**
* 获取当前线程
*/
public static Thread getCurThread() {
return Thread.currentThread();
}

/**
* 调用栈中的类名
*/
public static String stackClassName(int level) {
// 调用的类名
return Thread.currentThread().getStackTrace()[level].getClassName();
}

/**
* 调用栈中的方法名称
*/
public static String stackMethodName(int level) {
// 调用的类名
return Thread.currentThread().getStackTrace()[level].getMethodName();
}
}