I/O复用:select


在实现TCP的echo服务的那节中,客户端同时处理两个输入:标准输入和TCP套接字。会遇到这样的问题:客户在阻塞于(标准输入上的)fgets调用期间,服务器进程会被杀死,服务器TCP虽然正确地给客户TCP发送了一个 FIN ,但是既然客户进程正阻塞于标准输入读入过程,它将看不到这个 EOF ,直到从套接字读时为止(可能过了很长时间)。这样的进程需要一种预先告知内核的能力,使得内核一旦发现进程指定的一个或多个I/O 条件就绪(也就是说输入已准备好被读取,或者描述符已能承接更多的输入),它就通知进程。这称为I/O复用(I/O multiplexing),在linux下由selectpollepoll实现。

I/O复用典型使用在下列网络应用场合。

  • 当客户处理多个描述符(通常是交互式输入和网络套接字)时,必须使用I/O复用。
  • 如果一个TCP服务器既然处理监听套接字,又要处理已连接套接字,一般就要使用I/O复用。
  • 如果一个服务器既要处理TCP,又要处理UDP,一般就要使用I/O复用。
  • 如果一个服务器要处理多个服务或者多个协议,一般就要使用I/O复用。

I/O复用模型

有了I/O复用(I/O multiplexing),就可以调用selectpollepoll,阻塞在这几个系统调用中的某一个之上,而不是阻塞在真正的I/O。

select函数

该函数允许进程指示内核等待多个事件中的任何一个发生,并只在有一个或多个事件发生或经历一段指定的时间后才唤醒它。

1
2
3
4
5
#include <sys/select.h>
#include <sys/time.h>

int select(int maxfdp1, fd_set *readset, fd_set *writeset, fd_set *exceptset, const struct timeval* timeout);
返回:若有就绪描述符则为其数目,若超时则为0,若出错则为-1

timeout告知内核等待所指定描述符中的任何一个就绪可花多长时间。其timeval结构用于指定这段时间的秒数和微秒数。

1
2
3
4
struct timeval {
long tv_sec; /* seconds */
long tv_usec; /* microseconds */
};

这个参数有以下三种可能。

  1. 永远等待下去:仅在有一个描述符准备好I/O时才返回。为此,该参数设置为空指针。
  2. 等待一段固定时间:在有一个描述符准备好I/O时返回,但是不超过由该参数所指向timeval结构中指定的秒数和微秒数。
  3. 根本不等待:检查描述符后立即返回,这称为轮询(polling)。为此,该参数必须指向一个timeval结构。而且其中的定时器值(由该结构指定的秒数和微秒数)必须为0。

前两种情形的等待通常会被进程在等待期间捕获的信号中断,并从信号处理函数返回。

尽管timeval结构运行我们指定了一个微妙级的分辨率,然而内核支持的真实分辨率往往粗糙得多。举例来说,许多Unix内核把超时值向上舍入10ms的倍数。另外还涉及调度延迟,也就是说定时器时间到后,内核还需花一点时间调度相应进程运行。

中间的三个参数readsetwritesetexceptset指定要让内核测试读、写和异常条件的描述符。目前支持的异常条件只有两个:

  1. 某个套接字的带外数据的到达。
  2. 某个已置为分组模式的伪终端存在可从其主端读取的控制状态信息。

select使用描述符集,通常是一个整数数组,其中每个整数中的每一位对应一个描述符。举例来说,假设使用32位整数,那么该数组的第一个元素对应于描述符0~31,第二个元素对应于描述符32~63,以此类推。所有这些实现细节都与应用程序无关,它们隐藏在名为fd_set的数据类型和以下四个宏中:

1
2
3
4
void FD_ZERO(fd_set *fdset)        /* clear all bits in fdset */
void FD_SET(int fd, fd_set *fdset) /* turn on the bit for fd in fdset */
void FD_CLR(int fd, fd_set *fdset) /* turn off the bit for fd in fdset */
int FD_ISSET(int fd, fd_set *fdset) /* is the bit for fd on in fdset ? */

描述符集的初始化非常重要,因为作为自动变量分配的一个描述符集如果没有初始化,那么可能发生不可预期的后果。

select函数的中间三个参数readsetwritesetexceptset中,如果我们对某一个的条件不感兴趣,就可以把它设为空指针。事实上这三个指针均为空,我们就有了一个比Unix的sleep函数更为精确的定时器(sleep睡眠以秒为最小单位)。

maxfdp1参数指定待测试的描述符个数,它的值是待测试的最大描述符加1,描述符0,1,2…一直到maxfdp1-1均将被测试。

select函数修改由指针readsetwritesetexceptset所指向的描述符集,因而这三个参数都是值-结果参数。调用该函数时,我们指定所关心的描述符的值,该函数返回时,结果将指示哪些描述符已就绪。该函数返回后,我们使用FD_ISSET宏来测试fd_set数据类型中的描述符。描述符集内任何未就绪描述符对应的位返回时均清成0。为此,每次重新调用select函数时,我们都得再次把所有描述符集内所关心的位均置为1。

该函数的返回值表示跨所有描述符集的已就绪的总位数。如果在任何描述符就绪之前定时器到时,那么返回0。返回-1表示出错(这是可能发生的,譬如本函数被一个所捕获的信号中断)。

描述符就绪的条件

满足下列四个条件中的任何一个时,一个套接字准备好读。

  1. 该套接字接收缓冲区中的数据字节数大于等于套接字接收缓冲区低水位标记的当前大小。对这样的套接字执行读操作不会阻塞并将返回一个大于0的值(也就是返回准备好读入的数据)。我们可以使用SO_RCVLOWAT套接字选项设置该套接字的低水位标记。对于TCP和UDP套接字而言,其默认值为1。
  2. 该连接的读半部关闭(也就接收了FIN的TCP连接)。对这样的套接字的读操作将不阻塞返回0(也就是返回EOF)。
  3. 该套接字是一个监听套接字且已完成的连接数不为0。对这样的套接字的accept通常不会阻塞。
  4. 其上有一个套接字错误待处理。对这样的套接字的读操作将不阻塞并返回-1(也就是一个错误),同时把errno设置成确切的错误条件。这些待处理错误也可以通过制定SO_ERROR套接字选项调用getsockopt获取并清除。

下列四个条件中的任何一个满足时,一个套接字准备好写。

  1. 该套接字发送缓冲区中的可用空间字节数大于等于套接字发送缓冲区低水位标记的当前大小,并且或者该套接字设置成非阻塞,写操作将不阻塞并返回一个正值(例如由传输层接受的字节数)。我们可以使用SO_SNDLOWAT套接字选项来设置该套接字的低水位标记。对于TCP和UDP套接字而言,其默认值通常为2048
  2. 该连接的写半部关闭。对这样的套接字的写操作将产生SIGPIPE信号。
  3. 使用非阻塞式connect的套接字已建立连接,或者connect已经以失败告终。
  4. 其上有一个套接字错误待处理。对这样的套接字的读操作将不阻塞并返回-1(也就是一个错误),同时把errno设置成确切的错误条件。这些待处理错误也可以通过制定SO_ERROR套接字选项调用getsockopt获取并清除。

select的最大描述符数

头文件<sys/select.h>中定义的FD_SETSIZE常值是数据类型fd_set中的描述符总数,其值通常是1024。不同的内核可能不同。

使用select的echo客户端程序: str_cli函数(修订版)

使用select重写了str_cli函数:这样服务器进程一终止,客户就能马上得到通知。客户端的套接字上的三个条件处理如下:

  1. 如果对端TCP发送数据,那么该套接字变为可读,并且read返回一个大于0的值(即读入数据的字节数)。
  2. 如果对端TCP发送一个 FIN (对端进程终止),那么该套接字变为可读,并且read返回0( EOF )。
  3. 如果对端TCP发送一个 RST (对端主机崩溃并重新启动),那么该套接字变为可读,并且read返回-1,而errno中含有确切的错误码。

代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
#include <stdio.h> 
#include <stdlib.h>
#include <errno.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <string.h>
#include <arpa/inet.h>
#include <unistd.h>
#include <sys/select.h>
#include <sys/time.h>

#include "error.h"

#define MAXLINE 4096
#define SERV_PORT 9987
#define SA struct sockaddr
#define max(a, b) ((a) > (b) ? (a) : (b))

static int Socket(int family, int type, int protocol) {
int n;

if((n = socket(family, type, protocol)) < 0) {
err_sys("socket error");
}

return n;
}

static void Inet_pton(int family, const char *strptr, void *addrptr) {
if(inet_pton(family, strptr, addrptr) <=0) {
err_sys("inet_pton error for %s", strptr);
}
}

static void Connect(int fd, struct sockaddr *sa, socklen_t salen) {
if(connect(fd, sa, salen) < 0) {
err_sys("connect error");
}
}

static int Select(int nfds, fd_set *rfds, fd_set *wfds, fd_set *exceptfds, struct timeval *timeout) {
int n;

while(n = select(nfds, rfds, wfds, exceptfds, timeout) < 0) {
if(errno != EINTR) {
err_sys("select error");
}
}

return n;
}

static const char* Fgets(char *ptr, int n, FILE *stream) {
char *ret_ptr;

if((ret_ptr = fgets(ptr, n, stream)) == NULL && ferror(stream)) {
err_sys("fgets error");
}

return ret_ptr;
}

static int writen(int fd, void *vptr, size_t nbytes) {
const char *ptr = vptr;
size_t left = nbytes;
int nwritten;

while(left > 0) {
if((nwritten = write(fd, vptr, left)) < 0) {
return -1;
}

ptr += nwritten;
left -= nwritten;
}

return nbytes;
}

static void Writen(int fd, void *vptr, size_t nbytes) {
if(writen(fd, vptr, nbytes) != nbytes) {
err_sys("write error");
}
}

static int readline(int fd, void *vptr, size_t maxlen) {
int i;
char *ptr = vptr;
int n;
char c;

for(i = 0; i < maxlen - 1; ++i) {
if((n = read(fd, &c, 1)) == 1) {
*ptr++ = c;
if(c == '\n') {
break; /* newline */
}
} else if(n == 0) {
if(i == 0) {
*ptr = '\0';
return i; /* EOF, no data read */
} else {
break;
}
} else {
if(errno == EINTR) {
continue;
} else {
return -1; /* read error */
}
}
}

*ptr = '\0';

return i + 1;
}

static int Readline(int fd, void *vptr, size_t maxlen) {
int n;

if((n = readline(fd, vptr, maxlen)) < 0) {
err_sys("read error");
}

return n;
}

static void Fputs(const char *ptr, FILE *stream) {
if(fputs(ptr, stream) == EOF) {
err_sys("fputs error");
}
}

static void str_cli(FILE *fp, int sockfd) {
char sendline[MAXLINE];
char recvline[MAXLINE];
fd_set rset;
int maxfd1 = max(fileno(fp), sockfd) + 1;

FD_ZERO(&rset);
for(;;) {
FD_SET(fileno(fp), &rset);
FD_SET(sockfd, &rset);

Select(maxfd1, &rset, NULL, NULL, NULL);

if(FD_ISSET(fileno(fp), &rset)) {
if(Fgets(sendline, MAXLINE, fp) == NULL) {
return;
}
Writen(sockfd, sendline, strlen(sendline));
}

if(FD_ISSET(sockfd, &rset)) {
if(Readline(sockfd, recvline, MAXLINE) == 0) {
err_quit("server terminated permaturely");
}

Fputs(recvline, stdout);
}
}
}

int main(int argc, char *argv[]) {
struct sockaddr_in servaddr;
int sockfd;

if(argc != 2) {
err_quit("%s <IPAddress>", argv[0]);
}

sockfd = Socket(AF_INET, SOCK_STREAM, 0);

bzero(&servaddr, 0);
servaddr.sin_family = AF_INET;
servaddr.sin_port = htons(SERV_PORT);
Inet_pton(AF_INET, argv[1], &servaddr.sin_addr);

Connect(sockfd, (SA *) &servaddr, sizeof(servaddr));

str_cli(stdin, sockfd);

return 0;
}

str_cli函数

  1. 定义一个可读性的描述符集,该集合由FD_ZERO初始化,并用FD_SET打开两位:一个对应于标准I/O文件指针fp,一位对应于套接字sockfdfileno函数把标准I/O文件指针转换为对应的描述符。计算出两个描述符中的较大值后,调用select。在该调用中,写集合指针和异常集合指针都是空指针。最后一个参数(时间限制)也是空指针,因为希望调用阻塞到某个描述符就绪为止。
  2. 如果标准输入可读,那就先用fgets读入一行文本,再用writen把它写到套接字。
  3. 如果在select返回时套接字是可读的,那就先用readline读入服务器返回的数据,再用fputs输出它。

批量输入

上面的str_cli函数还是有问题。假设发出第一个请求后,立即发出下一个,紧接着再下一个。客户端能够以网络可以接受它们的最快速度持续发送请求,并且能够以网络可提供给它们的最快速度处理应答。如图:

select.png

在写完最后一个请求后,我们并不能立即关闭连接,因为管道中还有其他的数据和应答。问题在于对标准输入中的EOF的处理:str_cli函数就此返回到main函数,而main函数随后终止。然而在批量方式下,标准输入中的EOF并不意味着我们同时也完成了从套接字的读入,可能仍有请求在去往服务器的路上,或者仍有应答在返回客户的路上。

我们需要的是一种关闭TCP连接其中一半的方法。也就是说,给服务器发送一个 FIN ,告诉服务器客户端已经完成了数据发送,但是仍然保持套接字打开以便读取。这可以由shutdown函数来完成。

一般地说,为提升性能而引入缓冲机制增加了网络应用程序的复杂性,考虑有多个来自标准输入的文本输入行可用的情况。selectfgets读取输入,这又转而使已可用的文本输入行被读入到stdio所用的缓冲区中。然而fgets只返回其中第一行,其余输入行仍在stdio缓冲区中。fgets返回的单个输入行写给服务器,随后select再次被调用以等待新的工作,而不管stdio缓冲区中还有额外的输入待消费。究竟原因在于select不知道stdio使用了缓冲区——它只是从read系统调用的角度指出是否有数据可读,而不是从fgets之类调用的角度考虑。基于上述原因,混合使用stdioselect被认为是非常容易犯错误的,在这样做时必须极其小心。

shutdown函数

终止网络连接的通常方法是调用close函数。不过close有两个限制,却可以使用shutdown来避免。

  1. close把描述符的引用计数减1,仅在该计数变为0时才关闭套接字。使用shtudown可以不管引用计数就激发TCP的正常连接终止序列。
  2. close终止读和写两个方向的数据传送。既然TCP连接是全双工的,有时候我们需要告知对端我们已经完成了数据发送,即使对端仍有数据要发送给我们。这就是上面遇到的str_cli函数在批量输入时的情况。
1
2
3
#include <sys/socket.h>
int shutdown(int sockfd, int howto);
返回:若成功则为0,若出错则为-1

该函数的行为依赖于howto参数的值。

SHUT_RD 关闭连接的读这一半——套接字中不再有数据可接收,而且套接字接收缓冲区中的现有数据都被丢弃。进程不能再对这样的套接字调用任何读函数。对一个TCP套接字这样调用shutdown函数后,由该套接字接收的来自对端的任何数据都被确认,然后悄然丢弃。

SHUT_WR 关闭连接的写这一半——对于TCP套接字,这称为半关闭(half-close)。当前留在套接字发送缓冲区中的数据将被发送掉,后跟TCP的正常连接终止序列。不管套接字描述符的引用计数是否等于0,这样的写半部关闭照样执行。进程不能再对这样的套接字调用任何写函数。

SHUT_REWR 连接的读半部和写半部都关闭——这与调用shutdown两次等效:第一次调用SHUT_RD,第二次调用指定SHUT_WR。

str_cli函数(再修订版)

代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
static void str_cli(FILE *fp, int sockfd) {
char buf[MAXLINE];
int n;
fd_set rset;
int stdineof = 0;
int maxfd1 = max(fileno(fp), sockfd) + 1;

FD_ZERO(&rset);
for(;;) {
if(stdineof == 0) {
FD_SET(fileno(fp), &rset);
}

FD_SET(sockfd, &rset);

Select(maxfd1, &rset, NULL, NULL, NULL);

if(FD_ISSET(fileno(fp), &rset)) {

/* input is readable */
if((n = Read(fileno(fp), buf, MAXLINE)) == 0) {
stdineof = 1;
Shoutdown(sockfd, SHUT_WR); /* send FIN */
FD_CLR(fileno(fp), &rset);
continue;
}
Writen(sockfd, buf, n);

}

if(FD_ISSET(sockfd, &rset)) {

/* socket is readable */
if((n = Read(sockfd, buf, MAXLINE)) == 0) {
if(stdineof == 1) {
return; /* normal termination */
} else {
err_quit("server terminated permaturely");
}
}
Write(fileno(stdout), buf, n);
}
}
}
  1. stdineof是一个初始化为0的新标志。只要该标志位0,则每次在主循环中设置标准输入为可读。
  2. 当在套接字上读到EOF时,如果已在标准输入上遇到EOF,那就是正常的终止,于是函数返回。如果在标准输出上没有遇到EOF,那么服务器进程已过早终止。
  3. 在标准输入上遇到EOF时,把新标志stdineof置为1,并把第二个参数指定为SHUT_WR来调用shutdown以发送FIN。

使用select的echo服务器程序

代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
#include <stdio.h>                                                        
#include <errno.h>
#include <stdlib.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <string.h>
#include <arpa/inet.h>
#include <unistd.h>
#include <sys/select.h>
#include <sys/time.h>

#include "error.h"

#define MAXLINE 4096
#define SERV_PORT 9987
#define SA struct sockaddr
#define LISTENQ 1024
#define max(a, b) ((a) > (b) ? (a) : (b))

static int Socket(int family, int type, int protocol) {
int n;

if((n = socket(family, type, protocol)) < 0) {
err_sys("socket error");
}

return n;
}

static void Bind(int fd, struct sockaddr *sa, socklen_t salen) {
if(bind(fd, sa, salen) < 0) {
err_sys("bind error");
}
}

static void Listen(int fd, int backlog) {
char *ptr;

if((ptr = getenv("LISTENQ")) != NULL) {
backlog = atoi(ptr);
}

if(listen(fd, backlog) < 0) {
err_sys("listen error");
}
}

static int Accept(int fd, struct sockaddr *sa, socklen_t *salenptr) {
int n;

while((n = accept(fd, sa, salenptr)) < 0) {
if(errno != EINTR && errno != ECONNABORTED) {
err_sys("accept error");
}
}

return n;
}

static int tcp_listen(void) {
int listenfd;
struct sockaddr_in servaddr;

listenfd = Socket(AF_INET, SOCK_STREAM, 0);

bzero(&servaddr, 0);
servaddr.sin_family = AF_INET;
servaddr.sin_port = htons(SERV_PORT);
servaddr.sin_addr.s_addr = htonl(INADDR_ANY);

Bind(listenfd, (SA *) &servaddr, sizeof(servaddr));

Listen(listenfd, LISTENQ);

return listenfd;
}

static void Close(int fd) {
if(close(fd) == -1) {
err_sys("close error");
}
}

static int writen(int fd, void *vptr, size_t nbytes) {
const char *ptr = vptr;
size_t left = nbytes;
int nwritten;

while(left > 0) {
if((nwritten = write(fd, vptr, left)) < 0) {
return -1;
}

ptr += nwritten;
left -= nwritten;
}

return nbytes;
}

static void Writen(int fd, void *vptr, size_t nbytes) {
if(writen(fd, vptr, nbytes) != nbytes) {
err_sys("write error");
}
}

static int Read(int fd, void * ptr, size_t nbytes) {
int n;

if((n = read(fd, ptr, nbytes)) < 0) {
err_sys("read error");
}
return n;
}

static int Select(int nfds, fd_set *rfds, fd_set *wfds, fd_set *exceptfds, struct timeval *timeout) {
int n;

while(n = select(nfds, rfds, wfds, exceptfds, timeout) < 0) {
if(errno != EINTR) {
err_sys("select error");
}
}

return n;
}

int main(int argc, char *argv[]) {
int listenfd;
int connfd;
int num_ready;
int i;
int max_index;
int max_fd;
int client[FD_SETSIZE];
char buf[MAXLINE];
fd_set rset;
fd_set allset;
int n;

listenfd = tcp_listen();

for(i = 0; i < FD_SETSIZE; ++i) {
client[i] = -1;
}

max_index = -1;
max_fd = listenfd;

FD_ZERO(&allset);
FD_SET(listenfd, &allset);

for(;;) {
rset = allset;

num_ready = Select(max_fd + 1, &rset, NULL, NULL, NULL);

if(FD_ISSET(listenfd, &rset)) {

/* new connection*/
connfd = Accept(listenfd, NULL, NULL);

for(i = 0; i < FD_SETSIZE; ++i) {
if(client[i] < 0) {
client[i] = connfd; /* save descriptor */
break;
}
}

if(i == FD_SETSIZE) {
err_quit("too many clients");
}

if(i > max_index) {
max_index = i;
}

max_fd = max(max_fd, connfd);
FD_SET(connfd, &allset); /* add new descriptor to set */

if(--num_ready <= 0) {
continue; /* no more readable descriptor */
}
}

for(i = 0; i <= max_index; ++i) {
if(client[i] < 0) {
continue;
}

if(FD_ISSET(client[i], &rset)) {
if((n = read(client[i], buf, MAXLINE)) < 0) {
if(errno == ECONNRESET) {
Close(client[i]);
FD_CLR(client[i], &allset);
client[i] = -1;
} else {
err_sys("read error");
}
} else if(n == 0) {
Close(client[i]);
FD_CLR(client[i], &allset);
client[i] = -1;
} else {
Writen(client[i], buf, n);
}

if(--num_ready <= 0) {
break; /* no more readable descriptor */
}
}
}
}

return 0;
}
  1. 调用select等待某个事件发生:新客户连接的建立、或是数据、 FINRST 的到达。
  2. 如果监听套接字变为可读,那么已建立了一个新的连接。调用accept接收新的连接,使用client数组中的第一个未用项记录这个已连接的描述符。检查select的返回值是否还有就绪的描述符。
  3. 对于每个现有的客户连接,要测试其描述符是否在select返回的描述符集中。如果是就从客户端读入数据并返回给它。如果客户端关闭了连接,那么将返回0,则清除该描述符。