libevent默认情况下是单线程,每个线程有且仅有一个event_base,对应一个struct event_base结构体,以及赋予其上的事件管理器,用来安排托管给它的一系列的事件。
当有一个事件发生的时候,event_base会在合适的时间去调用绑定在这个事件上的函数,直到这个函数执行完成,然后在返回安排其他事件。需要注意的是:合适的时间并不是立即。
例如:
1 2 |
struct event_base *base; base = event_base_new();//初始化libevent |
event_base_new对比epoll,可以理解为epoll里的epoll_create。
event_base内部有一个循环,循环阻塞在epoll调用上,当有一个事件发生的时候,才会去处理这个事件。其中,这个事件是被绑定在event_base上面的,每一个事件就会对应一个struct event,可以是监听的fd。
其中struct event 使用event_new 来创建和绑定,使用event_add来启用,例如:
1 2 |
struct event *listener_event; listener_event = event_new(base, listener, EV_READ|EV_PERSIST, do_accept, (void*)base); |
参数说明:
base:event_base类型,event_base_new的返回值
listener:监听的fd,listen的fd
EV_READ|EV_PERSIST:事件的类型及属性
do_accept:绑定的回调函数
(void*)base:给回调函数的参数
event_add(listener_event, NULL);
对比epoll:
event_new相当于epoll中的epoll_wait,其中的epoll里的while循环,在libevent里使用event_base_dispatch。
event_add相当于epoll中的epoll_ctl,参数是EPOLL_CTL_ADD,添加事件。
注:libevent支持的事件及属性包括(使用bitfield实现,所以要用 | 来让它们合体)
EV_TIMEOUT: 超时
EV_READ: 只要网络缓冲中还有数据,回调函数就会被触发
EV_WRITE: 只要塞给网络缓冲的数据被写完,回调函数就会被触发
EV_SIGNAL: POSIX信号量
EV_PERSIST: 不指定这个属性的话,回调函数被触发后事件会被删除
EV_ET: Edge-Trigger边缘触发,相当于EPOLL的ET模式
事件创建添加之后,就可以处理发生的事件了,相当于epoll里的epoll_wait,在libevent里使用event_base_dispatch启动event_base循环,直到不再有需要关注的事件。
有了上面的分析,结合之前做的epoll服务端程序,对于一个服务器程序,流程基本是这样的:
1. 创建socket,bind,listen,设置为非阻塞模式
2. 创建一个event_base,即
1 |
struct event_base * event_base_new(void) |
3. 创建一个event,将该socket托管给event_base,指定要监听的事件类型,并绑定上相应的回调函数(及需要给它的参数)。即
1 |
struct event * event_new(struct event_base *base, evutil_socket_t fd, short events, void (*cb)(evutil_socket_t, short, void *), void *arg) |
4. 启用该事件,即
1 |
int event_add(struct event *ev, const struct timeval *tv) |
5. 进入事件循环,即
1 |
int event_base_dispatch(struct event_base *event_base) |
有了这些知识储备,来看下官网上的demo,网址:http://www.wangafu.net/~nickm/libevent-book/01_intro.html,这里引用的例子是Example: A low-level ROT13 server with Libevent
首先来翻译下例子上面的一段话:
对于select函数来说,不同的操作系统有不同的代替函数,它包括:poll,epoll,kqueue,evport和/dev/poll。这些函数的性能都比select要好,其中epoll在IO中添加,删除,通知socket准备好方面性能复杂度为O(1)。
不幸的是,没有一个有效的接口是一个普遍存在的标准,linux下有epoll,BSDS有kqueue,Solaris 有evport和/dev/poll,等等。没有任何一个操作系统有它们中所有的,所以如果你想做一个轻便的高性能的异步应用程序,你就需要把这些接口抽象的封装起来,并且无论哪一个系统使用它都是最高效的。
这对于你来说就是最低级的libevent API,它提供了统一的接口取代了select,当它在计算机上运行的时候,使用了最有效的版本。
这里是ROT13服务器的另外一个版本,这次,他使用了libevent代替了select。这意味着我们不再使用fd_sets,取而代之的使用event_base添加和删除事件,它可能在select,poll,epoll,kqueue等中执行。
代码分析:
这是一个服务端的程序,可以处理客户端大并发的连接,当收到客户端的连接后,将收到的数据做了一个变换,如果是 ’a’-‘m’之间的字符,将其增加13,如果是 ’n’-‘z’之间的字符,将其减少13,其他字符不变,然后将转换后的数据发送给客户端。
例如:客户端发送:Client 0 send Message!
服务端会回复:Pyvrag 0 fraq Zrffntr!
在这个代码中没有使用bufferevent这个强大的东西,在一个结构体中自己管理了一个缓冲区。结构体为:
1 2 3 4 5 6 7 8 9 10 |
struct fd_state { char buffer[MAX_LINE];//缓冲区的大小 size_t buffer_used;//接收到已经使用的buffer大小,每次将接收到的数据字节数相加,当发送的字节数累计相加和buffer_used都相等时候,将它们都置为1 size_t n_written;//发送的累加字节数 size_t write_upto;//相当于一个临时变量,当遇到换行符的时,将其收到的字节数(换行符除外)赋给该值,当检测到写事件的时候,用已经发送的字节数和该数值做比较,若收到的字节总数小于该值,则发送数据,等于该值,将结构体中3个字节数统计变量都置为1,为什么会置为1呢,因为有一个换行符吧。 struct event *read_event; struct event *write_event; }; |
代码中自己管理了一个缓冲区,用于存放接收到的数据,发送的数据将其转换后也放入该缓冲区中,代码晦涩难懂,我也是经过打日志分析后,才明白点,这个缓冲区自己还得控制好。但是libevent 2已经提供了一个神器bufferevent,我们在使用的过程中最好不要自己管理这个缓冲区,之所以分析这个代码,是为了熟悉libevent 做服务端程序的流程及原理。
下面是代码,加有部分注释和日志:
代码:lowlevel_libevent_server.c
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 |
//说明,为了使我们的代码兼容win32网络API,我们使用evutil_socket_t代替int,使用evutil_make_socket_nonblocking代替fcntl /* For sockaddr_in */ #include <netinet/in.h> /* For socket functions */ #include <sys/socket.h> /* For fcntl */ #include <fcntl.h> #include <event2/event.h> #include <assert.h> #include <unistd.h> #include <string.h> #include <stdlib.h> #include <stdio.h> #include <errno.h> #define MAX_LINE 80 void do_read(evutil_socket_t fd, short events, void *arg); void do_write(evutil_socket_t fd, short events, void *arg); char rot13_char(char c) { /* We don't want to use isalpha here; setting the locale would change * which characters are considered alphabetical. */ if ((c >= 'a' && c <= 'm') || (c >= 'A' && c <= 'M')) return c + 13; else if ((c >= 'n' && c <= 'z') || (c >= 'N' && c <= 'Z')) return c - 13; else return c; } struct fd_state { char buffer[MAX_LINE]; size_t buffer_used; size_t n_written; size_t write_upto; struct event *read_event; struct event *write_event; }; struct fd_state * alloc_fd_state(struct event_base *base, evutil_socket_t fd) { struct fd_state *state = malloc(sizeof(struct fd_state)); if (!state) return NULL; state->read_event = event_new(base, fd, EV_READ|EV_PERSIST, do_read, state); if (!state->read_event) { free(state); return NULL; } state->write_event = event_new(base, fd, EV_WRITE|EV_PERSIST, do_write, state); if (!state->write_event) { event_free(state->read_event); free(state); return NULL; } state->buffer_used = state->n_written = state->write_upto = 0; assert(state->write_event); return state; } void free_fd_state(struct fd_state *state) { event_free(state->read_event); event_free(state->write_event); free(state); } void do_read(evutil_socket_t fd, short events, void *arg) { struct fd_state *state = arg; char buf[20]; int i; ssize_t result; printf("\ncome in do_read: fd: %d, state->buffer_used: %d, sizeof(state->buffer): %d\n", fd, state->buffer_used, size of(state->buffer)); while (1) { assert(state->write_event); result = recv(fd, buf, sizeof(buf), 0); if (result <= 0) break; printf("recv once, fd: %d, recv size: %d, recv buff: %s\n", fd, result, buf); for (i=0; i < result; ++i) { if (state->buffer_used < sizeof(state->buffer))//如果读事件的缓冲区还未满,将收到的数据做转换 state->buffer[state->buffer_used++] = rot13_char(buf[i]); // state->buffer[state->buffer_used++] = buf[i];//接收什么发送什么,不经过转换,测试用 if (buf[i] == '\n') //如果遇到换行,添加写事件,并设置写事件的大小 { assert(state->write_event); event_add(state->write_event, NULL); state->write_upto = state->buffer_used; printf("遇到换行符,state->write_upto: %d, state->buffer_used: %d\n",state->write_upto, state->buffer_use d); } } printf("recv once, state->buffer_used: %d\n", state->buffer_used); } //判断最后一次接收的字节数 if (result == 0) { free_fd_state(state); } else if (result < 0) { if (errno == EAGAIN) // XXXX use evutil macro return; perror("recv"); free_fd_state(state); } } void do_write(evutil_socket_t fd, short events, void *arg) { struct fd_state *state = arg; printf("\ncome in do_write, fd: %d, state->n_written: %d, state->write_upto: %d\n",fd, state->n_written, state->write _upto); while (state->n_written < state->write_upto) { ssize_t result = send(fd, state->buffer + state->n_written, state->write_upto - state->n_written, 0); if (result < 0) { if (errno == EAGAIN) // XXX use evutil macro return; free_fd_state(state); return; } assert(result != 0); state->n_written += result; printf("send fd: %d, send size: %d, state->n_written: %d\n", fd, result, state->n_written); } if (state->n_written == state->buffer_used) { printf("state->n_written == state->buffer_used: %d\n", state->n_written); state->n_written = state->write_upto = state->buffer_used = 1; printf("state->n_written = state->write_upto = state->buffer_used = 1\n"); } event_del(state->write_event); } void do_accept(evutil_socket_t listener, short event, void *arg) { struct event_base *base = arg; struct sockaddr_storage ss; socklen_t slen = sizeof(ss); int fd = accept(listener, (struct sockaddr*)&ss, &slen); if (fd < 0) { // XXXX eagain?? perror("accept"); } else if (fd > FD_SETSIZE) { close(fd); // XXX replace all closes with EVUTIL_CLOSESOCKET */ } else { struct fd_state *state; evutil_make_socket_nonblocking(fd); state = alloc_fd_state(base, fd); assert(state); /*XXX err*/ assert(state->write_event); event_add(state->read_event, NULL); } } void run(void) { evutil_socket_t listener; struct sockaddr_in sin; struct event_base *base; struct event *listener_event; base = event_base_new();//初始化libevent if (!base) return; /*XXXerr*/ sin.sin_family = AF_INET; sin.sin_addr.s_addr = 0;//本机 sin.sin_port = htons(8000); listener = socket(AF_INET, SOCK_STREAM, 0); evutil_make_socket_nonblocking(listener); #ifndef WIN32 { int one = 1; setsockopt(listener, SOL_SOCKET, SO_REUSEADDR, &one, sizeof(one)); } #endif if (bind(listener, (struct sockaddr*)&sin, sizeof(sin)) < 0) { perror("bind"); return; } if (listen(listener, 16)<0) { perror("listen"); return; } listener_event = event_new(base, listener, EV_READ|EV_PERSIST, do_accept, (void*)base); /*XXX check it */ event_add(listener_event, NULL); event_base_dispatch(base); } int main(int c, char **v) { // setvbuf(stdout, NULL, _IONBF, 0); run(); return 0; } |
编译:gcc -I/usr/include -o test lowlevel_libevent_server.c -L/usr/local/lib -levent
发表评论
要发表评论,您必须先登录。