这篇文章主要介绍“java异步并发请求和请求合并举例分析”,在日常操作中,相信很多人在java异步并发请求和请求合并举例分析问题上存在疑惑,小编查阅了各式资料,整理出简单好用的操作方法,希望对大家解答”java异步并发请求和请求合并举例分析”的疑惑有所帮助!接下来,请跟着小编一起来学习吧!
在做业务系统需求开发中,经常需要从其他服务获取数据,拼接数据,然后返回数据给前端使用;常见的服务调用就是通过http接口调用,而对于http,通常一个请求会分配一个线程执行,在同步调用接口的情况下,整个线程是一直被占用或者阻塞的;如果有大量的这种请求,整个系统的吞吐量就比较低,而在依赖的服务响应时间比较低的情况下,我们希望先让出cpu,让其他请求先执行,等依赖的服务请求返回结果时再继续往下执行,这时我们会考虑将请求异步化,或者将相同的请求合并,从而达到提高系统执行效率和吞吐量的目的。
目前常见的几种调用方式是同步调用,线程池+future,异步回调completableFuture;协程也是异步调用的解决方式,但java目前不支持协程;对于future方式,只能用get或者while(!isDone)轮询这种阻塞的方式直到线程执行完成,这也不是我们希望的异步执行方式,jdk8提供的completableFuture其实也不是异步的方式,只是对依赖多服务的Callback调用结果处理做结果编排,来弥补Callback的不足,从而实现异步链式调用的目的,这也是比较推荐的方式。
RpcService rpcService = new RpcService();
HttpService httpService = new HttpService();
// 假设rpc1耗时10ms
Map<String, String> rpcResult1=rpcService.getRpcResult();
// 假设rpc2耗时20ms
Integer rpcResult2 = httpService.getHttpResult();
// 则线程总耗时:30ms
ExecutorService executor = Executors.newFixedThreadPool(2);
RpcService rpcService = new RpcService();
HttpService httpService = new HttpService();
future1 = executor.submit(() -> rpcService.getRpcResult());
future2 = executor.submit(() -> httpService.getHttpResult());
//rpc1耗时10ms
Map<String, String> rpcResult1 = future1.get(300, TimeUnit.MILLISECONDS);
//rpc2耗时20ms
Integer rpcResult2 = future2.get(300, TimeUnit.MILLISECONDS);
//则线程总耗时20ms
/**
* 场景:两个接口并发异步调用,返回CompletableFuture,不阻塞主线程
* 两个服务也是异步非阻塞调用
**/
CompletableFuture future1 = service.getHttpData("http://www.vip.com/showGoods/50");
CompletableFuture future2 = service.getHttpData("http://www.vip.com/showGoods/50");
CompletableFuture future3 = future1.thenCombine(future2, (f1, f2) -> {
//处理业务....
return f1 + "," + f2;
}).exceptionally(e -> {
return "";
});
CompletableFuture使用ForkJoinPool执行线程,在ForkJoinPool类注册ForkJoinWorkerThread线程时可以看到,ForkJoinPool里面的线程都是daemon线程(垃圾回收线程就是一个典型的deamon线程),
/**
* Callback from ForkJoinWorkerThread constructor to establish and
* record its WorkQueue.
*
* @param wt the worker thread
* @return the worker's queue
*/
final WorkQueue registerWorker(ForkJoinWorkerThread wt) {
UncaughtExceptionHandler handler;
#注册守护线程
wt.setDaemon(true); // configure thread
if ((handler = ueh) != null)
wt.setUncaughtExceptionHandler(handler);
WorkQueue w = new WorkQueue(this, wt);
int i = 0; // assign a pool index
int mode = config & MODE_MASK;
int rs = lockRunState();
try {
WorkQueue[] ws; int n; // skip if no array
if ((ws = workQueues) != null && (n = ws.length) > 0) {
int s = indexSeed += SEED_INCREMENT; // unlikely to collide
int m = n - 1;
i = ((s << 1) | 1) & m; // odd-numbered indices
if (ws[i] != null) { // collision
int probes = 0; // step by approx half n
int step = (n <= 4) ? 2 : ((n >>> 1) & EVENMASK) + 2;
while (ws[i = (i + step) & m] != null) {
if (++probes >= n) {
workQueues = ws = Arrays.copyOf(ws, n <<= 1);
m = n - 1;
probes = 0;
}
}
}
w.hint = s; // use as random seed
w.config = i | mode;
w.scanState = i; // publication fence
ws[i] = w;
}
} finally {
unlockRunState(rs, rs & ~RSLOCK);
}
wt.setName(workerNamePrefix.concat(Integer.toString(i >>> 1)));
return w;
}
当主线程执行完后,jvm就会退出,所以需要考虑主线程执行完成时间和fork出去的线程执行时间,也需要考虑线程池的大小,默认为当前cpu的核数-1,可以参考下其他系统的故障记录:CompletableFuture线程池问题。
当系统遇到瞬间产生大量请求时,可以考虑将相同的请求合并,最大化利用系统IO,提高系统的吞吐量。
设计时,可以将符合条件的url请求,先收集起来,直到满足以下条件之一时进行合并发送:
收集到的请求数超过预设的最大请求数。
距离上次请求发送时长超过预设的最大时长。
实现方案有自行使用阻塞队列方式:并发环境下的请求合并,也可以考虑Hystrix:Hystrix实现请求合并/请求缓存
目前公司网关组件janus也是通过合并auth请求的方式减少网络开销,提高cpu的利用率和系统吞吐量的。
nginx同样有合并请求模块nginx-http-concat用来减少请求io,参考:nginx 合并多个js/css请求为一个请求
赖泽坤@vipshop.com
到此,关于“java异步并发请求和请求合并举例分析”的学习就结束了,希望能够解决大家的疑惑。理论与实践的搭配能更好的帮助大家学习,快去试试吧!若想继续学习更多相关知识,请继续关注亿速云网站,小编会继续努力为大家带来更多实用的文章!
亿速云「云服务器」,即开即用、新一代英特尔至强铂金CPU、三副本存储NVMe SSD云盘,价格低至29元/月。点击查看>>
免责声明:本站发布的内容(图片、视频和文字)以原创、转载和分享为主,文章观点不代表本网站立场,如果涉及侵权请联系站长邮箱:is@yisu.com进行举报,并提供相关证据,一经查实,将立刻删除涉嫌侵权内容。
原文链接:https://my.oschina.net/u/939952/blog/3095845