线程池


上文中使用了预先派生一个子进程池来处理客户端的连接,这会比为每个客户端连接现场派生一个子进程快的得多。但是进程的创建和调度比起线程来说,还是比较慢的。

在多线程并发服务器中可以为每个客户端的连接创建一个线程,我们也可以使用线程池来取代为每个客户端现场创建一个连接。并让每个线程各自调用accept。取代每个线程都阻塞在accept调用中的做法,使用互斥锁以保证任何时刻只有一个线程在调用accept

TCP预先创建线程服务器程序,每个线程各自accept

代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
#include <stdio.h>                                                      
#include <errno.h>
#include <stdlib.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <string.h>
#include <arpa/inet.h>
#include <unistd.h>
#include <pthread.h>
#include <signal.h>
#include <assert.h>

#include "error.h"

#define LISTENQ 1024
#define MAX_BUF 4096
#define SA struct sockaddr
#define MAX_THREADS 24
#define SERV_PORT 9987
#define min(a, b) ((a) < (b) ? (a) : (b))
#define CHECK(ret) ({__typeof__(ret) errnum = (ret); \
assert(errnum == 0); (void)errnum;})

struct Thread {
pthread_t thread_id;
int thread_count;
};

struct Thread thread[MAX_THREADS];

static int num_thread = 0;
static int listenfd;
static pthread_mutex_t mutex_lock = PTHREAD_MUTEX_INITIALIZER;

static int Socket(int family, int type, int protocol) {
int n;

if((n = socket(family, type, protocol)) < 0) {
err_sys("socket error");
}

return n;
}

static int Bind(int fd, struct sockaddr *sa, socklen_t salen) {
if(bind(fd, sa, salen) < 0) {
err_sys("bind error");
}
}

static int Listen(int fd, int backlog) {
const char *ptr;

if((ptr = getenv("LISTENQ")) != NULL) {
backlog = atoi(ptr);
}

if(listen(fd, backlog) < 0) {
err_sys("listen error");
}
}

static int tcp_listen(void) {
int listenfd;
struct sockaddr_in servaddr;

listenfd = Socket(AF_INET, SOCK_STREAM, 0);

bzero(&servaddr, 0);
servaddr.sin_family = AF_INET;
servaddr.sin_port = htons(SERV_PORT);
servaddr.sin_addr.s_addr = htonl(INADDR_ANY);

Bind(listenfd, (SA *) &servaddr, sizeof(servaddr));

Listen(listenfd, LISTENQ);

return listenfd;
}

static int Accept(int fd, struct sockaddr *sa, socklen_t *salenptr) {
int n;

while((n = accept(fd, sa, salenptr)) < 0) {
if(errno != EINTR && errno != ECONNABORTED) {
err_sys("accept error");
}
}

return n;
}

static int readline(int fd, void *vptr, size_t maxlen) {
int i;
char *ptr = vptr;
int n;
char c;

for(i = 0; i < maxlen - 1; ++i) {
if((n = read(fd, &c, 1)) == 1) {
*ptr++ = c;
if(c == '\n') {
break; /* newline */
}
} else if(n == 0) {
if(i == 0) {
return 0; /* EOF, no data was read */
} else {
break; /* EOF, some data was read */
}
} else {
if(errno == EINTR) {
continue;
} else {
return -1; /* read error */
}
}
}

*ptr = '\0';

return i + 1;
}

static int Readline(int fd, void *vptr, size_t maxlen) {
int n;

if((n = readline(fd, vptr, maxlen)) < 0) {
err_sys("read error");
}

return n;
}

static int writen(int fd, void *vptr, size_t nbytes) {
const char *ptr = vptr;
size_t left = nbytes;
int nwritten;

while(left > 0) {
if((nwritten = write(fd, vptr, left)) < 0) {
return -1;
}

ptr += nwritten;
left -= nwritten;
}

return nbytes;
}

static void Writen(int fd, void *vptr, size_t nbytes) {
if(writen(fd, vptr, nbytes) != nbytes) {
err_sys("write error");
}
}

static void Close(fd) {
if(close(fd) == -1) {
err_sys("close error");
}
}

static void process_request(int fd) {
char recvline[MAX_BUF];
char result[MAX_BUF];
int nread;
int towrite;

for(;;) {
if((nread = Readline(fd, recvline, MAX_BUF)) == 0) {
return;
}

towrite = atol(recvline);

if(towrite <= 0 || towrite > MAX_BUF) {
err_quit("client request for %d bytes", towrite);
}
Writen(fd, result, towrite);
}
}

static void* thread_main(void *arg) {
int i = (int)arg;
int connfd;

printf("thread %d starting\n", i);

for(;;) {
CHECK(pthread_mutex_lock(&mutex_lock));
connfd = Accept(listenfd, NULL, NULL);
CHECK(pthread_mutex_unlock(&mutex_lock));

++thread[i].thread_count;
process_request(connfd);
Close(connfd);
}

return NULL;
}

static void thread_make(int i) {
CHECK(pthread_create(&thread[i].thread_id, NULL, thread_main, (void *)i));

return;
}

static void sig_int(int signo) {
int i;

printf("\n");

for(i = 0; i < num_thread; ++i) {
printf("child %d, %d connections\n", i, thread[i].thread_count);
}

exit(0);
}

int main(int argc, char *argv[]) {
int i;

if(argc != 2) {
err_quit("usage: %s <num_thread>", argv[0]);
}

num_thread = min(atoi(argv[1]), MAX_THREADS);
listenfd = tcp_listen();

for(i = 0; i < num_thread; ++i) {
thread[i].thread_count = 0;
thread_make(i);
}

signal(SIGINT, sig_int);

for(;;) {
pause();
}

return 0;
}

main函数

调用tcp_listen函数创建一个监听套接字。调用thread_make预先创建线程,创建的个数由命令行指定,最大的个数不能超过MAX_CHILDRENSIGINT信号处理函数,在终止服务器(CTRL+C)时被调用,用于统计各个线程处理的连接数。

thread_make函数

创建线程,执行thread_main函数。

thread_main函数

每个线程在调用accept时,需要获得互斥锁,如果没有获得互斥锁,本线程将阻塞,然后调用accept返回一个已连接套接字,调用process_request函数处理客户请求,最后关闭连接。

互斥锁

多个线程更改一个共享的变量,其解决办法是使用一个互斥锁保护这个共享变量,访问该变量的前提条件是持有该互斥锁。互斥锁是类型为pthread_mutex_t的变量。使用以下两个函数为一个互斥锁上锁和解锁。

1
2
3
4
#include <pthread.h>
int pthread_mutex_lock(pthread_mutex_t *mptr);
int pthread_mutex_unlock(pthread_mutex_t *mptr);
返回:若成功则为0,若出错则为正的Exxx值

如果试图上锁已被另外某个线程锁住的一个互斥锁,本线程将被阻塞,直到该互斥锁被解锁为止。

如果某个互斥锁变量是静态分配的,就必须把它初始化为常值PTHREAD_MUTEX_INITIALIZER,如果在共享内存区中分配一个互斥锁,那么必须通过调用pthread_mutex_init函数在运行时把它初始化。

sig_int函数

在服务器终止(CTRL+C)时被调用,用于打印在每个线程处理的连接数。

运行服务器程序:

服务器程序创建了10个线程,输出如下信息:

[heql@ubuntu socket]$ ./server 10
thread 5 starting
thread 6 starting
thread 4 starting
thread 7 starting
thread 3 starting
thread 8 starting
thread 9 starting
thread 2 starting
thread 1 starting
thread 0 starting

运行客户端程序:

使用上文中的客户端程序:客户端使用5个子进程各自发起5000次连接。在每个连接上,客户向服务器发送请求4096字节数据,服务器将向每个连接的客户返回4096字节数据。

[heql@ubuntu socket]$ ./test_client 192.168.1.156 9987 5 5000 4096
child 1 done
child 0 done
child 3 done
child 2 done
child 4 done

终止服务器

客户端程序运行完后,按下CTRL+C终止服务器,查看每个线程处理的连接个数:

^C
child 0, 2461 connections
child 1, 2483 connections
child 2, 2531 connections
child 3, 2511 connections
child 4, 2519 connections
child 5, 2510 connections
child 6, 2486 connections
child 7, 2505 connections
child 8, 2470 connections
child 9, 2524 connections

TCP预先创建线程服务器程序,主线程统一accept

在程序启动阶段创建一个线程池之后,也可以只让主线程调用accept并把每个客户连接传递给线程池中的某个可用线程。

代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
#include <stdio.h>                                                      
#include <errno.h>
#include <stdlib.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <string.h>
#include <arpa/inet.h>
#include <unistd.h>
#include <pthread.h>
#include <signal.h>
#include <assert.h>

#include "error.h"

#define LISTENQ 1024
#define MAX_BUF 4096
#define SA struct sockaddr
#define SERV_PORT 9987
#define MAX_THREADS 24
#define MAX_CLIENT_FD 24
#define min(a, b) ((a) < (b) ? (a) : (b))
#define CHECK(ret) ({__typeof__(ret) errnum = (ret); \
assert(errnum == 0); (void)errnum;})

struct Thread {
pthread_t thread_id;
int thread_count;
};

static int client_fd[MAX_CLIENT_FD];
static int front;
static int back;
struct Thread thread[MAX_THREADS];

static int num_thread = 0;
static pthread_mutex_t mutex_lock = PTHREAD_MUTEX_INITIALIZER;
static pthread_cond_t client_fd_cond = PTHREAD_COND_INITIALIZER;

static int Socket(int family, int type, int protocol) {
int n;

if((n = socket(family, type, protocol)) < 0) {
err_sys("socket error");
}

return n;
}

static int Bind(int fd, struct sockaddr *sa, socklen_t salen) {
if(bind(fd, sa, salen) < 0) {
err_sys("bind error");
}
}

static int Listen(int fd, int backlog) {
const char *ptr;

if((ptr = getenv("LISTENQ")) != NULL) {
backlog = atoi(ptr);
}

if(listen(fd, backlog) < 0) {
err_sys("listen error");
}
}

static int tcp_listen(void) {
int listenfd;
struct sockaddr_in servaddr;

listenfd = Socket(AF_INET, SOCK_STREAM, 0);

bzero(&servaddr, 0);
servaddr.sin_family = AF_INET;
servaddr.sin_port = htons(SERV_PORT);
servaddr.sin_addr.s_addr = htonl(INADDR_ANY);

Bind(listenfd, (SA *) &servaddr, sizeof(servaddr));

Listen(listenfd, LISTENQ);

return listenfd;
}

static int Accept(int fd, struct sockaddr *sa, socklen_t *salenptr) {
int n;

while((n = accept(fd, sa, salenptr)) < 0) {
if(errno != EINTR && errno != ECONNABORTED) {
err_sys("accept error");
}
}

return n;
}

static int readline(int fd, void *vptr, size_t maxlen) {
int i;
char *ptr = vptr;
int n;
char c;

for(i = 0; i < maxlen - 1; ++i) {
if((n = read(fd, &c, 1)) == 1) {
*ptr++ = c;
if(c == '\n') {
break; /* newline */
}
} else if(n == 0) {
if(i == 0) {
*ptr = '\0';
return i; /* EOF */
}
} else {
if(errno == EINTR) {
continue;
} else {
return -1; /* read error */
}
}
}

*ptr = '\0';

return i + 1;
}

static int Readline(int fd, void *vptr, size_t maxlen) {
int n;

if((n = readline(fd, vptr, maxlen)) < 0) {
err_sys("read error");
}

return n;
}

static int writen(int fd, void *vptr, size_t nbytes) {
const char *ptr = vptr;
size_t left = nbytes;
int nwritten;

while(left > 0) {
if((nwritten = write(fd, vptr, left)) < 0) {
return -1;
}

ptr += nwritten;
left -= nwritten;
}

return nbytes;
}

static void Writen(int fd, void *vptr, size_t nbytes) {
if(writen(fd, vptr, nbytes) != nbytes) {
err_sys("write error");
}
}

static void Close(fd) {
if(close(fd) == -1) {
err_sys("close error");
}
}

static void process_request(int fd) {
char recvline[MAX_BUF];
char result[MAX_BUF];
int nread;
int towrite;

for(;;) {
if((nread = Readline(fd, recvline, MAX_BUF)) == 0) {
return;
}

towrite = atol(recvline);

if(towrite <= 0 || towrite > MAX_BUF) {
err_quit("client request for %d bytes", towrite);
}
Writen(fd, result, towrite);
}
}

static void* thread_main(void *arg) {
int i = (int)arg;
int connfd;

printf("thread %d starting\n", i);

for(;;) {
CHECK(pthread_mutex_lock(&mutex_lock));
while(front == back) {
CHECK(pthread_cond_wait(&client_fd_cond, &mutex_lock));
}
connfd = client_fd[front];
if(++front == MAX_CLIENT_FD) {
front = 0;
}
CHECK(pthread_mutex_unlock(&mutex_lock));

++thread[i].thread_count;
process_request(connfd);
Close(connfd);
}

return NULL;
}

static void thread_make(int i) {
CHECK(pthread_create(&thread[i].thread_id, NULL, thread_main, (void *)i));

return;
}

static void sig_int(int signo) {
int i;

printf("\n");

for(i = 0; i < num_thread; ++i) {
printf("child %d, %d connections\n", i, thread[i].thread_count);
}

exit(0);
}

int main(int argc, char *argv[]) {
int i;
int connfd;
int listenfd;

if(argc != 2) {
err_quit("usage: %s <num_thread>", argv[0]);
}

num_thread = min(atoi(argv[1]), MAX_THREADS);
listenfd = tcp_listen();

for(i = 0; i < num_thread; ++i) {
thread[i].thread_count = 0;
thread_make(i);
}

front = back = 0;

signal(SIGINT, sig_int);

for(;;) {
connfd = Accept(listenfd, NULL, NULL);

CHECK(pthread_mutex_lock(&mutex_lock));
client_fd[back] = connfd;
if(++back == MAX_CLIENT_FD) {
back = 0;
}

if(back == front) {
err_quit("back = front = %d", back);
}
CHECK(pthread_cond_signal(&client_fd_cond));
CHECK(pthread_mutex_unlock(&mutex_lock));
}

return 0;
}

定义存放已连接套接字描述符的共享数组

定义一个client_fd数组,由主线程往其中存入已接受连接的套接字描述符,并由线程池中的可用线程从中取出一个以服务相应的客户端。back是主线程将往该数组中存入的下一个元素的下标,front是线程池中某个线程将从该数组中取出的下一个元素的下标。使用互斥锁和条件变量对这些共享的数据结构进行保护。

条件变量

当需要一个让主循环进入睡眠,直到某个线程通知它有事可做才醒来的方法。条件变量结合互斥锁能够提供这个功能。互斥锁提供互斥机制,条件变量提供信号机制。

条件变量是类型为pthread_cond_t的变量。以下两个函数使用条件变量。

1
2
3
4
5
#include <pthread.h>
int pthread_cond_wait(pthread_cond_t *cptr, pthread_mutex_t *mptr);

int pthread_cond_signal(pthread_cond_t *cptr);
返回:若成功则为0,若出错则为正的Exxx值。

pthread_cond_wait函数把调用函数投入睡眠并释放调用线程持有的互斥锁,当调用线程后来从pthread_cond_wait返回时,该线程再次持有互斥锁。

为什么每个条件变量都要关联一个互斥锁呢?因为”条件”通常是线程之间共享的某个变量的值。允许不同线程设置和测试该变量,要求有一个与该变量关联的互斥锁。举例来说,下面的代码没有互斥锁,那么主循环将如下测试变量ndone。

1
2
3
while(ndone == 0) {
pthread_cond_wait(&ndone_cond, &ndone_mutex);
}

这里存在如此可能性:主线程外最后一个线程在主循环测试ndone==0之后,但在调用pthread_cond_wait之前递增ndone。如果发生这样的情形,最后那个”信号”就丢失了,造成主循环永远阻塞在pthread_cond_wait调用中,等待永远不再发生的某事再次出现。

pthread_cond_signal通常唤醒等在相应条件变量上的单个线程。有时候一个线程知道自己应该唤醒多个线程,这个情况下它可以调用pthread_cond_broadcast唤醒等在相应条件变量上的所有线程。

1
2
#include <pthread.h>
int pthread_cond_broadcast(pthread_cond_t *cptr);

main函数

主线程大部分时间阻塞在accept调用中,等待各个客户连接的到达。一旦客户连接到达,主线程就把它的已连接套接字描述符存入client_fd数组中,不过需要获取保护该数组的互斥锁。主线程还检查back下标没有赶上front下标(若赶上则说明该数组不够大)。并发送信号到条件变量信号,然后释放互斥锁,已允许线程池中某个线程为这个客户端服务。

thread_main函数

线程池中的每个线程都试图获取保护client_fd数组的互斥锁。获得之后就测试backfront,若两者相等,通过调用pthread_cond_wait睡眠在条件变量上。主线程接受一个连接后将调用pthread_cond_signal向条件变量发送信号,以唤醒睡眠在其上的线程。若backfront不等,则从client_fd数组中取出下一个元素以获得一个连接,然后调用process_request

实际上这个版本会稍微慢于上面中先获取一个互斥锁再调用accept的版本。原因在于这个版本需要同时获取互斥锁和条件变量。