这篇文章主要介绍“如何理解Java多线程CompletionService”,在日常操作中,相信很多人在如何理解Java多线程CompletionService问题上存在疑惑,小编查阅了各式资料,整理出简单好用的操作方法,希望对大家解答”如何理解Java多线程CompletionService”的疑惑有所帮助!接下来,请跟着小编一起来学习吧!
CompletionService
用于提交一组Callable
任务,其take方法返回已完成的一个Callable
任务对应的Future
对象。
如果你向Executor
提交了一个批处理任务,并且希望在它们完成后获得结果。为此你可以将每个任务的Future
保存进一个集合,然后循环这个集合调用Future
的get()
取出数据。幸运的是CompletionService
帮你做了这件事情。CompletionService
整合了Executor
和BlockingQueue
的功能。你可以将Callable
任务提交给它去执行,然后使用类似于队列中的take和poll方法,在结果完整可用时获得这个结果,像一个打包的Future
。CompletionService
的take返回的future
是哪个先完成就先返回哪一个,而不是根据提交顺序。
首先看一下 构造方法:
public ExecutorCompletionService(Executor executor) {
if (executor == null)
throw new NullPointerException();
this.executor = executor;
this.aes = (executor instanceof AbstractExecutorService) ?
(AbstractExecutorService) executor : null;
this.completionQueue = new LinkedBlockingQueue<Future<V>>();
}
构造法方法主要初始化了一个阻塞队列,用来存储已完成的task
任务。
然后看一下 completionService.submit
方法:
public Future<V> submit(Callable<V> task) {
if (task == null) throw new NullPointerException();
RunnableFuture<V> f = newTaskFor(task);
executor.execute(new QueueingFuture(f));
return f;
}
public Future<V> submit(Runnable task, V result) {
if (task == null) throw new NullPointerException();
RunnableFuture<V> f = newTaskFor(task, result);
executor.execute(new QueueingFuture(f));
return f;
}
可以看到,callable
任务被包装成QueueingFuture
,而 QueueingFuture
是 FutureTask
的子类,所以最终执行了FutureTask
中的run()
方法。
来看一下该方法:
public void run() {
//判断执行状态,保证callable任务只被运行一次
if (state != NEW ||
!UNSAFE.compareAndSwapObject(this, runnerOffset,
null, Thread.currentThread()))
return;
try {
Callable<V> c = callable;
if (c != null && state == NEW) {
V result;
boolean ran;
try {
//这里回调我们创建的callable对象中的call方法
result = c.call();
ran = true;
} catch (Throwable ex) {
result = null;
ran = false;
setException(ex);
}
if (ran)
//处理执行结果
set(result);
}
} finally {
runner = null;
// state must be re-read after nulling runner to prevent
// leaked interrupts
int s = state;
if (s >= INTERRUPTING)
handlePossibleCancellationInterrupt(s);
}
}
可以看到在该 FutureTask
中执行run
方法,最终回调自定义的callable
中的call
方法,执行结束之后,
通过 set(result)
处理执行结果:
/**
* Sets the result of this future to the given value unless
* this future has already been set or has been cancelled.
*
* <p>This method is invoked internally by the {@link #run} method
* upon successful completion of the computation.
*
* @param v the value
*/
protected void set(V v) {
if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
outcome = v;
UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final state
finishCompletion();
}
}
继续跟进finishCompletion()
方法,在该方法中找到 done()
方法:
protected void done() { completionQueue.add(task); }
可以看到该方法只做了一件事情,就是将执行结束的task
添加到了队列中,只要队列中有元素,我们调用take()
方法时就可以获得执行的结果。
到这里就已经清晰了,异步非阻塞获取执行结果的实现原理其实就是通过队列来实现的,FutureTask
将执行结果放到队列中,先进先出,线程执行结束的顺序就是获取结果的顺序。
CompletionService
实际上可以看做是Executor
和BlockingQueue
的结合体。CompletionService
在接收到要执行的任务时,通过类似BlockingQueue
的put和take获得任务执行的结果。CompletionService
的一个实现是ExecutorCompletionService
,ExecutorCompletionService
把具体的计算任务交给Executor
完成。
在实现上,ExecutorCompletionService
在构造函数中会创建一个BlockingQueue
(使用的基于链表的无界队列LinkedBlockingQueue),该BlockingQueue
的作用是保存Executor
执行的结果。当计算完成时,调用FutureTask
的done方法。当提交一个任务到ExecutorCompletionService
时,首先将任务包装成QueueingFuture
,它是FutureTask
的一个子类,然后改写FutureTask
的done方法,之后把Executor
执行的计算结果放入BlockingQueue
中。
QueueingFuture
的源码如下:
/**
* FutureTask extension to enqueue upon completion
*/
private class QueueingFuture extends FutureTask<Void> {
QueueingFuture(RunnableFuture<V> task) {
super(task, null);
this.task = task;
}
protected void done() { completionQueue.add(task); }
private final Future<V> task;
}
public class CompletionServiceTest {
public static void main(String[] args) {
ExecutorService threadPool = Executors.newFixedThreadPool(10);
CompletionService<Integer> completionService = new ExecutorCompletionService<Integer>(threadPool);
for (int i = 1; i <=10; i++) {
final int seq = i;
completionService.submit(new Callable<Integer>() {
@Override
public Integer call() throws Exception {
Thread.sleep(new Random().nextInt(5000));
return seq;
}
});
}
threadPool.shutdown();
for (int i = 0; i < 10; i++) {
try {
System.out.println(
completionService.take().get());
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
}
}
}
}
到此,关于“如何理解Java多线程CompletionService”的学习就结束了,希望能够解决大家的疑惑。理论与实践的搭配能更好的帮助大家学习,快去试试吧!若想继续学习更多相关知识,请继续关注亿速云网站,小编会继续努力为大家带来更多实用的文章!
亿速云「云服务器」,即开即用、新一代英特尔至强铂金CPU、三副本存储NVMe SSD云盘,价格低至29元/月。点击查看>>
免责声明:本站发布的内容(图片、视频和文字)以原创、转载和分享为主,文章观点不代表本网站立场,如果涉及侵权请联系站长邮箱:is@yisu.com进行举报,并提供相关证据,一经查实,将立刻删除涉嫌侵权内容。