多线程:并发服务器


在多进程并发服务器中:父进程accept一个连接,fork一个子进程,该子进程处理与该连接客户端之间的通信。多进程会存在下面的问题:

  • fork是昂贵的。fork要把父进程的内存映像复制到子进程,并在子进程中复制所有描述符等。当今的实现使用称为写时复制(copy-on-write)的技术,用以避免在子进程切实需要自己的副本之前把父进程的数据空间复制到子进程。然而即便有这样的优化措施,fork仍然是昂贵的。
  • fork返回之后父子进程之间信息的传递需要进程间通信(IPC)。调用fork之前父进程向尚未存在的子进程传递信息相当容易,因为子进程将从父进程数据空间及所有描述符的一个副本开始运行。然而从子进程往父进程返回信息却比较费力。

线程

线程的创建可能比进程的创建快10~100倍。同一进程内的所有线程共享相同的全局内存。这使得线程之间易于共享信息,然而伴随这种简易性而来的却是同步问题。

同一进程内的所有线程除了共享全局变量外还共享:

  • 进程指令
  • 大多数数据
  • 打开的文件(即描述符)
  • 信号处理函数和信号处置
  • 当前工作目录
  • 用户ID和组ID

不过每个线程有各自的:

  • 线程ID
  • 寄存器集合,包括程序计数器和栈指针
  • 栈(用于存放局部变量和返回地址)
  • errno
  • 信号掩码
  • 优先级

基本线程函数

pthread_create函数

当一个程序由exec启动执行时,称为初始化线程或主线程的单个线程就创建了。其余线程则由pthread_create函数创建。

1
2
3
#include <pthread.h>
int pthread_create(pthread_t *tid, const pthread_attr_t *attr, void *(*func)(void *), void *arg);
返回:若成功则为0,若出错则为正的Exxx值

一个进程内的每个线程都由一个线程ID标识,其数据类型为pthread_t(往往是unsigned int)。如果新的线程成功创建,其ID就通过tid指针返回。

每个线程都有许多属性:优先级、初始化大小、是否应该成为一个守护线程等等。在创建线程时通过初始化一个取代默认设置的pthread_attr_t变量指定这些属性。通常采用默认设置,这是把attr参数指定为空指针。

创建一个线程时最后指定的参数是由该线程执行的函数及其参数。该线程通过调用这个函数开始执行,然后或者显示终止(通过调pthread_exit),或者隐式地终止(通过让函数返回)。该函数的地址由func参数指定,该函数的唯一调用参数是指针arg。如果需要给该函数传递多个参数,可以把它们打包成一个结构,然后把这个结构的地址作为单个参数传递给这个起始函数。

pthread_join函数

我们可以通过调用pthread_join等待一个给定线程终止。对比线程和UNIX进程,pthread_create类似于forkpthread_join类似于waitpid

1
2
3
#include <pthread.h>
int pthread_join(pthread_t *tid, void **status);
返回:若成功则为0,若出错则为正的Exxx值

我们必须指定要等待线程的tid。不幸的是,Pthread没有办法等待任意一个线程(类似指定进程ID参数为-1调用waitpid)。

pthread_self函数

每个线程都有一个在所属进程内标识自身的ID。线程ID由pthread_create返回。每个线程使用pthread_self获取自身的线程ID。

1
2
3
#include <pthread.h>
pthread_t pthread_self(void);
返回:调用线程的线程ID

对比线程和UNIX进程,pthread_self类似于getpid

pthread_detach函数

一个线程或者是可汇合的(joinable,默认值),或者是脱离(detached)。当一个可汇合的线程终止时,它的线程ID和退出状态留存到另一个线程对它调用pthread_join。脱离的线程却像守护进程,当它们终止时,所有相关资源都被释放,不能等待它们终止。如果一个线程需要知道另一个线程什么时候终止,那就最好保持第二个线程的可汇合状态。

pthread_detach函数把指定的线程转变为脱离状态。

1
2
3
#include <pthread.h>
int pthread_detach(pthread_t tid);
返回:若成功则为0,若出错则为正的Exxx值

本函数通常由想让自己脱离的线程调用,就如下语句:

1
pthread_detach(pthread_self());

pthread_exit函数

让一个线程终止的方法之一是调用pthread_exit

1
2
#include <pthread.h>
void pthread_exit(void *status);

如果本线程未曾脱离,它的线程ID和退出状态将一直留存到调用进程内的某个其他进程对它调用pthread_join

指针status不能指向局部于调用线程的对象,因为线程终止时这样的对象也消失。

让一个线程终止的另外两个方法是:

  • 启动线程的函数(即pthread_create的第三个参数)可以返回。既然该函数必须返回一个void指针,它的返回值就是相应线程的终止状态。
  • 如果进程的main函数返回或者任何线程调用了exit,整个进程就终止,其中包括它的任何线程

使用多线程的echo服务器程序

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
#include <stdio.h>                                                        
#include <errno.h>
#include <stdlib.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <string.h>
#include <arpa/inet.h>
#include <unistd.h>
#include <pthread.h>
#include <assert.h>

#include "error.h"

#define MAXLINE 4096
#define SERV_PORT 9987
#define SA struct sockaddr
#define LISTENQ 1024
#define CHECK(ret) ({__typeof__(ret) errnum = (ret); \
assert(errnum == 0); (void)errnum;})

static int Socket(int family, int type, int protocol) {
int n;

if((n = socket(family, type, protocol)) < 0) {
err_sys("socket error");
}

return n;
}

static void Bind(int fd, struct sockaddr *sa, socklen_t salen) {
if(bind(fd, sa, salen) < 0) {
err_sys("bind error");
}
}

static void Listen(int fd, int backlog) {
char *ptr;

if((ptr = getenv("LISTENQ")) != NULL) {
backlog = atoi(ptr);
}

if(listen(fd, backlog) < 0) {
err_sys("listen error");
}
}

static int Accept(int fd, struct sockaddr *sa, socklen_t *salenptr) {
int n;

while((n = accept(fd, sa, salenptr)) < 0) {
if(errno != EINTR && errno != ECONNABORTED) {
err_sys("accept error");
}
}

return n;
}

static void Close(int fd) {
if(close(fd) == -1) {
err_sys("close error");
}
}

static int writen(int fd, void *vptr, size_t nbytes) {
const char *ptr = vptr;
size_t left = nbytes;
int nwritten;

while(left > 0) {
if((nwritten = write(fd, vptr, left)) < 0) {
return -1;
}

ptr += nwritten;
left -= nwritten;
}

return nbytes;
}

static void Writen(int fd, void *vptr, size_t nbytes) {
if(writen(fd, vptr, nbytes) != nbytes) {
err_sys("write error");
}
}

static void str_echo(int fd) {
char buf[MAXLINE];
int n;

while((n = read(fd, buf, MAXLINE))) {
if(n < 0) {
if(errno == EINTR) {
continue;
} else {
err_sys("str_echo: read error");
}
} else {
Writen(fd, buf, n);
}
}
}

static void* thread_main(void *arg) {
int connfd = (int)arg;

CHECK(pthread_detach(pthread_self()));
str_echo(connfd);
Close(connfd);

return arg;
}

int main(int argc, char *argv[]) {
struct sockaddr_in servaddr;
int listenfd;
int connfd;
pthread_t tid;

listenfd = Socket(AF_INET, SOCK_STREAM, 0);

bzero(&servaddr, 0);
servaddr.sin_family = AF_INET;
servaddr.sin_port = htons(SERV_PORT);
servaddr.sin_addr.s_addr = htonl(INADDR_ANY);

Bind(listenfd, (SA *) &servaddr, sizeof(servaddr));

Listen(listenfd, LISTENQ);

for(;;) {
connfd = Accept(listenfd, NULL, NULL);
CHECK(pthread_create(&tid, NULL, thread_main, (void*)connfd));
}

return 0;
}

main函数

accept返回之后,改为调用pthread_create取代调用fork,为每个连接的客户端创建一个线程。传递给线程执行函数thread_main的参数是已连接套接字描述符connfd

thread_main函数

thread_main是由线程执行的函数。线程首先让自身脱离,因为主线程不用等待它创建的每个线程。然后调用str_echo函数。该函数返回之后,必须close已连接套接字,因为本线程和主线程共享所有的描述符。对应使用fork的情形,子进程就不必close已连接套接字,因为子进程随即终止,而所有打开的描述符在进程终止时都被关闭。

还要注意的是,主线程不关闭已连接套接字,而在调用fork的并发服务器程序中需要关闭已连接的套接字,这是因为同一进程内的所有线程共享全部描述符,要是主线程调用close,它就会终止相应的连接。创建新线程并不影响已打开描述符的引用计数,这一点不同于fork

CHECK宏

用于检测线程函数的返回值是否正确。

运行程序,使用pstree查看线程:

[heql@ubuntu ~]$ pstree -p 2696
server(2696)─┬─{server}(2721)
             ├─{server}(2798)
             └─{server}(2823)

上面的服务器程序创建了三个线程,用于处理三个不同的客户端的连接。

使用多线程的echo客户端程序

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
#include <stdio.h> 
#include <stdlib.h>
#include <errno.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <string.h>
#include <arpa/inet.h>
#include <unistd.h>
#include <pthread.h>
#include <assert.h>

#include "error.h"

#define MAXLINE 4096
#define SERV_PORT 9987
#define SA struct sockaddr
#define CHECK(ret) ({__typeof__(ret) errnum = (ret); \
assert(errnum == 0); (void)errnum;})

struct file_desc {
FILE *fp;
int fd;
};

static int Socket(int family, int type, int protocol) {
int n;

if((n = socket(family, type, protocol)) < 0) {
err_sys("socket error");
}

return n;
}

static void Inet_pton(int family, const char *strptr, void *addrptr) {
if(inet_pton(family, strptr, addrptr) <=0) {
err_sys("inet_pton error for %s", strptr);
}
}

static void Connect(int fd, struct sockaddr *sa, socklen_t salen) {
if(connect(fd, sa, salen) < 0) {
err_sys("connect error");
}
}

static const char* Fgets(char *ptr, int n, FILE *stream) {
char *ret_ptr;

if((ret_ptr = fgets(ptr, n, stream)) == NULL && ferror(stream)) {
err_sys("fgets error");
}

return ret_ptr;
}

static int writen(int fd, void *vptr, size_t nbytes) {
const char *ptr = vptr;
size_t left = nbytes;
int nwritten;

while(left > 0) {
if((nwritten = write(fd, vptr, left)) < 0) {
return -1;
}

ptr += nwritten;
left -= nwritten;
}

return nbytes;
}

static void Writen(int fd, void *vptr, size_t nbytes) {
if(writen(fd, vptr, nbytes) != nbytes) {
err_sys("write error");
}
}

static int readline(int fd, void *vptr, size_t maxlen) {
int i;
char *ptr = vptr;
int n;
char c;

for(i = 0; i < maxlen - 1; ++i) {
if((n = read(fd, &c, 1)) == 1) {
*ptr++ = c;
if(c == '\n') {
break; /* newline */
}
} else if(n == 0) {
if(i == 0) {
return 0; /* EOF, no data was read */
} else {
break; /* EOF, some data was read */
}
} else {
if(errno == EINTR) {
continue;
} else {
return -1; /* read error */
}
}
}

*ptr = '\0';

return i + 1;
}

static int Readline(int fd, void *vptr, size_t maxlen) {
int n;

if((n = readline(fd, vptr, maxlen)) < 0) {
err_sys("read error");
}

return n;
}

static void Fputs(const char *ptr, FILE *stream) {
if(fputs(ptr, stream) == EOF) {
err_sys("fputs error");
}
}

static void Shutdown(int fd, int how) {
if(shutdown(fd, how) < 0) {
err_sys("shutdown error");
}
}

static void* copy_to(void *arg) {
char sendline[MAXLINE];
struct file_desc* f = (struct file_desc*)arg;

while((Fgets(sendline, MAXLINE, f->fp)) != NULL) {
Writen(f->fd, sendline, strlen(sendline));
}

Shutdown(f->fd, SHUT_WR);

return NULL;
}

static void str_cli(FILE *fp, int sockfd) {
char recvline[MAXLINE];
pthread_t tid;

struct file_desc f;
f.fp = fp;
f.fd = sockfd;

CHECK(pthread_create(&tid, NULL, copy_to, &f));

while(Readline(sockfd, recvline, MAXLINE)) {
Fputs(recvline, stdout);
}
}

int main(int argc, char *argv[]) {
struct sockaddr_in servaddr;
int sockfd;

if(argc != 2) {
err_quit("%s <IPAddress>", argv[0]);
}

sockfd = Socket(AF_INET, SOCK_STREAM, 0);

bzero(&servaddr, 0);
servaddr.sin_family = AF_INET;
servaddr.sin_port = htons(SERV_PORT);
Inet_pton(AF_INET, argv[1], &servaddr.sin_addr);

Connect(sockfd, (SA *) &servaddr, sizeof(servaddr));

str_cli(stdin, sockfd);

return 0;
}

main函数

创建套接字,调用connect建立于服务器的连接。调用str_cli函数。

str_cli函数

创建一个线程,执行copy_to线程函数,线程的参数为标准输入的描述符和连接套接字的结构体。主线程调用readlinefputs,把从套接字读入的每个文本行复制到标准输出。

copy_to线程函数

该线程只是把读入标准输入的每个文本行发送给服务器。当在标准输入上读到EOF时,它通过调用shutdown从套接字发送 FIN,然后返回。注意这里不能直接调用close,因为主线程还需要从该套接字读取服务器返回的数据。

运行程序,使用pstree查看线程:

[heql@ubuntu ~]$ pstree -p 2949
client(2949)───{client}(2951)