实践:使用socket实现跨进程通信(二:多并发)(C语言)
材料归档
- 链接:https://pan.baidu.com/s/1cQREPzKZFGvHlNOEMlroEA
- 提取码:vhq8
- 归档路径:材料归档20210920_实践:使用socket实现跨进程通信(二:多并发)(C语言)
功能描述
- 使用socket通信,实现服务端功能和客户端功能,并进行消息的交互,实现跨进程通信。
- 解决多并发问题,目前最多同时支持5个客户端。
- 简单规避粘包问题。
功能演示
第一步:启动Server进程。
./test_socket server
第二步:启动多个客户端进程
./test_socket client &
./test_socket client &
第三步:查看Server日志,同时进行响应。
第四步:查看抓包,同时有多条数据流。
查看线程状态,Server在处理客户端消息时有线程新增。
-
正常状态:
-
处理2个Server消息:
-
处理结束:
相关代码
test_socket.h
#ifndef __TEST_SOCKET_H__
#define __TEST_SOCKET_H__
#define VOS_OK 0
#define VOS_ERR 1
#define VOS_TRUE 1
#define VOS_FALSE 0
typedef void VOID;
typedef char CHAR;
typedef int INT32;
typedef unsigned char UINT8;
typedef unsigned short UINT16;
typedef unsigned int UINT32;
typedef unsigned long long UINT64;
#define SOCK_INFO(fmt...) do {
printf("INFO: [SOCK]" fmt);
printf("
");
} while(0)
#define SOCK_ERR(fmt...) do {
printf("ERROR: [SOCK]" fmt);
printf("
");
} while(0)
#define SOCK_PROCESS_TYPE_NAME_SERVER "server"
#define SOCK_PROCESS_TYPE_NAME_CLIENT "client"
#define SOCK_PROCESS_TYPE_SERVER 0
#define SOCK_PROCESS_TYPE_CLIENT 1
#define SOCK_DEFAULT_DST_ADDR "127.0.0.1"
#define SOCK_DEFAULT_DST_PORT 1500
#define SOCK_MSG_LEN 1024
#define SOCK_MAX_CLIENT_NUM 5
typedef struct {
UINT16 family;
UINT16 port;
UINT32 addr;
UINT8 resv[8];
} SOCK_ADDR_S;
typedef struct {
UINT32 type;
CHAR* addr;
UINT16 port;
} SOCK_PARA_S;
typedef struct {
UINT16 sockId;
UINT16 len;
UINT8 data[SOCK_MSG_LEN + 1];
} SOCK_MSG_S;
#endif
test_socket.c
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <ctype.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <unistd.h>
#include <pthread.h>
#include "test_socket.h"
typedef struct {
pthread_t pthread;
INT32 sock;
} SOCK_ACCEPT_CONNECT_PARA_S;
VOID SOCK_InitPara(SOCK_PARA_S *pPara, int argc, char *argv[])
{
argc--;
argv++;
if (argc > 0 && strcmp(SOCK_PROCESS_TYPE_NAME_CLIENT, argv[0]) == 0) {
pPara->type = SOCK_PROCESS_TYPE_CLIENT;
} else {
pPara->type = SOCK_PROCESS_TYPE_SERVER;
}
if (argc > 1) {
pPara->addr = argv[1];
} else {
pPara->addr = SOCK_DEFAULT_DST_ADDR;
}
if (argc > 2) {
pPara->port = (UINT16)atol(argv[2]);
} else {
pPara->port = SOCK_DEFAULT_DST_PORT;
}
}
UINT32 SOCK_Write(SOCK_MSG_S *pMsg)
{
if (pMsg->len > SOCK_MSG_LEN) {
return VOS_ERR;
}
pMsg->data[pMsg->len] = 0;
INT32 len = write(pMsg->sockId, pMsg, sizeof(SOCK_MSG_S));
if (len == -1) {
SOCK_ERR("SEND FAIL[sock:%d len:%d]:%s", pMsg->sockId, pMsg->len, pMsg->data);
return VOS_ERR;
}
return VOS_OK;
}
UINT32 SOCK_Read(UINT32 sock, SOCK_MSG_S *pMsg)
{
INT32 len = read(sock, pMsg, sizeof(SOCK_MSG_S));
if (len == -1) {
SOCK_ERR("RECV[localsock:%d]:Fail", sock);
return VOS_ERR;
}
if (len == 0) {
pMsg->len = 0;
pMsg->data[0] = ' ';
}
return VOS_OK;
}
UINT32 SOCK_AcceptConnectionRecvResquest(SOCK_ACCEPT_CONNECT_PARA_S *pPara, SOCK_MSG_S *pMsg)
{
for (UINT32 i = 0; i < 10; i++) {
sleep(1);
if (SOCK_Read(pPara->sock, pMsg) == VOS_OK) {
SOCK_INFO("RECV[thread:%lu localsock:%d sock:%d len:%d]:%s", pPara->pthread, pPara->sock, pMsg->sockId, pMsg->len, pMsg->data);
return VOS_OK;
}
}
return VOS_ERR;
}
UINT32 SOCK_AcceptConnectionSendResponse(SOCK_ACCEPT_CONNECT_PARA_S *pPara, SOCK_MSG_S *pResquest)
{
SOCK_MSG_S sendMsg;
sendMsg.sockId = pPara->sock;
INT32 len;
for (UINT32 i = 0; i < 50; i++) {
len = sprintf(sendMsg.data, "this is Response Message(%u).", i);
if (len < 0) {
SOCK_ERR("Response sprintf Fail");
continue;
}
sendMsg.len = len;
if (SOCK_Write(&sendMsg) == VOS_OK) {
SOCK_INFO("SEND[thread:%lu sock:%d len:%d]:%s", pPara->pthread, sendMsg.sockId, sendMsg.len, sendMsg.data);
}
usleep(100000); // 1s写10次
}
}
VOID SOCK_AcceptConnection(SOCK_ACCEPT_CONNECT_PARA_S *pPara)
{
SOCK_MSG_S recvMsg;
if (SOCK_AcceptConnectionRecvResquest(pPara, &recvMsg) != VOS_OK) {
return;
}
SOCK_AcceptConnectionSendResponse(pPara, &recvMsg);
INT32 ret = close(pPara->sock);
SOCK_INFO("CLSE[sock:%d ret:%d]", pPara->sock, ret);
}
VOID *SOCK_AcceptConnectionThread(VOID *pArg)
{
SOCK_ACCEPT_CONNECT_PARA_S *pPara = (SOCK_ACCEPT_CONNECT_PARA_S*)pArg;
SOCK_INFO("SOCK_AcceptConnectionThread In: pthread(%lu) self_thread(%lu) localSock(%u)", pPara->pthread, pthread_self(), pPara->sock);
pthread_detach(pPara->pthread);
SOCK_AcceptConnection(pPara);
SOCK_INFO("SOCK_AcceptConnectionThread Out: pthread(%lu) localSock(%u)", pPara->pthread, pPara->sock);
pPara->pthread = 0;
pthread_exit(NULL);
}
SOCK_ACCEPT_CONNECT_PARA_S *SOCK_GetFreeConnetPara(SOCK_ACCEPT_CONNECT_PARA_S pPara[], UINT32 size)
{
for (UINT32 i = 0; i < size; i++) {
if (pPara[i].pthread == 0) {
return pPara + i;
}
}
return NULL;
}
UINT32 SOCK_MainServer(SOCK_PARA_S *pPara)
{
INT32 ret, len;
// socket
INT32 sock = socket(AF_INET, SOCK_STREAM, 0);
if (sock == -1) {
SOCK_ERR("socket Fail!");
return VOS_ERR;
}
SOCK_INFO("socket(%d) create ok", sock);
// bind
SOCK_ADDR_S addr;
addr.family = AF_INET;
addr.port = htons(pPara->port);
addr.addr = inet_addr(pPara->addr);
ret = bind(sock, (struct sockaddr *)&addr, sizeof(SOCK_ADDR_S));
if (ret < 0) {
SOCK_ERR("socket bind Err(%d)!", ret);
return VOS_ERR;
}
// listen
ret = listen(sock, 5);
if (ret != VOS_OK) {
SOCK_ERR("socket listen Err(%d)!", ret);
return VOS_ERR;
}
SOCK_INFO("socket accept ...");
// accept
INT32 clientSock;
SOCK_ADDR_S clientAddr;
UINT32 clientAddrLen = sizeof(SOCK_ADDR_S); // 注意:这里得是有效值
SOCK_ACCEPT_CONNECT_PARA_S paras[SOCK_MAX_CLIENT_NUM] = {0};
while (VOS_TRUE) {
clientSock = accept(sock, (struct sockaddr *)&clientAddr, &clientAddrLen);
if (clientSock == -1) {
SOCK_ERR("socket accept Fail(%d)", clientSock);
continue;
}
SOCK_INFO("socket accept Succ(%d)", clientSock);
SOCK_ACCEPT_CONNECT_PARA_S *pPara = SOCK_GetFreeConnetPara(paras, SOCK_MAX_CLIENT_NUM);
if (pPara == NULL) {
close(clientSock);
SOCK_ERR("socket Cur Thread is Over MaxNum(%u)", SOCK_MAX_CLIENT_NUM);
continue;
}
pPara->sock = clientSock;
ret = pthread_create(&pPara->pthread, NULL, SOCK_AcceptConnectionThread, pPara);
if (ret != VOS_OK) {
close(clientSock);
SOCK_ERR("socket create thread SOCK_AcceptConnection Fail(%d)", ret);
continue;
}
SOCK_INFO("socket create thread SOCK_AcceptConnection Succ pthread(%lu)", pPara->pthread);
}
// close
close(sock);
return VOS_OK;
}
UINT32 SOCK_MainClient(SOCK_PARA_S *pPara)
{
INT32 ret, len;
// socket
INT32 sock = socket(AF_INET, SOCK_STREAM, 0);
if (sock == -1) {
SOCK_ERR("socket Fail!");
return VOS_ERR;
}
SOCK_INFO("socket(%d) create ok", sock);
// connet
SOCK_ADDR_S addr;
addr.family = AF_INET;
addr.port = htons(pPara->port);
addr.addr = inet_addr(pPara->addr);
ret = connect(sock, (struct sockaddr *)&addr, sizeof(SOCK_ADDR_S));
if (ret == -1) {
SOCK_ERR("socket connet Fail(%d)!", ret);
return VOS_ERR;
}
SOCK_INFO("socket connet Succ(%d)!", ret);
// write
CHAR msgData[] = "this is Request Message.";
SOCK_MSG_S sendMsg;
sendMsg.sockId = sock;
sendMsg.len = strlen(msgData);
memcpy(sendMsg.data, msgData, sendMsg.len);
if (SOCK_Write(&sendMsg) != VOS_OK) {
return VOS_ERR;
}
SOCK_INFO("SEND[thread:%lu sock:%d len:%d]:%s", pthread_self(), sendMsg.sockId, sendMsg.len, sendMsg.data);
// recv
SOCK_MSG_S recvMsg = {0};
while (VOS_TRUE) {
sleep(1);
if (SOCK_Read(sock, &recvMsg) != VOS_OK) {
continue;
}
if (recvMsg.len == 0) {
break;
}
SOCK_INFO("RECV[thread:%lu localsock:%d sock:%d len:%d]:%s", pthread_self(), sock, recvMsg.sockId, recvMsg.len, recvMsg.data);
}
// close
close(sock);
return VOS_ERR;
}
int main(int argc, char *argv[])
{
SOCK_PARA_S para;
SOCK_InitPara(¶, argc, argv);
SOCK_INFO("test socket start");
if (para.type == SOCK_PROCESS_TYPE_SERVER) {
(VOID)SOCK_MainServer(¶);
} else {
(VOID)SOCK_MainClient(¶);
}
SOCK_INFO("test socket end");
return VOS_OK;
}