非阻塞式I/O


套接字的默认状态是阻塞的。这就意味着当发出一个不能立即完成的套接字调用时,其进程将投入睡眠,等待相应操作完成。可能阻塞的套接字调用可分为以下四类。

  • 输入操作,包括readreadvrecvrecvfromrecvmsg共5个函数。如果某个进程对一个阻塞的TCP套接字(默认设置)调用这些输入函数之一,而且该套接字的接收缓冲区中没有数据可读,该进程将被投入睡眠,直到有一些数据到达。既然TCP是字节流协议,该进程的唤醒就是只要有一些数据到达,这些数据既可能是单个字节,也可以是一个完整的TCP分节数据。

对于非阻塞的套接字,如果输入操作不能被满足(对于TCP套接字即至少有一个字节的数据可读,对于UDP套接字即有一个完整的数据报可读),相应调用将立即返回一个EWOULDBLOCK错误。

  • 输出操作,包括writewritevsendsendtosendmsg共5个函数。对于一个TCP套接字,内核将从应用进程的缓冲区到该套接字的发送缓冲区复制数据。对于阻塞的套接字,如果其发送缓冲区中没有空间,进程将被投入睡眠,直到有空间为止。

对于一个非阻塞的TCP套接字,如果其发送缓冲区根本没有空间,输出函数调用立即返回一个EWOULDBLOCK错误。如果其发送缓冲区中有一些空间,返回值将是内核能够复制到该缓冲区的字节数。这个字节数也称为不足计数。

UDP套接字不存在真正的发送缓冲区。内核只是复制应用进程数据并把它沿协议栈向下传送,渐以加上UDP首部和IP首部。因此对一个阻塞的UDP套接字,输出函数调用将不会因与TCP套接字一样的原因而阻塞,不过有可能会因其他的原因而阻塞。

  • 接受外来连接,如果对一个阻塞的套接字调用accept函数,并且尚无新的连接到达,调用进程被投入睡眠。

如果对一个非阻塞的套接字调用accept函数,并且尚无新的连接到达,accept返回一个EWOULDBLOCK错误。

  • 发出外出连接,TCP连接的建立涉及一个三路握手过程,而且connect函数一直要等到客户收到对于自己的 SYNACK 为止才返回。这意味着TCP的每个connect总会阻塞其调用进程至少一个到服务器的RTT时间。

如果对一个非阻塞的TCP套接字调用connect,并且连接不能立即建立,那么连接的建立能照样发起(譬如送出TCP三路握手的第一个分组),不过会返回一个EINPROGRESS错误。注意这个错误不同于上述三个情形中返回的错误。另请注意有些连接可以立即建立,通常发生在服务器和客户处于同一个主机的情况下。因此即使对于一个非阻塞的connect,我们也得预备connect成功返回的情况发生。

非阻塞式I/O的echo客户端程序

I/O复用:select 中,客户端使用select实现的echo服务使用的是阻塞式I/O。举例来说,如果标准输入有一行文本可读,就调用read读入它,再调用writen把它发送给服务器。然而如果套接字发送缓冲区已满,writen调用将会阻塞。在进程阻塞于writen调用期间,可能有来自套接字缓冲区的数据可供读取。类似的,如果从套接字中有一行输入文本可读,那么一旦标准输出比网络还要满,进程照样可能阻塞于后续的write调用。

如果需要防止进程在任何有效工作期间发生阻塞,可以使用非阻塞式I/O。使用非阻塞式I/O的客户端程序,代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
#include <stdio.h> 
#include <stdlib.h>
#include <errno.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <string.h>
#include <arpa/inet.h>
#include <unistd.h>
#include <sys/select.h>
#include <sys/time.h>
#include <fcntl.h>

#include "error.h"

#define MAXLINE 4096
#define SERV_PORT 9987
#define SA struct sockaddr
#define max(a, b) ((a) > (b) ? (a) : (b))

static int Socket(int family, int type, int protocol) {
int n;

if((n = socket(family, type, protocol)) < 0) {
err_sys("socket error");
}

return n;
}

static void Inet_pton(int family, const char *strptr, void *addrptr) {
if(inet_pton(family, strptr, addrptr) <=0) {
err_sys("inet_pton error for %s", strptr);
}
}

static void Connect(int fd, struct sockaddr *sa, socklen_t salen) {
if(connect(fd, sa, salen) < 0) {
err_sys("connect error");
}
}

static void Shoutdown(int fd, int howto) {
if(shutdown(fd, howto) < 0) {
err_sys("shutdown error");
}
}

static int Select(int nfds, fd_set *rfds, fd_set *wfds, fd_set *exceptfds, struct timeval *timeout) {
int n;

while(n = select(nfds, rfds, wfds, exceptfds, timeout) < 0) {
if(errno != EINTR) {
err_sys("select error");
}
}

return n;
}

static int Fcntl(int fd, int cmd, int arg) {
int n;

if((n = fcntl(fd, cmd, arg)) < 0) {
err_sys("fcntl error");
}

return n;
}

static void set_nonblocking(int fd) {
int val;

val = Fcntl(fd, F_GETFL, 0);
Fcntl(fd, F_SETFL, val | O_NONBLOCK);
}

static void str_cli(int sockfd) {
char to[MAXLINE];
char from[MAXLINE];
char *tooptr;
char *toiptr;
char *fromoptr;
char *fromiptr;
int maxfd;
fd_set rset;
fd_set wset;
int n;
int nwritten;
int stdineof = 0;

set_nonblocking(STDIN_FILENO);
set_nonblocking(STDOUT_FILENO);
set_nonblocking(sockfd);

tooptr = toiptr = to;
fromoptr = fromiptr = from;

maxfd = max((max(STDIN_FILENO, STDOUT_FILENO)), sockfd) + 1;


for(;;) {
FD_ZERO(&rset);
FD_ZERO(&wset);

if(stdineof == 0 && toiptr < &to[MAXLINE]) {
FD_SET(STDIN_FILENO, &rset); /* read from stdin */
}

if(tooptr != toiptr) {
FD_SET(sockfd, &wset); /* data to write to socket */
}

if(fromiptr < &from[MAXLINE]) {
FD_SET(sockfd, &rset); /* read from socket */
}

if(fromoptr != fromiptr) {
FD_SET(STDOUT_FILENO, &wset); /* data to write to stdout */
}

Select(maxfd, &rset, &wset, NULL, NULL);

if(FD_ISSET(STDIN_FILENO, &rset)) {
if((n = read(STDIN_FILENO, to, &to[MAXLINE] - toiptr)) < 0) {
if(errno != EWOULDBLOCK) {
err_sys("read error on stdin");
}
} else if(n == 0) {
stdineof = 1;
if(tooptr == toiptr) {
Shoutdown(sockfd, SHUT_WR); /* send FIN */
}
} else {
toiptr += n;
FD_SET(sockfd, &wset); /* try and write to socket below */
}
}

if(FD_ISSET(sockfd, &rset)) {
if((n = read(sockfd, from, &from[MAXLINE] - fromiptr)) < 0) {
if(errno != EWOULDBLOCK) {
err_sys("read error on socket");
}
} else if(n == 0) {
if(stdineof == 0) {
err_quit("server terminated permaturely");
} else {
return; /* normal termination */
}
} else {
fromiptr += n;
FD_SET(STDOUT_FILENO, &wset);
}
}

if(FD_ISSET(sockfd, &wset) && ((n = toiptr - tooptr) > 0)) {
if((nwritten = write(sockfd, tooptr, n)) < 0) {
if(errno != EWOULDBLOCK) {
err_sys("write error to socket");
}
} else {
tooptr += nwritten;
if(tooptr == toiptr) {
tooptr = toiptr = to; /* back to beginning of buffer */
if(stdineof) {
Shoutdown(sockfd, SHUT_WR); /* send FIN */
}
}
}
}

if(FD_ISSET(STDOUT_FILENO, &wset) && ((n = fromiptr - fromoptr) > 0)) {
if((nwritten = write(STDOUT_FILENO, fromoptr, n)) < 0) {
if(errno != EWOULDBLOCK) {
err_sys("write error to stdout");
}
} else {
fromoptr += nwritten;
if(fromoptr == fromiptr) {
fromoptr = fromiptr = from; /* back to beginning of buffer */
}
}
}
}
}

int main(int argc, char *argv[]) {
int sockfd;
struct sockaddr_in servaddr;

if(argc != 2) {
err_quit("usage: %s <IPAddress>", argv[0]);
}

sockfd = Socket(AF_INET, SOCK_STREAM, 0);

bzero(&servaddr, 0);
servaddr.sin_family = AF_INET;
servaddr.sin_port = htons(SERV_PORT);
Inet_pton(AF_INET, argv[1], &servaddr.sin_addr);

Connect(sockfd, (SA *) &servaddr, sizeof(servaddr));

str_cli(sockfd);

return 0;
}

非阻塞读和写:str_cli函数

非阻塞式I/O的加入让本函数的缓冲区管理将会变得复杂,需要维护两个缓冲区:to缓冲区保存从标准输入到服务器去的数据,from缓冲区保存自服务器到标准输出来的数据。如图展示了to缓冲区的组织和指向缓冲区的指针。

toptr.png

其中toiptr指针指向从标准输入的数据可以存放的下一个字节。tooptr指向下一个必须写到套接字的字节。有(toiptr-tooptr)个字节需写到套接字。可从标准输入读入的字节数(&to[MAXLINE]-toiptr)。一旦tooptr移动到toiptr,这两个指针就一起恢复到缓冲区开始处。

类似地,下图是from缓冲区相应的组织和指向缓冲区的指针。

fromptr.png

把描述符设置为非阻塞: 调用set_nonblocking 把套接字、标注输入、标注输出设置为非阻塞。

初始化缓冲区指针: 初始化两个缓冲区的指针,并把最大描述符加1,作为select的第一个参数。

指定所关注的描述符: 如果在标准输入上尚未读到EOF,而且在to缓冲区中有至少一个字节的可用空间,那就打开读描述符集中对应标准输入的位。如果在from缓冲区中有至少一个字节的可用空间,那就打开描述符集中对应套接字的位。如果在to缓冲区中有要写到套接字的数据,那就打开写描述符集中对应套接字的位。如果在from缓冲区中有要写到标准输出的数据,那就打开描述符集中对应标准输出的位。

调用select: 等待4个可能条件中任何一个变为真。

从标准输入read: 如果标准输入可读,那就调用read。指定的第三个参数是to缓冲区中的可用空间。

处理非阻塞错误: 如果发生一个EWOULDBLOCK错误,就忽略它。通常情况下这种条件 “不应该发生”,因为这种条件意味着,select告知相应描述符可读,然而read该描述符却返回EWOULDBLOCK错误。

read返回EOF: 如果read返回0,那么标准输入处理就此结束,设置stdineof标志。如果在to缓冲区不再有数据要发送(即tooptr等于toiptr),那就调用shutdown发送 FIN 到服务器。如果在to缓冲区中仍有数据要发送, FIN 的发送就得推迟到缓冲区中数据已写到套接字之后。

read返回数据:read返回数据时,相应地增加toiptr,并打开描述符集中与套接字对应的位。

从套接字read: 如果read返回EWOULDBLOCK错误,那么不做任何处理。遇到来自服务器的EOF,如果已经在标准输入上遇到EOF则没有问题,否则来自服务器的EOF并非预期。如果read返回一些数据,就相应地增加fromiptr,并把写描述符集中与标准输出对应的位打开。

write到标准输出: 如果标准输出可写而且要写的字节数大于0,那就调用write。如果返回EWOULDBLOCK错误,那么不做任何处理。注意这种条件完全可能发生,因为在上面的代码在不清楚write是否会成功的前提下就打开了写描述符集中与标准输出对应的位。

write成功: 如果write成功,fromoptr就增加相应的字节数。如果输出指针fromoptr追上输入指针fromiptr,这两个指针就同时恢复为指向缓冲区开始处。

write到套接字: 当输出指针追上输入指针时,不仅这两个指针同时恢复到缓冲区开始处,而且如果已经在标准输入遇到EOF就要发送 FIN 到服务器。

非阻塞connect

当在一个非阻塞的TCP套接字上调用connect时,connect将立即返回一个EINPROGRESS错误,不过已经发起的TCP三路握手继续进行。接着可以使用select检测这个连接或成功或失败的已建立条件。非阻塞的connect有三个用途。

  1. 可以把三路握手叠加在其他处理上。完成一个connect要花一个RTT时间,而RTT波动范围很大,从局域网上的几个毫秒到几百个毫秒甚至是广域网上的几秒。这段时间内也许有我们想要执行其他的处理工作。
  2. 可以使用这个技术同时建立多个连接。这个用途已随着Web浏览器变得流行起来。
  3. 既然使用select等待连接的建立,我们可以给select指定一个时间限制,使得我们能够缩短connect的超时。许多实现有着从75秒钟到数分钟的connect超时时间。应用程序有时想要一个更短的超时时间,实现方法之一就是使用非阻塞connect

非阻塞connect虽然听似简单,却有一些我们必须处理的细节。

  • 尽管套接字是非阻塞的,如果连接到的服务器在同一个主机上,那么当我们调用connect时,连接通常立刻建立。我们必须处理这种情形。
  • 源自Berkeley的实现(和POSIX)有关于select和非阻塞connect的以下两个规则:(1)当连接成功建立时,描述符变为可写;(2)当连接建立遇到错误时,描述符变为既可读又可写。

非阻塞connect:时间获取程序

socket编程connect替换为非阻塞的connect_nonb,如下:

1
2
3
if(connect(sockfd, (SA *) &servaddr, sizeof(servaddr)) < 0) {
err_sys("connect error");
}

connect_nonb函数

代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
static int connect_nonb(int sockfd, struct sockaddr *sa, socklen_t salen, int nsec) {
struct timeval tval;
fd_set rset;
fd_set wset;
int flags;
int n;
int error = 0;
int len;

flags = Fcntl(sockfd, F_GETFL, 0);
Fcntl(sockfd, F_SETFL, flags | O_NONBLOCK);

if((n = connect(sockfd, sa, salen)) < 0) {
if(errno != EINPROGRESS) {
return -1;
}
}

do {
if(n == 0) {
break;
}

FD_ZERO(&rset);
FD_SET(sockfd, &rset);
wset = rset;

tval.tv_sec = nsec;
tval.tv_usec = 0;

if((n = Select(sockfd + 1, &rset, &wset, NULL, nsec ? &tval : NULL)) == 0) {
close(sockfd);
errno = ETIMEDOUT;
return -1;
}

if(FD_ISSET(sockfd, &rset) || FD_ISSET(sockfd, &wset)) {
len = sizeof(error);

if(getsockopt(sockfd, SOL_SOCKET, SO_ERROR, &error, &len) < 0) {
return -1;
}
} else {
err_quit("select error: sockfd not set");
}
} while(0);

Fcntl(sockfd, F_SETFL, flags);

if(error) {
close(sockfd);
errno = error;
return -1;
}
return 0;
}

检查连接是否立即建立:如果非阻塞connect返回0,那么连接已经建立。当服务器处于客户所在主机时这种情况可能发生。

处理超时:如果select返回0,那么超时发生,于是返回ETIMEDOUT错误给调用者。还要关闭套接字,已防止已经启动的三路握手继续下去。

检查可读或可写条件:如果描述符变为可读或可写,就调用getsockopt取得套接字的待处理错误(使用SO_SERROR套接字选项)。如果连接成功建立,该值将为0。如果连接建立发生错误,该值就是对应连接错误的errno值(譬如ECONNREFUSEDETIMEDOUT等)。

被中断的connect

对于一个正常的阻塞式套接字,如果其上的connect调用在TCP三路握手完成前被中断(譬如说捕获了某个信号),将会发生什么呢?假设被中断的connect调用不由内核自动重启,那么它将返回ETINR。我们不能再次调用connect等待未完成的连接继续完成。这样做将导致但会EADDRINUSE错误。

这种情形下我们只能调用select,连接建立成功时select返回套接字可写条件,连接建立失败时select返回套接字既可读又可写条件。