线程笔记(4)

线程池

  • 简介:线程池在系统启动时即创建大量空闲的线程,程序将一个Runnable对象或者Callable对象传给线程池,线程池就会启动一个线程来执行它们的run或call方法,当run或call方法执行结束后,该线程并不会死亡,而是再次返回线程池中成为空闲状态,等待下一个Runnable或Callable对象。
  • 作用:使用线程池可以有效地控制系统中并发线程的数量,当系统中包含大量并发线程时,会导致系统性能剧烈下降,甚至导致JVM崩溃,而线程池的最大线程数可以控制系统中的并发线程数不超过此数。

Executors、Executor和ExecutorService

Executors是一个工厂类,包含了一些静态方法来创建线程池;Executor是一个接口,它是ExecutorService接口的父接口(类似Collections和Collection)

简单示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
public class ThreadPoolTest {
public static void main(String[] args) {
//创建一个具有固定线程数的线程池
ExecutorService pool = Executors.newFixedThreadPool(6);
//创建一个Runnable对象
Runnable target = () -> {
for (int i = 0; i < 5; i++) {
System.out.println(Thread.currentThread().getName() + "的i = " + i);
}
};
//向线程池中提交两个线程
pool.submit(target);
pool.submit(target);
//关闭线程池
pool.shutdown();
}
}

Executors创建线程池的几种方式

  1. newCachedThreadPool():创建一个具有缓存功能的线程池,该线程池没有长度限制,对于新的任务,如果有空闲的线程,则使用空闲的线程执行,如果没有,则新建一个线程来执行任务。如果线程池长度超过处理需要,可灵活回收空闲线程。
  2. newFixedThreadPool(int nThreads):创建一个定长线程池,可控制线程最大并发数,超出的线程会在队列中等待。定长线程池的大小通常根据系统资源进行设置:Runtime.getRuntime().availableProcessors()。

    1
    ExecutorService pool = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());
  3. newSingleThreadScheduledExecutor():创建一个只有单个线程的线程池,它相当于调用newFixedThreadPool方法时传入参数为1。

  4. newScheduledThreadPool(int corePoolSize):创建一个定长线程池,并且支持定时和周期性的执行任务。
1
2
ScheduledExecutorService scheduledPool = Executors.newScheduledThreadPool(10);  
scheduledPool.schedule(target, 10, TimeUnit.SECONDS); //延迟8秒执行
1
2
ScheduledExecutorService scheduledPool = Executors.newScheduledThreadPool(10);
scheduledPool.scheduleAtFixedRate(target, 10, 5, TimeUnit.SECONDS); //延迟10s执行,每5s执行一次
1
2
3
ScheduledExecutorService scheduledPool = Executors.newScheduledThreadPool(10);
//第一次延迟10s执行,随后在每一次执行结束和下一次执行开始之间的间隔为5s
scheduledPool.scheduleWithFixedDelay(target, 10, 5, TimeUnit.SECONDS);
  1. newSingleThreadScheduledExecutor():创建一个只有单个线程的线程池,并且支持定时和周期性地执行任务。
  2. newWorkStealingPool(int parallelism):创建持有足够线程的线程池来支持给定的并行级别(parallelism),该方法还会使用多个队列来减少竞争。
  3. newWorkStealingPool():该方法相当于前一个方法传入参数Runtime.getRuntime().availableProcessors(),即如果当前机器有4个CPU,则传入4。
  • 方法总结
    • 上述各个方法,除了最后两个,其他的方法都有另一个具有ThreadFactory threadFactory参数的重载方法,该参数的作用是定义如何启动一个线程,可以设置线程的名称,并且可以确定是否是后台线程等。
    • 最后两个方法是Java8新增的,这两个方法充分利用多CPU并行的能力,这两个方法生成的work stealing池,都相当于后台线程池,如果所有的前台线程都死亡了,该池中的线程会自动死亡。

关闭线程池

  • 用完一个线程池后,应该调用该线程池的shutdown()方法,调用该方法后,线程池不再接受新任务,但会将以前所有已提交的任务执行完成。当所有任务都执行完成,池中的所有线程都会死亡。
  • 也可以调用shutdownNow()方法来关闭线程池,该方法试图停止所有正在执行的任务,暂停处理正在等待的任务,并返回等待执行的任务列表。

ForkJoinPool

  • 背景:为了充分利用多CPU、多核CPU的优势,可以考虑把一个任务拆分成多个“小任务”,把多个“小任务”放到多个处理器核心上并行执行;当多个“小任务”执行完成后,再将这些执行结果合并起来即可。Java7提供了ForkJoinPool来支持将一个任务拆分成多个“小任务”并行执行,再把多个“小任务”的结果合并成总的结果。
  • 简介:ForkJoinPool是ExecutorService的实现类,因此是一种特殊的线程池。

使用

  1. 创建ForkJoinPool对象
  • 使用Executors工具类
    • ExecutorService newWorkStealingPool()
    • ExecutorService newWorkStealingPool(int parallelism)
  • 两个常用的构造器
    • ForkJoinPool(int parallelism):创建一个包含parallelism个并行线程的ForkJoinPool
    • ForkJoinPool():相当于在上一个方法中传入参数Math.min(MAX_CAP, Runtime.getRuntime().availableProcessors()),其中MAX_CAP = 0x7fff;
  • Java8增加了通用池功能,提供了如下两个静态方法进行操作
    • ForkJoinPool commonPool():该方法返回一个通用池,其运行状态不会受shutdown()或shutdownNow()方法的影响。当然,如果程序直接执行System.exit(0);来终止虚拟机,通用池中正在执行的任务将自动终止。
    • int getCommonPoolParallelism():返回通用池的并行级别。
  1. 提交任务到ForkJoinPool
  • 提交没有返回值的任务
    • void execute(ForkJoinTask<?> task)
    • void execute(Runnable task):Runnable最终包装成ForkJoinTask
  • 提交有返回值的任务
    • public ForkJoinTask submit(ForkJoinTask task)
    • public ForkJoinTask submit(Callable task)
    • public ForkJoinTask submit(Runnable task, T result)
    • public ForkJoinTask<?> submit(Runnable task)
  • 同步提交,阻塞等结果返回
    • public T invoke(ForkJoinTask task)
  • 总结:所有的任务最终都会以ForkJoinTask类型提交到线程池中

ForkJoinTask

  • ForkJoinTask是一个抽象类,其实现了Future接口。它还有两个抽象子类:RecursiveAction和RecursiveTask,RecursiveAction代表没有返回值的任务,而RecursiveTask代表有返回值的任务。

RecursiveAction示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
//打印0~300
public class RecursiveActionTest {
public static void main(String[] args) throws InterruptedException{
ForkJoinPool forkJoinPool = new ForkJoinPool();
forkJoinPool.execute(new PrintTask(0, 300)); //提交任务
forkJoinPool.awaitTermination(2, TimeUnit.SECONDS);
//当等待超过设定时间时,会监测ExecutorService是否已经关闭,若关闭则返回true,否则返回false。
forkJoinPool.shutdown();
}
}

class PrintTask extends RecursiveAction {
//每个小任务最多打印个数
private static final int PER_MAX = 50;
private int start;
private int end;

public PrintTask(int start, int end) {
this.start = start;
this.end = end;
}

/**
* 重写该方法,执行计算操作
*/
@Override
protected void compute() {
if (end - start < PER_MAX) { //此时开始打印
for (int i = start; i < end; i++) {
System.out.println(Thread.currentThread().getName() + ":i = " + i);
}
} else { //要打印的数超过了最大值时,将大任务分解成两个“小任务”
int middle = (start + end) / 2;
PrintTask left = new PrintTask(start, middle);
PrintTask right = new PrintTask(middle, end);
//并行执行两个“小任务”
left.fork();
right.fork();
}
}
}

RecursiveTask示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
//使用RecursiveTask对一个长度为100的数字的元素值进行累加
public class RecursiveTaskTest {
public static void main(String[] args) throws Exception{
//创建及初始化数组arr,元素值为1,2,...,100
int [] arr = new int[100];
for (int i = 0; i < 100; i++) {
arr[i] = i+1;
}
//创建一个通用池
ForkJoinPool forkJoinPool = ForkJoinPool.commonPool();
//提交有返回值的任务
Future<Integer> future = forkJoinPool.submit(new CalTask(arr, 0, arr.length));
//获取结果
System.out.println("结果为:" + future.get());
//关闭线程池
forkJoinPool.shutdown();
}
}

class CalTask extends RecursiveTask<Integer> {
//每个小任务最多累加数
private static final int PER_MAX = 20;
private int [] arr;
private int start;
private int end;

public CalTask(int[] arr, int start, int end) {
this.arr = arr;
this.start = start;
this.end = end;
}

@Override
protected Integer compute() {
if (end - start < PER_MAX) { //开始进行累加
int sum = 0;
for (int i = start; i < end; i++) {
sum += arr[i];
}
return sum;
} else { //超过最大累加数是,将大任务分成两个“小任务”
int middle = (start + end) / 2;
CalTask left = new CalTask(arr, start, middle);
CalTask right = new CalTask(arr, middle, end);
//并行执行两个“小任务”
left.fork();
right.fork();
//把两个“小任务”累加的结果合并起来
return left.join() + right.join();
}
}
}

参考

-------------    本文到此结束  感谢您的阅读    -------------
0%