这篇文章主要介绍“怎么使用SpringBoot定时任务实现数据同步”的相关知识,小编通过实际案例向大家展示操作过程,操作方法简单快捷,实用性强,希望这篇“怎么使用SpringBoot定时任务实现数据同步”文章能帮助大家解决问题。
业务的需求是,通过中台调用api接口获得,设备数据,要求现实设备数据的同步。
方案一:通过轮询接口的方式执行 pullData() 方法实现数据同步
该方式的原理是先清空之前的所有数据,然后重新插入通过api调用获取的最新数据。该方法的优点,逻辑简单。缺点是,频繁删除、插入数据。再调用查询数据时候,某一时刻,数据全部删除,还没及时插入的时候。数据可能有异常。
方案二:通过轮询接口的方式执行 pullDataNew() 方法实现数据同步
该方式的原理是先查询数据库,已有数据,然后和通过api调用获取的最新数据进行比对,找出数据中增量、减量和变量,进行同步更新。该方法的优点,减少对数据库的频繁操作,提升性能。缺点:无发现明显缺点。
package com.hxtx.spacedata.task;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.baomidou.mybatisplus.core.toolkit.CollectionUtils;
import com.google.api.client.util.Lists;
import com.hxtx.spacedata.common.domain.ResponseDTO;
import com.hxtx.spacedata.config.SpringContextUtil;
import com.hxtx.spacedata.controller.file.FilesMinioController;
import com.hxtx.spacedata.domain.entity.entityconfig.EntityPointEntity;
import com.hxtx.spacedata.service.entityconfig.EntityPointService;
import com.hxtx.spacedata.util.HttpProxyUtil;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
import org.springframework.transaction.annotation.Transactional;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.stream.Collectors;
/**
* 中台设备数据 定时任务执行
*
* @author Tarzan Liu
* @version 1.0.0
* @description
* @date 2020/12/07
*/
@Component
@Slf4j
public class EntityPointTask {
@Autowired
private EntityPointService entityPointService;
@Value("${middleGround.server.host}")
private String host;
@Value("${middleGround.server.port}")
private String port;
private static FilesMinioController filesMinioController = SpringContextUtil.getBean(FilesMinioController.class);
/**
* 设备定义点数据拉取
*
* @author tarzan Liu
* @date 2020/12/2
*/
@Scheduled(cron = "0/30 * * * * ?") // 30秒校验一次
public void pullDataTaskByCorn() {
String result = HttpProxyUtil.sendGet("http://" + host + ":" + port + "/interface/system/list");
JSONObject jsonObject = JSON.parseObject(result);
if (Objects.nonNull(jsonObject)) {
JSONArray array = jsonObject.getJSONArray("data");
if (array != null && array.size() != 0) {
for (int i = 0; i < array.size(); i++) {
JSONObject obj = array.getJSONObject(i);
String systemId = obj.getString("id");
pullDataNew(systemId);
}
}
}
}
@Transactional(rollbackFor = Throwable.class)
public ResponseDTO<String> pullData(String code) {
List<EntityPointEntity> list = Lists.newArrayList();
String result = HttpProxyUtil.sendGet("http://" + host + ":" + port + "/interface/defintionView/listBySystemId/" + code);
JSONObject jsonObject = JSON.parseObject(result);
if (Objects.nonNull(jsonObject)) {
JSONArray array = jsonObject.getJSONArray("data");
if (array != null && array.size() != 0) {
for (int i = 0; i < array.size(); i++) {
JSONObject obj = array.getJSONObject(i);
String pointId = obj.getString("pointId");
String name = obj.getString("name");
list.add(EntityPointEntity.builder().pointId(pointId).name(name).code(code).build());
}
List<EntityPointEntity> existList = entityPointService.list(new LambdaQueryWrapper<EntityPointEntity>().eq(EntityPointEntity::getCode, code).isNotNull(EntityPointEntity::getValue));
if (CollectionUtils.isNotEmpty(existList)) {
Map<String, String> existMap = existList.stream().collect(Collectors.toMap(EntityPointEntity::getPointId, EntityPointEntity::getValue));
list.forEach(e -> {
String value = existMap.get(e.getPointId());
if (value != null) {
e.setValue(value);
}
});
}
entityPointService.remove(new LambdaQueryWrapper<EntityPointEntity>().eq(EntityPointEntity::getCode, code));
entityPointService.saveBatch(list);
}
}
return ResponseDTO.succ();
}
@Transactional(rollbackFor = Throwable.class)
public ResponseDTO<String> pullDataNew(String code) {
String result = HttpProxyUtil.sendGet("http://" + host + ":" + port + "/interface/defintionView/listBySystemId/" + code);
JSONObject jsonObject = JSON.parseObject(result);
if (Objects.nonNull(jsonObject)) {
JSONArray data = jsonObject.getJSONArray("data");
List<EntityPointEntity> list = data.toJavaList(EntityPointEntity.class);
if (CollectionUtils.isNotEmpty(list)) {
list.forEach(e -> e.setCode(code));
List<EntityPointEntity> existList = entityPointService.list(new LambdaQueryWrapper<EntityPointEntity>().eq(EntityPointEntity::getCode, code));
if (CollectionUtils.isNotEmpty(existList)) {
//存在map
Map<String, String> existMap = existList.stream().collect(Collectors.toMap(EntityPointEntity::getPointId, EntityPointEntity::getName));
//传输map
Map<String, String> dataMap = list.stream().collect(Collectors.toMap(EntityPointEntity::getPointId, EntityPointEntity::getName));
//增量
List<EntityPointEntity> increment = list.stream().filter(e -> existMap.get(e.getPointId()) == null).collect(Collectors.toList());
if (CollectionUtils.isNotEmpty(increment)) {
entityPointService.saveBatch(increment);
}
//减量
List<EntityPointEntity> decrement = existList.stream().filter(e -> dataMap.get(e.getPointId()) == null).collect(Collectors.toList());
if (CollectionUtils.isNotEmpty(decrement)) {
entityPointService.removeByIds(decrement.stream().map(EntityPointEntity::getId).collect(Collectors.toList()));
}
//变量
List<EntityPointEntity> variable = existList.stream().filter(e -> dataMap.get(e.getPointId()) != null && !dataMap.get(e.getPointId()).equals(e.getName())).collect(Collectors.toList());
if (CollectionUtils.isNotEmpty(variable)) {
variable.forEach(e -> {
e.setName(dataMap.get(e.getPointId()));
});
entityPointService.updateBatchById(variable);
}
} else {
entityPointService.saveBatch(list);
}
}
}
return ResponseDTO.succ();
}
}
数据库对应实体类
import com.baomidou.mybatisplus.annotation.IdType;
import com.baomidou.mybatisplus.annotation.TableId;
import com.baomidou.mybatisplus.annotation.TableName;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;
import java.io.Serializable;
import java.util.Date;
@Builder
@NoArgsConstructor
@AllArgsConstructor
@Data
@TableName(value = "t_entity_point")
public class EntityPointEntity implements Serializable {
private static final long serialVersionUID = 2181036545424452651L;
/**
* 定义点id
*/
@TableId(value = "id", type = IdType.ASSIGN_ID)
private Long id;
/**
* 定义点id
*/
private String pointId;
/**
* 名称
*/
private String name;
/**
* 绘制数据
*/
private String value;
/**
* 编码
*/
private String code;
/**
* 创建时间
*/
private Date createTime;
}
HTTP请求代理工具类
import lombok.extern.slf4j.Slf4j;
import org.apache.http.Consts;
import org.apache.http.HttpEntity;
import org.apache.http.HttpStatus;
import org.apache.http.NameValuePair;
import org.apache.http.client.config.RequestConfig;
import org.apache.http.client.entity.UrlEncodedFormEntity;
import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.client.methods.HttpPost;
import org.apache.http.conn.ssl.SSLConnectionSocketFactory;
import org.apache.http.conn.ssl.TrustStrategy;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClients;
import org.apache.http.message.BasicNameValuePair;
import org.apache.http.ssl.SSLContextBuilder;
import org.apache.http.util.EntityUtils;
import javax.net.ssl.SSLContext;
import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.io.PrintWriter;
import java.net.URL;
import java.net.URLConnection;
import java.security.cert.CertificateException;
import java.security.cert.X509Certificate;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
/**
* HTTP请求代理类
*
* @author tarzan Liu
* @description 发送Get Post请求
*/
@Slf4j
public class HttpProxyUtil {
/**
* 使用URLConnection进行GET请求
*
* @param api_url
* @return
*/
public static String sendGet(String api_url) {
return sendGet(api_url, "", "utf-8");
}
/**
* 使用URLConnection进行GET请求
*
* @param api_url
* @param param
* @return
*/
public static String sendGet(String api_url, String param) {
return sendGet(api_url, param, "utf-8");
}
/**
* 使用URLConnection进行GET请求
*
* @param api_url 请求路径
* @param param 请求格式有name1=value1&name2=value2、json、xml、map或其他形式,具体要看接收方的取值, 可以为空
* @param charset 字符集
* @return
*/
public static String sendGet(String api_url, String param, String charset) {
StringBuffer buffer = new StringBuffer();
try {
// 判断有无参数,若是拼接好的url,就不必再拼接了
if (param != null && !"".equals(param)) {
api_url = api_url + "?" + param;
}
log.info("请求的路径是:" + api_url);
URL realUrl = new URL(api_url);
// 打开联接
URLConnection conn = realUrl.openConnection();
// 设置通用的请求属性
conn.setRequestProperty("accept", "*/*");
conn.setRequestProperty("connection", "Keep-Alive");
conn.setRequestProperty("user-agent", "Mozilla/4.0(compatible; MSIE 6.0; Windows NT 5.1; SV1)");
conn.setConnectTimeout(12000); //设置连接主机超时(单位:毫秒)
conn.setReadTimeout(12000); // 设置从主机读取数据超时(单位:毫秒)
conn.connect(); // 建立实际的联接
// 定义 BufferedReader输入流来读取URL的相应
try (BufferedReader in = new BufferedReader(new InputStreamReader(conn.getInputStream(), charset))) {
String line;
while ((line = in.readLine()) != null) {
// buffer.append("\n"+line);
buffer.append(line);
}
}
} catch (Exception e) {
log.error("发送GET请求出现异常! " + e.getMessage());
return null;
}
// log.info("响应返回数据:" + buffer.toString());
return buffer.toString();
}
/**
* 使用URLConnection进行POST请求
*
* @param api_url 请求路径
* @param param 请求格式有name1=value1&name2=value2、json、xml、map或其他形式,具体要看接收方的取值,最好不为空
* @return
*/
public static String sendPost(String api_url, String param) {
return sendPost(api_url, param, "utf-8");
}
/**
* 使用URLConnection进行POST请求
*
* @param api_url 请求路径
* @param param 请求格式有name1=value1&name2=value2、json、xml、map或其他形式,具体要看接收方的取值,最好不为空
* @param charset 字符集
* @return
*/
public static String sendPost(String api_url, String param, String charset) {
StringBuffer buffer = new StringBuffer();
try {
log.info("请求的路径是:" + api_url + ",参数是:" + param);
URL realUrl = new URL(api_url);
// 打开联接
URLConnection conn = realUrl.openConnection();
// 设置通用的请求属性
conn.setRequestProperty("accept", "*/*");
conn.setRequestProperty("connection", "Keep-Alive");
conn.setRequestProperty("user-agent", "Mozilla/4.0(compatible; MSIE 6.0; Windows NT 5.1; SV1)");
conn.setConnectTimeout(12000); //设置连接主机超时(单位:毫秒)
conn.setReadTimeout(12000); // 设置从主机读取数据超时(单位:毫秒)
// 发送POST请求必须设置如下两行
conn.setDoOutput(true);
conn.setDoInput(true);
// 获取URLConnection对象对应的输出流
try (PrintWriter out = new PrintWriter(conn.getOutputStream())) {
out.print(param); // 发送请求参数
out.flush();// flush输出流的缓冲
}
// 定义 BufferedReader输入流来读取URL的相应,得指明使用UTF-8编码,否则到API服务器XML的中文不能被成功识别
try (BufferedReader in = new BufferedReader(new InputStreamReader(conn.getInputStream(), charset))) {
String line;
while ((line = in.readLine()) != null) {
// buffer.append("\n"+line);
buffer.append(line);
}
}
} catch (Exception e) {
log.error("发送POST请求出现异常! " + e.getMessage());
e.printStackTrace();
}
log.info("响应返回数据:" + buffer.toString());
return buffer.toString();
}
public static CloseableHttpClient createSSLClientDefault() throws Exception {
SSLContext sslContext = new SSLContextBuilder().loadTrustMaterial(null, new AllTrustStrategy()).build();
SSLConnectionSocketFactory sslSf = new SSLConnectionSocketFactory(sslContext);
return HttpClients.custom().setSSLSocketFactory(sslSf).build();
}
// 加载证书
private static class AllTrustStrategy implements TrustStrategy {
public boolean isTrusted(X509Certificate[] x509Certificates, String s) throws CertificateException {
return true;
}
}
/**
* 支持https请求
*
* @param url
* @param param
* @return
* @throws Exception
*/
public static String sendHttpClientPost(String url, Map<String, String> param) throws Exception {
CloseableHttpClient httpClient = createSSLClientDefault();
HttpPost httpPost = null;
CloseableHttpResponse response = null;
String result = "";
try {
// 发起HTTP的POST请求
httpPost = new HttpPost(url);
List<NameValuePair> paramList = new ArrayList<NameValuePair>();
for (String key : param.keySet()) {
paramList.add(new BasicNameValuePair(key, param.get(key)));
}
log.info("http请求地址:" + url + ",参数:" + paramList.toString());
// UTF8+URL编码
httpPost.setEntity(new UrlEncodedFormEntity(paramList, Consts.UTF_8));
httpPost.setConfig(RequestConfig.custom().setConnectTimeout(30000).setSocketTimeout(30000).build());
response = httpClient.execute(httpPost);
HttpEntity entity = response.getEntity();
int statusCode = response.getStatusLine().getStatusCode();
if (HttpStatus.SC_OK == statusCode) { // 如果响应码是200
}
result = EntityUtils.toString(entity);
log.info("状态码:" + statusCode + ",响应信息:" + result);
} finally {
if (response != null) {
response.close();
}
if (httpPost != null) {
httpPost.releaseConnection();
}
httpClient.close();
}
return result;
}
}
关于“怎么使用SpringBoot定时任务实现数据同步”的内容就介绍到这里了,感谢大家的阅读。如果想了解更多行业相关的知识,可以关注亿速云行业资讯频道,小编每天都会为大家更新不同的知识点。
亿速云「云服务器」,即开即用、新一代英特尔至强铂金CPU、三副本存储NVMe SSD云盘,价格低至29元/月。点击查看>>
免责声明:本站发布的内容(图片、视频和文字)以原创、转载和分享为主,文章观点不代表本网站立场,如果涉及侵权请联系站长邮箱:is@yisu.com进行举报,并提供相关证据,一经查实,将立刻删除涉嫌侵权内容。