epoll的一个demo,备忘(epoll+线程池)
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 |
/** 张亚霏修改 文件名:epoll_demo.c 编译: gcc epoll_demo.c -pthread 程序源码如下(请自行编辑宏定义SERVER_IP为自己的IP): */ /*Linux 2.6 x86_64 only*/ #include <stdio.h> #include <stdlib.h> #include <string.h> #include <time.h> #include <sys/epoll.h> #include <sys/stat.h> #include <sys/types.h> #include <sys/socket.h> #include <netinet/in.h> #include <arpa/inet.h> #include <unistd.h> #include <netdb.h> #include <pthread.h> #include <errno.h> /*线程池的线程数量*/ #define THREAD_MAX 1 /*监听端口*/ #define LISTEN_PORT 8000 /* 监听端口的数量,从LISTEN_PORT到LISTEN_PORT+LISTEN_MAX-1 */ #define LISTEN_MAX 1 #define SERVER_IP "127.0.0.1" typedef struct { char ip4[128]; int port; int fd; } listen_info; //服务器参数 static listen_info s_listens[LISTEN_MAX]; //线程池参数 static unsigned int s_thread_para[THREAD_MAX][8];//线程参数 static pthread_t s_tid[THREAD_MAX];//线程ID pthread_mutex_t s_mutex[THREAD_MAX];//线程锁 //私有函数 static int init_thread_pool(void); static int init_listen4(char *ip4, int port, int max_link); //线程函数 void* test_server4(unsigned int thread_para[]); int main(int argc, char *argv[])//客户端驱动 { //临时变量 int i, j, rc; int sock_listen; //监听套接字 int sock_cli; //客户端连接 int listen_index; int epfd; int nfds; struct epoll_event ev; struct epoll_event events[LISTEN_MAX]; socklen_t addrlen; //地址信息长度 struct sockaddr_in addr4; //IPv4地址结构 //线程池初始化 rc = init_thread_pool(); if (0 != rc) exit(-1); //初始化服务监听 for (i = 0; i < LISTEN_MAX; i++) { sprintf(s_listens[i].ip4, "%s", SERVER_IP); s_listens[i].port = LISTEN_PORT + i; //创建监听 rc = init_listen4(s_listens[i].ip4, s_listens[i].port, 64); if (0 > rc) { fprintf(stderr, "无法创建服务器监听于%s:%d\n", s_listens[i].ip4, s_listens[i].port); perror(NULL); exit(-1); } s_listens[i].fd = rc; } //设置集合 epfd = epoll_create(8192); for (i = 0; i < LISTEN_MAX; i++) { //加入epoll事件集合 ev.events = EPOLLIN; ev.data.u32 = i;//记录listen数组下标 if (epoll_ctl(epfd, EPOLL_CTL_ADD, s_listens[i].fd, &ev) < 0) { fprintf(stderr, "向epoll集合添加套接字失败(fd =%d)\r\n", rc); exit(-1); } } //服务循环 for ( ; ; ) { //等待epoll事件 nfds = epoll_wait(epfd, events, LISTEN_MAX, -1); //处理epoll事件 for (i = 0; i < nfds; i++) { //接收客户端连接 listen_index = events[i].data.u32; sock_listen = s_listens[listen_index].fd; addrlen = sizeof(struct sockaddr_in); bzero(&addr4, addrlen); sock_cli = accept(sock_listen, (struct sockaddr *)&addr4, &addrlen); if (0 > sock_cli) { fprintf(stderr, "接收客户端连接失败\n"); continue; } //查询空闲线程对 for (j = 0; j < THREAD_MAX; j++) { if (0 == s_thread_para[j][0]) break; } if (j >= THREAD_MAX) { fprintf(stderr, "线程池已满, 连接将被放弃\r\n"); shutdown(sock_cli, SHUT_RDWR); close(sock_cli); continue; } //复制有关参数 s_thread_para[j][0] = 1;//设置活动标志为"活动" s_thread_para[j][1] = sock_cli;//客户端连接 s_thread_para[j][2] = listen_index;//服务索引 //线程解锁 pthread_mutex_unlock(s_mutex + j); }//end of for(i;;) }//end of for(;;) exit(0); } static int init_thread_pool(void) { int i, rc; //初始化线程池参数 for (i = 0; i < THREAD_MAX; i++) { s_thread_para[i][0] = 0;//设置线程占用标志为"空闲" s_thread_para[i][7] = i;//线程池索引 pthread_mutex_lock(s_mutex + i);//线程锁 } //创建线程池 for (i = 0; i < THREAD_MAX; i++) { rc = pthread_create(s_tid + i, 0, (void *)test_server4, (void *)(s_thread_para[i])); if (0 != rc) { fprintf(stderr, "线程创建失败\n"); return(-1); } } //成功返回 return(0); } static int init_listen4(char *ip4, int port, int max_link) { //临时变量 int sock_listen4; struct sockaddr_in addr4; unsigned int optval; struct linger optval1; //初始化数据结构 bzero(&addr4, sizeof(addr4)); inet_pton(AF_INET, ip4, &(addr4.sin_addr)); addr4.sin_family = AF_INET; addr4.sin_port = htons(port); //创建SOCKET sock_listen4 = socket(AF_INET, SOCK_STREAM, 0); if (0 > sock_listen4) return(-1); //设置SO_REUSEADDR选项(服务器快速重起) optval = 0x1; setsockopt(sock_listen4, SOL_SOCKET, SO_REUSEADDR, &optval, 4); //设置SO_LINGER选项(防范CLOSE_WAIT挂住所有套接字) optval1.l_onoff = 1; optval1.l_linger = 60; setsockopt(sock_listen4, SOL_SOCKET, SO_LINGER, &optval1, sizeof(struct linger)); if (0 > bind(sock_listen4, (struct sockaddr *)&addr4, sizeof(addr4))) { close(sock_listen4); return(-1); } if (0 > listen(sock_listen4, max_link)) { close(sock_listen4); return(-1); } return(sock_listen4); } void * test_server4(unsigned int thread_para[]) { //临时变量 int pool_index; //线程池索引 int sock_cli; //客户端连接 int listen_index; //监听索引 char buff[32768]; //传输缓冲区 char *p; int i, j, len; //线程脱离创建者 pthread_detach(pthread_self()); pool_index = thread_para[7]; wait_unlock: pthread_mutex_lock(s_mutex + pool_index);//等待线程解锁 //线程变量内容复制 sock_cli = thread_para[1];//客户端连接 listen_index = thread_para[2];//监听索引 //接收请求 len = recv(sock_cli, buff, 32768, MSG_NOSIGNAL); //构造响应 p = buff; //HTTP头 p += sprintf(p, "HTTP/1.1 200 OK\r\n"); p += sprintf(p, "Content-Type: text/html\r\n"); p += sprintf(p, "Connection: closed\r\n\r\n"); //页面 p += sprintf(p, "<html>\r\n<head>\r\n"); p += sprintf(p, "<meta content=\"text/html; charset=GBK\" http-equiv=\"Content-Type\">\r\n"); p += sprintf(p, "</head>\r\n"); p += sprintf(p, "<body style=\"background-color: rgb(229, 229, 229);\">\r\n"); p += sprintf(p, "<center>\r\n"); p += sprintf(p, "<H3>连接状态</H3>\r\n"); p += sprintf(p, "<p>服务器地址 %s:%d</p>\r\n", s_listens[listen_index].ip4, s_listens[listen_index].port); j = 0; for (i = 0; i < THREAD_MAX; i++) { if (0 != s_thread_para[i][0]) j++; } p += sprintf(p, "<H3>线程池状态</H3>\r\n"); p += sprintf(p, "<p>线程池总数 %d 活动线程总数 %d</p>\r\n", THREAD_MAX, j); p += sprintf(p, "</center></body></html>\r\n"); len = p - buff; //发送响应 send(sock_cli, buff, len, MSG_NOSIGNAL); //释放连接 shutdown(sock_cli, SHUT_RDWR); close(sock_cli); //线程任务结束 thread_para[0] = 0;//设置线程占用标志为"空闲" goto wait_unlock; pthread_exit(NULL); } |
发表评论
要发表评论,您必须先登录。