要用到多线程以及线程的读写锁,之前写的Socket类、ServerSocket要做相应的改变
因为服务器端要维持着一个存储客户端Socket信息到数据结构,当多个线程同时访问这个结构时,要做同步处理,所以要在适当的时候加上读锁或写锁。
新的ServerSocket类
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 |
#ifndef SERVERSOCKET_H #define SERVERSOCKET_H #include "Socket.h" #include <list> #include <semaphore.h> #include "ThreadReadWriteLock.h" using std::list; class ServerSocket:public Socket { public: ServerSocket(const int port); ServerSocket(); virtual ~ServerSocket(); void Accept(Socket& socket); //run server to connect multi-clients void Run(); private: //accept multi-clients bool Accept(); void AddClient(Socket* clientSocket); static void DeleteClient(Socket* clientSocket); static void* ProcessMessage(void* arg); static void SendMsgToAllUsers(const std::string& message); static list<Socket*> clientSockets; static bool serviceFlag; //use thread-read-write-lock to synchronize threads static ThreadReadWriteLock readWriteLock; }; #endif |
其中有static成员函数,因为创建一个新的线程时,要传递一个函数指针,不过类普通的成员函数的函数指针与一般的函数指针是不兼容的,所以要传递static成员函数的函数指针。
以下是ServerSocket的新实现:
ServerSocket.cpp
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 |
#include "ServerSocket.h" #include "SocketException.h" #include <pthread.h> #include <iostream> list<Socket*> ServerSocket::clientSockets; ThreadReadWriteLock ServerSocket::readWriteLock; bool ServerSocket::serviceFlag=true; ServerSocket::ServerSocket(const int port) { if ( ! Socket::Create() ) { throw SocketException ( "Could not create server socket." ); } if ( ! Socket::Bind ( port ) ) { throw SocketException ( "Could not bind to port." ); } if ( ! Socket::Listen() ) { throw SocketException ( "Could not listen to socket." ); } } ServerSocket::~ServerSocket() { list<Socket*>::iterator iter; for(iter=clientSockets.begin();iter!=clientSockets.end();iter++) delete (*iter); } void ServerSocket::Accept(Socket& socket) { if ( ! Socket::Accept ( socket ) ) { throw SocketException ( "Could not accept socket." ); } } bool ServerSocket::Accept() { Socket* clientSocket=new Socket; Accept(*clientSocket); AddClient(clientSocket); //create new thread for a new client pthread_t newThread; int result=pthread_create(&newThread,NULL,ProcessMessage,static_cast<void*>(clientSocket)); if(result!=0) return false; //detach the newThread //so when newThread exits it can release it's resource result=pthread_detach(newThread); if(result!=0) perror("Failed to detach thread"); return true; } void ServerSocket::Run() { while(serviceFlag) { if(clientSockets.size()>=static_cast<unsigned int>(MAXCONNECTION)) serviceFlag=false; else serviceFlag=Accept(); sleep(1); } } void* ServerSocket::ProcessMessage(void* arg) { std::string message; Socket* clientSocket=static_cast<Socket*>(arg); Send(*clientSocket,"Welcome!"); while(serviceFlag) { Receive(*clientSocket,message); if(message=="exit") { Send(*clientSocket,"user_exit"); DeleteClient(clientSocket); break; } else SendMsgToAllUsers(message); sleep(1); } pthread_exit(NULL); return NULL; } void ServerSocket::AddClient(Socket* socket) { if(readWriteLock.SetWriteLock()) { clientSockets.push_back(socket); std::cout<<"Now "<<clientSockets.size()<<" users.."; std::cout<<"New User: "<<socket->GetAddress()<<" "<<socket->GetPort()<<"\n"; readWriteLock.UnLock(); } else serviceFlag=false; } void ServerSocket::DeleteClient(Socket* socket) { if(readWriteLock.SetWriteLock()) { list<Socket*>::iterator iter; for(iter=clientSockets.begin();iter!=clientSockets.end();iter++) if((*iter)->GetAddress()==socket->GetAddress() && (*iter)->GetPort()==socket->GetPort()) { //delete socket* in list delete (*iter); clientSockets.erase(iter); std::cout<<"Now "<<clientSockets.size()<<" users..\n"; break; } readWriteLock.UnLock(); } else serviceFlag=false; } |
接下来是读写锁操作的封装 ThreadReadWriteLock.h
这个类封装了对线程读写锁pthread_rwlock_t的操作,这些操作包括pthread_rwlock_init,pthread_rwlock_rlock,pthread_rwlock_wrlock,pthread_rwlock_unlock等
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 |
#ifndef THREADREADWRITELOCK_H #define THREADREADWRITELOCK_H #include <pthread.h> class ThreadReadWriteLock { public: ThreadReadWriteLock(); ~ThreadReadWriteLock(); bool SetReadLock(); bool SetWriteLock(); void UnLock(); private: pthread_rwlock_t readWriteLock; }; #endif |
然后客户端做些许改变即可(开多一个线程接收服务器发来的信息,这样发送和接收就可以并行了)
以下是测试结果:
服务器:
客户端1:
客户端2:
客户端3:
可以看到服务器端显示的客户端数量的变化。。。。
测试一下当连接的客户端数量超过我们设置的最大连接数时的情况(在Socket.h中定义这个连接数等于5)
最后说一下要注意的地方
由于pthread 库不是 Linux 系统默认的库,连接时需要使用静态库 libpthread.a,否则在使用一些与线程有关的函数时会报错
如使用pthread_create会提示以下错误undefined reference to `pthread_create’,解决方法如下:
(1)使用gcc或g++要在编译中要加 -lpthread参数
(2)如果使用eclipse的话,要设置
Project->Properties->C/C++ Build->Settings->GCC C++ Linker->Libraries
在Libraries(-l)中添加pthread即可
在Libraries(-l)中添加crypto即可
最后说一下程序的不足:
(1)首先我把服务器处理消息的操作暂时都放在了ProcessMessage函数里面了,如果处理的消息很复杂的话,那么ServerSocket这个类就会很臃肿,
所以必要时要将这些功能拆分。
(2)像检测客户端是否非正常的掉线,这些操作还没做,可以通过设置Socket的keep alive来检测,就是通过一个心跳包,在服务器和客户端没有通信时,隔一段时间发送一个
数据包,若客户端没有反应则认为客户端已经掉线了。
(3)毕竟这只是个小程序,当然还有其他不足,如现在只有群聊功能,还可以加上私聊的功能。。。。。。
发表评论
要发表评论,您必须先登录。