本例子是本人Linux下基于TCP多线程Socket编程的第二个例子,本例子是用C++实现的
服务器采用了面向对象的多线程,用到了队列与链表,信号量(操作系统中叫PV操作)
本例子中的队列与链表源代码在前面可以找到,这里就不多贴了
此系统所支持的自定义命令跟上个例子相同,这里就不多说明了
头文件Thread.h代码,里面就一个抽象类(抽象类没有自己的实例,一定要被子类所继承)
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 |
#ifndef THREAD_H_INCLUDED #define THREAD_H_INCLUDED class Thread { public: void ThreadEnter(); protected: virtual void Start() = 0; virtual void Initialize(){} }; #endif // THREAD_H_INCLUDED Thread.cpp代码: #include "Thread.h" void Thread::ThreadEnter() { Start(); } 以下为服务器主要头文件Server.h代码: #ifndef SERVER_H_INCLUDED #define SERVER_H_INCLUDED #include "Thread.h" #include "LinkList.h" #include "ThreadQueue.h" #include <netinet/in.h> #include <pthread.h> #include <semaphore.h> #define MSG_SIZE 1024 #define BACKLOG 10 #define PORT 8001 class Server : public Thread { public: Server(); ~Server(); public: void Start(); void Initialize(); void SendMessage(Server* serer); static void* SendMessageThread(void* param); void ReadMessage(Server* server); static void* ReadMessageThread(void* param); private: int sock_fd,new_fds[BACKLOG],new_fd; struct sockaddr_in serv_addr,dest_addr; pthread_mutex_t mutex; pthread_t pth_r,pth_s; sem_t sem_r,sem_s; int thread_cout; LinkList list; ThreadQueue queue; DataType *pData; }; #endif // SERVER_H_INCLUDED 以下为Thread.h实现的Thread.cpp代码: #include "Server.h" #include <stdio.h> #include <stdlib.h> #include <strings.h> #include <string.h> #include <unistd.h> #include <sys/types.h> #include <sys/socket.h> #include <arpa/inet.h> #include <iostream> #include <string> using namespace std; //------------------------------------------------------------------ Server::Server() { pthread_mutex_init(&mutex,NULL); sem_init(&sem_r,0,10); sem_init(&sem_s,0,0); } //------------------------------------------------------------------ void Server::Initialize() { sock_fd = socket(AF_INET,SOCK_STREAM,0); if(sock_fd < 0) { perror("socket fail!" ); exit(-1); } serv_addr.sin_family = AF_INET; serv_addr.sin_port = ntohs(PORT); serv_addr.sin_addr.s_addr = INADDR_ANY; bzero(&(serv_addr.sin_zero), 8); if (bind(sock_fd, (struct sockaddr*) &serv_addr, sizeof(struct sockaddr)) < 0) { perror("bind fail! "); exit(-1); } if(listen(sock_fd,BACKLOG) < 0) { perror("listen fail!" 
); exit(-1); } cout << "listenning......" << endl; socklen_t sin_size = sizeof(dest_addr); while(1) { if(thread_cout == BACKLOG - 1) { return; } new_fd = accept(sock_fd,(struct sockaddr *)&dest_addr,&sin_size); if(new_fd < 0) { perror("accept fail!" ); exit(-1); } cout << "\nA client has connected to me " << inet_ntoa(dest_addr.sin_addr) << ":" << ntohs(dest_addr.sin_port) << endl; pthread_mutex_lock(&mutex); thread_cout++; list.InsertNode(thread_cout,new_fd); pthread_mutex_unlock(&mutex); pthread_create(&pth_r,NULL,ReadMessageThread,this); } } //------------------------------------------------------------------ void Server::Start() { pthread_create(&pth_s,NULL,SendMessageThread,this); Initialize(); } //------------------------------------------------------------------ void Server::ReadMessage(Server* server) { int fd = server->new_fd; char buf[MSG_SIZE]; int len; /* pthread_mutex_lock(&mutex); int count = thread_cout - 1; pthread_mutex_unlock(&mutex); */ while(1) { sem_wait(&sem_r); if ((len = read(fd, buf, MSG_SIZE)) == -1) { perror("read fail!"); pthread_exit(NULL); } else if (len == 0) { cout << "Current client has disconnected to me" << endl; //cout << "close fd = " << fd << endl; close(fd); list.DeleteNode(fd); pthread_exit(NULL); } //cout << "read fd = " << fd << endl; buf[len] = '\0'; DataType *data = new DataType(); data->fd = fd; strcpy(data->buff,buf); cout << "\nRECEIVE: " << buf << " receive fd = " << fd << endl; //pthread_mutex_lock(&mutex); queue.EnterQueue(data); //pthread_mutex_unlock(&mutex); //delete data; sem_post(&sem_s); } } //------------------------------------------------------------------ void* Server::ReadMessageThread(void* param) { Server* server = (Server *)param; server->ReadMessage(server); return NULL; } //------------------------------------------------------------------ void Server::SendMessage(Server* server) { while(1) { sem_wait(&sem_s); int list_len = list.GetLength(); int tNewfd,tReceivefd; //pthread_mutex_lock(&mutex); pData 
= queue.OutQueue(); //int queue_len = queue.Queuelength(); //pthread_mutex_unlock(&mutex); tReceivefd = pData->fd; //cout << "Received fd = " << tReceivefd << endl; pthread_mutex_lock(&mutex); for(int i = 1; i <= list_len; i++) { list.GetNodeData(i,tNewfd); //cout << "New fd = " << tNewfd << endl; //if(queue_len != 0) //{ if(tNewfd != tReceivefd) { write(tNewfd,pData->buff,sizeof(pData->buff)); cout << "Send to client successful! fd = " << tNewfd << endl;; } //} } delete pData; pthread_mutex_unlock(&mutex); sem_post(&sem_r); } } //------------------------------------------------------------------ void* Server::SendMessageThread(void* param) { Server* server = (Server *)param; server->SendMessage(server); return NULL; } //------------------------------------------------------------------ Server::~Server() { close(sock_fd); pthread_join(pth_r,NULL); pthread_join(pth_s,NULL); } //------------------------------------------------------------------ 以下为主文件main.cpp代码: #include "Server.h" int main(void) { Server* server = new Server(); server->ThreadEnter(); return 0; } |
发表评论
要发表评论,您必须先登录。