Java线程池并发执行多个任务
Java线程池的正确使用
1.Executors存在什么问题
2.Executors为什么存在缺陷
3. 线程池参数详解
4. 线程池正确打开方式
Java线程池并发执行多个任务Java在语言层面提供了多线程的支持,线程池能够避免频繁的线程创建和销毁的开销,因此很多时候在项目当中我们是使用的线程池去完成多线程的任务。
Java提供了Executors 框架提供了一些基础的组件能够轻松的完成多线程异步的操作,Executors提供了一系列的静态工厂方法能够获取不同的ExecutorService实现,ExecutorService扩展了Executors接口,Executors相当简单:
public interface Executor {
void execute(Runnable command);
}
把任务本身和任务的执行解耦了,如果说Runnable是可异步执行任务的抽象,那Executor就是如何执行可异步执行任务的抽象,说起来比较绕口。
本文不讲解线程的一些基础知识,因为网上的其他文章已经写的足够详细和泛滥。我写写多个异步任务的并发执行与结果的获取问题。
假设这样一个场景:我们要组装一个对象,这个对象由大量小的内容组成,这些内容是无关联无依赖关系的,如果我们串行的去执行,如果每个任务耗时10秒钟,一共有10个任务,那我们就需要100秒才能获取到结果。显然我们可以采用线程池,每个任务起一个线程,忽略线程启动时间,我们只需要10秒钟就能获取到结果。这里还有两种选择,这10秒钟我们可以去做其他事,也可以等待结果。
我们来完成这样的操作:
// 这是任务的抽象
class GetContentTask implements Callable<String> {
private String name;
private Integer sleepTimes;
public GetContentTask(String name, Integer sleepTimes) {
this.name = name;
this.sleepTimes = sleepTimes;
}
public String call() throws Exception {
// 假设这是一个比较耗时的操作
Thread.sleep(sleepTimes * 1000);
return "this is content : hello " + this.name;
}
}
采用completionService :
// 方法一
ExecutorService executorService = Executors.newCachedThreadPool();
CompletionService<String> completionService = new ExecutorCompletionService(executorService);
ExecuteServiceDemo executeServiceDemo = new ExecuteServiceDemo();
// 十个
long startTime = System.currentTimeMillis();
int count = 0;
for (int i = 0;i < 10;i ++) {
count ++;
GetContentTask getContentTask = new ExecuteServiceDemo.GetContentTask("micro" + i, 10);
completionService.submit(getContentTask);
}
System.out.println("提交完任务,主线程空闲了, 可以去做一些事情。");
// 假装做了8秒种其他事情
try {
Thread.sleep(8000);
System.out.println("主线程做完了,等待结果");
} catch (InterruptedException e) {
e.printStackTrace();
}
try {
// 做完事情要结果
for (int i = 0;i < count;i ++) {
Future<String> result = completionService.take();
System.out.println(result.get());
}
long endTime = System.currentTimeMillis();
System.out.println("耗时 : " + (endTime - startTime) / 1000);
} catch (Exception ex) {
System.out.println(ex.getMessage());
}
执行结果为:
提交完任务,主线程空闲了, 可以去做一些事情。
主线程做完了,等待结果
this is content : hello micro9
this is content : hello micro7
this is content : hello micro2
this is content : hello micro5
this is content : hello micro4
this is content : hello micro8
this is content : hello micro1
this is content : hello micro3
this is content : hello micro0
this is content : hello micro6
耗时 : 10
如果多个不想一个一个提交,可以采用 invokeAll一并提交,但是会同步等待这些任务
// 方法二
ExecutorService executeService = Executors.newCachedThreadPool();
List<GetContentTask> taskList = new ArrayList<GetContentTask>();
long startTime = System.currentTimeMillis();
for (int i = 0;i < 10;i ++) {
taskList.add(new GetContentTask("micro" + i, 10));
}
try {
System.out.println("主线程发起异步任务请求");
List<Future<String>> resultList = executeService.invokeAll(taskList);
// 这里会阻塞等待resultList获取到所有异步执行的结果才会执行
for (Future<String> future : resultList) {
System.out.println(future.get());
}
// 主线程假装很忙执行8秒钟
Thread.sleep(8);
long endTime = System.currentTimeMillis();
System.out.println("耗时 : " + (endTime - startTime) / 1000);
} catch (Exception e) {
e.printStackTrace();
}
主线程发起异步任务请求
this is content : hello micro0
this is content : hello micro1
this is content : hello micro2
this is content : hello micro3
this is content : hello micro4
this is content : hello micro5
this is content : hello micro6
this is content : hello micro7
this is content : hello micro8
this is content : hello micro9
耗时 : 10
如果一系列请求,我们并不需要等待每个请求,我们可以invokeAny,只要某一个请求返回即可。
ExecutorService executorService = Executors.newCachedThreadPool();
ArrayList<GetContentTask> taskList = new ArrayList<GetContentTask>();
taskList.add(new GetContentTask("micro1",3));
taskList.add(new GetContentTask("micro2", 6));
try {
List<Future<String>> resultList = executorService.invokeAll(taskList);// 等待6秒
// String result2 = executorService.invokeAny(taskList); // 等待3秒
// invokeAll 提交一堆任务并行处理并拿到结果
// invokeAny就是提交一堆并行任务拿到一个结果即可
for (Future<String> result : resultList) {
System.out.println(result.get());
}
// System.out.println(result2);
} catch (Exception e) {
e.printStackTrace();
}
System.out.println("主线程等待");
如果我虽然发送了一堆异步的任务,但是我只等待一定的时间,在规定的时间没有返回我就不要了,例如很多时候的网络请求其他服务器如果要数据,由于网络原因不能一直等待,在规定时间内去拿,拿不到就我使用一个默认值。
这样的场景,我们可以使用下面的写法:
try {
ExecutorService executorService = Executors.newCachedThreadPool();
List<Callable<String>> taskList = new ArrayList<Callable<String>>();
taskList.add(new GetContentTask("micro1", 4));
taskList.add(new GetContentTask("micro2", 6));
// 等待五秒
List<Future<String>> resultList = executorService.invokeAll(taskList, 5, TimeUnit.SECONDS);
for (Future<String> future : resultList) {
System.out.println(future.get());
}
} catch (Exception e) {
e.printStackTrace();
}
this is content : hello micro1
java.util.concurrent.CancellationException
at java.util.concurrent.FutureTask.report(FutureTask.java:121)
at java.util.concurrent.FutureTask.get(FutureTask.java:192)
at com.micro.demo.spring.ExecuteServiceDemo.main(ExecuteServiceDemo.java:105)
因为只等待5秒,6秒的那个任务自然获取不到,抛出异常,如果将等待时间设置成8秒,就都能获取到。
Java线程池的正确使用线程可认为是操作系统可调度的最小的程序执行序列,一般作为进程的组成部分,同一进程中多个线程可共享该进程的资源(如内存等)。JVM线程跟内核轻量级进程有一对一的映射关系,所以JVM中的线程是很宝贵的。
一般在工程上多线程的实现是基于线程池的。因为相比自己创建线程,多线程具有以下优点:
线程是稀缺资源,使用线程池可以减少创建和销毁线程的次数,每个工作线程都可以重复使用。
可以根据系统的承受能力,调整线程池中工作线程的数量,防止因为消耗过多内存导致服务器崩溃。
1.Executors存在什么问题看阿里巴巴开发手册并发编程这块有一条:线程池不允许使用Executors去创建,而是通过ThreadPoolExecutor的方式。
2.Executors为什么存在缺陷2.1 线程池工作原理
当一个任务通过execute(Runnable)方法欲添加到线程池时:
如果此时线程池中的数量小于corePoolSize,即使线程池中的线程都处于空闲状态,也要创建新的线程来处理被添加的务。
如果此时线程池中的数量等于 corePoolSize,但是缓冲队列 workQueue未满,那么任务被放入缓冲队列。
如果此时线程池中的数量大于corePoolSize,缓冲队列workQueue满,并且线程池中的数量小于maximumPoolSize,建新的线程来处理被添加的任务。
那么通过 handler所指定的策略来处理此任务。也就是:处理任务的优先级为:核心线程corePoolSize、任务队列workQueue、最大线程maximumPoolSize,如果三者都满了,使用handler处理被拒绝的任务。
当线程池中的线程数量大于 corePoolSize时,如果某线程空闲时间超过keepAliveTime,线程将被终止。这样,线程池可以动态的调整池中的线程数。
2.2 newFixedThreadPool分析
Java中的BlockingQueue主要有两种实现,分别是ArrayBlockingQueue 和 LinkedBlockingQueue。
ArrayBlockingQueue是一个用数组实现的有界阻塞队列,必须设置容量。
LinkedBlockingQueue是一个用链表实现的有界阻塞队列,容量可以选择进行设置,不设置的话,将是一个无边界的阻塞队列,最大长度为Integer.MAX_VALUE。
这里的问题就出在:不设置的话,将是一个无边界的阻塞队列,最大长度为Integer.MAX_VALUE。也就是说,如果我们不设置LinkedBlockingQueue的容量的话,其默认容量将会是Integer.MAX_VALUE。而newFixedThreadPool中创建LinkedBlockingQueue时,并未指定容量。此时,LinkedBlockingQueue就是一个无边界队列,对于一个无边界队列来说,是可以不断的向队列中加入任务的,这种情况下就有可能因为任务过多而导致内存溢出问题。
2.3 newCachedThreadPool分析
结合上述流程图,核心线程数=0,最大线程无限大,由于SynchronousQueue是一个缓存值为1的阻塞队列。当有大量任务请求时,线程池会创建大量线程,造成OOM。
3. 线程池参数详解3.1 构造方法
3.2 线程池拒绝策略
RejectedExecutionHandler(饱和策略):当队列和线程池都满了,说明线程池处于饱和状态,那么必须采取一种策略处理提交的新任务。这个策略默认情况下是AbortPolicy,表示无法处理新任务时抛出异常。。以下是JDK1.5提供的四种策略。
AbortPolicy:直接抛出异常
CallerRunsPolicy:只用调用者所在线程来运行任务。
DiscardOldestPolicy:丢弃队列里最近的一个任务,并执行当前任务。DiscardPolicy:不处理,丢弃掉。
当然也可以根据应用场景需要来实现RejectedExecutionHandler接口自定义策略。如记录日志或持久化不能处理的任务。
4. 线程池正确打开方式4.1 创建线程池
避免使用Executors创建线程池,主要是避免使用其中的默认实现,那么我们可以自己直接调用ThreadPoolExecutor的构造函数来自己创建线程池。在创建的同时,给BlockQueue指定容量就可以了。
4.2 向线程池提交任务
我们可以使用execute提交的任务,但是execute方法没有返回值,所以无法判断任务知否被线程池执行成功。通过以下代码可知execute方法输入的任务是一个Runnable类的实例。
我们也可以使用submit 方法来提交任务,它会返回一个future,那么我们可以通过这个future来判断任务是否执行成功,通过future的get方法来获取返回值,get方法会阻塞住直到任务完成,而使用get(long timeout, TimeUnit unit)方法则会阻塞一段时间后立即返回,这时有可能任务没有执行完。
4.3 关闭线程池
shutdown关闭线程池
方法定义:public void shutdown()
线程池的状态变成SHUTDOWN状态,此时不能再往线程池中添加新的任务,否则会抛出RejectedExecutionException异常。
线程池不会立刻退出,直到添加到线程池中的任务都已经处理完成,才会退出。
注意:这个函数不会等待提交的任务执行完成,要想等待全部任务完成,可以调用:
public boolean awaitTermination(longtimeout, TimeUnit unit)
shutdownNow关闭线程池并中断任务
方法定义:public List shutdownNow()
线程池的状态立刻变成STOP状态,此时不能再往线程池中添加新的任务。
终止等待执行的线程,并返回它们的列表;
试图停止所有正在执行的线程,试图终止的方法是调用Thread.interrupt(),但是大家知道,如果线程中没有sleep 、wait、Condition、定时锁等应用, interrupt()方法是无法中断当前的线程的。所以,ShutdownNow()并不代表线程池就一定立即就能退出,它可能必须要等待所有正在执行的任务都执行完成了才能退出。
4.4 如何配置线程池大小
CPU密集型任务
该任务需要大量的运算,并且没有阻塞,CPU一直全速运行,CPU密集任务只有在真正的多核CPU上才可能通过多线程加速 CPU密集型任务配置尽可能少的线程数量:
CPU核数+1个线程的线程池。
例如: CPU 16核,内存32G。线程数=16
IO密集型任务
IO密集型任务线程并不是一直在执行任务,则应配置尽可能多的线程,如:CPU核数*2
某大厂设置策略:IO密集型时,大部分线程都阻塞,故需要多配置线程数:
CPU核数/(1-阻塞系数)
例如: CPU 16核, 阻塞系数 0.9 ------------->16/(1-0.9) = 160 个线程数。
此时非阻塞线程=16
以上为个人经验,希望能给大家一个参考,也希望大家多多支持易知道(ezd.cc)。