Java实现多任务执行助手

Java实现多任务执行助手

本文实例为大家分享了Java实现多任务执行助手的具体代码,供大家参考,具体内容如下

1.多线程执行任务类

package com.visy.threadpool; import com.visy.executor.ExecutorFactory; import java.util.ArrayList; import java.util.Iterator; import java.util.List; import java.util.concurrent.Callable; import java.util.concurrent.ExecutionException; import java.util.concurrent.Future; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; import org.springframework.context.annotation.Configuration; @Configuration public class ThreadPoolConfig {     private TheadPoolProperties theadPoolProperties;     private ThreadPoolExecutor executor;     private ThreadPoolExecutor executorChild;     public ThreadPoolConfig(TheadPoolProperties theadPoolProperties) {         this.theadPoolProperties = theadPoolProperties;         this.executor = ExecutorFactory.getInstance().getThreadPoolExecutor("ThreadPoolConfig-service", theadPoolProperties.getQueueSize(), theadPoolProperties.getCoreThreadNum(), theadPoolProperties.getMaxPoolSize());         this.executorChild = ExecutorFactory.getInstance().getThreadPoolExecutor("ThreadPoolConfig-service-child", theadPoolProperties.getQueueSize(), theadPoolProperties.getCoreThreadNum(), theadPoolProperties.getMaxPoolSize());     }     public <V> List<V> doConcurrentTask(List<Callable<V>> taskList, ThreadPoolExecutor... executorChilds) {         if (taskList != null && !taskList.isEmpty()) {             List<V> resultList = new ArrayList();             List futureList = null;             try {                 if (this.executor.getQueue().size() >= this.theadPoolProperties.getQueueSize()) {                     throw new RuntimeException("queue size bigger than 100, now size is " + this.executor.getQueue().size());                 }                 if (executorChilds != null && executorChilds.length > 0 && executorChilds[0] != null) {                     futureList = executorChilds[0].invokeAll(taskList);                 } else {                     futureList = this.executor.invokeAll(taskList, (long)this.theadPoolProperties.getTimeOut(), TimeUnit.SECONDS);                 }             } catch (InterruptedException var6) {                 var6.printStackTrace();             }             this.doFutureList(resultList, futureList);             return resultList;         } else {             return null;         }     }     <V> void doFutureList(List<V> resultList, List<Future<V>> futureList) {         if (futureList != null) {             Iterator var3 = futureList.iterator();             while(var3.hasNext()) {                 Future future = (Future)var3.next();                 try {                     resultList.add(future.get());                 } catch (ExecutionException | InterruptedException var6) {                     var6.printStackTrace();                 }             }         }     }     public <V> void doVoidConcurrentTask(List<Callable<V>> taskList) {         if (taskList != null && !taskList.isEmpty()) {             Iterator var2 = taskList.iterator();             while(var2.hasNext()) {                 Callable<V> call = (Callable)var2.next();                 this.executor.submit(call);             }         }     }     public TheadPoolProperties getTheadPoolProperties() {         return this.theadPoolProperties;     }     public ThreadPoolExecutor getExecutor() {         return this.executor;     }     public ThreadPoolExecutor getExecutorChild() {         return this.executorChild;     }     public void setTheadPoolProperties(TheadPoolProperties theadPoolProperties) {         this.theadPoolProperties = theadPoolProperties;     }     public void setExecutor(ThreadPoolExecutor executor) {         this.executor = executor;     }     public void setExecutorChild(ThreadPoolExecutor executorChild) {         this.executorChild = executorChild;     }     public boolean equals(Object o) {         if (o == this) {             return true;         } else if (!(o instanceof ThreadPoolConfig)) {             return false;         } else {             ThreadPoolConfig other = (ThreadPoolConfig)o;             if (!other.canEqual(this)) {                 return false;             } else {                 label47: {                     Object this$theadPoolProperties = this.getTheadPoolProperties();                     Object other$theadPoolProperties = other.getTheadPoolProperties();                     if (this$theadPoolProperties == null) {                         if (other$theadPoolProperties == null) {                             break label47;                         }                     } else if (this$theadPoolProperties.equals(other$theadPoolProperties)) {                         break label47;                     }                     return false;                 }                 Object this$executor = this.getExecutor();                 Object other$executor = other.getExecutor();                 if (this$executor == null) {                     if (other$executor != null) {                         return false;                     }                 } else if (!this$executor.equals(other$executor)) {                     return false;                 }                 Object this$executorChild = this.getExecutorChild();                 Object other$executorChild = other.getExecutorChild();                 if (this$executorChild == null) {                     if (other$executorChild != null) {                         return false;                     }                 } else if (!this$executorChild.equals(other$executorChild)) {                     return false;                 }                 return true;             }         }     }     protected boolean canEqual(Object other) {         return other instanceof ThreadPoolConfig;     }     public int hashCode() {         int PRIME = true;         int result = 1;         Object $theadPoolProperties = this.getTheadPoolProperties();         int result = result * 59 + ($theadPoolProperties == null ? 43 : $theadPoolProperties.hashCode());         Object $executor = this.getExecutor();         result = result * 59 + ($executor == null ? 43 : $executor.hashCode());         Object $executorChild = this.getExecutorChild();         result = result * 59 + ($executorChild == null ? 43 : $executorChild.hashCode());         return result;     }     public String toString() {         return "ThreadPoolConfig(theadPoolProperties=" + this.getTheadPoolProperties() + ", executor=" + this.getExecutor() + ", executorChild=" + this.getExecutorChild() + ")";     } }

2.执行器工厂类

package com.visy.executor; import java.util.Map; import java.util.concurrent.Callable; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.Future; import java.util.concurrent.ThreadPoolExecutor; import org.apache.commons.lang.StringUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; public class ExecutorFactory {     private static final Logger logger = LoggerFactory.getLogger(ExecutorFactory.class);     private static final Map<String, ThreadPoolExecutor> threadPoolExecutorMap = new ConcurrentHashMap();     private static final int DEFAULT_QUEUE_SIZE = 1000;     private static final String DEFAULT_EXECUTOR_NAME = "default-executor";     private static final int MAX_THREAD_NUM = 100;     private static final int CORE_THREAD_NUM = 1;     private static volatile ExecutorFactory instance;     private ExecutorFactory() {     }     public static ExecutorFactory getInstance() {         if (instance == null) {             Class var0 = ExecutorFactory.class;             synchronized(ExecutorFactory.class) {                 if (instance == null) {                     instance = new ExecutorFactory();                 }             }         }         return instance;     }     public ThreadPoolExecutor getThreadPoolExecutorByName(String name) {         return (ThreadPoolExecutor)threadPoolExecutorMap.get(name);     }     public static Map<String, ThreadPoolExecutor> getThreadPoolExecutorMap() {         return threadPoolExecutorMap;     }     public ThreadPoolExecutor getThreadPoolExecutor(String threadPoolExecutorName, int queueSize, int coreThreadNum, int maxPoolSize) {         if (StringUtils.isBlank(threadPoolExecutorName)) {             throw new IllegalArgumentException("thread name empty");         } else {             if (!threadPoolExecutorMap.containsKey(threadPoolExecutorName)) {                 Class var5 = ExecutorFactory.class;                 synchronized(ExecutorFactory.class) {                     if (!threadPoolExecutorMap.containsKey(threadPoolExecutorName)) {                         ThreadPoolExecutor executor = (new ThreadPool(coreThreadNum, maxPoolSize, 30L, queueSize, threadPoolExecutorName)).getExecutor();                         threadPoolExecutorMap.put(threadPoolExecutorName, executor);                         logger.info("thread name: {} executor created", threadPoolExecutorName);                     }                 }             }             return (ThreadPoolExecutor)threadPoolExecutorMap.get(threadPoolExecutorName);         }     }     public <T extends Runnable> void submit(T t) {         ThreadPoolExecutor defaultExecutor = this.getThreadPoolExecutor();         defaultExecutor.submit(t);     }     public <T extends Runnable> void submit(String poolName, T t) {         ThreadPoolExecutor executor = this.getThreadPoolExecutorByName(poolName);         if (executor == null) {             logger.error("thread name: {} executor not exist.", poolName);             throw new IllegalArgumentException("thread name:" + poolName + " executor not exist.");         } else {             executor.submit(t);         }     }     public <T extends Callable<Object>> Future<Object> submit(T t) {         ThreadPoolExecutor defaultExecutor = this.getThreadPoolExecutor();         return defaultExecutor.submit(t);     }     public <T extends Callable<Object>> Future<Object> submit(String poolName, T t) {         ThreadPoolExecutor executor = this.getThreadPoolExecutorByName(poolName);         if (executor == null) {             logger.error("thread poolName: {} executor not exist.", poolName);             throw new IllegalArgumentException("thread poolName:" + poolName + " executor not exist.");         } else {             return executor.submit(t);         }     }     public ThreadPoolExecutor getThreadPoolExecutor() {         return this.getThreadPoolExecutor("default-executor", 1000, 1, 100);     } }

3.多线程配置类

package com.visy.threadpool; import javax.validation.constraints.NotNull; import org.springframework.boot.context.properties.ConfigurationProperties; import org.springframework.context.annotation.Configuration; import org.springframework.validation.annotation.Validated; @Validated @Configuration @ConfigurationProperties(prefix = "visy.threadpool") public class TheadPoolProperties {     // 执行并行任务时,等待多久时间超时(单位:秒)     @NotNull     private Integer timeOut;     // 队列大小     @NotNull     private Integer queueSize;      // 核心线程数量     @NotNull     private Integer coreThreadNum;     // 线程池最大线程数量     @NotNull     private Integer maxPoolSize;     // 并行执行每组大小     private Integer groupSize = 20;     public TheadPoolProperties() {     }     public Integer getTimeOut() {         return this.timeOut;     }     public Integer getQueueSize() {         return this.queueSize;     }     public Integer getCoreThreadNum() {         return this.coreThreadNum;     }     public Integer getMaxPoolSize() {         return this.maxPoolSize;     }     public Integer getGroupSize() {         return this.groupSize;     }     public void setTimeOut(Integer timeOut) {         this.timeOut = timeOut;     }     public void setQueueSize(Integer queueSize) {         this.queueSize = queueSize;     }     public void setCoreThreadNum(Integer coreThreadNum) {         this.coreThreadNum = coreThreadNum;     }     public void setMaxPoolSize(Integer maxPoolSize) {         this.maxPoolSize = maxPoolSize;     }     public void setGroupSize(Integer groupSize) {         this.groupSize = groupSize;     }     public boolean equals(Object o) {         if (o == this) {             return true;         } else if (!(o instanceof TheadPoolProperties)) {             return false;         } else {             TheadPoolProperties other = (TheadPoolProperties)o;             if (!other.canEqual(this)) {                 return false;             } else {                 label71: {                     Object this$timeOut = this.getTimeOut();                     Object other$timeOut = other.getTimeOut();                     if (this$timeOut == null) {                         if (other$timeOut == null) {                             break label71;                         }                     } else if (this$timeOut.equals(other$timeOut)) {                         break label71;                     }                     return false;                 }                 Object this$queueSize = this.getQueueSize();                 Object other$queueSize = other.getQueueSize();                 if (this$queueSize == null) {                     if (other$queueSize != null) {                         return false;                     }                 } else if (!this$queueSize.equals(other$queueSize)) {                     return false;                 }                 label57: {                     Object this$coreThreadNum = this.getCoreThreadNum();                     Object other$coreThreadNum = other.getCoreThreadNum();                     if (this$coreThreadNum == null) {                         if (other$coreThreadNum == null) {                             break label57;                         }                     } else if (this$coreThreadNum.equals(other$coreThreadNum)) {                         break label57;                     }                     return false;                 }                 Object this$maxPoolSize = this.getMaxPoolSize();                 Object other$maxPoolSize = other.getMaxPoolSize();                 if (this$maxPoolSize == null) {                     if (other$maxPoolSize != null) {                         return false;                     }                 } else if (!this$maxPoolSize.equals(other$maxPoolSize)) {                     return false;                 }                 Object this$groupSize = this.getGroupSize();                 Object other$groupSize = other.getGroupSize();                 if (this$groupSize == null) {                     if (other$groupSize == null) {                         return true;                     }                 } else if (this$groupSize.equals(other$groupSize)) {                     return true;                 }                 return false;             }         }     }     protected boolean canEqual(Object other) {         return other instanceof TheadPoolProperties;     }     public int hashCode() {         int PRIME = true;         int result = 1;         Object $timeOut = this.getTimeOut();         int result = result * 59 + ($timeOut == null ? 43 : $timeOut.hashCode());         Object $queueSize = this.getQueueSize();         result = result * 59 + ($queueSize == null ? 43 : $queueSize.hashCode());         Object $coreThreadNum = this.getCoreThreadNum();         result = result * 59 + ($coreThreadNum == null ? 43 : $coreThreadNum.hashCode());         Object $maxPoolSize = this.getMaxPoolSize();         result = result * 59 + ($maxPoolSize == null ? 43 : $maxPoolSize.hashCode());         Object $groupSize = this.getGroupSize();         result = result * 59 + ($groupSize == null ? 43 : $groupSize.hashCode());         return result;     }     public String toString() {         return "TheadPoolProperties(timeOut=" + this.getTimeOut() + ", queueSize=" + this.getQueueSize() + ", coreThreadNum=" + this.getCoreThreadNum() + ", maxPoolSize=" + this.getMaxPoolSize() + ", groupSize=" + this.getGroupSize() + ")";     } }

4.列表拆分工具类

package com.visy.utils; import java.util.ArrayList; import java.util.Arrays; import java.util.List; import java.util.jar.Attributes; /**  * 列表或数组按指定大小分组,用于批量取一部分数据循环处理  *  */ public class ArraySplitUtil<T> {     /**      * 按指定大小对列表分组      * @param list      * @param splitSize      * @return      */     public List<List<T>> splistList(List<T> list, int splitSize) {         if (null == list || list.size() == 0) {             return null;         }         int listSize = list.size();         List<List<T>> newList = new ArrayList<>();         if (listSize < splitSize) {             newList.add(list);             return newList;         }         int addLength = splitSize;         int times = listSize / splitSize;         if (listSize % splitSize != 0) {             times += 1;         }         int start = 0;         int end = 0;         int last = times - 1;         for (int i = 0; i < times; i++) {             start = i * splitSize;             if (i < last) {                 end = start + addLength;             } else {                 end = listSize;             }             newList.add(list.subList(start, end));         }         return newList;     }     /**      * 按指定大小对数组分组      * @param array      * @param splitSize      * @return      */     public List<T[]> splistArray(T[] array, int splitSize) {         if (null == array) {             return null;         }         int listSize = array.length;         List<T[]> newList = new ArrayList<>();         if (listSize < splitSize) {             newList.add(array);             return newList;         }         int addLength = splitSize;         int times = listSize / splitSize;         if (listSize % splitSize != 0) {             times += 1;         }         int start = 0;         int end = 0;         int last = times - 1;         for (int i = 0; i < times; i++) {             start = i * splitSize;             if (i < last) {                 end = start + addLength;             } else {                 end = listSize;             }             newList.add(Arrays.copyOfRange(array, start, end));         }         return newList;     }     public static <E> ArraySplitUtil<E> build(){         return new ArraySplitUtil<>();     } }

5.多任务执行助手类

package com.visy.helper; import com.baomidou.mybatisplus.toolkit.CollectionUtils; import com.google.common.collect.Lists; import com.visy.utils.ArraySplitUtil; import com.visy.threadpool.ThreadPoolConfig; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; import java.util.List; import java.util.Map; import java.util.Objects; import java.util.concurrent.Callable; import java.util.concurrent.ConcurrentHashMap; import java.util.function.Consumer; import java.util.function.Function; import java.util.stream.Collectors; /**  * 多任务助手  * @author visy.wang  * @date 2022/5/9 14:38  */ @Service public class MultiTaskHelper {     @Autowired     private ThreadPoolConfig threadPoolConfig;     private static final Map<String,ArraySplitUtil<?>> ArraySplitUtilCache = new ConcurrentHashMap<>();     public <I,O> List<List<O>> createAndRunListTask(List<I> list, Function<I,O> handler){         return createAndRunListTask(list, null, handler);     }     public <I,O> List<List<O>> createAndRunListTaskV2(List<I> list, Function<List<I>, List<O>> handler){         return createAndRunListTask(list, handler, null);     }     public <I> void createAndRunListTaskWithoutReturn(List<I> list, Consumer<I> handler){         createAndRunListTaskWithoutReturn(list, null, handler);     }     public <I> void createAndRunListTaskWithoutReturnV2(List<I> list, Consumer<List<I>> handler){         createAndRunListTaskWithoutReturn(list, handler, null);     }     /**      * 把列表按线程数分组      * @param list 列表      * @return 分组后的列表      */     @SuppressWarnings("unchecked")     private <T> List<List<T>> listSplit(List<T> list){         String key = list.get(0).getClass().getName();         int groupSize = threadPoolConfig.getTheadPoolProperties().getGroupSize();         ArraySplitUtil<T> arraySplitUtil = (ArraySplitUtil<T>)ArraySplitUtilCache.get(key);         if(Objects.isNull(arraySplitUtil)){             arraySplitUtil = ArraySplitUtil.build();             ArraySplitUtilCache.put(key, arraySplitUtil);         }         return arraySplitUtil.splistList(list, groupSize);     }     /**      * 创建并运行多任务      * @param list 输入数据列表      * @param handler1 处理器1 (优先级使用)      * @param handler2 处理器2      * @param <I> 输入数据类型      * @param <O> 输出数据类型      * @return 执行结果分组列表      */     private <I,O> List<List<O>> createAndRunListTask(List<I> list,  Function<List<I>, List<O>> handler1, Function<I,O> handler2){         List<List<I>> listGroup = listSplit(list);         //设定每个组的任务         List<Callable<List<O>>> taskList = Lists.newArrayListWithExpectedSize(listGroup.size());         listGroup.stream().filter(CollectionUtils::isNotEmpty).forEach(subList -> {             taskList.add(() -> {                 if(Objects.nonNull(handler1)){                     return handler1.apply(subList);                 }else if(Objects.nonNull(handler2)){                     return subList.stream().map(handler2).collect(Collectors.toList());                 }else{                     return null;                 }             });         });         return threadPoolConfig.doConcurrentTask(taskList);     }     /**      * 创建并运行多任务(无返回结果)      * @param list 输入数据列表      * @param handler1 处理器1 (优先级更高)      * @param handler2 处理器2      * @param <I> 输入数据类型      */     private <I> void createAndRunListTaskWithoutReturn(List<I> list, Consumer<List<I>> handler1, Consumer<I> handler2){         List<List<I>> listGroup = listSplit(list);         //设定每个组的任务         List<Callable<List<?>>> taskList = Lists.newArrayListWithExpectedSize(listGroup.size());         listGroup.stream().filter(CollectionUtils::isNotEmpty).forEach(subList -> {             taskList.add(() -> {                 if(Objects.nonNull(handler1)){                     handler1.accept(subList);                 }else if(Objects.nonNull(handler2)){                     subList.forEach(handler2);                 }                 return null;             });         });         threadPoolConfig.doConcurrentTask(taskList);     } }

6.多任务助手使用:

@Autowired package com.zoom.fleet.schedule.service; import com.visy.helper.MultiTaskHelper; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; import java.util.ArrayList; import java.util.List; import java.util.stream.Collectors; /**  * 多任务助手使用示例  * @author visy.wang  * @date 2022/5/13 14:11  */ @Service public class MultiTaskTest {     @Autowired     private MultiTaskHelper multiTaskHelper;     private void test(){         //待多任务执行的数据列表         List<String> idList = new ArrayList<>();         //1.有返回结果的执行方式一, 定义单个数据的处理逻辑,返回多任务执行结果和合集         List<List<Long>> resultList = multiTaskHelper.createAndRunListTask(idList, id->{             //每一项数据的业务代码             return Long.valueOf(id);         });         //2.有返回结果的执行方式二, 定义单个数线程的处理逻辑,返回多任务执行结果和合集         resultList = multiTaskHelper.createAndRunListTaskV2(idList, subIdList->{             //每一个线程下列表操作的业务代码             return subIdList.stream().map(id->{                 //每一项数据的业务代码                 return Long.valueOf(id);             }).collect(Collectors.toList());         });         //3.无返回结果的执行方式一, 定义单个数据的处理逻辑         multiTaskHelper.createAndRunListTaskWithoutReturn(idList, id->{             //每一项数据的业务代码...         });         //3.无返回结果的执行方式一, 定义单个数据的处理逻辑         multiTaskHelper.createAndRunListTaskWithoutReturnV2(idList, subIdList->{             subIdList.forEach(id->{                 //每一项数据的业务代码...             });             //继续操作subIdList...         });     } }

推荐阅读