Java多线程实现第三方数据同步

Java多线程实现第三方数据同步

本文实例为大家分享了Java多线程实现第三方数据同步的具体代码,供大家参考,具体内容如下

一、场景

最近的一项开发任务是同步第三方数据,而第三方数据一般有存量数据和增量数据,存量数据有100w+。在得知此需求时,进行了一定的信息检索和工具学习,提前获取存量数据到目标库,再使用kettle进行存量数据转换;增量数据则根据业务方规定的请求时间,通过定时任务去获取增量数据并进行数据转换。在数据获取和转换时,我们应该要记录每一次的请求信息,便于溯源和数据对账!!!

二、获取数据的方式

2.1 递归方式

使用递归方式时,要求数据量少,否则会出现栈溢出或堆溢出!!!并且递归方式是单线程,所以会导致同步速度很慢!!!

/**      * 数据同步 - 递归方式      * 此处存量数据只需要请求到数据并保存数据库即可,后期通过kettle进行转换。      * Data为自定义实体类,这里仅做示例!!! */     private void fetchAndSaveDB(int pageIndex, int pageSize) throws Exception {         log.info("【数据同步 - 存量】,第{}次同步,", pageIndex);         List<Data> datas= getDataByPage(pageIndex,pageSize);         if (CollectionUtils.isNotEmpty(datas)) {             dataService.saveOrUpdateBatch(datas);             log.info("【数据同步 - 存量】,第{}次同步,同步成功", pageIndex);             if (datas.size() < pageSize) {                 log.info("【数据同步 - 存量】,第{}次同步,获取数据小于每页获取条数,证明已全部同步完毕!!!", pageIndex);                 return;             }             // 递归操作-直到数据同步完毕             fetchAndSaveDB(pageIndex + 1, pageSize);         } else {             log.info("【数据同步 - 存量】,第{}次同步,获取数据为空,证明已全部同步完毕!!!", pageIndex);             return;         }     }     /**       * 获取分页数据,Data为自定义实体类,这里仅做示例!!!      */     private List<Data> getDataByPage(int pageIndex, int pageSize) throws Exception {         //通过feign调用第三方接口获取数据         String data = dataFeignService.fetchAllData(pageSize, pageIndex);         JSONObject jsonObject = JSONObject.parseObject(data);         JSONArray datalist = jsonObject.getJSONArray("datalist");         List<Data> datas = datalist.toJavaList(Data.class);         return datas;     }

2.2 多线程方式

由于递归方式是单线程,考虑到数据的庞大,且易造成内存溢出,因此将递归更换成多线程方式,不仅避免了内存溢出的情况,且速度大大的提升!!!

public void synAllData() {          // 定义原子变量 - 页数         AtomicInteger pageIndex = new AtomicInteger(0);          // 创建线程池          ExecutorService fixedThreadPool = Executors.newFixedThreadPool(10);         // 100万数据         int total = 1000000;//数据总量         int times = total / 1000;         if (total % 1000!= 0) {             times = times + 1;         }         LocalDateTime beginLocalDateTime = LocalDateTime.now();         log.info("【数据同步 - 存量】开始同步时间:{}", beginLocalDateTime.format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")));         for (int index = 1; index <= times; index++) {             fixedThreadPool.submit(new Runnable() {                 @Override                 public void run() {                     try {                         multiFetchAndSaveDB(pageIndex.incrementAndGet(), 1000);                     } catch (Exception e) {                         log.error("并发获取并保存数据异常:{}", e);                     }                 }             });         }         LocalDateTime endLocalDateTime = LocalDateTime.now();         log.info("【数据同步 - 存量】同步结束时间:{},总共耗时:{}分钟",                 endLocalDateTime.format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")),                 Duration.between(beginLocalDateTime, endLocalDateTime).toMinutes());     }     /**      * 数据同步 - 【多线程方式】      *      * @throws Exception      */     private void multiFetchAndSaveDB(int pageIndex, int pageSize) throws Exception {         log.info("【数据同步 - 存量】,第{}次同步,", pageIndex);         List<Data> datas= getDataByPage(pageIndex, pageSize);//getDataByPage()同上2.1         if (CollectionUtils.isNotEmpty(datas)) {             log.info("【数据同步 - 存量】,第{}次同步,同步成功", pageIndex);             if (datas.size() < pageSize) {                 log.info("【数据同步 - 存量】,第{}次同步,获取数据小于每页获取条数,证明已全部同步完毕!!!", pageIndex);                 return;             }         } else {             log.info("【数据同步 - 存量】,第{}次同步,获取数据为空,证明已全部同步完毕!!!", pageIndex);             return;         }     } 三、增量数据如何对接

增量数据需要写定时任务,可使用Scheduled注解,并需要将增量数据存放到目标库中且进行数据转换!!!此处就不再提供代码,可以参考上面的存量数据的方式编写!!!

推荐阅读