Files
SimpleRemoter/server/2015Remote/IOCPServer.cpp
2024-12-26 21:37:27 +08:00

808 lines
21 KiB
C++
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
#include "StdAfx.h"
#include "IOCPServer.h"
#include "2015Remote.h"
#include <iostream>
// Compression back-end selection. Whichever library is chosen, the rest of
// this file only uses four names:
//   compress / uncompress - called as (dest, &destLen, source, sourceLen)
//   Z_FAILED / Z_SUCCESS  - classify the value returned by those calls
#if USING_ZLIB
#include "zlib.h"
// zlib reports success as Z_OK; every other value is an error code.
#define Z_FAILED(p) (Z_OK != (p))
#define Z_SUCCESS(p) (!Z_FAILED(p))
#else
#if USING_LZ4
#include "lz4/lz4.h"
#pragma comment(lib, "lz4/lz4.lib")
// LZ4_compress_default returns 0 on failure, while LZ4_decompress_safe returns
// a NEGATIVE value for malformed input. Treat every result <= 0 as a failure so
// corrupt packets are not mistaken for successful decompression.
#define Z_FAILED(p) (0 >= (p))
#define Z_SUCCESS(p) (!Z_FAILED(p))
#define compress(dest, destLen, source, sourceLen) LZ4_compress_default((const char*)source, (char*)dest, sourceLen, *(destLen))
#define uncompress(dest, destLen, source, sourceLen) LZ4_decompress_safe((const char*)source, (char*)dest, sourceLen, *(destLen))
#else
#include "zstd/zstd.h"
#pragma comment(lib, "zstd/zstd.lib")
// zstd encodes errors in the returned size; ZSTD_isError() decodes them.
#define Z_FAILED(p) ZSTD_isError(p)
#define Z_SUCCESS(p) (!Z_FAILED(p))
#define compress(dest, destLen, source, sourceLen) ZSTD_compress(dest, *(destLen), source, sourceLen, ZSTD_CLEVEL_DEFAULT)
#define uncompress(dest, destLen, source, sourceLen) ZSTD_decompress(dest, *(destLen), source, sourceLen)
#endif
#endif
using namespace std;
// Definition of the static critical section declared in IOCPServer. It is
// initialized in the constructor and deleted in the destructor.
CRITICAL_SECTION IOCPServer::m_cs = {0};
// Not referenced in the visible part of this file.
#define HUERISTIC_VALUE 2
// Deletes p when it is non-null and resets it to NULL. The macro expands to a
// bare if-statement, so use it only as a complete statement (never directly
// in front of an `else`).
#define SAFE_DELETE(p) if(p){ delete (p); (p) = NULL; }
/**
 * Builds an idle server object: every member is put into a known
 * "not started" state, the connection limit is read from the application's
 * ini file and Winsock 2.2 is started. No socket, thread or completion port
 * is created here; StartServer() does that.
 */
IOCPServer::IOCPServer(void)
{
    // Initialize every member BEFORE touching Winsock. Previously a failing
    // WSAStartup returned early and left all handles, counters and the
    // critical section uninitialized, which the destructor then used.
    m_hCompletionPort = NULL;
    m_sListenSocket = INVALID_SOCKET;
    m_hListenEvent = WSA_INVALID_EVENT;
    m_hListenThread = INVALID_HANDLE_VALUE;
    m_hKillEvent = NULL;

    m_ulWorkThreadCount = 0;
    m_bTimeToKill = FALSE;
    m_ulThreadPoolMin = 0;
    m_ulThreadPoolMax = 0;
    m_ulCPULowThreadsHold = 0;
    m_ulCPUHighThreadsHold = 0;
    m_ulCurrentThread = 0;
    m_ulBusyThread = 0;
    m_ulKeepLiveTime = 0;

    m_NotifyProc = NULL;
    m_OfflineProc = NULL;

    // Packet marker that prefixes every frame on the wire.
    memset(m_szPacketFlag, 0, sizeof(m_szPacketFlag));
    memcpy(m_szPacketFlag,"Shine",FLAG_LENGTH);

    InitializeCriticalSection(&m_cs);

    // Connection limit from [settings] MaxConnection; fall back to 10000 when
    // the value is missing or not positive.
    m_ulMaxConnections = ((CMy2015RemoteApp*)AfxGetApp())->m_iniFile.GetInt("settings", "MaxConnection");
    if (m_ulMaxConnections<=0)
    {
        m_ulMaxConnections = 10000;
    }

    WSADATA wsaData;
    if (WSAStartup(MAKEWORD(2,2), &wsaData)!=0)
    {
        // The object stays in its idle state; later socket calls will fail.
        OutputDebugStringA("[ERROR] WSAStartup failed \n");
    }
}
/**
 * Shuts the server down: stops the listen thread, closes the listening
 * socket, the completion port and the accept event, waits for the worker
 * threads to leave, releases every connection context and finally tears down
 * the critical section and Winsock.
 */
IOCPServer::~IOCPServer(void)
{
    // Upper bound for waiting on the listen thread. It polls its events with
    // 100 ms timeouts, so it normally leaves well within this limit.
    const DWORD dwListenThreadWaitMs = 5000;

    m_bTimeToKill = TRUE;

    // Ask the listen thread to leave and wait for it before the handles it
    // polls are closed. SetEvent is only called on a real handle.
    if (m_hKillEvent!=NULL)
    {
        SetEvent(m_hKillEvent);
    }
    if (m_hListenThread!=NULL && m_hListenThread!=INVALID_HANDLE_VALUE)
    {
        WaitForSingleObject(m_hListenThread, dwListenThreadWaitMs);
        CloseHandle(m_hListenThread); // the thread handle used to be leaked
        m_hListenThread = INVALID_HANDLE_VALUE;
    }
    if (m_hKillEvent!=NULL)
    {
        CloseHandle(m_hKillEvent);
        m_hKillEvent = NULL;
    }

    if (m_sListenSocket!=INVALID_SOCKET)
    {
        closesocket(m_sListenSocket);
        m_sListenSocket = INVALID_SOCKET;
    }

    // Closing the port makes the blocked GetQueuedCompletionStatus calls in
    // the worker threads return, so they can observe m_bTimeToKill. The
    // handle is NULL when the server was never started and may be
    // INVALID_HANDLE_VALUE after an earlier close; skip both.
    if (m_hCompletionPort!=NULL && m_hCompletionPort!=INVALID_HANDLE_VALUE)
    {
        CloseHandle(m_hCompletionPort);
    }
    m_hCompletionPort = INVALID_HANDLE_VALUE;

    if (m_hListenEvent!=WSA_INVALID_EVENT)
    {
        WSACloseEvent(m_hListenEvent); // created with WSACreateEvent
        m_hListenEvent = WSA_INVALID_EVENT;
    }

    // Wait for the worker threads BEFORE freeing the contexts they may still
    // be working on.
    while (m_ulWorkThreadCount)
        Sleep(10);

    // RemoveStaleContext moves each live connection into the free pool.
    while (!m_ContextConnectionList.IsEmpty())
    {
        CONTEXT_OBJECT *ContextObject = m_ContextConnectionList.GetHead();
        RemoveStaleContext(ContextObject);
    }
    while (!m_ContextFreePoolList.IsEmpty())
    {
        CONTEXT_OBJECT *ContextObject = m_ContextFreePoolList.RemoveHead();
        // ContextObject->olps is deliberately not deleted here.
        //SAFE_DELETE(ContextObject->olps);
        delete ContextObject;
    }

    DeleteCriticalSection(&m_cs);
    m_ulWorkThreadCount = 0;
    m_ulThreadPoolMin = 0;
    m_ulThreadPoolMax = 0;
    m_ulCPULowThreadsHold = 0;
    m_ulCPUHighThreadsHold = 0;
    m_ulCurrentThread = 0;
    m_ulBusyThread = 0;
    m_ulKeepLiveTime = 0;
    WSACleanup();
}
// <20><><EFBFBD>ش<EFBFBD><D8B4><EFBFBD><EFBFBD><EFBFBD>0<EFBFBD><30><EFBFBD><EFBFBD><EFBFBD>ɹ<EFBFBD><C9B9><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD>Ϣ.
UINT IOCPServer::StartServer(pfnNotifyProc NotifyProc, pfnOfflineProc OffProc, USHORT uPort)
{
m_NotifyProc = NotifyProc;
m_OfflineProc = OffProc;
m_hKillEvent = CreateEvent(NULL,FALSE,FALSE,NULL);
if (m_hKillEvent==NULL)
{
return 1;
}
m_sListenSocket = WSASocket(AF_INET, SOCK_STREAM, 0, NULL, 0, WSA_FLAG_OVERLAPPED); //<2F><><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD>׽<EFBFBD><D7BD><EFBFBD>
if (m_sListenSocket == INVALID_SOCKET)
{
return 2;
}
m_hListenEvent = WSACreateEvent();
if (m_hListenEvent == WSA_INVALID_EVENT)
{
closesocket(m_sListenSocket);
m_sListenSocket = INVALID_SOCKET;
return 3;
}
int iRet = WSAEventSelect(m_sListenSocket, //<2F><><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD>׽<EFBFBD><D7BD><EFBFBD><EFBFBD><EFBFBD><EFBFBD>¼<EFBFBD><C2BC><EFBFBD><EFBFBD>й<EFBFBD><D0B9><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD>FD_ACCEPT<50><54><EFBFBD><EFBFBD><EFBFBD><EFBFBD>
m_hListenEvent,
FD_ACCEPT);
if (iRet == SOCKET_ERROR)
{
int a = GetLastError();
closesocket(m_sListenSocket);
m_sListenSocket = INVALID_SOCKET;
WSACloseEvent(m_hListenEvent);
m_hListenEvent = WSA_INVALID_EVENT;
return a;
}
SOCKADDR_IN ServerAddr;
ServerAddr.sin_port = htons(uPort);
ServerAddr.sin_family = AF_INET;
ServerAddr.sin_addr.s_addr = INADDR_ANY; //<2F><>ʼ<EFBFBD><CABC><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD>
//<2F><><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD>׻<EFBFBD><D7BB>ֺ<EFBFBD><D6BA><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD>bind
iRet = bind(m_sListenSocket,
(sockaddr*)&ServerAddr,
sizeof(ServerAddr));
if (iRet == SOCKET_ERROR)
{
int a = GetLastError();
closesocket(m_sListenSocket);
m_sListenSocket = INVALID_SOCKET;
WSACloseEvent(m_hListenEvent);
m_hListenEvent = WSA_INVALID_EVENT;
return a;
}
iRet = listen(m_sListenSocket, SOMAXCONN);
if (iRet == SOCKET_ERROR)
{
int a = GetLastError();
closesocket(m_sListenSocket);
m_sListenSocket = INVALID_SOCKET;
WSACloseEvent(m_hListenEvent);
m_hListenEvent = WSA_INVALID_EVENT;
return a;
}
m_hListenThread =
(HANDLE)CreateThread(NULL,
0,
(LPTHREAD_START_ROUTINE)ListenThreadProc,
(void*)this, //<2F><>Thread<61>ص<EFBFBD><D8B5><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD>this <20><><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD>ǵ<EFBFBD><C7B5>̻߳ص<CCBB><D8B5><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD>еij<D0B5>Ա
0,
NULL);
if (m_hListenThread==INVALID_HANDLE_VALUE)
{
int a = GetLastError();
closesocket(m_sListenSocket);
m_sListenSocket = INVALID_SOCKET;
WSACloseEvent(m_hListenEvent);
m_hListenEvent = WSA_INVALID_EVENT;
return a;
}
//<2F><><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD>߳<EFBFBD> 1 2
InitializeIOCP();
return 0;
}
/**
 * 1. Creates the I/O completion port.
 * 2. Creates the worker threads that service it (two per processor).
 *
 * @return TRUE when the port and all worker threads were created, FALSE
 *         otherwise. On failure m_hCompletionPort is left as
 *         INVALID_HANDLE_VALUE.
 */
BOOL IOCPServer::InitializeIOCP(VOID)
{
    m_hCompletionPort = CreateIoCompletionPort(INVALID_HANDLE_VALUE, NULL, 0, 0 );
    if (m_hCompletionPort == NULL || m_hCompletionPort == INVALID_HANDLE_VALUE)
    {
        // Use the sentinel the destructor checks for, so it never closes NULL.
        m_hCompletionPort = INVALID_HANDLE_VALUE;
        return FALSE;
    }

    SYSTEM_INFO SystemInfo;
    GetSystemInfo(&SystemInfo); // used for the processor count
    m_ulThreadPoolMin = 1;
    m_ulThreadPoolMax = SystemInfo.dwNumberOfProcessors * 2;
    m_ulCPULowThreadsHold = 10;
    m_ulCPUHighThreadsHold = 75;
    m_cpu.Init();

    const ULONG ulWorkThreadCount = m_ulThreadPoolMax;
    for (ULONG i = 0; i < ulWorkThreadCount; ++i)
    {
        // Each worker thread processes the packets queued on the port.
        HANDLE hWorkThread = (HANDLE)CreateThread(NULL,
            0,
            (LPTHREAD_START_ROUTINE)WorkThreadProc,
            (void*)this,
            0,
            NULL);
        if (hWorkThread == NULL )
        {
            // NOTE(review): workers created in earlier iterations are still
            // running at this point; they are only stopped by the destructor.
            CloseHandle(m_hCompletionPort);
            m_hCompletionPort = INVALID_HANDLE_VALUE; // avoid a second close later
            return FALSE;
        }
        // Worker threads change this counter with Interlocked* calls, so the
        // update here must be atomic as well.
        InterlockedIncrement(&m_ulWorkThreadCount);
        CloseHandle(hWorkThread);
    }
    return TRUE;
}
/**
 * Worker thread routine: dequeues completion packets from the completion
 * port and dispatches them through HandleIO until m_bTimeToKill is set.
 *
 * @param lParam  the owning IOCPServer instance.
 * @return always 0.
 */
DWORD IOCPServer::WorkThreadProc(LPVOID lParam)
{
    OutputDebugStringA("======> IOCPServer WorkThreadProc begin \n");
    IOCPServer* This = (IOCPServer*)(lParam);
    HANDLE hCompletionPort = This->m_hCompletionPort;
    DWORD dwTrans = 0;
    PCONTEXT_OBJECT ContextObject = NULL;
    LPOVERLAPPED Overlapped = NULL;
    OVERLAPPEDPLUS* OverlappedPlus = NULL;
    ULONG ulBusyThread = 0;
    BOOL bError = FALSE;
    InterlockedIncrement(&This->m_ulCurrentThread);
    InterlockedIncrement(&This->m_ulBusyThread);
    timeBeginPeriod(1);
    while (This->m_bTimeToKill==FALSE)
    {
        AUTO_TICK(40);
        // This thread is idle while it is blocked on the port.
        InterlockedDecrement(&This->m_ulBusyThread);
        // The completion key is pointer sized, hence PULONG_PTR (an LPDWORD
        // cast only matches on 32-bit builds).
        BOOL bOk = GetQueuedCompletionStatus(
            hCompletionPort,
            &dwTrans,
            (PULONG_PTR)&ContextObject,
            &Overlapped, INFINITE);
        // Read the error code before anything else can overwrite it.
        DWORD dwIOError = GetLastError();
        STOP_TICK;
        // No packet was dequeued when Overlapped is NULL; do not derive an
        // OVERLAPPEDPLUS pointer from it.
        OverlappedPlus = (Overlapped != NULL) ? CONTAINING_RECORD(Overlapped, OVERLAPPEDPLUS, m_ol) : NULL;
        ulBusyThread = InterlockedIncrement(&This->m_ulBusyThread);
        if ( !bOk && dwIOError != WAIT_TIMEOUT ) // failed I/O, e.g. the peer closed the connection
        {
            if (ContextObject && This->m_bTimeToKill == FALSE &&dwTrans==0)
            {
                ContextObject->olps = NULL;
                OutputDebugStringA("!!! RemoveStaleContext \n");
                This->RemoveStaleContext(ContextObject);
            }
            SAFE_DELETE(OverlappedPlus);
            continue;
        }
        if (!bError)
        {
            // Every thread is busy: add another worker, up to the pool maximum.
            if (ulBusyThread == This->m_ulCurrentThread)
            {
                if (ulBusyThread < This->m_ulThreadPoolMax)
                {
                    if (ContextObject != NULL)
                    {
                        HANDLE hThread = (HANDLE)CreateThread(NULL,
                            0,
                            (LPTHREAD_START_ROUTINE)WorkThreadProc,
                            (void*)This,
                            0,
                            NULL);
                        // Count and close only a thread that really exists;
                        // counting a failed creation would keep the destructor
                        // waiting for a thread that never decrements the counter.
                        if (hThread != NULL)
                        {
                            InterlockedIncrement(&This->m_ulWorkThreadCount);
                            CloseHandle(hThread);
                        }
                    }
                }
            }
            if (!bOk && dwIOError == WAIT_TIMEOUT)
            {
                if (ContextObject == NULL)
                {
                    // Surplus idle thread: leave when above the pool minimum.
                    if (This->m_ulCurrentThread > This->m_ulThreadPoolMin)
                    {
                        break;
                    }
                    bError = TRUE;
                }
            }
        }
        if (!bError)
        {
            if(bOk && OverlappedPlus!=NULL && ContextObject!=NULL)
            {
                try
                {
                    This->HandleIO(OverlappedPlus->m_ioType, ContextObject, dwTrans);
                    ContextObject = NULL;
                }
                catch (...) {
                    OutputDebugStringA("This->HandleIO catched an error!!!");
                }
            }
        }
        // Each completion packet owns its OVERLAPPEDPLUS; release it here.
        SAFE_DELETE(OverlappedPlus);
    }
    timeEndPeriod(1);
    SAFE_DELETE(OverlappedPlus);
    InterlockedDecrement(&This->m_ulWorkThreadCount);
    InterlockedDecrement(&This->m_ulCurrentThread);
    InterlockedDecrement(&This->m_ulBusyThread);
    OutputDebugStringA("======> IOCPServer WorkThreadProc end \n");
    return 0;
}
/**
 * Called from the worker threads: routes one dequeued completion packet to
 * the handler that matches its I/O type.
 *
 * @param PacketFlags    I/O type stored in the packet's OVERLAPPEDPLUS.
 * @param ContextObject  connection the packet belongs to.
 * @param dwTrans        number of bytes transferred.
 * @return the handler's result; FALSE for IOIdle and unknown types.
 */
BOOL IOCPServer::HandleIO(IOType PacketFlags,PCONTEXT_OBJECT ContextObject, DWORD dwTrans)
{
    AUTO_TICK(20);
    BOOL bHandled = FALSE;
    if (PacketFlags == IOInitialize)
    {
        bHandled = OnClientInitializing(ContextObject, dwTrans);
    }
    else if (PacketFlags == IORead)
    {
        bHandled = OnClientReceiving(ContextObject,dwTrans);
    }
    else if (PacketFlags == IOWrite)
    {
        bHandled = OnClientPostSending(ContextObject,dwTrans);
    }
    else if (PacketFlags == IOIdle)
    {
        OutputDebugStringA("=> HandleIO PacketFlags= IOIdle\n");
    }
    return bHandled;
}
/**
 * Handler for IOInitialize packets. It does no work at present and simply
 * reports success; both parameters are unused.
 */
BOOL IOCPServer::OnClientInitializing(PCONTEXT_OBJECT ContextObject, DWORD dwTrans)
{
    const BOOL bInitialized = TRUE;
    return bInitialized;
}
/**
 * Handler for completed receives. Appends the received bytes to the
 * connection's input buffer, extracts every complete packet
 * ([flag][total length][original length][payload]), decompresses the payload
 * and hands it to m_NotifyProc, then posts the next receive.
 *
 * @param ContextObject  connection that received data.
 * @param dwTrans        number of bytes received; 0 means the peer closed
 *                       the socket.
 * @return FALSE when the connection was removed (dwTrans == 0), TRUE
 *         otherwise - also after a malformed packet, in which case the
 *         buffers are discarded and a new receive is posted.
 */
BOOL IOCPServer::OnClientReceiving(PCONTEXT_OBJECT ContextObject, DWORD dwTrans)
{
    CLock cs(m_cs);
    // Declared outside the try block so the handler below can release them
    // when anything between allocation and release throws.
    PBYTE CompressedBuffer = NULL;
#if USING_COMPRESS
    PBYTE DeCompressedBuffer = NULL;
#endif
    try
    {
        if (dwTrans == 0) // the peer closed the socket
        {
            RemoveStaleContext(ContextObject);
            return FALSE;
        }
        // Append the bytes just received to the pending (compressed) input.
        ContextObject->InCompressedBuffer.WriteBuffer((PBYTE)ContextObject->szBuffer,dwTrans);
        // Process packets while more than a header is buffered.
        while (ContextObject->InCompressedBuffer.GetBufferLength() > HDR_LENGTH)
        {
            char szPacketFlag[FLAG_LENGTH + 3]= {0};
            ContextObject->InCompressedBuffer.CopyBuffer(szPacketFlag, FLAG_LENGTH, 0);
            if (memcmp(m_szPacketFlag, szPacketFlag, FLAG_LENGTH) != 0)
                throw "Bad Buffer";

            // Peek at the total packet length that follows the flag.
            ULONG ulPackTotalLength = 0;
            ContextObject->InCompressedBuffer.CopyBuffer(&ulPackTotalLength, sizeof(ULONG), FLAG_LENGTH);
            // The total length includes the header. A smaller value would
            // underflow the payload length computed below, and the value
            // comes from the remote peer.
            if (ulPackTotalLength < (ULONG)HDR_LENGTH)
                throw "Bad Buffer";
            if (ContextObject->InCompressedBuffer.GetBufferLength() < ulPackTotalLength)
                break; // packet not complete yet; wait for more data

            // Consume the header: flag, total length, original length.
            ULONG ulOriginalLength = 0;
            ContextObject->InCompressedBuffer.ReadBuffer((PBYTE)szPacketFlag, FLAG_LENGTH);
            ContextObject->InCompressedBuffer.ReadBuffer((PBYTE) &ulPackTotalLength, sizeof(ULONG));
            ContextObject->InCompressedBuffer.ReadBuffer((PBYTE) &ulOriginalLength, sizeof(ULONG));

            // Consume the (still compressed) payload.
            ULONG ulCompressedLength = ulPackTotalLength - HDR_LENGTH;
            CompressedBuffer = new BYTE[ulCompressedLength];
            ContextObject->InCompressedBuffer.ReadBuffer(CompressedBuffer, ulCompressedLength);
#if USING_COMPRESS
            DeCompressedBuffer = new BYTE[ulOriginalLength];
            int iRet = uncompress(DeCompressedBuffer, &ulOriginalLength, CompressedBuffer, ulCompressedLength);
            PBYTE PayloadBuffer = DeCompressedBuffer;
#else
            PBYTE PayloadBuffer = CompressedBuffer;
            int iRet = 0;
#endif
            if (Z_FAILED(iRet))
            {
                OutputDebugStringA("[ERROR] uncompress failed \n");
                throw "Bad Buffer"; // buffers are released by the handler below
            }

            ContextObject->InDeCompressedBuffer.ClearBuffer();
            // NOTE(review): this discards any bytes of a following packet that
            // were already buffered - confirm this is intended.
            ContextObject->InCompressedBuffer.ClearBuffer();
            ContextObject->InDeCompressedBuffer.WriteBuffer(PayloadBuffer, ulOriginalLength);
            m_NotifyProc(ContextObject); // deliver the packet to the application

            delete [] CompressedBuffer;
            CompressedBuffer = NULL;
#if USING_COMPRESS
            delete [] DeCompressedBuffer;
            DeCompressedBuffer = NULL;
#endif
        }
        PostRecv(ContextObject); // post the next receive request
    }catch(...)
    {
        OutputDebugStringA("[ERROR] OnClientReceiving catch an error \n");
        delete [] CompressedBuffer;
#if USING_COMPRESS
        delete [] DeCompressedBuffer;
#endif
        ContextObject->InCompressedBuffer.ClearBuffer();
        ContextObject->InDeCompressedBuffer.ClearBuffer();
        PostRecv(ContextObject);
    }
    return TRUE;
}
/**
 * Queues data for sending to a client. The data is compressed, framed as
 * [flag][total length][original length][payload], appended to the
 * connection's output buffer, and an IOWrite packet is posted to the
 * completion port so that a worker thread performs the actual send.
 *
 * @param ContextObject     target connection (must not be NULL).
 * @param szBuffer          data to send.
 * @param ulOriginalLength  size of szBuffer in bytes; with 0 nothing is
 *                          framed but the IOWrite packet is still posted.
 */
VOID IOCPServer::OnClientPreSending(CONTEXT_OBJECT* ContextObject, PBYTE szBuffer, ULONG ulOriginalLength)
{
    assert (ContextObject);
    // Debug trace of short outgoing commands.
    if (ulOriginalLength < 100) {
        char buf[100] = { 0 };
        if (ulOriginalLength == 1){
            sprintf_s(buf, "command %d", int(szBuffer[0]));
        }
        else {
            memcpy(buf, szBuffer, ulOriginalLength);
        }
        OutputDebugStringA("[COMMAND] Send: " + CString(buf) + "\r\n");
    }
    // Declared outside the try block so the handler can release it if a
    // buffer write throws.
    LPBYTE CompressedBuffer = NULL;
    try
    {
        if (ulOriginalLength > 0)
        {
            // Worst-case size of the compressed output for the active back-end.
#if USING_ZLIB
            unsigned long ulCompressedLength = (double)ulOriginalLength * 1.001 + 12;
#elif USING_LZ4
            unsigned long ulCompressedLength = LZ4_compressBound(ulOriginalLength);
#else
            unsigned long ulCompressedLength = ZSTD_compressBound(ulOriginalLength);
#endif
            CompressedBuffer = new BYTE[ulCompressedLength];
            int iRet = compress(CompressedBuffer, &ulCompressedLength, (LPBYTE)szBuffer, ulOriginalLength);
            if (Z_FAILED(iRet))
            {
                OutputDebugStringA("[ERROR] compress failed \n");
                delete [] CompressedBuffer;
                return;
            }
#if !USING_ZLIB
            // LZ4 and zstd return the compressed size instead of updating it in place.
            ulCompressedLength = iRet;
#endif
            ULONG ulPackTotalLength = ulCompressedLength + HDR_LENGTH;
            ContextObject->OutCompressedBuffer.WriteBuffer((LPBYTE)m_szPacketFlag,FLAG_LENGTH);
            ContextObject->OutCompressedBuffer.WriteBuffer((PBYTE)&ulPackTotalLength, sizeof(ULONG));
            ContextObject->OutCompressedBuffer.WriteBuffer((PBYTE)&ulOriginalLength, sizeof(ULONG));
            ContextObject->OutCompressedBuffer.WriteBuffer(CompressedBuffer, ulCompressedLength);
            delete [] CompressedBuffer;
            CompressedBuffer = NULL;
        }
        OVERLAPPEDPLUS* OverlappedPlus = new OVERLAPPEDPLUS(IOWrite);
        // The completion key is pointer sized; a DWORD cast truncates the
        // context pointer on 64-bit builds.
        BOOL bOk = PostQueuedCompletionStatus(m_hCompletionPort, 0, (ULONG_PTR)ContextObject, &OverlappedPlus->m_ol);
        if ( (!bOk && GetLastError() != ERROR_IO_PENDING) ) // posting the request failed
        {
            OutputDebugStringA("!!! OnClientPreSending Ͷ<><CDB6><EFBFBD><EFBFBD>Ϣʧ<CFA2><CAA7>\n");
            RemoveStaleContext(ContextObject);
            SAFE_DELETE(OverlappedPlus);
        }
    }catch(...){
        OutputDebugStringA("[ERROR] OnClientPreSending catch an error \n");
        delete [] CompressedBuffer;
    }
}
/**
 * Handler for IOWrite packets. Drops the bytes that the previous send
 * completed from the output buffer and, when data remains, posts an
 * overlapped WSASend for the rest.
 *
 * @param ContextObject      connection being written to.
 * @param ulCompletedLength  bytes completed by the previous send (0 for the
 *                           packet posted by OnClientPreSending).
 * @return TRUE when the output buffer has been sent completely, FALSE
 *         otherwise (a further send was posted, or an error occurred).
 */
BOOL IOCPServer::OnClientPostSending(CONTEXT_OBJECT* ContextObject,ULONG ulCompletedLength)
{
    try
    {
        DWORD ulFlags = MSG_PARTIAL;
        // Remove the bytes that have already gone out.
        ContextObject->OutCompressedBuffer.RemoveCompletedBuffer(ulCompletedLength);
        if (ContextObject->OutCompressedBuffer.GetBufferLength() == 0)
        {
            ContextObject->OutCompressedBuffer.ClearBuffer();
            return TRUE; // everything has been sent
        }

        // Data remains: post another send for it.
        OVERLAPPEDPLUS * OverlappedPlus = new OVERLAPPEDPLUS(IOWrite);
        ContextObject->wsaOutBuffer.buf = (char*)ContextObject->OutCompressedBuffer.GetBuffer(0);
        ContextObject->wsaOutBuffer.len = ContextObject->OutCompressedBuffer.GetBufferLength();
        // lpNumberOfBytesSent is NULL, as recommended for overlapped sends.
        // Passing &wsaOutBuffer.len let WSASend overwrite the length of the
        // very buffer it was asked to send.
        int iOk = WSASend(ContextObject->sClientSocket, &ContextObject->wsaOutBuffer,1,
            NULL, ulFlags,&OverlappedPlus->m_ol, NULL);
        if ( iOk == SOCKET_ERROR && WSAGetLastError() != WSA_IO_PENDING )
        {
            OutputDebugStringA("!!! OnClientPostSending Ͷ<><CDB6><EFBFBD><EFBFBD>Ϣʧ<CFA2><CAA7>\n");
            RemoveStaleContext(ContextObject);
            SAFE_DELETE(OverlappedPlus);
        }
    }catch(...){
        OutputDebugStringA("[ERROR] OnClientPostSending catch an error \n");
    }
    return FALSE;
}
/**
 * Listen thread routine. Alternately polls the kill event and the accept
 * event (100 ms each) and calls OnAccept for every FD_ACCEPT notification.
 * The thread ends when the kill event is signalled, when the network events
 * cannot be enumerated, or when FD_ACCEPT carries an error code.
 *
 * @param lParam  the owning IOCPServer instance.
 * @return always 0.
 */
DWORD IOCPServer::ListenThreadProc(LPVOID lParam)
{
    IOCPServer* pServer = (IOCPServer*)(lParam);
    WSANETWORKEVENTS AcceptEvents;
    for (;;)
    {
        // Shutdown requested?
        if (WaitForSingleObject(pServer->m_hKillEvent, 100) == WAIT_OBJECT_0)
            break;

        const DWORD dwWait = WSAWaitForMultipleEvents(1,&pServer->m_hListenEvent,FALSE,100,FALSE);
        if (dwWait == WSA_WAIT_TIMEOUT)
            continue;

        // Fetch the pending network events; this also resets the event object.
        const int iEnum = WSAEnumNetworkEvents(pServer->m_sListenSocket,
            pServer->m_hListenEvent,
            &AcceptEvents);
        if (iEnum == SOCKET_ERROR)
            break;

        if (!(AcceptEvents.lNetworkEvents & FD_ACCEPT))
            continue;
        if (AcceptEvents.iErrorCode[FD_ACCEPT_BIT] != 0)
            break;

        pServer->OnAccept();
    }
    return 0;
}
void IOCPServer::OnAccept()
{
SOCKADDR_IN ClientAddr = {0};
SOCKET sClientSocket = INVALID_SOCKET;
int iLen = sizeof(SOCKADDR_IN);
sClientSocket = accept(m_sListenSocket,
(sockaddr*)&ClientAddr,
&iLen); //ͨ<><CDA8><EFBFBD><EFBFBD><EFBFBD>ǵļ<C7B5><C4BC><EFBFBD><EFBFBD>׽<EFBFBD><D7BD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD>һ<EFBFBD><D2BB><EFBFBD><EFBFBD>֮<EFBFBD>ź<EFBFBD>ͨ<EFBFBD>ŵ<EFBFBD><C5B5>׽<EFBFBD><D7BD><EFBFBD>
if (sClientSocket == SOCKET_ERROR)
{
return;
}
//<2F><><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD>Ϊÿһ<C3BF><D2BB><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD>ź<EFBFBD>ά<EFBFBD><CEAC><EFBFBD><EFBFBD>һ<EFBFBD><D2BB><EFBFBD><EFBFBD>֮<EFBFBD><D6AE><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD>ݽṹ<DDBD><E1B9B9><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD>Ϊ<EFBFBD>û<EFBFBD><C3BB><EFBFBD><EFBFBD><EFBFBD><EFBFBD>±<EFBFBD><C2B1><EFBFBD><EFBFBD><EFBFBD>
PCONTEXT_OBJECT ContextObject = AllocateContext(); // Context
if (ContextObject == NULL)
{
closesocket(sClientSocket);
sClientSocket = INVALID_SOCKET;
return;
}
ContextObject->sClientSocket = sClientSocket;
ContextObject->wsaInBuf.buf = (char*)ContextObject->szBuffer;
ContextObject->wsaInBuf.len = sizeof(ContextObject->szBuffer);
HANDLE Handle = CreateIoCompletionPort((HANDLE)sClientSocket, m_hCompletionPort, (DWORD)ContextObject, 0);
if (Handle!=m_hCompletionPort)
{
delete ContextObject;
ContextObject = NULL;
if (sClientSocket!=INVALID_SOCKET)
{
closesocket(sClientSocket);
sClientSocket = INVALID_SOCKET;
}
return;
}
//<2F><><EFBFBD><EFBFBD><EFBFBD>׽<EFBFBD><D7BD>ֵ<EFBFBD>ѡ<EFBFBD> Set KeepAlive <20><><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD> SO_KEEPALIVE
//<2F><><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD>Ӽ<EFBFBD><D3BC><EFBFBD><EFBFBD>Է<EFBFBD><D4B7><EFBFBD><EFBFBD><EFBFBD><EFBFBD>Ƿ<EFBFBD><C7B7><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD>2Сʱ<D0A1><CAB1><EFBFBD>ڴ<EFBFBD><DAB4>׽ӿڵ<D3BF><DAB5><EFBFBD>һ<EFBFBD><D2BB><EFBFBD><EFBFBD><EFBFBD><EFBFBD>û
//<2F><><EFBFBD><EFBFBD><EFBFBD>ݽ<EFBFBD><DDBD><EFBFBD><EFBFBD><EFBFBD>TCP<43><50><EFBFBD>Զ<EFBFBD><D4B6><EFBFBD><EFBFBD>Է<EFBFBD> <20><>һ<EFBFBD><D2BB><EFBFBD><EFBFBD><EFBFBD>ִ<EFBFBD><D6B4><EFBFBD>
m_ulKeepLiveTime = 3;
const BOOL bKeepAlive = TRUE;
setsockopt(ContextObject->sClientSocket,SOL_SOCKET,SO_KEEPALIVE,(char*)&bKeepAlive,sizeof(bKeepAlive));
//<2F><><EFBFBD>ó<EFBFBD>ʱ<EFBFBD><CAB1>ϸ<EFBFBD><CFB8>Ϣ
tcp_keepalive KeepAlive;
KeepAlive.onoff = 1; // <20><><EFBFBD>ñ<EFBFBD><C3B1><EFBFBD>
KeepAlive.keepalivetime = m_ulKeepLiveTime; //<2F><><EFBFBD><EFBFBD>3<EFBFBD><33><EFBFBD><EFBFBD>û<EFBFBD><C3BB><EFBFBD><EFBFBD><EFBFBD>ݣ<EFBFBD><DDA3>ͷ<EFBFBD><CDB7><EFBFBD>̽<EFBFBD><CCBD><EFBFBD><EFBFBD>
KeepAlive.keepaliveinterval = 1000 * 10; //<2F><><EFBFBD>Լ<EFBFBD><D4BC><EFBFBD>Ϊ10<31><30> Resend if No-Reply
WSAIoctl(ContextObject->sClientSocket, SIO_KEEPALIVE_VALS,&KeepAlive,sizeof(KeepAlive),
NULL,0,(unsigned long *)&bKeepAlive,0,NULL);
//<2F><><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD>ʱ<EFBFBD><CAB1><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD>ͻ<EFBFBD><CDBB><EFBFBD><EFBFBD><EFBFBD><EFBFBD>߻<EFBFBD><DFBB>ϵ<EFBFBD><CFB5>ȷ<EFBFBD><C8B7><EFBFBD><EFBFBD><EFBFBD><EFBFBD>Ͽ<EFBFBD><CFBF><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD>û<EFBFBD><C3BB><EFBFBD><EFBFBD><EFBFBD><EFBFBD>SO_KEEPALIVEѡ<45>
//<2F><><EFBFBD><EFBFBD>һֱ<D2BB><D6B1><EFBFBD>ر<EFBFBD>SOCKET<45><54><EFBFBD><EFBFBD>Ϊ<EFBFBD>ϵĵ<CFB5><C4B5><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD>Ĭ<EFBFBD><C4AC><EFBFBD><EFBFBD><EFBFBD><EFBFBD>Сʱʱ<CAB1><CAB1>̫<EFBFBD><CCAB><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD>Ǿ<EFBFBD><C7BE><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD>ֵ
CLock cs(m_cs);
m_ContextConnectionList.AddTail(ContextObject); //<2F><><EFBFBD><EFBFBD><EBB5BD><EFBFBD>ǵ<EFBFBD><C7B5>ڴ<EFBFBD><DAB4>б<EFBFBD><D0B1><EFBFBD>
OVERLAPPEDPLUS *OverlappedPlus = new OVERLAPPEDPLUS(IOInitialize); //ע<><D7A2><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD>ص<EFBFBD>IO<49><4F><EFBFBD><EFBFBD><EFBFBD><EFBFBD> <20>û<EFBFBD><C3BB><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD>
BOOL bOk = PostQueuedCompletionStatus(m_hCompletionPort, 0, (DWORD)ContextObject, &OverlappedPlus->m_ol); // <20><><EFBFBD><EFBFBD><EFBFBD>߳<EFBFBD>
//<2F><>Ϊ<EFBFBD><CEAA><EFBFBD>ǽ<EFBFBD><C7BD>ܵ<EFBFBD><DCB5><EFBFBD>һ<EFBFBD><D2BB><EFBFBD>û<EFBFBD><C3BB><EFBFBD><EFBFBD>ߵ<EFBFBD><DFB5><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD>ô<EFBFBD><C3B4><EFBFBD>Ǿͽ<C7BE><CDBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD>͸<EFBFBD><CDB8><EFBFBD><EFBFBD>ǵ<EFBFBD><C7B5><EFBFBD><EFBFBD>ɶ˿<C9B6> <20><><EFBFBD><EFBFBD><EFBFBD>ǵĹ<C7B5><C4B9><EFBFBD><EFBFBD>̴߳<DFB3><CCB4><EFBFBD><EFBFBD><EFBFBD>
if ( (!bOk && GetLastError() != ERROR_IO_PENDING)) //<2F><><EFBFBD><EFBFBD>Ͷ<EFBFBD><CDB6>ʧ<EFBFBD><CAA7>
{
int a = GetLastError();
OutputDebugStringA("!!! OnAccept Ͷ<><CDB6><EFBFBD><EFBFBD>Ϣʧ<CFA2><CAA7>\n");
RemoveStaleContext(ContextObject);
SAFE_DELETE(OverlappedPlus);
return;
}
PostRecv(ContextObject);
}
/**
 * Posts an overlapped receive for the connection. When data arrives (for a
 * new client, its first packet) a worker thread picks up the completion and
 * handles it as an IORead packet.
 *
 * @param ContextObject  connection to receive on. On failure the connection
 *                       is removed through RemoveStaleContext.
 */
VOID IOCPServer::PostRecv(CONTEXT_OBJECT* ContextObject)
{
    OVERLAPPEDPLUS * OverlappedPlus = new OVERLAPPEDPLUS(IORead);
    ContextObject->olps = OverlappedPlus;
    DWORD dwReturn = 0;
    ULONG ulFlags = MSG_PARTIAL;
    int iOk = WSARecv(ContextObject->sClientSocket, &ContextObject->wsaInBuf,
        1,&dwReturn, &ulFlags,&OverlappedPlus->m_ol, NULL);
    if (iOk == SOCKET_ERROR && WSAGetLastError() != WSA_IO_PENDING)
    {
        OutputDebugStringA("!!! PostRecv Ͷ<><CDB6><EFBFBD><EFBFBD>Ϣʧ<CFA2><CAA7>\n");
        RemoveStaleContext(ContextObject);
        // The request was never queued, so nothing else will release it.
        // Clear the context's reference first so it does not keep pointing
        // at freed memory.
        if (ContextObject->olps == OverlappedPlus)
        {
            ContextObject->olps = NULL;
        }
        SAFE_DELETE(OverlappedPlus);
    }
}
/**
 * Returns a context object ready for a new connection: one is taken from the
 * free pool when available, otherwise a new one is allocated. The object is
 * reset with InitMember() before it is returned.
 */
PCONTEXT_OBJECT IOCPServer::AllocateContext()
{
    CLock cs(m_cs);
    PCONTEXT_OBJECT pContext = NULL;
    if (m_ContextFreePoolList.IsEmpty())
    {
        pContext = new CONTEXT_OBJECT;
    }
    else
    {
        pContext = m_ContextFreePoolList.RemoveHead();
    }
    if (pContext == NULL)
    {
        return NULL;
    }
    pContext->InitMember();
    return pContext;
}
/**
 * Takes a connection out of service: notifies m_OfflineProc, cancels its
 * pending I/O, closes the socket and moves the context into the free pool.
 * Does nothing when the context is not in the connection list.
 */
VOID IOCPServer::RemoveStaleContext(CONTEXT_OBJECT* ContextObject)
{
    CLock cs(m_cs);
    // Only contexts that are still registered as live connections are handled.
    if (!m_ContextConnectionList.Find(ContextObject))
    {
        return;
    }
    m_OfflineProc(ContextObject);

    const SOCKET sClient = ContextObject->sClientSocket;
    CancelIo((HANDLE)sClient); // cancel outstanding overlapped I/O (e.g. the receive from PostRecv)
    closesocket(sClient);
    ContextObject->sClientSocket = INVALID_SOCKET;

    // NOTE(review): the context object itself is reinterpreted as an
    // OVERLAPPED here, so this tests the first bytes of the context rather
    // than a real overlapped structure - confirm the intent.
    while (!HasOverlappedIoCompleted((LPOVERLAPPED)ContextObject))
    {
        Sleep(0);
    }
    MoveContextToFreePoolList(ContextObject); // recycle the context
}
/**
 * Moves a context from the connection list to the free pool after clearing
 * its buffers. Does nothing when the context is not in the connection list.
 */
VOID IOCPServer::MoveContextToFreePoolList(CONTEXT_OBJECT* ContextObject)
{
    CLock cs(m_cs);
    POSITION Pos = m_ContextConnectionList.Find(ContextObject);
    if (Pos)
    {
        ContextObject->InCompressedBuffer.ClearBuffer();
        ContextObject->InDeCompressedBuffer.ClearBuffer();
        ContextObject->OutCompressedBuffer.ClearBuffer();
        // Use the real size of the receive buffer (as OnAccept does) instead
        // of a hard-coded 8192.
        memset(ContextObject->szBuffer,0,sizeof(ContextObject->szBuffer));
        m_ContextFreePoolList.AddTail(ContextObject);   // hand the context to the free pool
        m_ContextConnectionList.RemoveAt(Pos);          // and drop it from the live list
    }
}