// Fill out your copyright notice in the Description page of Project Settings. #pragma once #include "CoreMinimal.h" #include "Engine/GameInstance.h" #include "CoreMinimal.h" #include "GameFramework/Actor.h" #include "HAL/Runnable.h" #include "HAL/ThreadSafeBool.h" #include "Containers/Queue.h" #include "UObject/WeakObjectPtrTemplates.h" #include "NewTcpClient.generated.h" struct FIPv4AddressEx { union { /** The IP address value as A.B.C.D components. */ struct { #if PLATFORM_LITTLE_ENDIAN #ifdef _MSC_VER uint8 D, C, B, A; #else uint8 D GCC_ALIGN(4); uint8 C, B, A; #endif #else uint8 A, B, C, D; #endif }; /** The IP address value in host byte order. */ uint32 Value; }; static bool Parse(const FString& AddressString, FIPv4AddressEx& OutAddress); }; DECLARE_DYNAMIC_DELEGATE_OneParam(FTcpSocketDisconnectDelegate, int32, ConnectionId); DECLARE_DYNAMIC_DELEGATE_OneParam(FTcpSocketConnectDelegate, int32, ConnectionId); DECLARE_DYNAMIC_DELEGATE_OneParam(FTcpSocketReceivedByteMessageDelegate,UPARAM(ref) TArray<uint8>&, Message); // //DECLARE_DYNAMIC_DELEGATE_OneParam(FTcpSocketReceivedStringMessageDelegate,UPARAM(ref) FString&, Message); // DECLARE_DYNAMIC_DELEGATE_OneParam(FTcpSocketReceivedStringMessageDelegate,FString,MessageString); class FyzTcpClient :public FRunnable { public: FyzTcpClient(); ~FyzTcpClient(); /** Thread to run the worker FRunnable on */ FRunnableThread* m_Thread = nullptr; FSocket* m_Socket; FString m_ipAddress; int m_port; // SPSC = single producer, single consumer. TQueue<TArray<uint8>, EQueueMode::Mpsc> Inbox; // Messages we read from socket and send to main thread. Runner thread is producer, main thread is consumer. TQueue<TArray<uint8>, EQueueMode::Mpsc> Outbox; // Messages to send to socket from main thread. Main thread is producer, runner thread is consumer. int32 m_RecvBufferSize; int32 m_ActualRecvBufferSize; int32 m_SendBufferSize; int32 m_ActualSendBufferSize; float m_TimeBetweenTicks; bool m_bConnected = false; void Connect(const FString& ipAddress, int32 port); void ReConnect(); // Begin FRunnable interface. virtual bool Init() override; virtual uint32 Run() override; virtual void Stop() override; virtual void Exit() override; // End FRunnable interface /** Shuts down the thread */ void SocketShutdown(); /* Getter for bConnected */ bool isConnected(); void UpdateRecvMessage(); bool ReadFromInbox(TArray<uint8>& byteArray); void ExecuteOnMessageReceived(); static FString Message_ReadString(TArray<uint8>& Message, int32 BytesLength); static TArray<uint8> Conv_StringToBytes(const FString& InStr); static TArray<uint8> Concat_BytesBytes(TArray<uint8> A, TArray<uint8> B); static TArray<uint8> Conv_IntToBytes(int32 InInt); static TArray<uint8> Conv_FloatToBytes(float InFloat); static TArray<uint8> Conv_ByteToBytes(uint8 InByte); static int32 Message_ReadInt(TArray<uint8>& Message); static uint8 Message_ReadByte(TArray<uint8>& Message); static bool Message_ReadBytes(int32 NumBytes,TArray<uint8>& Message, TArray<uint8>& ReturnArray); static float Message_ReadFloat(TArray<uint8>& Message); FTcpSocketDisconnectDelegate DisconnectedDelegate; FTcpSocketConnectDelegate ConnectedDelegate; FTcpSocketReceivedByteMessageDelegate MessageReceivedByteDelegate; FTcpSocketReceivedStringMessageDelegate m_TcpRecvDelegate; /* Blocking send */ bool BlockingSend(const uint8* Data, int32 BytesToSend); void AddToOutbox(TArray<uint8> Message); private: double m_LastReConnectTime; /** thread should continue running */ FThreadSafeBool m_bRun = false; /** Critical section preventing multiple threads from sending simultaneously */ FCriticalSection SendCriticalSection; }; /** * */ UCLASS() class TESTPROJECT_API UNewTcpClient : public UGameInstance { GENERATED_BODY() public: FyzTcpClient m_TcpClient; UFUNCTION(BlueprintCallable, Category = "MySocket") void Setup(FString ip, int port); //发消息(主线程) UFUNCTION(BlueprintCallable, Category = "MySocket") bool SocketSend(FString message); UFUNCTION(BlueprintCallable, Category = "MySocket") void SetRecvMsgByteArrayDelegate(const FTcpSocketReceivedByteMessageDelegate& OnMessageReceived); UFUNCTION(BlueprintCallable, Category = "MySocket") void SetRecvMsgStringDelegate(const FTcpSocketReceivedStringMessageDelegate& OnMessageReceived); UFUNCTION(BlueprintCallable, Category = "MySocket") void UpdateRecv(); UFUNCTION(BlueprintCallable, Category = "MySocket") void SendStringToServer(FString str); UFUNCTION(BlueprintCallable, Category = "MySocket") void SendBytesToServer(TArray<uint8> bytes); }; // Fill out your copyright notice in the Description page of Project Settings. #include "NewTcpClient.h" #include "SocketSubsystem.h" #include "IPAddress.h" #include "Sockets.h" #include "HAL/RunnableThread.h" #include "Async/Async.h" #include <string> #include "CoreMinimal.h" #include "Engine/GameInstance.h" #include "SocketSubsystem.h" //#include "NetWorking/Public/Interfaces/IPv4/IPv4Address.h" #include "Logging/MessageLog.h" #include "HAL/UnrealMemory.h" bool FIPv4AddressEx::Parse(const FString& AddressString, FIPv4AddressEx& OutAddress) { TArray<FString> Tokens; if (AddressString.ParseIntoArray(Tokens, TEXT("."), false) == 4) { OutAddress.A = FCString::Atoi(*Tokens[0]); OutAddress.B = FCString::Atoi(*Tokens[1]); OutAddress.C = FCString::Atoi(*Tokens[2]); OutAddress.D = FCString::Atoi(*Tokens[3]); return true; } return false; } FyzTcpClient::FyzTcpClient() { m_RecvBufferSize = 1024; m_SendBufferSize = 1024; m_TimeBetweenTicks = 0.03; m_LastReConnectTime = 0.0f; m_bRun = false; } FyzTcpClient::~FyzTcpClient() { Stop(); if (m_Thread) { m_Thread->WaitForCompletion(); delete m_Thread; m_Thread = nullptr; } } void FyzTcpClient::Connect(const FString& ipAddresstemp, int32 porttemp) { m_RecvBufferSize = 1024; m_SendBufferSize = 1024; m_TimeBetweenTicks = 0.016; m_LastReConnectTime = 0.0f; m_ipAddress = ipAddresstemp; m_port = porttemp; Init(); check(!m_Thread && "Thread wasn't null at the start!"); check(FPlatformProcess::SupportsMultithreading() && "This platform doesn't support multithreading!"); if (m_Thread) { UE_LOG(LogTemp, Log, TEXT("Log: Thread isn't null. It's: %s"), *m_Thread->GetThreadName()); } m_Thread = FRunnableThread::Create(this, *FString::Printf(TEXT("FTcpSocketWorker %s:%d"), *ipAddresstemp, porttemp), 128 * 1024, TPri_Normal); UE_LOG(LogTemp, Log, TEXT("Log: Created thread")); } bool FyzTcpClient::Init() { m_bRun = true; m_bConnected = false; m_Socket = NULL; return true; } void FyzTcpClient::ReConnect() { // if there is still a socket, close it so our peer will get a quick disconnect notification if (m_Socket) { m_Socket->Close(); } m_Socket = ISocketSubsystem::Get(PLATFORM_SOCKETSUBSYSTEM)->CreateSocket(NAME_Stream, TEXT("default"), false); if (!m_Socket) { UE_LOG(LogTemp, Log, TEXT("FyzTcpClient Create Socket Error")); return; } m_Socket->SetReceiveBufferSize(m_RecvBufferSize, m_ActualRecvBufferSize); m_Socket->SetSendBufferSize(m_SendBufferSize, m_ActualSendBufferSize); FIPv4AddressEx ip; FIPv4AddressEx::Parse(m_ipAddress, ip); TSharedRef<FInternetAddr> internetAddr = ISocketSubsystem::Get(PLATFORM_SOCKETSUBSYSTEM)->CreateInternetAddr(); internetAddr->SetIp(ip.Value); internetAddr->SetPort(m_port); double NowTime = FPlatformTime::Seconds(); m_bConnected = m_Socket->Connect(*internetAddr); if (m_bConnected) { m_LastReConnectTime = NowTime; ConnectedDelegate.ExecuteIfBound(0); GEngine->AddOnScreenDebugMessage(-1, 5.5f, FColor::Red, FString::Printf(TEXT("TcpClient ReConnect yes %d "),m_bConnected)); } else { GEngine->AddOnScreenDebugMessage(-1, 5.5f, FColor::Blue, FString::Printf(TEXT("TcpClient ReConnect faild %s %d"), *m_ipAddress, m_port)); m_LastReConnectTime = NowTime; } } uint32 FyzTcpClient::Run() { while (m_bRun) { FDateTime timeBeginningOfTick = FDateTime::UtcNow(); // Connect if (!m_bConnected) { double NowTime = FPlatformTime::Seconds(); if (NowTime - m_LastReConnectTime < 5.0f) { FPlatformProcess::Sleep(0.5); continue; } ReConnect(); m_LastReConnectTime = NowTime; continue; } // check if we weren't disconnected from the socket m_Socket->SetNonBlocking(true); // set to NonBlocking, because Blocking can't check for a disconnect for some reason int32 t_BytesRead; uint8 t_Dummy; if (!m_Socket->Recv(&t_Dummy, 1, t_BytesRead, ESocketReceiveFlags::Peek)) { m_bConnected = false; DisconnectedDelegate.ExecuteIfBound(0); GEngine->AddOnScreenDebugMessage(-1, 5.5f, FColor::Red, FString::Printf(TEXT("Dis Connect TcpServer"))); continue; } m_Socket->SetNonBlocking(false); // set back to Blocking // if Outbox has something to send, send it while (!Outbox.IsEmpty()) { TArray<uint8> toSend; Outbox.Dequeue(toSend); if (!BlockingSend(toSend.GetData(), toSend.Num())) { // if sending failed, stop running the thread //m_bRun = false; continue; } } // if we can read something uint32 PendingDataSize = 0; TArray<uint8> receivedData; int32 BytesReadTotal = 0; // keep going until we have no data. for (;;) { if (!m_Socket->HasPendingData(PendingDataSize)) { // no messages break; } receivedData.SetNumUninitialized(BytesReadTotal + PendingDataSize); int32 BytesRead = 0; if (!m_Socket->Recv(receivedData.GetData() + BytesReadTotal, m_ActualRecvBufferSize, BytesRead)) { // ISocketSubsystem* SocketSubsystem = ISocketSubsystem::Get(PLATFORM_SOCKETSUBSYSTEM); // error code: (int32)SocketSubsystem->GetLastErrorCode() break; } BytesReadTotal += BytesRead; /* TODO: if we have more PendingData than we could read, continue the while loop so that we can send messages if we have any, and then keep recving*/ } // if we received data, inform the main thread about it, so it can read TQueue if (receivedData.Num() != 0) { Inbox.Enqueue(receivedData); } /* In order to sleep, we will account for how much this tick took due to sending and receiving */ FDateTime timeEndOfTick = FDateTime::UtcNow(); FTimespan tickDuration = timeEndOfTick - timeBeginningOfTick; float secondsThisTickTook = tickDuration.GetTotalSeconds(); float timeToSleep = m_TimeBetweenTicks - secondsThisTickTook; if (timeToSleep > 0.0f) { //AsyncTask(ENamedThreads::GameThread, [timeToSleep]() { ATcpSocketConnection::PrintToConsole(FString::Printf(TEXT("Sleeping: %f seconds"), timeToSleep), false); }); FPlatformProcess::Sleep(timeToSleep); } } m_bConnected = false; //AsyncTask(ENamedThreads::GameThread, [this]() {ThreadSpawnerActor.Get()->ExecuteOnDisconnected(id, ThreadSpawnerActor);}); SocketShutdown(); return 0; } void FyzTcpClient::Stop() { m_bRun = false; } bool FyzTcpClient::isConnected() { FScopeLock ScopeLock(&SendCriticalSection); return m_bConnected; } bool FyzTcpClient::ReadFromInbox(TArray<uint8>& byteArray) { bool flag = Inbox.Dequeue(byteArray); if (true == flag) { return true; } else { return false; } return false; } void FyzTcpClient::ExecuteOnMessageReceived() { TArray<uint8> msg; bool retflag = ReadFromInbox(msg); if (true == retflag) { MessageReceivedByteDelegate.ExecuteIfBound(msg); FString retStr = Message_ReadString(msg, msg.Num()); //MessageReceivedStringDelegate.ExecuteIfBound(retStr); m_TcpRecvDelegate.ExecuteIfBound(retStr); //代码里面就不打印了,蓝图里面打印吧 //GEngine->AddOnScreenDebugMessage(-1, 5.5f, FColor::Red, FString::Printf(TEXT("get Msg %s "),*retStr)); } } void FyzTcpClient::UpdateRecvMessage() { ExecuteOnMessageReceived(); } void FyzTcpClient::AddToOutbox(TArray<uint8> Message) { Outbox.Enqueue(Message); } void FyzTcpClient::Exit() { } bool FyzTcpClient::BlockingSend(const uint8* Data, int32 BytesToSend) { if (BytesToSend > 0) { int32 BytesSent = 0; if (!m_Socket->Send(Data, BytesToSend, BytesSent)) { return false; } } return true; } void FyzTcpClient::SocketShutdown() { // if there is still a socket, close it so our peer will get a quick disconnect notification if (m_Socket) { m_Socket->Close(); } } FString FyzTcpClient::Message_ReadString(TArray<uint8>& Message, int32 BytesLength) { if (BytesLength <= 0) { return FString(""); } if (Message.Num() < BytesLength) { return FString(""); } TArray<uint8> StringAsArray; StringAsArray.Reserve(BytesLength); for (int i = 0; i < BytesLength; i++) { StringAsArray.Add(Message[0]); Message.RemoveAt(0); } std::string cstr(reinterpret_cast<const char*>(StringAsArray.GetData()), StringAsArray.Num()); return FString(UTF8_TO_TCHAR(cstr.c_str())); } TArray<uint8> FyzTcpClient::Conv_StringToBytes(const FString& InStr) { FTCHARToUTF8 Convert(*InStr); int BytesLength = Convert.Length(); //length of the utf-8 string in bytes (when non-latin letters are used, it's longer than just the number of characters) uint8* messageBytes = static_cast<uint8*>(FMemory::Malloc(BytesLength)); FMemory::Memcpy(messageBytes, (uint8*)TCHAR_TO_UTF8(InStr.GetCharArray().GetData()), BytesLength); //mcmpy is required, since TCHAR_TO_UTF8 returns an object with a very short lifetime TArray<uint8> result; for (int i = 0; i < BytesLength; i++) { result.Add(messageBytes[i]); } FMemory::Free(messageBytes); return result; } TArray<uint8> FyzTcpClient::Concat_BytesBytes(TArray<uint8> A, TArray<uint8> B) { TArray<uint8> ArrayResult; for (int i = 0; i < A.Num(); i++) { ArrayResult.Add(A[i]); } for (int i = 0; i < B.Num(); i++) { ArrayResult.Add(B[i]); } return ArrayResult; } TArray<uint8> FyzTcpClient::Conv_IntToBytes(int32 InInt) { TArray<uint8> result; for (int i = 0; i < 4; i++) { result.Add(InInt >> i * 8); } return result; } TArray<uint8> FyzTcpClient::Conv_FloatToBytes(float InFloat) { TArray<uint8> result; unsigned char const* p = reinterpret_cast<unsigned char const*>(&InFloat); for (int i = 0; i != sizeof(float); i++) { result.Add((uint8)p[i]); } return result; } TArray<uint8> FyzTcpClient::Conv_ByteToBytes(uint8 InByte) { TArray<uint8> result{ InByte }; return result; } int32 FyzTcpClient::Message_ReadInt(TArray<uint8>& Message) { if (Message.Num() < 4) { //PrintToConsole("Error in the ReadInt node. Not enough bytes in the Message.", true); return -1; } int result; unsigned char byteArray[4]; for (int i = 0; i < 4; i++) { byteArray[i] = Message[0]; Message.RemoveAt(0); } FMemory::Memcpy(&result, byteArray, 4); return result; } uint8 FyzTcpClient::Message_ReadByte(TArray<uint8>& Message) { if (Message.Num() < 1) { //PrintToConsole("Error in the ReadByte node. Not enough bytes in the Message.", true); return 255; } uint8 result = Message[0]; Message.RemoveAt(0); return result; } bool FyzTcpClient::Message_ReadBytes(int32 NumBytes, TArray<uint8>& Message, TArray<uint8>& ReturnArray) { for (int i = 0; i < NumBytes; i++) { if (Message.Num() >= 1) ReturnArray.Add(Message_ReadByte(Message)); else return false; } return true; } float FyzTcpClient::Message_ReadFloat(TArray<uint8>& Message) { if (Message.Num() < 4) { //PrintToConsole("Error in the ReadFloat node. Not enough bytes in the Message.", true); return -1.f; } float result; unsigned char byteArray[4]; for (int i = 0; i < 4; i++) { byteArray[i] = Message[0]; Message.RemoveAt(0); } FMemory::Memcpy(&result, byteArray, 4); return result; } void UNewTcpClient::Setup(FString ip, int port) { m_TcpClient.Connect(ip,port); } bool UNewTcpClient::SocketSend(FString message) { return true; } void UNewTcpClient::SetRecvMsgByteArrayDelegate(const FTcpSocketReceivedByteMessageDelegate& OnMessageReceived) { m_TcpClient.MessageReceivedByteDelegate = OnMessageReceived; } void UNewTcpClient::SetRecvMsgStringDelegate(const FTcpSocketReceivedStringMessageDelegate& OnMessageReceived) { //m_TcpClient.MessageReceivedStringDelegate = OnMessageReceived; m_TcpClient.m_TcpRecvDelegate = OnMessageReceived; } void UNewTcpClient::UpdateRecv() { m_TcpClient.UpdateRecvMessage(); } void UNewTcpClient::SendStringToServer(FString str) { TArray<uint8> ret = FyzTcpClient::Conv_StringToBytes(str); SendBytesToServer(ret); } void UNewTcpClient::SendBytesToServer(TArray<uint8> bytes) { m_TcpClient.AddToOutbox(bytes); }