Nodejs中怎么实现多线程,针对这个问题,这篇文章详细介绍了相对应的分析和解答,希望可以帮助更多想解决这个问题的小伙伴找到更简单易行的方法。
1 背景
需求中有以下场景
1 对称解密、非对称解密
2 压缩、解压
3 大量文件的增删改查
4 处理大量的字符串,解析协议
上面的场景都是非常耗时间的,解密、压缩、文件操作,nodejs使用了内置的线程池支持了异步。但是处理字符串和解析协议是单纯消耗cpu的操作。而且nodejs对解密的支持似乎不是很好。我使用了纯js的解密库,所以无法在nodejs主线程里处理。尤其rsa解密,非常耗时间。
所以这时候就要探索解决方案,nodejs提供了多线程的能力。所以自然就选择了这种方案。但是这只是初步的想法和方案。因为nodejs虽然提供了多线程能力,但是没有提供一个应用层的线程池。所以如果我们单纯地使用多线程,一个请求一个线程,这显然不现实。我们不得不实现自己的线程池。本文分享的内容是这个线程池的实现。
线程池的设计涉及到很多方面,对于纯cpu型的任务,线程数和cpu核数要相等才能达到最优的性能,否则过多的线程引起的上下文切换反而会导致性能下降。而对于io型的任务,更多的线程理论上是会更好,因为可以更早地给硬盘发出命令,磁盘会优化并持续地处理请求,想象一下,如果发出一个命令,硬盘处理一个,然后再发下一个命令,再处理一个,这样显然效率很低。当然,线程数也不是越多越好。线程过多会引起系统负载过高,过多上下文切换也会带来性能的下降。下面看一下线程池的实现方案。
2 设计思路
首先根据配置创建多个线程(分为预创建和懒创建),然后对用户暴露提交任务的接口,由调度中心负责接收任务,然后根据策略选择处理该任务的线程。子线程一直在轮询是否有任务需要处理。处理完通知调度中心。
下面看一下具体的实现
2.1 和用户通信的数据结构
class UserWork extends EventEmitter { constructor({ workId, threadId }) { super(); this.workId = workId; this.threadId = threadId; workPool[workId] = this; } }
用户提交任务的时候,调度中心返回一个UserWork对象。用户可以使用该对象和调度中心通信。
2.2 调度中心的实现
调度中心的实现大致分为以下几个逻辑。
2.2.1 初始化
constructor(options = {}) { this.options = options; // 线程池总任务数 this.totalWork = 0; // 子线程队列 this.workerQueue = []; // 核心线程数 this.coreThreads = ~~options.coreThreads || config.CORE_THREADS; // 线程池最大线程数,如果不支持动态扩容则最大线程数等于核心线程数 this.maxThreads = options.expansion !== false ? Math.max(this.coreThreads, config.MAX_THREADS) : this.coreThreads; // 工作线程处理任务的模式 this.sync = options.sync !== false; // 超过任务队列长度时的处理策略 this.discardPolicy = options.discardPolicy ? options.discardPolicy : DISCARD_POLICY.NOT_DISCARD; // 是否预创建子线程 this.preCreate = options.preCreate === true; this.maxIdleTime = ~~options.maxIdleTime || config.MAX_IDLE_TIME; this.pollIntervalTime = ~~options.pollIntervalTime || config.POLL_INTERVAL_TIME; this.maxWork = ~~options.maxWork || config.MAX_WORK; // 是否预创建线程池 this.preCreate && this.preCreateThreads(); }
从初始化代码中我们看到线程池大致支持的能力。
鸿蒙官方战略合作共建——HarmonyOS技术社区
核心线程数
最大线程数
过载时的处理策略,和过载的阈值
子线程空闲退出的时间和轮询任务的时间
是否预创建线程池
是否支持动态扩容
核心线程数是任务数没有达到阈值时的工作线程集合。是处理任务的主力军。任务数达到阈值后,如果支持动态扩容(可配置)则会创建新的线程去处理更多的任务。一旦负载变低,线程空闲时间达到阈值则会自动退出。如果扩容的线程数达到阈值,还有新的任务到来,则根据丢弃策略进行相关的处理。
2.2.2 创建线程
newThread() { let { sync } = this; const worker = new Worker(workerPath, {workerData: { sync, maxIdleTime: this.maxIdleTime, pollIntervalTime: this.pollIntervalTime, }}); const node = { worker, // 该线程处理的任务数量 queueLength: 0, }; this.workerQueue.push(node); const threadId = worker.threadId; worker.on('exit', (status) => { // 异常退出则补充线程,正常退出则不补充 if (status) { this.newThread(); } this.totalWork -= node.queueLength; this.workerQueue = this.workerQueue.filter((worker) => { return worker.threadId !== threadId; }); }); // 和子线程通信 worker.on('message', (result) => { const { work, event, } = result; const { data, error, workId } = work; // 通过workId拿到对应的userWorker const userWorker = workPool[workId]; delete workPool[workId]; // 任务数减一 node.queueLength--; this.totalWork--; switch(event) { case 'done': // 通知用户,任务完成 userWorker.emit('done', data); break; case 'error': // 通知用户,任务出错 if (EventEmitter.listenerCount(userWorker, 'error')) { userWorker.emit('error', error); } break; default: break; } }); worker.on('error', (...rest) => { console.log(...rest) }); return node; }
创建线程主要是调用nodejs提供的模块进行创建。然后监听子线程的退出和message、error事件。如果是异常退出则补充线程。调度中心维护了一个子线程的队列。记录了每个子线程(worker)的实例和任务数。
2.2.3 选择执行任务的线程
selectThead() { let min = Number.MAX_SAFE_INTEGER; let i = 0; let index = 0; // 找出任务数最少的线程,把任务交给他 for (; i < this.workerQueue.length; i++) { const { queueLength } = this.workerQueue[i]; if (queueLength < min) { index = i; min = queueLength; } } return this.workerQueue[index]; }
选择策略目前是选择任务数最少的,本来还支持随机和轮询方式,但是貌似没有什么场景和必要,就去掉了。
2.2.4 暴露提交任务的接口
submit(filename, options = {}) { return new Promise(async (resolve, reject) => { let thread; // 没有线程则创建一个 if (this.workerQueue.length) { thread = this.selectThead(); // 任务队列非空 if (thread.queueLength !== 0) { // 子线程个数还没有达到核心线程数,则新建线程处理 if (this.workerQueue.length < this.coreThreads) { thread = this.newThread(); } else if (this.totalWork + 1 > this.maxWork){ // 总任务数已达到阈值,还没有达到线程数阈值,则创建 if(this.workerQueue.length < this.maxThreads) { thread = this.newThread(); } else { // 处理溢出的任务 switch(this.discardPolicy) { case DISCARD_POLICY.ABORT: return reject(new Error('queue overflow')); case DISCARD_POLICY.CALLER_RUNS: const userWork = new UserWork({workId: this.generateWorkId(), threadId}); try { const asyncFunction = require(filename); if (!isAsyncFunction(asyncFunction)) { return reject(new Error('need export a async function')); } const result = await asyncFunction(options); resolve(userWork); setImmediate(() => { userWork.emit('done', result); }); } catch (error) { resolve(userWork); setImmediate(() => { userWork.emit('error', error); }); } return; case DISCARD_POLICY.DISCARD_OLDEST: thread.worker.postMessage({cmd: 'delete'}); break; case DISCARD_POLICY.DISCARD: return reject(new Error('discard')); case DISCARD_POLICY.NOT_DISCARD: break; default: break; } } } } } else { thread = this.newThread(); } // 生成一个任务id const workId = this.generateWorkId(); // 新建一个work,交给对应的子线程 const work = new Work({ workId, filename, options }); const userWork = new UserWork({workId, threadId: thread.worker.threadId}); thread.queueLength++; this.totalWork++; thread.worker.postMessage({cmd: 'add', work}); resolve(userWork); }) }
提交任务的函数比较复杂,提交一个任务的时候,调度中心会根据当前的负载情况和线程数,决定对一个任务做如何处理。如果可以处理,则把任务交给选中的子线程。最后给用户返回一个UserWorker对象。
2.3调度中心和子线程的通信数据结构
class Work { constructor({workId, filename, options}) { // 任务id this.workId = workId; // 文件名 this.filename = filename; // 处理结果,由用户代码返回 this.data = null; // 执行出错 this.error = null; // 执行时入参 this.options = options; } }
一个任务对应一个id,目前只支持文件的执行模式,后续会支持字符串。
2.4 子线程的实现
子线程的实现主要分为几个部分
2.4.1 监听调度中心分发的命令
parentPort.on('message', ({cmd, work}) => { switch(cmd) { case 'delete': return queue.shift(); case 'add': return queue.push(work); } });
2.4.2 轮询是否有任务需要处理
function poll() { const now = Date.now(); if (now - lastWorkTime > maxIdleTime && !queue.length) { process.exit(0); } setTimeout(async () => { // 处理任务 poll(); } }, pollIntervalTime); } // 轮询判断是否有任务 poll();
不断轮询是否有任务需要处理,如果没有并且空闲时间达到阈值则退出。
2.4.3 处理任务
处理任务模式分为同步和异步
while(queue.length) { const work = queue.shift(); try { const { filename, options } = work; const asyncFunction = require(filename); if (!isAsyncFunction(asyncFunction)) { return; } lastWorkTime = now; const result = await asyncFunction(options); work.data = result; parentPort.postMessage({event: 'done', work}); } catch (error) { work.error = error.toString(); parentPort.postMessage({event: 'error', work}); } }
用户需要导出一个async函数,使用这种方案主要是为了执行时可以给用户传入参数。并且实现同步。处理完后通知调度中心。下面是异步处理方式,子线程不需要同步等待用户的代码结果。
const arr = []; while(queue.length) { const work = queue.shift(); try { const { filename } = work; const asyncFunction = require(filename); if (!isAsyncFunction(asyncFunction)) { return; } arr.push({asyncFunction, work}); } catch (error) { work.error = error.toString(); parentPort.postMessage({event: 'error', work}); } } arr.map(async ({asyncFunction, work}) => { try { const { options } = work; lastWorkTime = now; const result = await asyncFunction(options); work.data = result; parentPort.postMessage({event: 'done', work}); } catch (e) { work.error = error.toString(); parentPort.postMessage({event: 'done', work}); } })
最后还有一些配置和定制化的功能。
module.exports = { // 最大的线程数 MAX_THREADS: 50, // 线程池最大任务数 MAX_WORK: Infinity, // 默认核心线程数 CORE_THREADS: 10, // 最大空闲时间 MAX_IDLE_TIME: 10 * 60 * 1000, // 子线程轮询时间 POLL_INTERVAL_TIME: 10, }; // 丢弃策略 const DISCARD_POLICY = { // 报错 ABORT: 1, // 在主线程里执行 CALLER_RUNS: 2, // 丢弃最老的的任务 DISCARD_OLDEST: 3, // 丢弃 DISCARD: 4, // 不丢弃 NOT_DISCARD: 5, };
支持多个类型的线程池
class AsyncThreadPool extends ThreadPool { constructor(options) { super({...options, sync: false}); } } class SyncThreadPool extends ThreadPool { constructor(options) { super({...options, sync: true}); } } // cpu型任务的线程池,线程数和cpu核数一样,不支持动态扩容 class CPUThreadPool extends ThreadPool { constructor(options) { super({...options, coreThreads: cores, expansion: false}); } } // 线程池只有一个线程,类似消息队列 class SingleThreadPool extends ThreadPool { constructor(options) { super({...options, coreThreads: 1, expansion: false }); } } // 线程数固定的线程池,不支持动态扩容线程 class FixedThreadPool extends ThreadPool { constructor(options) { super({ ...options, expansion: false }); } }
这就是线程池的实现,有很多细节还需要思考。下面是一个性能测试的例子。
3 测试
const { MAX } = require('./constants'); module.exports = async function() { let ret = 0; let i = 0; while(i++ < MAX) { ret++; Buffer.from(String(Math.random())).toString('base64'); } return ret; }
在服务器以单线程和多线程的方式执行以上代码,下面是MAX为10000和100000时,使用CPUThreadPool类型线程池的性能对比(具体代码参考https://github.com/theanarkh/nodejs-threadpool)。
10000
单线程 [ 358.35, 490.93, 705.23, 982.6, 1155.72 ]
多线程 [ 379.3, 230.35, 315.52, 429.4, 496.04 ]
100000
单线程 [ 2485.5, 4454.63, 6894.5, 9173.16, 11011.16 ]
多线程 [ 1791.75, 2787.15, 3275.08, 4093.39, 3674.91 ]
关于Nodejs中怎么实现多线程问题的解答就分享到这里了,希望以上内容可以对大家有一定的帮助,如果你还有很多疑惑没有解开,可以关注亿速云行业资讯频道了解更多相关知识。
免责声明:本站发布的内容(图片、视频和文字)以原创、转载和分享为主,文章观点不代表本网站立场,如果涉及侵权请联系站长邮箱:is@yisu.com进行举报,并提供相关证据,一经查实,将立刻删除涉嫌侵权内容。