SpringBoot定时任务实现数据同步的方法

SpringBoot定时任务实现数据同步的方法

本文实例为大家分享了SpringBoot定时任务实现数据同步的具体代码,供大家参考,具体内容如下

前言

业务的需求是,通过中台调用api接口获得,设备数据,要求现实设备数据的同步。

方案一:通过轮询接口的方式执行 pullData() 方法实现数据同步

该方式的原理是先清空之前的所有数据,然后重新插入通过api调用获取的最新数据。该方法的优点,逻辑简单。缺点是,频繁删除、插入数据。再调用查询数据时候,某一时刻,数据全部删除,还没及时插入的时候。数据可能有异常。

方案二:通过轮询接口的方式执行 pullDataNew() 方法实现数据同步

该方式的原理是先查询数据库,已有数据,然后和通过api调用获取的最新数据进行比对,找出数据中增量、减量和变量,进行同步更新。该方法的优点,减少对数据库的频繁操作,提升性能。缺点:无发现明显缺点。

package com.hxtx.spacedata.task; import com.alibaba.fastjson.JSON; import com.alibaba.fastjson.JSONArray; import com.alibaba.fastjson.JSONObject; import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper; import com.baomidou.mybatisplus.core.toolkit.CollectionUtils; import com.google.api.client.util.Lists; import com.hxtx.spacedata.common.domain.ResponseDTO; import com.hxtx.spacedata.config.SpringContextUtil; import com.hxtx.spacedata.controller.file.FilesMinioController; import com.hxtx.spacedata.domain.entity.entityconfig.EntityPointEntity; import com.hxtx.spacedata.service.entityconfig.EntityPointService; import com.hxtx.spacedata.util.HttpProxyUtil; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Value; import org.springframework.scheduling.annotation.Scheduled; import org.springframework.stereotype.Component; import org.springframework.transaction.annotation.Transactional; import java.util.List; import java.util.Map; import java.util.Objects; import java.util.stream.Collectors; /**  * 中台设备数据 定时任务执行  *  * @author Tarzan Liu  * @version 1.0.0  * @description  * @date 2020/12/07  */ @Component @Slf4j public class EntityPointTask {     @Autowired     private EntityPointService entityPointService;     @Value("${middleGround.server.host}")     private String host;     @Value("${middleGround.server.port}")     private String port;     private static FilesMinioController filesMinioController = SpringContextUtil.getBean(FilesMinioController.class);     /**      * 设备定义点数据拉取      *      * @author tarzan Liu      * @date 2020/12/2      */     @Scheduled(cron = "0/30 * * * * ?") // 30秒校验一次     public void pullDataTaskByCorn() {         String result = HttpProxyUtil.sendGet("http://" + host + ":" + port + "/interface/system/list");         JSONObject jsonObject = JSON.parseObject(result);         if (Objects.nonNull(jsonObject)) {             JSONArray array = jsonObject.getJSONArray("data");             if (array != null && array.size() != 0) {                 for (int i = 0; i < array.size(); i++) {                     JSONObject obj = array.getJSONObject(i);                     String systemId = obj.getString("id");                     pullDataNew(systemId);                 }             }         }     }     @Transactional(rollbackFor = Throwable.class)     public ResponseDTO<String> pullData(String code) {         List<EntityPointEntity> list = Lists.newArrayList();         String result = HttpProxyUtil.sendGet("http://" + host + ":" + port + "/interface/defintionView/listBySystemId/" + code);         JSONObject jsonObject = JSON.parseObject(result);         if (Objects.nonNull(jsonObject)) {             JSONArray array = jsonObject.getJSONArray("data");             if (array != null && array.size() != 0) {                 for (int i = 0; i < array.size(); i++) {                     JSONObject obj = array.getJSONObject(i);                     String pointId = obj.getString("pointId");                     String name = obj.getString("name");                     list.add(EntityPointEntity.builder().pointId(pointId).name(name).code(code).build());                 }                 List<EntityPointEntity> existList = entityPointService.list(new LambdaQueryWrapper<EntityPointEntity>().eq(EntityPointEntity::getCode, code).isNotNull(EntityPointEntity::getValue));                 if (CollectionUtils.isNotEmpty(existList)) {                     Map<String, String> existMap = existList.stream().collect(Collectors.toMap(EntityPointEntity::getPointId, EntityPointEntity::getValue));                     list.forEach(e -> {                         String value = existMap.get(e.getPointId());                         if (value != null) {                             e.setValue(value);                         }                     });                 }                 entityPointService.remove(new LambdaQueryWrapper<EntityPointEntity>().eq(EntityPointEntity::getCode, code));                 entityPointService.saveBatch(list);             }         }         return ResponseDTO.succ();     }     @Transactional(rollbackFor = Throwable.class)     public ResponseDTO<String> pullDataNew(String code) {         String result = HttpProxyUtil.sendGet("http://" + host + ":" + port + "/interface/defintionView/listBySystemId/" + code);         JSONObject jsonObject = JSON.parseObject(result);         if (Objects.nonNull(jsonObject)) {             JSONArray data = jsonObject.getJSONArray("data");             List<EntityPointEntity> list = data.toJavaList(EntityPointEntity.class);             if (CollectionUtils.isNotEmpty(list)) {                 list.forEach(e -> e.setCode(code));                 List<EntityPointEntity> existList = entityPointService.list(new LambdaQueryWrapper<EntityPointEntity>().eq(EntityPointEntity::getCode, code));                 if (CollectionUtils.isNotEmpty(existList)) {                     //存在map                     Map<String, String> existMap = existList.stream().collect(Collectors.toMap(EntityPointEntity::getPointId, EntityPointEntity::getName));                     //传输map                     Map<String, String> dataMap = list.stream().collect(Collectors.toMap(EntityPointEntity::getPointId, EntityPointEntity::getName));                     //增量                     List<EntityPointEntity> increment = list.stream().filter(e -> existMap.get(e.getPointId()) == null).collect(Collectors.toList());                     if (CollectionUtils.isNotEmpty(increment)) {                         entityPointService.saveBatch(increment);                     }                     //减量                     List<EntityPointEntity> decrement = existList.stream().filter(e -> dataMap.get(e.getPointId()) == null).collect(Collectors.toList());                     if (CollectionUtils.isNotEmpty(decrement)) {                         entityPointService.removeByIds(decrement.stream().map(EntityPointEntity::getId).collect(Collectors.toList()));                     }                     //变量                     List<EntityPointEntity> variable = existList.stream().filter(e -> dataMap.get(e.getPointId()) != null && !dataMap.get(e.getPointId()).equals(e.getName())).collect(Collectors.toList());                     if (CollectionUtils.isNotEmpty(variable)) {                         variable.forEach(e -> {                             e.setName(dataMap.get(e.getPointId()));                         });                         entityPointService.updateBatchById(variable);                     }                 } else {                     entityPointService.saveBatch(list);                 }             }         }         return ResponseDTO.succ();     } }

数据库对应实体类 

import com.baomidou.mybatisplus.annotation.IdType; import com.baomidou.mybatisplus.annotation.TableId; import com.baomidou.mybatisplus.annotation.TableName; import lombok.AllArgsConstructor; import lombok.Builder; import lombok.Data; import lombok.NoArgsConstructor; import java.io.Serializable; import java.util.Date; @Builder @NoArgsConstructor @AllArgsConstructor @Data @TableName(value = "t_entity_point") public class EntityPointEntity implements Serializable {     private static final long serialVersionUID = 2181036545424452651L;     /**      * 定义点id      */     @TableId(value = "id", type = IdType.ASSIGN_ID)     private Long id;     /**      * 定义点id      */     private String pointId;     /**      * 名称      */     private String name;     /**      * 绘制数据      */     private String value;     /**      * 编码      */     private String code;     /**      * 创建时间      */     private Date createTime; }

HTTP请求代理工具类 

import lombok.extern.slf4j.Slf4j; import org.apache.http.Consts; import org.apache.http.HttpEntity; import org.apache.http.HttpStatus; import org.apache.http.NameValuePair; import org.apache.http.client.config.RequestConfig; import org.apache.http.client.entity.UrlEncodedFormEntity; import org.apache.http.client.methods.CloseableHttpResponse; import org.apache.http.client.methods.HttpPost; import org.apache.http.conn.ssl.SSLConnectionSocketFactory; import org.apache.http.conn.ssl.TrustStrategy; import org.apache.http.impl.client.CloseableHttpClient; import org.apache.http.impl.client.HttpClients; import org.apache.http.message.BasicNameValuePair; import org.apache.http.ssl.SSLContextBuilder; import org.apache.http.util.EntityUtils; import javax.net.ssl.SSLContext; import java.io.BufferedReader; import java.io.InputStreamReader; import java.io.PrintWriter; import java.net.URL; import java.net.URLConnection; import java.security.cert.CertificateException; import java.security.cert.X509Certificate; import java.util.ArrayList; import java.util.List; import java.util.Map; /**  * HTTP请求代理类  *  * @author tarzan Liu  * @description 发送Get Post请求  */ @Slf4j public class HttpProxyUtil {     /**      * 使用URLConnection进行GET请求      *      * @param api_url      * @return      */     public static String sendGet(String api_url) {         return sendGet(api_url, "", "utf-8");     }     /**      * 使用URLConnection进行GET请求      *      * @param api_url      * @param param      * @return      */     public static String sendGet(String api_url, String param) {         return sendGet(api_url, param, "utf-8");     }     /**      * 使用URLConnection进行GET请求      *      * @param api_url 请求路径      * @param param   请求格式有name1=value1&name2=value2、json、xml、map或其他形式,具体要看接收方的取值, 可以为空      * @param charset 字符集      * @return      */     public static String sendGet(String api_url, String param, String charset) {         StringBuffer buffer = new StringBuffer();         try {             // 判断有无参数,若是拼接好的url,就不必再拼接了             if (param != null && !"".equals(param)) {                 api_url = api_url + "?" + param;             }             log.info("请求的路径是:" + api_url);             URL realUrl = new URL(api_url);             // 打开联接             URLConnection conn = realUrl.openConnection();             // 设置通用的请求属性             conn.setRequestProperty("accept", "*/*");             conn.setRequestProperty("connection", "Keep-Alive");             conn.setRequestProperty("user-agent", "Mozilla/4.0(compatible; MSIE 6.0; Windows NT 5.1; SV1)");             conn.setConnectTimeout(12000);    //设置连接主机超时(单位:毫秒)             conn.setReadTimeout(12000);    // 设置从主机读取数据超时(单位:毫秒)             conn.connect();    // 建立实际的联接             // 定义 BufferedReader输入流来读取URL的相应             try (BufferedReader in = new BufferedReader(new InputStreamReader(conn.getInputStream(), charset))) {                 String line;                 while ((line = in.readLine()) != null) { //                    buffer.append("\n"+line);                     buffer.append(line);                 }             }         } catch (Exception e) {             log.error("发送GET请求出现异常! " + e.getMessage());             return null;         }         //  log.info("响应返回数据:" + buffer.toString());         return buffer.toString();     }     /**      * 使用URLConnection进行POST请求      *      * @param api_url 请求路径      * @param param   请求格式有name1=value1&name2=value2、json、xml、map或其他形式,具体要看接收方的取值,最好不为空      * @return      */     public static String sendPost(String api_url, String param) {         return sendPost(api_url, param, "utf-8");     }     /**      * 使用URLConnection进行POST请求      *      * @param api_url 请求路径      * @param param   请求格式有name1=value1&name2=value2、json、xml、map或其他形式,具体要看接收方的取值,最好不为空      * @param charset 字符集      * @return      */     public static String sendPost(String api_url, String param, String charset) {         StringBuffer buffer = new StringBuffer();         try {             log.info("请求的路径是:" + api_url + ",参数是:" + param);             URL realUrl = new URL(api_url);             // 打开联接             URLConnection conn = realUrl.openConnection();             // 设置通用的请求属性             conn.setRequestProperty("accept", "*/*");             conn.setRequestProperty("connection", "Keep-Alive");             conn.setRequestProperty("user-agent", "Mozilla/4.0(compatible; MSIE 6.0; Windows NT 5.1; SV1)");             conn.setConnectTimeout(12000);    //设置连接主机超时(单位:毫秒)             conn.setReadTimeout(12000);    // 设置从主机读取数据超时(单位:毫秒)             // 发送POST请求必须设置如下两行             conn.setDoOutput(true);             conn.setDoInput(true);             // 获取URLConnection对象对应的输出流             try (PrintWriter out = new PrintWriter(conn.getOutputStream())) {                 out.print(param); // 发送请求参数                 out.flush();// flush输出流的缓冲             }             // 定义 BufferedReader输入流来读取URL的相应,得指明使用UTF-8编码,否则到API服务器XML的中文不能被成功识别             try (BufferedReader in = new BufferedReader(new InputStreamReader(conn.getInputStream(), charset))) {                 String line;                 while ((line = in.readLine()) != null) { //                    buffer.append("\n"+line);                     buffer.append(line);                 }             }         } catch (Exception e) {             log.error("发送POST请求出现异常! " + e.getMessage());             e.printStackTrace();         }         log.info("响应返回数据:" + buffer.toString());         return buffer.toString();     }     public static CloseableHttpClient createSSLClientDefault() throws Exception {         SSLContext sslContext = new SSLContextBuilder().loadTrustMaterial(null, new AllTrustStrategy()).build();         SSLConnectionSocketFactory sslSf = new SSLConnectionSocketFactory(sslContext);         return HttpClients.custom().setSSLSocketFactory(sslSf).build();     }     // 加载证书     private static class AllTrustStrategy implements TrustStrategy {         public boolean isTrusted(X509Certificate[] x509Certificates, String s) throws CertificateException {             return true;         }     }     /**      * 支持https请求      *      * @param url      * @param param      * @return      * @throws Exception      */     public static String sendHttpClientPost(String url, Map<String, String> param) throws Exception {         CloseableHttpClient httpClient = createSSLClientDefault();         HttpPost httpPost = null;         CloseableHttpResponse response = null;         String result = "";         try {             // 发起HTTP的POST请求             httpPost = new HttpPost(url);             List<NameValuePair> paramList = new ArrayList<NameValuePair>();             for (String key : param.keySet()) {                 paramList.add(new BasicNameValuePair(key, param.get(key)));             }             log.info("http请求地址:" + url + ",参数:" + paramList.toString());             // UTF8+URL编码             httpPost.setEntity(new UrlEncodedFormEntity(paramList, Consts.UTF_8));             httpPost.setConfig(RequestConfig.custom().setConnectTimeout(30000).setSocketTimeout(30000).build());             response = httpClient.execute(httpPost);             HttpEntity entity = response.getEntity();             int statusCode = response.getStatusLine().getStatusCode();             if (HttpStatus.SC_OK == statusCode) { // 如果响应码是200             }             result = EntityUtils.toString(entity);             log.info("状态码:" + statusCode + ",响应信息:" + result);         } finally {             if (response != null) {                 response.close();             }             if (httpPost != null) {                 httpPost.releaseConnection();             }             httpClient.close();         }         return result;     } }

推荐阅读