温馨提示×

温馨提示×

您好,登录后才能下订单哦!

密码登录×
登录注册×
其他方式登录
点击 登录注册 即表示同意《亿速云用户服务条款》

协程库Libtask源码分析之如何使用架构

发布时间:2021-10-18 17:49:02 来源:亿速云 阅读:118 作者:iii 栏目:开发技术

这篇文章主要讲解了“协程库Libtask源码分析之如何使用架构”,文中的讲解内容简单清晰,易于学习与理解,下面请大家跟着小编的思路慢慢深入,一起来研究和学习“协程库Libtask源码分析之如何使用架构”吧!

libtask是google大佬Russ  Cox(Go的核心开发者)所写,本文介绍libtask的基础原理。我们从libtask的main函数开始,这个main函数就是我们在c语言中使用的c函数,libtask本身实现了main这个函数,用户使用libtask时,要实现的是taskmain函数。taskmain和main的函数声明是一样的。下面我们看一下main函数。

int main(int argc, char **argv) {     struct sigaction sa, osa;     // 注册SIGQUIT信号处理函数     memset(&sa, 0, sizeof sa);     sa.sa_handler = taskinfo;     sa.sa_flags = SA_RESTART;     sigaction(SIGQUIT, &sa, &osa);      // 保存命令行参数     argv0 = argv[0];     taskargc = argc;     taskargv = argv;      if(mainstacksize == 0)         mainstacksize = 256*1024;     // 创建第一个协程     taskcreate(taskmainstart, nil, mainstacksize);     // 开始调度     taskscheduler();     fprint(2, "taskscheduler returned in main!\n");     abort();     return 0; }

main函数主要的两个逻辑是taskcreate和taskscheduler函数。我们先来看taskcreate。

int taskcreate(void (*fn)(void*), void *arg, uint stack) {     int id;     Task *t;      t = taskalloc(fn, arg, stack);     taskcount++;     id = t->id;     // 记录位置     t->alltaskslot = nalltask;     // 保存到alltask中     alltask[nalltask++] = t;     // 修改状态为就绪,可以被调度,并且加入到就绪队列     taskready(t);     return id; }

taskcreate首先调用taskalloc分配一个表示协程的结构体Task。我们看看这个结构体的定义。

struct Task {     char    name[256];    // offset known to acid     char    state[256];     // 前后指针     Task    *next;     Task    *prev;     Task    *allnext;     Task    *allprev;     // 执行上下文     Context    context;     // 睡眠时间     uvlong    alarmtime;     uint    id;     // 栈信息     uchar    *stk;     uint    stksize;     //是否退出了     int    exiting;     // 在alltask的索引     int    alltaskslot;     // 是否是系统协程     int    system;     // 是否就绪状态     int    ready;     // 入口函数     void    (*startfn)(void*);     // 入口参数     void    *startarg;     // 自定义数据     void    *udata; };

接着看看taskalloc的实现。

// 分配一个协程所需要的内存和初始化某些字段 static Task* taskalloc(void (*fn)(void*), void *arg, uint stack) {     Task *t;     sigset_t zero;     uint x, y;     ulong z;      /* allocate the task and stack together */     // 结构体本身的大小+栈大小     t = malloc(sizeof *t+stack);     memset(t, 0, sizeof *t);     // 栈的内存位置     t->stk = (uchar*)(t+1);     // 栈大小     t->stksize = stack;     // 协程id     t->id = ++taskidgen;     // 协程工作函数和参数     t->startfn = fn;     t->startarg = arg;      /* do a reasonable initialization */     memset(&t->context.uc, 0, sizeof t->context.uc);     sigemptyset(&zero);     // 初始化uc_sigmask字段为空,即不阻塞信号     sigprocmask(SIG_BLOCK, &zero, &t->context.uc.uc_sigmask);      /* must initialize with current context */     // 初始化uc字段     getcontext(&t->context.uc)      // 设置协程执行时的栈位置和大小     t->context.uc.uc_stack.ss_sp = t->stk+8;     t->context.uc.uc_stack.ss_size = t->stksize-64;     z = (ulong)t;     y = z;     z >>= 16;    /* hide undefined 32-bit shift from 32-bit compilers */     x = z>>16;     // 保存信息到uc字段     makecontext(&t->context.uc, (void(*)())taskstart, 2, y, x);      return t; }

taskalloc函数代码看起来很多,但是逻辑不算复杂,就是申请Task结构体所需的内存和执行时栈的内存,然后初始化各个字段。这样,一个协程就诞生了。接着执行taskready把协程加入就绪队列。

// 修改协程的状态为就绪并加入就绪队列 void taskready(Task *t) {     t->ready = 1;     addtask(&taskrunqueue, t); }  // 把协程插入队列中,如果之前在其他队列,则会被移除 void addtask(Tasklist *l, Task *t) {     if(l->tail){         l->tail->next = t;         t->prev = l->tail;     }else{         l->head = t;         t->prev = nil;     }     l->tail = t;     t->next = nil; }

taskrunqueue记录了所有就绪的协程。创建了协程并加入队列后,协程还没有开始执行,就像操作系统的进程和线程一样,需要有一个调度器来调度执行。下面我们看看调度器的实现。

// 协程调度中心 static void taskscheduler(void) {     int i;     Task *t;     for(;;){         // 没有用户协程了,则退出         if(taskcount == 0)             exit(taskexitval);         // 从就绪队列拿出一个协程         t = taskrunqueue.head;         if(t == nil){             fprint(2, "no runnable tasks! %d tasks stalled\n", taskcount);             exit(1);         }         // 从就绪队列删除该协程         deltask(&taskrunqueue, t);         t->ready = 0;         // 保存正在执行的协程         taskrunning = t;         // 切换次数加一         tasknswitch++;         // 切换到t执行,并且保存当前上下文到taskschedcontext(即下面要执行的代码)         contextswitch(&taskschedcontext, &t->context);         // 执行到这说明没有协程在执行(t切换回来的),置空         taskrunning = nil;         // 刚才执行的协程t退出了         if(t->exiting){             // 不是系统协程,则个数减一             if(!t->system)                 taskcount--;             // 当前协程在alltask的索引             i = t->alltaskslot;             // 把最后一个协程换到当前协程的位置,因为他要退出了             alltask[i] = alltask[--nalltask];             // 更新被置换协程的索引             alltask[i]->alltaskslot = i;             // 释放堆内存             free(t);         }     } }

调度器的代码看起来很多,但是核心逻辑就三个 1 从就绪队列中拿出一个协程t,并把t移出就绪队列 2 通过contextswitch切换到协程t中执行 3  协程t切换回调度中心,如果t已经退出,则修改数据结构,然后回收他占据的内存。如果t没退出,则继续调度其他协程执行。至此,协程就开始跑起来了。并且也有了调度系统。这里的调度机制是比较简单的,就是按着先进先出的方式就绪调度,并且是非抢占的。即没有按时间片调度的概念,一个协程的执行时间由自己决定,放弃执行的权力也是自己控制的,当协程不想执行了可以调用taskyield让出cpu。

// 协程主动让出cpu int taskyield(void) {     int n;     // 当前切换协程的次数     n = tasknswitch;     // 插入就绪队列,等待后续的调度     taskready(taskrunning);     taskstate("yield");     // 切换协程     taskswitch();     // 等于0说明当前只有自己一个协程,调度的时候tasknswitch加一,所以这里减一     return tasknswitch - n - 1; }  /*     切换协程,taskrunning是正在执行的协程,taskschedcontext是调度协程(主线程)的上下文,     切换到调度中心,并保持当前上下文到taskrunning->context */ void taskswitch(void) {     needstack(0);     contextswitch(&taskrunning->context, &taskschedcontext); }  // 真正切换协程的逻辑 static void contextswitch(Context *from, Context *to) {     if(swapcontext(&from->uc, &to->uc) < 0){         fprint(2, "swapcontext failed: %r\n");         assert(0);     } }

yield的逻辑也很简单,因为协程在执行的时候,是不处于就绪队列的,当协程准备让出cpu时,协程首先把自己重新加入到就绪队列,等待下次被调度执行。当然我们也可以直接调度contextswitch切换到其他协程。重点在于什么时候应该让出cpu,又什么时候应该被调度执行。接下来会详细讲解。至此,我们已经有了支持协程所需要的底层基础。我们看到这个实现的思路也不是很复杂,首先有一个队列表示待执行的的协程,每一个协程对应一个Task结构体。然后调度中心不断地按照先进先出的方式去调度协程的执行就可以。因为没有抢占机制,所以调度中心是依赖协程本身去驱动的,协程需要主动让出cpu,把上下文切换回调度中心,调度中心才能进行下一轮的调度。接下来我们看看,基于这些底层基础,如果实现一个基于协程的服务器。下面我们通过一个例子进行讲解。

void taskmain(int argc, char **argv) {     // 启动一个tcp服务器     if((fd = netannounce(TCP, 0, atoi(argv[1]))) < 0){         // ...     }     // 改为非阻塞模式     fdnoblock(fd);     // accept成功后创建一个客户端协程     while((cfd = netaccept(fd, remote, &rport)) >= 0){         taskcreate(proxytask, (void*)cfd, STACK);     } }

我们刚才讲过taskmain是我们需要实现的函数,首先通过netannounce建立一个tcp服务器。接着把fd改成非阻塞的,这个非常重要,因为在后面调用accept的时候,如果是阻塞的文件描述符,那么就会引起进程挂起,而非阻塞模式下,操作系统会返回EAGAIN的错误码,通过这个错误码我们可以决定下一步做什么。我们看看netaccept的实现。

// 处理(摘下)连接 int netaccept(int fd, char *server, int *port) {     int cfd, one;     struct sockaddr_in sa;     uchar *ip;     socklen_t len;     // 注册事件到epoll,等待事件触发     fdwait(fd, 'r');     len = sizeof sa;     // 触发后说明有连接了,则执行accept     if((cfd = accept(fd, (void*)&sa, &len)) < 0){         return -1;     }     // 和客户端通信的fd也改成非阻塞模式     fdnoblock(cfd);     one = 1;     setsockopt(cfd, IPPROTO_TCP, TCP_NODELAY, (char*)&one, sizeof one);     return cfd; }

netaccept就是通过调用accept逐个处理tcp连接,但是在accept之前,有一个非常重要的操作fdwait。

// 协程因为等待io需要切换 void fdwait(int fd, int rw) {         // 是否已经初始化epoll     if(!startedfdtask){         startedfdtask = 1;         epfd = epoll_create(1);         // 没有初始化则创建一个协程,做io管理         taskcreate(fdtask, 0, 32768);     }     struct epoll_event ev = {0};     // 记录事件对应的协程和感兴趣的事件     ev.data.ptr = taskrunning;     switch(rw){     case 'r':         ev.events |= EPOLLIN | EPOLLPRI;         break;     case 'w':         ev.events |= EPOLLOUT;         break;     }      int r = epoll_ctl(epfd, EPOLL_CTL_ADD, fd, &ev);     // 切换到其他协程,等待被唤醒     taskswitch();     // 唤醒后函数刚才注册的事件     epoll_ctl(epfd, EPOLL_CTL_DEL, fd, &ev); }

fdwait首先把fd注册到epoll中,然后把协程切换到下一个待执行的协程。这里有个细节,当协程X被调度执行的时候,他是脱离了就绪队列的,而taskswitch函数只是实现了切换上下文到调度中心,调度中心会从就绪队列从选择下一个协程执行,那么这时候,脱离就绪队列的协程X就处于孤岛状态,看起来再也无法给调度中心选中执行,这个问题的处理方式是,把协程、fd和感兴趣的事件信息一起注册到epoll中,当epoll监听到某个fd的事件发生时,就会把对应的协程加入就绪队列,这样协程就可以被调度执行了。在fdwait函数一开始那里处理了epoll相关的逻辑。epoll的逻辑也是在一个协程中执行的,但是epoll所在协程和一般协程不一样,类似于操作系统的内核线程一样,epoll所在的协程成为系统协程,即不是用户定义的,而是系统定义的。我们看一下实现

void fdtask(void *v) {     int i, ms;     Task *t;     uvlong now;     // 变成系统协程     tasksystem();     struct epoll_event events[1000];     for(;;){         /* let everyone else run */         // 大于0说明还有其他就绪协程可执行,则先让给他们执行,否则往下执行         while(taskyield() > 0)             ;         /* we're the only one runnable - poll for i/o */         errno = 0;         // 没有定时事件则一直阻塞         if((t=sleeping.head) == nil)             ms = -1;         else{             /* sleep at most 5s */             now = nsec();             if(now >= t->alarmtime)                 ms = 0;             else if(now+5*1000*1000*1000LL >= t->alarmtime)                 ms = (t->alarmtime - now)/1000000;             else                 ms = 5000;         }         int nevents;         // 等待事件发生,ms是等待的超时时间         if((nevents = epoll_wait(epfd, events, 1000, ms)) < 0){             if(errno == EINTR)                 continue;             fprint(2, "epoll: %s\n", strerror(errno));             taskexitall(0);         }          /* wake up the guys who deserve it */         // 事件触发,把对应协程插入就绪队列         for(i=0; i<nevents; i++){             taskready((Task *)events[i].data.ptr);         }          now = nsec();         // 处理超时事件         while((t=sleeping.head) && now >= t->alarmtime){             deltask(&sleeping, t);             if(!t->system && --sleepingcounted == 0)                 taskcount--;             taskready(t);         }     } }

我们看到epoll的处理逻辑和一般服务器的类似,通过epoll_wait阻塞,然后epoll_wait返回时,处理每一个发生的事件,而且libtask还支持超时事件。另外libtask中当还有其他就绪协程的时候,是不会进入epoll_wait的,它会把cpu让给就绪的协程(通过taskyield函数),当就绪队列只有epoll所在的协程时才会进入epoll的逻辑。至此,我们看到了libtask中如何把异步变成同步的。当用户要调用一个可能会引起进程挂起的接口时,就可以调用libtask提供的一个相应的API,比如我们想读一个文件,我们可以调用libtask的fdread。

int fdread(int fd, void *buf, int n) {     int m;     // 非阻塞读,如果不满足则再注册到epoll,参考fdread1     while((m=read(fd, buf, n)) < 0 && errno == EAGAIN)         fdwait(fd, 'r');     return m; }

这样就不需要担心进程被挂起,同时也不需要处理epoll相关的逻辑(注册事件,事件触发时的处理等等)。异步转同步,libtask的方式就是通过提供对应的API,先把用户的fd注册到epoll中,然后切换到其他协程,等epoll监听到事件触发时,就会把对应的协程插入就绪队列,当该协程被调度中心选中执行时,就会继续执行剩下的逻辑而不会引起进程挂起,因为这时候所等待的条件已经满足。

感谢各位的阅读,以上就是“协程库Libtask源码分析之如何使用架构”的内容了,经过本文的学习后,相信大家对协程库Libtask源码分析之如何使用架构这一问题有了更深刻的体会,具体使用情况还需要大家实践验证。这里是亿速云,小编将为大家推送更多相关知识点的文章,欢迎关注!

向AI问一下细节

免责声明:本站发布的内容(图片、视频和文字)以原创、转载和分享为主,文章观点不代表本网站立场,如果涉及侵权请联系站长邮箱:is@yisu.com进行举报,并提供相关证据,一经查实,将立刻删除涉嫌侵权内容。

AI