In this post we rewrite the simple chat room from earlier using epoll. epoll is the Linux kernel's improved version of poll, designed to handle large numbers of file descriptors efficiently.
We will use three epoll functions:
int epoll_create(int size);
int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event);
int epoll_wait(int epfd, struct epoll_event * events, int maxevents, int timeout);
Below, the epoll operations we need are wrapped into a class.
Epoll.h
#ifndef EPOLL_H
#define EPOLL_H

#include "Socket.h"
#include <sys/epoll.h>
#include <sys/resource.h>

const int MAXEPOLLSIZE = MAXCONNECTION + 5;

class Epoll
{
public:
    Epoll();
    bool Add(int fd, int eventsOption);
    //Returns the number of triggered events
    int Wait();
    bool Delete(const int fd);
    int GetEventOccurfd(const int eventIndex) const;
    int GetEvents(const int eventIndex) const;
private:
    int epollfd;
    int fdNumber;
    struct epoll_event event;
    struct epoll_event events[MAXEPOLLSIZE];
    struct rlimit rt;
};

#endif
The implementation of the Socket class is covered in my earlier post, Linux socket programming (1): wrapping the socket operations.
A better approach would be to build the Socket class into a shared library.
Epoll.cpp
#include "Epoll.h"
#include <stdio.h>
#include <stdlib.h>

Epoll::Epoll()
    :fdNumber(0)
{
    //raise the open-file limit so we can hold MAXEPOLLSIZE descriptors
    rt.rlim_max = rt.rlim_cur = MAXEPOLLSIZE;
    if (::setrlimit(RLIMIT_NOFILE, &rt) == -1)
    {
        perror("setrlimit");
        exit(1);
    }
    //create the epoll instance
    epollfd = epoll_create(MAXEPOLLSIZE);
    if (epollfd < 0)
    {
        perror("epoll_create");
        exit(1);
    }
}

bool Epoll::Add(int fd, int eventsOption)
{
    //register fd for the requested events, e.g. EPOLLIN | EPOLLET
    event.events = eventsOption;
    event.data.fd = fd;
    if (epoll_ctl(epollfd, EPOLL_CTL_ADD, fd, &event) < 0)
        return false;
    fdNumber++;
    return true;
}

bool Epoll::Delete(const int fd)
{
    //the event pointer is ignored for EPOLL_CTL_DEL,
    //but must be non-NULL on kernels before 2.6.9
    if (epoll_ctl(epollfd, EPOLL_CTL_DEL, fd, &event) < 0)
        return false;
    fdNumber--;
    return true;
}

int Epoll::Wait()
{
    int eventNumber = epoll_wait(epollfd, events, MAXEPOLLSIZE, -1);
    if (eventNumber < 0)
    {
        perror("epoll_wait");
        exit(1);
    }
    return eventNumber;
}

int Epoll::GetEventOccurfd(const int eventIndex) const
{
    return events[eventIndex].data.fd;
}

int Epoll::GetEvents(const int eventIndex) const
{
    return events[eventIndex].events;
}
Now consider how to apply epoll to socket communication.
I referred to this post: http://www.cnblogs.com/OnlyXP/archive/2007/08/10/851222.html
epoll has two trigger modes:
LT (level-triggered) is the default mode and supports both blocking and non-blocking sockets. Here the kernel tells you whether a file descriptor is ready, and you may then perform I/O on it. If you do nothing, the kernel will keep notifying you, so this mode is less error-prone to program with. The traditional select/poll interfaces follow this model.
ET (edge-triggered) is the high-speed mode and only supports non-blocking sockets. In this mode the kernel notifies you through epoll when a descriptor goes from not-ready to ready. It then assumes you know the descriptor is ready, and sends no further readiness notifications for it until you do something that makes it not-ready again (for example, a send or receive that transfers less than the requested amount of data and produces an EWOULDBLOCK error). Note that if you never perform I/O on the fd (so it never becomes not-ready again), the kernel still will not send another notification (only once).
Next we will use edge triggering (ET). First, let's see what the manual says (man epoll):
Q9  Do I need to continuously read/write a file descriptor until EAGAIN
    when using the EPOLLET flag (edge-triggered behavior)?

A9  Receiving an event from epoll_wait(2) should suggest to you that such
    file descriptor is ready for the requested I/O operation. You must
    consider it ready until the next (nonblocking) read/write yields
    EAGAIN. When and how you will use the file descriptor is entirely up
    to you. For packet/token-oriented files (e.g., datagram socket,
    terminal in canonical mode), the only way to detect the end of the
    read/write I/O space is to continue to read/write until EAGAIN. For
    stream-oriented files (e.g., pipe, FIFO, stream socket), the
    condition that the read/write I/O space is exhausted can also be
    detected by checking the amount of data read from / written to the
    target file descriptor. For example, if you call read(2) by asking to
    read a certain amount of data and read(2) returns a lower number of
    bytes, you can be sure of having exhausted the read I/O space for the
    file descriptor. The same is true when writing using write(2). (Avoid
    this latter technique if you cannot guarantee that the monitored file
    descriptor always refers to a stream-oriented file.)
Roughly, this says that when using ET you must keep reading/writing the file descriptor until you hit EAGAIN.
Why is that necessary?
Suppose the sender's throughput exceeds the receiver's (that is, the epoll program reads faster than the forwarding socket can send). Since the socket is non-blocking, send() returns even though the data has not actually reached the receiver. As we keep reading and sending, send() fails with EAGAIN once the kernel's send buffer fills (see man send), and the data of that attempted send is simply dropped. We therefore wrap send() in a function that keeps writing until the data is fully sent before returning, and wrap recv() similarly.
Here are my wrappers (the English comments aren't great, bear with me):
void EpollServerSocket::SendMessage(Socket& clientSocket, const std::string& message) const
{
    while (true)
    {
        if (Socket::Send(clientSocket, message) == false)
        {
            //interrupted by a signal: just try again
            if (errno == EINTR)
                continue;
            //the send buffer is full:
            //sleep one second and try again
            if (errno == EAGAIN)
            {
                sleep(1);
                continue;
            }
        }
        return;
    }
}

void EpollServerSocket::ReceiveMessage(Socket& clientSocket, std::string& message)
{
    bool again = true;
    while (again)
    {
        int receiveNumber = Socket::Receive(clientSocket, message);
        if (receiveNumber == -1)
        {
            //errno == EAGAIN means we have drained all the data, so just
            //return; any other error means the connection is broken
            if (errno != EAGAIN)
            {
                perror("ReceiveMessage error");
                DeleteClient(clientSocket.GetSocketfd());
            }
            return;
        }
        else if (receiveNumber == 0)
        {
            //end of file: the remote end has closed the connection
            DeleteClient(clientSocket.GetSocketfd());
            return;
        }
        //if receiveNumber equals MAXRECEIVE there may still be data
        //in the buffer, so read again; otherwise we are done
        again = (receiveNumber == MAXRECEIVE);
    }
}
Next comes the EpollServerSocket class, derived from Socket.
EpollServerSocket.h
#ifndef EPOLLSERVERSOCKET_H
#define EPOLLSERVERSOCKET_H

#include "Socket.h"
#include "Epoll.h"
#include <map>

class EpollServerSocket : public Socket
{
public:
    EpollServerSocket(const int port);
    virtual ~EpollServerSocket();
    void Run();
private:
    //when using the EPOLLET flag we need to continuously
    //read/write a file descriptor until EAGAIN,
    //so these two functions wrap read/write accordingly
    void SendMessage(Socket& clientSocket, const std::string& message) const;
    void ReceiveMessage(Socket& clientSocket, std::string& message);
    void ProcessMessage(Socket& clientSocket);
    void SendToAllUsers(const std::string& message) const;
    //accept the connection and add its event to epoll
    bool AddNewClient(Socket& clientSocket);
    //remove the client from epoll and from the map clientSockets
    void DeleteClient(int sockfd);

    std::map<int, Socket*> clientSockets;
    Epoll epoll;
};

#endif
Below is the implementation of the EpollServerSocket class.
#include "EpollServerSocket.h"
#include "SocketException.h"
#include <iostream>
#include <errno.h>
#include <stdio.h>

#define DEBUG

EpollServerSocket::EpollServerSocket(const int port)
{
    if (!Socket::Create())
    {
        throw SocketException("Could not create server socket.");
    }
    if (!Socket::Bind(port))
    {
        throw SocketException("Could not bind to port.");
    }
    if (!Socket::Listen())
    {
        throw SocketException("Could not listen to socket.");
    }
    //the listener socket must be non-blocking too!
    Socket::SetNonBlocking(true);
}

EpollServerSocket::~EpollServerSocket()
{
    std::map<int, Socket*>::iterator it;
    for (it = clientSockets.begin(); it != clientSockets.end(); it++)
        delete it->second;
}

void EpollServerSocket::Run()
{
    //add the listener socketfd to epoll
    if (epoll.Add(Socket::GetSocketfd(), EPOLLIN) == false)
        return;
    int i;
    int eventNumber;
    Socket* clientSocket;
    while (true)
    {
        eventNumber = epoll.Wait();
#ifdef DEBUG
        std::cout << "eventNumbers: " << eventNumber << " ";
#endif
        for (i = 0; i < eventNumber; i++)
        {
            if ((epoll.GetEvents(i) & EPOLLERR) ||
                (epoll.GetEvents(i) & EPOLLHUP) ||
                (!(epoll.GetEvents(i) & EPOLLIN)))
            {
                //an error occurred on this fd, or the socket is not
                //ready for reading (why were we notified then?)
                perror("epoll error");
                DeleteClient(epoll.GetEventOccurfd(i));
                continue;
            }
            //the event was triggered by the listener socket
            else if (epoll.GetEventOccurfd(i) == Socket::GetSocketfd())
            {
                clientSocket = new Socket();
                if (AddNewClient(*clientSocket) == false)
                {
                    delete clientSocket;  //don't leak the failed connection
                    continue;
                }
                clientSockets[clientSocket->GetSocketfd()] = clientSocket;
            }
            //otherwise the event was triggered by a client socket
            else
            {
                clientSocket = clientSockets[epoll.GetEventOccurfd(i)];
                ProcessMessage(*clientSocket);
            }
        }
    }
}

void EpollServerSocket::ProcessMessage(Socket& clientSocket)
{
    std::string message;
    ReceiveMessage(clientSocket, message);
    //ReceiveMessage may have deleted the client on error or close,
    //in which case there is nothing to forward
    if (message.empty())
        return;
    if (message == "exit")
    {
        SendMessage(clientSocket, "user_exit");
        DeleteClient(clientSocket.GetSocketfd());
    }
    else
        SendToAllUsers(message);
}

bool EpollServerSocket::AddNewClient(Socket& clientSocket)
{
    if (Socket::Accept(clientSocket) == false)
        return false;
    //client sockets must be non-blocking for EPOLLET!
    clientSocket.SetNonBlocking(true);
    if (epoll.Add(clientSocket.GetSocketfd(), EPOLLIN | EPOLLET) == false)
        return false;
#ifdef DEBUG
    std::cout << "New user...\n";
#endif
    return true;
}

void EpollServerSocket::DeleteClient(int sockfd)
{
    //epoll doesn't need to watch events from sockfd anymore
    epoll.Delete(sockfd);
    delete clientSockets[sockfd];
    clientSockets.erase(sockfd);
}

void EpollServerSocket::SendToAllUsers(const std::string& message) const
{
    std::map<int, Socket*>::const_iterator it;
    for (it = clientSockets.begin(); it != clientSockets.end(); it++)
        SendMessage(*(it->second), message);
}

void EpollServerSocket::SendMessage(Socket& clientSocket, const std::string& message) const
{
    while (true)
    {
        if (Socket::Send(clientSocket, message) == false)
        {
            //interrupted by a signal: just try again
            if (errno == EINTR)
                continue;
            //the send buffer is full:
            //sleep one second and try again
            if (errno == EAGAIN)
            {
                sleep(1);
                continue;
            }
        }
        return;
    }
}

void EpollServerSocket::ReceiveMessage(Socket& clientSocket, std::string& message)
{
    bool again = true;
    while (again)
    {
        int receiveNumber = Socket::Receive(clientSocket, message);
        if (receiveNumber == -1)
        {
            //errno == EAGAIN means we have drained all the data, so just
            //return; any other error means the connection is broken
            if (errno != EAGAIN)
            {
                perror("ReceiveMessage error");
                DeleteClient(clientSocket.GetSocketfd());
            }
            return;
        }
        else if (receiveNumber == 0)
        {
            //end of file: the remote end has closed the connection
            DeleteClient(clientSocket.GetSocketfd());
            return;
        }
        //if receiveNumber equals MAXRECEIVE there may still be data
        //in the buffer, so read again; otherwise we are done
        again = (receiveNumber == MAXRECEIVE);
    }
}
(The client written earlier needs no changes; it can talk to this server as-is.)
For large data transfers, the constant reading/writing clearly leads to long stretches of blocking, and can even become the system's performance bottleneck.
But with only a few active sockets and small amounts of data, epoll should be more efficient than select and poll (though I haven't benchmarked this properly).
There does seem to be a way to avoid the blocking: use the EPOLLOUT event.
"The EPOLLOUT event means the socket's send path is currently idle and has plenty of capacity, telling the user that data can be sent now.
So under normal circumstances, nearly every socket gets an EPOLLOUT event after epoll_wait.
[Unless you are writing continuously or transferring a file of several hundred megabytes, the send path is idle most of the time.]
This property is exactly what we need to handle the blocking problem.
When data cannot be sent out, the network is congested or the latency is severe.
In that case, put the data to be sent into a buffer; the next time you see an EPOLLOUT event, the network is idle again, and you can use another thread to send the data accumulated in the buffer. That way nothing blocks."