zoukankan      html  css  js  c++  java
  • UE4 TcpClient代码,UE4版本4.25.3

    // Fill out your copyright notice in the Description page of Project Settings.
    
    #pragma once
    
    #include "CoreMinimal.h"
    #include "Engine/GameInstance.h"
    #include "CoreMinimal.h"
    #include "GameFramework/Actor.h"
    #include "HAL/Runnable.h"
    #include "HAL/ThreadSafeBool.h"
    #include "Containers/Queue.h"
    #include "UObject/WeakObjectPtrTemplates.h"
    #include "NewTcpClient.generated.h"
    
    
    /**
     * Minimal IPv4 address holder: the four octets A.B.C.D overlaid on one 32-bit value.
     *
     * The octets and Value share storage through the anonymous union. The octet
     * declaration order is reversed on little-endian platforms so that A ends up
     * in the most significant byte of Value.
     *
     * No member has a default initializer, so a default-constructed instance holds
     * indeterminate values until a member is assigned or Parse() succeeds.
     */
    struct FIPv4AddressEx
    {
        union
        {
            /** The IP address value as A.B.C.D components. */
            struct
            {
    #if PLATFORM_LITTLE_ENDIAN
    #ifdef _MSC_VER
                uint8 D, C, B, A;
    #else
                // Non-MSVC compilers: request 4-byte alignment on the first octet.
                uint8 D GCC_ALIGN(4);
                uint8 C, B, A;
    #endif
    #else
                uint8 A, B, C, D;
    #endif
            };
    
            /** The IP address value in host byte order. */
            uint32 Value;
        };

        /**
         * Converts a dotted string ("A.B.C.D") into an address.
         *
         * @param AddressString  Text to parse.
         * @param OutAddress     Receives the octets on success.
         * @return true if the string split into exactly four '.'-separated tokens.
         */
        static bool Parse(const FString& AddressString, FIPv4AddressEx& OutAddress);
    };
    
    
    // Dynamic (Blueprint-bindable) single-cast delegate types used by the TCP client.

    // Disconnect notification; carries an int32 ConnectionId.
    DECLARE_DYNAMIC_DELEGATE_OneParam(FTcpSocketDisconnectDelegate, int32, ConnectionId);
    // Connect notification; carries an int32 ConnectionId.
    DECLARE_DYNAMIC_DELEGATE_OneParam(FTcpSocketConnectDelegate, int32, ConnectionId);
    // Raw received bytes, passed by reference (UPARAM(ref)) so the handler works on the caller's array.
    DECLARE_DYNAMIC_DELEGATE_OneParam(FTcpSocketReceivedByteMessageDelegate,UPARAM(ref)  TArray<uint8>&, Message);
    // Earlier by-reference variant of the string delegate, kept disabled:
    //DECLARE_DYNAMIC_DELEGATE_OneParam(FTcpSocketReceivedStringMessageDelegate,UPARAM(ref) FString&, Message);
    // Received message as a string, passed by value.
    DECLARE_DYNAMIC_DELEGATE_OneParam(FTcpSocketReceivedStringMessageDelegate,FString,MessageString);
    
    /**
     * TCP client that runs its socket I/O on a dedicated FRunnable worker thread.
     *
     * Connect() stores the endpoint and spawns the worker. The worker (Run())
     * connects/reconnects, drains Outbox into the socket and pushes received
     * bytes into Inbox. The owner polls UpdateRecvMessage() to pop one received
     * message and forward it to the bound delegates.
     *
     * All plain members carry in-class initializers so that an instance on which
     * Connect() was never called still has well-defined state.
     */
    class FyzTcpClient :public FRunnable
    {
    public:
        FyzTcpClient();
        ~FyzTcpClient();

        /** Thread to run the worker FRunnable on */
        FRunnableThread* m_Thread = nullptr;

        /** Socket in use by the worker; null until ReConnect() creates one. */
        FSocket* m_Socket = nullptr;
        /** Remote endpoint as passed to Connect(). */
        FString m_ipAddress;
        int m_port = 0;

        // Both queues are declared multi-producer / single-consumer (Mpsc).
        // Inbox: filled by Run() on the worker thread, drained by ReadFromInbox().
        TQueue<TArray<uint8>, EQueueMode::Mpsc> Inbox;
        // Outbox: filled by AddToOutbox(), drained by Run() on the worker thread.
        TQueue<TArray<uint8>, EQueueMode::Mpsc> Outbox;

        /** Requested / actually granted socket buffer sizes (the latter are filled in by ReConnect()). */
        int32 m_RecvBufferSize = 1024;
        int32 m_ActualRecvBufferSize = 0;
        int32 m_SendBufferSize = 1024;
        int32 m_ActualSendBufferSize = 0;
        /** Target duration of one worker loop iteration, in seconds. */
        float m_TimeBetweenTicks = 0.03f;
        bool m_bConnected = false;

        /** Stores the endpoint and starts the worker thread. */
        void Connect(const FString& ipAddress, int32 port);

        /** Creates a fresh socket and attempts to connect to the stored endpoint. */
        void ReConnect();

        // Begin FRunnable interface.
        virtual bool Init() override;
        virtual uint32 Run() override;
        virtual void Stop() override;
        virtual void Exit() override;
        // End FRunnable interface

        /** Closes the socket, if there is one. */
        void SocketShutdown();

        /* Getter for bConnected */
        bool isConnected();

        /** Pops at most one message from Inbox and dispatches it to the delegates. */
        void UpdateRecvMessage();
        /** Dequeues one received message; returns false when Inbox is empty. */
        bool ReadFromInbox(TArray<uint8>& byteArray);
        void ExecuteOnMessageReceived();

        // Byte-array <-> value conversion helpers. The Message_Read* functions
        // consume the bytes they read from the front of Message.
        static FString Message_ReadString(TArray<uint8>& Message, int32 BytesLength);
        static TArray<uint8> Conv_StringToBytes(const FString& InStr);
        static TArray<uint8> Concat_BytesBytes(TArray<uint8> A, TArray<uint8> B);
        static TArray<uint8> Conv_IntToBytes(int32 InInt);
        static TArray<uint8> Conv_FloatToBytes(float InFloat);
        static TArray<uint8> Conv_ByteToBytes(uint8 InByte);
        static int32 Message_ReadInt(TArray<uint8>& Message);
        static uint8 Message_ReadByte(TArray<uint8>& Message);
        static bool Message_ReadBytes(int32 NumBytes,TArray<uint8>& Message, TArray<uint8>& ReturnArray);
        static float Message_ReadFloat(TArray<uint8>& Message);

        FTcpSocketDisconnectDelegate DisconnectedDelegate;
        FTcpSocketConnectDelegate ConnectedDelegate;
        FTcpSocketReceivedByteMessageDelegate MessageReceivedByteDelegate;
        FTcpSocketReceivedStringMessageDelegate     m_TcpRecvDelegate;
        /* Blocking send */
        bool BlockingSend(const uint8* Data, int32 BytesToSend);
        /** Queues a message for the worker thread to send. */
        void AddToOutbox(TArray<uint8> Message);
    private:

        /** FPlatformTime::Seconds() timestamp of the last connection attempt. */
        double m_LastReConnectTime = 0.0;
        /** thread should continue running */
        FThreadSafeBool m_bRun = false;

        /** Critical section preventing multiple threads from sending simultaneously */
        FCriticalSection SendCriticalSection;
    };
    
    
    /**
     * Game instance that owns a FyzTcpClient and exposes it to Blueprint.
     * Every UFUNCTION below forwards to m_TcpClient.
     */
    UCLASS()
    class TESTPROJECT_API UNewTcpClient : public UGameInstance
    {
        GENERATED_BODY()
        
    public:
        // The wrapped TCP client (plain C++ member, not reflected).
        FyzTcpClient  m_TcpClient;
    
    
        // Passes the server address and port to the client's Connect().
        UFUNCTION(BlueprintCallable, Category = "MySocket")
        void Setup(FString ip, int port);
    
        // Send a message (main thread).
        UFUNCTION(BlueprintCallable, Category = "MySocket")
        bool SocketSend(FString message);
    
        // Binds the handler that receives incoming messages as raw bytes.
        UFUNCTION(BlueprintCallable, Category = "MySocket")
        void SetRecvMsgByteArrayDelegate(const FTcpSocketReceivedByteMessageDelegate& OnMessageReceived);
    
        
        // Binds the handler that receives incoming messages as strings.
        UFUNCTION(BlueprintCallable, Category = "MySocket")
        void SetRecvMsgStringDelegate(const FTcpSocketReceivedStringMessageDelegate& OnMessageReceived);
    
    
        // Polls the client for a received message; forwards to UpdateRecvMessage().
        UFUNCTION(BlueprintCallable, Category = "MySocket")
        void UpdateRecv();
    
    
        // Converts the string to bytes and queues it for sending.
        UFUNCTION(BlueprintCallable, Category = "MySocket")
        void SendStringToServer(FString str);
    
        // Queues raw bytes for sending.
        UFUNCTION(BlueprintCallable, Category = "MySocket")
        void SendBytesToServer(TArray<uint8> bytes);
    
    };
    
    
    
    // Fill out your copyright notice in the Description page of Project Settings.
    
    
    #include "NewTcpClient.h"
    #include "SocketSubsystem.h"
    #include "IPAddress.h"
    #include "Sockets.h"
    #include "HAL/RunnableThread.h"
    #include "Async/Async.h"
    #include <string>
    #include "CoreMinimal.h"
    #include "Engine/GameInstance.h"
    #include "SocketSubsystem.h"
    //#include "NetWorking/Public/Interfaces/IPv4/IPv4Address.h"
    #include "Logging/MessageLog.h"
    #include "HAL/UnrealMemory.h"
    
    
    
    bool FIPv4AddressEx::Parse(const FString& AddressString, FIPv4AddressEx& OutAddress)
    {
        TArray<FString> Tokens;
    
    
    
    
        if (AddressString.ParseIntoArray(Tokens, TEXT("."), false) == 4)
        {
    
            OutAddress.A = FCString::Atoi(*Tokens[0]);
            OutAddress.B = FCString::Atoi(*Tokens[1]);
            OutAddress.C = FCString::Atoi(*Tokens[2]);
            OutAddress.D = FCString::Atoi(*Tokens[3]);
    
            return true;
        }
    
        return false;
    }
    
    
    
    /**
     * Puts every plain member into a defined state.
     *
     * m_Socket, m_port and the "actual" buffer sizes were previously left
     * uninitialized until Connect()/ReConnect() ran, so methods such as
     * BlockingSend() or SocketShutdown() could read an indeterminate pointer.
     * The initializer list follows the member declaration order.
     */
    FyzTcpClient::FyzTcpClient()
        : m_Socket(nullptr)
        , m_port(0)
        , m_RecvBufferSize(1024)
        , m_ActualRecvBufferSize(0)
        , m_SendBufferSize(1024)
        , m_ActualSendBufferSize(0)
        , m_TimeBetweenTicks(0.03f)
        , m_LastReConnectTime(0.0)
        , m_bRun(false)
    {
    }
    /**
     * Stops the worker, waits for it to finish, then releases the thread object
     * and the socket.
     *
     * The socket used to be closed by the worker but never handed back to the
     * socket subsystem, so it leaked. It is only touched when a thread
     * existed: Connect() runs Init() (which assigns m_Socket) before creating
     * the thread, so in that case the pointer is known to be valid or null.
     */
    FyzTcpClient::~FyzTcpClient()
    {
        // Ask Run() to leave its loop.
        Stop();
        if (m_Thread)
        {
            m_Thread->WaitForCompletion();
            delete m_Thread;
            m_Thread = nullptr;

            // The worker has exited, so nobody else is using the socket any more.
            if (m_Socket)
            {
                m_Socket->Close();
                if (ISocketSubsystem* SocketSubsystem = ISocketSubsystem::Get(PLATFORM_SOCKETSUBSYSTEM))
                {
                    SocketSubsystem->DestroySocket(m_Socket);
                }
                m_Socket = nullptr;
            }
        }
    }
    
    
    void FyzTcpClient::Connect(const FString& ipAddresstemp, int32 porttemp)
    {
        m_RecvBufferSize = 1024;
        m_SendBufferSize = 1024;
        m_TimeBetweenTicks = 0.016;
        m_LastReConnectTime = 0.0f;
    
        m_ipAddress = ipAddresstemp;
        m_port = porttemp;
    
        
        
    
    
        Init();
    
        check(!m_Thread && "Thread wasn't null at the start!");
        check(FPlatformProcess::SupportsMultithreading() && "This platform doesn't support multithreading!");
        if (m_Thread)
        {
            UE_LOG(LogTemp, Log, TEXT("Log: Thread isn't null. It's: %s"), *m_Thread->GetThreadName());
        }
        m_Thread = FRunnableThread::Create(this, *FString::Printf(TEXT("FTcpSocketWorker %s:%d"), *ipAddresstemp, porttemp), 128 * 1024, TPri_Normal);
        UE_LOG(LogTemp, Log, TEXT("Log: Created thread"));
    }
    
    
    /**
     * FRunnable initialisation hook: raises the run flag, marks the client as
     * not connected and clears the socket pointer.
     *
     * @return Always true.
     */
    bool FyzTcpClient::Init()
    {
        m_bRun = true;          // Run() keeps looping while this is set
        m_bConnected = false;   // the first loop iteration will attempt a connect
        m_Socket = nullptr;     // ReConnect() creates the socket
        return true;
    }
    
    
    
    void FyzTcpClient::ReConnect()
    {
        // if there is still a socket, close it so our peer will get a quick disconnect notification
        if (m_Socket)
        {
            m_Socket->Close();
        }
    
        m_Socket = ISocketSubsystem::Get(PLATFORM_SOCKETSUBSYSTEM)->CreateSocket(NAME_Stream, TEXT("default"), false);
        if (!m_Socket)
        {
            UE_LOG(LogTemp, Log, TEXT("FyzTcpClient Create Socket Error"));
            return;
        }
    
        m_Socket->SetReceiveBufferSize(m_RecvBufferSize, m_ActualRecvBufferSize);
        m_Socket->SetSendBufferSize(m_SendBufferSize, m_ActualSendBufferSize);
    
        FIPv4AddressEx ip;
        FIPv4AddressEx::Parse(m_ipAddress, ip);
    
        TSharedRef<FInternetAddr> internetAddr = ISocketSubsystem::Get(PLATFORM_SOCKETSUBSYSTEM)->CreateInternetAddr();
        internetAddr->SetIp(ip.Value);
        internetAddr->SetPort(m_port);
    
    
        double NowTime = FPlatformTime::Seconds();
    
        m_bConnected = m_Socket->Connect(*internetAddr);
        if (m_bConnected)
        {
    
            m_LastReConnectTime = NowTime;
    
            ConnectedDelegate.ExecuteIfBound(0);
    
    
            GEngine->AddOnScreenDebugMessage(-1, 5.5f, FColor::Red, FString::Printf(TEXT("TcpClient ReConnect yes %d "),m_bConnected));
        }
        else
        {
            GEngine->AddOnScreenDebugMessage(-1, 5.5f, FColor::Blue, FString::Printf(TEXT("TcpClient ReConnect faild %s %d"), *m_ipAddress, m_port));
            m_LastReConnectTime = NowTime;
            
        }
    }
    
    
    /**
     * Worker-thread main loop. While the run flag is set, each iteration:
     *  1. if not connected, retries ReConnect() at most once every 5 seconds;
     *  2. probes the socket with a non-blocking peek to detect a disconnect;
     *  3. sends everything queued in Outbox;
     *  4. reads all pending data and enqueues it into Inbox as one message;
     *  5. sleeps for whatever is left of m_TimeBetweenTicks.
     * On exit it marks the client disconnected and calls SocketShutdown().
     *
     * Fixes relative to the previous version:
     *  - Recv() was told the buffer had m_ActualRecvBufferSize bytes of room
     *    although it had only been grown by PendingDataSize, which allowed
     *    writes past the end of the array; the read is now capped at the space
     *    actually reserved;
     *  - the array is trimmed to the bytes really received, so a short or failed
     *    read no longer publishes uninitialized bytes;
     *  - GEngine is checked before use.
     *
     * @return Always 0.
     */
    uint32 FyzTcpClient::Run()
    {
        while (m_bRun)
        {
            const FDateTime timeBeginningOfTick = FDateTime::UtcNow();

            // Connect
            if (!m_bConnected)
            {
                const double NowTime = FPlatformTime::Seconds();

                // Throttle attempts: wait until 5 seconds have passed since the last one.
                if (NowTime - m_LastReConnectTime < 5.0f)
                {
                    FPlatformProcess::Sleep(0.5);
                    continue;
                }

                ReConnect();
                m_LastReConnectTime = NowTime;
                continue;
            }

            // check if we weren't disconnected from the socket
            m_Socket->SetNonBlocking(true); // set to NonBlocking, because Blocking can't check for a disconnect for some reason
            int32 t_BytesRead = 0;
            uint8 t_Dummy = 0;
            if (!m_Socket->Recv(&t_Dummy, 1, t_BytesRead, ESocketReceiveFlags::Peek))
            {
                m_bConnected = false;
                DisconnectedDelegate.ExecuteIfBound(0);
                if (GEngine)
                {
                    GEngine->AddOnScreenDebugMessage(-1, 5.5f, FColor::Red, FString::Printf(TEXT("Dis Connect TcpServer")));
                }
                continue;
            }
            m_Socket->SetNonBlocking(false);    // set back to Blocking

            // if Outbox has something to send, send it
            while (!Outbox.IsEmpty())
            {
                TArray<uint8> toSend;
                Outbox.Dequeue(toSend);

                // A failed send is deliberately ignored: the message is dropped and the thread keeps running.
                BlockingSend(toSend.GetData(), toSend.Num());
            }

            // if we can read something
            uint32 PendingDataSize = 0;
            TArray<uint8> receivedData;

            int32 BytesReadTotal = 0;
            // keep going until we have no data.
            for (;;)
            {
                if (!m_Socket->HasPendingData(PendingDataSize) || PendingDataSize == 0)
                {
                    // no messages
                    break;
                }

                const int32 ChunkSize = static_cast<int32>(PendingDataSize);
                receivedData.SetNumUninitialized(BytesReadTotal + ChunkSize);

                int32 BytesRead = 0;
                // Never let Recv write more than the room that was just reserved.
                if (!m_Socket->Recv(receivedData.GetData() + BytesReadTotal, ChunkSize, BytesRead) || BytesRead <= 0)
                {
                    break;
                }
                BytesReadTotal += BytesRead;

                /* TODO: if we have more PendingData than we could read, continue the while loop so that we can send messages if we have any, and then keep recving*/
            }

            // Drop any reserved-but-unwritten tail before publishing the data.
            receivedData.SetNum(BytesReadTotal, false);

            // if we received data, hand it to the consumer through the Inbox queue
            if (receivedData.Num() != 0)
            {
                Inbox.Enqueue(MoveTemp(receivedData));
            }

            /* In order to sleep, we will account for how much this tick took due to sending and receiving */
            const FDateTime timeEndOfTick = FDateTime::UtcNow();
            const FTimespan tickDuration = timeEndOfTick - timeBeginningOfTick;
            const float secondsThisTickTook = tickDuration.GetTotalSeconds();
            const float timeToSleep = m_TimeBetweenTicks - secondsThisTickTook;
            if (timeToSleep > 0.0f)
            {
                FPlatformProcess::Sleep(timeToSleep);
            }
        }

        m_bConnected = false;

        SocketShutdown();
        return 0;
    }
    
    /**
     * FRunnable stop request: clears the run flag so the `while (m_bRun)` loop
     * in Run() ends on its next check. Does not wait for the thread.
     */
    void FyzTcpClient::Stop()
    {
        m_bRun = false;
    }
    
    /**
     * Reports the connection flag, read while holding SendCriticalSection.
     *
     * @return Current value of m_bConnected.
     */
    bool FyzTcpClient::isConnected()
    {
        FScopeLock Guard(&SendCriticalSection);
        const bool bCurrentlyConnected = m_bConnected;
        return bCurrentlyConnected;
    }
    
    /**
     * Pops the oldest received message from Inbox.
     *
     * The former if/else ladder only re-returned the Dequeue() result and was
     * followed by an unreachable `return false`; both are removed.
     *
     * @param byteArray  Receives the message when one is available.
     * @return true if a message was dequeued, false if Inbox was empty.
     */
    bool  FyzTcpClient::ReadFromInbox(TArray<uint8>& byteArray)
    {
        return Inbox.Dequeue(byteArray);
    }
    
    void FyzTcpClient::ExecuteOnMessageReceived()
    {
        TArray<uint8> msg;
        bool retflag = ReadFromInbox(msg);
    
    
        if (true == retflag)
        {
            MessageReceivedByteDelegate.ExecuteIfBound(msg);
    
            
    
            FString retStr = Message_ReadString(msg, msg.Num());
    
            //MessageReceivedStringDelegate.ExecuteIfBound(retStr);
    
            m_TcpRecvDelegate.ExecuteIfBound(retStr);
            //代码里面就不打印了,蓝图里面打印吧
            //GEngine->AddOnScreenDebugMessage(-1, 5.5f, FColor::Red, FString::Printf(TEXT("get Msg  %s "),*retStr));
        }
        
    
        
    }
    
    /**
     * Polling entry point for received data: forwards to
     * ExecuteOnMessageReceived().
     */
    void FyzTcpClient::UpdateRecvMessage()
    {
        ExecuteOnMessageReceived();
    
    
    }
    
    
    
    /**
     * Queues a message for sending; Run() dequeues it and passes it to
     * BlockingSend().
     *
     * The parameter is already a private copy, so it is moved into the queue
     * instead of being copied a second time.
     *
     * @param Message  Bytes to send.
     */
    void FyzTcpClient::AddToOutbox(TArray<uint8> Message)
    {
        Outbox.Enqueue(MoveTemp(Message));
    }
    
    
    /**
     * FRunnable exit hook. Intentionally empty: Run() already calls
     * SocketShutdown() before it returns.
     */
    void FyzTcpClient::Exit()
    {
    
    }
    
    bool FyzTcpClient::BlockingSend(const uint8* Data, int32 BytesToSend)
    {
        if (BytesToSend > 0)
        {
            int32 BytesSent = 0;
            if (!m_Socket->Send(Data, BytesToSend, BytesSent))
            {
                return false;
            }
        }
        return true;
    }
    
    void FyzTcpClient::SocketShutdown()
    {
        // if there is still a socket, close it so our peer will get a quick disconnect notification
        if (m_Socket)
        {
            m_Socket->Close();
        }
    }
    
    
    /**
     * Removes the first BytesLength bytes from Message and returns them decoded
     * as UTF-8.
     *
     * The bytes are now copied and removed in one step each; the old loop called
     * RemoveAt(0) per byte, shifting the whole array every time (quadratic cost).
     * As before, decoding goes through a null-terminated copy, so it stops at the
     * first zero byte.
     *
     * @param Message      Source bytes; the consumed prefix is removed.
     * @param BytesLength  Number of bytes to consume.
     * @return The decoded string, or an empty string (Message untouched) when
     *         BytesLength <= 0 or exceeds Message.Num().
     */
    FString FyzTcpClient::Message_ReadString(TArray<uint8>& Message, int32 BytesLength)
    {
        if (BytesLength <= 0 || Message.Num() < BytesLength)
        {
            return FString("");
        }

        // std::string guarantees the terminator that UTF8_TO_TCHAR needs.
        const std::string cstr(reinterpret_cast<const char*>(Message.GetData()), BytesLength);
        Message.RemoveAt(0, BytesLength);

        return FString(UTF8_TO_TCHAR(cstr.c_str()));
    }
    
    
    /**
     * Encodes a string as UTF-8 bytes (no terminator).
     *
     * The string is converted once and the converter's buffer is appended
     * directly. The previous version converted the string a second time, staged
     * the bytes in a manually allocated buffer and then copied them one by one.
     *
     * @param InStr  String to encode.
     * @return The UTF-8 bytes; empty for an empty string.
     */
    TArray<uint8> FyzTcpClient::Conv_StringToBytes(const FString& InStr)
    {
        const FTCHARToUTF8 Convert(*InStr);
        // Length in bytes; exceeds the character count when non-latin letters are used.
        const int32 BytesLength = Convert.Length();

        TArray<uint8> result;
        if (BytesLength > 0)
        {
            result.Append(reinterpret_cast<const uint8*>(Convert.Get()), BytesLength);
        }
        return result;
    }
    
    /**
     * Returns the bytes of A followed by the bytes of B.
     *
     * A is already a private copy, so its storage is reused and B is appended in
     * bulk; the old version added both arrays element by element.
     *
     * @param A  Leading bytes.
     * @param B  Trailing bytes.
     * @return Concatenation A + B.
     */
    TArray<uint8> FyzTcpClient::Concat_BytesBytes(TArray<uint8> A, TArray<uint8> B)
    {
        TArray<uint8> ArrayResult = MoveTemp(A);
        ArrayResult.Append(B);
        return ArrayResult;
    }
    
    
    /**
     * Splits a 32-bit integer into four bytes, least significant byte first.
     *
     * @param InInt  Value to serialise.
     * @return Four bytes: bits 0-7, 8-15, 16-23, 24-31.
     */
    TArray<uint8> FyzTcpClient::Conv_IntToBytes(int32 InInt)
    {
        TArray<uint8> Bytes;
        int32 Shift = 0;
        while (Shift < 32)
        {
            // Narrowing to uint8 keeps only the low eight bits of the shifted value.
            Bytes.Add(static_cast<uint8>(InInt >> Shift));
            Shift += 8;
        }
        return Bytes;
    }
    
    
    /**
     * Returns the in-memory representation of a float as bytes, in the order
     * they are stored in memory.
     *
     * @param InFloat  Value to serialise.
     * @return sizeof(float) bytes.
     */
    TArray<uint8> FyzTcpClient::Conv_FloatToBytes(float InFloat)
    {
        const uint8* RawBytes = reinterpret_cast<const uint8*>(&InFloat);

        TArray<uint8> Bytes;
        Bytes.Append(RawBytes, sizeof(float));
        return Bytes;
    }
    
    
    /**
     * Wraps a single byte in a one-element array.
     *
     * @param InByte  The byte.
     * @return Array containing exactly InByte.
     */
    TArray<uint8> FyzTcpClient::Conv_ByteToBytes(uint8 InByte)
    {
        TArray<uint8> Single;
        Single.Add(InByte);
        return Single;
    }
    
    /**
     * Consumes the first four bytes of Message and reinterprets them as a
     * 32-bit integer (raw memory copy).
     *
     * @param Message  Source bytes; the four bytes read are removed from the front.
     * @return The decoded value, or -1 (Message untouched) if fewer than four
     *         bytes are available.
     */
    int32 FyzTcpClient::Message_ReadInt(TArray<uint8>& Message)
    {
        const int32 ByteCount = 4;
        if (Message.Num() < ByteCount)
        {
            return -1;
        }

        int Decoded;
        FMemory::Memcpy(&Decoded, Message.GetData(), ByteCount);
        Message.RemoveAt(0, ByteCount);

        return Decoded;
    }
    /**
     * Consumes and returns the first byte of Message.
     *
     * @param Message  Source bytes; the byte read is removed from the front.
     * @return The byte, or 255 if Message is empty.
     */
    uint8 FyzTcpClient::Message_ReadByte(TArray<uint8>& Message)
    {
        if (Message.Num() >= 1)
        {
            const uint8 Front = Message[0];
            Message.RemoveAt(0);
            return Front;
        }

        // Nothing to read: 255 is the error value.
        return 255;
    }
    /**
     * Moves up to NumBytes bytes from the front of Message to the end of
     * ReturnArray.
     *
     * The bytes are now transferred with one bulk append and one bulk removal;
     * the previous per-byte loop shifted the whole message for every byte
     * (quadratic cost). Results are unchanged: when Message is too short, all
     * available bytes are still transferred and false is returned.
     *
     * Message and ReturnArray must be distinct arrays.
     *
     * @param NumBytes     Number of bytes requested; values <= 0 transfer nothing.
     * @param Message      Source bytes; the transferred prefix is removed.
     * @param ReturnArray  Destination; bytes are appended.
     * @return true if the full requested count was available.
     */
    bool FyzTcpClient::Message_ReadBytes(int32 NumBytes, TArray<uint8>& Message, TArray<uint8>& ReturnArray)
    {
        const int32 Requested = FMath::Max(NumBytes, 0);
        const int32 Transferable = FMath::Min(Requested, Message.Num());

        if (Transferable > 0)
        {
            ReturnArray.Append(Message.GetData(), Transferable);
            Message.RemoveAt(0, Transferable);
        }

        return Transferable == Requested;
    }
    /**
     * Consumes the first four bytes of Message and reinterprets them as a float
     * (raw memory copy).
     *
     * @param Message  Source bytes; the four bytes read are removed from the front.
     * @return The decoded value, or -1.f (Message untouched) if fewer than four
     *         bytes are available.
     */
    float FyzTcpClient::Message_ReadFloat(TArray<uint8>& Message)
    {
        const int32 ByteCount = 4;
        if (Message.Num() < ByteCount)
        {
            return -1.f;
        }

        float Decoded;
        FMemory::Memcpy(&Decoded, Message.GetData(), ByteCount);
        Message.RemoveAt(0, ByteCount);

        return Decoded;
    }
    
    
    
    
    /**
     * Blueprint entry point: passes the server address and port to the client's
     * Connect().
     */
    void UNewTcpClient::Setup(FString ip, int port)
    {
        m_TcpClient.Connect(ip,port);
    }
    
    
    /**
     * Sends a string message to the server.
     *
     * This used to be a stub that returned true without sending anything. It
     * now encodes the string as UTF-8 and queues it in the client's outbox,
     * the same path SendStringToServer() uses, and reports failure when the
     * client is not connected.
     *
     * @param message  Text to send.
     * @return true if the message was queued, false if the client is not connected.
     */
    bool UNewTcpClient::SocketSend(FString message)
    {
        if (!m_TcpClient.isConnected())
        {
            return false;
        }

        m_TcpClient.AddToOutbox(FyzTcpClient::Conv_StringToBytes(message));
        return true;
    }
    
    /**
     * Stores the handler that the client invokes with each received message as
     * raw bytes. Replaces any previously stored handler.
     */
    void UNewTcpClient::SetRecvMsgByteArrayDelegate(const FTcpSocketReceivedByteMessageDelegate& OnMessageReceived)
    {
        m_TcpClient.MessageReceivedByteDelegate = OnMessageReceived;
    }
    
    
    /**
     * Stores the handler that the client invokes with each received message
     * decoded as a string. Replaces any previously stored handler.
     */
    void UNewTcpClient::SetRecvMsgStringDelegate(const FTcpSocketReceivedStringMessageDelegate& OnMessageReceived)
    {
        // Earlier member name, kept for reference:
        //m_TcpClient.MessageReceivedStringDelegate = OnMessageReceived;
        m_TcpClient.m_TcpRecvDelegate = OnMessageReceived;
    }
    
    /**
     * Blueprint polling hook: forwards to the client's UpdateRecvMessage().
     */
    void UNewTcpClient::UpdateRecv()
    {
        m_TcpClient.UpdateRecvMessage();
    }
    
    
    /**
     * Encodes the string as bytes with Conv_StringToBytes() and hands the
     * result to SendBytesToServer().
     */
    void UNewTcpClient::SendStringToServer(FString str)
    {
        SendBytesToServer(FyzTcpClient::Conv_StringToBytes(str));
    }
    
    
    /**
     * Queues raw bytes in the client's outbox for sending.
     *
     * The parameter is a by-value copy that is not used afterwards, so it is
     * moved into the outbox call instead of being copied again.
     *
     * @param bytes  Payload to send.
     */
    void UNewTcpClient::SendBytesToServer(TArray<uint8> bytes)
    {
        m_TcpClient.AddToOutbox(MoveTemp(bytes));
    }
  • 相关阅读:
    Windows 科研软件推荐
    有关Python 包 (package) 的基本知识
    《Using Python to Access Web Data》Week4 Programs that Surf the Web 课堂笔记
    Coursera助学金申请模板
    《Using Databases with Python》 Week2 Basic Structured Query Language 课堂笔记
    Jupyter 解决单个变量输出问题
    解决 pandas 中打印 DataFrame 行列显示不全的问题
    《Using Python to Access Web Data》 Week3 Networks and Sockets 课堂笔记
    缓存击穿及解决方案
    jvm垃圾收集器
  • 原文地址:https://www.cnblogs.com/dragon2012/p/14685983.html
Copyright © 2011-2022 走看看