Nodejs中怎么实现一个线程池,很多新手对此不是很清楚,为了帮助大家解决这个难题,下面小编将为大家详细讲解,有这方面需求的人可以来学习下,希望你能有所收获。
nodejs虽然提供了线程的能力,但是很多时候,往往不能直接使用线程或者无限制地创建线程,比如我们有一个功能是cpu密集型的,如果一个请求就开一个线程,这很明显不是最好的实践,这时候,我们需要使用池化的技术,本文介绍在nodejs线程模块的基础上,如何设计和实现一个线程池库(https://github.com/theanarkh/nodejs-threadpool或npm i nodejs-threadpool )。下面是线程池的总体架构。
设计一个线程池,在真正写代码之前,有很多设计需要考虑,大概如下:
1任务队列的设计,一个队列,多个线程互斥访问,或者每个线程一个队列,不需要互斥访问。
2 线程退出的设计,可以由主线程检测空闲线程,然后使子线程退出。或者子线程退出,通知主线程。空闲不一定是没有任务就退出,可以设计空闲时间达到阈值后退出,因为创建线程是有时间开销的。
3 任务数的设计,每个线程可以有个任务数,还可以增加一个总任务数,即全部线程任务数加起来
4 选择线程的设计,选择任务数最少的线程。
5 线程类型的设计,可以区分核心线程和预备线程,任务少的时候,核心线程处理就行。任务多也创建预备线程帮忙处理。
6 线程池类型的设计,cpu密集型的,线程数等于核数,否则自定义线程数就行。
7 支持任务的取消和超时机制,防止一个任务时间过长或者死循环。
本文介绍的线程池具体设计思想如下(参考java):
1 主线程维护一个队列,子线程的任务由子线程负责分发,不需要互斥访问,子线程也不需要维护自己的队列。
2 线程退出的设计,主线程负责检查子线程空闲时间是否达到阈值,是则使子线程退出。
3 任务数的设计,主线程负责管理任务个数并应有相应的策略。
4 选择线程的设计,选择任务数最少的线程。
5 线程类型的设计,区分核心线程和预备线程,任务少的时候,核心线程处理就行。任务多也创建预备线程帮忙处理。
6 线程池类型的设计,cpu密集型的,线程数等于核数,否则自定义线程数就行。
7 支持任务的取消和超时机制,超时或者取消的时候,主线程判断任务是待执行还是正在执行,如果是待执行则从任务队列中删除,如果是正在执行则杀死对应的子线程。下面我们看一下具体的设计。
1 主线程和子线程通信的数据结构
// 任务类,一个任务对应一个id class Work { constructor({workId, filename, options}) { // 任务id this.workId = workId; // 任务逻辑,字符串或者js文件路径 this.filename = filename; // 任务返回的结果 this.data = null; // 任务返回的错误 this.error = null; // 执行任务时传入的参数,用户定义 this.options = options; } }
主线程给子线程分派一个任务的时候,就给子线程发送一个Work对象。在nodejs中线程间通信需要经过序列化和反序列化,所以通信的数据结构包括的信息不能过多。
2 子线程处理任务逻辑
const { parentPort } = require('worker_threads'); const vm = require('vm'); const { isFunction, isJSFile } = require('./utils'); // 监听主线程提交过来的任务 parentPort.on('message', async (work) => { try { const { filename, options } = work; let aFunction; if (isJSFile(filename)) { aFunction = require(filename); } else { aFunction = vm.runInThisContext(`(${filename})`); } if (!isFunction(aFunction)) { throw new Error('work type error: js file or string'); } work.data = await aFunction(options); parentPort.postMessage({event: 'done', work}); } catch (error) { work.error = error.toString(); parentPort.postMessage({event: 'error', work}); } }); process.on('uncaughtException', (...rest) => { console.error(...rest); }); process.on('unhandledRejection', (...rest) => { console.error(...rest); });
子线程的逻辑比较简单,就是监听主线程分派过来的任务,然后执行任务,执行完之后通知主线程。任务支持js文件和字符串代码的形式。需要返回一个Promise或者async函数。用于用于通知主线程任务已经完成。
3 线程池和业务的通信
// 提供给用户侧的接口 class UserWork extends EventEmitter { constructor({ workId }) { super(); // 任务id this.workId = workId; // 支持超时取消任务 this.timer = null; // 任务状态 this.state = WORK_STATE.PENDDING; } // 超时后取消任务 setTimeout(timeout) { this.timer = setTimeout(() => { this.timer && this.cancel() && this.emit('timeout'); }, ~~timeout); } // 取消之前设置的定时器 clearTimeout() { clearTimeout(this.timer); this.timer = null; } // 直接取消任务,如果执行完了就不能取消了,this.terminate是动态设置的 cancel() { if (this.state === WORK_STATE.END || this.state === WORK_STATE.CANCELED) { return false; } else { this.terminate(); return true; } } // 修改任务状态 setState(state) { this.state = state; } }
业务提交一个任务给线程池的时候,线程池会返回一个UserWork类,业务侧通过UserWork类和线程池通信。
4 管理子线程的数据结构
// 管理子线程的数据结构 class Thread { constructor({ worker }) { // nodejs的Worker对象,nodejs的worker_threads模块的Worker this.worker = worker; // 线程状态 this.state = THREAD_STATE.IDLE; // 上次工作的时间 this.lastWorkTime = Date.now(); } // 修改线程状态 setState(state) { this.state = state; } // 修改线程最后工作时间 setLastWorkTime(time) { this.lastWorkTime = time; } }
线程池中维护了多个子线程,Thread类用于管理子线程的信息。
5 线程池 线程池的实现是核心,我们分为几个部分讲。
5.1 支持的配置
constructor(options = {}) { this.options = options; // 子线程队列 this.workerQueue = []; // 核心线程数 this.coreThreads = ~~options.coreThreads || config.CORE_THREADS; // 线程池最大线程数,如果不支持动态扩容则最大线程数等于核心线程数 this.maxThreads = options.expansion !== false ? Math.max(this.coreThreads, config.MAX_THREADS) : this.coreThreads; // 超过任务队列长度时的处理策略 this.discardPolicy = options.discardPolicy ? options.discardPolicy : DISCARD_POLICY.NOT_DISCARD; // 是否预创建子线程 this.preCreate = options.preCreate === true; // 线程最大空闲时间,达到后自动退出 this.maxIdleTime = ~~options.maxIdleTime || config.MAX_IDLE_TIME; // 是否预创建线程池 this.preCreate && this.preCreateThreads(); // 保存线程池中任务对应的UserWork this.workPool = {}; // 线程池中当前可用的任务id,每次有新任务时自增1 this.workId = 0; // 线程池中的任务队列 this.queue = []; // 线程池总任务数 this.totalWork = 0; // 支持的最大任务数 this.maxWork = ~~options.maxWork || config.MAX_WORK; // 处理任务的超时时间,全局配置 this.timeout = ~~options.timeout; this.pollIdle(); }
上面的代码列出了线程池所支持的能力。
5.2 创建线程
newThread() { const worker = new Worker(workerPath); const thread = new Thread({worker}); this.workerQueue.push(thread); const threadId = worker.threadId; worker.on('exit', () => { // 找到该线程对应的数据结构,然后删除该线程的数据结构 const position = this.workerQueue.findIndex(({worker}) => { return worker.threadId === threadId; }); const exitedThread = this.workerQueue.splice(position, 1); // 退出时状态是BUSY说明还在处理任务(非正常退出) this.totalWork -= exitedThread.state === THREAD_STATE.BUSY ? 1 : 0; }); // 和子线程通信 worker.on('message', (result) => { const { work, event, } = result; const { data, error, workId } = work; // 通过workId拿到对应的userWork const userWork = this.workPool[workId]; // 不存在说明任务被取消了 if (!userWork) { return; } // 修改线程池数据结构 this.endWork(userWork); // 修改线程数据结构 thread.setLastWorkTime(Date.now()); // 还有任务则通知子线程处理,否则修改子线程状态为空闲 if (this.queue.length) { // 从任务队列拿到一个任务交给子线程 this.submitWorkToThread(thread, this.queue.shift()); } else { thread.setState(THREAD_STATE.IDLE); } switch(event) { case 'done': // 通知用户,任务完成 userWork.emit('done', data); break; case 'error': // 通知用户,任务出错 if (EventEmitter.listenerCount(userWork, 'error')) { userWork.emit('error', error); } break; default: break; } }); worker.on('error', (...rest) => { console.error(...rest); }); return thread; }
创建线程,并保持线程对应的数据结构、退出、通信管理、任务分派。子线程执行完任务后,会通知线程池,主线程通知用户。
5.3 选择线程
selectThead() { // 找出空闲的线程,把任务交给他 for (let i = 0; i < this.workerQueue.length; i++) { if (this.workerQueue[i].state === THREAD_STATE.IDLE) { return this.workerQueue[i]; } } // 没有空闲的则随机选择一个 return this.workerQueue[~~(Math.random() * this.workerQueue.length)]; }
当用户给线程池提交一个任务时,线程池会选择一个空闲的线程处理该任务。如果没有可用线程则任务插入待处理队列等待处理。
5.4 提交任务
// 给线程池提交一个任务 submit(filename, options = {}) { return new Promise(async (resolve, reject) => { let thread; // 没有线程则创建一个 if (this.workerQueue.length) { thread = this.selectThead(); // 该线程还有任务需要处理 if (thread.state === THREAD_STATE.BUSY) { // 子线程个数还没有达到核心线程数,则新建线程处理 if (this.workerQueue.length < this.coreThreads) { thread = this.newThread(); } else if (this.totalWork + 1 > this.maxWork){ // 总任务数已达到阈值,还没有达到线程数阈值,则创建 if(this.workerQueue.length < this.maxThreads) { thread = this.newThread(); } else { // 处理溢出的任务 switch(this.discardPolicy) { case DISCARD_POLICY.ABORT: return reject(new Error('queue overflow')); case DISCARD_POLICY.CALLER_RUN: const workId = this.generateWorkId(); const userWork = new UserWork({workId}); userWork.setState(WORK_STATE.RUNNING); userWork.terminate = () => { userWork.setState(WORK_STATE.CANCELED); }; this.timeout && userWork.setTimeout(this.timeout); resolve(userWork); try { let aFunction; if (isJSFile(filename)) { aFunction = require(filename); } else { aFunction = vm.runInThisContext(`(${filename})`); } if (!isFunction(aFunction)) { throw new Error('work type error: js file or string'); } const result = await aFunction(options); // 延迟通知,让用户有机会取消或者注册事件 setImmediate(() => { if (userWork.state !== WORK_STATE.CANCELED) { userWork.setState(WORK_STATE.END); userWork.emit('done', result); } }); } catch (error) { setImmediate(() => { if (userWork.state !== WORK_STATE.CANCELED) { userWork.setState(WORK_STATE.END); userWork.emit('error', error.toString()); } }); } return; case DISCARD_POLICY.OLDEST_DISCARD: const work = this.queue.shift(); // maxWork为1时,work会为空 if (work && this.workPool[work.workId]) { this.cancelWork(this.workPool[work.workId]); } else { return reject(new Error('no work can be discarded')); } break; case DISCARD_POLICY.DISCARD: return reject(new Error('discard')); case DISCARD_POLICY.NOT_DISCARD: break; default: break; } } } } } else { thread = this.newThread(); } // 生成一个任务id const workId = this.generateWorkId(); // 新建一个UserWork const userWork = new UserWork({workId}); this.timeout && userWork.setTimeout(this.timeout); // 新建一个work const work = new Work({ workId, filename, options }); // 修改线程池数据结构,把UserWork和Work关联起来 this.addWork(userWork); // 选中的线程正在处理任务,则先缓存到任务队列 if (thread.state === THREAD_STATE.BUSY) { this.queue.push(work); userWork.terminate = () => { this.cancelWork(userWork); this.queue = this.queue.filter((node) => { return node.workId !== work.workId; }); } } else { this.submitWorkToThread(thread, work); } resolve(userWork); }) } submitWorkToThread(thread, work) { const userWork = this.workPool[work.workId]; userWork.setState(WORK_STATE.RUNNING); // 否则交给线程处理,并修改状态和记录该线程当前处理的任务id thread.setState(THREAD_STATE.BUSY); thread.worker.postMessage(work); userWork.terminate = () => { this.cancelWork(userWork); thread.setState(THREAD_STATE.DEAD); thread.worker.terminate(); } } addWork(userWork) { userWork.setState(WORK_STATE.PENDDING); this.workPool[userWork.workId] = userWork; this.totalWork++; } endWork(userWork) { delete this.workPool[userWork.workId]; this.totalWork--; userWork.setState(WORK_STATE.END); userWork.clearTimeout(); } cancelWork(userWork) { delete this.workPool[userWork.workId]; this.totalWork--; userWork.setState(WORK_STATE.CANCELED); userWork.emit('cancel'); }
提交任务是线程池暴露给用户侧的接口,主要处理的逻辑包括,根据当前的策略判断是否需要新建线程、选择线程处理任务、排队任务等,如果任务数达到阈值,则根据丢弃策略处理该任务。
5.5 空闲处理
pollIdle() { setTimeout(() => { for (let i = 0; i < this.workerQueue.length; i++) { const node = this.workerQueue[i]; if (node.state === THREAD_STATE.IDLE && Date.now() - node.lastWorkTime > this.maxIdleTime) { node.worker.terminate(); } } this.pollIdle(); }, 1000); }
当子线程空闲时间达到阈值后,主线程会杀死子线程,避免浪费系统资源。
看完上述内容是否对您有帮助呢?如果还想对相关知识有进一步的了解或阅读更多相关文章,请关注亿速云行业资讯频道,感谢您对亿速云的支持。
免责声明:本站发布的内容(图片、视频和文字)以原创、转载和分享为主,文章观点不代表本网站立场,如果涉及侵权请联系站长邮箱:is@yisu.com进行举报,并提供相关证据,一经查实,将立刻删除涉嫌侵权内容。