亚洲激情专区-91九色丨porny丨老师-久久久久久久女国产乱让韩-国产精品午夜小视频观看

溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務條款》

協程庫Libtask源碼分析之如何使用架構

發布時間:2021-10-18 17:49:02 來源:億速云 閱讀:114 作者:iii 欄目:開發技術

這篇文章主要講解了“協程庫Libtask源碼分析之如何使用架構”,文中的講解內容簡單清晰,易于學習與理解,下面請大家跟著小編的思路慢慢深入,一起來研究和學習“協程庫Libtask源碼分析之如何使用架構”吧!

libtask是google大佬Russ  Cox(Go的核心開發者)所寫,本文介紹libtask的基礎原理。我們從libtask的main函數開始,這個main函數就是我們在c語言中使用的c函數,libtask本身實現了main這個函數,用戶使用libtask時,要實現的是taskmain函數。taskmain和main的函數聲明是一樣的。下面我們看一下main函數。

int main(int argc, char **argv) {     struct sigaction sa, osa;     // 注冊SIGQUIT信號處理函數     memset(&sa, 0, sizeof sa);     sa.sa_handler = taskinfo;     sa.sa_flags = SA_RESTART;     sigaction(SIGQUIT, &sa, &osa);      // 保存命令行參數     argv0 = argv[0];     taskargc = argc;     taskargv = argv;      if(mainstacksize == 0)         mainstacksize = 256*1024;     // 創建第一個協程     taskcreate(taskmainstart, nil, mainstacksize);     // 開始調度     taskscheduler();     fprint(2, "taskscheduler returned in main!\n");     abort();     return 0; }

main函數主要的兩個邏輯是taskcreate和taskscheduler函數。我們先來看taskcreate。

int taskcreate(void (*fn)(void*), void *arg, uint stack) {     int id;     Task *t;      t = taskalloc(fn, arg, stack);     taskcount++;     id = t->id;     // 記錄位置     t->alltaskslot = nalltask;     // 保存到alltask中     alltask[nalltask++] = t;     // 修改狀態為就緒,可以被調度,并且加入到就緒隊列     taskready(t);     return id; }

taskcreate首先調用taskalloc分配一個表示協程的結構體Task。我們看看這個結構體的定義。

struct Task {     char    name[256];    // offset known to acid     char    state[256];     // 前后指針     Task    *next;     Task    *prev;     Task    *allnext;     Task    *allprev;     // 執行上下文     Context    context;     // 睡眠時間     uvlong    alarmtime;     uint    id;     // 棧信息     uchar    *stk;     uint    stksize;     //是否退出了     int    exiting;     // 在alltask的索引     int    alltaskslot;     // 是否是系統協程     int    system;     // 是否就緒狀態     int    ready;     // 入口函數     void    (*startfn)(void*);     // 入口參數     void    *startarg;     // 自定義數據     void    *udata; };

接著看看taskalloc的實現。

// 分配一個協程所需要的內存和初始化某些字段 static Task* taskalloc(void (*fn)(void*), void *arg, uint stack) {     Task *t;     sigset_t zero;     uint x, y;     ulong z;      /* allocate the task and stack together */     // 結構體本身的大小+棧大小     t = malloc(sizeof *t+stack);     memset(t, 0, sizeof *t);     // 棧的內存位置     t->stk = (uchar*)(t+1);     // 棧大小     t->stksize = stack;     // 協程id     t->id = ++taskidgen;     // 協程工作函數和參數     t->startfn = fn;     t->startarg = arg;      /* do a reasonable initialization */     memset(&t->context.uc, 0, sizeof t->context.uc);     sigemptyset(&zero);     // 初始化uc_sigmask字段為空,即不阻塞信號     sigprocmask(SIG_BLOCK, &zero, &t->context.uc.uc_sigmask);      /* must initialize with current context */     // 初始化uc字段     getcontext(&t->context.uc)      // 設置協程執行時的棧位置和大小     t->context.uc.uc_stack.ss_sp = t->stk+8;     t->context.uc.uc_stack.ss_size = t->stksize-64;     z = (ulong)t;     y = z;     z >>= 16;    /* hide undefined 32-bit shift from 32-bit compilers */     x = z>>16;     // 保存信息到uc字段     makecontext(&t->context.uc, (void(*)())taskstart, 2, y, x);      return t; }

taskalloc函數代碼看起來很多,但是邏輯不算復雜,就是申請Task結構體所需的內存和執行時棧的內存,然后初始化各個字段。這樣,一個協程就誕生了。接著執行taskready把協程加入就緒隊列。

// 修改協程的狀態為就緒并加入就緒隊列 void taskready(Task *t) {     t->ready = 1;     addtask(&taskrunqueue, t); }  // 把協程插入隊列中,如果之前在其他隊列,則會被移除 void addtask(Tasklist *l, Task *t) {     if(l->tail){         l->tail->next = t;         t->prev = l->tail;     }else{         l->head = t;         t->prev = nil;     }     l->tail = t;     t->next = nil; }

taskrunqueue記錄了所有就緒的協程。創建了協程并加入隊列后,協程還沒有開始執行,就像操作系統的進程和線程一樣,需要有一個調度器來調度執行。下面我們看看調度器的實現。

// 協程調度中心 static void taskscheduler(void) {     int i;     Task *t;     for(;;){         // 沒有用戶協程了,則退出         if(taskcount == 0)             exit(taskexitval);         // 從就緒隊列拿出一個協程         t = taskrunqueue.head;         if(t == nil){             fprint(2, "no runnable tasks! %d tasks stalled\n", taskcount);             exit(1);         }         // 從就緒隊列刪除該協程         deltask(&taskrunqueue, t);         t->ready = 0;         // 保存正在執行的協程         taskrunning = t;         // 切換次數加一         tasknswitch++;         // 切換到t執行,并且保存當前上下文到taskschedcontext(即下面要執行的代碼)         contextswitch(&taskschedcontext, &t->context);         // 執行到這說明沒有協程在執行(t切換回來的),置空         taskrunning = nil;         // 剛才執行的協程t退出了         if(t->exiting){             // 不是系統協程,則個數減一             if(!t->system)                 taskcount--;             // 當前協程在alltask的索引             i = t->alltaskslot;             // 把最后一個協程換到當前協程的位置,因為他要退出了             alltask[i] = alltask[--nalltask];             // 更新被置換協程的索引             alltask[i]->alltaskslot = i;             // 釋放堆內存             free(t);         }     } }

調度器的代碼看起來很多,但是核心邏輯就三個 1 從就緒隊列中拿出一個協程t,并把t移出就緒隊列 2 通過contextswitch切換到協程t中執行 3  協程t切換回調度中心,如果t已經退出,則修改數據結構,然后回收他占據的內存。如果t沒退出,則繼續調度其他協程執行。至此,協程就開始跑起來了。并且也有了調度系統。這里的調度機制是比較簡單的,就是按著先進先出的方式就緒調度,并且是非搶占的。即沒有按時間片調度的概念,一個協程的執行時間由自己決定,放棄執行的權力也是自己控制的,當協程不想執行了可以調用taskyield讓出cpu。

// 協程主動讓出cpu int taskyield(void) {     int n;     // 當前切換協程的次數     n = tasknswitch;     // 插入就緒隊列,等待后續的調度     taskready(taskrunning);     taskstate("yield");     // 切換協程     taskswitch();     // 等于0說明當前只有自己一個協程,調度的時候tasknswitch加一,所以這里減一     return tasknswitch - n - 1; }  /*     切換協程,taskrunning是正在執行的協程,taskschedcontext是調度協程(主線程)的上下文,     切換到調度中心,并保持當前上下文到taskrunning->context */ void taskswitch(void) {     needstack(0);     contextswitch(&taskrunning->context, &taskschedcontext); }  // 真正切換協程的邏輯 static void contextswitch(Context *from, Context *to) {     if(swapcontext(&from->uc, &to->uc) < 0){         fprint(2, "swapcontext failed: %r\n");         assert(0);     } }

yield的邏輯也很簡單,因為協程在執行的時候,是不處于就緒隊列的,當協程準備讓出cpu時,協程首先把自己重新加入到就緒隊列,等待下次被調度執行。當然我們也可以直接調度contextswitch切換到其他協程。重點在于什么時候應該讓出cpu,又什么時候應該被調度執行。接下來會詳細講解。至此,我們已經有了支持協程所需要的底層基礎。我們看到這個實現的思路也不是很復雜,首先有一個隊列表示待執行的的協程,每一個協程對應一個Task結構體。然后調度中心不斷地按照先進先出的方式去調度協程的執行就可以。因為沒有搶占機制,所以調度中心是依賴協程本身去驅動的,協程需要主動讓出cpu,把上下文切換回調度中心,調度中心才能進行下一輪的調度。接下來我們看看,基于這些底層基礎,如果實現一個基于協程的服務器。下面我們通過一個例子進行講解。

void taskmain(int argc, char **argv) {     // 啟動一個tcp服務器     if((fd = netannounce(TCP, 0, atoi(argv[1]))) < 0){         // ...     }     // 改為非阻塞模式     fdnoblock(fd);     // accept成功后創建一個客戶端協程     while((cfd = netaccept(fd, remote, &rport)) >= 0){         taskcreate(proxytask, (void*)cfd, STACK);     } }

我們剛才講過taskmain是我們需要實現的函數,首先通過netannounce建立一個tcp服務器。接著把fd改成非阻塞的,這個非常重要,因為在后面調用accept的時候,如果是阻塞的文件描述符,那么就會引起進程掛起,而非阻塞模式下,操作系統會返回EAGAIN的錯誤碼,通過這個錯誤碼我們可以決定下一步做什么。我們看看netaccept的實現。

// 處理(摘下)連接 int netaccept(int fd, char *server, int *port) {     int cfd, one;     struct sockaddr_in sa;     uchar *ip;     socklen_t len;     // 注冊事件到epoll,等待事件觸發     fdwait(fd, 'r');     len = sizeof sa;     // 觸發后說明有連接了,則執行accept     if((cfd = accept(fd, (void*)&sa, &len)) < 0){         return -1;     }     // 和客戶端通信的fd也改成非阻塞模式     fdnoblock(cfd);     one = 1;     setsockopt(cfd, IPPROTO_TCP, TCP_NODELAY, (char*)&one, sizeof one);     return cfd; }

netaccept就是通過調用accept逐個處理tcp連接,但是在accept之前,有一個非常重要的操作fdwait。

// 協程因為等待io需要切換 void fdwait(int fd, int rw) {         // 是否已經初始化epoll     if(!startedfdtask){         startedfdtask = 1;         epfd = epoll_create(1);         // 沒有初始化則創建一個協程,做io管理         taskcreate(fdtask, 0, 32768);     }     struct epoll_event ev = {0};     // 記錄事件對應的協程和感興趣的事件     ev.data.ptr = taskrunning;     switch(rw){     case 'r':         ev.events |= EPOLLIN | EPOLLPRI;         break;     case 'w':         ev.events |= EPOLLOUT;         break;     }      int r = epoll_ctl(epfd, EPOLL_CTL_ADD, fd, &ev);     // 切換到其他協程,等待被喚醒     taskswitch();     // 喚醒后函數剛才注冊的事件     epoll_ctl(epfd, EPOLL_CTL_DEL, fd, &ev); }

fdwait首先把fd注冊到epoll中,然后把協程切換到下一個待執行的協程。這里有個細節,當協程X被調度執行的時候,他是脫離了就緒隊列的,而taskswitch函數只是實現了切換上下文到調度中心,調度中心會從就緒隊列從選擇下一個協程執行,那么這時候,脫離就緒隊列的協程X就處于孤島狀態,看起來再也無法給調度中心選中執行,這個問題的處理方式是,把協程、fd和感興趣的事件信息一起注冊到epoll中,當epoll監聽到某個fd的事件發生時,就會把對應的協程加入就緒隊列,這樣協程就可以被調度執行了。在fdwait函數一開始那里處理了epoll相關的邏輯。epoll的邏輯也是在一個協程中執行的,但是epoll所在協程和一般協程不一樣,類似于操作系統的內核線程一樣,epoll所在的協程成為系統協程,即不是用戶定義的,而是系統定義的。我們看一下實現

void fdtask(void *v) {     int i, ms;     Task *t;     uvlong now;     // 變成系統協程     tasksystem();     struct epoll_event events[1000];     for(;;){         /* let everyone else run */         // 大于0說明還有其他就緒協程可執行,則先讓給他們執行,否則往下執行         while(taskyield() > 0)             ;         /* we're the only one runnable - poll for i/o */         errno = 0;         // 沒有定時事件則一直阻塞         if((t=sleeping.head) == nil)             ms = -1;         else{             /* sleep at most 5s */             now = nsec();             if(now >= t->alarmtime)                 ms = 0;             else if(now+5*1000*1000*1000LL >= t->alarmtime)                 ms = (t->alarmtime - now)/1000000;             else                 ms = 5000;         }         int nevents;         // 等待事件發生,ms是等待的超時時間         if((nevents = epoll_wait(epfd, events, 1000, ms)) < 0){             if(errno == EINTR)                 continue;             fprint(2, "epoll: %s\n", strerror(errno));             taskexitall(0);         }          /* wake up the guys who deserve it */         // 事件觸發,把對應協程插入就緒隊列         for(i=0; i<nevents; i++){             taskready((Task *)events[i].data.ptr);         }          now = nsec();         // 處理超時事件         while((t=sleeping.head) && now >= t->alarmtime){             deltask(&sleeping, t);             if(!t->system && --sleepingcounted == 0)                 taskcount--;             taskready(t);         }     } }

我們看到epoll的處理邏輯和一般服務器的類似,通過epoll_wait阻塞,然后epoll_wait返回時,處理每一個發生的事件,而且libtask還支持超時事件。另外libtask中當還有其他就緒協程的時候,是不會進入epoll_wait的,它會把cpu讓給就緒的協程(通過taskyield函數),當就緒隊列只有epoll所在的協程時才會進入epoll的邏輯。至此,我們看到了libtask中如何把異步變成同步的。當用戶要調用一個可能會引起進程掛起的接口時,就可以調用libtask提供的一個相應的API,比如我們想讀一個文件,我們可以調用libtask的fdread。

int fdread(int fd, void *buf, int n) {     int m;     // 非阻塞讀,如果不滿足則再注冊到epoll,參考fdread1     while((m=read(fd, buf, n)) < 0 && errno == EAGAIN)         fdwait(fd, 'r');     return m; }

這樣就不需要擔心進程被掛起,同時也不需要處理epoll相關的邏輯(注冊事件,事件觸發時的處理等等)。異步轉同步,libtask的方式就是通過提供對應的API,先把用戶的fd注冊到epoll中,然后切換到其他協程,等epoll監聽到事件觸發時,就會把對應的協程插入就緒隊列,當該協程被調度中心選中執行時,就會繼續執行剩下的邏輯而不會引起進程掛起,因為這時候所等待的條件已經滿足。

感謝各位的閱讀,以上就是“協程庫Libtask源碼分析之如何使用架構”的內容了,經過本文的學習后,相信大家對協程庫Libtask源碼分析之如何使用架構這一問題有了更深刻的體會,具體使用情況還需要大家實踐驗證。這里是億速云,小編將為大家推送更多相關知識點的文章,歡迎關注!

向AI問一下細節

免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。

AI

武山县| 楚雄市| 醴陵市| 民勤县| 阿巴嘎旗| 建湖县| 铜鼓县| 东乌珠穆沁旗| 宁德市| 桐乡市| 镇安县| 会昌县| 道真| 靖江市| 江油市| 和田市| 巩义市| 祁东县| 琼海市| 泾川县| 久治县| 湘西| 兴海县| 唐山市| 合作市| 新平| 沙雅县| 紫阳县| 藁城市| 霍城县| 平塘县| 德安县| 孟连| 镇巴县| 吴川市| 砀山县| 库伦旗| 油尖旺区| 盐边县| 新河县| 乐至县|