Java实现多线程大批量同步数据(分页)

Java实现多线程大批量同步数据(分页)

背景

最近遇到个功能,两个月有300w+的数据,之后还在累加,因一开始该数据就全部存储在mysql表,现需要展示在页面,还需要关联另一张表的数据,而且产品要求页面的查询条件多达20个条件,最终,这个功能卡的要死,基本查不出来数据。

最后是打算把这两张表的数据同时存储到MongoDB中去,以提高查询效率。

一开始同步的时候,采用单线程,循环以分页的模式去同步这两张表数据,结果是…一晚上,只同步了30w数据,特慢!!!

最后,改造了一番,2小时,就成功同步了300w+数据。

以下是主要逻辑。

线程的个数请根据你自己的服务器性能酌情设置。

思路

先通过count查出结果集的总条数,设置每个线程分页查询的条数,通过总条数和单次条数得到线程数量,通过改变limit的下标实现分批查询。

代码实现 package com.github.admin.controller.loans; import com.baomidou.mybatisplus.mapper.EntityWrapper; import com.github.admin.model.entity.CaseCheckCallRecord; import com.github.admin.model.entity.duyan.DuyanCallRecordDetail; import com.github.admin.model.entity.loans.CaseCallRemarkRecord; import com.github.admin.service.duyan.DuyanCallRecordDetailService; import com.github.admin.service.loans.CaseCallRemarkRecordService; import com.github.common.constant.MongodbConstant; import com.github.common.util.DingDingMsgSendUtils; import com.github.common.util.ListUtils; import com.github.common.util.Response; import com.github.common.util.concurrent.Executors; import lombok.extern.slf4j.Slf4j; import org.apache.commons.collections.CollectionUtils; import org.springframework.beans.BeanUtils; import org.springframework.beans.factory.DisposableBean; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Value; import org.springframework.data.mongodb.core.MongoTemplate; import org.springframework.data.mongodb.core.query.Criteria; import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RestController; import java.util.ArrayList; import java.util.List; import java.util.Map; import java.util.concurrent.Callable; import java.util.concurrent.ExecutorService; import java.util.concurrent.Future; /**  * 多线程同步历史数据  * @author songfayuan  * @date 2019-09-26 15:38  */ @Slf4j @RestController @RequestMapping("/demo") public class SynchronizeHistoricalDataController implements DisposableBean {     private ExecutorService executor = Executors.newFixedThreadPool(10, "SynchronizeHistoricalDataController");  //newFixedThreadPool 创建一个定长线程池,可控制线程最大并发数,超出的线程会在队列中等待。     @Value("${spring.profiles.active}")     private String profile;     @Autowired     private DuyanCallRecordDetailService duyanCallRecordDetailService;     @Autowired     private MongoTemplate mongoTemplate;     @Autowired     private CaseCallRemarkRecordService caseCallRemarkRecordService;     /**      * 多线程同步通话记录历史数据      * @param params      * @return      * @throws Exception      */     @GetMapping("/syncHistoryData")     public Response syncHistoryData(Map<String, Object> params) throws Exception {         executor.execute(new Runnable() {             @Override             public void run() {                 try {                     logicHandler(params);                 } catch (Exception e) {                     log.warn("多线程同步稽查通话记录历史数据才处理异常,errMsg = {}", e);                     DingDingMsgSendUtils.sendDingDingGroupMsg("【系统消息】" + profile + "环境,多线程同步稽查通话记录历史数据才处理异常,errMsg = "+e);                 }             }         });         return Response.success("请求成功");     }     /**      * 处理数据逻辑      * @param params      * @throws Exception      */     private void logicHandler(Map<String, Object> params) throws Exception {         /******返回结果:多线程处理完的最终数据******/         List<DuyanCallRecordDetail> result = new ArrayList<>();         /******查询数据库总的数据条数******/         int count = this.duyanCallRecordDetailService.selectCount(new EntityWrapper<DuyanCallRecordDetail>()                 .eq("is_delete", 0)                 .eq("platform_type", 1));         DingDingMsgSendUtils.sendDingDingGroupMsg("【系统消息】" + profile + "环境,本次需要同步" + count + "条历史稽查通话记录数据。"); //        int count = 2620266;         /******限制每次查询的条数******/         int num = 1000;         /******计算需要查询的次数******/         int times = count / num;         if (count % num != 0) {             times = times + 1;         }         /******每个线程开始查询的行数******/         int offset = 0;         /******添加任务******/         List<Callable<List<DuyanCallRecordDetail>>> tasks = new ArrayList<>();         for (int i = 0; i < times; i++) {             Callable<List<DuyanCallRecordDetail>> qfe = new ThredQuery(duyanCallRecordDetailService, params, offset, num);             tasks.add(qfe);             offset = offset + num;         }         /******为避免太多任务的最终数据全部存在list导致内存溢出,故将任务再次拆分单独处理******/         List<List<Callable<List<DuyanCallRecordDetail>>>> smallList = ListUtils.partition(tasks, 10);         for (List<Callable<List<DuyanCallRecordDetail>>> callableList : smallList) {             if (CollectionUtils.isNotEmpty(callableList)) { //                executor.execute(new Runnable() { //                    @Override //                    public void run() { //                        log.info("任务拆分执行开始:线程{}拆分处理开始...", Thread.currentThread().getName()); // //                        log.info("任务拆分执行结束:线程{}拆分处理开始...", Thread.currentThread().getName()); //                    } //                });                 try {                     List<Future<List<DuyanCallRecordDetail>>> futures = executor.invokeAll(callableList);                     /******处理线程返回结果******/                     if (futures != null && futures.size() > 0) {                         for (Future<List<DuyanCallRecordDetail>> future : futures) {                             List<DuyanCallRecordDetail> duyanCallRecordDetailList = future.get();                             if (CollectionUtils.isNotEmpty(duyanCallRecordDetailList)){                                 executor.execute(new Runnable() {                                     @Override                                     public void run() {                                         /******异步存储******/                                         log.info("异步存储MongoDB开始:线程{}拆分处理开始...", Thread.currentThread().getName());                                         saveMongoDB(duyanCallRecordDetailList);                                         log.info("异步存储MongoDB结束:线程{}拆分处理开始...", Thread.currentThread().getName());                                     }                                 });                             }                             //result.addAll(future.get());                         }                     }                 } catch (Exception e) {                     log.warn("任务拆分执行异常,errMsg = {}", e);                     DingDingMsgSendUtils.sendDingDingGroupMsg("【系统消息】" + profile + "环境,任务拆分执行异常,errMsg = "+e);                 }             }         }     }     /**      * 数据存储MongoDB      * @param duyanCallRecordDetailList      */     private void saveMongoDB(List<DuyanCallRecordDetail> duyanCallRecordDetailList) {         for (DuyanCallRecordDetail duyanCallRecordDetail : duyanCallRecordDetailList) {             /******重复数据不同步MongoDB******/             org.springframework.data.mongodb.core.query.Query query = new org.springframework.data.mongodb.core.query.Query();             query.addCriteria(Criteria.where("callUuid").is(duyanCallRecordDetail.getCallUuid()));             List<CaseCheckCallRecord> caseCheckCallRecordList = mongoTemplate.find(query, CaseCheckCallRecord.class, MongodbConstant.CASE_CHECK_CALL_RECORD);             if (CollectionUtils.isNotEmpty(caseCheckCallRecordList)) {                 log.warn("call_uuid = {}在MongoDB已经存在数据,后面数据将被舍弃...", duyanCallRecordDetail.getCallUuid());                 continue;             }             /******关联填写的记录******/             CaseCallRemarkRecord caseCallRemarkRecord = this.caseCallRemarkRecordService.selectOne(new EntityWrapper<CaseCallRemarkRecord>()                     .eq("is_delete", 0)                     .eq("call_uuid", duyanCallRecordDetail.getCallUuid()));             CaseCheckCallRecord caseCheckCallRecord = new CaseCheckCallRecord();             BeanUtils.copyProperties(duyanCallRecordDetail, caseCheckCallRecord);             //补充             caseCheckCallRecord.setCollectorUserId(duyanCallRecordDetail.getUserId());             if (caseCallRemarkRecord != null) {                 //补充                 caseCheckCallRecord.setCalleeName(caseCallRemarkRecord.getContactName());                         }             log.info("正在存储数据到MongoDB:{}", caseCheckCallRecord.toString());             this.mongoTemplate.save(caseCheckCallRecord, MongodbConstant.CASE_CHECK_CALL_RECORD);         }     }     @Override     public void destroy() throws Exception {         executor.shutdown();     } } class ThredQuery implements Callable<List<DuyanCallRecordDetail>> {     /******需要通过构造方法把对应的业务service传进来 实际用的时候把类型变为对应的类型******/     private DuyanCallRecordDetailService myService;     /******查询条件 根据条件来定义该类的属性******/     private Map<String, Object> params;     /******分页index******/     private int offset;     /******数量******/     private int num;     public ThredQuery(DuyanCallRecordDetailService myService, Map<String, Object> params, int offset, int num) {         this.myService = myService;         this.params = params;         this.offset = offset;         this.num = num;     }     @Override     public List<DuyanCallRecordDetail> call() throws Exception {         /******通过service查询得到对应结果******/         List<DuyanCallRecordDetail> duyanCallRecordDetailList = myService.selectList(new EntityWrapper<DuyanCallRecordDetail>()                 .eq("is_delete", 0)                 .eq("platform_type", 1)                 .last("limit "+offset+", "+num));         return duyanCallRecordDetailList;     } }

ListUtils工具

package com.github.common.util; import com.google.common.collect.Lists; import lombok.extern.slf4j.Slf4j; import java.io.*; import java.util.ArrayList; import java.util.List; /**  * 描述:List工具类  * @author songfayuan  * 2018年7月22日下午2:23:22  */ @Slf4j public class ListUtils {     /**      * 描述:list集合深拷贝      * @param src      * @return      * @throws IOException      * @throws ClassNotFoundException      * @author songfayuan      * 2018年7月22日下午2:35:23      */     public static <T> List<T> deepCopy(List<T> src) {         try {             ByteArrayOutputStream byteout = new ByteArrayOutputStream();             ObjectOutputStream out = new ObjectOutputStream(byteout);             out.writeObject(src);             ByteArrayInputStream bytein = new ByteArrayInputStream(byteout.toByteArray());             ObjectInputStream in = new ObjectInputStream(bytein);             @SuppressWarnings("unchecked")             List<T> dest = (List<T>) in.readObject();             return dest;         } catch (ClassNotFoundException e) {             e.printStackTrace();             return null;         } catch (IOException e) {             e.printStackTrace();             return null;         }     }     /**      * 描述:对象深拷贝      * @param src      * @return      * @throws IOException      * @throws ClassNotFoundException      * @author songfayuan      * 2018年12月14日      */     public static <T> T objDeepCopy(T src) {         try {             ByteArrayOutputStream byteout = new ByteArrayOutputStream();             ObjectOutputStream out = new ObjectOutputStream(byteout);             out.writeObject(src);             ByteArrayInputStream bytein = new ByteArrayInputStream(byteout.toByteArray());             ObjectInputStream in = new ObjectInputStream(bytein);             @SuppressWarnings("unchecked")             T dest = (T) in.readObject();             return dest;         } catch (ClassNotFoundException e) {             log.error("errMsg = {}", e);             return null;         } catch (IOException e) {             log.error("errMsg = {}", e);             return null;         }     }     /**      * 将一个list均分成n个list,主要通过偏移量来实现的      * @author songfayuan      * 2018年12月14日      */     public static <T> List<List<T>> averageAssign(List<T> source, int n) {         List<List<T>> result = new ArrayList<List<T>>();         int remaider = source.size() % n;  //(先计算出余数)         int number = source.size() / n;  //然后是商         int offset = 0;//偏移量         for (int i = 0; i < n; i++) {             List<T> value = null;             if (remaider > 0) {                 value = source.subList(i * number + offset, (i + 1) * number + offset + 1);                 remaider--;                 offset++;             } else {                 value = source.subList(i * number + offset, (i + 1) * number + offset);             }             result.add(value);         }         return result;     }     /**      * List按指定长度分割      * @param list the list to return consecutive sublists of (需要分隔的list)      * @param size the desired size of each sublist (the last may be smaller) (分隔的长度)      * @author songfayuan      * @date 2019-07-07 21:37      */     public static <T> List<List<T>> partition(List<T> list, int size){         return  Lists.partition(list, size); // 使用guava     }     /**      * 测试      * @param args      */     public static void main(String[] args) {         List<Integer> bigList = new ArrayList<>();         for (int i = 0; i < 101; i++){             bigList.add(i);         }         log.info("bigList长度为:{}", bigList.size());         log.info("bigList为:{}", bigList);         List<List<Integer>> smallists = partition(bigList, 20);         log.info("smallists长度为:{}", smallists.size());         for (List<Integer> smallist : smallists) {             log.info("拆分结果:{},长度为:{}", smallist, smallist.size());         }     } }

推荐阅读