使用带外数据实现心跳包


许多传输层有 带外数据 (out-of-band data)的概念,它有时也称为 经加速数据 (expedited data)。其想法就是一个连接的某端发送了重要的事情,而且该端希望迅速通知对方。这里 “迅速” 意味着这种通知应该在已经排队等待发送的任何普通数据(有时称为 “带内”)之前发送。也就是说,带外数据被认为具有比普通数据更高的优先级。带外数据并不要求在客户和服务器之间再使用一个连接,而是被映射到已有的连接中。

TCP带外数据

TCP并没有真正的带外数据,不过提供了一个紧急模式(urgent mode)。假设一个进程已经往一个TCP套接字写出N字节数据,而且TCP把这些数据排队在该套接字的发送缓冲区中,等着发送到对端。如图:

buf.png

该进程接着以MSG_OOB标志调用send函数写出一个字符a的单字节带外数据:

1
send(fd, "a", 1, MSG_OOB);

TCP把这个数据放置在该套接字发送缓冲区的下一个可用位置,并把该套接字的TCP紧急指针(urgent pointer)设置成再一个可用位置。如图,展示了此时的套接字发送缓冲区,并且把带外数据字节标记为 OOB

out_of_data_buf.png

发送端TCP将为待发送的下一个分节在TCP首部中设置 URG 标志,并把紧急偏移(urgent offset)字段设置为指向带外字节之后的字节,不过该分节可能含也可能不含标记为 OOB 的那个字节。 OOB 字节是否发送取决于在套接字发送缓冲区中先于它的字节数、TCP准备发送给对端的分节大小以及对端通告的当前窗口。

这是TCP紧急模式的一个重要特点:TCP首部指出发送端已经进入紧急模式(即伴随紧急偏移的 URG 标志已经设置),但是由紧急指针所指的实际数据字节却不一定随同送出。事实上即使发送端TCP因流量控制而暂停发送数据(接收端的套接字接收缓冲区已满,导致其TCP向发送端TCP通告了一个值为0的窗口)。这也是应用进程使用TCP紧急模式(即带外数据)的一个原因:即便数据的流动会因为TCP的流量控制而停止,紧急通知却总是无障碍地发送到对端TCP。

如果发送多字节的带外数据,情况又会如何呢? 例如:

1
send(fd, "abc", MSG_OOB);

在这个例子中,TCP的紧急指针指向最后那个字节紧后的位置,也就是说最后那个字节(字母c)被认为是带外字节。

接收端:

  1. 当收到一个设置了 URG 标志的分节时,接收端TCP检查紧急指针,确定它是否指向新的带外数据,也就是判断本分节是不是首个到达的引用从发送端到接收端的数据流中特定字节的紧急模式分布。发送端TCP往往发送多个含有 URG 标志且紧急指针指向同一个数据字节的分节(通常是在一小段时间内)。这些分节中只有第一个到达的会导致通知接收进程有新的带外数据到达。
  1. 一旦有新的紧急指针到达,不论由紧急指针指向的实际数据字节是否已经到达接收端TCP,接收进程都被通知到。首先,内核给接收套接字的属主进程发送 SIGURG 信号,前提是接收进程(或其他进程)曾调用fcntlioctl为这个套接字建立了属主,而且该属主进程已为这个信号建立了信号处理函数。其次,如果接收进程阻塞在select调用中以等待这个套接字描述符出现一个异常条件,select调用就返回。

只有一个 OOB 标记,如果新的 OOB 字节在旧的 OOB 字节被读取之前就到达,旧的 OOB 字节会被丢弃。

  1. 当由紧急指针指向的实际数据字节到达接收端TCP时,该数据字节既可能被拉出带外,也可能被留在带内。SO_OOBINLINE套接字选项默认情况下是禁止的,对于这样的接收端套接字,该数据字节并不放入套接字接收缓冲区,而是被放入该连接的一个独立的单字节带外缓冲区。接收进程从这个单字节缓冲区读入数据的唯一办法就是指定MSG_OOB标志调用recvrecvfromrecvmsg。如果新的 OOB 字节在旧的 OOB 字节被读取之前就到达,旧的 OOB 字节会被丢弃。

然而如果接收进程开启了SO_OOBINLINE套接字选项,那么由TCP紧急指针指向的实际数据字节将被留在通常的套接字接收缓冲区中。这种情况下,接收进程不能指定MSG_OOB标志读入该数据字节。相反,接收进程通过检查该连接的带外标记(out-of band mark)以获悉何时访问到这个数据字节。

可能发生的错误:

  1. 如果接收进程请求读入带外数据(通过指定MSG_OOB标志),但是对端尚未发送任何带外数据,读入操作将返回EINVAL
  2. 在接收进程已被告知对端发送了一个带外字节(通过 SIGURGselect 手段)的前提下,如果接收进程试图读入该字节,但是该字节尚未到达,读入操作将返回EWOULDBLOCK。接收进程此时能做的仅仅是从套接字接收缓冲区读入数据(要是没有存放这些数据的空间,可能还得丢弃它们),以便在该缓冲区中腾出空间,继而允许对端TCP发送出那个带外字节。
  3. 如果接收进程试图多次读入同一个带外字节,读入操作将返回EINVAL
  4. 如果接收进程已经开启了SO_OOBINLINE套接字选项,后来试图通过指定MSG_OOB标志读入带外数据,读入操作将返回EINVAL

使用SIGURG的简单例子

TCP客户端发送带外数据

代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
#include <stdio.h>                                                        
#include <errno.h>
#include <stdlib.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <string.h>
#include <arpa/inet.h>
#include <unistd.h>

#include "error.h"

#define SERV_PORT 9987
#define SA struct sockaddr

static Socket(int family, int type, int protocol) {
int n;

if((n = socket(family, type, protocol)) < 0) {
err_sys("socket error");
}

return n;
}

static void Inet_pton(int family, const char *strptr, void *vptr) {
if(inet_pton(family, strptr, vptr) <= 0) {
err_sys("inet_pton error for %s", strptr);
}
}

static int Connect(int sockfd, struct sockaddr *sa, socklen_t salen) {
if(connect(sockfd, sa, salen) < 0) {
err_sys("connect error");
}
}

static int tcp_connect(const char *addr) {
int sockfd;
struct sockaddr_in servaddr;

sockfd = Socket(AF_INET, SOCK_STREAM, 0);

bzero(&servaddr, 0);
servaddr.sin_family = AF_INET;
servaddr.sin_port = htons(SERV_PORT);
Inet_pton(AF_INET, addr, &servaddr.sin_addr);

Connect(sockfd, (SA *) &servaddr, sizeof(servaddr));

return sockfd;
}

static void Write(int fd, void *vptr, size_t nbytes) {
if(write(fd, vptr, nbytes) != nbytes) {
err_sys("write error");
}
}

static void Send(int fd, void *ptr, int nbytes, int flags) {
if(send(fd, ptr, nbytes, flags) != nbytes) {
err_sys("send error");
}
}

int main(int argc, char *argv[]) {
int sockfd;

if(argc != 2) {
err_quit("%s <IPAddress>", argv[0]);
}

sockfd = tcp_connect(argv[1]);

Write(sockfd, "123", 3);
printf("wrote 3 bytes of normal data\n");
sleep(1);

Send(sockfd, "4", 1, MSG_OOB);
printf("wrote 1 byte of OOB data\n");
sleep(1);

Write(sockfd, "56", 2);
printf("wrote 2 bytes of normal data\n");
sleep(1);

Send(sockfd, "7", 1, MSG_OOB);
printf("wrote 1 byte of OOB data\n");
sleep(1);

Write(sockfd, "89", 2);
printf("wrote 2 bytes of normal data\n");
sleep(1);

return 0;
}

客户端发送了9个字节,每个输出操作之间有一个1秒钟的sleep。停顿的目的是让writesend的数据作为单个TCP分节在本端发送并在对端接收。

TCP服务器接收带外数据

代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
#include <stdio.h>                                                        
#include <errno.h>
#include <stdlib.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <string.h>
#include <arpa/inet.h>
#include <unistd.h>
#include <signal.h>
#include <fcntl.h>

#include "error.h"

#define MAXBUF 4096
#define SERV_PORT 9987
#define SA struct sockaddr
#define LISTENQ 1024

static int connfd;

static int Socket(int family, int type, int protocol) {
int n;

if((n = socket(family, type, protocol)) < 0) {
err_sys("socket error");
}

return n;
}

static void Bind(int fd, struct sockaddr *sa, socklen_t salen) {
if(bind(fd, sa, salen) < 0) {
err_sys("bind error");
}
}

static void Listen(int fd, int backlog) {
char *ptr;

if((ptr = getenv("LISTENQ")) != NULL) {
backlog = atoi(ptr);
}

if(listen(fd, backlog) < 0) {
err_sys("listen error");
}
}

static int Accept(int fd, struct sockaddr *sa, socklen_t *salenptr) {
int n;

while((n = accept(fd, sa, salenptr)) < 0) {
if(errno != EINTR && errno != ECONNABORTED) {
err_sys("accept error");
}
}

return n;
}

static void Close(int fd) {
if(close(fd) == -1) {
err_sys("close error");
}
}

static int tcp_listen(void) {
int listenfd;
struct sockaddr_in servaddr;

listenfd = Socket(AF_INET, SOCK_STREAM, 0);

bzero(&servaddr, 0);
servaddr.sin_family = AF_INET;
servaddr.sin_port = htons(SERV_PORT);
servaddr.sin_addr.s_addr = htonl(INADDR_ANY);

Bind(listenfd, (SA *) &servaddr, sizeof(servaddr));

Listen(listenfd, LISTENQ);

return listenfd;
}

static void Fcntl(int fd, int cmd, int arg) {
if(fcntl(fd, cmd, arg) < 0) {
err_sys("fcntl error");
}
}

static int Read(int fd, void *ptr, size_t maxlen) {
int n;

if((n = read(fd, ptr, maxlen)) < 0) {
err_sys("read error");
}
return n;
}

static int Recv(int fd, void *buf, int len, int flags) {
int n;

if((n = recv(fd, buf, len, flags)) < 0) {
err_sys("recv error");
}
return n;
}

static void sig_urg(int signo) {
int n;
char buf[64];

printf("SIGURG receive\n");
n = Recv(connfd, buf, sizeof(buf) - 1, MSG_OOB);
buf[n] = '\0';

printf("read %d OOB byte: %s\n", n, buf);
}

int main(int argc, char *argv[]) {
int listenfd;
int n;
char buf[MAXBUF];

listenfd = tcp_listen();

connfd = Accept(listenfd, NULL, NULL);

signal(SIGURG, sig_urg);

Fcntl(connfd, F_SETOWN, getpid());

for(;;) {
if((n = Read(connfd, buf, MAXBUF - 1)) == 0) {
printf("receive EOF\n");
exit(0);
}

buf[n] = '\0';
printf("read %d bytes: %s\n", n, buf);
}

return 0;
}

建立 SIGURG 的信号处理函数,使用fcntl设置已连接套接字的属主。信号处理函数sig_urg,通过指定MSG_OOB标志读入带外字节,然后显示返回数据。recv调用中请求64个字节,但是作为带外数据返回的只有1个字节。

运行服务器程序

[heql@ubuntu socket]$ ./tcp_recv 

运行客户端程序

客户端输出如下:

[heql@ubuntu socket]$ ./tcp_send 127.0.0.1
wrote 3 bytes of normal data
wrote 1 byte of OOB data
wrote 2 bytes of normal data
wrote 1 byte of OOB data
wrote 2 bytes of normal data

服务器输出:

[heql@ubuntu socket]$ ./tcp_recv 
read 3 bytes: 123
SIGURG receive
read 1 OOB byte: 4
read 2 bytes: 56
SIGURG receive
read 1 OOB byte: 7
read 2 bytes: 89
receive EOF

使用select的简单例子

把上面的TCP服务器改为使用select接收带外数据,代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
#include <stdio.h>                                                        
#include <errno.h>
#include <stdlib.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <string.h>
#include <arpa/inet.h>
#include <unistd.h>
#include <sys/select.h>
#include <sys/time.h>

#include "error.h"

#define MAXBUF 4096
#define SERV_PORT 9987
#define SA struct sockaddr
#define LISTENQ 1024

static int Socket(int family, int type, int protocol) {
int n;

if((n = socket(family, type, protocol)) < 0) {
err_sys("socket error");
}

return n;
}

static void Bind(int fd, struct sockaddr *sa, socklen_t salen) {
if(bind(fd, sa, salen) < 0) {
err_sys("bind error");
}
}

static void Listen(int fd, int backlog) {
char *ptr;

if((ptr = getenv("LISTENQ")) != NULL) {
backlog = atoi(ptr);
}

if(listen(fd, backlog) < 0) {
err_sys("listen error");
}
}

static int Accept(int fd, struct sockaddr *sa, socklen_t *salenptr) {
int n;

while((n = accept(fd, sa, salenptr)) < 0) {
if(errno != EINTR && errno != ECONNABORTED) {
err_sys("accept error");
}
}

return n;
}

static void Close(int fd) {
if(close(fd) == -1) {
err_sys("close error");
}
}

static int tcp_listen(void) {
int listenfd;
struct sockaddr_in servaddr;

listenfd = Socket(AF_INET, SOCK_STREAM, 0);

bzero(&servaddr, 0);
servaddr.sin_family = AF_INET;
servaddr.sin_port = htons(SERV_PORT);
servaddr.sin_addr.s_addr = htonl(INADDR_ANY);

Bind(listenfd, (SA *) &servaddr, sizeof(servaddr));

Listen(listenfd, LISTENQ);

return listenfd;
}

static int Read(int fd, void *ptr, size_t maxlen) {
int n;

if((n = read(fd, ptr, maxlen)) < 0) {
err_sys("read error");
}
return n;
}

static int Recv(int fd, void *buf, int len, int flags) {
int n;

if((n = recv(fd, buf, len, flags)) < 0) {
err_sys("recv error");
}
return n;
}

static int Select(int nfds, fd_set *readfds, fd_set *writefds, fd_set *exceptfds, struct timeval *timeout) {
int n;

if((n = select(nfds, readfds, writefds, exceptfds, timeout)) < 0) {
if(errno != EINTR) {
err_sys("select error");
}
}

return n;
}

int main(int argc, char *argv[]) {
int listenfd;
int n;
char buf[MAXBUF];
int connfd;
fd_set rset;
fd_set xset;

listenfd = tcp_listen();

connfd = Accept(listenfd, NULL, NULL);

FD_ZERO(&rset);
FD_ZERO(&xset);

for(;;) {
FD_SET(connfd, &rset);
FD_SET(connfd, &xset);

Select(connfd + 1, &rset, NULL, &xset, NULL);

if(FD_ISSET(connfd, &rset)) {
if((n = Read(connfd, buf, MAXBUF - 1)) == 0) {
printf("receive EOF\n");
exit(0);
}

buf[n] = '\0';
printf("read %d bytes: %s\n", n, buf);
}

if(FD_ISSET(connfd, &xset)) {
n = Recv(connfd, buf, sizeof(buf) - 1, MSG_OOB);
buf[n] = '\0';

printf("read %d OOB byte: %s\n", n, buf);
}
}

return 0;
}

调用select等待普通数据(读集合rset)或带外数据(异常集合xset)。通过指定MSG_OOB标志读入带外字节,然后显示返回数据,带外数据返回的只有1个字节。

sockatmark函数

每当收到一个带外数据时,就有一个与之关联的带外标记(out-of-band mark)。这是发送进程发送带外字节时该字节在发送端普通数据流中的位置。在从套接字读入期间,接收进程通过调用sockatmark函数确定是否处于带外标记。

1
2
3
4
#include <sys/socket.h>

int sockatmark(int sockfd);
返回:若处于带外标记则为1,若不处于带外标记则为0,若出错则为-1
  1. 带外标记总是指向普通数据最后一个字节紧后的位置。这意味着,如果带外数据在线接收(SO_OOBINLINE套接字选项开启),那么如果下一个待读入的字节是使用MSG_OOB标志发送的,sockatmark就返回真。而如果SO_OOBINLINE套接字选项没有开启,那么,若下一个待读入的字节是跟在带外数据后发送的第一个字节,sockatmark就返回真。

  2. 读操作总是停止带外标记上。也就是说,如果在套接字接收缓冲区中有100个字节,不过在带外标记之前只有5个字节,而进程执行一个请求100个字节的read调用,那么返回的是带外标记之前的5个字节。这种在带外标记上强制停止读操作的做法使得进程能够调用sockatmark确定缓冲区指针是否处于带外标记。

如下程序,它发送3个字节普通数据,1个字节带外数据,再跟1个字节普通数据,每个输出操作之间没有停顿:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
int main(int argc, char *argv[]) {
int sockfd;

if(argc != 2) {
err_quit("%s <IPAddress>", argv[0]);
}

sockfd = tcp_connect(argv[1]);

Write(sockfd, "123", 3);
printf("wrote 3 bytes of normal data\n");

Send(sockfd, "4", 1, MSG_OOB);
printf("wrote 1 byte of OOB data\n");

Write(sockfd, "5", 1);
printf("wrote 1 bytes of normal data\n");

return 0;
}

如下程序,设置了SO_OOBINLINE套接字选项,用于在线接收上面的带外数据。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
int main(int argc, char *argv[]) {
int listenfd;
int n;
char buf[MAXBUF];
int connfd;
int on = 1;

listenfd = tcp_listen();

Setsockopt(listenfd, SOL_SOCKET, SO_OOBINLINE, &on, sizeof(on));

connfd = Accept(listenfd, NULL, NULL);
sleep(5);

for(;;) {
if(Sockatmark(connfd)) {
printf("at OOB mark\n");
}

if((n = Read(connfd, buf, MAXBUF - 1)) == 0) {
printf("receive EOF\n");
exit(0);
}

buf[n] = '\0';
printf("read %d bytes: %s\n", n, buf);
}

return 0;
}
  1. 我们希望在线接收带外数据,所以必须开启SO_OOBINLINE套接字选项。但是如果等到accept返回之后再在连接套接字上开启这个选项,那时三路握手已经完成,带外数据也可能已经到达。因此必须在监听套接字上开启这个选项,因为所有套接字选项会从监听套接字传承给已连接套接字。
  2. 接收连接之后,接收进程sleep一段时间以接收来自发送进程的所有数据,这么做使得我们能够展示read停在带外标记上,即使套接字接收缓冲区中已经有额外数据也不受影响。
  3. 程序循环调用read,并显示收到的数据。不过在调用read之前,先调用sockatmark检查缓冲区指针是否处于带外标记。

运行程序输出如下:

[heql@ubuntu socket]$ ./tcp_recv02 
read 3 bytes: 123
at OOB mark
read 2 bytes: 45
receive EOF

尽管接收进程首次调用read时接收端TCP已经接收了所有数据(因为接收进程调用了sleep),但是首次read调用因遇到带外标记而仅仅返回3个字节。下一个读入的字节是带外字节(值为4),因为我们早已告知内核在线放置带外数据。

带外数据的另外两个特性

第一个特性

  1. 即使因为流量控制而停止发送数据了,TCP仍然发送带外数据的通知(即它的紧急指针)
  2. 在带外数据到达之前,接收进程可能被通知说发送进程已经发送了带外数据(使用了SIGURG信号或通过select)。如果接收进程接着指定MSG_OOB调用recv,而带外数据却尚未到达,recv将返回EWOULDBLOCK错误。

发送程序:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
int main(int argc, char *argv[]) {
int sockfd;
char buf[16384];
int size = 32768;

if(argc != 2) {
err_quit("%s <IPAddress>", argv[0]);
}

sockfd = tcp_connect(argv[1]);


Setsockopt(sockfd, SOL_SOCKET, SO_SNDBUF, &size, sizeof(size));

Write(sockfd, buf, 16384);
printf("wrote 16384 bytes of normal data\n");
sleep(5);

Send(sockfd, "a", 1, MSG_OOB);
printf("wrote 1 byte of OOB data\n");

Write(sockfd, buf, 1024);
printf("wrote 1024 bytes of normal data\n");

return 0;
}

该进程把它的套接字发送缓冲区大小设置为32768,写出16384字节的普通数据,然后睡眠5秒钟。发送进程的这些操作确保发送端TCP填满接收端的套接字接收缓冲区。返送进程接着发送单个字节的带外数据,后跟1024字节的普通数据,然后终止。

接收程序:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
static void sig_urg(int signo) {
int n;
char buf[64];

printf("SIGURG receive\n");
n = Recv(connfd, buf, sizeof(buf) - 1, MSG_OOB);
buf[n] = '\0';

printf("read %d OOB byte: %s\n", n, buf);
}

int main(int argc, char *argv[]) {
int listenfd;
int n;
char buf[MAXBUF];
int on = 1;
int size;

listenfd = tcp_listen();

size = 4096;
Setsockopt(listenfd, SOL_SOCKET, SO_RCVBUF, &size, sizeof(size));

connfd = Accept(listenfd, NULL, NULL);

signal(SIGURG, sig_urg);
Fcntl(connfd, F_SETOWN, getpid());

for(;;) {
pause();
}

return 0;
}

接收进程把监听套接字接收缓冲区大小设置为4096。连接建立之后,这个大小将传承给已连接套接字。接收进程接着调用accept连接,建立一个SIGURG信号处理函数,并建立套接字的属主。主程序然后在一个循环中调用pause

先启动接收进程,接着启动发送进程。发送进程输出如下:

[heql@ubuntu socket]$ ./tcp_send 127.0.0.1
wrote 16384 bytes of normal data
wrote 1 byte of OOB data
wrote 1024 bytes of normal data

接收进程的输出:

[heql@ubuntu socket]$ ./tcp_recv
SIGURG receive
recv error: Resource temporarily unavailable

发送端TCP向接收端TCP发送了带外通知,由此产生了通知接收进程的SIGURG。然而当接收进程指定MSG_OOB标志调用recv时,相应带外字节不能读入。因为接收缓冲区的大小设置为4096,发送进程发送了16384字节填满了接收端的套接字接收缓冲区。

第二个特性

一个给定TCP连接只有一个带外标记,如果在接收进程读入某个现有带外数据之前有新的带外数据到达,先前的标记就丢失。

如下发送程序:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
int main(int argc, char *argv[]) {
int sockfd;
char buf[16384];
int size = 32768;

if(argc != 2) {
err_quit("%s <IPAddress>", argv[0]);
}

sockfd = tcp_connect(argv[1]);

Write(sockfd, "123", 3);
printf("wrote 3 bytes of normal data\n");

Send(sockfd, "4", 1, MSG_OOB);
printf("wrote 1 byte of OOB data\n");

Write(sockfd, "5", 1);
printf("wrote 1 bytes of normal data\n");

Send(sockfd, "6", 1, MSG_OOB);
printf("wrote 1 byte of OOB data\n");

Write(sockfd, "7", 1);
printf("wrote 1 bytes of normal data\n");

return 0;
}

使用上面sockatmark函数中的接收程序,输出如下:

[heql@ubuntu socket]$ ./tcp_recv 
read 5 bytes: 12345
at OOB mark
read 2 bytes: 67
receive EOF

第二个带外字节(6)的到来覆写了第一个带外字节(4)到来时存放的带外标记。正像我们说的,每个TCP连接最多只有一个带外标记。

使用带外数据实现心跳包

使用心跳包可以发现对端主机或对端的通信路径的过早失效。

客户端程序:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
#include <stdio.h>                                                        
#include <errno.h>
#include <stdlib.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <string.h>
#include <arpa/inet.h>
#include <unistd.h>
#include <fcntl.h>
#include <signal.h>
#include <time.h>

#include "error.h"

#define MAXLINE 4096

#define SA struct sockaddr

#define SERV_PORT 9987
#define MAX_PROBES 3
#define ALARM_SEC 3

static int num_probes = 0;
static int sockfd;

static int Socket(int family, int type, int protocol) {
int n;

if((n = socket(family, type, protocol)) < 0) {
err_sys("socket error");
}
return n;
}

static void Inet_pton(int family, const char *strptr, void *addrptr) {
if(inet_pton(family, strptr, addrptr) <= 0) {
err_sys("inet_pton error for %s", strptr);
}
}

static void Connect(int sockfd, struct sockaddr *sa, socklen_t salen) {
if(connect(sockfd, sa, salen) < 0) {
err_sys("connect error");
}
}

static void Fcntl(int fd, int cmd, int arg) {
if(fcntl(fd, cmd, arg) < 0) {
err_sys("fcntl error");
}
}

static void tcp_connect(const char *addr) {
struct sockaddr_in servaddr;

sockfd = Socket(AF_INET, SOCK_STREAM, 0);

bzero(&servaddr, 0);
servaddr.sin_family = AF_INET;
servaddr.sin_port = htons(SERV_PORT);
Inet_pton(AF_INET, addr, &servaddr.sin_addr);

Connect(sockfd, (SA *) &servaddr, sizeof(servaddr));

}

static void Send(int fd, void * ptr, int nbytes, int flags) {
if(send(fd, ptr, nbytes, flags) < 0) {
err_sys("send error");
}
}


static void sig_urg(int signo) {
char c;

if(recv(sockfd, &c, 1, MSG_OOB) < 0) {
if(errno != EWOULDBLOCK) {
err_sys("recv error");
}
}
printf("client receive probe\n");

num_probes = 0;

return;
}

static void sig_alarm(int signo) {
if(++num_probes > MAX_PROBES) {
fprintf(stderr, "server is unreachable\n");
exit(1);
}

Send(sockfd, "1", 1, MSG_OOB);
alarm(ALARM_SEC);

return;
}

static void heartbeat_cli() {
signal(SIGURG, sig_urg);
Fcntl(sockfd, F_SETOWN, getpid());

signal(SIGALRM, sig_alarm);
alarm(ALARM_SEC);
}

int main(int argc, char *argv[]) {

if(argc != 2) {
err_quit("%s <IPAddress>", argv[0]);
}

tcp_connect(argv[1]);

heartbeat_cli();

for(;;) { }

return 0;
}

heartbeat_cli函数

SIGURGSIGALRM建立信号处理函数,并把套接字的属主设置为本进程ID。执行alarm以调度第一个SIGALRM

SIGURG信号处理函数

本信号在某个带外通知到达时产生。尝试读入相应的带外字节,如果数据还没到达(EWOULDBLOCK),那也没有关系,注意,我们不采用在线接收带外数据方式,因为这种方式会干扰客户端读取它的正常数据。执行了次函数,代表服务器还活着,将num_probes重置为0。

SIGALRM信号处理函数

本信号以恒定的间隔产生。递增计数器num_probes,如果达到MAX_PROBES,则认为服务器主机或者崩溃、或者不可到达。如果未达到MAX_PROBES,则作为带外数据发送一个含有字符1的字节(该值没有任何隐含意义),再执行alarm调度下一个SIGALRM

服务器程序:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
#include <stdio.h>                                                        
#include <errno.h>
#include <stdlib.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <string.h>
#include <arpa/inet.h>
#include <unistd.h>
#include <sys/select.h>
#include <sys/time.h>
#include <fcntl.h>
#include <signal.h>
#include <time.h>

#include "error.h"

#define MAXLINE 4096

#define SA struct sockaddr

#define LISTENQ 1024
#define SERV_PORT 9987
#define MAX_PROBES 3
#define ALARM_SEC 3

static int num_probes = 0;
static int connfd;

static int Socket(int family, int type, int protocol) {
int n;

if((n = socket(family, type, protocol)) < 0) {
err_sys("socket error");
}
return n;
}

static void Bind(int listenfd, struct sockaddr * sa, socklen_t salen) {
if(bind(listenfd, sa, salen) < 0) {
err_sys("bind error");
}
}

static void Listen(int listenfd, int backlog) {
char *ptr;

if((ptr = getenv("LISTENQ")) != NULL) {
backlog = atoi(ptr);
}
if(listen(listenfd, backlog) < 0) {
err_sys("listen error");
}
}

static int Accept(int fd, struct sockaddr * sa, socklen_t * salenptr) {
int n;

while((n = accept(fd, sa, salenptr)) < 0) {
if(errno = EINTR || errno == ECONNABORTED) {
continue;
} else {
err_sys("accept error");
}
}
return n;
}

static void Fcntl(int fd, int cmd, int arg) {
if(fcntl(fd, cmd, arg) < 0) {
err_sys("fcntl error");
}
}

static void Send(int fd, void * ptr, int nbytes, int flags) {
if(send(fd, ptr, nbytes, flags) < 0) {
err_sys("send error");
}
}

static int tcp_listen(void) {
int listenfd;
struct sockaddr_in servaddr;

listenfd = Socket(AF_INET, SOCK_STREAM, 0);

bzero(&servaddr, 0);
servaddr.sin_family = AF_INET;
servaddr.sin_port = htons(SERV_PORT);
servaddr.sin_addr.s_addr = htonl(INADDR_ANY);

Bind(listenfd, (SA *) &servaddr, sizeof(servaddr));

Listen(listenfd, LISTENQ);

return listenfd;
}

static void sig_urg(int signo) {
char c;

if(recv(connfd, &c, 1, MSG_OOB) < 0) {
if(errno != EWOULDBLOCK) {
err_sys("recv error");
}
}

printf("server receive probe\n");

Send(connfd, "1", 1, MSG_OOB);
num_probes = 0;

return ;
}

static void sig_alarm(int signo) {
if(++num_probes > MAX_PROBES) {
fprintf(stderr, "client is unreachable\n");
exit(1);
}

alarm(ALARM_SEC);

return;
}

static void heartbeat_serv(void) {
signal(SIGURG, sig_urg);
Fcntl(connfd, F_SETOWN, getpid());

signal(SIGALRM, sig_alarm);
alarm(ALARM_SEC);
}

int main(int argc, char *argv[]) {
int listenfd;

listenfd = tcp_listen();

connfd = Accept(listenfd, NULL, NULL);

heartbeat_serv();

for(;;) {}

return 0;
}

服务器的heartbeat_serv函数的功能与客户端heartbeat_cli的功能类似。

先服务器进程,接着启动客户端进程。客户端输出如下:

[heql@ubuntu socket]$ ./heartbeat_cli01 127.0.0.1
client receive probe
client receive probe
client receive probe
client receive probe
client receive probe

服务器输出如下:

[heql@ubuntu socket]$ ./heartbeat_serv01 
server receive probe
server receive probe
server receive probe
server receive probe
server receive probe