动态

详情 返回 返回

一文説透IO多路複用select/poll/epoll - 动态 详情

概述

如果我們要開發一個高併發的TCP程序。常規的做法是:多進程或者多線程。即:使用其中一個線程或者進程去監聽有沒有客户端連接上來,一旦有新客户端連接,就新開一個線程(進程),將其扔到線程(或進程)中去處理具體的讀寫操作等業務邏輯,主線程(進程)繼續等待,監聽其他的客户端。
image.png
這樣操作往往存在很大的弊端。首先是浪費資源,要知道,單個進程的最大虛擬內存是4G,單個線程的虛擬內存也有將近8M,那麼,如果上萬個客户端連接上來,服務器將會承受不住。
其次是浪費時間,因為你必須一直等在accept那個地方,十分被動。
上述的網絡模型,其實説白了,就是一個線程一路IO,在單個線程裏只能處理一個IO。因此,也可稱之為單路IO。而一路IO,就是一個併發。有多少個併發,就必須要開啓多少個線程,因此,對資源的浪費是不言而喻的。
那麼,有沒有一種方式,可以在一個線程裏,處理多路IO呢?
我們回顧一下多線程模型 ,它最大的技術難點是acceptrecv函數都是阻塞的。只要沒有新連接上來,accept就阻塞住了,無法處理後續的業務邏輯;沒有數據過來,recv又阻塞住了 ,無法處理新的accept請求。因此,只要能夠搞定在同一個線程裏同時acceptrecv的問題,似乎所有問題就迎刃而解了。
有人説,這怎麼可能嘛?肯定要兩個線程的 。
還真有可能,而這所謂的可能 ,就是IO多路複用技術。

IO多路複用

所謂的IO多路複用,它的核心思想就是,把監聽新客户端連接、讀寫事件等的操作轉包出去,讓系統內核來做這件事情。即由內核來負責監聽有沒有連接建立、有沒有讀寫請求,作為服務端,只需要註冊相應的事件,當事件觸發時,由內核通知服務端程序去處理就行了。
這樣做的好處顯而易見:只需要在一個主線程裏,就可以完成所有的工作,既不會阻塞,也不會浪費太多資源。
説得通俗易懂一些,就是原來需要由主線程乾的活,現在都交給內核去幹了。我們不用阻塞在acceptrecv那裏,而是由內核告訴程序,有新客户端連接上來了 ,或者有數據發送過來了,我們再去調用acceptrecv就行了,其餘時間,我們可以處理其他的業務邏輯。
那麼有人問了,你不還是要調用acceptrecv嗎?為什麼現在就不會阻塞了呢 ?
這就要深入説一下listenaccept的關係了。
假如服務器是海底撈火鍋店的話,listen就是門口迎賓的小姐,當來了一個客人(客户端),就將其迎進店內。而accept則是店內的大堂經理 ,當沒人來的時候,就一直閒在那裏沒事做,listen將客人 迎進來之後,accept就會分配一個服務員(fd)專門服務於這個客人。
所以説,只要listen正常工作,就能源源不斷地將客人迎進飯店(客户端能正常連接上服務器),即使此時並沒有accept。那麼,有人肯定有疑問,總不能一直往裏迎吧,酒店也是有大小的,全部擠在大堂也裝不下那麼多人啊。還記得listen函數的第二個參數backlog嗎?它就表示在沒有accept之前,最多可以迎多少個客人進來。
因此,對於多線程模型來説,accept作為大堂經理,在沒客人來的時候,就眼巴巴地盯着門口 ,啥也不幹,當listen把人迎進來了,才開始幹活。只能説,摸魚,還是accpet會啊。
IO多路複用,則相當於請了一個秘書。accept作為大堂經理,肯定有很多其他事情可以忙,他就不用一直盯着門口,當listen把人迎進來之後,秘書就會把客人(們)帶到經理身邊,讓經理安排服務員(fd)。
只是這個秘書是內核提供的,因此不僅免費,而且勤快。免費的勞動力,何樂而不為呢?
它的流程圖大概是下面這樣子的:
image.png
我們通常所説的IO多路複用技術,在Linux環境下,主要有三種實現,分別為selectpollepoll,當然還有內核新增的io_uring。在darwin平台 ,則有kqueueWindows下則是iocp。從性能上來説,iocp要優於epoll,與io_uring不相上下。但selectpollepoll的演變是一個持續迭代的過程,雖説從效率以及使用普及率上來説,epoll堪稱經典,但並不是另外兩種實現就毫無用處,也是有其存在的意義的,尤其是select
本文不會花太多筆墨來介紹kqueue,筆者始終認為,拿MacOS作為服務器開發,要麼腦子瓦特了,要麼就是錢燒的。基本上除了自己寫寫demo外,極少能在生產環境真正用起來。而iocp自成一派,未來有暇,將專門開闢專題細説。io_uring作為較新的內核才引入的特性,本文也不宜大肆展開。
唯有selectpoll以及epoll,久經時間考驗,已被廣泛運用於各大知名網絡應用,並由此誕生出許多經典的網絡模型,實在是值得好好細説。

select

原型

select函數原型:

/* According to POSIX.1-2001, POSIX.1-2008 */
       #include <sys/select.h>

       /* According to earlier standards */
       #include <sys/time.h>
       #include <sys/types.h>
       #include <unistd.h>

       int select(int nfds, fd_set *readfds, fd_set *writefds, fd_set *exceptfds, struct timeval *timeout);

參數説明:

  • nfds: 最大的文件描述符+1,代表監聽這一組描述符(為什麼要+1?因為除了當前最大描述符之外,還有可能有新的fd連接上來)
  • fd_set: 是一個位圖集合, 對於同一個文件描述符,可以監聽不同的事件
  • readfds:文件描述符“可讀”事件
  • writefds:文件描述符“可寫”事件
  • exceptfds:文件描述符“異常”事件,一般內核用的,實際編程很少使用
  • timeout:超時時間:0是立即返回,-1是一直阻塞,如果大於0,則達到設置值的微秒數即返回
  • 返回值: 所監聽的所有監聽集合中滿足條件的總數(滿足條件的讀、寫、異常事件的總數),出錯時返回-1,並設置errno。如果超時時間觸發,則返回0。

select的函數原型可知,它主要依賴於三個bitmap的集合,分別為可讀事件集合,可寫事件集合,以及異常事件集合。我們只需要將待監聽的fd加入到對應的集合中,當有對應事件觸發,我們再從集合中將其拿出來進行處理就行了。
那麼,怎麼將文件描述符加到監聽事件集合中呢?
內核為我們提供了四個操作宏:

void FD_CLR(int fd, fd_set *set);    //將fd從set中清除出去,位圖置為0
int  FD_ISSET(int fd, fd_set *set);   //判斷fd是否在集合中,返回值為1,説明滿足了條件
void FD_SET(int fd, fd_set *set);    //將fd設置到set中去,位圖置為1
void FD_ZERO(fd_set *set);    //將set集合清空為0值 

有了以上基礎,我們 就能大致梳理一下select處理的流程:

  1. 創建fd_set 位圖集合(3個集合,一個readfds,一個writefds,一個exceptfds
  2. FD_ZEROset清空
  3. 使用FD_SET將需要監聽的fd設置對應的事件
  4. select函數監聽事件,只要select函數返回了大於1的值,説明有事件觸發,這時候把set拿出來做判斷
  5. FD_ISSET判斷fd到底觸發了什麼事件

實現

其代碼 實現如下所示:

#include <stdio.h>
#include <stdlib.h>
#include  <unistd.h>
#include <string.h>
#include <arpa/inet.h>
#include <ctype.h>

int main(int argc, char *argv[]){
    int i, n, maxi;

    int nready,  client[FD_SETSIZE];    // FD_SETSIZE 為內核定義的,大小為1024, client保存已經被監聽的文件描述符,避免每次都遍歷1024個fd
    int maxfd, listenfd, connfd,  sockfd;
    char  buf[BUFSIZ], str[INET_ADDRSTRLEN];  // INET_ADDRSTRLEN = 16

    struct sockaddr_in clie_addr, serv_addr;
    socklen_t clie_addr_len;
    fd_set rset, allset;   //allset為所有已經被監聽的fd集合,rset為select返回的有監聽事件的fd

    listenfd = socket(AF_INET, SOCK_STREAM, 0);   //創建服務端fd

    bzero(&serv_addr, sizeof(serv_addr));
    serv_addr.sin_family = AF_INET;
    serv_addr.sin_addr.s_addr = htonl(INADDR_ANY);
    serv_addr.sin_port = htons(8888);

    if (bind(listenfd, (struct sockaddr *)&serv_addr, sizeof(serv_addr)) < 0) {
        perror("bind");
    }
    listen(listenfd, 20);

    maxfd = listenfd;
    maxi = -1;
    for(i = 0; i < FD_SETSIZE; i++) {
        client[i] = -1;
    }

    FD_ZERO(&allset);
    FD_SET(listenfd, &allset);

    //----------------------------------------------------------
    //至此,初始化全部完成, 開始監聽

    while(1) {
        rset = allset;  //allset不能被select改掉了,所以要複製一份出來放到rset
        nready = select(maxfd+1, &rset, NULL, NULL,  NULL);
        if (nready < 0) {
            perror("select");
        }

        //listenfd有返回,説明有新連接建立了 
        if (FD_ISSET(listenfd, &rset)) {
            clie_addr_len = sizeof(clie_addr);
            connfd = accept(listenfd, (struct sockaddr *)&clie_addr, &clie_addr_len);
            printf("received form %s at port %d\n", inet_ntop(AF_INET, &clie_addr.sin_addr, str, sizeof(str)), ntohs(clie_addr.sin_port));

            //把新連接的client fd放到client數組中
            for (i = 0; i < FD_SETSIZE; i++) {
                if (client[i] == -1) {
                    client[i] = connfd;
                    break;
                }
            }

            //連接數超過了1024, select函數處理不了了,直接報錯返回
            if (i == FD_SETSIZE) {
                fputs("too many clients\n", stderr);
                exit(1);
            }

            FD_SET(connfd, &allset);    //把新的客户端fd加入到下次要監聽的列表中
            if (connfd > maxfd) {
                maxfd = connfd;      //主要給select第一個參數用的
            }

            if (i > maxi) {
                maxi = i;    //保證maxi存的總是client數組的最後一個下標元素
            }

            //如果nready = 0, 説明新連接都已經處理完了,且沒有已建立好的連接觸發讀事件
            if (--nready == 0) {
                continue;
            }
        }

        for (i = 0; i <= maxi; i++) {
            if ((sockfd = client[i]) < 0) {
                continue;
            }

            //sockfd 是存在client裏的fd,該函數觸發,説明有數據過來了
            if (FD_ISSET(sockfd, &rset)) {
                if ((n = read(sockfd, buf, sizeof(buf))) == 0) {
                    printf("socket[%d] closed\n", sockfd);
                    close(sockfd);
                    FD_CLR(sockfd, &allset);
                    client[i] = -1;
                } else if (n > 0) {
                    //正常接收到了數據
                    printf("accept data: %s\n", buf);
                }

                if (--nready == 0) {
                    break;
                }        
            }
        }
    }
    close(listenfd);
    return 0;
}

缺點

select作為IO多路複用的初始版本,只能説是能用而已,性能並不能高到哪兒去,使用的侷限性也比較大。主要體現在以下幾個方面:

  • 文件描述符上限:1024,同時監聽的最大文件描述符也為1024
  • select需要遍歷所有的文件描述符(1024個),所以通常需要自定義數據結構(數組),單獨存文件描述符,減少遍歷
  • 監聽集合和滿足條件的集合是同一個集合,導致判斷和下次監聽時需要對集合讀寫,也就是説,下次監聽時需要清零,那麼當前的集合結果就需要單獨保存。

優點

select也並不是一無是處,我個人是十分喜歡select這個函數的,主要得益於以下幾個方面:

  • 它至少提供了單線程同時處理多個IO的一種解決方案,在一些簡單的場景(比如併發小於 1024)的時候 ,還是很有用處的
  • select的實現比起pollepoll,要簡單明瞭許多,這也是我為什麼推薦在一些簡單場景優先使用select的原因
  • select是跨平台的,相比於pollepollUnix獨有,select明顯有更加廣闊的施展空間
  • 利用select的跨平台特性,可以實現很多有趣的功能。比如實現一個跨平台的sleep函數。

    • 我們知道,Linux下的原生sleep函數是依賴於sys/time.h的,這也就意味着它無法被Windows平台調用。
    • 因為select函數本身跨平台,而第五個參數恰好是一個超時時間,即:我們可以傳入一個超時時間,此時程序就會阻塞在select這裏,直到超時時間觸發,這也就間接地實現了sleep功能。
    • 代碼實現如下

      //傳入一個微秒時間
      void general_sleep(int t){
          struct timeval tv; 
          tv.tv_usec = t % 10e6;
          tv.tv_sec = t / 10e6;
          select(0, NULL, NULL, NULL, &tv);
      }
    • select實現的sleep函數至少有兩個好處:

      • 可以跨平台調用
      • 精度可以精確到微秒級,比起Linux原生的sleep函數,精度要高得多。

poll

原型

#include <poll.h>

int poll(struct pollfd *fds, nfds_t nfds, int timeout);

struct pollfd {
      int   fd;         /* file descriptor */
      short events;     /* requested events */
      short revents;    /* returned events */
};

參數説明:

fds: 數組的首地址
nfds: 最大監聽的文件描述符個數
timeout: 超時時間

鑑於select函數的一些 缺點和侷限性,poll的實現就做了一些升級。首先,它突破了1024文件描述符的限制,其次,它將事件封裝了一下 ,構成了pollfd的結構體,並將這個 結構體中註冊的事件直接與fd進行了綁定,這樣 就無需每次有事件觸發,就遍歷所有的fd了,我們只需要遍歷這個 結構體數組中的fd即可。

那麼 ,poll函數可以註冊哪些事件類型呢?

POLLIN 讀事件
POLLPRI 觸發異常條件
POLLOUT 寫事件
POLLRDHUP  關閉連接
POLLERR  發生了錯誤
POLLHUP  掛斷
POLLNVAL 無效請求,fd未打開
POLLRDNORM 等同於POLLIN
POLLRDBAND 可以讀取優先帶數據(在 Linux 上通常不使用)。
POLLWRNORM 等同於POLLOUT
POLLWRBAND 可以寫入優先級數據。

事件雖然比較多,但我們主要關心POLLINPOLLOUT就行了。

實現

#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <string.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <poll.h>
#include <errno.h>
#include <ctype.h>


#define OPEN_MAX 1024


int main(int argc, char *argv[]){
    int i, maxi, listenfd, connfd,  sockfd;
    int nready;  // 接受poll返回值,記錄滿足監聽事件的fd個數
    ssize_t n;  
    char  buf[BUFSIZ], str[INET_ADDRSTRLEN];  // INET_ADDRSTRLEN = 16
    struct pollfd client[OPEN_MAX];     //用來存放監聽文件描述符和事件的集合
    struct sockaddr_in cliaddr, servaddr;
    socklen_t clilen;
   
    listenfd = socket(AF_INET, SOCK_STREAM, 0);   //創建服務端fd


    int opt = 1;
    setsockopt(listenfd, SOL_SOCKET, SO_REUSEADDR, &opt, sizeof(opt));  //設置端口複用


    bzero(&servaddr, sizeof(servaddr));
    servaddr.sin_family = AF_INET;
    servaddr.sin_addr.s_addr = htonl(INADDR_ANY);
    servaddr.sin_port = htons(8888);


    if (bind(listenfd, (struct sockaddr *)&servaddr, sizeof(servaddr)) < 0) {
        perror("bind");
    }
    listen(listenfd, 128);


    //設置第一個要監聽的文件描述符,即服務端的文件描述符
    client[0].fd = listenfd;
    client[0].events = POLLIN;   //監聽讀事件



    for(i = 1; i < OPEN_MAX; i++) {   //注意從1開始,因為0已經被listenfd用了
        client[i].fd = -1;
    }


    maxi = 0;   //因為已經加進去一個了,所以從0開始就行


    //----------------------------------------------------------
    //至此,初始化全部完成, 開始監聽


    for(;;) {
        nready = poll(client,  maxi+1, -1);   // 阻塞監聽是否有客户端讀事件請求


        if (client[0].revents & POLLIN) {   // listenfd觸發了讀事件
            clilen = sizeof(cliaddr);
            connfd = accept(listenfd, (struct sockaddr*)&cliaddr, &clilen);    //接受新客户端的連接請求
            printf("recieved from %s at port %d\n", inet_ntop(AF_INET, &cliaddr.sin_addr, str, sizeof(str)), ntohs(cliaddr.sin_port));


            for (i = 0; i < OPEN_MAX; i++) {
                if (client[i].fd < 0) {
                    client[i].fd = connfd;  // 將新連接的文件描述符加到client數組中
                    break;
                }
            }


            if (i == OPEN_MAX) {
                perror("too many open connections");
            }


            client[i].events = POLLIN;


            if(i > maxi) {
                maxi = i;
            }


            if (--nready == 0) {
                continue;
            }
        }
        
        //前面的if沒滿足,説明有數據發送過來,開始遍歷client數組
        for (i = 1; i <= maxi; i++) {
            if ((sockfd = client[i].fd) < 0) {
                continue;
            }

            //讀事件滿足,用read去接受數據
            if (client[i].revents & POLLIN) {
                if ((n = read(sockfd, buf, sizeof(buf))) < 0) {
                    if (errno = ECONNRESET) {   // 收到RST標誌
                        printf("client[%d] aborted conection\n", client[i].fd);
                        close(sockfd);
                        client[i].fd = -1;  //poll中不監控該文件描述符,直接置-1即可,無需像select中那樣移除
                    } else {
                        perror("read error");
                    }
                } else if (n == 0) {    //客户端關閉連接
                    printf("client[%d] closed connection\n", client[i].fd);
                    close(sockfd);
                    client[i].fd = -1; 
                } else {
                    printf("recieved data: %s\n", buf);
                }
            }
            if (--nready <= 0) {
                break;
            }
            
        }
    }
    close(listenfd);
    return 0;
}

優點

poll函數相比於select函數來説,最大的優點就是突破了1024個文件描述符的限制,這使得百萬併發變得可能。

而且不同於selectpoll函數的監聽和返回是分開的,因此不用在每次操作之前都單獨備份一份了,簡化了代碼實現。因此,可以理解為select的升級增強版。

缺點

雖然poll不需要遍歷所有的文件描述符了,只需要遍歷加入數組中的描述符,範圍縮小了很多,但缺點仍然是需要遍歷。假設真有百萬併發的場景,當僅有兩三個事件觸發的時候,仍然要遍歷上百萬個文件描述符,只為了找到那觸發事件的兩三個fd,這樣看來 ,就有些得不償失了。而這個缺點,將在epoll中得以徹底解決。

poll作為 一個過度版本的實現 ,説實話地位有些尷尬:它既不具備select函數跨平台的優勢,又不具備epoll的高性能。因此使用面以及普及程度相對來説,反而是三者之中最差勁的一個。

若説它的唯一使用場景,大概也就是開發者既想突破1024文件描述符的限制,又不想把代碼寫得像epoll那樣複雜了。

epoll

原型

epoll可謂是當前IO多路複用的最終形態,它是poll的 增強版本。我們説poll函數,雖然突破了select函數1024文件描述符的限制,且把監聽事件和返回事件分開了,但是説到底還是要遍歷所有文件描述符,才能知道到底是哪個文件描述符觸發了事件,或者需要單獨定義一個數組。

epoll則可以返回一個觸發了事件的所有描述符的數組集合,在這個數組集合裏,所有的文件描述符都是需要處理的,就不需要我們再單獨定義數組了。

雖然epoll功能強大了,但是使用起來卻麻煩得多。不同於selectpoll使用一個函數監聽即可,epoll提供了三個函數。

epoll_create

首先,需要使用epoll_create創建一個句柄:

#include <sys/epoll.h>

int epoll_create(int size);

該函數返回一個文件描述符,這個文件描述符並不是 一個常規意義的文件描述符,而是一個平衡二叉樹(準確來説是紅黑樹)的根節點。size則是樹的大小,它代表你將監聽多少個文件描述符。epoll_create將按照傳入的大小,構造出一棵大小為size的紅黑樹。

注意:這個size只是建議值,實際內核並不一定侷限於size的大小,可以監聽比size更多的文件描述符。但是由於平衡二叉樹增加節點時可能需要自旋,如果size與實際監聽的文件描述符差別過大,則會增加內核開銷。

epoll_ctl

第二個函數是epoll_ctl, 這個函數主要用來操作epoll句柄,可以使用該函數往紅黑樹裏增加文件描述符,修改文件描述符,和刪除文件描述符。

可以看到,selectpoll使用的都是bitmap位圖,而epoll使用的是紅黑樹。

#include <sys/epoll.h>
int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event);

epoll_ctl有四個參數,參數1就是epoll_create創建出來的句柄。

第二個參數op是操作標誌位,有三個值,分別如下:

  • EPOLL_CTL_ADD 向樹增加文件描述符
  • EPOLL_CTL_MOD 修改樹中的文件描述符
  • EPOLL_CTL_DEL 刪除樹中的文件描述符

第三個參數就是需要操作的文件描述符,這個沒啥説的。

重點看第四個參數,它是一個結構體。這個結構體原型如下:

           typedef union epoll_data {
               void        *ptr;
               int          fd;
               uint32_t     u32;
               uint64_t     u64;
           } epoll_data_t;


           struct epoll_event {
               uint32_t     events;      /* Epoll events */
               epoll_data_t data;        /* User data variable */
           };

第一個元素為uint32_t類型的events,這個和poll類似,是一個bit mask,主要使用到的標誌位有:

  • EPOLLIN 讀事件
  • EPOLLOUT 寫事件
  • EPOLLERR 異常事件

這個結構體還有第二個元素,是一個epoll_data_t類型的聯合體。我們先重點關注裏面的fd,它代表一個文件描述符,初始化的時候傳入需要監聽的文件描述符,當監聽返回時,此處會傳出一個有事件發生的文件描述符,因此,無需我們遍歷得到結果了。

epoll_wait

epoll_wait才是真正的監聽函數,它的原型如下:

int epoll_wait(int epfd, struct epoll_event *events,
                      int maxevents, int timeout);

第一個參數不用説了, 注意第二個參數,它雖然也是struct epoll_event *類型,但是和epoll_ctl中的含義不同,epoll_ctl代表傳入進去的是一個地址,epoll_wait則代表傳出的是一個數組。這個數組就是返回的所有觸發了事件的文件描述符集合。

第三個參數maxevents代表這個數組的大小。

timeout不用説了,它代表的是超時時間。不過要注意的是,0代表立即返回,-1代表永久阻塞,如果大於0,則代表毫秒數(注意selecttimeout是微秒)。

這個函數的返回值也是有意義的,它代表有多少個事件觸發,也就可以簡單理解為傳出參數events的大小。

監聽流程

大致梳理一下epoll的監聽流程:

  • 首先,要有一個服務端的listenfd
  • 然後,使用epoll_create創建一個句柄
  • 使用epoll_ctllistenfd加入到樹中,監聽EPOLLIN事件
  • 使用epoll_wait監聽
  • 如果EPOLLIN事件觸發,説明有客户端連接上來,將新客户端加入到events中,重新監聽
  • 如果再有EPOLLIN事件觸發:
  • 遍歷events,如果fdlistenfd,則説明又有新客户端連接上來,重複上面的步驟,將新客户端加入到events
  • 如果fd不為listenfd,這説明客户端有數據發過來,直接調用read函數讀取內容即可。

觸發

epoll有兩種觸發方式,分別為水平觸發邊沿觸發

  • 水平觸發

    所謂的水平觸發,就是隻要仍有數據處於就緒狀態,那麼可讀事件就會一直觸發。

    舉個例子,假設客户端一次性發來了4K數據 ,但是服務器recv函數定義的buffer大小僅為1024字節,那麼一次肯定是不能將所有數據都讀取完的,這時候就會繼續觸發可讀事件,直到所有數據都處理完成。

    epoll默認的觸發方式就是水平觸發。

  • 邊沿觸發

    邊沿觸發恰好相反,邊沿觸發是隻有數據發送過來的時候會觸發一次,即使數據沒有讀取完,也不會繼續觸發。必須client再次調用send函數觸發了可讀事件,才會繼續讀取。

    假設客户端 一次性發來4K數據,服務器recvbuffer大小為1024字節,那麼服務器在第一次收到1024字節之後就不會繼續,也不會有新的可讀事件觸發。只有當客户端再次發送數據的時候,服務器可讀事件觸發 ,才會繼續讀取第二個1024字節數據。

    注意:第二次可讀事件觸發時,它讀取的仍然是上次未讀完的數據 ,而不是客户端第二次發過來的新數據。也就是説:數據沒讀完雖然不會繼續觸發EPOLLIN,但不會丟失數據。

  • 觸發方式的設置:

    水平觸發和邊沿觸發在內核裏 使用兩個bit mask區分,分別為:

    EPOLLLT 水平 觸發
    EPOLLET 邊沿觸發

    我們只需要在註冊事件的時候將其與需要註冊的事件做一個位或運算即可:

    ev.events = EPOLLIN;    //LT
    ev.events = EPOLLIN | EPOLLET;   //ET

實現

#include<stdio.h>
#include<stdlib.h>
#include<string.h>
#include<unistd.h>
#include<arpa/inet.h>
#include<sys/epoll.h>
#include<ctype.h>

#define OPEN_MAX 1024

int main(int argc, char **argv){
    int i, listenfd, connfd, sockfd,epfd, res, n;
    ssize_t nready = 0;
    char buf[BUFSIZ] = {0};
    char str[INET_ADDRSTRLEN];
    socklen_t clilen;
    struct sockaddr_in cliaddr, servaddr;
    struct epoll_event event, events[OPEN_MAX];

    //開始創建服務端套接字
    listenfd = socket(AF_INET, SOCK_STREAM, 0);

    int opt = 1;
    setsockopt(listenfd, SOL_SOCKET, SO_REUSEADDR, &opt, sizeof(opt));

    bzero(&servaddr, sizeof(servaddr));
    servaddr.sin_family = AF_INET;
    servaddr.sin_addr.s_addr = htonl(INADDR_ANY);
    servaddr.sin_port = htons(8888);

    bind(listenfd, (struct sockaddr *)&servaddr, sizeof(servaddr));
    listen(listenfd, 128);

    //開始初始化epoll
    epfd = epoll_create(OPEN_MAX);

    event.events = EPOLLIN;
    event.data.fd = listenfd;
    res = epoll_ctl(epfd, EPOLL_CTL_ADD, listenfd, &event);
    if (res == -1) {
        perror("server epoll_ctl error");
        exit(res);
    }
    
    for(;;) {
        //開始監聽
        nready = epoll_wait(epfd, events, OPEN_MAX, -1);
        if (nready == -1) {
            perror("epoll_wait error");
            exit(nready);
        }

        for (i = 0; i < nready; i++) {
            if (!(events[i].events & EPOLLIN)) {
                continue;
            }
            if (events[i].data.fd == listenfd) {
                //有新客户端連接
                clilen = sizeof(cliaddr);
                connfd = accept(listenfd, (struct sockaddr *)&cliaddr, &clilen);
                printf("received from %s at port %d\n", inet_ntop(AF_INET, &cliaddr.sin_addr, str, sizeof(str)),
                    ntohs(cliaddr.sin_port));

                event.events = EPOLLIN;
                event.data.fd = connfd;
                if (epoll_ctl(epfd, EPOLL_CTL_ADD, connfd, &event) == -1) {
                    perror("client epoll_ctl error");
                    exit(-1);
                }
            } else {
                //有數據可以讀取
                sockfd = events[i].data.fd;
                n = read(sockfd, buf, sizeof(buf));
                if (n ==0) {
                    //讀到0,説明客户端關閉
                    epoll_ctl(epfd, EPOLL_CTL_DEL, sockfd, NULL);
                    close(sockfd);
                    printf("client[%d] closed connection\n", sockfd);
                } else if (n < 0){
                    //出錯
                    epoll_ctl(epfd, EPOLL_CTL_DEL, sockfd, NULL);
                    close(sockfd);
                    printf("client[%d] read error\n", sockfd);
                } else {
                    //讀到了數據
                    printf("received data: %s\n", buf);
                }
            }
        }
    }
    close(listenfd);
    close(epfd);
     
    return 0;
}

優點

epoll的優點顯而易見,它解決了poll需要遍歷所有註冊的fd的問題,只需要關心觸發了事件的極少量fd即可,大大提升了效率。

而更有意思 的是epoll_data_t這個聯合體,它裏面有四個元素:

typedef union epoll_data {
      void        *ptr;
      int          fd;
      uint32_t     u32;
      uint64_t     u64;
 } epoll_data_t;

簡單開發時,我們可以將fd記錄在其中,但是我們注意到 這裏面還有一個void *類型的元素,那就提供了無限可能。它可以是一個struct,也可以是一個callback,也可以是struct嵌套callback,從而實現無線的擴展可能。大名鼎鼎的reactor反應堆模型就是通過這種方式完成的。

在下篇專題裏,筆者將帶大家走進reactor模型,領略epoll的神奇魅力。

缺點

什麼?epoll也有缺點?當然有,我認為epoll的最大缺點就是代碼實現起來變得複雜了,寫起來複雜,理解起來更復雜。

而且還有一個不能算缺點的缺點,對於筆者這樣一個長期開發跨平台應用程序的開發者來説,epoll雖好,但無法實現一套跨平台的接口封裝,卻過於雞肋了。


本專欄知識點是通過<零聲教育>的系統學習,進行梳理總結寫下文章,對C/C++課程感興趣的讀者,可以點擊鏈接,查看詳細的服務:C/C++Linux服務器開發/高級架構師

Add a new 评论

Some HTML is okay.