今天小编给大家分享一下怎么使用Java多线程实现第三方数据同步的相关知识点,内容详细,逻辑清晰,相信大部分人都还太了解这方面的知识,所以分享这篇文章给大家参考一下,希望大家阅读完这篇文章后有所收获,下面我们一起来了解一下吧。
最近的一项开发任务是同步第三方数据,而第三方数据一般有存量数据和增量数据,存量数据有100w+。在得知此需求时,进行了一定的信息检索和工具学习,提前获取存量数据到目标库,再使用kettle进行存量数据转换;增量数据则根据业务方规定的请求时间,通过定时任务去获取增量数据并进行数据转换。在数据获取和转换时,我们应该要记录每一次的请求信息,便于溯源和数据对账!!!
2.1 递归方式
使用递归方式时,要求数据量少,否则会出现栈溢出或堆溢出!!!并且递归方式是单线程,所以会导致同步速度很慢!!!
/**
* 数据同步 - 递归方式
* 此处存量数据只需要请求到数据并保存数据库即可,后期通过kettle进行转换。
* Data为自定义实体类,这里仅做示例!!!
*/
private void fetchAndSaveDB(int pageIndex, int pageSize) throws Exception {
log.info("【数据同步 - 存量】,第{}次同步,", pageIndex);
List<Data> datas= getDataByPage(pageIndex,pageSize);
if (CollectionUtils.isNotEmpty(datas)) {
dataService.saveOrUpdateBatch(datas);
log.info("【数据同步 - 存量】,第{}次同步,同步成功", pageIndex);
if (datas.size() < pageSize) {
log.info("【数据同步 - 存量】,第{}次同步,获取数据小于每页获取条数,证明已全部同步完毕!!!", pageIndex);
return;
}
// 递归操作-直到数据同步完毕
fetchAndSaveDB(pageIndex + 1, pageSize);
} else {
log.info("【数据同步 - 存量】,第{}次同步,获取数据为空,证明已全部同步完毕!!!", pageIndex);
return;
}
}
/**
* 获取分页数据,Data为自定义实体类,这里仅做示例!!!
*/
private List<Data> getDataByPage(int pageIndex, int pageSize) throws Exception {
//通过feign调用第三方接口获取数据
String data = dataFeignService.fetchAllData(pageSize, pageIndex);
JSONObject jsonObject = JSONObject.parseObject(data);
JSONArray datalist = jsonObject.getJSONArray("datalist");
List<Data> datas = datalist.toJavaList(Data.class);
return datas;
}
2.2 多线程方式
由于递归方式是单线程,考虑到数据的庞大,且易造成内存溢出,因此将递归更换成多线程方式,不仅避免了内存溢出的情况,且速度大大的提升!!!
public void synAllData() {
// 定义原子变量 - 页数
AtomicInteger pageIndex = new AtomicInteger(0);
// 创建线程池
ExecutorService fixedThreadPool = Executors.newFixedThreadPool(10);
// 100万数据
int total = 1000000;//数据总量
int times = total / 1000;
if (total % 1000!= 0) {
times = times + 1;
}
LocalDateTime beginLocalDateTime = LocalDateTime.now();
log.info("【数据同步 - 存量】开始同步时间:{}", beginLocalDateTime.format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")));
for (int index = 1; index <= times; index++) {
fixedThreadPool.submit(new Runnable() {
@Override
public void run() {
try {
multiFetchAndSaveDB(pageIndex.incrementAndGet(), 1000);
} catch (Exception e) {
log.error("并发获取并保存数据异常:{}", e);
}
}
});
}
LocalDateTime endLocalDateTime = LocalDateTime.now();
log.info("【数据同步 - 存量】同步结束时间:{},总共耗时:{}分钟",
endLocalDateTime.format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")),
Duration.between(beginLocalDateTime, endLocalDateTime).toMinutes());
}
/**
* 数据同步 - 【多线程方式】
*
* @throws Exception
*/
private void multiFetchAndSaveDB(int pageIndex, int pageSize) throws Exception {
log.info("【数据同步 - 存量】,第{}次同步,", pageIndex);
List<Data> datas= getDataByPage(pageIndex, pageSize);//getDataByPage()同上2.1
if (CollectionUtils.isNotEmpty(datas)) {
log.info("【数据同步 - 存量】,第{}次同步,同步成功", pageIndex);
if (datas.size() < pageSize) {
log.info("【数据同步 - 存量】,第{}次同步,获取数据小于每页获取条数,证明已全部同步完毕!!!", pageIndex);
return;
}
} else {
log.info("【数据同步 - 存量】,第{}次同步,获取数据为空,证明已全部同步完毕!!!", pageIndex);
return;
}
}
增量数据需要写定时任务,可使用Scheduled注解,并需要将增量数据存放到目标库中且进行数据转换!
以上就是“怎么使用Java多线程实现第三方数据同步”这篇文章的所有内容,感谢各位的阅读!相信大家阅读完这篇文章都有很大的收获,小编每天都会为大家更新不同的知识,如果还想学习更多的知识,请关注亿速云行业资讯频道。
亿速云「云服务器」,即开即用、新一代英特尔至强铂金CPU、三副本存储NVMe SSD云盘,价格低至29元/月。点击查看>>
免责声明:本站发布的内容(图片、视频和文字)以原创、转载和分享为主,文章观点不代表本网站立场,如果涉及侵权请联系站长邮箱:is@yisu.com进行举报,并提供相关证据,一经查实,将立刻删除涉嫌侵权内容。