zoukankan      html  css  js  c++  java
  • P2P学习(四)P2P编程实现

    一:协议解析

    (一)协议格式设计

    (二)字段说明

    Version(1Byte):版本信息,这里默认0即可

    Status(1Byte):协议的状态信息

    #define PROTO_LOGIN_REQ                0x01    //登录服务器的请求与响应
    #define PROTO_LOGIN_ACK                0x81
    
    #define PROTO_HEARTBEAT_REQ            0x02    //心跳包的请求与响应,防止P2P连接被NAT网关关闭
    #define PROTO_HEARTBEAT_ACK            0x82
    
    #define PROTO_CONNECT_REQ                0x11  //连接请求与响应,向服务端发送P2P连接请求----(服务器与本端)
    #define PROTO_CONNECT_ACK                0x91
    
    #define PROTO_NOTIFY_REQ                0x12   //服务端处理PROTO_CONNECT_REQ请求之后,发送PROTO_NOTIFY_REQ请求给对端----(服务器与对端)
    #define PROTO_NOTIFY_ACK                0x92
    
    #define PROTO_P2P_CONNECT_REQ            0x13  //对端接收到PROTO_NOTIFY_REQ请求之后,开始与本端建立P2P连接;本端接收到PROTO_P2P_CONNECT_REQ之后,回送PROTO_P2P_CONNECT_ACK给对端,双方状态机变为P2P建立完成,可以进行P2P传输
    #define PROTO_P2P_CONNECT_ACK            0x93
    
    #define RPORO_MESSAGE_REQ                0x21  //原始数据到达(是添加了自定义的首部之后的数据)---包含服务端转发和P2P发送!!!
    #define RPORO_MESSAGE_ACK                0xA1

    Length(2Bytes):数据的长度字段 = Message数据的长度 + 数据头部长度

    Self ID(4Bytes):本端的ID信息

    Other ID(4Bytes):对端的ID信息

    Message:存放原始数据

    (三)P2P客户端的状态机和协议的状态信息

    typedef enum {
        STATUS_INIT,
        STATUS_LOGIN,
        STATUS_HEARTBEAT,
        STATUS_CONNECT,
        STATUS_NOTIFY,
        STATUS_P2P_CONNECT,
        STATUS_MESSAGE,
    } STATUS_SET;

    (四)客户端流程图

    1.本机A默认状态STATUS_INIT,当本机A创建Socket之后,准备与服务器建立连接,状态变为STATUS_LOGIN

    2.本机A与服务端通过PROTO_LOGIN_REQ请求建立联系,服务端记录本机的id和地址ip和端口信息,返回PROTO_LOGIN_ACK确认消息给本机

    3.本机A收到PROTO_LOGIN_ACK确认消息后,状态变为STATUS_CONNECT,开始为建立p2p连接做准备,发送PROTO_CONNECT_REQ请求给服务器,服务端接收到本A端PROTO_CONNECT_REQ消息后,服务器回送PROTO_CONNECT_ACK确认消息和对端的地址信息给本机A,本机A状态变为STATUS_P2P_CONNECT状态。

    4.服务端接收到本A端PROTO_CONNECT_REQ消息后,发送PROTO_NOTIFY_REQ请求(保护本端的地址信息)到对端B。对端B接收到PROTO_NOTIFY_REQ请求后,回送PROTO_NOTIFY_ACK确认消息给服务器,此时对端B状态变为STATUS_P2P_CONNECT。

    注意:如果无法建立P2P连接,则双方的状态停留在STATUS_P2P_CONNECT状态,可以通过服务器进行转发。而不需要进行p2p通信!

    5.对端状态为STATUS_P2P_CONNECT后,发生PROTO_P2P_CONNECT_REQ请求消息给本机端,打通对端-(NAT端口)-->本机。

    6.对端状态为STATUS_P2P_CONNECT后,发生PROTO_P2P_CONNECT_REQ请求消息给对端,打通本机端(NAT端口)--->对端。

    注意:5、6是异步存在的!!

    7.当客户端接收到PROTO_P2P_CONNECT_REQ或者PROTO_P2P_CONNECT_ACK消息,本机状态的状态变为STATUS_MESSAGE。

    之后可以正常的进行p2p通信!!!

    二:代码实现P2P程序

    (一)头文件p2p.h实现(含公共函数)

    #ifndef __P2P_H__
    #define __P2P_H__
    
    #include <stdio.h>
    #include <stdlib.h>
    #include <string.h>
    
    #include <sys/types.h>
    #include <sys/socket.h>
    #include <netinet/in.h>    //互联网地址族
    
    #include <time.h>
    
    //---------------------------定义数据占用空间大小---------------------------
    #define CLIENT_MAX 1024                //定义客户端中与对方连接的数量
    #define CLIENT_ADDR_LENGTH 6        //定义空间存放客户端地址信息,IP占4字节,端口占2字节
    #define BUFFER_LENGTH 512            //定义发送和接收的缓冲区大小,512字节
    #define NUMBER_ID_LENGTH 4            //定义客户端ID的长度,占4字节
    
    //---------------------------定义协议的状态:注意响应比请求大于0x80,方便计算---------------------------
    #define PROTO_LOGIN_REQ     0x01    //登录服务器的请求与响应
    #define PROTO_LOGIN_ACK     0x81
    
    #define PROTO_HEARTBEAT_REQ 0x02    //心跳包的请求与响应,防止P2P连接被NAT网关关闭
    #define PROTO_HEARTBEAT_ACK 0x82
     
    #define PROTO_CONNECT_REQ   0x11     //连接请求与响应,向服务端发送P2P连接请求----(服务器与本端)
    #define PROTO_CONNECT_ACK   0x91
    
    #define PROTO_NOTIFY_REQ    0x12       //服务端处理PROTO_CONNECT_REQ请求之后,发送PROTO_NOTIFY_REQ请求给对端----(服务器与对端)
    #define PROTO_NOTIFY_ACK    0x92
    
    #define PROTO_P2P_CONNECT_REQ 0x13  //对端接收到PROTO_NOTIFY_REQ请求之后,开始与本端建立P2P连接;本端接收到PROTO_P2P_CONNECT_REQ之后,回送PROTO_P2P_CONNECT_ACK给对端,双方状态机变为P2P建立完成,可以进行P2P传输
    #define PROTO_P2P_CONNECT_ACK 0x93
    
    #define PROTO_MESSAGE_REQ      0x21  //原始数据到达(是添加了自定义的首部之后的数据)---包含服务端转发和P2P发送!!!
    #define PROTO_MESSAGE_ACK     0xA1
    
    //---------------------------定义协议的索引,和各个协议状态对应的索引位置---------------------------
    #define PROTO_BUFFER_VERSION_IDX       0        //版本字段位置索引,索引0,占1个字节
    #define PROTO_BUFFER_STATUS_IDX        1        //协议的状态信息,索引1,占1个字节
    
    #define PROTO_BUFFER_LENGTH_IDX        (PROTO_BUFFER_STATUS_IDX+1)    //协议的长度字段,索引2,占2个字节
    #define PROTO_BUFFER_SELFID_IDX        (PROTO_BUFFER_LENGTH_IDX+2)    //协议的本端的ID信息字段,索引4,占4个字节
    
    //login
    #define PROTO_LOGIN_SELFID_IDX         PROTO_BUFFER_SELFID_IDX        //登录时,需要添加本机的id到协议中去,在self id字段中,索引为4
    
    //login ack
    #define PROTO_LOGIN_ACK_SELFID_IDX     PROTO_BUFFER_SELFID_IDX        //回送确认消息,需要添加本端Id信息,放入self id字段,索引为4
    
    //heartbeat
    #define PROTO_HEARTBEAT_SELFID_IDX     PROTO_BUFFER_SELFID_IDX        //心跳检测,需要添加本机的id到协议中去,在self id字段中,索引为4
    
    //heartbeat ack
    #define PROTO_HEARTBEAT_ACK_SELFID_IDX PROTO_BUFFER_SELFID_IDX        //回送确认消息,需要添加本端Id信息,放入self id字段,索引为4
    
    //connect
    #define PROTO_CONNECT_SELFID_IDX       PROTO_BUFFER_SELFID_IDX        //连接相关,需要添加本端和对端的id信息,而本端的id放入self id字段,索引4
    #define PROTO_CONNECT_OTHERID_IDX      (PROTO_BUFFER_SELFID_IDX+NUMBER_ID_LENGTH)        //对端的id放入other id字段,索引为8
    
    //connect ack
    #define PROTO_CONNECT_ACK_SELFID_IDX   PROTO_BUFFER_SELFID_IDX          //回送确认消息,需要添加本端Id信息,放入self id字段,索引为4
    #define PROTO_CONNECT_ACK_OTHERID_IDX  (PROTO_CONNECT_ACK_SELFID_IDX+NUMBER_ID_LENGTH)  //对端的id放入other id字段,索引为8
    #define PROTO_CONNECT_MESSAGE_ADDR_IDX (PROTO_CONNECT_ACK_OTHERID_IDX+NUMBER_ID_LENGTH)    //这里开始存放地址数据,索引12。占6个字节,存放地址信息!!!---本机需要获取到的地址信息,才能发送p2p请求,而之前并没有获取过这个数据,所以最好携带过去
    
    //notify
    #define PROTO_NOTIFY_SELFID_IDX        PROTO_BUFFER_SELFID_IDX          //通知对端字段,需要添加本端Id信息放入self id字段,索引为4
    #define PROTO_NOTIFY_OTHERID_IDX       (PROTO_BUFFER_SELFID_IDX+NUMBER_ID_LENGTH)        //对端的id放入other id字段,索引为8
    #define PROTO_NOTIFY_MESSAGE_ADDR_IDX  (PROTO_NOTIFY_OTHERID_IDX+NUMBER_ID_LENGTH)         //这里开始存放地址数据,索引12。占6个字节,存放地址信息!!!---对端需要获取到本机的地址信息,才能发送p2p请求,而之前并没有获取过这个数据,所以最好携带过去
    
    //notify ack
    #define PROTO_NOTIFY_ACK_SELFID_IDX       PROTO_BUFFER_SELFID_IDX          //回送确认消息,需要添加本端Id信息,放入self id字段,索引为4
    
    //p2p connect
    #define PROTO_P2P_CONNECT_SELFID_IDX    PROTO_BUFFER_SELFID_IDX       //P2P连接请求时,需要加入本端的Id信息放入self id这段,索引为4
    
    //p2p connect ack
    #define PROTO_P2P_CONNECT_ACK_SELFID_IDX    PROTO_BUFFER_SELFID_IDX   //P2P连接响应时,需要加入本端的Id信息放入self id这段,索引为4
    
    //message
    #define PROTO_MESSAGE_SELFID_IDX        PROTO_BUFFER_SELFID_IDX       //开始发送数据,需要添加本端Id信息,放入self id字段,索引为4
    #define PROTO_MESSAGE_OTHERID_IDX       (PROTO_MESSAGE_SELFID_IDX+NUMBER_ID_LENGTH)    //需要加入对端ID信息到other id字段中,索引为8
    #define PROTO_MESSAGE_CONTENT_IDX       (PROTO_MESSAGE_OTHERID_IDX+NUMBER_ID_LENGTH)   //从这里开始添加数据,索引为12
    
    //message ack
    #define PROTO_MESSAGE_ACK_SELFID_IDX    PROTO_BUFFER_SELFID_IDX       //数据发送结束,需要进行响应,索引为4
    #define PROTO_MESSAGE_ACK_OTHERID_IDX   (PROTO_BUFFER_SELFID_IDX+NUMBER_ID_LENGTH)     //数据发送结束,需要进行响应,索引为4
    
    
    typedef unsigned int U32;
    typedef unsigned short U16;
    typedef unsigned char U8;
    
    //volatile的学习:https://www.runoob.com/w3cnote/c-volatile-keyword.html
    typedef volatile long UATOMIC;    //当要求使用 volatile 声明的变量的值的时候,系统总是重新从它所在的内存读取数据,即使它前面的指令刚刚从该处读取过数据。
    //可以用于实现原语操作
    
    //定义回调函数
    typedef void* (*CALLBACK)(void* arg);    
    
    //定义返回状态
    typedef enum{
        RESULT_FAILED = -1,
        RESULT_SUCCESS = 0
    }RESULT;
    
    //---------------------------定义客户端状态---------------------------
    typedef enum {
        STATUS_INIT,
        STATUS_LOGIN,
        STATUS_HEARTBEAT,
        STATUS_CONNECT,
        STATUS_NOTIFY,
        STATUS_P2P_CONNECT,
        STATUS_MESSAGE
    } STATUS_SET;
    
    //---------------------------定义一个映射结构体,id==>地址和时间戳信息---------------------------
    typedef struct __CLIENT_TABLE
    {
        U8 addr[CLIENT_ADDR_LENGTH];    //6字节存放地址信息
        U32    client_id;                    //4字节存放客户端id
        long stamp;                        //存放时间戳信息
    }client_table;
    
    //---------------------------服务器端数据结构---------------------------
    int client_count = 0;
    client_table table[CLIENT_MAX] = {0};
    
    //---------------------------客户端端数据结构---------------------------
    
    //---------------------------服务器端函数---------------------------
    /*
    cmpxchg(void* ptr, int old, int new)
    如果ptr和old的值一样,则把new写到ptr内存,
    否则写入ptr的值到old中
    整个操作是原子的。
    res返回值为0(失败)或1(成功)表明cas(对比和替换)操作是否成功.
    下面__asm__学习:https://www.jianshu.com/p/fa6d9d9c63b4
    -----------`x++`是否是原子的?
    不是,是3个指令,`取x,x+1,存入x`。
    >在单处理器上,如果执行x++时,禁止多线程调度,就可以实现原子。因为单处理的多线程并发是伪并发。
    在多处理器上,需要借助cpu提供的Lock功能。
    锁总线。读取内存值,修改,写回内存三步期间禁止别的CPU访问总线。
    同时我估计使用Lock指令锁总线的时候,OS也不会把当前线程调度走了。要是调走了,那就麻烦了。
    */
    static unsigned long cmpxchg(UATOMIC* addr,unsigned long _old,unsigned long _new){
        U8 res;
        //"__asm__"表示后面的代码为内嵌汇编
        //"__volatile__"表示编译器不要优化代码,后面的指令保留原样,"volatile"是它的别名
        __asm__ volatile (
            "lock; cmpxchg %3, %1;sete %0"            //加锁以及比较和替换原子操作,按后面顺序ret 0 , addr 1 , old 2, new 3
            : "=a" (res)                            //"=a"是说要把__asm__操作结果写到__ret中
            : "m" (*addr), "a" (_old), "r" (_new)    //各个值存放的位置
            : "cc", "memory");
    
        return res;    //返回结果,0(失败)或1(成功)
    }
    
    //返回时间戳信息
    static long time_generator(){
        static long lTimeStamp = 0;                    //局部静态变量
        static long timeStampMutex = 0;                //局部静态变量
    
        if(cmpxchg(&timeStampMutex,0,1)){            //注意:只有TimeStampMutex原子操作成功才行进入下面语句
            lTimeStamp = time(NULL);                //生成时间戳,精度为s
            timeStampMutex = 0;
        }
    
        return lTimeStamp;                            //返回时间戳信息
    }
    
    
    //将sockaddr地址转为array格式
    static void addr_to_array(U8 *array, struct sockaddr_in *p_addr){
        //存放IP和端口,需要6个字节
        int i = 0;
        for(i = 0; i < 4; i++){
            array[i] = *((unsigned char*)(&(p_addr->sin_addr.s_addr))+i);        //获取IP,顺序存储
        }
    
        for(i = 0; i < 2; i++){
            array[4+i] = *((unsigned char*)(&(p_addr->sin_port))+i);            //获取Port信息
        }
    }
    
    //将array数组转为sockaddr地址格式
    static void array_to_addr(U8 *array,struct sockaddr_in *p_addr){
        int i=0;
        for(i = 0;i < 4;i++){
            *((unsigned char*)(&p_addr->sin_addr.s_addr)+i) = array[i];            //获取IP,存放到sockaddr_in格式
        }
        for(i = 0;i < 2;i++){
            *((unsigned char*)(&p_addr->sin_port)+i) = array[4+i];                //获取Port,存放到sockaddr_in格式
        }
    }
    
    
    static int get_index_by_clientid(int client_id){
        int i = 0;
        int now_count = client_count;
        for(i = 1;i<=now_count;i++){
            if(table[i].client_id == client_id)
                return i;
        }
        return RESULT_FAILED;
    }
    
    static int deal_connect_req(int sockfd,int client_id,int other_id){
        U8 buffer[BUFFER_LENGTH] = {0};
        buffer[PROTO_BUFFER_STATUS_IDX] = PROTO_NOTIFY_REQ;                      //发送PROTO_NOTIFY_REQ请求
        buffer[PROTO_NOTIFY_SELFID_IDX] = client_id;
        buffer[PROTO_NOTIFY_OTHERID_IDX] = other_id;
    
        int index = get_index_by_clientid(client_id);                            //获取本端信息,一会发送给对端
        //填充数据,6字节的IP和端口信息
        memcpy(buffer+PROTO_NOTIFY_MESSAGE_ADDR_IDX,table[index].addr,CLIENT_ADDR_LENGTH);    
    
        index = get_index_by_clientid(other_id);                                //获取对端信息,开始发送
        //获取sockaddr信息
        struct sockaddr_in c_addr;
        c_addr.sin_family = AF_INET;
        array_to_addr(table[index].addr,&c_addr);
    
        int len = PROTO_NOTIFY_MESSAGE_ADDR_IDX + BUFFER_LENGTH;                //18字节,12的头部,6字节的数据
        len = sendto(sockfd,buffer,len,0,(struct sockaddr*)&c_addr,sizeof(c_addr));
        if(len < 0){
            printf("Failed in deal_connect_req, send to other peer:%d
    ",other_id);
            return RESULT_FAILED;
        }
    
        return RESULT_SUCCESS;
    }
    
    
    static int deal_connect_ack(int sockfd,int client_id,int other_id){            //可以和deal_connect_req合并
        //printf("call deal_connect_ack!
    ");
        U8 buffer[BUFFER_LENGTH] = {0};
        buffer[PROTO_BUFFER_STATUS_IDX] = PROTO_CONNECT_ACK;                    //回送PROTO_CONNECT_ACK
        buffer[PROTO_NOTIFY_SELFID_IDX] = client_id;
        buffer[PROTO_NOTIFY_OTHERID_IDX] = other_id;
    
        int index = get_index_by_clientid(other_id);                            //获取本端信息,一会发送给对端
        //填充数据,6字节的IP和端口信息
        memcpy(buffer+PROTO_CONNECT_MESSAGE_ADDR_IDX,table[index].addr,CLIENT_ADDR_LENGTH);    
    
        index = get_index_by_clientid(client_id);                                //获取对端信息,开始发送
        //获取sockaddr信息
        struct sockaddr_in c_addr;
        c_addr.sin_family = AF_INET;
        array_to_addr(table[index].addr,&c_addr);
    
        int len = PROTO_NOTIFY_MESSAGE_ADDR_IDX + BUFFER_LENGTH;                //18字节,12的头部,6字节的数据
        len = sendto(sockfd,buffer,len,0,(struct sockaddr*)&c_addr,sizeof(c_addr));
        if(len < 0){
            printf("Failed in deal_connect_ack, send to client peer:%d
    ",client_id);
            return RESULT_FAILED;
        }
    
        return RESULT_SUCCESS;
    }
    
    static int deal_message_req(int sockfd,int other_id,U8 *buffer,int length){
        int index = get_index_by_clientid(other_id);                                //获取对端信息,开始发送
        //获取sockaddr信息
        struct sockaddr_in c_addr;
        c_addr.sin_family = AF_INET;
        array_to_addr(table[index].addr,&c_addr);
        //printf("send to peer: %d.%d.%d.%d:%d
    ",table[index].addr[0],table[index].addr[1],table[index].addr[2],table[index].addr[3],c_addr.sin_port);
        int n = sendto(sockfd,buffer,length,0,(struct sockaddr*)&c_addr,sizeof(c_addr));
        if(n < 0){
            printf("Failed in deal_message_req!
    ");
            return RESULT_FAILED;
        }
        return RESULT_SUCCESS;
    }
    
    
    static int deal_ack(int sockfd,struct sockaddr_in *c_addr,U8 *buffer,int length){        //处理通用ACK消息,原来协议+0x80即可
        buffer[PROTO_BUFFER_STATUS_IDX] += 0x80;
        int n = sendto(sockfd,buffer,length,0,(struct sockaddr*)c_addr,sizeof(*c_addr));
        if(n < 0){
            printf("Failed in deal_ack!
    ");
            return RESULT_FAILED;
        }
        return RESULT_SUCCESS;
    }
    
    
    //---------------------------客户端函数---------------------------
    static int send_login_req(int sockfd,int client_id,struct sockaddr_in *ser_addr){
        U8 buffer[BUFFER_LENGTH] = {0};            //buffer长度512
    
        buffer[PROTO_BUFFER_STATUS_IDX] = PROTO_LOGIN_REQ;
        *(int *)(buffer+PROTO_LOGIN_SELFID_IDX) = client_id;
    
        int n = PROTO_LOGIN_SELFID_IDX + NUMBER_ID_LENGTH;
        n = sendto(sockfd,buffer,n,0,(struct sockaddr*)ser_addr,sizeof(struct sockaddr_in));
    
        if(n < 0){
            printf("Failed to login server!
    ");
            return RESULT_FAILED;
        }
        return RESULT_SUCCESS;
    }
    
    static int get_other_id(U8 *buffer,int *other_id){
        int id=0,i;
        for(i=2;buffer[i]!=':'&&buffer[i]!='';i++){        //还可以进行其他严格处理    
            id += id*10 + buffer[i]-'0';
        }
        *other_id = id;
        return i;                                //返回索引
    }
    
    static int send_connect_req(int sockfd,int client_id,int other_id,struct sockaddr_in *ser_addr){
        U8 buffer[BUFFER_LENGTH] = {0};            //buffer长度512
    
        buffer[PROTO_BUFFER_STATUS_IDX] = PROTO_CONNECT_REQ;
        *(int *)(buffer+PROTO_CONNECT_SELFID_IDX) = client_id;
        *(int *)(buffer+PROTO_CONNECT_OTHERID_IDX) = other_id;
    
        int n = PROTO_CONNECT_OTHERID_IDX + NUMBER_ID_LENGTH;
        n = sendto(sockfd,buffer,n,0,(struct sockaddr*)ser_addr,sizeof(struct sockaddr_in));
    
        if(n < 0){
            printf("Failed to login server!
    ");
            return RESULT_FAILED;
        }
        return RESULT_SUCCESS;
    }
    
    static int send_message(int sockfd,int client_id,int other_id,struct sockaddr_in *addr,U8 *msg,int length){
        U8 buffer[BUFFER_LENGTH] = {0};
    
        buffer[PROTO_BUFFER_STATUS_IDX] = PROTO_MESSAGE_REQ;    //处理消息
        *(int*)(buffer+PROTO_MESSAGE_SELFID_IDX) = client_id;
        *(int*)(buffer+PROTO_MESSAGE_OTHERID_IDX) = other_id;
    
        memcpy(buffer + PROTO_MESSAGE_CONTENT_IDX,msg,length);    //初始化数据部分
    
        int n = PROTO_MESSAGE_CONTENT_IDX + length;
        *(U16*)(buffer+PROTO_BUFFER_LENGTH_IDX) = (U16)n;        //存放数据长度
    
        n = sendto(sockfd,buffer,n,0,(struct sockaddr*)addr,sizeof(struct sockaddr_in));
        if(n < 0){
            printf("Failed to send message to peer!
    ");
            return RESULT_FAILED;
        } 
        return RESULT_SUCCESS;
    }
    
    static int send_p2pconnect(int sockfd,int client_id,struct sockaddr_in *p_addr){
        U8 buffer[BUFFER_LENGTH] = {0};
    
        buffer[PROTO_BUFFER_STATUS_IDX] = PROTO_P2P_CONNECT_REQ;
        *(int*)(buffer+PROTO_P2P_CONNECT_SELFID_IDX) = client_id;
        
        int n = PROTO_P2P_CONNECT_SELFID_IDX + NUMBER_ID_LENGTH;
    
        n = sendto(sockfd,buffer,n,0,(struct sockaddr*)p_addr,sizeof(struct sockaddr_in));
        if(n<0){
            printf("Failed to send p2p connect req!
    ");
            return RESULT_FAILED;
        }
        return RESULT_SUCCESS;
    }
    
    static int send_p2pconnect_ack(int sockfd,int client_id,struct sockaddr_in *p_addr){
        U8 buffer[BUFFER_LENGTH] = {0};
    
        buffer[PROTO_BUFFER_STATUS_IDX] = PROTO_P2P_CONNECT_ACK;
        *(int*)(buffer+PROTO_P2P_CONNECT_SELFID_IDX) = client_id;
    
        int n = PROTO_P2P_CONNECT_SELFID_IDX + NUMBER_ID_LENGTH;
        n = sendto(sockfd,buffer,n,0,(struct sockaddr*)p_addr,sizeof(struct sockaddr_in));
        if(n < 0){
            printf("Failed to send p2p connect ack!
    ");
            return RESULT_FAILED;
        }
        return RESULT_SUCCESS;
    }
    
    
    static int send_message_ack(int sockfd,int client_id,int other_id,struct sockaddr_in *p_addr){
        U8 buffer[BUFFER_LENGTH] = {0};
    
        buffer[PROTO_BUFFER_STATUS_IDX] = PROTO_MESSAGE_ACK;
        *(int*)(buffer+PROTO_MESSAGE_ACK_SELFID_IDX) = client_id;
        *(int*)(buffer+PROTO_MESSAGE_ACK_OTHERID_IDX) = other_id;
    
        int n=PROTO_MESSAGE_ACK_OTHERID_IDX + NUMBER_ID_LENGTH;
        n = sendto(sockfd,buffer,n,0,(struct sockaddr*)p_addr,sizeof(struct sockaddr_in));
        if(n < 0){
            printf("Failed to send message ack");
            return RESULT_FAILED;
        }
        return RESULT_SUCCESS;
    }
    #endif

    (二)服务端p2p_server.c实现(简单通信)

    #include "p2p.h"
    
    
    int recv_buffer_parser(int sockfd,U8 *buffer,U32 length,struct sockaddr_in *c_addr){    //length是传递过来的数据长度
        U8 status = buffer[PROTO_BUFFER_STATUS_IDX];                  //解析状态
        //printf("recv_buffer_parser --->status: %d
    ",status);
        int client_id,other_id,index;
        int old,now;
        U8 *msg;
    
        switch(status){
            case PROTO_LOGIN_REQ:                                      //处理登录请求
                printf("recv login req!
    ");
                old = client_count;
                now = old + 1;
    
                if(0 == cmpxchg((UATOMIC*)&client_count,old,now)){    //使用原子操作赋值
                    printf("client_count --> %d,old:%d,now:%d
    ", client_count,old,now);
                    return RESULT_FAILED;
                }
    
                //开始登录新用户的信息
                U8 array[CLIENT_ADDR_LENGTH] = {0};                    //6字节存放地址IP:Port信息
                addr_to_array(array,c_addr);
    
                client_id = *(U32*)(buffer+PROTO_BUFFER_SELFID_IDX);
    
                printf("now:%d client:[%d],login ---> %d.%d.%d.%d:%d
    ",now,client_id,
                    *(unsigned char*)(&c_addr->sin_addr.s_addr), *((unsigned char*)(&c_addr->sin_addr.s_addr)+1),                                                    
                    *((unsigned char*)(&c_addr->sin_addr.s_addr)+2), *((unsigned char*)(&c_addr->sin_addr.s_addr)+3),                      
                    c_addr->sin_port);
    
                table[now].client_id = client_id;                    //获取4字节长度的用户id信息
                memcpy(table[now].addr,array,CLIENT_ADDR_LENGTH);    //获取用户的Addr地址信息
    
                //需要回送确认消息-----------
                deal_ack(sockfd,c_addr,buffer,length);
                break;
            case PROTO_HEARTBEAT_REQ:                                //处理心跳包请求
                printf("recv heartbeat req!
    ");
                client_id = *(unsigned int*)(buffer+PROTO_HEARTBEAT_SELFID_IDX);
                index = get_index_by_clientid(client_id);
    
                table[index].stamp = time_generator();
    
                //需要回送确认消息-----------
                deal_ack(sockfd,c_addr,buffer,length);
                break;
            case PROTO_CONNECT_REQ:                                    //处理连接请求
                client_id = *(unsigned int*)(buffer+PROTO_CONNECT_SELFID_IDX);            //获取本机id
                other_id = *(unsigned int*)(buffer+PROTO_CONNECT_OTHERID_IDX);            //获取对端id
                printf("recv connect req from %d to %d!
    ",client_id,other_id);
    
                deal_connect_req(sockfd,client_id,other_id);        //处理连接请求,1.向对端发送信息
                deal_connect_ack(sockfd,client_id,other_id);        //2.回送确认消息
                break;
            case PROTO_NOTIFY_ACK:                                    //处理对端发送回来的确认消息,无用
                printf("recv other notify ack message
    ");
                break;
            case PROTO_MESSAGE_REQ:                              //处理要经过服务器转发的数据和p2p无法建立的时候使用
                printf("recv message req!
    ");
                msg = buffer + PROTO_MESSAGE_CONTENT_IDX;        //获取要发送的数据
                client_id = *(unsigned int*)(buffer+PROTO_MESSAGE_SELFID_IDX);
                other_id = *(unsigned int*)(buffer+PROTO_MESSAGE_OTHERID_IDX);
    
                printf("Client[%d] send to Other[%d]:%s
    ",client_id,other_id,msg);
                deal_message_req(sockfd,other_id,buffer,length);    //进行转发
                break;
            case PROTO_MESSAGE_ACK:                                    //转发确认消息
                printf("recv message ack!
    ");
                client_id = *(unsigned int*)(buffer+PROTO_MESSAGE_SELFID_IDX);
                other_id = *(unsigned int*)(buffer+PROTO_MESSAGE_OTHERID_IDX);
                printf("Client[%d] send ack to Other[%d]
    ",client_id,other_id);
                deal_message_req(sockfd,other_id,buffer,length);
                break;
        }
    
        return RESULT_SUCCESS;
    }
    
    int main(int argc,char *argv[]){
        int sockfd;
        int n,length;
        char buffer[BUFFER_LENGTH] = {0};
        struct sockaddr_in addr,c_addr;
    
        printf("UDP Server......
    ");
    
        if(argc != 2){
            printf("Usage: %s port
    ",argv[0]);
            exit(0);
        }
        
        sockfd = socket(AF_INET,SOCK_DGRAM,0);                            //获取通信socket
        if(sockfd < 0){
            printf("Failed to open udp socket!
    ");
            exit(0);
        }
    
        addr.sin_family = AF_INET;
        addr.sin_port = htons(atoi(argv[1]));                            //获取端口信息
        addr.sin_addr.s_addr = htonl(INADDR_ANY);                        //允许接收所有网卡的到达数据
    
        length = sizeof(addr);
    
        if(bind(sockfd,(struct sockaddr*)&addr,length) < 0){
            printf("Failed to bind udp socket with ip port");
            exit(0);
        }
    
        while(1){
            n = recvfrom(sockfd,buffer,BUFFER_LENGTH,0,(struct sockaddr*)&c_addr,&length);
            if(n > 0){
                buffer[n] = 0x0;                                        //设置结束符号
                /*
                printf("%d.%d.%d.%d:%d say:%s
    ", *(unsigned char*)(&c_addr.sin_addr.s_addr),*((unsigned char*)(&c_addr.sin_addr.s_addr)+1),
                    *((unsigned char*)(&c_addr.sin_addr.s_addr)+2),*((unsigned char*)(&c_addr.sin_addr.s_addr)+3),
                    c_addr.sin_port, buffer);                            //打印接收到的数据信息
                */
                int ret = recv_buffer_parser(sockfd,buffer,n,&c_addr);    //解析接收的数据,存储相关信息
                if(ret == RESULT_FAILED)
                    continue;
    
            }else if(n == 0){
                printf("client closed!
    ");
            }else{
                printf("recv error
    ");
                break;
            }
        }
    
        return 0;
    }

    (三)客户端代码实现(状态机转换,p2p通信)

    #include "p2p.h"
    #include <pthread.h>
    
    static int status_machine = STATUS_INIT;    //状态机
    static int client_selfid = 0x0;                //默认本端的id,需要在main方法中输入
    
    struct sockaddr_in server_addr;                //服务端的信息
    
    client_table p2p_clients[CLIENT_MAX];        //可以连接的P2P对端最大数量
    static int p2p_count = 0;
    
    static int buffer_parser(int sockfd,U8 *buffer,int length,struct sockaddr_in *addr){
        U8 status = buffer[PROTO_BUFFER_STATUS_IDX];    //获取状态
        U8 *msg;
        struct sockaddr_in p_addr;        //获取对端的地址信息
        //printf("buffer_parser...%d
    ",status);
        switch(status){
            case PROTO_LOGIN_ACK:                //处理登录确认
                printf(" Connect Server Success
    ");
                status_machine = STATUS_CONNECT;        //状态转移
                break;
            case PROTO_HEARTBEAT_ACK:
                //printf("recv heartbeat ack!
    ");
                break;
            case PROTO_NOTIFY_REQ:                //处理服务端发送的NOTIFY请求
                //printf("recv notify req!
    ");
                //获取对端的数据信息
                p_addr.sin_family = AF_INET;
                array_to_addr(buffer+PROTO_NOTIFY_MESSAGE_ADDR_IDX,&p_addr);
                //回复确认消息给服务器
                buffer[PROTO_BUFFER_STATUS_IDX] += 0x80;
                sendto(sockfd,buffer,PROTO_NOTIFY_MESSAGE_ADDR_IDX,0,(struct sockaddr*)&server_addr,sizeof(struct sockaddr_in));    
    
                status_machine = STATUS_NOTIFY;
                //开始打洞
                send_p2pconnect(sockfd,client_selfid,&p_addr);    //开始打洞!!!
                if(status_machine != STATUS_MESSAGE){              //注意:需要进行判断,因为是异步操作,所以本机接到NOTIFY请求的时候,可能已经接到对端的P2P连接请求,状态已经变为STATUS_MESSAGE,那么我们不能再变为未就绪状态
                    status_machine = STATUS_P2P_CONNECT;
                }
                break;
            case PROTO_CONNECT_ACK:                //处理CONNECT 确认
                //printf("recv connect ack!
    ");
                //获取对端的数据信息
                p_addr.sin_family = AF_INET;
                array_to_addr(buffer+PROTO_CONNECT_MESSAGE_ADDR_IDX,&p_addr);
    
                send_p2pconnect(sockfd,client_selfid,&p_addr);    //开始打洞!!!
                if(status_machine != STATUS_MESSAGE){              //注意:需要进行判断,因为是异步操作,所以本机接到NOTIFY请求的时候,可能已经接到对端的P2P连接请求,状态已经变为STATUS_MESSAGE,那么我们不能再变为未就绪状态
                    status_machine = STATUS_P2P_CONNECT;
                }
                break;
            case PROTO_P2P_CONNECT_REQ:            //处理p2p连接请求---表示打洞成功,添加即可
                if(status_machine != STATUS_MESSAGE){
                    //printf("recv p2p connect req!
    ");
                    int now_count = p2p_count++;
    
                    p2p_clients[now_count].stamp = time_generator();
                    p2p_clients[now_count].client_id = *(int*)(buffer+PROTO_P2P_CONNECT_SELFID_IDX);
                    addr_to_array(p2p_clients[now_count].addr,addr);
    
                    send_p2pconnect_ack(sockfd,client_selfid,addr);
                    status_machine = STATUS_MESSAGE;
                    printf("Enter P2P Model!
    ");
                }
                break;
            case PROTO_P2P_CONNECT_ACK:            //处理p2p连接确认---表示打洞成功,添加即可
                if(status_machine != STATUS_MESSAGE){
                    //printf("recv p2p connect ack!
    ");
                    int now_count = p2p_count++;
    
                    p2p_clients[now_count].stamp = time_generator();
                    p2p_clients[now_count].client_id = *(int*)(buffer+PROTO_P2P_CONNECT_SELFID_IDX);
                    addr_to_array(p2p_clients[now_count].addr,addr);
    
                    send_p2pconnect_ack(sockfd,client_selfid,addr);
                    status_machine = STATUS_MESSAGE;
                    printf("Enter P2P Model!
    ");
                }
                break;
            case PROTO_MESSAGE_REQ:                //p2p数据到达
                //printf("recv p2p data....
    ");
    
                msg = buffer + PROTO_MESSAGE_CONTENT_IDX;
                U32 other_id = *(U32*)(buffer+PROTO_MESSAGE_SELFID_IDX);
                printf("recv p2p data:%s from:%d
    ",msg,other_id);
    
                send_message_ack(sockfd,client_selfid,other_id,addr);
                break;
            case PROTO_MESSAGE_ACK:
                //printf("peer recv message, and send ack to me!
    ");
                break;
        }
    }
    
    void *recv_callback(void *arg){
        int sockfd = *(int*)arg;                //获取sockfd
    
        struct sockaddr_in addr;
        int length = sizeof(struct sockaddr_in);
        U8 buffer[BUFFER_LENGTH] = {0};
    
        while(1){
            int n = recvfrom(sockfd,buffer,BUFFER_LENGTH,0,(struct sockaddr*)&addr,&length);
            printf("recvfrom data...
    ");
            if(n > 0){
                buffer[n] = 0;
                buffer_parser(sockfd,buffer,n,&addr);    //解析数据
            }else if(n == 0){
                printf("server closed
    ");
                close(sockfd);
                break;
            }else{
                printf("Failed to call recvfrom
    ");
                close(sockfd);
                break;
            }
        }
    }
    
    void *send_callback(void *arg){                //线程处理发送消息
        int sockfd = *(int*)arg;                //获取sockfd
    
        char buffer[BUFFER_LENGTH] = {0};
    
        while(1){
            bzero(buffer,BUFFER_LENGTH);        //置为0
    
            //printf("===client status====%d===
    ",status_machine);
            if(status_machine == STATUS_CONNECT){
                printf("-----> please enter message(eg. C/S otherID: ...):
    ");
                gets(buffer);                //获取要输入的数据
                //如果是登录状态,可以进行p2p连接或者服务器转发
                int other_id,idx;
                idx = get_other_id(buffer,&other_id);
                //printf("%d--->%d
    ",client_selfid,other_id);
    
                if(buffer[0] == 'C'){            //开始进行P2P连接
                    send_connect_req(sockfd,client_selfid,other_id,&server_addr);
                }else{
                    int length = strlen(buffer);
    
                    send_message(sockfd,client_selfid,other_id,&server_addr,buffer+idx+1,length-idx-1);    //发送给服务器进行转发
                }
                sleep(1);    //等待建立p2p连接
            }else if(status_machine == STATUS_MESSAGE){    //可以进行P2P通信
                printf("-----> please enter p2p message:
    ");
                gets(buffer);                          //获取要输入的数据
                //与最新加入的进行p2p通信
                int now_count = p2p_count;            //这个是最新的序号
                struct sockaddr_in c_addr;            //对端的地址信息
    
                c_addr.sin_family = AF_INET;
                array_to_addr(p2p_clients[now_count-1].addr,&c_addr);
    
                int length = strlen(buffer);
    
                send_message(sockfd,client_selfid,0,&c_addr,buffer,length);    //直接发送给对端,P2P通信
            }else if(status_machine == STATUS_NOTIFY || status_machine == STATUS_P2P_CONNECT ){
                printf("-----> please enter message(S otherID:...):
    ");
                printf("status:%d
    ",status_machine);
                //scanf("%s",buffer);                    //获取要输入的数据
                gets(buffer);                          //获取要输入的数据
    
                int length = strlen(buffer);
                
                int other_id,idx;
                idx = get_other_id(buffer,&other_id);
    
                send_message(sockfd,client_selfid,other_id,&server_addr,buffer+idx+1,length-idx-1);    //发送给服务器进行转发
            }
        }
    
    }
    
    int main(int argc,char *argv[]){
        printf("UDP Client......
    ");
    
        if(argc != 4){
            printf("Usage: %s serverIp serverPort clientID
    ",argv[0]);
            exit(0);
        }
    
        int sockfd = socket(AF_INET,SOCK_DGRAM,0);
        if(sockfd < 0){
            printf("Failed to create socket!
    ");
            exit(0);
        }
    
        //创建两个线程,分别处理接收和发送信息
        pthread_t thread_id[2] = {0};
        CALLBACK cb[2] = {send_callback,recv_callback};
        
        int i;
        for(i=0;i<2;i++){
            int ret = pthread_create(&thread_id[i],NULL,cb[i],&sockfd);    //创建线程,获取线程号,传入回调方法和参数
            if(ret){
                printf("Failed to create thread!
    ");
                exit(1);
            }
        }
    
        //主线程进行登录操作
        server_addr.sin_family = AF_INET;
        server_addr.sin_addr.s_addr = inet_addr(argv[1]);
        server_addr.sin_port = htons(atoi(argv[2]));
    
        client_selfid = atoi(argv[3]);
    
        status_machine = STATUS_LOGIN;                                    //修改客户端当前状态
        send_login_req(sockfd,client_selfid,&server_addr);                //发送登录请求
    
        for(i = 0;i<2;i++){
            pthread_join(thread_id[i],NULL);                            //join子线程
        }
    
        return 0;
    }

    (四)程序编译

    1.编译服务端

    gcc p2p_server.c -o ps

    2.编译客户端

    gcc p2p_client.c -o pc -lpthread

    (五)代码测试

    1.服务端查看:

    2.客户端1查看

    3.客户端2查看

  • 相关阅读:
    ASP.Net Core一个项目中如何支持多种身份认证方式
    c#正则表达式
    Newtonsoft.Json笔记 -JsonPath
    DotLiquid-介绍
    ASP.NET Core-请求频率限制(AspNetCoreRateLimit)
    使用dotnet-gcdump 查找耗内存的大对象【转】
    使用dotnet-dump 查找 .net core占CPU100%的原因【转】
    ADO.NET
    .NET Core-IServiceCollection扩展一个Replace方法
    SafeList-线程安全的List(c#)
  • 原文地址:https://www.cnblogs.com/ssyfj/p/14860147.html
Copyright © 2011-2022 走看看