I/O复用:poll


poll 函数

poll提供的功能与select类似,不过在处理流设备时,它能够提供额外的信息。

1
2
3
#include <poll.h>
int poll(struct pollfd *fdarray, unsigned long nfds, int timeout);
返回:若有就绪描述符则为其数目,若超时则为0,若出错则为-1

第一个参数是指向一个结构数组第一个元素的指针。每个数组元素都是一个pollfd结构,用于指定测试某个给定描述符fd的条件。

1
2
3
4
5
struct pollfd {
int fd; /* descriptor to check */
short events; /* events of interest on fd */
short revents /* events that occurred on fd */
};

要测试的条件由events成员指定,函数在相应的revents成员中返回该描述符的状态。这两个成员中的每一个都由指定的某个特定条件的一位或多位构成。用于指定events标志以及测试revents标志的一些常值。

  • POLLIN: 普通或优先级数据可读
  • POLLRDNORM: 普通数据可读
  • POLLRDBAND: 优先级带数据可读
  • POLLPRI: 高优先级数据可读
  • POLLOUT: 普通数据可写
  • POLLWRNORM: 普通数据可写
  • POLLWRBAND:优先级带数据可写
  • POLLERR:发生错误
  • POLLHUP:发生挂起
  • POLLNVAL:描述符不是一个打开的文件

POLLERRPOLLHUPPOLLNVAL这三个常值不能再events中设置,但是当相应条件存在时就在revents中返回。

POLLIN可被定义为POLLRDNORMPOLLRDBAND的逻辑或。POLLOUT等同于POLLWRNORM

就TCP和UDP套接字而言,以下条件引起poll返回特定的revent

  • 所有正规TCP数据和所有UDP数据都被认为是普通的数据。
  • TCP的带外数据被认为是优先级带数据。
  • 当TCP连接的读半部关闭时(譬如收到了一个来自对端的 FIN ),也被认为是普通数据。随后的读操作将返回0。
  • TCP连接存在错误既可认为是普通数据,也可认为是错误(POLLERR)。无论哪种情况,随后的读操作将返回-1,并把errno设置成合适的值。这可用于处理诸如接收到 RST 或发生超时等条件。
  • 在监听套接字上有新的连接可用既可认为是普通数据,也可认为是优先级数据。大多数实现视为普通数据。
  • 非阻塞式connect的完成被认为是使相应套接字可写。

结构数组中元素的个数是由nfds参数指定。

timeout参数指定poll函数返回前等待多长时间。它是一个指定应等待毫秒数的正值。INFTIM表示永远等待。0立即返回,不阻塞进程。大于0等待指定数目的毫秒数。

INFTIM常值被定义为一个负值。

当发生错误时,poll函数的返回值为-1,若定时器到时之前没有任何描述符就绪,则返回0,否则返回就绪描述符的个数,即revents成员值非0的描述符个数。

如果我们不再关心某个特定描述符,那么可以把与它对应的pollfd结构的fd成员设置成一个负值。poll函数将忽略这样的pollfd结构的events成员,返回时将它的revents成员的值置为0。

使用poll的echo服务器程序

代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
#include <stdio.h>                                                        
#include <errno.h>
#include <stdlib.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <string.h>
#include <arpa/inet.h>
#include <unistd.h>
#include <poll.h>
#include <sys/time.h>

#include "error.h"

#define MAXLINE 4096
#define SERV_PORT 9987
#define SA struct sockaddr
#define LISTENQ 1024
#define OPEN_MAX 1024
#define INFTIM -1

static int Socket(int family, int type, int protocol) {
int n;

if((n = socket(family, type, protocol)) < 0) {
err_sys("socket error");
}

return n;
}

static void Bind(int fd, struct sockaddr *sa, socklen_t salen) {
if(bind(fd, sa, salen) < 0) {
err_sys("bind error");
}
}

static void Listen(int fd, int backlog) {
char *ptr;

if((ptr = getenv("LISTENQ")) != NULL) {
backlog = atoi(ptr);
}

if(listen(fd, backlog) < 0) {
err_sys("listen error");
}
}

static int Accept(int fd, struct sockaddr *sa, socklen_t *salenptr) {
int n;

while((n = accept(fd, sa, salenptr)) < 0) {
if(errno != EINTR && errno != ECONNABORTED) {
err_sys("accept error");
}
}

return n;
}

static void Close(int fd) {
if(close(fd) == -1) {
err_sys("close error");
}
}

static int tcp_listen(void) {
int listenfd;
struct sockaddr_in servaddr;

listenfd = Socket(AF_INET, SOCK_STREAM, 0);

bzero(&servaddr, 0);
servaddr.sin_family = AF_INET;
servaddr.sin_port = htons(SERV_PORT);
servaddr.sin_addr.s_addr = htonl(INADDR_ANY);

Bind(listenfd, (SA *) &servaddr, sizeof(servaddr));

Listen(listenfd, LISTENQ);

return listenfd;
}

static int writen(int fd, void *vptr, size_t nbytes) {
const char *ptr = vptr;
size_t left = nbytes;
int nwritten;

while(left > 0) {
if((nwritten = write(fd, vptr, left)) < 0) {
return -1;
}

ptr += nwritten;
left -= nwritten;
}

return nbytes;
}

static void Writen(int fd, void *vptr, size_t nbytes) {
if(writen(fd, vptr, nbytes) != nbytes) {
err_sys("write error");
}
}

static int Poll(struct pollfd *fdarray, unsigned long nfds, int timeout) {
int n;

if((n = poll(fdarray, nfds, timeout)) < 0) {
err_sys("poll error");
}

return n;
}

int main(int argc, char *argv[]) {
int listenfd;
int max_index;
int i;
struct pollfd client[OPEN_MAX];
int num_ready;
int connfd;
int n;
char buf[MAXLINE];

listenfd = tcp_listen();

client[0].fd = listenfd;
client[0].events = POLLRDNORM;

for(i = 1; i < OPEN_MAX; ++i) {
client[i].fd = -1;
}

max_index = 0;

for(;;) {
num_ready = Poll(client, max_index + 1, INFTIM);

/* new connection*/
if(client[0].revents & POLLRDNORM) {
connfd = Accept(client[0].fd, NULL, NULL);

for(i = 1; i < OPEN_MAX; ++i) {
if(client[i].fd == -1) {
client[i].fd = connfd;
client[i].events = POLLRDNORM;
break;
}
}

if(i == OPEN_MAX) {
err_quit("too many clients");
}

if(i > max_index) {
max_index = i;
}

if(--num_ready == 0) {
continue; /* no more readable descriptor */
}
}

for(i = 1; i <= max_index; ++i) {
if(client[i].fd == -1) {
continue;
}

if(client[i].revents & (POLLRDNORM | POLLERR)) {
if((n = read(client[i].fd, buf, MAXLINE)) < 0) {

/* connection reset by client */
if(errno == ECONNRESET) {
Close(client[i].fd);
client[i].fd = -1;
} else {
err_sys("read error");
}
} else if(n == 0) {

/* connection close by client */
Close(client[i].fd);
client[i].fd = -1;
} else {
Writen(client[i].fd, buf, n);
}

if(--num_ready == 0) {
break; /* no more readable descriptor */
}
}
}
}

return 0;
}
  1. 声明一个pollfd结构数组client,存在OPEN_MAX个元素。client数组的第一项用于监听套接字,并把其余各项的描述符成员置为-1。第一项设置POLLRDNORM事件,这样当有新的连接准备好被接受时poll将通知我们。max_index变量表示client数组当前正在使用的最大下标值。
  2. 调用poll以等待新的连接或者现有连接上有数据可读。当一个新的连接被接受后,在client数组中查找第一个描述符成员为-1的可用项。注意,下标是从1开始搜索,因为client[0]固定用于监听套接字。找到一个可用项之后,把新连接的描述符保存到其中,并设置POLLRDNORM事件。
  3. 检查某个现有连接上的数据,两个返回事件是POLLRDNORMPOLLERRPOLLERR并没有在event成员中设置,因为它在条件成立时总是返回。检查POLLERR的原因在于:有些实现在一个连接上接收到 RST 时返回的是POLLERR事件,而其他实现返回的只是POLLRDNORM。无论哪种情形,我们都调用read,当有错误发生时,read将返回这个错误。当一个现有连接由它的客户终止时,就把它的fd成员置为-1。