I/O复用:epoll


epoll是Linux内核为处理大批量文件描述符而做了改进的poll,是Linux下多路复用IO接口select/poll的增强版本,它能显著提高程序在大量并发连接中只有少量活跃的情况下的系统CPU利用率。

epoll的优点

  • 支持一个进程打开大数目的文件描述符: select在一个进程所打开的文件描述符是有一定限制的,由FD_SETSIZE设置,默认值是1024。不过epoll则没有这个限制,它所支持的文件描述符的上限是系统最大可以打开文件的数目,可以在/proc/sys/fs/file-max查看,一般来说,这个数目和系统的内存有很大关系。

  • I/O效率不随文件描述符数目增加而线性下降: 传统的select/poll有一个缺点就是在一个很大的socket集合时,不过由于网络延时,任一时间只有部分的socket是 “活跃” 的,但是select/poll每次调用都会线性扫描全部的集合,导致效率呈现下降。但是epoll不存在这个问题,它只会对 “活跃” 的socket进行操作。如果对于所有的socket基本上都是 “活跃” 的,epoll并不比select/poll效率高。如果应用中存在大量的短连接,epoll_ctl将被频繁地调用,这也可能会影响epoll的效率。

epoll的系统调用

epoll_create函数

1
2
3
#include <sys/epoll.h>
int epoll_create(int size);
返回:若成功则为非负描述符,若出错则为-1

创建一个epoll的文件描述符,size用来告诉内核这个监听的数目一共有多大。

epoll_ctl函数

1
2
3
 #include <sys/epoll.h>
int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event);
返回:若成功则为0,若出错则为-1

epoll的事件注册函数,它不同与select是在监听事件时告诉内核要监听什么类型的事件,而是在这里先注册要监听的事件类型。

第一个参数是epoll_create的返回值,第二个参数表示动作,用三个宏来表示:

  • EPOLL_CTL_ADD : 注册新的fdepfd
  • EPOLL_CTL_MOD : 修改已经注册的fd的监听事件
  • EPOLL_CTL_DEL : 从epfd中删除一个fd

第三个参数是需要监听的fd,第四个参数是告诉内核需要监听什么事,struct epoll_event结构如下:

1
2
3
4
5
6
7
8
9
10
11
typedef union epoll_data {
void *ptr;
int fd;
uint32_t u32;
uint64_t u64;
} epoll_data_t;

struct epoll_event {
uint32_t events; /* Epoll events */
epoll_data_t data; /* User data variable */
};

events可以是以下几个宏的集合:

  • EPOLLIN :表示对应的文件描述符可以读(包括对端SOCKET正常关闭)
  • EPOLLOUT :表示对应的文件描述符可以写
  • EPOLLPRI :表示对应的文件描述符有紧急的数据可读
  • EPOLLERR :表示对应的文件描述符发生错误
  • EPOLLHUP :表示对应的文件描述符被挂断
  • EPOLLET : 将EPOLL设为边缘触发(Edge Triggered)模式,这是相对于水平触发(Level Triggered)来说的。
  • EPOLLONESHOT:只监听一次事件,当监听完这次事件之后,如果还需要继续监听这个socket的话,需要再次把这个socket加入到EPOLL队列里

epoll_wait函数

1
2
3
#include <sys/epoll.h>
int epoll_wait(int epfd, struct epoll_event *events, int maxevents, int timeout);
返回:若有就绪描述符则为其数目,若超时则为0,若出错则为-1

等待事件的产生,类似于select/poll调用。参数events用来从内核得到事件的集合,maxevents告之内核这个events有多大,这个maxevents的值不能大于创建epoll_create时的size,参数timeout是超时时间(0会立即返回,-1是永久阻塞)。该函数返回需要处理的事件数目,如返回0表示已超时。

epoll工作模式

epoll对文件描述符的操作有两种模式:LT(level trigger)和ET(edge trigger)。LT模式是默认模式,LT模式与ET模式的区别如下:

LT模式 : 是缺省的工作方式,并且同时支持blocknon-block socket。在这种模式下,当epoll_wait检测到描述符事件发生并将此事件通知应用程序,应用程序可以不立即处理该事件。下次调用epoll_wait时,会再次响应程序并通知此事件。

ET模式 : 是高速工作方式,只支持non-block socket。在这种模式下,当epoll_wait检测到描述符事件发生并将此事件通知应用程序,应用程序必须立即处理该事件。如果不处理,下次调用epoll_wait时,不会再次响应应用程序并通知此事件。

使用epoll的echo服务器程序

LT模式

代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
#include <stdio.h>                                                        
#include <errno.h>
#include <stdlib.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <string.h>
#include <arpa/inet.h>
#include <unistd.h>
#include <sys/epoll.h>
#include <sys/time.h>

#include "error.h"

#define MAXLINE 4096
#define SERV_PORT 9987
#define SA struct sockaddr
#define LISTENQ 1024
#define MAX_EVENTS 1024

static int Socket(int family, int type, int protocol) {
int n;

if((n = socket(family, type, protocol)) < 0) {
err_sys("socket error");
}

return n;
}

static void Bind(int fd, struct sockaddr *sa, socklen_t salen) {
if(bind(fd, sa, salen) < 0) {
err_sys("bind error");
}
}

static void Listen(int fd, int backlog) {
char *ptr;

if((ptr = getenv("LISTENQ")) != NULL) {
backlog = atoi(ptr);
}

if(listen(fd, backlog) < 0) {
err_sys("listen error");
}
}

static int Accept(int fd, struct sockaddr *sa, socklen_t *salenptr) {
int n;

while((n = accept(fd, sa, salenptr)) < 0) {
if(errno != EINTR && errno != ECONNABORTED) {
err_sys("accept error");
}
}

return n;
}

static void Close(int fd) {
if(close(fd) == -1) {
err_sys("close error");
}
}

static int tcp_listen(void) {
int listenfd;
struct sockaddr_in servaddr;

listenfd = Socket(AF_INET, SOCK_STREAM, 0);

bzero(&servaddr, 0);
servaddr.sin_family = AF_INET;
servaddr.sin_port = htons(SERV_PORT);
servaddr.sin_addr.s_addr = htonl(INADDR_ANY);

Bind(listenfd, (SA *) &servaddr, sizeof(servaddr));

Listen(listenfd, LISTENQ);

return listenfd;
}

static int writen(int fd, void *vptr, size_t nbytes) {
const char *ptr = vptr;
size_t left = nbytes;
int nwritten;

while(left > 0) {
if((nwritten = write(fd, vptr, left)) < 0) {
return -1;
}

ptr += nwritten;
left -= nwritten;
}

return nbytes;
}

static void Writen(int fd, void *vptr, size_t nbytes) {
if(writen(fd, vptr, nbytes) != nbytes) {
err_sys("write error");
}
}

static int Epoll_create(int size) {
int n;

if((n = epoll_create(size)) < 0) {
err_sys("epoll_create error");
}

return n;
}

static void Epoll_ctl(int epfd, int op, int fd, struct epoll_event *event) {
if(epoll_ctl(epfd, op, fd, event) < 0) {
err_sys("epoll_ctl error");
}
}

static int Epoll_wait(int epfd, struct epoll_event *events, int maxevents, int timeout) {
int n;

if((n = epoll_wait(epfd, events, maxevents, timeout)) < 0) {
err_sys("epoll_wait error");
}

return n;
}

static void epoll_add_event(int epollfd, int event, int fd) {
struct epoll_event ev;

ev.events = event;
ev.data.fd = fd;

Epoll_ctl(epollfd, EPOLL_CTL_ADD, fd, &ev);
}

static void epoll_del_event(int epollfd, int fd) {
struct epoll_event ev;

Epoll_ctl(epollfd, EPOLL_CTL_DEL, fd, &ev);
}

int main(int argc, char *argv[]) {
int listenfd;
int epollfd;
int num_ready;
int i;
int n;
int connfd;
char buf[MAXLINE];
struct epoll_event events[MAX_EVENTS];

listenfd = tcp_listen();

epollfd = Epoll_create(MAX_EVENTS);

epoll_add_event(epollfd, EPOLLIN, listenfd);

for(;;) {
num_ready = Epoll_wait(epollfd, events, MAX_EVENTS, -1);

for(i = 0; i < num_ready; ++i) {
if(events[i].data.fd == listenfd) {

/* new connection */
connfd = Accept(listenfd, NULL, NULL);
epoll_add_event(epollfd, EPOLLIN, connfd);
} else {

/* client data */
connfd = events[i].data.fd;
if((n = read(connfd, buf, MAXLINE)) < 0) {
if(errno == ECONNRESET) {
epoll_del_event(epollfd, connfd);
Close(connfd);
} else {
err_sys("read error");
}
} else if(n == 0) {
epoll_del_event(epollfd, connfd);
Close(connfd);
} else {
Writen(connfd, buf, n);
}
}
}
}

return 0;
}
  1. 创建一个监听套接字。将监听套接字添加到epoll事件中,events使用的是EPOLLIN
  2. 调用epoll_wait以等待新的连接或者现有连接上有数据可读。如果返回的fd是监听套接字,表示有新的连接,则将新的描述符添加到epoll事件中。
  3. epoll_wait返回的是现有连接上有数据可读,则从这个连接上读取数据。如果客户端终止或收到 RST 则从epoll事件删除该描述符,并关闭它。

ET模式

代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
static int Fcntl(int fd, int cmd, int arg) {
int n;

if((n = fcntl(fd, cmd, arg)) < 0) {
err_sys("fcntl error");
}

return n;
}

static void set_nonblocking(int fd) {
int val;

val = Fcntl(fd, F_GETFL, 0);
Fcntl(fd, F_SETFL, val | O_NONBLOCK);
}

static void epoll_add_event(int epollfd, int event, int fd) {
struct epoll_event ev;

ev.events = event;
ev.data.fd = fd;

ev.events |= EPOLLET;
Epoll_ctl(epollfd, EPOLL_CTL_ADD, fd, &ev);

set_nonblocking(fd);
}

int main(int argc, char *argv[]) {
int listenfd;
int epollfd;
int num_ready;
int i;
int n;
int connfd;
int sockfd;
char buf[MAXLINE];
struct epoll_event events[MAX_EVENTS];

listenfd = tcp_listen();

epollfd = Epoll_create(MAX_EVENTS);

epoll_add_event(epollfd, EPOLLIN, listenfd);

for(;;) {
num_ready = Epoll_wait(epollfd, events, MAX_EVENTS, -1);

for(i = 0; i < num_ready; ++i) {
if(events[i].data.fd == listenfd) {

/* new connection */
while((connfd = accept(listenfd, NULL, NULL)) > 0) {
epoll_add_event(epollfd, EPOLLIN, connfd);
}

if(connfd < 0) {
if(errno != EAGAIN && errno != EWOULDBLOCK && errno != ECONNABORTED
&& errno != EPROTO && errno != EINTR) {
err_sys("accept error");
}
}
} else {

/* client data */
connfd = events[i].data.fd;

for(;;) {
n = read(connfd, buf, MAXLINE);

if(n < 0) {
if(errno == ECONNRESET) {
epoll_del_event(epollfd, connfd);
Close(connfd);
} else if(errno == EAGAIN || errno == EWOULDBLOCK) {
break;
} else {
err_sys("read error");
}
} else if(n == 0) {
epoll_del_event(epollfd, connfd);
Close(connfd);
break;
} else {
Writen(connfd, buf, n);
}
}
}
}
}

return 0;
}

在ET模式下,描述符只能是非阻塞的。对于非阻塞socket,read/write返回-1不一定网络真的出错了。

epoll的ET模式下,正确的读写方式为:

读:只要可读,就一直读,直到返回0,或者 errno = EAGAIN 或者 EWOULDBLOCK
写:只要可写,就一直写,直到数据发送完,或者 errno = EAGAIN 或者 EWOULDBLOCK