Files
SimpleRemoter/client/IOCPClient.cpp

546 lines
15 KiB
C++
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
// IOCPClient.cpp: implementation of the IOCPClient class.
//
//////////////////////////////////////////////////////////////////////
#ifdef _WIN32
#include "stdafx.h"
#else
#include <netdb.h>
#include <sys/socket.h>
#include <netinet/tcp.h>
#include <arpa/inet.h>
#include <netinet/in.h> // For struct sockaddr_in
#include <unistd.h> // For close()
#include <cstring> // For memset()
inline int WSAGetLastError() { return -1; }
#define USING_COMPRESS 1
#endif
#include "IOCPClient.h"
#include <assert.h>
#include <string>
#include <vector>
// Compression backend selection: USING_ZLIB picks zlib, otherwise USING_LZ4 picks LZ4,
// otherwise Zstandard is used. Every branch defines Z_FAILED/Z_SUCCESS to classify a
// backend return code. The LZ4 and Zstandard branches additionally define
// compress()/uncompress() macros taking (dest, destLen, source, sourceLen), where
// destLen is a pointer that the macros dereference to obtain the destination capacity.
#if USING_ZLIB
#include "zlib/zlib.h"
// zlib: any return code other than Z_OK counts as a failure.
#define Z_FAILED(p) (Z_OK != (p))
#define Z_SUCCESS(p) (!Z_FAILED(p))
#else
#if USING_LZ4
#include "lz4/lz4.h"
#pragma comment(lib, "lz4/lz4.lib")
// LZ4: only a return value of 0 is classified as a failure.
// NOTE(review): LZ4_decompress_safe is documented to report errors as negative values;
// confirm that call sites do not treat those as success.
#define Z_FAILED(p) (0 == (p))
#define Z_SUCCESS(p) (!Z_FAILED(p))
// Map the four-argument call shape onto the LZ4 API (note the source/dest order swap).
#define compress(dest, destLen, source, sourceLen) LZ4_compress_default((const char*)source, (char*)dest, sourceLen, *(destLen))
#define uncompress(dest, destLen, source, sourceLen) LZ4_decompress_safe((const char*)source, (char*)dest, sourceLen, *(destLen))
#else
#include "zstd/zstd.h"
// Link the Zstandard import library matching the target architecture.
#ifdef _WIN64
#pragma comment(lib, "zstd/zstd_x64.lib")
#else
#pragma comment(lib, "zstd/zstd.lib")
#endif
// Zstandard: failures are detected with ZSTD_isError().
#define Z_FAILED(p) ZSTD_isError(p)
#define Z_SUCCESS(p) (!Z_FAILED(p))
// Compression level applied to m_Cctx by the constructor when USING_CTX is enabled.
#define ZSTD_CLEVEL 5
#if USING_CTX
// Context-based variants: these macros reference the m_Cctx/m_Dctx members, so they
// can only be expanded inside IOCPClient member functions.
#define compress(dest, destLen, source, sourceLen) ZSTD_compress2(m_Cctx, dest, *(destLen), source, sourceLen)
#define uncompress(dest, destLen, source, sourceLen) ZSTD_decompressDCtx(m_Dctx, dest, *(destLen), source, sourceLen)
#else
// Context-free variants.
// NOTE(review): this path compresses with ZSTD_CLEVEL_DEFAULT, not ZSTD_CLEVEL above —
// confirm the difference is intended.
#define compress(dest, destLen, source, sourceLen) ZSTD_compress(dest, *(destLen), source, sourceLen, ZSTD_CLEVEL_DEFAULT)
#define uncompress(dest, destLen, source, sourceLen) ZSTD_decompress(dest, *(destLen), source, sourceLen)
#endif
#endif
#endif
//////////////////////////////////////////////////////////////////////
// Construction/Destruction
//////////////////////////////////////////////////////////////////////
#ifndef _WIN32
// Enable TCP keep-alive on a connected socket and tune its probe timing.
//
// socket         descriptor to configure.
// nKeepAliveSec  idle time in seconds before the first probe is sent (default 180).
//
// Probes are repeated every 5 seconds, and the connection is considered dead after
// 5 unanswered probes. Returns TRUE when every option was applied; on the first
// failure a message is written to std::cerr and FALSE is returned without touching
// the remaining options.
BOOL SetKeepAliveOptions(int socket, int nKeepAliveSec = 180) {
    // Applies one integer-valued option; logs `failureText` and yields false on error.
    const auto applyOption = [socket](int level, int name, int value, const char* failureText) -> bool {
        if (setsockopt(socket, level, name, &value, sizeof(value)) < 0) {
            std::cerr << failureText << std::endl;
            return false;
        }
        return true;
    };

    const int probeIntervalSec = 5; // delay between two probes
    const int maxProbes = 5;        // unanswered probes before the peer is declared dead

    // && short-circuits, so options are applied strictly in order until one fails.
    const bool allApplied =
        applyOption(SOL_SOCKET, SO_KEEPALIVE, 1, "Failed to enable TCP keep-alive") &&
        applyOption(IPPROTO_TCP, TCP_KEEPIDLE, nKeepAliveSec, "Failed to set TCP_KEEPIDLE") &&
        applyOption(IPPROTO_TCP, TCP_KEEPINTVL, probeIntervalSec, "Failed to set TCP_KEEPINTVL") &&
        applyOption(IPPROTO_TCP, TCP_KEEPCNT, maxProbes, "Failed to set TCP_KEEPCNT");
    if (!allApplied) {
        return FALSE;
    }

    std::cout << "TCP keep-alive settings applied successfully" << std::endl;
    return TRUE;
}
#endif
// Register the consumer of decompressed packets.
//
// Manager      opaque pointer handed back as the first argument of the callback.
// dataProcess  function invoked by OnServerReceiving for every decoded packet.
VOID IOCPClient::setManagerCallBack(void* Manager, DataProcessCB dataProcess)
{
    // The context pointer is stored before the callback itself.
    this->m_Manager = Manager;
    this->m_DataProcess = dataProcess;
}
// Construct an idle client: no socket, no worker thread, not connected.
//
// bExit                  external exit state used to initialise g_bExit.
// exit_while_disconnect  when true, the worker thread leaves its loop as soon as the
//                        connection is lost instead of idling for a reconnect.
//
// On Windows this also initialises Winsock 2.2; the destructor pairs it with WSACleanup.
IOCPClient::IOCPClient(State&bExit, bool exit_while_disconnect) : g_bExit(bExit)
{
    m_Manager = NULL;
    // OnServerReceiving tests m_DataProcess before calling it, so it must hold a
    // well-defined "no callback" value until setManagerCallBack() is invoked.
    m_DataProcess = nullptr;
#ifdef _WIN32
    WSADATA wsaData;
    WSAStartup(MAKEWORD(2, 2), &wsaData);
#endif
    m_sClientSocket = INVALID_SOCKET;
    m_hWorkThread = NULL;
    m_bWorkThread = S_STOP;

    // Packet signature placed at the start of every frame.
    memset(m_szPacketFlag, 0, sizeof(m_szPacketFlag));
    memcpy(m_szPacketFlag,"Shine",FLAG_LENGTH);

    m_bIsRunning = TRUE;
    m_bConnected = FALSE;
    m_exit_while_disconnect = exit_while_disconnect;
#if USING_CTX
    // Reusable Zstandard contexts referenced by the compress()/uncompress() macros.
    m_Cctx = ZSTD_createCCtx();
    m_Dctx = ZSTD_createDCtx();
    ZSTD_CCtx_setParameter(m_Cctx, ZSTD_c_compressionLevel, ZSTD_CLEVEL);
#endif
}
// Stop the worker thread and release the socket, thread handle, Winsock and
// compression contexts.
//
// Teardown order matters: the worker thread is signalled and awaited BEFORE the
// thread handle is closed and Winsock is cleaned up, so the worker never issues
// socket calls against a torn-down Winsock instance.
IOCPClient::~IOCPClient()
{
    // Ask the worker loop to stop; closing the socket makes its pending wait fail
    // (or at worst time out), after which it observes the cleared flag.
    m_bIsRunning = FALSE;
    if (m_sClientSocket!=INVALID_SOCKET)
    {
        closesocket(m_sClientSocket);
        m_sClientSocket = INVALID_SOCKET;
    }

    // The worker sets m_bWorkThread to S_STOP as its final action.
    while (S_RUN == m_bWorkThread)
        Sleep(10);
    m_bWorkThread = S_END;

    if (m_hWorkThread!=NULL)
    {
        CloseHandle(m_hWorkThread);
        m_hWorkThread = NULL;
    }
#ifdef _WIN32
    // Pairs with the WSAStartup call made by the constructor.
    WSACleanup();
#endif
#if USING_CTX
    ZSTD_freeCCtx(m_Cctx);
    ZSTD_freeDCtx(m_Dctx);
#endif
}
// Resolve a host name to a dotted-decimal IPv4 address string.
//
// hostName  name (or numeric address) to resolve.
//
// Returns the first resolved address, or an empty string when resolution fails.
// The Windows branch uses gethostbyname(); the other branch uses getaddrinfo()
// restricted to AF_INET.
inline std::string GetIPAddress(const char *hostName)
{
#ifdef _WIN32
    struct hostent *host = gethostbyname(hostName);
    // Validate the lookup result before anything dereferences it; the debug trace
    // below would otherwise crash on a failed resolution.
    if (host == NULL || host->h_addr_list == NULL)
        return "";
#ifdef _DEBUG
    Mprintf("´ËÓòÃûµÄIPÀàÐÍΪ: %s.\n", host->h_addrtype == AF_INET ? "IPV4" : "IPV6");
    for (int i = 0; host->h_addr_list[i]; ++i)
        Mprintf("»ñÈ¡µÄµÚ%d¸öIP: %s\n", i+1, inet_ntoa(*(struct in_addr*)host->h_addr_list[i]));
#endif
    return host->h_addr_list[0] ? inet_ntoa(*(struct in_addr*)host->h_addr_list[0]) : "";
#else
    struct addrinfo hints;
    struct addrinfo* res = nullptr;
    memset(&hints, 0, sizeof(hints));
    hints.ai_family = AF_INET;       // IPv4
    hints.ai_socktype = SOCK_STREAM; // TCP socket
    const int status = getaddrinfo(hostName, nullptr, &hints, &res);
    if (status != 0) {
        std::cerr << "getaddrinfo failed: " << gai_strerror(status) << std::endl;
        return "";
    }
    if (res == nullptr || res->ai_addr == nullptr) {
        if (res != nullptr)
            freeaddrinfo(res);
        return "";
    }
    struct sockaddr_in* addr = reinterpret_cast<struct sockaddr_in*>(res->ai_addr);
    char ip[INET_ADDRSTRLEN] = { 0 };
    const char* text = inet_ntop(AF_INET, &(addr->sin_addr), ip, sizeof(ip));
    freeaddrinfo(res); // the result list is owned by the caller of getaddrinfo
    if (text == nullptr)
        return "";
    std::cout << "IP Address: " << ip << std::endl;
    return ip;
#endif
}
// Connect to the server, enable TCP keep-alive and make sure the receive worker runs.
//
// szServerIP  dotted IPv4 address, or a host name. A value whose first character is
//             not a digit is treated as a host name and resolved with GetIPAddress().
// uPort       server TCP port in host byte order.
//
// Returns TRUE once the TCP connection is established, FALSE otherwise. On failure
// the socket is closed and m_sClientSocket is reset to INVALID_SOCKET.
BOOL IOCPClient::ConnectServer(const char* szServerIP, unsigned short uPort)
{
    if (szServerIP == NULL || szServerIP[0] == '\0')
    {
        return FALSE;
    }

    // Exactly one socket is created per attempt, on every platform.
    m_sClientSocket = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP);
    if (m_sClientSocket == INVALID_SOCKET)
    {
        return FALSE;
    }

    sockaddr_in ServerAddr = {};
    ServerAddr.sin_family = AF_INET;
    ServerAddr.sin_port = htons(uPort);
    const std::string server = ('0' <= szServerIP[0] && szServerIP[0] <= '9')
        ? szServerIP : GetIPAddress(szServerIP);

    bool bConnected = false;
#ifdef _WIN32
    // An empty string means name resolution failed; INADDR_NONE means the text is
    // not a usable IPv4 address.
    const unsigned long ulAddr = server.empty() ? INADDR_NONE : inet_addr(server.c_str());
    if (ulAddr != INADDR_NONE)
    {
        ServerAddr.sin_addr.S_un.S_addr = ulAddr;
        bConnected = connect(m_sClientSocket, (SOCKADDR *)&ServerAddr, sizeof(sockaddr_in)) != SOCKET_ERROR;
    }
#else
    if (inet_pton(AF_INET, server.c_str(), &ServerAddr.sin_addr) <= 0) {
        std::cerr << "Invalid address or address not supported" << std::endl;
    }
    else if (connect(m_sClientSocket, (struct sockaddr*)&ServerAddr, sizeof(ServerAddr)) == -1) {
        std::cerr << "Connection failed" << std::endl;
    }
    else {
        bConnected = true;
    }
#endif
    if (!bConnected)
    {
        closesocket(m_sClientSocket);
        m_sClientSocket = INVALID_SOCKET;
        return FALSE;
    }

    // Enable keep-alive so a silently dropped connection is eventually detected.
    const int keepAlive = 1;
    if (setsockopt(m_sClientSocket, SOL_SOCKET, SO_KEEPALIVE,
        (const char *)&keepAlive, sizeof(keepAlive)) == 0)
    {
#ifdef _WIN32
        tcp_keepalive klive;
        klive.onoff = 1;                     // enable keep-alive
        klive.keepalivetime = 1000 * 60 * 3; // first probe after 3 minutes idle
        klive.keepaliveinterval = 1000 * 5;  // re-probe every 5 seconds without a reply
        // WSAIoctl writes the returned byte count here; it needs its own writable
        // variable rather than a cast-away-const option value.
        DWORD dwBytesReturned = 0;
        WSAIoctl(m_sClientSocket, SIO_KEEPALIVE_VALS, &klive, sizeof(tcp_keepalive),
            NULL, 0, &dwBytesReturned, NULL, NULL);
#else
        SetKeepAliveOptions(m_sClientSocket);
#endif
    }

    // Start the receive worker once; later reconnects reuse the same thread.
    if (m_hWorkThread == NULL) {
        // Published before the thread starts because the thread itself writes S_STOP
        // on exit and the destructor waits on this state.
        m_bWorkThread = S_RUN;
#ifdef _WIN32
        m_hWorkThread = (HANDLE)CreateThread(NULL, 0,
            WorkThreadProc, (LPVOID)this, 0, NULL);
        if (m_hWorkThread == NULL)
            m_bWorkThread = S_STOP;
#else
        pthread_t id = 0;
        // pthread_create returns 0 on success and an error number otherwise; the
        // thread identifier (not that return code) is what marks the worker as started.
        const int rc = pthread_create(&id, nullptr, (void* (*)(void*))IOCPClient::WorkThreadProc, this);
        if (rc == 0)
            m_hWorkThread = (HANDLE)id;
        else
            m_bWorkThread = S_STOP;
#endif
    }
    Mprintf("Á¬½Ó·þÎñ¶Ë³É¹¦.\n");
    m_bConnected = TRUE;
    return TRUE;
}
// Receive loop executed on the worker thread.
//
// lParam  the owning IOCPClient instance.
//
// While the client is running, waits up to 2 seconds for readable data, reads it and
// forwards it to OnServerReceiving(). A select()/recv() failure triggers Disconnect();
// the loop then ends if m_exit_while_disconnect is set, otherwise it idles until the
// client is connected again. Always returns 0xDEAD.
DWORD WINAPI IOCPClient::WorkThreadProc(LPVOID lParam)
{
    IOCPClient* This = (IOCPClient*)lParam;
    std::vector<char> recvBuffer(MAX_RECV_BUFFER);
    while (This->IsRunning())
    {
        if (!This->IsConnected())
        {
            Sleep(50);
            continue;
        }

        fd_set fd;
        FD_ZERO(&fd);
        FD_SET(This->m_sClientSocket, &fd);
        // Rebuilt on every pass: select() may overwrite the timeout it is given, and
        // a reused value would decay to zero and turn the wait into busy polling.
        struct timeval tm = { 2, 0 };
#ifdef _WIN32
        const int iRet = select(0, &fd, NULL, NULL, &tm);
#else
        const int iRet = select(This->m_sClientSocket + 1, &fd, NULL, NULL, &tm);
#endif
        if (iRet == 0)
        {
            // Timed out with nothing to read.
            Sleep(50);
            continue;
        }
        if (iRet < 0)
        {
            Mprintf("[select] return %d, GetLastError= %d. \n", iRet, WSAGetLastError());
            This->Disconnect();
            if (This->m_exit_while_disconnect)
                break;
            continue;
        }

        // Only the first iReceivedLength bytes are consumed, so the buffer does not
        // need to be cleared between reads.
        const int iReceivedLength = recv(This->m_sClientSocket,
            recvBuffer.data(), MAX_RECV_BUFFER, 0);
        if (iReceivedLength <= 0)
        {
            // Peer closed the connection or the read failed.
            This->Disconnect();
            if (This->m_exit_while_disconnect)
                break;
            continue;
        }
        This->OnServerReceiving(recvBuffer.data(), iReceivedLength);
    }
    This->m_bIsRunning = FALSE;
    // Must be the final access to *This: the destructor waits for this state change
    // and may free the object immediately afterwards.
    This->m_bWorkThread = S_STOP;
    return 0xDEAD;
}
// Feed freshly received bytes into the packet parser and dispatch complete packets.
//
// szBuffer  bytes just read from the socket.
// ulLength  number of valid bytes in szBuffer (must be > 0).
//
// Frame layout, as consumed below: FLAG_LENGTH signature bytes, a ULONG total frame
// length, a ULONG original (uncompressed) length, then the compressed payload of
// (total length - HDR_LENGTH) bytes. Every complete frame is decompressed and handed
// to m_DataProcess. A trailing partial frame stays buffered for the next call. Any
// malformed frame discards everything buffered so far.
VOID IOCPClient::OnServerReceiving(char* szBuffer, ULONG ulLength)
{
    try
    {
        assert (ulLength > 0);
        m_CompressedBuffer.WriteBuffer((LPBYTE)szBuffer, ulLength);

        // A frame can only be examined once more than a full header is buffered.
        while (m_CompressedBuffer.GetBufferLength() > HDR_LENGTH)
        {
            char szPacketFlag[FLAG_LENGTH + 3] = {0};
            CopyMemory(szPacketFlag, m_CompressedBuffer.GetBuffer(), FLAG_LENGTH);
            if (memcmp(m_szPacketFlag, szPacketFlag, FLAG_LENGTH) != 0)
            {
                throw "Bad Buffer";
            }

            ULONG ulPackTotalLength = 0;
            CopyMemory(&ulPackTotalLength, m_CompressedBuffer.GetBuffer(FLAG_LENGTH),
                sizeof(ULONG));
            // The length field comes from the network. A value that does not exceed
            // the header size cannot describe a frame: zero would stall the parser
            // forever, and smaller values would underflow the payload size below.
            if (ulPackTotalLength <= HDR_LENGTH)
            {
                throw "Bad Buffer";
            }

            const ULONG len = m_CompressedBuffer.GetBufferLength();
            if (len < ulPackTotalLength)
            {
                Mprintf("[WARNING] OnServerReceiving incomplete data: %d expect: %d\n", len, ulPackTotalLength);
                break;
            }

            // Consume the header fields.
            m_CompressedBuffer.ReadBuffer((PBYTE)szPacketFlag, FLAG_LENGTH);
            m_CompressedBuffer.ReadBuffer((PBYTE) &ulPackTotalLength, sizeof(ULONG));
            ULONG ulOriginalLength = 0;
            m_CompressedBuffer.ReadBuffer((PBYTE) &ulOriginalLength, sizeof(ULONG));

            const ULONG ulCompressedLength = ulPackTotalLength - HDR_LENGTH;
            // Vectors release the scratch buffers on every exit path, including an
            // exception thrown by the callback. The output is never zero-sized so the
            // decompressor always receives a valid pointer.
            std::vector<BYTE> compressed(ulCompressedLength);
            std::vector<BYTE> decompressed(ulOriginalLength ? ulOriginalLength : 1);
            m_CompressedBuffer.ReadBuffer(compressed.data(), ulCompressedLength);

            size_t iRet = uncompress(decompressed.data(),
                &ulOriginalLength, compressed.data(), ulCompressedLength);
            if (Z_FAILED(iRet))
            {
                Mprintf("[ERROR] uncompress fail: dstLen %d, srcLen %d\n", ulOriginalLength, ulCompressedLength);
                throw "Bad Buffer";
            }

            CBuffer m_DeCompressedBuffer;
            m_DeCompressedBuffer.WriteBuffer(decompressed.data(), ulOriginalLength);
            // Hand the decoded packet to whichever manager registered the callback.
            if (m_DataProcess)
                m_DataProcess(m_Manager, (PBYTE)m_DeCompressedBuffer.GetBuffer(0),
                    m_DeCompressedBuffer.GetBufferLength());
        }
    }catch(...) {
        // The byte stream can no longer be trusted: drop everything buffered.
        m_CompressedBuffer.ClearBuffer();
        Mprintf("[ERROR] OnServerReceiving catch an error \n");
    }
}
// Compress a payload, wrap it in a frame and send it to the server.
//
// szBuffer          payload bytes.
// ulOriginalLength  payload size in bytes (must be > 0).
//
// Frame layout: FLAG_LENGTH signature bytes, ULONG total frame length, ULONG original
// length, compressed payload. Returns FALSE when compression fails, otherwise the
// result of SendWithSplit(). Compression is the costly step of this call.
BOOL IOCPClient::OnServerSending(const char* szBuffer, ULONG ulOriginalLength) //Hello
{
    AUTO_TICK(50);
    assert (ulOriginalLength > 0);

    // Worst-case compressed size for the selected backend. For zlib the library's own
    // bound is used: the older "n * 1.001 + 12" rule of thumb is smaller than
    // compressBound() for some sizes and can make compress() fail with Z_BUF_ERROR.
#if USING_ZLIB
    unsigned long ulCompressedLength = compressBound(ulOriginalLength);
#elif USING_LZ4
    unsigned long ulCompressedLength = LZ4_compressBound(ulOriginalLength);
#else
    unsigned long ulCompressedLength = ZSTD_compressBound(ulOriginalLength);
#endif

    CBuffer packet;
    {
        // Scoped so the scratch buffer is released before the (potentially slow) send.
        std::vector<BYTE> compressed(ulCompressedLength);
        int iRet = compress(compressed.data(), &ulCompressedLength, (const BYTE*)szBuffer, ulOriginalLength);
        if (Z_FAILED(iRet))
        {
            Mprintf("[ERROR] compress failed \n");
            return FALSE;
        }
#if !USING_ZLIB
        // LZ4 and Zstandard return the compressed size; zlib stores it through the
        // length pointer instead.
        ulCompressedLength = iRet;
#endif
        ULONG ulPackTotalLength = ulCompressedLength + HDR_LENGTH;
        packet.WriteBuffer((PBYTE)m_szPacketFlag, FLAG_LENGTH);
        packet.WriteBuffer((PBYTE)&ulPackTotalLength, sizeof(ULONG));
        packet.WriteBuffer((PBYTE)&ulOriginalLength, sizeof(ULONG));
        packet.WriteBuffer(compressed.data(), ulCompressedLength);
    }

    // Send the finished frame in MAX_SEND_BUFFER-sized pieces.
    return SendWithSplit((char*)packet.GetBuffer(), packet.GetBufferLength(),
        MAX_SEND_BUFFER);
}
// Send a buffer over the client socket in pieces of at most ulSplitLength bytes.
// Example: 5 bytes with a split size of 2 are sent as pieces of 2, 2 and 1 bytes.
//
// szBuffer       data to send.
// ulLength       number of bytes to send.
// ulSplitLength  maximum size of a single send() call; 0 means "no splitting".
//
// Returns TRUE once every byte has been handed to the socket, FALSE when one piece
// could not be sent after 15 consecutive attempts.
BOOL IOCPClient::SendWithSplit(const char* szBuffer, ULONG ulLength, ULONG ulSplitLength)
{
    AUTO_TICK(25);
    const int kMaxSendRetry = 15;
    if (ulSplitLength == 0)
    {
        // A zero split size would never make progress; send in a single piece.
        ulSplitLength = ulLength;
    }

    const char* cursor = szBuffer;
    ULONG ulRemaining = ulLength;
    while (ulRemaining > 0)
    {
        const ULONG ulChunk = (ulRemaining < ulSplitLength) ? ulRemaining : ulSplitLength;
        int iSent = 0;
        int attempt = 0;
        for (; attempt < kMaxSendRetry; ++attempt)
        {
            iSent = send(m_sClientSocket, cursor, static_cast<int>(ulChunk), 0);
            if (iSent > 0)
            {
                break;
            }
        }
        if (attempt == kMaxSendRetry)
        {
            return FALSE;
        }
        // send() may accept fewer bytes than requested. Advance by what was actually
        // written so the unsent tail is retried instead of being skipped, which would
        // corrupt the framed byte stream.
        cursor += iSent;
        ulRemaining -= static_cast<ULONG>(iSent);
    }
    return TRUE;
}
// Close the connection to the server and mark the client as disconnected.
//
// Safe to call when the socket has already been closed (for example by the
// destructor): the socket calls are skipped once m_sClientSocket is INVALID_SOCKET.
VOID IOCPClient::Disconnect()
{
    Mprintf("¶Ï¿ªºÍ·þÎñ¶ËµÄÁ¬½Ó.\n");
    if (m_sClientSocket != INVALID_SOCKET)
    {
        // Cancel outstanding I/O before the socket goes away.
        CancelIo((HANDLE)m_sClientSocket);
        closesocket(m_sClientSocket);
        m_sClientSocket = INVALID_SOCKET;
    }
    m_bConnected = FALSE;
}
// Block the calling thread while the client is running and the caller's condition
// holds, polling both every 200 ms.
//
// bCondition  flag owned by the caller; it is re-read on every iteration.
VOID IOCPClient::RunEventLoop(const BOOL &bCondition)
{
    OutputDebugStringA("======> RunEventLoop begin\n");
    for (;;)
    {
        // m_bIsRunning is tested first; bCondition is only read while it is set.
        const bool keepWaiting = m_bIsRunning && bCondition;
        if (!keepWaiting)
        {
            break;
        }
        Sleep(200);
    }
    OutputDebugStringA("======> RunEventLoop end\n");
}