服务端代码:
epoll循环读写,处理EAGAIN,ECONNRESET,EINTR等错误,不错的例子
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 |
#include <iostream> #include <sys/socket.h> #include <sys/epoll.h> #include <netinet/in.h> #include <arpa/inet.h> #include <fcntl.h> #include <unistd.h> #include <stdio.h> #include <errno.h> #include <string.h> using namespace std; #define MAXLINE 5 #define OPEN_MAX 100 #define LISTENQ 20 #define SERV_PORT 5000 #define INFTIM 1000 void setnonblocking(int sock) { int opts; opts = fcntl(sock, F_GETFL); if (opts < 0) { perror("fcntl(sock,GETFL)"); return; } opts = opts | O_NONBLOCK; if (fcntl(sock, F_SETFL, opts) < 0) { perror("fcntl(sock,SETFL,opts)"); return; } } void CloseAndDisable(int sockid, epoll_event ee) { close(sockid); ee.data.fd = -1; } int main() { int i, maxi, listenfd, connfd, sockfd, epfd, nfds, portnumber; char line[MAXLINE]; socklen_t clilen; portnumber = 5000; //声明epoll_event结构体的变量,ev用于注册事件,数组用于回传要处理的事件 struct epoll_event ev, events[20]; //生成用于处理accept的epoll专用的文件描述符 epfd = epoll_create(256); struct sockaddr_in clientaddr; struct sockaddr_in serveraddr; listenfd = socket(AF_INET, SOCK_STREAM, 0); memset(&serveraddr, 0, sizeof(serveraddr)); serveraddr.sin_family = AF_INET; serveraddr.sin_addr.s_addr = htonl(INADDR_ANY); serveraddr.sin_port = htons(portnumber); // bind and listen bind(listenfd, (sockaddr *) &serveraddr, sizeof(serveraddr)); listen(listenfd, LISTENQ); //设置与要处理的事件相关的文件描述符 ev.data.fd = listenfd; //设置要处理的事件类型 ev.events = EPOLLIN | EPOLLET; //ev.events=EPOLLIN; //注册epoll事件 epoll_ctl(epfd, EPOLL_CTL_ADD, listenfd, &ev); maxi = 0; int bOut = 0; for (;;) { if (bOut == 1) break; //等待epoll事件的发生 nfds = epoll_wait(epfd, events, 20, -1); //处理所发生的所有事件 cout << "\nepoll_wait returns\n"; for (i = 0; i < nfds; ++i) { if (events[i].data.fd == listenfd) //如果新监测到一个SOCKET用户连接到了绑定的SOCKET端口,建立新的连接。 { connfd = accept(listenfd, (sockaddr *) &clientaddr, &clilen); if (connfd < 0) { perror("connfd<0"); return (1); } char *str = inet_ntoa(clientaddr.sin_addr); cout << "accapt a connection from " << str << endl; //设置用于读操作的文件描述符 setnonblocking(connfd); ev.data.fd = connfd; //设置用于注测的读操作事件 ev.events = EPOLLIN | EPOLLET; //ev.events=EPOLLIN; //注册ev epoll_ctl(epfd, EPOLL_CTL_ADD, connfd, &ev); } else if (events[i].events & EPOLLIN) //如果是已经连接的用户,并且收到数据,那么进行读入。 { cout << "EPOLLIN" << endl; if ((sockfd = events[i].data.fd) < 0) continue; char * head = line; int recvNum = 0; int count = 0; bool bReadOk = false; while (1) { // 确保sockfd是nonblocking的 recvNum = recv(sockfd, head + count, MAXLINE, 0); if (recvNum < 0) { if (errno == EAGAIN) { // 由于是非阻塞的模式,所以当errno为EAGAIN时,表示当前缓冲区已无数据可读 // 在这里就当作是该次事件已处理处. bReadOk = true; break; } else if (errno == ECONNRESET) { // 对方发送了RST CloseAndDisable(sockfd, events[i]); cout << "counterpart send out RST\n"; break; } else if (errno == EINTR) { // 被信号中断 continue; } else { //其他不可弥补的错误 CloseAndDisable(sockfd, events[i]); cout << "unrecovable error\n"; break; } } else if (recvNum == 0) { // 这里表示对端的socket已正常关闭.发送过FIN了。 CloseAndDisable(sockfd, events[i]); cout << "counterpart has shut off\n"; break; } // recvNum > 0 count += recvNum; if (recvNum == MAXLINE) { continue; // 需要再次读取 } else // 0 < recvNum < MAXLINE { // 安全读完 bReadOk = true; break; // 退出while(1),表示已经全部读完数据 } } if (bReadOk == true) { // 安全读完了数据 line[count] = '\0'; cout << "we have read from the client : " << line; //设置用于写操作的文件描述符 ev.data.fd = sockfd; //设置用于注测的写操作事件 ev.events = EPOLLOUT | EPOLLET; //修改sockfd上要处理的事件为EPOLLOUT epoll_ctl(epfd, EPOLL_CTL_MOD, sockfd, &ev); } } else if (events[i].events & EPOLLOUT) // 如果有数据发送 { const char str[] = "hello from epoll : this is a long string which may be cut by the net\n"; memcpy(line, str, sizeof(str)); cout << "Write " << line << endl; sockfd = events[i].data.fd; bool bWritten = false; int writenLen = 0; int count = 0; char * head = line; while (1) { // 确保sockfd是非阻塞的 writenLen = send(sockfd, head + count, MAXLINE, 0); if (writenLen == -1) { if (errno == EAGAIN) { // 对于nonblocking 的socket而言,这里说明了已经全部发送成功了 bWritten = true; break; } else if (errno == ECONNRESET) { // 对端重置,对方发送了RST CloseAndDisable(sockfd, events[i]); cout << "counterpart send out RST\n"; break; } else if (errno == EINTR) { // 被信号中断 continue; } else { // 其他错误 } } if (writenLen == 0) { // 这里表示对端的socket已正常关闭. CloseAndDisable(sockfd, events[i]); cout << "counterpart has shut off\n"; break; } // 以下的情况是writenLen > 0 count += writenLen; if (writenLen == MAXLINE) { // 可能还没有写完 continue; } else // 0 < writenLen < MAXLINE { // 已经写完了 bWritten = true; break; // 退出while(1) } } if (bWritten == true) { //设置用于读操作的文件描述符 ev.data.fd = sockfd; //设置用于注测的读操作事件 ev.events = EPOLLIN | EPOLLET; epoll_ctl(epfd, EPOLL_CTL_MOD, sockfd, &ev); } } } } return 0; } |
注意以下几点:
1. #14设定为5是故意的,为了测试后续的输入和输出
2. 整个服务器的功能是先读取字符串,然后向对方写内容。
3. #110处设置通信socket为非阻塞。
4. 注意#130~#183的读干净缓冲区的read。
5. 注意#213#264的完全写完所需要传送内容的write。
6. 关于EPOLLET,epoll_wait只有在socket状态发生变化的时候才会返回。所以要对fd进行循环accept,read, write;知直到socket的缓冲区空(read, accept)或者填满(write)为止。
客户端代码:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 |
#include <sys/socket.h> #include <netinet/in.h> #include <arpa/inet.h> #include <stdio.h> #include <stdlib.h> #include <string.h> #include <unistd.h> #define MAXLINE 5 void err_sys(const char* str) { perror(str); } int main(int argc, char **argv) { int sockfd; char recvline[MAXLINE + 1]; struct sockaddr_in servaddr; if (argc != 2) err_quit("usage: a.out <IPaddress>"); if ((sockfd = socket(AF_INET, SOCK_STREAM, 0)) < 0) err_sys("socket error"); bzero(&servaddr, sizeof(servaddr)); servaddr.sin_family = AF_INET; servaddr.sin_port = htons(5000); if (inet_pton(AF_INET, argv[1], &servaddr.sin_addr) <= 0) err_quit("inet_pton error for %s", argv[1]); if (connect(sockfd, (sockaddr *) &servaddr, sizeof(servaddr)) < 0) err_sys("connect error"); char input[100]; while (fgets(input, 100, stdin) != NULL) { write(sockfd, input, strlen(input)); int n = 0; int count = 0; while (1) { n = read(sockfd, recvline + count, MAXLINE); if (n == MAXLINE) { count += n; continue; } else break; } printf("%s\n", recvline); } exit(0); } |
发表评论
要发表评论,您必须先登录。