Files
SimpleRemoter/client/IOCPClient.cpp
yuanyuanxiang 16741545a9 采用比zlib更快的压缩库zstd
远程桌面帧率提高到12;补充上传zlib.lib。
2019-01-17 11:58:26 +08:00

381 lines
9.3 KiB
C++
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
// IOCPClient.cpp: implementation of the IOCPClient class.
//
//////////////////////////////////////////////////////////////////////
#include "stdafx.h"
#include "IOCPClient.h"
#include <IOSTREAM>
// Compression backend selection: zlib when USING_ZLIB is non-zero, zstd otherwise.
#if USING_ZLIB
#include "zlib.h"
// zlib backend: any return code other than Z_OK counts as a failure.
#define Z_FAILED(p) (Z_OK != (p))
#define Z_SUCCESS(p) (!Z_FAILED(p))
#else
#include "zstd/zstd.h"
#pragma comment(lib, "zstd/zstd.lib")
// zstd backend: failure is whatever ZSTD_isError() reports for the returned value.
#define Z_FAILED(p) ZSTD_isError(p)
#define Z_SUCCESS(p) (!Z_FAILED(p))
// Adapters that let call sites keep the zlib-style compress()/uncompress() spelling.
// Note: destLen is only dereferenced to pass the destination capacity; unlike zlib,
// these macros never write the produced size back through it, so callers must take
// the size from the value the expression evaluates to.
#define compress(dest, destLen, source, sourceLen) ZSTD_compress(dest, *(destLen), source, sourceLen, ZSTD_CLEVEL_DEFAULT)
#define uncompress(dest, destLen, source, sourceLen) ZSTD_decompress(dest, *(destLen), source, sourceLen)
#endif
#include <assert.h>
#include "Manager.h"
using namespace std;
//////////////////////////////////////////////////////////////////////
// Construction/Destruction
//////////////////////////////////////////////////////////////////////
// Stores the manager whose OnReceive() is invoked for every decoded packet.
// The pointer is kept as-is; this class does not take ownership of it here.
VOID IOCPClient::setManagerCallBack(CManager* Manager)
{
    this->m_Manager = Manager;
}
// Builds a disconnected client: starts Winsock 2.2, resets all connection and
// worker-thread state, and installs the "Shine" packet flag used to frame messages.
//
// exit_while_disconnect: when true, the worker thread leaves its loop as soon
//                        as the connection is lost.
IOCPClient::IOCPClient(bool exit_while_disconnect)
{
    // Winsock must be started before any socket call; the matching
    // WSACleanup() is issued by the destructor.
    WSADATA winsockInfo;
    WSAStartup(MAKEWORD(2, 2), &winsockInfo);

    // Caller-supplied policy.
    m_exit_while_disconnect = exit_while_disconnect;

    // No manager, socket or worker thread yet.
    m_Manager       = NULL;
    m_sClientSocket = INVALID_SOCKET;
    m_hWorkThread   = NULL;
    m_bWorkThread   = S_STOP;
    m_bConnected    = FALSE;
    m_bIsRunning    = TRUE;

    // Packet flag: zero the whole array first, then copy the flag bytes.
    memset(m_szPacketFlag, 0, sizeof(m_szPacketFlag));
    memcpy(m_szPacketFlag, "Shine", FLAG_LENGTH);

    InitializeCriticalSection(&m_cs);
}
// Shuts the client down: asks the worker thread to stop, closes the socket,
// waits for the worker to finish, and only then releases Winsock, the thread
// handle and the critical section.
IOCPClient::~IOCPClient()
{
    // Tell the worker loop to exit on its next iteration.
    m_bIsRunning = FALSE;

    // Closing the socket makes a pending select()/recv() in the worker fail
    // promptly instead of waiting for the full select timeout.
    if (m_sClientSocket != INVALID_SOCKET)
    {
        closesocket(m_sClientSocket);
        m_sClientSocket = INVALID_SOCKET;
    }

    if (m_hWorkThread != NULL)
    {
        // Join on the thread itself rather than polling a flag: the worker
        // still writes to this object while it is finishing, so the object
        // must outlive the thread. The handle is closed only after the join.
        WaitForSingleObject(m_hWorkThread, INFINITE);
        CloseHandle(m_hWorkThread);
        m_hWorkThread = NULL;
    }
    else
    {
        // No handle to wait on; fall back to polling the state flag.
        while (S_RUN == m_bWorkThread)
            Sleep(10);
    }

    // Winsock is torn down only once no thread of ours can still be inside
    // a socket call.
    WSACleanup();
    DeleteCriticalSection(&m_cs);
    m_bWorkThread = S_END;
}
// Opens a TCP connection to the server, enables keep-alive on it and makes
// sure the receive worker thread exists.
//
// szServerIP: dotted-decimal IPv4 address (passed to inet_addr, so host names
//             are not resolved).
// uPort:      server port in host byte order.
// Returns TRUE once the socket is connected, FALSE if the socket could not be
// created or the connect attempt failed.
BOOL IOCPClient::ConnectServer(char* szServerIP, unsigned short uPort)
{
    // Transport layer: TCP stream socket.
    m_sClientSocket = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP);
    if (m_sClientSocket == INVALID_SOCKET) // socket() reports failure as INVALID_SOCKET
    {
        return FALSE;
    }

    // Address of the controlling server. Zero the structure first so the
    // padding field (sin_zero) does not carry stack garbage.
    sockaddr_in ServerAddr;
    memset(&ServerAddr, 0, sizeof(ServerAddr));
    ServerAddr.sin_family = AF_INET; // network layer: IPv4
    ServerAddr.sin_port = htons(uPort);
    ServerAddr.sin_addr.S_un.S_addr = inet_addr(szServerIP);

    if (connect(m_sClientSocket, (SOCKADDR *)&ServerAddr, sizeof(sockaddr_in)) == SOCKET_ERROR)
    {
        if (m_sClientSocket != INVALID_SOCKET)
        {
            closesocket(m_sClientSocket);
            m_sClientSocket = INVALID_SOCKET;
        }
        return FALSE;
    }

    // Enable keep-alive so that dead connections are detected.
    const int chOpt = 1; // TRUE
    if (setsockopt(m_sClientSocket, SOL_SOCKET, SO_KEEPALIVE,
                   (const char *)&chOpt, sizeof(chOpt)) == 0)
    {
        // Detailed keep-alive timing.
        tcp_keepalive klive;
        klive.onoff = 1;                     // enable keep-alive
        klive.keepalivetime = 1000 * 60 * 3; // 3 minutes before the first probe
        klive.keepaliveinterval = 1000 * 5;  // 5 seconds between probes without a reply

        // WSAIoctl writes the returned byte count through this pointer, so it
        // needs its own writable variable (not the const option value).
        DWORD dwBytesReturned = 0;
        WSAIoctl(m_sClientSocket, SIO_KEEPALIVE_VALS, &klive, sizeof(tcp_keepalive),
                 NULL, 0, &dwBytesReturned, NULL, NULL);
    }

    if (m_hWorkThread == NULL)
    {
        // Publish the running state before the thread starts so that a worker
        // which finishes quickly cannot have its S_STOP overwritten by S_RUN.
        m_bWorkThread = S_RUN;
        m_hWorkThread = (HANDLE)CreateThread(NULL, 0,
            (LPTHREAD_START_ROUTINE)WorkThreadProc, (LPVOID)this, 0, NULL);
        if (m_hWorkThread == NULL)
        {
            // NOTE(review): the connection is still reported as successful even
            // though nothing will read from it - confirm callers expect this.
            m_bWorkThread = S_STOP;
        }
    }

    std::cout<<"Á¬½Ó·þÎñ¶Ë³É¹¦.\n";
    m_bConnected = TRUE;
    return TRUE;
}
// Receive loop of the worker thread.
//
// lParam: the owning IOCPClient (passed as `this` by ConnectServer).
// While the client is running it waits (up to 2 s per round) for the socket to
// become readable, reads one chunk and forwards it to OnServerReceiving().
// The loop ends when the client stops running, when the connection is lost and
// m_exit_while_disconnect is set, or when the manager reports m_bIsDead.
// Always returns 0xDEAD.
DWORD WINAPI IOCPClient::WorkThreadProc(LPVOID lParam)
{
    IOCPClient* This = (IOCPClient*)lParam;
    char szBuffer[MAX_RECV_BUFFER] = {0};
    fd_set fd;
    const struct timeval tm = { 2, 0 };

    while (This->IsRunning()) // stay in this loop until asked to exit
    {
        if (!This->IsConnected())
        {
            Sleep(50);
            continue;
        }

        FD_ZERO(&fd);
        FD_SET(This->m_sClientSocket, &fd);
        int iRet = select(0, &fd, NULL, NULL, &tm);

        if (iRet == 0)
        {
            // Timeout: nothing to read yet.
            Sleep(50);
            continue;
        }
        if (iRet < 0)
        {
            printf("[select] return %d, GetLastError= %d. \n", iRet, WSAGetLastError());
            This->Disconnect(); // receive error handling
            if (This->m_exit_while_disconnect)
                break;
            continue;
        }

        // Socket is readable: fetch the data sent by the server.
        memset(szBuffer, 0, sizeof(szBuffer));
        int iReceivedLength = recv(This->m_sClientSocket,
                                   szBuffer, sizeof(szBuffer), 0);
        if (iReceivedLength <= 0)
        {
            // Connection closed by the peer or receive error.
            This->Disconnect();
            if (This->m_exit_while_disconnect)
                break;
            continue;
        }

        // Data received correctly: hand it to the packet decoder.
        This->OnServerReceiving(szBuffer, iReceivedLength);

        // m_Manager stays NULL until setManagerCallBack() is called, so it
        // must be checked before it is dereferenced.
        if (This->m_Manager != NULL && This->m_Manager->m_bIsDead)
        {
            printf("****** Recv bye bye ******\n");
            // Ask the client program to exit.
            extern bool g_bExit;
            g_bExit = true;
            break;
        }
    }

    // S_STOP is what the destructor may be polling for, so it must be the very
    // last store to the object: anything written after it could hit freed memory.
    This->m_bIsRunning = FALSE;
    This->m_bWorkThread = S_STOP;
    return 0xDEAD;
}
// Decodes the packets contained in one received chunk and forwards every
// decompressed payload to the manager.
//
// szBuffer: bytes just read from the socket.
// ulLength: number of valid bytes in szBuffer (expected to be > 0).
//
// Packet layout: [flag: FLAG_LENGTH bytes][total length: ULONG]
//                [original length: ULONG][compressed payload].
// Malformed input (wrong flag, impossible length, failed decompression) aborts
// decoding of the rest of this chunk; nothing is reported to the caller.
//
// NOTE(review): the staging buffer is local to this call, so a packet that is
// split across two recv() calls is discarded rather than reassembled. Fixing
// that needs a buffer that outlives the call (e.g. a class member).
VOID IOCPClient::OnServerReceiving(char* szBuffer, ULONG ulLength)
{
    // Declared outside the try block so that every exit path, including the
    // throws below, releases them.
    PBYTE CompressedBuffer = NULL;
    PBYTE DeCompressedBuffer = NULL;

    try
    {
        assert (ulLength > 0);

        // Stage the received bytes for decoding.
        CBuffer m_CompressedBuffer;
        m_CompressedBuffer.WriteBuffer((LPBYTE)szBuffer, ulLength);

        // There must be more data than one header, otherwise no packet can be complete.
        while (m_CompressedBuffer.GetBufferLength() > HDR_LENGTH)
        {
            // Check the packet flag.
            char szPacketFlag[FLAG_LENGTH + 3] = {0};
            CopyMemory(szPacketFlag, m_CompressedBuffer.GetBuffer(), FLAG_LENGTH);
            if (memcmp(m_szPacketFlag, szPacketFlag, FLAG_LENGTH) != 0)
            {
                throw "Bad Buffer";
            }

            // Peek at the total packet length that follows the flag.
            ULONG ulPackTotalLength = 0;
            CopyMemory(&ulPackTotalLength, m_CompressedBuffer.GetBuffer(FLAG_LENGTH),
                       sizeof(ULONG));

            // Stop when the length is empty or the packet is not fully available.
            if (ulPackTotalLength == 0 ||
                m_CompressedBuffer.GetBufferLength() < ulPackTotalLength)
            {
                break;
            }

            // A packet can never be shorter than its own header; without this
            // check the payload size below would underflow on hostile input.
            if (ulPackTotalLength < HDR_LENGTH)
            {
                throw "Bad Buffer";
            }

            // Consume the header fields: flag, total length, original length.
            m_CompressedBuffer.ReadBuffer((PBYTE)szPacketFlag, FLAG_LENGTH);
            m_CompressedBuffer.ReadBuffer((PBYTE) &ulPackTotalLength, sizeof(ULONG));
            ULONG ulOriginalLength = 0;
            m_CompressedBuffer.ReadBuffer((PBYTE) &ulOriginalLength, sizeof(ULONG));

            ULONG ulCompressedLength = ulPackTotalLength - HDR_LENGTH;
            CompressedBuffer = new BYTE[ulCompressedLength];
            DeCompressedBuffer = new BYTE[ulOriginalLength];
            m_CompressedBuffer.ReadBuffer(CompressedBuffer, ulCompressedLength);

#if USING_ZLIB
            int iRet = uncompress(DeCompressedBuffer,
                &ulOriginalLength, CompressedBuffer, ulCompressedLength);
#else
            // zstd returns a size_t: the decompressed size, or an error code.
            size_t iRet = uncompress(DeCompressedBuffer,
                &ulOriginalLength, CompressedBuffer, ulCompressedLength);
#endif
            if (Z_FAILED(iRet))
            {
                printf("[ERROR] uncompress failed \n");
                throw "Bad Buffer";
            }
#if !USING_ZLIB
            // The zstd adapter does not write the produced size back through
            // the length pointer, so take the real size from the return value
            // instead of trusting the header field.
            ulOriginalLength = (ULONG)iRet;
#endif

            // Pass the decompressed data to the manager. OnReceive is
            // polymorphic: the concrete manager decides how to handle it.
            if (m_Manager != NULL)
            {
                CBuffer m_DeCompressedBuffer;
                m_DeCompressedBuffer.WriteBuffer(DeCompressedBuffer,
                    ulOriginalLength);
                m_Manager->OnReceive((PBYTE)m_DeCompressedBuffer.GetBuffer(0),
                    m_DeCompressedBuffer.GetBufferLength());
            }

            delete [] CompressedBuffer;
            CompressedBuffer = NULL;
            delete [] DeCompressedBuffer;
            DeCompressedBuffer = NULL;
        }
    }
    catch(...)
    {
        // Deliberate best effort: a bad chunk is dropped and the receive loop
        // keeps running.
    }

    // No-ops after a clean run (both pointers are NULL); on the error path
    // this frees what was allocated for the packet being decoded.
    delete [] CompressedBuffer;
    delete [] DeCompressedBuffer;
}
int IOCPClient::OnServerSending(const char* szBuffer, ULONG ulOriginalLength) //Hello
{
AUTO_TICK(10);
assert (ulOriginalLength > 0);
{
//³ËÒÔ1.001ÊÇÒÔ×µÄÒ²¾ÍÊÇÊý¾ÝѹËõºóÕ¼ÓõÄÄÚ´æ¿Õ¼äºÍÔ­ÏÈÒ»Ñù +12
//·ÀÖ¹»º³åÇøÒç³ö// HelloWorld 10 22
//Êý¾ÝѹËõ ѹËõËã·¨ ΢ÈíÌṩ
//nSize = 436
//destLen = 448
#if USING_ZLIB
unsigned long ulCompressedLength = (double)ulOriginalLength * 1.001 + 12;
#else
unsigned long ulCompressedLength = ZSTD_compressBound(ulOriginalLength);
#endif
LPBYTE CompressedBuffer = new BYTE[ulCompressedLength];
int iRet = compress(CompressedBuffer, &ulCompressedLength, (PBYTE)szBuffer, ulOriginalLength);
if (Z_FAILED(iRet))
{
printf("[ERROR] compress failed \n");
delete [] CompressedBuffer;
return FALSE;
}
#if !USING_ZLIB
ulCompressedLength = iRet;
#endif
ULONG ulPackTotalLength = ulCompressedLength + HDR_LENGTH;
CBuffer m_WriteBuffer;
m_WriteBuffer.WriteBuffer((PBYTE)m_szPacketFlag, FLAG_LENGTH);
m_WriteBuffer.WriteBuffer((PBYTE) &ulPackTotalLength,sizeof(ULONG));
// 5 4
//[Shine][ 30 ]
m_WriteBuffer.WriteBuffer((PBYTE)&ulOriginalLength, sizeof(ULONG));
// 5 4 4
//[Shine][ 30 ][5]
m_WriteBuffer.WriteBuffer(CompressedBuffer,ulCompressedLength);
delete [] CompressedBuffer;
CompressedBuffer = NULL;
// ·Ö¿é·¢ËÍ
//shine[0035][0010][HelloWorld+12]
return SendWithSplit((char*)m_WriteBuffer.GetBuffer(), m_WriteBuffer.GetBufferLength(),
MAX_SEND_BUFFER);
}
}
// 5 2 // 2 2 1
// Sends a buffer over the client socket in blocks of at most ulSplitLength bytes.
//
// szBuffer:      data to send.
// ulLength:      number of bytes to send; 0 sends nothing and returns TRUE.
// ulSplitLength: maximum size of a single send() request; 0 means "do not split".
// Returns TRUE when every byte was handed to the socket, FALSE when a block
// could not be sent after 15 attempts.
BOOL IOCPClient::SendWithSplit(const char* szBuffer, ULONG ulLength, ULONG ulSplitLength)
{
    const int   kSendRetry  = 15;          // attempts per send() request
    const ULONG kMaxRequest = 0x7FFFFFFF;  // send() takes the length as an int

    if (ulSplitLength == 0 || ulSplitLength > kMaxRequest)
    {
        ulSplitLength = kMaxRequest;
    }

    const char* Travel = szBuffer;    // next byte to send
    ULONG ulRemaining = ulLength;     // bytes not yet accepted by the socket

    while (ulRemaining > 0)
    {
        const int iRequest =
            (int)(ulRemaining < ulSplitLength ? ulRemaining : ulSplitLength);

        int iReturn = 0; // how many bytes this send() actually accepted
        int j = 0;
        for (; j < kSendRetry; ++j)
        {
            iReturn = send(m_sClientSocket, Travel, iRequest, 0);
            if (iReturn > 0)
            {
                break;
            }
        }
        if (j == kSendRetry)
        {
            return FALSE;
        }

        // send() may accept fewer bytes than requested. Advance by what was
        // really sent so the remainder goes out on the next round instead of
        // being skipped.
        Travel += iReturn;
        ulRemaining -= (ULONG)iReturn;
    }
    return TRUE;
}
// Drops the connection to the server: cancels pending I/O on the socket,
// closes it and marks the client as disconnected.
VOID IOCPClient::Disconnect()
{
    std::cout << "¶Ï¿ªºÍ·þÎñ¶ËµÄÁ¬½Ó.\n";

    // Work on a copy of the handle, then reset the member.
    const SOCKET hSocket = m_sClientSocket;
    CancelIo((HANDLE)hSocket);
    closesocket(hSocket);

    m_sClientSocket = INVALID_SOCKET;
    m_bConnected = FALSE;
}
// Blocks the calling thread, polling every 200 ms, until the client stops
// running or the caller-owned condition becomes FALSE.
//
// bCondition: flag owned by the caller; it is re-read on every iteration.
VOID IOCPClient::RunEventLoop(const BOOL &bCondition)
{
    OutputDebugStringA("======> RunEventLoop begin\n");
    for (;;)
    {
        if (!m_bIsRunning || !bCondition)
        {
            break;
        }
        Sleep(200);
    }
    OutputDebugStringA("======> RunEventLoop end\n");
}