C语言网络编程:poll的核心用法

poll和select一样,都是Linux下I/O复用模型的一种实现。和select相比,poll主要在数据结构上有一些变化,通常情况下select最大描述符为1024,在linux内核中这个值被定义成了一个FD_SETSIZE宏(在CentOS发行版本的Linux下select.h所在路径是/usr/include/sys/select.h),如下:


#define FD_SETSIZE    __FD_SETSIZE

其中__FD_SETSIZE定义在内核中,一般默认是1024。很多文章可能会告诉你,重新定义一下FD_SETSIZE这个宏就可以突破这个限制了,到底对不对呢?我们看一下select初化位图的代码就知道了(还是在/usr/include/sys/select.h中),如下:


#ifdef __USE_XOPEN
    __fd_mask fds_bits[__FD_SETSIZE / __NFDBITS];
# define __FDS_BITS(set) ((set)->fds_bits)
#else
    __fd_mask __fds_bits[__FD_SETSIZE / __NFDBITS];
# define __FDS_BITS(set) ((set)->__fds_bits)
#endif

可以看到,初始化位图使用的是内核中的__FD_SETSIZE宏。所以,如果想突破1024的限制就必须修改内核的宏__FD_SETSIZE然后重新编译内核。这显然不是所有团队都有能力去做的,这也是select的一个缺点。

为了解决这个问题,poll出现了,poll和select的原理没什么区别,唯一的区别就是数据结构上发生了变化,使得poll可以轻松突破默认最大1024个描述符的限制。

poll函数签名如下:


int poll(struct pollfd *fdarr, unsigned long nfds, int timout);

从函数签名上看,poll比select要简单一些。第一个参数是要监听的I/O描述符事件集合,其结构如下:


struct pollfd {
  int fd;        // 描述符
  short events;  // 监听描述符发生的事件
  short revents; // 已经发生的事件
};

第二个参数nfds是要监听的套接字数量,这是poll和select最大的区别,前面我们说select最大描述符在内核中的_FD_SETSIZE宏写死了,一般是1024。而poll可以通过nfds参数来突破这个限制。

第三个参数是超时时间,一般有三种传值的方式,传-1表示在有可用描述符之前一直等待,传0表示不管有没有可用描述符都立即返回,传大于0的值表示超过对应毫秒即使没有事件发生也会立即返回。最后我们看一下,pollfd结构中的events和revents都有哪些事件,下面我列了一张表:

和select另一个不一样的地方,poll使用两个short型的整型来分别保存监听的事件,和已经发生的事件。这意味着,我们在调用poll函数之前不需要将我们监听的事件复制一份了。I/O设置监听事件使用events,判断是否有事件发生使用revents。下面我们用poll写一段代码实际看一下poll是怎么用的。


#include "stdio.h"
#include "sys/socket.h"
#include "poll.h"
#include "unistd.h"
#include "string.h"
#include "netinet/in.h"
#include "errno.h"
#include "fcntl.h"

int create_sock();

void make_nonblocking(int fd);

int main(int argc, char *argv[]) {
    int read_num, conn_fd, n;
    char buf[1024];
    struct sockaddr_in client_addr;

    int sock_fd = create_sock();

    make_nonblocking(sock_fd);

    struct pollfd fds[1024];
    fds[0].fd = sock_fd;
    fds[0].events = POLLRDNORM;

    int i;
    for (i = 1; i < 1024; i++) {
        fds[i].fd = -1;
    }

    for (;;) {
        if ((read_num = poll(fds, 1024, -1)) < 0) {
            perror ("poll failed.");
        }

        if (fds[0].revents & POLLRDNORM) {
            socklen_t cli_len = sizeof((client_addr));
            conn_fd = accept(sock_fd, (struct sockaddr *)&client_addr, &cli_len);

            printf("have a new connection: %d.\\n", conn_fd);

            for (i = 1; i < 1024; i++) {
                if (fds[i].fd < 0) {
                    fds[i].fd = conn_fd;
                    fds[i].events = POLLRDNORM;
                    break;
                }
            }

            if (i == 1024) {
                perror("too many clients.");
            }

            if (--read_num <= 0) {
                continue;
            }
        }

        for (i = 1; i < 1024; i++) {
            int client_fd;
            if ((client_fd = fds[i].fd) < 0) {
                continue;
            }

            if (fds[i].revents & (POLLRDNORM | POLLERR)) {
                if ((n = read(client_fd, buf, sizeof(buf))) > 0) {
                    if (write(client_fd, buf, n) < 0) {
                        perror("write failed.");
                    }
                } else if (n == 0 || errno == ECONNRESET) {
                    close(client_fd);
                    fds[i].fd = -1;
                } else {
                    perror("read failed.");
                }
            }

            if (--read_num <= 0) {
                break;
            }
        }
    }
}

我们通过上面的代码来梳理一下poll的使用。

首先,我们声明一个struct  pollfd的数组fds,长度为1024。

接着,将设置成非阻塞模式的服务端套接字放到fds的第一个位置,并且我们设置监听POLLRDNORM事件,这个事件前面表格中有,表示读事件。

紧接着,我们将剩下所有fds的fd都设置成了-1,pollfd的fd设置成-1表示忽略当前这个元素,poll在查找事件的时候就不会遍历这个元素了。

然后在一个循环里调用poll函数,第一个参数是前面我们创建的fds数组,第二个参数表示我们监听的描述符数量,最后一个参数传-1,表示如果没有可用描述符永远等待下去。

一旦有可用描述符,从poll函数返回,我们首先判断是否是当前服务端套接字。如果是,说明有客户端连接上来,我们调用accept可以拿到一个客户端的套接字。拿到之后,在fds中找一个fd等于-1的位置将它保存起来,并监听POLLRDNORM事件,到这里,客户端套接字的可读事件我们就通过poll就监听起来了,如果有数据可读,poll就会立马返回。

最后,我们处理的是客户端套接字,如果有POLLRDNORM或者POLLERR事件发生就收进来处理,只做了一件事,就是将客户端发送过来的数据原样再发回去。接着遍历下一个,直到将所有客户端套接字都处理一遍。

poll的原理、实现和select几乎是一样的,我们就不深入去分析它的实现了,有兴趣可以自己去找源码读一下。这里我们重点说一下select和poll不一样的地方。

第一,我们在声明pollfd结构数据的时候,是可以自行指定大小的。这个在select中是很难实现的,这也是poll相较select进步的地方。

第二,poll调用之后,不会修改pollfd数据,它是通过pollfd的events指定要监听的事件,再通过revents保存已发生的事件用于在poll返回时判断都有哪些事件发生了。

最后,poll和select都会遍历所有的描述符,很显然这在连接数非常多空洞非常大的时候是有性能问题的,而我们下一篇文章要讲的epoll就很好的解决了这个问题。

关于poll事件设置和判断

poll的事件设置一般通过&和|来操作,比如上面我们判断有POLLRDNORM或者POLLERR事件发生时,代码是这样的:


if (fds[i].revents & (POLLRDNORM | POLLERR)) {
  ...
}

前面的代码中,我们只监听了一个事件,直接用=号赋值,但如果我们想监听多个事件应该怎么设置呢?设置方法如下:


fds[i].events = POLLWRNORM | POLLRDNORM // 读和写事件监听 

在poll中使用掩码来设置events和判断revents,&和|分别表示按位与按位或,我们以POLLWRNORM和POLLRDNORM为例,它们在Linux中的值分别为:


#define POLLRDNORM 0x0040
#define POLLRDBAND 0x0080

假如我们同时关心这两个事件,也就是 POLLWRNORM | POLLRDNORM ,其语义表示的是 POLLWRNORM 或者 POLLRDNORM事件发生表示当前这个fd有事件发生,会导致poll函数返回。

关于按位与按位或都是比较基础的计算机知识,如果对这块不是很清楚的话,可以回头去补一下,相信你会对poll的这种设计有更加深入的理解。最后,我们使用poll改造一下非阻塞IO那篇文章的代码,将poll相关的代码封装在了poll.c/poll.h中,我们定义了一个poll_dispatcher数据结构,用于保存事件分发用到的上下文信息,其中就包含了套接字和读写Buffer的映射关系buffer_map,如下:


struct poll_dispatcher {
    int size;    // max fd nums
    int done;    // is done 0 not done  1 done
    int pair[2];
    pthread_cond_t cond;
    pthread_mutex_t mutex;
    struct pollfd *fds;
    struct buffer_map *buffer_map;
};

其中,size表示pollfd数组的大小,done表示的是dispatcher子线程初始化完成的状态,pair是一对本地通信的套接字,你可以简单理解为pair[0]和pair[1]代表一根网线的两端,这里为什么要使用pair后面我们慢慢讲,cond和mutex是用来线程同步的,fds是pollfd数组,buffer_map是套接字和buffer的映射关系。

创建一个dispatcher


struct poll_dispatcher *poll_new() {
    struct poll_dispatcher *poll_dis = malloc(sizeof(struct poll_dispatcher));
    poll_dis->size = MAX_POLL_SIZE;
    poll_dis->fds = malloc(sizeof(struct pollfd) * MAX_POLL_SIZE);

    pthread_cond_init(&poll_dis->cond, NULL);
    pthread_mutex_init(&poll_dis->mutex, NULL);

    if (socketpair(AF_UNIX, SOCK_STREAM, 0, poll_dis->pair) < 0) {
        perror("socketpair set fialed");
    }

    poll_dis->fds[0].fd = poll_dis->pair[1];
    poll_dis->fds[0].events = POLLRDNORM;

    int i;
    for (i = 1; i < MAX_POLL_SIZE; i++) {
        poll_dis->fds[i].fd = -1;
    }

    poll_dis->done = 0;
    poll_dis->buffer_map = new_buffer_map();

    return poll_dis;
}

创建dispatcher实际就是创建一个poll_dispathcer结构,分配内存并返回地址。这里面比较陌生的应该是初始化pair的代码,如下:


if (socketpair(AF_UNIX, SOCK_STREAM, 0, poll_dis->pair) < 0) {
    perror("socketpair set fialed");
 }
 poll_dis->fds[0].fd = poll_dis->pair[1];
 poll_dis->fds[0].events = POLLRDNORM;

这段代码,首先使用socketpair为pair分配套接字描述符,这个方法调用成功之后pair[0]和pair[1]就有了一个描述符了,往pair[0]发,pair[1]就可以收到数据。这里我们将pair[1]放到pollfd数组中,并监听读事件。至于为什么这么做,后面就知道了。

添加一个套接字到监听集合


int poll_add(struct poll_dispatcher *poll_dis, int fd) {
    int i;
    for (i = 0; i < poll_dis->size; i++) {
        if (poll_dis->fds[i].fd == -1) {
            poll_dis->fds[i].fd = fd;
            poll_dis->fds[i].events = POLLRDNORM;

            char c = 'a';
            ssize_t n= write(poll_dis->pair[0], &c, sizeof(c));
            if (n != sizeof(c)) {
                perror("write error");
            }

            printf("poll-add-fd: %d\\n", fd);
            return 0;
        }
    }

    perror("too many clients");

    return -1;
}

每次accept出来套接字之后,调用poll_add添加到pollfd数组中,就可以监听事件了。添加完成之后,我们往pair[0]里写一个字符,用来唤醒poll函数,使其从睡眠中醒来。这么做的原因是,事件分发和accept不在一个线程里,事件分发主循环比accept先运行。

此时,我们将accept出来的套接字添加到pollfd数组中,poll函数是感知不到的。所以,每次将accept出来的套接字添加到opllfd中之后,我们都要将poll唤醒。从而有机会再次调用poll函数将新添加的套接字监听起来(实际上就是把回调函数告诉内核),而pair[1]我们在初始化poll_dispatcher的时候就加入到了pollfd数组中,然后才阻塞在poll函数。所以,当pair[1]有读事件发生的时候,poll一定可以感知到,从而唤醒poll。这是个很有用的小技巧,我觉得可以多花些心思把它弄明白。

事件分发


int poll_dispatch(struct poll_dispatcher *poll_dis) {
    int ret, i;

    pthread_mutex_lock(&poll_dis->mutex);

    poll(poll_dis->fds, poll_dis->size, 0);
    poll_dis->done = 1;

    pthread_cond_signal(&poll_dis->cond);
    pthread_mutex_unlock(&poll_dis->mutex);

    while(1) {
        ret = poll(poll_dis->fds, poll_dis->size, -1);
        if (ret == -1) {
            perror("poll error");
            return -1;
        }

        printf("poll-returned: %d\\n", ret);

        for (i = 0; i < poll_dis->size; i++) {
            if (poll_dis->fds[i].fd == -1) {
                continue;
            }

            // wakeup
            if (poll_dis->fds[i].fd == poll_dis->pair[1] && poll_dis->fds[i].revents & POLLRDNORM) {
                char c;
                ssize_t n = read(poll_dis->pair[1], &c, sizeof(c));
                if (n != sizeof(c)) {
                    perror("read error");
                }
                printf("wakeup\\n");
                break;
            }

            if (poll_dis->fds[i].revents & (POLLRDNORM | POLLERR)) {
                int client_fd = poll_dis->fds[i].fd;
               do_msg(poll_dis->buffer_map->s_buffer[client_fd], client_fd);
            }
        }
   }
}

事件分发也比较简单,程序进来的时候使用了cond和mutex来同步主accept线程和worker线程,这是因为accept出来的套接字在加入到pollfd之前,我们要确保poll主循环已经跑起来了,这里我们简单的给poll超时传入了0,调用了一次poll(这种实现并不是很优雅,这里为了简单起见,后续的文章我们再来解决这个问题)。

然后将accept和poll的事件分发分别放在了两个线程中,代码封装在了accepter.c中,如下:


void *worker(void *arg) {
    struct poll_dispatcher *poll_dis = (struct poll_dispatcher *)arg;

    printf("poll-size: %d\\n", poll_dis->size);

    poll_dispatch(poll_dis);

    poll_free(poll_dis);
}

void accept_sock(int serv_fd) {
    int conn_fd;

    struct poll_dispatcher *poll_dis = poll_new();

//    make_nonblocking(serv_fd);

    struct sockaddr_in client_addr;
    socklen_t cli_len = sizeof(client_addr);

    pthread_t thread_id;

    pthread_create(&thread_id, NULL, worker, (void *)poll_dis);

    pthread_mutex_lock(&poll_dis->mutex);
    while(!poll_dis->done)
        pthread_cond_wait(&poll_dis->cond, &poll_dis->mutex);
    pthread_mutex_unlock(&poll_dis->mutex);

    while(1) {
        memset(&client_addr, 0, sizeof(client_addr));

        conn_fd = accept(serv_fd, (struct sockaddr *)&client_addr, &cli_len);
        if (conn_fd < 0) {
            perror("accept error");
            continue;
        }

        make_nonblocking(conn_fd);

        printf("accept a new connection: %d\\n", conn_fd);

        if (allocation_buffer_map(poll_dis->buffer_map, conn_fd) < 0) {
            perror("allocation_buffer_map error");
            continue;
        }

        poll_add(poll_dis, conn_fd);
    }
}

void make_nonblocking(int fd) {
    fcntl(fd, F_SETFL, O_NONBLOCK);
}

在accept中,我们在一个循环里判断poll_dis->done是否准备好了,如果没有准备好进入cond_wait获取到mutext锁,阻塞在cond_wait。当子线程dispatcher发来通知,cond醒来,再次拿到锁,然后再次尝试进入循环,此时已经准备完成了,跳出循环,释放锁。

接着在一个循环里不断accept套接字出来,放到pollfd中,并分配对应的读写buffer。注意,每个accept出来的套接字都设置成了非阻塞模式。

最后,在main方法中调用accept_sock方法


int main(int argc, char *argv[]) {
    if (argc != 2) {
        perror("usage: ./server ");
        exit(1);
    }

    int sock_fd = create_tcp_socket(atoi(argv[1]));

    printf("server started on port %s\\n", argv[1]);

    accept_sock(sock_fd);
}

这篇文章的代码: https://github.com/benggee/c-program/tree/main/poll

总结

上面的代码,已经非常接近准生产环境了。主线程不断的accept出来新的套接字,加入到poll的事件监听集合当中。

当对应的套接字有事件发生的时候,从poll返回。但如果你够仔细的话,会发现,上面的代码其实还有一个问题,一般程序比较耗时的都是处理业务逻辑,而我们在dispatch的时候,拿到一个套接之后,是阻塞在do-msg方法的。在do-msg结束之前,其它的套接字是没有机会得到处理的。