#define STRICT
#define WINVER 0x0500
#define _WIN32_WINNT 0x0500
#define _WIN32_IE 0x0501
#define _RICHEDIT_VER 0x0200
#define _WIN32_DCOM
#include <CTL/CTL_BASE.HPP>
class P2PFileShare
{
typedef struct
{
OVERLAPPED ov;
BYTE buff[1024];
LPTSTR path;
DWORD flag;
HANDLE handle;
}PATH_OV, *LPPATH_OV;
typedef struct
{
LPTSTR name; // 文件名称
DWORD time; // 通知时间
}FILE_NOTIFY;
public:
P2PFileShare()
: mh_IOCP(NULL)
, mn_OVPtr(0)
, mp_OVPtr(NULL)
, mn_Notify(0)
, mp_Notify(NULL)
{
}
virtual~P2PFileShare()
{
Close(TRUE);
}
private:
// 创建工作线程
HRESULT _CreateWorkerThread();
// 工作线程
#ifndef _WIN32_WCE
static UINT WINAPI _WorkerThreadProc(IN LPVOID pData);
#else
static DWORD WINAPI _WorkerThreadProc(IN LPVOID pData);
#endif // #ifndef _WIN32_WCE
HRESULT _WorkerThreadProc();
public:
HRESULT Start();
VRESULT Close(IN CONST BOOL bWait = FALSE);
public:
// 监视指定目录
HRESULT MonitorPath(IN LPCTSTR sFileName);
// 文件变化通知
LPTSTR GetNotify();
private:
HANDLE mh_IOCP;
MLONG mn_OVPtr;
LPPATH_OV* mp_OVPtr;
MLONG mn_Notify;
FILE_NOTIFY* mp_Notify;
public:
INLINE VRESULT EnterLock() {mo_cs.EnterLock();}
INLINE VRESULT LeaveLock() {mo_cs.LeaveLock();}
private:
MTCSObject mo_cs;
};
// 创建工作线程(根据 CPU 的数量,创建相应数量的工作线程)
HRESULT P2PFileShare::_CreateWorkerThread()
{
HRESULT hr = E_FAIL;
HANDLE hThread;
#ifndef _WIN32_WCE
if((hThread = (HANDLE)_beginthreadex(NULL, 0
, _WorkerThreadProc
, (LPVOID)this, 0, NULL)) == 0)
{
return _doserrno;
}
#else
if((hThread = (HANDLE)::CreateThread(NULL, 0
, _WorkerThreadProc
, (LPVOID)this, 0, &NULL)) == 0)
{
return ::GetLastError();
}
#endif
::CloseHandle(hThread); // 关闭句柄避免资源泄漏
hr = S_OK;
return hr;
}
// 工作线程
#ifndef _WIN32_WCE
UINT P2PFileShare::_WorkerThreadProc(IN LPVOID pData)
#else
DWORD P2PFileShare::_WorkerThreadProc(IN LPVOID pData)
#endif // #ifndef _WIN32_WCE
{
((P2PFileShare*)pData)->_WorkerThreadProc();
#ifndef _WIN32_WCE
_endthreadex(0);
#else
ExitThread(0);
#endif
return 0;
}
// 数据处理线程函数
HRESULT P2PFileShare::_WorkerThreadProc()
{
// 注意: 调用 GetQueuedCompletionStatus 的线程都将被放到完成端口的等待线程队列中
// 完成操作循环
BOOL bSucceed;
DWORD dwBytes;
LPDWORD pCT;
PATH_OV* pOV;
for(;;)
{
bSucceed = ::GetQueuedCompletionStatus(mh_IOCP
, &dwBytes
, (LPDWORD)&pCT
, (LPOVERLAPPED*)&pOV
, INFINITE
);
if(bSucceed)
{
if(NULL == pOV) break; // 退出工作线程
FILE_NOTIFY_INFORMATION * pfiNotifyInfo = (FILE_NOTIFY_INFORMATION*)pOV->buff;
DWORD dwNextEntryOffset;
TCHAR sFileName[1024];
do
{
dwNextEntryOffset = pfiNotifyInfo->NextEntryOffset;
DWORD dwAction = pfiNotifyInfo->Action;
DWORD dwFileNameLength = pfiNotifyInfo->FileNameLength;
CPY_W2T(sFileName, pfiNotifyInfo->FileName, dwFileNameLength/sizeof(WCHAR));
switch(dwAction)
{
case FILE_ACTION_REMOVED: // 文件删除
{
LPTSTR sFullName = new TCHAR[LPTSTRLen(pOV->path) + LPTSTRLen(sFileName) + 1];
if(NULL != sFullName)
{
LPTSTRCpy(sFullName, pOV->path);
LPTSTRCat(sFullName, sFileName);
LPTSTRPrintf(__T("Del %s"n"), sFullName);
delete[] sFullName;
}
}
break;
case FILE_ACTION_ADDED: // 文件替换
{
// 替换文件时只会触发 FILE_ACTION_ADDED, 因此需要手工触发 FILE_ACTION_MODIFIED
LPTSTRPrintf(__T("Add %s"n"), sFileName);
}
case FILE_ACTION_MODIFIED: // 文件修改
{
// 测试文件是否关闭
LPTSTR sFullName = new TCHAR[LPTSTRLen(pOV->path) + LPTSTRLen(sFileName) + 1];
if(NULL != sFullName)
{
LPTSTRCpy(sFullName, pOV->path);
LPTSTRCat(sFullName, sFileName);
HANDLE hFile = ::CreateFile(sFullName
, GENERIC_WRITE
, FILE_SHARE_WRITE
, NULL
, OPEN_EXISTING
, FILE_ATTRIBUTE_NORMAL
, NULL
);
if(INVALID_HANDLE_VALUE == hFile)
{
HRESULT hr = ::GetLastError();
LPTSTRPrintf(__T("Locked %s %d"n"), sFileName, hr);
}
else
{
::CloseHandle(hFile);
LPTSTRPrintf(__T("Modify %s"n"), sFileName);
LONG i;
EnterLock();
for(i=0;i<mn_Notify;i++)
{
if(LPTSTRCompare(mp_Notify[i].name, sFullName) == 0)
{
mp_Notify[i].time = ::GetTickCount();
break;
}
}
if(i >= mn_Notify)
{
FILE_NOTIFY* pNotify = new FILE_NOTIFY[mn_Notify + 1];
if(NULL != pNotify)
{
if(mn_Notify > 0)
{
::CopyMemory(pNotify, mp_Notify, sizeof(FILE_NOTIFY)*mn_Notify);
delete[] mp_Notify;
}
pNotify[mn_Notify].name = sFullName; sFullName = NULL;
pNotify[mn_Notify].time = ::GetTickCount();
mp_Notify = pNotify;
++mn_Notify;
}
}
LeaveLock();
}
if(NULL != sFullName) delete[] sFullName;
}
}
break;
case FILE_ACTION_RENAMED_OLD_NAME: // 文件改名
{
if(dwNextEntryOffset != 0)
{
pfiNotifyInfo= (FILE_NOTIFY_INFORMATION*)((BYTE*)pfiNotifyInfo + dwNextEntryOffset);
}
dwNextEntryOffset = pfiNotifyInfo->NextEntryOffset;
DWORD dwAction = pfiNotifyInfo->Action;
DWORD dwFileNameLength = pfiNotifyInfo->FileNameLength;
if(dwAction == FILE_ACTION_RENAMED_NEW_NAME)
{
TCHAR sNewName[1024];
CPY_W2T(sNewName, pfiNotifyInfo->FileName, dwFileNameLength/sizeof(WCHAR));
LPTSTRPrintf(__T("Rename %s -> %s"n"), sFileName, sNewName);
}
else
{
continue;
}
}
break;
}
if(dwNextEntryOffset != 0)
{
pfiNotifyInfo= (FILE_NOTIFY_INFORMATION*)((BYTE*)pfiNotifyInfo + dwNextEntryOffset);
}
}while (dwNextEntryOffset != 0);
// 投递目录监视
::ZeroMemory(pOV->buff, 1024);
::ReadDirectoryChangesW( pOV->handle
, pOV->buff
, 1024
, FALSE
, pOV->flag
, NULL
, (OVERLAPPED*)pOV
, NULL
);
}
}
EnterLock();
while(mn_Notify > 0)
{
--mn_Notify;
delete[] mp_Notify[mn_Notify].name;
}
delete[] mp_Notify;
mp_Notify = NULL;
while(mn_OVPtr > 0)
{
::InterlockedDecrement(&mn_OVPtr);
pOV = mp_OVPtr[mn_OVPtr];
::CloseHandle(pOV->handle);
delete[] pOV->path;
delete pOV;
}
delete[] mp_OVPtr;
mp_OVPtr = NULL;
::CloseHandle(mh_IOCP);
mh_IOCP = NULL;
LeaveLock();
return S_OK;
}
HRESULT P2PFileShare::Start()
{
HRESULT hr;
EnterLock();
USP_ASSERT(mh_IOCP == NULL);
// 创建完成端口
mh_IOCP = ::CreateIoCompletionPort(INVALID_HANDLE_VALUE, NULL, NULL, 0);
if(NULL == mh_IOCP)
{
hr = ::GetLastError();
}
else
{
hr = _CreateWorkerThread();
if(S_OK != hr)
{
::CloseHandle(mh_IOCP);
mh_IOCP = NULL;
}
}
LeaveLock();
return hr;
}
VRESULT P2PFileShare::Close(IN CONST BOOL bWait)
{
::PostQueuedCompletionStatus(mh_IOCP, 0, NULL, NULL); // 通知工作线程关闭
EnterLock();
// 先取消所有的 IO 操作,否则有可能会有内存问题
PATH_OV* pOV;
for(LONG i=0;i<mn_OVPtr;i++)
{
pOV = mp_OVPtr[i];
::CancelIo(pOV->handle);
}
LeaveLock();
if(bWait)
{
while(mn_OVPtr > 0) ::Sleep(10);
}
}
// 监视共享目录
HRESULT P2PFileShare::MonitorPath(IN LPCTSTR sPath)
{
USP_ASSERT(mh_IOCP != NULL);
if(NULL == mh_IOCP) return E_FAIL;
PATH_OV*pOV = new PATH_OV;
if(NULL == pOV) return E_OUTOFMEMORY;
::ZeroMemory(pOV, sizeof(PATH_OV));
pOV->path = NEW_T2T(sPath);
if(NULL == pOV->path)
{
delete pOV;
}
// 创建目录句柄
pOV->handle = ::CreateFile( pOV->path
, FILE_LIST_DIRECTORY
, FILE_SHARE_READ|FILE_SHARE_WRITE|FILE_SHARE_DELETE
, NULL
, OPEN_EXISTING
, FILE_FLAG_BACKUP_SEMANTICS|FILE_FLAG_OVERLAPPED
, NULL
);
if(INVALID_HANDLE_VALUE == pOV->handle)
{
delete[] pOV->path;
delete pOV;
return ::GetLastError();
}
// 帮定目录句柄
if(NULL == ::CreateIoCompletionPort(pOV->handle, mh_IOCP, NULL, 0))
{
::CloseHandle(pOV->handle);
delete[] pOV->path;
delete pOV;
return ::GetLastError();
}
// 提交目录监视
pOV->flag = FILE_NOTIFY_CHANGE_LAST_WRITE | FILE_NOTIFY_CHANGE_FILE_NAME;
BOOL bSucceed = ::ReadDirectoryChangesW( pOV->handle
, pOV->buff
, 1024
, FALSE
, pOV->flag
, NULL
, (OVERLAPPED*)pOV
, NULL
);
if(!bSucceed)
{
::CloseHandle(pOV->handle);
delete[] pOV->path;
delete pOV;
return ::GetLastError();
}
HRESULT hr = S_OK;
LONG i;
EnterLock();
for(i=0;i<mn_OVPtr;i++)
{
if(LPTSTRCompare(mp_OVPtr[i]->path, pOV->path) == 0)
{
hr = S_FALSE;
break;
}
}
if(i >= mn_OVPtr)
{
LPPATH_OV* pOVPtr = new LPPATH_OV[mn_OVPtr + 1];
if(NULL == pOVPtr)
{
hr = E_OUTOFMEMORY;
}
else
{
if(mp_OVPtr != NULL)
{
::CopyMemory(pOVPtr, mp_OVPtr, sizeof(LPPATH_OV)*mn_OVPtr);
delete[] mp_OVPtr;
}
pOVPtr[mn_OVPtr] = pOV;
mp_OVPtr = pOVPtr;
::InterlockedIncrement(&mn_OVPtr);
}
}
LeaveLock();
if(S_OK != hr)
{
::CloseHandle(pOV->handle);
delete[] pOV->path;
delete pOV;
return hr;
}
return S_OK;
}
// 文件变化通知
LPTSTR P2PFileShare::GetNotify()
{
LPTSTR sFileName = NULL;
DWORD nTime = ::GetTickCount();
EnterLock();
for(LONG i=0;i<mn_Notify;i++)
{
if(nTime - mp_Notify[i].time >= 1*1000)
{
sFileName = mp_Notify[i].name;
if(mn_Notify - i > 1)
{
::CopyMemory(mp_Notify + i, mp_Notify + i + 1, (mn_Notify - i - 1)*sizeof(FILE_NOTIFY));
}
--mn_Notify;
if(mn_Notify == 0)
{
delete[] mp_Notify;
mp_Notify = NULL;
}
break;
}
}
LeaveLock();
return sFileName;
}
int _tmain(IN INT nArgc, IN LPCTSTR* psArgv)
{
UNREFERENCED_PARAMETER(nArgc);
UNREFERENCED_PARAMETER(psArgv);
#ifdef UNICODE
CRTSetLocale(); // 设置本地化开关,保证在 UniCode 下可以输出汉字
#endif // UNICODE
ConsoleInit();
HRESULT hr;
P2PFileShare oShare;
hr = oShare.Start();
if(S_OK == hr)
{
hr = oShare.MonitorPath(__T("E:""EPServer""bin""incoming"""));
hr = oShare.MonitorPath(__T("E:""EPServer""bin""incomtmp"""));
AFSP sInput;
for(;;)
{
LPTSTR sFileName = oShare.GetNotify();
if(NULL != sFileName)
{
// 有一个文件已经完全复制完毕
LPTSTRPrintf(__T("Hashing %s"n"), sFileName);
// ...
delete[] sFileName;
}
::Sleep(1000);
// sInput.Attach(ConsoleGetStringNoEcho());
// if(0 == LPTSTRICompare(sInput, __T("exit"))) break;
// else
// if(0 == LPTSTRICompare(sInput, __T("quit"))) break;
}
oShare.Close();
}
ConsoleTerm();
return S_OK == hr ? 0 : -1;
}
#include <CTL/CTL_IMPL.HPP>