BlockingQueue
是 Java 并发编程中用于在生产者和消费者线程之间传递数据的一种阻塞队列。它可以用于实现线程池,以便在有限的线程资源下高效地处理任务。下面是一个简单的线程池实现,使用 BlockingQueue
作为任务队列:
import java.util.concurrent.*;
public class BlockingQueueThreadPool {
private final int corePoolSize;
private final BlockingQueue<Runnable> taskQueue;
private final ThreadPoolExecutor threadPoolExecutor;
public BlockingQueueThreadPool(int corePoolSize, int maxPoolSize, int queueCapacity) {
this.corePoolSize = corePoolSize;
this.taskQueue = new LinkedBlockingQueue<>(queueCapacity);
this.threadPoolExecutor = new ThreadPoolExecutor(
corePoolSize,
maxPoolSize,
60L,
TimeUnit.SECONDS,
taskQueue
);
}
public void submit(Runnable task) throws InterruptedException {
if (threadPoolExecutor.getCorePoolSize()< corePoolSize) {
synchronized (this) {
if (threadPoolExecutor.getCorePoolSize()< corePoolSize) {
threadPoolExecutor.setCorePoolSize(corePoolSize);
}
}
}
threadPoolExecutor.execute(task);
}
public void shutdown() {
threadPoolExecutor.shutdown();
}
public static void main(String[] args) throws InterruptedException {
BlockingQueueThreadPool threadPool = new BlockingQueueThreadPool(2, 4, 10);
for (int i = 0; i < 20; i++) {
final int taskId = i;
threadPool.submit(() -> {
System.out.println("Task " + taskId + " is running by " + Thread.currentThread().getName());
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
});
}
threadPool.shutdown();
}
}
在这个实现中,我们创建了一个名为 BlockingQueueThreadPool
的类,它包含一个核心线程池大小、一个最大线程池大小和一个任务队列容量。我们使用 ThreadPoolExecutor
来管理线程池,并将 LinkedBlockingQueue
作为任务队列。
submit()
方法用于向线程池提交任务。在提交任务之前,我们会检查当前核心线程池的大小是否小于预期的核心线程池大小。如果是,则将核心线程池大小设置为预期值。这样可以确保在任务提交时,线程池中始终有足够的线程来处理任务。
shutdown()
方法用于关闭线程池。在关闭线程池之前,所有已提交的任务都将被执行完毕。