epoll是linux下高并发服务器的完美方案,因为是基于事件触发的,所以比select快的不只是一个数量级。
单线程epoll,触发量可达到15000,但是加上业务后,因为大多数业务都与数据库打交道,所以就会存在阻塞的情况,这个时候就必须用多线程来提速。
下面是来一个网络连接创建一个线程处理业务,业务处理完,线程销毁。实际测试结果不是很理想,在没有业务的时候的测试结果是2000个/s
测试工具:stressmark
因为加了适用与ab的代码,所以也可以适用ab进行压力测试。
char buf[1000] = {0};
sprintf(buf,”HTTP/1.0 200 OK\r\nContent-type: text/plain\r\n\r\n%s”,”Hello world!\n”);
send(socketfd,buf, strlen(buf),0);
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 |
#include <stdio.h> #include <stdlib.h> #include <errno.h> #include <string.h> #include <sys/types.h> #include <netinet/in.h> #include <sys/socket.h> #include <sys/wait.h> #include <unistd.h> #include <arpa/inet.h> #include <openssl/ssl.h> #include <openssl/err.h> #include <fcntl.h> #include <sys/epoll.h> #include <sys/time.h> #include <sys/resource.h> #include <pthread.h> #define MAXBUF 1024 #define MAXEPOLLSIZE 10000 void* pthread_handle_message(void* para); /* setnonblocking - 设置句柄为非阻塞方式 */ int setnonblocking(int sockfd) { if (fcntl(sockfd, F_SETFL, fcntl(sockfd, F_GETFD, 0) | O_NONBLOCK) == -1) { return -1; } return 0; } static int count111 = 0; static time_t oldtime = 0, nowtime = 0; //------------------------------------------------------------ int main(int argc, char **argv) { int listener, new_fd, nfds, n, ret; struct epoll_event ev; int kdpfd, curfds; socklen_t len; struct sockaddr_in my_addr, their_addr; unsigned int myport, lisnum; struct epoll_event events[MAXEPOLLSIZE]; struct rlimit rt; if (argc > 1) myport = atoi(argv[1]); else myport = 8006; if (argc > 2) lisnum = atoi(argv[2]); else lisnum = 10; /* 设置每个进程允许打开的最大文件数 */ rt.rlim_max = rt.rlim_cur = MAXEPOLLSIZE; if (setrlimit(RLIMIT_NOFILE, &rt) == -1) { perror("setrlimit"); exit(1); } else printf("设置系统资源参数成功!/n"); /* 开启 socket 监听 */ if ((listener = socket(PF_INET, SOCK_STREAM, 0)) == -1) { perror("socket"); exit(1); } else printf("socket 创建成功!/n"); /*设置socket属性,端口可以重用*/ int opt = SO_REUSEADDR; setsockopt(listener, SOL_SOCKET, SO_REUSEADDR, &opt, sizeof(opt)); /*设置socket为非阻塞模式*/ setnonblocking(listener); bzero(&my_addr, sizeof(my_addr)); my_addr.sin_family = PF_INET; my_addr.sin_port = htons(myport); if (argc > 3) my_addr.sin_addr.s_addr = inet_addr(argv[3]); else my_addr.sin_addr.s_addr = INADDR_ANY; if (bind (listener, (struct sockaddr *) &my_addr, sizeof(struct sockaddr)) == -1) { perror("bind"); exit(1); } else printf("IP 地址和端口绑定成功/n"); if (listen(listener, lisnum) == -1) { perror("listen"); exit(1); } else printf("开启服务成功!/n"); /* 创建 epoll 句柄,把监听 socket 加入到 epoll 集合里 */ kdpfd = epoll_create(MAXEPOLLSIZE); len = sizeof(struct sockaddr_in); ev.events = EPOLLIN | EPOLLET; ev.data.fd = listener; if (epoll_ctl(kdpfd, EPOLL_CTL_ADD, listener, &ev) < 0) { fprintf(stderr, "epoll set insertion error: fd=%d/n", listener); return -1; } else printf("监听 socket 加入 epoll 成功!/n"); curfds = 1; while (1) { /* 等待有事件发生 */ nfds = epoll_wait(kdpfd, events, curfds, -1); if (nfds == -1) { perror("epoll_wait"); continue; } /* 处理所有事件 */ for (n = 0; n < nfds; ++n) { if (events[n].data.fd == listener) { new_fd = accept(listener, (struct sockaddr *) &their_addr, &len); if (new_fd < 0) { perror("accept"); continue; } else { //printf("有连接来自于: %s:%d, 分配的 socket 为:%d/n", inet_ntoa(their_addr.sin_addr), ntohs(their_addr.sin_port), new_fd); } setnonblocking(new_fd); ev.events = EPOLLIN | EPOLLET; ev.data.fd = new_fd; if (epoll_ctl(kdpfd, EPOLL_CTL_ADD, new_fd, &ev) < 0) { fprintf(stderr, "把 socket '%d' 加入 epoll 失败!%s/n", new_fd, strerror(errno)); return -1; } curfds++; } else { pthread_attr_t attr; pthread_t threadId; /*初始化属性值,均设为默认值*/ pthread_attr_init(&attr); pthread_attr_setscope(&attr, PTHREAD_SCOPE_SYSTEM); /* 设置线程为分离属性*/ pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED); if (pthread_create(&threadId, &attr, pthread_handle_message, (void*)&(events[n].data.fd))) { perror("pthread_creat error!"); exit(-1); } } } } close(listener); return 0; } void* pthread_handle_message(void* para) { char recvBuf[1024] = { 0 }; int ret = 999; int rs = 1; int socketfd = *(int *)para; while (rs) { ret = recv(socketfd, recvBuf, 1024, 0);// 接受客户端消息 if (ret < 0) { //由于是非阻塞的模式,所以当errno为EAGAIN时,表示当前缓冲区已无数据可//读在这里就当作是该次事件已处理过。 if (errno == EAGAIN) { printf("EAGAIN\n"); break; } else{ printf("recv error! errno:%d\n", errno); close(socketfd); break; } } else if (ret == 0) { // 这里表示对端的socket已正常关闭. rs = 0; } if (ret == sizeof(recvBuf)) rs = 1; // 需要再次读取 else rs = 0; } if (ret > 0){ count111++; struct tm *today; time_t ltime; time(&nowtime); if (nowtime != oldtime){ printf("%d\n", count111); oldtime = nowtime; count111 = 0; } char buf[1000] = { 0 }; sprintf(buf, "HTTP/1.0 200 OK\r\nContent-type: text/plain\r\n\r\n%s", "Hello world!\n"); send(socketfd, buf, strlen(buf), 0); } close(socketfd); } |
发表评论
要发表评论,您必须先登录。