poll和select一样,都是Linux下I/O复用模型的一种实现。和select相比,poll主要在数据结构上有一些变化,通常情况下select最大描述符为1024,在linux内核中这个值被定义成了一个FD_SETSIZE宏(在CentOS发行版本的Linux下select.h所在路径是/usr/include/sys/select.h),如下:
#define FD_SETSIZE __FD_SETSIZE
其中__FD_SETSIZE定义在内核中,一般默认是1024。很多文章可能会告诉你,重新定义一下FD_SETSIZE这个宏就可以突破这个限制了,到底对不对呢?我们看一下select初化位图的代码就知道了(还是在/usr/include/sys/select.h中),如下:
#ifdef __USE_XOPEN
__fd_mask fds_bits[__FD_SETSIZE / __NFDBITS];
# define __FDS_BITS(set) ((set)->fds_bits)
#else
__fd_mask __fds_bits[__FD_SETSIZE / __NFDBITS];
# define __FDS_BITS(set) ((set)->__fds_bits)
#endif
可以看到,初始化位图使用的是内核中的__FD_SETSIZE宏。所以,如果想突破1024的限制就必须修改内核的宏__FD_SETSIZE然后重新编译内核。这显然不是所有团队都有能力去做的,这也是select的一个缺点。
为了解决这个问题,poll出现了,poll和select的原理没什么区别,唯一的区别就是数据结构上发生了变化,使得poll可以轻松突破默认最大1024个描述符的限制。
poll函数签名如下:
int poll(struct pollfd *fdarr, unsigned long nfds, int timout);
从函数签名上看,poll比select要简单一些。第一个参数是要监听的I/O描述符事件集合,其结构如下:
struct pollfd {
int fd; // 描述符
short events; // 监听描述符发生的事件
short revents; // 已经发生的事件
};
第二个参数nfds是要监听的套接字数量,这是poll和select最大的区别,前面我们说select最大描述符在内核中的_FD_SETSIZE宏写死了,一般是1024。而poll可以通过nfds参数来突破这个限制。
第三个参数是超时时间,一般有三种传值的方式,传-1表示在有可用描述符之前一直等待,传0表示不管有没有可用描述符都立即返回,传大于0的值表示超过对应毫秒即使没有事件发生也会立即返回。最后我们看一下,pollfd结构中的events和revents都有哪些事件,下面我列了一张表:
和select另一个不一样的地方,poll使用两个short型的整型来分别保存监听的事件,和已经发生的事件。这意味着,我们在调用poll函数之前不需要将我们监听的事件复制一份了。I/O设置监听事件使用events,判断是否有事件发生使用revents。下面我们用poll写一段代码实际看一下poll是怎么用的。
#include "stdio.h"
#include "sys/socket.h"
#include "poll.h"
#include "unistd.h"
#include "string.h"
#include "netinet/in.h"
#include "errno.h"
#include "fcntl.h"
int create_sock();
void make_nonblocking(int fd);
int main(int argc, char *argv[]) {
int read_num, conn_fd, n;
char buf[1024];
struct sockaddr_in client_addr;
int sock_fd = create_sock();
make_nonblocking(sock_fd);
struct pollfd fds[1024];
fds[0].fd = sock_fd;
fds[0].events = POLLRDNORM;
int i;
for (i = 1; i < 1024; i++) {
fds[i].fd = -1;
}
for (;;) {
if ((read_num = poll(fds, 1024, -1)) < 0) {
perror ("poll failed.");
}
if (fds[0].revents & POLLRDNORM) {
socklen_t cli_len = sizeof((client_addr));
conn_fd = accept(sock_fd, (struct sockaddr *)&client_addr, &cli_len);
printf("have a new connection: %d.\\n", conn_fd);
for (i = 1; i < 1024; i++) {
if (fds[i].fd < 0) {
fds[i].fd = conn_fd;
fds[i].events = POLLRDNORM;
break;
}
}
if (i == 1024) {
perror("too many clients.");
}
if (--read_num <= 0) {
continue;
}
}
for (i = 1; i < 1024; i++) {
int client_fd;
if ((client_fd = fds[i].fd) < 0) {
continue;
}
if (fds[i].revents & (POLLRDNORM | POLLERR)) {
if ((n = read(client_fd, buf, sizeof(buf))) > 0) {
if (write(client_fd, buf, n) < 0) {
perror("write failed.");
}
} else if (n == 0 || errno == ECONNRESET) {
close(client_fd);
fds[i].fd = -1;
} else {
perror("read failed.");
}
}
if (--read_num <= 0) {
break;
}
}
}
}
我们通过上面的代码来梳理一下poll的使用。
首先,我们声明一个struct pollfd的数组fds,长度为1024。
接着,将设置成非阻塞模式的服务端套接字放到fds的第一个位置,并且我们设置监听POLLRDNORM事件,这个事件前面表格中有,表示读事件。
紧接着,我们将剩下所有fds的fd都设置成了-1,pollfd的fd设置成-1表示忽略当前这个元素,poll在查找事件的时候就不会遍历这个元素了。
然后在一个循环里调用poll函数,第一个参数是前面我们创建的fds数组,第二个参数表示我们监听的描述符数量,最后一个参数传-1,表示如果没有可用描述符永远等待下去。
一旦有可用描述符,从poll函数返回,我们首先判断是否是当前服务端套接字。如果是,说明有客户端连接上来,我们调用accept可以拿到一个客户端的套接字。拿到之后,在fds中找一个fd等于-1的位置将它保存起来,并监听POLLRDNORM事件,到这里,客户端套接字的可读事件我们就通过poll就监听起来了,如果有数据可读,poll就会立马返回。
最后,我们处理的是客户端套接字,如果有POLLRDNORM或者POLLERR事件发生就收进来处理,只做了一件事,就是将客户端发送过来的数据原样再发回去。接着遍历下一个,直到将所有客户端套接字都处理一遍。
poll的原理、实现和select几乎是一样的,我们就不深入去分析它的实现了,有兴趣可以自己去找源码读一下。这里我们重点说一下select和poll不一样的地方。
第一,我们在声明pollfd结构数据的时候,是可以自行指定大小的。这个在select中是很难实现的,这也是poll相较select进步的地方。
第二,poll调用之后,不会修改pollfd数据,它是通过pollfd的events指定要监听的事件,再通过revents保存已发生的事件用于在poll返回时判断都有哪些事件发生了。
最后,poll和select都会遍历所有的描述符,很显然这在连接数非常多空洞非常大的时候是有性能问题的,而我们下一篇文章要讲的epoll就很好的解决了这个问题。
关于poll事件设置和判断
poll的事件设置一般通过&和|来操作,比如上面我们判断有POLLRDNORM或者POLLERR事件发生时,代码是这样的:
if (fds[i].revents & (POLLRDNORM | POLLERR)) {
...
}
前面的代码中,我们只监听了一个事件,直接用=号赋值,但如果我们想监听多个事件应该怎么设置呢?设置方法如下:
fds[i].events = POLLWRNORM | POLLRDNORM // 读和写事件监听
在poll中使用掩码来设置events和判断revents,&和|分别表示按位与和按位或,我们以POLLWRNORM和POLLRDNORM为例,它们在Linux中的值分别为:
#define POLLRDNORM 0x0040
#define POLLRDBAND 0x0080
假如我们同时关心这两个事件,也就是 POLLWRNORM | POLLRDNORM ,其语义表示的是 POLLWRNORM 或者 POLLRDNORM事件发生表示当前这个fd有事件发生,会导致poll函数返回。
关于按位与和按位或都是比较基础的计算机知识,如果对这块不是很清楚的话,可以回头去补一下,相信你会对poll的这种设计有更加深入的理解。最后,我们使用poll改造一下非阻塞IO那篇文章的代码,将poll相关的代码封装在了poll.c/poll.h中,我们定义了一个poll_dispatcher数据结构,用于保存事件分发用到的上下文信息,其中就包含了套接字和读写Buffer的映射关系buffer_map,如下:
struct poll_dispatcher {
int size; // max fd nums
int done; // is done 0 not done 1 done
int pair[2];
pthread_cond_t cond;
pthread_mutex_t mutex;
struct pollfd *fds;
struct buffer_map *buffer_map;
};
其中,size表示pollfd数组的大小,done表示的是dispatcher子线程初始化完成的状态,pair是一对本地通信的套接字,你可以简单理解为pair[0]和pair[1]代表一根网线的两端,这里为什么要使用pair后面我们慢慢讲,cond和mutex是用来线程同步的,fds是pollfd数组,buffer_map是套接字和buffer的映射关系。
创建一个dispatcher
struct poll_dispatcher *poll_new() {
struct poll_dispatcher *poll_dis = malloc(sizeof(struct poll_dispatcher));
poll_dis->size = MAX_POLL_SIZE;
poll_dis->fds = malloc(sizeof(struct pollfd) * MAX_POLL_SIZE);
pthread_cond_init(&poll_dis->cond, NULL);
pthread_mutex_init(&poll_dis->mutex, NULL);
if (socketpair(AF_UNIX, SOCK_STREAM, 0, poll_dis->pair) < 0) {
perror("socketpair set fialed");
}
poll_dis->fds[0].fd = poll_dis->pair[1];
poll_dis->fds[0].events = POLLRDNORM;
int i;
for (i = 1; i < MAX_POLL_SIZE; i++) {
poll_dis->fds[i].fd = -1;
}
poll_dis->done = 0;
poll_dis->buffer_map = new_buffer_map();
return poll_dis;
}
创建dispatcher实际就是创建一个poll_dispathcer结构,分配内存并返回地址。这里面比较陌生的应该是初始化pair的代码,如下:
if (socketpair(AF_UNIX, SOCK_STREAM, 0, poll_dis->pair) < 0) {
perror("socketpair set fialed");
}
poll_dis->fds[0].fd = poll_dis->pair[1];
poll_dis->fds[0].events = POLLRDNORM;
这段代码,首先使用socketpair为pair分配套接字描述符,这个方法调用成功之后pair[0]和pair[1]就有了一个描述符了,往pair[0]发,pair[1]就可以收到数据。这里我们将pair[1]放到pollfd数组中,并监听读事件。至于为什么这么做,后面就知道了。
添加一个套接字到监听集合
int poll_add(struct poll_dispatcher *poll_dis, int fd) {
int i;
for (i = 0; i < poll_dis->size; i++) {
if (poll_dis->fds[i].fd == -1) {
poll_dis->fds[i].fd = fd;
poll_dis->fds[i].events = POLLRDNORM;
char c = 'a';
ssize_t n= write(poll_dis->pair[0], &c, sizeof(c));
if (n != sizeof(c)) {
perror("write error");
}
printf("poll-add-fd: %d\\n", fd);
return 0;
}
}
perror("too many clients");
return -1;
}
每次accept出来套接字之后,调用poll_add添加到pollfd数组中,就可以监听事件了。添加完成之后,我们往pair[0]里写一个字符,用来唤醒poll函数,使其从睡眠中醒来。这么做的原因是,事件分发和accept不在一个线程里,事件分发主循环比accept先运行。
此时,我们将accept出来的套接字添加到pollfd数组中,poll函数是感知不到的。所以,每次将accept出来的套接字添加到opllfd中之后,我们都要将poll唤醒。从而有机会再次调用poll函数将新添加的套接字监听起来(实际上就是把回调函数告诉内核),而pair[1]我们在初始化poll_dispatcher的时候就加入到了pollfd数组中,然后才阻塞在poll函数。所以,当pair[1]有读事件发生的时候,poll一定可以感知到,从而唤醒poll。这是个很有用的小技巧,我觉得可以多花些心思把它弄明白。
事件分发
int poll_dispatch(struct poll_dispatcher *poll_dis) {
int ret, i;
pthread_mutex_lock(&poll_dis->mutex);
poll(poll_dis->fds, poll_dis->size, 0);
poll_dis->done = 1;
pthread_cond_signal(&poll_dis->cond);
pthread_mutex_unlock(&poll_dis->mutex);
while(1) {
ret = poll(poll_dis->fds, poll_dis->size, -1);
if (ret == -1) {
perror("poll error");
return -1;
}
printf("poll-returned: %d\\n", ret);
for (i = 0; i < poll_dis->size; i++) {
if (poll_dis->fds[i].fd == -1) {
continue;
}
// wakeup
if (poll_dis->fds[i].fd == poll_dis->pair[1] && poll_dis->fds[i].revents & POLLRDNORM) {
char c;
ssize_t n = read(poll_dis->pair[1], &c, sizeof(c));
if (n != sizeof(c)) {
perror("read error");
}
printf("wakeup\\n");
break;
}
if (poll_dis->fds[i].revents & (POLLRDNORM | POLLERR)) {
int client_fd = poll_dis->fds[i].fd;
do_msg(poll_dis->buffer_map->s_buffer[client_fd], client_fd);
}
}
}
}
事件分发也比较简单,程序进来的时候使用了cond和mutex来同步主accept线程和worker线程,这是因为accept出来的套接字在加入到pollfd之前,我们要确保poll主循环已经跑起来了,这里我们简单的给poll超时传入了0,调用了一次poll(这种实现并不是很优雅,这里为了简单起见,后续的文章我们再来解决这个问题)。
然后将accept和poll的事件分发分别放在了两个线程中,代码封装在了accepter.c中,如下:
void *worker(void *arg) {
struct poll_dispatcher *poll_dis = (struct poll_dispatcher *)arg;
printf("poll-size: %d\\n", poll_dis->size);
poll_dispatch(poll_dis);
poll_free(poll_dis);
}
void accept_sock(int serv_fd) {
int conn_fd;
struct poll_dispatcher *poll_dis = poll_new();
// make_nonblocking(serv_fd);
struct sockaddr_in client_addr;
socklen_t cli_len = sizeof(client_addr);
pthread_t thread_id;
pthread_create(&thread_id, NULL, worker, (void *)poll_dis);
pthread_mutex_lock(&poll_dis->mutex);
while(!poll_dis->done)
pthread_cond_wait(&poll_dis->cond, &poll_dis->mutex);
pthread_mutex_unlock(&poll_dis->mutex);
while(1) {
memset(&client_addr, 0, sizeof(client_addr));
conn_fd = accept(serv_fd, (struct sockaddr *)&client_addr, &cli_len);
if (conn_fd < 0) {
perror("accept error");
continue;
}
make_nonblocking(conn_fd);
printf("accept a new connection: %d\\n", conn_fd);
if (allocation_buffer_map(poll_dis->buffer_map, conn_fd) < 0) {
perror("allocation_buffer_map error");
continue;
}
poll_add(poll_dis, conn_fd);
}
}
void make_nonblocking(int fd) {
fcntl(fd, F_SETFL, O_NONBLOCK);
}
在accept中,我们在一个循环里判断poll_dis->done是否准备好了,如果没有准备好进入cond_wait获取到mutext锁,阻塞在cond_wait。当子线程dispatcher发来通知,cond醒来,再次拿到锁,然后再次尝试进入循环,此时已经准备完成了,跳出循环,释放锁。
接着在一个循环里不断accept套接字出来,放到pollfd中,并分配对应的读写buffer。注意,每个accept出来的套接字都设置成了非阻塞模式。
最后,在main方法中调用accept_sock方法
int main(int argc, char *argv[]) {
if (argc != 2) {
perror("usage: ./server ");
exit(1);
}
int sock_fd = create_tcp_socket(atoi(argv[1]));
printf("server started on port %s\\n", argv[1]);
accept_sock(sock_fd);
}
这篇文章的代码: https://github.com/benggee/c-program/tree/main/poll
总结
上面的代码,已经非常接近准生产环境了。主线程不断的accept出来新的套接字,加入到poll的事件监听集合当中。
当对应的套接字有事件发生的时候,从poll返回。但如果你够仔细的话,会发现,上面的代码其实还有一个问题,一般程序比较耗时的都是处理业务逻辑,而我们在dispatch的时候,拿到一个套接之后,是阻塞在do-msg方法的。在do-msg结束之前,其它的套接字是没有机会得到处理的。