libevent默认情况下是单线程,每个线程有且仅有一个event_base,对应一个struct event_base结构体,以及赋予其上的事件管理器,用来安排托管给它的一系列的事件。
当有一个事件发生的时候,event_base会在合适的时间去调用绑定在这个事件上的函数,直到这个函数执行完成,然后在返回安排其他事件。需要注意的是:合适的时间并不是立即。
例如:
1 2 |
struct event_base *base; base = event_base_new();//初始化libevent |
event_base_new对比epoll,可以理解为epoll里的epoll_create。
event_base内部有一个循环,循环阻塞在epoll调用上,当有一个事件发生的时候,才会去处理这个事件。其中,这个事件是被绑定在event_base上面的,每一个事件就会对应一个struct event,可以是监听的fd。
其中struct event 使用event_new 来创建和绑定,使用event_add来启用,例如:
1 2 |
struct event *listener_event; listener_event = event_new(base, listener, EV_READ|EV_PERSIST, do_accept, (void*)base); |
参数说明:
base:event_base类型,event_base_new的返回值
listener:监听的fd,listen的fd
EV_READ|EV_PERSIST:事件的类型及属性
do_accept:绑定的回调函数
(void*)base:给回调函数的参数
event_add(listener_event, NULL);
对比epoll:
event_new相当于epoll中的epoll_wait,其中的epoll里的while循环,在libevent里使用event_base_dispatch。
event_add相当于epoll中的epoll_ctl,参数是EPOLL_CTL_ADD,添加事件。
注:libevent支持的事件及属性包括(使用bitfield实现,所以要用 | 来让它们合体)
EV_TIMEOUT: 超时
EV_READ: 只要网络缓冲中还有数据,回调函数就会被触发
EV_WRITE: 只要塞给网络缓冲的数据被写完,回调函数就会被触发
EV_SIGNAL: POSIX信号量
EV_PERSIST: 不指定这个属性的话,回调函数被触发后事件会被删除
EV_ET: Edge-Trigger边缘触发,相当于EPOLL的ET模式
事件创建添加之后,就可以处理发生的事件了,相当于epoll里的epoll_wait,在libevent里使用event_base_dispatch启动event_base循环,直到不再有需要关注的事件。
有了上面的分析,结合之前做的epoll服务端程序,对于一个服务器程序,流程基本是这样的:
1. 创建socket,bind,listen,设置为非阻塞模式
2. 创建一个event_base,即
1 |
struct event_base * event_base_new(void) |
3. 创建一个event,将该socket托管给event_base,指定要监听的事件类型,并绑定上相应的回调函数(及需要给它的参数)。即
1 |
struct event * event_new(struct event_base *base, evutil_socket_t fd, short events, void (*cb)(evutil_socket_t, short, void *), void *arg) |
4. 启用该事件,即
1 |
int event_add(struct event *ev, const struct timeval *tv) |
5. 进入事件循环,即
1 |
int event_base_dispatch(struct event_base *event_base) |
有了这些知识储备,来看下官网上的demo,网址:http://www.wangafu.net/~nickm/libevent-book/01_intro.html,这里引用的例子是Example: A low-level ROT13 server with Libevent
首先来翻译下例子上面的一段话:
对于select函数来说,不同的操作系统有不同的代替函数,它包括:poll,epoll,kqueue,evport和/dev/poll。这些函数的性能都比select要好,其中epoll在IO中添加,删除,通知socket准备好方面性能复杂度为O(1)。
不幸的是,没有一个有效的接口是一个普遍存在的标准,linux下有epoll,BSDS有kqueue,Solaris 有evport和/dev/poll,等等。没有任何一个操作系统有它们中所有的,所以如果你想做一个轻便的高性能的异步应用程序,你就需要把这些接口抽象的封装起来,并且无论哪一个系统使用它都是最高效的。
这对于你来说就是最低级的libevent API,它提供了统一的接口取代了select,当它在计算机上运行的时候,使用了最有效的版本。
这里是ROT13服务器的另外一个版本,这次,他使用了libevent代替了select。这意味着我们不再使用fd_sets,取而代之的使用event_base添加和删除事件,它可能在select,poll,epoll,kqueue等中执行。
代码分析:
这是一个服务端的程序,可以处理客户端大并发的连接,当收到客户端的连接后,将收到的数据做了一个变换,如果是 ’a’-‘m’之间的字符,将其增加13,如果是 ’n’-‘z’之间的字符,将其减少13,其他字符不变,然后将转换后的数据发送给客户端。
例如:客户端发送:Client 0 send Message!
服务端会回复:Pyvrag 0 fraq Zrffntr!
在这个代码中没有使用bufferevent这个强大的东西,在一个结构体中自己管理了一个缓冲区。结构体为:
1 2 3 4 5 6 7 8 9 10 |
struct fd_state { char buffer[MAX_LINE];//缓冲区的大小 size_t buffer_used;//接收到已经使用的buffer大小,每次将接收到的数据字节数相加,当发送的字节数累计相加和buffer_used都相等时候,将它们都置为1 size_t n_written;//发送的累加字节数 size_t write_upto;//相当于一个临时变量,当遇到换行符的时,将其收到的字节数(换行符除外)赋给该值,当检测到写事件的时候,用已经发送的字节数和该数值做比较,若收到的字节总数小于该值,则发送数据,等于该值,将结构体中3个字节数统计变量都置为1,为什么会置为1呢,因为有一个换行符吧。 struct event *read_event; struct event *write_event; }; |
代码中自己管理了一个缓冲区,用于存放接收到的数据,发送的数据将其转换后也放入该缓冲区中,代码晦涩难懂,我也是经过打日志分析后,才明白点,这个缓冲区自己还得控制好。但是libevent 2已经提供了一个神器bufferevent,我们在使用的过程中最好不要自己管理这个缓冲区,之所以分析这个代码,是为了熟悉libevent 做服务端程序的流程及原理。
下面是代码,加有部分注释和日志:
代码:lowlevel_libevent_server.c
|
//说明,为了使我们的代码兼容win32网络API,我们使用evutil_socket_t代替int,使用evutil_make_socket_nonblocking代替fcntl /* For sockaddr_in */ #include <netinet/in.h> /* For socket functions */ #include <sys/socket.h> /* For fcntl */ #include <fcntl.h> #include <event2/event.h> #include <assert.h> #include <unistd.h> #include <string.h> #include <stdlib.h> #include <stdio.h> #include <errno.h> #define MAX_LINE 80 void do_read(evutil_socket_t fd, short events, void *arg); void do_write(evutil_socket_t fd, short events, void *arg); char rot13_char(char c) { /* We don't want to use isalpha here; setting the locale would change * which characters are considered alphabetical. */ if ((c >= 'a' && c <= 'm') || (c >= 'A' && c <= 'M')) return c + 13; else if ((c >= 'n' && c <= 'z') || (c >= 'N' && c <= 'Z')) return c - 13; else return c; } struct fd_state { char buffer[MAX_LINE]; size_t buffer_used; size_t n_written; size_t write_upto; struct event *read_event; struct event *write_event; }; struct fd_state * alloc_fd_state(struct event_base *base, evutil_socket_t fd) { struct fd_state *state = malloc(sizeof(struct fd_state)); if (!state) return NULL; state->read_event = event_new(base, fd, EV_READ|EV_PERSIST, do_read, state); if (!state->read_event) { free(state); return NULL; } state->write_event = event_new(base, fd, EV_WRITE|EV_PERSIST, do_write, state); if (!state->write_event) { event_free(state->read_event); free(state); return NULL; } state->buffer_used = state->n_written = state->write_upto = 0; assert(state->write_event); return state; } void free_fd_state(struct fd_state *state) { event_free(state->read_event); event_free(state->write_event); free(state); } void do_read(evutil_socket_t fd, short events, void *arg) { struct fd_state *state = arg; char buf[20]; int i; ssize_t result; printf("\ncome in do_read: fd: %d, state->buffer_used: %d, sizeof(state->buffer): %d\n", fd, state->buffer_used, size of(state->buffer)); while (1) { assert(state->write_event); result = recv(fd, buf, sizeof(buf), 0); if (result <= 0) break; printf("recv once, fd: %d, recv size: %d, recv buff: %s\n", fd, result, buf); for (i=0; i < result; ++i) { if (state->buffer_used < sizeof(state->buffer))//如果读事件的缓冲区还未满,将收到的数据做转换 state->buffer[state->buffer_used++] = rot13_char(buf[i]); // state->buffer[state->buffer_used++] = buf[i];//接收什么发送什么,不经过转换,测试用 if (buf[i] == '\n') //如果遇到换行,添加写事件,并设置写事件的大小 { assert(state->write_event); event_add(state->write_event, NULL); state->write_upto = state->buffer_used; printf("遇到换行符,state->write_upto: %d, state->buffer_used: %d\n",state->write_upto, state->buffer_use d); } } printf("recv once, state->buffer_used: %d\n", state->buffer_used); } //判断最后一次接收的字节数 if (result == 0) { free_fd_state(state); } else if (result < 0) { if (errno == EAGAIN) // XXXX use evutil macro return; perror("recv"); free_fd_state(state); } } void do_write(evutil_socket_t fd, short events, void *arg) { struct fd_state *state = arg; printf("\ncome in do_write, fd: %d, state->n_written: %d, state->write_upto: %d\n",fd, state->n_written, state->write _upto); while (state->n_written < state->write_upto) { ssize_t result = send(fd, state->buffer + state->n_written, state->write_upto - state->n_written, 0); if (result < 0) { if (errno == EAGAIN) // XXX use evutil macro return; free_fd_state(state); return; } assert(result != 0); state->n_written += result; printf("send fd: %d, send size: %d, state->n_written: %d\n", fd, result, state->n_written); } if (state->n_written == state->buffer_used) { printf("state->n_written == state->buffer_used: %d\n", state->n_written); state->n_written = state->write_upto = state->buffer_used = 1; printf("state->n_written = state->write_upto = state->buffer_used = 1\n"); } event_del(state->write_event); } void do_accept(evutil_socket_t listener, short event, void *arg) { struct event_base *base = arg; struct sockaddr_storage ss; socklen_t slen = sizeof(ss); int fd = accept(listener, (struct sockaddr*)&ss, &slen); if (fd < 0) { // XXXX eagain?? perror("accept"); } else if (fd > FD_SETSIZE) { close(fd); // XXX replace all closes with EVUTIL_CLOSESOCKET */ } else { struct fd_state *state; evutil_make_socket_nonblocking(fd); state = alloc_fd_state(base, fd); assert(state); /*XXX err*/ assert(state->write_event); event_add(state->read_event, NULL); } } void run(void) { evutil_socket_t listener; struct sockaddr_in sin; struct event_base *base; struct event *listener_event; base = event_base_new();//初始化libevent if (!base) return; /*XXXerr*/ sin.sin_family = AF_INET; sin.sin_addr.s_addr = 0;//本机 sin.sin_port = htons(8000); listener = socket(AF_INET, SOCK_STREAM, 0); evutil_make_socket_nonblocking(listener); #ifndef WIN32 { int one = 1; setsockopt(listener, SOL_SOCKET, SO_REUSEADDR, &one, sizeof(one)); } #endif if (bind(listener, (struct sockaddr*)&sin, sizeof(sin)) < 0) { perror("bind"); return; } if (listen(listener, 16)<0) { perror("listen"); return; } listener_event = event_new(base, listener, EV_READ|EV_PERSIST, do_accept, (void*)base); /*XXX check it */ event_add(listener_event, NULL); event_base_dispatch(base); } int main(int c, char **v) { // setvbuf(stdout, NULL, _IONBF, 0); run(); return 0; } |
编译:gcc -I/usr/include -o test lowlevel_libevent_server.c -L/usr/local/lib -levent
发表评论
要发表评论,您必须先登录。