1 场景
当用socket进行进程通信,传输数据的时候,会出现以下一些情况:
(1)完整的一条消息被系统拆分成几条发送,例如要发送一条消息:Hello World ,却被系统分成两条消息发送,分别为:Hello 和 World。
(2)几条独立的消息被系统合成一条消息发送,例如要发送两条消息分别为:a memory from my past和it’s been a year,却被系统和成一条消息发送:a memory from my pastit’s been a year。
这个时候,需要为socket通信设计一种通信协议,以保证数据的准确性。
2 协议格式
通信协议设计如下:
Head:帧头,2个字节,此处为0xa5a5
Type:通信类型,1个字节,范围0x00~0xff
Data Length:数据长度,1个字节,即Data的字节总数,
Data:实际传输的数据,长度不定
CS:校验值,1个字节,type、data length、data三个域所有字节的异或值,实际中并没用到校验
End:帧尾,2个字节,此处为0xbeef
3 程序设计
3.1 解析思路
假设socket客户端C和服务端S通信,C向S发送消息M1。
1、 S收到消息M1。S把消息M1拷贝到缓存Q中,Q为循环队列。假如M1的长度大于Q的剩余空间,则只拷贝剩余空间大小的字节到Q。
2、 从Q的当前指针开始,查找帧头<Head>。如果找到,则当前指针向后移2个字节位置,继续查找<Type>;如果没找到,则删除前1个字节,当前指针向后移1个字节位置,继续查找<Head>
3、 从Q的当前指针开始,查找<Type>。如果Q中至少还剩一个字节,则表示找到,当前指针向后移1个字节位置,否则退出解析。
4、 从Q的当前指针开始,查找<DataLength>。如果Q中至少还剩一个字节,则表示找到,当前指针向后移1个字节位置,否则退出解析。
5、 从Q的当前指针开始,向后移DataLength个字节位置,查找<End>。如果找到,则从Q中取出一条完整的消息P1,并从Q中删除此消息空间,调用外部的回调函数;否则删除帧头的第一个字节a5,当前指针指向帧头第二个字节a5位置,从步骤2开始,重新一轮解析。
3.2 数据结构
查找策略枚举,用于查找时判断查找帧结构的哪个部位:
1 2 3 4 5 6 7 8 |
typedef enum{ SEARCH_HEAD, SEARCH_TYPE, SEARCH_LEN, //SEARCH_CS, SEARCH_END, SEARCH_NONE }cache_strategy; |
消息结构体,用于存储从缓存中解析出的数据:
1 2 3 4 5 |
typedef struct{ unsigned char data[SOCKET_MSG_SIZE]; //data int len; unsigned char type; }socket_msg; |
回调函数,用于从缓存中解析出消息时调用:
1 |
typedef void (*tp_socket_msg_handle)(int fd, socket_msg *msg,void *args); |
循环队列,用于缓存接收到的数据:
1 2 3 4 5 6 7 8 9 10 11 12 13 |
typedef struct{ unsigned char buf[SOCKET_MSG_CACHE_SIZE]; //buffer for storing data read from client int front; int rear; int current; int len; int tag; //mark that whether the cache is full,1-full,0-not full cache_strategy strategy; tp_socket_msg_handle handle;//callback function to invoke when a message is parsed out void* args; //external parameter socket_msg recv_msg; //parsed message }socket_cache; |
3.3 关键实现
1、把接收到的数据存储到缓冲中,并准备解析:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 |
//copy the unparsed data to cache, and parsed them int socket_msg_pre_parse( int fd, socket_cache *cache, unsigned char *buf, int len, void *args) { int n = 0; unsigned char *p = buf; //when reading buffer's length is greater than cache's left length, //we should copy many times. cache->args = args; while(1){ n = socket_msg_cpy_in(cache, p, len); if(n == 0){ return FALSE;//cache is full } //parse and handle socket message from cache socket_msg_parse(fd, cache); if(n == len){ return TRUE; //copy completed } //move the pointer p = p + n; len = len - n; } return TRUE; } |
2、递归解析消息:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 |
//parsed the packaged data, and invoke callback function void socket_msg_parse(int fd, socket_cache *cache) { int current_len; int p, q; int i; int find; if(cache->front == cache->rear && cache->tag == 0){ //D("socket cache is empty!\n"); return; } //calculate the current length of cache if(cache->current >= cache->front){ current_len = cache->len - (cache->current - cache->front); } else{ current_len = cache->rear - cache->current; } switch(cache->strategy){ case SEARCH_HEAD://to find a Head format in cache if(current_len < SOCKET_MSG_HEAD_SIZE){ return; } find = FALSE; for(i = 0; i < current_len - 1; i++){ p = cache->current; q = (cache->current + 1) % SOCKET_MSG_CACHE_SIZE; if( (cache->buf[p] == (SOCKET_MSG_HEAD >> 8))&& (cache->buf[q] == (SOCKET_MSG_HEAD & 0xff))){ find = TRUE; break; //exit for loop } else{ //current pointer move to next cache->current = q; //delete one item cache->front = cache->current; cache->len --; cache->tag = 0; } } if(find == TRUE){ //move 2 items towards next cache->current = (cache->current + 2) % SOCKET_MSG_CACHE_SIZE; //we found the head format, go on to find Type byte cache->strategy = SEARCH_TYPE; } else{ //if there is no head format ,delete previouse items LOGE("socket message without head: %x!\n",SOCKET_MSG_HEAD); //go on to find Head format cache->strategy = SEARCH_HEAD; } break; case SEARCH_TYPE://to find the type byte in cache if(current_len < SOCKET_MSG_TYPE_SIZE){ return ; } //get the value of type //cache->type = cache->buf[cache->current]; cache->recv_msg.type = cache->buf[cache->current]; cache->current = (cache->current + 1) % SOCKET_MSG_CACHE_SIZE; //we found Type byte, go on to find Datalen format cache->strategy = SEARCH_LEN; break; case SEARCH_LEN://to find the datalen byte in cache if(current_len < SOCKET_MSG_LEN_SIZE){ return ; } if(cache->buf[cache->current] > SOCKET_MSG_DATA_SIZE){ LOGE("the data len of message out of size: %d!\n",SOCKET_MSG_DATA_SIZE); //delete the frist item 'a5' //move back 2 items cache->current = cache->current >= 2 ? (cache->current - 2) : (SOCKET_MSG_CACHE_SIZE - 2 + cache->current); cache->front = cache->current; //length sub 2 cache->len -= 2; cache->tag = 0; //go on to find Head format cache->strategy = SEARCH_HEAD; } else{ //get the value of datalen //cache->data_len = cache->buf[cache->current]; cache->recv_msg.len = cache->buf[cache->current]; cache->current = (cache->current + 1) % SOCKET_MSG_CACHE_SIZE; //we found datalen byte, go on to find End format cache->strategy = SEARCH_END; } break; case SEARCH_END: if(current_len < (cache->recv_msg.len + SOCKET_MSG_END_SIZE)){ return; } //because we have known the data bytes' len, so we move the very //distance of datalen to see if there is End format. p = (cache->current + cache->recv_msg.len) % SOCKET_MSG_CACHE_SIZE; q = (cache->current + cache->recv_msg.len + 1) % SOCKET_MSG_CACHE_SIZE; if( (cache->buf[p] == (SOCKET_MSG_END >> 8))&& (cache->buf[q] == (SOCKET_MSG_END & 0xff)) ){ socket_msg_cpy_out(cache, cache->recv_msg.data, cache->current, cache->recv_msg.len); if(cache->handle != NULL){ //cache->handle(fd, cache->buf + cache->data_index, cache->data_len); cache->handle(fd, &cache->recv_msg, cache->args); } //delete all previous items cache->current = (q + 1) % SOCKET_MSG_CACHE_SIZE; cache->front = cache->current; cache->len -= (cache->recv_msg.len + SOCKET_MSG_FORMAT_SIZE); cache->tag =0; } else{ LOGE("socket message without end: %x!\n",SOCKET_MSG_END); //delete the frist item 'a5' //move back 3 items cache->current = cache->current >= 3 ? (cache->current - 3) : (SOCKET_MSG_CACHE_SIZE - 3 + cache->current); cache->front = cache->current; //length sub 3 cache->len -= 3; cache->tag = 0; } //go on to find Head format cache->strategy = SEARCH_HEAD; break; default: break; } //parse new socket message socket_msg_parse(fd,cache); } |
代码下载:socket_parse.zip
发表评论
要发表评论,您必须先登录。