zoukankan      html  css  js  c++  java
  • UE4 TcpClient代码,UE4版本4.25.3

    // Fill out your copyright notice in the Description page of Project Settings.
    
    #pragma once
    
    #include "CoreMinimal.h"
    #include "Engine/GameInstance.h"
    #include "CoreMinimal.h"
    #include "GameFramework/Actor.h"
    #include "HAL/Runnable.h"
    #include "HAL/ThreadSafeBool.h"
    #include "Containers/Queue.h"
    #include "UObject/WeakObjectPtrTemplates.h"
    #include "NewTcpClient.generated.h"
    
    
    struct FIPv4AddressEx
    {
        union
        {
            /** The IP address value as A.B.C.D components. */
            struct
            {
    #if PLATFORM_LITTLE_ENDIAN
    #ifdef _MSC_VER
                uint8 D, C, B, A;
    #else
                uint8 D GCC_ALIGN(4);
                uint8 C, B, A;
    #endif
    #else
                uint8 A, B, C, D;
    #endif
            };
    
            /** The IP address value in host byte order. */
            uint32 Value;
        };
        static bool Parse(const FString& AddressString, FIPv4AddressEx& OutAddress);
    };
    
    
    DECLARE_DYNAMIC_DELEGATE_OneParam(FTcpSocketDisconnectDelegate, int32, ConnectionId);
    DECLARE_DYNAMIC_DELEGATE_OneParam(FTcpSocketConnectDelegate, int32, ConnectionId);
    DECLARE_DYNAMIC_DELEGATE_OneParam(FTcpSocketReceivedByteMessageDelegate,UPARAM(ref)  TArray<uint8>&, Message);  //
    //DECLARE_DYNAMIC_DELEGATE_OneParam(FTcpSocketReceivedStringMessageDelegate,UPARAM(ref) FString&, Message); //
    DECLARE_DYNAMIC_DELEGATE_OneParam(FTcpSocketReceivedStringMessageDelegate,FString,MessageString);
    
    class FyzTcpClient :public FRunnable
    {
    public:
        FyzTcpClient();
        ~FyzTcpClient();
    
        /** Thread to run the worker FRunnable on */
        FRunnableThread* m_Thread = nullptr;
    
    
        FSocket* m_Socket;
        FString m_ipAddress;
        int m_port;
    
        // SPSC = single producer, single consumer.
        TQueue<TArray<uint8>, EQueueMode::Mpsc> Inbox; // Messages we read from socket and send to main thread. Runner thread is producer, main thread is consumer.
        TQueue<TArray<uint8>, EQueueMode::Mpsc> Outbox; // Messages to send to socket from main thread. Main thread is producer, runner thread is consumer.
    
    
        int32 m_RecvBufferSize;
        int32 m_ActualRecvBufferSize;
        int32 m_SendBufferSize;
        int32 m_ActualSendBufferSize;
        float m_TimeBetweenTicks;
        bool m_bConnected = false;
    
        void Connect(const FString& ipAddress, int32 port);
        
    
        void ReConnect();
    
        // Begin FRunnable interface.
        virtual bool Init() override;
        virtual uint32 Run() override;
        virtual void Stop() override;
        virtual void Exit() override;
        // End FRunnable interface    
    
        /** Shuts down the thread */
        void SocketShutdown();
    
        /* Getter for bConnected */
        bool isConnected();
    
        void UpdateRecvMessage();
        bool ReadFromInbox(TArray<uint8>& byteArray);
        void ExecuteOnMessageReceived();
    
    
    
        static FString Message_ReadString(TArray<uint8>& Message, int32 BytesLength);
        static TArray<uint8> Conv_StringToBytes(const FString& InStr);
        static TArray<uint8> Concat_BytesBytes(TArray<uint8> A, TArray<uint8> B);
        static TArray<uint8> Conv_IntToBytes(int32 InInt);
        static TArray<uint8> Conv_FloatToBytes(float InFloat);
        static TArray<uint8> Conv_ByteToBytes(uint8 InByte);
        static int32 Message_ReadInt(TArray<uint8>& Message);
        static uint8 Message_ReadByte(TArray<uint8>& Message);
        static bool Message_ReadBytes(int32 NumBytes,TArray<uint8>& Message, TArray<uint8>& ReturnArray);
        static float Message_ReadFloat(TArray<uint8>& Message);
        
    
    
        FTcpSocketDisconnectDelegate DisconnectedDelegate;
        FTcpSocketConnectDelegate ConnectedDelegate;
        FTcpSocketReceivedByteMessageDelegate MessageReceivedByteDelegate;
        FTcpSocketReceivedStringMessageDelegate     m_TcpRecvDelegate;
        /* Blocking send */
        bool BlockingSend(const uint8* Data, int32 BytesToSend);
        void AddToOutbox(TArray<uint8> Message);
    private:
        
    
        double m_LastReConnectTime;
        /** thread should continue running */
        FThreadSafeBool m_bRun = false;
    
        /** Critical section preventing multiple threads from sending simultaneously */
        FCriticalSection SendCriticalSection;
    };
    
    
    /**
     * 
     */
    UCLASS()
    class TESTPROJECT_API UNewTcpClient : public UGameInstance
    {
        GENERATED_BODY()
        
    public:
        FyzTcpClient  m_TcpClient;
    
    
        UFUNCTION(BlueprintCallable, Category = "MySocket")
        void Setup(FString ip, int port);
    
        //发消息(主线程)
        UFUNCTION(BlueprintCallable, Category = "MySocket")
        bool SocketSend(FString message);
    
        UFUNCTION(BlueprintCallable, Category = "MySocket")
        void SetRecvMsgByteArrayDelegate(const FTcpSocketReceivedByteMessageDelegate& OnMessageReceived);
    
        
        UFUNCTION(BlueprintCallable, Category = "MySocket")
        void SetRecvMsgStringDelegate(const FTcpSocketReceivedStringMessageDelegate& OnMessageReceived);
    
    
        UFUNCTION(BlueprintCallable, Category = "MySocket")
        void UpdateRecv();
    
    
        UFUNCTION(BlueprintCallable, Category = "MySocket")
        void SendStringToServer(FString str);
    
        UFUNCTION(BlueprintCallable, Category = "MySocket")
        void SendBytesToServer(TArray<uint8> bytes);
    
    };
    
    
    
    // Fill out your copyright notice in the Description page of Project Settings.
    
    
    #include "NewTcpClient.h"
    #include "SocketSubsystem.h"
    #include "IPAddress.h"
    #include "Sockets.h"
    #include "HAL/RunnableThread.h"
    #include "Async/Async.h"
    #include <string>
    #include "CoreMinimal.h"
    #include "Engine/GameInstance.h"
    #include "SocketSubsystem.h"
    //#include "NetWorking/Public/Interfaces/IPv4/IPv4Address.h"
    #include "Logging/MessageLog.h"
    #include "HAL/UnrealMemory.h"
    
    
    
    bool FIPv4AddressEx::Parse(const FString& AddressString, FIPv4AddressEx& OutAddress)
    {
        TArray<FString> Tokens;
    
    
    
    
        if (AddressString.ParseIntoArray(Tokens, TEXT("."), false) == 4)
        {
    
            OutAddress.A = FCString::Atoi(*Tokens[0]);
            OutAddress.B = FCString::Atoi(*Tokens[1]);
            OutAddress.C = FCString::Atoi(*Tokens[2]);
            OutAddress.D = FCString::Atoi(*Tokens[3]);
    
            return true;
        }
    
        return false;
    }
    
    
    
    FyzTcpClient::FyzTcpClient()
    {
        m_RecvBufferSize = 1024;
        m_SendBufferSize = 1024;
        m_TimeBetweenTicks = 0.03;
        m_LastReConnectTime = 0.0f;
        m_bRun = false;
    }
    FyzTcpClient::~FyzTcpClient()
    {
        Stop();
        if (m_Thread)
        {
            m_Thread->WaitForCompletion();
            delete m_Thread;
            m_Thread = nullptr;
        }
    }
    
    
    void FyzTcpClient::Connect(const FString& ipAddresstemp, int32 porttemp)
    {
        m_RecvBufferSize = 1024;
        m_SendBufferSize = 1024;
        m_TimeBetweenTicks = 0.016;
        m_LastReConnectTime = 0.0f;
    
        m_ipAddress = ipAddresstemp;
        m_port = porttemp;
    
        
        
    
    
        Init();
    
        check(!m_Thread && "Thread wasn't null at the start!");
        check(FPlatformProcess::SupportsMultithreading() && "This platform doesn't support multithreading!");
        if (m_Thread)
        {
            UE_LOG(LogTemp, Log, TEXT("Log: Thread isn't null. It's: %s"), *m_Thread->GetThreadName());
        }
        m_Thread = FRunnableThread::Create(this, *FString::Printf(TEXT("FTcpSocketWorker %s:%d"), *ipAddresstemp, porttemp), 128 * 1024, TPri_Normal);
        UE_LOG(LogTemp, Log, TEXT("Log: Created thread"));
    }
    
    
    bool FyzTcpClient::Init()
    {
        m_bRun = true;
        m_bConnected = false;
    
        m_Socket = NULL;
        return true;
    }
    
    
    
    void FyzTcpClient::ReConnect()
    {
        // if there is still a socket, close it so our peer will get a quick disconnect notification
        if (m_Socket)
        {
            m_Socket->Close();
        }
    
        m_Socket = ISocketSubsystem::Get(PLATFORM_SOCKETSUBSYSTEM)->CreateSocket(NAME_Stream, TEXT("default"), false);
        if (!m_Socket)
        {
            UE_LOG(LogTemp, Log, TEXT("FyzTcpClient Create Socket Error"));
            return;
        }
    
        m_Socket->SetReceiveBufferSize(m_RecvBufferSize, m_ActualRecvBufferSize);
        m_Socket->SetSendBufferSize(m_SendBufferSize, m_ActualSendBufferSize);
    
        FIPv4AddressEx ip;
        FIPv4AddressEx::Parse(m_ipAddress, ip);
    
        TSharedRef<FInternetAddr> internetAddr = ISocketSubsystem::Get(PLATFORM_SOCKETSUBSYSTEM)->CreateInternetAddr();
        internetAddr->SetIp(ip.Value);
        internetAddr->SetPort(m_port);
    
    
        double NowTime = FPlatformTime::Seconds();
    
        m_bConnected = m_Socket->Connect(*internetAddr);
        if (m_bConnected)
        {
    
            m_LastReConnectTime = NowTime;
    
            ConnectedDelegate.ExecuteIfBound(0);
    
    
            GEngine->AddOnScreenDebugMessage(-1, 5.5f, FColor::Red, FString::Printf(TEXT("TcpClient ReConnect yes %d "),m_bConnected));
        }
        else
        {
            GEngine->AddOnScreenDebugMessage(-1, 5.5f, FColor::Blue, FString::Printf(TEXT("TcpClient ReConnect faild %s %d"), *m_ipAddress, m_port));
            m_LastReConnectTime = NowTime;
            
        }
    }
    
    
    uint32 FyzTcpClient::Run()
    {
        
        while (m_bRun)
        {
            FDateTime timeBeginningOfTick = FDateTime::UtcNow();
            
            
    
            // Connect
            if (!m_bConnected)
            {
                
    
                double NowTime = FPlatformTime::Seconds();
    
                if (NowTime - m_LastReConnectTime < 5.0f)
                {
                    
                    FPlatformProcess::Sleep(0.5);
                    continue;
                }
    
                
                ReConnect();
                m_LastReConnectTime = NowTime;
                continue;
            }
    
            
    
            // check if we weren't disconnected from the socket
            m_Socket->SetNonBlocking(true); // set to NonBlocking, because Blocking can't check for a disconnect for some reason
            int32 t_BytesRead;
            uint8 t_Dummy;
            if (!m_Socket->Recv(&t_Dummy, 1, t_BytesRead, ESocketReceiveFlags::Peek))
            {
                m_bConnected = false;
                DisconnectedDelegate.ExecuteIfBound(0);
                GEngine->AddOnScreenDebugMessage(-1, 5.5f, FColor::Red, FString::Printf(TEXT("Dis Connect TcpServer")));
                continue;
            }
            m_Socket->SetNonBlocking(false);    // set back to Blocking
    
            // if Outbox has something to send, send it
            while (!Outbox.IsEmpty())
            {
                TArray<uint8> toSend;
                Outbox.Dequeue(toSend);
    
                if (!BlockingSend(toSend.GetData(), toSend.Num()))
                {
                    // if sending failed, stop running the thread
                    //m_bRun = false;
                    continue;
                }
            }
    
            // if we can read something        
            uint32 PendingDataSize = 0;
            TArray<uint8> receivedData;
    
            int32 BytesReadTotal = 0;
            // keep going until we have no data.
            for (;;)
            {
                if (!m_Socket->HasPendingData(PendingDataSize))
                {
                    // no messages
                    break;
                }
    
                
    
                receivedData.SetNumUninitialized(BytesReadTotal + PendingDataSize);
    
                int32 BytesRead = 0;
                if (!m_Socket->Recv(receivedData.GetData() + BytesReadTotal, m_ActualRecvBufferSize, BytesRead))
                {
                    // ISocketSubsystem* SocketSubsystem = ISocketSubsystem::Get(PLATFORM_SOCKETSUBSYSTEM);
                    // error code: (int32)SocketSubsystem->GetLastErrorCode()
                    
                    break;
                }
                BytesReadTotal += BytesRead;
    
                /* TODO: if we have more PendingData than we could read, continue the while loop so that we can send messages if we have any, and then keep recving*/
            }
    
            // if we received data, inform the main thread about it, so it can read TQueue
            if (receivedData.Num() != 0)
            {
                Inbox.Enqueue(receivedData);
                
            }
    
            
    
            /* In order to sleep, we will account for how much this tick took due to sending and receiving */
            FDateTime timeEndOfTick = FDateTime::UtcNow();
            FTimespan tickDuration = timeEndOfTick - timeBeginningOfTick;
            float secondsThisTickTook = tickDuration.GetTotalSeconds();
            float timeToSleep = m_TimeBetweenTicks - secondsThisTickTook;
            if (timeToSleep > 0.0f)
            {
                //AsyncTask(ENamedThreads::GameThread, [timeToSleep]() { ATcpSocketConnection::PrintToConsole(FString::Printf(TEXT("Sleeping: %f seconds"), timeToSleep), false); });
                FPlatformProcess::Sleep(timeToSleep);
            }
        }
    
        m_bConnected = false;
    
        //AsyncTask(ENamedThreads::GameThread, [this]() {ThreadSpawnerActor.Get()->ExecuteOnDisconnected(id, ThreadSpawnerActor);});
    
        SocketShutdown();
        return 0;
    }
    
    void FyzTcpClient::Stop()
    {
        m_bRun = false;
    }
    
    bool FyzTcpClient::isConnected()
    {
        FScopeLock ScopeLock(&SendCriticalSection);
        return m_bConnected;
    }
    
    bool  FyzTcpClient::ReadFromInbox(TArray<uint8>& byteArray)
    {
        
        bool flag = Inbox.Dequeue(byteArray);
        if (true == flag)
        {
    
            return true;
        }
        else
        {
            return false;
        }
        return false;
    }
    
    void FyzTcpClient::ExecuteOnMessageReceived()
    {
        TArray<uint8> msg;
        bool retflag = ReadFromInbox(msg);
    
    
        if (true == retflag)
        {
            MessageReceivedByteDelegate.ExecuteIfBound(msg);
    
            
    
            FString retStr = Message_ReadString(msg, msg.Num());
    
            //MessageReceivedStringDelegate.ExecuteIfBound(retStr);
    
            m_TcpRecvDelegate.ExecuteIfBound(retStr);
            //代码里面就不打印了,蓝图里面打印吧
            //GEngine->AddOnScreenDebugMessage(-1, 5.5f, FColor::Red, FString::Printf(TEXT("get Msg  %s "),*retStr));
        }
        
    
        
    }
    
    void FyzTcpClient::UpdateRecvMessage()
    {
        ExecuteOnMessageReceived();
    
    
    }
    
    
    
    void FyzTcpClient::AddToOutbox(TArray<uint8> Message)
    {
        Outbox.Enqueue(Message);
    }
    
    
    void FyzTcpClient::Exit()
    {
    
    }
    
    bool FyzTcpClient::BlockingSend(const uint8* Data, int32 BytesToSend)
    {
        if (BytesToSend > 0)
        {
            int32 BytesSent = 0;
            if (!m_Socket->Send(Data, BytesToSend, BytesSent))
            {
                return false;
            }
        }
        return true;
    }
    
    void FyzTcpClient::SocketShutdown()
    {
        // if there is still a socket, close it so our peer will get a quick disconnect notification
        if (m_Socket)
        {
            m_Socket->Close();
        }
    }
    
    
    FString FyzTcpClient::Message_ReadString(TArray<uint8>& Message, int32 BytesLength)
    {
        if (BytesLength <= 0)
        {
    
            return FString("");
        }
        if (Message.Num() < BytesLength)
        {
    
            return FString("");
        }
    
        TArray<uint8> StringAsArray;
        StringAsArray.Reserve(BytesLength);
    
        for (int i = 0; i < BytesLength; i++)
        {
            StringAsArray.Add(Message[0]);
            Message.RemoveAt(0);
        }
    
        std::string cstr(reinterpret_cast<const char*>(StringAsArray.GetData()), StringAsArray.Num());
        return FString(UTF8_TO_TCHAR(cstr.c_str()));
    }
    
    
    TArray<uint8> FyzTcpClient::Conv_StringToBytes(const FString& InStr)
    {
        FTCHARToUTF8 Convert(*InStr);
        int BytesLength = Convert.Length(); //length of the utf-8 string in bytes (when non-latin letters are used, it's longer than just the number of characters)
        uint8* messageBytes = static_cast<uint8*>(FMemory::Malloc(BytesLength));
        FMemory::Memcpy(messageBytes, (uint8*)TCHAR_TO_UTF8(InStr.GetCharArray().GetData()), BytesLength); //mcmpy is required, since TCHAR_TO_UTF8 returns an object with a very short lifetime
    
        TArray<uint8> result;
        for (int i = 0; i < BytesLength; i++)
        {
            result.Add(messageBytes[i]);
        }
    
        FMemory::Free(messageBytes);
    
        return result;
    }
    
    TArray<uint8> FyzTcpClient::Concat_BytesBytes(TArray<uint8> A, TArray<uint8> B)
    {
        TArray<uint8> ArrayResult;
    
        for (int i = 0; i < A.Num(); i++)
        {
            ArrayResult.Add(A[i]);
        }
    
        for (int i = 0; i < B.Num(); i++)
        {
            ArrayResult.Add(B[i]);
        }
    
        return ArrayResult;
    }
    
    
    TArray<uint8> FyzTcpClient::Conv_IntToBytes(int32 InInt)
    {
        TArray<uint8> result;
        for (int i = 0; i < 4; i++)
        {
            result.Add(InInt >> i * 8);
        }
        return result;
    }
    
    
    TArray<uint8> FyzTcpClient::Conv_FloatToBytes(float InFloat)
    {
        TArray<uint8> result;
    
        unsigned char const* p = reinterpret_cast<unsigned char const*>(&InFloat);
        for (int i = 0; i != sizeof(float); i++)
        {
            result.Add((uint8)p[i]);
        }
        return result;
    }
    
    
    TArray<uint8> FyzTcpClient::Conv_ByteToBytes(uint8 InByte)
    {
        TArray<uint8> result{ InByte };
        return result;
    }
    
    int32 FyzTcpClient::Message_ReadInt(TArray<uint8>& Message)
    {
        if (Message.Num() < 4)
        {
            //PrintToConsole("Error in the ReadInt node. Not enough bytes in the Message.", true);
            return -1;
        }
    
        int result;
        unsigned char byteArray[4];
    
        for (int i = 0; i < 4; i++)
        {
            byteArray[i] = Message[0];
            Message.RemoveAt(0);
        }
    
        FMemory::Memcpy(&result, byteArray, 4);
    
        return result;
    }
    uint8 FyzTcpClient::Message_ReadByte(TArray<uint8>& Message)
    {
        if (Message.Num() < 1)
        {
            //PrintToConsole("Error in the ReadByte node. Not enough bytes in the Message.", true);
            return 255;
        }
    
        uint8 result = Message[0];
        Message.RemoveAt(0);
        return result;
    }
    bool FyzTcpClient::Message_ReadBytes(int32 NumBytes, TArray<uint8>& Message, TArray<uint8>& ReturnArray)
    {
        for (int i = 0; i < NumBytes; i++)
        {
            if (Message.Num() >= 1)
                ReturnArray.Add(Message_ReadByte(Message));
            else
                return false;
        }
        return true;
    }
    float FyzTcpClient::Message_ReadFloat(TArray<uint8>& Message)
    {
        if (Message.Num() < 4)
        {
            //PrintToConsole("Error in the ReadFloat node. Not enough bytes in the Message.", true);
            return -1.f;
        }
    
        float result;
        unsigned char byteArray[4];
    
        for (int i = 0; i < 4; i++)
        {
            byteArray[i] = Message[0];
            Message.RemoveAt(0);
        }
    
        FMemory::Memcpy(&result, byteArray, 4);
    
        return result;
    }
    
    
    
    
    void UNewTcpClient::Setup(FString ip, int port)
    {
        m_TcpClient.Connect(ip,port);
    }
    
    
    bool UNewTcpClient::SocketSend(FString message)
    {
        
        return true;
    }
    
    void UNewTcpClient::SetRecvMsgByteArrayDelegate(const FTcpSocketReceivedByteMessageDelegate& OnMessageReceived)
    {
        m_TcpClient.MessageReceivedByteDelegate = OnMessageReceived;
    }
    
    
    void UNewTcpClient::SetRecvMsgStringDelegate(const FTcpSocketReceivedStringMessageDelegate& OnMessageReceived)
    {
        //m_TcpClient.MessageReceivedStringDelegate = OnMessageReceived;
        m_TcpClient.m_TcpRecvDelegate = OnMessageReceived;
    }
    
    void UNewTcpClient::UpdateRecv()
    {
        m_TcpClient.UpdateRecvMessage();
    }
    
    
    void UNewTcpClient::SendStringToServer(FString str)
    {
        TArray<uint8> ret = FyzTcpClient::Conv_StringToBytes(str);
        SendBytesToServer(ret);
    
    
    }
    
    
    void UNewTcpClient::SendBytesToServer(TArray<uint8> bytes)
    {
        m_TcpClient.AddToOutbox(bytes);
    }
  • 相关阅读:
    小米路由研究之中的一个加入菜单
    【Struts2学习笔记(9)】单文件上传和多文件上传
    isPostback 的原理及作用(很easy)
    1-2Html与CSS的关系
    【HTML5】实现QQ聊天气泡效果
    杭电1166敌兵布阵 (用的树状数组)
    安卓市场---框架搭建4
    qcow2 raw vhd 虚拟磁盘转换
    softlayer virtual machine vhd磁盘镜像导入shell脚本
    Openstack no valid hot
  • 原文地址:https://www.cnblogs.com/dragon2012/p/14685983.html
Copyright © 2011-2022 走看看