IOCP类
/******************************************************************************
Module: IOCP.h
Notices: Copyright (c) 2007 Jeffrey Richter & Christophe Nasarre
Purpose: This class wraps an I/O Completion Port.
See Appendix B.
******************************************************************************/
#pragma once // Include this header file once per compilation unit
///////////////////////////////////////////////////////////////////////////////
#include "..\CommonFiles\CmnHdr.h" // See Appendix A.
///////////////////////////////////////////////////////////////////////////////
class CIOCP {
public:
CIOCP(int nMaxConcurrency = -1) {
m_hIOCP = NULL;
if (nMaxConcurrency != -1)
(void) Create(nMaxConcurrency);
}
~CIOCP() {
if (m_hIOCP != NULL)
chVERIFY(CloseHandle(m_hIOCP));
}
BOOL Close() {
BOOL bResult = CloseHandle(m_hIOCP);
m_hIOCP = NULL;
return(bResult);
}
BOOL Create(int nMaxConcurrency = 0) {
m_hIOCP = CreateIoCompletionPort(
INVALID_HANDLE_VALUE, NULL, 0, nMaxConcurrency);
chASSERT(m_hIOCP != NULL);
return(m_hIOCP != NULL);
}
BOOL AssociateDevice(HANDLE hDevice, ULONG_PTR CompKey) {
BOOL fOk = (CreateIoCompletionPort(hDevice, m_hIOCP, CompKey, 0)
== m_hIOCP);
chASSERT(fOk);
return(fOk);
}
BOOL AssociateSocket(SOCKET hSocket, ULONG_PTR CompKey) {
return(AssociateDevice((HANDLE) hSocket, CompKey));
}
BOOL PostStatus(ULONG_PTR CompKey, DWORD dwNumBytes = 0,
OVERLAPPED* po = NULL) {
BOOL fOk = PostQueuedCompletionStatus(m_hIOCP, dwNumBytes, CompKey, po);
/*
PostQueuedCompletionStatus函数,向每个工作者线程都发送—个特殊的完成数据包。
该函数会指示每个线程都“立即结束并退出”.下面是PostQueuedCompletionStatus函数的定义:
BOOL PostQueuedCompletionStatus(
HANDLE CompletlonPort,
DW0RD dwNumberOfBytesTrlansferred,
DWORD dwCompletlonKey,
LPOVERLAPPED lpoverlapped,
);
其中,CompletionPort参数指定想向其发送一个完成数据包的完成端口对象。
而就dwNumberOfBytesTransferred,dwCompletionKey和lpOverlapped这三个参数来说.
每—个都允许我们指定—个值,直接传递给GetQueuedCompletionStatus函数中对应的参数。
这样—来。—个工作者线程收到传递过来的三个GetQueuedCompletionStatus函数参数后,
便可根据由这三个参数的某一个设置的特殊值,决定何时应该退出。
例如,可用dwCompletionPort参数传递0值,而—个工作者线程会将其解释成中止指令。
一旦所有工作者线程都已关闭,便可使用CloseHandle函数,关闭完成端口。最终安全退出程序。
PostQueuedCompletionStatus函数提供了一种方式来与线程池中的所有线程进行通信。如,当用户终止服务应用程序时,
我们想要所有线程都完全利索地退出。但是如果各线程还在等待完成端口而又没有已完成的I/O 请求,那么它们将无法被唤醒。
通过为线程池中的每个线程都调用一次PostQueuedCompletionStatus,我们可以将它们都唤醒。
每个线程会对GetQueuedCompletionStatus的返回值进行检查,如果发现应用程序正在终止,那么它们就可以进行清理工作并正常地退出。
PostQueuedCompletionStatus主要是投递一个任务到完成队列当中,
从而使得在等待队列消息的某一个线程收取到.其参与分别与GetQueuedCompletionStauts相对应,
从而可以很方便地为在等待完成消息的线程(池)分派任务,而不需要另外再开线程资源.基于这一种特性,
还可以把完成端口当成一个高效的队列+线程池.正如1楼说的,如果你是想退出线程的话,
也可以通过这种方式投递特定的消息.由于退出消息一个线程只会处理一个(这个逻辑问题应该不用解释),
所以如果想让所有业务线程退出,就只需要根据线程数量投递多个退出消息即可.
*/
chASSERT(fOk);
return(fOk);
} BOOL GetStatus(ULONG_PTR* pCompKey, PDWORD pdwNumBytes,
OVERLAPPED** ppo, DWORD dwMilliseconds = INFINITE) {
return(GetQueuedCompletionStatus(m_hIOCP, pdwNumBytes,
pCompKey, ppo, dwMilliseconds));
}
private:
HANDLE m_hIOCP;
};
///////////////////////////////// End of File /////////////////////////////////
IOReq 类
class CIOReq : public OVERLAPPED {
public:
CIOReq() {
Internal = InternalHigh = 0;
ffset = ffsetHigh = 0;
hEvent = NULL;
m_nBuffSize = 0;
m_pvData = NULL;
}
~CIOReq() {
if (m_pvData != NULL)
VirtualFree(m_pvData, 0, MEM_RELEASE);
}
BOOL AllocBuffer(SIZE_T nBuffSize) {
m_nBuffSize = nBuffSize;
m_pvData = VirtualAlloc(NULL, m_nBuffSize, MEM_COMMIT, PAGE_READWRITE);
return(m_pvData != NULL);
}
BOOL Read(HANDLE hDevice, PLARGE_INTEGER pliOffset = NULL) {
if (pliOffset != NULL) {
Offset = pliOffset->LowPart;
ffsetHigh = pliOffset->HighPart;
}
return(::ReadFile(hDevice, m_pvData, m_nBuffSize, NULL, this));
}
BOOL Write(HANDLE hDevice, PLARGE_INTEGER pliOffset = NULL) {
if (pliOffset != NULL) {
Offset = pliOffset->LowPart;
ffsetHigh = pliOffset->HighPart;
}
return(::WriteFile(hDevice, m_pvData, m_nBuffSize, NULL, this));
}
private:
SIZE_T m_nBuffSize;
PVOID m_pvData;
};
文件拷贝实现函数
BOOL FileCopy(PCTSTR pszFileSrc, PCTSTR pszFileDst) {
BOOL bOk = FALSE; // Assume file copy fails
LARGE_INTEGER liFileSizeSrc = { 0 }, liFileSizeDst;
try {
{
// Open the source file without buffering & get its size
CEnsureCloseFile hFileSrc = CreateFile(pszFileSrc, GENERIC_READ,
FILE_SHARE_READ, NULL, OPEN_EXISTING,
FILE_FLAG_NO_BUFFERING | FILE_FLAG_OVERLAPPED, NULL);
if (hFileSrc.IsInvalid()) goto leave;
// Get the file's size
GetFileSizeEx(hFileSrc, &liFileSizeSrc);
// Nonbuffered I/O requires sector-sized transfers.
// I'll use buffer-size transfers since it's easier to calculate.
liFileSizeDst.QuadPart = chROUNDUP(liFileSizeSrc.QuadPart, BUFFSIZE);
// Open the destination file without buffering & set its size
CEnsureCloseFile hFileDst = CreateFile(pszFileDst, GENERIC_WRITE,
0, NULL, CREATE_ALWAYS,
FILE_FLAG_NO_BUFFERING | FILE_FLAG_OVERLAPPED, hFileSrc);
if (hFileDst.IsInvalid()) goto leave;
// File systems extend files synchronously. Extend the destination file
// now so that I/Os execute asynchronously improving performance.
SetFilePointerEx(hFileDst, liFileSizeDst, NULL, FILE_BEGIN);
SetEndOfFile(hFileDst);
// Create an I/O completion port and associate the files with it.
CIOCP iocp(0);
iocp.AssociateDevice(hFileSrc, CK_READ); // Read from source file
iocp.AssociateDevice(hFileDst, CK_WRITE); // Write to destination file
// Initialize record-keeping variables
CIOReq ior[MAX_PENDING_IO_REQS];
LARGE_INTEGER liNextReadOffset = { 0 };
int nReadsInProgress = 0;
int nWritesInProgress = 0;
// Prime the file copy engine by simulating that writes have completed.
// This causes read operations to be issued.
for (int nIOReq = 0; nIOReq < _countof(ior); nIOReq++) {
// Each I/O request requires a data buffer for transfers
chVERIFY(ior[nIOReq].AllocBuffer(BUFFSIZE));
nWritesInProgress++;
iocp.PostStatus(CK_WRITE, 0, &ior[nIOReq]); //向源文件发送IO请求,建立IO完成队列
}
BOOL bResult = FALSE;
// Loop while outstanding I/O requests still exist
while ((nReadsInProgress > 0) || (nWritesInProgress > 0)) //队列中有值存在
{
// Suspend the thread until an I/O completes
ULONG_PTR CompletionKey; //IO 完成键
DWORD dwNumBytes; //传输的数据的大小
CIOReq* pior;
bResult = iocp.GetStatus(&CompletionKey, &dwNumBytes, (OVERLAPPED**) &pior, INFINITE);
//当IO完成端口中有IO完成请求时就唤醒调用线程
switch (CompletionKey) {
case CK_READ: // Read completed, write to destination
nReadsInProgress--; //读已经完成,调用线程开始写数据
bResult = pior->Write(hFileDst); // Write to same offset read from source
nWritesInProgress++;
break;
case CK_WRITE: // Write completed, read from source
nWritesInProgress--;
if (liNextReadOffset.QuadPart < liFileSizeDst.QuadPart) {
// Not EOF, read the next block of data from the source file.
bResult = pior->Read(hFileSrc, &liNextReadOffset);
nReadsInProgress++; //写已经完成,调用线程开始读数据
liNextReadOffset.QuadPart += BUFFSIZE; // Advance source offset
}
break;
}
}
bOk = TRUE;
}
leave:;
}
catch (...) {
}
if (bOk) {
// The destination file size is a multiple of the page size. Open the
// file WITH buffering to shrink its size to the source file's size.
CEnsureCloseFile hFileDst = CreateFile(pszFileDst, GENERIC_WRITE,
0, NULL, OPEN_EXISTING, 0, NULL);
if (hFileDst.IsValid()) {
SetFilePointerEx(hFileDst, liFileSizeSrc, NULL, FILE_BEGIN);
SetEndOfFile(hFileDst);
}
}
return(bOk);
}